diff --git a/src/module.c b/src/module.c index a1a64ce2b..41e6d93d3 100644 --- a/src/module.c +++ b/src/module.c @@ -8866,6 +8866,38 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti return REDISMODULE_OK; } +/* + * RM_UnsubscribeFromKeyspaceEvents - Unregister a module's callback from keyspace notifications for specific event types. + * + * This function removes a previously registered subscription identified by both the event mask and the callback function. + * It is useful to reduce performance overhead when the module no longer requires notifications for certain events. + * + * Parameters: + * - ctx: The RedisModuleCtx associated with the calling module. + * - types: The event mask representing the keyspace notification types to unsubscribe from. + * - callback: The callback function pointer that was originally registered for these events. + * + * Returns: + * - REDISMODULE_OK on successful removal of the subscription. + * - REDISMODULE_ERR if no matching subscription was found or if invalid parameters were provided. + */ +int RM_UnsubscribeFromKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { + if (!ctx || !callback) return REDISMODULE_ERR; + int removed = 0; + listIter li; + listNode *ln; + listRewind(moduleKeyspaceSubscribers,&li); + while ((ln = listNext(&li))) { + RedisModuleKeyspaceSubscriber *sub = ln->value; + if (sub->event_mask == types && sub->notify_callback == callback && sub->module == ctx->module) { + zfree(sub); + listDelNode(moduleKeyspaceSubscribers, ln); + removed++; + } + } + return removed > 0 ? REDISMODULE_OK : REDISMODULE_ERR; +} + void firePostExecutionUnitJobs(void) { /* Avoid propagation of commands. * In that way, postExecutionUnitOperations will prevent @@ -14730,6 +14762,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(NotifyKeyspaceEvent); REGISTER_API(GetNotifyKeyspaceEvents); REGISTER_API(SubscribeToKeyspaceEvents); + REGISTER_API(UnsubscribeFromKeyspaceEvents); REGISTER_API(AddPostNotificationJob); REGISTER_API(RegisterClusterMessageReceiver); REGISTER_API(SendClusterMessage); diff --git a/src/redismodule.h b/src/redismodule.h index 2a0f272a5..436e64ca9 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -1255,6 +1255,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; @@ -1643,6 +1644,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(BlockedClientMeasureTimeEnd); REDISMODULE_GET_API(SetDisconnectCallback); REDISMODULE_GET_API(SubscribeToKeyspaceEvents); + REDISMODULE_GET_API(UnsubscribeFromKeyspaceEvents); REDISMODULE_GET_API(AddPostNotificationJob); REDISMODULE_GET_API(NotifyKeyspaceEvent); REDISMODULE_GET_API(GetNotifyKeyspaceEvents); diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index e8dc1be4a..146261f6e 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -45,8 +45,10 @@ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char return REDISMODULE_OK; } +static long long callback_call_count = 0; static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { REDISMODULE_NOT_USED(type); + callback_call_count++; const char *key_str = RedisModule_StringPtrLen(key, NULL); if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) { if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) { @@ -296,6 +298,55 @@ static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_ReplyWithLongLong(ctx, dels); } +static RedisModuleNotificationFunc get_callback_for_event(int event_mask) { + switch(event_mask) { + case REDISMODULE_NOTIFY_LOADED: + return KeySpace_NotificationLoaded; + case REDISMODULE_NOTIFY_GENERIC: + return KeySpace_NotificationGeneric; + case REDISMODULE_NOTIFY_EXPIRED: + return KeySpace_NotificationExpired; + case REDISMODULE_NOTIFY_MODULE: + return KeySpace_NotificationModule; + case REDISMODULE_NOTIFY_KEY_MISS: + return KeySpace_NotificationModuleKeyMiss; + case REDISMODULE_NOTIFY_STRING: + // We have two callbacks for STRING events in your OnLoad, + // For simplicity, pick the first: + return KeySpace_NotificationModuleString; + default: + return NULL; + } +} + +int GetCallbackCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ReplyWithLongLong(ctx, callback_call_count); + return REDISMODULE_OK; +} + +static int CmdUnsub(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + return RedisModule_WrongArity(ctx); + } + + long long event_mask; + if (RedisModule_StringToLongLong(argv[1], &event_mask) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR invalid event mask"); + } + + RedisModuleNotificationFunc cb = get_callback_for_event((int)event_mask); + if (cb == NULL) { + return RedisModule_ReplyWithError(ctx, "ERR unknown event mask"); + } + + if (RedisModule_UnsubscribeFromKeyspaceEvents(ctx, (int)event_mask, cb) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed"); + } + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -357,17 +408,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case1", cmdIncrCase1, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case2", cmdIncrCase2, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } - + if (RedisModule_CreateCommand(ctx, "keyspace.incr_case3", cmdIncrCase3, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; @@ -383,6 +434,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe", CmdUnsub, "write", 0, 0, 0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "keyspace.callback_count", GetCallbackCountCommand, "", 0, 0, 0)== REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + if (argc == 1) { const char *ptr = RedisModule_StringPtrLen(argv[0], NULL); if (!strcasecmp(ptr, "noload")) { diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index 30fdc1e2b..0b8dd476c 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -87,6 +87,24 @@ tags "modules" { $rd1 close } + test "Keyspace notifications: unsubscribe removes handler" { + r config set notify-keyspace-events KEA + set before [r keyspace.callback_count] + r set a 1 + r del a + wait_for_condition 100 10 { + [r keyspace.callback_count] > $before + } else { + fail "callback did not trigger" + } + set before_unsub [r keyspace.callback_count] + r keyspace.unsubscribe 4 ;# REDISMODULE_NOTIFY_GENERIC + r set a 1 + r del a + set after_unsub [r keyspace.callback_count] + assert_equal $before_unsub $after_unsub + } + test {Test expired key space event} { set prev_expired [s expired_keys] r set exp 1 PX 10