From ef741a95a2338c95cc9a9fc193a5142d57c04fe2 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 26 Mar 2026 15:39:29 +0200 Subject: [PATCH] fix handleExpiredIdmpEntries to mark keys as modified (#14933) refactor code to use streamKeyLoaded and streamKeyRemoved instead of handling stream internal concerns direction from random places. --- src/cluster.c | 9 ++------- src/db.c | 25 ++++++++----------------- src/rdb.c | 10 ++-------- src/stream.h | 2 ++ src/t_stream.c | 27 +++++++++++++++++++++++++++ 5 files changed, 41 insertions(+), 32 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e9b1a8b5e..04ba14647 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -306,13 +306,8 @@ void restoreCommand(client *c) { estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField); } - if (kvtype == OBJ_STREAM) { - stream *s = kv->ptr; - if (s->idmp_producers != NULL) { - if (dictAdd(c->db->stream_idmp_keys, key, NULL) == DICT_OK) - incrRefCount(key); - } - } + if (kvtype == OBJ_STREAM) + streamKeyLoaded(c->db, key, kv); if (ttl) { if (!absttl) { diff --git a/src/db.c b/src/db.c index 36d6d96e4..8db93aade 100644 --- a/src/db.c +++ b/src/db.c @@ -593,7 +593,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link estoreRemove(db->subexpires, slot, old); if (old->type == OBJ_STREAM) - dictDelete(db->stream_idmp_keys, key); + streamKeyRemoved(db, key, old); long long oldExpire = getExpire(db, key->ptr, old); @@ -858,7 +858,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { /* If stream with IDMP tracking, remove it from stream_idmp_keys */ if (type == OBJ_STREAM) - dictDelete(db->stream_idmp_keys, key); + streamKeyRemoved(db, key, kv); /* RM_StringDMA may call dbUnshareStringValue which may free kv, so we * need to incr to retain kv */ @@ -2243,10 +2243,8 @@ void renameGenericCommand(client *c, int nx) { estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime); /* Re-register stream IDMP tracking under the new key name. */ - if (srctype == OBJ_STREAM && ((stream *)o->ptr)->idmp_producers != NULL) { - if (dictAdd(c->db->stream_idmp_keys, c->argv[2], NULL) == DICT_OK) - incrRefCount(c->argv[2]); - } + if (srctype == OBJ_STREAM) + streamKeyLoaded(c->db, c->argv[2], o); keyModified(c,c->db,c->argv[1],NULL,1); keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */ @@ -2343,10 +2341,8 @@ void moveCommand(client *c) { estoreAdd(dst->subexpires, slot, kv, hashExpireTime); /* Register stream IDMP tracking in the destination DB. */ - if (kv->type == OBJ_STREAM && ((stream *)kv->ptr)->idmp_producers != NULL) { - if (dictAdd(dst->stream_idmp_keys, c->argv[1], NULL) == DICT_OK) - incrRefCount(c->argv[1]); - } + if (kv->type == OBJ_STREAM) + streamKeyLoaded(dst, c->argv[1], kv); keyModified(c,src,c->argv[1],NULL,1); keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */ @@ -2469,13 +2465,8 @@ void copyCommand(client *c) { estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire); /* Register copied stream with IDMP producers for cron-based expiration. */ - if (kvCopy->type == OBJ_STREAM) { - stream *s = kvCopy->ptr; - if (s->idmp_producers != NULL) { - if (dictAdd(dst->stream_idmp_keys, newkey, NULL) == DICT_OK) - incrRefCount(newkey); - } - } + if (kvCopy->type == OBJ_STREAM) + streamKeyLoaded(dst, newkey, kvCopy); /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */ keyModified(c,dst,c->argv[2],NULL,1); diff --git a/src/rdb.c b/src/rdb.c index bcae57502..e9429d28f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -4037,14 +4037,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } /* Register streams with IDMP producers for cron-based expiration. */ - if (kv->type == OBJ_STREAM) { - stream *s = kv->ptr; - if (s->idmp_producers != NULL) { - robj *kobj = createStringObject(key, sdslen(key)); - if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL) - decrRefCount(kobj); - } - } + if (kv->type == OBJ_STREAM) + streamKeyLoaded(db, &keyobj, kv); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); diff --git a/src/stream.h b/src/stream.h index 028e37435..da9d41a69 100644 --- a/src/stream.h +++ b/src/stream.h @@ -193,6 +193,8 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); int streamEntryExists(stream *s, streamID *id); +void streamKeyLoaded(redisDb *db, robj *key, robj *val); +void streamKeyRemoved(redisDb *db, robj *key, robj *val); listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key); diff --git a/src/t_stream.c b/src/t_stream.c index d263f2672..ba7ca2968 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -5704,6 +5704,27 @@ static void trackStreamIdmpEntries(client *c, robj *key) { } } +/* To be used when a stream key was loaded into ram, re-register it in stream_idmp_keys if needed */ +void streamKeyLoaded(redisDb *db, robj *key, robj *val) { + stream *s = val->ptr; + if (s->idmp_producers != NULL) { + robj *tracked_key = key; + if (key->refcount == OBJ_STATIC_REFCOUNT) + tracked_key = createStringObject(key->ptr, sdslen(key->ptr)); + if (dictAddRaw(db->stream_idmp_keys, tracked_key, NULL)) { + incrRefCount(tracked_key); + } + if (tracked_key != key) + decrRefCount(tracked_key); + } +} + +/* To be used when a steam key was removed from ram, un-redigster from stream_idmp_keys if needed */ +void streamKeyRemoved(redisDb *db, robj *key, robj *val) { + UNUSED(val); + dictDelete(db->stream_idmp_keys, key); +} + /* Clean up expired idempotency entries from tracked streams. This function * is invoked regularly from serverCron() to remove expired entries * from the idmp_dict of streams that have idempotency tracking enabled, @@ -5750,6 +5771,7 @@ void handleExpiredIdmpEntries(void) { } /* Iterate through all producers and remove expired entries */ + int modified = 0; raxIterator ri; raxStart(&ri, s->idmp_producers); raxSeek(&ri, "^", NULL, 0); @@ -5769,6 +5791,7 @@ void handleExpiredIdmpEntries(void) { } /* Free the entry */ idmpEntryFree(entry, &s->alloc_size); + modified = 1; } else { break; } @@ -5779,10 +5802,14 @@ void handleExpiredIdmpEntries(void) { raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL); idmpProducerFree(producer, &s->alloc_size); raxSeek(&ri, ">=", ri.key, ri.key_len); + modified = 1; } } raxStop(&ri); + if (modified) + keyModified(NULL, db, key, kv, 0); + /* If no producers remain, free the entire rax tree */ if (raxSize(s->idmp_producers) == 0) { raxFree(s->idmp_producers);