diff --git a/src/rdb.c b/src/rdb.c index 470de9806..192a8825a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3431,6 +3431,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) } streamNACK *nack = result; + /* If the NACK already has a consumer assigned, the + * payload is corrupt — each global PEL entry must be + * claimed by exactly one consumer. */ + if (nack->consumer != NULL) { + rdbReportCorruptRDB("Stream consumer PEL entry already has a consumer assigned"); + decrRefCount(o); + return NULL; + } /* Set the NACK consumer, that was left to NULL when * loading the global PEL. Then set the same shared * NACK structure also in the consumer-specific PEL. */ diff --git a/tests/integration/corrupt-dump.tcl b/tests/integration/corrupt-dump.tcl index 412d8a018..c693da91d 100644 --- a/tests/integration/corrupt-dump.tcl +++ b/tests/integration/corrupt-dump.tcl @@ -999,5 +999,21 @@ test {corrupt payload: fuzzer findings - decrRefCount on NULL robj on corrupt KE } } +test {corrupt payload: stream with NACK shared between two consumers} { + start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no]] { + r debug set-skip-checksum-validation 1 + # Payload: stream with entry 1-0, one consumer group (mygroup), + # two consumers whose PELs both reference 1-0 (shared NACK). + # XACK on one consumer frees the NACK, leaving a dangling + # pointer in the other consumer's PEL (use-after-free). + catch {r RESTORE mystream 0 "\x1a\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x1d\x1d\x00\x00\x00\x0a\x00\x01\x01\x00\x01\x01\x01\x81\x6b\x02\x00\x01\x02\x01\x00\x01\x00\x01\x81\x76\x02\x04\x01\xff\x01\x01\x00\x01\x00\x00\x00\x01\x01\x07\x6d\x79\x67\x72\x6f\x75\x70\x01\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x64\x42\xb9\x9d\x01\x00\x00\x01\x02\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x41\x01\x64\x42\xb9\x9d\x01\x00\x00\x01\x64\x42\xb9\x9d\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x09\x63\x6f\x6e\x73\x75\x6d\x65\x72\x42\x01\x64\x42\xb9\x9d\x01\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x40\x64\x40\x64\x00\x00\x00\x0d\x00\xe7\x12\xf7\xcc\x25\xd5\x0e\x44"} err + catch {r XACK mystream mygroup 1-0} _ + catch {r XREADGROUP GROUP mygroup consumerA COUNT 10 STREAMS mystream 0} _ + catch {r DEL mystream} _ + assert_match "*Bad data format*" $err + r ping + } +} + } ;# tags