From 220a577d3c49f4a5105f2c0d0a471bdc725ef829 Mon Sep 17 00:00:00 2001 From: Yuan Wang Date: Tue, 17 Mar 2026 09:33:37 +0800 Subject: [PATCH] Use flush db if sflush all owned slots (#14887) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SFLUSH now detects when the requested slot ranges exactly match the node’s local slot coverage and, in that case, skips slot trimming and instead performs a full DB flush via flushCommandCommon (while still replying with the flushed slot ranges). --- src/cluster.c | 16 ++++++- src/db.c | 18 +++++--- src/server.h | 2 +- tests/unit/cluster/multi-slot-operations.tcl | 48 +++++++++++++++++--- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ea0c67636..c315ba1a5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2159,8 +2159,7 @@ void sflushCommand(client *c) { slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); if (!slots) return; - /* If client is AOF or master, we must obey the slot ranges. - * NOTE: we should exclude CLIENT_PSEUDO_MASTER when merging into fork. */ + /* If client is AOF or master, we must obey the slot ranges. */ int must_obey = mustObeyClient(c); /* Iterate and find the slot ranges that belong to this node. Save them in @@ -2184,6 +2183,19 @@ void sflushCommand(client *c) { } slotRangeArrayFree(slots); + /* If the selected slots are exactly the same as the local slots, we can + * simply flush the entire DB by flushCommandCommon. */ + slotRangeArray *local_slots = clusterGetLocalSlotRanges(); + int all_slots_covered = slotRangeArrayIsEqual(myslots, local_slots); + slotRangeArrayFree(local_slots); + if (all_slots_covered) { + /* If not flush as blocking async, then reply immediately */ + if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) { + replySlotsFlushAndFree(c, myslots); + } + return; + } + /* Cancel all ASM tasks that overlap with the given slot ranges. */ clusterAsmCancelBySlotRangeArray(myslots, c->argv[0]->ptr); diff --git a/src/db.c b/src/db.c index b50b7c5d5..6d31e7b17 100644 --- a/src/db.c +++ b/src/db.c @@ -1277,12 +1277,16 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { server.current_client = old_client; } -/* Common flush command implementation for FLUSHALL, FLUSHDB. +/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH. * * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC * Return 0 otherwise + * + * slots - provided only by SFLUSH command, otherwise NULL. Will be used on + * completion to reply with the slots flush result. Ownership is passed + * to the completion job in case of `blocking_async`. */ -int flushCommandCommon(client *c, int type, int flags) { +int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *slots) { int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ @@ -1292,8 +1296,8 @@ int flushCommandCommon(client *c, int type, int flags) { blocking_async = 1; } - /* Cancel all ASM tasks. */ - clusterAsmCancelBySlotRangeArray(NULL, c->argv[0]->ptr); + /* Cancel all ASM tasks that overlap with the given slot ranges. */ + clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); if (type == FLUSH_TYPE_ALL) flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); @@ -1309,7 +1313,7 @@ int flushCommandCommon(client *c, int type, int flags) { * lazyfree jobs in queue were processed */ if (blocking_async) { blockClientForAsyncFlush(c); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, NULL); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, slots); } #if defined(USE_JEMALLOC) @@ -1338,7 +1342,7 @@ void flushallCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) addReply(c, shared.ok); } @@ -1350,7 +1354,7 @@ void flushdbCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_DB,flags) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_DB, flags, NULL) == 0) addReply(c, shared.ok); } diff --git a/src/server.h b/src/server.h index eb1445f84..e6f930cf4 100644 --- a/src/server.h +++ b/src/server.h @@ -3960,7 +3960,7 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi #define FLUSH_TYPE_DB 1 #define FLUSH_TYPE_SLOTS 2 void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots); -int flushCommandCommon(client *c, int type, int flags); +int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges); void unblockClientForAsyncFlush(uint64_t client_id, void *userdata); void blockClientForAsyncFlush(client *c); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index b42d30f0a..905fe676a 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -195,6 +195,7 @@ foreach trim_method {"active" "bg"} { wait_for_ofs_sync [Rn 0] [Rn 2] set loglines [count_log_lines 0] + set loglines2 [count_log_lines -2] # since we have optimization, if the master is not running in blocking context, # we will try to run in blocking ASYNC mode, so we need to use MULTI/EXEC to make it blocking @@ -207,7 +208,7 @@ foreach trim_method {"active" "bg"} { if {$sync_method eq "ASYNC"} { set sync_option "ASYNC" } - R 0 SFLUSH 0 8191 $sync_option + R 0 SFLUSH 0 8190 $sync_option # Execute EXEC if using SYNC if {$sync_method eq "SYNC"} { @@ -223,10 +224,12 @@ foreach trim_method {"active" "bg"} { if {$sync_method ne "SYNC"} { if {$trim_method eq "active"} { - wait_for_log_messages 0 {"*Active trim completed for slots*0-8191*"} $loglines 1000 10 + wait_for_log_messages 0 {"*Active trim completed for slots*0-8190*"} $loglines 1000 10 + wait_for_log_messages -2 {"*Active trim completed for slots*0-8190*"} $loglines2 1000 10 } else { # background trim - wait_for_log_messages 0 {"*Background trim started for slots*0-8191*"} $loglines 1000 10 + wait_for_log_messages 0 {"*Background trim started for slots*0-8190*"} $loglines 1000 10 + wait_for_log_messages -2 {"*Background trim started for slots*0-8190*"} $loglines2 1000 10 } } } @@ -241,14 +244,14 @@ foreach trim_method {"active" "bg"} { } set rd [redis_deferring_client 0] - $rd SFLUSH 0 8191 SYNC ;# running in blocking async method + $rd SFLUSH 0 8190 SYNC ;# running in blocking async method # FLUSHDB will cancel all trim jobs R 0 SELECT 0 R 0 FLUSHDB SYNC # SFLUSH should be unblocked and return empty array - assert_equal [$rd read] "{0 8191}" + assert_equal [$rd read] "{0 8190}" $rd close } @@ -267,7 +270,7 @@ foreach trim_method {"active" "bg"} { wait_for_ofs_sync [Rn 0] [Rn 2] set rd [redis_deferring_client 0] - $rd SFLUSH 0 8191 SYNC ;# running in blocking async method + $rd SFLUSH 0 8190 SYNC ;# running in blocking async method # we can read the slot 1 key assert_equal [R 0 get $slot1_key] "slot1" @@ -278,11 +281,42 @@ foreach trim_method {"active" "bg"} { assert_error "*TRYAGAIN Slot is being trimmed*" {R 0 set $slot1_key "value1"} # wait for SFLUSH to complete - assert_equal [$rd read] "{0 8191}" + assert_equal [$rd read] "{0 8190}" $rd close # there is no trim event since we sfluh the owned slots of this node assert_equal [R 0 asm.get_cluster_event_log] {} assert_equal [R 2 asm.get_cluster_event_log] {} } + + test "SFLUSH all local slots uses flushdb optimization (no trim)" { + R 0 flushall + R 0 debug asm-trim-method active + + # Add keys in slot 0 + for {set i 0} {$i < 100} {incr i} { + R 0 set "{06S}$i" "value$i" + } + assert {[R 0 DBSIZE] == 100} + wait_for_ofs_sync [Rn 0] [Rn 2] + assert {[R 2 DBSIZE] == 100} + + set prev_trim_done [CI 0 cluster_slot_migration_stats_active_trim_completed] + set prev_trim_done2 [CI 2 cluster_slot_migration_stats_active_trim_completed] + + # SFLUSH with multiple ranges that together cover all local slots. + # If the selected slots are exactly the same as the local slots, we can + # simply flush the entire DB. + assert_match "{0 8191}" [R 0 SFLUSH 0 1000 1001 5000 5001 8191] + assert {[R 0 DBSIZE] == 0} + + # Verify replica is also flushed + wait_for_ofs_sync [Rn 0] [Rn 2] + assert {[R 2 DBSIZE] == 0} + + # Verify active_trim_completed counter did NOT increase since it will trigger + # flush (similar to flushdb command) instead of triggering trim. + assert_equal [CI 0 cluster_slot_migration_stats_active_trim_completed] $prev_trim_done + assert_equal [CI 2 cluster_slot_migration_stats_active_trim_completed] $prev_trim_done2 + } }