redis/tests/modules/keyspace_events.c
Yuan Wang 4757561861
Subkey notification for hash fields (#14958)
## Motivation

Redis's existing keyspace notification system operates at the **key
level** only — when a hash field is modified via `HSET`, `HDEL`, or
`HEXPIRE`, the subscriber receives the key name and the event type, but
not **which fields** were affected. As a result, these notifications
have very little practical value.

This PR introduces a subkey notification system that extends keyspace
events to include field-level (subkey) details for hash operations,
through both Pub/Sub channels and the Module API.

## New Pub/Sub Notification Channels

Four new channels are added:

|Channel Format | Payload |
|---------------|---------|
| `__subkeyspace@<db>__:<key>` | `<event>\|<len>:<subkey>[,...]` |
| `__subkeyevent@<db>__:<event>` | `<key_len>:<key>\|<len>:<subkey>[,...]` |
| `__subkeyspaceitem@<db>__:<key>\n<subkey>` | `<event>` |
|`__subkeyspaceevent@<db>__:<event>\|<key>` | `<len>:<subkey>[,...]` |

**Design rationale for 4 channels:**
- **Subkeyspace**: Subscribe to a specific key, receive all field
changes in a single message — efficient for key-centric consumers.
- **Subkeyevent**: Subscribe to a specific event type, receive
key+fields — efficient for event-centric consumers.
- **Subkeyspaceitem**: Subscribe to a specific key+field combination —
the most selective, one message per field, no parsing needed.
- **Subkeyspaceevent**: Subscribe to event+key combination, receiving
only the affected fields — server-side filtering on both dimensions.

Subkeys are encoded in a length-prefixed format (`<len>:<subkey>`) to
support binary-safe field names containing delimiters.

**Safety guards:**
- Events containing `|` are skipped for the `__subkeyspace` and
`__subkeyspaceevent` channels (to avoid parsing ambiguity).
- Keys containing `\n` are skipped for the `__subkeyspaceitem` channel
(newline is the key/subkey separator).
- Subkey channels are only published when `subkeys != NULL && count >
0`.

## Hash Command Integration

The following hash operations now emit subkey level notifications with
the affected field names:

| Command | Event | Subkeys |
|---------|-------|---------|
| `HSET` / `HMSET` | `hset` | All fields being set |
| `HSETNX` | `hset` | The field (if set) |
| `HDEL` | `hdel` | All fields deleted |
| `HGETDEL` | `hdel` / `hexpired` | Deleted or lazily expired fields |
| `HGETEX` | `hexpire` / `hpersist` / `hdel` / `hexpired` | Affected fields per event |
| `HINCRBY` | `hincrby` | The field |
| `HINCRBYFLOAT` | `hincrbyfloat` | The field |
| `HEXPIRE` / `HPEXPIRE` / `HEXPIREAT` / `HPEXPIREAT` | `hexpire` | Updated fields |
| `HPERSIST` | `hpersist` | Persisted fields |
| `HSETEX` | `hset` / `hdel` / `hexpire` / `hexpired` | Affected fields per event |
| Field expiration (active/lazy) | `hexpired` | All expired fields (batched) |

For field expiration, expired fields are collected into a dynamic array
and sent as a single batched notification after the expiration loop,
rather than one notification per field.

## Module API

Three new APIs and one new callback type:

```c
/* Function pointer type for keyspace event notifications with subkeys from modules. */
typedef void (*RedisModuleNotificationWithSubkeysFunc)(
    RedisModuleCtx *ctx, int type, const char *event,
    RedisModuleString *key, RedisModuleString **subkeys, int count);

/* Subscribe to keyspace notifications with subkey information.
 *
 * This is the extended version of RM_SubscribeToKeyspaceEvents. When subkeys
 * are available, the `subkeys` array and `count` are passed to the callback.
 * `subkeys` contains only the names of affected subkeys (values are not included),
 * and `count` is the number of elements. The array may contain duplicates when
 * the same subkey appears more than once in a command (e.g. HSET key f1 v1 f1 v2
 * produces subkeys=["f1","f1"], count=2). When no subkeys are present, `subkeys`
 * will be NULL and `count` will be 0. Whether events without subkeys are delivered
 * depends on the `flags` parameter (see below).
 *
 * `types` is a bit mask of event types the module is interested in
 * (using the same REDISMODULE_NOTIFY_* flags as RM_SubscribeToKeyspaceEvents).
 *
 * `flags` controls delivery filtering:
 *  - REDISMODULE_NOTIFY_FLAG_NONE: The callback is invoked for all matching
 *    events regardless of whether subkeys are present, so a separate
 *    RM_SubscribeToKeyspaceEvents registration can be omitted.
 *  - REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED: The callback is only invoked
 *    when subkeys are not empty. Events without subkey information (e.g. SET,
 *    EXPIRE, DEL) are skipped.
 *
 * The callback signature is:
 *   void callback(RedisModuleCtx *ctx, int type, const char *event,
 *                 RedisModuleString *key, RedisModuleString **subkeys, int count);
 *
 * The subkeys array and its contents are only valid during the callback.
 * The underlying objects may be stack-allocated or temporary, so
 * RM_RetainString must NOT be used on them. To keep a subkey beyond
 * the callback (e.g. in a RM_AddPostNotificationJob callback), use
 * RM_HoldString (which handles static objects by copying) or
 * RM_CreateStringFromString to make a deep copy before returning.
 */
int RM_SubscribeToKeyspaceEventsWithSubkeys(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc callback);

/* Unregister a module's callback from keyspace notifications with subkeys
 * for specific event types.
 *
 * This function removes a previously registered subscription identified by
 * the event mask, delivery flags, and the callback function.
 *
 * Parameters:
 *  - ctx: The RedisModuleCtx associated with the calling module.
 *  - types: The event mask representing the notification types to unsubscribe from.
 *  - flags: The delivery flags that were used during registration.
 *  - callback: The callback function pointer that was originally registered.
 *
 * Returns:
 *  - REDISMODULE_OK on successful removal of the subscription.
 *  - REDISMODULE_ERR if no matching subscription was found. */ 
int RM_UnsubscribeFromKeyspaceEventsWithSubkeys(
    RedisModuleCtx *ctx, int types, int flags,
    RedisModuleNotificationWithSubkeysFunc cb);

/* Like RM_NotifyKeyspaceEvent, but also triggers subkey-level notifications
 * when subkeys are provided. Both key-level (keyspace/keyevent) and
 * subkey-level (subkeyspace/subkeyevent/subkeyspaceitem/subkeyspaceevent)
 * channels are published to, depending on the server configuration.
 *
 * This is the extended version of RM_NotifyKeyspaceEvent and can actually
 * replace it. When called with subkeys=NULL and count=0, it behaves
 * identically to RM_NotifyKeyspaceEvent. */
int RM_NotifyKeyspaceEventWithSubkeys(
    RedisModuleCtx *ctx, int type, const char *event,
    RedisModuleString *key, RedisModuleString **subkeys, int count);
```

## Configuration

Subkey notifications are controlled via the existing
`notify-keyspace-events` configuration string with four new characters:
`notify-keyspace-events` "STIV"

**S** -> Subkeyspace events, published with `__subkeyspace@<db>__:<key>`
prefix.
**T** -> Subkeyevent events, published with
`__subkeyevent@<db>__:<event>` prefix.
**I** -> Subkeyspaceitem events, published per subkey with
`__subkeyspaceitem@<db>__:<key>\n<subkey>` prefix.
**V** -> Subkeyspaceevent events, published with
`__subkeyspaceevent@<db>__:<event>|<key>` prefix.

These flags are **independent** from the existing key-level flags (`K`,
`E`, etc.). Enabling subkey notifications does **not** implicitly enable
or depend on keyspace/keyevent notifications, and vice versa.

## Known Limitations

- **Duplicate fields in subkey notifications**: Subkey notification
payloads may contain duplicate field names when the same field is
affected more than once within a single command. Since duplicate fields
are not the common case and deduplication would introduce significant
overhead on every notification, we chose not to deduplicate at this
time.
- **Subkeys must be sds-encoded objects**: The implementation assumes
each subkey is an sds-encoded string object and accesses it through
`subkey->ptr`. An assertion enforces this, so Redis will crash if a
subkey with a different encoding is passed.
2026-04-17 13:39:04 +08:00

610 lines
23 KiB
C

/* This module is used to test the server keyspace events API.
*
* -----------------------------------------------------------------------------
*
* Copyright (c) 2020-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#define _BSD_SOURCE
#define _DEFAULT_SOURCE /* For usleep */
#include "redismodule.h"
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
/* Clock snapshot set by cmdDelKeyCopy for the duration of its DEL call and
 * reset to 0 afterwards. While non-zero, KeySpace_NotificationGeneric asserts
 * against it. */
ustime_t cached_time = 0;
/* Keys for which a "loaded" keyspace notification was received. Maps the key
 * name to a held RedisModuleString of the same key. */
RedisModuleDict *loaded_event_log = NULL;
/* Keys for which a REDISMODULE_NOTIFY_MODULE keyspace notification was
 * received. Maps the key name to a held RedisModuleString of the same key. */
RedisModuleDict *module_event_log = NULL;
/* Counts "del" keyspace notifications received for keys prefixed with
 * "count_dels_"; also incremented directly by the keyspace.incr_dels command. */
static size_t dels = 0;
/* Subkey notification log: a fixed-size array of formatted event lines.
 * SUBKEY_LOG_MAX is the maximum number of entries; each entry holds at most
 * 511 characters plus the terminating NUL. */
#define SUBKEY_LOG_MAX 256
static char subkey_log[SUBKEY_LOG_MAX][512];
/* Number of entries of subkey_log currently in use. */
static int subkey_log_count = 0;
/* Keyspace callback registered for REDISMODULE_NOTIFY_LOADED.
 *
 * When the event name is "loaded", the key is recorded in loaded_event_log
 * (only the first time it is seen), with a held reference to the key string
 * stored as the value. Any other event name is ignored.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
    REDISMODULE_NOT_USED(type);

    if (strcmp(event, "loaded") != 0) {
        return REDISMODULE_OK;
    }

    const char *name = RedisModule_StringPtrLen(key, NULL);
    const size_t name_len = strlen(name);
    int missing;

    RedisModule_DictGetC(loaded_event_log, (void*)name, name_len, &missing);
    if (missing) {
        /* Not logged yet: hold the string so it outlives this callback. */
        RedisModule_DictSetC(loaded_event_log, (void*)name, name_len, RedisModule_HoldString(ctx, key));
    }
    return REDISMODULE_OK;
}
/* Number of times KeySpace_NotificationGeneric has been invoked. */
static long long callback_call_count = 0;

