From 235e688b010b38496ea1de06b0bfc2786b1ebc63 Mon Sep 17 00:00:00 2001
From: "debing.sun"
Date: Thu, 23 Oct 2025 00:16:32 +0800
Subject: [PATCH] RED-135816: Lookahead pre-fetching (#14440)

## Problem and Motivation

Currently, the client parses one command, executes it, and then parses the next command, repeating until the querybuf is consumed. As a result, we cannot perform memory prefetch when IO threads are not enabled, and when IO threads are enabled, only the first command can be parsed in the IO thread, while the remaining commands still have to be parsed in the main thread.

In other words, the current Redis command processing pipeline has the following limitations:

- Without IO threads: Commands are parsed and executed one by one sequentially, preventing memory prefetching optimizations.
- With IO threads: Only the first command gets parsed in the IO thread, but subsequent commands from the same client's query buffer must still be parsed in the main thread.

## Solution Overview

**Core Innovation**: Parse multiple user commands in advance through a lookahead pipeline.

**Key Insight**: Since Redis already parses commands to extract keys, we can do this parsing earlier and issue memory prefetch operations before the command reaches execution, allowing multiple I/O operations to run in parallel.

The bulk of the PR is a redesign of the command processing flow for both standalone commands and transactional commands.

### High Level Command Processing Flow

#### Before This PR (processInputBuffer())

- While there is data in the client's query buffer:
  - Read the data and try to parse a complete command (processInlineBuffer() or processMultibulkBuffer()).
  - If the command is incomplete, exit and wait for more data.
  - The command is complete. Process and potentially execute it (processCommandAndResetClient(), processCommand()):
    - Prepare for the next command (commandProcessed()).

### Major Changes in the Client's Structure

To support the new command processing flow:

- **New pendingCommand structure**: Since the previous flow processed commands one at a time, it used the client structure to hold the current (and only) parsed command arguments (argv/argc) and other metadata. In the new design, multiple commands can be parsed and waiting for execution at the same time, so a new pendingCommand structure is introduced to hold a parsed command's arguments and its metadata.
- **New pendingCommandList structure (pending_cmds)** that holds all the pending commands in order and includes a ready_len counter that tracks the number of fully parsed commands ready for execution. All commands are fully parsed except possibly the last one (the client's command order is maintained).
- **New pendingCommandPool structure (cmd_pool)** that manages a shared pool for reusing pendingCommand objects to reduce memory allocation overhead.

There is a configurable lookahead limit (server.lookahead) that controls how many fully parsed commands (ready pending commands) to process ahead of time.

#### New High Level Flow for Standalone Commands (processInputBuffer())

- While there is data in the client's query buffer or there are ready pending commands:
  - While there is data in the client's query buffer and we haven't reached the lookahead limit:
    - Read the data and try to parse a complete command (processInlineBuffer() or processMultibulkBuffer()). Allocate a new pending command if needed, store the command's metadata in the pending command, and add the pending command to the client's pending commands list.
    - If the command is incomplete, exit and wait for more data.
    - The command is complete, so we have a new ready pending command. Preprocess it (preprocessCommand()):
      - Extract the keys of the command and store the results in the pending command (extractKeysAndSlot()).
  - If there are pending commands, continue executing them until the queue is empty.
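
The two-phase loop above can be modeled with a small, self-contained toy. This is only a sketch under stated assumptions, not the code in this patch: every `toy_*` name is hypothetical, commands are newline-terminated strings rather than RESP, the pending list is a fixed-size ring, and "preprocessing" is reduced to recording how many commands were ready at once (the point at which the real flow would extract keys and prefetch).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_PENDING 64

/* Hypothetical stand-in for pendingCommand: a slice of the query buffer. */
typedef struct {
    const char *start;
    size_t len;
} toy_pending_cmd;

/* Hypothetical stand-in for the client and its pending_cmds list. */
typedef struct {
    const char *querybuf;   /* input received so far */
    size_t qb_len;          /* number of bytes in querybuf */
    size_t qb_pos;          /* offset of the first unparsed byte */
    toy_pending_cmd pending[TOY_MAX_PENDING]; /* FIFO ring of parsed commands */
    int head;               /* index of the oldest pending command */
    int ready_len;          /* fully parsed commands waiting to run */
    int lookahead;          /* max commands to parse ahead */
    int executed;           /* commands executed so far */
    int max_ready_seen;     /* largest batch that was ready at once */
} toy_client;

static void toy_client_init(toy_client *c, const char *buf, int lookahead) {
    assert(lookahead >= 1 && lookahead <= TOY_MAX_PENDING);
    memset(c, 0, sizeof(*c));
    c->querybuf = buf;
    c->qb_len = strlen(buf);
    c->lookahead = lookahead;
}

/* Try to parse one newline-terminated command.
 * Returns 1 on success, 0 if the command is still incomplete. */
static int toy_parse_one(toy_client *c) {
    const char *p = c->querybuf + c->qb_pos;
    const char *nl = memchr(p, '\n', c->qb_len - c->qb_pos);
    if (nl == NULL) return 0;
    toy_pending_cmd *pc = &c->pending[(c->head + c->ready_len) % TOY_MAX_PENDING];
    pc->start = p;
    pc->len = (size_t)(nl - p);
    c->ready_len++;
    c->qb_pos += pc->len + 1; /* skip the newline as well */
    return 1;
}

/* Execute (here: just count) the oldest pending command. */
static void toy_execute_head(toy_client *c) {
    c->head = (c->head + 1) % TOY_MAX_PENDING;
    c->ready_len--;
    c->executed++;
}

static void toy_process_input(toy_client *c) {
    for (;;) {
        /* Phase 1: parse ahead until the lookahead limit or incomplete data. */
        while (c->qb_pos < c->qb_len && c->ready_len < c->lookahead) {
            if (!toy_parse_one(c)) break;
        }
        if (c->ready_len == 0) return; /* nothing ready: wait for more data */

        /* A real implementation would prefetch the keys of all ready
         * commands here, before any of them is executed. */
        if (c->ready_len > c->max_ready_seen) c->max_ready_seen = c->ready_len;

        /* Phase 2: run the ready commands in order until the queue is empty. */
        while (c->ready_len > 0) toy_execute_head(c);
    }
}
```

With a lookahead of 2 and three complete commands followed by a partial one, the toy runs a batch of two, then a batch of one, and leaves the partial command in the buffer for the next read.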

## Transaction Support

### Major Changes in Structures

- The multiState structure now contains an array of pendingCommand pointers instead of multiCmd pointers.
- The multiCmd structure was deleted (no longer needed).

### New Transaction Support

- queueMultiCommand():
  - The pending commands are moved from the client's pending_cmds list to the multiState's commands array.

## Detailed Changes

### Additional Client Structure Changes

- Replaced argv_len_sum with all_argv_len_sum to reflect the total memory consumed by all pending commands.

### Clients and Pending Commands Management

- Clients using pending commands now manage the command arguments via the pendingCommand, specifically the memory occupied by argv.
- **Pending commands management functions**:
  - `initPendingCommand()` initializes a newly allocated pending command.
  - `freeClientPendingCommand()` frees a pending command of a client and its associated resources.
  - `freeClientPendingCommands()` receives the number of pending commands to free and calls freeClientPendingCommand() to free them.

### Buffer Processing Changes

- `processInlineBuffer()`, once a full command is read, used to populate the client's command fields (argc, argv, etc.). Now it creates and populates a pendingCommand, and adds it to the client's pending_cmds list.
- `processMultibulkBuffer()`: Similar changes to processInlineBuffer(). The difference is that a pending command may already exist from a previous call to the function, so parsing will continue populating it instead of creating a new one.
- `resetClientInternal()` used to receive a free_argv parameter and pass it to freeClientArgvInternal(), which freed the client's argv if set and also reset the client's command fields. It now receives the number of pending commands to free and handles two cases:
  - The client uses pending commands, so they are freed by calling freeClientPendingCommands().
  - The client doesn't use pending commands (e.g., LUA client), so the client's argv is freed by calling freeClientArgvInternal(). It then frees the client's command fields that freeClientArgvInternal() no longer frees.

### Other Changes

- Simulate lookahead command preprocessing when loading an AOF and queuing transaction commands; this is necessary since queueMultiCommand() now requires a pending command.
- The INVALID_CLUSTER_SLOT constant was defined to indicate an invalid cluster slot. It is used to signal a cross-slot error in preprocessCommand().
- getNodeByQuery() no longer performs cross-slot checks, relying instead on the checks already performed in preprocessCommand(). It also no longer calls getKeysFromCommand(), as this is also done in preprocessCommand().

### Debugging

- Added a "debug lookahead" command to print the size of the lookahead pipeline for each client.

## New Configuration

- **lookahead**: Runtime-configurable lookahead depth (default: 16)

## Security

- **Limit lookahead for unauthenticated clients to 1**. This both reduces memory overhead and prevents errors, since AUTH can affect the handling of the commands that follow it.
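
To illustrate the unauthenticated-client limit, here is a minimal sketch. It assumes a simplified model in which a pipeline is consumed in parse-ahead batches and a successful AUTH takes effect as soon as its batch has executed. The names `toy_effective_lookahead` and `toy_simulate_pipeline` are hypothetical and do not appear in this patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical helper: unauthenticated clients get a lookahead of 1,
 * authenticated clients get the configured depth. */
static int toy_effective_lookahead(int configured, bool authenticated) {
    return authenticated ? configured : 1;
}

/* Simulate a pipeline of n_cmds commands in which the command at index
 * auth_idx is a successful AUTH (use -1 for "no AUTH in the pipeline").
 * Stores the size of each parse-ahead batch in batch_sizes (at most
 * max_batches entries) and returns the number of batches recorded. */
static int toy_simulate_pipeline(int n_cmds, int auth_idx, int configured,
                                 int *batch_sizes, int max_batches) {
    bool authenticated = false;
    int next = 0, batches = 0;

    assert(configured >= 1);
    while (next < n_cmds && batches < max_batches) {
        int limit = toy_effective_lookahead(configured, authenticated);
        int remaining = n_cmds - next;
        int batch = remaining < limit ? remaining : limit;

        /* The batch is executed in order; once AUTH has run, later
         * batches may use the full configured depth. */
        if (auth_idx >= next && auth_idx < next + batch) authenticated = true;

        batch_sizes[batches++] = batch;
        next += batch;
    }
    return batches;
}
```

In this model, a pipeline of AUTH followed by four commands is handled as a batch of one (AUTH alone) and then a batch of four, so nothing after AUTH is preprocessed before the authentication state is known.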

---------

Co-authored-by: Slava Koyfman
Co-authored-by: Oran Agra
Co-authored-by: Udi Ron
Co-authored-by: moticless
Co-authored-by: Yuan Wang
---
 redis.conf                            |   3 +
 src/acl.c                             |  35 +-
 src/aof.c                             |  12 +
 src/blocked.c                         |   3 +-
 src/cluster.c                         |  82 ++-
 src/cluster.h                         |   5 +-
 src/config.c                          |   1 +
 src/db.c                              |  34 +-
 src/iothread.c                        |  22 +-
 src/memory_prefetch.c                 |  29 +-
 src/module.c                          |  30 +-
 src/multi.c                           |  70 +--
 src/networking.c                      | 700 +++++++++++++++++++++-----
 src/replication.c                     |   4 +-
 src/script.c                          |   2 +-
 src/script_lua.c                      |   5 +-
 src/server.c                          | 118 +++--
 src/server.h                          | 110 +++-
 tests/unit/cluster/misc.tcl           |  12 +-
 tests/unit/cluster/sharded-pubsub.tcl |   2 +-
 tests/unit/memefficiency.tcl          |   4 +
 tests/unit/networking.tcl             |  35 ++
 22 files changed, 1047 insertions(+), 271 deletions(-)

diff --git a/redis.conf b/redis.conf
index 7229d5c27..51689b334 100644
--- a/redis.conf
+++ b/redis.conf
@@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
 #
 # client-query-buffer-limit 1gb

+# Defines how many commands in each client pipeline to decode and prefetch
+# lookahead 16
+
 # In some scenarios client connections can hog up memory leading to OOM
 # errors or data eviction. To avoid this we can cap the accumulated memory
 # used by all client connections (all pubsub and normal clients). Once we
diff --git a/src/acl.c b/src/acl.c
index 6bd3f0ee4..7caeb28fb 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
     if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL;
     user *u = zmalloc(sizeof(*u));
     u->name = sdsnewlen(name,namelen);
-    u->flags = USER_FLAG_DISABLED;
-    u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+    atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD);
     u->passwords = listCreate();
     u->acl_string = NULL;
     listSetMatchMethod(u->passwords,ACLListMatchSds);
@@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
     if (oplen == -1) oplen = strlen(op);
     if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
     if (!strcasecmp(op,"on")) {
-        u->flags |= USER_FLAG_ENABLED;
-        u->flags &= ~USER_FLAG_DISABLED;
+        atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED);
     } else if (!strcasecmp(op,"off")) {
-        u->flags |= USER_FLAG_DISABLED;
-        u->flags &= ~USER_FLAG_ENABLED;
+        atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED);
     } else if (!strcasecmp(op,"skip-sanitize-payload")) {
-        u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP;
-        u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD;
+        atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD);
     } else if (!strcasecmp(op,"sanitize-payload")) {
-        u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP;
-        u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
+        atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP);
     } else if (!strcasecmp(op,"nopass")) {
-        u->flags |= USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags | USER_FLAG_NOPASS);
         listEmpty(u->passwords);
     } else if (!strcasecmp(op,"resetpass")) {
-        u->flags &= ~USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
         listEmpty(u->passwords);
     } else if (op[0] == '>' || op[0] == '#') {
         sds newpass;
@@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
             listAddNodeTail(u->passwords,newpass);
         else
             sdsfree(newpass);
-        u->flags &= ~USER_FLAG_NOPASS;
+        atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
     } else if (op[0] == '<' || op[0] == '!') {
         sds delpass;
         if (op[0] == '<') {
@@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) {
  * If the command fails an ACL check, idxptr will be to set to the first argv entry that
  * causes the failure, either 0 if the command itself fails or the idx of the key/channel
  * that causes the failure */
-int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) {
+int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) {
     listIter li;
     listNode *ln;

@@ -1869,6 +1864,10 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
      * calls to prevent duplicate lookups. */
     aclKeyResultCache cache;
     initACLKeyResultCache(&cache);
+    if (key_result) {
+        cache.keys = *key_result;
+        cache.keys_init = 1;
+    }

     /* Check each selector sequentially */
     listRewind(u->selectors,&li);
@@ -1876,7 +1875,7 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
         aclSelector *s = (aclSelector *) listNodeValue(ln);
         int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
         if (acl_retval == ACL_OK) {
-            cleanupACLKeyResultCache(&cache);
+            if (!key_result) cleanupACLKeyResultCache(&cache);
             return ACL_OK;
         }
         if (acl_retval > relevant_error ||
@@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
     }

     *idxptr = last_idx;
-    cleanupACLKeyResultCache(&cache);
+    if (!key_result) cleanupACLKeyResultCache(&cache);
     return relevant_error;
 }

 /* High level API for checking if a client can execute the queued up command */
 int ACLCheckAllPerm(client *c, int *idxptr) {
-    return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
+    return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr);
 }

 /* If 'new' can access all channels 'original' could then return NULL;
@@ -3144,7 +3143,7 @@ void aclCommand(client *c) {
         }

         int idx;
-        int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx);
+        int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx);
         if (result != ACL_OK) {
             sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1);
             addReplyBulkSds(c, err);
diff --git a/src/aof.c b/src/aof.c
index 94a28775b..90d646bb0 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) {
         if (fakeClient->flags & CLIENT_MULTI &&
             fakeClient->cmd->proc != execCommand)
         {
+            /* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
+             * for it to consume */
+            pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
+            initPendingCommand(pcmd);
+            addPendingCommand(&fakeClient->pending_cmds, pcmd);
+
+            pcmd->argc = argc;
+            pcmd->argv_len = argc;
+            pcmd->argv = argv;
+            pcmd->cmd = cmd;
+
             /* Note: we don't have to attempt calling evalGetCommandFlags,
              * since this is AOF, the checks in processCommand are not made
              * anyway.*/
             queueMultiCommand(fakeClient, cmd->flags);
         } else {
             cmd->proc(fakeClient);
+            fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
         }

         /* The fake client should not have a reply */
diff --git a/src/blocked.c b/src/blocked.c
index ee5a36514..4f518c9a5 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -130,8 +130,7 @@ void processUnblockedClients(void) {
              * call reqresAppendResponse here (for clients blocked on key,
              * unblockClientOnKey is called, which eventually calls processCommand,
              * which calls reqresAppendResponse) */
-            reqresAppendResponse(c);
-            resetClient(c);
+            prepareForNextCommand(c, 0);
         }

         if (c->flags & CLIENT_MODULE) {
diff --git a/src/cluster.c b/src/cluster.c
index 330907b60..3fb7af5c0 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -1086,6 +1086,31 @@ void clusterCommand(client *c) {
     }
 }

+/* Extract slot number from keys in a keys_result structure and return to caller.
+ * Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error),
+ * or if there are no keys.
+ */
+int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
+    if (keys_result->numkeys == 0)
+        return INVALID_CLUSTER_SLOT;
+
+    if (!server.cluster_enabled)
+        return 0;
+
+    int first_slot = INVALID_CLUSTER_SLOT;
+    for (int j = 0; j < keys_result->numkeys; j++) {
+        robj *this_key = argv[keys_result->keys[j].pos];
+        int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
+
+        if (first_slot == INVALID_CLUSTER_SLOT)
+            first_slot = this_slot;
+        else if (first_slot != this_slot) {
+            return INVALID_CLUSTER_SLOT;
+        }
+    }
+    return first_slot;
+}
+
 /* Return the pointer to the cluster node that is able to serve the command.
  * For the function to succeed the command should only target either:
  *
@@ -1118,13 +1143,16 @@ void clusterCommand(client *c) {
  *
  * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
  * down but the user attempts to execute a command that addresses one or more keys. */
-clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
+clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
+                            getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
+{
     clusterNode *myself = getMyClusterNode();
     clusterNode *n = NULL;
     robj *firstkey = NULL;
     int multiple_keys = 0;
     multiState *ms, _ms;
-    multiCmd mc;
+    pendingCommand mc;
+    pendingCommand *mcp = &mc;
     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
         existing_keys = 0;
     int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
@@ -1152,11 +1180,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
          * structure if the client is not in MULTI/EXEC state, this way
          * we have a single codepath below. */
         ms = &_ms;
-        _ms.commands = &mc;
+        _ms.commands = &mcp;
         _ms.count = 1;
+
+        /* Properly initialize the fake pendingCommand */
+        initPendingCommand(&mc);
         mc.argv = argv;
         mc.argc = argc;
         mc.cmd = cmd;
+        mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
+        mc.read_error = read_error;
+        if (keys_result) {
+            mc.keys_result = *keys_result;
+            mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
+        }
     }

     /* Check that all the keys are in the same hash slot, and obtain this
@@ -1164,12 +1201,14 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
     for (i = 0; i < ms->count; i++) {
         struct redisCommand *mcmd;
         robj **margv;
-        int margc, numkeys, j;
+        int margc, j;
         keyReference *keyindex;

-        mcmd = ms->commands[i].cmd;
-        margc = ms->commands[i].argc;
-        margv = ms->commands[i].argv;
+        pendingCommand *pcmd = ms->commands[i];
+
+        mcmd = pcmd->cmd;
+        margc = pcmd->argc;
+        margv = pcmd->argv;

         /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
         if (!pubsubshard_included &&
@@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
             pubsubshard_included = 1;
         }

+        /* If we have a cached keys result from preprocessCommand(), use it.
+         * Otherwise, extract keys result. */
+        int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
         getKeysResult result = GETKEYS_RESULT_INIT;
-        numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
+        if (use_cache_keys_result)
+            result = pcmd->keys_result;
+        else
+            getKeysFromCommand(mcmd,margv,margc,&result);
         keyindex = result.keys;

-        for (j = 0; j < numkeys; j++) {
+        for (j = 0; j < result.numkeys; j++) {
+            /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
+            if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
+                /* Error: multiple keys from different slots. */
+                if (error_code)
+                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
+                return NULL;
+            }
+
             robj *thiskey = margv[keyindex[j].pos];
-            int thisslot = keyHashSlot((char*)thiskey->ptr,
-                                       sdslen(thiskey->ptr));
+            int thisslot = pcmd->slot;
+            if (thisslot == INVALID_CLUSTER_SLOT)
+                thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));

             if (firstkey == NULL) {
                 /* This is the first key we see. Check what is the slot
@@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                  * not trapped earlier in processCommand(). Report the same
                  * error to the client. */
                 if (n == NULL) {
-                    getKeysFreeResult(&result);
+                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                     if (error_code)
                         *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                     return NULL;
@@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                      * the same key/channel as the first we saw. */
                     if (slot != thisslot) {
                         /* Error: multiple keys from different slots. */
-                        getKeysFreeResult(&result);
+                        if (!use_cache_keys_result) getKeysFreeResult(&result);
                         if (error_code)
                             *error_code = CLUSTER_REDIR_CROSS_SLOT;
                         return NULL;
@@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                 else existing_keys++;
             }
         }
-        getKeysFreeResult(&result);
+        if (!use_cache_keys_result) getKeysFreeResult(&result);
     }

     /* No key at all in command? then we can serve the request
diff --git a/src/cluster.h b/src/cluster.h
index 369c1b91a..830dae87b 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -22,6 +22,7 @@
 #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
 #define CLUSTER_SLOTS (1<id); } else {
@@ -3134,6 +3134,38 @@ int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getK
     return 0;
 }

+/* Extract keys/channels from a command and calculate the cluster slot.
+ * Returns the number of keys/channels extracted.
+ * The slot number is returned by reference into *slot.
+ * If is_incomplete is not NULL, it will be set for key extraction.
+ *
+ * This function handles both regular commands (keys) and sharded pubsub
+ * commands (channels), but excludes regular pubsub commands which don't
+ * have slots.
+ */
+int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
+                       getKeysResult *result, int *slot) {
+    int num_keys = -1;
+
+    if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) {
+        num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result);
+    } else {
+        /* Only extract channels for commands that have key_specs (sharded pubsub).
+         * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */
+        if (cmd->key_specs_num > 0) {
+            num_keys = getChannelsFromCommand(cmd, argv, argc, result);
+        } else {
+            num_keys = 0;
+        }
+    }
+
+    *slot = INVALID_CLUSTER_SLOT;
+    if (num_keys >= 0)
+        *slot = extractSlotFromKeysResult(argv, result);
+
+    return num_keys;
+}
+
 /* The base case is to use the keys position as given in the command table
  * (firstkey, lastkey, step).
  * This function works only on command with the legacy_range_key_spec,
diff --git a/src/iothread.c b/src/iothread.c
index 083e20bf1..81014b3d0 100644
--- a/src/iothread.c
+++ b/src/iothread.c
@@ -175,7 +175,7 @@ void assignClientToIOThread(client *c) {
     server.io_threads_clients_num[min_id]++;

     /* The client running in IO thread needs to have deferred objects array. */
-    c->deferred_objects = zmalloc(sizeof(robj*) * CLIENT_MAX_DEFERRED_OBJECTS);
+    c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS);

     /* Unbind connection of client from main thread event loop, disable read and
      * write, and then put it in the list, main thread will send these clients
@@ -355,11 +355,12 @@ int prefetchIOThreadCommands(IOThread *t) {
     listIter li;
     listNode *ln;
     listRewind(mainThreadProcessingClients[t->id], &li);
-    while((ln = listNext(&li)) && clients++ < to_prefetch) {
+    while((ln = listNext(&li)) && clients < to_prefetch) {
         client *c = listNodeValue(ln);
         /* A single command may contain multiple keys. If the batch is full,
          * we stop adding clients to it. */
         if (addCommandToBatch(c) == C_ERR) break;
+        clients++;
     }

     /* Prefetch the commands in the batch. */
@@ -423,10 +424,13 @@ int processClientsFromIOThread(IOThread *t) {

     listNode *node = NULL;
     while (listLength(mainThreadProcessingClients[t->id])) {
-        /* Prefetch the commands if no clients in the batch. */
-        if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
-        /* Reset the prefetching batch if we have processed all clients. */
-        if (--prefetch_clients <= 0) resetCommandsBatch();
+        if (prefetch_clients <= 0) {
+            /* Reset the prefetching batch if we have processed all clients. */
+            resetCommandsBatch();
+            /* Prefetch the commands if no clients in the batch. */
+            prefetch_clients = prefetchIOThreadCommands(t);
+        }
+        prefetch_clients--;

         /* Each time we pop up only the first client to process to guarantee
          * reentrancy safety. */
@@ -445,7 +449,7 @@ int processClientsFromIOThread(IOThread *t) {

         /* If a read error occurs, handle it in the main thread first, since we
          * want to print logs about client information before freeing. */
-        if (c->read_error) handleClientReadError(c);
+        if (isClientReadErrorFatal(c)) handleClientReadError(c);

         /* The client is asked to close in IO thread. */
         if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
@@ -466,7 +470,7 @@ int processClientsFromIOThread(IOThread *t) {
         }

         /* Process the pending command and input buffer. */
-        if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
+        if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
             c->flags |= CLIENT_PENDING_COMMAND;
             if (processPendingCommandAndInputBuffer(c) == C_ERR) {
                 /* If the client is no longer valid, it must be freed safely. */
@@ -730,8 +734,6 @@ void initThreadedIO(void) {
         exit(1);
     }

-    prefetchCommandsBatchInit();
-
     /* Spawn and initialize the I/O threads. */
     for (int i = 1; i < server.io_threads_num; i++) {
         IOThread *t = &IOThreads[i];
diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c
index 8f3f77ef2..651312d5b 100644
--- a/src/memory_prefetch.c
+++ b/src/memory_prefetch.c
@@ -380,19 +380,32 @@ int addCommandToBatch(client *c) {
         return C_ERR;
     }

+    /* Avoid partial prefetching: if the batch already has keys and adding this
+     * client's ready commands would likely exceed the batch size limit, reject
+     * the entire client. This is a conservative estimate using command count as
+     * a proxy for key count to ensure all keys from a client are either fully
+     * prefetched together or not prefetched at all. */
+    if (batch->key_count > 0 &&
+        c->pending_cmds.ready_len + batch->key_count > batch->max_prefetch_size)
+    {
+        return C_ERR;
+    }
+
     batch->clients[batch->client_count++] = c;

-    if (likely(c->iolookedcmd)) {
-        /* Get command's keys positions */
-        getKeysResult result = GETKEYS_RESULT_INIT;
-        int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
-        for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
-            batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
+    pendingCommand *pcmd = c->pending_cmds.head;
+    while (pcmd != NULL) {
+        /* Skip commands that have not been preprocessed, or have errors. */
+        if ((pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE) || !pcmd->cmd || pcmd->read_error) break;
+
+        serverAssert(pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID);
+        for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) {
+            batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos];
             batch->keys_dicts[batch->key_count] =
-                kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
+                kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
             batch->key_count++;
         }
-        getKeysFreeResult(&result);
+        pcmd = pcmd->next;
     }

     return C_OK;
diff --git a/src/module.c b/src/module.c
index a1b1bcb9e..08efc78ba 100644
--- a/src/module.c
+++ b/src/module.c
@@ -677,11 +677,12 @@ void moduleReleaseTempClient(client *c) {
     listEmpty(c->reply);
     c->reply_bytes = 0;
     c->duration = 0;
-    resetClient(c);
+    resetClient(c, -1);
+    serverAssert(c->all_argv_len_sum == 0);
     c->bufpos = 0;
     c->flags = CLIENT_MODULE;
     c->user = NULL; /* Root user */
-    c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
+    c->cmd = c->lastcmd = c->realcmd = NULL;
     if (c->bstate.async_rm_call_handle) {
         RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
         promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
@@ -6635,7 +6636,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
             int acl_errpos;
             int acl_retval;

-            acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,&acl_errpos);
+            acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,NULL,&acl_errpos);
             if (acl_retval != ACL_OK) {
                 sds object = (acl_retval == ACL_DENIED_CMD) ? sdsdup(c->cmd->fullname) : sdsdup(c->argv[acl_errpos]->ptr);
                 addACLLogEntry(ctx->client, acl_retval, ACL_LOG_CTX_MODULE, -1, c->user->name, object);
@@ -6660,7 +6661,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
         c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
         c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
         const uint64_t cmd_flags = getCommandFlags(c);
-        if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,cmd_flags,&error_code) !=
+        if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL,0,cmd_flags,&error_code) !=
                            getMyClusterNode())
         {
             sds msg = NULL;
@@ -10022,7 +10023,7 @@ int RM_ACLCheckCommandPermissions(RedisModuleUser *user, RedisModuleString **arg
         return REDISMODULE_ERR;
     }

-    if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, &keyidxptr) != ACL_OK) {
+    if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) {
         errno = EACCES;
         return REDISMODULE_ERR;
     }
@@ -11157,12 +11158,27 @@ void moduleCallCommandFilters(client *c) {
     }

     /* If the filter sets a new command, including command or subcommand,
-     * the command looked up in IO threads will be invalid. */
-    c->iolookedcmd = NULL;
+     * the command looked up will be invalid. */
+    c->lookedcmd = NULL;

     c->argv = filter.argv;
     c->argv_len = filter.argv_len;
     c->argc = filter.argc;
+
+    /* Update pending command if it exists. */
+    pendingCommand *pcmd = c->current_pending_cmd;
+    if (pcmd) {
+        pcmd->argv = filter.argv;
+        pcmd->argc = filter.argc;
+        pcmd->argv_len = filter.argv_len;
+        pcmd->cmd = NULL;
+        pcmd->slot = INVALID_CLUSTER_SLOT;
+        pcmd->flags = 0;
+
+        /* Reset keys result */
+        getKeysFreeResult(&pcmd->keys_result);
+        pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
+    }
 }

 /* Return the number of arguments a filtered command has. The number of
diff --git a/src/multi.c b/src/multi.c
index 5a1330cfe..dd372031b 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -20,27 +20,19 @@ void initClientMultiState(client *c) {
     c->mstate.cmd_inv_flags = 0;
     c->mstate.argv_len_sums = 0;
     c->mstate.alloc_count = 0;
+    c->mstate.executing_cmd = -1;
 }

 /* Release all the resources associated with MULTI/EXEC state */
 void freeClientMultiState(client *c) {
-    int j;
-
-    for (j = 0; j < c->mstate.count; j++) {
-        int i;
-        multiCmd *mc = c->mstate.commands+j;
-
-        for (i = 0; i < mc->argc; i++)
-            decrRefCount(mc->argv[i]);
-        zfree(mc->argv);
+    for (int i = 0; i < c->mstate.count; i++) {
+        freePendingCommand(c, c->mstate.commands[i]);
     }
     zfree(c->mstate.commands);
 }

 /* Add a new command into the MULTI commands queue */
 void queueMultiCommand(client *c, uint64_t cmd_flags) {
-    multiCmd *mc;
-
     /* No sense to waste memory if the transaction is already aborted.
      * this is useful in case client sends these in a pipeline, or doesn't
      * bother to read previous responses and didn't notice the multi was already
@@ -50,29 +42,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
     if (c->mstate.count == 0) {
         /* If a client is using multi/exec, assuming it is used to execute at least
          * two commands. Hence, creating by default size of 2. */
-        c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
+        c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
         c->mstate.alloc_count = 2;
     }
     if (c->mstate.count == c->mstate.alloc_count) {
         c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
-        c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
+        c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
     }
-    mc = c->mstate.commands+c->mstate.count;
-    mc->cmd = c->cmd;
-    mc->argc = c->argc;
-    mc->argv = c->argv;
-    mc->argv_len = c->argv_len;
+
+    /* Move the pending command into the multi-state.
+     * We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up
+     * later, but set the value to NULL to indicate it has been moved out and should not be freed. */
+    pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
+    c->current_pending_cmd = NULL;
+    pendingCommand **mc = c->mstate.commands + c->mstate.count;
+    *mc = pcmd;

     c->mstate.count++;
     c->mstate.cmd_flags |= cmd_flags;
     c->mstate.cmd_inv_flags |= ~cmd_flags;
-    c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
+    c->mstate.argv_len_sums += (*mc)->argv_len_sum;
+    c->all_argv_len_sum -= (*mc)->argv_len_sum;

-    /* Reset the client's args since we copied them into the mstate and shouldn't
-     * reference them from c anymore. */
+    (*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
+                             /* to subtract it from there later. */
+
+    /* Reset the client's args since we moved them into the mstate and shouldn't
+     * reference them from 'c' anymore. */
     c->argv = NULL;
     c->argc = 0;
-    c->argv_len_sum = 0;
     c->argv_len = 0;
 }

@@ -130,6 +128,7 @@ void execCommand(client *c) {
     int j;
     robj **orig_argv;
     int orig_argc, orig_argv_len;
+    size_t orig_all_argv_len_sum;
     struct redisCommand *orig_cmd;

     if (!(c->flags & CLIENT_MULTI)) {
@@ -173,12 +172,19 @@ void execCommand(client *c) {
     orig_argv_len = c->argv_len;
     orig_argc = c->argc;
     orig_cmd = c->cmd;
+
+    /* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field.
+     * Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
+    orig_all_argv_len_sum = c->all_argv_len_sum;
+
+    c->all_argv_len_sum = c->mstate.argv_len_sums;
+
     addReplyArrayLen(c,c->mstate.count);
     for (j = 0; j < c->mstate.count; j++) {
-        c->argc = c->mstate.commands[j].argc;
-        c->argv = c->mstate.commands[j].argv;
-        c->argv_len = c->mstate.commands[j].argv_len;
-        c->cmd = c->realcmd = c->mstate.commands[j].cmd;
+        c->argc = c->mstate.commands[j]->argc;
+        c->argv = c->mstate.commands[j]->argv;
+        c->argv_len = c->mstate.commands[j]->argv_len;
+        c->cmd = c->realcmd = c->mstate.commands[j]->cmd;

         /* ACL permissions are also checked at the time of execution in case
          * they were changed after the commands were queued. */
@@ -208,6 +214,7 @@ void execCommand(client *c) {
                 "This command is no longer allowed for the "
                 "following reason: %s", reason);
         } else {
+            c->mstate.executing_cmd = j;
             if (c->id == CLIENT_ID_AOF)
                 call(c,CMD_CALL_NONE);
             else
@@ -217,10 +224,10 @@ void execCommand(client *c) {
         }

         /* Commands may alter argc/argv, restore mstate. */
-        c->mstate.commands[j].argc = c->argc;
-        c->mstate.commands[j].argv = c->argv;
-        c->mstate.commands[j].argv_len = c->argv_len;
-        c->mstate.commands[j].cmd = c->cmd;
+        c->mstate.commands[j]->argc = c->argc;
+        c->mstate.commands[j]->argv = c->argv;
+        c->mstate.commands[j]->argv_len = c->argv_len;
+        c->mstate.commands[j]->cmd = c->cmd;
     }

     // restore old DENY_BLOCKING value
@@ -231,6 +238,7 @@ void execCommand(client *c) {
     c->argv_len = orig_argv_len;
     c->argc = orig_argc;
     c->cmd = c->realcmd = orig_cmd;
+    c->all_argv_len_sum = orig_all_argv_len_sum;
     discardTransaction(c);

     server.in_exec = 0;
@@ -490,6 +498,6 @@ size_t multiStateMemOverhead(client *c) {
     /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
     mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
     /* Reserved memory for queued multi commands. */
-    mem += c->mstate.alloc_count * sizeof(multiCmd);
+    mem += c->mstate.alloc_count * sizeof(pendingCommand);
     return mem;
 }
diff --git a/src/networking.c b/src/networking.c
index 78914bf21..747d5479e 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -20,6 +20,7 @@
 #include "fpconv_dtoa.h"
 #include "fmtargs.h"
 #include "cluster_asm.h"
+#include "memory_prefetch.h"
 #include
 #include
 #include
@@ -33,6 +34,9 @@ static inline int _clientHasPendingRepliesSlave(client *c);
 static inline int _clientHasPendingRepliesNonSlave(client *c);
 static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten);
 static inline int _writeToClientSlave(client *c, ssize_t *nwritten);
+static pendingCommand *acquirePendingCommand(void);
+static void reclaimPendingCommand(client *c, pendingCommand *pcmd);
+
 int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
 __thread sds thread_reusable_qb = NULL;
 __thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query
@@ -113,8 +117,10 @@ static void clientSetDefaultAuth(client *c) {
 int authRequired(client *c) {
     /* Check if the user is authenticated. This check is skipped in case
      * the default user is flagged as "nopass" and is active.
*/ - int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || - (DefaultUser->flags & USER_FLAG_DISABLED)) && + uint32_t default_flags; + atomicGet(DefaultUser->flags, default_flags); + int auth_required = (!(default_flags & USER_FLAG_NOPASS) || + (default_flags & USER_FLAG_DISABLED)) && !c->authenticated; return auth_required; } @@ -163,12 +169,15 @@ client *createClient(connection *conn) { c->argc = 0; c->argv = NULL; c->argv_len = 0; - c->argv_len_sum = 0; + c->all_argv_len_sum = 0; + c->pending_cmds.head = c->pending_cmds.tail = NULL; + c->pending_cmds.len = c->pending_cmds.ready_len = 0; + c->current_pending_cmd = NULL; c->original_argc = 0; c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->lookedcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -184,6 +193,7 @@ client *createClient(connection *conn) { c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; c->reploff = 0; + c->reploff_next = 0; c->read_reploff = 0; c->repl_applied = 0; c->repl_ack_off = 0; @@ -1483,28 +1493,41 @@ void acceptCommonHandler(connection *conn, int flags, char *ip) { } } +static void freeDeferredObject(client *c, int type, void *ptr) { + if (type == DEFERRED_OBJECT_TYPE_PENDING_COMMAND) { + freePendingCommand(c, ptr); + } else if (type == DEFERRED_OBJECT_TYPE_ROBJ) { + decrRefCount(ptr); + } else { + serverPanic("Unknown deferred object type: %d", type); + } +} + /* Attempt to defer freeing the object to the IO thread. We usually call this since * we know the object is allocated in the IO thread, to avoid memory arena contention, * and also reducing the load of the main thread. 
*/ -void tryDeferFreeClientObject(client *c, robj *o) { - if (!c || c->tid == IOTHREAD_MAIN_THREAD_ID || o->refcount > 1) { - decrRefCount(o); +void tryDeferFreeClientObject(client *c, int type, void *ptr) { + if (!c || c->tid == IOTHREAD_MAIN_THREAD_ID) { + freeDeferredObject(c, type, ptr); return; } /* Put the object in the deferred objects array. */ if (c->deferred_objects && c->deferred_objects_num < CLIENT_MAX_DEFERRED_OBJECTS) { - c->deferred_objects[c->deferred_objects_num++] = o; + c->deferred_objects[c->deferred_objects_num].type = type; + c->deferred_objects[c->deferred_objects_num].ptr = ptr; + c->deferred_objects_num++; } else { - decrRefCount(o); + freeDeferredObject(c, type, ptr); } } -/* Free the objects in the deferred_objects array. If free_array is true +/* Free the objects in the deferred_pending_cmds array. If free_array is true * then free the array itself as well. */ void freeClientDeferredObjects(client *c, int free_array) { for (int j = 0; j < c->deferred_objects_num; j++) { - decrRefCount(c->deferred_objects[j]); + deferredObject *obj = &c->deferred_objects[j]; + freeDeferredObject(c, obj->type, obj->ptr); } c->deferred_objects_num = 0; @@ -1527,17 +1550,11 @@ void freeClientOriginalArgv(client *c) { static inline void freeClientArgvInternal(client *c, int free_argv) { int j; - if (c->tid == IOTHREAD_MAIN_THREAD_ID) { - for (j = 0; j < c->argc; j++) - decrRefCount(c->argv[j]); - } else { - for (j = 0; j < c->argc; j++) - tryDeferFreeClientObject(c, c->argv[j]); - } + for (j = 0; j < c->argc; j++) + decrRefCount(c->argv[j]); c->argc = 0; c->cmd = NULL; - c->iolookedcmd = NULL; - c->argv_len_sum = 0; + c->lookedcmd = NULL; if (free_argv) { c->argv_len = 0; zfree(c->argv); @@ -1549,6 +1566,18 @@ void freeClientArgv(client *c) { freeClientArgvInternal(c, 1); } +void freeClientPendingCommands(client *c, int num_pcmds_to_free) { + /* (-1) means free all pending commands */ + if (num_pcmds_to_free == -1) + num_pcmds_to_free = c->pending_cmds.len; 
+ + while (num_pcmds_to_free--) { + pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds); + serverAssert(pcmd); + reclaimPendingCommand(c, pcmd); + } +} + /* Close all the slaves connections. This is useful in chained replication * when we resync with our own master and want to force all our slaves to * resync with us as well. */ @@ -1648,6 +1677,12 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_UNBLOCKED; } + freeClientPendingCommands(c, -1); + c->argv_len = 0; + c->argv = NULL; + c->argc = 0; + c->cmd = NULL; + /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); } @@ -1831,7 +1866,6 @@ void freeClient(client *c) { listRelease(c->reply); zfree(c->buf); freeReplicaReferencedReplBuffer(c); - freeClientArgv(c); freeClientOriginalArgv(c); freeClientDeferredObjects(c, 1); if (c->deferred_reply_errors) @@ -1848,9 +1882,15 @@ void freeClient(client *c) { /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different - * places where active clients may be referenced. */ + * places where active clients may be referenced. + * This will also clean all remaining pending commands in the client, + * as they are no longer valid. + */ unlinkClient(c); + freeClientMultiState(c); + serverAssert(c->pending_cmds.len == 0); + /* Master/slave cleanup Case 1: * we lost the connection with a slave. */ if (c->flags & CLIENT_SLAVE) { @@ -1905,7 +1945,7 @@ void freeClient(client *c) { if (c->name) decrRefCount(c->name); if (c->lib_name) decrRefCount(c->lib_name); if (c->lib_ver) decrRefCount(c->lib_ver); - freeClientMultiState(c); + serverAssert(c->all_argv_len_sum == 0); sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->slave_addr); @@ -2293,14 +2333,35 @@ int handleClientsWithPendingWrites(void) { return processed; } -static inline void resetClientInternal(client *c, int free_argv) { - redisCommandProc *prevcmd = c->cmd ? 
c->cmd->proc : NULL; - - freeClientArgvInternal(c, free_argv); - c->cur_script = NULL; +/* Prepare the client for the parsing of the next command. */ +void resetClientQbufState(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; +} + +static inline void resetClientInternal(client *c, int num_pcmds_to_free) { + redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; + + /* We may get here with no pending commands but with an argv that needs freeing. + * An example is in the case of modules (RM_Call) */ + if (c->current_pending_cmd) { + freeClientPendingCommands(c, num_pcmds_to_free); + if (c->pending_cmds.len == 0) + serverAssert(c->all_argv_len_sum == 0); + c->current_pending_cmd = NULL; + } else if (c->argv) { + freeClientArgvInternal(c, 1 /* free_argv */); + /* If we're dealing with a client that doesn't create pendingCommand structs (e.g.: a Lua client), + * clear the all_argv_len_sum counter so we don't get to freeing the client with it non-zero. */ + c->all_argv_len_sum = 0; + } + + c->argc = 0; + c->cmd = NULL; + c->argv_len = 0; + c->argv = NULL; + c->cur_script = NULL; c->slot = -1; c->cluster_compatibility_check_slot = -2; c->flags &= ~CLIENT_EXECUTING_COMMAND; @@ -2340,8 +2401,8 @@ static inline void resetClientInternal(client *c, int free_argv) { } /* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - resetClientInternal(c, 1); +void resetClient(client *c, int num_pcmds_to_free) { + resetClientInternal(c, num_pcmds_to_free); } /* This function is used when we want to re-enter the event loop but there @@ -2384,7 +2445,7 @@ void unprotectClient(client *c) { * have a well formed command. The function also returns C_ERR when there is * a protocol error: in such a case the client structure is setup to reply * with the error and close the connection. 
*/ -int processInlineBuffer(client *c) { +int processInlineBuffer(client *c, pendingCommand *pcmd) { char *newline; int argc, j, linefeed_chars = 1; sds *argv, aux; @@ -2396,7 +2457,7 @@ int processInlineBuffer(client *c) { /* Nothing to do without a \r\n */ if (newline == NULL) { if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_INLINE_REQUEST; + pcmd->read_error = CLIENT_READ_TOO_BIG_INLINE_REQUEST; } return C_ERR; } @@ -2411,7 +2472,7 @@ int processInlineBuffer(client *c) { argv = sdssplitargs(aux,&argc); sdsfree(aux); if (argv == NULL) { - c->read_error = CLIENT_READ_UNBALANCED_QUOTES; + pcmd->read_error = CLIENT_READ_UNBALANCED_QUOTES; return C_ERR; } @@ -2430,7 +2491,7 @@ int processInlineBuffer(client *c) { * to keep the connection active. */ if (querylen != 0 && c->flags & CLIENT_MASTER) { sdsfreesplitres(argv,argc); - c->read_error = CLIENT_READ_MASTER_USING_INLINE_PROTOCAL; + pcmd->read_error = CLIENT_READ_MASTER_USING_INLINE_PROTOCAL; return C_ERR; } @@ -2440,19 +2501,20 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { /* Create new argv if space is insufficient. */ - if (unlikely(argc > c->argv_len)) { - zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - c->argv_len = argc; + if (argc > pcmd->argv_len) { + zfree(pcmd->argv); + pcmd->argv = zmalloc(sizeof(robj*)*argc); + pcmd->argv_len = argc; + pcmd->argv_len_sum = 0; } - c->argv_len_sum = 0; } /* Create redis objects for all arguments. 
*/ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + for (pcmd->argc = 0, j = 0; j < argc; j++) { + pcmd->argv[pcmd->argc] = createObject(OBJ_STRING,argv[j]); + pcmd->argc++; + pcmd->argv_len_sum += sdslen(argv[j]); + c->all_argv_len_sum += sdslen(argv[j]); } zfree(argv); @@ -2469,7 +2531,7 @@ int processInlineBuffer(client *c) { * Command) SET key value * Inline) SET key value\r\n */ - c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); + pcmd->input_bytes = (pcmd->argv_len_sum + (pcmd->argc - 1) + 2); return C_OK; } @@ -2518,21 +2580,21 @@ static void setProtocolError(const char *errstr, client *c) { * This function is called if processInputBuffer() detects that the next * command is in RESP format, so the first byte in the command is found * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { +static int processMultibulkBuffer(client *c, pendingCommand *pcmd) { char *newline = NULL; int ok; long long ll; size_t querybuf_len = sdslen(c->querybuf); /* Cache sdslen */ if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); + /* The pending command should have been reset */ + serverAssertWithInfo(c,NULL,pcmd->argc == 0); /* Multi bulk length cannot be read without a \r\n */ newline = strchr(c->querybuf+c->qb_pos,'\r'); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; + pcmd->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; } return C_ERR; } @@ -2547,10 +2609,10 @@ int processMultibulkBuffer(client *c) { size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > INT_MAX) { - c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; + 
pcmd->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; return C_ERR; } else if (ll > 10 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; + pcmd->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; return C_ERR; } @@ -2559,19 +2621,16 @@ int processMultibulkBuffer(client *c) { if (ll <= 0) return C_OK; c->multibulklen = ll; + c->bulklen = -1; - /* Setup argv array on client structure. - * Create new argv in the following cases: - * 1) When the requested size is greater than the current size. - * 2) When the requested size is less than the current size, because - * we always allocate argv gradually with a maximum size of 1024, - * Therefore, if argv_len exceeds this limit, we always reallocate. */ - if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) { - zfree(c->argv); - c->argv_len = min(c->multibulklen, 1024); - c->argv = zmalloc(sizeof(robj*)*c->argv_len); + /* Setup argv array on pending command structure. + * Reallocate argv array when the requested size is greater than current size. */ + if (c->multibulklen > pcmd->argv_len) { + zfree(pcmd->argv); + pcmd->argv_len = min(c->multibulklen, 1024); + pcmd->argv = zmalloc(sizeof(robj*)*(pcmd->argv_len)); + pcmd->argv_len_sum = 0; } - c->argv_len_sum = 0; /* Per-slot network bytes-in calculation. * @@ -2604,17 +2663,17 @@ int processMultibulkBuffer(client *c) { * * The 1st component is calculated within the below line. 
* */ - c->net_input_bytes_curr_cmd += (multibulklen_slen + 3); + pcmd->input_bytes += (multibulklen_slen + 3); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; + pcmd->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; return C_ERR; } break; @@ -2625,7 +2684,7 @@ int processMultibulkBuffer(client *c) { break; if (c->querybuf[c->qb_pos] != '$') { - c->read_error = CLIENT_READ_EXPECTED_DOLLAR; + pcmd->read_error = CLIENT_READ_EXPECTED_DOLLAR; return C_ERR; } @@ -2633,10 +2692,10 @@ int processMultibulkBuffer(client *c) { ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { - c->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; + pcmd->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; return C_ERR; } else if (ll > 16384 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; + pcmd->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; return C_ERR; } @@ -2670,7 +2729,9 @@ int processMultibulkBuffer(client *c) { } c->bulklen = ll; /* Per-slot network bytes-in calculation, 2nd component. */ - c->net_input_bytes_curr_cmd += (bulklen_slen + 3); + pcmd->input_bytes += (bulklen_slen + 3); + } else { + serverAssert(pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE); } /* Read bulk argument */ @@ -2678,10 +2739,9 @@ int processMultibulkBuffer(client *c) { break; } else { /* Check if we have space in argv, grow if needed */ - if (c->argc >= c->argv_len) { - serverAssert(c->argv_len); /* Ensure argv is not freed while the client is in the mid of parsing command. */ - c->argv_len = min(c->argv_len < INT_MAX/2 ? 
c->argv_len*2 : INT_MAX, c->argc+c->multibulklen); - c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); + if (pcmd->argc >= pcmd->argv_len) { + pcmd->argv_len = min(pcmd->argv_len < INT_MAX/2 ? (pcmd->argv_len)*2 : INT_MAX, pcmd->argc+c->multibulklen); + pcmd->argv = zrealloc(pcmd->argv, sizeof(robj*)*(pcmd->argv_len)); } /* Optimization: if a non-master client's buffer contains JUST our bulk element @@ -2692,8 +2752,9 @@ int processMultibulkBuffer(client *c) { c->bulklen >= PROTO_MBULK_BIG_ARG && querybuf_len == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + (pcmd->argv)[(pcmd->argc)++] = createObject(OBJ_STRING,c->querybuf); + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one likely... * But only if that fat argument is not too big compared to the memory limit. */ @@ -2705,9 +2766,10 @@ int processMultibulkBuffer(client *c) { sdsclear(c->querybuf); querybuf_len = sdslen(c->querybuf); /* Update cached length */ } else { - c->argv[c->argc++] = + (pcmd->argv)[(pcmd->argc)++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + pcmd->argv_len_sum += c->bulklen; + c->all_argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2718,12 +2780,30 @@ int processMultibulkBuffer(client *c) { /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) { /* Per-slot network bytes-in calculation, 3rd and 4th components. */ - c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2)); + pcmd->input_bytes += (pcmd->argv_len_sum + (pcmd->argc * 2)); + pcmd->flags &= ~PENDING_CMD_FLAG_INCOMPLETE; return C_OK; } /* Still not ready to process the command */ - return C_ERR; + pcmd->flags |= PENDING_CMD_FLAG_INCOMPLETE; + return C_OK; +} + +/* Prepare the client for executing the next command: + * + * 1. 
Append the response, if necessary. + * 2. Update the cluster slot stats, if necessary (must be done before the reset). + * 3. Reset the client. + * 4. As part of the reset, pop the executed command from the pending_cmds list and update the all_argv_len_sum counter. + */ +void prepareForNextCommand(client *c, int update_slot_stats) { + reqresAppendResponse(c); + if (update_slot_stats) { + /* We must do this before resetting the client. */ + clusterSlotStatsAddNetworkBytesInForUserClient(c); + } + resetClientInternal(c, 1); } /* Perform necessary tasks after a command was executed: @@ -2741,14 +2821,13 @@ void commandProcessed(client *c) { * since we have not applied the command. */ if (c->flags & CLIENT_BLOCKED) return; - reqresAppendResponse(c); - clusterSlotStatsAddNetworkBytesInForUserClient(c); - resetClientInternal(c, 0); + prepareForNextCommand(c, 1); long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff_next > 0); + c->reploff = c->reploff_next; } /* If the client is a master we need to compute the difference @@ -2825,7 +2904,7 @@ int processPendingCommandAndInputBuffer(client *c) { * Note: when a master client steps into this function, * it can always satisfy this condition, because its querybuf * contains data not applied.
*/ - if (c->querybuf && sdslen(c->querybuf) > 0) { + if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.ready_len > 0) { return processInputBuffer(c); } return C_OK; @@ -2894,19 +2973,37 @@ void handleClientReadError(client *c) { break; } default: - serverPanic("Unknown client read error"); + serverPanic("Unknown client read error: %d", c->read_error); break; } } + +/* Helper function to check if a read error is fatal (should stop processing) */ +int isClientReadErrorFatal(client *c) { + return c->read_error != 0 && + c->read_error != CLIENT_READ_COMMAND_NOT_FOUND && + c->read_error != CLIENT_READ_BAD_ARITY && + c->read_error != CLIENT_READ_CROSS_SLOT; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { + /* We limit the lookahead for unauthenticated connections to 1. + * This is both to reduce memory overhead, and to prevent errors: AUTH can + * affect the handling of succeeding commands. Parsing of "large" + * unauthenticated multibulk commands is rejected, which would cause those + * commands to incorrectly return an error to the client. */ + const int lookahead = authRequired(c) ? 1 : server.lookahead; + /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) || + c->pending_cmds.ready_len > 0) + { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED || c->flags & CLIENT_UNBLOCKED) break; @@ -2927,52 +3024,108 @@ int processInputBuffer(client *c) { * The same applies for clients we want to terminate ASAP. 
*/ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; + /* Determine if we need to parse more commands from the query buffer. + * Only parse when there are no ready commands waiting to be processed. */ + const int parse_more = !c->pending_cmds.ready_len; + + /* Parse up to lookahead commands only if we don't have enough ready commands */ + while (parse_more && c->pending_cmds.ready_len < lookahead && + c->querybuf && c->qb_pos < sdslen(c->querybuf)) + { + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[c->qb_pos] == '*') { + c->reqtype = PROTO_REQ_MULTIBULK; + } else { + c->reqtype = PROTO_REQ_INLINE; + } } + + pendingCommand *pcmd = NULL; + if (c->reqtype == PROTO_REQ_INLINE) { + pcmd = acquirePendingCommand(); + if (processInlineBuffer(c, pcmd) == C_ERR && !pcmd->read_error) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(c, pcmd); + break; + } + } else if (c->reqtype == PROTO_REQ_MULTIBULK) { + int incomplete = (c->pending_cmds.len != c->pending_cmds.ready_len); + if (unlikely(incomplete)) { + pcmd = popPendingCommandFromTail(&c->pending_cmds); + } else { + pcmd = acquirePendingCommand(); + } + + if (processMultibulkBuffer(c, pcmd) == C_ERR && !pcmd->read_error) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. 
*/ + freePendingCommand(c, pcmd); + break; + } + } else { + serverPanic("Unknown request type"); + } + + addPendingCommand(&c->pending_cmds, pcmd); + if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE))) + break; + + pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + preprocessCommand(c, pcmd); + pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED; + resetClientQbufState(c); } - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; + /* Try to consume the next ready command from the pending command list. */ + if (!c->pending_cmds.ready_len) + break; + pendingCommand *curcmd = c->pending_cmds.head; + + /* We populate the old client fields so we don't have to modify all existing logic to work with pendingCommands */ + c->argc = curcmd->argc; + c->argv = curcmd->argv; + c->argv_len = curcmd->argv_len; + c->net_input_bytes_curr_cmd += curcmd->input_bytes; + c->reploff_next = curcmd->reploff; + c->slot = curcmd->slot; + c->lookedcmd = curcmd->cmd; + c->read_error = curcmd->read_error; + c->current_pending_cmd = curcmd; + + /* Prefetch the command only when more commands have been parsed and we + * are in the main thread. If running in an IO thread, prefetch will be + * deferred until the client is processed by the main thread. Skip prefetch + * if there are too few commands to avoid meaningless prefetching. */ + if (parse_more && c->running_tid == IOTHREAD_MAIN_THREAD_ID && + c->pending_cmds.ready_len > 1) + { + /* Prefetch the commands. */ + resetCommandsBatch(); + addCommandToBatch(c); + prefetchCommands(); + } + + /* Check if the client has a fatal read error that requires stopping processing. 
*/ + if (isClientReadErrorFatal(c)) { + if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { + enqueuePendingClientsToMainThread(c, 0); } - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else { - serverPanic("Unknown request type"); + break; } /* Multibulk processing could see a <= 0 length. */ - if (c->argc == 0) { - freeClientArgvInternal(c, 0); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; + if (!c->argc) { + /* A naked newline can be sent from masters as a keep-alive, or from slaves to refresh + * the last ACK time. In that case there's no command to actually execute. */ + prepareForNextCommand(c, 0); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; - c->iolookedcmd = lookupCommand(c->argv, c->argc); - if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { - /* The command was found, but the arity is invalid, reset it and let main - * thread handle. To avoid memory prefetching on an invalid command. */ - c->iolookedcmd = NULL; - } - int slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); - /* Reset to -1, since c->slot expects -1 if no slot is being used */ - c->slot = (slot == GETSLOT_CROSSSLOT || slot == GETSLOT_NOKEYS) ? -1 : slot; enqueuePendingClientsToMainThread(c, 0); break; } @@ -3002,6 +3155,7 @@ int processInputBuffer(client *c) { * so the repl_applied is not equal to qb_pos. 
*/ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); + serverAssert(c->qb_pos >= (size_t)c->repl_applied); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } @@ -3144,7 +3298,7 @@ void readQueryFromClient(connection *conn) { c = NULL; done: - if (c && c->read_error) { + if (c && isClientReadErrorFatal(c)) { if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) { handleClientReadError(c); } @@ -3296,7 +3450,7 @@ sds catClientInfoString(sds s, client *client) { " watch=%i", (int) listLength(client->watched_keys), " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, - " argv-mem=%U", (unsigned long long) client->argv_len_sum, + " argv-mem=%U", (unsigned long long) client->all_argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, " rbp=%U", (unsigned long long) client->buf_peak, @@ -4205,14 +4359,52 @@ void rewriteClientCommandVector(client *c, int argc, ...) { void replaceClientCommandVector(client *c, int argc, robj **argv) { int j; retainOriginalCommandVector(c); + + /* Fixing the client argv is not enough: we also need to fix the pending command (same argv). + * However, we sometimes reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there are no pending commands + * to update, so we skip that code.
*/ + pendingCommand *pcmd = NULL; + int is_mstate = 0; + if (c->mstate.executing_cmd < 0) { + is_mstate = 0; + if (c->pending_cmds.ready_len > 0) { + pcmd = c->pending_cmds.head; + serverAssert(!(pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)); + } + } else { + is_mstate = 1; + serverAssert(c->mstate.executing_cmd < c->mstate.count); + pcmd = c->mstate.commands[c->mstate.executing_cmd]; + } + + if (pcmd) { + serverAssert(pcmd->argv == c->argv); + pcmd->argv = argv; + pcmd->argc = argc; + pcmd->argv_len = argc; + } freeClientArgv(c); c->argv = argv; c->argc = c->argv_len = argc; - c->argv_len_sum = 0; - for (j = 0; j < c->argc; j++) - if (c->argv[j]) - c->argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!is_mstate) { /* multi-state does not track all_argv_len_sum, see code in queueMultiCommand */ + size_t new_argv_len_sum = 0; + for (j = 0; j < c->argc; j++) + if (c->argv[j]) + new_argv_len_sum += getStringObjectLen(c->argv[j]); + + if (!pcmd) { + c->all_argv_len_sum = new_argv_len_sum; + } else { + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = new_argv_len_sum; + c->all_argv_len_sum += pcmd->argv_len_sum; + } + } c->cmd = lookupCommandOrOriginal(c->argv,c->argc); + if (pcmd) + pcmd->cmd = c->cmd; serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -4233,6 +4425,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); + /* Fixing the client argv is not enough: we also need to fix the pending command (same argv). + * However, we sometimes reach here not from a real client, but from a Lua 'scriptRunCtx'. This flow bypasses the + * pending-command system entirely and uses c->argv directly. In this case there are no pending commands + * to update, so we skip that code. */ + pendingCommand *pcmd = c->pending_cmds.head ?
c->pending_cmds.head: NULL; + int update_pcmd = pcmd && pcmd->argv == c->argv; + /* We need to handle both extending beyond argc (just update it and * initialize the new element) or beyond argv_len (realloc is needed). */ @@ -4245,12 +4444,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; - if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); + if (oldval) c->all_argv_len_sum -= getStringObjectLen(oldval); if (newval) { c->argv[i] = newval; incrRefCount(newval); - c->argv_len_sum += getStringObjectLen(newval); + c->all_argv_len_sum += getStringObjectLen(newval); } else { /* move the remaining arguments one step left */ for (int j = i+1; j < c->argc; j++) { @@ -4260,10 +4459,20 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } if (oldval) decrRefCount(oldval); + if (update_pcmd) { + pcmd->argv = c->argv; + pcmd->argc = c->argc; + pcmd->argv_len = c->argv_len; + if (oldval) pcmd->argv_len_sum -= getStringObjectLen(oldval); + if (newval) pcmd->argv_len_sum += getStringObjectLen(newval); + } + /* If this is the command name make sure to fix c->cmd. */ if (i == 0) { c->cmd = lookupCommandOrOriginal(c->argv,c->argc); serverAssertWithInfo(c,NULL,c->cmd != NULL); + if (update_pcmd) + pcmd->cmd = c->cmd; } } @@ -4313,7 +4522,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += c->all_argv_len_sum + sizeof(robj*)*c->argc; mem += multiStateMemOverhead(c); /* Add memory overhead of pubsub channels and patterns. 
Note: this is just the overhead of the robj pointers @@ -4760,3 +4969,240 @@ void evictClients(void) { } } } + +/* Acquire a pending command from the shared pool or allocate a new one. + * Uses the shared pool when available (only when IO threads are inactive), + * otherwise allocates a new pending command structure. */ +static pendingCommand *acquirePendingCommand(void) { + /* Ensure pool is empty when IO threads are active to avoid race conditions */ + serverAssert(server.io_threads_active == 0 || server.cmd_pool.size == 0); + + pendingCommand *pcmd = NULL; + if (server.cmd_pool.size > 0) { + /* Shared pool is available. */ + pcmd = server.cmd_pool.pool[--server.cmd_pool.size]; + server.cmd_pool.pool[server.cmd_pool.size] = NULL; + + /* Track minimum pool size for utilization calculation */ + if (server.cmd_pool.size < server.cmd_pool.min_size) + server.cmd_pool.min_size = server.cmd_pool.size; + } else { + /* Shared pool is empty, allocate new pending command. */ + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + } + return pcmd; +} + +/* Try to expand the pending command pool capacity. + * Returns 1 if expansion succeeded or wasn't needed, 0 if expansion failed. 
*/ +static int tryExpandPendingCommandPool(void) { + /* Check if expansion is needed */ + if (server.cmd_pool.size < server.cmd_pool.capacity) { + return 1; /* No expansion needed */ + } + + /* Check if we can expand further */ + if (server.cmd_pool.capacity >= PENDING_COMMAND_POOL_MAX_SIZE) { + return 0; /* Already at maximum capacity */ + } + + /* Expand the pending command pool capacity by doubling it, up to the maximum size */ + int new_capacity = server.cmd_pool.capacity * 2; + if (new_capacity > PENDING_COMMAND_POOL_MAX_SIZE) + new_capacity = PENDING_COMMAND_POOL_MAX_SIZE; + + server.cmd_pool.pool = zrealloc(server.cmd_pool.pool, sizeof(pendingCommand*) * new_capacity); + server.cmd_pool.capacity = new_capacity; + return 1; /* Expansion succeeded */ +} + +/* Reclaim a pending command by adding it to the shared pool for reuse or freeing it. + * The shared pool is only used when IO threads are inactive to avoid race conditions + * between multiple clients. Additionally, pool reuse provides minimal benefit in + * multi-threaded scenarios, so we only use it in single-threaded mode. 
*/ +static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { + if (!server.io_threads_active) { + /* Try to add to shared pool for reuse if argv isn't too large */ + if (likely(pcmd->argv_len < 64)) { + /* Check if pool needs expansion before attempting to add */ + if (!tryExpandPendingCommandPool()) { + /* Pool is at maximum capacity, can't expand further */ + goto free_command; + } + + /* Clean up command resources before adding to pool */ + for (int j = 0; j < pcmd->argc; j++) + decrRefCount(pcmd->argv[j]); + + getKeysFreeResult(&pcmd->keys_result); + + if (c) { + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = 0; + } + + /* Reset the pending command while preserving the argv array for shared pool reuse */ + robj **argv = pcmd->argv; + int argv_len = pcmd->argv_len; + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->argv = argv; + pcmd->argv_len = argv_len; + pcmd->slot = INVALID_CLUSTER_SLOT; + + server.cmd_pool.pool[server.cmd_pool.size++] = pcmd; + return; /* Successfully added to shared pool for reuse */ + } + } else { + /* IO threads are active, handle thread-specific cleanup */ + if (c && c->tid != IOTHREAD_MAIN_THREAD_ID) { + /* Partial cleanup for IO thread commands to avoid race issues. + * An robj may already be referenced elsewhere, so we only + * decrease the reference count to release our reference to it. 
*/ + for (int j = 0; j < pcmd->argc; j++) { + robj *o = pcmd->argv[j]; + if (o && o->refcount > 1) { + decrRefCount(o); + pcmd->argv[j] = NULL; + } + } + + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + pcmd->argv_len_sum = 0; + + tryDeferFreeClientObject(c, DEFERRED_OBJECT_TYPE_PENDING_COMMAND, pcmd); + return; + } + } + +free_command: + /* Shared pool is full or command argv is too large, free this pending command */ + freePendingCommand(c, pcmd); +} + +void initPendingCommand(pendingCommand *pcmd) { + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->slot = INVALID_CLUSTER_SLOT; +} + +void freePendingCommand(client *c, pendingCommand *pcmd) { + if (!pcmd) + return; + + getKeysFreeResult(&pcmd->keys_result); + + if (pcmd->argv) { + for (int j = 0; j < pcmd->argc; j++) { + robj *o = pcmd->argv[j]; + if (!o) continue; /* TODO */ + decrRefCount(o); + } + + zfree(pcmd->argv); + + /* c may be NULL when called from reclaimPendingCommand */ + if (c) { + serverAssert(c->all_argv_len_sum >= pcmd->argv_len_sum); /* assert this doesn't try to go negative */ + c->all_argv_len_sum -= pcmd->argv_len_sum; + } + } + + zfree(pcmd); +} + +/* Add a command to the tail of the pending command list. */ +void addPendingCommand(pendingCommandList *queue, pendingCommand *cmd) { + cmd->next = NULL; + cmd->prev = queue->tail; + + if (queue->tail) { + queue->tail->next = cmd; + } else { + /* Queue was empty */ + queue->head = cmd; + } + + queue->tail = cmd; + queue->len++; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) queue->ready_len++; +} + +/* Remove and return the first pending command from the list. + * Returns NULL if the list is empty. 
*/ +pendingCommand *popPendingCommandFromHead(pendingCommandList *list) { + pendingCommand *cmd = list->head; + if (!cmd) return NULL; /* List is empty */ + + list->head = cmd->next; + if (list->head) { + list->head->prev = NULL; + } else { + /* Queue became empty */ + list->tail = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) list->ready_len--; + return cmd; +} + +/* Remove and return the last pending command from the list. + * Returns NULL if the list is empty. */ +pendingCommand *popPendingCommandFromTail(pendingCommandList *list) { + pendingCommand *cmd = list->tail; + if (!cmd) return NULL; /* List is empty */ + + list->tail = cmd->prev; + if (list->tail) { + list->tail->next = NULL; + } else { + /* Queue became empty */ + list->head = NULL; + } + + cmd->next = cmd->prev = NULL; + list->len--; + if (!(cmd->flags & PENDING_CMD_FLAG_INCOMPLETE)) list->ready_len--; + return cmd; +} + +/* Get cached key result for current pending command */ +getKeysResult *getClientCachedKeyResult(client *c) { + pendingCommand *pcmd = c->current_pending_cmd; + if (pcmd) { + /* Preprocess the command if needed */ + if (!(pcmd->flags & PENDING_CMD_FLAG_PREPROCESSED)) { + preprocessCommand(c, pcmd); + pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED; + } + + /* Return cached result if available */ + if (pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID) + return &c->current_pending_cmd->keys_result; + } + return NULL; +} + +void shrinkPendingCommandPool(void) { + /* Don't shrink if pool is too small. */ + if (server.cmd_pool.capacity <= PENDING_COMMAND_POOL_SIZE) return; + + /* Free commands until we have half the current size, but not below minimum. 
*/ + int target_size = max(server.cmd_pool.size / 2, PENDING_COMMAND_POOL_SIZE); + + while (server.cmd_pool.size > target_size) { + pendingCommand *cmd = server.cmd_pool.pool[--server.cmd_pool.size]; + if (cmd) { + freePendingCommand(NULL, cmd); + server.cmd_pool.pool[server.cmd_pool.size] = NULL; + } + } + + int old_capacity = server.cmd_pool.capacity; + server.cmd_pool.capacity = target_size; + server.cmd_pool.pool = zrealloc(server.cmd_pool.pool, sizeof(pendingCommand*) * target_size); + serverLog(LL_DEBUG, "Shrunk pending command pool: capacity %d->%d", old_capacity, server.cmd_pool.capacity); +} diff --git a/src/replication.c b/src/replication.c index 2d43278b5..dcef34569 100644 --- a/src/replication.c +++ b/src/replication.c @@ -4330,12 +4330,14 @@ void replicationCacheMaster(client *c) { server.master->qb_pos = 0; server.master->repl_applied = 0; server.master->read_reploff = server.master->reploff; + server.master->reploff_next = 0; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; c->reply_bytes = 0; c->bufpos = 0; - resetClient(c); + resetClient(c, -1); + resetClientQbufState(c); /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). 
*/ diff --git a/src/script.c b/src/script.c index dfbca9511..fb815241a 100644 --- a/src/script.c +++ b/src/script.c @@ -486,7 +486,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING); const uint64_t cmd_flags = getCommandFlags(c); int hashslot = -1; - if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, cmd_flags, &error_code) != getMyClusterNode()) { + if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, NULL, 0, cmd_flags, &error_code) != getMyClusterNode()) { if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { *err = sdsnew( "Script attempted to execute a write command while the " diff --git a/src/script_lua.c b/src/script_lua.c index 2e8220743..f742611c7 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -974,7 +974,8 @@ cleanup: c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - resetClient(c); + c->all_argv_len_sum = 0; + resetClient(c, 1); inuse--; if (raise_error) { @@ -1134,7 +1135,7 @@ static int luaRedisAclCheckCmdPermissionsCommand(lua_State *lua) { raise_error = 1; } else { int keyidxptr; - if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, &keyidxptr) != ACL_OK) { + if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) { lua_pushboolean(lua, 0); } else { lua_pushboolean(lua, 1); diff --git a/src/server.c b/src/server.c index f127f61bf..bdf948961 100644 --- a/src/server.c +++ b/src/server.c @@ -873,23 +873,6 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } -/* If the client has been idle for too long, free the client's arguments. */ -int clientsCronFreeArgvIfIdle(client *c) { - /* If the client is in the middle of parsing a command, or if argv is in use - * (e.g. parsed in the IO thread but not yet executed, or blocked), exit ASAP. 
*/ - if (!c->argv || c->multibulklen || c->argc) return 0; - - /* Free argv if the client has been idle for more than 2 seconds or if argv - * size is too large. */ - time_t idletime = server.unixtime - c->lastinteraction; - if (idletime > 2 || c->argv_len > 128) { - c->argv_len = 0; - zfree(c->argv); - c->argv = NULL; - } - return 0; -} - /* The client output buffer can be adjusted to better fit the memory requirements. * * the logic is: @@ -961,7 +944,7 @@ int CurrentPeakMemUsageSlot = 0; int clientsCronTrackExpansiveClients(client *c) { size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; - size_t in_usage = qb_size + c->argv_len_sum + argv_size; + size_t in_usage = qb_size + c->all_argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -1113,7 +1096,6 @@ int clientsCronRunClient(client *c) { * terminated. */ if (clientsCronHandleTimeout(c,now)) return 1; if (clientsCronResizeQueryBuffer(c)) return 1; - if (clientsCronFreeArgvIfIdle(c)) return 1; if (clientsCronResizeOutputBuffer(c,now)) return 1; if (clientsCronTrackExpansiveClients(c)) return 1; @@ -1131,6 +1113,24 @@ int clientsCronRunClient(client *c) { return 0; } +/* Periodic maintenance for the pending command pool. + * This function should be called from serverCron to manage pool size based on utilization patterns. 
*/ +void pendingCommandPoolCron(void) { + /* Only shrink pool when IO threads are not active */ + if (server.io_threads_active) return; + + /* Calculate utilization rate based on minimum pool size reached */ + if (server.cmd_pool.capacity > PENDING_COMMAND_POOL_SIZE) { + /* If utilization is below threshold, shrink the pool */ + double utilization_ratio = 1.0 - (double)server.cmd_pool.min_size / server.cmd_pool.capacity; + if (utilization_ratio < 0.5) + shrinkPendingCommandPool(); + } + + /* Reset tracking for next interval */ + server.cmd_pool.min_size = server.cmd_pool.size; /* Reset to current size */ +} + /* This function is called by serverCron() and is used in order to perform * operations on clients that are important to perform constantly. For instance * we use this function in order to disconnect clients after a timeout, including @@ -1653,6 +1653,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Periodically shrink pending command reuse pool */ + run_with_period(2000) { + pendingCommandPoolCron(); + } + /* Resize tracking keys table if needed. This is also done at every * command execution, but we want to be sure that if the last command * executed changes the value via CONFIG SET, the server will perform @@ -2940,6 +2945,12 @@ void initServer(void) { server.acl_info.user_auth_failures = 0; server.acl_info.invalid_channel_accesses = 0; + /* Initialize the shared pending command pool. */ + server.cmd_pool.size = 0; + server.cmd_pool.capacity = PENDING_COMMAND_POOL_SIZE; + server.cmd_pool.pool = zmalloc(sizeof(pendingCommand*) * PENDING_COMMAND_POOL_SIZE); + server.cmd_pool.min_size = 0; + /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. 
*/ @@ -2987,6 +2998,8 @@ void initServer(void) { if (server.maxmemory_clients != 0) initServerClientMemUsageBuckets(); + + prefetchCommandsBatchInit(); } void initListeners(void) { @@ -4070,6 +4083,47 @@ uint64_t getCommandFlags(client *c) { return cmd_flags; } +void preprocessCommand(client *c, pendingCommand *pcmd) { + pcmd->slot = INVALID_CLUSTER_SLOT; + if (pcmd->argc == 0) + return; + + /* Check if we can reuse the previous command instead of looking it up. + * The previous command is either the penultimate pending command (if it exists), or c->lastcmd. */ + struct redisCommand *last_cmd = pcmd->prev ? pcmd->prev->cmd : c->lastcmd; + + if (isCommandReusable(last_cmd, pcmd->argv[0])) + pcmd->cmd = last_cmd; + else + pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc); + + if (!pcmd->cmd) { + pcmd->read_error = CLIENT_READ_COMMAND_NOT_FOUND; + return; + } + + if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) || + (pcmd->argc < -pcmd->cmd->arity)) + { + pcmd->read_error = CLIENT_READ_BAD_ARITY; + return; + } + + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + int num_keys = extractKeysAndSlot(pcmd->cmd, pcmd->argv, pcmd->argc, + &pcmd->keys_result, &pcmd->slot); + if (num_keys < 0) { + /* We skip the checks below since we expect the command to be rejected in this case */ + return; + } else if (num_keys > 0) { + /* If the command has keys but the slot is invalid, it means + * there is a cross-slot case. */ + if (pcmd->slot == INVALID_CLUSTER_SLOT) + pcmd->read_error = CLIENT_READ_CROSS_SLOT; + } + pcmd->flags |= PENDING_CMD_KEYS_RESULT_VALID; +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. 
* processCommand() execute the command or prepare the @@ -4113,11 +4167,17 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { /* check if we can reuse the last command instead of looking up if we already have that info */ - struct redisCommand *cmd = NULL; - if (isCommandReusable(c->lastcmd, c->argv[0])) - cmd = c->lastcmd; - else - cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc); + struct redisCommand *cmd = c->lookedcmd; + + /* The command may have been modified by modules (e.g., in CommandFilters callbacks), + * so we need to look it up again. */ + if (!cmd) { + if (isCommandReusable(c->lastcmd, c->argv[0])) + cmd = c->lastcmd; + else + cmd = lookupCommand(c->argv, c->argc); + } + if (!cmd) { /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { @@ -4215,7 +4275,7 @@ int processCommand(client *c) { { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, - &c->slot,cmd_flags,&error_code); + &c->slot,getClientCachedKeyResult(c),c->read_error,cmd_flags,&error_code); if (n == NULL || !clusterNodeIsMyself(n)) { if (c->cmd->proc == execCommand) { discardTransaction(c); @@ -4470,9 +4530,9 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) { /* If client is in multi-exec, we need to check the slot of all keys * in the transaction. */ for (int i = 0; i < (ms ? ms->count : 1); i++) { - struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd; - robj **argv = ms ? ms->commands[i].argv : c->argv; - int argc = ms ? ms->commands[i].argc : c->argc; + struct redisCommand *cmd = ms ? ms->commands[i]->cmd : c->cmd; + robj **argv = ms ? ms->commands[i]->argv : c->argv; + int argc = ms ? 
ms->commands[i]->argc : c->argc; getKeysResult result = GETKEYS_RESULT_INIT; int numkeys = getKeysFromCommand(cmd, argv, argc, &result); @@ -7008,7 +7068,7 @@ void dismissClientMemory(client *c) { dismissMemory(c->buf, c->buf_usable_size); if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ - if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { + if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { dismissObject(c->argv[i], 0); } diff --git a/src/server.h b/src/server.h index fb0d2efb7..b03b35682 100644 --- a/src/server.h +++ b/src/server.h @@ -202,6 +202,9 @@ struct hdr_histogram; * in order to make sure of not over provisioning more than 128 fds. */ #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) +/* Default lookahead value */ +#define REDIS_DEFAULT_LOOKAHEAD 16 + /* OOM Score Adjustment classes. */ #define CONFIG_OOM_MASTER 0 #define CONFIG_OOM_REPLICA 1 @@ -463,6 +466,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_READ_CONN_DISCONNECTED 11 #define CLIENT_READ_CONN_CLOSED 12 #define CLIENT_READ_REACHED_MAX_QUERYBUF 13 +#define CLIENT_READ_COMMAND_NOT_FOUND 14 +#define CLIENT_READ_BAD_ARITY 15 +#define CLIENT_READ_CROSS_SLOT 16 /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. 
*/ @@ -1146,16 +1152,11 @@ typedef struct rdbLoadingCtx { functionsLibCtx* functions_lib_ctx; }rdbLoadingCtx; -/* Client MULTI/EXEC state */ -typedef struct multiCmd { - robj **argv; - int argv_len; - int argc; - struct redisCommand *cmd; -} multiCmd; - +typedef struct pendingCommand pendingCommand; typedef struct multiState { - multiCmd *commands; /* Array of MULTI commands */ + pendingCommand **commands; /* Array of pointers to MULTI commands */ + int executing_cmd; /* The index of the currently executed transaction + command (index in commands field) */ int count; /* Total number of MULTI commands */ int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it @@ -1164,7 +1165,7 @@ typedef struct multiState { is possible to know if all the commands have a certain flag. */ size_t argv_len_sums; /* mem used by all commands arguments */ - int alloc_count; /* total number of multiCmd struct memory reserved. */ + int alloc_count; /* total number of pendingCommand struct memory reserved. */ } multiState; /* This structure holds the blocking operation state for a client. @@ -1213,6 +1214,24 @@ typedef struct readyList { robj *key; } readyList; +/* List of pending commands. */ +typedef struct pendingCommandList { + pendingCommand *head; + pendingCommand *tail; + int len; /* Number of commands in the list */ + int ready_len; /* Number of commands that are ready to be processed */ +} pendingCommandList; + +/* Pending command pool management structure */ +#define PENDING_COMMAND_POOL_SIZE 16 +#define PENDING_COMMAND_POOL_MAX_SIZE 1024 +typedef struct pendingCommandPool { + pendingCommand **pool; /* Pool array for reusing pendingCommand objects */ + int size; /* Current number of objects in pool */ + int capacity; /* Current capacity of the pool array */ + int min_size; /* Minimum size since last check (indicates peak usage) */ +} pendingCommandPool; + /* This structure represents a Redis user. 
This is useful for ACLs, the * user is associated to the connection after the connection is authenticated. * If there is no associated user, the connection uses the default user. */ @@ -1243,7 +1262,7 @@ typedef struct readyList { typedef struct { sds name; /* The username as an SDS string. */ - uint32_t flags; /* See USER_FLAG_* */ + redisAtomic uint32_t flags; /* See USER_FLAG_* */ list *passwords; /* A list of SDS valid passwords for this user. */ list *selectors; /* A list of selectors this user validates commands against. This list will always contain at least @@ -1302,6 +1321,16 @@ typedef struct { size_t mem_usage_sum; } clientMemUsageBucket; +#define DEFERRED_OBJECT_TYPE_PENDING_COMMAND 1 +#define DEFERRED_OBJECT_TYPE_ROBJ 2 +/* Structure to hold objects that need to be freed later by IO threads. + * This allows the main thread to defer memory cleanup operations to + * IO threads to avoid blocking the main event loop. */ +typedef struct deferredObject { + int type; /* Type of object: DEFERRED_OBJECT_TYPE_* */ + void *ptr; /* Pointer to the object to be freed */ +} deferredObject; + #define SHOULD_CLUSTER_COMPATIBILITY_SAMPLE() \ (server.cluster_compatibility_sample_ratio == 100 || \ (double)rand()/RAND_MAX * 100 < server.cluster_compatibility_sample_ratio) @@ -1352,11 +1381,13 @@ typedef struct client { int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ - robj **deferred_objects; /* Array of deferred objects to free. */ + size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + pendingCommandList pending_cmds; /* List of parsed pending commands */ + pendingCommand *current_pending_cmd; + deferredObject *deferred_objects; /* Array of deferred objects to free. 
*/ int deferred_objects_num; /* Number of deferred objects to free. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ - struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ + struct redisCommand *lookedcmd; /* Command looked up in lookahead. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ @@ -1392,6 +1423,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_next; /* Next value to set for reploff when a command finishes executing */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ @@ -1872,6 +1904,8 @@ struct redisServer { int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */ int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_active; /* Is IO threads currently active? 
*/ + pendingCommandPool cmd_pool; /* Shared pool for reusing pendingCommand, + * only when IO threads disabled */ int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ @@ -1993,6 +2027,7 @@ struct redisServer { int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ + int lookahead; /* how many commands in each client pipeline to decode and prefetch */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -2365,6 +2400,32 @@ typedef struct { } getKeysResult; #define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL } +/* pendingCommand flags */ +enum { + PENDING_CMD_FLAG_INCOMPLETE = 1 << 0, /* Command parsing is incomplete, still waiting for more data */ + PENDING_CMD_FLAG_PREPROCESSED = 1 << 1, /* This command has passed pre-processing */ + PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */ +}; + +/* Parser state and parse result of a command from a client's input buffer. */ +struct pendingCommand { + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + robj **argv; /* Arguments of current command. */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. 
*/ + unsigned long long input_bytes; + struct redisCommand *cmd; + getKeysResult keys_result; + long long reploff; /* c->reploff should be set to this value when the command is processed */ + int flags; + int slot; /* The slot the command is executing against. Set to INVALID_CLUSTER_SLOT + * if no slot is being used or if the command has a cross slot error */ + uint8_t read_error; + + struct pendingCommand *next; + struct pendingCommand *prev; +}; + /* Key specs definitions. * * Brief: This is a scheme that tries to describe the location @@ -2826,6 +2887,14 @@ void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); int moduleHasSubscribersForKeyspaceEvent(int type); +/* pcmd */ +void initPendingCommand(pendingCommand *pcmd); +void freePendingCommand(client *c, pendingCommand *pcmd); +void addPendingCommand(pendingCommandList *queue, pendingCommand *cmd); +pendingCommand *popPendingCommandFromHead(pendingCommandList *queue); +pendingCommand *popPendingCommandFromTail(pendingCommandList *queue); +void shrinkPendingCommandPool(void); + /* Utils */ long long ustime(void); mstime_t mstime(void); @@ -2851,10 +2920,12 @@ void deauthenticateAndCloseClient(client *c); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); int beforeNextClient(client *c); void clearClientConnectionState(client *c); -void resetClient(client *c); +void resetClient(client *c, int num_pcmds_to_free); +void resetClientQbufState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); -void tryDeferFreeClientObject(client *c, robj *o); +void freeClientPendingCommands(client *c, int num_pcmds_to_free); +void tryDeferFreeClientObject(client *c, int type, void *ptr); void freeClientDeferredObjects(client *c, int free_array); void sendReplyToClient(connection *conn); void *addReplyDeferredLen(client *c); @@ -2863,6 +2934,7 @@ void setDeferredMapLen(client *c, void *node, long length); void 
setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); +int isClientReadErrorFatal(client *c); int processInputBuffer(client *c); void acceptCommonHandler(connection *conn, int flags, char *ip); void readQueryFromClient(connection *conn); @@ -2958,6 +3030,7 @@ void unprotectClient(client *c); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); +getKeysResult *getClientCachedKeyResult(client *c); /* reply macros */ #define ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c, str) addReplyBulkCBuffer(c, str, strlen(str)) @@ -3276,7 +3349,7 @@ void ACLClearCommandID(void); user *ACLGetUserByName(const char *name, size_t namelen); int ACLUserCheckKeyPerm(user *u, const char *key, int keylen, int flags); int ACLUserCheckChannelPerm(user *u, sds channel, int literal); -int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr); +int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr); int ACLUserCheckCmdWithUnrestrictedKeyAccess(user *u, struct redisCommand *cmd, robj **argv, int argc, int flags); int ACLCheckAllPerm(client *c, int *idxptr); int ACLSetUser(user *u, const char *op, ssize_t oplen); @@ -3370,8 +3443,10 @@ void updatePeakMemory(size_t used_memory); size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); +void preprocessCommand(client *c, pendingCommand *pcmd); int processCommand(client *c); void commandProcessed(client *c); +void prepareForNextCommand(client *c, int update_slot_stats); int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); int areCommandKeysInSameSlot(client *c, int *hashslot); @@ -3776,6 +3851,7 @@ int doesCommandHaveKeys(struct redisCommand 
*cmd); int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); void getKeysFreeResult(getKeysResult *result); +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result, int *slot); int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result); diff --git a/tests/unit/cluster/misc.tcl b/tests/unit/cluster/misc.tcl index cd66697c4..62bdcf7db 100644 --- a/tests/unit/cluster/misc.tcl +++ b/tests/unit/cluster/misc.tcl @@ -22,5 +22,15 @@ start_cluster 2 2 {tags {external:skip cluster}} { R 0 flushall assert_equal {OK} [R 0 CLUSTER flushslots] } -} + test "CROSSSLOT error for keys in different slots" { + # Test MSET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MSET foo bar baz qux} + + # Test DEL with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 DEL foo bar} + + # Test MGET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MGET foo bar} + } +} diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 0347ac653..57b550ab7 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -29,7 +29,7 @@ start_cluster 1 1 {tags {external:skip cluster}} { $primary MULTI $primary SPUBLISH ch1 "hello" $primary GET foo - catch {[$primary EXEC]} err + catch {$primary EXEC} err assert_match {CROSSSLOT*} $err } diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 75bd96fba..2a1a9e641 100644 --- a/tests/unit/memefficiency.tcl +++ 
b/tests/unit/memefficiency.tcl @@ -76,6 +76,10 @@ run_solo {defrag} { } proc test_active_defrag {type} { + + # Note: disable lookahead because it changes the number and order of allocations, which interferes with defrag and causes the tests to fail + r config set lookahead 1 + if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} { test "Active defrag main dictionary: $type" { r config set hz 100 diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 4f63f4e01..accd64fa6 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -375,3 +375,38 @@ start_server {tags {"timeout external:skip"}} { assert_equal "PONG" [r ping] } } + +test {Pending command pool expansion and shrinking} { + start_server {overrides {loglevel debug} tags {external:skip}} { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + # Client1 sends 16 commands in a pipeline and blocks on the first command + set buf "" + append buf "blpop mylist 0\r\n" + for {set i 1} {$i < 16} {incr i} { + append buf "set key$i value$i\r\n" + } + $rd1 write $buf + $rd1 flush + wait_for_blocked_clients_count 1 + + # Client2 sends 1 command, which triggers pending command pool expansion + # from 16 to 32 since a client has used up all 16 commands in the command pool. + $rd2 set bkey bvalue + assert_equal {OK} [$rd2 read] + + # Unblock client1, allowing it to return all pending commands back to the pool. + r lpush mylist unblock_value + assert_equal {mylist unblock_value} [$rd1 read] + for {set i 1} {$i < 16} {incr i} { + assert_equal {OK} [$rd1 read] + } + + # Wait for the pending command pool to shrink back to 16 due to low utilization. + wait_for_log_messages 0 {"*Shrunk pending command pool: capacity 32->16*"} 0 10 1000 + + $rd1 close + $rd2 close + } +}