From ee376cdc3cbf93428ee05829074fd83f33800f50 Mon Sep 17 00:00:00 2001 From: Sergei Georgiev Date: Fri, 13 Mar 2026 14:00:28 +0200 Subject: [PATCH] Fix IDMP cron expiration not working after RDB load (#14869) **Summary** Registers streams with IDMP producers in `db->stream_idmp_keys` during RDB load, so that the periodic cron job can expire stale IDMP entries after a server restart. Without this fix, streams loaded from RDB were never registered, causing IDMP entries to accumulate indefinitely and never be cleaned up by the cron. **Changes** - **`rdbLoadRioWithLoadingCtx` (src/rdb.c):** After loading each key-value pair, added a check for `OBJ_STREAM` type with a non-NULL `idmp_producers` rax. When found, creates a string object for the key and inserts it into `db->stream_idmp_keys` via `dictAddRaw`. If the key already exists (duplicate insert), the reference is decremented to avoid a leak. - **Test `XADD IDMP cron expiration works after RDB load` (tests/unit/type/stream.tcl):** Added an integration test that creates a stream with IDMP producers, sets a short `IDMP-DURATION`, saves and restarts the server, then verifies that the cron eventually expires the entries (pids-tracked and iids-tracked drop to 0) and that previously-expired IIDs can be re-added as new entries. **Benefits** - **Cron expiration works after restart:** Streams with IDMP producers are now discoverable by the periodic cleanup cron after an RDB load, ensuring expired entries are reaped as expected. - **No memory leak on stale entries:** Without registration, IDMP entries that outlived their duration would persist in memory forever after a restart, growing unboundedly. This fix ensures they are cleaned up. - **Consistency between runtime and post-load behavior:** IDMP expiration now behaves identically whether the stream was created at runtime or restored from an RDB snapshot. --- src/rdb.c | 10 ++++++++++ tests/unit/type/stream.tcl | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/rdb.c b/src/rdb.c index 5e90299e2..c41585bb3 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3991,6 +3991,16 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin estoreAdd(db->subexpires, getKeySlot(key), kv, minExpiredField); } + /* Register streams with IDMP producers for cron-based expiration. */ + if (kv->type == OBJ_STREAM) { + stream *s = kv->ptr; + if (s->idmp_producers != NULL) { + robj *kobj = createStringObject(key, sdslen(key)); + if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL) + decrRefCount(kobj); + } + } + /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 3b8a28e87..ab2c08815 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -927,6 +927,42 @@ start_server { assert_equal $id3 [r XADD mystream IDMP p3 "req-1" * field "dup"] } {} {external:skip} + test {XADD IDMP cron expiration works after RDB load} { + r DEL mystream + + # Create stream and set IDMP-DURATION before adding entries, + # since XCFGSET clears existing entries when the duration changes. + r XADD mystream IDMP p1 "init" * field "init" + r XCFGSET mystream IDMP-DURATION 2 + r XADD mystream IDMP p1 "req-1" * field "v1" + r XADD mystream IDMP p2 "req-1" * field "v2" + + set reply [r XINFO STREAM mystream] + assert_equal 2 [dict get $reply pids-tracked] + assert_equal 2 [dict get $reply iids-tracked] + + # Save and restart — this triggers RDB load which should + # register the stream in stream_idmp_keys for cron cleanup. + r SAVE + restart_server 0 true false + + # Wait for IDMP entries to expire and for the cron to clean them up. + # If the stream was not registered in stream_idmp_keys after RDB load, + # the counts would never reach 0. + # Poll instead of a fixed sleep so the test finishes as soon as possible. + wait_for_condition 50 100 { + [dict get [r XINFO STREAM mystream] pids-tracked] == 0 && + [dict get [r XINFO STREAM mystream] iids-tracked] == 0 + } else { + fail "IDMP entries were not cleaned up after RDB load" + } + + # Expired IIDs should be re-addable as new entries + set new_id [r XADD mystream IDMP p1 "req-1" * field "new"] + assert {$new_id ne ""} + assert_equal 4 [r XLEN mystream] + } {} {external:skip} + test {XADD IDMP multiple producers concurrent access} { r DEL mystream