/* Keyspace callback registered for REDISMODULE_NOTIFY_GENERIC.
 *
 * - Every invocation bumps callback_call_count.
 * - A "del" on a key prefixed with "count_dels_" increments `dels` and
 *   replicates "keyspace.incr_dels", but only when the context reports the
 *   MASTER flag; the callback then returns immediately.
 * - While cached_time is non-zero, asserts that the cached clock still equals
 *   it and that the real clock differs from it after a 1us sleep.
 * - Any other "del" deletes "<key>_copy" (propagated with `!`) and increments
 *   the "lua" / "multi" counters when the matching context flag is set.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    callback_call_count++;

    const char *name = RedisModule_StringPtrLen(key, NULL);
    const int is_del = (strcmp(event, "del") == 0);

    if (is_del && strncmp(name, "count_dels_", 11) == 0) {
        if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) {
            dels++;
            RedisModule_Replicate(ctx, "keyspace.incr_dels", "");
        }
        return REDISMODULE_OK;
    }

    if (cached_time) {
        /* The cached clock must not move during the command, while the real
         * clock must have advanced after sleeping. */
        RedisModule_Assert(cached_time == RedisModule_CachedMicroseconds());
        usleep(1);
        RedisModule_Assert(cached_time != RedisModule_Microseconds());
    }

    if (!is_del) {
        return REDISMODULE_OK;
    }

    RedisModuleString *copy_name = RedisModule_CreateStringPrintf(ctx, "%s_copy", name);
    RedisModuleCallReply *del_reply = RedisModule_Call(ctx, "DEL", "s!", copy_name);
    RedisModule_FreeString(ctx, copy_name);
    RedisModule_FreeCallReply(del_reply);

    const int ctx_flags = RedisModule_GetContextFlags(ctx);
    if (ctx_flags & REDISMODULE_CTX_FLAGS_LUA) {
        RedisModuleCallReply *lua_reply = RedisModule_Call(ctx, "INCR", "c", "lua");
        RedisModule_FreeCallReply(lua_reply);
    }
    if (ctx_flags & REDISMODULE_CTX_FLAGS_MULTI) {
        RedisModuleCallReply *multi_reply = RedisModule_Call(ctx, "INCR", "c", "multi");
        RedisModule_FreeCallReply(multi_reply);
    }
    return REDISMODULE_OK;
}
/* Keyspace callback registered for REDISMODULE_NOTIFY_EXPIRED.
 *
 * Increments the "testkeyspace:expired" counter key (propagated with `!`)
 * on every invocation, regardless of which key or event triggered it.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(key);
    REDISMODULE_NOT_USED(event);
    REDISMODULE_NOT_USED(type);

    RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "INCR", "c!", "testkeyspace:expired");
    RedisModule_FreeCallReply(incr_reply);

    return REDISMODULE_OK;
}
/* Keyspace callback registered for REDISMODULE_NOTIFY_KEY_MISS.
 *
 * This handler performs a write command from inside the notification
 * callback. Doing so is discouraged and currently wrong for key-miss events:
 * it can cause read commands to be replicated to the replica/AOF. The handler
 * is kept temporarily, for coverage and to verify that it does not crash.
 *
 * On a master, increments the "missed" counter; on a replica, does nothing.
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(key);
    REDISMODULE_NOT_USED(event);
    REDISMODULE_NOT_USED(type);

    const int ctx_flags = RedisModule_GetContextFlags(ctx);
    if (ctx_flags & REDISMODULE_CTX_FLAGS_MASTER) {
        RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "incr", "!c", "missed");
        RedisModule_FreeCallReply(incr_reply);
    }
    /* Replicas fall through: the event is ignored there. */
    return REDISMODULE_OK;
}
/* Keyspace callback registered for REDISMODULE_NOTIFY_STRING.
 *
 * Opens the notified key read-only and takes a DMA pointer to its string
 * value. RedisModule_StringDMA could change the data format and cause the old
 * robj to be freed; this callback verifies that such a format change does not
 * cause any crashes.
 *
 * The key handle and the DMA pointer are both checked before use: per the
 * Modules API documentation a read-only open of a missing key and a DMA
 * request on a non-string value may each yield NULL.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModuleString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    RedisModuleKey *redis_key = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
    if (redis_key == NULL) {
        /* Nothing to inspect; avoid dereferencing a NULL key handle. */
        return REDISMODULE_OK;
    }

    size_t len = 0;
    char *data = RedisModule_StringDMA(redis_key, &len, REDISMODULE_READ);
    if (data != NULL) {
        /* Touch the buffer so the DMA pointer is actually read. */
        int res = strncmp(data, "dummy", 5);
        REDISMODULE_NOT_USED(res);
    }

    RedisModule_CloseKey(redis_key);
    return REDISMODULE_OK;
}
static void KeySpace_PostNotificationStringFreePD(void *pd) {
RedisModule_FreeString(NULL, pd);
}
/* Post-notification job body: increments the key whose name is carried in
 * `pd` (a RedisModuleString), propagating the command with `!`. */
