This commit is contained in:
Shubham S Taple 2026-05-27 03:03:49 +03:00 committed by GitHub
commit a5f4fcb692
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 130 additions and 9 deletions

View file

@ -3058,9 +3058,10 @@ void asmTriggerBackgroundTrim(asmTrimCtx *trim_ctx, int migration_cleanup) {
kvstoreMoveDict(server.db[0].keys, keys, slot);
kvstoreMoveDict(server.db[0].expires, expires, slot);
estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot);
streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slot);
}
}
/* Move stream IDMP keys from main DB to temp dict (O(IDMP entries x number of slot ranges)) */
streamMoveIdmpKeys(server.db[0].stream_idmp_keys, stream_idmp_keys, slots);
emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys, trim_ctx);

View file

@ -1108,21 +1108,24 @@ void discardTempDb(redisDb *tempDb) {
zfree(tempDb);
}
/* Move entries whose robj keys belong to the given slot from src dict to dst.
/* Move entries whose robj keys belong to the given slotRangeArray from src dict to dst.
* Matching entries are removed from src and added to dst. */
void streamMoveIdmpKeys(dict *src, dict *dst, int slot) {
void streamMoveIdmpKeys(dict *src, dict *dst, slotRangeArray *slots) {
if (dictSize(src) == 0) return;
/* slots must not be NULL */
serverAssert(slots != NULL);
dictIterator *di = dictGetSafeIterator(src);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
if (calculateKeySlot(key->ptr) == slot) {
if (dictAddRaw(dst, key, NULL)) {
incrRefCount(key);
}
dictDelete(src, key);
/* Check if key belongs to the slot range. */
if (!slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr))))
continue;
if (dictAddRaw(dst, key, NULL)) {
incrRefCount(key);
}
dictDelete(src, key);
}
dictReleaseIterator(di);
}

View file

@ -4076,7 +4076,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
void streamMoveIdmpKeys(dict *src, dict *dst, int slot);
void streamMoveIdmpKeys(dict *src, dict *dst, struct slotRangeArray *slots);
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, struct asmTrimCtx *ctx);
size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);

View file

