From 4065b4f27efc539b86beb63829bc148a02adecb1 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Mon, 30 May 2022 22:03:59 -0700 Subject: [PATCH] Sharded pubsub publish messagebulk as smessage (#10792) To easily distinguish between sharded channel message and a global channel message, introducing `smessage` (instead of `message`) as message bulk for sharded channel publish message. This is gonna be a breaking change in 7.0.1! Background: Sharded pubsub introduced in redis 7.0, but after the release we quickly realized that the fact that it's problematic that the client can't distinguish between normal (global) pubsub messages and sharded ones. This is important because the same connection can subscribe to both, but messages sent to one pubsub system are not propagated to the other (they're completely separate), so if one connection is used to subscribe to both, we need to assist the client library to know which message it got so it can forward it to the correct callback. --- src/pubsub.c | 11 +++++++---- src/server.c | 1 + src/server.h | 4 ++-- src/tracking.c | 2 +- .../tests/25-pubsubshard-slot-migration.tcl | 8 ++++---- tests/unit/moduleapi/publish.tcl | 2 +- tests/unit/pubsubshard.tcl | 16 ++++++++-------- 7 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/pubsub.c b/src/pubsub.c index 07459c1c8..da1bd1fc2 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -39,6 +39,7 @@ typedef struct pubsubtype { dict **serverPubSubChannels; robj **subscribeMsg; robj **unsubscribeMsg; + robj **messageBulk; }pubsubtype; /* @@ -78,6 +79,7 @@ pubsubtype pubSubType = { .serverPubSubChannels = &server.pubsub_channels, .subscribeMsg = &shared.subscribebulk, .unsubscribeMsg = &shared.unsubscribebulk, + .messageBulk = &shared.messagebulk, }; /* @@ -89,7 +91,8 @@ pubsubtype pubSubShardType = { .subscriptionCount = clientShardSubscriptionsCount, .serverPubSubChannels = &server.pubsubshard_channels, .subscribeMsg = &shared.ssubscribebulk, - .unsubscribeMsg = &shared.sunsubscribebulk + .unsubscribeMsg = &shared.sunsubscribebulk, + .messageBulk = &shared.smessagebulk, }; /*----------------------------------------------------------------------------- @@ -101,12 +104,12 @@ pubsubtype pubSubShardType = { * message. However if the caller sets 'msg' as NULL, it will be able * to send a special message (for instance an Array type) by using the * addReply*() API family. */ -void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); - addReply(c,shared.messagebulk); + addReply(c,message_bulk); addReplyBulk(c,channel); if (msg) addReplyBulk(c,msg); } @@ -461,7 +464,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; - addReplyPubsubMessage(c,channel,message); + addReplyPubsubMessage(c,channel,message,*type.messageBulk); updateClientMemUsage(c); receivers++; } diff --git a/src/server.c b/src/server.c index 8f4654ad8..75dbab0d2 100644 --- a/src/server.c +++ b/src/server.c @@ -1755,6 +1755,7 @@ void createSharedObjects(void) { shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); + shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); diff --git a/src/server.h b/src/server.h index 75f7aba60..b34a84cd7 100644 --- a/src/server.h +++ b/src/server.h @@ -1228,7 +1228,7 @@ struct sharedObjectsStruct { *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, - *ssubscribebulk,*sunsubscribebulk, + *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -2991,7 +2991,7 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count); int pubsubUnsubscribeAllPatterns(client *c, int notify); int pubsubPublishMessage(robj *channel, robj *message, int sharded); int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded); -void addReplyPubsubMessage(client *c, robj *channel, robj *msg); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk); int serverPubsubSubscriptionCount(); int serverPubsubShardSubscriptionCount(); diff --git a/src/tracking.c b/src/tracking.c index b86d984a7..a659e98dd 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -295,7 +295,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - addReplyPubsubMessage(c,TrackingChannelName,NULL); + addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk); } else { /* If are here, the client is not using RESP3, nor is * redirecting to another client. We can't send anything to diff --git a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl index 11b77d36a..0f59ffef2 100644 --- a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl +++ b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl @@ -29,7 +29,7 @@ test "Migrate a slot, verify client receives sunsubscribe on primary serving the # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message mychannel hello} [$subscribeclient read] + assert_equal {smessage mychannel hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] @@ -66,7 +66,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message ch3 hello} [$subscribeclient read] + assert_equal {smessage ch3 hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] @@ -82,7 +82,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece # Verify the client is still connected and receives message from the other channel. set msg [$subscribeclient read] - assert {"message" eq [lindex $msg 0]} + assert {"smessage" eq [lindex $msg 0]} assert {$anotherchannelname eq [lindex $msg 1]} assert {"hello" eq [lindex $msg 2]} @@ -114,7 +114,7 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message mychannel1 hello} [$subscribeclient read] + assert_equal {smessage mychannel1 hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)] diff --git a/tests/unit/moduleapi/publish.tcl b/tests/unit/moduleapi/publish.tcl index ab3611093..ccc966a43 100644 --- a/tests/unit/moduleapi/publish.tcl +++ b/tests/unit/moduleapi/publish.tcl @@ -11,7 +11,7 @@ start_server {tags {"modules"}} { assert_equal {1} [subscribe $rd2 {chan1}] assert_equal 1 [r publish.shard chan1 hello] assert_equal 1 [r publish.classic chan1 world] - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] assert_equal {message chan1 world} [$rd2 read] } } diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl index d0023a841..8cccdcff6 100644 --- a/tests/unit/pubsubshard.tcl +++ b/tests/unit/pubsubshard.tcl @@ -7,14 +7,14 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {2} [ssubscribe $rd1 {chan2}] assert_equal 1 [r SPUBLISH chan1 hello] assert_equal 1 [r SPUBLISH chan2 world] - assert_equal {message chan1 hello} [$rd1 read] - assert_equal {message chan2 world} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] + assert_equal {smessage chan2 world} [$rd1 read] # unsubscribe from one of the channels sunsubscribe $rd1 {chan1} assert_equal 0 [r SPUBLISH chan1 hello] assert_equal 1 [r SPUBLISH chan2 world] - assert_equal {message chan2 world} [$rd1 read] + assert_equal {smessage chan2 world} [$rd1 read] # unsubscribe from the remaining channel sunsubscribe $rd1 {chan2} @@ -32,8 +32,8 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {1} [ssubscribe $rd1 {chan1}] assert_equal {1} [ssubscribe $rd2 {chan1}] assert_equal 2 [r SPUBLISH chan1 hello] - assert_equal {message chan1 hello} [$rd1 read] - assert_equal {message chan1 hello} [$rd2 read] + assert_equal {smessage chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd2 read] # clean up clients $rd1 close @@ -58,7 +58,7 @@ start_server {tags {"pubsubshard external:skip"}} { set rd1 [redis_deferring_client] assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r SPUBLISH chan1 hello] - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] # clean up clients $rd1 close @@ -129,9 +129,9 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {1} [ssubscribe $rd1 {chan1}] $rd0 SPUBLISH chan1 hello - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] $rd0 SPUBLISH chan1 world - assert_equal {message chan1 world} [$rd1 read] + assert_equal {smessage chan1 world} [$rd1 read] } } } \ No newline at end of file