From e8887dc44c40b5cd369f6057fd679d9f5ef73dcf Mon Sep 17 00:00:00 2001 From: Yuan Wang Date: Thu, 12 Feb 2026 16:20:14 +0800 Subject: [PATCH] Support SFLUSH to flush slots partially (#14750) Currently, the SFLUSH command is permitted only when the specified slot range fully covers all slot ranges owned by the node. We want to enable it to perform partial slot flushes instead, and the reply should be list of ranges that were flushed. for implementation, we use TRIMSLOTS functionalities (added as part of ASM work) to support flush async. **NOTE**: Redis will reply `-TRYAGAIN Slot is being trimmed` error if clients send write commands to given slots during SFLUSH execution, clients should not send write commands to these slots before getting reply of SFLUSH command. And there is still an issue if we use active trim for `SFLUSH ASYNC` https://github.com/redis/redis/pull/14750#discussion_r2791904891, so we still mark this command `experimental`, we may remove this tag when we deprecate active slot trimming or don't support ASYNC option. --------- Co-authored-by: Ozan Tezcan --- src/blocked.c | 12 +- src/cluster.c | 91 ++++++++++-- src/cluster.h | 2 + src/cluster_asm.c | 143 ++++++++++++------- src/cluster_asm.h | 5 + src/db.c | 54 +++---- src/script.c | 2 + src/server.h | 4 +- tests/unit/cluster/multi-slot-operations.tcl | 126 ++++++++++++++-- 9 files changed, 334 insertions(+), 105 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 4f518c9a5..b973adeaf 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -237,7 +237,11 @@ int blockedClientMayTimeout(client *c) { * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { if (c->bstate.btype == BLOCKED_LAZYFREE) { - addReply(c, shared.ok); /* No reason lazy-free to fail */ + /* SFLUSH: reply with empty array, FLUSH*: reply with OK */ + if (c->cmd && c->cmd->proc == sflushCommand) + addReplyArrayLen(c, 0); + else + addReply(c, shared.ok); /* No reason lazy-free to fail */ } else if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { @@ -297,7 +301,11 @@ void disconnectAllBlockedClients(void) { continue; if (c->bstate.btype == BLOCKED_LAZYFREE) { - addReply(c, shared.ok); /* No reason lazy-free to fail */ + /* SFLUSH: reply with empty array, FLUSH*: reply with OK */ + if (c->cmd && c->cmd->proc == sflushCommand) + addReplyArrayLen(c, 0); + else + addReply(c, shared.ok); updateStatsOnUnblock(c, 0, 0, 0); c->flags &= ~CLIENT_PENDING_COMMAND; unblockClient(c, 1); diff --git a/src/cluster.c b/src/cluster.c index d07c31c59..a642101da 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -24,6 +24,7 @@ #include "cluster_slot_stats.h" #include +#include "bio.h" /* ----------------------------------------------------------------------------- * Key space handling @@ -1175,6 +1176,8 @@ int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) { * already "down" but it is fragile to rely on the update of the global state, * so we also handle it here. * + * CLUSTER_REDIR_TRIMMING if the request addresses a slot that is being trimmed. + * * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is * down but the user attempts to execute a command that addresses one or more keys. */ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, @@ -1416,6 +1419,15 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in return myself; } + /* If this node is responsible for the slot and is currently trimming it, + * SFLUSH may have triggered active trimming and it could still be in progress. + * Here we reject any write commands as no writes should be accepted for + * trimming slots while active trimming is in progress. */ + if (n == myself && is_write_command && isSlotInTrimJob(slot)) { + if (error_code) *error_code = CLUSTER_REDIR_TRIMMING; + return NULL; + } + /* Base case: just return the right node. However, if this node is not * myself, set error_code to MOVED since we need to issue a redirection. */ if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; @@ -1452,6 +1464,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot, clusterNodePreferredEndpoint(n), port)); + } else if (error_code == CLUSTER_REDIR_TRIMMING) { + addReplyError(c,"-TRYAGAIN Slot is being trimmed"); } else { serverPanic("getNodeByQuery() unknown error."); } @@ -1973,6 +1987,19 @@ void slotRangeArrayFreeGeneric(void *slots) { slotRangeArrayFree(slots); } +/* Returns the number of keys in the given slot ranges. */ +unsigned long long getKeyCountInSlotRangeArray(slotRangeArray *slots) { + if (!slots) return 0; + + unsigned long long key_count = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + key_count += countKeysInSlot(j); + } + } + return key_count; +} + /* Slot range array iterator */ slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) { slotRangeArrayIter *it = zmalloc(sizeof(*it)); @@ -2100,6 +2127,7 @@ slotRangeArray *clusterGetLocalSlotRanges(void) { */ void sflushCommand(client *c) { int flags = EMPTYDB_NO_FLAGS, argc = c->argc; + int trim_method = ASM_TRIM_METHOD_NONE; if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); @@ -2127,40 +2155,73 @@ void sflushCommand(client *c) { slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); if (!slots) return; + /* If client is AOF or master, we must obey the slot ranges. + * NOTE: we should exclude CLIENT_PSEUDO_MASTER when merging into fork. */ + int must_obey = mustObeyClient(c); + /* Iterate and find the slot ranges that belong to this node. Save them in * a new slotRangeArray. It is allocated on heap since there is a chance * that FLUSH SYNC will be running as blocking ASYNC and only later reply * with slot ranges */ - unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */ slotRangeArray *myslots = NULL; for (int i = 0; i < slots->num_ranges; i++) { for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - if (clusterIsMySlot(j)) { + if (must_obey || clusterIsMySlot(j)) { myslots = slotRangeArrayAppend(myslots, j); - slots_to_flush[j] = 1; } } } - /* Verify that all slots of mynode got covered. See sflushCommand() comment. */ - int all_slots_covered = 1; - for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (clusterIsMySlot(i) && !slots_to_flush[i]) { - all_slots_covered = 0; - break; - } - } - if (myslots == NULL || !all_slots_covered) { + /* If no slots belong to this node, return empty array. */ + if (myslots == NULL) { addReplyArrayLen(c, 0); slotRangeArrayFree(slots); - slotRangeArrayFree(myslots); return; } slotRangeArrayFree(slots); - /* Flush selected slots. If not flush as blocking async, then reply immediately */ - if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) + /* Cancel all ASM tasks that overlap with the given slot ranges. */ + clusterAsmCancelBySlotRangeArray(myslots, c->argv[0]->ptr); + + /* In case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ + int blocking_async = 0; + if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { + flags |= EMPTYDB_ASYNC; /* Run as ASYNC */ + blocking_async = 1; + } + + /* Trim the slots if running in async mode and not loading from AOF, + * otherwise delete the keys synchronously. */ + if (flags & EMPTYDB_ASYNC && server.loading == 0) { + /* Update dirty stats before trimming. */ + server.dirty += getKeyCountInSlotRangeArray(myslots); + /* Pass client id for active trim to unblock client when trim completes. */ + trim_method = asmTrimSlots(myslots, blocking_async ? c->id : 0, 0); + } else { + clusterDelKeysInSlotRangeArray(myslots, 1); + } + + /* Without the forceCommandPropagation, when DB was already empty, + * SFLUSH will not be replicated nor put into the AOF. */ + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + + /* Handle waiting for trim job to complete in case of blocking async flush. + * Block the client and schedule completion callback based on trim method: + * - BG trim uses BIO lazyfree worker to trim the slots, so schedule a new + * BIO lazyfree worker to wait for completion, then unblock client and reply. + * - Active trim works in cron job of the main thread, it will automatically + * unblock client and reply in active trim completion. */ + if (blocking_async && trim_method != ASM_TRIM_METHOD_NONE) { + blockClientForAsyncFlush(c); + if (trim_method == ASM_TRIM_METHOD_BG) + bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, myslots); + else /* ASM_TRIM_METHOD_ACTIVE, just free the slot ranges */ + slotRangeArrayFree(myslots); + } else { + /* Reply with slot ranges that were flushed. SYNC and ASYNC mode will be + * replied here immediately. */ replySlotsFlushAndFree(c, myslots); + } } /* The READWRITE command just clears the READONLY command state. */ diff --git a/src/cluster.h b/src/cluster.h index 7daf09323..b594a7dbd 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -37,6 +37,7 @@ #define CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */ #define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */ #define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */ +#define CLUSTER_REDIR_TRIMMING 8 /* -TRYAGAIN, slot is being trimmed. */ typedef struct _clusterNode clusterNode; struct clusterState; @@ -196,6 +197,7 @@ int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it); void slotRangeArrayIteratorFree(slotRangeArrayIter *it); int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err); slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos); +unsigned long long getKeyCountInSlotRangeArray(slotRangeArray *slots); unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command); unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command); diff --git a/src/cluster_asm.c b/src/cluster_asm.c index a09045377..06c1323d7 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -54,6 +54,12 @@ typedef struct asmTask { redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ } asmTask; +typedef struct activeTrimJob { + slotRangeArray *slots; /* Slots being trimmed */ + uint64_t client_id; /* Client ID waiting for active trim completion (0 if none) */ + int migration_cleanup; /* Whether this is a migration cleanup of slots no longer owned */ +} activeTrimJob; + struct asmManager { list *tasks; /* List of asmTask to be processed */ list *archived_tasks; /* List of archived asmTask */ @@ -140,11 +146,12 @@ static void propagateTrimSlots(slotRangeArray *slots); void asmTrimJobSchedule(slotRangeArray *slots); void asmTrimJobProcessPending(void); void asmCancelPendingTrimJobs(void); -void asmTriggerActiveTrim(slotRangeArray *slots); +void asmTriggerActiveTrim(slotRangeArray *slots, uint64_t client_id, int migration_cleanup); void asmActiveTrimEnd(void); int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); void asmTrimSlotsIfNotOwned(slotRangeArray *slots); void asmNotifyStateChange(asmTask *task, int event); +void activeTrimJobFreeMethod(void *ptr); void asmInit(void) { asmManager = zcalloc(sizeof(*asmManager)); @@ -161,7 +168,7 @@ void asmInit(void) { asmManager->active_trim_started = 0; asmManager->active_trim_completed = 0; asmManager->active_trim_cancelled = 0; - listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); + listSetFreeMethod(asmManager->active_trim_jobs, activeTrimJobFreeMethod); } char *asmTaskStateToString(int state) { @@ -1005,19 +1012,6 @@ void clusterMigrationCommand(client *c) { } } -/* Return the number of keys in the specified slot ranges. */ -unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { - if (!slots) return 0; - - unsigned long long key_count = 0; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - key_count += kvstoreDictSize(server.db[0].keys, j); - } - } - return key_count; -} - /* Log a human-readable message for ASM task lifecycle events. */ void asmLogTaskEvent(asmTask *task, int event) { sds str = slotRangeArrayToString(task->slots); @@ -1034,11 +1028,11 @@ void asmLogTaskEvent(asmTask *task, int event) { break; case ASM_EVENT_IMPORT_COMPLETED: serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; case ASM_EVENT_MIGRATE_STARTED: serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; case ASM_EVENT_MIGRATE_FAILED: serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); @@ -1048,7 +1042,7 @@ void asmLogTaskEvent(asmTask *task, int event) { break; case ASM_EVENT_MIGRATE_COMPLETED: serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; default: break; @@ -1888,6 +1882,14 @@ void clusterSyncSlotsCommand(client *c) { } } + /* Check if there is any trim job in progress for the slot ranges. + * We can't start the migrate task since the trim job will modify the data.*/ + if (asmIsAnyTrimJobOverlaps(slots)) { + addReplyError(c, "Trim job in progress for the slots"); + slotRangeArrayFree(slots); + return; + } + sds task_id = c->argv[3]->ptr; /* Notify the cluster implementation to prepare for the migrate task. */ if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || @@ -2779,8 +2781,8 @@ int isSlotInTrimJob(int slot) { /* Check if the slot is in any active trim job. */ listRewind(asmManager->active_trim_jobs, &li); while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - if (slotRangeArrayOverlaps(slots, &req)) + activeTrimJob *job = listNodeValue(ln); + if (slotRangeArrayOverlaps(job->slots, &req)) return 1; } return 0; @@ -2946,16 +2948,19 @@ void asmUnblockMasterAfterTrim(void) { } } -/* Trim the slots asynchronously in the BIO thread. */ -void asmTriggerBackgroundTrim(slotRangeArray *slots) { +/* Trim the slots asynchronously in the BIO thread. migration_cleanup is true if this + * is a migration cleanup of slots no longer owned. */ +void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND, + &fsi); signalFlushedDb(0, 1, slots); @@ -2993,10 +2998,12 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots) { asmUnblockMasterAfterTrim(); } -/* Trim the slots. */ -void asmTrimSlots(slotRangeArray *slots) { +/* Trim the slots, return the trim method used. + * If client_id is non-zero, the client will be unblocked when trim completes. + * If migration_cleanup is true, this is a migration cleanup of slots no longer owned. */ +int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup) { if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE) - return; + return ASM_TRIM_METHOD_NONE; /* Trigger active trim for the following cases: * 1. Debug override: trim method is set to 'active'. @@ -3011,9 +3018,11 @@ void asmTrimSlots(slotRangeArray *slots) { (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT && moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED)); if (activetrim) - asmTriggerActiveTrim(slots); + asmTriggerActiveTrim(slots, client_id, migration_cleanup); else - asmTriggerBackgroundTrim(slots); + asmTriggerBackgroundTrim(slots, migration_cleanup); + + return activetrim ? ASM_TRIM_METHOD_ACTIVE : ASM_TRIM_METHOD_BG; } /* Schedule a trim job for the specified slot ranges. The job will be @@ -3068,7 +3077,7 @@ void asmTrimJobProcessPending(void) { listRewind(asmManager->pending_trim_jobs, &li); while ((ln = listNext(&li)) != NULL) { slotRangeArray *slots = listNodeValue(ln); - asmTrimSlots(slots); + asmTrimSlots(slots, 0, 1); propagateTrimSlots(slots); listDelNode(asmManager->pending_trim_jobs, ln); slotRangeArrayFree(slots); @@ -3234,6 +3243,19 @@ void asmCancelPendingTrimJobs(void) { } } +/* Free an activeTrimJob and unblock pending client if needed. */ +void activeTrimJobFreeMethod(void *ptr) { + activeTrimJob *job = ptr; + if (job->client_id != 0) { + /* Reply with the slot ranges that requested to be trimmed. Generally we + * cancel trim jobs as the dataset is reset, no need to trim anymore. */ + unblockClientForAsyncFlush(job->client_id, job->slots); + job->slots = NULL; /* slots were freed in unblockClientForAsyncFlush */ + } + if (job->slots) slotRangeArrayFree(job->slots); + zfree(job); +} + /* Cancel all pending and active trim jobs. */ void asmCancelTrimJobs(void) { if (!asmManager) return; @@ -3308,7 +3330,7 @@ void trimslotsCommand(client *c) { } } } - asmTrimSlots(slots); + asmTrimSlots(slots, 0, 1); } /* Command will not be propagated automatically since it does not modify @@ -3321,7 +3343,8 @@ void trimslotsCommand(client *c) { /* Start the active trim job. */ void asmActiveTrimStart(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + slotRangeArray *slots = job->slots; serverAssert(asmManager->active_trim_it == NULL); asmManager->active_trim_it = slotRangeArrayGetIterator(slots); @@ -3330,16 +3353,18 @@ void asmActiveTrimStart(void) { asmManager->active_trim_current_job_trimmed = 0; /* Count the number of keys to trim */ - asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots); + asmManager->active_trim_current_job_keys += getKeyCountInSlotRangeArray(slots); RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (job->migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED, + &fsi); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim initiated for slots: %s, to trim %llu keys.", @@ -3347,9 +3372,14 @@ void asmActiveTrimStart(void) { sdsfree(str); } -/* Schedule an active trim job. */ -void asmTriggerActiveTrim(slotRangeArray *slots) { - listAddNodeTail(asmManager->active_trim_jobs, slotRangeArrayDup(slots)); +/* Schedule an active trim job with optional client waiting for completion. */ +void asmTriggerActiveTrim(slotRangeArray *slots, uint64_t client_id, int migration_cleanup) { + activeTrimJob *job = zmalloc(sizeof(*job)); + job->slots = slotRangeArrayDup(slots); + job->client_id = client_id; + job->migration_cleanup = migration_cleanup; + + listAddNodeTail(asmManager->active_trim_jobs, job); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim scheduled for slots: %s", str); sdsfree(str); @@ -3363,7 +3393,8 @@ void asmTriggerActiveTrim(slotRangeArray *slots) { /* End the active trim job. */ void asmActiveTrimEnd(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + slotRangeArray *slots = job->slots; if (asmManager->active_trim_it) { slotRangeArrayIteratorFree(asmManager->active_trim_it); @@ -3378,9 +3409,11 @@ void asmActiveTrimEnd(void) { (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (job->migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED, + &fsi); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim completed for slots: %s, %llu keys trimmed.", @@ -3434,7 +3467,7 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc } /* Delete the key and notify the modules. */ -void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { +void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj, int migration_cleanup) { if (asmManager->debug_active_trim_delay > 0) debugDelay(asmManager->debug_active_trim_delay); @@ -3444,11 +3477,17 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { dbDelete(db, keyobj); keyModified(NULL, db, keyobj, NULL, 1); - /* The keys are not actually logically deleted from the database, just moved - * to another node. The modules need to know that these keys are no longer - * available locally, so just send the keyspace notification to the modules, - * but not to clients. */ - moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id); + if (migration_cleanup) { + /* The keys are not actually logically deleted from the database, just moved + * to another node. The modules need to know that these keys are no longer + * available locally, so just send the keyspace notification to the modules, + * but not to clients. */ + moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id); + } else { + /* Not a migration cleanup, the key is really deleted from the database, + * need to notify the clients. */ + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyobj, db->id); + } asmManager->active_trim_current_job_trimmed++; if (static_key) decrRefCount(keyobj); @@ -3492,6 +3531,8 @@ void asmActiveTrimCycle(void) { timelimit = 1000000 * trim_cycle_time_perc / server.hz / 100; if (timelimit <= 0) timelimit = 1; + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + serverAssert(asmManager->active_trim_it); int slot = slotRangeArrayGetCurrentSlot(asmManager->active_trim_it); @@ -3505,7 +3546,7 @@ void asmActiveTrimCycle(void) { enterExecutionUnit(1, 0); robj *keyobj = createStringObject(sdskey, sdslen(sdskey)); - asmActiveTrimDeleteKey(&server.db[0], keyobj); + asmActiveTrimDeleteKey(&server.db[0], keyobj, job->migration_cleanup); decrRefCount(keyobj); exitExecutionUnit(); postExecutionUnitOperations(); diff --git a/src/cluster_asm.h b/src/cluster_asm.h index 841a55d76..c1e7ccab1 100644 --- a/src/cluster_asm.h +++ b/src/cluster_asm.h @@ -15,6 +15,10 @@ struct asmTask; struct slotRangeArray; struct slotRange; +#define ASM_TRIM_METHOD_NONE 0 +#define ASM_TRIM_METHOD_BG 1 +#define ASM_TRIM_METHOD_ACTIVE 2 + void asmInit(void); void asmBeforeSleep(void); void asmCron(void); @@ -53,5 +57,6 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc void asmActiveTrimCycle(void); int asmIsKeyInTrimJob(sds keyname); int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc); +int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup); #endif diff --git a/src/db.c b/src/db.c index 7b977c8ac..60bb79692 100644 --- a/src/db.c +++ b/src/db.c @@ -1209,11 +1209,22 @@ void flushAllDataAndResetRDB(int flags) { #endif } -/* CB function on blocking ASYNC FLUSH completion - * - * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB. - */ -void flushallSyncBgDone(uint64_t client_id, void *userdata) { +/* Block client for blocking ASYNC FLUSH operation (FLUSH*, SFLUSH). */ +void blockClientForAsyncFlush(client *c) { + /* measure bg job till completion as elapsed time of flush command */ + elapsedStart(&c->bstate.lazyfreeStartTime); + + c->bstate.timeout = 0; + /* We still need to perform cleanup operations for the command, including + * updating the replication offset, so mark this command as pending to + * avoid command from being reset during unblock. */ + c->flags |= CLIENT_PENDING_COMMAND; + blockClient(c, BLOCKED_LAZYFREE); +} + +/* CB function on blocking ASYNC FLUSH completion. + * We will unblock the client and send the proper reply. */ +void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { slotRangeArray *slots = userdata; client *c = lookupClientByID(client_id); @@ -1254,16 +1265,12 @@ void flushallSyncBgDone(uint64_t client_id, void *userdata) { server.current_client = old_client; } -/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH. +/* Common flush command implementation for FLUSHALL, FLUSHDB. * * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC * Return 0 otherwise - * - * slots - provided only by SFLUSH command, otherwise NULL. Will be used on - * completion to reply with the slots flush result. Ownership is passed - * to the completion job in case of `blocking_async`. */ -int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { +int flushCommandCommon(client *c, int type, int flags) { int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ @@ -1273,8 +1280,8 @@ int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { blocking_async = 1; } - /* Cancel all ASM tasks that overlap with the given slot ranges. */ - clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); + /* Cancel all ASM tasks. */ + clusterAsmCancelBySlotRangeArray(NULL, c->argv[0]->ptr); if (type == FLUSH_TYPE_ALL) flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); @@ -1289,16 +1296,8 @@ int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { * worker's queue. To be called and reply with OK only after all preceding pending * lazyfree jobs in queue were processed */ if (blocking_async) { - /* measure bg job till completion as elapsed time of flush command */ - elapsedStart(&c->bstate.lazyfreeStartTime); - - c->bstate.timeout = 0; - /* We still need to perform cleanup operations for the command, including - * updating the replication offset, so mark this command as pending to - * avoid command from being reset during unblock. */ - c->flags |= CLIENT_PENDING_COMMAND; - blockClient(c,BLOCKED_LAZYFREE); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, slots); + blockClientForAsyncFlush(c); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, NULL); } #if defined(USE_JEMALLOC) @@ -1327,7 +1326,7 @@ void flushallCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags) == 0) addReply(c, shared.ok); } @@ -1339,7 +1338,7 @@ void flushdbCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_DB,flags) == 0) addReply(c, shared.ok); } @@ -2859,7 +2858,10 @@ keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) { if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED)) return KEY_VALID; - return KEY_TRIMMED; + /* If the slot is not served by this node, we should not allow access + * to the key, we consider it as trimmed. */ + if (!clusterCanAccessKeysInSlot(getKeySlot(key_name))) + return KEY_TRIMMED; } if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) || diff --git a/src/script.c b/src/script.c index f222b89e6..66b316abf 100644 --- a/src/script.c +++ b/src/script.c @@ -523,6 +523,8 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->cmd->fullname); } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { *err = sdsnew("Script attempted to access a slot not served"); + } else if (error_code == CLUSTER_REDIR_TRIMMING) { + *err = sdsnew("Script attempted to access a slot being trimmed"); } else { /* error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK */ *err = sdsnew("Script attempted to access a non local key in a " diff --git a/src/server.h b/src/server.h index e6a28a621..74c1443fd 100644 --- a/src/server.h +++ b/src/server.h @@ -3943,7 +3943,9 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi #define FLUSH_TYPE_DB 1 #define FLUSH_TYPE_SLOTS 2 void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots); -int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges); +int flushCommandCommon(client *c, int type, int flags); +void unblockClientForAsyncFlush(uint64_t client_id, void *userdata); +void blockClientForAsyncFlush(client *c); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ #define EMPTYDB_NOFUNCTIONS (1<<1) /* Indicate not to flush the functions. */ diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index 5d2d03e85..b42d30f0a 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -133,20 +133,20 @@ test "SFLUSH - Errors and output validation" { assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX} # Test SFLUSH output validation - assert_match "" [$master1 SFLUSH 2 4] - assert_match "" [$master1 SFLUSH 0 4] + assert_match "{2 4}" [$master1 SFLUSH 2 4] + assert_match "{0 4}" [$master1 SFLUSH 0 4] assert_match "" [$master2 SFLUSH 0 4] - assert_match "" [$master1 SFLUSH 1 8191] - assert_match "" [$master1 SFLUSH 0 8190] - assert_match "" [$master1 SFLUSH 0 998 2001 8191] - assert_match "" [$master1 SFLUSH 1 999 2001 8191] - assert_match "" [$master1 SFLUSH 0 999 2001 8190] - assert_match "" [$master1 SFLUSH 0 999 2002 8191] + assert_match "{1 999} {2001 8191}" [$master1 SFLUSH 1 8191] + assert_match "{0 999} {2001 8190}" [$master1 SFLUSH 0 8190] + assert_match "{0 998} {2001 8191}" [$master1 SFLUSH 0 998 2001 8191] + assert_match "{1 999} {2001 8191}" [$master1 SFLUSH 1 999 2001 8191] + assert_match "{0 999} {2001 8190}" [$master1 SFLUSH 0 999 2001 8190] + assert_match "{0 999} {2002 8191}" [$master1 SFLUSH 0 999 2002 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191] - assert_match "" [$master2 SFLUSH 8193 16383] - assert_match "" [$master2 SFLUSH 8192 16382] + assert_match "{8193 16383}" [$master2 SFLUSH 8193 16383] + assert_match "{8192 16382}" [$master2 SFLUSH 8192 16382] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC] @@ -180,3 +180,109 @@ test "SFLUSH - Deletes the keys with argument /SYNC/ASYNC" { } } + +set testmodule [file normalize tests/modules/atomicslotmigration.so] +start_cluster 2 2 [list tags {external:skip cluster experimental modules} config_lines [list loadmodule $testmodule]] { +foreach sync_method {"SYNC" "BLOCKING-ASYNC" "ASYNC"} { +foreach trim_method {"active" "bg"} { + test "sflush can propagate to replicas (sync method: $sync_method, trim method: $trim_method)" { + R 0 flushall + R 0 debug asm-trim-method $trim_method + R 2 debug asm-trim-method $trim_method + + # Add keys in master + R 0 set "06S" "slot0" + wait_for_ofs_sync [Rn 0] [Rn 2] + + set loglines [count_log_lines 0] + + # since we have optimization, if the master is not running in blocking context, + # we will try to run in blocking ASYNC mode, so we need to use MULTI/EXEC to make it blocking + if {$sync_method eq "SYNC"} { + R 0 MULTI + } + + # Execute SFLUSH on master, SYNC will be run as blocking ASYNC if not running in MULTI/EXEC + set sync_option "SYNC" + if {$sync_method eq "ASYNC"} { + set sync_option "ASYNC" + } + R 0 SFLUSH 0 8191 $sync_option + + # Execute EXEC if using SYNC + if {$sync_method eq "SYNC"} { + R 0 EXEC + } + + # Wait for SFLUSH to propagate to replica, and complete the trim + wait_for_condition 1000 10 { + [R 0 DBSIZE] == 0 && [R 2 DBSIZE] == 0 + } else { + fail "SFLUSH did not propagate to replica" + } + + if {$sync_method ne "SYNC"} { + if {$trim_method eq "active"} { + wait_for_log_messages 0 {"*Active trim completed for slots*0-8191*"} $loglines 1000 10 + } else { + # background trim + wait_for_log_messages 0 {"*Background trim started for slots*0-8191*"} $loglines 1000 10 + } + } + } +} +} + test "Canceling active trimming can unblock sflush" { + # Delay active trim to make sure it is not completed before FLUSHDB + R 0 debug asm-trim-method active 10000 ;# delay 10ms per key + # Add slot 0 keys in master + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{06S}$i" "value$i" + } + + set rd [redis_deferring_client 0] + $rd SFLUSH 0 8191 SYNC ;# running in blocking async method + + # FLUSHDB will cancel all trim jobs + R 0 SELECT 0 + R 0 FLUSHDB SYNC + + # SFLUSH should be unblocked and return empty array + assert_equal [$rd read] "{0 8191}" + $rd close + } + + test "Write is rejected and read is allowed in SFLUSH slots using active trim" { + R 0 debug asm-trim-method active 1000 ;# delay 1ms per key + R 0 asm.clear_event_log + R 2 asm.clear_event_log + + # Add slot 0 keys + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{06S}$i" "value$i" + } + # Add a slot 1 key, we should trim slot 0 first, then slot 1 + set slot1_key "Qi" + R 0 set $slot1_key "slot1" + wait_for_ofs_sync [Rn 0] [Rn 2] + + set rd [redis_deferring_client 0] + $rd SFLUSH 0 8191 SYNC ;# running in blocking async method + + # we can read the slot 1 key + assert_equal [R 0 get $slot1_key] "slot1" + # Module with flag REDISMODULE_OPEN_KEY_ACCESS_TRIMMED also can read the key + assert_equal [R 0 asm.read_pending_trim_key $slot1_key] "slot1" + + # we can not write to the slot 1 key + assert_error "*TRYAGAIN Slot is being trimmed*" {R 0 set $slot1_key "value1"} + + # wait for SFLUSH to complete + assert_equal [$rd read] "{0 8191}" + $rd close + + # there is no trim event since we sfluh the owned slots of this node + assert_equal [R 0 asm.get_cluster_event_log] {} + assert_equal [R 2 asm.get_cluster_event_log] {} + } +}