diff --git a/src/t_stream.c b/src/t_stream.c index 4b47546d7a..0f5e1da912 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1717,10 +1717,10 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { return SCG_INVALID_ENTRIES_READ; } -/* As a result of an explicit XCLAIM or XREADGROUP command, new entries - * are created in the pending list of the stream and consumers. We need - * to propagate this changes in the form of XCLAIM commands. */ -void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { +/* Copy-free version of streamPropagateXCLAIM that expects pre-created robj* arguments. + * This is useful when propagating multiple XCLAIMs in a loop to avoid repeated + * object creation/destruction overhead. */ +static inline void streamPropagateXCLAIMCopyFree(int dbid, robj *key, robj *group_last_id, robj *groupname, robj *id, robj *consumername, robj *delivery_time, robj *delivery_count) { /* We need to generate an XCLAIM that will work in a idempotent fashion: * * XCLAIM 0 TIME @@ -1732,24 +1732,36 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[0] = shared.xclaim; argv[1] = key; argv[2] = groupname; - argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); + argv[3] = consumername; argv[4] = shared.integers[0]; argv[5] = id; argv[6] = shared.time; - argv[7] = createStringObjectFromLongLong(nack->delivery_time); + argv[7] = delivery_time; argv[8] = shared.retrycount; - argv[9] = createStringObjectFromLongLong(nack->delivery_count); + argv[9] = delivery_count; argv[10] = shared.force; argv[11] = shared.justid; argv[12] = shared.lastid; - argv[13] = createObjectFromStreamID(&group->last_id); + argv[13] = group_last_id; - alsoPropagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(dbid,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); +} - decrRefCount(argv[3]); - decrRefCount(argv[7]); - decrRefCount(argv[9]); - decrRefCount(argv[13]); +/* As a result of an explicit XCLAIM or XREADGROUP command, new entries + * are created in the pending list of the stream and consumers. We need + * to propagate this changes in the form of XCLAIM commands. */ +static inline void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { + robj *consumername = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); + robj *delivery_time = createStringObjectFromLongLong(nack->delivery_time); + robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); + robj *group_last_id = createObjectFromStreamID(&group->last_id); + + streamPropagateXCLAIMCopyFree(c->db->id, key, group_last_id, groupname, id, consumername, delivery_time, delivery_count); + + decrRefCount(consumername); + decrRefCount(delivery_time); + decrRefCount(delivery_count); + decrRefCount(group_last_id); } /* We need this when we want to propagate the new last-id of a consumer group @@ -1863,7 +1875,17 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamID id; int propagate_last_id = 0; int noack = flags & STREAM_RWR_NOACK; - + const int db_id = c->db->id; + const mstime_t cmd_time_snapshot = commandTimeSnapshot(); + /* to be used in case of stream propagation */ + robj *consumername = NULL; + robj *delivery_time = NULL; + robj *group_last_id = NULL; + if (spi && consumer) { + consumername = createStringObject(consumer->name,sdslen(consumer->name)); + delivery_time = createStringObjectFromLongLong(cmd_time_snapshot); + group_last_id = createObjectFromStreamID(&group->last_id); + } if (propCount) *propCount = 0; if (group && min_idle_time != -1) { @@ -1891,7 +1913,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end while (raxNext(&ri)) { pelTimeKey pelKey; decodePelTimeKey(ri.key, &pelKey); - uint64_t idle = commandTimeSnapshot() - pelKey.delivery_time; + uint64_t idle = cmd_time_snapshot - pelKey.delivery_time; if (idle < (uint64_t)min_idle_time) break; @@ -1932,8 +1954,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (streamIteratorGetID(&si,&pel_id,&numfields)) { /* Emit a four elements array: ID, array of field-value pairs, * idle time and delivery count. */ + robj *idarg = createObjectFromStreamID(&pel_id); addReplyArrayLen(c,4); - addReplyStreamID(c,&pel_id); + addReplyBulk(c,idarg); addReplyArrayLen(c,numfields*2); /* Emit the field-value pairs. */ @@ -1945,7 +1968,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end addReplyBulkCBuffer(c,value,value_len); } - uint64_t idle = commandTimeSnapshot() - pelKey->delivery_time; + uint64_t idle = cmd_time_snapshot - pelKey->delivery_time; addReplyBulkLongLong(c, idle); addReplyBulkLongLong(c, delivery_count); @@ -1955,21 +1978,21 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Transfer NACK to new consumer with updated metadata. */ nack->consumer = consumer; - nack->delivery_time = commandTimeSnapshot(); + nack->delivery_time = cmd_time_snapshot; nack->delivery_count++; raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id); - consumer->active_time = commandTimeSnapshot(); + consumer->active_time = cmd_time_snapshot; /* Propagate as XCLAIM. */ if (spi) { - robj *idarg = createObjectFromStreamID(&pel_id); - streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); - decrRefCount(idarg); + robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); + streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count); + decrRefCount(delivery_count); if (propCount) (*propCount)++; } - + decrRefCount(idarg); arraylen++; } streamIteratorStop(&si); @@ -1982,6 +2005,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * the history of messages delivered to it and not yet confirmed * as delivered. */ if (group && (flags & STREAM_RWR_HISTORY)) { + if (spi && consumer) { + decrRefCount(delivery_time); + decrRefCount(consumername); + decrRefCount(group_last_id); + } return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, group, consumer); } @@ -1989,6 +2017,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Stop here if client only wants claimed entries or count is satisfied. */ if ((group && (flags & STREAM_RWR_CLAIMED)) || (count && count == arraylen)) { if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); + if (spi && consumer) { + decrRefCount(delivery_time); + decrRefCount(consumername); + decrRefCount(group_last_id); + } return arraylen; } @@ -2027,7 +2060,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * the ID, the second is an array of field-value pairs. */ addReplyArrayLen(c,2); } - addReplyStreamID(c,&id); + robj *idarg = createObjectFromStreamID(&id); + addReplyBulk(c,idarg); addReplyArrayLen(c,numfields*2); /* Emit the field-value pairs. */ @@ -2081,7 +2115,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); /* Update the consumer and NACK metadata. */ nack->consumer = consumer; - nack->delivery_time = commandTimeSnapshot(); + nack->delivery_time = cmd_time_snapshot; nack->delivery_count = 1; /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); @@ -2094,21 +2128,27 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* We have new NACK or updated existing one. */ raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - consumer->active_time = commandTimeSnapshot(); + consumer->active_time = cmd_time_snapshot; /* Propagate as XCLAIM. */ if (spi) { - robj *idarg = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); - decrRefCount(idarg); + robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); + streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count); + decrRefCount(delivery_count); if (propCount) (*propCount)++; } } - + decrRefCount(idarg); arraylen++; if (count && count == arraylen) break; } + if (spi && consumer) { + decrRefCount(delivery_time); + decrRefCount(consumername); + decrRefCount(group_last_id); + } + if (spi && propagate_last_id) { streamPropagateGroupID(c,spi->keyname,group,spi->groupname); if (propCount) (*propCount)++;