mirror of
https://github.com/redis/redis.git
synced 2026-05-28 04:02:46 -04:00
Don't call kvobjAllocSize() in t_stream.c if memory tracking is not enabled (#14786)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
Don't call kvobjAllocSize() in t_stream.c if memory tracking is not enabled. Reported-by: Sergei Georgiev <s_ggeorgiev@yahoo.com>
This commit is contained in:
parent
8e8483aac4
commit
3cc7e9956f
3 changed files with 32 additions and 24 deletions
|
|
@ -5971,7 +5971,7 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM
|
|||
use_id_ptr = &use_id;
|
||||
}
|
||||
|
||||
size_t oldsize = kvobjAllocSize(key->kv);
|
||||
size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0;
|
||||
if (streamAppendItem(s,argv,numfields,&added_id,use_id_ptr,1) == C_ERR) {
|
||||
/* Either the ID not greater than all existing IDs in the stream, or
|
||||
* the elements are too large to be stored. either way, errno is already
|
||||
|
|
@ -6024,7 +6024,7 @@ int RM_StreamDelete(RedisModuleKey *key, RedisModuleStreamID *id) {
|
|||
return REDISMODULE_ERR;
|
||||
}
|
||||
stream *s = key->kv->ptr;
|
||||
size_t oldsize = kvobjAllocSize(key->kv);
|
||||
size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0;
|
||||
streamID streamid = {id->ms, id->seq};
|
||||
if (streamDeleteItem(s, &streamid)) {
|
||||
if (server.memory_tracking_enabled)
|
||||
|
|
@ -6328,7 +6328,7 @@ long long RM_StreamTrimByLength(RedisModuleKey *key, int flags, long long length
|
|||
}
|
||||
int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0;
|
||||
stream *s = key->kv->ptr;
|
||||
size_t oldsize = kvobjAllocSize(key->kv);
|
||||
size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0;
|
||||
long long retval = streamTrimByLength(s, length, approx);
|
||||
if (server.memory_tracking_enabled)
|
||||
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
|
||||
|
|
@ -6364,7 +6364,7 @@ long long RM_StreamTrimByID(RedisModuleKey *key, int flags, RedisModuleStreamID
|
|||
int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0;
|
||||
streamID minid = (streamID){id->ms, id->seq};
|
||||
stream *s = key->kv->ptr;
|
||||
size_t oldsize = kvobjAllocSize(key->kv);
|
||||
size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0;
|
||||
long long retval = streamTrimByID(s, minid, approx);
|
||||
if (server.memory_tracking_enabled)
|
||||
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
|
||||
|
|
|
|||
|
|
@ -1235,7 +1235,8 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
|
|||
/* Update dst obj cardinality in KEYSIZES */
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_LIST, oldlen, newlen);
|
||||
/* Update src obj cardinality in KEYSIZES by listElementsRemoved() */
|
||||
listElementsRemoved(c, skey, wherefrom, kvsrc, 1, kvobjAllocSize(kvsrc), 1, NULL);
|
||||
size_t srcsize = server.memory_tracking_enabled ? kvobjAllocSize(kvsrc) : 0;
|
||||
listElementsRemoved(c, skey, wherefrom, kvsrc, 1, srcsize, 1, NULL);
|
||||
/* listTypePop returns an object with its refcount incremented */
|
||||
decrRefCount(value);
|
||||
|
||||
|
|
|
|||
|
|
@ -2435,7 +2435,7 @@ void xaddCommand(client *c) {
|
|||
stream *s;
|
||||
if ((kv = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
|
||||
s = kv->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
|
||||
/* IDMP: Check if IID already exists, save IID for later insertion */
|
||||
XXH128_hash_t hash;
|
||||
|
|
@ -2564,7 +2564,7 @@ void xrangeGenericCommand(client *c, int rev) {
|
|||
robj *startarg = rev ? c->argv[3] : c->argv[2];
|
||||
robj *endarg = rev ? c->argv[2] : c->argv[3];
|
||||
int startex = 0, endex = 0;
|
||||
size_t old_alloc;
|
||||
size_t old_alloc = 0;
|
||||
|
||||
/* Parse start and end IDs. */
|
||||
if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
|
||||
|
|
@ -2606,7 +2606,8 @@ void xrangeGenericCommand(client *c, int rev) {
|
|||
addReplyNullArray(c);
|
||||
} else {
|
||||
if (count == -1) count = 0;
|
||||
old_alloc = kvobjAllocSize(kv);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(kv);
|
||||
streamReplyWithRange(c,s,&startid,&endid,count,rev,-1,NULL,NULL,0,NULL,NULL);
|
||||
if (server.memory_tracking_enabled)
|
||||
updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv));
|
||||
|
|
@ -2653,7 +2654,7 @@ void xreadCommand(client *c) {
|
|||
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
|
||||
robj *groupname = NULL;
|
||||
robj *consumername = NULL;
|
||||
size_t old_alloc;
|
||||
size_t old_alloc = 0;
|
||||
|
||||
/* Parse arguments. */
|
||||
for (int i = 1; i < c->argc; i++) {
|
||||
|
|
@ -2881,7 +2882,8 @@ void xreadCommand(client *c) {
|
|||
}
|
||||
consumer = streamLookupConsumer(groups[i],consumername->ptr);
|
||||
if (consumer == NULL) {
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
consumer = streamCreateConsumer(s,groups[i],consumername->ptr,
|
||||
c->argv[streams_arg+i],
|
||||
c->db->id,SCC_DEFAULT);
|
||||
|
|
@ -2933,7 +2935,8 @@ void xreadCommand(client *c) {
|
|||
unsigned long propCount = 0;
|
||||
if (noack) flags |= STREAM_RWR_NOACK;
|
||||
if (serve_history) flags |= STREAM_RWR_HISTORY;
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0, min_idle_time,
|
||||
groups ? groups[i] : NULL,
|
||||
consumer, flags, &spi, &propCount);
|
||||
|
|
@ -3356,7 +3359,7 @@ void xgroupCommand(client *c) {
|
|||
int mkstream = 0;
|
||||
long long entries_read = SCG_INVALID_ENTRIES_READ;
|
||||
robj *o;
|
||||
size_t old_alloc;
|
||||
size_t old_alloc = 0;
|
||||
|
||||
/* Everything but the "HELP" option requires a key and group name. */
|
||||
if (c->argc >= 4) {
|
||||
|
|
@ -3460,7 +3463,8 @@ NULL
|
|||
entries_read = s->entries_added;
|
||||
}
|
||||
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
|
||||
if (cg) {
|
||||
if (server.memory_tracking_enabled)
|
||||
|
|
@ -3493,7 +3497,8 @@ NULL
|
|||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
} else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
|
||||
if (cg) {
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
|
||||
streamDestroyCG(s, cg);
|
||||
if (server.memory_tracking_enabled)
|
||||
|
|
@ -3509,7 +3514,8 @@ NULL
|
|||
addReply(c,shared.czero);
|
||||
}
|
||||
} else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2],
|
||||
c->db->id,SCC_DEFAULT);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
|
|
@ -3522,7 +3528,8 @@ NULL
|
|||
if (consumer) {
|
||||
/* Delete the consumer and returns the number of pending messages
|
||||
* that were yet associated with such a consumer. */
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
if (server.memory_tracking_enabled)
|
||||
old_alloc = kvobjAllocSize(o);
|
||||
pending = raxSize(consumer->pel);
|
||||
streamDelConsumer(s,cg,consumer);
|
||||
if (server.memory_tracking_enabled)
|
||||
|
|
@ -3650,7 +3657,7 @@ void xackCommand(client *c) {
|
|||
}
|
||||
|
||||
int acknowledged = 0;
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
for (int j = 3; j < c->argc; j++) {
|
||||
unsigned char buf[sizeof(streamID)];
|
||||
streamEncodeID(buf,&ids[j-3]);
|
||||
|
|
@ -3721,7 +3728,7 @@ void xackdelCommand(client *c) {
|
|||
}
|
||||
|
||||
s = kv->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
int first_entry = 0;
|
||||
int deleted = 0, dirty = server.dirty;
|
||||
addReplyArrayLen(c, args.numids);
|
||||
|
|
@ -4149,7 +4156,7 @@ void xclaimCommand(client *c) {
|
|||
|
||||
/* Do the actual claiming. */
|
||||
stream *s = o->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(o);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(o) : 0;
|
||||
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
|
||||
if (consumer == NULL) {
|
||||
consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
|
||||
|
|
@ -4342,7 +4349,7 @@ void xautoclaimCommand(client *c) {
|
|||
|
||||
/* Do the actual claiming. */
|
||||
stream *s = o->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(o);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(o) : 0;
|
||||
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
|
||||
if (consumer == NULL) {
|
||||
consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
|
||||
|
|
@ -4471,7 +4478,7 @@ void xdelCommand(client *c) {
|
|||
kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
|
||||
if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return;
|
||||
stream *s = kv->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
|
||||
/* We need to sanity check the IDs passed to start. Even if not
|
||||
* a big issue, it is not great that the command is only partially
|
||||
|
|
@ -4569,7 +4576,7 @@ void xdelexCommand(client *c) {
|
|||
}
|
||||
|
||||
stream *s = kv->ptr;
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
int first_entry = 0;
|
||||
int deleted = 0;
|
||||
addReplyArrayLen(c, args.numids);
|
||||
|
|
@ -4674,7 +4681,7 @@ void xtrimCommand(client *c) {
|
|||
stream *s = kv->ptr;
|
||||
|
||||
/* Perform the trimming. */
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
int64_t deleted = streamTrim(s, &parsed_args);
|
||||
if (server.memory_tracking_enabled)
|
||||
updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv));
|
||||
|
|
@ -4774,7 +4781,7 @@ void xinfoReplyWithStreamInfo(client *c, kvobj *kv) {
|
|||
addReplyBulkCString(c,"iids-duplicates");
|
||||
addReplyLongLong(c,s->iids_duplicates);
|
||||
|
||||
size_t old_alloc = kvobjAllocSize(kv);
|
||||
size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
|
||||
if (!full) {
|
||||
/* XINFO STREAM <key> */
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue