Make defragStagePubsubClientSide respect endtime for bounded latency

The function previously walked all clients in a single unbounded loop,
ignoring the endtime parameter. With many pubsub clients this could
stall the event loop.

Switch from iterating server.clients (a list) to server.clients_index
(a rax keyed by big-endian client ID). The rax supports resumable
iteration: on time-out we save the last-processed key and re-seek with
">" on the next call, safely skipping any clients deleted in between.
The endtime is checked every 16 clients.
This commit is contained in:
Hristo Staykov 2026-05-22 17:32:51 +03:00
parent 7e3cd1a17a
commit 8b3bbfdf1a

View file

@ -133,6 +133,11 @@ typedef struct {
} defragPubSubCtx;
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
/* Context for client-side pubsub defrag (iterates server.clients_index).
 *
 * Serves as the resume cursor for defragStagePubsubClientSide: when that
 * stage runs out of time it copies the rax key of the last client it
 * processed into this struct, and the next invocation seeks strictly past
 * that key (">" seek) to continue where it left off. */
typedef struct {
uint64_t last_client_id_raw; /* big-endian rax key of last processed client, 0 = start */
} defragPubsubClientCtx;
typedef struct {
sds module_name;
unsigned long cursor;
@ -1626,42 +1631,57 @@ static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) {
* tables. Outer dict keys are user* pointers (user objects are not moved by
* active defrag today; if that changes, all user-pointer indexes must be
* updated). Inner dict key objects (robj) are NOT touched here - they are
* handled by the server-side defrag callbacks above. */
* handled by the server-side defrag callbacks above.
*
* Iteration uses server.clients_index (a rax keyed by big-endian client ID)
* so that clients deleted between calls are safely skipped, and no list
* mutation is required. */
static doneStatus defragStagePubsubClientSide(void *ctx, monotime endtime) {
    /* ctx: defragPubsubClientCtx holding the resume cursor (raw rax key of
     *      the last client handled by a previous call, 0 on the first call).
     * endtime: monotonic deadline in microseconds for this invocation.
     * Returns DEFRAG_NOT_DONE if the deadline was hit before all clients
     * were visited (cursor saved in ctx), DEFRAG_DONE otherwise. */
    defragPubsubClientCtx *dctx = ctx;
    unsigned int iterations = 0;

    /* Resume strictly after the last processed key. Seeking with ">" means a
     * client that was deleted since the previous call is simply skipped. */
    raxIterator ri;
    raxStart(&ri, server.clients_index);
    raxSeek(&ri, ">", (unsigned char *)&dctx->last_client_id_raw,
            sizeof(dctx->last_client_id_raw));

    while (raxNext(&ri)) {
        client *c = ri.data;

        if (dictSize(c->pubsub_subscriptions) > 0) {
            /* Defrag the outer per-user dict's tables first, so the iterator
             * below walks the (possibly relocated) dict. */
            dict *newd = dictDefragTables(c->pubsub_subscriptions);
            if (newd) c->pubsub_subscriptions = newd;

            dictIterator di;
            dictEntry *de;
            dictInitIterator(&di, c->pubsub_subscriptions);
            while ((de = dictNext(&di)) != NULL) {
                /* Relocate the per-user subscription struct itself and
                 * repoint the dict entry at the new allocation. */
                pubsubUserSubs *subs = dictGetVal(de);
                pubsubUserSubs *newsubs = activeDefragAlloc(subs);
                if (newsubs) {
                    dictSetVal(c->pubsub_subscriptions, de, newsubs);
                    subs = newsubs;
                }

                /* Then the hash tables of each inner dict. */
                dict *newinner;
                if ((newinner = dictDefragTables(subs->channels)))
                    subs->channels = newinner;
                if ((newinner = dictDefragTables(subs->patterns)))
                    subs->patterns = newinner;
                if ((newinner = dictDefragTables(subs->shard_channels)))
                    subs->shard_channels = newinner;
            }
            dictResetIterator(&di);
        }

        /* Look at the clock only once per 16 clients. The counter is reset
         * after every check; without the reset the clock would be read on
         * every single client once the first 16 have been processed. */
        if (++iterations >= 16) {
            if (getMonotonicUs() >= endtime) {
                /* Remember the client just processed so the next call
                 * continues with the one after it. */
                memcpy(&dctx->last_client_id_raw, ri.key,
                       sizeof(dctx->last_client_id_raw));
                raxStop(&ri);
                return DEFRAG_NOT_DONE;
            }
            iterations = 0;
        }
    }

    raxStop(&ri);
    return DEFRAG_DONE;
}
@ -2001,7 +2021,9 @@ static void beginDefragCycle(void) {
addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx);
/* Add stage for client-side pubsub per-user dict structures. */
addDefragStage(defragStagePubsubClientSide, NULL, NULL);
defragPubsubClientCtx *defrag_pubsub_client_ctx = zmalloc(sizeof(defragPubsubClientCtx));
defrag_pubsub_client_ctx->last_client_id_raw = 0;
addDefragStage(defragStagePubsubClientSide, zfree, defrag_pubsub_client_ctx);
addDefragStage(defragLuaScripts, NULL, NULL);