add first draft version to support PostNotificationJobsPerKey

This commit is contained in:
Joan Fontanals Martinez 2026-05-21 14:32:46 +02:00
parent ba1a4b2c8f
commit b92690d7fd
5 changed files with 402 additions and 9 deletions

View file

@ -308,6 +308,7 @@ typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int
/* Function pointer type for post jobs */
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd);
/* Keyspace notification subscriber information.
* See RM_SubscribeToKeyspaceEvents() for more information. */
@ -327,15 +328,44 @@ typedef struct RedisModuleKeyspaceSubscriber {
int active;
} RedisModuleKeyspaceSubscriber;
typedef struct RedisModulePostExecUnitJob {
/* The module subscribed to the event */
RedisModule *module;
RedisModulePostNotificationJobFunc callback;
typedef enum {
POST_EXEC_UNIT_JOB_SINGLE = 0,
POST_EXEC_UNIT_JOB_KEYED = 1,
} PostExecUnitJobType;
/* Per-key entry inside a keyed PostExecUnit job. The keyed job fans out and
* invokes its callback once per entry, in submission order. */
typedef struct RedisModulePostExecUnitKeyedEntry {
RedisModuleString *key; /* Owned reference; freed after the callback runs. */
void *pd;
void (*free_pd)(void*);
} RedisModulePostExecUnitKeyedEntry;
typedef struct RedisModulePostExecUnitJob {
PostExecUnitJobType type;
/* The module subscribed to the event */
RedisModule *module;
int dbid;
union {
struct {
RedisModulePostNotificationJobFunc callback;
void *pd;
void (*free_pd)(void*);
} single;
struct {
RedisModulePostNotificationJobPerKeyFunc callback;
list *entries; /* list of RedisModulePostExecUnitKeyedEntry* */
} keyed;
} u;
} RedisModulePostExecUnitJob;
/* Index key used to coalesce keyed post-exec-unit jobs by (module, callback)
* while the current execution unit is still in progress. */
typedef struct {
RedisModule *module;
RedisModulePostNotificationJobPerKeyFunc callback;
} PostExecUnitKeyedJobIndexKey;
/* The module keyspace notification subscribers list */
static list *moduleKeyspaceSubscribers;
@ -347,6 +377,12 @@ static int moduleKeyspaceSubscribersWithSubkeysTypes = 0;
/* The module post keyspace jobs list */
static list *modulePostExecUnitJobs;
/* Index from (module, per-key callback) -> listNode* in modulePostExecUnitJobs.
* Used so that submitting a second key for an already-pending keyed job appends
* to that job's entry list rather than enqueuing a new job. Reset to empty at
* the end of every drain. */
static dict *modulePostExecUnitKeyedJobIndex;
/* Data structures related to the exported dictionary data structure. */
typedef struct RedisModuleDict {
rax *rax; /* The radix tree. */
@ -9434,12 +9470,40 @@ void firePostExecutionUnitJobs(void) {
moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, job->dbid);
job->callback(&ctx, job->pd);
if (job->free_pd) job->free_pd(job->pd);
if (job->type == POST_EXEC_UNIT_JOB_SINGLE) {
job->u.single.callback(&ctx, job->u.single.pd);
if (job->u.single.free_pd) job->u.single.free_pd(job->u.single.pd);
} else {
/* Keyed job: fan out the callback over each collected key, sharing
* the same context. The index entry is removed so that any keys
* submitted *after* this point (e.g. from a nested KSN triggered
* inside the callback) start a fresh keyed job. */
PostExecUnitKeyedJobIndexKey idx = {
.module = job->module,
.callback = job->u.keyed.callback,
};
dictDelete(modulePostExecUnitKeyedJobIndex, &idx);
listIter li;
listNode *eln;
listRewind(job->u.keyed.entries, &li);
while ((eln = listNext(&li)) != NULL) {
RedisModulePostExecUnitKeyedEntry *e = listNodeValue(eln);
job->u.keyed.callback(&ctx, e->key, e->pd);
if (e->free_pd) e->free_pd(e->pd);
decrRefCount(e->key);
zfree(e);
}
listRelease(job->u.keyed.entries);
}
moduleFreeContext(&ctx);
zfree(job);
}
/* Defensive: any stale index entries (shouldn't be possible since every
* keyed job we enqueued got drained above, but cheap insurance). */
if (dictSize(modulePostExecUnitKeyedJobIndex) > 0)
dictEmpty(modulePostExecUnitKeyedJobIndex, NULL);
exitExecutionUnit();
}
@ -9465,16 +9529,69 @@ int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJo
return REDISMODULE_ERR;
}
RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job));
job->type = POST_EXEC_UNIT_JOB_SINGLE;
job->module = ctx->module;
job->callback = callback;
job->pd = privdata;
job->free_pd = free_privdata;
job->dbid = ctx->client->db->id;
job->u.single.callback = callback;
job->u.single.pd = privdata;
job->u.single.free_pd = free_privdata;
listAddNodeTail(modulePostExecUnitJobs, job);
return REDISMODULE_OK;
}
/* Sibling of `RM_AddPostNotificationJob` that fans out per-key. Multiple
* submissions of the same (module, callback) pair within a single execution
* unit are coalesced into a single job whose callback is invoked once per
* collected key, in submission order. This lets a module react to several
* keys touched by the same MULTI/EXEC in one atomic propagation block.
*
* Refusal rules, dbid pinning, replication atomicity, and re-entrancy
* semantics are identical to `RM_AddPostNotificationJob`.
*
* `key` must be a valid RedisModuleString. The implementation takes its own
* reference; the caller retains ownership of its own reference and may free
* it at any time. `free_pd` may be NULL.
*
* Return REDISMODULE_OK on success and REDISMODULE_ERR if called while loading
* data from disk (AOF or RDB) or on a read-only replica. */
int RM_AddPostNotificationJobForKey(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *privdata, void (*free_privdata)(void*)) {
if (server.loading || (server.masterhost && server.repl_slave_ro)) {
return REDISMODULE_ERR;
}
RedisModulePostExecUnitKeyedEntry *entry = zmalloc(sizeof(*entry));
entry->key = key;
incrRefCount(entry->key);
entry->pd = privdata;
entry->free_pd = free_privdata;
PostExecUnitKeyedJobIndexKey idx_key = {
.module = ctx->module,
.callback = callback,
};
dictEntry *de = dictFind(modulePostExecUnitKeyedJobIndex, &idx_key);
if (de) {
RedisModulePostExecUnitJob *job = dictGetVal(de);
listAddNodeTail(job->u.keyed.entries, entry);
return REDISMODULE_OK;
}
RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job));
job->type = POST_EXEC_UNIT_JOB_KEYED;
job->module = ctx->module;
job->dbid = ctx->client->db->id;
job->u.keyed.callback = callback;
job->u.keyed.entries = listCreate();
listAddNodeTail(job->u.keyed.entries, entry);
listAddNodeTail(modulePostExecUnitJobs, job);
PostExecUnitKeyedJobIndexKey *idx = zmalloc(sizeof(*idx));
*idx = idx_key;
dictAdd(modulePostExecUnitKeyedJobIndex, idx, job);
return REDISMODULE_OK;
}
/* Get the configured bitmap of notify-keyspace-events (Could be used
* for additional filtering in RedisModuleNotificationFunc) */
int RM_GetNotifyKeyspaceEvents(void) {
@ -12994,6 +13111,31 @@ dictType moduleAPIDictType = {
NULL /* allow to expand */
};
static uint64_t postExecUnitKeyedJobIndexHash(const void *key) {
return dictGenHashFunction(key, sizeof(PostExecUnitKeyedJobIndexKey));
}
static int postExecUnitKeyedJobIndexCompare(dictCmpCache *cache, const void *k1, const void *k2) {
UNUSED(cache);
const PostExecUnitKeyedJobIndexKey *a = k1, *b = k2;
return a->module == b->module && a->callback == b->callback;
}
static void postExecUnitKeyedJobIndexKeyDtor(dict *d, void *k) {
UNUSED(d);
zfree(k);
}
dictType postExecUnitKeyedJobIndexDictType = {
postExecUnitKeyedJobIndexHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
postExecUnitKeyedJobIndexCompare, /* key compare */
postExecUnitKeyedJobIndexKeyDtor, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};
int moduleRegisterApi(const char *funcname, void *funcptr) {
return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}
@ -13034,6 +13176,7 @@ void moduleInitModulesSystem(void) {
moduleKeyspaceSubscribers = listCreate();
modulePostExecUnitJobs = listCreate();
modulePostExecUnitKeyedJobIndex = dictCreate(&postExecUnitKeyedJobIndexDictType);
/* Set up filter list */
moduleCommandFilters = listCreate();
@ -15573,6 +15716,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SubscribeToKeyspaceEventsWithSubkeys);
REGISTER_API(UnsubscribeFromKeyspaceEventsWithSubkeys);
REGISTER_API(AddPostNotificationJob);
REGISTER_API(AddPostNotificationJobForKey);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
REGISTER_API(GetClusterNodeInfo);

View file

@ -996,6 +996,7 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count);
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
@ -1383,6 +1384,7 @@ REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJobForKey)(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEventWithSubkeys)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
@ -1788,6 +1790,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SubscribeToKeyspaceEventsWithSubkeys);
REDISMODULE_GET_API(UnsubscribeFromKeyspaceEventsWithSubkeys);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(AddPostNotificationJobForKey);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(NotifyKeyspaceEventWithSubkeys);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);

