mirror of
https://github.com/redis/redis.git
synced 2026-06-04 14:16:31 -04:00
Use flush db if sflush all owned slots (#14887)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
SFLUSH now detects when the requested slot ranges exactly match the node’s local slot coverage and, in that case, skips slot trimming and instead performs a full DB flush via flushCommandCommon (while still replying with the flushed slot ranges).
This commit is contained in:
parent
7e866b47e3
commit
220a577d3c
4 changed files with 67 additions and 17 deletions
|
|
@ -2159,8 +2159,7 @@ void sflushCommand(client *c) {
|
|||
slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1);
|
||||
if (!slots) return;
|
||||
|
||||
/* If client is AOF or master, we must obey the slot ranges.
|
||||
* NOTE: we should exclude CLIENT_PSEUDO_MASTER when merging into fork. */
|
||||
/* If client is AOF or master, we must obey the slot ranges. */
|
||||
int must_obey = mustObeyClient(c);
|
||||
|
||||
/* Iterate and find the slot ranges that belong to this node. Save them in
|
||||
|
|
@ -2184,6 +2183,19 @@ void sflushCommand(client *c) {
|
|||
}
|
||||
slotRangeArrayFree(slots);
|
||||
|
||||
/* If the selected slots are exactly the same as the local slots, we can
|
||||
* simply flush the entire DB by flushCommandCommon. */
|
||||
slotRangeArray *local_slots = clusterGetLocalSlotRanges();
|
||||
int all_slots_covered = slotRangeArrayIsEqual(myslots, local_slots);
|
||||
slotRangeArrayFree(local_slots);
|
||||
if (all_slots_covered) {
|
||||
/* If not flush as blocking async, then reply immediately */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) {
|
||||
replySlotsFlushAndFree(c, myslots);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Cancel all ASM tasks that overlap with the given slot ranges. */
|
||||
clusterAsmCancelBySlotRangeArray(myslots, c->argv[0]->ptr);
|
||||
|
||||
|
|
|
|||
18
src/db.c
18
src/db.c
|
|
@ -1277,12 +1277,16 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) {
|
|||
server.current_client = old_client;
|
||||
}
|
||||
|
||||
/* Common flush command implementation for FLUSHALL, FLUSHDB.
|
||||
/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH.
|
||||
*
|
||||
* Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC
|
||||
* Return 0 otherwise
|
||||
*
|
||||
* slots - provided only by SFLUSH command, otherwise NULL. Will be used on
|
||||
* completion to reply with the slots flush result. Ownership is passed
|
||||
* to the completion job in case of `blocking_async`.
|
||||
*/
|
||||
int flushCommandCommon(client *c, int type, int flags) {
|
||||
int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *slots) {
|
||||
int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */
|
||||
|
||||
/* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
|
||||
|
|
@ -1292,8 +1296,8 @@ int flushCommandCommon(client *c, int type, int flags) {
|
|||
blocking_async = 1;
|
||||
}
|
||||
|
||||
/* Cancel all ASM tasks. */
|
||||
clusterAsmCancelBySlotRangeArray(NULL, c->argv[0]->ptr);
|
||||
/* Cancel all ASM tasks that overlap with the given slot ranges. */
|
||||
clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr);
|
||||
|
||||
if (type == FLUSH_TYPE_ALL)
|
||||
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
|
||||
|
|
@ -1309,7 +1313,7 @@ int flushCommandCommon(client *c, int type, int flags) {
|
|||
* lazyfree jobs in queue were processed */
|
||||
if (blocking_async) {
|
||||
blockClientForAsyncFlush(c);
|
||||
bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, NULL);
|
||||
bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, slots);
|
||||
}
|
||||
|
||||
#if defined(USE_JEMALLOC)
|
||||
|
|
@ -1338,7 +1342,7 @@ void flushallCommand(client *c) {
|
|||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
|
||||
/* If FLUSH SYNC isn't running as blocking async, then reply */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags) == 0)
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0)
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
|
|
@ -1350,7 +1354,7 @@ void flushdbCommand(client *c) {
|
|||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
|
||||
/* If FLUSH SYNC isn't running as blocking async, then reply */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_DB,flags) == 0)
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_DB, flags, NULL) == 0)
|
||||
addReply(c, shared.ok);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3960,7 +3960,7 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi
|
|||
#define FLUSH_TYPE_DB 1
|
||||
#define FLUSH_TYPE_SLOTS 2
|
||||
void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots);
|
||||
int flushCommandCommon(client *c, int type, int flags);
|
||||
int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges);
|
||||
void unblockClientForAsyncFlush(uint64_t client_id, void *userdata);
|
||||
void blockClientForAsyncFlush(client *c);
|
||||
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
|
||||
|
|
|
|||
|
|
@ -195,6 +195,7 @@ foreach trim_method {"active" "bg"} {
|
|||
wait_for_ofs_sync [Rn 0] [Rn 2]
|
||||
|
||||
set loglines [count_log_lines 0]
|
||||
set loglines2 [count_log_lines -2]
|
||||
|
||||
# since we have optimization, if the master is not running in blocking context,
|
||||
# we will try to run in blocking ASYNC mode, so we need to use MULTI/EXEC to make it blocking
|
||||
|
|
@ -207,7 +208,7 @@ foreach trim_method {"active" "bg"} {
|
|||
if {$sync_method eq "ASYNC"} {
|
||||
set sync_option "ASYNC"
|
||||
}
|
||||
R 0 SFLUSH 0 8191 $sync_option
|
||||
R 0 SFLUSH 0 8190 $sync_option
|
||||
|
||||
# Execute EXEC if using SYNC
|
||||
if {$sync_method eq "SYNC"} {
|
||||
|
|
@ -223,10 +224,12 @@ foreach trim_method {"active" "bg"} {
|
|||
|
||||
if {$sync_method ne "SYNC"} {
|
||||
if {$trim_method eq "active"} {
|
||||
wait_for_log_messages 0 {"*Active trim completed for slots*0-8191*"} $loglines 1000 10
|
||||
wait_for_log_messages 0 {"*Active trim completed for slots*0-8190*"} $loglines 1000 10
|
||||
wait_for_log_messages -2 {"*Active trim completed for slots*0-8190*"} $loglines2 1000 10
|
||||
} else {
|
||||
# background trim
|
||||
wait_for_log_messages 0 {"*Background trim started for slots*0-8191*"} $loglines 1000 10
|
||||
wait_for_log_messages 0 {"*Background trim started for slots*0-8190*"} $loglines 1000 10
|
||||
wait_for_log_messages -2 {"*Background trim started for slots*0-8190*"} $loglines2 1000 10
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -241,14 +244,14 @@ foreach trim_method {"active" "bg"} {
|
|||
}
|
||||
|
||||
set rd [redis_deferring_client 0]
|
||||
$rd SFLUSH 0 8191 SYNC ;# running in blocking async method
|
||||
$rd SFLUSH 0 8190 SYNC ;# running in blocking async method
|
||||
|
||||
# FLUSHDB will cancel all trim jobs
|
||||
R 0 SELECT 0
|
||||
R 0 FLUSHDB SYNC
|
||||
|
||||
# SFLUSH should be unblocked and return empty array
|
||||
assert_equal [$rd read] "{0 8191}"
|
||||
assert_equal [$rd read] "{0 8190}"
|
||||
$rd close
|
||||
}
|
||||
|
||||
|
|
@ -267,7 +270,7 @@ foreach trim_method {"active" "bg"} {
|
|||
wait_for_ofs_sync [Rn 0] [Rn 2]
|
||||
|
||||
set rd [redis_deferring_client 0]
|
||||
$rd SFLUSH 0 8191 SYNC ;# running in blocking async method
|
||||
$rd SFLUSH 0 8190 SYNC ;# running in blocking async method
|
||||
|
||||
# we can read the slot 1 key
|
||||
assert_equal [R 0 get $slot1_key] "slot1"
|
||||
|
|
@ -278,11 +281,42 @@ foreach trim_method {"active" "bg"} {
|
|||
assert_error "*TRYAGAIN Slot is being trimmed*" {R 0 set $slot1_key "value1"}
|
||||
|
||||
# wait for SFLUSH to complete
|
||||
assert_equal [$rd read] "{0 8191}"
|
||||
assert_equal [$rd read] "{0 8190}"
|
||||
$rd close
|
||||
|
||||
# there is no trim event since we sfluh the owned slots of this node
|
||||
assert_equal [R 0 asm.get_cluster_event_log] {}
|
||||
assert_equal [R 2 asm.get_cluster_event_log] {}
|
||||
}
|
||||
|
||||
test "SFLUSH all local slots uses flushdb optimization (no trim)" {
|
||||
R 0 flushall
|
||||
R 0 debug asm-trim-method active
|
||||
|
||||
# Add keys in slot 0
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
R 0 set "{06S}$i" "value$i"
|
||||
}
|
||||
assert {[R 0 DBSIZE] == 100}
|
||||
wait_for_ofs_sync [Rn 0] [Rn 2]
|
||||
assert {[R 2 DBSIZE] == 100}
|
||||
|
||||
set prev_trim_done [CI 0 cluster_slot_migration_stats_active_trim_completed]
|
||||
set prev_trim_done2 [CI 2 cluster_slot_migration_stats_active_trim_completed]
|
||||
|
||||
# SFLUSH with multiple ranges that together cover all local slots.
|
||||
# If the selected slots are exactly the same as the local slots, we can
|
||||
# simply flush the entire DB.
|
||||
assert_match "{0 8191}" [R 0 SFLUSH 0 1000 1001 5000 5001 8191]
|
||||
assert {[R 0 DBSIZE] == 0}
|
||||
|
||||
# Verify replica is also flushed
|
||||
wait_for_ofs_sync [Rn 0] [Rn 2]
|
||||
assert {[R 2 DBSIZE] == 0}
|
||||
|
||||
# Verify active_trim_completed counter did NOT increase since it will trigger
|
||||
# flush (similar to flushdb command) instead of triggering trim.
|
||||
assert_equal [CI 0 cluster_slot_migration_stats_active_trim_completed] $prev_trim_done
|
||||
assert_equal [CI 2 cluster_slot_migration_stats_active_trim_completed] $prev_trim_done2
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue