fix handleExpiredIdmpEntries to mark keys as modified (#14933)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run

refactor code to use streamKeyLoaded and streamKeyRemoved instead of
handling stream internal concerns direction from random places.
This commit is contained in:
Oran Agra 2026-03-26 15:39:29 +02:00 committed by GitHub
parent 40c140bf16
commit ef741a95a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 41 additions and 32 deletions

View file

@ -306,13 +306,8 @@ void restoreCommand(client *c) {
estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField);
}
if (kvtype == OBJ_STREAM) {
stream *s = kv->ptr;
if (s->idmp_producers != NULL) {
if (dictAdd(c->db->stream_idmp_keys, key, NULL) == DICT_OK)
incrRefCount(key);
}
}
if (kvtype == OBJ_STREAM)
streamKeyLoaded(c->db, key, kv);
if (ttl) {
if (!absttl) {

View file

@ -593,7 +593,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
estoreRemove(db->subexpires, slot, old);
if (old->type == OBJ_STREAM)
dictDelete(db->stream_idmp_keys, key);
streamKeyRemoved(db, key, old);
long long oldExpire = getExpire(db, key->ptr, old);
@ -858,7 +858,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
/* If stream with IDMP tracking, remove it from stream_idmp_keys */
if (type == OBJ_STREAM)
dictDelete(db->stream_idmp_keys, key);
streamKeyRemoved(db, key, kv);
/* RM_StringDMA may call dbUnshareStringValue which may free kv, so we
* need to incr to retain kv */
@ -2243,10 +2243,8 @@ void renameGenericCommand(client *c, int nx) {
estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime);
/* Re-register stream IDMP tracking under the new key name. */
if (srctype == OBJ_STREAM && ((stream *)o->ptr)->idmp_producers != NULL) {
if (dictAdd(c->db->stream_idmp_keys, c->argv[2], NULL) == DICT_OK)
incrRefCount(c->argv[2]);
}
if (srctype == OBJ_STREAM)
streamKeyLoaded(c->db, c->argv[2], o);
keyModified(c,c->db,c->argv[1],NULL,1);
keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */
@ -2343,10 +2341,8 @@ void moveCommand(client *c) {
estoreAdd(dst->subexpires, slot, kv, hashExpireTime);
/* Register stream IDMP tracking in the destination DB. */
if (kv->type == OBJ_STREAM && ((stream *)kv->ptr)->idmp_producers != NULL) {
if (dictAdd(dst->stream_idmp_keys, c->argv[1], NULL) == DICT_OK)
incrRefCount(c->argv[1]);
}
if (kv->type == OBJ_STREAM)
streamKeyLoaded(dst, c->argv[1], kv);
keyModified(c,src,c->argv[1],NULL,1);
keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */
@ -2469,13 +2465,8 @@ void copyCommand(client *c) {
estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire);
/* Register copied stream with IDMP producers for cron-based expiration. */
if (kvCopy->type == OBJ_STREAM) {
stream *s = kvCopy->ptr;
if (s->idmp_producers != NULL) {
if (dictAdd(dst->stream_idmp_keys, newkey, NULL) == DICT_OK)
incrRefCount(newkey);
}
}
if (kvCopy->type == OBJ_STREAM)
streamKeyLoaded(dst, newkey, kvCopy);
/* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */
keyModified(c,dst,c->argv[2],NULL,1);

View file

@ -4037,14 +4037,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
}
/* Register streams with IDMP producers for cron-based expiration. */
if (kv->type == OBJ_STREAM) {
stream *s = kv->ptr;
if (s->idmp_producers != NULL) {
robj *kobj = createStringObject(key, sdslen(key));
if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL)
decrRefCount(kobj);
}
}
if (kv->type == OBJ_STREAM)
streamKeyLoaded(db, &keyobj, kv);
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);

View file

@ -193,6 +193,8 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
int streamEntryExists(stream *s, streamID *id);
void streamKeyLoaded(redisDb *db, robj *key, robj *val);
void streamKeyRemoved(redisDb *db, robj *key, robj *val);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);

View file

@ -5704,6 +5704,27 @@ static void trackStreamIdmpEntries(client *c, robj *key) {
}
}
/* To be used when a stream key was loaded into ram, re-register it in stream_idmp_keys if needed */
void streamKeyLoaded(redisDb *db, robj *key, robj *val) {
stream *s = val->ptr;
if (s->idmp_producers != NULL) {
robj *tracked_key = key;
if (key->refcount == OBJ_STATIC_REFCOUNT)
tracked_key = createStringObject(key->ptr, sdslen(key->ptr));
if (dictAddRaw(db->stream_idmp_keys, tracked_key, NULL)) {
incrRefCount(tracked_key);
}
if (tracked_key != key)
decrRefCount(tracked_key);
}
}
/* To be used when a steam key was removed from ram, un-redigster from stream_idmp_keys if needed */
void streamKeyRemoved(redisDb *db, robj *key, robj *val) {
UNUSED(val);
dictDelete(db->stream_idmp_keys, key);
}
/* Clean up expired idempotency entries from tracked streams. This function
* is invoked regularly from serverCron() to remove expired entries
* from the idmp_dict of streams that have idempotency tracking enabled,
@ -5750,6 +5771,7 @@ void handleExpiredIdmpEntries(void) {
}
/* Iterate through all producers and remove expired entries */
int modified = 0;
raxIterator ri;
raxStart(&ri, s->idmp_producers);
raxSeek(&ri, "^", NULL, 0);
@ -5769,6 +5791,7 @@ void handleExpiredIdmpEntries(void) {
}
/* Free the entry */
idmpEntryFree(entry, &s->alloc_size);
modified = 1;
} else {
break;
}
@ -5779,10 +5802,14 @@ void handleExpiredIdmpEntries(void) {
raxRemove(s->idmp_producers, ri.key, ri.key_len, NULL);
idmpProducerFree(producer, &s->alloc_size);
raxSeek(&ri, ">=", ri.key, ri.key_len);
modified = 1;
}
}
raxStop(&ri);
if (modified)
keyModified(NULL, db, key, kv, 0);
/* If no producers remain, free the entire rax tree */
if (raxSize(s->idmp_producers) == 0) {
raxFree(s->idmp_producers);