View file

@ -49,6 +49,27 @@ static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
RedisModule_FreeCallReply(rep);
}
/* Per-key post-notification callback: appends each batched key to a single
* list, so the test can assert all keys touched in one execution unit fan
* out into the same MULTI/EXEC replication block. */
static void KeySpace_PostNotificationBatchedKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "batched_keys", key);
RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationBatched(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "batched_", 8) != 0) return REDISMODULE_OK;
if (strcmp(key_str, "batched_keys") == 0) return REDISMODULE_OK; /* skip our sink list */
RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationBatchedKey, key, NULL, NULL);
return REDISMODULE_OK;
}
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
@ -269,6 +290,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationBatched) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if (with_key_events) {
if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){
return REDISMODULE_ERR;

View file

@ -0,0 +1,173 @@
# Keyspace Notification Test Coverage Analysis
This document analyzes the test coverage for keyspace notifications that affect
modules like RediSearch, which use `SetKeyMeta` in notification callbacks.
## RediSearch Notification Events
RediSearch's `HashNotificationCallback` handles the following events (from
`modules/redisearch/src/src/notifications.c`):
```c
typedef enum {
_null_cmd,
hset_cmd,
hmset_cmd,
hsetnx_cmd,
hincrby_cmd,
hincrbyfloat_cmd,
hdel_cmd,
del_cmd,
set_cmd,
rename_from_cmd,
rename_to_cmd,
trimmed_cmd,
restore_cmd,
expire_cmd,
persist_cmd,
expired_cmd,
hexpire_cmd,
hpersist_cmd,
hexpired_cmd,
evicted_cmd,
change_cmd,
loaded_cmd,
copy_to_cmd,
} RedisCmd;
```
## Test Coverage Matrix
Legend:
- ✅ = Already covered (before this PR)
- 🆕 = Added in this PR
- ❌ = Not covered
### Hash Events (NOTIFY_HASH)
| Event | Commands That Trigger | Coverage | Notes |
|-------|----------------------|----------|-------|
| `hset` | HSET, HMSET, HSETNX | ✅ Already covered | HSET, HMSET, HSETNX tests |
| `hset` | HSETEX | 🆕 **This PR** | HSETEX test |
| `hdel` | HDEL | 🆕 **This PR** | HDEL test |
| `hdel` | HGETDEL | 🆕 **This PR** | HGETDEL test |
| `hdel` | HGETEX (past timestamp), HSETEX (past timestamp), HEXPIRE (past timestamp) | 🆕 **This PR** | Covered via HGETEX/HSETEX/HEXPIRE tests |
| `hincrby` | HINCRBY | ✅ Already covered | HINCRBY test |
| `hincrbyfloat` | HINCRBYFLOAT | ✅ Already covered | HINCRBYFLOAT test |
| `hexpire` | HEXPIRE, HPEXPIRE, HEXPIREAT, HPEXPIREAT | 🆕 **This PR** | HEXPIRE test |
| `hexpire` | HGETEX (with EX/PX), HSETEX (with EX/PX) | 🆕 **This PR** | HGETEX/HSETEX tests |
| `hpersist` | HPERSIST | 🆕 **This PR** | HPERSIST test |
| `hpersist` | HGETEX (with PERSIST) | 🆕 **This PR** | HGETEX test |
| `hexpired` | Lazy field expiration, Active field expiration | 🆕 **This PR** | Hash field expiry test |
### Generic Events (NOTIFY_GENERIC)
| Event | Commands That Trigger | Coverage | Notes |
|-------|----------------------|----------|-------|
| `del` | DEL, UNLINK | ✅ Already covered | DEL test |
| `del` | Hash becomes empty | 🆕 **This PR** | Via HDEL/HGETDEL tests |
| `rename_from` | RENAME, RENAMENX | ✅ Already covered | RENAME test |
| `rename_to` | RENAME, RENAMENX | ✅ Already covered | RENAME test |
| `restore` | RESTORE | ✅ Already covered | RESTORE test |
| `expire` | EXPIRE, PEXPIRE, EXPIREAT, PEXPIREAT | ✅ Already covered | EXPIRE test |
| `expire` | SET (with EX/PX), GETEX (with EX/PX), SETEX, PSETEX | ⚠️ Partial | SET test covers some |
| `persist` | PERSIST | 🆕 **This PR** | PERSIST test |
| `copy_to` | COPY | 🆕 **This PR** | COPY test |
| `loaded` | RDB load (DEBUG RELOAD, server restart) | ✅ Already covered | DEBUG RELOAD test |
### String Events (NOTIFY_STRING)
| Event | Commands That Trigger | Coverage | Notes |
|-------|----------------------|----------|-------|
| `set` | SET, SETEX, PSETEX, SETNX, SETRANGE, etc. | ✅ Already covered | SET test |
### Expired/Evicted Events
| Event | Commands That Trigger | Coverage | Notes |
|-------|----------------------|----------|-------|
| `expired` | Key expiration (lazy or active) | ✅ Already covered | EXPIRE test waits for expiry |
| `evicted` | Memory eviction | ❌ Not tested | Requires maxmemory config |
## Summary: What This PR Adds
### New Tests Added in This PR
| Test | Events Covered | Status Without Fix |
|------|----------------|-------------------|
| HSETEX | `hset`, `hexpire`, `hdel` | ❌ **CRASHES** |
| HGETDEL | `hdel`, `hexpired` | ❌ **CRASHES** |
| HGETEX | `hexpire`, `hpersist`, `hdel` | ❌ **CRASHES** |
| HDEL | `hdel` | ❌ **CRASHES** |
| HEXPIRE | `hexpire`, `hdel` | ❌ **CRASHES** |
| HPERSIST | `hpersist` | ✅ Passes |
| (field expiry) | `hexpired` | ✅ Passes |
| PERSIST | `persist` | ✅ Passes |
| COPY | `copy_to` | ✅ Passes |
### Bug Fixes Required
Commands that need fixing for use-after-reallocation when `SetKeyMeta` is called:
- `hsetexCommand` - accesses `o` after `notifyKeyspaceEvent`
- `hgetdelCommand` - accesses `o` after `notifyKeyspaceEvent`
- `hgetexCommand` - accesses `o` after `notifyKeyspaceEvent`
- `hdelCommand` - accesses `o` after `notifyKeyspaceEvent`
- `hexpireGenericCommand` - accesses `hashObj` after `notifyKeyspaceEvent`
## Still Not Covered (Future Work)
| Event | Command/Trigger | Reason |
|-------|-----------------|--------|
| `evicted` | Memory eviction | Requires maxmemory configuration |
## Command to Event Mapping
### Commands Already Covered (Before This PR)
| Command | Event(s) Triggered |
|---------|-------------------|
| HSETNX | `hset` |
| HSET | `hset` |
| HMSET | `hset` |
| HINCRBY | `hincrby` |
| HINCRBYFLOAT | `hincrbyfloat` |
| SET | `set` |
| APPEND | `append` |
| INCR | `incrby` |
| INCRBY | `incrby` |
| INCRBYFLOAT | `incrbyfloat` |
| GETSET | `set` |
| SETRANGE | `setrange` |
| DEL | `del` |
| RENAME | `rename_from`, `rename_to` |
| RESTORE | `restore` |
| EXPIRE/PEXPIRE | `expire` |
| DEBUG RELOAD | `loaded` |
### Commands Added in This PR
| Command | Event(s) Triggered |
|---------|-------------------|
| HSETEX | `hset`, `hexpire`, `hdel` |
| HGETDEL | `hdel`, `hexpired` |
| HGETEX | `hexpire`, `hpersist`, `hdel` |
| HDEL | `hdel` |
| HEXPIRE/HPEXPIRE | `hexpire`, `hdel` |
| HPERSIST | `hpersist` |
### Commands NOT Covered
| Command | Event(s) Triggered |
|---------|-------------------|
| PERSIST | `persist` |
| COPY | `copy_to` |
| (field expiry wait) | `hexpired` |
## Source Files Reference
- Notification events: `src/notify.c`
- Hash notifications: `src/t_hash.c`
- String notifications: `src/t_string.c`
- Generic notifications: `src/db.c`, `src/expire.c`
- RediSearch handler: `modules/redisearch/src/src/notifications.c`
- Test module: `tests/modules/keymeta_notify.c`
- Test file: `tests/unit/moduleapi/ksn_notify_side_effect.tcl`

