Reduce overhead in command propagation (#15003)

Refactor command propagation code to reduce overhead on master

Currently, the main bottleneck is `feedReplicationBuffer()`. It is
called for each argument in the command and has bookkeeping overhead on
every call (e.g. checking whether to attach replicas to the replication
backlog). It is also not inlined by the compiler. These costs become
more visible with pipelining and commands with many arguments (e.g. HSET
with many fields).

Changes:

- Defer all bookkeeping to be done once per command instead of once per
command argument.
- Refactor the hot path so the compiler can inline
`replBufWriterAppend()`.
- Add `replBufWritterAppendBulkLen()` that uses shared RESP headers for
small values, avoiding formatting overhead.

These changes should not introduce any behavioral change.

**TODO:** In a follow-up PR, explore forwarding the exact command from
the client querybuf to avoid re-serialization. Many commands are
propagated without modification and can benefit from this.

--


| Benchmark | Before (ops/s) | After (ops/s) | Improvement |
|---|---|---|---|
| SET | 256,048 | 265,131 | **+3%** |
| SET (pipeline) | 1,477,310 | 1,671,272 | **+13%** |
| HSET 10 fields | 145,000 | 158,000 | **+9%** |
| HSET 10 fields (pipeline) | 363,483 | 430,855 | **+18%** |
| HSET 10 fields, 15B values (pipeline) | 387,443 | 487,135 | **+26%** |
| ZADD 5 members | 180,700 | 193,519 | **+7%** |
| ZADD 5 members (pipeline) | 466,453 | 564,872 | **+21%** |

------
Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
Ozan Tezcan 2026-04-15 17:08:36 +03:00 committed by GitHub
parent 2f1a8b2bad
commit b89bc044a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 173 additions and 131 deletions

View file

@ -376,23 +376,6 @@ int prepareReplicasToWrite(void) {
return prepared;
}
/* Wrapper for feedReplicationBuffer() that takes Redis string objects
* as input. */
void feedReplicationBufferWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBuffer(p,len);
}
/* Generally, we only have one replication buffer block to trim when replication
* backlog size exceeds our setting and no replica reference it. But if replica
* clients disconnect, we need to free many replication buffer blocks that are
@ -468,115 +451,175 @@ void freeReplicaReferencedReplBuffer(client *replica) {
replica->ref_block_pos = 0;
}
/* Append bytes into the global replication buffer list, replication backlog and
* all replica clients use replication buffers collectively, this function replace
* 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
* First we add buffer into global replication buffer block list, and then
* update replica / replication-backlog referenced node and block position. */
void feedReplicationBuffer(char *s, size_t len) {
/* Batched write API for the global replication backlog, optimized for minimal
* overhead per append: data writes are just memcpys into the tail block.
* All bookkeeping is deferred to replBufWriterEnd(). */
typedef struct replBufWriter {
listNode *start_node; /* First repl buffer block written to. */
size_t start_pos; /* Byte offset within start_node where writing began. */
size_t total_len; /* Total bytes written across all writes. */
int new_blocks; /* Number of new blocks allocated during this stream. */
replBufBlock *tail; /* Current tail block. */
} replBufWriter;
/* Initialize the writer, cache the current tail position. */
static void replBufWriterBegin(replBufWriter *wr) {
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
if (tail && tail->used < tail->size) {
wr->start_node = ln;
wr->start_pos = tail->used;
} else {
wr->start_node = NULL;
wr->start_pos = 0;
}
wr->total_len = 0;
wr->new_blocks = 0;
wr->tail = tail;
}
/* Allocate a new replication backlog block. Called when current block is full. */
static void replBufWriterAllocBlock(replBufWriter *wr, size_t hint) {
static long long repl_block_id = 0;
size_t usable_size;
/* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
* and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
* trim when we only still need to hold a small portion from them. */
size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
size_t bsize = min(max(hint, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
replBufBlock *tail = zmalloc_usable(bsize + sizeof(replBufBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(replBufBlock);
tail->used = 0;
tail->refcount = 0;
tail->repl_offset = server.master_repl_offset + wr->total_len + 1;
tail->id = repl_block_id++;
listAddNodeTail(server.repl_buffer_blocks, tail);
server.repl_buffer_mem += (usable_size + sizeof(listNode));
createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
if (server.repl_backlog == NULL) return;
/* Update stream state. */
wr->tail = tail;
wr->new_blocks++;
if (wr->start_node == NULL) {
wr->start_node = listLast(server.repl_buffer_blocks);
wr->start_pos = 0;
}
}
clusterSlotStatsIncrNetworkBytesOutForReplication(len);
/* Slow path: fill remainder of current block + allocate as needed. */
static void replBufWriterAppendSlow(replBufWriter *wr, const char *buf, size_t len) {
while (len > 0) {
size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0;
if (avail > 0) {
size_t copy = (avail >= len) ? len : avail;
memcpy(wr->tail->buf + wr->tail->used, buf, copy);
wr->tail->used += copy;
wr->total_len += copy;
buf += copy;
len -= copy;
}
if (len > 0)
replBufWriterAllocBlock(wr, len);
}
}
/* Write data into the replication buffer. The slow path is split out to give
* the compiler a chance to inline the common case where the write fits entirely
* in the current block. */
static inline void replBufWriterAppend(replBufWriter *wr, const char *buf, size_t len) {
size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0;
if (len > 0 && avail >= len) {
memcpy(wr->tail->buf + wr->tail->used, buf, len);
wr->tail->used += len;
wr->total_len += len;
return;
}
replBufWriterAppendSlow(wr, buf, len);
}
/* Write a RESP header prefix<value>\r\n (e.g. "$12\r\n" or "*3\r\n").
* Uses pre-built shared objects for small values, formats manually otherwise. */
static inline void replBufWriterAppendBulkLen(replBufWriter *wr, char prefix, long long value) {
serverAssert(prefix == '$' || prefix == '*');
if (value >= 0 && value < OBJ_SHARED_BULKHDR_LEN) {
robj **tbl = (prefix == '$') ? shared.bulkhdr : shared.mbulkhdr;
replBufWriterAppend(wr, tbl[value]->ptr, OBJ_SHARED_HDR_STRLEN(value));
return;
}
char buf[LONG_STR_SIZE+3];
buf[0] = prefix;
int len = ll2string(buf+1, sizeof(buf)-1, value);
buf[len+1] = '\r';
buf[len+2] = '\n';
replBufWriterAppend(wr, buf, len+3);
}
/* Finalize the replication buffer write: update global offsets, set up replica
* references for new data, check output buffer limits, and trim the
* backlog if new blocks were allocated. */
static void replBufWriterEnd(replBufWriter *wr) {
if (wr->total_len == 0) return;
serverAssert(wr->start_node != NULL);
clusterSlotStatsIncrNetworkBytesOutForReplication(wr->total_len);
/* Update the current cmd's keys with the commands replication bytes*/
hotkeyMetrics metrics = {0, len};
hotkeyMetrics metrics = {0, wr->total_len};
hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics);
while(len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
int add_new_block = 0; /* Create new block if current block is total used. */
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
server.master_repl_offset += wr->total_len;
server.repl_backlog->histlen += wr->total_len;
/* Append to tail string when possible. */
if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = tail->used;
/* Copy the part we can fit into the tail, and leave the rest for a
* new node */
size_t avail = tail->size - tail->used;
size_t copy = (avail >= len) ? len : avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
server.master_repl_offset += copy;
server.repl_backlog->histlen += copy;
}
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
/* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
* and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
* trim when we only still need to hold a small portion from them. */
size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(replBufBlock);
size_t copy = (tail->size >= len) ? len : tail->size;
tail->used = copy;
tail->refcount = 0;
tail->repl_offset = server.master_repl_offset + 1;
tail->id = repl_block_id++;
memcpy(tail->buf, s, copy);
listAddNodeTail(server.repl_buffer_blocks, tail);
/* We also count the list node memory into replication buffer memory. */
server.repl_buffer_mem += (usable_size + sizeof(listNode));
add_new_block = 1;
if (start_node == NULL) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = 0;
}
s += copy;
len -= copy;
server.master_repl_offset += copy;
server.repl_backlog->histlen += copy;
}
/* For output buffer of replicas. */
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
/* For output buffer of replicas. */
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
/* Update shared replication buffer start position. */
if (slave->ref_repl_buf_node == NULL) {
slave->ref_repl_buf_node = start_node;
slave->ref_block_pos = start_pos;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
}
/* Check output buffer limit only when add new block. */
if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
}
/* For replication backlog */
if (server.repl_backlog->ref_repl_buf_node == NULL) {
server.repl_backlog->ref_repl_buf_node = start_node;
/* Update shared replication buffer start position. */
if (slave->ref_repl_buf_node == NULL) {
slave->ref_repl_buf_node = wr->start_node;
slave->ref_block_pos = wr->start_pos;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
/* Replication buffer must be empty before adding replication stream
* into replication backlog. */
serverAssert(add_new_block == 1 && start_pos == 0);
((replBufBlock *)listNodeValue(wr->start_node))->refcount++;
}
if (add_new_block) {
createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
/* It is important to trim after adding replication data to keep the backlog size close to
* repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
* unnecessary trimming attempts when small amounts of data are added. See comments in
* freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
/* Check output buffer limit only when new blocks were added. */
if (wr->new_blocks) closeClientOnOutputBufferLimitReached(slave, 1);
}
/* For replication backlog */
if (server.repl_backlog->ref_repl_buf_node == NULL) {
server.repl_backlog->ref_repl_buf_node = wr->start_node;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(wr->start_node))->refcount++;
/* Replication buffer must be empty before adding replication stream
* into replication backlog. */
serverAssert(wr->new_blocks > 0 && wr->start_pos == 0);
}
if (wr->new_blocks) {
/* It is important to trim after adding replication data to keep the backlog size close to
* repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
* unnecessary trimming attempts when small amounts of data are added. See comments in
* freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
}
/* Append bytes into the global replication buffer. */
static void feedReplicationBuffer(const char *buf, size_t len) {
replBufWriter wr;
replBufWriterBegin(&wr);
replBufWriterAppend(&wr, buf, len);
replBufWriterEnd(&wr);
}
/* Propagate write commands to replication stream.
@ -642,7 +685,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
dictid_len, llstr));
}
feedReplicationBufferWithObject(selectcmd);
feedReplicationBuffer(selectcmd->ptr, sdslen(selectcmd->ptr));
/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
@ -657,28 +700,28 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Write the command to the replication buffer if any. */
char aux[LONG_STR_SIZE+3];
replBufWriter wr;
replBufWriterBegin(&wr);
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBuffer(aux,len+3);
/* Write the multi bulk count */
replBufWriterAppendBulkLen(&wr, '*', argc);
for (j = 0; j < argc; j++) {
/* Write the bulk count */
long objlen = stringObjectLen(argv[j]);
replBufWriterAppendBulkLen(&wr, '$', objlen);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBuffer(aux,len+3);
feedReplicationBufferWithObject(argv[j]);
feedReplicationBuffer(aux+len+1,2);
/* Write the bulk data */
if (argv[j]->encoding == OBJ_ENCODING_INT) {
len = ll2string(aux, sizeof(aux), (long)argv[j]->ptr);
replBufWriterAppend(&wr, aux, len);
} else {
replBufWriterAppend(&wr, argv[j]->ptr, objlen);
}
replBufWriterAppend(&wr, "\r\n", 2);
}
replBufWriterEnd(&wr);
}
/* This is a debugging function that gets called when we detect something

View file

@ -3360,7 +3360,6 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
void freeReplicaReferencedReplBuffer(client *replica);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);