Optimize Redis XREADGROUP CLAIM (#14726)

## Overview
This PR optimizes Redis Streams consumer group performance by replacing
the `pel_by_time` rax tree with a doubly-linked list, delivering
significant performance improvements for NACK updates and XREADGROUP
CLAIM operations while also reducing memory usage.

## The Problem
Consumer groups maintain a time-ordered index of pending entries using a
radix tree (`pel_by_time`). Every time a pending entry is reclaimed or
delivered, we need to update its delivery time, which currently
requires:

```c
raxRemovePelByTime(group->pel_by_time, old_time, &id);  // O(k) where k=key length
nack->delivery_time = current_time;
raxInsertPelByTime(group->pel_by_time, current_time, &id); // O(k) where k=key length
```

## The Key Insight

**99% of delivery_time updates set the value to the current time** —
which means they're appending to the tail of a time-ordered structure.

We're using a radix tree (O(k) operations where k is key length, plus
tree traversal overhead) for what is essentially an append-only workload
(should be O(1)).

## The Solution

Replace the rax tree with a doubly-linked list embedded directly in each
`streamNACK`:

```c
typedef struct streamNACK {
    mstime_t delivery_time;
    uint64_t delivery_count;
    streamConsumer *consumer;
    listNode *cgroup_ref_node;
    streamID id;                    // NEW
    struct streamNACK *pel_prev;    // NEW
    struct streamNACK *pel_next;    // NEW
} streamNACK;
```

Now updating a NACK becomes:
```c
pelListUpdate(group, nack, current_time);  // O(1): unlink + append
```

## Why This Works

**Typical case (99%):** Delivery time = current time
- Unlink from current position: O(1) — just update 2-4 pointers
- Append to tail: O(1) — update tail pointer and link

**Edge case (1%):** XCLAIM with explicit past IDLE time
- Still handled correctly by `pelListInsertSorted()` which scans
backward from tail
- Rare enough that O(N) worst case doesn't matter

## Memory Reduction

The linked list approach uses less memory than the rax tree:

**What we add:**
- 3 new fields in `streamNACK`: `id` (16 bytes) + `pel_prev` (8 bytes) +
`pel_next` (8 bytes) = 32 bytes per entry

**What we remove:**
- Entire `pel_by_time` rax tree with its node overhead (~40-50 bytes per
entry)

**Net result:** Lower memory footprint per pending entry, plus better
cache locality from eliminating the separate tree structure.

## Performance Impact

### Theoretical Analysis

| Operation | Before | After |
|-----------|--------|-------|
| NACK update | O(k) × 2 + tree overhead | O(1) |
| CLAIM iteration | O(k) per entry + traversal | O(1) per entry |

*k = key length (32 bytes: timestamp + stream ID)*

For a consumer group with 10,000 pending entries claiming 100 oldest:
- **Before:** Tree traversal + key comparisons for each operation
- **After:** Simple pointer updates

**Key Findings:**
- **28% higher throughput** for XREADGROUP with CLAIM
- **22% lower average latency** (0.195ms → 0.152ms)
- **21% lower P99 latency** (0.212ms → 0.168ms)
- XADD performance unchanged (69K ops/sec both implementations)
This commit is contained in:
Sergei Georgiev 2026-01-27 14:16:16 +02:00 committed by GitHub
parent 37f685908e
commit cecdc99873
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 241 additions and 200 deletions

View file