static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
    RedisModuleString *counter_key = pd;
    RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "incr", "!s", counter_key);
    RedisModule_FreeCallReply(incr_reply);
}
/* Second keyspace callback registered for REDISMODULE_NOTIFY_STRING.
 *
 * For keys whose name starts with "string1_", schedules a post-notification
 * job that increments "string_changed{<key>}". The job owns the newly created
 * key-name string, which is released by KeySpace_PostNotificationStringFreePD.
 * Other keys are ignored.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModuleStringPostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *name = RedisModule_StringPtrLen(key, NULL);
    if (strncmp(name, "string1_", 8) == 0) {
        /* Created with a NULL context; the job's free callback releases it. */
        RedisModuleString *counter_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", name);
        RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, counter_key, KeySpace_PostNotificationStringFreePD);
    }
    return REDISMODULE_OK;
}
/* Keyspace callback registered for REDISMODULE_NOTIFY_MODULE.
 *
 * Records the notified key in module_event_log the first time it is seen,
 * storing a held reference to the key string as the value.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *name = RedisModule_StringPtrLen(key, NULL);
    const size_t name_len = strlen(name);
    int missing;

    RedisModule_DictGetC(module_event_log, (void*)name, name_len, &missing);
    if (!missing) {
        /* Already logged; keep the existing entry. */
        return REDISMODULE_OK;
    }
    RedisModule_DictSetC(module_event_log, (void*)name, name_len, RedisModule_HoldString(ctx, key));
    return REDISMODULE_OK;
}
/* keyspace.notify <key>
 *
 * Fires a REDISMODULE_NOTIFY_MODULE keyspace event named "notify" for <key>
 * and replies with Null. Replies with an arity error unless exactly one
 * argument is given. */
