diff --git a/src/blocked.c b/src/blocked.c index 4f518c9a5..b973adeaf 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -237,7 +237,11 @@ int blockedClientMayTimeout(client *c) { * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { if (c->bstate.btype == BLOCKED_LAZYFREE) { - addReply(c, shared.ok); /* No reason lazy-free to fail */ + /* SFLUSH: reply with empty array, FLUSH*: reply with OK */ + if (c->cmd && c->cmd->proc == sflushCommand) + addReplyArrayLen(c, 0); + else + addReply(c, shared.ok); /* No reason lazy-free to fail */ } else if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { @@ -297,7 +301,11 @@ void disconnectAllBlockedClients(void) { continue; if (c->bstate.btype == BLOCKED_LAZYFREE) { - addReply(c, shared.ok); /* No reason lazy-free to fail */ + /* SFLUSH: reply with empty array, FLUSH*: reply with OK */ + if (c->cmd && c->cmd->proc == sflushCommand) + addReplyArrayLen(c, 0); + else + addReply(c, shared.ok); updateStatsOnUnblock(c, 0, 0, 0); c->flags &= ~CLIENT_PENDING_COMMAND; unblockClient(c, 1); diff --git a/src/cluster.c b/src/cluster.c index d07c31c59..a642101da 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -24,6 +24,7 @@ #include "cluster_slot_stats.h" #include +#include "bio.h" /* ----------------------------------------------------------------------------- * Key space handling @@ -1175,6 +1176,8 @@ int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) { * already "down" but it is fragile to rely on the update of the global state, * so we also handle it here. * + * CLUSTER_REDIR_TRIMMING if the request addresses a slot that is being trimmed. + * * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is * down but the user attempts to execute a command that addresses one or more keys. 
*/ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, @@ -1416,6 +1419,15 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in return myself; } + /* If this node is responsible for the slot and is currently trimming it, + * SFLUSH may have triggered active trimming and it could still be in progress. + * Here we reject any write commands as no writes should be accepted for + * trimming slots while active trimming is in progress. */ + if (n == myself && is_write_command && isSlotInTrimJob(slot)) { + if (error_code) *error_code = CLUSTER_REDIR_TRIMMING; + return NULL; + } + /* Base case: just return the right node. However, if this node is not * myself, set error_code to MOVED since we need to issue a redirection. */ if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; @@ -1452,6 +1464,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot, clusterNodePreferredEndpoint(n), port)); + } else if (error_code == CLUSTER_REDIR_TRIMMING) { + addReplyError(c,"-TRYAGAIN Slot is being trimmed"); } else { serverPanic("getNodeByQuery() unknown error."); } @@ -1973,6 +1987,19 @@ void slotRangeArrayFreeGeneric(void *slots) { slotRangeArrayFree(slots); } +/* Returns the number of keys in the given slot ranges. 
*/ +unsigned long long getKeyCountInSlotRangeArray(slotRangeArray *slots) { + if (!slots) return 0; + + unsigned long long key_count = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + key_count += countKeysInSlot(j); + } + } + return key_count; +} + /* Slot range array iterator */ slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) { slotRangeArrayIter *it = zmalloc(sizeof(*it)); @@ -2100,6 +2127,7 @@ slotRangeArray *clusterGetLocalSlotRanges(void) { */ void sflushCommand(client *c) { int flags = EMPTYDB_NO_FLAGS, argc = c->argc; + int trim_method = ASM_TRIM_METHOD_NONE; if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); @@ -2127,40 +2155,73 @@ void sflushCommand(client *c) { slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); if (!slots) return; + /* If client is AOF or master, we must obey the slot ranges. + * NOTE: we should exclude CLIENT_PSEUDO_MASTER when merging into fork. */ + int must_obey = mustObeyClient(c); + /* Iterate and find the slot ranges that belong to this node. Save them in * a new slotRangeArray. It is allocated on heap since there is a chance * that FLUSH SYNC will be running as blocking ASYNC and only later reply * with slot ranges */ - unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */ slotRangeArray *myslots = NULL; for (int i = 0; i < slots->num_ranges; i++) { for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - if (clusterIsMySlot(j)) { + if (must_obey || clusterIsMySlot(j)) { myslots = slotRangeArrayAppend(myslots, j); - slots_to_flush[j] = 1; } } } - /* Verify that all slots of mynode got covered. See sflushCommand() comment. 
*/ - int all_slots_covered = 1; - for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (clusterIsMySlot(i) && !slots_to_flush[i]) { - all_slots_covered = 0; - break; - } - } - if (myslots == NULL || !all_slots_covered) { + /* If no slots belong to this node, return empty array. */ + if (myslots == NULL) { addReplyArrayLen(c, 0); slotRangeArrayFree(slots); - slotRangeArrayFree(myslots); return; } slotRangeArrayFree(slots); - /* Flush selected slots. If not flush as blocking async, then reply immediately */ - if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) + /* Cancel all ASM tasks that overlap with the given slot ranges. */ + clusterAsmCancelBySlotRangeArray(myslots, c->argv[0]->ptr); + + /* In case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ + int blocking_async = 0; + if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { + flags |= EMPTYDB_ASYNC; /* Run as ASYNC */ + blocking_async = 1; + } + + /* Trim the slots if running in async mode and not loading from AOF, + * otherwise delete the keys synchronously. */ + if (flags & EMPTYDB_ASYNC && server.loading == 0) { + /* Update dirty stats before trimming. */ + server.dirty += getKeyCountInSlotRangeArray(myslots); + /* Pass client id for active trim to unblock client when trim completes. */ + trim_method = asmTrimSlots(myslots, blocking_async ? c->id : 0, 0); + } else { + clusterDelKeysInSlotRangeArray(myslots, 1); + } + + /* Without the forceCommandPropagation, when DB was already empty, + * SFLUSH will not be replicated nor put into the AOF. */ + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + + /* Handle waiting for trim job to complete in case of blocking async flush. + * Block the client and schedule completion callback based on trim method: + * - BG trim uses BIO lazyfree worker to trim the slots, so schedule a new + * BIO lazyfree worker to wait for completion, then unblock client and reply. 
+ * - Active trim works in cron job of the main thread, it will automatically + * unblock client and reply in active trim completion. */ + if (blocking_async && trim_method != ASM_TRIM_METHOD_NONE) { + blockClientForAsyncFlush(c); + if (trim_method == ASM_TRIM_METHOD_BG) + bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, myslots); + else /* ASM_TRIM_METHOD_ACTIVE, just free the slot ranges */ + slotRangeArrayFree(myslots); + } else { + /* Reply with slot ranges that were flushed. SYNC and ASYNC mode will be + * replied here immediately. */ replySlotsFlushAndFree(c, myslots); + } } /* The READWRITE command just clears the READONLY command state. */ diff --git a/src/cluster.h b/src/cluster.h index 7daf09323..b594a7dbd 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -37,6 +37,7 @@ #define CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */ #define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */ #define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */ +#define CLUSTER_REDIR_TRIMMING 8 /* -TRYAGAIN, slot is being trimmed. 
*/ typedef struct _clusterNode clusterNode; struct clusterState; @@ -196,6 +197,7 @@ int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it); void slotRangeArrayIteratorFree(slotRangeArrayIter *it); int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err); slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos); +unsigned long long getKeyCountInSlotRangeArray(slotRangeArray *slots); unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command); unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command); diff --git a/src/cluster_asm.c b/src/cluster_asm.c index a09045377..06c1323d7 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -54,6 +54,12 @@ typedef struct asmTask { redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ } asmTask; +typedef struct activeTrimJob { + slotRangeArray *slots; /* Slots being trimmed */ + uint64_t client_id; /* Client ID waiting for active trim completion (0 if none) */ + int migration_cleanup; /* Whether this is a migration cleanup of slots no longer owned */ +} activeTrimJob; + struct asmManager { list *tasks; /* List of asmTask to be processed */ list *archived_tasks; /* List of archived asmTask */ @@ -140,11 +146,12 @@ static void propagateTrimSlots(slotRangeArray *slots); void asmTrimJobSchedule(slotRangeArray *slots); void asmTrimJobProcessPending(void); void asmCancelPendingTrimJobs(void); -void asmTriggerActiveTrim(slotRangeArray *slots); +void asmTriggerActiveTrim(slotRangeArray *slots, uint64_t client_id, int migration_cleanup); void asmActiveTrimEnd(void); int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); void asmTrimSlotsIfNotOwned(slotRangeArray *slots); void asmNotifyStateChange(asmTask *task, int event); +void activeTrimJobFreeMethod(void *ptr); void asmInit(void) { asmManager = zcalloc(sizeof(*asmManager)); @@ -161,7 +168,7 @@ void asmInit(void) { asmManager->active_trim_started = 0; 
asmManager->active_trim_completed = 0; asmManager->active_trim_cancelled = 0; - listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); + listSetFreeMethod(asmManager->active_trim_jobs, activeTrimJobFreeMethod); } char *asmTaskStateToString(int state) { @@ -1005,19 +1012,6 @@ void clusterMigrationCommand(client *c) { } } -/* Return the number of keys in the specified slot ranges. */ -unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { - if (!slots) return 0; - - unsigned long long key_count = 0; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - key_count += kvstoreDictSize(server.db[0].keys, j); - } - } - return key_count; -} - /* Log a human-readable message for ASM task lifecycle events. */ void asmLogTaskEvent(asmTask *task, int event) { sds str = slotRangeArrayToString(task->slots); @@ -1034,11 +1028,11 @@ void asmLogTaskEvent(asmTask *task, int event) { break; case ASM_EVENT_IMPORT_COMPLETED: serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; case ASM_EVENT_MIGRATE_STARTED: serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; case ASM_EVENT_MIGRATE_FAILED: serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); @@ -1048,7 +1042,7 @@ void asmLogTaskEvent(asmTask *task, int event) { break; case ASM_EVENT_MIGRATE_COMPLETED: serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); + task->id, str, getKeyCountInSlotRangeArray(task->slots)); break; default: break; @@ -1888,6 +1882,14 @@ void clusterSyncSlotsCommand(client *c) { } } + /* Check if there is any trim job 
in progress for the slot ranges. + * We can't start the migrate task since the trim job will modify the data.*/ + if (asmIsAnyTrimJobOverlaps(slots)) { + addReplyError(c, "Trim job in progress for the slots"); + slotRangeArrayFree(slots); + return; + } + sds task_id = c->argv[3]->ptr; /* Notify the cluster implementation to prepare for the migrate task. */ if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || @@ -2779,8 +2781,8 @@ int isSlotInTrimJob(int slot) { /* Check if the slot is in any active trim job. */ listRewind(asmManager->active_trim_jobs, &li); while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - if (slotRangeArrayOverlaps(slots, &req)) + activeTrimJob *job = listNodeValue(ln); + if (slotRangeArrayOverlaps(job->slots, &req)) return 1; } return 0; @@ -2946,16 +2948,19 @@ void asmUnblockMasterAfterTrim(void) { } } -/* Trim the slots asynchronously in the BIO thread. */ -void asmTriggerBackgroundTrim(slotRangeArray *slots) { +/* Trim the slots asynchronously in the BIO thread. migration_cleanup is true if this + * is a migration cleanup of slots no longer owned. */ +void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND, + &fsi); signalFlushedDb(0, 1, slots); @@ -2993,10 +2998,12 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots) { asmUnblockMasterAfterTrim(); } -/* Trim the slots. 
*/ -void asmTrimSlots(slotRangeArray *slots) { +/* Trim the slots, return the trim method used. + * If client_id is non-zero, the client will be unblocked when trim completes. + * If migration_cleanup is true, this is a migration cleanup of slots no longer owned. */ +int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup) { if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE) - return; + return ASM_TRIM_METHOD_NONE; /* Trigger active trim for the following cases: * 1. Debug override: trim method is set to 'active'. @@ -3011,9 +3018,11 @@ void asmTrimSlots(slotRangeArray *slots) { (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT && moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED)); if (activetrim) - asmTriggerActiveTrim(slots); + asmTriggerActiveTrim(slots, client_id, migration_cleanup); else - asmTriggerBackgroundTrim(slots); + asmTriggerBackgroundTrim(slots, migration_cleanup); + + return activetrim ? ASM_TRIM_METHOD_ACTIVE : ASM_TRIM_METHOD_BG; } /* Schedule a trim job for the specified slot ranges. The job will be @@ -3068,7 +3077,7 @@ void asmTrimJobProcessPending(void) { listRewind(asmManager->pending_trim_jobs, &li); while ((ln = listNext(&li)) != NULL) { slotRangeArray *slots = listNodeValue(ln); - asmTrimSlots(slots); + asmTrimSlots(slots, 0, 1); propagateTrimSlots(slots); listDelNode(asmManager->pending_trim_jobs, ln); slotRangeArrayFree(slots); @@ -3234,6 +3243,19 @@ void asmCancelPendingTrimJobs(void) { } } +/* Free an activeTrimJob and unblock pending client if needed. */ +void activeTrimJobFreeMethod(void *ptr) { + activeTrimJob *job = ptr; + if (job->client_id != 0) { + /* Reply with the slot ranges that requested to be trimmed. Generally we + * cancel trim jobs as the dataset is reset, no need to trim anymore. 
*/ + unblockClientForAsyncFlush(job->client_id, job->slots); + job->slots = NULL; /* slots were freed in unblockClientForAsyncFlush */ + } + if (job->slots) slotRangeArrayFree(job->slots); + zfree(job); +} + /* Cancel all pending and active trim jobs. */ void asmCancelTrimJobs(void) { if (!asmManager) return; @@ -3308,7 +3330,7 @@ void trimslotsCommand(client *c) { } } } - asmTrimSlots(slots); + asmTrimSlots(slots, 0, 1); } /* Command will not be propagated automatically since it does not modify @@ -3321,7 +3343,8 @@ void trimslotsCommand(client *c) { /* Start the active trim job. */ void asmActiveTrimStart(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + slotRangeArray *slots = job->slots; serverAssert(asmManager->active_trim_it == NULL); asmManager->active_trim_it = slotRangeArrayGetIterator(slots); @@ -3330,16 +3353,18 @@ void asmActiveTrimStart(void) { asmManager->active_trim_current_job_trimmed = 0; /* Count the number of keys to trim */ - asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots); + asmManager->active_trim_current_job_keys += getKeyCountInSlotRangeArray(slots); RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (job->migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED, + &fsi); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim initiated for slots: %s, to trim %llu keys.", @@ -3347,9 +3372,14 @@ void asmActiveTrimStart(void) { sdsfree(str); } -/* Schedule an active trim job. 
*/ -void asmTriggerActiveTrim(slotRangeArray *slots) { - listAddNodeTail(asmManager->active_trim_jobs, slotRangeArrayDup(slots)); +/* Schedule an active trim job with optional client waiting for completion. */ +void asmTriggerActiveTrim(slotRangeArray *slots, uint64_t client_id, int migration_cleanup) { + activeTrimJob *job = zmalloc(sizeof(*job)); + job->slots = slotRangeArrayDup(slots); + job->client_id = client_id; + job->migration_cleanup = migration_cleanup; + + listAddNodeTail(asmManager->active_trim_jobs, job); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim scheduled for slots: %s", str); sdsfree(str); @@ -3363,7 +3393,8 @@ void asmTriggerActiveTrim(slotRangeArray *slots) { /* End the active trim job. */ void asmActiveTrimEnd(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + slotRangeArray *slots = job->slots; if (asmManager->active_trim_it) { slotRangeArrayIteratorFree(asmManager->active_trim_it); @@ -3378,9 +3409,11 @@ void asmActiveTrimEnd(void) { (RedisModuleSlotRangeArray *) slots }; - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED, - &fsi); + /* Fire the trim event to modules only if this is a migration cleanup. */ + if (job->migration_cleanup) + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED, + &fsi); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Active trim completed for slots: %s, %llu keys trimmed.", @@ -3434,7 +3467,7 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc } /* Delete the key and notify the modules. 
*/ -void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { +void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj, int migration_cleanup) { if (asmManager->debug_active_trim_delay > 0) debugDelay(asmManager->debug_active_trim_delay); @@ -3444,11 +3477,17 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { dbDelete(db, keyobj); keyModified(NULL, db, keyobj, NULL, 1); - /* The keys are not actually logically deleted from the database, just moved - * to another node. The modules need to know that these keys are no longer - * available locally, so just send the keyspace notification to the modules, - * but not to clients. */ - moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id); + if (migration_cleanup) { + /* The keys are not actually logically deleted from the database, just moved + * to another node. The modules need to know that these keys are no longer + * available locally, so just send the keyspace notification to the modules, + * but not to clients. */ + moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id); + } else { + /* Not a migration cleanup, the key is really deleted from the database, + * need to notify the clients. 
*/ + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyobj, db->id); + } asmManager->active_trim_current_job_trimmed++; if (static_key) decrRefCount(keyobj); @@ -3492,6 +3531,8 @@ void asmActiveTrimCycle(void) { timelimit = 1000000 * trim_cycle_time_perc / server.hz / 100; if (timelimit <= 0) timelimit = 1; + activeTrimJob *job = listNodeValue(listFirst(asmManager->active_trim_jobs)); + serverAssert(asmManager->active_trim_it); int slot = slotRangeArrayGetCurrentSlot(asmManager->active_trim_it); @@ -3505,7 +3546,7 @@ void asmActiveTrimCycle(void) { enterExecutionUnit(1, 0); robj *keyobj = createStringObject(sdskey, sdslen(sdskey)); - asmActiveTrimDeleteKey(&server.db[0], keyobj); + asmActiveTrimDeleteKey(&server.db[0], keyobj, job->migration_cleanup); decrRefCount(keyobj); exitExecutionUnit(); postExecutionUnitOperations(); diff --git a/src/cluster_asm.h b/src/cluster_asm.h index 841a55d76..c1e7ccab1 100644 --- a/src/cluster_asm.h +++ b/src/cluster_asm.h @@ -15,6 +15,10 @@ struct asmTask; struct slotRangeArray; struct slotRange; +#define ASM_TRIM_METHOD_NONE 0 +#define ASM_TRIM_METHOD_BG 1 +#define ASM_TRIM_METHOD_ACTIVE 2 + void asmInit(void); void asmBeforeSleep(void); void asmCron(void); @@ -53,5 +57,6 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc void asmActiveTrimCycle(void); int asmIsKeyInTrimJob(sds keyname); int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc); +int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup); #endif diff --git a/src/db.c b/src/db.c index 7b977c8ac..60bb79692 100644 --- a/src/db.c +++ b/src/db.c @@ -1209,11 +1209,22 @@ void flushAllDataAndResetRDB(int flags) { #endif } -/* CB function on blocking ASYNC FLUSH completion - * - * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB. - */ -void flushallSyncBgDone(uint64_t client_id, void *userdata) { +/* Block client for blocking ASYNC FLUSH operation (FLUSH*, SFLUSH). 
*/ +void blockClientForAsyncFlush(client *c) { + /* measure bg job till completion as elapsed time of flush command */ + elapsedStart(&c->bstate.lazyfreeStartTime); + + c->bstate.timeout = 0; + /* We still need to perform cleanup operations for the command, including + * updating the replication offset, so mark this command as pending to + * avoid command from being reset during unblock. */ + c->flags |= CLIENT_PENDING_COMMAND; + blockClient(c, BLOCKED_LAZYFREE); +} + +/* CB function on blocking ASYNC FLUSH completion. + * We will unblock the client and send the proper reply. */ +void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { slotRangeArray *slots = userdata; client *c = lookupClientByID(client_id); @@ -1254,16 +1265,12 @@ void flushallSyncBgDone(uint64_t client_id, void *userdata) { server.current_client = old_client; } -/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH. +/* Common flush command implementation for FLUSHALL, FLUSHDB. * * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC * Return 0 otherwise - * - * slots - provided only by SFLUSH command, otherwise NULL. Will be used on - * completion to reply with the slots flush result. Ownership is passed - * to the completion job in case of `blocking_async`. */ -int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { +int flushCommandCommon(client *c, int type, int flags) { int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ @@ -1273,8 +1280,8 @@ int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { blocking_async = 1; } - /* Cancel all ASM tasks that overlap with the given slot ranges. */ - clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); + /* Cancel all ASM tasks. 
*/ + clusterAsmCancelBySlotRangeArray(NULL, c->argv[0]->ptr); if (type == FLUSH_TYPE_ALL) flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); @@ -1289,16 +1296,8 @@ int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { * worker's queue. To be called and reply with OK only after all preceding pending * lazyfree jobs in queue were processed */ if (blocking_async) { - /* measure bg job till completion as elapsed time of flush command */ - elapsedStart(&c->bstate.lazyfreeStartTime); - - c->bstate.timeout = 0; - /* We still need to perform cleanup operations for the command, including - * updating the replication offset, so mark this command as pending to - * avoid command from being reset during unblock. */ - c->flags |= CLIENT_PENDING_COMMAND; - blockClient(c,BLOCKED_LAZYFREE); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, slots); + blockClientForAsyncFlush(c); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, NULL); } #if defined(USE_JEMALLOC) @@ -1327,7 +1326,7 @@ void flushallCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags) == 0) addReply(c, shared.ok); } @@ -1339,7 +1338,7 @@ void flushdbCommand(client *c) { if (getFlushCommandFlags(c,&flags) == C_ERR) return; /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0) + if (flushCommandCommon(c, FLUSH_TYPE_DB,flags) == 0) addReply(c, shared.ok); } @@ -2859,7 +2858,10 @@ keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) { if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED)) return KEY_VALID; - return KEY_TRIMMED; + /* If the slot is not served by this node, we should not allow access + * to the key, we consider it as trimmed. 
*/ + if (!clusterCanAccessKeysInSlot(getKeySlot(key_name))) + return KEY_TRIMMED; } if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) || diff --git a/src/script.c b/src/script.c index f222b89e6..66b316abf 100644 --- a/src/script.c +++ b/src/script.c @@ -523,6 +523,8 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->cmd->fullname); } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { *err = sdsnew("Script attempted to access a slot not served"); + } else if (error_code == CLUSTER_REDIR_TRIMMING) { + *err = sdsnew("Script attempted to access a slot being trimmed"); } else { /* error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK */ *err = sdsnew("Script attempted to access a non local key in a " diff --git a/src/server.h b/src/server.h index e6a28a621..74c1443fd 100644 --- a/src/server.h +++ b/src/server.h @@ -3943,7 +3943,9 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi #define FLUSH_TYPE_DB 1 #define FLUSH_TYPE_SLOTS 2 void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots); -int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges); +int flushCommandCommon(client *c, int type, int flags); +void unblockClientForAsyncFlush(uint64_t client_id, void *userdata); +void blockClientForAsyncFlush(client *c); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ #define EMPTYDB_NOFUNCTIONS (1<<1) /* Indicate not to flush the functions. 
*/ diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index 5d2d03e85..b42d30f0a 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -133,20 +133,20 @@ test "SFLUSH - Errors and output validation" { assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX} # Test SFLUSH output validation - assert_match "" [$master1 SFLUSH 2 4] - assert_match "" [$master1 SFLUSH 0 4] + assert_match "{2 4}" [$master1 SFLUSH 2 4] + assert_match "{0 4}" [$master1 SFLUSH 0 4] assert_match "" [$master2 SFLUSH 0 4] - assert_match "" [$master1 SFLUSH 1 8191] - assert_match "" [$master1 SFLUSH 0 8190] - assert_match "" [$master1 SFLUSH 0 998 2001 8191] - assert_match "" [$master1 SFLUSH 1 999 2001 8191] - assert_match "" [$master1 SFLUSH 0 999 2001 8190] - assert_match "" [$master1 SFLUSH 0 999 2002 8191] + assert_match "{1 999} {2001 8191}" [$master1 SFLUSH 1 8191] + assert_match "{0 999} {2001 8190}" [$master1 SFLUSH 0 8190] + assert_match "{0 998} {2001 8191}" [$master1 SFLUSH 0 998 2001 8191] + assert_match "{1 999} {2001 8191}" [$master1 SFLUSH 1 999 2001 8191] + assert_match "{0 999} {2001 8190}" [$master1 SFLUSH 0 999 2001 8190] + assert_match "{0 999} {2002 8191}" [$master1 SFLUSH 0 999 2002 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191] assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191] - assert_match "" [$master2 SFLUSH 8193 16383] - assert_match "" [$master2 SFLUSH 8192 16382] + assert_match "{8193 16383}" [$master2 SFLUSH 8193 16383] + assert_match "{8192 16382}" [$master2 SFLUSH 8192 16382] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC] assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC] @@ -180,3 +180,109 @@ test "SFLUSH - Deletes the keys with 
argument /SYNC/ASYNC" { } } + +set testmodule [file normalize tests/modules/atomicslotmigration.so] +start_cluster 2 2 [list tags {external:skip cluster experimental modules} config_lines [list loadmodule $testmodule]] { +foreach sync_method {"SYNC" "BLOCKING-ASYNC" "ASYNC"} { +foreach trim_method {"active" "bg"} { + test "sflush can propagate to replicas (sync method: $sync_method, trim method: $trim_method)" { + R 0 flushall + R 0 debug asm-trim-method $trim_method + R 2 debug asm-trim-method $trim_method + + # Add keys in master + R 0 set "06S" "slot0" + wait_for_ofs_sync [Rn 0] [Rn 2] + + set loglines [count_log_lines 0] + + # since we have optimization, if the master is not running in blocking context, + # we will try to run in blocking ASYNC mode, so we need to use MULTI/EXEC to make it blocking + if {$sync_method eq "SYNC"} { + R 0 MULTI + } + + # Execute SFLUSH on master, SYNC will be run as blocking ASYNC if not running in MULTI/EXEC + set sync_option "SYNC" + if {$sync_method eq "ASYNC"} { + set sync_option "ASYNC" + } + R 0 SFLUSH 0 8191 $sync_option + + # Execute EXEC if using SYNC + if {$sync_method eq "SYNC"} { + R 0 EXEC + } + + # Wait for SFLUSH to propagate to replica, and complete the trim + wait_for_condition 1000 10 { + [R 0 DBSIZE] == 0 && [R 2 DBSIZE] == 0 + } else { + fail "SFLUSH did not propagate to replica" + } + + if {$sync_method ne "SYNC"} { + if {$trim_method eq "active"} { + wait_for_log_messages 0 {"*Active trim completed for slots*0-8191*"} $loglines 1000 10 + } else { + # background trim + wait_for_log_messages 0 {"*Background trim started for slots*0-8191*"} $loglines 1000 10 + } + } + } +} +} + test "Canceling active trimming can unblock sflush" { + # Delay active trim to make sure it is not completed before FLUSHDB + R 0 debug asm-trim-method active 10000 ;# delay 10ms per key + # Add slot 0 keys in master + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{06S}$i" "value$i" + } + + set rd [redis_deferring_client 0] + $rd SFLUSH 
0 8191 SYNC ;# running in blocking async method + + # FLUSHDB will cancel all trim jobs + R 0 SELECT 0 + R 0 FLUSHDB SYNC + + # SFLUSH should be unblocked and reply with the requested slot ranges + assert_equal [$rd read] "{0 8191}" + $rd close + } + + test "Write is rejected and read is allowed in SFLUSH slots using active trim" { + R 0 debug asm-trim-method active 1000 ;# delay 1ms per key + R 0 asm.clear_event_log + R 2 asm.clear_event_log + + # Add slot 0 keys + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{06S}$i" "value$i" + } + # Add a slot 1 key, we should trim slot 0 first, then slot 1 + set slot1_key "Qi" + R 0 set $slot1_key "slot1" + wait_for_ofs_sync [Rn 0] [Rn 2] + + set rd [redis_deferring_client 0] + $rd SFLUSH 0 8191 SYNC ;# running in blocking async method + + # we can read the slot 1 key + assert_equal [R 0 get $slot1_key] "slot1" + # Module with flag REDISMODULE_OPEN_KEY_ACCESS_TRIMMED also can read the key + assert_equal [R 0 asm.read_pending_trim_key $slot1_key] "slot1" + + # we can not write to the slot 1 key + assert_error "*TRYAGAIN Slot is being trimmed*" {R 0 set $slot1_key "value1"} + + # wait for SFLUSH to complete + assert_equal [$rd read] "{0 8191}" + $rd close + + # there is no trim event since we sflush the owned slots of this node + assert_equal [R 0 asm.get_cluster_event_log] {} + assert_equal [R 2 asm.get_cluster_event_log] {} + } +}