mirror of
https://github.com/redis/redis.git
synced 2026-06-11 09:52:05 -04:00
## Motivation
Redis's existing keyspace notification system operates at the **key
level** only — when a hash field is modified via `HSET`, `HDEL`, or
`HEXPIRE`, the subscriber receives the key name and the event type, but
not **which fields** were affected. As a result, these notifications have
very little practical value for consumers that care about individual fields.
This PR introduces a subkey notification system that extends keyspace
events to include field-level (subkey) details for hash operations,
through both Pub/Sub channels and the Module API.
## New Pub/Sub Notification Channels
Four new channels are added:
| Channel Format | Payload |
|----------------|---------|
| `__subkeyspace@<db>__:<key>` | `<event>\|<len>:<subkey>[,...]` |
| `__subkeyevent@<db>__:<event>` | `<key_len>:<key>\|<len>:<subkey>[,...]` |
| `__subkeyspaceitem@<db>__:<key>\n<subkey>` | `<event>` |
| `__subkeyspaceevent@<db>__:<event>\|<key>` | `<len>:<subkey>[,...]` |
**Design rationale for 4 channels:**
- **Subkeyspace**: Subscribe to a specific key, receive all field
changes in a single message — efficient for key-centric consumers.
- **Subkeyevent**: Subscribe to a specific event type, receive
key+fields — efficient for event-centric consumers.
- **Subkeyspaceitem**: Subscribe to a specific key+field combination —
the most selective, one message per field, no parsing needed.
- **Subkeyspaceevent**: Subscribe to event+key combination, receiving
only the affected fields — server-side filtering on both dimensions.
Subkeys are encoded in a length-prefixed format (`<len>:<subkey>`) to
support binary-safe field names containing delimiters.
**Safety guards:**
- Events containing `|` are skipped for the `__subkeyspace` and
`__subkeyspaceevent` channels (to avoid parsing ambiguity).
- Keys containing `\n` are skipped for the `__subkeyspaceitem` channel
(newline is the key/subkey separator).
- Subkey channels are only published to when `subkeys != NULL && count >
0`.
## Hash Command Integration
The following hash operations now emit subkey level notifications with
the affected field names:
| Command | Event | Subkeys |
|---------|-------|---------|
| `HSET` / `HMSET` | `hset` | All fields being set |
| `HSETNX` | `hset` | The field (if set) |
| `HDEL` | `hdel` | All fields deleted |
| `HGETDEL` | `hdel` / `hexpired` | Deleted or lazily expired fields |
| `HGETEX` | `hexpire` / `hpersist` / `hdel` / `hexpired` | Affected fields per event |
| `HINCRBY` | `hincrby` | The field |
| `HINCRBYFLOAT` | `hincrbyfloat` | The field |
| `HEXPIRE` / `HPEXPIRE` / `HEXPIREAT` / `HPEXPIREAT` | `hexpire` | Updated fields |
| `HPERSIST` | `hpersist` | Persisted fields |
| `HSETEX` | `hset` / `hdel` / `hexpire` / `hexpired` | Affected fields per event |
| Field expiration (active/lazy) | `hexpired` | All expired fields (batched) |
For field expiration, expired fields are collected into a dynamic array
and sent as a single batched notification after the expiration loop,
rather than one notification per field.
## Module API
Three new APIs and one new callback type:
```c
/* Function pointer type for keyspace event notifications with subkeys from modules. */
typedef void (*RedisModuleNotificationWithSubkeysFunc)(
RedisModuleCtx *ctx, int type, const char *event,
RedisModuleString *key, RedisModuleString **subkeys, int count);
/* Subscribe to keyspace notifications with subkey information.
*
* This is the extended version of RM_SubscribeToKeyspaceEvents. When subkeys
* are available, the `subkeys` array and `count` are passed to the callback.
* `subkeys` contains only the names of affected subkeys (values are not included),
* and `count` is the number of elements. The array may contain duplicates when
* the same subkey appears more than once in a command (e.g. HSET key f1 v1 f1 v2
* produces subkeys=["f1","f1"], count=2). When no subkeys are present, `subkeys`
* will be NULL and `count` will be 0. Whether events without subkeys are delivered
* depends on the `flags` parameter (see below).
*
* `types` is a bit mask of event types the module is interested in
* (using the same REDISMODULE_NOTIFY_* flags as RM_SubscribeToKeyspaceEvents).
*
* `flags` controls delivery filtering:
* - REDISMODULE_NOTIFY_FLAG_NONE: The callback is invoked for all matching
* events regardless of whether subkeys are present, so a separate
* RM_SubscribeToKeyspaceEvents registration can be omitted.
* - REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED: The callback is only invoked
* when subkeys are not empty. Events without subkey information (e.g. SET,
* EXPIRE, DEL) are skipped.
*
* The callback signature is:
* void callback(RedisModuleCtx *ctx, int type, const char *event,
* RedisModuleString *key, RedisModuleString **subkeys, int count);
*
* The subkeys array and its contents are only valid during the callback.
* The underlying objects may be stack-allocated or temporary, so
* RM_RetainString must NOT be used on them. To keep a subkey beyond
* the callback (e.g. in a RM_AddPostNotificationJob callback), use
* RM_HoldString (which handles static objects by copying) or
* RM_CreateStringFromString to make a deep copy before returning.
*/
int RM_SubscribeToKeyspaceEventsWithSubkeys(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc callback);
/* Unregister a module's callback from keyspace notifications with subkeys
* for specific event types.
*
* This function removes a previously registered subscription identified by
* the event mask, delivery flags, and the callback function.
*
* Parameters:
* - ctx: The RedisModuleCtx associated with the calling module.
* - types: The event mask representing the notification types to unsubscribe from.
* - flags: The delivery flags that were used during registration.
* - callback: The callback function pointer that was originally registered.
*
* Returns:
* - REDISMODULE_OK on successful removal of the subscription.
* - REDISMODULE_ERR if no matching subscription was found. */
int RM_UnsubscribeFromKeyspaceEventsWithSubkeys(
RedisModuleCtx *ctx, int types, int flags,
RedisModuleNotificationWithSubkeysFunc cb);
/* Like RM_NotifyKeyspaceEvent, but also triggers subkey-level notifications
* when subkeys are provided. Both key-level (keyspace/keyevent) and
* subkey-level (subkeyspace/subkeyevent/subkeyspaceitem/subkeyspaceevent)
* channels are published to, depending on the server configuration.
*
* This is the extended version of RM_NotifyKeyspaceEvent and can actually
* replace it. When called with subkeys=NULL and count=0, it behaves
* identically to RM_NotifyKeyspaceEvent. */
int RM_NotifyKeyspaceEventWithSubkeys(
RedisModuleCtx *ctx, int type, const char *event,
RedisModuleString *key, RedisModuleString **subkeys, int count);
```
## Configuration
Subkey notifications are controlled via the existing
`notify-keyspace-events` configuration string with four new characters:
`notify-keyspace-events` "STIV"
**S** -> Subkeyspace events, published with `__subkeyspace@<db>__:<key>`
prefix.
**T** -> Subkeyevent events, published with
`__subkeyevent@<db>__:<event>` prefix.
**I** -> Subkeyspaceitem events, published per subkey with
`__subkeyspaceitem@<db>__:<key>\n<subkey>` prefix.
**V** -> Subkeyspaceevent events, published with
`__subkeyspaceevent@<db>__:<event>|<key>` prefix.
These flags are **independent** from the existing key-level flags (`K`,
`E`, etc.). Enabling subkey notifications does **not** implicitly enable
or depend on keyspace/keyevent notifications, and vice versa.
## Known Limitations
- **Duplicate fields in subkey notifications**: Subkey notification
payloads may contain duplicate field names when the same field is
affected more than once within a single command. Since duplicate fields
are not the common case and deduplication would introduce significant
overhead on every notification, we chose not to deduplicate at this
time.
- **Subkeys must be sds-encoded objects**: The implementation assumes each
subkey is an sds-encoded object and accesses it via `subkey->ptr`. An
assertion enforces this, so Redis will crash if a subkey uses any other encoding.
610 lines
23 KiB
C
610 lines
23 KiB
C
/* This module is used to test the server keyspace events API.
|
|
*
|
|
* -----------------------------------------------------------------------------
|
|
*
|
|
* Copyright (c) 2020-Present, Redis Ltd.
|
|
* All rights reserved.
|
|
*
|
|
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
|
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
|
* GNU Affero General Public License v3 (AGPLv3).
|
|
*/
|
|
|
|
#define _BSD_SOURCE
|
|
#define _DEFAULT_SOURCE /* For usleep */
|
|
|
|
#include "redismodule.h"
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <strings.h>
|
|
#include <unistd.h>
|
|
|
|
ustime_t cached_time = 0;
|
|
|
|
/** stores all the keys on which we got 'loaded' keyspace notification **/
|
|
RedisModuleDict *loaded_event_log = NULL;
|
|
/** stores all the keys on which we got 'module' keyspace notification **/
|
|
RedisModuleDict *module_event_log = NULL;
|
|
|
|
/** Counts how many deleted KSN we got on keys with a prefix of "count_dels_" **/
|
|
static size_t dels = 0;
|
|
|
|
/* Subkey notification log */
|
|
#define SUBKEY_LOG_MAX 256
|
|
static char subkey_log[SUBKEY_LOG_MAX][512];
|
|
static int subkey_log_count = 0;
|
|
|
|
/* Keyspace callback for LOADED events.
 *
 * The first time a key reports the "loaded" event it is recorded in
 * loaded_event_log, using the key name as dict key and a held reference to
 * the key string as value. Any other event name is ignored.
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);

    if (strcmp(event, "loaded") != 0) {
        return REDISMODULE_OK;
    }

    const char *name = RedisModule_StringPtrLen(key, NULL);
    size_t name_len = strlen(name);
    int missing;
    RedisModule_DictGetC(loaded_event_log, (void *)name, name_len, &missing);
    if (missing) {
        RedisModule_DictSetC(loaded_event_log, (void *)name, name_len, RedisModule_HoldString(ctx, key));
    }

    return REDISMODULE_OK;
}
|
|
|
|
static long long callback_call_count = 0;
|
|
static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
|
|
REDISMODULE_NOT_USED(type);
|
|
callback_call_count++;
|
|
const char *key_str = RedisModule_StringPtrLen(key, NULL);
|
|
if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) {
|
|
if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) {
|
|
dels++;
|
|
RedisModule_Replicate(ctx, "keyspace.incr_dels", "");
|
|
}
|
|
return REDISMODULE_OK;
|
|
}
|
|
if (cached_time) {
|
|
RedisModule_Assert(cached_time == RedisModule_CachedMicroseconds());
|
|
usleep(1);
|
|
RedisModule_Assert(cached_time != RedisModule_Microseconds());
|
|
}
|
|
|
|
if (strcmp(event, "del") == 0) {
|
|
RedisModuleString *copykey = RedisModule_CreateStringPrintf(ctx, "%s_copy", RedisModule_StringPtrLen(key, NULL));
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "DEL", "s!", copykey);
|
|
RedisModule_FreeString(ctx, copykey);
|
|
RedisModule_FreeCallReply(rep);
|
|
|
|
int ctx_flags = RedisModule_GetContextFlags(ctx);
|
|
if (ctx_flags & REDISMODULE_CTX_FLAGS_LUA) {
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "lua");
|
|
RedisModule_FreeCallReply(rep);
|
|
}
|
|
if (ctx_flags & REDISMODULE_CTX_FLAGS_MULTI) {
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "multi");
|
|
RedisModule_FreeCallReply(rep);
|
|
}
|
|
}
|
|
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Keyspace callback for EXPIRED events: increments the
 * "testkeyspace:expired" counter through RM_Call with the `!` (propagate)
 * flag. The event type, name and key are ignored.
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(key);
    REDISMODULE_NOT_USED(event);
    REDISMODULE_NOT_USED(type);

    RedisModule_FreeCallReply(RedisModule_Call(ctx, "INCR", "c!", "testkeyspace:expired"));
    return REDISMODULE_OK;
}
|
|
|
|
/* This key miss notification handler is performing a write command inside the notification callback.
|
|
* Notice, it is discourage and currently wrong to perform a write command inside key miss event.
|
|
* It can cause read commands to be replicated to the replica/aof. This test is here temporary (for coverage and
|
|
* verification that it's not crashing). */
|
|
static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
|
|
REDISMODULE_NOT_USED(type);
|
|
REDISMODULE_NOT_USED(event);
|
|
REDISMODULE_NOT_USED(key);
|
|
|
|
int flags = RedisModule_GetContextFlags(ctx);
|
|
if (!(flags & REDISMODULE_CTX_FLAGS_MASTER)) {
|
|
return REDISMODULE_OK; // ignore the event on replica
|
|
}
|
|
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!c", "missed");
|
|
RedisModule_FreeCallReply(rep);
|
|
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Keyspace callback for STRING events: opens the key for reading and touches
 * its value through RedisModule_StringDMA.
 *
 * RedisModule_StringDMA could change the data format and cause the old robj
 * to be freed; this callback verifies that such a format change does not
 * cause any crash. The comparison result itself is irrelevant.
 *
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModuleString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    RedisModuleKey *redis_key = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
    if (redis_key == NULL) {
        /* Opening a missing key in read-only mode yields NULL: nothing to probe. */
        return REDISMODULE_OK;
    }

    size_t len = 0;
    char *data = RedisModule_StringDMA(redis_key, &len, REDISMODULE_READ);
    if (data != NULL) {
        /* Never read past the `len` bytes the DMA call reported. */
        int res = strncmp(data, "dummy", len < 5 ? len : 5);
        REDISMODULE_NOT_USED(res);
    }

    RedisModule_CloseKey(redis_key);

    return REDISMODULE_OK;
}
|
|
|
|
static void KeySpace_PostNotificationStringFreePD(void *pd) {
|
|
RedisModule_FreeString(NULL, pd);
|
|
}
|
|
|
|
/* Post-notification job body: increments (with propagation) the counter whose
 * key name is carried in `pd`. */