static int cmdNotify(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
    if (argc == 2) {
        RedisModule_NotifyKeyspaceEvent(ctx, REDISMODULE_NOTIFY_MODULE, "notify", argv[1]);
        RedisModule_ReplyWithNull(ctx);
        return REDISMODULE_OK;
    }
    return RedisModule_WrongArity(ctx);
}
/* keyspace.is_module_key_notified <key>
 *
 * Replies with a two-element array: 1 or 0 depending on whether <key> is
 * present in module_event_log, followed by the stored key string (or Null
 * when absent). Replies with an arity error unless exactly one argument is
 * given. */
static int cmdIsModuleKeyNotified(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
    if (argc != 2) return RedisModule_WrongArity(ctx);

    const char *name = RedisModule_StringPtrLen(argv[1], NULL);
    int missing;
    RedisModuleString *stored = RedisModule_DictGetC(module_event_log, (void*)name, strlen(name), &missing);

    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithLongLong(ctx, !missing);
    if (!missing) {
        RedisModule_ReplyWithString(ctx, stored);
    } else {
        RedisModule_ReplyWithNull(ctx);
    }
    return REDISMODULE_OK;
}
/* keyspace.is_key_loaded <key>
 *
 * Replies with a two-element array: 1 or 0 depending on whether <key> is
 * present in loaded_event_log, followed by the stored key string (or Null
 * when absent). Replies with an arity error unless exactly one argument is
 * given. */
