From b89bc044a3e58f1a8973df9f766254e048bca248 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Wed, 15 Apr 2026 17:08:36 +0300 Subject: [PATCH] Reduce overhead in command propagation (#15003) Refactor command propagation code to reduce overhead on master Currently, the main bottleneck is `feedReplicationBuffer()`. It is called for each argument in the command and has bookkeeping overhead on every call (e.g. checking whether to attach replicas to the replication backlog). It is also not inlined by the compiler. These costs become more visible with pipelining and commands with many arguments (e.g. HSET with many fields). Changes: - Defer all bookkeeping to be done once per command instead of once per command argument. - Refactor the hot path so the compiler can inline `replBufWriterAppend()`. - Add `replBufWritterAppendBulkLen()` that uses shared RESP headers for small values, avoiding formatting overhead. These changes should not introduce any behavioral change. **TODO:** In a follow-up PR, explore forwarding the exact command from the client querybuf to avoid re-serialization. Many commands are propagated without modification and can benefit from this. -- | Benchmark | Before (ops/s) | After (ops/s) | Improvement | |---|---|---|---| | SET | 256,048 | 265,131 | **+3%** | | SET (pipeline) | 1,477,310 | 1,671,272 | **+13%** | | HSET 10 fields | 145,000 | 158,000 | **+9%** | | HSET 10 fields (pipeline) | 363,483 | 430,855 | **+18%** | | HSET 10 fields, 15B values (pipeline) | 387,443 | 487,135 | **+26%** | | ZADD 5 members | 180,700 | 193,519 | **+7%** | | ZADD 5 members (pipeline) | 466,453 | 564,872 | **+21%** | ------ Co-authored-by: Yuan Wang --- src/replication.c | 303 ++++++++++++++++++++++++++-------------------- src/server.h | 1 - 2 files changed, 173 insertions(+), 131 deletions(-) diff --git a/src/replication.c b/src/replication.c index f27ab8b7d..2ad39ab6f 100644 --- a/src/replication.c +++ b/src/replication.c @@ -376,23 +376,6 @@ int prepareReplicasToWrite(void) { return prepared; } -/* Wrapper for feedReplicationBuffer() that takes Redis string objects - * as input. */ -void feedReplicationBufferWithObject(robj *o) { - char llstr[LONG_STR_SIZE]; - void *p; - size_t len; - - if (o->encoding == OBJ_ENCODING_INT) { - len = ll2string(llstr,sizeof(llstr),(long)o->ptr); - p = llstr; - } else { - len = sdslen(o->ptr); - p = o->ptr; - } - feedReplicationBuffer(p,len); -} - /* Generally, we only have one replication buffer block to trim when replication * backlog size exceeds our setting and no replica reference it. But if replica * clients disconnect, we need to free many replication buffer blocks that are @@ -468,115 +451,175 @@ void freeReplicaReferencedReplBuffer(client *replica) { replica->ref_block_pos = 0; } -/* Append bytes into the global replication buffer list, replication backlog and - * all replica clients use replication buffers collectively, this function replace - * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, - * First we add buffer into global replication buffer block list, and then - * update replica / replication-backlog referenced node and block position. */ -void feedReplicationBuffer(char *s, size_t len) { +/* Batched write API for the global replication backlog, optimized for minimal + * overhead per append: data writes are just memcpys into the tail block. + * All bookkeeping is deferred to replBufWriterEnd(). */ +typedef struct replBufWriter { + listNode *start_node; /* First repl buffer block written to. */ + size_t start_pos; /* Byte offset within start_node where writing began. */ + size_t total_len; /* Total bytes written across all writes. */ + int new_blocks; /* Number of new blocks allocated during this stream. */ + replBufBlock *tail; /* Current tail block. */ +} replBufWriter; + +/* Initialize the writer, cache the current tail position. */ +static void replBufWriterBegin(replBufWriter *wr) { + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + + if (tail && tail->used < tail->size) { + wr->start_node = ln; + wr->start_pos = tail->used; + } else { + wr->start_node = NULL; + wr->start_pos = 0; + } + + wr->total_len = 0; + wr->new_blocks = 0; + wr->tail = tail; +} + +/* Allocate a new replication backlog block. Called when current block is full. */ +static void replBufWriterAllocBlock(replBufWriter *wr, size_t hint) { static long long repl_block_id = 0; + size_t usable_size; + /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them, + * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't + * trim when we only still need to hold a small portion from them. */ + size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES); + size_t bsize = min(max(hint, (size_t)PROTO_REPLY_CHUNK_BYTES), limit); + replBufBlock *tail = zmalloc_usable(bsize + sizeof(replBufBlock), &usable_size); + /* Take over the allocation's internal fragmentation */ + tail->size = usable_size - sizeof(replBufBlock); + tail->used = 0; + tail->refcount = 0; + tail->repl_offset = server.master_repl_offset + wr->total_len + 1; + tail->id = repl_block_id++; + listAddNodeTail(server.repl_buffer_blocks, tail); + server.repl_buffer_mem += (usable_size + sizeof(listNode)); + createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); - if (server.repl_backlog == NULL) return; + /* Update stream state. */ + wr->tail = tail; + wr->new_blocks++; + if (wr->start_node == NULL) { + wr->start_node = listLast(server.repl_buffer_blocks); + wr->start_pos = 0; + } +} - clusterSlotStatsIncrNetworkBytesOutForReplication(len); +/* Slow path: fill remainder of current block + allocate as needed. */ +static void replBufWriterAppendSlow(replBufWriter *wr, const char *buf, size_t len) { + while (len > 0) { + size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0; + if (avail > 0) { + size_t copy = (avail >= len) ? len : avail; + memcpy(wr->tail->buf + wr->tail->used, buf, copy); + wr->tail->used += copy; + wr->total_len += copy; + buf += copy; + len -= copy; + } + + if (len > 0) + replBufWriterAllocBlock(wr, len); + } +} + +/* Write data into the replication buffer. The slow path is split out to give + * the compiler a chance to inline the common case where the write fits entirely + * in the current block. */ +static inline void replBufWriterAppend(replBufWriter *wr, const char *buf, size_t len) { + size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0; + if (len > 0 && avail >= len) { + memcpy(wr->tail->buf + wr->tail->used, buf, len); + wr->tail->used += len; + wr->total_len += len; + return; + } + replBufWriterAppendSlow(wr, buf, len); +} + +/* Write a RESP header prefix\r\n (e.g. "$12\r\n" or "*3\r\n"). + * Uses pre-built shared objects for small values, formats manually otherwise. */ +static inline void replBufWriterAppendBulkLen(replBufWriter *wr, char prefix, long long value) { + serverAssert(prefix == '$' || prefix == '*'); + if (value >= 0 && value < OBJ_SHARED_BULKHDR_LEN) { + robj **tbl = (prefix == '$') ? shared.bulkhdr : shared.mbulkhdr; + replBufWriterAppend(wr, tbl[value]->ptr, OBJ_SHARED_HDR_STRLEN(value)); + return; + } + char buf[LONG_STR_SIZE+3]; + buf[0] = prefix; + int len = ll2string(buf+1, sizeof(buf)-1, value); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + replBufWriterAppend(wr, buf, len+3); +} + + +/* Finalize the replication buffer write: update global offsets, set up replica + * references for new data, check output buffer limits, and trim the + * backlog if new blocks were allocated. */ +static void replBufWriterEnd(replBufWriter *wr) { + if (wr->total_len == 0) return; + + serverAssert(wr->start_node != NULL); + clusterSlotStatsIncrNetworkBytesOutForReplication(wr->total_len); /* Update the current cmd's keys with the commands replication bytes*/ - hotkeyMetrics metrics = {0, len}; + hotkeyMetrics metrics = {0, wr->total_len}; hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics); - while(len > 0) { - size_t start_pos = 0; /* The position of referenced block to start sending. */ - listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ - int add_new_block = 0; /* Create new block if current block is total used. */ - listNode *ln = listLast(server.repl_buffer_blocks); - replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + server.master_repl_offset += wr->total_len; + server.repl_backlog->histlen += wr->total_len; - /* Append to tail string when possible. */ - if (tail && tail->size > tail->used) { - start_node = listLast(server.repl_buffer_blocks); - start_pos = tail->used; - /* Copy the part we can fit into the tail, and leave the rest for a - * new node */ - size_t avail = tail->size - tail->used; - size_t copy = (avail >= len) ? len : avail; - memcpy(tail->buf + tail->used, s, copy); - tail->used += copy; - s += copy; - len -= copy; - server.master_repl_offset += copy; - server.repl_backlog->histlen += copy; - } - if (len) { - /* Create a new node, make sure it is allocated to at - * least PROTO_REPLY_CHUNK_BYTES */ - size_t usable_size; - /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them, - * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't - * trim when we only still need to hold a small portion from them. */ - size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES); - size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit); - tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); - /* Take over the allocation's internal fragmentation */ - tail->size = usable_size - sizeof(replBufBlock); - size_t copy = (tail->size >= len) ? len : tail->size; - tail->used = copy; - tail->refcount = 0; - tail->repl_offset = server.master_repl_offset + 1; - tail->id = repl_block_id++; - memcpy(tail->buf, s, copy); - listAddNodeTail(server.repl_buffer_blocks, tail); - /* We also count the list node memory into replication buffer memory. */ - server.repl_buffer_mem += (usable_size + sizeof(listNode)); - add_new_block = 1; - if (start_node == NULL) { - start_node = listLast(server.repl_buffer_blocks); - start_pos = 0; - } - s += copy; - len -= copy; - server.master_repl_offset += copy; - server.repl_backlog->histlen += copy; - } + /* For output buffer of replicas. */ + listIter li; + listNode *ln; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; - /* For output buffer of replicas. */ - listIter li; - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (!canFeedReplicaReplBuffer(slave)) continue; - - /* Update shared replication buffer start position. */ - if (slave->ref_repl_buf_node == NULL) { - slave->ref_repl_buf_node = start_node; - slave->ref_block_pos = start_pos; - /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; - } - - /* Check output buffer limit only when add new block. */ - if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); - } - - /* For replication backlog */ - if (server.repl_backlog->ref_repl_buf_node == NULL) { - server.repl_backlog->ref_repl_buf_node = start_node; + /* Update shared replication buffer start position. */ + if (slave->ref_repl_buf_node == NULL) { + slave->ref_repl_buf_node = wr->start_node; + slave->ref_block_pos = wr->start_pos; /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; - - /* Replication buffer must be empty before adding replication stream - * into replication backlog. */ - serverAssert(add_new_block == 1 && start_pos == 0); + ((replBufBlock *)listNodeValue(wr->start_node))->refcount++; } - if (add_new_block) { - createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); - /* It is important to trim after adding replication data to keep the backlog size close to - * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated - * unnecessary trimming attempts when small amounts of data are added. See comments in - * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */ - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); - } + /* Check output buffer limit only when new blocks were added. */ + if (wr->new_blocks) closeClientOnOutputBufferLimitReached(slave, 1); } + + /* For replication backlog */ + if (server.repl_backlog->ref_repl_buf_node == NULL) { + server.repl_backlog->ref_repl_buf_node = wr->start_node; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(wr->start_node))->refcount++; + + /* Replication buffer must be empty before adding replication stream + * into replication backlog. */ + serverAssert(wr->new_blocks > 0 && wr->start_pos == 0); + } + if (wr->new_blocks) { + /* It is important to trim after adding replication data to keep the backlog size close to + * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated + * unnecessary trimming attempts when small amounts of data are added. See comments in + * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */ + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } +} + +/* Append bytes into the global replication buffer. */ +static void feedReplicationBuffer(const char *buf, size_t len) { + replBufWriter wr; + replBufWriterBegin(&wr); + replBufWriterAppend(&wr, buf, len); + replBufWriterEnd(&wr); } /* Propagate write commands to replication stream. @@ -642,7 +685,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { dictid_len, llstr)); } - feedReplicationBufferWithObject(selectcmd); + feedReplicationBuffer(selectcmd->ptr, sdslen(selectcmd->ptr)); /* Although the SELECT command is not associated with any slot, * its per-slot network-bytes-out accumulation is made by the above function call. @@ -657,28 +700,28 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Write the command to the replication buffer if any. */ char aux[LONG_STR_SIZE+3]; + replBufWriter wr; + replBufWriterBegin(&wr); - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBuffer(aux,len+3); + /* Write the multi bulk count */ + replBufWriterAppendBulkLen(&wr, '*', argc); for (j = 0; j < argc; j++) { + /* Write the bulk count */ long objlen = stringObjectLen(argv[j]); + replBufWriterAppendBulkLen(&wr, '$', objlen); - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBuffer(aux,len+3); - feedReplicationBufferWithObject(argv[j]); - feedReplicationBuffer(aux+len+1,2); + /* Write the bulk data */ + if (argv[j]->encoding == OBJ_ENCODING_INT) { + len = ll2string(aux, sizeof(aux), (long)argv[j]->ptr); + replBufWriterAppend(&wr, aux, len); + } else { + replBufWriterAppend(&wr, argv[j]->ptr, objlen); + } + replBufWriterAppend(&wr, "\r\n", 2); } + + replBufWriterEnd(&wr); } /* This is a debugging function that gets called when we detect something diff --git a/src/server.h b/src/server.h index 8a0ba3cbd..091f7c9ac 100644 --- a/src/server.h +++ b/src/server.h @@ -3360,7 +3360,6 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); void resetReplicationBuffer(void); -void feedReplicationBuffer(char *buf, size_t len); void freeReplicaReferencedReplBuffer(client *replica); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type);