static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
    RedisModuleString *counter_key = pd;
    RedisModule_FreeCallReply(RedisModule_Call(ctx, "incr", "!s", counter_key));
}
|
|
|
|
/* Keyspace callback for STRING events that schedules a post-notification job.
 *
 * For keys whose name starts with "string1_", a job is queued that increments
 * "string_changed{<key>}" once the notification has been processed. Other
 * keys are ignored. Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModuleStringPostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *key_str = RedisModule_StringPtrLen(key, NULL);

    if (strncmp(key_str, "string1_", 8) != 0) {
        return REDISMODULE_OK;
    }

    /* Created with a NULL context so the string is not tied to this callback's
     * context; it is released by KeySpace_PostNotificationStringFreePD. */
    RedisModuleString *new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
    if (RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key,
                                           KeySpace_PostNotificationStringFreePD) != REDISMODULE_OK) {
        /* The job was not queued, so its free callback will not run for
         * new_key: release it here instead of leaking it. */
        RedisModule_FreeString(NULL, new_key);
    }
    return REDISMODULE_OK;
}
|
|
|
|
/* Keyspace callback for MODULE events: records each key in module_event_log
 * the first time it is seen, regardless of the event name. The stored value
 * is a held reference to the key string.
 * Always returns REDISMODULE_OK. */
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
    REDISMODULE_NOT_USED(type);
    REDISMODULE_NOT_USED(event);

    const char *name = RedisModule_StringPtrLen(key, NULL);
    size_t name_len = strlen(name);
    int missing;
    RedisModule_DictGetC(module_event_log, (void *)name, name_len, &missing);
    if (!missing) {
        return REDISMODULE_OK;
    }

    RedisModule_DictSetC(module_event_log, (void *)name, name_len, RedisModule_HoldString(ctx, key));
    return REDISMODULE_OK;
}
|
|
|
|
/* keyspace.notify <key> — fire a MODULE keyspace event named "notify" for
 * <key> and reply with null. */
