diff --git a/src/replication.c b/src/replication.c
index f27ab8b7d..2ad39ab6f 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -376,23 +376,6 @@ int prepareReplicasToWrite(void) {
     return prepared;
 }
 
-/* Wrapper for feedReplicationBuffer() that takes Redis string objects
- * as input. */
-void feedReplicationBufferWithObject(robj *o) {
-    char llstr[LONG_STR_SIZE];
-    void *p;
-    size_t len;
-
-    if (o->encoding == OBJ_ENCODING_INT) {
-        len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
-        p = llstr;
-    } else {
-        len = sdslen(o->ptr);
-        p = o->ptr;
-    }
-    feedReplicationBuffer(p,len);
-}
-
 /* Generally, we only have one replication buffer block to trim when replication
  * backlog size exceeds our setting and no replica reference it. But if replica
  * clients disconnect, we need to free many replication buffer blocks that are
@@ -468,115 +451,175 @@ void freeReplicaReferencedReplBuffer(client *replica) {
     replica->ref_block_pos = 0;
 }
 
-/* Append bytes into the global replication buffer list, replication backlog and
- * all replica clients use replication buffers collectively, this function replace
- * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
- * First we add buffer into global replication buffer block list, and then
- * update replica / replication-backlog referenced node and block position. */
-void feedReplicationBuffer(char *s, size_t len) {
+/* Batched write API for the global replication buffer, optimized for minimal overhead per
+ * append: data writes are just memcpys into the tail block, allocating new blocks on demand.
+ * Offset updates, replica/backlog references and trimming are deferred to replBufWriterEnd(). */
+typedef struct replBufWriter {
+    listNode *start_node; /* First repl buffer block written to, NULL until it is known. */
+    size_t start_pos;     /* Byte offset within start_node where writing began. */
+    size_t total_len;     /* Total bytes written across all appends so far. */
+    int new_blocks;       /* Number of new blocks allocated during this write. */
+    replBufBlock *tail;   /* Current tail block, NULL if no block exists yet. */
+} replBufWriter;
+
+/* Initialize the writer and cache the tail; the start position is set here only if the tail has room left. */
+static void replBufWriterBegin(replBufWriter *wr) {
+    listNode *ln = listLast(server.repl_buffer_blocks);
+    replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+
+    if (tail && tail->used < tail->size) {
+        wr->start_node = ln;
+        wr->start_pos = tail->used;
+    } else {
+        wr->start_node = NULL;
+        wr->start_pos = 0;
+    }
+
+    wr->total_len = 0;
+    wr->new_blocks = 0;
+    wr->tail = tail;
+}
+
+/* Append a new, empty block (sized from 'hint', clamped below) to the replication buffer and make it the tail. */
+static void replBufWriterAllocBlock(replBufWriter *wr, size_t hint) {
     static long long repl_block_id = 0;
+    size_t usable_size;
+    /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
+     * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that
+     * can't be trimmed when we only still need to hold a small portion of them. */
+    size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
+    size_t bsize = min(max(hint, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
+    replBufBlock *tail = zmalloc_usable(bsize + sizeof(replBufBlock), &usable_size);
+    /* Take over the allocation's internal fragmentation */
+    tail->size = usable_size - sizeof(replBufBlock);
+    tail->used = 0;
+    tail->refcount = 0;
+    tail->repl_offset = server.master_repl_offset + wr->total_len + 1; /* master_repl_offset is only advanced in replBufWriterEnd(). */
+    tail->id = repl_block_id++;
+    listAddNodeTail(server.repl_buffer_blocks, tail);
+    server.repl_buffer_mem += (usable_size + sizeof(listNode)); /* The list node memory is counted too. */
+    createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
 
-    if (server.repl_backlog == NULL) return;
+    /* Update writer state; the first allocated block becomes the start if none was recorded yet. */
+    wr->tail = tail;
+    wr->new_blocks++;
+    if (wr->start_node == NULL) {
+        wr->start_node = listLast(server.repl_buffer_blocks);
+        wr->start_pos = 0;
+    }
+}
 
-    clusterSlotStatsIncrNetworkBytesOutForReplication(len);
+/* Slow path: fill what is left of the current block, then allocate new blocks until all of 'buf' is copied. */
+static void replBufWriterAppendSlow(replBufWriter *wr, const char *buf, size_t len) {
+    while (len > 0) {
+        size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0;
+        if (avail > 0) {
+            size_t copy = (avail >= len) ? len : avail;
+            memcpy(wr->tail->buf + wr->tail->used, buf, copy);
+            wr->tail->used += copy;
+            wr->total_len += copy;
+            buf += copy;
+            len -= copy;
+        }
+
+        if (len > 0)
+            replBufWriterAllocBlock(wr, len);
+    }
+}
+
+/* Write data into the replication buffer. The slow path is split out to give
+ * the compiler a chance to inline the common case where the write fits entirely
+ * in the current block. Zero-length writes copy nothing. */
+static inline void replBufWriterAppend(replBufWriter *wr, const char *buf, size_t len) {
+    size_t avail = wr->tail ? wr->tail->size - wr->tail->used : 0;
+    if (len > 0 && avail >= len) {
+        memcpy(wr->tail->buf + wr->tail->used, buf, len);
+        wr->tail->used += len;
+        wr->total_len += len;
+        return;
+    }
+    replBufWriterAppendSlow(wr, buf, len);
+}
+
+/* Write a RESP length header: prefix, decimal value, \r\n (e.g. "$12\r\n" or "*3\r\n").
+ * Uses pre-built shared objects for small values, formats manually otherwise. */
+static inline void replBufWriterAppendBulkLen(replBufWriter *wr, char prefix, long long value) {
+    serverAssert(prefix == '$' || prefix == '*');
+    if (value >= 0 && value < OBJ_SHARED_BULKHDR_LEN) {
+        robj **tbl = (prefix == '$') ? shared.bulkhdr : shared.mbulkhdr;
+        replBufWriterAppend(wr, tbl[value]->ptr, OBJ_SHARED_HDR_STRLEN(value));
+        return;
+    }
+    char buf[LONG_STR_SIZE+3];
+    buf[0] = prefix;
+    int len = ll2string(buf+1, sizeof(buf)-1, value);
+    buf[len+1] = '\r';
+    buf[len+2] = '\n';
+    replBufWriterAppend(wr, buf, len+3);
+}
+
+
+/* Finalize the replication buffer write: update global offsets, set up replica
+ * and backlog references for new data, check output buffer limits, and trim the
+ * backlog if new blocks were allocated. Does nothing if no bytes were written. */
+static void replBufWriterEnd(replBufWriter *wr) {
+    if (wr->total_len == 0) return;
+
+    serverAssert(wr->start_node != NULL);
+    clusterSlotStatsIncrNetworkBytesOutForReplication(wr->total_len);
 
     /* Update the current cmd's keys with the commands replication bytes*/
-    hotkeyMetrics metrics = {0, len};
+    hotkeyMetrics metrics = {0, wr->total_len};
     hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics);
 
-    while(len > 0) {
-        size_t start_pos = 0; /* The position of referenced block to start sending. */
-        listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
-        int add_new_block = 0; /* Create new block if current block is total used. */
-        listNode *ln = listLast(server.repl_buffer_blocks);
-        replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
+    server.master_repl_offset += wr->total_len;
+    server.repl_backlog->histlen += wr->total_len;
 
-        /* Append to tail string when possible. */
-        if (tail && tail->size > tail->used) {
-            start_node = listLast(server.repl_buffer_blocks);
-            start_pos = tail->used;
-            /* Copy the part we can fit into the tail, and leave the rest for a
-             * new node */
-            size_t avail = tail->size - tail->used;
-            size_t copy = (avail >= len) ? len : avail;
-            memcpy(tail->buf + tail->used, s, copy);
-            tail->used += copy;
-            s += copy;
-            len -= copy;
-            server.master_repl_offset += copy;
-            server.repl_backlog->histlen += copy;
-        }
-        if (len) {
-            /* Create a new node, make sure it is allocated to at
-             * least PROTO_REPLY_CHUNK_BYTES */
-            size_t usable_size;
-            /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
-             * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
-             * trim when we only still need to hold a small portion from them. */
-            size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
-            size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
-            tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
-            /* Take over the allocation's internal fragmentation */
-            tail->size = usable_size - sizeof(replBufBlock);
-            size_t copy = (tail->size >= len) ? len : tail->size;
-            tail->used = copy;
-            tail->refcount = 0;
-            tail->repl_offset = server.master_repl_offset + 1;
-            tail->id = repl_block_id++;
-            memcpy(tail->buf, s, copy);
-            listAddNodeTail(server.repl_buffer_blocks, tail);
-            /* We also count the list node memory into replication buffer memory. */
-            server.repl_buffer_mem += (usable_size + sizeof(listNode));
-            add_new_block = 1;
-            if (start_node == NULL) {
-                start_node = listLast(server.repl_buffer_blocks);
-                start_pos = 0;
-            }
-            s += copy;
-            len -= copy;
-            server.master_repl_offset += copy;
-            server.repl_backlog->histlen += copy;
-        }
+    /* For output buffer of replicas: point replicas without a reference at the start of this write. */
+    listIter li;
+    listNode *ln;
+    listRewind(server.slaves,&li);
+    while((ln = listNext(&li))) {
+        client *slave = ln->value;
+        if (!canFeedReplicaReplBuffer(slave)) continue;
 
-        /* For output buffer of replicas. */
-        listIter li;
-        listRewind(server.slaves,&li);
-        while((ln = listNext(&li))) {
-            client *slave = ln->value;
-            if (!canFeedReplicaReplBuffer(slave)) continue;
-
-            /* Update shared replication buffer start position. */
-            if (slave->ref_repl_buf_node == NULL) {
-                slave->ref_repl_buf_node = start_node;
-                slave->ref_block_pos = start_pos;
-                /* Only increase the start block reference count. */
-                ((replBufBlock *)listNodeValue(start_node))->refcount++;
-            }
-
-            /* Check output buffer limit only when add new block. */
-            if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
-        }
-
-        /* For replication backlog */
-        if (server.repl_backlog->ref_repl_buf_node == NULL) {
-            server.repl_backlog->ref_repl_buf_node = start_node;
+        /* Update shared replication buffer start position. */
+        if (slave->ref_repl_buf_node == NULL) {
+            slave->ref_repl_buf_node = wr->start_node;
+            slave->ref_block_pos = wr->start_pos;
             /* Only increase the start block reference count. */
-            ((replBufBlock *)listNodeValue(start_node))->refcount++;
-
-            /* Replication buffer must be empty before adding replication stream
-             * into replication backlog. */
-            serverAssert(add_new_block == 1 && start_pos == 0);
+            ((replBufBlock *)listNodeValue(wr->start_node))->refcount++;
         }
 
-        if (add_new_block) {
-            createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
-            /* It is important to trim after adding replication data to keep the backlog size close to
-             * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
-             * unnecessary trimming attempts when small amounts of data are added. See comments in
-             * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
-            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
-        }
+        /* Check output buffer limit only when new blocks were added. */
+        if (wr->new_blocks) closeClientOnOutputBufferLimitReached(slave, 1);
     }
+
+    /* For replication backlog */
+    if (server.repl_backlog->ref_repl_buf_node == NULL) {
+        server.repl_backlog->ref_repl_buf_node = wr->start_node;
+        /* Only increase the start block reference count. */
+        ((replBufBlock *)listNodeValue(wr->start_node))->refcount++;
+
+        /* Replication buffer must be empty before adding replication stream
+         * into replication backlog. */
+        serverAssert(wr->new_blocks > 0 && wr->start_pos == 0);
+    }
+    if (wr->new_blocks) {
+        /* It is important to trim after adding replication data to keep the backlog size close to
+         * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
+         * unnecessary trimming attempts when small amounts of data are added. See comments in
+         * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
+        incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+    }
+}
+
+/* Append 'len' bytes from 'buf' in one writer pass; replBufWriterEnd() dereferences server.repl_backlog, so the backlog must exist. */
+static void feedReplicationBuffer(const char *buf, size_t len) {
+    replBufWriter wr;
+    replBufWriterBegin(&wr);
+    replBufWriterAppend(&wr, buf, len);
+    replBufWriterEnd(&wr);
 }
 
 /* Propagate write commands to replication stream.
@@ -642,7 +685,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
                 dictid_len, llstr));
         }
 
-        feedReplicationBufferWithObject(selectcmd);
+        feedReplicationBuffer(selectcmd->ptr, sdslen(selectcmd->ptr));
 
         /* Although the SELECT command is not associated with any slot,
          * its per-slot network-bytes-out accumulation is made by the above function call.
@@ -657,28 +700,28 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
 
     /* Write the command to the replication buffer if any. */
     char aux[LONG_STR_SIZE+3];
+    replBufWriter wr;
+    replBufWriterBegin(&wr);
 
-    /* Add the multi bulk reply length. */
-    aux[0] = '*';
-    len = ll2string(aux+1,sizeof(aux)-1,argc);
-    aux[len+1] = '\r';
-    aux[len+2] = '\n';
-    feedReplicationBuffer(aux,len+3);
+    /* Write the multi bulk header: '*', the argument count, CRLF. */
+    replBufWriterAppendBulkLen(&wr, '*', argc);
 
     for (j = 0; j < argc; j++) {
+        /* Write the bulk length header: '$', the argument length, CRLF. */
         long objlen = stringObjectLen(argv[j]);
+        replBufWriterAppendBulkLen(&wr, '$', objlen);
 
-        /* We need to feed the buffer with the object as a bulk reply
-         * not just as a plain string, so create the $..CRLF payload len
-         * and add the final CRLF */
-        aux[0] = '$';
-        len = ll2string(aux+1,sizeof(aux)-1,objlen);
-        aux[len+1] = '\r';
-        aux[len+2] = '\n';
-        feedReplicationBuffer(aux,len+3);
-        feedReplicationBufferWithObject(argv[j]);
-        feedReplicationBuffer(aux+len+1,2);
+        /* Write the bulk data; integer-encoded objects are converted to their string form first. */
+        if (argv[j]->encoding == OBJ_ENCODING_INT) {
+            len = ll2string(aux, sizeof(aux), (long)argv[j]->ptr);
+            replBufWriterAppend(&wr, aux, len);
+        } else {
+            replBufWriterAppend(&wr, argv[j]->ptr, objlen);
+        }
+        replBufWriterAppend(&wr, "\r\n", 2);
     }
+
+    replBufWriterEnd(&wr);
 }
 
 /* This is a debugging function that gets called when we detect something
diff --git a/src/server.h b/src/server.h
index 8a0ba3cbd..091f7c9ac 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3360,7 +3360,6 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
 void resetReplicationBuffer(void);
-void feedReplicationBuffer(char *buf, size_t len);
 void freeReplicaReferencedReplBuffer(client *replica);
 void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
 void updateSlavesWaitingBgsave(int bgsaveerr, int type);