static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
    if (argc != 2) return RedisModule_WrongArity(ctx);

    const char *name = RedisModule_StringPtrLen(argv[1], NULL);
    int missing;
    RedisModuleString *stored = RedisModule_DictGetC(loaded_event_log, (void*)name, strlen(name), &missing);

    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithLongLong(ctx, !missing);
    if (!missing) {
        RedisModule_ReplyWithString(ctx, stored);
    } else {
        RedisModule_ReplyWithNull(ctx);
    }
    return REDISMODULE_OK;
}
/* keyspace.del_key_copy <key>
 *
 * Deletes <key> through RM_Call (propagated with `!`) and forwards the reply
 * to the client. For the duration of the call, cached_time holds the cached
 * clock value so that KeySpace_NotificationGeneric can run its clock
 * assertions; it is reset to 0 before returning. */
static int cmdDelKeyCopy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    cached_time = RedisModule_CachedMicroseconds();

    RedisModuleCallReply *del_reply = RedisModule_Call(ctx, "DEL", "s!", argv[1]);
    if (del_reply) {
        RedisModule_ReplyWithCallReply(ctx, del_reply);
        RedisModule_FreeCallReply(del_reply);
    } else {
        RedisModule_ReplyWithError(ctx, "NULL reply returned");
    }

    cached_time = 0;
    return REDISMODULE_OK;
}
/* keyspace.incr_case1 <key>
 *
 * Runs INCR on <key> and lets RM_Call propagate it (the `!` format flag).
 * The INCR reply is forwarded to the client; a NULL reply is reported as an
 * error. */
static int cmdIncrCase1(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "INCR", "s!", argv[1]);
    if (incr_reply) {
        RedisModule_ReplyWithCallReply(ctx, incr_reply);
        RedisModule_FreeCallReply(incr_reply);
    } else {
        RedisModule_ReplyWithError(ctx, "NULL reply returned");
    }
    return REDISMODULE_OK;
}
/* keyspace.incr_case2 <key>
 *
 * Runs INCR on <key> without RM_Call propagation, then propagates an explicit
 * INCR through RM_Replicate. The INCR reply is forwarded to the client; a
 * NULL reply is reported as an error. */
static int cmdIncrCase2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "INCR", "s", argv[1]);
    if (incr_reply) {
        RedisModule_ReplyWithCallReply(ctx, incr_reply);
        RedisModule_FreeCallReply(incr_reply);
    } else {
        RedisModule_ReplyWithError(ctx, "NULL reply returned");
    }

    RedisModule_Replicate(ctx, "INCR", "s", argv[1]);
    return REDISMODULE_OK;
}
/* keyspace.incr_case3 <key>
 *
 * Runs INCR on <key> without RM_Call propagation, then propagates the
 * original command as received through RM_ReplicateVerbatim. The INCR reply
 * is forwarded to the client; a NULL reply is reported as an error. */
static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    RedisModuleCallReply *incr_reply = RedisModule_Call(ctx, "INCR", "s", argv[1]);
    if (incr_reply) {
        RedisModule_ReplyWithCallReply(ctx, incr_reply);
        RedisModule_FreeCallReply(incr_reply);
    } else {
        RedisModule_ReplyWithError(ctx, "NULL reply returned");
    }

    RedisModule_ReplicateVerbatim(ctx);
    return REDISMODULE_OK;
}
/* keyspace.incr_dels
 *
 * Increments the `dels` counter by one and replies "OK". Arguments are
 * ignored. */
static int cmdIncrDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    dels += 1;
    return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* keyspace.get_dels
 *
 * Replies with the current value of the `dels` counter. Arguments are
 * ignored. */
