mirror of
https://github.com/redis/redis.git
synced 2026-06-08 16:24:26 -04:00
Offload length formatting in tryAvoidBulkStrCopyToReply() to IO thread (#14844)
## Summary In the reply copy-avoidance path (bulkStrRef), the RESP bulk-string prefix `$<len>\r\n` was formatted eagerly on the main thread inside `_addBulkStrRefToBufferOrList()`. This PR defers that formatting to write time via a new idempotent helper `formatBulkStrRefPrefix()`, so the work happens on the IO thread for clients served by IO threads. --------- Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
parent
a36cfc0829
commit
65401042dc
3 changed files with 20 additions and 12 deletions
|
|
@ -112,6 +112,7 @@ static size_t reqresAppendEncodedBuffer(client *c, char *buf, size_t len) {
|
|||
} else {
|
||||
/* BULK_STR_REF - expand to full RESP format */
|
||||
bulkStrRef *str_ref = (bulkStrRef *)(ptr + sizeof(payloadHeader));
|
||||
formatBulkStrRefPrefix(str_ref);
|
||||
|
||||
/* Append prefix: "$<len>\r\n" */
|
||||
ret += reqresAppendBuffer(c, str_ref->prefix, str_ref->prefix_cnt);
|
||||
|
|
|
|||
|
|
@ -552,6 +552,17 @@ static inline int clientIsInPendingRefReplyList(client *c) {
|
|||
listFirst(server.clients_with_pending_ref_reply) == &c->pending_ref_reply_node;
|
||||
}
|
||||
|
||||
/* Format "$<len>\r\n" into str_ref->prefix and set prefix_cnt. Called at write time. */
|
||||
unsigned int formatBulkStrRefPrefix(bulkStrRef *str_ref) {
|
||||
if (str_ref->prefix_cnt != 0) return str_ref->prefix_cnt;
|
||||
str_ref->prefix[0] = '$';
|
||||
size_t num_len = ll2string(str_ref->prefix + 1, sizeof(str_ref->prefix) - 3, sdslen(str_ref->obj->ptr));
|
||||
str_ref->prefix[num_len + 1] = '\r';
|
||||
str_ref->prefix[num_len + 2] = '\n';
|
||||
str_ref->prefix_cnt = (unsigned int)(num_len + 3);
|
||||
return str_ref->prefix_cnt;
|
||||
}
|
||||
|
||||
/* Increment reference to object and add pointer to object and
|
||||
* pointer to string itself to current reply buffer */
|
||||
static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) {
|
||||
|
|
@ -560,18 +571,12 @@ static void _addBulkStrRefToBufferOrList(client *c, robj *obj, size_t len) {
|
|||
bulkStrRef str_ref;
|
||||
str_ref.obj = obj;
|
||||
incrRefCount(obj); /* Refcount will be decremented in write handler */
|
||||
|
||||
/* Fill prefix with bulk string length: "$<len>\r\n" */
|
||||
str_ref.prefix[0] = '$';
|
||||
size_t num_len = ll2string(str_ref.prefix + 1, sizeof(str_ref.prefix) - 3, len);
|
||||
str_ref.prefix[num_len + 1] = '\r';
|
||||
str_ref.prefix[num_len + 2] = '\n';
|
||||
str_ref.prefix_cnt = num_len + 3;
|
||||
str_ref.prefix_cnt = 0; /* filled at write time */
|
||||
str_ref.crlf[0] = '\r';
|
||||
str_ref.crlf[1] = '\n';
|
||||
|
||||
/* Track output bytes: bulk string prefix + content + trailing CRLF */
|
||||
c->net_output_bytes_curr_cmd += str_ref.prefix_cnt + len + 2;
|
||||
c->net_output_bytes_curr_cmd += digits10(len) + 3 + len + 2;
|
||||
|
||||
/* We call it here because this function may affect the reply
|
||||
* buffer offset (see function comment) */
|
||||
|
|
@ -2465,9 +2470,9 @@ static void processEncodedBufferForWrite(ReplyIOV *reply_iov, char *start_ptr, c
|
|||
reply_iov->iov[reply_iov->iovcnt].iov_len = head->payload_len - offset;
|
||||
reply_iov->iov_bytes_len += reply_iov->iov[reply_iov->iovcnt++].iov_len;
|
||||
} else {
|
||||
/* BULK_STR_REF - expand to prefix + string + crlf */
|
||||
/* BULK_STR_REF - expand to prefix + string + crlf (format prefix at write time) */
|
||||
bulkStrRef *str_ref = (bulkStrRef *)(ptr + sizeof(payloadHeader));
|
||||
size_t prefix_len = str_ref->prefix_cnt;
|
||||
size_t prefix_len = formatBulkStrRefPrefix(str_ref);
|
||||
size_t str_len = sdslen(str_ref->obj->ptr);
|
||||
|
||||
/* Add prefix */
|
||||
|
|
@ -2528,6 +2533,7 @@ static payloadHeader *processSentDataInEncodedBuffer(client *c, char *start_ptr,
|
|||
} else {
|
||||
/* BULK_STR_REF - release object references */
|
||||
bulkStrRef *str_ref = (bulkStrRef *)(ptr + sizeof(payloadHeader));
|
||||
formatBulkStrRefPrefix(str_ref); /* ensure prefix_cnt is set for writen_len */
|
||||
|
||||
size_t writen_len = str_ref->prefix_cnt + sdslen(str_ref->obj->ptr) + 2;
|
||||
if (*remaining < (ssize_t)(writen_len - *sentlen)) {
|
||||
|
|
|
|||
|
|
@ -1139,8 +1139,8 @@ static_assert(offsetof(payloadHeader, payload_len) == 0, "payload_len must be at
|
|||
* we store pointers to object and string itself */
|
||||
typedef struct __attribute__((__packed__)) bulkStrRef {
|
||||
robj *obj; /* pointer to object used for reference count management */
|
||||
unsigned int prefix_cnt;
|
||||
char prefix[LONG_STR_SIZE + 3]; /* $<len>\r\n */
|
||||
unsigned int prefix_cnt; /* length of prefix; 0 means prefix not yet formatted */
|
||||
char prefix[LONG_STR_SIZE + 3]; /* $<len>\r\n, lazily filled when writing */
|
||||
char crlf[2]; /* \r\n */
|
||||
} bulkStrRef;
|
||||
|
||||
|
|
@ -3250,6 +3250,7 @@ void addReplyLoadedModules(client *c);
|
|||
void copyReplicaOutputBuffer(client *dst, client *src);
|
||||
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
|
||||
void deferredAfterErrorReply(client *c, list *errors);
|
||||
unsigned int formatBulkStrRefPrefix(bulkStrRef *str_ref);
|
||||
size_t sdsZmallocSize(sds s);
|
||||
size_t getStringObjectSdsUsedMemory(robj *o);
|
||||
void freeClientReplyValue(void *o);
|
||||
|
|
|
|||
Loading…
Reference in a new issue