diff --git a/src/module.c b/src/module.c index 42005dfb5..cea2f1322 100644 --- a/src/module.c +++ b/src/module.c @@ -5971,7 +5971,7 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM use_id_ptr = &use_id; } - size_t oldsize = kvobjAllocSize(key->kv); + size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0; if (streamAppendItem(s,argv,numfields,&added_id,use_id_ptr,1) == C_ERR) { /* Either the ID not greater than all existing IDs in the stream, or * the elements are too large to be stored. either way, errno is already @@ -6024,7 +6024,7 @@ int RM_StreamDelete(RedisModuleKey *key, RedisModuleStreamID *id) { return REDISMODULE_ERR; } stream *s = key->kv->ptr; - size_t oldsize = kvobjAllocSize(key->kv); + size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0; streamID streamid = {id->ms, id->seq}; if (streamDeleteItem(s, &streamid)) { if (server.memory_tracking_enabled) @@ -6328,7 +6328,7 @@ long long RM_StreamTrimByLength(RedisModuleKey *key, int flags, long long length } int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; stream *s = key->kv->ptr; - size_t oldsize = kvobjAllocSize(key->kv); + size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0; long long retval = streamTrimByLength(s, length, approx); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); @@ -6364,7 +6364,7 @@ long long RM_StreamTrimByID(RedisModuleKey *key, int flags, RedisModuleStreamID int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; streamID minid = (streamID){id->ms, id->seq}; stream *s = key->kv->ptr; - size_t oldsize = kvobjAllocSize(key->kv); + size_t oldsize = server.memory_tracking_enabled ? kvobjAllocSize(key->kv) : 0; long long retval = streamTrimByID(s, minid, approx); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); diff --git a/src/t_list.c b/src/t_list.c index ba982a5e7..bfba02faa 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1235,7 +1235,8 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) { /* Update dst obj cardinality in KEYSIZES */ updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_LIST, oldlen, newlen); /* Update src obj cardinality in KEYSIZES by listElementsRemoved() */ - listElementsRemoved(c, skey, wherefrom, kvsrc, 1, kvobjAllocSize(kvsrc), 1, NULL); + size_t srcsize = server.memory_tracking_enabled ? kvobjAllocSize(kvsrc) : 0; + listElementsRemoved(c, skey, wherefrom, kvsrc, 1, srcsize, 1, NULL); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); diff --git a/src/t_stream.c b/src/t_stream.c index d65606312..29a4ed2c5 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2435,7 +2435,7 @@ void xaddCommand(client *c) { stream *s; if ((kv = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return; s = kv->ptr; - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; /* IDMP: Check if IID already exists, save IID for later insertion */ XXH128_hash_t hash; @@ -2564,7 +2564,7 @@ void xrangeGenericCommand(client *c, int rev) { robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *endarg = rev ? c->argv[2] : c->argv[3]; int startex = 0, endex = 0; - size_t old_alloc; + size_t old_alloc = 0; /* Parse start and end IDs. */ if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) @@ -2606,7 +2606,8 @@ void xrangeGenericCommand(client *c, int rev) { addReplyNullArray(c); } else { if (count == -1) count = 0; - old_alloc = kvobjAllocSize(kv); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(kv); streamReplyWithRange(c,s,&startid,&endid,count,rev,-1,NULL,NULL,0,NULL,NULL); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv)); @@ -2653,7 +2654,7 @@ void xreadCommand(client *c) { int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ robj *groupname = NULL; robj *consumername = NULL; - size_t old_alloc; + size_t old_alloc = 0; /* Parse arguments. */ for (int i = 1; i < c->argc; i++) { @@ -2881,7 +2882,8 @@ void xreadCommand(client *c) { } consumer = streamLookupConsumer(groups[i],consumername->ptr); if (consumer == NULL) { - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); consumer = streamCreateConsumer(s,groups[i],consumername->ptr, c->argv[streams_arg+i], c->db->id,SCC_DEFAULT); @@ -2933,7 +2935,8 @@ void xreadCommand(client *c) { unsigned long propCount = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); streamReplyWithRange(c,s,&start,NULL,count,0, min_idle_time, groups ? groups[i] : NULL, consumer, flags, &spi, &propCount); @@ -3356,7 +3359,7 @@ void xgroupCommand(client *c) { int mkstream = 0; long long entries_read = SCG_INVALID_ENTRIES_READ; robj *o; - size_t old_alloc; + size_t old_alloc = 0; /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { @@ -3460,7 +3463,8 @@ NULL entries_read = s->entries_added; } - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read); if (cg) { if (server.memory_tracking_enabled) @@ -3493,7 +3497,8 @@ NULL keyModified(c,c->db,c->argv[2],o,0); } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { if (cg) { - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); streamDestroyCG(s, cg); if (server.memory_tracking_enabled) @@ -3509,7 +3514,8 @@ NULL addReply(c,shared.czero); } } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2], c->db->id,SCC_DEFAULT); keyModified(c,c->db,c->argv[2],o,0); @@ -3522,7 +3528,8 @@ NULL if (consumer) { /* Delete the consumer and returns the number of pending messages * that were yet associated with such a consumer. */ - old_alloc = kvobjAllocSize(o); + if (server.memory_tracking_enabled) + old_alloc = kvobjAllocSize(o); pending = raxSize(consumer->pel); streamDelConsumer(s,cg,consumer); if (server.memory_tracking_enabled) @@ -3650,7 +3657,7 @@ void xackCommand(client *c) { } int acknowledged = 0; - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; for (int j = 3; j < c->argc; j++) { unsigned char buf[sizeof(streamID)]; streamEncodeID(buf,&ids[j-3]); @@ -3721,7 +3728,7 @@ void xackdelCommand(client *c) { } s = kv->ptr; - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; int first_entry = 0; int deleted = 0, dirty = server.dirty; addReplyArrayLen(c, args.numids); @@ -4149,7 +4156,7 @@ void xclaimCommand(client *c) { /* Do the actual claiming. */ stream *s = o->ptr; - size_t old_alloc = kvobjAllocSize(o); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(o) : 0; streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); if (consumer == NULL) { consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); @@ -4342,7 +4349,7 @@ void xautoclaimCommand(client *c) { /* Do the actual claiming. */ stream *s = o->ptr; - size_t old_alloc = kvobjAllocSize(o); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(o) : 0; streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); if (consumer == NULL) { consumer = streamCreateConsumer(o->ptr,group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); @@ -4471,7 +4478,7 @@ void xdelCommand(client *c) { kvobj *kv = lookupKeyWriteOrReply(c, c->argv[1], shared.czero); if (kv == NULL || checkType(c, kv, OBJ_STREAM)) return; stream *s = kv->ptr; - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; /* We need to sanity check the IDs passed to start. Even if not * a big issue, it is not great that the command is only partially @@ -4569,7 +4576,7 @@ void xdelexCommand(client *c) { } stream *s = kv->ptr; - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; int first_entry = 0; int deleted = 0; addReplyArrayLen(c, args.numids); @@ -4674,7 +4681,7 @@ void xtrimCommand(client *c) { stream *s = kv->ptr; /* Perform the trimming. */ - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; int64_t deleted = streamTrim(s, &parsed_args); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv)); @@ -4774,7 +4781,7 @@ void xinfoReplyWithStreamInfo(client *c, kvobj *kv) { addReplyBulkCString(c,"iids-duplicates"); addReplyLongLong(c,s->iids_duplicates); - size_t old_alloc = kvobjAllocSize(kv); + size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; if (!full) { /* XINFO STREAM */