diff --git a/src/pubsub.c b/src/pubsub.c index 07459c1c8..da1bd1fc2 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -39,6 +39,7 @@ typedef struct pubsubtype { dict **serverPubSubChannels; robj **subscribeMsg; robj **unsubscribeMsg; + robj **messageBulk; }pubsubtype; /* @@ -78,6 +79,7 @@ pubsubtype pubSubType = { .serverPubSubChannels = &server.pubsub_channels, .subscribeMsg = &shared.subscribebulk, .unsubscribeMsg = &shared.unsubscribebulk, + .messageBulk = &shared.messagebulk, }; /* @@ -89,7 +91,8 @@ pubsubtype pubSubShardType = { .subscriptionCount = clientShardSubscriptionsCount, .serverPubSubChannels = &server.pubsubshard_channels, .subscribeMsg = &shared.ssubscribebulk, - .unsubscribeMsg = &shared.sunsubscribebulk + .unsubscribeMsg = &shared.sunsubscribebulk, + .messageBulk = &shared.smessagebulk, }; /*----------------------------------------------------------------------------- @@ -101,12 +104,12 @@ pubsubtype pubSubShardType = { * message. However if the caller sets 'msg' as NULL, it will be able * to send a special message (for instance an Array type) by using the * addReply*() API family. */ -void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) { if (c->resp == 2) addReply(c,shared.mbulkhdr[3]); else addReplyPushLen(c,3); - addReply(c,shared.messagebulk); + addReply(c,message_bulk); addReplyBulk(c,channel); if (msg) addReplyBulk(c,msg); } @@ -461,7 +464,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; - addReplyPubsubMessage(c,channel,message); + addReplyPubsubMessage(c,channel,message,*type.messageBulk); updateClientMemUsage(c); receivers++; } diff --git a/src/server.c b/src/server.c index 8f4654ad8..75dbab0d2 100644 --- a/src/server.c +++ b/src/server.c @@ -1755,6 +1755,7 @@ void createSharedObjects(void) { shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); + shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); diff --git a/src/server.h b/src/server.h index 75f7aba60..b34a84cd7 100644 --- a/src/server.h +++ b/src/server.h @@ -1228,7 +1228,7 @@ struct sharedObjectsStruct { *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, - *ssubscribebulk,*sunsubscribebulk, + *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -2991,7 +2991,7 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count); int pubsubUnsubscribeAllPatterns(client *c, int notify); int pubsubPublishMessage(robj *channel, robj *message, int sharded); int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded); -void addReplyPubsubMessage(client *c, robj *channel, robj *msg); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk); int serverPubsubSubscriptionCount(); int serverPubsubShardSubscriptionCount(); diff --git a/src/tracking.c b/src/tracking.c index b86d984a7..a659e98dd 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -295,7 +295,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - addReplyPubsubMessage(c,TrackingChannelName,NULL); + addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk); } else { /* If are here, the client is not using RESP3, nor is * redirecting to another client. We can't send anything to diff --git a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl index 11b77d36a..0f59ffef2 100644 --- a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl +++ b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl @@ -29,7 +29,7 @@ test "Migrate a slot, verify client receives sunsubscribe on primary serving the # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message mychannel hello} [$subscribeclient read] + assert_equal {smessage mychannel hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] @@ -66,7 +66,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message ch3 hello} [$subscribeclient read] + assert_equal {smessage ch3 hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] @@ -82,7 +82,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece # Verify the client is still connected and receives message from the other channel. set msg [$subscribeclient read] - assert {"message" eq [lindex $msg 0]} + assert {"smessage" eq [lindex $msg 0]} assert {$anotherchannelname eq [lindex $msg 1]} assert {"hello" eq [lindex $msg 2]} @@ -114,7 +114,7 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the # Verify subscribe is still valid, able to receive messages. $nodefrom(link) spublish $channelname hello - assert_equal {message mychannel1 hello} [$subscribeclient read] + assert_equal {smessage mychannel1 hello} [$subscribeclient read] assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)] assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)] diff --git a/tests/unit/moduleapi/publish.tcl b/tests/unit/moduleapi/publish.tcl index ab3611093..ccc966a43 100644 --- a/tests/unit/moduleapi/publish.tcl +++ b/tests/unit/moduleapi/publish.tcl @@ -11,7 +11,7 @@ start_server {tags {"modules"}} { assert_equal {1} [subscribe $rd2 {chan1}] assert_equal 1 [r publish.shard chan1 hello] assert_equal 1 [r publish.classic chan1 world] - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] assert_equal {message chan1 world} [$rd2 read] } } diff --git a/tests/unit/pubsubshard.tcl b/tests/unit/pubsubshard.tcl index d0023a841..8cccdcff6 100644 --- a/tests/unit/pubsubshard.tcl +++ b/tests/unit/pubsubshard.tcl @@ -7,14 +7,14 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {2} [ssubscribe $rd1 {chan2}] assert_equal 1 [r SPUBLISH chan1 hello] assert_equal 1 [r SPUBLISH chan2 world] - assert_equal {message chan1 hello} [$rd1 read] - assert_equal {message chan2 world} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] + assert_equal {smessage chan2 world} [$rd1 read] # unsubscribe from one of the channels sunsubscribe $rd1 {chan1} assert_equal 0 [r SPUBLISH chan1 hello] assert_equal 1 [r SPUBLISH chan2 world] - assert_equal {message chan2 world} [$rd1 read] + assert_equal {smessage chan2 world} [$rd1 read] # unsubscribe from the remaining channel sunsubscribe $rd1 {chan2} @@ -32,8 +32,8 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {1} [ssubscribe $rd1 {chan1}] assert_equal {1} [ssubscribe $rd2 {chan1}] assert_equal 2 [r SPUBLISH chan1 hello] - assert_equal {message chan1 hello} [$rd1 read] - assert_equal {message chan1 hello} [$rd2 read] + assert_equal {smessage chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd2 read] # clean up clients $rd1 close @@ -58,7 +58,7 @@ start_server {tags {"pubsubshard external:skip"}} { set rd1 [redis_deferring_client] assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}] assert_equal 1 [r SPUBLISH chan1 hello] - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] # clean up clients $rd1 close @@ -129,9 +129,9 @@ start_server {tags {"pubsubshard external:skip"}} { assert_equal {1} [ssubscribe $rd1 {chan1}] $rd0 SPUBLISH chan1 hello - assert_equal {message chan1 hello} [$rd1 read] + assert_equal {smessage chan1 hello} [$rd1 read] $rd0 SPUBLISH chan1 world - assert_equal {message chan1 world} [$rd1 read] + assert_equal {smessage chan1 world} [$rd1 read] } } } \ No newline at end of file