From 89eb0f46a4714c2ebdabd012966c6c3bb93e600c Mon Sep 17 00:00:00 2001 From: Shubham Taple Date: Sun, 5 Apr 2026 21:25:12 +0530 Subject: [PATCH 1/5] Improve time complexity of streamMoveIdmpKeys() --- src/cluster_asm.c | 3 ++- src/db.c | 17 ++++++++++------- src/server.h | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index db801a9a1..f4a57ee46 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3044,9 +3044,10 @@ void asmTriggerBackgroundTrim(asmTrimCtx *trim_ctx, int migration_cleanup) { kvstoreMoveDict(server.db[0].keys, keys, slot); kvstoreMoveDict(server.db[0].expires, expires, slot); estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot); - streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slot); } } + /* Move stream IDMP keys from main DB to temp dict (O(IDMP entries x number of slot ranges)) */ + streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slots); emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys, trim_ctx); diff --git a/src/db.c b/src/db.c index 1565b7bef..acb0cf788 100644 --- a/src/db.c +++ b/src/db.c @@ -1107,21 +1107,24 @@ void discardTempDb(redisDb *tempDb) { zfree(tempDb); } -/* Move entries whose robj keys belong to the given slot from src dict to dst. +/* Move entries whose robj keys belong to the given slotRangeArray from src dict to dst. * Matching entries are removed from src and added to dst. */ -void streamMoveIdmpKeys(dict *src, dict *dst, int slot) { +void streamMoveIdmpKeys(dict *src, dict *dst, slotRangeArray *slots) { if (dictSize(src) == 0) return; + /* slots must not be NULL */ + serverAssert(slots != NULL); dictIterator *di = dictGetSafeIterator(src); dictEntry *de; while ((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - if (calculateKeySlot(key->ptr) == slot) { - if (dictAdd(dst, key, dictGetVal(de)) == DICT_OK) { - incrRefCount(key); - } - dictDelete(src, key); + /* Check if key belongs to the slot range. */ + if (!slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) + continue; + if (dictAdd(dst, key, dictGetVal(de)) == DICT_OK) { + incrRefCount(key); } + dictDelete(src, key); } dictReleaseIterator(di); } diff --git a/src/server.h b/src/server.h index 72036f6b6..65847162d 100644 --- a/src/server.h +++ b/src/server.h @@ -3998,7 +3998,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor); int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor); int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); -void streamMoveIdmpKeys(dict *src, dict *dst, int slot); +void streamMoveIdmpKeys(dict *src, dict *dst, struct slotRangeArray *slots); void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, struct asmTrimCtx *ctx); size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); From f57ef334493a57ec4a7e1aa46c994a15e681308c Mon Sep 17 00:00:00 2001 From: Shubham Taple Date: Thu, 16 Apr 2026 22:23:47 +0530 Subject: [PATCH 2/5] Add test for asmTriggerBackgroundTrim, remove temporary EXPERIMENTAL tag from sflush.json and tcl --- src/commands.def | 36 ++++++++++ src/commands/sflush.json | 3 +- tests/unit/cluster/multi-slot-operations.tcl | 76 +++++++++++++++++++- 3 files changed, 112 insertions(+), 3 deletions(-) diff --git a/src/commands.def b/src/commands.def index 3980365d4..ecf5c7da6 100644 --- a/src/commands.def +++ b/src/commands.def @@ -8171,6 +8171,41 @@ struct COMMAND_ARG RESTORE_ASKING_Args[] = { #define SAVE_Keyspecs NULL #endif +/********** SFLUSH ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* SFLUSH history */ +#define SFLUSH_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* SFLUSH tips */ +#define SFLUSH_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* SFLUSH key specs */ +#define SFLUSH_Keyspecs NULL +#endif + +/* SFLUSH data argument table */ +struct COMMAND_ARG SFLUSH_data_Subargs[] = { +{MAKE_ARG("slot-start",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("slot-last",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* SFLUSH flush_type argument table */ +struct COMMAND_ARG SFLUSH_flush_type_Subargs[] = { +{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* SFLUSH argument table */ +struct COMMAND_ARG SFLUSH_Args[] = { +{MAKE_ARG("data",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,2,NULL),.subargs=SFLUSH_data_Subargs}, +{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=SFLUSH_flush_type_Subargs}, +}; + /********** SHUTDOWN ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -11910,6 +11945,7 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("restore-asking","An internal command for migrating keys in a cluster.","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,"server",COMMAND_GROUP_SERVER,RESTORE_ASKING_History,3,RESTORE_ASKING_Tips,0,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,RESTORE_ASKING_Keyspecs,1,NULL,7),.args=RESTORE_ASKING_Args}, {MAKE_CMD("role","Returns the replication role.","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ROLE_History,0,ROLE_Tips,0,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS,ROLE_Keyspecs,0,NULL,0)}, {MAKE_CMD("save","Synchronously saves the database(s) to disk.","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SAVE_History,0,SAVE_Tips,0,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0,SAVE_Keyspecs,0,NULL,0)}, +{MAKE_CMD("sflush","Remove all keys from selected range of slots.","O(N)+O(k) where N is the number of keys and k is the number of slots.","8.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SFLUSH_History,0,SFLUSH_Tips,0,sflushCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,SFLUSH_Keyspecs,0,NULL,2),.args=SFLUSH_Args}, {MAKE_CMD("shutdown","Synchronously saves the database(s) to disk and shuts down the Redis server.","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SHUTDOWN_History,1,SHUTDOWN_Tips,0,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL|CMD_ALLOW_BUSY,0,SHUTDOWN_Keyspecs,0,NULL,4),.args=SHUTDOWN_Args}, {MAKE_CMD("slaveof","Sets a Redis server as a replica of another, or promotes it to being a master.","O(1)","1.0.0",CMD_DOC_DEPRECATED,"`REPLICAOF`","5.0.0","server",COMMAND_GROUP_SERVER,SLAVEOF_History,0,SLAVEOF_Tips,0,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,SLAVEOF_Keyspecs,0,NULL,1),.args=SLAVEOF_Args}, {MAKE_CMD("slowlog","A container for slow log commands.","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SLOWLOG_History,0,SLOWLOG_Tips,0,NULL,-2,0,0,SLOWLOG_Keyspecs,0,NULL,0),.subcommands=SLOWLOG_Subcommands}, diff --git a/src/commands/sflush.json b/src/commands/sflush.json index bac27ad2d..b076e33d9 100644 --- a/src/commands/sflush.json +++ b/src/commands/sflush.json @@ -7,8 +7,7 @@ "arity": -3, "function": "sflushCommand", "command_flags": [ - "WRITE", - "EXPERIMENTAL" + "WRITE" ], "acl_categories": [ "KEYSPACE", diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index 905fe676a..32eab0bc1 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -108,11 +108,85 @@ test "DELSLOTSRANGE command with several boundary conditions test suite" { } } cluster_allocate_with_continuous_slots_local -start_cluster 2 0 {tags {external:skip cluster experimental}} { +start_cluster 2 0 {tags {external:skip cluster}} { set master1 [srv 0 "client"] set master2 [srv -1 "client"] +# Stream IDMP + cluster partial SFLUSH (multi slot ranges) +# Covers streamMoveIdmpKeys() moving keys for all trimmed ranges in one pass. +# Requires: cluster mode, background trim (default), partial SFLUSH (not full node flush). + +# Helper: find a key name whose hash tag lands on a given slot (0 - 16383). +# Only the part inside {...} is hashed, so we bruteforce the tag text. +proc stream_keys_for_slot {redis_cmd slot} { + for {set i 0} {$i < 100000} {incr i} { + set key "\{sidmp$i\}:s" + if {[$redis_cmd cluster keyslot $key] == $slot} { + return $key + } + } + error "could not build a key for slot $slot" +} + +test {SFLUSH multiple ranges removes IDMP streams in those slots only} { + # clean state on this master (owns slots 0-8191) + $master1 FLUSHALL + + # Use background trim (since we want to test asmTriggerBackgroundTrim()) + $master1 debug asm-trim-method bg + + # Three slots, all on node 0 (0-8191): two we will flush (slot_a and slot_b), one of it we will keep (slot_c) + set slot_a 100 + set slot_b 5000 + set slot_c 7000 + + set k_a [stream_keys_for_slot $master1 $slot_a] + set k_b [stream_keys_for_slot $master1 $slot_b] + set k_c [stream_keys_for_slot $master1 $slot_c] + + # create streams with IDMP so they are tracked in dict stream_idmp_keys + $master1 XADD $k_a IDMP p1 "init" * field "init" + $master1 XADD $k_b IDMP p1 "init" * field "init" + $master1 XADD $k_c IDMP p1 "init" * field "init" + + # Long duration so the cron does not clear IDMP state during the test + $master1 XCFGSET $k_a IDMP-DURATION 86400 + $master1 XCFGSET $k_b IDMP-DURATION 86400 + $master1 XCFGSET $k_c IDMP-DURATION 86400 + + # Real IDMP entries + set id_a [$master1 XADD $k_a IDMP p1 "req_a" * field v1] + set id_b [$master1 XADD $k_b IDMP p1 "req_b" * field v1] + set id_c [$master1 XADD $k_c IDMP p1 "req_c" * field v1] + + assert_equal 1 [dict get [$master1 XINFO STREAM $k_a] pids-tracked] + assert_equal 1 [dict get [$master1 XINFO STREAM $k_b] pids-tracked] + assert_equal 1 [dict get [$master1 XINFO STREAM $k_c] pids-tracked] + + # two disjoint ranges that include slot_a and slot_b but not slot_c + set lo_a [expr {$slot_a - 5}] + set hi_a [expr {$slot_a + 5}] + set lo_b [expr {$slot_b - 5}] + set hi_b [expr {$slot_b + 5}] + + # partial SFLUSH: not the whole 0-8191, this is a real trim (not FLUSHDB shortcut) + $master1 SFLUSH $lo_a $hi_a $lo_b $hi_b SYNC + + # Wait until two flushed streams are gone and third remains + wait_for_condition 1000 10 { + [$master1 EXISTS $k_a] == 0 && [$master1 EXISTS $k_b] == 0 && [$master1 EXISTS $k_c] == 1 + } else { + fail "SFLUSH did not delete only the expected streams in time" + } + + # remaining streams should still deduplicate IDMP correctly + assert_equal $id_c [$master1 XADD $k_c IDMP p1 "req_c" * field "dup"] + + # sanity, we did not accidentally wipe the whole db + assert {[$master1 DBSIZE] >= 1} +} + test "SFLUSH - Errors and output validation" { assert_match "* 0-8191*" [$master1 CLUSTER NODES] assert_match "* 8192-16383*" [$master2 CLUSTER NODES] From 396a28eabdf72d9a79455ec1c2cb9bb0e95ab297 Mon Sep 17 00:00:00 2001 From: Shubham Taple Date: Sat, 18 Apr 2026 16:58:26 +0530 Subject: [PATCH 3/5] Revert "Add test for asmTriggerBackgroundTrim, remove temporary EXPERIMENTAL tag from sflush.json and tcl" This reverts commit f57ef334493a57ec4a7e1aa46c994a15e681308c. Revert test using sflush --- src/commands.def | 36 ---------- src/commands/sflush.json | 3 +- tests/unit/cluster/multi-slot-operations.tcl | 76 +------------------- 3 files changed, 3 insertions(+), 112 deletions(-) diff --git a/src/commands.def b/src/commands.def index ecf5c7da6..3980365d4 100644 --- a/src/commands.def +++ b/src/commands.def @@ -8171,41 +8171,6 @@ struct COMMAND_ARG RESTORE_ASKING_Args[] = { #define SAVE_Keyspecs NULL #endif -/********** SFLUSH ********************/ - -#ifndef SKIP_CMD_HISTORY_TABLE -/* SFLUSH history */ -#define SFLUSH_History NULL -#endif - -#ifndef SKIP_CMD_TIPS_TABLE -/* SFLUSH tips */ -#define SFLUSH_Tips NULL -#endif - -#ifndef SKIP_CMD_KEY_SPECS_TABLE -/* SFLUSH key specs */ -#define SFLUSH_Keyspecs NULL -#endif - -/* SFLUSH data argument table */ -struct COMMAND_ARG SFLUSH_data_Subargs[] = { -{MAKE_ARG("slot-start",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, -{MAKE_ARG("slot-last",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, -}; - -/* SFLUSH flush_type argument table */ -struct COMMAND_ARG SFLUSH_flush_type_Subargs[] = { -{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, -{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)}, -}; - -/* SFLUSH argument table */ -struct COMMAND_ARG SFLUSH_Args[] = { -{MAKE_ARG("data",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,2,NULL),.subargs=SFLUSH_data_Subargs}, -{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=SFLUSH_flush_type_Subargs}, -}; - /********** SHUTDOWN ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -11945,7 +11910,6 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("restore-asking","An internal command for migrating keys in a cluster.","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,"server",COMMAND_GROUP_SERVER,RESTORE_ASKING_History,3,RESTORE_ASKING_Tips,0,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,RESTORE_ASKING_Keyspecs,1,NULL,7),.args=RESTORE_ASKING_Args}, {MAKE_CMD("role","Returns the replication role.","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ROLE_History,0,ROLE_Tips,0,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS,ROLE_Keyspecs,0,NULL,0)}, {MAKE_CMD("save","Synchronously saves the database(s) to disk.","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SAVE_History,0,SAVE_Tips,0,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0,SAVE_Keyspecs,0,NULL,0)}, -{MAKE_CMD("sflush","Remove all keys from selected range of slots.","O(N)+O(k) where N is the number of keys and k is the number of slots.","8.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SFLUSH_History,0,SFLUSH_Tips,0,sflushCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,SFLUSH_Keyspecs,0,NULL,2),.args=SFLUSH_Args}, {MAKE_CMD("shutdown","Synchronously saves the database(s) to disk and shuts down the Redis server.","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SHUTDOWN_History,1,SHUTDOWN_Tips,0,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL|CMD_ALLOW_BUSY,0,SHUTDOWN_Keyspecs,0,NULL,4),.args=SHUTDOWN_Args}, {MAKE_CMD("slaveof","Sets a Redis server as a replica of another, or promotes it to being a master.","O(1)","1.0.0",CMD_DOC_DEPRECATED,"`REPLICAOF`","5.0.0","server",COMMAND_GROUP_SERVER,SLAVEOF_History,0,SLAVEOF_Tips,0,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,SLAVEOF_Keyspecs,0,NULL,1),.args=SLAVEOF_Args}, {MAKE_CMD("slowlog","A container for slow log commands.","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SLOWLOG_History,0,SLOWLOG_Tips,0,NULL,-2,0,0,SLOWLOG_Keyspecs,0,NULL,0),.subcommands=SLOWLOG_Subcommands}, diff --git a/src/commands/sflush.json b/src/commands/sflush.json index b076e33d9..bac27ad2d 100644 --- a/src/commands/sflush.json +++ b/src/commands/sflush.json @@ -7,7 +7,8 @@ "arity": -3, "function": "sflushCommand", "command_flags": [ - "WRITE" + "WRITE", + "EXPERIMENTAL" ], "acl_categories": [ "KEYSPACE", diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index 32eab0bc1..905fe676a 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -108,85 +108,11 @@ test "DELSLOTSRANGE command with several boundary conditions test suite" { } } cluster_allocate_with_continuous_slots_local -start_cluster 2 0 {tags {external:skip cluster}} { +start_cluster 2 0 {tags {external:skip cluster experimental}} { set master1 [srv 0 "client"] set master2 [srv -1 "client"] -# Stream IDMP + cluster partial SFLUSH (multi slot ranges) -# Covers streamMoveIdmpKeys() moving keys for all trimmed ranges in one pass. -# Requires: cluster mode, background trim (default), partial SFLUSH (not full node flush). - -# Helper: find a key name whose hash tag lands on a given slot (0 - 16383). -# Only the part inside {...} is hashed, so we bruteforce the tag text. -proc stream_keys_for_slot {redis_cmd slot} { - for {set i 0} {$i < 100000} {incr i} { - set key "\{sidmp$i\}:s" - if {[$redis_cmd cluster keyslot $key] == $slot} { - return $key - } - } - error "could not build a key for slot $slot" -} - -test {SFLUSH multiple ranges removes IDMP streams in those slots only} { - # clean state on this master (owns slots 0-8191) - $master1 FLUSHALL - - # Use background trim (since we want to test asmTriggerBackgroundTrim()) - $master1 debug asm-trim-method bg - - # Three slots, all on node 0 (0-8191): two we will flush (slot_a and slot_b), one of it we will keep (slot_c) - set slot_a 100 - set slot_b 5000 - set slot_c 7000 - - set k_a [stream_keys_for_slot $master1 $slot_a] - set k_b [stream_keys_for_slot $master1 $slot_b] - set k_c [stream_keys_for_slot $master1 $slot_c] - - # create streams with IDMP so they are tracked in dict stream_idmp_keys - $master1 XADD $k_a IDMP p1 "init" * field "init" - $master1 XADD $k_b IDMP p1 "init" * field "init" - $master1 XADD $k_c IDMP p1 "init" * field "init" - - # Long duration so the cron does not clear IDMP state during the test - $master1 XCFGSET $k_a IDMP-DURATION 86400 - $master1 XCFGSET $k_b IDMP-DURATION 86400 - $master1 XCFGSET $k_c IDMP-DURATION 86400 - - # Real IDMP entries - set id_a [$master1 XADD $k_a IDMP p1 "req_a" * field v1] - set id_b [$master1 XADD $k_b IDMP p1 "req_b" * field v1] - set id_c [$master1 XADD $k_c IDMP p1 "req_c" * field v1] - - assert_equal 1 [dict get [$master1 XINFO STREAM $k_a] pids-tracked] - assert_equal 1 [dict get [$master1 XINFO STREAM $k_b] pids-tracked] - assert_equal 1 [dict get [$master1 XINFO STREAM $k_c] pids-tracked] - - # two disjoint ranges that include slot_a and slot_b but not slot_c - set lo_a [expr {$slot_a - 5}] - set hi_a [expr {$slot_a + 5}] - set lo_b [expr {$slot_b - 5}] - set hi_b [expr {$slot_b + 5}] - - # partial SFLUSH: not the whole 0-8191, this is a real trim (not FLUSHDB shortcut) - $master1 SFLUSH $lo_a $hi_a $lo_b $hi_b SYNC - - # Wait until two flushed streams are gone and third remains - wait_for_condition 1000 10 { - [$master1 EXISTS $k_a] == 0 && [$master1 EXISTS $k_b] == 0 && [$master1 EXISTS $k_c] == 1 - } else { - fail "SFLUSH did not delete only the expected streams in time" - } - - # remaining streams should still deduplicate IDMP correctly - assert_equal $id_c [$master1 XADD $k_c IDMP p1 "req_c" * field "dup"] - - # sanity, we did not accidentally wipe the whole db - assert {[$master1 DBSIZE] >= 1} -} - test "SFLUSH - Errors and output validation" { assert_match "* 0-8191*" [$master1 CLUSTER NODES] assert_match "* 8192-16383*" [$master2 CLUSTER NODES] From 23c513770d5fa5ee6b95743f9b33038a029bcd80 Mon Sep 17 00:00:00 2001 From: Shubham Taple Date: Sat, 18 Apr 2026 20:37:53 +0530 Subject: [PATCH 4/5] Add tests for bgtrim IDMP --- tests/unit/cluster/atomic-slot-migration.tcl | 117 +++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/tests/unit/cluster/atomic-slot-migration.tcl b/tests/unit/cluster/atomic-slot-migration.tcl index 826f0d69c..64c5c5310 100644 --- a/tests/unit/cluster/atomic-slot-migration.tcl +++ b/tests/unit/cluster/atomic-slot-migration.tcl @@ -1696,6 +1696,123 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout R 0 debug asm-trim-method default } + test {Test bgtrim IDMP: disjoint ranges with in-range gap + far-slot; dedup on dest and source} { + # One IMPORT with two non-adjacent slot ranges (union trim). Slot between the ranges and a + # far slot stay on the source; migrated keys move to the dest. Exercises slotRangeArray + # membership + IDMP correctness (stream_idmp_keys vs one bg trim batch). + R 0 debug asm-trim-method bg + R 0 flushall + + set k0 [slot_key 0 idmp] + set k1 [slot_key 1 idmp] + set k2 [slot_key 2 idmp] + set k4 [slot_key 4 idmp] + set k5 [slot_key 5 idmp] + set k6 [slot_key 6 idmp] + set k101 [slot_key 101 idmp] + + foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] { + R 0 XADD $k IDMP p1 "init" * field "init" + R 0 XCFGSET $k IDMP-DURATION 86400 + } + + set id0 [R 0 XADD $k0 IDMP p1 "r0" * field v0] + set id1 [R 0 XADD $k1 IDMP p1 "r1" * field v1] + set id2 [R 0 XADD $k2 IDMP p1 "r2" * field v2] + set id4 [R 0 XADD $k4 IDMP p1 "r4" * field v4] + set id5 [R 0 XADD $k5 IDMP p1 "r5" * field v5] + set id6 [R 0 XADD $k6 IDMP p1 "r6" * field v6] + set id101 [R 0 XADD $k101 IDMP p1 "r101" * field v101] + + foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] { + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + } + + # Migrate [0-2] and [5-6] in a single IMPORT; slot 4 and 101 are outside both ranges. + # After migration, trimmed slots are owned by node 1: use R 1 for those keys (R 0 replies MOVED). + R 1 CLUSTER MIGRATION IMPORT 0 2 5 6 + wait_for_asm_done + + wait_for_condition 1000 10 { + [R 1 EXISTS $k0] == 1 && [R 1 EXISTS $k1] == 1 && [R 1 EXISTS $k2] == 1 && + [R 1 EXISTS $k5] == 1 && [R 1 EXISTS $k6] == 1 && + [R 0 EXISTS $k4] == 1 && [R 0 EXISTS $k101] == 1 + } else { + fail "Migrated streams missing on destination or survivors missing on source" + } + + foreach k [list $k0 $k1 $k2 $k5 $k6] { + assert_equal 1 [dict get [R 1 XINFO STREAM $k] pids-tracked] + } + assert_equal $id0 [R 1 XADD $k0 IDMP p1 "r0" * field dup] + assert_equal $id1 [R 1 XADD $k1 IDMP p1 "r1" * field dup] + assert_equal $id2 [R 1 XADD $k2 IDMP p1 "r2" * field dup] + assert_equal $id5 [R 1 XADD $k5 IDMP p1 "r5" * field dup] + assert_equal $id6 [R 1 XADD $k6 IDMP p1 "r6" * field dup] + + assert_equal 1 [dict get [R 0 XINFO STREAM $k4] pids-tracked] + assert_equal 1 [dict get [R 0 XINFO STREAM $k101] pids-tracked] + assert_equal $id4 [R 0 XADD $k4 IDMP p1 "r4" * field dup] + assert_equal $id101 [R 0 XADD $k101 IDMP p1 "r101" * field dup] + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 2 5 6 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test {Test bgtrim IDMP: many streams; outer ranges migrated, inner + high slots kept} { + # Many IDMP entries across the shard; only outer slot ranges migrate so most keys stay on + # the source — selective trim vs a large stream_idmp_keys dict (same multi-range ASM path). + R 0 debug asm-trim-method bg + R 0 flushall + + set keys {} + foreach slot {0 1 2 3 4 5 6 7 101 102 103} { + lappend keys [slot_key $slot idmp] + } + + foreach k $keys { + R 0 XADD $k IDMP p1 "init" * field "init" + R 0 XCFGSET $k IDMP-DURATION 86400 + R 0 XADD $k IDMP p1 "req" * field v1 + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + } + + # Single IMPORT: ranges [0-1] and [6-7]; slots 2-5 and 101-103 are not trimmed on source. + R 1 CLUSTER MIGRATION IMPORT 0 1 6 7 + wait_for_asm_done + + set kept_slots {2 3 4 5 101 102 103} + # Trimmed slots are served by node 1 after migration; survivors stay on node 0. + # wait_for_condition body must be a single expr (no foreach/set). + wait_for_condition 1000 10 { + [R 1 EXISTS [slot_key 0 idmp]] == 1 && [R 1 EXISTS [slot_key 1 idmp]] == 1 && + [R 1 EXISTS [slot_key 6 idmp]] == 1 && [R 1 EXISTS [slot_key 7 idmp]] == 1 && + [R 0 EXISTS [slot_key 2 idmp]] == 1 && [R 0 EXISTS [slot_key 3 idmp]] == 1 && + [R 0 EXISTS [slot_key 4 idmp]] == 1 && [R 0 EXISTS [slot_key 5 idmp]] == 1 && + [R 0 EXISTS [slot_key 101 idmp]] == 1 && [R 0 EXISTS [slot_key 102 idmp]] == 1 && + [R 0 EXISTS [slot_key 103 idmp]] == 1 + } else { + fail "Selective IDMP trim did not match migrated slot ranges" + } + + foreach slot $kept_slots { + set k [slot_key $slot idmp] + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + R 0 XADD $k IDMP p1 "after_trim" * field x + } + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 1 6 7 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + test "Test bgtrim after a FAILOVER on destination side" { R 1 debug asm-trim-method bg R 4 debug asm-trim-method bg From 2cd117bb9b47c265da9f1fbae4e06dda63bfe34b Mon Sep 17 00:00:00 2001 From: Shubham Taple Date: Sat, 25 Apr 2026 09:18:08 +0530 Subject: [PATCH 5/5] Use dictAddRaw since we have dict with no_value --- src/db.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index acb0cf788..869e8fc75 100644 --- a/src/db.c +++ b/src/db.c @@ -1121,7 +1121,7 @@ void streamMoveIdmpKeys(dict *src, dict *dst, slotRangeArray *slots) { /* Check if key belongs to the slot range. */ if (!slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) continue; - if (dictAdd(dst, key, dictGetVal(de)) == DICT_OK) { + if (dictAddRaw(dst, key, NULL)) { incrRefCount(key); } dictDelete(src, key);