diff --git a/src/commands.c b/src/commands.c index 700c810eb..37f031683 100644 --- a/src/commands.c +++ b/src/commands.c @@ -5922,7 +5922,10 @@ struct redisCommandArg XADD_Args[] = { /********** XAUTOCLAIM ********************/ /* XAUTOCLAIM history */ -#define XAUTOCLAIM_History NULL +commandHistory XAUTOCLAIM_History[] = { +{"7.0.0","Added an element to the reply array, containing deleted entries the command cleared from the PEL"}, +{0} +}; /* XAUTOCLAIM tips */ const char *XAUTOCLAIM_tips[] = { diff --git a/src/commands/xautoclaim.json b/src/commands/xautoclaim.json index b951eac29..726bf38fe 100644 --- a/src/commands/xautoclaim.json +++ b/src/commands/xautoclaim.json @@ -6,6 +6,12 @@ "since": "6.2.0", "arity": -6, "function": "xautoclaimCommand", + "history": [ + [ + "7.0.0", + "Added an element to the reply array, containing deleted entries the command cleared from the PEL" + ] + ], "command_flags": [ "WRITE", "FAST" diff --git a/src/t_stream.c b/src/t_stream.c index e47194926..9ff6a17ac 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1325,6 +1325,20 @@ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); } +/* Return 1 if `id` exists in `s` (and not marked as deleted) */ +int streamEntryExists(stream *s, streamID *id) { + streamIterator si; + streamIteratorStart(&si,s,id,id,0); + streamID myid; + int64_t numfields; + int found = streamIteratorGetID(&si,&myid,&numfields); + streamIteratorStop(&si); + if (!found) + return 0; + serverAssert(streamCompareID(id,&myid) == 0); + return 1; +} + /* Delete the specified item ID from the stream, returning 1 if the item * was deleted 0 otherwise (if it does not exist). */ int streamDeleteItem(stream *s, streamID *id) { @@ -2980,23 +2994,28 @@ void xclaimCommand(client *c) { /* Lookup the ID in the group PEL. */ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Clear this entry from the PEL, it no longer exists */ + if (nack != raxNotFound) { + /* Propagate this change (we are going to delete the NACK). */ + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ + server.dirty++; + /* Release the NACK */ + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + } + continue; + } + /* If FORCE is passed, let's check if at least the entry * exists in the Stream. In such case, we'll create a new * entry in the PEL from scratch, so that XCLAIM can also * be used to create entries in the PEL. Useful for AOF * and replication of consumer groups. */ if (force && nack == raxNotFound) { - streamIterator myiterator; - streamIteratorStart(&myiterator,o->ptr,&id,&id,0); - int64_t numfields; - int found = 0; - streamID item_id; - if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; - streamIteratorStop(&myiterator); - - /* Item must exist for us to create a NACK for it. */ - if (!found) continue; - /* Create the NACK. */ nack = streamCreateNACK(NULL); raxInsert(group->pel,buf,sizeof(buf),nack,NULL); @@ -3013,6 +3032,7 @@ void xclaimCommand(client *c) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } + if (consumer == NULL && (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) { @@ -3042,9 +3062,7 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, - NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); - if (!emitted) addReplyNull(c); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); } arraylen++; @@ -3138,9 +3156,9 @@ void xautoclaimCommand(client *c) { streamConsumer *consumer = NULL; long long attempts = count*10; - addReplyArrayLen(c, 2); - void *endidptr = addReplyDeferredLen(c); - void *arraylenptr = addReplyDeferredLen(c); + addReplyArrayLen(c, 3); /* We add another reply later */ + void *endidptr = addReplyDeferredLen(c); /* reply[0] */ + void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ unsigned char startkey[sizeof(streamID)]; streamEncodeID(startkey,&startid); @@ -3150,18 +3168,37 @@ void xautoclaimCommand(client *c) { size_t arraylen = 0; mstime_t now = mstime(); sds name = c->argv[3]->ptr; + streamID *deleted_ids = zmalloc(count * sizeof(streamID)); + int deleted_id_num = 0; while (attempts-- && count && raxNext(&ri)) { streamNACK *nack = ri.data; + streamID id; + streamDecodeID(ri.key, &id); + + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Propagate this change (we are going to delete the NACK). */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + /* Clear this entry from the PEL, it no longer exists */ + raxRemove(group->pel,ri.key,ri.key_len,NULL); + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + streamFreeNACK(nack); + /* Remember the ID for later */ + deleted_ids[deleted_id_num++] = id; + raxSeek(&ri,">=",ri.key,ri.key_len); + continue; + } + if (minidle) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } - streamID id; - streamDecodeID(ri.key, &id); - if (consumer == NULL && (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) { @@ -3191,11 +3228,7 @@ void xautoclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - size_t emitted = - streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!emitted) - addReplyNull(c); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); } arraylen++; count--; @@ -3221,6 +3254,12 @@ void xautoclaimCommand(client *c) { setDeferredArrayLen(c,arraylenptr,arraylen); setDeferredReplyStreamID(c,endidptr,&endid); + addReplyArrayLen(c, deleted_id_num); /* reply[2] */ + for (int i = 0; i < deleted_id_num; i++) { + addReplyStreamID(c, &deleted_ids[i]); + } + zfree(deleted_ids); + preventCommandPropagation(c); } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index ae8da27b8..9faa51808 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -355,24 +355,22 @@ start_server { # Delete item 2 from the stream. Now consumer 1 has PEL that contains # only item 3. Try to use consumer 2 to claim the deleted item 2 - # from the PEL of consumer 1, this should return nil + # from the PEL of consumer 1, this should be NOP r XDEL mystream $id2 set reply [ r XCLAIM mystream mygroup consumer2 10 $id2 ] - assert {[llength $reply] == 1} - assert_equal "" [lindex $reply 0] + assert {[llength $reply] == 0} # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. # Try to use consumer 2 to claim the deleted item 3 from the PEL - # of consumer 1, this should return nil + # of consumer 1, this should be NOP after 200 r XDEL mystream $id3 set reply [ r XCLAIM mystream mygroup consumer2 10 $id3 ] - assert {[llength $reply] == 1} - assert_equal "" [lindex $reply 0] + assert {[llength $reply] == 0} } test {XCLAIM without JUSTID increments delivery count} { @@ -445,6 +443,7 @@ start_server { set id1 [r XADD mystream * a 1] set id2 [r XADD mystream * b 2] set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] r XGROUP CREATE mystream mygroup 0 # Consumer 1 reads item 1 from the stream without acknowledgements. @@ -454,7 +453,7 @@ start_server { assert_equal [lindex $reply 0 1 0 1] {a 1} after 200 set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1] - assert_equal [llength $reply] 2 + assert_equal [llength $reply] 3 assert_equal [lindex $reply 0] "0-0" assert_equal [llength [lindex $reply 1]] 1 assert_equal [llength [lindex $reply 1 0]] 2 @@ -462,7 +461,7 @@ start_server { assert_equal [lindex $reply 1 0 1] {a 1} # Consumer 1 reads another 2 items from stream - r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 3 STREAMS mystream > # For min-idle-time after 200 @@ -471,33 +470,37 @@ start_server { # only item 3. Try to use consumer 2 to claim the deleted item 2 # from the PEL of consumer 1, this should return nil r XDEL mystream $id2 + + # id1 and id3 are self-claimed here but not id2 ('count' was set to 2) + # we make sure id2 is indeed skipped (the cursor points to id4) set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] - # id1 is self-claimed here but not id2 ('count' was set to 2) - assert_equal [llength $reply] 2 - assert_equal [lindex $reply 0] $id3 + + assert_equal [llength $reply] 3 + assert_equal [lindex $reply 0] $id4 assert_equal [llength [lindex $reply 1]] 2 assert_equal [llength [lindex $reply 1 0]] 2 assert_equal [llength [lindex $reply 1 0 1]] 2 assert_equal [lindex $reply 1 0 1] {a 1} - assert_equal [lindex $reply 1 1] "" + assert_equal [lindex $reply 1 1 1] {c 3} # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. # Try to use consumer 2 to claim the deleted item 3 from the PEL # of consumer 1, this should return nil after 200 - r XDEL mystream $id3 + + r XDEL mystream $id4 + + # id1 and id3 are self-claimed here but not id2 and id4 ('count' is default 100) set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID] - # id1 is self-claimed here but not id2 and id3 ('count' is default 100) # we also test the JUSTID modifier here. note that, when using JUSTID, # deleted entries are returned in reply (consistent with XCLAIM). - assert_equal [llength $reply] 2 - assert_equal [lindex $reply 0] "0-0" - assert_equal [llength [lindex $reply 1]] 3 + assert_equal [llength $reply] 3 + assert_equal [lindex $reply 0] {0-0} + assert_equal [llength [lindex $reply 1]] 2 assert_equal [lindex $reply 1 0] $id1 - assert_equal [lindex $reply 1 1] $id2 - assert_equal [lindex $reply 1 2] $id3 + assert_equal [lindex $reply 1 1] $id3 } test {XAUTOCLAIM as an iterator} { @@ -518,7 +521,7 @@ start_server { # Claim 2 entries set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] - assert_equal [llength $reply] 2 + assert_equal [llength $reply] 3 set cursor [lindex $reply 0] assert_equal $cursor $id3 assert_equal [llength [lindex $reply 1]] 2 @@ -527,7 +530,7 @@ start_server { # Claim 2 more entries set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 2] - assert_equal [llength $reply] 2 + assert_equal [llength $reply] 3 set cursor [lindex $reply 0] assert_equal $cursor $id5 assert_equal [llength [lindex $reply 1]] 2 @@ -536,7 +539,7 @@ start_server { # Claim last entry set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 1] - assert_equal [llength $reply] 2 + assert_equal [llength $reply] 3 set cursor [lindex $reply 0] assert_equal $cursor {0-0} assert_equal [llength [lindex $reply 1]] 1 @@ -548,6 +551,56 @@ start_server { assert_error "ERR COUNT must be > 0" {r XAUTOCLAIM key group consumer 1 1 COUNT 0} } + test {XCLAIM with XDEL} { + r DEL x + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XDEL x 2-0 + assert_equal [r XCLAIM x grp Bob 0 1-0 2-0 3-0] {{1-0 {f v}} {3-0 {f v}}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XCLAIM with trimming} { + r DEL x + r config set stream-node-max-entries 2 + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XTRIM x MAXLEN 1 + assert_equal [r XCLAIM x grp Bob 0 1-0 2-0 3-0] {{3-0 {f v}}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XAUTOCLAIM with XDEL} { + r DEL x + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XDEL x 2-0 + assert_equal [r XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{1-0 {f v}} {3-0 {f v}}} 2-0} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + + test {XCLAIM with trimming} { + r DEL x + r config set stream-node-max-entries 2 + r XADD x 1-0 f v + r XADD x 2-0 f v + r XADD x 3-0 f v + r XGROUP CREATE x grp 0 + assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}} + r XTRIM x MAXLEN 1 + assert_equal [r XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{3-0 {f v}}} {1-0 2-0}} + assert_equal [r XPENDING x grp - + 10 Alice] {} + } + test {XINFO FULL output} { r del x r XADD x 100 a 1 @@ -733,6 +786,46 @@ start_server { } } + start_server {tags {"external:skip"}} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set replica [srv 0 client] + + foreach autoclaim {0 1} { + test "Replication tests of XCLAIM with deleted entries (autclaim=$autoclaim)" { + $replica replicaof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + + $master DEL x + $master XADD x 1-0 f v + $master XADD x 2-0 f v + $master XADD x 3-0 f v + $master XADD x 4-0 f v + $master XADD x 5-0 f v + $master XGROUP CREATE x grp 0 + assert_equal [$master XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 5 + $master XDEL x 2-0 + $master XDEL x 4-0 + if {$autoclaim} { + assert_equal [$master XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{1-0 {f v}} {3-0 {f v}} {5-0 {f v}}} {2-0 4-0}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 0 + } else { + assert_equal [$master XCLAIM x grp Bob 0 1-0 2-0 3-0 4-0] {{1-0 {f v}} {3-0 {f v}}} + wait_for_ofs_sync $master $replica + assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 1 + } + } + } + } + start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} { test {Empty stream with no lastid can be rewrite into AOF correctly} { r XGROUP CREATE mystream group-name $ MKSTREAM