@ -1690,6 +1690,123 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout
R 0 debug asm-trim-method default
}
test {Test bgtrim IDMP: disjoint ranges with in-range gap + far-slot; dedup on dest and source} {
# One IMPORT with two non-adjacent slot ranges (union trim). Slot between the ranges and a
# far slot stay on the source; migrated keys move to the dest. Exercises slotRangeArray
# membership + IDMP correctness (stream_idmp_keys vs one bg trim batch).
R 0 debug asm-trim-method bg
R 0 flushall
set k0 [slot_key 0 idmp]
set k1 [slot_key 1 idmp]
set k2 [slot_key 2 idmp]
set k4 [slot_key 4 idmp]
set k5 [slot_key 5 idmp]
set k6 [slot_key 6 idmp]
set k101 [slot_key 101 idmp]
foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] {
R 0 XADD $k IDMP p1 "init" * field "init"
R 0 XCFGSET $k IDMP-DURATION 86400
}
set id0 [R 0 XADD $k0 IDMP p1 "r0" * field v0]
set id1 [R 0 XADD $k1 IDMP p1 "r1" * field v1]
set id2 [R 0 XADD $k2 IDMP p1 "r2" * field v2]
set id4 [R 0 XADD $k4 IDMP p1 "r4" * field v4]
set id5 [R 0 XADD $k5 IDMP p1 "r5" * field v5]
set id6 [R 0 XADD $k6 IDMP p1 "r6" * field v6]
set id101 [R 0 XADD $k101 IDMP p1 "r101" * field v101]
foreach k [list $k0 $k1 $k2 $k4 $k5 $k6 $k101] {
assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked]
}
# Migrate [0-2] and [5-6] in a single IMPORT; slot 4 and 101 are outside both ranges.
# After migration, trimmed slots are owned by node 1: use R 1 for those keys (R 0 replies MOVED).
R 1 CLUSTER MIGRATION IMPORT 0 2 5 6
wait_for_asm_done
wait_for_condition 1000 10 {
[R 1 EXISTS $k0] == 1 && [R 1 EXISTS $k1] == 1 && [R 1 EXISTS $k2] == 1 &&
[R 1 EXISTS $k5] == 1 && [R 1 EXISTS $k6] == 1 &&
[R 0 EXISTS $k4] == 1 && [R 0 EXISTS $k101] == 1
} else {
fail "Migrated streams missing on destination or survivors missing on source"
}
foreach k [list $k0 $k1 $k2 $k5 $k6] {
assert_equal 1 [dict get [R 1 XINFO STREAM $k] pids-tracked]
}
assert_equal $id0 [R 1 XADD $k0 IDMP p1 "r0" * field dup]
assert_equal $id1 [R 1 XADD $k1 IDMP p1 "r1" * field dup]
assert_equal $id2 [R 1 XADD $k2 IDMP p1 "r2" * field dup]
assert_equal $id5 [R 1 XADD $k5 IDMP p1 "r5" * field dup]
assert_equal $id6 [R 1 XADD $k6 IDMP p1 "r6" * field dup]
assert_equal 1 [dict get [R 0 XINFO STREAM $k4] pids-tracked]
assert_equal 1 [dict get [R 0 XINFO STREAM $k101] pids-tracked]
assert_equal $id4 [R 0 XADD $k4 IDMP p1 "r4" * field dup]
assert_equal $id101 [R 0 XADD $k101 IDMP p1 "r101" * field dup]
# cleanup
wait_for_asm_done
R 0 CLUSTER MIGRATION IMPORT 0 2 5 6
wait_for_asm_done
R 0 flushall
R 0 debug asm-trim-method default
}
test {Test bgtrim IDMP: many streams; outer ranges migrated, inner + high slots kept} {
# Many IDMP entries across the shard; only outer slot ranges migrate so most keys stay on
# the source selective trim vs a large stream_idmp_keys dict (same multi-range ASM path).
R 0 debug asm-trim-method bg
R 0 flushall
set keys {}
foreach slot {0 1 2 3 4 5 6 7 101 102 103} {
lappend keys [slot_key $slot idmp]
}
foreach k $keys {
R 0 XADD $k IDMP p1 "init" * field "init"
R 0 XCFGSET $k IDMP-DURATION 86400
R 0 XADD $k IDMP p1 "req" * field v1
assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked]
}
# Single IMPORT: ranges [0-1] and [6-7]; slots 2-5 and 101-103 are not trimmed on source.
R 1 CLUSTER MIGRATION IMPORT 0 1 6 7
wait_for_asm_done
set kept_slots {2 3 4 5 101 102 103}
# Trimmed slots are served by node 1 after migration; survivors stay on node 0.
# wait_for_condition body must be a single expr (no foreach/set).
wait_for_condition 1000 10 {
[R 1 EXISTS [slot_key 0 idmp]] == 1 && [R 1 EXISTS [slot_key 1 idmp]] == 1 &&
[R 1 EXISTS [slot_key 6 idmp]] == 1 && [R 1 EXISTS [slot_key 7 idmp]] == 1 &&
[R 0 EXISTS [slot_key 2 idmp]] == 1 && [R 0 EXISTS [slot_key 3 idmp]] == 1 &&
[R 0 EXISTS [slot_key 4 idmp]] == 1 && [R 0 EXISTS [slot_key 5 idmp]] == 1 &&
[R 0 EXISTS [slot_key 101 idmp]] == 1 && [R 0 EXISTS [slot_key 102 idmp]] == 1 &&
[R 0 EXISTS [slot_key 103 idmp]] == 1
} else {
fail "Selective IDMP trim did not match migrated slot ranges"
}
foreach slot $kept_slots {
set k [slot_key $slot idmp]
assert_equal 1 [dict get [R 0 XINFO STREAM $k] pids-tracked]
R 0 XADD $k IDMP p1 "after_trim" * field x
}
# cleanup
wait_for_asm_done
R 0 CLUSTER MIGRATION IMPORT 0 1 6 7
wait_for_asm_done
R 0 flushall
R 0 debug asm-trim-method default
}
test "Test bgtrim after a FAILOVER on destination side" {
R 1 debug asm-trim-method bg
R 4 debug asm-trim-method bg