mirror of
https://github.com/redis/redis.git
synced 2026-05-28 04:02:46 -04:00
Fix IDMP cron expiration not working after RDB load (#14869)
Some checks failed
CI / test-ubuntu-latest (push) Has been cancelled
CI / test-sanitizer-address (push) Has been cancelled
CI / build-debian-old (push) Has been cancelled
CI / build-macos-latest (push) Has been cancelled
CI / build-32bit (push) Has been cancelled
CI / build-libc-malloc (push) Has been cancelled
CI / build-centos-jemalloc (push) Has been cancelled
CI / build-old-chain-jemalloc (push) Has been cancelled
Codecov / code-coverage (push) Has been cancelled
External Server Tests / test-external-standalone (push) Has been cancelled
External Server Tests / test-external-cluster (push) Has been cancelled
External Server Tests / test-external-nodebug (push) Has been cancelled
Spellcheck / Spellcheck (push) Has been cancelled
Some checks failed
CI / test-ubuntu-latest (push) Has been cancelled
CI / test-sanitizer-address (push) Has been cancelled
CI / build-debian-old (push) Has been cancelled
CI / build-macos-latest (push) Has been cancelled
CI / build-32bit (push) Has been cancelled
CI / build-libc-malloc (push) Has been cancelled
CI / build-centos-jemalloc (push) Has been cancelled
CI / build-old-chain-jemalloc (push) Has been cancelled
Codecov / code-coverage (push) Has been cancelled
External Server Tests / test-external-standalone (push) Has been cancelled
External Server Tests / test-external-cluster (push) Has been cancelled
External Server Tests / test-external-nodebug (push) Has been cancelled
Spellcheck / Spellcheck (push) Has been cancelled
**Summary** Registers streams with IDMP producers in `db->stream_idmp_keys` during RDB load, so that the periodic cron job can expire stale IDMP entries after a server restart. Without this fix, streams loaded from RDB were never registered, causing IDMP entries to accumulate indefinitely and never be cleaned up by the cron. **Changes** - **`rdbLoadRioWithLoadingCtx` (src/rdb.c):** After loading each key-value pair, added a check for `OBJ_STREAM` type with a non-NULL `idmp_producers` rax. When found, creates a string object for the key and inserts it into `db->stream_idmp_keys` via `dictAddRaw`. If the key already exists (duplicate insert), the reference is decremented to avoid a leak. - **Test `XADD IDMP cron expiration works after RDB load` (tests/unit/type/stream.tcl):** Added an integration test that creates a stream with IDMP producers, sets a short `IDMP-DURATION`, saves and restarts the server, then verifies that the cron eventually expires the entries (pids-tracked and iids-tracked drop to 0) and that previously-expired IIDs can be re-added as new entries. **Benefits** - **Cron expiration works after restart:** Streams with IDMP producers are now discoverable by the periodic cleanup cron after an RDB load, ensuring expired entries are reaped as expected. - **No memory leak on stale entries:** Without registration, IDMP entries that outlived their duration would persist in memory forever after a restart, growing unboundedly. This fix ensures they are cleaned up. - **Consistency between runtime and post-load behavior:** IDMP expiration now behaves identically whether the stream was created at runtime or restored from an RDB snapshot.
This commit is contained in:
parent
708bfd5de5
commit
ee376cdc3c
2 changed files with 46 additions and 0 deletions
10
src/rdb.c
10
src/rdb.c
|
|
@ -3991,6 +3991,16 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
estoreAdd(db->subexpires, getKeySlot(key), kv, minExpiredField);
|
||||
}
|
||||
|
||||
/* Register streams with IDMP producers for cron-based expiration. */
|
||||
if (kv->type == OBJ_STREAM) {
|
||||
stream *s = kv->ptr;
|
||||
if (s->idmp_producers != NULL) {
|
||||
robj *kobj = createStringObject(key, sdslen(key));
|
||||
if (dictAddRaw(db->stream_idmp_keys, kobj, NULL) == NULL)
|
||||
decrRefCount(kobj);
|
||||
}
|
||||
}
|
||||
|
||||
/* Set usage information (for eviction). */
|
||||
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
|
||||
|
||||
|
|
|
|||
|
|
@ -927,6 +927,42 @@ start_server {
|
|||
assert_equal $id3 [r XADD mystream IDMP p3 "req-1" * field "dup"]
|
||||
} {} {external:skip}
|
||||
|
||||
test {XADD IDMP cron expiration works after RDB load} {
|
||||
r DEL mystream
|
||||
|
||||
# Create stream and set IDMP-DURATION before adding entries,
|
||||
# since XCFGSET clears existing entries when the duration changes.
|
||||
r XADD mystream IDMP p1 "init" * field "init"
|
||||
r XCFGSET mystream IDMP-DURATION 2
|
||||
r XADD mystream IDMP p1 "req-1" * field "v1"
|
||||
r XADD mystream IDMP p2 "req-1" * field "v2"
|
||||
|
||||
set reply [r XINFO STREAM mystream]
|
||||
assert_equal 2 [dict get $reply pids-tracked]
|
||||
assert_equal 2 [dict get $reply iids-tracked]
|
||||
|
||||
# Save and restart — this triggers RDB load which should
|
||||
# register the stream in stream_idmp_keys for cron cleanup.
|
||||
r SAVE
|
||||
restart_server 0 true false
|
||||
|
||||
# Wait for IDMP entries to expire and for the cron to clean them up.
|
||||
# If the stream was not registered in stream_idmp_keys after RDB load,
|
||||
# the counts would never reach 0.
|
||||
# Poll instead of a fixed sleep so the test finishes as soon as possible.
|
||||
wait_for_condition 50 100 {
|
||||
[dict get [r XINFO STREAM mystream] pids-tracked] == 0 &&
|
||||
[dict get [r XINFO STREAM mystream] iids-tracked] == 0
|
||||
} else {
|
||||
fail "IDMP entries were not cleaned up after RDB load"
|
||||
}
|
||||
|
||||
# Expired IIDs should be re-addable as new entries
|
||||
set new_id [r XADD mystream IDMP p1 "req-1" * field "new"]
|
||||
assert {$new_id ne ""}
|
||||
assert_equal 4 [r XLEN mystream]
|
||||
} {} {external:skip}
|
||||
|
||||
test {XADD IDMP multiple producers concurrent access} {
|
||||
r DEL mystream
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue