diff --git a/src/rdb.c b/src/rdb.c index 5e90299e2..c41585bb3 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3991,6 +3991,16 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin estoreAdd(db->subexpires, getKeySlot(key), kv, minExpiredField); } + /* Register streams with IDMP producers for cron-based expiration. */ + if (kv->type == OBJ_STREAM) { + stream *s = kv->ptr; + if (s->idmp_producers != NULL) { + robj *kobj = createStringObject(key, sdslen(key)); + if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL) + decrRefCount(kobj); + } + } + /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 3b8a28e87..ab2c08815 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -927,6 +927,42 @@ start_server { assert_equal $id3 [r XADD mystream IDMP p3 "req-1" * field "dup"] } {} {external:skip} + test {XADD IDMP cron expiration works after RDB load} { + r DEL mystream + + # Create stream and set IDMP-DURATION before adding entries, + # since XCFGSET clears existing entries when the duration changes. + r XADD mystream IDMP p1 "init" * field "init" + r XCFGSET mystream IDMP-DURATION 2 + r XADD mystream IDMP p1 "req-1" * field "v1" + r XADD mystream IDMP p2 "req-1" * field "v2" + + set reply [r XINFO STREAM mystream] + assert_equal 2 [dict get $reply pids-tracked] + assert_equal 2 [dict get $reply iids-tracked] + + # Save and restart — this triggers RDB load which should + # register the stream in stream_idmp_keys for cron cleanup. + r SAVE + restart_server 0 true false + + # Wait for IDMP entries to expire and for the cron to clean them up. + # If the stream was not registered in stream_idmp_keys after RDB load, + # the counts would never reach 0. + # Poll instead of a fixed sleep so the test finishes as soon as possible. + wait_for_condition 50 100 { + [dict get [r XINFO STREAM mystream] pids-tracked] == 0 && + [dict get [r XINFO STREAM mystream] iids-tracked] == 0 + } else { + fail "IDMP entries were not cleaned up after RDB load" + } + + # Expired IIDs should be re-addable as new entries + set new_id [r XADD mystream IDMP p1 "req-1" * field "new"] + assert {$new_id ne ""} + assert_equal 4 [r XLEN mystream] + } {} {external:skip} + test {XADD IDMP multiple producers concurrent access} { r DEL mystream