static int cmdNotify(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc == 2) {
        RedisModule_NotifyKeyspaceEvent(ctx, REDISMODULE_NOTIFY_MODULE, "notify", argv[1]);
        RedisModule_ReplyWithNull(ctx);
        return REDISMODULE_OK;
    }

    return RedisModule_WrongArity(ctx);
}
|
|
|
|
/* keyspace.is_module_key_notified <key> — reply with a two-element array:
 * 1 and the logged key string when <key> is present in module_event_log,
 * otherwise 0 and null. */
static int cmdIsModuleKeyNotified(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    const char *name = RedisModule_StringPtrLen(argv[1], NULL);
    int missing;
    RedisModuleString *logged = RedisModule_DictGetC(module_event_log, (void *)name, strlen(name), &missing);

    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithLongLong(ctx, missing ? 0 : 1);
    if (!missing) {
        RedisModule_ReplyWithString(ctx, logged);
    } else {
        RedisModule_ReplyWithNull(ctx);
    }
    return REDISMODULE_OK;
}
|
|
|
|
/* keyspace.is_key_loaded <key> — reply with a two-element array: 1 and the
 * logged key string when <key> is present in loaded_event_log, otherwise 0
 * and null. */
static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    const char *name = RedisModule_StringPtrLen(argv[1], NULL);
    int missing;
    RedisModuleString *logged = RedisModule_DictGetC(loaded_event_log, (void *)name, strlen(name), &missing);

    RedisModule_ReplyWithArray(ctx, 2);
    RedisModule_ReplyWithLongLong(ctx, missing ? 0 : 1);
    if (!missing) {
        RedisModule_ReplyWithString(ctx, logged);
    } else {
        RedisModule_ReplyWithNull(ctx);
    }
    return REDISMODULE_OK;
}
|
|
|
|
/* keyspace.del_key_copy <key> — DEL <key> (propagated) and forward the reply.
 *
 * cached_time holds the cached clock for the duration of the DEL, which
 * enables the clock assertions in the generic notification callback; it is
 * reset to zero before returning. */
