From 56eb84dbe652289b9b67c57b710e93c6a3379c49 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Fri, 24 Apr 2026 16:54:06 +0300 Subject: [PATCH 01/12] Restructure BCAST tracking clients as user-keyed two-level rax MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the flat client-keyed rax in bcastState.clients with a two-level rax keyed by user pointer → inner rax of client pointers. This groups clients by ACL identity, enabling per-user proto caching and send-time ACL filtering via ACLUserCheckKeyPerm. Key changes: - trackingBuildBroadcastReply now accepts a user* for ACL key filtering and an optional client* for NOLOOP, replacing the old client*-only signature. - New trackingBroadcastBcastState helper builds one proto per user bucket and sends to each client, with NOLOOP fallback. - New trackingBcastHandleUserSwitch flushes pending invalidations under the old identity then re-buckets the client, called from checkPasswordBasedAuth and ACL LOAD before c->user is reassigned. - enableBcastTrackingForPrefix and disableTracking updated for the two-level rax structure. ACL permission checks drop from O(C × K) to O(U × K) per prefix per broadcast cycle. --- src/acl.c | 5 +- src/server.h | 1 + src/tracking.c | 223 ++++++++++++++++++++++++++++++---------- tests/unit/tracking.tcl | 121 ++++++++++++++++++++++ 4 files changed, 294 insertions(+), 56 deletions(-) diff --git a/src/acl.c b/src/acl.c index 177077d45..79037cf9b 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1496,8 +1496,10 @@ void addAuthErrReply(client *c, robj *err) { * The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */ int checkPasswordBasedAuth(client *c, robj *username, robj *password) { if (ACLCheckUserCredentials(username,password) == C_OK) { + user *new_user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); + trackingBroadcastPostUserSwitch(c, new_user); c->authenticated = 1; - c->user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); + c->user = new_user; moduleNotifyUserChanged(c); return AUTH_OK; } else { @@ -2481,6 +2483,7 @@ sds ACLLoadFromFile(const char *filename) { deauthenticateAndCloseClient(c); continue; } + trackingBroadcastPostUserSwitch(c,new); c->user = new; } diff --git a/src/server.h b/src/server.h index 9318eec68..0fc00faab 100644 --- a/src/server.h +++ b/src/server.h @@ -3361,6 +3361,7 @@ uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalPrefixes(void); void trackingBroadcastInvalidationMessages(void); +void trackingBroadcastPostUserSwitch(client *c, user *new_user); int checkPrefixCollisionsOrReply(client *c, robj **prefix, size_t numprefix); /* List data type */ diff --git a/src/tracking.c b/src/tracking.c index c235d5812..38532ab15 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -56,7 +56,20 @@ void disableTracking(client *c) { int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); serverAssert(found); bcastState *bs = result; - raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + + /* Find the user bucket and remove this client from it. */ + rax *user_clients; + found = raxFind(bs->clients, + (unsigned char*)&c->user, sizeof(c->user), + (void**)&user_clients); + serverAssert(found); + raxRemove(user_clients,(unsigned char*)&c,sizeof(c),NULL); + if (raxSize(user_clients) == 0) { + raxFree(user_clients); + raxRemove(bs->clients, + (unsigned char*)&c->user,sizeof(c->user),NULL); + } + /* Was it the last client? Remove the prefix from the * table. */ if (raxSize(bs->clients) == 0) { @@ -134,7 +147,7 @@ int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) { /* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is * already registered for the specified prefix, no operation is performed. */ -void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { +static void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { void *result; bcastState *bs; /* If this is the first client subscribing to such prefix, create @@ -147,7 +160,20 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { } else { bs = result; } - if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + + /* Find or create the per-user client set. */ + rax *user_clients; + if (!raxFind(bs->clients, + (unsigned char*)&c->user,sizeof(c->user), + (void**)&user_clients)) + { + user_clients = raxNew(); + raxInsert(bs->clients, + (unsigned char*)&c->user,sizeof(c->user), + user_clients,NULL); + } + + if (raxTryInsert(user_clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { if (c->client_tracking_prefixes == NULL) c->client_tracking_prefixes = raxNew(); raxInsert(c->client_tracking_prefixes, @@ -552,28 +578,30 @@ void trackingLimitUsedSlots(void) { } /* Generate Redis protocol for an array containing all the key names - * in the 'keys' radix tree. If the client is not NULL, the list will not - * include keys that were modified the last time by this client, in order - * to implement the NOLOOP option. + * in the 'keys' radix tree, filtered by ACL permissions of user 'u' and + * optionally by NOLOOP (skipping keys last modified by 'noloop_client'). + * + * If 'u' is non-NULL, keys the user is not permitted to observe are excluded. + * If 'c' is non-NULL, keys whose last modifier (ri.data) matches + * that client are excluded. * * If the resulting array would be empty, NULL is returned instead. */ -sds trackingBuildBroadcastReply(client *c, rax *keys) { +sds trackingBuildBroadcastReply(user *u, client *c, rax *keys) { + debugServerAssert(!c || c->flags & CLIENT_TRACKING_NOLOOP); raxIterator ri; - uint64_t count; + uint64_t count = 0; - if (c == NULL) { - count = raxSize(keys); - } else { - count = 0; - raxStart(&ri,keys); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - if (ri.data != c) count++; - } - raxStop(&ri); - - if (count == 0) return NULL; + raxStart(&ri, keys); + raxSeek(&ri, "^",NULL,0); + while(raxNext(&ri)) { + if (c && ri.data == c) continue; + if (u && ACLUserCheckKeyPerm(u, (const char*)ri.key, ri.key_len, + ACL_READ_PERMISSION) != ACL_OK) continue; + count++; } + raxStop(&ri); + + if (count == 0) return NULL; /* Create the array reply with the list of keys once, then send * it to all the clients subscribed to this prefix. */ @@ -588,6 +616,8 @@ sds trackingBuildBroadcastReply(client *c, rax *keys) { raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { if (c && ri.data == c) continue; + if (u && ACLUserCheckKeyPerm(u,(const char*)ri.key,ri.key_len, + ACL_READ_PERMISSION) != ACL_OK) continue; len = ll2string(buf,sizeof(buf),ri.key_len); proto = sdscatlen(proto,"$",1); proto = sdscatlen(proto,buf,len); @@ -599,11 +629,127 @@ sds trackingBuildBroadcastReply(client *c, rax *keys) { return proto; } +/* Send pending BCAST invalidation messages for a single prefix's + * bcastState, then reset bs->keys. Iterates user buckets, builds + * one proto per user, and sends to each client in the bucket. */ +static void trackingBcastInvalidationsForPrefix(bcastState *bs) { + if (raxSize(bs->keys) == 0) return; + + raxIterator ri, ri2; + raxStart(&ri,bs->clients); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + user *u; + memcpy(&u,ri.key,sizeof(u)); + rax *user_clients = ri.data; + + sds proto = trackingBuildBroadcastReply(u, NULL, bs->keys); + + raxStart(&ri2,user_clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + + if (c->flags & CLIENT_TRACKING_NOLOOP) { + sds adhoc = trackingBuildBroadcastReply(u, c, bs->keys); + if (!adhoc) continue; + sendTrackingMessage(c, adhoc, + sdslen(adhoc), 1); + sdsfree(adhoc); + continue; + } + if (!proto) continue; + + sendTrackingMessage(c, proto, sdslen(proto), 1); + } + raxStop(&ri2); + + sdsfree(proto); + } + raxStop(&ri); + + raxFree(bs->keys); + bs->keys = raxNew(); +} + +/* Send pending BCAST invalidation messages for every prefix in + * 'prefixes' (a rax of prefix -> NULL, i.e. client_tracking_prefixes). + * This triggers the full broadcast cycle for each matching prefix. */ +static void trackingBcastSendInvalidationsForPrefixes(rax *prefixes) { + raxIterator ri; + raxStart(&ri,prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + void *result; + int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); + serverAssert(found); + trackingBcastInvalidationsForPrefix(result); + } + raxStop(&ri); +} + +/* Move client 'c' from its current user bucket to the bucket for + * 'new_user' in every bcastState the client subscribes to. + * Must be called BEFORE c->user is updated. */ +static void trackingBcastMoveClient(client *c, user *new_user) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + void *result; + int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); + serverAssert(found); + bcastState *bs = result; + + /* Remove from old user bucket. */ + rax *from_clients; + found = raxFind(bs->clients, + (unsigned char*)&c->user,sizeof(c->user), + (void**)&from_clients); + serverAssert(found); + raxRemove(from_clients,(unsigned char*)&c,sizeof(c),NULL); + if (raxSize(from_clients) == 0) { + raxFree(from_clients); + raxRemove(bs->clients, + (unsigned char*)&c->user,sizeof(c->user),NULL); + } + + /* Insert into new user bucket. */ + rax *to_clients; + if (!raxFind(bs->clients, + (unsigned char*)&new_user,sizeof(new_user), + (void**)&to_clients)) + { + to_clients = raxNew(); + raxInsert(bs->clients, + (unsigned char*)&new_user,sizeof(new_user), + to_clients,NULL); + } + raxTryInsert(to_clients, + (unsigned char*)&c,sizeof(c),NULL,NULL); + } + raxStop(&ri); +} + +/* Prepare a BCAST tracking client for a user change: flush all pending + * invalidation messages for its prefixes (so every subscriber receives + * them under the current ACL identity), then move the client to the + * bucket for 'new_user'. + * Must be called BEFORE c->user is updated. */ +void trackingBroadcastPostUserSwitch(client *c, user *new_user) { + if (!(c->flags & CLIENT_TRACKING_BCAST)) return; + if (c->user == new_user) return; + + trackingBcastSendInvalidationsForPrefixes(c->client_tracking_prefixes); + trackingBcastMoveClient(c, new_user); +} + /* This function will run the prefixes of clients in BCAST mode and * keys that were modified about each prefix, and will send the * notifications to each client in each prefix. */ void trackingBroadcastInvalidationMessages(void) { - raxIterator ri, ri2; + raxIterator ri; /* Return ASAP if there is nothing to do here. */ if (TrackingTable == NULL || !server.tracking_clients) return; @@ -611,41 +757,8 @@ void trackingBroadcastInvalidationMessages(void) { raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); - /* For each prefix... */ while(raxNext(&ri)) { - bcastState *bs = ri.data; - - if (raxSize(bs->keys)) { - /* Generate the common protocol for all the clients that are - * not using the NOLOOP option. */ - sds proto = trackingBuildBroadcastReply(NULL,bs->keys); - - /* Send this array of keys to every client in the list. */ - raxStart(&ri2,bs->clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); - if (c->flags & CLIENT_TRACKING_NOLOOP) { - /* This client may have certain keys excluded. */ - sds adhoc = trackingBuildBroadcastReply(c,bs->keys); - if (adhoc) { - sendTrackingMessage(c,adhoc,sdslen(adhoc),1); - sdsfree(adhoc); - } - } else { - sendTrackingMessage(c,proto,sdslen(proto),1); - } - } - raxStop(&ri2); - - /* Clean up: we can remove everything from this state, because we - * want to only track the new keys that will be accumulated starting - * from now. */ - sdsfree(proto); - } - raxFree(bs->keys); - bs->keys = raxNew(); + trackingBcastInvalidationsForPrefix(ri.data); } raxStop(&ri); } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 174575eee..d1fd54139 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -883,6 +883,127 @@ start_server {tags {"tracking network logreqres:skip"}} { assert_equal {PONG} [$rd read] } + test {BCAST ACL filtering - two clients same user see only permitted keys} { + clean_all + + r ACL SETUSER shareduser on >pass123 ~public:* +@all + set c1 [redis_deferring_client] + set c2 [redis_deferring_client] + + $c1 AUTH shareduser pass123 + $c1 read + + $c2 AUTH shareduser pass123 + $c2 read + + $c1 HELLO 3 + $c1 read + $c2 HELLO 3 + $c2 read + + $c1 CLIENT TRACKING on BCAST PREFIX public: PREFIX admin: + assert_match {*OK*} [$c1 read] + $c2 CLIENT TRACKING on BCAST PREFIX public: PREFIX admin: + assert_match {*OK*} [$c2 read] + + $rd_sg MSET public:a{t} 1 admin:b{t} 2 + + # Both clients should receive exactly {public:a{t}} for the + # public: prefix, and nothing for admin: (filtered out by ACL). + set c1_keys {} + set c2_keys {} + # Read invalidation messages: there are two prefixes, but only + # public: should have data for shareduser. + after 100 + $c1 PING + set c1_resp [$c1 read] + if {[lindex $c1_resp 0] eq "invalidate"} { + set c1_keys [lindex $c1_resp 1] + # Read the PONG + $c1 read + } + $c2 PING + set c2_resp [$c2 read] + if {[lindex $c2_resp 0] eq "invalidate"} { + set c2_keys [lindex $c2_resp 1] + # Read the PONG + $c2 read + } + + assert_equal [lsort $c1_keys] [list public:a{t}] + assert_equal [lsort $c2_keys] [list public:a{t}] + + $c1 CLIENT TRACKING off + $c1 read + $c2 CLIENT TRACKING off + $c2 read + $c1 close + $c2 close + r ACL DELUSER shareduser + } + + test {BCAST re-AUTH re-buckets correctly with ACL filtering} { + clean_all + + r ACL SETUSER userA on >passA ~a:* +@all + r ACL SETUSER userB on >passB ~b:* +@all + + set tc [redis_deferring_client] + $tc AUTH userA passA + $tc read + + $tc HELLO 3 + $tc read + + $tc CLIENT TRACKING on BCAST PREFIX a: PREFIX b: + assert_match {*OK*} [$tc read] + + # Write keys matching both prefixes. + $rd_sg SET a:1{t} val1 + $rd_sg SET b:1{t} val1 + + # Under userA, only a:* is visible. + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list a:1{t}] + + # Re-AUTH as userB. + $tc AUTH userB passB + $tc read + + # Write again. + $rd_sg SET a:2{t} val2 + $rd_sg SET b:2{t} val2 + + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list b:2{t}] + + $tc CLIENT TRACKING off + $tc read + $tc close + r ACL DELUSER userA + r ACL DELUSER userB + } + $rd_redirection close $rd_sg close $rd close From b2a46b0269bd822f9f42d32569ddb1e33be694de Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Mon, 27 Apr 2026 16:11:07 +0300 Subject: [PATCH 02/12] Handle BCAST tracking client migration after user change Move trackingBroadcastPostUserSwitch to run after c->user is updated instead of before. The function now takes the old_user pointer as an argument and derives the new user from c->user. This allows the hook to be placed in ACLAuthenticateUser, covering both password-based and module-based authentication paths with a single call site. --- src/acl.c | 10 ++++++---- src/server.h | 2 +- src/tracking.c | 27 ++++++++++++++------------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/acl.c b/src/acl.c index 79037cf9b..ce3aac94e 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1496,10 +1496,8 @@ void addAuthErrReply(client *c, robj *err) { * The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */ int checkPasswordBasedAuth(client *c, robj *username, robj *password) { if (ACLCheckUserCredentials(username,password) == C_OK) { - user *new_user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); - trackingBroadcastPostUserSwitch(c, new_user); c->authenticated = 1; - c->user = new_user; + c->user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); moduleNotifyUserChanged(c); return AUTH_OK; } else { @@ -1516,11 +1514,15 @@ int checkPasswordBasedAuth(client *c, robj *username, robj *password) { * AUTH_BLOCKED - Indicates module authentication is in progress through a blocking implementation. */ int ACLAuthenticateUser(client *c, robj *username, robj *password, robj **err) { + user *old_user = c->user; int result = checkModuleAuthentication(c, username, password, err); /* If authentication was not handled by any Module, attempt normal password based auth. */ if (result == AUTH_NOT_HANDLED) { result = checkPasswordBasedAuth(c, username, password); } + if (result == AUTH_OK) { + trackingBroadcastPostUserSwitch(c, old_user); + } return result; } @@ -2483,8 +2485,8 @@ sds ACLLoadFromFile(const char *filename) { deauthenticateAndCloseClient(c); continue; } - trackingBroadcastPostUserSwitch(c,new); c->user = new; + trackingBroadcastPostUserSwitch(c, original); } if (user_channels) diff --git a/src/server.h b/src/server.h index 0fc00faab..b427966b4 100644 --- a/src/server.h +++ b/src/server.h @@ -3361,7 +3361,7 @@ uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalPrefixes(void); void trackingBroadcastInvalidationMessages(void); -void trackingBroadcastPostUserSwitch(client *c, user *new_user); +void trackingBroadcastPostUserSwitch(client *c, user *old_user); int checkPrefixCollisionsOrReply(client *c, robj **prefix, size_t numprefix); /* List data type */ diff --git a/src/tracking.c b/src/tracking.c index 38532ab15..5c6caefb8 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -689,10 +689,11 @@ static void trackingBcastSendInvalidationsForPrefixes(rax *prefixes) { raxStop(&ri); } -/* Move client 'c' from its current user bucket to the bucket for - * 'new_user' in every bcastState the client subscribes to. - * Must be called BEFORE c->user is updated. */ -static void trackingBcastMoveClient(client *c, user *new_user) { +/* Move client 'c' from its old user bucket (keyed by 'old_user') to + * the bucket for c->user in every bcastState the client subscribes to. + * Must be called AFTER c->user is updated. */ +static void trackingBcastMoveClient(client *c, user *old_user) { + user *new_user = c->user; raxIterator ri; raxStart(&ri,c->client_tracking_prefixes); raxSeek(&ri,"^",NULL,0); @@ -705,14 +706,14 @@ static void trackingBcastMoveClient(client *c, user *new_user) { /* Remove from old user bucket. */ rax *from_clients; found = raxFind(bs->clients, - (unsigned char*)&c->user,sizeof(c->user), + (unsigned char*)&old_user,sizeof(old_user), (void**)&from_clients); serverAssert(found); raxRemove(from_clients,(unsigned char*)&c,sizeof(c),NULL); if (raxSize(from_clients) == 0) { raxFree(from_clients); raxRemove(bs->clients, - (unsigned char*)&c->user,sizeof(c->user),NULL); + (unsigned char*)&old_user,sizeof(old_user),NULL); } /* Insert into new user bucket. */ @@ -732,17 +733,17 @@ static void trackingBcastMoveClient(client *c, user *new_user) { raxStop(&ri); } -/* Prepare a BCAST tracking client for a user change: flush all pending +/* Handle a BCAST tracking client after a user change: flush all pending * invalidation messages for its prefixes (so every subscriber receives - * them under the current ACL identity), then move the client to the - * bucket for 'new_user'. - * Must be called BEFORE c->user is updated. */ -void trackingBroadcastPostUserSwitch(client *c, user *new_user) { + * them under the previous ACL identity), then move the client from the + * 'old_user' bucket to the bucket for c->user. + * Must be called AFTER c->user is updated. */ +void trackingBroadcastPostUserSwitch(client *c, user *old_user) { if (!(c->flags & CLIENT_TRACKING_BCAST)) return; - if (c->user == new_user) return; + if (c->user == old_user) return; trackingBcastSendInvalidationsForPrefixes(c->client_tracking_prefixes); - trackingBcastMoveClient(c, new_user); + trackingBcastMoveClient(c, old_user); } /* This function will run the prefixes of clients in BCAST mode and From e549a85ce8b4ddd8a70adfdba71e9aa44c4812ce Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Mon, 27 Apr 2026 18:54:07 +0300 Subject: [PATCH 03/12] Replace inner per-user client rax with vec in BCAST tracking The per-user client set inside bcastState.clients was a rax used as a set (key = client pointer, value = NULL). Replace it with a heap-allocated vec, which is simpler and more cache-friendly for the typical small number of clients per user per prefix. Add vecFindIndexOf and vecSwapRemove to the vector API to support duplicate checking and O(1) unordered removal, with corresponding unit tests. --- src/tracking.c | 95 +++++++++++++++++++++++++------------------------- src/vector.c | 56 +++++++++++++++++++++++++++++ src/vector.h | 8 +++++ 3 files changed, 112 insertions(+), 47 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index 5c6caefb8..130fe212d 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -9,6 +9,7 @@ */ #include "server.h" +#include "vector.h" /* The tracking table is constituted by a radix tree of keys, each pointing * to a radix tree of client IDs, used to track the clients that may have @@ -57,17 +58,18 @@ void disableTracking(client *c) { serverAssert(found); bcastState *bs = result; - /* Find the user bucket and remove this client from it. */ - rax *user_clients; + /* Find the user vector and swap-remove this client from it. */ + vec *user_clients; found = raxFind(bs->clients, (unsigned char*)&c->user, sizeof(c->user), (void**)&user_clients); serverAssert(found); - raxRemove(user_clients,(unsigned char*)&c,sizeof(c),NULL); - if (raxSize(user_clients) == 0) { - raxFree(user_clients); + vecSwapRemove(user_clients, c); + if (vecSize(user_clients) == 0) { + vecRelease(user_clients); + zfree(user_clients); raxRemove(bs->clients, - (unsigned char*)&c->user,sizeof(c->user),NULL); + (unsigned char*)&c->user, sizeof(c->user), NULL); } /* Was it the last client? Remove the prefix from the @@ -161,19 +163,21 @@ static void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { bs = result; } - /* Find or create the per-user client set. */ - rax *user_clients; + /* Find or create the per-user client vector. */ + vec *user_clients; if (!raxFind(bs->clients, - (unsigned char*)&c->user,sizeof(c->user), + (unsigned char*)&c->user, sizeof(c->user), (void**)&user_clients)) { - user_clients = raxNew(); + user_clients = zmalloc(sizeof(vec)); + vecInit(user_clients, NULL, 0); raxInsert(bs->clients, - (unsigned char*)&c->user,sizeof(c->user), - user_clients,NULL); + (unsigned char*)&c->user, sizeof(c->user), + user_clients, NULL); } - if (raxTryInsert(user_clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (vecFindIndexOf(user_clients, c) < 0) { + vecPush(user_clients, c); if (c->client_tracking_prefixes == NULL) c->client_tracking_prefixes = raxNew(); raxInsert(c->client_tracking_prefixes, @@ -592,7 +596,7 @@ sds trackingBuildBroadcastReply(user *u, client *c, rax *keys) { uint64_t count = 0; raxStart(&ri, keys); - raxSeek(&ri, "^",NULL,0); + raxSeek(&ri, "^", NULL, 0); while(raxNext(&ri)) { if (c && ri.data == c) continue; if (u && ACLUserCheckKeyPerm(u, (const char*)ri.key, ri.key_len, @@ -616,7 +620,7 @@ sds trackingBuildBroadcastReply(user *u, client *c, rax *keys) { raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { if (c && ri.data == c) continue; - if (u && ACLUserCheckKeyPerm(u,(const char*)ri.key,ri.key_len, + if (u && ACLUserCheckKeyPerm(u, (const char*)ri.key, ri.key_len, ACL_READ_PERMISSION) != ACL_OK) continue; len = ll2string(buf,sizeof(buf),ri.key_len); proto = sdscatlen(proto,"$",1); @@ -635,21 +639,18 @@ sds trackingBuildBroadcastReply(user *u, client *c, rax *keys) { static void trackingBcastInvalidationsForPrefix(bcastState *bs) { if (raxSize(bs->keys) == 0) return; - raxIterator ri, ri2; - raxStart(&ri,bs->clients); - raxSeek(&ri,"^",NULL,0); + raxIterator ri; + raxStart(&ri, bs->clients); + raxSeek(&ri, "^", NULL, 0); while(raxNext(&ri)) { user *u; - memcpy(&u,ri.key,sizeof(u)); - rax *user_clients = ri.data; + memcpy(&u, ri.key, sizeof(u)); + vec *user_clients = ri.data; sds proto = trackingBuildBroadcastReply(u, NULL, bs->keys); - raxStart(&ri2,user_clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); + for (size_t j = 0; j < vecSize(user_clients); j++) { + client *c = vecGet(user_clients, j); if (c->flags & CLIENT_TRACKING_NOLOOP) { sds adhoc = trackingBuildBroadcastReply(u, c, bs->keys); @@ -663,7 +664,6 @@ static void trackingBcastInvalidationsForPrefix(bcastState *bs) { sendTrackingMessage(c, proto, sdslen(proto), 1); } - raxStop(&ri2); sdsfree(proto); } @@ -678,11 +678,11 @@ static void trackingBcastInvalidationsForPrefix(bcastState *bs) { * This triggers the full broadcast cycle for each matching prefix. */ static void trackingBcastSendInvalidationsForPrefixes(rax *prefixes) { raxIterator ri; - raxStart(&ri,prefixes); - raxSeek(&ri,"^",NULL,0); + raxStart(&ri, prefixes); + raxSeek(&ri, "^", NULL, 0); while(raxNext(&ri)) { void *result; - int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); + int found = raxFind(PrefixTable, ri.key, ri.key_len, &result); serverAssert(found); trackingBcastInvalidationsForPrefix(result); } @@ -695,40 +695,41 @@ static void trackingBcastSendInvalidationsForPrefixes(rax *prefixes) { static void trackingBcastMoveClient(client *c, user *old_user) { user *new_user = c->user; raxIterator ri; - raxStart(&ri,c->client_tracking_prefixes); - raxSeek(&ri,"^",NULL,0); + raxStart(&ri, c->client_tracking_prefixes); + raxSeek(&ri, "^", NULL, 0); while(raxNext(&ri)) { void *result; - int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); + int found = raxFind(PrefixTable, ri.key, ri.key_len, &result); serverAssert(found); bcastState *bs = result; - /* Remove from old user bucket. */ - rax *from_clients; + /* Swap-remove from old user vector. */ + vec *from_clients; found = raxFind(bs->clients, - (unsigned char*)&old_user,sizeof(old_user), + (unsigned char*)&old_user, sizeof(old_user), (void**)&from_clients); serverAssert(found); - raxRemove(from_clients,(unsigned char*)&c,sizeof(c),NULL); - if (raxSize(from_clients) == 0) { - raxFree(from_clients); + vecSwapRemove(from_clients, c); + if (vecSize(from_clients) == 0) { + vecRelease(from_clients); + zfree(from_clients); raxRemove(bs->clients, - (unsigned char*)&old_user,sizeof(old_user),NULL); + (unsigned char*)&old_user, sizeof(old_user), NULL); } - /* Insert into new user bucket. */ - rax *to_clients; + /* Insert into new user vector. */ + vec *to_clients; if (!raxFind(bs->clients, - (unsigned char*)&new_user,sizeof(new_user), + (unsigned char*)&new_user, sizeof(new_user), (void**)&to_clients)) { - to_clients = raxNew(); + to_clients = zmalloc(sizeof(vec)); + vecInit(to_clients, NULL, 0); raxInsert(bs->clients, - (unsigned char*)&new_user,sizeof(new_user), - to_clients,NULL); + (unsigned char*)&new_user, sizeof(new_user), + to_clients, NULL); } - raxTryInsert(to_clients, - (unsigned char*)&c,sizeof(c),NULL,NULL); + vecPush(to_clients, c); } raxStop(&ri); } diff --git a/src/vector.c b/src/vector.c index fc0ba13e1..80802a627 100644 --- a/src/vector.c +++ b/src/vector.c @@ -100,6 +100,26 @@ void vecPush(vec *v, void *value) { v->data[v->size++] = value; } +/* Return the index of the first occurrence of 'elem', or -1 if not found. */ +ssize_t vecFindIndexOf(const vec *v, void *elem) { + for (size_t i = 0; i < v->size; i++) { + if (v->data[i] == elem) return (ssize_t)i; + } + return -1; +} + +/* Remove the first occurrence of 'elem' by swapping with the last element. + * Returns 1 if found and removed, 0 if not found. */ +int vecSwapRemove(vec *v, void *elem) { + for (size_t i = 0; i < v->size; i++) { + if (v->data[i] == elem) { + v->data[i] = v->data[--v->size]; + return 1; + } + } + return 0; +} + #ifdef REDIS_TEST #include @@ -221,6 +241,42 @@ int vectorTest(int argc, char **argv, int flags) vecRelease(&v); test_cond("vecRelease() free method is a no-op on empty vector", vecTestFreeCalls == 0); + /* vecFindIndexOf tests */ + vecInit(&v, NULL, 0); + test_cond("vecFindIndexOf() returns -1 on empty vector", + vecFindIndexOf(&v, &one) == -1); + vecPush(&v, &one); + vecPush(&v, &two); + vecPush(&v, &three); + test_cond("vecFindIndexOf() finds first element", + vecFindIndexOf(&v, &one) == 0); + test_cond("vecFindIndexOf() finds middle element", + vecFindIndexOf(&v, &two) == 1); + test_cond("vecFindIndexOf() finds last element", + vecFindIndexOf(&v, &three) == 2); + test_cond("vecFindIndexOf() returns -1 for missing element", + vecFindIndexOf(&v, &four) == -1); + vecRelease(&v); + + /* vecSwapRemove tests */ + vecInit(&v, NULL, 0); + vecPush(&v, &one); + vecPush(&v, &two); + vecPush(&v, &three); + test_cond("vecSwapRemove() removes middle element and swaps with last", + vecSwapRemove(&v, &two) == 1 && + vecSize(&v) == 2 && + vecGet(&v, 0) == &one && vecGet(&v, 1) == &three); + test_cond("vecSwapRemove() returns 0 for missing element", + vecSwapRemove(&v, &four) == 0 && vecSize(&v) == 2); + test_cond("vecSwapRemove() removes last element without swap", + vecSwapRemove(&v, &three) == 1 && + vecSize(&v) == 1 && vecGet(&v, 0) == &one); + test_cond("vecSwapRemove() removes sole element", + vecSwapRemove(&v, &one) == 1 && vecSize(&v) == 0); + test_cond("vecSwapRemove() returns 0 on empty vector", + vecSwapRemove(&v, &one) == 0); + vecRelease(&v); return 0; } diff --git a/src/vector.h b/src/vector.h index c89955c98..db9f0e4a4 100644 --- a/src/vector.h +++ b/src/vector.h @@ -2,6 +2,7 @@ #define REDIS_VECTOR_H #include +#include /* * Simple append-only vector (dynamic array) of void * elements. @@ -96,6 +97,13 @@ void vecReserve(vec *v, size_t mincap); /* Append one element, growing storage as needed. */ void vecPush(vec *v, void *value); +/* Return the index of the first occurrence of 'elem', or -1 if not found. */ +ssize_t vecFindIndexOf(const vec *v, void *elem); + +/* Remove the first occurrence of 'elem' by swapping with the last element. + * Returns 1 if found and removed, 0 if not found. */ +int vecSwapRemove(vec *v, void *elem); + #ifdef REDIS_TEST int vectorTest(int argc, char **argv, int flags); #endif From 441280fdb3fe6906bc2cf1ecbedc12c77f042951 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Mon, 27 Apr 2026 20:12:05 +0300 Subject: [PATCH 04/12] Disable tracking before user change in deauthenticateAndCloseClient When ACL DELUSER kills a client, deauthenticateAndCloseClient sets c->user to DefaultUser before the client is freed. With the user-keyed BCAST structure, the later disableTracking call (from freeClient) would look for the client in the DefaultUser bucket where it was never registered, hitting a serverAssert. Call disableTracking early, while c->user still points to the correct user, so the client is removed from the right bucket. The subsequent call from freeClient is a no-op since the tracking flags are already cleared. Also update the disableTracking comment to reflect the current behavior for both BCAST and non-BCAST modes. --- src/networking.c | 1 + src/tracking.c | 15 +++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/networking.c b/src/networking.c index 2f5384c3b..10e259117 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2073,6 +2073,7 @@ void clearClientConnectionState(client *c) { } void deauthenticateAndCloseClient(client *c) { + disableTracking(c); c->user = DefaultUser; c->authenticated = 0; /* We will write replies to this client later, so we can't diff --git a/src/tracking.c b/src/tracking.c index 130fe212d..b2f544753 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -39,12 +39,15 @@ typedef struct bcastState { prefix. */ } bcastState; -/* Remove the tracking state from the client 'c'. Note that there is not much - * to do for us here, if not to decrement the counter of the clients in - * tracking mode, because we just store the ID of the client in the tracking - * table, so we'll remove the ID reference in a lazy way. Otherwise when a - * client with many entries in the table is removed, it would cost a lot of - * time to do the cleanup. */ +/* Remove the tracking state from the client 'c'. + * + * For BCAST mode, the client is immediately removed from its per-user + * vector in every prefix it subscribes to, and empty user/prefix entries + * are freed. + * + * For normal (non-BCAST) tracking, the client's ID references in the + * tracking table are removed lazily to avoid expensive cleanup when a + * client with many cached keys disconnects. */ void disableTracking(client *c) { /* If this client is in broadcasting mode, we need to unsubscribe it * from all the prefixes it is registered to. */ From deb368931b19e9a7bf435b046a9a2f769b0fae0a Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Tue, 28 Apr 2026 10:58:37 +0300 Subject: [PATCH 05/12] Rename test ACL users to avoid codespell errors codespell flags 'userA' as a misspelling of 'users'. Rename to 'usr_a' and 'usr_b' in the BCAST re-AUTH tracking test. --- tests/unit/tracking.tcl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index d1fd54139..d94b3b41e 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -945,11 +945,11 @@ start_server {tags {"tracking network logreqres:skip"}} { test {BCAST re-AUTH re-buckets correctly with ACL filtering} { clean_all - r ACL SETUSER userA on >passA ~a:* +@all - r ACL SETUSER userB on >passB ~b:* +@all + r ACL SETUSER usr_a on >passA ~a:* +@all + r ACL SETUSER usr_b on >passB ~b:* +@all set tc [redis_deferring_client] - $tc AUTH userA passA + $tc AUTH usr_a passA $tc read $tc HELLO 3 @@ -962,7 +962,7 @@ start_server {tags {"tracking network logreqres:skip"}} { $rd_sg SET a:1{t} val1 $rd_sg SET b:1{t} val1 - # Under userA, only a:* is visible. + # Under usr_a, only a:* is visible. after 100 $tc PING set keys {} @@ -976,8 +976,8 @@ start_server {tags {"tracking network logreqres:skip"}} { } assert_equal $keys [list a:1{t}] - # Re-AUTH as userB. - $tc AUTH userB passB + # Re-AUTH as usr_b. + $tc AUTH usr_b passB $tc read # Write again. @@ -1000,8 +1000,8 @@ start_server {tags {"tracking network logreqres:skip"}} { $tc CLIENT TRACKING off $tc read $tc close - r ACL DELUSER userA - r ACL DELUSER userB + r ACL DELUSER usr_a + r ACL DELUSER usr_b } $rd_redirection close From 1ba550c8acdf1ad2e83a87936c49e88a117922c9 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Tue, 28 Apr 2026 16:27:15 +0300 Subject: [PATCH 06/12] Use while-loop drain pattern for RESP3 BCAST push reads in tests The previous single-read pattern could miss invalidation messages if multiple pushes arrived for different prefixes. Switch to the same while-loop drain used elsewhere in the test, and document why the ordering guarantee holds (synchronous $rd_sg + beforeSleep flush). --- tests/unit/tracking.tcl | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index d94b3b41e..14d9daab2 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -915,19 +915,27 @@ start_server {tags {"tracking network logreqres:skip"}} { # Read invalidation messages: there are two prefixes, but only # public: should have data for shareduser. after 100 + # $rd_sg is synchronous, so modified keys are already recorded + # on the server by the time we send PING. BCAST invalidations + # are flushed in beforeSleep before PONG, so they precede it + # on the wire. Drain all push messages until we hit the PONG. $c1 PING - set c1_resp [$c1 read] - if {[lindex $c1_resp 0] eq "invalidate"} { - set c1_keys [lindex $c1_resp 1] - # Read the PONG - $c1 read + while 1 { + set resp [$c1 read] + if {[lindex $resp 0] eq "invalidate"} { + lappend c1_keys {*}[lindex $resp 1] + } else { + break + } } $c2 PING - set c2_resp [$c2 read] - if {[lindex $c2_resp 0] eq "invalidate"} { - set c2_keys [lindex $c2_resp 1] - # Read the PONG - $c2 read + while 1 { + set resp [$c2 read] + if {[lindex $resp 0] eq "invalidate"} { + lappend c2_keys {*}[lindex $resp 1] + } else { + break + } } assert_equal [lsort $c1_keys] [list public:a{t}] @@ -963,6 +971,10 @@ start_server {tags {"tracking network logreqres:skip"}} { $rd_sg SET b:1{t} val1 # Under usr_a, only a:* is visible. + # $rd_sg is synchronous, so modified keys are already recorded + # on the server by the time we send PING. BCAST invalidations + # are flushed in beforeSleep before PONG, so they precede it + # on the wire. Drain all push messages until we hit the PONG. after 100 $tc PING set keys {} From 7a356a2c0e0435236d2ca5f679b612ead1d82852 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Wed, 29 Apr 2026 15:37:28 +0300 Subject: [PATCH 07/12] Add clientSetUser() to keep BCAST tracking buckets in sync on user changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bcastState.clients is keyed by user pointer, so any c->user change on an external-facing client must move the client accordingly, or the server will crash when disableTracking looks it up under the wrong user. Introduce clientSetUser() - a static inline setter that assigns c->user and calls trackingBroadcastPostUserSwitch() to flush pending invalidations and move the client to the correct place in bcastState.clients. The function is a no-op for clients without CLIENT_TRACKING_BCAST or when the user hasn't actually changed. Convert all external-facing c->user assignment sites: - checkPasswordBasedAuth (AUTH command) - ACL LOAD path (user reassignment on config reload) - internalAuth (cluster internal secret) - authenticateClientWithUser (module auth API) - clientSetDefaultAuth (client creation / RESET command) - TLS auto-auth (certificate-based authentication) Remove the now-redundant trackingBroadcastPostUserSwitch() call from ACLAuthenticateUser. Both code paths that ACLAuthenticateUser delegates to already handle it internally via the new setter: checkPasswordBasedAuth calls clientSetUser directly, and checkModuleAuthentication changes c->user indirectly when the module auth callback invokes RM_AuthenticateClientWithUser or RM_AuthenticateClientWithACLUser, which go through authenticateClientWithUser — also converted to use the new setter. Remaining direct c->user assignments are on internal clients (CLIENT_MODULE, CLIENT_MASTER, script fake clients) that do not have BCAST tracking enabled. The tracking command has the CMD_NOSCRIPT bit set, therefore it is not allowed to be called from the scripting internal client. --- src/acl.c | 11 +++-------- src/module.c | 2 +- src/networking.c | 4 ++-- src/server.h | 5 +++++ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/acl.c b/src/acl.c index ce3aac94e..cdcaf52af 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1497,7 +1497,7 @@ void addAuthErrReply(client *c, robj *err) { int checkPasswordBasedAuth(client *c, robj *username, robj *password) { if (ACLCheckUserCredentials(username,password) == C_OK) { c->authenticated = 1; - c->user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); + clientSetUser(c, ACLGetUserByName(username->ptr,sdslen(username->ptr))); moduleNotifyUserChanged(c); return AUTH_OK; } else { @@ -1514,15 +1514,11 @@ int checkPasswordBasedAuth(client *c, robj *username, robj *password) { * AUTH_BLOCKED - Indicates module authentication is in progress through a blocking implementation. */ int ACLAuthenticateUser(client *c, robj *username, robj *password, robj **err) { - user *old_user = c->user; int result = checkModuleAuthentication(c, username, password, err); /* If authentication was not handled by any Module, attempt normal password based auth. */ if (result == AUTH_NOT_HANDLED) { result = checkPasswordBasedAuth(c, username, password); } - if (result == AUTH_OK) { - trackingBroadcastPostUserSwitch(c, old_user); - } return result; } @@ -2485,8 +2481,7 @@ sds ACLLoadFromFile(const char *filename) { deauthenticateAndCloseClient(c); continue; } - c->user = new; - trackingBroadcastPostUserSwitch(c, original); + clientSetUser(c, new); } if (user_channels) @@ -3246,7 +3241,7 @@ static void internalAuth(client *c) { c->authenticated = 1; /* Set the user to the unrestricted user, if it is not already set (default). */ if (c->user != NULL) { - c->user = NULL; + clientSetUser(c, NULL); moduleNotifyUserChanged(c); } addReply(c, shared.ok); diff --git a/src/module.c b/src/module.c index 50a594987..6221816b4 100644 --- a/src/module.c +++ b/src/module.c @@ -10809,8 +10809,8 @@ static int authenticateClientWithUser(RedisModuleCtx *ctx, user *user, RedisModu moduleNotifyUserChanged(ctx->client); - ctx->client->user = user; ctx->client->authenticated = 1; + clientSetUser(ctx->client, user); if (clientHasModuleAuthInProgress(ctx->client)) { ctx->client->flags |= CLIENT_MODULE_AUTH_HAS_RESULT; diff --git a/src/networking.c b/src/networking.c index 10e259117..282b4ad18 100644 --- a/src/networking.c +++ b/src/networking.c @@ -103,7 +103,7 @@ void linkClient(client *c) { static void clientSetDefaultAuth(client *c) { /* If the default user does not require authentication, the user is * directly authenticated. */ - c->user = DefaultUser; + clientSetUser(c, DefaultUser); c->authenticated = (c->user->flags & USER_FLAG_NOPASS) && !(c->user->flags & USER_FLAG_DISABLED); } @@ -1614,8 +1614,8 @@ void clientAcceptHandler(connection *conn) { if (username != NULL) { user *u = ACLGetUserByName(username, sdslen(username)); if (u && !(u->flags & USER_FLAG_DISABLED)) { - c->user = u; c->authenticated = 1; + clientSetUser(c, u); moduleNotifyUserChanged(c); serverLog(LL_VERBOSE, "TLS: Auto-authenticated client as %s", server.hide_user_data_from_log ? "*redacted*" : u->name); diff --git a/src/server.h b/src/server.h index b427966b4..1215dc4ac 100644 --- a/src/server.h +++ b/src/server.h @@ -3362,6 +3362,11 @@ uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalPrefixes(void); void trackingBroadcastInvalidationMessages(void); void trackingBroadcastPostUserSwitch(client *c, user *old_user); +static inline void clientSetUser(client *c, user *new_user) { + user *old = c->user; + c->user = new_user; + trackingBroadcastPostUserSwitch(c, old); +} int checkPrefixCollisionsOrReply(client *c, robj **prefix, size_t numprefix); /* List data type */ From 0b2e48983cc5c3d2eda8d21fe5c72576041e8153 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Thu, 30 Apr 2026 19:57:31 +0300 Subject: [PATCH 08/12] Add vecSwapRemoveAt for O(1) removal by index Extract the swap-remove-by-index logic from vecSwapRemove into a new vecSwapRemoveAt function, allowing callers that already know the index to remove an element without a redundant linear scan. --- src/vector.c | 47 ++++++++++++++++++++++++++++++++++++++++------- src/vector.h | 6 +++++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/vector.c b/src/vector.c index 80802a627..18e095e08 100644 --- a/src/vector.c +++ b/src/vector.c @@ -108,16 +108,22 @@ ssize_t vecFindIndexOf(const vec *v, void *elem) { return -1; } +/* Remove the element at 'index' by swapping with the last element. + * Does not invoke the free callback. Requires index < vecSize(v). */ +void vecSwapRemoveAt(vec *v, size_t index) { + assert(index < v->size); + v->data[index] = v->data[--v->size]; +} + /* Remove the first occurrence of 'elem' by swapping with the last element. - * Returns 1 if found and removed, 0 if not found. */ + * Does not invoke the free callback. Returns 1 if found and removed, 0 if not found. */ int vecSwapRemove(vec *v, void *elem) { - for (size_t i = 0; i < v->size; i++) { - if (v->data[i] == elem) { - v->data[i] = v->data[--v->size]; - return 1; - } + ssize_t index = vecFindIndexOf(v, elem); + if (index < 0) { + return 0; } - return 0; + vecSwapRemoveAt(v, index); + return 1; } #ifdef REDIS_TEST @@ -258,6 +264,33 @@ int vectorTest(int argc, char **argv, int flags) vecFindIndexOf(&v, &four) == -1); vecRelease(&v); + /* vecSwapRemoveAt tests */ + vecInit(&v, NULL, 0); + vecPush(&v, &one); + vecPush(&v, &two); + vecPush(&v, &three); + vecSwapRemoveAt(&v, 1); + test_cond("vecSwapRemoveAt() removes middle element and swaps with last", + vecSize(&v) == 2 && + vecGet(&v, 0) == &one && vecGet(&v, 1) == &three); + vecSwapRemoveAt(&v, 1); + test_cond("vecSwapRemoveAt() removes last element", + vecSize(&v) == 1 && vecGet(&v, 0) == &one); + vecSwapRemoveAt(&v, 0); + test_cond("vecSwapRemoveAt() removes sole element", + vecSize(&v) == 0); + vecRelease(&v); + + vecInit(&v, NULL, 0); + vecPush(&v, &one); + vecPush(&v, &two); + vecPush(&v, &three); + vecSwapRemoveAt(&v, 0); + test_cond("vecSwapRemoveAt() removes first element and swaps with last", + vecSize(&v) == 2 && + vecGet(&v, 0) == &three && vecGet(&v, 1) == &two); + vecRelease(&v); + /* vecSwapRemove tests */ vecInit(&v, NULL, 0); vecPush(&v, &one); diff --git a/src/vector.h b/src/vector.h index db9f0e4a4..32ab93fa6 100644 --- a/src/vector.h +++ b/src/vector.h @@ -100,8 +100,12 @@ void vecPush(vec *v, void *value); /* Return the index of the first occurrence of 'elem', or -1 if not found. */ ssize_t vecFindIndexOf(const vec *v, void *elem); +/* Remove the element at 'index' by swapping with the last element. + * Does not invoke the free callback. Requires index < vecSize(v). */ +void vecSwapRemoveAt(vec *v, size_t index); + /* Remove the first occurrence of 'elem' by swapping with the last element. - * Returns 1 if found and removed, 0 if not found. */ + * Does not invoke the free callback. Returns 1 if found and removed, 0 if not found. */ int vecSwapRemove(vec *v, void *elem); #ifdef REDIS_TEST From 616cac3d3d7d2b61d442c6729f8c76220cdcfdb4 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Mon, 4 May 2026 12:11:02 +0300 Subject: [PATCH 09/12] Assert vecSwapRemove success in tracking operations Add serverAssert on the return value of vecSwapRemove in disableTracking and trackingBcastMoveClient to ensure the client is actually present in the user's client vector, catching invariant violations immediately instead of silently leaving stale entries. --- src/tracking.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index b2f544753..69c7b354c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -67,7 +67,8 @@ void disableTracking(client *c) { (unsigned char*)&c->user, sizeof(c->user), (void**)&user_clients); serverAssert(found); - vecSwapRemove(user_clients, c); + int removed = vecSwapRemove(user_clients, c); + serverAssert(removed); if (vecSize(user_clients) == 0) { vecRelease(user_clients); zfree(user_clients); @@ -712,7 +713,8 @@ static void trackingBcastMoveClient(client *c, user *old_user) { (unsigned char*)&old_user, sizeof(old_user), (void**)&from_clients); serverAssert(found); - vecSwapRemove(from_clients, c); + int removed = vecSwapRemove(from_clients, c); + serverAssert(removed); if (vecSize(from_clients) == 0) { vecRelease(from_clients); zfree(from_clients); From cadce84439c8d7b7cf174f6e3138a06d19575f86 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Tue, 5 May 2026 11:49:12 +0300 Subject: [PATCH 10/12] Remove vecSwapRemove in favor of vecFindIndexOf + vecSwapRemoveAt Drop the vecSwapRemove wrapper and replace call sites in tracking.c with explicit vecFindIndexOf + vecSwapRemoveAt. This keeps the API surface of vector smaller. --- src/tracking.c | 10 ++++++---- src/vector.c | 31 ------------------------------- src/vector.h | 4 ---- 3 files changed, 6 insertions(+), 39 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index 69c7b354c..492fe1d86 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -67,8 +67,9 @@ void disableTracking(client *c) { (unsigned char*)&c->user, sizeof(c->user), (void**)&user_clients); serverAssert(found); - int removed = vecSwapRemove(user_clients, c); - serverAssert(removed); + ssize_t idx = vecFindIndexOf(user_clients, c); + serverAssert(idx >= 0); + vecSwapRemoveAt(user_clients, idx); if (vecSize(user_clients) == 0) { vecRelease(user_clients); zfree(user_clients); @@ -713,8 +714,9 @@ static void trackingBcastMoveClient(client *c, user *old_user) { (unsigned char*)&old_user, sizeof(old_user), (void**)&from_clients); serverAssert(found); - int removed = vecSwapRemove(from_clients, c); - serverAssert(removed); + ssize_t idx = vecFindIndexOf(from_clients, c); + serverAssert(idx >= 0); + vecSwapRemoveAt(from_clients, idx); if (vecSize(from_clients) == 0) { vecRelease(from_clients); zfree(from_clients); diff --git a/src/vector.c b/src/vector.c index 18e095e08..2bd9d9cb2 100644 --- a/src/vector.c +++ b/src/vector.c @@ -115,17 +115,6 @@ void vecSwapRemoveAt(vec *v, size_t index) { v->data[index] = v->data[--v->size]; } -/* Remove the first occurrence of 'elem' by swapping with the last element. - * Does not invoke the free callback. Returns 1 if found and removed, 0 if not found. */ -int vecSwapRemove(vec *v, void *elem) { - ssize_t index = vecFindIndexOf(v, elem); - if (index < 0) { - return 0; - } - vecSwapRemoveAt(v, index); - return 1; -} - #ifdef REDIS_TEST #include @@ -291,26 +280,6 @@ int vectorTest(int argc, char **argv, int flags) vecGet(&v, 0) == &three && vecGet(&v, 1) == &two); vecRelease(&v); - /* vecSwapRemove tests */ - vecInit(&v, NULL, 0); - vecPush(&v, &one); - vecPush(&v, &two); - vecPush(&v, &three); - test_cond("vecSwapRemove() removes middle element and swaps with last", - vecSwapRemove(&v, &two) == 1 && - vecSize(&v) == 2 && - vecGet(&v, 0) == &one && vecGet(&v, 1) == &three); - test_cond("vecSwapRemove() returns 0 for missing element", - vecSwapRemove(&v, &four) == 0 && vecSize(&v) == 2); - test_cond("vecSwapRemove() removes last element without swap", - vecSwapRemove(&v, &three) == 1 && - vecSize(&v) == 1 && vecGet(&v, 0) == &one); - test_cond("vecSwapRemove() removes sole element", - vecSwapRemove(&v, &one) == 1 && vecSize(&v) == 0); - test_cond("vecSwapRemove() returns 0 on empty vector", - vecSwapRemove(&v, &one) == 0); - vecRelease(&v); - return 0; } #endif diff --git a/src/vector.h b/src/vector.h index 32ab93fa6..3c2b41c21 100644 --- a/src/vector.h +++ b/src/vector.h @@ -104,10 +104,6 @@ ssize_t vecFindIndexOf(const vec *v, void *elem); * Does not invoke the free callback. Requires index < vecSize(v). */ void vecSwapRemoveAt(vec *v, size_t index); -/* Remove the first occurrence of 'elem' by swapping with the last element. - * Does not invoke the free callback. Returns 1 if found and removed, 0 if not found. */ -int vecSwapRemove(vec *v, void *elem); - #ifdef REDIS_TEST int vectorTest(int argc, char **argv, int flags); #endif From 52fed302ca0d1934e2f2ec5474652dda2361a6db Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Tue, 26 May 2026 13:59:15 +0300 Subject: [PATCH 11/12] Move clientSetUser() to acl.c, rename vecFindIndexOf to vecIndexOf Move clientSetUser() from a static inline in server.h to a regular function in acl.c alongside other user-management functions. The function has side-effect logic and is expected to grow; Rename vecFindIndexOf() to vecIndexOf() for brevity and consistency with standard container APIs. --- src/acl.c | 8 ++++++++ src/server.h | 6 +----- src/tracking.c | 6 +++--- src/vector.c | 24 ++++++++++++------------ src/vector.h | 2 +- 5 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/acl.c b/src/acl.c index cdcaf52af..25b0a7d4c 100644 --- a/src/acl.c +++ b/src/acl.c @@ -523,6 +523,14 @@ void ACLCopyUser(user *dst, user *src) { } } +/* Set the user for a client, performing any necessary bookkeeping such as + * updating broadcast tracking state for the user switch. */ +void clientSetUser(client *c, user *new_user) { + user *old = c->user; + c->user = new_user; + trackingBroadcastPostUserSwitch(c, old); +} + /* Given a command ID, this function set by reference 'word' and 'bit' * so that user->allowed_commands[word] will address the right word * where the corresponding bit for the provided ID is stored, and diff --git a/src/server.h b/src/server.h index 1215dc4ac..4c232afef 100644 --- a/src/server.h +++ b/src/server.h @@ -3362,11 +3362,7 @@ uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalPrefixes(void); void trackingBroadcastInvalidationMessages(void); void trackingBroadcastPostUserSwitch(client *c, user *old_user); -static inline void clientSetUser(client *c, user *new_user) { - user *old = c->user; - c->user = new_user; - trackingBroadcastPostUserSwitch(c, old); -} +void clientSetUser(client *c, user *new_user); int checkPrefixCollisionsOrReply(client *c, robj **prefix, size_t numprefix); /* List data type */ diff --git a/src/tracking.c b/src/tracking.c index 492fe1d86..615e1a8bf 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -67,7 +67,7 @@ void disableTracking(client *c) { (unsigned char*)&c->user, sizeof(c->user), (void**)&user_clients); serverAssert(found); - ssize_t idx = vecFindIndexOf(user_clients, c); + ssize_t idx = vecIndexOf(user_clients, c); serverAssert(idx >= 0); vecSwapRemoveAt(user_clients, idx); if (vecSize(user_clients) == 0) { @@ -181,7 +181,7 @@ static void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { user_clients, NULL); } - if (vecFindIndexOf(user_clients, c) < 0) { + if (vecIndexOf(user_clients, c) < 0) { vecPush(user_clients, c); if (c->client_tracking_prefixes == NULL) c->client_tracking_prefixes = raxNew(); @@ -714,7 +714,7 @@ static void trackingBcastMoveClient(client *c, user *old_user) { (unsigned char*)&old_user, sizeof(old_user), (void**)&from_clients); serverAssert(found); - ssize_t idx = vecFindIndexOf(from_clients, c); + ssize_t idx = vecIndexOf(from_clients, c); serverAssert(idx >= 0); vecSwapRemoveAt(from_clients, idx); if (vecSize(from_clients) == 0) { diff --git a/src/vector.c b/src/vector.c index 2bd9d9cb2..70f080dbd 100644 --- a/src/vector.c +++ b/src/vector.c @@ -101,7 +101,7 @@ void vecPush(vec *v, void *value) { } /* Return the index of the first occurrence of 'elem', or -1 if not found. */ -ssize_t vecFindIndexOf(const vec *v, void *elem) { +ssize_t vecIndexOf(const vec *v, void *elem) { for (size_t i = 0; i < v->size; i++) { if (v->data[i] == elem) return (ssize_t)i; } @@ -236,21 +236,21 @@ int vectorTest(int argc, char **argv, int flags) vecRelease(&v); test_cond("vecRelease() free method is a no-op on empty vector", vecTestFreeCalls == 0); - /* vecFindIndexOf tests */ + /* vecIndexOf tests */ vecInit(&v, NULL, 0); - test_cond("vecFindIndexOf() returns -1 on empty vector", - vecFindIndexOf(&v, &one) == -1); + test_cond("vecIndexOf() returns -1 on empty vector", + vecIndexOf(&v, &one) == -1); vecPush(&v, &one); vecPush(&v, &two); vecPush(&v, &three); - test_cond("vecFindIndexOf() finds first element", - vecFindIndexOf(&v, &one) == 0); - test_cond("vecFindIndexOf() finds middle element", - vecFindIndexOf(&v, &two) == 1); - test_cond("vecFindIndexOf() finds last element", - vecFindIndexOf(&v, &three) == 2); - test_cond("vecFindIndexOf() returns -1 for missing element", - vecFindIndexOf(&v, &four) == -1); + test_cond("vecIndexOf() finds first element", + vecIndexOf(&v, &one) == 0); + test_cond("vecIndexOf() finds middle element", + vecIndexOf(&v, &two) == 1); + test_cond("vecIndexOf() finds last element", + vecIndexOf(&v, &three) == 2); + test_cond("vecIndexOf() returns -1 for missing element", + vecIndexOf(&v, &four) == -1); vecRelease(&v); /* vecSwapRemoveAt tests */ diff --git a/src/vector.h b/src/vector.h index 3c2b41c21..c89465567 100644 --- a/src/vector.h +++ b/src/vector.h @@ -98,7 +98,7 @@ void vecReserve(vec *v, size_t mincap); void vecPush(vec *v, void *value); /* Return the index of the first occurrence of 'elem', or -1 if not found. */ -ssize_t vecFindIndexOf(const vec *v, void *elem); +ssize_t vecIndexOf(const vec *v, void *elem); /* Remove the element at 'index' by swapping with the last element. * Does not invoke the free callback. Requires index < vecSize(v). */ From 6790d6efbdb339e698126c24a67b23aadd981c52 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Tue, 26 May 2026 15:45:56 +0300 Subject: [PATCH 12/12] Initialize c->user before clientSetDefaultAuth to avoid undefined behavior clientSetUser() reads c->user to pass the old value to trackingBroadcastPostUserSwitch(). In createClient(), c->user was never initialized before clientSetDefaultAuth() called clientSetUser(), resulting in a read of indeterminate memory. While harmless in practice (trackingBroadcastPostUserSwitch returns early on c->flags == 0), the read itself is undefined behavior and would be flagged by MSAN/Valgrind. Initialize c->user to DefaultUser before the call, so the subsequent clientSetUser(c, DefaultUser) sees old == new and the post-switch hook is a no-op. --- src/networking.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.c b/src/networking.c index 282b4ad18..ebac816ab 100644 --- a/src/networking.c +++ b/src/networking.c @@ -193,6 +193,7 @@ client *createClient(connection *conn) { c->ctime = c->lastinteraction = server.unixtime; c->io_lastinteraction = 0; c->duration = 0; + c->user = DefaultUser; /* Set a safe default value: clientSetDefaultAuth reads c->user. */ clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0;