From 5776d13dccd43ff5fe2c8ad2ea351aefceac5774 Mon Sep 17 00:00:00 2001 From: Sergey Georgiev Date: Wed, 18 Feb 2026 15:39:19 +0200 Subject: [PATCH] fixed: issues from review --- src/aof.c | 21 +++++++++------------ tests/unit/type/stream-cgroups.tcl | 8 ++++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/aof.c b/src/aof.c index b492e0f39..a1352f0d0 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2369,26 +2369,23 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { } raxStop(&ri_cons); - /* Emit XNACK FORCE for NACKed (unowned) entries in the group - * PEL. These entries have consumer == NULL and are not - * referenced from any consumer's PEL. */ - raxIterator ri_gpel; - raxStart(&ri_gpel, group->pel); - raxSeek(&ri_gpel, "^", NULL, 0); - while (raxNext(&ri_gpel)) { - streamNACK *nack = ri_gpel.data; - if (nack->consumer != NULL) continue; + /* Emit XNACK FORCE for NACKed (unowned) entries from the + * NACK zone of the PEL time-ordered list + * (pel_time_head..pel_nack_tail). */ + streamNACK *nack_end = group->pel_nack_tail; + streamNACK *nack = group->pel_time_head; + for (; nack && nack->pel_prev != nack_end; nack = nack->pel_next) { + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf, &nack->id); if (rioWriteStreamNackedEntry(r, key, (char*)ri.key, - ri.key_len, ri_gpel.key, + ri.key_len, buf, nack) == 0) { - raxStop(&ri_gpel); raxStop(&ri); streamIteratorStop(&si); return 0; } } - raxStop(&ri_gpel); } raxStop(&ri); } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 2a71b5da3..92cca92d5 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -3386,8 +3386,8 @@ start_server { set pending [r XPENDING mystream grp - + 10] assert_equal [llength $pending] 1 assert_equal [lindex $pending 0 1] {} - # UINT64_MAX wraps to -1 as signed long long in RESP - assert_equal [lindex $pending 0 3] -1 + # UINT32_MAX delivered as integer in RESP + assert_equal [lindex $pending 0 3] 4294967295 } test {XNACK releases entries regardless of owning consumer} { @@ -3660,8 +3660,8 @@ start_server { set pending [r XPENDING mystream grp - + 10] assert_equal [lindex $pending 0 1] {} - # UINT64_MAX wraps to -1 as signed long long in RESP - assert_equal [lindex $pending 0 3] -1 + # UINT32_MAX delivered as integer in RESP + assert_equal [lindex $pending 0 3] 4294967295 } {} {external:skip needs:debug} start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} {