View file

@ -148,6 +148,54 @@ tags "modules external:skip" {
close_replication_stream $repl
}
test {Test per-key post notification job fans out within a MULTI/EXEC} {
r flushall
set repl [attach_to_replication_stream]
r multi
r set batched_a 1
r set batched_b 2
r set batched_c 3
r exec
assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1]
# The three SETs are one execution unit; the keyed post-notification
# job coalesces them and the callback fans out into three LPUSHes
# inside the same MULTI/EXEC propagation block.
assert_replication_stream $repl {
{multi}
{select *}
{set batched_a 1}
{set batched_b 2}
{set batched_c 3}
{lpush batched_keys batched_a}
{lpush batched_keys batched_b}
{lpush batched_keys batched_c}
{exec}
}
close_replication_stream $repl
}
test {Test per-key post notification job from a multi-key command} {
r flushall
set repl [attach_to_replication_stream]
r mset batched_a 1 batched_b 2 batched_c 3
assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1]
assert_replication_stream $repl {
{multi}
{select *}
{mset batched_a 1 batched_b 2 batched_c 3}
{lpush batched_keys batched_a}
{lpush batched_keys batched_b}
{lpush batched_keys batched_c}
{exec}
}
close_replication_stream $repl
}
test {Test eviction} {
r flushall
set repl [attach_to_replication_stream]