static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const long long current = (long long)dels;
    return RedisModule_ReplyWithLongLong(ctx, current);
}
/* Subkey notification callback.
 *
 * Appends one line to subkey_log describing the event, formatted as
 *   "<event> <key> <count> <subkey1> <subkey2> ..."   or   "<event> <key> 0"
 * The line is truncated to fit a log entry. Once the log holds
 * SUBKEY_LOG_MAX entries, further events are dropped. */
static void KeySpace_NotificationSubkeys(RedisModuleCtx *ctx, int type, const char *event,
                                         RedisModuleString *key, RedisModuleString **subkeys, int count) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(ctx);

    if (subkey_log_count >= SUBKEY_LOG_MAX) {
        return;
    }

    char line[512];
    const size_t cap = sizeof(line);
    const char *key_name = RedisModule_StringPtrLen(key, NULL);

    int used = snprintf(line, cap, "%s %s %d", event, key_name, count);

    /* snprintf reports the untruncated length, so stop appending as soon as
     * the line is full. */
    int idx = 0;
    while (idx < count && (size_t)used < cap - 1) {
        const char *subkey_name = RedisModule_StringPtrLen(subkeys[idx], NULL);
        used += snprintf(line + used, cap - used, " %s", subkey_name);
        idx++;
    }

    char *slot = subkey_log[subkey_log_count];
    snprintf(slot, sizeof(subkey_log[0]), "%s", line);
    subkey_log_count++;
}
/* keyspace.get_subkey_events
 *
 * Replies with an array containing every line currently stored in
 * subkey_log, in the order they were logged. Arguments are ignored. */
static int cmdGetSubkeyEvents(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const int total = subkey_log_count;
    RedisModule_ReplyWithArray(ctx, total);

    int idx = 0;
    while (idx < total) {
        RedisModule_ReplyWithCString(ctx, subkey_log[idx]);
        idx++;
    }
    return REDISMODULE_OK;
}
/* keyspace.reset_subkey_events
 *
 * Empties the subkey event log by resetting its entry count to zero and
 * replies "OK". Arguments are ignored. */
static int cmdResetSubkeyEvents(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    subkey_log_count = 0;

    return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* keyspace.notify_with_subkeys <key> <subkey1> [subkey2 ...]
 *
 * Fires a REDISMODULE_NOTIFY_HASH event named "module_subkey_event" for
 * <key>, passing every remaining argument as a subkey, then replies "OK".
 * Replies with an arity error when no subkey is given. */
static int cmdNotifyWithSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc < 3) {
        return RedisModule_WrongArity(ctx);
    }

    /* argv[0] is the command name, argv[1] the key; subkeys follow. */
    RedisModule_NotifyKeyspaceEventWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH, "module_subkey_event",
                                               argv[1], argv + 2, argc - 2);

    return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* keyspace.subscribe_subkeys
 *
 * Registers KeySpace_NotificationSubkeys for HASH and GENERIC events with
 * REDISMODULE_NOTIFY_FLAG_NONE, i.e. for all matching events whether or not
 * they carry subkeys. Replies "OK", or an error if the subscription fails. */
static int cmdSubscribeSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const int types = REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC;
    const int rc = RedisModule_SubscribeToKeyspaceEventsWithSubkeys(
        ctx, types, REDISMODULE_NOTIFY_FLAG_NONE, KeySpace_NotificationSubkeys);

    if (rc == REDISMODULE_OK) {
        return RedisModule_ReplyWithSimpleString(ctx, "OK");
    }
    return RedisModule_ReplyWithError(ctx, "ERR subscribe failed");
}
/* keyspace.unsubscribe_subkeys
 *
 * Removes the subscription created by keyspace.subscribe_subkeys (same event
 * mask, REDISMODULE_NOTIFY_FLAG_NONE, same callback). Replies "OK", or an
 * error if the unsubscribe call fails. */
static int cmdUnsubscribeSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const int types = REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC;
    const int rc = RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys(
        ctx, types, REDISMODULE_NOTIFY_FLAG_NONE, KeySpace_NotificationSubkeys);

    if (rc == REDISMODULE_OK) {
        return RedisModule_ReplyWithSimpleString(ctx, "OK");
    }
    return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
}
/* keyspace.subscribe_require_subkeys
 *
 * Registers KeySpace_NotificationSubkeys for HASH and GENERIC events with
 * REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED. Replies "OK", or an error if the
 * subscription fails. */
