mirror of
https://github.com/redis/redis.git
synced 2026-05-28 04:02:46 -04:00
RED-135816: Lookahead pre-fetching (#14440)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run
## Problem and Motivation
Currently, the client only parses one command, then executes it, then
parses new commands until the querybuf is consumed. Doing it this way
means we cannot perform memory prefetch when IO threads are not enabled,
and when IO threads are enabled, we can only parse the first command in
the IO thread, while the remaining command parsing still needs to be
done in the main thread.
This describes a limitation in the current Redis command processing
pipeline where:
Without IO threads: Commands are parsed and executed one by one
sequentially, preventing memory prefetching optimizations
With IO threads: Only the first command gets parsed in the IO thread,
but subsequent commands from the same client's query buffer must still
be parsed in the main thread
## Solution Overview
**Core Innovation**: Parse multiple user commands in advance through a
lookahead pipeline.
**Key Insight**: Since Redis already parses commands to extract keys, we
can do this parsing earlier and memory prefetch operations before the
command reaches execution, allowing multiple I/O operations to run in
parallel.
The bulk of the PR is a redesign of the command processing flow for both
standalone commands and transactional commands.
### High Level Command Processing Flow
#### Before This PR (processInputBuffer())
- While there is data in the client's query buffer:
- Read the data and try to parse a complete command
(processInlineBuffer() or processMultibulkBuffer()).
- If the command is incomplete, exit and wait for more data.
- The Command is complete. Process and potentially execute it
(processCommandAndResetClient(), processCommand()):
- Prepare for the next command (commandProcessed()).
### Major Changes in the Client's Structure
To support the new command processing flow:
- **New pendingCommand structure**: Since the previous flow processed
commands one at a time, it used the client structure to hold the current
(and only) parsed command arguments (argv/argc) and other metadata. In
the new design, multiple commands are processed, waiting for execution.
So, a new pendingCommand structure is introduced to hold a parsed
command's arguments and its metadata.
- **New pendingCommandList structure (pending_cmds)** that contains all
the pending commands with maintained order and includes a ready_len
counter that tracks the number of fully parsed commands ready for
execution. All commands are fully parsed except possibly the last one
(client's command order is maintained).
- **New pendingCommandPool structure (cmd_pool)** that manages a shared
pool for reusing pendingCommand objects to reduce memory allocation
overhead.
There is a configurable lookahead limit (server.lookahead) that controls
how many fully parsed commands (ready pending commands) to process ahead
of time.
#### New High Level Flow for Standalone Commands (processInputBuffer())
- While there is data in the client's query buffer or there are ready
pending commands:
- While there is data in the client's query buffer and we haven't
reached the lookahead limit:
- Read the data and try to parse a complete command
(processInlineBuffer() or processMultibulkBuffer()). Allocate a new
pending command if needed, store the command's metadata in the pending
command, and add the pending command to the client's pending commands
list.
- If the command is incomplete, exit and wait for more data.
- The command is complete, we have a new ready pending command,
preprocess it (preprocessCommand()):
- Extract the keys of the command and store the results in the pending
command (extractKeysAndSlot()).
- If there are pending commands, continue executing them until the queue
is empty.
## Transaction Support
### Major Changes in Structures
- The multiState structure now contains an array of pendingCommand
pointers instead of multiCmd pointers.
- The multiCmd structure was deleted (no longer needed).
### New Transaction Support
- queueMultiCommand():
- The pending commands are moved from the client's pending_cmds list to
the multiState's commands array.
## Detailed Changes
### Additional Client Structure Changes
- Replaced argv_len_sum with all_argv_len_sum to reflect the total
memory consumed by all pending commands.
### Clients and Pending Commands Management
- Clients using pending commands now manage the command arguments via
the pendingCommand. Specifically, the memory occupied by argv.
- **Pending commands management functions**:
- `initPendingCommand()` initializes a newly allocated pending command.
- `freeClientPendingCommand()` frees a pending command of a client and
its associated resources.
- `freeClientPendingCommands()` receives the number of pending commands
to free and calls freeClientPendingCommand() to free them.
### Buffer Processing Changes
- `processInlineBuffer()`, once a full command is read, used to populate
the client's command fields (argc, argv, etc.). Now it creates and
populates a pendingCommand, and adds it to the client's pending_cmds
list.
- `processMultibulkBuffer()`: Similar changes to processInlineBuffer().
The difference is that a pending command may already exist from a
previous call to the function, so parsing will continue populating it
instead of creating a new one.
- `resetClientInternal()` used to receive a free_argv parameter and pass
it to freeClientArgvInternal(), which freed the client's argv if set,
and also reset client's command fields. It now receives the number of
pending commands to free and handles two cases:
- The client uses pending commands so they are freed by calling
freeClientPendingCommands().
- The client doesn't use pending commands (e.g., LUA client) so the
client's argv is freed by calling freeClientArgvInternal().
It then frees the client's command fields that freeClientArgvInternal()
doesn't free now.
### Other Changes
- Simulate lookahead command preprocessing when loading an AOF and
queuing transaction commands; This is necessary since
queueMultiCommand() now requires a pending command.
- The INVALID_CLUSTER_SLOT constant was defined to indicate an invalid
cluster slot. It is used to signal a cross-slot error in
preprocessCommand().
- getNodeByQuery() no longer performs cross-slot checks, relying instead
on the checks already performed in preprocessCommand(). It also no
longer calls getKeysFromCommand() as this was also done in
preprocessCommand().
### Debugging
- Added "debug lookahead" command to print the size of the lookahead
pipeline for each client.
## New Configuration
- **lookahead**: Runtime-configurable lookahead depth (default: 16)
## Security
- **Limit lookahead for unauthenticated clients to 1**. This is both to
reduce memory overhead, and to prevent errors; AUTH can affect the
handling of succeeding commands.
---------
Co-authored-by: Slava Koyfman <slava.koyfman@redis.com>
Co-authored-by: Oran Agra <oran@redis.com>
Co-authored-by: Udi Ron <udi.ron@redis.com>
Co-authored-by: moticless <moticless@github.com>
Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
parent
340615255b
commit
235e688b01
22 changed files with 1047 additions and 271 deletions
|
|
@ -2174,6 +2174,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
|
|||
#
|
||||
# client-query-buffer-limit 1gb
|
||||
|
||||
# Defines how many commands in each client pipeline to decode and prefetch
|
||||
# lookahead 16
|
||||
|
||||
# In some scenarios client connections can hog up memory leading to OOM
|
||||
# errors or data eviction. To avoid this we can cap the accumulated memory
|
||||
# used by all client connections (all pubsub and normal clients). Once we
|
||||
|
|
|
|||
35
src/acl.c
35
src/acl.c
|
|
@ -421,8 +421,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
|
|||
if (raxFind(Users,(unsigned char*)name,namelen,NULL)) return NULL;
|
||||
user *u = zmalloc(sizeof(*u));
|
||||
u->name = sdsnewlen(name,namelen);
|
||||
u->flags = USER_FLAG_DISABLED;
|
||||
u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
|
||||
atomicSet(u->flags, USER_FLAG_DISABLED | USER_FLAG_SANITIZE_PAYLOAD);
|
||||
u->passwords = listCreate();
|
||||
u->acl_string = NULL;
|
||||
listSetMatchMethod(u->passwords,ACLListMatchSds);
|
||||
|
|
@ -1289,22 +1288,18 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
|
|||
if (oplen == -1) oplen = strlen(op);
|
||||
if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
|
||||
if (!strcasecmp(op,"on")) {
|
||||
u->flags |= USER_FLAG_ENABLED;
|
||||
u->flags &= ~USER_FLAG_DISABLED;
|
||||
atomicSet(u->flags, (u->flags | USER_FLAG_ENABLED) & ~USER_FLAG_DISABLED);
|
||||
} else if (!strcasecmp(op,"off")) {
|
||||
u->flags |= USER_FLAG_DISABLED;
|
||||
u->flags &= ~USER_FLAG_ENABLED;
|
||||
atomicSet(u->flags, (u->flags | USER_FLAG_DISABLED) & ~USER_FLAG_ENABLED);
|
||||
} else if (!strcasecmp(op,"skip-sanitize-payload")) {
|
||||
u->flags |= USER_FLAG_SANITIZE_PAYLOAD_SKIP;
|
||||
u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD;
|
||||
atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD_SKIP) & ~USER_FLAG_SANITIZE_PAYLOAD);
|
||||
} else if (!strcasecmp(op,"sanitize-payload")) {
|
||||
u->flags &= ~USER_FLAG_SANITIZE_PAYLOAD_SKIP;
|
||||
u->flags |= USER_FLAG_SANITIZE_PAYLOAD;
|
||||
atomicSet(u->flags, (u->flags | USER_FLAG_SANITIZE_PAYLOAD) & ~USER_FLAG_SANITIZE_PAYLOAD_SKIP);
|
||||
} else if (!strcasecmp(op,"nopass")) {
|
||||
u->flags |= USER_FLAG_NOPASS;
|
||||
atomicSet(u->flags, u->flags | USER_FLAG_NOPASS);
|
||||
listEmpty(u->passwords);
|
||||
} else if (!strcasecmp(op,"resetpass")) {
|
||||
u->flags &= ~USER_FLAG_NOPASS;
|
||||
atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
|
||||
listEmpty(u->passwords);
|
||||
} else if (op[0] == '>' || op[0] == '#') {
|
||||
sds newpass;
|
||||
|
|
@ -1324,7 +1319,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
|
|||
listAddNodeTail(u->passwords,newpass);
|
||||
else
|
||||
sdsfree(newpass);
|
||||
u->flags &= ~USER_FLAG_NOPASS;
|
||||
atomicSet(u->flags, u->flags & ~USER_FLAG_NOPASS);
|
||||
} else if (op[0] == '<' || op[0] == '!') {
|
||||
sds delpass;
|
||||
if (op[0] == '<') {
|
||||
|
|
@ -1852,7 +1847,7 @@ int ACLUserCheckChannelPerm(user *u, sds channel, int is_pattern) {
|
|||
* If the command fails an ACL check, idxptr will be to set to the first argv entry that
|
||||
* causes the failure, either 0 if the command itself fails or the idx of the key/channel
|
||||
* that causes the failure */
|
||||
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr) {
|
||||
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
|
|
@ -1869,6 +1864,10 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
|
|||
* calls to prevent duplicate lookups. */
|
||||
aclKeyResultCache cache;
|
||||
initACLKeyResultCache(&cache);
|
||||
if (key_result) {
|
||||
cache.keys = *key_result;
|
||||
cache.keys_init = 1;
|
||||
}
|
||||
|
||||
/* Check each selector sequentially */
|
||||
listRewind(u->selectors,&li);
|
||||
|
|
@ -1876,7 +1875,7 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
|
|||
aclSelector *s = (aclSelector *) listNodeValue(ln);
|
||||
int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
|
||||
if (acl_retval == ACL_OK) {
|
||||
cleanupACLKeyResultCache(&cache);
|
||||
if (!key_result) cleanupACLKeyResultCache(&cache);
|
||||
return ACL_OK;
|
||||
}
|
||||
if (acl_retval > relevant_error ||
|
||||
|
|
@ -1888,13 +1887,13 @@ int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, i
|
|||
}
|
||||
|
||||
*idxptr = last_idx;
|
||||
cleanupACLKeyResultCache(&cache);
|
||||
if (!key_result) cleanupACLKeyResultCache(&cache);
|
||||
return relevant_error;
|
||||
}
|
||||
|
||||
/* High level API for checking if a client can execute the queued up command */
|
||||
int ACLCheckAllPerm(client *c, int *idxptr) {
|
||||
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
|
||||
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, getClientCachedKeyResult(c), idxptr);
|
||||
}
|
||||
|
||||
/* If 'new' can access all channels 'original' could then return NULL;
|
||||
|
|
@ -3144,7 +3143,7 @@ void aclCommand(client *c) {
|
|||
}
|
||||
|
||||
int idx;
|
||||
int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, &idx);
|
||||
int result = ACLCheckAllUserCommandPerm(u, cmd, c->argv + 3, c->argc - 3, NULL, &idx);
|
||||
if (result != ACL_OK) {
|
||||
sds err = getAclErrorMessage(result, u, cmd, c->argv[idx+3]->ptr, 1);
|
||||
addReplyBulkSds(c, err);
|
||||
|
|
|
|||
12
src/aof.c
12
src/aof.c
|
|
@ -1641,12 +1641,24 @@ int loadSingleAppendOnlyFile(char *filename) {
|
|||
if (fakeClient->flags & CLIENT_MULTI &&
|
||||
fakeClient->cmd->proc != execCommand)
|
||||
{
|
||||
/* queueMultiCommand requires a pendingCommand, so we create a "fake" one here
|
||||
* for it to consume */
|
||||
pendingCommand *pcmd = zmalloc(sizeof(pendingCommand));
|
||||
initPendingCommand(pcmd);
|
||||
addPendingCommand(&fakeClient->pending_cmds, pcmd);
|
||||
|
||||
pcmd->argc = argc;
|
||||
pcmd->argv_len = argc;
|
||||
pcmd->argv = argv;
|
||||
pcmd->cmd = cmd;
|
||||
|
||||
/* Note: we don't have to attempt calling evalGetCommandFlags,
|
||||
* since this is AOF, the checks in processCommand are not made
|
||||
* anyway.*/
|
||||
queueMultiCommand(fakeClient, cmd->flags);
|
||||
} else {
|
||||
cmd->proc(fakeClient);
|
||||
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
|
||||
}
|
||||
|
||||
/* The fake client should not have a reply */
|
||||
|
|
|
|||
|
|
@ -130,8 +130,7 @@ void processUnblockedClients(void) {
|
|||
* call reqresAppendResponse here (for clients blocked on key,
|
||||
* unblockClientOnKey is called, which eventually calls processCommand,
|
||||
* which calls reqresAppendResponse) */
|
||||
reqresAppendResponse(c);
|
||||
resetClient(c);
|
||||
prepareForNextCommand(c, 0);
|
||||
}
|
||||
|
||||
if (c->flags & CLIENT_MODULE) {
|
||||
|
|
|
|||
|
|
@ -1086,6 +1086,31 @@ void clusterCommand(client *c) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Extract slot number from keys in a keys_result structure and return to caller.
|
||||
* Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error),
|
||||
* or if there are no keys.
|
||||
*/
|
||||
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
|
||||
if (keys_result->numkeys == 0)
|
||||
return INVALID_CLUSTER_SLOT;
|
||||
|
||||
if (!server.cluster_enabled)
|
||||
return 0;
|
||||
|
||||
int first_slot = INVALID_CLUSTER_SLOT;
|
||||
for (int j = 0; j < keys_result->numkeys; j++) {
|
||||
robj *this_key = argv[keys_result->keys[j].pos];
|
||||
int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
|
||||
|
||||
if (first_slot == INVALID_CLUSTER_SLOT)
|
||||
first_slot = this_slot;
|
||||
else if (first_slot != this_slot) {
|
||||
return INVALID_CLUSTER_SLOT;
|
||||
}
|
||||
}
|
||||
return first_slot;
|
||||
}
|
||||
|
||||
/* Return the pointer to the cluster node that is able to serve the command.
|
||||
* For the function to succeed the command should only target either:
|
||||
*
|
||||
|
|
@ -1118,13 +1143,16 @@ void clusterCommand(client *c) {
|
|||
*
|
||||
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
|
||||
* down but the user attempts to execute a command that addresses one or more keys. */
|
||||
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code) {
|
||||
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
|
||||
getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
|
||||
{
|
||||
clusterNode *myself = getMyClusterNode();
|
||||
clusterNode *n = NULL;
|
||||
robj *firstkey = NULL;
|
||||
int multiple_keys = 0;
|
||||
multiState *ms, _ms;
|
||||
multiCmd mc;
|
||||
pendingCommand mc;
|
||||
pendingCommand *mcp = &mc;
|
||||
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
|
||||
existing_keys = 0;
|
||||
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
|
||||
|
|
@ -1152,11 +1180,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
* structure if the client is not in MULTI/EXEC state, this way
|
||||
* we have a single codepath below. */
|
||||
ms = &_ms;
|
||||
_ms.commands = &mc;
|
||||
_ms.commands = &mcp;
|
||||
_ms.count = 1;
|
||||
|
||||
/* Properly initialize the fake pendingCommand */
|
||||
initPendingCommand(&mc);
|
||||
mc.argv = argv;
|
||||
mc.argc = argc;
|
||||
mc.cmd = cmd;
|
||||
mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
|
||||
mc.read_error = read_error;
|
||||
if (keys_result) {
|
||||
mc.keys_result = *keys_result;
|
||||
mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check that all the keys are in the same hash slot, and obtain this
|
||||
|
|
@ -1164,12 +1201,14 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
for (i = 0; i < ms->count; i++) {
|
||||
struct redisCommand *mcmd;
|
||||
robj **margv;
|
||||
int margc, numkeys, j;
|
||||
int margc, j;
|
||||
keyReference *keyindex;
|
||||
|
||||
mcmd = ms->commands[i].cmd;
|
||||
margc = ms->commands[i].argc;
|
||||
margv = ms->commands[i].argv;
|
||||
pendingCommand *pcmd = ms->commands[i];
|
||||
|
||||
mcmd = pcmd->cmd;
|
||||
margc = pcmd->argc;
|
||||
margv = pcmd->argv;
|
||||
|
||||
/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
|
||||
if (!pubsubshard_included &&
|
||||
|
|
@ -1178,14 +1217,29 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
pubsubshard_included = 1;
|
||||
}
|
||||
|
||||
/* If we have a cached keys result from preprocessCommand(), use it.
|
||||
* Otherwise, extract keys result. */
|
||||
int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
|
||||
if (use_cache_keys_result)
|
||||
result = pcmd->keys_result;
|
||||
else
|
||||
getKeysFromCommand(mcmd,margv,margc,&result);
|
||||
keyindex = result.keys;
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
for (j = 0; j < result.numkeys; j++) {
|
||||
/* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
|
||||
if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
|
||||
/* Error: multiple keys from different slots. */
|
||||
if (error_code)
|
||||
*error_code = CLUSTER_REDIR_CROSS_SLOT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
robj *thiskey = margv[keyindex[j].pos];
|
||||
int thisslot = keyHashSlot((char*)thiskey->ptr,
|
||||
sdslen(thiskey->ptr));
|
||||
int thisslot = pcmd->slot;
|
||||
if (thisslot == INVALID_CLUSTER_SLOT)
|
||||
thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
|
||||
|
||||
if (firstkey == NULL) {
|
||||
/* This is the first key we see. Check what is the slot
|
||||
|
|
@ -1199,7 +1253,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
* not trapped earlier in processCommand(). Report the same
|
||||
* error to the client. */
|
||||
if (n == NULL) {
|
||||
getKeysFreeResult(&result);
|
||||
if (!use_cache_keys_result) getKeysFreeResult(&result);
|
||||
if (error_code)
|
||||
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
|
||||
return NULL;
|
||||
|
|
@ -1222,7 +1276,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
* the same key/channel as the first we saw. */
|
||||
if (slot != thisslot) {
|
||||
/* Error: multiple keys from different slots. */
|
||||
getKeysFreeResult(&result);
|
||||
if (!use_cache_keys_result) getKeysFreeResult(&result);
|
||||
if (error_code)
|
||||
*error_code = CLUSTER_REDIR_CROSS_SLOT;
|
||||
return NULL;
|
||||
|
|
@ -1247,7 +1301,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
|||
else existing_keys++;
|
||||
}
|
||||
}
|
||||
getKeysFreeResult(&result);
|
||||
if (!use_cache_keys_result) getKeysFreeResult(&result);
|
||||
}
|
||||
|
||||
/* No key at all in command? then we can serve the request
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@
|
|||
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
|
||||
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
|
||||
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
|
||||
#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
|
||||
#define CLUSTER_OK 0 /* Everything looks ok */
|
||||
#define CLUSTER_FAIL 1 /* The cluster can't work */
|
||||
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
|
||||
|
|
@ -158,7 +159,9 @@ int clusterCanAccessKeysInSlot(int slot);
|
|||
struct slotRangeArray *clusterGetLocalSlotRanges(void);
|
||||
|
||||
/* functions with shared implementations */
|
||||
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code);
|
||||
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
|
||||
getKeysResult *result, uint8_t read_error, uint64_t cmd_flags, int *error_code);
|
||||
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result);
|
||||
int clusterRedirectBlockedClientIfNeeded(client *c);
|
||||
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
|
||||
void migrateCloseTimedoutSockets(void);
|
||||
|
|
|
|||
|
|
@ -3218,6 +3218,7 @@ standardConfig static_configs[] = {
|
|||
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("cluster-slot-migration-max-archived-tasks", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 1, INT_MAX, server.asm_max_archived_tasks, 32, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL),
|
||||
|
||||
/* Unsigned int configs */
|
||||
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
|
||||
|
|
|
|||
34
src/db.c
34
src/db.c
|
|
@ -567,7 +567,7 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
|
|||
* Besides, we never free a string object in BIO threads, so, even with
|
||||
* lazyfree-lazy-server-del enabled, a fallback to main thread freeing
|
||||
* due to defer free failure doesn't go against the config intention. */
|
||||
tryDeferFreeClientObject(server.current_client, old);
|
||||
tryDeferFreeClientObject(server.current_client, DEFERRED_OBJECT_TYPE_ROBJ, old);
|
||||
} else if (server.lazyfree_lazy_server_del) {
|
||||
freeObjAsync(key, old, db->id);
|
||||
} else {
|
||||
|
|
@ -3134,6 +3134,38 @@ int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getK
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Extract keys/channels from a command and calculate the cluster slot.
|
||||
* Returns the number of keys/channels extracted.
|
||||
* The slot number is returned by reference into *slot.
|
||||
* If is_incomplete is not NULL, it will be set for key extraction.
|
||||
*
|
||||
* This function handles both regular commands (keys) and sharded pubsub
|
||||
* commands (channels), but excludes regular pubsub commands which don't
|
||||
* have slots.
|
||||
*/
|
||||
int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
|
||||
getKeysResult *result, int *slot) {
|
||||
int num_keys = -1;
|
||||
|
||||
if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) {
|
||||
num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result);
|
||||
} else {
|
||||
/* Only extract channels for commands that have key_specs (sharded pubsub).
|
||||
* Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */
|
||||
if (cmd->key_specs_num > 0) {
|
||||
num_keys = getChannelsFromCommand(cmd, argv, argc, result);
|
||||
} else {
|
||||
num_keys = 0;
|
||||
}
|
||||
}
|
||||
|
||||
*slot = INVALID_CLUSTER_SLOT;
|
||||
if (num_keys >= 0)
|
||||
*slot = extractSlotFromKeysResult(argv, result);
|
||||
|
||||
return num_keys;
|
||||
}
|
||||
|
||||
/* The base case is to use the keys position as given in the command table
|
||||
* (firstkey, lastkey, step).
|
||||
* This function works only on command with the legacy_range_key_spec,
|
||||
|
|
|
|||
|
|
@ -175,7 +175,7 @@ void assignClientToIOThread(client *c) {
|
|||
server.io_threads_clients_num[min_id]++;
|
||||
|
||||
/* The client running in IO thread needs to have deferred objects array. */
|
||||
c->deferred_objects = zmalloc(sizeof(robj*) * CLIENT_MAX_DEFERRED_OBJECTS);
|
||||
c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS);
|
||||
|
||||
/* Unbind connection of client from main thread event loop, disable read and
|
||||
* write, and then put it in the list, main thread will send these clients
|
||||
|
|
@ -355,11 +355,12 @@ int prefetchIOThreadCommands(IOThread *t) {
|
|||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(mainThreadProcessingClients[t->id], &li);
|
||||
while((ln = listNext(&li)) && clients++ < to_prefetch) {
|
||||
while((ln = listNext(&li)) && clients < to_prefetch) {
|
||||
client *c = listNodeValue(ln);
|
||||
/* A single command may contain multiple keys. If the batch is full,
|
||||
* we stop adding clients to it. */
|
||||
if (addCommandToBatch(c) == C_ERR) break;
|
||||
clients++;
|
||||
}
|
||||
|
||||
/* Prefetch the commands in the batch. */
|
||||
|
|
@ -423,10 +424,13 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
|
||||
listNode *node = NULL;
|
||||
while (listLength(mainThreadProcessingClients[t->id])) {
|
||||
/* Prefetch the commands if no clients in the batch. */
|
||||
if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
|
||||
/* Reset the prefetching batch if we have processed all clients. */
|
||||
if (--prefetch_clients <= 0) resetCommandsBatch();
|
||||
if (prefetch_clients <= 0) {
|
||||
/* Reset the prefetching batch if we have processed all clients. */
|
||||
resetCommandsBatch();
|
||||
/* Prefetch the commands if no clients in the batch. */
|
||||
prefetch_clients = prefetchIOThreadCommands(t);
|
||||
}
|
||||
prefetch_clients--;
|
||||
|
||||
/* Each time we pop up only the first client to process to guarantee
|
||||
* reentrancy safety. */
|
||||
|
|
@ -445,7 +449,7 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
|
||||
/* If a read error occurs, handle it in the main thread first, since we
|
||||
* want to print logs about client information before freeing. */
|
||||
if (c->read_error) handleClientReadError(c);
|
||||
if (isClientReadErrorFatal(c)) handleClientReadError(c);
|
||||
|
||||
/* The client is asked to close in IO thread. */
|
||||
if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
|
||||
|
|
@ -466,7 +470,7 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
}
|
||||
|
||||
/* Process the pending command and input buffer. */
|
||||
if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
|
||||
if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
|
||||
c->flags |= CLIENT_PENDING_COMMAND;
|
||||
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
|
||||
/* If the client is no longer valid, it must be freed safely. */
|
||||
|
|
@ -730,8 +734,6 @@ void initThreadedIO(void) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
prefetchCommandsBatchInit();
|
||||
|
||||
/* Spawn and initialize the I/O threads. */
|
||||
for (int i = 1; i < server.io_threads_num; i++) {
|
||||
IOThread *t = &IOThreads[i];
|
||||
|
|
|
|||
|
|
@ -380,19 +380,32 @@ int addCommandToBatch(client *c) {
|
|||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Avoid partial prefetching: if the batch already has keys and adding this
|
||||
* client's ready commands would likely exceed the batch size limit, reject
|
||||
* the entire client. This is a conservative estimate using command count as
|
||||
* a proxy for key count to ensure all keys from a client are either fully
|
||||
* prefetched together or not prefetched at all. */
|
||||
if (batch->key_count > 0 &&
|
||||
c->pending_cmds.ready_len + batch->key_count > batch->max_prefetch_size)
|
||||
{
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
batch->clients[batch->client_count++] = c;
|
||||
|
||||
if (likely(c->iolookedcmd)) {
|
||||
/* Get command's keys positions */
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
|
||||
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
|
||||
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
|
||||
pendingCommand *pcmd = c->pending_cmds.head;
|
||||
while (pcmd != NULL) {
|
||||
/* Skip commands that have not been preprocessed, or have errors. */
|
||||
if ((pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE) || !pcmd->cmd || pcmd->read_error) break;
|
||||
|
||||
serverAssert(pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID);
|
||||
for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) {
|
||||
batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos];
|
||||
batch->keys_dicts[batch->key_count] =
|
||||
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
|
||||
kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
|
||||
batch->key_count++;
|
||||
}
|
||||
getKeysFreeResult(&result);
|
||||
pcmd = pcmd->next;
|
||||
}
|
||||
|
||||
return C_OK;
|
||||
|
|
|
|||
30
src/module.c
30
src/module.c
|
|
@ -677,11 +677,12 @@ void moduleReleaseTempClient(client *c) {
|
|||
listEmpty(c->reply);
|
||||
c->reply_bytes = 0;
|
||||
c->duration = 0;
|
||||
resetClient(c);
|
||||
resetClient(c, -1);
|
||||
serverAssert(c->all_argv_len_sum == 0);
|
||||
c->bufpos = 0;
|
||||
c->flags = CLIENT_MODULE;
|
||||
c->user = NULL; /* Root user */
|
||||
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
|
||||
c->cmd = c->lastcmd = c->realcmd = NULL;
|
||||
if (c->bstate.async_rm_call_handle) {
|
||||
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
|
||||
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
|
||||
|
|
@ -6635,7 +6636,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
|||
int acl_errpos;
|
||||
int acl_retval;
|
||||
|
||||
acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,&acl_errpos);
|
||||
acl_retval = ACLCheckAllUserCommandPerm(user,c->cmd,c->argv,c->argc,NULL,&acl_errpos);
|
||||
if (acl_retval != ACL_OK) {
|
||||
sds object = (acl_retval == ACL_DENIED_CMD) ? sdsdup(c->cmd->fullname) : sdsdup(c->argv[acl_errpos]->ptr);
|
||||
addACLLogEntry(ctx->client, acl_retval, ACL_LOG_CTX_MODULE, -1, c->user->name, object);
|
||||
|
|
@ -6660,7 +6661,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
|||
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
|
||||
c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
|
||||
const uint64_t cmd_flags = getCommandFlags(c);
|
||||
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,cmd_flags,&error_code) !=
|
||||
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL,0,cmd_flags,&error_code) !=
|
||||
getMyClusterNode())
|
||||
{
|
||||
sds msg = NULL;
|
||||
|
|
@ -10022,7 +10023,7 @@ int RM_ACLCheckCommandPermissions(RedisModuleUser *user, RedisModuleString **arg
|
|||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, &keyidxptr) != ACL_OK) {
|
||||
if (ACLCheckAllUserCommandPerm(user->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) {
|
||||
errno = EACCES;
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
|
@ -11157,12 +11158,27 @@ void moduleCallCommandFilters(client *c) {
|
|||
}
|
||||
|
||||
/* If the filter sets a new command, including command or subcommand,
|
||||
* the command looked up in IO threads will be invalid. */
|
||||
c->iolookedcmd = NULL;
|
||||
* the command looked up will be invalid. */
|
||||
c->lookedcmd = NULL;
|
||||
|
||||
c->argv = filter.argv;
|
||||
c->argv_len = filter.argv_len;
|
||||
c->argc = filter.argc;
|
||||
|
||||
/* Update pending command if it exists. */
|
||||
pendingCommand *pcmd = c->current_pending_cmd;
|
||||
if (pcmd) {
|
||||
pcmd->argv = filter.argv;
|
||||
pcmd->argc = filter.argc;
|
||||
pcmd->argv_len = filter.argv_len;
|
||||
pcmd->cmd = NULL;
|
||||
pcmd->slot = INVALID_CLUSTER_SLOT;
|
||||
pcmd->flags = 0;
|
||||
|
||||
/* Reset keys result */
|
||||
getKeysFreeResult(&pcmd->keys_result);
|
||||
pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the number of arguments a filtered command has. The number of
|
||||
|
|
|
|||
70
src/multi.c
70
src/multi.c
|
|
@ -20,27 +20,19 @@ void initClientMultiState(client *c) {
|
|||
c->mstate.cmd_inv_flags = 0;
|
||||
c->mstate.argv_len_sums = 0;
|
||||
c->mstate.alloc_count = 0;
|
||||
c->mstate.executing_cmd = -1;
|
||||
}
|
||||
|
||||
/* Release all the resources associated with MULTI/EXEC state */
|
||||
void freeClientMultiState(client *c) {
|
||||
int j;
|
||||
|
||||
for (j = 0; j < c->mstate.count; j++) {
|
||||
int i;
|
||||
multiCmd *mc = c->mstate.commands+j;
|
||||
|
||||
for (i = 0; i < mc->argc; i++)
|
||||
decrRefCount(mc->argv[i]);
|
||||
zfree(mc->argv);
|
||||
for (int i = 0; i < c->mstate.count; i++) {
|
||||
freePendingCommand(c, c->mstate.commands[i]);
|
||||
}
|
||||
zfree(c->mstate.commands);
|
||||
}
|
||||
|
||||
/* Add a new command into the MULTI commands queue */
|
||||
void queueMultiCommand(client *c, uint64_t cmd_flags) {
|
||||
multiCmd *mc;
|
||||
|
||||
/* No sense to waste memory if the transaction is already aborted.
|
||||
* this is useful in case client sends these in a pipeline, or doesn't
|
||||
* bother to read previous responses and didn't notice the multi was already
|
||||
|
|
@ -50,29 +42,35 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
|
|||
if (c->mstate.count == 0) {
|
||||
/* If a client is using multi/exec, assuming it is used to execute at least
|
||||
* two commands. Hence, creating by default size of 2. */
|
||||
c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
|
||||
c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
|
||||
c->mstate.alloc_count = 2;
|
||||
}
|
||||
if (c->mstate.count == c->mstate.alloc_count) {
|
||||
c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
|
||||
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
|
||||
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
|
||||
}
|
||||
mc = c->mstate.commands+c->mstate.count;
|
||||
mc->cmd = c->cmd;
|
||||
mc->argc = c->argc;
|
||||
mc->argv = c->argv;
|
||||
mc->argv_len = c->argv_len;
|
||||
|
||||
/* Move the pending command into the multi-state.
|
||||
* We leave the empty list node in 'pending_cmds' for freeClientPendingCommands to clean up
|
||||
* later, but set the value to NULL to indicate it has been moved out and should not be freed. */
|
||||
pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
|
||||
c->current_pending_cmd = NULL;
|
||||
pendingCommand **mc = c->mstate.commands + c->mstate.count;
|
||||
*mc = pcmd;
|
||||
|
||||
c->mstate.count++;
|
||||
c->mstate.cmd_flags |= cmd_flags;
|
||||
c->mstate.cmd_inv_flags |= ~cmd_flags;
|
||||
c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
|
||||
c->mstate.argv_len_sums += (*mc)->argv_len_sum;
|
||||
c->all_argv_len_sum -= (*mc)->argv_len_sum;
|
||||
|
||||
/* Reset the client's args since we copied them into the mstate and shouldn't
|
||||
* reference them from c anymore. */
|
||||
(*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
|
||||
/* to subtract it from there later. */
|
||||
|
||||
/* Reset the client's args since we moved them into the mstate and shouldn't
|
||||
* reference them from 'c' anymore. */
|
||||
c->argv = NULL;
|
||||
c->argc = 0;
|
||||
c->argv_len_sum = 0;
|
||||
c->argv_len = 0;
|
||||
}
|
||||
|
||||
|
|
@ -130,6 +128,7 @@ void execCommand(client *c) {
|
|||
int j;
|
||||
robj **orig_argv;
|
||||
int orig_argc, orig_argv_len;
|
||||
size_t orig_all_argv_len_sum;
|
||||
struct redisCommand *orig_cmd;
|
||||
|
||||
if (!(c->flags & CLIENT_MULTI)) {
|
||||
|
|
@ -173,12 +172,19 @@ void execCommand(client *c) {
|
|||
orig_argv_len = c->argv_len;
|
||||
orig_argc = c->argc;
|
||||
orig_cmd = c->cmd;
|
||||
|
||||
/* Multi-state commands aren't tracked through all_argv_len_sum, so we don't want anything done while executing them to affect that field.
|
||||
* Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
|
||||
orig_all_argv_len_sum = c->all_argv_len_sum;
|
||||
|
||||
c->all_argv_len_sum = c->mstate.argv_len_sums;
|
||||
|
||||
addReplyArrayLen(c,c->mstate.count);
|
||||
for (j = 0; j < c->mstate.count; j++) {
|
||||
c->argc = c->mstate.commands[j].argc;
|
||||
c->argv = c->mstate.commands[j].argv;
|
||||
c->argv_len = c->mstate.commands[j].argv_len;
|
||||
c->cmd = c->realcmd = c->mstate.commands[j].cmd;
|
||||
c->argc = c->mstate.commands[j]->argc;
|
||||
c->argv = c->mstate.commands[j]->argv;
|
||||
c->argv_len = c->mstate.commands[j]->argv_len;
|
||||
c->cmd = c->realcmd = c->mstate.commands[j]->cmd;
|
||||
|
||||
/* ACL permissions are also checked at the time of execution in case
|
||||
* they were changed after the commands were queued. */
|
||||
|
|
@ -208,6 +214,7 @@ void execCommand(client *c) {
|
|||
"This command is no longer allowed for the "
|
||||
"following reason: %s", reason);
|
||||
} else {
|
||||
c->mstate.executing_cmd = j;
|
||||
if (c->id == CLIENT_ID_AOF)
|
||||
call(c,CMD_CALL_NONE);
|
||||
else
|
||||
|
|
@ -217,10 +224,10 @@ void execCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Commands may alter argc/argv, restore mstate. */
|
||||
c->mstate.commands[j].argc = c->argc;
|
||||
c->mstate.commands[j].argv = c->argv;
|
||||
c->mstate.commands[j].argv_len = c->argv_len;
|
||||
c->mstate.commands[j].cmd = c->cmd;
|
||||
c->mstate.commands[j]->argc = c->argc;
|
||||
c->mstate.commands[j]->argv = c->argv;
|
||||
c->mstate.commands[j]->argv_len = c->argv_len;
|
||||
c->mstate.commands[j]->cmd = c->cmd;
|
||||
}
|
||||
|
||||
// restore old DENY_BLOCKING value
|
||||
|
|
@ -231,6 +238,7 @@ void execCommand(client *c) {
|
|||
c->argv_len = orig_argv_len;
|
||||
c->argc = orig_argc;
|
||||
c->cmd = c->realcmd = orig_cmd;
|
||||
c->all_argv_len_sum = orig_all_argv_len_sum;
|
||||
discardTransaction(c);
|
||||
|
||||
server.in_exec = 0;
|
||||
|
|
@ -490,6 +498,6 @@ size_t multiStateMemOverhead(client *c) {
|
|||
/* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
|
||||
mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
|
||||
/* Reserved memory for queued multi commands. */
|
||||
mem += c->mstate.alloc_count * sizeof(multiCmd);
|
||||
mem += c->mstate.alloc_count * sizeof(pendingCommand);
|
||||
return mem;
|
||||
}
|
||||
|
|
|
|||
700
src/networking.c
700
src/networking.c
File diff suppressed because it is too large
Load diff
|
|
@ -4330,12 +4330,14 @@ void replicationCacheMaster(client *c) {
|
|||
server.master->qb_pos = 0;
|
||||
server.master->repl_applied = 0;
|
||||
server.master->read_reploff = server.master->reploff;
|
||||
server.master->reploff_next = 0;
|
||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||
listEmpty(c->reply);
|
||||
c->sentlen = 0;
|
||||
c->reply_bytes = 0;
|
||||
c->bufpos = 0;
|
||||
resetClient(c);
|
||||
resetClient(c, -1);
|
||||
resetClientQbufState(c);
|
||||
|
||||
/* Save the master. Server.master will be set to null later by
|
||||
* replicationHandleMasterDisconnection(). */
|
||||
|
|
|
|||
|
|
@ -486,7 +486,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or
|
|||
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
|
||||
const uint64_t cmd_flags = getCommandFlags(c);
|
||||
int hashslot = -1;
|
||||
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, cmd_flags, &error_code) != getMyClusterNode()) {
|
||||
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, NULL, 0, cmd_flags, &error_code) != getMyClusterNode()) {
|
||||
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
|
||||
*err = sdsnew(
|
||||
"Script attempted to execute a write command while the "
|
||||
|
|
|
|||
|
|
@ -974,7 +974,8 @@ cleanup:
|
|||
c->argc = c->argv_len = 0;
|
||||
c->user = NULL;
|
||||
c->argv = NULL;
|
||||
resetClient(c);
|
||||
c->all_argv_len_sum = 0;
|
||||
resetClient(c, 1);
|
||||
inuse--;
|
||||
|
||||
if (raise_error) {
|
||||
|
|
@ -1134,7 +1135,7 @@ static int luaRedisAclCheckCmdPermissionsCommand(lua_State *lua) {
|
|||
raise_error = 1;
|
||||
} else {
|
||||
int keyidxptr;
|
||||
if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, &keyidxptr) != ACL_OK) {
|
||||
if (ACLCheckAllUserCommandPerm(rctx->original_client->user, cmd, argv, argc, NULL, &keyidxptr) != ACL_OK) {
|
||||
lua_pushboolean(lua, 0);
|
||||
} else {
|
||||
lua_pushboolean(lua, 1);
|
||||
|
|
|
|||
118
src/server.c
118
src/server.c
|
|
@ -873,23 +873,6 @@ int clientsCronResizeQueryBuffer(client *c) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* If the client has been idle for too long, free the client's arguments. */
|
||||
int clientsCronFreeArgvIfIdle(client *c) {
|
||||
/* If the client is in the middle of parsing a command, or if argv is in use
|
||||
* (e.g. parsed in the IO thread but not yet executed, or blocked), exit ASAP. */
|
||||
if (!c->argv || c->multibulklen || c->argc) return 0;
|
||||
|
||||
/* Free argv if the client has been idle for more than 2 seconds or if argv
|
||||
* size is too large. */
|
||||
time_t idletime = server.unixtime - c->lastinteraction;
|
||||
if (idletime > 2 || c->argv_len > 128) {
|
||||
c->argv_len = 0;
|
||||
zfree(c->argv);
|
||||
c->argv = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* The client output buffer can be adjusted to better fit the memory requirements.
|
||||
*
|
||||
* the logic is:
|
||||
|
|
@ -961,7 +944,7 @@ int CurrentPeakMemUsageSlot = 0;
|
|||
int clientsCronTrackExpansiveClients(client *c) {
|
||||
size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
|
||||
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
|
||||
size_t in_usage = qb_size + c->argv_len_sum + argv_size;
|
||||
size_t in_usage = qb_size + c->all_argv_len_sum + argv_size;
|
||||
size_t out_usage = getClientOutputBufferMemoryUsage(c);
|
||||
|
||||
/* Track the biggest values observed so far in this slot. */
|
||||
|
|
@ -1113,7 +1096,6 @@ int clientsCronRunClient(client *c) {
|
|||
* terminated. */
|
||||
if (clientsCronHandleTimeout(c,now)) return 1;
|
||||
if (clientsCronResizeQueryBuffer(c)) return 1;
|
||||
if (clientsCronFreeArgvIfIdle(c)) return 1;
|
||||
if (clientsCronResizeOutputBuffer(c,now)) return 1;
|
||||
|
||||
if (clientsCronTrackExpansiveClients(c)) return 1;
|
||||
|
|
@ -1131,6 +1113,24 @@ int clientsCronRunClient(client *c) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Periodic maintenance for the pending command pool.
|
||||
* This function should be called from serverCron to manage pool size based on utilization patterns. */
|
||||
void pendingCommandPoolCron(void) {
|
||||
/* Only shrink pool when IO threads are not active */
|
||||
if (server.io_threads_active) return;
|
||||
|
||||
/* Calculate utilization rate based on minimum pool size reached */
|
||||
if (server.cmd_pool.capacity > PENDING_COMMAND_POOL_SIZE) {
|
||||
/* If utilization is below threshold, shrink the pool */
|
||||
double utilization_ratio = 1.0 - (double)server.cmd_pool.min_size / server.cmd_pool.capacity;
|
||||
if (utilization_ratio < 0.5)
|
||||
shrinkPendingCommandPool();
|
||||
}
|
||||
|
||||
/* Reset tracking for next interval */
|
||||
server.cmd_pool.min_size = server.cmd_pool.size; /* Reset to current size */
|
||||
}
|
||||
|
||||
/* This function is called by serverCron() and is used in order to perform
|
||||
* operations on clients that are important to perform constantly. For instance
|
||||
* we use this function in order to disconnect clients after a timeout, including
|
||||
|
|
@ -1653,6 +1653,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||
migrateCloseTimedoutSockets();
|
||||
}
|
||||
|
||||
/* Periodically shrink pending command reuse pool */
|
||||
run_with_period(2000) {
|
||||
pendingCommandPoolCron();
|
||||
}
|
||||
|
||||
/* Resize tracking keys table if needed. This is also done at every
|
||||
* command execution, but we want to be sure that if the last command
|
||||
* executed changes the value via CONFIG SET, the server will perform
|
||||
|
|
@ -2940,6 +2945,12 @@ void initServer(void) {
|
|||
server.acl_info.user_auth_failures = 0;
|
||||
server.acl_info.invalid_channel_accesses = 0;
|
||||
|
||||
/* Initialize the shared pending command pool. */
|
||||
server.cmd_pool.size = 0;
|
||||
server.cmd_pool.capacity = PENDING_COMMAND_POOL_SIZE;
|
||||
server.cmd_pool.pool = zmalloc(sizeof(pendingCommand*) * PENDING_COMMAND_POOL_SIZE);
|
||||
server.cmd_pool.min_size = 0;
|
||||
|
||||
/* Create the timer callback, this is our way to process many background
|
||||
* operations incrementally, like clients timeout, eviction of unaccessed
|
||||
* expired keys and so forth. */
|
||||
|
|
@ -2987,6 +2998,8 @@ void initServer(void) {
|
|||
|
||||
if (server.maxmemory_clients != 0)
|
||||
initServerClientMemUsageBuckets();
|
||||
|
||||
prefetchCommandsBatchInit();
|
||||
}
|
||||
|
||||
void initListeners(void) {
|
||||
|
|
@ -4070,6 +4083,47 @@ uint64_t getCommandFlags(client *c) {
|
|||
return cmd_flags;
|
||||
}
|
||||
|
||||
void preprocessCommand(client *c, pendingCommand *pcmd) {
|
||||
pcmd->slot = INVALID_CLUSTER_SLOT;
|
||||
if (pcmd->argc == 0)
|
||||
return;
|
||||
|
||||
/* Check if we can reuse the previous command instead of looking it up.
|
||||
* The previous command is either the penultimate pending command (if it exists), or c->lastcmd. */
|
||||
struct redisCommand *last_cmd = pcmd->prev ? pcmd->prev->cmd : c->lastcmd;
|
||||
|
||||
if (isCommandReusable(last_cmd, pcmd->argv[0]))
|
||||
pcmd->cmd = last_cmd;
|
||||
else
|
||||
pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc);
|
||||
|
||||
if (!pcmd->cmd) {
|
||||
pcmd->read_error = CLIENT_READ_COMMAND_NOT_FOUND;
|
||||
return;
|
||||
}
|
||||
|
||||
if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) ||
|
||||
(pcmd->argc < -pcmd->cmd->arity))
|
||||
{
|
||||
pcmd->read_error = CLIENT_READ_BAD_ARITY;
|
||||
return;
|
||||
}
|
||||
|
||||
pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT;
|
||||
int num_keys = extractKeysAndSlot(pcmd->cmd, pcmd->argv, pcmd->argc,
|
||||
&pcmd->keys_result, &pcmd->slot);
|
||||
if (num_keys < 0) {
|
||||
/* We skip the checks below since We expect the command to be rejected in this case */
|
||||
return;
|
||||
} else if (num_keys > 0) {
|
||||
/* If the command has keys but the slot is invalid, it means
|
||||
* there is a cross-slot case. */
|
||||
if (pcmd->slot == INVALID_CLUSTER_SLOT)
|
||||
pcmd->read_error = CLIENT_READ_CROSS_SLOT;
|
||||
}
|
||||
pcmd->flags |= PENDING_CMD_KEYS_RESULT_VALID;
|
||||
}
|
||||
|
||||
/* If this function gets called we already read a whole
|
||||
* command, arguments are in the client argv/argc fields.
|
||||
* processCommand() execute the command or prepare the
|
||||
|
|
@ -4113,11 +4167,17 @@ int processCommand(client *c) {
|
|||
* we do not have to repeat the same checks */
|
||||
if (!client_reprocessing_command) {
|
||||
/* check if we can reuse the last command instead of looking up if we already have that info */
|
||||
struct redisCommand *cmd = NULL;
|
||||
if (isCommandReusable(c->lastcmd, c->argv[0]))
|
||||
cmd = c->lastcmd;
|
||||
else
|
||||
cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc);
|
||||
struct redisCommand *cmd = c->lookedcmd;
|
||||
|
||||
/* The command may have been modified by modules (e.g., in CommandFilters callbacks),
|
||||
* so we need to look it up again. */
|
||||
if (!cmd) {
|
||||
if (isCommandReusable(c->lastcmd, c->argv[0]))
|
||||
cmd = c->lastcmd;
|
||||
else
|
||||
cmd = lookupCommand(c->argv, c->argc);
|
||||
}
|
||||
|
||||
if (!cmd) {
|
||||
/* Handle possible security attacks. */
|
||||
if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) {
|
||||
|
|
@ -4215,7 +4275,7 @@ int processCommand(client *c) {
|
|||
{
|
||||
int error_code;
|
||||
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
|
||||
&c->slot,cmd_flags,&error_code);
|
||||
&c->slot,getClientCachedKeyResult(c),c->read_error,cmd_flags,&error_code);
|
||||
if (n == NULL || !clusterNodeIsMyself(n)) {
|
||||
if (c->cmd->proc == execCommand) {
|
||||
discardTransaction(c);
|
||||
|
|
@ -4470,9 +4530,9 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) {
|
|||
/* If client is in multi-exec, we need to check the slot of all keys
|
||||
* in the transaction. */
|
||||
for (int i = 0; i < (ms ? ms->count : 1); i++) {
|
||||
struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd;
|
||||
robj **argv = ms ? ms->commands[i].argv : c->argv;
|
||||
int argc = ms ? ms->commands[i].argc : c->argc;
|
||||
struct redisCommand *cmd = ms ? ms->commands[i]->cmd : c->cmd;
|
||||
robj **argv = ms ? ms->commands[i]->argv : c->argv;
|
||||
int argc = ms ? ms->commands[i]->argc : c->argc;
|
||||
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
|
||||
|
|
@ -7008,7 +7068,7 @@ void dismissClientMemory(client *c) {
|
|||
dismissMemory(c->buf, c->buf_usable_size);
|
||||
if (c->querybuf) dismissSds(c->querybuf);
|
||||
/* Dismiss argv array only if we estimate it contains a big buffer. */
|
||||
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
|
||||
if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) {
|
||||
for (int i = 0; i < c->argc; i++) {
|
||||
dismissObject(c->argv[i], 0);
|
||||
}
|
||||
|
|
|
|||
110
src/server.h
110
src/server.h
|
|
@ -202,6 +202,9 @@ struct hdr_histogram;
|
|||
* in order to make sure of not over provisioning more than 128 fds. */
|
||||
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
|
||||
|
||||
/* Default lookahead value */
|
||||
#define REDIS_DEFAULT_LOOKAHEAD 16
|
||||
|
||||
/* OOM Score Adjustment classes. */
|
||||
#define CONFIG_OOM_MASTER 0
|
||||
#define CONFIG_OOM_REPLICA 1
|
||||
|
|
@ -463,6 +466,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
|||
#define CLIENT_READ_CONN_DISCONNECTED 11
|
||||
#define CLIENT_READ_CONN_CLOSED 12
|
||||
#define CLIENT_READ_REACHED_MAX_QUERYBUF 13
|
||||
#define CLIENT_READ_COMMAND_NOT_FOUND 14
|
||||
#define CLIENT_READ_BAD_ARITY 15
|
||||
#define CLIENT_READ_CROSS_SLOT 16
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
|
|
@ -1146,16 +1152,11 @@ typedef struct rdbLoadingCtx {
|
|||
functionsLibCtx* functions_lib_ctx;
|
||||
}rdbLoadingCtx;
|
||||
|
||||
/* Client MULTI/EXEC state */
|
||||
typedef struct multiCmd {
|
||||
robj **argv;
|
||||
int argv_len;
|
||||
int argc;
|
||||
struct redisCommand *cmd;
|
||||
} multiCmd;
|
||||
|
||||
typedef struct pendingCommand pendingCommand;
|
||||
typedef struct multiState {
|
||||
multiCmd *commands; /* Array of MULTI commands */
|
||||
pendingCommand **commands; /* Array of pointers to MULTI commands */
|
||||
int executing_cmd; /* The index of the currently executed transaction
|
||||
command (index in commands field) */
|
||||
int count; /* Total number of MULTI commands */
|
||||
int cmd_flags; /* The accumulated command flags OR-ed together.
|
||||
So if at least a command has a given flag, it
|
||||
|
|
@ -1164,7 +1165,7 @@ typedef struct multiState {
|
|||
is possible to know if all the commands have a
|
||||
certain flag. */
|
||||
size_t argv_len_sums; /* mem used by all commands arguments */
|
||||
int alloc_count; /* total number of multiCmd struct memory reserved. */
|
||||
int alloc_count; /* total number of pendingCommand struct memory reserved. */
|
||||
} multiState;
|
||||
|
||||
/* This structure holds the blocking operation state for a client.
|
||||
|
|
@ -1213,6 +1214,24 @@ typedef struct readyList {
|
|||
robj *key;
|
||||
} readyList;
|
||||
|
||||
/* List of pending commands. */
|
||||
typedef struct pendingCommandList {
|
||||
pendingCommand *head;
|
||||
pendingCommand *tail;
|
||||
int len; /* Number of commands in the list */
|
||||
int ready_len; /* Number of commands that are ready to be processed */
|
||||
} pendingCommandList;
|
||||
|
||||
/* Pending command pool management structure */
|
||||
#define PENDING_COMMAND_POOL_SIZE 16
|
||||
#define PENDING_COMMAND_POOL_MAX_SIZE 1024
|
||||
typedef struct pendingCommandPool {
|
||||
pendingCommand **pool; /* Pool array for reusing pendingCommand objects */
|
||||
int size; /* Current number of objects in pool */
|
||||
int capacity; /* Current capacity of the pool array */
|
||||
int min_size; /* Minimum size since last check (indicates peak usage) */
|
||||
} pendingCommandPool;
|
||||
|
||||
/* This structure represents a Redis user. This is useful for ACLs, the
|
||||
* user is associated to the connection after the connection is authenticated.
|
||||
* If there is no associated user, the connection uses the default user. */
|
||||
|
|
@ -1243,7 +1262,7 @@ typedef struct readyList {
|
|||
|
||||
typedef struct {
|
||||
sds name; /* The username as an SDS string. */
|
||||
uint32_t flags; /* See USER_FLAG_* */
|
||||
redisAtomic uint32_t flags; /* See USER_FLAG_* */
|
||||
list *passwords; /* A list of SDS valid passwords for this user. */
|
||||
list *selectors; /* A list of selectors this user validates commands
|
||||
against. This list will always contain at least
|
||||
|
|
@ -1302,6 +1321,16 @@ typedef struct {
|
|||
size_t mem_usage_sum;
|
||||
} clientMemUsageBucket;
|
||||
|
||||
#define DEFERRED_OBJECT_TYPE_PENDING_COMMAND 1
|
||||
#define DEFERRED_OBJECT_TYPE_ROBJ 2
|
||||
/* Structure to hold objects that need to be freed later by IO threads.
|
||||
* This allows the main thread to defer memory cleanup operations to
|
||||
* IO threads to avoid blocking the main event loop. */
|
||||
typedef struct deferredObject {
|
||||
int type; /* Pointer to the object to be freed */
|
||||
void *ptr; /* Type of object: DEFERRED_OBJECT_TYPE_* */
|
||||
} deferredObject;
|
||||
|
||||
#define SHOULD_CLUSTER_COMPATIBILITY_SAMPLE() \
|
||||
(server.cluster_compatibility_sample_ratio == 100 || \
|
||||
(double)rand()/RAND_MAX * 100 < server.cluster_compatibility_sample_ratio)
|
||||
|
|
@ -1352,11 +1381,13 @@ typedef struct client {
|
|||
int argv_len; /* Size of argv array (may be more than argc) */
|
||||
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
|
||||
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
|
||||
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
|
||||
robj **deferred_objects; /* Array of deferred objects to free. */
|
||||
size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */
|
||||
pendingCommandList pending_cmds; /* List of parsed pending commands */
|
||||
pendingCommand *current_pending_cmd;
|
||||
deferredObject *deferred_objects; /* Array of deferred objects to free. */
|
||||
int deferred_objects_num; /* Number of deferred objects to free. */
|
||||
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
|
||||
struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */
|
||||
struct redisCommand *lookedcmd; /* Command looked up in lookahead. */
|
||||
struct redisCommand *realcmd; /* The original command that was executed by the client,
|
||||
Used to update error stats in case the c->cmd was modified
|
||||
during the command invocation (like on GEOADD for example). */
|
||||
|
|
@ -1392,6 +1423,7 @@ typedef struct client {
|
|||
sds replpreamble; /* Replication DB preamble. */
|
||||
long long read_reploff; /* Read replication offset if this is a master. */
|
||||
long long reploff; /* Applied replication offset if this is a master. */
|
||||
long long reploff_next; /* Next value to set for reploff when a command finishes executing */
|
||||
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
|
||||
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
|
||||
long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
|
||||
|
|
@ -1872,6 +1904,8 @@ struct redisServer {
|
|||
int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */
|
||||
int io_threads_do_reads; /* Read and parse from IO threads? */
|
||||
int io_threads_active; /* Is IO threads currently active? */
|
||||
pendingCommandPool cmd_pool; /* Shared pool for reusing pendingCommand,
|
||||
* only when IO threads disabled */
|
||||
int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */
|
||||
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
|
||||
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
|
||||
|
|
@ -1993,6 +2027,7 @@ struct redisServer {
|
|||
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
|
||||
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
|
||||
size_t client_max_querybuf_len; /* Limit for client query buffer length */
|
||||
int lookahead; /* how many commands in each client pipeline to decode and prefetch */
|
||||
int dbnum; /* Total number of configured DBs */
|
||||
int supervised; /* 1 if supervised, 0 otherwise. */
|
||||
int supervised_mode; /* See SUPERVISED_* */
|
||||
|
|
@ -2365,6 +2400,32 @@ typedef struct {
|
|||
} getKeysResult;
|
||||
#define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL }
|
||||
|
||||
/* pendingCommand flags */
|
||||
enum {
|
||||
PENDING_CMD_FLAG_INCOMPLETE = 1 << 0, /* Command parsing is incomplete, still waiting for more data */
|
||||
PENDING_CMD_FLAG_PREPROCESSED = 1 << 1, /* This command has passed pre-processing */
|
||||
PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */
|
||||
};
|
||||
|
||||
/* Parser state and parse result of a command from a client's input buffer. */
|
||||
struct pendingCommand {
|
||||
int argc; /* Num of arguments of current command. */
|
||||
int argv_len; /* Size of argv array (may be more than argc) */
|
||||
robj **argv; /* Arguments of current command. */
|
||||
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
|
||||
unsigned long long input_bytes;
|
||||
struct redisCommand *cmd;
|
||||
getKeysResult keys_result;
|
||||
long long reploff; /* c->reploff should be set to this value when the command is processed */
|
||||
int flags;
|
||||
int slot; /* The slot the command is executing against. Set to INVALID_CLUSTER_SLOT
|
||||
* if no slot is being used or if the command has a cross slot error */
|
||||
uint8_t read_error;
|
||||
|
||||
struct pendingCommand *next;
|
||||
struct pendingCommand *prev;
|
||||
};
|
||||
|
||||
/* Key specs definitions.
|
||||
*
|
||||
* Brief: This is a scheme that tries to describe the location
|
||||
|
|
@ -2826,6 +2887,14 @@ void *moduleGetHandleByName(char *modulename);
|
|||
int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd);
|
||||
int moduleHasSubscribersForKeyspaceEvent(int type);
|
||||
|
||||
/* pcmd */
|
||||
void initPendingCommand(pendingCommand *pcmd);
|
||||
void freePendingCommand(client *c, pendingCommand *pcmd);
|
||||
void addPendingCommand(pendingCommandList *queue, pendingCommand *cmd);
|
||||
pendingCommand *popPendingCommandFromHead(pendingCommandList *queue);
|
||||
pendingCommand *popPendingCommandFromTail(pendingCommandList *queue);
|
||||
void shrinkPendingCommandPool(void);
|
||||
|
||||
/* Utils */
|
||||
long long ustime(void);
|
||||
mstime_t mstime(void);
|
||||
|
|
@ -2851,10 +2920,12 @@ void deauthenticateAndCloseClient(client *c);
|
|||
void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
|
||||
int beforeNextClient(client *c);
|
||||
void clearClientConnectionState(client *c);
|
||||
void resetClient(client *c);
|
||||
void resetClient(client *c, int num_pcmds_to_free);
|
||||
void resetClientQbufState(client *c);
|
||||
void freeClientOriginalArgv(client *c);
|
||||
void freeClientArgv(client *c);
|
||||
void tryDeferFreeClientObject(client *c, robj *o);
|
||||
void freeClientPendingCommands(client *c, int num_pcmds_to_free);
|
||||
void tryDeferFreeClientObject(client *c, int type, void *ptr);
|
||||
void freeClientDeferredObjects(client *c, int free_array);
|
||||
void sendReplyToClient(connection *conn);
|
||||
void *addReplyDeferredLen(client *c);
|
||||
|
|
@ -2863,6 +2934,7 @@ void setDeferredMapLen(client *c, void *node, long length);
|
|||
void setDeferredSetLen(client *c, void *node, long length);
|
||||
void setDeferredAttributeLen(client *c, void *node, long length);
|
||||
void setDeferredPushLen(client *c, void *node, long length);
|
||||
int isClientReadErrorFatal(client *c);
|
||||
int processInputBuffer(client *c);
|
||||
void acceptCommonHandler(connection *conn, int flags, char *ip);
|
||||
void readQueryFromClient(connection *conn);
|
||||
|
|
@ -2958,6 +3030,7 @@ void unprotectClient(client *c);
|
|||
client *lookupClientByID(uint64_t id);
|
||||
int authRequired(client *c);
|
||||
void putClientInPendingWriteQueue(client *c);
|
||||
getKeysResult *getClientCachedKeyResult(client *c);
|
||||
/* reply macros */
|
||||
#define ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c, str) addReplyBulkCBuffer(c, str, strlen(str))
|
||||
|
||||
|
|
@ -3276,7 +3349,7 @@ void ACLClearCommandID(void);
|
|||
user *ACLGetUserByName(const char *name, size_t namelen);
|
||||
int ACLUserCheckKeyPerm(user *u, const char *key, int keylen, int flags);
|
||||
int ACLUserCheckChannelPerm(user *u, sds channel, int literal);
|
||||
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr);
|
||||
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, getKeysResult *key_result, int *idxptr);
|
||||
int ACLUserCheckCmdWithUnrestrictedKeyAccess(user *u, struct redisCommand *cmd, robj **argv, int argc, int flags);
|
||||
int ACLCheckAllPerm(client *c, int *idxptr);
|
||||
int ACLSetUser(user *u, const char *op, ssize_t oplen);
|
||||
|
|
@ -3370,8 +3443,10 @@ void updatePeakMemory(size_t used_memory);
|
|||
size_t freeMemoryGetNotCountedMemory(void);
|
||||
int overMaxmemoryAfterAlloc(size_t moremem);
|
||||
uint64_t getCommandFlags(client *c);
|
||||
void preprocessCommand(client *c, pendingCommand *pcmd);
|
||||
int processCommand(client *c);
|
||||
void commandProcessed(client *c);
|
||||
void prepareForNextCommand(client *c, int update_slot_stats);
|
||||
int processPendingCommandAndInputBuffer(client *c);
|
||||
int processCommandAndResetClient(client *c);
|
||||
int areCommandKeysInSameSlot(client *c, int *hashslot);
|
||||
|
|
@ -3776,6 +3851,7 @@ int doesCommandHaveKeys(struct redisCommand *cmd);
|
|||
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags);
|
||||
void getKeysFreeResult(getKeysResult *result);
|
||||
int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result, int *slot);
|
||||
int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
|
||||
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
|
||||
int zunionInterDiffStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
|
||||
|
|
|
|||
|
|
@ -22,5 +22,15 @@ start_cluster 2 2 {tags {external:skip cluster}} {
|
|||
R 0 flushall
|
||||
assert_equal {OK} [R 0 CLUSTER flushslots]
|
||||
}
|
||||
}
|
||||
|
||||
test "CROSSSLOT error for keys in different slots" {
|
||||
# Test MSET with keys in different slots
|
||||
assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MSET foo bar baz qux}
|
||||
|
||||
# Test DEL with keys in different slots
|
||||
assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 DEL foo bar}
|
||||
|
||||
# Test MGET with keys in different slots
|
||||
assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MGET foo bar}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
|
|||
$primary MULTI
|
||||
$primary SPUBLISH ch1 "hello"
|
||||
$primary GET foo
|
||||
catch {[$primary EXEC]} err
|
||||
catch {$primary EXEC} err
|
||||
assert_match {CROSSSLOT*} $err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -76,6 +76,10 @@ run_solo {defrag} {
|
|||
}
|
||||
|
||||
proc test_active_defrag {type} {
|
||||
|
||||
# note: Disabling lookahead because it changes the number and order of allocations which interferes with defrag and causes tests to fail
|
||||
r config set lookahead 1
|
||||
|
||||
if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
|
||||
test "Active defrag main dictionary: $type" {
|
||||
r config set hz 100
|
||||
|
|
|
|||
|
|
@ -375,3 +375,38 @@ start_server {tags {"timeout external:skip"}} {
|
|||
assert_equal "PONG" [r ping]
|
||||
}
|
||||
}
|
||||
|
||||
test {Pending command pool expansion and shrinking} {
|
||||
start_server {overrides {loglevel debug} tags {external:skip}} {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
|
||||
# Client1 sends 16 commands in pipeline, and was blocked at the first command
|
||||
set buf ""
|
||||
append buf "blpop mylist 0\r\n"
|
||||
for {set i 1} {$i < 16} {incr i} {
|
||||
append buf "set key$i value$i\r\n"
|
||||
}
|
||||
$rd1 write $buf
|
||||
$rd1 flush
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
# Client2 sends 1 command, this will trigger pending command pool expansion
|
||||
# from 16 to 32 since A client has used up all 16 commands in the command pool.
|
||||
$rd2 set bkey bvalue
|
||||
assert_equal {OK} [$rd2 read]
|
||||
|
||||
# Unblock client1, allowing it to return all pending commands back to the pool.
|
||||
r lpush mylist unblock_value
|
||||
assert_equal {mylist unblock_value} [$rd1 read]
|
||||
for {set i 1} {$i < 16} {incr i} {
|
||||
assert_equal {OK} [$rd1 read]
|
||||
}
|
||||
|
||||
# Wait for the pending command pool to shrink back to 16 due to low utilization.
|
||||
wait_for_log_messages 0 {"*Shrunk pending command pool: capacity 32->16*"} 0 10 1000
|
||||
|
||||
$rd1 close
|
||||
$rd2 close
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue