From 82396716d05b3d63bc0d66a5820dfc604a6a2fd8 Mon Sep 17 00:00:00 2001 From: Stav-Levi <45394834+StavRLevi@users.noreply.github.com> Date: Mon, 28 Jul 2025 10:17:48 +0300 Subject: [PATCH] Add API to allow Redis modules to unsubscribe from keyspace notifications This API complements module subscribe by enabling modules to unsubscribe from specific keyspace event notifications when they are no longer needed. This helps reduce performance overhead and unnecessary callback invocations. The function matches subscriptions based on event mask, callback pointer, and module identity. If a matching subscription is found, it is removed. Returns REDISMODULE_OK if a subscription was successfully removed, otherwise REDISMODULE_ERR. --- src/module.c | 33 ++++++++++++ src/redismodule.h | 2 + tests/modules/keyspace_events.c | 65 ++++++++++++++++++++++-- tests/unit/moduleapi/keyspace_events.tcl | 18 +++++++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index a1a64ce2b..41e6d93d3 100644 --- a/src/module.c +++ b/src/module.c @@ -8866,6 +8866,38 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti return REDISMODULE_OK; } +/* + * RM_UnsubscribeFromKeyspaceEvents - Unregister a module's callback from keyspace notifications for specific event types. + * + * This function removes a previously registered subscription identified by both the event mask and the callback function. + * It is useful to reduce performance overhead when the module no longer requires notifications for certain events. + * + * Parameters: + * - ctx: The RedisModuleCtx associated with the calling module. + * - types: The event mask representing the keyspace notification types to unsubscribe from. + * - callback: The callback function pointer that was originally registered for these events. + * + * Returns: + * - REDISMODULE_OK on successful removal of the subscription. + * - REDISMODULE_ERR if no matching subscription was found or if invalid parameters were provided. + */ +int RM_UnsubscribeFromKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { + if (!ctx || !callback) return REDISMODULE_ERR; + int removed = 0; + listIter li; + listNode *ln; + listRewind(moduleKeyspaceSubscribers,&li); + while ((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + if (sub->event_mask == types && sub->notify_callback == callback && sub->module == ctx->module) { + zfree(sub); + listDelNode(moduleKeyspaceSubscribers, ln); + removed++; + } + } + return removed > 0 ? REDISMODULE_OK : REDISMODULE_ERR; +} + void firePostExecutionUnitJobs(void) { /* Avoid propagation of commands. * In that way, postExecutionUnitOperations will prevent @@ -14730,6 +14762,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(NotifyKeyspaceEvent); REGISTER_API(GetNotifyKeyspaceEvents); REGISTER_API(SubscribeToKeyspaceEvents); + REGISTER_API(UnsubscribeFromKeyspaceEvents); REGISTER_API(AddPostNotificationJob); REGISTER_API(RegisterClusterMessageReceiver); REGISTER_API(SendClusterMessage); diff --git a/src/redismodule.h b/src/redismodule.h index 2a0f272a5..436e64ca9 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -1255,6 +1255,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; @@ -1643,6 +1644,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(BlockedClientMeasureTimeEnd); REDISMODULE_GET_API(SetDisconnectCallback); REDISMODULE_GET_API(SubscribeToKeyspaceEvents); + REDISMODULE_GET_API(UnsubscribeFromKeyspaceEvents); REDISMODULE_GET_API(AddPostNotificationJob); REDISMODULE_GET_API(NotifyKeyspaceEvent); REDISMODULE_GET_API(GetNotifyKeyspaceEvents); diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index e8dc1be4a..146261f6e 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -45,8 +45,10 @@ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char return REDISMODULE_OK; } +static long long callback_call_count = 0; static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { REDISMODULE_NOT_USED(type); + callback_call_count++; const char *key_str = RedisModule_StringPtrLen(key, NULL); if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) { if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) { @@ -296,6 +298,55 @@ static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_ReplyWithLongLong(ctx, dels); } +static RedisModuleNotificationFunc get_callback_for_event(int event_mask) { + switch(event_mask) { + case REDISMODULE_NOTIFY_LOADED: + return KeySpace_NotificationLoaded; + case REDISMODULE_NOTIFY_GENERIC: + return KeySpace_NotificationGeneric; + case REDISMODULE_NOTIFY_EXPIRED: + return KeySpace_NotificationExpired; + case REDISMODULE_NOTIFY_MODULE: + return KeySpace_NotificationModule; + case REDISMODULE_NOTIFY_KEY_MISS: + return KeySpace_NotificationModuleKeyMiss; + case REDISMODULE_NOTIFY_STRING: + // We have two callbacks for STRING events in your OnLoad, + // For simplicity, pick the first: + return KeySpace_NotificationModuleString; + default: + return NULL; + } +} + +int GetCallbackCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ReplyWithLongLong(ctx, callback_call_count); + return REDISMODULE_OK; +} + +static int CmdUnsub(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + return RedisModule_WrongArity(ctx); + } + + long long event_mask; + if (RedisModule_StringToLongLong(argv[1], &event_mask) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR invalid event mask"); + } + + RedisModuleNotificationFunc cb = get_callback_for_event((int)event_mask); + if (cb == NULL) { + return RedisModule_ReplyWithError(ctx, "ERR unknown event mask"); + } + + if (RedisModule_UnsubscribeFromKeyspaceEvents(ctx, (int)event_mask, cb) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed"); + } + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -357,17 +408,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case1", cmdIncrCase1, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case2", cmdIncrCase2, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case3", cmdIncrCase3, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; @@ -383,6 +434,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe", CmdUnsub, "write", 0, 0, 0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "keyspace.callback_count", GetCallbackCountCommand, "", 0, 0, 0)== REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + if (argc == 1) { const char *ptr = RedisModule_StringPtrLen(argv[0], NULL); if (!strcasecmp(ptr, "noload")) { diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index 30fdc1e2b..0b8dd476c 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -87,6 +87,24 @@ tags "modules" { $rd1 close } + test "Keyspace notifications: unsubscribe removes handler" { + r config set notify-keyspace-events KEA + set before [r keyspace.callback_count] + r set a 1 + r del a + wait_for_condition 100 10 { + [r keyspace.callback_count] > $before + } else { + fail "callback did not trigger" + } + set before_unsub [r keyspace.callback_count] + r keyspace.unsubscribe 4 ;# REDISMODULE_NOTIFY_GENERIC + r set a 1 + r del a + set after_unsub [r keyspace.callback_count] + assert_equal $before_unsub $after_unsub + } + test {Test expired key space event} { set prev_expired [s expired_keys] r set exp 1 PX 10