static int cmdSubscribeRequireSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const int types = REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC;
    const int rc = RedisModule_SubscribeToKeyspaceEventsWithSubkeys(
        ctx, types, REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED, KeySpace_NotificationSubkeys);

    if (rc == REDISMODULE_OK) {
        return RedisModule_ReplyWithSimpleString(ctx, "OK");
    }
    return RedisModule_ReplyWithError(ctx, "ERR subscribe failed");
}
/* keyspace.unsubscribe_require_subkeys
 *
 * Removes the subscription created by keyspace.subscribe_require_subkeys
 * (same event mask, REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED, same callback).
 * Replies "OK", or an error if the unsubscribe call fails. */
static int cmdUnsubscribeRequireSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const int types = REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC;
    const int rc = RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys(
        ctx, types, REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED, KeySpace_NotificationSubkeys);

    if (rc == REDISMODULE_OK) {
        return RedisModule_ReplyWithSimpleString(ctx, "OK");
    }
    return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
}
/* Maps a single REDISMODULE_NOTIFY_* value to the key-level callback this
 * module subscribes for it. Returns NULL when `event_mask` is not exactly
 * one of the supported values. */
static RedisModuleNotificationFunc get_callback_for_event(int event_mask) {
    static const struct {
        int mask;
        RedisModuleNotificationFunc handler;
    } handlers[] = {
        {REDISMODULE_NOTIFY_LOADED,   KeySpace_NotificationLoaded},
        {REDISMODULE_NOTIFY_GENERIC,  KeySpace_NotificationGeneric},
        {REDISMODULE_NOTIFY_EXPIRED,  KeySpace_NotificationExpired},
        {REDISMODULE_NOTIFY_MODULE,   KeySpace_NotificationModule},
        {REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationModuleKeyMiss},
        /* Two callbacks are subscribed for STRING events in OnLoad; for
         * simplicity, the first one is used here. */
        {REDISMODULE_NOTIFY_STRING,   KeySpace_NotificationModuleString},
    };

    for (size_t i = 0; i < sizeof(handlers) / sizeof(handlers[0]); i++) {
        if (handlers[i].mask == event_mask) {
            return handlers[i].handler;
        }
    }
    return NULL;
}
/* keyspace.callback_count
 *
 * Replies with the number of times KeySpace_NotificationGeneric has been
 * invoked so far. Arguments are ignored. */
int GetCallbackCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    const long long calls = callback_call_count;
    RedisModule_ReplyWithLongLong(ctx, calls);

    return REDISMODULE_OK;
}
/* keyspace.unsubscribe <event_mask>
 *
 * Unsubscribes the key-level callback this module associates with
 * <event_mask> (see get_callback_for_event). Replies "OK" on success, or an
 * error when the mask is not a number, is not one of the known masks, or the
 * unsubscribe call itself fails. */