static int cmdDelKeyCopy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    cached_time = RedisModule_CachedMicroseconds();

    RedisModuleCallReply *reply = RedisModule_Call(ctx, "DEL", "s!", argv[1]);
    if (reply) {
        RedisModule_ReplyWithCallReply(ctx, reply);
        RedisModule_FreeCallReply(reply);
    } else {
        RedisModule_ReplyWithError(ctx, "NULL reply returned");
    }

    cached_time = 0;
    return REDISMODULE_OK;
}
|
|
|
|
/* Call INCR and propagate using RM_Call with `!`. */
|
|
static int cmdIncrCase1(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
if (argc != 2)
|
|
return RedisModule_WrongArity(ctx);
|
|
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s!", argv[1]);
|
|
if (!rep) {
|
|
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
|
} else {
|
|
RedisModule_ReplyWithCallReply(ctx, rep);
|
|
RedisModule_FreeCallReply(rep);
|
|
}
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Call INCR and propagate using RM_Replicate. */
|
|
static int cmdIncrCase2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
if (argc != 2)
|
|
return RedisModule_WrongArity(ctx);
|
|
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]);
|
|
if (!rep) {
|
|
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
|
} else {
|
|
RedisModule_ReplyWithCallReply(ctx, rep);
|
|
RedisModule_FreeCallReply(rep);
|
|
}
|
|
RedisModule_Replicate(ctx, "INCR", "s", argv[1]);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Call INCR and propagate using RM_ReplicateVerbatim. */
|
|
static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
if (argc != 2)
|
|
return RedisModule_WrongArity(ctx);
|
|
|
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]);
|
|
if (!rep) {
|
|
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
|
} else {
|
|
RedisModule_ReplyWithCallReply(ctx, rep);
|
|
RedisModule_FreeCallReply(rep);
|
|
}
|
|
RedisModule_ReplicateVerbatim(ctx);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* keyspace.incr_dels — increment the `dels` counter and reply OK. Arguments
 * are ignored. */
static int cmdIncrDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    dels += 1;
    return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
|
|
|
|
/* keyspace.get_dels — reply with the current value of the `dels` counter.
 * Arguments are ignored. */
static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    long long current = (long long)dels;
    return RedisModule_ReplyWithLongLong(ctx, current);
}
|
|
|
|
/* Subkey notification callback */
|
|
static void KeySpace_NotificationSubkeys(RedisModuleCtx *ctx, int type, const char *event,
|
|
RedisModuleString *key, RedisModuleString **subkeys, int count) {
|
|
REDISMODULE_NOT_USED(ctx);
|
|
REDISMODULE_NOT_USED(type);
|
|
|
|
if (subkey_log_count >= SUBKEY_LOG_MAX) return;
|
|
|
|
const char *key_str = RedisModule_StringPtrLen(key, NULL);
|
|
|
|
/* Format: "<event> <key> <count> <subkey1> <subkey2> ..." or "<event> <key> 0" */
|
|
char buf[512];
|
|
int off = snprintf(buf, sizeof(buf), "%s %s %d", event, key_str, count);
|
|
for (int i = 0; i < count && (size_t)off < sizeof(buf) - 1; i++) {
|
|
const char *sk = RedisModule_StringPtrLen(subkeys[i], NULL);
|
|
off += snprintf(buf + off, sizeof(buf) - off, " %s", sk);
|
|
}
|
|
snprintf(subkey_log[subkey_log_count], sizeof(subkey_log[0]), "%s", buf);
|
|
subkey_log_count++;
|
|
}
|
|
|
|
/* keyspace.get_subkey_events — return all logged subkey events as an array */
|
|
static int cmdGetSubkeyEvents(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
RedisModule_ReplyWithArray(ctx, subkey_log_count);
|
|
for (int i = 0; i < subkey_log_count; i++) {
|
|
RedisModule_ReplyWithCString(ctx, subkey_log[i]);
|
|
}
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* keyspace.reset_subkey_events — clear the log */
|
|
static int cmdResetSubkeyEvents(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
subkey_log_count = 0;
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
/* keyspace.notify_with_subkeys <key> <subkey1> [subkey2 ...] — trigger a module subkey notification */
|
|
static int cmdNotifyWithSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
if (argc < 3) return RedisModule_WrongArity(ctx);
|
|
|
|
RedisModuleString *key = argv[1];
|
|
RedisModuleString **subkeys = &argv[2];
|
|
int count = argc - 2;
|
|
|
|
RedisModule_NotifyKeyspaceEventWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH, "module_subkey_event", key, subkeys, count);
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
/* keyspace.subscribe_subkeys — subscribe with NONE flag (all events) */
|
|
static int cmdSubscribeSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
if (RedisModule_SubscribeToKeyspaceEventsWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC,
|
|
REDISMODULE_NOTIFY_FLAG_NONE, KeySpace_NotificationSubkeys) != REDISMODULE_OK) {
|
|
return RedisModule_ReplyWithError(ctx, "ERR subscribe failed");
|
|
}
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
/* keyspace.unsubscribe_subkeys — unsubscribe the subkey callback */
|
|
static int cmdUnsubscribeSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
if (RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC,
|
|
REDISMODULE_NOTIFY_FLAG_NONE, KeySpace_NotificationSubkeys) != REDISMODULE_OK) {
|
|
return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
|
|
}
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
/* keyspace.subscribe_require_subkeys — subscribe with SUBKEYS_REQUIRED flag */
|
|
static int cmdSubscribeRequireSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
if (RedisModule_SubscribeToKeyspaceEventsWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC,
|
|
REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED,
|
|
KeySpace_NotificationSubkeys) != REDISMODULE_OK) {
|
|
return RedisModule_ReplyWithError(ctx, "ERR subscribe failed");
|
|
}
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
/* keyspace.unsubscribe_require_subkeys — unsubscribe the SUBKEYS_REQUIRED callback */
|
|
static int cmdUnsubscribeRequireSubkeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
REDISMODULE_NOT_USED(argv);
|
|
REDISMODULE_NOT_USED(argc);
|
|
if (RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_GENERIC,
|
|
REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED,
|
|
KeySpace_NotificationSubkeys) != REDISMODULE_OK) {
|
|
return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
|
|
}
|
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
|
}
|
|
|
|
static RedisModuleNotificationFunc get_callback_for_event(int event_mask) {
|
|
switch(event_mask) {
|
|
case REDISMODULE_NOTIFY_LOADED:
|
|
return KeySpace_NotificationLoaded;
|
|
case REDISMODULE_NOTIFY_GENERIC:
|
|
return KeySpace_NotificationGeneric;
|
|
case REDISMODULE_NOTIFY_EXPIRED:
|
|
return KeySpace_NotificationExpired;
|
|
case REDISMODULE_NOTIFY_MODULE:
|
|
return KeySpace_NotificationModule;
|
|
case REDISMODULE_NOTIFY_KEY_MISS:
|
|
return KeySpace_NotificationModuleKeyMiss;
|
|
case REDISMODULE_NOTIFY_STRING:
|
|
// We have two callbacks for STRING events in your OnLoad,
|
|
// For simplicity, pick the first:
|
|
return KeySpace_NotificationModuleString;
|
|
default:
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
/* keyspace.callback_count — reply with the number of times the generic
 * keyspace notification callback has run. Arguments are ignored. */
int GetCallbackCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    long long calls = callback_call_count;
    RedisModule_ReplyWithLongLong(ctx, calls);

    return REDISMODULE_OK;
}
|
|
|
|
/* keyspace.unsubscribe <event_mask> — unsubscribe the callback this module
 * registered for the given REDISMODULE_NOTIFY_* mask.
 *
 * Replies:
 * - "ERR invalid event mask" when the argument is not an integer or does not
 *   fit in an int,
 * - "ERR unknown event mask" when no callback is associated with the mask,
 * - "ERR unsubscribe failed" when the unsubscribe call itself fails,
 * - OK otherwise. */
static int CmdUnsub(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 2) {
        return RedisModule_WrongArity(ctx);
    }

    long long event_mask;
    if (RedisModule_StringToLongLong(argv[1], &event_mask) != REDISMODULE_OK) {
        return RedisModule_ReplyWithError(ctx, "ERR invalid event mask");
    }

    /* The notification API works on int masks. Reject values that do not
     * survive the narrowing, so an out-of-range number cannot alias a valid
     * mask after truncation. */
    int mask = (int)event_mask;
    if ((long long)mask != event_mask) {
        return RedisModule_ReplyWithError(ctx, "ERR invalid event mask");
    }

    RedisModuleNotificationFunc cb = get_callback_for_event(mask);
    if (cb == NULL) {
        return RedisModule_ReplyWithError(ctx, "ERR unknown event mask");
    }

    if (RedisModule_UnsubscribeFromKeyspaceEvents(ctx, mask, cb) != REDISMODULE_OK) {
        return RedisModule_ReplyWithError(ctx, "ERR unsubscribe failed");
    }

    return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
|
|
/* This function must be present on each Redis module. It is used in order to
|
|
* register the commands into the Redis server. */
|
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
if (RedisModule_Init(ctx,"testkeyspace",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
loaded_event_log = RedisModule_CreateDict(ctx);
|
|
module_event_log = RedisModule_CreateDict(ctx);
|
|
|
|
int keySpaceAll = RedisModule_GetKeyspaceNotificationFlagsAll();
|
|
|
|
if (!(keySpaceAll & REDISMODULE_NOTIFY_LOADED)) {
|
|
// REDISMODULE_NOTIFY_LOADED event are not supported we can not start
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_LOADED, KeySpace_NotificationLoaded) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_GENERIC, KeySpace_NotificationGeneric) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_MODULE, KeySpace_NotificationModule) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationModuleKeyMiss) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleString) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleStringPostNotificationJob) != REDISMODULE_OK){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx,"keyspace.is_module_key_notified", cmdIsModuleKeyNotified,"",0,0,0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx,"keyspace.is_key_loaded", cmdIsKeyLoaded,"",0,0,0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.del_key_copy", cmdDelKeyCopy,
|
|
"write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case1", cmdIncrCase1,
|
|
"write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case2", cmdIncrCase2,
|
|
"write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.incr_case3", cmdIncrCase3,
|
|
"write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.incr_dels", cmdIncrDels,
|
|
"write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.get_dels", cmdGetDels,
|
|
"readonly", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe", CmdUnsub, "write", 0, 0, 0) == REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.callback_count", GetCallbackCountCommand, "", 0, 0, 0)== REDISMODULE_ERR){
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.subscribe_subkeys", cmdSubscribeSubkeys, "", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe_subkeys", cmdUnsubscribeSubkeys, "", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.get_subkey_events", cmdGetSubkeyEvents, "readonly", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.reset_subkey_events", cmdResetSubkeyEvents, "", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.notify_with_subkeys", cmdNotifyWithSubkeys, "write", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.subscribe_require_subkeys", cmdSubscribeRequireSubkeys, "", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (RedisModule_CreateCommand(ctx, "keyspace.unsubscribe_require_subkeys", cmdUnsubscribeRequireSubkeys, "", 0, 0, 0) == REDISMODULE_ERR) {
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
if (argc == 1) {
|
|
const char *ptr = RedisModule_StringPtrLen(argv[0], NULL);
|
|
if (!strcasecmp(ptr, "noload")) {
|
|
/* This is a hint that we return ERR at the last moment of OnLoad. */
|
|
RedisModule_FreeDict(ctx, loaded_event_log);
|
|
RedisModule_FreeDict(ctx, module_event_log);
|
|
return REDISMODULE_ERR;
|
|
}
|
|
}
|
|
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
|
|
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(loaded_event_log, "^", NULL, 0);
|
|
char* key;
|
|
size_t keyLen;
|
|
RedisModuleString* val;
|
|
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
|
|
RedisModule_FreeString(ctx, val);
|
|
}
|
|
RedisModule_FreeDict(ctx, loaded_event_log);
|
|
RedisModule_DictIteratorStop(iter);
|
|
loaded_event_log = NULL;
|
|
|
|
iter = RedisModule_DictIteratorStartC(module_event_log, "^", NULL, 0);
|
|
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
|
|
RedisModule_FreeString(ctx, val);
|
|
}
|
|
RedisModule_FreeDict(ctx, module_event_log);
|
|
RedisModule_DictIteratorStop(iter);
|
|
module_event_log = NULL;
|
|
|
|
return REDISMODULE_OK;
|
|
}
|