From 8b3bbfdf1a2b1ec19cb10dca5b5fe564592d97a0 Mon Sep 17 00:00:00 2001 From: Hristo Staykov Date: Fri, 22 May 2026 17:32:51 +0300 Subject: [PATCH] Make defragStagePubsubClientSide respect endtime for bounded latency The function previously walked all clients in a single unbounded loop, ignoring the endtime parameter. With many pubsub clients this could stall the event loop. Switch from iterating server.clients (a list) to server.clients_index (a rax keyed by big-endian client ID). The rax supports resumable iteration: on time-out we save the last-processed key and re-seek with ">" on the next call, safely skipping any clients deleted in between. The endtime is checked every 16 clients. --- src/defrag.c | 82 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index 8c3141647..ccebeaca6 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -133,6 +133,11 @@ typedef struct { } defragPubSubCtx; static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); +/* Context for client-side pubsub defrag (iterates server.clients_index) */ +typedef struct { + uint64_t last_client_id_raw; /* big-endian rax key of last processed client, 0 = start */ +} defragPubsubClientCtx; + typedef struct { sds module_name; unsigned long cursor; @@ -1626,42 +1631,57 @@ static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) { * tables. Outer dict keys are user* pointers (user objects are not moved by * active defrag today; if that changes, all user-pointer indexes must be * updated). Inner dict key objects (robj) are NOT touched here - they are - * handled by the server-side defrag callbacks above. */ + * handled by the server-side defrag callbacks above. + * + * Iteration uses server.clients_index (a rax keyed by big-endian client ID) + * so that clients deleted between calls are safely skipped, and no list + * mutation is required. */ static doneStatus defragStagePubsubClientSide(void *ctx, monotime endtime) { - UNUSED(ctx); - UNUSED(endtime); + defragPubsubClientCtx *dctx = ctx; + unsigned int iterations = 0; - listIter li; - listNode *ln; - listRewind(server.clients, &li); - while ((ln = listNext(&li)) != NULL) { - client *c = listNodeValue(ln); - if (dictSize(c->pubsub_subscriptions) == 0) continue; + raxIterator ri; + raxStart(&ri, server.clients_index); + raxSeek(&ri, ">", (unsigned char *)&dctx->last_client_id_raw, + sizeof(dctx->last_client_id_raw)); - dict *newd = dictDefragTables(c->pubsub_subscriptions); - if (newd) c->pubsub_subscriptions = newd; + while (raxNext(&ri)) { + client *c = ri.data; - dictIterator di; - dictEntry *de; - dictInitIterator(&di, c->pubsub_subscriptions); - while ((de = dictNext(&di)) != NULL) { - pubsubUserSubs *subs = dictGetVal(de); - pubsubUserSubs *newsubs = activeDefragAlloc(subs); - if (newsubs) { - dictSetVal(c->pubsub_subscriptions, de, newsubs); - subs = newsubs; + if (dictSize(c->pubsub_subscriptions) > 0) { + dict *newd = dictDefragTables(c->pubsub_subscriptions); + if (newd) c->pubsub_subscriptions = newd; + + dictIterator di; + dictEntry *de; + dictInitIterator(&di, c->pubsub_subscriptions); + while ((de = dictNext(&di)) != NULL) { + pubsubUserSubs *subs = dictGetVal(de); + pubsubUserSubs *newsubs = activeDefragAlloc(subs); + if (newsubs) { + dictSetVal(c->pubsub_subscriptions, de, newsubs); + subs = newsubs; + } + + dict *newinner; + if ((newinner = dictDefragTables(subs->channels))) + subs->channels = newinner; + if ((newinner = dictDefragTables(subs->patterns))) + subs->patterns = newinner; + if ((newinner = dictDefragTables(subs->shard_channels))) + subs->shard_channels = newinner; } - - dict *newinner; - if ((newinner = dictDefragTables(subs->channels))) - subs->channels = newinner; - if ((newinner = dictDefragTables(subs->patterns))) - subs->patterns = newinner; - if ((newinner = dictDefragTables(subs->shard_channels))) - subs->shard_channels = newinner; + dictResetIterator(&di); + } + + if (++iterations >= 16 && getMonotonicUs() >= endtime) { + memcpy(&dctx->last_client_id_raw, ri.key, sizeof(dctx->last_client_id_raw)); + raxStop(&ri); + return DEFRAG_NOT_DONE; } - dictResetIterator(&di); } + + raxStop(&ri); return DEFRAG_DONE; } @@ -2001,7 +2021,9 @@ static void beginDefragCycle(void) { addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); /* Add stage for client-side pubsub per-user dict structures. */ - addDefragStage(defragStagePubsubClientSide, NULL, NULL); + defragPubsubClientCtx *defrag_pubsub_client_ctx = zmalloc(sizeof(defragPubsubClientCtx)); + defrag_pubsub_client_ctx->last_client_id_raw = 0; + addDefragStage(defragStagePubsubClientSide, zfree, defrag_pubsub_client_ctx); addDefragStage(defragLuaScripts, NULL, NULL);