static int CmdUnsub(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) return RedisModule_WrongArity(ctx);

    long long parsed_mask;
    if (RedisModule_StringToLongLong(argv[1], &parsed_mask) != REDISMODULE_OK) {
        return RedisModule_ReplyWithError(ctx, "ERR invalid event mask");
    }

    const int mask = (int)parsed_mask;
    RedisModuleNotificationFunc handler = get_callback_for_event(mask);
    if (!handler) {
        return RedisModule_ReplyWithError(ctx, "ERR unknown event mask");
    }

    if (RedisModule_UnsubscribeFromKeyspaceEvents(ctx, mask, handler) == REDISMODULE_OK) {
        return RedisModule_ReplyWithSimpleString(ctx, "OK");
    }
    return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
}
/* This function must be present on each Redis module. It is used in order to
 * register the commands into the Redis server.
 *
 * Steps, in order:
 *  1. Initialise the module as "testkeyspace".
 *  2. Create the two event-log dictionaries.
 *  3. Refuse to load if REDISMODULE_NOTIFY_LOADED is not supported.
 *  4. Subscribe the key-level notification callbacks.
 *  5. Register all keyspace.* commands.
 *  6. If the single module argument is "noload" (case-insensitive), fail on
 *     purpose at the very end.
 *
 * Returns REDISMODULE_OK on success. On any failure after step 2 the
 * dictionaries are released and their globals reset before returning
 * REDISMODULE_ERR, so no error path leaks them. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    /* Key-level subscriptions, registered in this exact order. STRING events
     * intentionally have two independent callbacks. */
    static const struct {
        int types;
        RedisModuleNotificationFunc callback;
    } subscriptions[] = {
        {REDISMODULE_NOTIFY_LOADED,   KeySpace_NotificationLoaded},
        {REDISMODULE_NOTIFY_GENERIC,  KeySpace_NotificationGeneric},
        {REDISMODULE_NOTIFY_EXPIRED,  KeySpace_NotificationExpired},
        {REDISMODULE_NOTIFY_MODULE,   KeySpace_NotificationModule},
        {REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationModuleKeyMiss},
        {REDISMODULE_NOTIFY_STRING,   KeySpace_NotificationModuleString},
        {REDISMODULE_NOTIFY_STRING,   KeySpace_NotificationModuleStringPostNotificationJob},
    };

    /* Commands exposed by the module: name, handler, command flags. All are
     * registered without key specs (firstkey/lastkey/keystep = 0). */
    static const struct {
        const char *name;
        RedisModuleCmdFunc handler;
        const char *flags;
    } commands[] = {
        {"keyspace.notify",                      cmdNotify,                    ""},
        {"keyspace.is_module_key_notified",      cmdIsModuleKeyNotified,       ""},
        {"keyspace.is_key_loaded",               cmdIsKeyLoaded,               ""},
        {"keyspace.del_key_copy",                cmdDelKeyCopy,                "write"},
        {"keyspace.incr_case1",                  cmdIncrCase1,                 "write"},
        {"keyspace.incr_case2",                  cmdIncrCase2,                 "write"},
        {"keyspace.incr_case3",                  cmdIncrCase3,                 "write"},
        {"keyspace.incr_dels",                   cmdIncrDels,                  "write"},
        {"keyspace.get_dels",                    cmdGetDels,                   "readonly"},
        {"keyspace.unsubscribe",                 CmdUnsub,                     "write"},
        {"keyspace.callback_count",              GetCallbackCountCommand,      ""},
        {"keyspace.subscribe_subkeys",           cmdSubscribeSubkeys,          ""},
        {"keyspace.unsubscribe_subkeys",         cmdUnsubscribeSubkeys,        ""},
        {"keyspace.get_subkey_events",           cmdGetSubkeyEvents,           "readonly"},
        {"keyspace.reset_subkey_events",         cmdResetSubkeyEvents,         ""},
        {"keyspace.notify_with_subkeys",         cmdNotifyWithSubkeys,         "write"},
        {"keyspace.subscribe_require_subkeys",   cmdSubscribeRequireSubkeys,   ""},
        {"keyspace.unsubscribe_require_subkeys", cmdUnsubscribeRequireSubkeys, ""},
    };

    int keySpaceAll;

    if (RedisModule_Init(ctx,"testkeyspace",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){
        return REDISMODULE_ERR;
    }

    loaded_event_log = RedisModule_CreateDict(ctx);
    module_event_log = RedisModule_CreateDict(ctx);

    keySpaceAll = RedisModule_GetKeyspaceNotificationFlagsAll();
    if (!(keySpaceAll & REDISMODULE_NOTIFY_LOADED)) {
        /* REDISMODULE_NOTIFY_LOADED events are not supported; cannot start. */
        goto fail;
    }

    for (size_t i = 0; i < sizeof(subscriptions) / sizeof(subscriptions[0]); i++) {
        if (RedisModule_SubscribeToKeyspaceEvents(ctx, subscriptions[i].types,
                                                  subscriptions[i].callback) != REDISMODULE_OK) {
            goto fail;
        }
    }

    for (size_t i = 0; i < sizeof(commands) / sizeof(commands[0]); i++) {
        if (RedisModule_CreateCommand(ctx, commands[i].name, commands[i].handler,
                                      commands[i].flags, 0, 0, 0) == REDISMODULE_ERR) {
            goto fail;
        }
    }

    if (argc == 1) {
        const char *ptr = RedisModule_StringPtrLen(argv[0], NULL);
        if (!strcasecmp(ptr, "noload")) {
            /* This is a hint that we return ERR at the last moment of OnLoad. */
            goto fail;
        }
    }

    return REDISMODULE_OK;

fail:
    /* Release the dictionaries on every error path and clear the globals so
     * no dangling pointers remain. */
    RedisModule_FreeDict(ctx, loaded_event_log);
    RedisModule_FreeDict(ctx, module_event_log);
    loaded_event_log = NULL;
    module_event_log = NULL;
    return REDISMODULE_ERR;
}
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(loaded_event_log, "^", NULL, 0);
char* key;
size_t keyLen;
RedisModuleString* val;
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
RedisModule_FreeString(ctx, val);
}
RedisModule_FreeDict(ctx, loaded_event_log);
RedisModule_DictIteratorStop(iter);
loaded_event_log = NULL;
iter = RedisModule_DictIteratorStartC(module_event_log, "^", NULL, 0);
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
RedisModule_FreeString(ctx, val);
}
RedisModule_FreeDict(ctx, module_event_log);
RedisModule_DictIteratorStop(iter);
module_event_log = NULL;
return REDISMODULE_OK;
}