diff --git a/src/stream.h b/src/stream.h index bfc165440..146be3b12 100644 --- a/src/stream.h +++ b/src/stream.h @@ -116,7 +116,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 2529cab08..478d75c5c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1657,7 +1657,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) { void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; @@ -1666,6 +1666,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end int propagate_last_id = 0; int noack = flags & STREAM_RWR_NOACK; + if (propCount) *propCount = 0; + /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its * own PEL. This ensures each consumer will always and only see @@ -1764,6 +1766,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end robj *idarg = createObjectFromStreamID(&id); streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); decrRefCount(idarg); + if (propCount) (*propCount)++; } } @@ -1771,8 +1774,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (count && count == arraylen) break; } - if (spi && propagate_last_id) + if (spi && propagate_last_id) { streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + if (propCount) (*propCount)++; + } streamIteratorStop(&si); if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); @@ -1808,7 +1813,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL) == 0) + STREAM_RWR_RAWENTRIES,NULL,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed @@ -2124,7 +2129,7 @@ void xrangeGenericCommand(client *c, int rev) { addReplyNullArray(c); } else { if (count == -1) count = 0; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,NULL); } } @@ -2386,12 +2391,13 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); int flags = 0; + unsigned long propCount = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, flags, &spi); - if (groups) server.dirty++; + consumer, flags, &spi, &propCount); + if (propCount) server.dirty++; } } @@ -3298,7 +3304,7 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); } arraylen++; @@ -3473,7 +3479,7 @@ void xautoclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1); } arraylen++; count--; @@ -3697,18 +3703,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { end.ms = end.seq = UINT64_MAX; addReplyBulkCString(c,"first-entry"); emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,NULL); if (!emitted) addReplyNull(c); addReplyBulkCString(c,"last-entry"); emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,NULL); if (!emitted) addReplyNull(c); } else { /* XINFO STREAM FULL [COUNT ] */ /* Stream entries */ addReplyBulkCString(c,"entries"); - streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL,NULL); /* Consumer groups */ addReplyBulkCString(c,"groups"); diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 9b457dc67..2462a25ba 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1009,6 +1009,68 @@ start_server { assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer} } + test {XREADGROUP of multiple entries changes dirty by one} { + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XGROUP CREATE x g1 0 + r XGROUP CREATECONSUMER x g1 Alice + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice NOACK COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{3-0 {data c}} {4-0 {data d}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + } + + test {XREADGROUP from PEL does not change dirty} { + # Techinally speaking, XREADGROUP from PEL should cause propagation + # because it change the delivery count/time + # It was decided that this metadata changes are too insiginificant + # to justify propagation + # This test covers that. + r DEL x + r XADD x 1-0 data a + r XADD x 2-0 data b + r XADD x 3-0 data c + r XADD x 4-0 data d + r XGROUP CREATE x g1 0 + r XGROUP CREATECONSUMER x g1 Alice + + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 0] + assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty} + + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 9000] + assert_equal $res {{x {}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty} + + # The current behavior is that we create the consumer (causes dirty++) even + # if we onlyneed to read from PEL. + # It feels like we shouldn't create the consumer in that case, but I added + # this test just for coverage of current behavior + set dirty [s rdb_changes_since_last_save] + set res [r XREADGROUP GROUP g1 noconsumer COUNT 2 STREAMS x 0] + assert_equal $res {{x {}}} + set dirty2 [s rdb_changes_since_last_save] + assert {$dirty2 == $dirty + 1} + } + start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} { test {XREADGROUP with NOACK creates consumer} { r del mystream @@ -1329,6 +1391,19 @@ start_server { assert_equal [dict get $group entries-read] 3 assert_equal [dict get $group lag] 0 } + + test {XREADGROUP from PEL inside MULTI} { + # This scenario used to cause propagation of EXEC without MULTI in 6.2 + $replica config set propagation-error-behavior panic + $master del mystream + $master xadd mystream 1-0 a b c d e f + $master xgroup create mystream mygroup 0 + $master xreadgroup group mygroup ryan count 1 streams mystream > + $master multi + $master xreadgroup group mygroup ryan count 1 streams mystream 0 + set reply [$master exec] + assert_equal $reply {{{mystream {{1-0 {a b c d e f}}}}}} + } } start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {