diff --git a/src/cluster_asm.c b/src/cluster_asm.c index ac94a5103..8f1440310 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3058,9 +3058,10 @@ void asmTriggerBackgroundTrim(asmTrimCtx *trim_ctx, int migration_cleanup) { kvstoreMoveDict(server.db[0].keys, keys, slot); kvstoreMoveDict(server.db[0].expires, expires, slot); estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot); - streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slot); } } + /* Move stream IDMP keys from main DB to temp dict (O(IDMP entries x number of slot ranges)) */ + streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slots); emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys, trim_ctx); diff --git a/src/db.c b/src/db.c index 87881a991..fa4f272a3 100644 --- a/src/db.c +++ b/src/db.c @@ -1108,21 +1108,24 @@ void discardTempDb(redisDb *tempDb) { zfree(tempDb); } -/* Move entries whose robj keys belong to the given slot from src dict to dst. +/* Move entries whose robj keys belong to the given slotRangeArray from src dict to dst. * Matching entries are removed from src and added to dst. */ -void streamMoveIdmpKeys(dict *src, dict *dst, int slot) { +void streamMoveIdmpKeys(dict *src, dict *dst, slotRangeArray *slots) { if (dictSize(src) == 0) return; + /* slots must not be NULL */ + serverAssert(slots != NULL); dictIterator *di = dictGetSafeIterator(src); dictEntry *de; while ((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - if (calculateKeySlot(key->ptr) == slot) { - if (dictAddRaw(dst, key, NULL)) { - incrRefCount(key); - } - dictDelete(src, key); + /* Check if key belongs to the slot range. */ + if (!slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) + continue; + if (dictAddRaw(dst, key, NULL)) { + incrRefCount(key); } + dictDelete(src, key); } dictReleaseIterator(di); } diff --git a/src/server.h b/src/server.h index 9318eec68..7eaf4f650 100644 --- a/src/server.h +++ b/src/server.h @@ -4076,7 +4076,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor); int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor); int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); -void streamMoveIdmpKeys(dict *src, dict *dst, int slot); +void streamMoveIdmpKeys(dict *src, dict *dst, struct slotRangeArray *slots); void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, struct asmTrimCtx *ctx); size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); diff --git a/tests/unit/cluster/atomic-slot-migration.tcl b/tests/unit/cluster/atomic-slot-migration.tcl index b5d2de08a..0a112cf04 100644 --- a/tests/unit/cluster/atomic-slot-migration.tcl +++ b/tests/unit/cluster/atomic-slot-migration.tcl @@ -1690,6 +1690,123 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout R 0 debug asm-trim-method default } + test {Test bgtrim IDMP: disjoint ranges with in-range gap + far-slot; dedup on dest and source} { + # One IMPORT with two non-adjacent slot ranges (union trim). Slot between the ranges and a + # far slot stay on the source; migrated keys move to the dest. Exercises slotRangeArray + # membership + IDMP correctness (stream_idmp_keys vs one bg trim batch). + R 0 debug asm-trim-method bg + R 0 flushall + + set k0 [slot_key 0 idmp] + set k1 [slot_key 1 idmp] + set k2 [slot_key 2 idmp] + set k4 [slot_key 4 idmp] + set k5 [slot_key 5 idmp] + set k6 [slot_key 6 idmp] + set k101 [slot_key 101 idmp] + + foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] { + R 0 XADD $k IDMP p1 "init" * field "init" + R 0 XCFGSET $k IDMP-DURATION 86400 + } + + set id0 [R 0 XADD $k0 IDMP p1 "r0" * field v0] + set id1 [R 0 XADD $k1 IDMP p1 "r1" * field v1] + set id2 [R 0 XADD $k2 IDMP p1 "r2" * field v2] + set id4 [R 0 XADD $k4 IDMP p1 "r4" * field v4] + set id5 [R 0 XADD $k5 IDMP p1 "r5" * field v5] + set id6 [R 0 XADD $k6 IDMP p1 "r6" * field v6] + set id101 [R 0 XADD $k101 IDMP p1 "r101" * field v101] + + foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] { + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + } + + # Migrate [0-2] and [5-6] in a single IMPORT; slot 4 and 101 are outside both ranges. + # After migration, trimmed slots are owned by node 1: use R 1 for those keys (R 0 replies MOVED). + R 1 CLUSTER MIGRATION IMPORT 0 2 5 6 + wait_for_asm_done + + wait_for_condition 1000 10 { + [R 1 EXISTS $k0] == 1 && [R 1 EXISTS $k1] == 1 && [R 1 EXISTS $k2] == 1 && + [R 1 EXISTS $k5] == 1 && [R 1 EXISTS $k6] == 1 && + [R 0 EXISTS $k4] == 1 && [R 0 EXISTS $k101] == 1 + } else { + fail "Migrated streams missing on destination or survivors missing on source" + } + + foreach k [list $k0 $k1 $k2 $k5 $k6] { + assert_equal 1 [dict get [R 1 XINFO STREAM $k] pids-tracked] + } + assert_equal $id0 [R 1 XADD $k0 IDMP p1 "r0" * field dup] + assert_equal $id1 [R 1 XADD $k1 IDMP p1 "r1" * field dup] + assert_equal $id2 [R 1 XADD $k2 IDMP p1 "r2" * field dup] + assert_equal $id5 [R 1 XADD $k5 IDMP p1 "r5" * field dup] + assert_equal $id6 [R 1 XADD $k6 IDMP p1 "r6" * field dup] + + assert_equal 1 [dict get [R 0 XINFO STREAM $k4] pids-tracked] + assert_equal 1 [dict get [R 0 XINFO STREAM $k101] pids-tracked] + assert_equal $id4 [R 0 XADD $k4 IDMP p1 "r4" * field dup] + assert_equal $id101 [R 0 XADD $k101 IDMP p1 "r101" * field dup] + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 2 5 6 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test {Test bgtrim IDMP: many streams; outer ranges migrated, inner + high slots kept} { + # Many IDMP entries across the shard; only outer slot ranges migrate so most keys stay on + # the source — selective trim vs a large stream_idmp_keys dict (same multi-range ASM path). + R 0 debug asm-trim-method bg + R 0 flushall + + set keys {} + foreach slot {0 1 2 3 4 5 6 7 101 102 103} { + lappend keys [slot_key $slot idmp] + } + + foreach k $keys { + R 0 XADD $k IDMP p1 "init" * field "init" + R 0 XCFGSET $k IDMP-DURATION 86400 + R 0 XADD $k IDMP p1 "req" * field v1 + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + } + + # Single IMPORT: ranges [0-1] and [6-7]; slots 2-5 and 101-103 are not trimmed on source. + R 1 CLUSTER MIGRATION IMPORT 0 1 6 7 + wait_for_asm_done + + set kept_slots {2 3 4 5 101 102 103} + # Trimmed slots are served by node 1 after migration; survivors stay on node 0. + # wait_for_condition body must be a single expr (no foreach/set). + wait_for_condition 1000 10 { + [R 1 EXISTS [slot_key 0 idmp]] == 1 && [R 1 EXISTS [slot_key 1 idmp]] == 1 && + [R 1 EXISTS [slot_key 6 idmp]] == 1 && [R 1 EXISTS [slot_key 7 idmp]] == 1 && + [R 0 EXISTS [slot_key 2 idmp]] == 1 && [R 0 EXISTS [slot_key 3 idmp]] == 1 && + [R 0 EXISTS [slot_key 4 idmp]] == 1 && [R 0 EXISTS [slot_key 5 idmp]] == 1 && + [R 0 EXISTS [slot_key 101 idmp]] == 1 && [R 0 EXISTS [slot_key 102 idmp]] == 1 && + [R 0 EXISTS [slot_key 103 idmp]] == 1 + } else { + fail "Selective IDMP trim did not match migrated slot ranges" + } + + foreach slot $kept_slots { + set k [slot_key $slot idmp] + assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked] + R 0 XADD $k IDMP p1 "after_trim" * field x + } + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 1 6 7 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + test "Test bgtrim after a FAILOVER on destination side" { R 1 debug asm-trim-method bg R 4 debug asm-trim-method bg