@ -867,11 +867,26 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */
newnack = activeDefragAlloc(nack);
if (newnack) {
/* Update consumer group pointer to the nack.
* pel_by_time doesn't need updating since delivery time is unchanged. */
/* Update consumer group pointer to the nack. */
void *prev;
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
serverAssert(prev==nack);
/* Update the doubly-linked list pointers in adjacent nacks.
* When we move a nack to a new address, we need to update the
* pel_prev->pel_next and pel_next->pel_prev pointers. */
if (newnack->pel_prev) {
newnack->pel_prev->pel_next = newnack;
} else {
/* This is the head of the list */
ctx->cg->pel_time_head = newnack;
}
if (newnack->pel_next) {
newnack->pel_next->pel_prev = newnack;
} else {
/* This is the tail of the list */
ctx->cg->pel_time_tail = newnack;
}
}
return newnack;
}
@ -912,11 +927,7 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
cg->pel->alloc_size = &s->alloc_size;
defragRadixTree(&cg->pel, 0, NULL, NULL);
}
if (cg->pel_by_time) {
/* Update pel_by_time back-pointer to new stream */
cg->pel_by_time->alloc_size = &s->alloc_size;
defragRadixTree(&cg->pel_by_time, 0, NULL, NULL);
}
/* pel_time_head/tail are just pointers to NACKs in pel, no separate defrag needed */
if (cg->consumers) {
/* Update consumers back-pointer to new stream */
cg->consumers->alloc_size = &s->alloc_size;

View file

@ -3233,7 +3233,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
decrRefCount(o);
return NULL;
}
streamNACK *nack = streamCreateNACK(s, NULL);
streamID nack_id;
streamDecodeID(rawid, &nack_id);
streamNACK *nack = streamCreateNACK(s, NULL, &nack_id);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, rawid);
@ -3251,9 +3253,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
return NULL;
}
streamID id;
streamDecodeID(rawid, &id);
raxInsertPelByTime(cgroup->pel_by_time, nack->delivery_time, &id);
/* Insert in sorted order since RDB entries may not be time-ordered */
pelListInsertSorted(cgroup, nack);
}
/* Now that we loaded our global PEL, we need to load the

View file

@ -86,6 +86,9 @@ typedef struct streamIterator {
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
/* Forward declarations */
typedef struct streamNACK streamNACK;
/* Consumer group. */
typedef struct streamCG {
streamID last_id; /* Last delivered (not acknowledged) ID for this
@ -102,11 +105,12 @@ typedef struct streamCG {
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNACK structure.*/
rax *pel_by_time; /* A radix tree mapping delivery time to pending
entries, so that we can query faster PEL entries
by time. The key is a pelTimeKey structure containing
both delivery_time and stream ID. All information is
in the key; no value is stored. */
streamNACK *pel_time_head; /* Head of time-ordered doubly-linked list of pending
entries (oldest delivery_time). Used for efficient
CLAIM operations. O(1) access to oldest entries. */
streamNACK *pel_time_tail; /* Tail of time-ordered doubly-linked list of pending
entries (newest delivery_time). O(1) append for
updates that set delivery_time to current time. */
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
@ -129,13 +133,16 @@ typedef struct streamConsumer {
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
struct streamNACK {
mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */
listNode *cgroup_ref_node; /* Reference to this NACK in the cgroups_ref list. */
} streamNACK;
streamID id; /* Stream ID for this pending entry. */
struct streamNACK *pel_prev; /* Previous NACK in time-ordered doubly-linked list. */
struct streamNACK *pel_next; /* Next NACK in time-ordered doubly-linked list. */
};
/* Stream propagation information, passed to functions in order to propagate
* XCLAIM commands to AOF and slaves. */
@ -144,12 +151,6 @@ typedef struct streamPropInfo {
robj *groupname;
} streamPropInfo;
/* Pending entry in the consumer group's PEL, indexed by delivery time. */
typedef struct pelTimeKey {
uint64_t delivery_time;
streamID id;
} pelTimeKey;
/* Prototypes of exported APIs. */
struct client;
@ -173,10 +174,11 @@ streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer);
streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer, streamID *id);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(stream *s, streamNACK *na);
void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
@ -193,10 +195,8 @@ int64_t streamTrimByID(stream *s, streamID minid, int approx);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);
void encodePelTimeKey(void* buf, pelTimeKey *timeKey);
void decodePelTimeKey(void *buf, pelTimeKey *timeKey);
void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id);
void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id);
/* PEL time list management (used by RDB loading) */
void pelListInsertSorted(streamCG *cg, streamNACK *nack);
/* IDMP functions */
idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, size_t *alloc_size);

View file

