Introduce copy-free version streamPropagateXCLAIM to reduce allocation overhead in stream propagation (#14516)
Some checks failed
CI / test-ubuntu-latest (push) Has been cancelled
CI / test-sanitizer-address (push) Has been cancelled
CI / build-debian-old (push) Has been cancelled
CI / build-macos-latest (push) Has been cancelled
CI / build-32bit (push) Has been cancelled
CI / build-libc-malloc (push) Has been cancelled
CI / build-centos-jemalloc (push) Has been cancelled
CI / build-old-chain-jemalloc (push) Has been cancelled
Codecov / code-coverage (push) Has been cancelled
External Server Tests / test-external-standalone (push) Has been cancelled
External Server Tests / test-external-cluster (push) Has been cancelled
External Server Tests / test-external-nodebug (push) Has been cancelled
Spellcheck / Spellcheck (push) Has been cancelled

As seen in the following flamegraph, even after PR #14480, there a lot
of redundant work when propagating multiple XCLAIMs withing a
XREADGROUP.

This PR refactors streamPropagateXCLAIM to add a new static inline
variant, `streamPropagateXCLAIMCopyFree()`, which accepts pre-created
`robj*` arguments.
This enables reusing argument objects across multiple XCLAIM
propagations, reducing repeated creation and destruction costs during
high-throughput consumer group operations.
This commit is contained in:
Filipe Oliveira (Redis) 2025-11-07 11:55:30 +00:00 committed by GitHub
parent dc94d36295
commit a64e725034
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1717,10 +1717,10 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
return SCG_INVALID_ENTRIES_READ;
}
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
/* Copy-free version of streamPropagateXCLAIM that expects pre-created robj* arguments.
* This is useful when propagating multiple XCLAIMs in a loop to avoid repeated
* object creation/destruction overhead. */
static inline void streamPropagateXCLAIMCopyFree(int dbid, robj *key, robj *group_last_id, robj *groupname, robj *id, robj *consumername, robj *delivery_time, robj *delivery_count) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
@ -1732,24 +1732,36 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[0] = shared.xclaim;
argv[1] = key;
argv[2] = groupname;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
argv[3] = consumername;
argv[4] = shared.integers[0];
argv[5] = id;
argv[6] = shared.time;
argv[7] = createStringObjectFromLongLong(nack->delivery_time);
argv[7] = delivery_time;
argv[8] = shared.retrycount;
argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[9] = delivery_count;
argv[10] = shared.force;
argv[11] = shared.justid;
argv[12] = shared.lastid;
argv[13] = createObjectFromStreamID(&group->last_id);
argv[13] = group_last_id;
alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
alsoPropagate(dbid,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
}
decrRefCount(argv[3]);
decrRefCount(argv[7]);
decrRefCount(argv[9]);
decrRefCount(argv[13]);
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
static inline void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
robj *consumername = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
robj *delivery_time = createStringObjectFromLongLong(nack->delivery_time);
robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
robj *group_last_id = createObjectFromStreamID(&group->last_id);
streamPropagateXCLAIMCopyFree(c->db->id, key, group_last_id, groupname, id, consumername, delivery_time, delivery_count);
decrRefCount(consumername);
decrRefCount(delivery_time);
decrRefCount(delivery_count);
decrRefCount(group_last_id);
}
/* We need this when we want to propagate the new last-id of a consumer group
@ -1863,7 +1875,17 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamID id;
int propagate_last_id = 0;
int noack = flags & STREAM_RWR_NOACK;
const int db_id = c->db->id;
const mstime_t cmd_time_snapshot = commandTimeSnapshot();
/* to be used in case of stream propagation */
robj *consumername = NULL;
robj *delivery_time = NULL;
robj *group_last_id = NULL;
if (spi && consumer) {
consumername = createStringObject(consumer->name,sdslen(consumer->name));
delivery_time = createStringObjectFromLongLong(cmd_time_snapshot);
group_last_id = createObjectFromStreamID(&group->last_id);
}
if (propCount) *propCount = 0;
if (group && min_idle_time != -1) {
@ -1891,7 +1913,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while (raxNext(&ri)) {
pelTimeKey pelKey;
decodePelTimeKey(ri.key, &pelKey);
uint64_t idle = commandTimeSnapshot() - pelKey.delivery_time;
uint64_t idle = cmd_time_snapshot - pelKey.delivery_time;
if (idle < (uint64_t)min_idle_time)
break;
@ -1932,8 +1954,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
if (streamIteratorGetID(&si,&pel_id,&numfields)) {
/* Emit a four elements array: ID, array of field-value pairs,
* idle time and delivery count. */
robj *idarg = createObjectFromStreamID(&pel_id);
addReplyArrayLen(c,4);
addReplyStreamID(c,&pel_id);
addReplyBulk(c,idarg);
addReplyArrayLen(c,numfields*2);
/* Emit the field-value pairs. */
@ -1945,7 +1968,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
addReplyBulkCBuffer(c,value,value_len);
}
uint64_t idle = commandTimeSnapshot() - pelKey->delivery_time;
uint64_t idle = cmd_time_snapshot - pelKey->delivery_time;
addReplyBulkLongLong(c, idle);
addReplyBulkLongLong(c, delivery_count);
@ -1955,21 +1978,21 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Transfer NACK to new consumer with updated metadata. */
nack->consumer = consumer;
nack->delivery_time = commandTimeSnapshot();
nack->delivery_time = cmd_time_snapshot;
nack->delivery_count++;
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id);
consumer->active_time = commandTimeSnapshot();
consumer->active_time = cmd_time_snapshot;
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&pel_id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
decrRefCount(delivery_count);
if (propCount) (*propCount)++;
}
decrRefCount(idarg);
arraylen++;
}
streamIteratorStop(&si);
@ -1982,6 +2005,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* the history of messages delivered to it and not yet confirmed
* as delivered. */
if (group && (flags & STREAM_RWR_HISTORY)) {
if (spi && consumer) {
decrRefCount(delivery_time);
decrRefCount(consumername);
decrRefCount(group_last_id);
}
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
group, consumer);
}
@ -1989,6 +2017,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Stop here if client only wants claimed entries or count is satisfied. */
if ((group && (flags & STREAM_RWR_CLAIMED)) || (count && count == arraylen)) {
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
if (spi && consumer) {
decrRefCount(delivery_time);
decrRefCount(consumername);
decrRefCount(group_last_id);
}
return arraylen;
}
@ -2027,7 +2060,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* the ID, the second is an array of field-value pairs. */
addReplyArrayLen(c,2);
}
addReplyStreamID(c,&id);
robj *idarg = createObjectFromStreamID(&id);
addReplyBulk(c,idarg);
addReplyArrayLen(c,numfields*2);
/* Emit the field-value pairs. */
@ -2081,7 +2115,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
/* Update the consumer and NACK metadata. */
nack->consumer = consumer;
nack->delivery_time = commandTimeSnapshot();
nack->delivery_time = cmd_time_snapshot;
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
@ -2094,21 +2128,27 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* We have new NACK or updated existing one. */
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
consumer->active_time = commandTimeSnapshot();
consumer->active_time = cmd_time_snapshot;
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
decrRefCount(delivery_count);
if (propCount) (*propCount)++;
}
}
decrRefCount(idarg);
arraylen++;
if (count && count == arraylen) break;
}
if (spi && consumer) {
decrRefCount(delivery_time);
decrRefCount(consumername);
decrRefCount(group_last_id);
}
if (spi && propagate_last_id) {
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
if (propCount) (*propCount)++;