From cecdc998736d81108711982cc0fb7b90bdf1c624 Mon Sep 17 00:00:00 2001 From: Sergei Georgiev Date: Tue, 27 Jan 2026 14:16:16 +0200 Subject: [PATCH] Optimize Redis XREADGROUP CLAIM (#14726) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Overview This PR optimizes Redis Streams consumer group performance by replacing the `pel_by_time` rax tree with a doubly-linked list, delivering significant performance improvements for NACK updates and XREADGROUP CLAIM operations while also reducing memory usage. ## The Problem Consumer groups maintain a time-ordered index of pending entries using a radix tree (`pel_by_time`). Every time a pending entry is reclaimed or delivered, we need to update its delivery time, which currently requires: ```c raxRemovePelByTime(group->pel_by_time, old_time, &id); // O(k) where k=key length nack->delivery_time = current_time; raxInsertPelByTime(group->pel_by_time, current_time, &id); // O(k) where k=key length ``` ## The Key Insight **99% of delivery_time updates set the value to the current time** — which means they're appending to the tail of a time-ordered structure. We're using a radix tree (O(k) operations where k is key length, plus tree traversal overhead) for what is essentially an append-only workload (should be O(1)). 
## The Solution Replace the rax tree with a doubly-linked list embedded directly in each `streamNACK`: ```c typedef struct streamNACK { mstime_t delivery_time; uint64_t delivery_count; streamConsumer *consumer; listNode *cgroup_ref_node; streamID id; // NEW struct streamNACK *pel_prev; // NEW struct streamNACK *pel_next; // NEW } streamNACK; ``` Now updating a NACK becomes: ```c pelListUpdate(group, nack, current_time); // O(1): unlink + append ``` ## Why This Works **Typical case (99%):** Delivery time = current time - Unlink from current position: O(1) — just update 2-4 pointers - Append to tail: O(1) — update tail pointer and link **Edge case (1%):** XCLAIM with explicit past IDLE time - Still handled correctly by `pelListInsertSorted()` which scans backward from tail - Rare enough that O(N) worst case doesn't matter ## Memory Reduction The linked list approach uses less memory than the rax tree: **What we add:** - 3 new fields in `streamNACK`: `id` (16 bytes) + `pel_prev` (8 bytes) + `pel_next` (8 bytes) = 32 bytes per entry **What we remove:** - Entire `pel_by_time` rax tree with its node overhead (~40-50 bytes per entry) **Net result:** Lower memory footprint per pending entry, plus better cache locality from eliminating the separate tree structure. 
## Performance Impact ### Theoretical Analysis | Operation | Before | After | |-----------|--------|-------| | NACK update | O(k) × 2 + tree overhead | O(1) | | CLAIM iteration | O(k) per entry + traversal | O(1) per entry | *k = key length (24 bytes: 8-byte timestamp + 16-byte stream ID)* For a consumer group with 10,000 pending entries claiming 100 oldest: - **Before:** Tree traversal + key comparisons for each operation - **After:** Simple pointer updates **Key Findings:** - **28% higher throughput** for XREADGROUP with CLAIM - **22% lower average latency** (0.195ms → 0.152ms) - **21% lower P99 latency** (0.212ms → 0.168ms) - XADD performance unchanged (69K ops/sec both implementations) --- src/defrag.c | 25 +++- src/rdb.c | 9 +- src/stream.h | 36 ++--- src/t_stream.c | 371 ++++++++++++++++++++++++++----------------------- 4 files changed, 241 insertions(+), 200 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index d4b06cc07e..b058dfd01c 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -867,11 +867,26 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */ newnack = activeDefragAlloc(nack); if (newnack) { - /* Update consumer group pointer to the nack. - * pel_by_time doesn't need updating since delivery time is unchanged. */ + /* Update consumer group pointer to the nack. */ void *prev; raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); serverAssert(prev==nack); + + /* Update the doubly-linked list pointers in adjacent nacks. + * When we move a nack to a new address, we need to update the + * pel_prev->pel_next and pel_next->pel_prev pointers. 
*/ + if (newnack->pel_prev) { + newnack->pel_prev->pel_next = newnack; + } else { + /* This is the head of the list */ + ctx->cg->pel_time_head = newnack; + } + if (newnack->pel_next) { + newnack->pel_next->pel_prev = newnack; + } else { + /* This is the tail of the list */ + ctx->cg->pel_time_tail = newnack; + } } return newnack; } @@ -912,11 +927,7 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { cg->pel->alloc_size = &s->alloc_size; defragRadixTree(&cg->pel, 0, NULL, NULL); } - if (cg->pel_by_time) { - /* Update pel_by_time back-pointer to new stream */ - cg->pel_by_time->alloc_size = &s->alloc_size; - defragRadixTree(&cg->pel_by_time, 0, NULL, NULL); - } + /* pel_time_head/tail are just pointers to NACKs in pel, no separate defrag needed */ if (cg->consumers) { /* Update consumers back-pointer to new stream */ cg->consumers->alloc_size = &s->alloc_size; diff --git a/src/rdb.c b/src/rdb.c index c67236b4a9..45d213b969 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3233,7 +3233,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) decrRefCount(o); return NULL; } - streamNACK *nack = streamCreateNACK(s, NULL); + streamID nack_id; + streamDecodeID(rawid, &nack_id); + streamNACK *nack = streamCreateNACK(s, NULL, &nack_id); nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); nack->delivery_count = rdbLoadLen(rdb,NULL); nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, rawid); @@ -3251,9 +3253,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) return NULL; } - streamID id; - streamDecodeID(rawid, &id); - raxInsertPelByTime(cgroup->pel_by_time, nack->delivery_time, &id); + /* Insert in sorted order since RDB entries may not be time-ordered */ + pelListInsertSorted(cgroup, nack); } /* Now that we loaded our global PEL, we need to load the diff --git a/src/stream.h b/src/stream.h index 8112e38a55..03dfb14f20 100644 --- a/src/stream.h +++ b/src/stream.h @@ -86,6 +86,9 @@ typedef 
struct streamIterator { unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator; +/* Forward declarations */ +typedef struct streamNACK streamNACK; + /* Consumer group. */ typedef struct streamCG { streamID last_id; /* Last delivered (not acknowledged) ID for this @@ -102,11 +105,12 @@ typedef struct streamCG { as processed. The key of the radix tree is the ID as a 64 bit big endian number, while the associated value is a streamNACK structure.*/ - rax *pel_by_time; /* A radix tree mapping delivery time to pending - entries, so that we can query faster PEL entries - by time. The key is a pelTimeKey structure containing - both delivery_time and stream ID. All information is - in the key; no value is stored. */ + streamNACK *pel_time_head; /* Head of time-ordered doubly-linked list of pending + entries (oldest delivery_time). Used for efficient + CLAIM operations. O(1) access to oldest entries. */ + streamNACK *pel_time_tail; /* Tail of time-ordered doubly-linked list of pending + entries (newest delivery_time). O(1) append for + updates that set delivery_time to current time. */ rax *consumers; /* A radix tree representing the consumers by name and their associated representation in the form of streamConsumer structures. */ @@ -129,13 +133,16 @@ typedef struct streamConsumer { } streamConsumer; /* Pending (yet not acknowledged) message in a consumer group. */ -typedef struct streamNACK { +struct streamNACK { mstime_t delivery_time; /* Last time this message was delivered. */ uint64_t delivery_count; /* Number of times this message was delivered.*/ streamConsumer *consumer; /* The consumer this message was delivered to in the last delivery. */ listNode *cgroup_ref_node; /* Reference to this NACK in the cgroups_ref list. */ -} streamNACK; + streamID id; /* Stream ID for this pending entry. */ + struct streamNACK *pel_prev; /* Previous NACK in time-ordered doubly-linked list. */ + struct streamNACK *pel_next; /* Next NACK in time-ordered doubly-linked list. 
*/ +}; /* Stream propagation information, passed to functions in order to propagate * XCLAIM commands to AOF and slaves. */ @@ -144,12 +151,6 @@ typedef struct streamPropInfo { robj *groupname; } streamPropInfo; -/* Pending entry in the consumer group's PEL, indexed by delivery time. */ -typedef struct pelTimeKey { - uint64_t delivery_time; - streamID id; -} pelTimeKey; - /* Prototypes of exported APIs. */ struct client; @@ -173,10 +174,11 @@ streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); -streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer); +streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer, streamID *id); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(stream *s, streamNACK *na); +void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key); int streamIncrID(streamID *id); int streamDecrID(streamID *id); void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); @@ -193,10 +195,8 @@ int64_t streamTrimByID(stream *s, streamID minid, int approx); listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key); -void encodePelTimeKey(void* buf, pelTimeKey *timeKey); -void decodePelTimeKey(void *buf, pelTimeKey *timeKey); -void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id); -void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id); +/* PEL time list management (used by RDB loading) */ +void pelListInsertSorted(streamCG *cg, streamNACK *nack); /* IDMP functions */ idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size); diff --git a/src/t_stream.c b/src/t_stream.c index 
a47431df6d..d9cca65d8c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -55,6 +55,11 @@ static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash); static void idmpEvictOldestEntry(stream *s, idmpProducer *producer); +/* Forward declarations for PEL time list functions */ +static void pelListInsertAtTail(streamCG *cg, streamNACK *nack); +static void pelListUnlink(streamCG *cg, streamNACK *nack); +static void pelListUpdate(streamCG *cg, streamNACK *nack, mstime_t new_delivery_time); + /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. * ----------------------------------------------------------------------- */ @@ -239,15 +244,16 @@ robj *streamDup(robj *o) { raxSeek(&ri_cg_pel,"^",NULL,0); while(raxNext(&ri_cg_pel)){ streamNACK *nack = ri_cg_pel.data; - streamNACK *new_nack = streamCreateNACK(new_s, NULL); + streamID nack_id; + streamDecodeID(ri_cg_pel.key, &nack_id); + streamNACK *new_nack = streamCreateNACK(new_s, NULL, &nack_id); new_nack->delivery_time = nack->delivery_time; new_nack->delivery_count = nack->delivery_count; new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, ri_cg_pel.key); raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL); - streamID id; - streamDecodeID(ri_cg_pel.key, &id); - raxInsertPelByTime(new_cg->pel_by_time, new_nack->delivery_time, &id); + /* Insert in sorted order to preserve ordering */ + pelListInsertSorted(new_cg, new_nack); } raxStop(&ri_cg_pel); @@ -1963,76 +1969,41 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (group && min_idle_time != -1) { arraylen_ptr = addReplyDeferredLen(c); - /* Scan the group's pending entries list (PEL) to find messages that have been - * idle for at least min_idle_time milliseconds. 
The pel_by_time radix tree - * stores entries ordered by their last delivery timestamp, allowing us to - * efficiently iterate from oldest to newest. + /* Scan and process the group's pending entries list (PEL) in a single loop. + * To prevent a dead loop caused by pelListUpdate() moving elements from the + * beginning to the end of the list, we store the current tail pointer before + * processing. We iterate only up to this pre-determined boundary, ensuring we + * never process entries that are added or moved during iteration. * - * We collect eligible entries into a temporary list rather than processing - * them inline because: - * 1. We cannot safely modify a radix tree while iterating over it - * 2. The claiming process requires removing and re-inserting entries in - * both pel_by_time and the consumer PELs - * - * The iteration can terminate early in two cases: - * 1. We find an entry that hasn't been idle long enough - due to time-based - * ordering, all subsequent entries will be even newer - * 2. We've collected enough entries to satisfy the requested count limit */ - list *eligible_pels = listCreate(); - listSetFreeMethod(eligible_pels, zfree); - raxIterator ri; - raxStart(&ri, group->pel_by_time); - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - pelTimeKey pelKey; - decodePelTimeKey(ri.key, &pelKey); - uint64_t idle = cmd_time_snapshot - pelKey.delivery_time; - if (idle < (uint64_t)min_idle_time) - break; - - /* Store a copy of the key for later processing */ - pelTimeKey *keyCopy = zmalloc(sizeof(pelTimeKey)); - memcpy(keyCopy, &pelKey, sizeof(pelTimeKey)); - listAddNodeTail(eligible_pels, keyCopy); + * The iteration can terminate early when: + * 1. We find an entry that hasn't been idle long enough + * 2. We've processed enough entries to satisfy the count limit + * 3. 
We reach the pre-stored tail boundary */ + + /* Store the current tail to prevent infinite loops */ + streamNACK *tail = group->pel_time_tail; + size_t processed = 0; + + streamNACK *nack = group->pel_time_head; + while (nack) { + /* Capture next pointer BEFORE modifications (pelListUpdate may reorder) */ + streamNACK *next = nack->pel_next; - if (count && listLength(eligible_pels) >= count) break; - } - raxStop(&ri); + uint64_t idle = cmd_time_snapshot - nack->delivery_time; + if (idle < (uint64_t)min_idle_time) break; - /* Process each eligible pending entry, claiming it for the current consumer. - * For each entry we: - * 1. Fetch the actual message data from the stream - * 2. Send the message to the client with metadata (idle time, delivery count) - * 3. Transfer ownership from the previous consumer to the current consumer - * 4. Update all relevant data structures and propagate the claim operation */ - listIter li; - listNode *ln; - listRewind(eligible_pels, &li); - while ((ln = listNext(&li))) { - pelTimeKey *pelKey = (pelTimeKey*)listNodeValue(ln); - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, &pelKey->id); - - void *result; - streamNACK *nack = NULL; - uint64_t delivery_count = 0; - /* Must exist, we got the ID from pel_by_time */ - serverAssert(raxFind(group->pel,buf,sizeof(buf),&result)); - - nack = (streamNACK*)result; - delivery_count = nack->delivery_count; + /* Process and claim this entry */ + uint64_t delivery_count = nack->delivery_count; streamID pel_id; - streamIteratorStart(&si,s,&pelKey->id,&pelKey->id,rev); + streamIteratorStart(&si,s,&nack->id,&nack->id,rev); if (streamIteratorGetID(&si,&pel_id,&numfields)) { - /* Emit a four elements array: ID, array of field-value pairs, - * idle time and delivery count. */ robj *idarg = createObjectFromStreamID(&pel_id); addReplyArrayLen(c,4); addReplyBulk(c,idarg); addReplyArrayLen(c,numfields*2); - /* Emit the field-value pairs. 
*/ + /* Emit field-value pairs */ while (numfields--) { unsigned char *key, *value; int64_t key_len, value_len; @@ -2041,24 +2012,23 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end addReplyBulkCBuffer(c,value,value_len); } - uint64_t idle = cmd_time_snapshot - pelKey->delivery_time; addReplyLongLong(c, idle); addReplyLongLong(c, delivery_count); - /* Remove the NACK from old consumer and time-based PEL. */ - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &pel_id); - - /* Transfer NACK to new consumer with updated metadata. */ - nack->consumer = consumer; - nack->delivery_time = cmd_time_snapshot; + /* Transfer ownership if needed */ + if (nack->consumer != consumer) { + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf, &nack->id); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + nack->consumer = consumer; + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + } nack->delivery_count++; - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id); + pelListUpdate(group, nack, cmd_time_snapshot); /* Moves element from beginning to end of list */ consumer->active_time = cmd_time_snapshot; - /* Propagate as XCLAIM. */ + /* Propagate as XCLAIM */ if (spi) { robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count); streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count); @@ -2067,10 +2037,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end } decrRefCount(idarg); arraylen++; + + /* Check count limit */ + if (count && ++processed >= count) { + streamIteratorStop(&si); + break; + } } - streamIteratorStop(&si); + streamIteratorStop(&si); + + /* Advance to next, stopping if we reached the tail */ + nack = (nack == tail) ? 
NULL : next; } - listRelease(eligible_pels); } /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its @@ -2168,11 +2146,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Try to add a new NACK. Most of the time this will work and * will not require extra lookups. We'll fix the problem later * if we find that there is already an entry for this ID. */ - streamNACK *nack = streamCreateNACK(s, consumer); + streamNACK *nack = streamCreateNACK(s, consumer, &id); int group_inserted = raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); - int consumer_inserted = - raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Now we can check if the entry was already busy, and * in that case reassign the entry to the new consumer, @@ -2183,24 +2159,22 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end int found = raxFind(group->pel,buf,sizeof(buf),&result); serverAssert(found); nack = result; - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - /* Remove old entry from the PEL by time. */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - /* Update the consumer and NACK metadata. */ - nack->consumer = consumer; - nack->delivery_time = cmd_time_snapshot; + /* Only transfer between consumers if they're different */ + if (nack->consumer != consumer) { + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + nack->consumer = consumer; + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + } nack->delivery_count = 1; - /* Add the entry in the new consumer local PEL. 
*/ + /* Update delivery time and reposition in time list */ + pelListUpdate(group, nack, cmd_time_snapshot); + } else { + /* New NACK - insert into consumer's PEL and time list */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - } else if (group_inserted == 1 && consumer_inserted == 1) { nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); - } else if (group_inserted == 1 && consumer_inserted == 0) { - serverPanic("NACK half-created. Should not be possible."); + pelListInsertAtTail(group, nack); } - /* We have new NACK or updated existing one. */ - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); - consumer->active_time = cmd_time_snapshot; /* Propagate as XCLAIM. */ @@ -2272,12 +2246,8 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start addReplyNullArray(c); } else { streamNACK *nack = ri.data; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &thisid); - - nack->delivery_time = commandTimeSnapshot(); nack->delivery_count++; - - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &thisid); + pelListUpdate(group, nack, commandTimeSnapshot()); } arraylen++; } @@ -2849,7 +2819,7 @@ void xreadCommand(client *c) { /* Try to serve the client synchronously. */ size_t arraylen = 0; void *arraylen_ptr = NULL; - uint64_t min_pel_delivery_time = UINT64_MAX; + mstime_t min_pel_delivery_time = LLONG_MAX; for (int i = 0; i < streams_count; i++) { kvobj *o = lookupKeyRead(c->db, c->argv[streams_arg + i]); if (o == NULL) continue; @@ -2869,26 +2839,25 @@ void xreadCommand(client *c) { * get the minimum delivery time in the PEL, in order to use it * later if block option is set. 
*/ if (min_idle_time != -1) { - raxIterator ri; - raxStart(&ri, groups[i]->pel_by_time); - raxSeek(&ri, "^", NULL, 0); - while(raxNext(&ri)) { - pelTimeKey timeKey; - decodePelTimeKey(ri.key, &timeKey); - if (!streamEntryExists(s, &timeKey.id)) + streamNACK *nack = groups[i]->pel_time_head; + /* Iterate through PEL entries to find the first one that exists */ + while (nack) { + /* Skip entries that don't exist in the stream anymore */ + if (!streamEntryExists(s, &nack->id)) { + nack = nack->pel_next; continue; - - if (timeKey.delivery_time < min_pel_delivery_time) { - min_pel_delivery_time = timeKey.delivery_time; } - uint64_t idle = commandTimeSnapshot() - timeKey.delivery_time; + if (nack->delivery_time < min_pel_delivery_time) { + min_pel_delivery_time = nack->delivery_time; + } + + uint64_t idle = commandTimeSnapshot() - nack->delivery_time; if (idle >= (uint64_t)min_idle_time) { serve_claimed = 1; } - break; + break; /* Found a valid entry, stop searching */ } - raxStop(&ri); } /* If the consumer is blocked on a group, we always serve it @@ -3013,7 +2982,7 @@ void xreadCommand(client *c) { * If there are no entries in the PELs we will unblock the client after min_idle_time. */ if (min_idle_time != -1) { uint64_t pel_expire_time = min_idle_time; - if (min_pel_delivery_time != UINT64_MAX) + if (min_pel_delivery_time != LLONG_MAX) pel_expire_time += min_pel_delivery_time; else pel_expire_time += commandTimeSnapshot(); @@ -3114,8 +3083,8 @@ void streamCleanupEntryCGroupRefs(stream *s, streamID *id) { serverAssert(raxFind(group->pel, buf, sizeof(buf), (void **)&nack)); /* Remove from group and consumer PELs */ + pelListUnlink(group, nack); raxRemove(group->pel, buf, sizeof(buf), NULL); - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id); raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL); /* Since we're removing all references from the cgroups_ref, we can directly * free the NACK without unlinking it from the cgroups_ref. 
*/ @@ -3167,7 +3136,7 @@ int streamEntryIsReferenced(stream *s, streamID *id) { /* Create a NACK entry setting the delivery count to 1 and the delivery * time to the current time. The NACK consumer will be set to the one * specified as argument of the function. */ -streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) { +streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer, streamID *id) { size_t usable; streamNACK *nack = zmalloc_usable(sizeof(*nack), &usable); s->alloc_size += usable; @@ -3175,6 +3144,9 @@ streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) { nack->delivery_count = 1; nack->consumer = consumer; nack->cgroup_ref_node = NULL; /* Will be set when added to cgroups_ref */ + nack->id = *id; + nack->pel_prev = NULL; + nack->pel_next = NULL; return nack; } @@ -3186,17 +3158,28 @@ void streamFreeNACK(stream *s, streamNACK *na) { } /* Free a NACK entry and remove its reference from the cgroups_ref. - * This ensures proper cleanup of the consumer group list associated with the message ID. */ + * This ensures proper cleanup of the consumer group list associated with the message ID. + * Note: Caller must ensure NACK is unlinked from pel_time list before calling. */ void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key) { size_t usable; + serverAssert(na->pel_prev == NULL && na->pel_next == NULL); streamUnlinkEntryFromCGroupRef(s, na, key); zfree_usable(na, &usable); s->alloc_size -= usable; } -/* Generic version of streamFreeNACK. */ -void streamFreeNACKGeneric(void *na, void *s) { - streamFreeNACK((stream *)s, (streamNACK *)na); +/* Context for streamFreeNACKGeneric callback. */ +typedef struct { + stream *s; + streamCG *cg; +} streamFreeNACKCtx; + +/* Generic version of streamFreeNACK with PEL list unlinking. 
*/ +void streamFreeNACKGeneric(void *na, void *ctx) { + streamFreeNACKCtx *c = (streamFreeNACKCtx *)ctx; + streamNACK *nack = (streamNACK *)na; + pelListUnlink(c->cg, nack); + streamFreeNACK(c->s, nack); } /* Free a consumer and associated data structures. Note that this function @@ -3233,7 +3216,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo streamCG *cg = zmalloc_usable(sizeof(*cg), &usable); s->alloc_size += usable; cg->pel = raxNewWithMetadata(0, &s->alloc_size); - cg->pel_by_time = raxNewWithMetadata(0, &s->alloc_size); + cg->pel_time_head = NULL; + cg->pel_time_tail = NULL; cg->consumers = raxNewWithMetadata(0, &s->alloc_size); cg->last_id.ms = 0; cg->last_id.seq = 0; @@ -3245,8 +3229,13 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo /* Free a consumer group and all its associated data. */ static void streamFreeCG(stream *s, streamCG *cg) { - raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, s); - raxFree(cg->pel_by_time); + /* Free the pel, unlinking each NACK from the time list in the callback */ + streamFreeNACKCtx ctx = {s, cg}; + raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, &ctx); + + /* pel_time_head/tail should now be NULL after unlinking all NACKs */ + serverAssert(cg->pel_time_head == NULL && cg->pel_time_tail == NULL); + raxFreeWithCbAndContext(cg->consumers, streamFreeConsumerGeneric, s); size_t usable; zfree_usable(cg, &usable); @@ -3337,7 +3326,7 @@ void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) { streamID id; streamDecodeID(ri.key, &id); - raxRemovePelByTime(cg->pel_by_time, nack->delivery_time, &id); + pelListUnlink(cg, nack); raxRemove(cg->pel,ri.key,ri.key_len,NULL); streamFreeNACK(s, nack); @@ -3672,7 +3661,7 @@ void xackCommand(client *c) { void *result; if (raxFind(group->pel,buf,sizeof(buf),&result)) { streamNACK *nack = result; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &ids[j-3]); + 
pelListUnlink(group, nack); raxRemove(group->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); streamDestroyNACK(kv->ptr, nack, buf); @@ -3748,7 +3737,7 @@ void xackdelCommand(client *c) { void *result; if (raxFind(group->pel,buf,sizeof(buf),&result)) { streamNACK *nack = result; - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id); + pelListUnlink(group, nack); raxRemove(group->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); streamDestroyNACK(s, nack, buf); @@ -4188,7 +4177,7 @@ void xclaimCommand(client *c) { propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ server.dirty++; /* Release the NACK */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); + pelListUnlink(group, nack); raxRemove(group->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); streamDestroyNACK(s, nack, buf); @@ -4203,9 +4192,9 @@ void xclaimCommand(client *c) { * and replication of consumer groups. */ if (force && nack == NULL) { /* Create the NACK. 
*/ - nack = streamCreateNACK(s,NULL); + nack = streamCreateNACK(s, NULL, &id); raxInsert(group->pel,buf,sizeof(buf),nack,NULL); - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); + pelListInsertAtTail(group, nack); nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); } @@ -4230,9 +4219,7 @@ void xclaimCommand(client *c) { } } - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - nack->delivery_time = deliverytime; - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); + pelListUpdate(group, nack, deliverytime); /* Set the delivery attempts counter if given, otherwise * autoincrement unless JUSTID option provided */ @@ -4390,7 +4377,7 @@ void xautoclaimCommand(client *c) { decrRefCount(idstr); server.dirty++; /* Clear this entry from the PEL, it no longer exists */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); + pelListUnlink(group, nack); raxRemove(group->pel,ri.key,ri.key_len,NULL); raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); streamDestroyNACK(s, nack, ri.key); @@ -4417,9 +4404,7 @@ void xautoclaimCommand(client *c) { } /* Update the consumer and idle time. */ - raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id); - nack->delivery_time = now; - raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id); + pelListUpdate(group, nack, now); /* Increment the delivery attempts counter unless JUSTID option provided */ if (!justid) @@ -5234,51 +5219,95 @@ int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { return 1; } -/* Convert the specified pelTimeKey as a 192 bit big endian number, so - * that the key can be sorted lexicographically. 
*/ -void encodePelTimeKey(void *buf, pelTimeKey *key) { - uint64_t e[3]; - e[0] = htonu64(key->delivery_time); - e[1] = htonu64(key->id.ms); - e[2] = htonu64(key->id.seq); - memcpy(buf,e,sizeof(e)); +/* ----------------------------------------------------------------------- + * PEL Time-Ordered List Helpers + * ----------------------------------------------------------------------- */ + +/* The following functions manage a doubly-linked list of pending entries (NACKs) + * ordered by delivery_time. Almost all NACK updates set delivery_time to current + * time, making this an append-to-tail workload. The doubly-linked list provides + * O(1) unlink from any position, O(1) append to tail, O(1) access to oldest + * entries for CLAIM operations. */ + +/* Insert a NACK at the tail of the PEL time-ordered list. This is used when + * delivery_time is set to current time, which is the common case. */ +static void pelListInsertAtTail(streamCG *cg, streamNACK *nack) { + nack->pel_prev = cg->pel_time_tail; + nack->pel_next = NULL; + if (cg->pel_time_tail) { + cg->pel_time_tail->pel_next = nack; + } else { + cg->pel_time_head = nack; + } + cg->pel_time_tail = nack; } -/* This is the reverse of encodePelTimeKey(): the decoded key will be stored - * in the 'key' structure passed by reference. The buffer 'buf' must point - * to a 192 bit big-endian encoded key. */ -void decodePelTimeKey(void *buf, pelTimeKey *key) { - uint64_t e[3]; - memcpy(e,buf,sizeof(e)); - key->delivery_time = ntohu64(e[0]); - key->id.ms = ntohu64(e[1]); - key->id.seq = ntohu64(e[2]); +/* Unlink a NACK from the PEL time-ordered list. */ +static void pelListUnlink(streamCG *cg, streamNACK *nack) { + if (nack->pel_prev) { + nack->pel_prev->pel_next = nack->pel_next; + } else { + /* Removing head. */ + cg->pel_time_head = nack->pel_next; + } + if (nack->pel_next) { + nack->pel_next->pel_prev = nack->pel_prev; + } else { + /* Removing tail. 
*/ + cg->pel_time_tail = nack->pel_prev; + } + nack->pel_prev = nack->pel_next = NULL; } -/* Helper function to prepare an encoded PEL time key. - * This encapsulates the creation and encoding of a pelTimeKey structure. */ -static inline void preparePelTimeKey(unsigned char *keyBuf, uint64_t delivery_time, streamID *id) { - pelTimeKey timeKey; - timeKey.delivery_time = delivery_time; - timeKey.id = *id; - encodePelTimeKey(keyBuf, &timeKey); +/* Insert a NACK in sorted order by delivery_time. Used for edge cases where + * delivery_time is set to a past time, and also by RDB loading where entries + * may not be time-ordered. We scan backwards from the tail since most times + * are recent, so the common case is still fast. */ +void pelListInsertSorted(streamCG *cg, streamNACK *nack) { + /* Empty list. */ + if (cg->pel_time_head == NULL) { + cg->pel_time_head = cg->pel_time_tail = nack; + nack->pel_prev = nack->pel_next = NULL; + return; + } + + /* Append to tail (common case: delivery_time >= tail time). */ + if (nack->delivery_time >= cg->pel_time_tail->delivery_time) { + pelListInsertAtTail(cg, nack); + return; + } + + /* Prepend to head (rare: delivery_time < head time). */ + if (nack->delivery_time < cg->pel_time_head->delivery_time) { + nack->pel_next = cg->pel_time_head; + nack->pel_prev = NULL; + cg->pel_time_head->pel_prev = nack; + cg->pel_time_head = nack; + return; + } + + /* Insert in middle: scan backwards from tail since most times are recent. */ + streamNACK *curr = cg->pel_time_tail; + while (curr && curr->delivery_time > nack->delivery_time) { + curr = curr->pel_prev; + } + + /* Insert after curr. */ + nack->pel_next = curr->pel_next; + nack->pel_prev = curr; + if (curr->pel_next) { + curr->pel_next->pel_prev = nack; + } + curr->pel_next = nack; } -/* Helper function to insert a NACK into the PEL by time index. - * This encapsulates the encoding and insertion into the pel_by_time rax tree. 
*/ -void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) { - unsigned char keyBuf[sizeof(pelTimeKey)]; - preparePelTimeKey(keyBuf, delivery_time, id); - raxInsert(pel_by_time, keyBuf, sizeof(keyBuf), NULL, NULL); +/* Update a NACK's delivery_time and reposition it in the time-ordered list. */ +static void pelListUpdate(streamCG *cg, streamNACK *nack, mstime_t new_delivery_time) { + pelListUnlink(cg, nack); + nack->delivery_time = new_delivery_time; + pelListInsertSorted(cg, nack); } -/* Helper function to remove a NACK from the PEL by time index. - * This encapsulates the encoding and removal from the pel_by_time rax tree. */ -void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) { - unsigned char keyBuf[sizeof(pelTimeKey)]; - preparePelTimeKey(keyBuf, delivery_time, id); - raxRemove(pel_by_time, keyBuf, sizeof(keyBuf), NULL); -} /* Register stream keys for monitoring of expired pending entries to enable * reactive blocking behavior for XREADGROUP commands with CLAIM. When a client