@ -55,6 +55,11 @@ static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t
static int createIdempotencyHash(robj **argv, int64_t numfields, XXH128_hash_t *out_hash);
static void idmpEvictOldestEntry(stream *s, idmpProducer *producer);
/* Forward declarations for PEL time list functions */
static void pelListInsertAtTail(streamCG *cg, streamNACK *nack);
static void pelListUnlink(streamCG *cg, streamNACK *nack);
static void pelListUpdate(streamCG *cg, streamNACK *nack, mstime_t new_delivery_time);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
* ----------------------------------------------------------------------- */
@ -239,15 +244,16 @@ robj *streamDup(robj *o) {
raxSeek(&ri_cg_pel,"^",NULL,0);
while(raxNext(&ri_cg_pel)){
streamNACK *nack = ri_cg_pel.data;
streamNACK *new_nack = streamCreateNACK(new_s, NULL);
streamID nack_id;
streamDecodeID(ri_cg_pel.key, &nack_id);
streamNACK *new_nack = streamCreateNACK(new_s, NULL, &nack_id);
new_nack->delivery_time = nack->delivery_time;
new_nack->delivery_count = nack->delivery_count;
new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, ri_cg_pel.key);
raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
streamID id;
streamDecodeID(ri_cg_pel.key, &id);
raxInsertPelByTime(new_cg->pel_by_time, new_nack->delivery_time, &id);
/* Insert in sorted order to preserve ordering */
pelListInsertSorted(new_cg, new_nack);
}
raxStop(&ri_cg_pel);
@ -1963,76 +1969,41 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
if (group && min_idle_time != -1) {
arraylen_ptr = addReplyDeferredLen(c);
/* Scan the group's pending entries list (PEL) to find messages that have been
* idle for at least min_idle_time milliseconds. The pel_by_time radix tree
* stores entries ordered by their last delivery timestamp, allowing us to
* efficiently iterate from oldest to newest.
/* Scan and process the group's pending entries list (PEL) in a single loop.
* To prevent a dead loop caused by pelListUpdate() moving elements from the
* beginning to the end of the list, we store the current tail pointer before
* processing. We iterate only up to this pre-determined boundary, ensuring we
* never process entries that are added or moved during iteration.
*
* We collect eligible entries into a temporary list rather than processing
* them inline because:
* 1. We cannot safely modify a radix tree while iterating over it
* 2. The claiming process requires removing and re-inserting entries in
* both pel_by_time and the consumer PELs
*
* The iteration can terminate early in two cases:
* 1. We find an entry that hasn't been idle long enough - due to time-based
* ordering, all subsequent entries will be even newer
* 2. We've collected enough entries to satisfy the requested count limit */
list *eligible_pels = listCreate();
listSetFreeMethod(eligible_pels, zfree);
raxIterator ri;
raxStart(&ri, group->pel_by_time);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
pelTimeKey pelKey;
decodePelTimeKey(ri.key, &pelKey);
uint64_t idle = cmd_time_snapshot - pelKey.delivery_time;
if (idle < (uint64_t)min_idle_time)
break;
/* Store a copy of the key for later processing */
pelTimeKey *keyCopy = zmalloc(sizeof(pelTimeKey));
memcpy(keyCopy, &pelKey, sizeof(pelTimeKey));
listAddNodeTail(eligible_pels, keyCopy);
* The iteration can terminate early when:
* 1. We find an entry that hasn't been idle long enough
* 2. We've processed enough entries to satisfy the count limit
* 3. We reach the pre-stored tail boundary */
/* Store the current tail to prevent infinite loops */
streamNACK *tail = group->pel_time_tail;
size_t processed = 0;
streamNACK *nack = group->pel_time_head;
while (nack) {
/* Capture next pointer BEFORE modifications (pelListUpdate may reorder) */
streamNACK *next = nack->pel_next;
if (count && listLength(eligible_pels) >= count) break;
}
raxStop(&ri);
uint64_t idle = cmd_time_snapshot - nack->delivery_time;
if (idle < (uint64_t)min_idle_time) break;
/* Process each eligible pending entry, claiming it for the current consumer.
* For each entry we:
* 1. Fetch the actual message data from the stream
* 2. Send the message to the client with metadata (idle time, delivery count)
* 3. Transfer ownership from the previous consumer to the current consumer
* 4. Update all relevant data structures and propagate the claim operation */
listIter li;
listNode *ln;
listRewind(eligible_pels, &li);
while ((ln = listNext(&li))) {
pelTimeKey *pelKey = (pelTimeKey*)listNodeValue(ln);
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf, &pelKey->id);
void *result;
streamNACK *nack = NULL;
uint64_t delivery_count = 0;
/* Must exist, we got the ID from pel_by_time */
serverAssert(raxFind(group->pel,buf,sizeof(buf),&result));
nack = (streamNACK*)result;
delivery_count = nack->delivery_count;
/* Process and claim this entry */
uint64_t delivery_count = nack->delivery_count;
streamID pel_id;
streamIteratorStart(&si,s,&pelKey->id,&pelKey->id,rev);
streamIteratorStart(&si,s,&nack->id,&nack->id,rev);
if (streamIteratorGetID(&si,&pel_id,&numfields)) {
/* Emit a four elements array: ID, array of field-value pairs,
* idle time and delivery count. */
robj *idarg = createObjectFromStreamID(&pel_id);
addReplyArrayLen(c,4);
addReplyBulk(c,idarg);
addReplyArrayLen(c,numfields*2);
/* Emit the field-value pairs. */
/* Emit field-value pairs */
while (numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
@ -2041,24 +2012,23 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
addReplyBulkCBuffer(c,value,value_len);
}
uint64_t idle = cmd_time_snapshot - pelKey->delivery_time;
addReplyLongLong(c, idle);
addReplyLongLong(c, delivery_count);
/* Remove the NACK from old consumer and time-based PEL. */
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &pel_id);
/* Transfer NACK to new consumer with updated metadata. */
nack->consumer = consumer;
nack->delivery_time = cmd_time_snapshot;
/* Transfer ownership if needed */
if (nack->consumer != consumer) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf, &nack->id);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
nack->consumer = consumer;
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
}
nack->delivery_count++;
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &pel_id);
pelListUpdate(group, nack, cmd_time_snapshot); /* Moves element from beginning to end of list */
consumer->active_time = cmd_time_snapshot;
/* Propagate as XCLAIM. */
/* Propagate as XCLAIM */
if (spi) {
robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
@ -2067,10 +2037,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
}
decrRefCount(idarg);
arraylen++;
/* Check count limit */
if (count && ++processed >= count) {
streamIteratorStop(&si);
break;
}
}
streamIteratorStop(&si);
streamIteratorStop(&si);
/* Advance to next, stopping if we reached the tail */
nack = (nack == tail) ? NULL : next;
}
listRelease(eligible_pels);
}
/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
@ -2168,11 +2146,9 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Try to add a new NACK. Most of the time this will work and
* will not require extra lookups. We'll fix the problem later
* if we find that there is already an entry for this ID. */
streamNACK *nack = streamCreateNACK(s, consumer);
streamNACK *nack = streamCreateNACK(s, consumer, &id);
int group_inserted =
raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
int consumer_inserted =
raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Now we can check if the entry was already busy, and
* in that case reassign the entry to the new consumer,
@ -2183,24 +2159,22 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
int found = raxFind(group->pel,buf,sizeof(buf),&result);
serverAssert(found);
nack = result;
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Remove old entry from the PEL by time. */
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
/* Update the consumer and NACK metadata. */
nack->consumer = consumer;
nack->delivery_time = cmd_time_snapshot;
/* Only transfer between consumers if they're different */
if (nack->consumer != consumer) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
nack->consumer = consumer;
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
}
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
/* Update delivery time and reposition in time list */
pelListUpdate(group, nack, cmd_time_snapshot);
} else {
/* New NACK - insert into consumer's PEL and time list */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
} else if (group_inserted == 1 && consumer_inserted == 1) {
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
} else if (group_inserted == 1 && consumer_inserted == 0) {
serverPanic("NACK half-created. Should not be possible.");
pelListInsertAtTail(group, nack);
}
/* We have new NACK or updated existing one. */
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
consumer->active_time = cmd_time_snapshot;
/* Propagate as XCLAIM. */
@ -2272,12 +2246,8 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &thisid);
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count++;
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &thisid);
pelListUpdate(group, nack, commandTimeSnapshot());
}
arraylen++;
}
@ -2849,7 +2819,7 @@ void xreadCommand(client *c) {
/* Try to serve the client synchronously. */
size_t arraylen = 0;
void *arraylen_ptr = NULL;
uint64_t min_pel_delivery_time = UINT64_MAX;
mstime_t min_pel_delivery_time = LLONG_MAX;
for (int i = 0; i < streams_count; i++) {
kvobj *o = lookupKeyRead(c->db, c->argv[streams_arg + i]);
if (o == NULL) continue;
@ -2869,26 +2839,25 @@ void xreadCommand(client *c) {
* get the minimum delivery time in the PEL, in order to use it
* later if block option is set. */
if (min_idle_time != -1) {
raxIterator ri;
raxStart(&ri, groups[i]->pel_by_time);
raxSeek(&ri, "^", NULL, 0);
while(raxNext(&ri)) {
pelTimeKey timeKey;
decodePelTimeKey(ri.key, &timeKey);
if (!streamEntryExists(s, &timeKey.id))
streamNACK *nack = groups[i]->pel_time_head;
/* Iterate through PEL entries to find the first one that exists */
while (nack) {
/* Skip entries that don't exist in the stream anymore */
if (!streamEntryExists(s, &nack->id)) {
nack = nack->pel_next;
continue;
if (timeKey.delivery_time < min_pel_delivery_time) {
min_pel_delivery_time = timeKey.delivery_time;
}
uint64_t idle = commandTimeSnapshot() - timeKey.delivery_time;
if (nack->delivery_time < min_pel_delivery_time) {
min_pel_delivery_time = nack->delivery_time;
}
uint64_t idle = commandTimeSnapshot() - nack->delivery_time;
if (idle >= (uint64_t)min_idle_time) {
serve_claimed = 1;
}
break;
break; /* Found a valid entry, stop searching */
}
raxStop(&ri);
}
/* If the consumer is blocked on a group, we always serve it
@ -3013,7 +2982,7 @@ void xreadCommand(client *c) {
* If there are no entries in the PELs we will unblock the client after min_idle_time. */
if (min_idle_time != -1) {
uint64_t pel_expire_time = min_idle_time;
if (min_pel_delivery_time != UINT64_MAX)
if (min_pel_delivery_time != LLONG_MAX)
pel_expire_time += min_pel_delivery_time;
else
pel_expire_time += commandTimeSnapshot();
@ -3114,8 +3083,8 @@ void streamCleanupEntryCGroupRefs(stream *s, streamID *id) {
serverAssert(raxFind(group->pel, buf, sizeof(buf), (void **)&nack));
/* Remove from group and consumer PELs */
pelListUnlink(group, nack);
raxRemove(group->pel, buf, sizeof(buf), NULL);
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id);
raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL);
/* Since we're removing all references from the cgroups_ref, we can directly
* free the NACK without unlinking it from the cgroups_ref. */
@ -3167,7 +3136,7 @@ int streamEntryIsReferenced(stream *s, streamID *id) {
/* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time. The NACK consumer will be set to the one
* specified as argument of the function. */
streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) {
streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer, streamID *id) {
size_t usable;
streamNACK *nack = zmalloc_usable(sizeof(*nack), &usable);
s->alloc_size += usable;
@ -3175,6 +3144,9 @@ streamNACK *streamCreateNACK(stream *s, streamConsumer *consumer) {
nack->delivery_count = 1;
nack->consumer = consumer;
nack->cgroup_ref_node = NULL; /* Will be set when added to cgroups_ref */
nack->id = *id;
nack->pel_prev = NULL;
nack->pel_next = NULL;
return nack;
}
@ -3186,17 +3158,28 @@ void streamFreeNACK(stream *s, streamNACK *na) {
}
/* Free a NACK entry and remove its reference from the cgroups_ref.
* This ensures proper cleanup of the consumer group list associated with the message ID. */
* This ensures proper cleanup of the consumer group list associated with the message ID.
* Note: Caller must ensure NACK is unlinked from pel_time list before calling. */
void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key) {
size_t usable;
serverAssert(na->pel_prev == NULL && na->pel_next == NULL);
streamUnlinkEntryFromCGroupRef(s, na, key);
zfree_usable(na, &usable);
s->alloc_size -= usable;
}
/* Generic version of streamFreeNACK. */
void streamFreeNACKGeneric(void *na, void *s) {
streamFreeNACK((stream *)s, (streamNACK *)na);
/* Context for streamFreeNACKGeneric callback. */
typedef struct {
stream *s;
streamCG *cg;
} streamFreeNACKCtx;
/* Generic version of streamFreeNACK with PEL list unlinking. */
void streamFreeNACKGeneric(void *na, void *ctx) {
streamFreeNACKCtx *c = (streamFreeNACKCtx *)ctx;
streamNACK *nack = (streamNACK *)na;
pelListUnlink(c->cg, nack);
streamFreeNACK(c->s, nack);
}
/* Free a consumer and associated data structures. Note that this function
@ -3233,7 +3216,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo
streamCG *cg = zmalloc_usable(sizeof(*cg), &usable);
s->alloc_size += usable;
cg->pel = raxNewWithMetadata(0, &s->alloc_size);
cg->pel_by_time = raxNewWithMetadata(0, &s->alloc_size);
cg->pel_time_head = NULL;
cg->pel_time_tail = NULL;
cg->consumers = raxNewWithMetadata(0, &s->alloc_size);
cg->last_id.ms = 0;
cg->last_id.seq = 0;
@ -3245,8 +3229,13 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo
/* Free a consumer group and all its associated data. */
static void streamFreeCG(stream *s, streamCG *cg) {
raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, s);
raxFree(cg->pel_by_time);
/* Free the pel, unlinking each NACK from the time list in the callback */
streamFreeNACKCtx ctx = {s, cg};
raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, &ctx);
/* pel_time_head/tail should now be NULL after unlinking all NACKs */
serverAssert(cg->pel_time_head == NULL && cg->pel_time_tail == NULL);
raxFreeWithCbAndContext(cg->consumers, streamFreeConsumerGeneric, s);
size_t usable;
zfree_usable(cg, &usable);
@ -3337,7 +3326,7 @@ void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) {
streamID id;
streamDecodeID(ri.key, &id);
raxRemovePelByTime(cg->pel_by_time, nack->delivery_time, &id);
pelListUnlink(cg, nack);
raxRemove(cg->pel,ri.key,ri.key_len,NULL);
streamFreeNACK(s, nack);
@ -3672,7 +3661,7 @@ void xackCommand(client *c) {
void *result;
if (raxFind(group->pel,buf,sizeof(buf),&result)) {
streamNACK *nack = result;
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &ids[j-3]);
pelListUnlink(group, nack);
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(kv->ptr, nack, buf);
@ -3748,7 +3737,7 @@ void xackdelCommand(client *c) {
void *result;
if (raxFind(group->pel,buf,sizeof(buf),&result)) {
streamNACK *nack = result;
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, id);
pelListUnlink(group, nack);
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(s, nack, buf);
@ -4188,7 +4177,7 @@ void xclaimCommand(client *c) {
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
/* Release the NACK */
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
pelListUnlink(group, nack);
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamDestroyNACK(s, nack, buf);
@ -4203,9 +4192,9 @@ void xclaimCommand(client *c) {
* and replication of consumer groups. */
if (force && nack == NULL) {
/* Create the NACK. */
nack = streamCreateNACK(s,NULL);
nack = streamCreateNACK(s, NULL, &id);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
pelListInsertAtTail(group, nack);
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
}
@ -4230,9 +4219,7 @@ void xclaimCommand(client *c) {
}
}
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
nack->delivery_time = deliverytime;
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
pelListUpdate(group, nack, deliverytime);
/* Set the delivery attempts counter if given, otherwise
* autoincrement unless JUSTID option provided */
@ -4390,7 +4377,7 @@ void xautoclaimCommand(client *c) {
decrRefCount(idstr);
server.dirty++;
/* Clear this entry from the PEL, it no longer exists */
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
pelListUnlink(group, nack);
raxRemove(group->pel,ri.key,ri.key_len,NULL);
raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
streamDestroyNACK(s, nack, ri.key);
@ -4417,9 +4404,7 @@ void xautoclaimCommand(client *c) {
}
/* Update the consumer and idle time. */
raxRemovePelByTime(group->pel_by_time, nack->delivery_time, &id);
nack->delivery_time = now;
raxInsertPelByTime(group->pel_by_time, nack->delivery_time, &id);
pelListUpdate(group, nack, now);
/* Increment the delivery attempts counter unless JUSTID option provided */
if (!justid)
@ -5234,51 +5219,95 @@ int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
return 1;
}
/* Convert the specified pelTimeKey as a 192 bit big endian number, so
* that the key can be sorted lexicographically. */
void encodePelTimeKey(void *buf, pelTimeKey *key) {
uint64_t e[3];
e[0] = htonu64(key->delivery_time);
e[1] = htonu64(key->id.ms);
e[2] = htonu64(key->id.seq);
memcpy(buf,e,sizeof(e));
/* -----------------------------------------------------------------------
* PEL Time-Ordered List Helpers
* ----------------------------------------------------------------------- */
/* The following functions manage a doubly-linked list of pending entries (NACKs)
* ordered by delivery_time. Almost all NACK updates set delivery_time to current
* time, making this an append-to-tail workload. The doubly-linked list provides
* O(1) unlink from any position, O(1) append to tail, O(1) access to oldest
* entries for CLAIM operations. */
/* Insert a NACK at the tail of the PEL time-ordered list. This is used when
* delivery_time is set to current time, which is the common case. */
static void pelListInsertAtTail(streamCG *cg, streamNACK *nack) {
nack->pel_prev = cg->pel_time_tail;
nack->pel_next = NULL;
if (cg->pel_time_tail) {
cg->pel_time_tail->pel_next = nack;
} else {
cg->pel_time_head = nack;
}
cg->pel_time_tail = nack;
}
/* This is the reverse of encodePelTimeKey(): the decoded key will be stored
* in the 'key' structure passed by reference. The buffer 'buf' must point
* to a 192 bit big-endian encoded key. */
void decodePelTimeKey(void *buf, pelTimeKey *key) {
uint64_t e[3];
memcpy(e,buf,sizeof(e));
key->delivery_time = ntohu64(e[0]);
key->id.ms = ntohu64(e[1]);
key->id.seq = ntohu64(e[2]);
/* Unlink a NACK from the PEL time-ordered list. */
static void pelListUnlink(streamCG *cg, streamNACK *nack) {
if (nack->pel_prev) {
nack->pel_prev->pel_next = nack->pel_next;
} else {
/* Removing head. */
cg->pel_time_head = nack->pel_next;
}
if (nack->pel_next) {
nack->pel_next->pel_prev = nack->pel_prev;
} else {
/* Removing tail. */
cg->pel_time_tail = nack->pel_prev;
}
nack->pel_prev = nack->pel_next = NULL;
}
/* Helper function to prepare an encoded PEL time key.
* This encapsulates the creation and encoding of a pelTimeKey structure. */
static inline void preparePelTimeKey(unsigned char *keyBuf, uint64_t delivery_time, streamID *id) {
pelTimeKey timeKey;
timeKey.delivery_time = delivery_time;
timeKey.id = *id;
encodePelTimeKey(keyBuf, &timeKey);
/* Insert a NACK in sorted order by delivery_time. Used for edge cases where
* delivery_time is set to a past time, and also by RDB loading where entries
* may not be time-ordered. We scan backwards from the tail since most times
* are recent, so the common case is still fast. */
void pelListInsertSorted(streamCG *cg, streamNACK *nack) {
/* Empty list. */
if (cg->pel_time_head == NULL) {
cg->pel_time_head = cg->pel_time_tail = nack;
nack->pel_prev = nack->pel_next = NULL;
return;
}
/* Append to tail (common case: delivery_time >= tail time). */
if (nack->delivery_time >= cg->pel_time_tail->delivery_time) {
pelListInsertAtTail(cg, nack);
return;
}
/* Prepend to head (rare: delivery_time < head time). */
if (nack->delivery_time < cg->pel_time_head->delivery_time) {
nack->pel_next = cg->pel_time_head;
nack->pel_prev = NULL;
cg->pel_time_head->pel_prev = nack;
cg->pel_time_head = nack;
return;
}
/* Insert in middle: scan backwards from tail since most times are recent. */
streamNACK *curr = cg->pel_time_tail;
while (curr && curr->delivery_time > nack->delivery_time) {
curr = curr->pel_prev;
}
/* Insert after curr. */
nack->pel_next = curr->pel_next;
nack->pel_prev = curr;
if (curr->pel_next) {
curr->pel_next->pel_prev = nack;
}
curr->pel_next = nack;
}
/* Helper function to insert a NACK into the PEL by time index.
* This encapsulates the encoding and insertion into the pel_by_time rax tree. */
void raxInsertPelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) {
unsigned char keyBuf[sizeof(pelTimeKey)];
preparePelTimeKey(keyBuf, delivery_time, id);
raxInsert(pel_by_time, keyBuf, sizeof(keyBuf), NULL, NULL);
/* Update a NACK's delivery_time and reposition it in the time-ordered list. */
static void pelListUpdate(streamCG *cg, streamNACK *nack, mstime_t new_delivery_time) {
pelListUnlink(cg, nack);
nack->delivery_time = new_delivery_time;
pelListInsertSorted(cg, nack);
}
/* Helper function to remove a NACK from the PEL by time index.
* This encapsulates the encoding and removal from the pel_by_time rax tree. */
void raxRemovePelByTime(rax *pel_by_time, uint64_t delivery_time, streamID *id) {
unsigned char keyBuf[sizeof(pelTimeKey)];
preparePelTimeKey(keyBuf, delivery_time, id);
raxRemove(pel_by_time, keyBuf, sizeof(keyBuf), NULL);
}
/* Register stream keys for monitoring of expired pending entries to enable
* reactive blocking behavior for XREADGROUP commands with CLAIM. When a client