This commit is contained in:
Joan Fontanals 2026-05-28 04:00:21 +02:00 committed by GitHub
commit 3c3bf69d66
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1267 additions and 310 deletions

View file

@ -1681,6 +1681,12 @@ int loadSingleAppendOnlyFile(char *filename) {
queueMultiCommand(fakeClient, cmd->flags);
} else {
cmd->proc(fakeClient);
/* AOF replay bypasses call()/afterCommand(); drain the per-key
* post-notification queue here so module callbacks fire once per
* replayed single command, mirroring the normal-execution path.
* EXEC's sub-commands run through call() above and are drained
* by afterCommand() for each sub-command. */
firePostKeyedNotificationJobs();
fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */
}
@ -1760,6 +1766,12 @@ fmterr: /* Format error. */
/* fall through to cleanup. */
cleanup:
/* Drain any per-key post-notification jobs left over from a partially
* applied command before tearing down the fake client. In the success
* path the per-iteration drain after cmd->proc() has already emptied
* the queue; this catches stragglers from any aborted-mid-execution
* path so the next caller doesn't observe stale jobs. */
firePostKeyedNotificationJobs();
if (fakeClient) freeClient(fakeClient);
server.current_client = old_cur_client;
server.executing_client = old_exec_client;

File diff suppressed because it is too large Load diff

View file

@ -996,6 +996,7 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count);
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
@ -1383,6 +1384,7 @@ REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJobForKey)(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEventWithSubkeys)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
@ -1789,6 +1791,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SubscribeToKeyspaceEventsWithSubkeys);
REDISMODULE_GET_API(UnsubscribeFromKeyspaceEventsWithSubkeys);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(AddPostNotificationJobForKey);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(NotifyKeyspaceEventWithSubkeys);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);

View file

@ -186,7 +186,7 @@ void _serverLog(int level, const char *fmt, ...) {
serverLogRaw(level,msg);
}
/* Low level logging from signal handler. Should be used with pre-formatted strings.
/* Low level logging from signal handler. Should be used with pre-formatted strings.
See serverLogFromHandler. */
void serverLogRawFromHandler(int level, const char *msg) {
int fd;
@ -274,7 +274,7 @@ mstime_t commandTimeSnapshot(void) {
/* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
* exit(), because the latter may interact with the same file objects used by
* the parent process. However if we are testing the coverage normal exit() is
* used in order to obtain the right coverage information.
* used in order to obtain the right coverage information.
* There is a caveat for when we exit due to a signal.
* In this case we want the function to be async signal safe, so we can't use exit()
*/
@ -793,7 +793,7 @@ dictType clientDictType = {
NULL, /* val dup */
dictClientKeyCompare, /* key compare */
.no_value = 1, /* no values in this dict */
.keys_are_odd = 0 /* a client pointer is not an odd pointer */
.keys_are_odd = 0 /* a client pointer is not an odd pointer */
};
kvstoreType kvstoreBaseType = {
@ -1735,7 +1735,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* a higher frequency. */
run_with_period(1000) {
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_last_write_status == C_ERR)
server.aof_last_write_status == C_ERR)
{
flushAppendOnlyFile(0);
}
@ -1745,8 +1745,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
updatePausedActions();
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth.
*
* detect transfer failures, start background RDB transfers and so forth.
*
* If Redis is trying to failover then run the replication cron faster so
* progress on the handshake happens more quickly. */
if (server.failover_state != NO_FAILOVER) {
@ -1986,7 +1986,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* processUnblockedClients(), so if there are multiple pipelined WAITs
* and the just unblocked WAIT gets blocked again, we don't have to wait
* a server cron cycle in absence of other event loop events. See #6623.
*
*
* We also don't send the ACKs while clients are paused, since it can
* increment the replication backlog, they'll be sent after the pause
* if we are still the master. */
@ -1996,7 +1996,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
}
/* We may have received updates from clients about their current offset. NOTE:
* this can't be done where the ACK is received since failover will disconnect
* this can't be done where the ACK is received since failover will disconnect
* our clients. */
updateFailoverStatus();
@ -2972,6 +2972,8 @@ void initServer(void) {
server.errors = raxNew();
server.errors_enabled = 1;
server.execution_nesting = 0;
server.firing_keyed_post_notif_jobs = 0;
server.in_keyspace_notification = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
@ -3847,7 +3849,25 @@ void postExecutionUnitOperations(void) {
if (server.execution_nesting)
return;
/* At this site the regular queue drains first and the keyed queue runs
* after it. We're past every command's afterCommand drain by now (no
* more sub-commands coming), so the per-sub-command ordering that
* afterCommand needs doesn't apply here. Keeping regular first matches
* the in-process registration order of the existing API and gives
* identical propagated streams between the two APIs for the cron-driven
* paths (active-expire in expire.c, eviction in evict.c both call us
* directly after their own enter/exitExecutionUnit).
*
* Cross-queue registration during these drains is unsupported: a keyed
* callback that causes a regular job to be queued (e.g. via a KSN
* surfaced from its RM_Call landing in another module's handler) leaves
* that regular job in the queue past propagatePendingCommands, so its
* writes land in a separate replication transaction. The contract
* around what keyed callbacks may do (documented on
* RM_AddPostNotificationJobForKey in module.c) is what keeps this from
* triggering in practice; modules are responsible for upholding it. */
firePostExecutionUnitJobs();
firePostKeyedNotificationJobs();
/* If we are at the top-most call() and not inside a an active module
* context (e.g. within a module timer) we can propagate what we accumulated. */
@ -4248,6 +4268,12 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
/* Fire keyed post-notification jobs first, before any propagation. These
* fire after every command (including each sub-command inside MULTI/EXEC),
* regardless of execution-unit nesting, so a module can react to a key
* before the next sub-command observes it. */
firePostKeyedNotificationJobs();
/* Should be done before trackingHandlePendingKeyInvalidations so that we
* reply to client before invalidating cache (makes more sense) */
postExecutionUnitOperations();
@ -4743,12 +4769,12 @@ int processCommand(client *c) {
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
if (!(c->flags & CLIENT_SLAVE) &&
((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command)))
{
blockPostponeClient(c);
return C_OK;
return C_OK;
}
/* Exec the command */
@ -6913,7 +6939,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
/* Hotkeys */
if (all_sections || (dictFind(section_dict,"hotkeys") != NULL))
{
if (sections++) info = sdscat(info,"\r\n");
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Hotkeys\r\n");
if (server.hotkeys) {

View file

@ -2066,6 +2066,12 @@ struct redisServer {
int execution_nesting; /* Execution nesting level.
* e.g. call(), async module stuff (timers, events, etc.),
* cron stuff (active expire, eviction) */
int firing_keyed_post_notif_jobs; /* Re-entrance guard while
* firePostKeyedNotificationJobs is draining. */
int in_keyspace_notification; /* >0 while inside a moduleNotifyKeyspaceEvent
* dispatch. Defines the scope from which
* RM_AddPostNotificationJobForKey may be called;
* a counter so nested notifications nest cleanly. */
rax *clients_index; /* Active clients dictionary by client ID. */
uint32_t paused_actions; /* Bitmask of actions that are currently paused */
list *postponed_clients; /* List of postponed clients */
@ -3123,6 +3129,7 @@ int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid, robj **subkeys, int count);
void firePostExecutionUnitJobs(void);
void firePostKeyedNotificationJobs(void);
void moduleCallCommandFilters(client *c);
void modulePostExecutionUnitOperations(void);
void ModuleForkDoneHandler(int exitcode, int bysignal);

View file

@ -86,6 +86,7 @@ TEST_MODULES = \
configaccess.so \
test_keymeta.so \
keymeta_notify.so \
postnotifications_perkey_metadata.so \
atomicslotmigration.so
.PHONY: all

View file

@ -10,20 +10,13 @@
* GNU Affero General Public License v3 (AGPLv3).
*/
/* This module allow to verify 'RedisModule_AddPostNotificationJob' by registering to 3
* key space event:
* * STRINGS - the module register to all strings notifications and set post notification job
* that increase a counter indicating how many times the string key was changed.
* In addition, it increase another counter that counts the total changes that
* was made on all strings keys.
* * EXPIRED - the module register to expired event and set post notification job that that
* counts the total number of expired events.
* * EVICTED - the module register to evicted event and set post notification job that that
* counts the total number of evicted events.
*
* In addition, the module register a new command, 'postnotification.async_set', that performs a set
* command from a background thread. This allows to check the 'RedisModule_AddPostNotificationJob' on
* notifications that was triggered on a background thread. */
/* This module supports both the regular post-notification API
* (RedisModule_AddPostNotificationJob) and the per-key API
* (RedisModule_AddPostNotificationJobForKey). A load arg
* "regular" or "perkey" selects which API the keyspace handlers register
* against (defaults to "regular" if omitted). The keyspace handlers and post-job
* side effects are otherwise unchanged: only the registration call differs
* between modes. */
#define _BSD_SOURCE
#define _DEFAULT_SOURCE /* For usleep */
@ -34,6 +27,9 @@
#include <unistd.h>
#include <pthread.h>
enum api_mode { MODE_REGULAR, MODE_PERKEY };
static int g_api_mode = MODE_REGULAR;
static void KeySpace_PostNotificationStringFreePD(void *pd) {
RedisModule_FreeString(NULL, pd);
}
@ -49,13 +45,42 @@ static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
RedisModule_FreeCallReply(rep);
}
/* Per-key-API trampolines: the per-key API's callback takes an extra `key`
* argument; we ignore it and delegate to the regular-API callback so the
* post-job effect stays identical across modes. */
static void KeySpace_PostNotificationStringPerKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(key);
KeySpace_PostNotificationString(ctx, pd);
}
static void KeySpace_PostNotificationReadKeyPerKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(key);
KeySpace_PostNotificationReadKey(ctx, pd);
}
/* Register a post-notification job through the API selected by g_api_mode. */
static int RegisterIncrJob(RedisModuleCtx *ctx, RedisModuleString *trigger_key, RedisModuleString *target) {
if (g_api_mode == MODE_REGULAR) {
return RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, target, KeySpace_PostNotificationStringFreePD);
} else {
return RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationStringPerKey, trigger_key, target, KeySpace_PostNotificationStringFreePD);
}
}
static int RegisterGetJob(RedisModuleCtx *ctx, RedisModuleString *trigger_key, RedisModuleString *target) {
if (g_api_mode == MODE_REGULAR) {
return RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, target, KeySpace_PostNotificationStringFreePD);
} else {
return RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationReadKeyPerKey, trigger_key, target, KeySpace_PostNotificationStringFreePD);
}
}
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
REDISMODULE_NOT_USED(key);
RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7);
int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
int res = RegisterIncrJob(ctx, key, new_key);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK;
}
@ -76,7 +101,7 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha
}
RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
int res = RegisterIncrJob(ctx, key, new_key);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK;
}
@ -103,7 +128,7 @@ static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char
new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
}
int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
int res = RegisterIncrJob(ctx, key, new_key);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK;
}
@ -120,7 +145,7 @@ static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int
}
RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);;
int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD);
int res = RegisterGetJob(ctx, key, new_key);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK;
}
@ -145,30 +170,40 @@ static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char
return REDISMODULE_OK;
}
typedef struct AsyncSetArgs {
RedisModuleBlockedClient *bc;
RedisModuleString *key;
} AsyncSetArgs;
static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) {
RedisModuleBlockedClient *bc = arg;
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
AsyncSetArgs *args = arg;
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(args->bc);
RedisModule_ThreadSafeContextLock(ctx);
RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1");
RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!sc", args->key, "1");
RedisModule_ThreadSafeContextUnlock(ctx);
RedisModule_ReplyWithCallReply(ctx, rep);
RedisModule_FreeCallReply(rep);
RedisModule_UnblockClient(bc, NULL);
RedisModule_UnblockClient(args->bc, NULL);
RedisModule_FreeThreadSafeContext(ctx);
RedisModule_FreeString(NULL, args->key);
RedisModule_Free(args);
return NULL;
}
static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
if (argc != 1)
if (argc != 2)
return RedisModule_WrongArity(ctx);
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
AsyncSetArgs *args = RedisModule_Alloc(sizeof(*args));
args->bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
args->key = RedisModule_HoldString(NULL, argv[1]);
if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) {
RedisModule_AbortBlock(bc);
pthread_t tid;
if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,args) != 0) {
RedisModule_AbortBlock(args->bc);
RedisModule_FreeString(NULL, args->key);
RedisModule_Free(args);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
pthread_detach(tid);
@ -225,12 +260,96 @@ static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e
if (res == REDISMODULE_ERR) KeySpace_ServerEventPostNotificationFree(pn_ctx);
}
/* Per-key-only fixtures: behaviors with no regular-API equivalent. Registered
* only when the module is loaded in "perkey" mode. */
static void KeySpace_PostNotificationBatchedKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "batched_keys", key);
if (rep) RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationBatched(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "batched_", 8) != 0) return REDISMODULE_OK;
if (strcmp(key_str, "batched_keys") == 0) return REDISMODULE_OK; /* skip our sink list */
RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationBatchedKey, key, NULL, NULL);
return REDISMODULE_OK;
}
static void KeySpace_PostNotificationHashKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "hash_keys", key);
if (rep) RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationHash(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "hash_", 5) != 0) return REDISMODULE_OK;
RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationHashKey, key, NULL, NULL);
return REDISMODULE_OK;
}
static int reentrance_in_outer_callback = 0;
static void KeySpace_PostNotificationReentranceProbe(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
RedisModuleCallReply *rep;
if (strcmp(key_str, "reentrant_outer") == 0) {
reentrance_in_outer_callback = 1;
rep = RedisModule_Call(ctx, "set", "!cc", "reentrant_inner", "1");
if (rep) RedisModule_FreeCallReply(rep);
reentrance_in_outer_callback = 0;
rep = RedisModule_Call(ctx, "lpush", "!cc", "reentrance_log", "outer_done");
if (rep) RedisModule_FreeCallReply(rep);
} else if (strcmp(key_str, "reentrant_inner") == 0) {
const char *marker = reentrance_in_outer_callback ? "REENTRANCE_DETECTED" : "inner_after_outer";
rep = RedisModule_Call(ctx, "lpush", "!cc", "reentrance_log", marker);
if (rep) RedisModule_FreeCallReply(rep);
}
}
static int KeySpace_NotificationReentrance(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "reentrant_", 10) != 0) return REDISMODULE_OK;
RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationReentranceProbe, key, NULL, NULL);
return REDISMODULE_OK;
}
static void KeySpace_PostNotificationMissKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "mget_misses", key);
if (rep) RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "miss_", 5) != 0) return REDISMODULE_OK;
RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationMissKey, key, NULL, NULL);
return REDISMODULE_OK;
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"postnotifications",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
@ -240,10 +359,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
}
int with_key_events = 0;
if (argc >= 1) {
const char *arg = RedisModule_StringPtrLen(argv[0], 0);
for (int i = 0; i < argc; i++) {
const char *arg = RedisModule_StringPtrLen(argv[i], 0);
if (strcmp(arg, "with_key_events") == 0) {
with_key_events = 1;
} else if (strcmp(arg, "perkey") == 0) {
g_api_mode = MODE_PERKEY;
} else if (strcmp(arg, "regular") == 0) {
g_api_mode = MODE_REGULAR;
}
}
@ -275,8 +398,24 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
}
}
/* Per-key-only fixtures (behaviors with no regular API equivalent). */
if (g_api_mode == MODE_PERKEY) {
if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationBatched) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_HASH, KeySpace_NotificationHash) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationReentrance) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationMiss) != REDISMODULE_OK) {
return REDISMODULE_ERR;
}
}
if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
"write", 0, 0, 0) == REDISMODULE_ERR){
"write", 1, 1, 1) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}

View file

@ -0,0 +1,175 @@
/* Test module for RM_AddPostNotificationJobForKey firing across phases.
*
* Used by tests/unit/moduleapi/postnotifications_perkey_aof_repl.tcl. The
* module exercises the contract that a per-key post-notification callback
* MUST only touch non-replicated, non-AOF-persisted state here, module
* key metadata. This lets the same callback fire on a master, on a replica
* receiving master-propagated commands, and during AOF replay, with each
* instance maintaining its per-key state independently.
*
* Subscribes to KSN events for STRING / HASH / GENERIC / EXPIRED / EVICTED.
* For each notification the KSN handler enqueues a per-key job; the job
* later attaches metadata via RM_SetKeyMeta. A module-internal counter
* (NOT a Redis key to avoid AOF / replication pollution) records how
* many times the per-key job actually ran.
*
* Commands:
* pkmeta.getmeta <key> - Return the metadata string, or nil.
* pkmeta.firecount - Return the module-internal fire counter.
* pkmeta.reset - Zero the fire counter.
* pkmeta.try_outside - Call RM_AddPostNotificationJobForKey from
* outside a KSN handler; reply OK/ERR for the
* negative-coverage test.
*
* Copyright (c) 2006-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#include "redismodule.h"
#include <string.h>
#include <stdlib.h>
static RedisModuleKeyMetaClassId meta_class_id = -1;
/* Module-internal counter — kept out of the keyspace on purpose so it is
* neither replicated nor AOF-persisted. Tests assert on it to confirm the
* per-key callback actually ran. */
static long long fire_count = 0;
static void MetaFreeCallback(const char *keyname, uint64_t meta) {
REDISMODULE_NOT_USED(keyname);
if (meta != 0) free((char *)meta);
}
/* Per-key post-notification job: attaches a "notified" string as metadata.
* Runs at the tail of the originating command (or sub-command for
* MULTI/EXEC), outside the KSN handler stack. */
static void PerKeyMetadataJob(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) {
REDISMODULE_NOT_USED(pd);
if (meta_class_id < 0) return;
RedisModuleKey *k = RedisModule_OpenKey(ctx, key, REDISMODULE_WRITE);
if (!k) return;
if (RedisModule_KeyType(k) == REDISMODULE_KEYTYPE_EMPTY) {
RedisModule_CloseKey(k);
return;
}
uint64_t existing = 0;
if (RedisModule_GetKeyMeta(meta_class_id, k, &existing) == REDISMODULE_OK &&
existing != 0) {
free((char *)existing);
}
char *new_str = strdup("notified");
if (RedisModule_SetKeyMeta(meta_class_id, k, (uint64_t)new_str) == REDISMODULE_OK) {
fire_count++;
} else {
free(new_str);
}
RedisModule_CloseKey(k);
}
/* KSN handler: defers SetKeyMeta into a per-key job rather than calling it
* inline. This is the path under test the per-key API is what makes the
* write happen at a safe firing point, including during AOF replay and on
* a replica receiving propagated commands. */
static int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event,
RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
RedisModule_AddPostNotificationJobForKey(ctx, PerKeyMetadataJob, key, NULL, NULL);
return REDISMODULE_OK;
}
static int GetMetaCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) return RedisModule_WrongArity(ctx);
RedisModuleKey *k = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
if (!k || RedisModule_KeyType(k) == REDISMODULE_KEYTYPE_EMPTY) {
if (k) RedisModule_CloseKey(k);
return RedisModule_ReplyWithNull(ctx);
}
uint64_t meta = 0;
if (RedisModule_GetKeyMeta(meta_class_id, k, &meta) == REDISMODULE_OK && meta != 0) {
RedisModule_ReplyWithCString(ctx, (const char *)meta);
} else {
RedisModule_ReplyWithNull(ctx);
}
RedisModule_CloseKey(k);
return REDISMODULE_OK;
}
static int FireCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithLongLong(ctx, fire_count);
}
static int ResetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
fire_count = 0;
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* Calls RM_AddPostNotificationJobForKey from outside a KSN handler — must
* return REDISMODULE_ERR. Used by the negative coverage test. */
static int TryOutsideCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) return RedisModule_WrongArity(ctx);
int rc = RedisModule_AddPostNotificationJobForKey(ctx, PerKeyMetadataJob,
argv[1], NULL, NULL);
if (rc == REDISMODULE_OK) {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
return RedisModule_ReplyWithError(ctx, "ERR registration refused");
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx, "pkmeta", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
RedisModuleKeyMetaClassConfig config = {0};
config.version = REDISMODULE_KEY_META_VERSION;
config.flags = (1 << REDISMODULE_META_ALLOW_IGNORE);
config.reset_value = (uint64_t)NULL;
config.free = MetaFreeCallback;
meta_class_id = RedisModule_CreateKeyMetaClass(ctx, "pkmc", 1, &config);
if (meta_class_id < 0) return REDISMODULE_ERR;
int notifyFlags = REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_HASH |
REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_EXPIRED |
REDISMODULE_NOTIFY_EVICTED;
if (RedisModule_SubscribeToKeyspaceEvents(ctx, notifyFlags, NotifyCallback) != REDISMODULE_OK)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "pkmeta.getmeta", GetMetaCommand,
"readonly", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "pkmeta.firecount", FireCountCommand,
"readonly", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "pkmeta.reset", ResetCommand,
"readonly", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "pkmeta.try_outside", TryOutsideCommand,
"readonly", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
REDISMODULE_NOT_USED(ctx);
if (meta_class_id >= 0) {
RedisModule_ReleaseKeyMetaClass(meta_class_id);
meta_class_id = -1;
}
return REDISMODULE_OK;
}

View file

@ -1,174 +1,344 @@
set testmodule [file normalize tests/modules/postnotifications.so]
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api
test "Test write on post notification callback ($api API)" {
set repl [attach_to_replication_stream]
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string_x 2}
{incr string_changed{string_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api
test "Test write on post notification callback from module thread ($api API)" {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r postnotification.async_set string_x]
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api with_key_events
test "Test active expire ($api API)" {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 10
wait_for_condition 100 50 {
[r keys expired] == {expired}
} else {
puts [r keys *]
fail "Failed waiting for x to expire"
}
# {lpush before_expired x} comes from the RedisModuleEvent_Key
# server event (always uses the regular post-notif queue).
# {incr expired} comes from the keyspace handler (regular or
# per-key queue depending on $api). Both APIs propagate the
# same stream: postExecutionUnitOperations drains regular
# before per-key, so the ordering between the two side-effects
# matches their in-process registration order.
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
close_replication_stream $repl
}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api
test "Test lazy expire ($api API)" {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {} [r get x]
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api
test "Test lazy expire inside post job notification ($api API)" {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {OK} [r set read_x 1]
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{set read_x 1}
{del x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api with_key_events
test "Test nested keyspace notification ($api API)" {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r set write_sync_write_sync_x 1]
assert_replication_stream $repl {
{multi}
{select *}
{set x 1}
{set write_sync_x 1}
{set write_sync_write_sync_x 1}
{exec}
}
close_replication_stream $repl
}
}
}
}
foreach api {regular perkey} {
tags "modules external:skip" {
start_server {} {
r module load $testmodule $api with_key_events
test "Test eviction ($api API)" {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r config set maxmemory-policy allkeys-random
r config set maxmemory 1
assert_error {OOM *} {r set y 1}
# {lpush before_evicted x} comes from the
# RedisModuleEvent_Key/before_evicted server event (always uses
# the regular post-notif queue). {incr evicted} comes from the
# keyspace handler (regular or per-key queue depending on
# $api). Both APIs propagate the same stream: regular drains
# before per-key inside postExecutionUnitOperations.
assert_replication_stream $repl {
{select *}
{set x 1}
{multi}
{del x}
{lpush before_evicted x}
{incr evicted}
{exec}
}
close_replication_stream $repl
} {} {needs:config-maxmemory}
}
}
}
# Per-key-only tests (no regular-API equivalent).
tags "modules external:skip" {
start_server {} {
r module load $testmodule with_key_events
r module load $testmodule perkey
test {Test write on post notification callback} {
test {Per-key post notification job fires between MULTI/EXEC sub-commands} {
r flushall
set repl [attach_to_replication_stream]
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
r multi
r set batched_a 1
r set batched_b 2
r set batched_c 3
r exec
r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1]
# the {lpush before_overwritten string_x} is a post notification job registered when 'string_x' was overwritten
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string_x 2}
{lpush before_overwritten string_x}
{incr string_changed{string_x}}
{incr string_total}
{set batched_a 1}
{lpush batched_keys batched_a}
{set batched_b 2}
{lpush batched_keys batched_b}
{set batched_c 3}
{lpush batched_keys batched_c}
{exec}
}
close_replication_stream $repl
}
test {Test write on post notification callback from module thread} {
test {Per-key callback does not re-enter firing while a nested RM_Call is in flight} {
r flushall
r set reentrant_outer 1
set log [r lrange reentrance_log 0 -1]
assert_equal -1 [lsearch $log "REENTRANCE_DETECTED"]
assert_equal {inner_after_outer outer_done} $log
}
test {Per-key post notification job fires per affected key on multi-key commands} {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r postnotification.async_set]
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
# MSET emits one notifyKeyspaceEvent per key. Each dispatch sets
# server.in_keyspace_notification, so the keyspace handler can
# register one per-key job per affected key. All three jobs fire
# at the tail of MSET's call() and propagate inside the same
# multi/exec the propagation buffer flushes.
r mset batched_a 1 batched_b 2 batched_c 3
assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{mset batched_a 1 batched_b 2 batched_c 3}
{lpush batched_keys batched_a}
{lpush batched_keys batched_b}
{lpush batched_keys batched_c}
{exec}
}
close_replication_stream $repl
}
test {Test active expire} {
test {Per-key post notification job fires per missing key on MGET (multi-key read)} {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 10
# MGET emits one NOTIFY_KEY_MISS notification per missing key. Each
# dispatch sets server.in_keyspace_notification, so the per-key
# handler registers one job per miss. The jobs drain at the tail of
# MGET's call(), propagating as a multi/exec after the read.
assert_equal {{} {} {}} [r mget miss_a miss_b miss_c]
assert_equal {miss_c miss_b miss_a} [r lrange mget_misses 0 -1]
wait_for_condition 100 50 {
[r keys expired] == {expired}
} else {
puts [r keys *]
fail "Failed waiting for x to expired"
}
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{lpush before_expired x}
{incr expired}
{select *}
{lpush mget_misses miss_a}
{lpush mget_misses miss_b}
{lpush mget_misses miss_c}
{exec}
}
close_replication_stream $repl
}
test {Test lazy expire} {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {} [r get x]
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
test {Test lazy expire inside post job notification} {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {OK} [r set read_x 1]
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{set read_x 1}
{del x}
{lpush before_expired x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
test {Test nested keyspace notification} {
test {Per-key post notification job fires between HSET and HEXPIRE on the same hash inside MULTI/EXEC} {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r set write_sync_write_sync_x 1]
r multi
r hset hash_h f1 v1
r hset hash_h f2 v2
r hexpire hash_h 100 FIELDS 1 f1
r exec
assert_equal {hash_h hash_h hash_h} [r lrange hash_keys 0 -1]
assert_replication_stream $repl {
{multi}
{select *}
{set x 1}
{set write_sync_x 1}
{set write_sync_write_sync_x 1}
{hset hash_h f1 v1}
{lpush hash_keys hash_h}
{hset hash_h f2 v2}
{lpush hash_keys hash_h}
{hpexpireat hash_h * FIELDS 1 f1}
{lpush hash_keys hash_h}
{exec}
}
close_replication_stream $repl
}
test {Test eviction} {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r config set maxmemory-policy allkeys-random
r config set maxmemory 1
assert_error {OOM *} {r set y 1}
# the {lpush before_evicted x} is a post notification job registered before 'x' got evicted
assert_replication_stream $repl {
{select *}
{set x 1}
{multi}
{del x}
{lpush before_evicted x}
{incr evicted}
{exec}
}
close_replication_stream $repl
} {} {needs:config-maxmemory}
}
}
@ -207,11 +377,6 @@ tags "modules external:skip" {
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string1_x 1}
{incr string_changed{string1_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}

View file

@ -0,0 +1,272 @@
set testmodule [file normalize tests/modules/postnotifications_perkey_metadata.so]
# AOF replay on a standalone master.
#
# Asserts that per-key post-notification jobs fire during AOF replay with the
# same pattern as during normal execution: once per single command, once per
# MULTI/EXEC sub-command. The per-key callback attaches module key metadata,
# which is NOT in the AOF so its presence after reload is direct evidence
# that the callback re-ran during replay. The module-internal fire counter
# (read via `pkmeta.firecount`) is the load-bearing assertion.
tags "modules aof external:skip" {
foreach aofload_type {debug_cmd startup} {
test "perkey-aof: single command rebuilds metadata via AOF reload (load=$aofload_type)" {
start_server [list overrides [list loadmodule "$testmodule"]] {
r config set appendonly yes
r config set auto-aof-rewrite-percentage 0
waitForBgrewriteaof r
r hset h1 f v
assert_equal "notified" [r pkmeta.getmeta h1]
assert_equal 1 [r pkmeta.firecount]
# Reset the counter so the post-reload count reflects only
# what the AOF replay path produced.
r pkmeta.reset
if {$aofload_type == "debug_cmd"} {
r debug loadaof
} else {
r config rewrite
restart_server 0 true false
wait_done_loading r
}
assert_equal "notified" [r pkmeta.getmeta h1]
assert_equal 1 [r pkmeta.firecount]
}
}
test "perkey-aof: MULTI/EXEC fires once per sub-command during AOF reload (load=$aofload_type)" {
start_server [list overrides [list loadmodule "$testmodule"]] {
r config set appendonly yes
r config set auto-aof-rewrite-percentage 0
waitForBgrewriteaof r
r multi
r hset h1 f v
r hset h2 f v
r hset h3 f v
r exec
assert_equal 3 [r pkmeta.firecount]
r pkmeta.reset
if {$aofload_type == "debug_cmd"} {
r debug loadaof
} else {
r config rewrite
restart_server 0 true false
wait_done_loading r
}
# Each sub-command's per-key drain must have fired during
# replay three HSETs three callback invocations.
assert_equal 3 [r pkmeta.firecount]
assert_equal "notified" [r pkmeta.getmeta h1]
assert_equal "notified" [r pkmeta.getmeta h2]
assert_equal "notified" [r pkmeta.getmeta h3]
}
}
test "perkey-aof: HSET + HEXPIRE in MULTI/EXEC fires twice during AOF reload (load=$aofload_type)" {
start_server [list overrides [list loadmodule "$testmodule"]] {
r config set appendonly yes
r config set auto-aof-rewrite-percentage 0
waitForBgrewriteaof r
r multi
r hset h_hexp f v
r hexpire h_hexp 100 FIELDS 1 f
r exec
assert_equal 2 [r pkmeta.firecount]
r pkmeta.reset
if {$aofload_type == "debug_cmd"} {
r debug loadaof
} else {
r config rewrite
restart_server 0 true false
wait_done_loading r
}
# HSET + HEXPIRE on the same key two KSN events, two
# per-key job firings. This was the original motivating
# scenario (RED-197766).
assert_equal 2 [r pkmeta.firecount]
assert_equal "notified" [r pkmeta.getmeta h_hexp]
}
}
}
}
# RDB load is intentionally outside the firing pattern.
#
# RDB load decodes keys directly without running commands, so no KSN fires,
# so per-key callbacks do not run. This test pins that design boundary: if
# anyone later "fixes" RDB load to fire KSN, this assertion will break and
# force the change to be considered explicitly.
tags "modules external:skip" {
test "perkey-rdb: RDB-only restart does NOT rebuild metadata (no KSN on RDB load)" {
start_server [list overrides [list loadmodule "$testmodule" appendonly no]] {
r hset h_rdb f v
assert_equal "notified" [r pkmeta.getmeta h_rdb]
assert_equal 1 [r pkmeta.firecount]
r pkmeta.reset
r debug reload
# RDB roundtrip: the key is back, but its metadata is not (the
# metadata class doesn't persist via rdb_save/rdb_load in this
# module), and the per-key job did NOT fire during load.
assert_equal "hash" [r type h_rdb]
assert_equal {} [r pkmeta.getmeta h_rdb]
assert_equal 0 [r pkmeta.firecount]
}
} {} {needs:debug}
}
# AOF replay on a replica at startup.
#
# Exercises the carve-out in RM_AddPostNotificationJobForKey that permits
# registration during loading even when masterhost is set. Without that
# carve-out the per-key job would be refused on a replica's own AOF replay
# and metadata would not be rebuilt at startup.
tags "modules aof external:skip" {
test "perkey-aof-replica: AOF replay on a replica at startup rebuilds metadata" {
# Master is loaded with the module too so its propagation path is
# comparable to a real deployment. The point under test is the AOF
# replay step on the replica at restart, not the initial sync.
start_server [list overrides [list loadmodule "$testmodule"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
start_server [list overrides [list \
loadmodule $testmodule \
appendonly yes \
auto-aof-rewrite-percentage 0 \
replicaof "$master_host $master_port"]] {
set replica [srv 0 client]
wait_for_sync $replica
# The replica boots with appendonly=yes and replicaof, so
# post-sync it kicks off a background AOF rewrite. Until that
# child finishes, propagated commands land in a temp incr
# file that `debug loadaof` won't see wait it out before
# driving the write under test.
waitForBgrewriteaof $replica
# Drive a write on the master; replica receives it via
# propagation and writes it to its own AOF.
$master hset h_repl f v
wait_for_ofs_sync $master $replica
# Sanity: key did propagate
assert_equal 1 [$replica hexists h_repl f]
assert_equal "notified" [$replica pkmeta.getmeta h_repl]
$replica pkmeta.reset
# Use debug loadaof to exercise the AOF replay path
# specifically on a configured replica (masterhost set,
# repl_slave_ro true, server.loading=1). A full restart
# would re-sync from master via RDB and wipe metadata
# that is a separate code path. We deliberately do not
# rewrite the AOF here: rewriting converts the HSET into
# the RDB-encoded base AOF, and RDB load (preamble or
# otherwise) intentionally does not fire KSN. The
# incremental AOF which is what propagated commands
# land in is what the per-key drain runs against.
$replica debug loadaof
# The AOF reload fires the per-key job on the replica; the
# callback runs with masterhost set, repl_slave_ro on, and
# server.loading == 1, which is exactly the carve-out.
assert_equal "notified" [$replica pkmeta.getmeta h_repl]
assert {[$replica pkmeta.firecount] >= 1}
}
}
}
}
# Master replica steady-state propagation.
#
# With the replica check dropped, per-key jobs fire on the replica too:
# both sides run the same KSN over the same command stream and maintain
# their per-key state independently. No metadata traffic on the wire.
tags "modules external:skip" {
test "perkey-repl: replica builds metadata from master-propagated single command" {
start_server [list overrides [list loadmodule "$testmodule"]] {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
start_server [list overrides [list loadmodule "$testmodule"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$replica replicaof $master_host $master_port
wait_for_sync $replica
$master pkmeta.reset
$replica pkmeta.reset
$master hset h_prop f v
wait_for_ofs_sync $master $replica
# Both sides ran the per-key job locally no metadata
# crossed the replication stream.
assert_equal "notified" [$master pkmeta.getmeta h_prop]
assert_equal "notified" [$replica pkmeta.getmeta h_prop]
assert_equal 1 [$master pkmeta.firecount]
assert_equal 1 [$replica pkmeta.firecount]
}
}
}
test "perkey-repl: replica fires per sub-command for propagated MULTI/EXEC" {
start_server [list overrides [list loadmodule "$testmodule"]] {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
start_server [list overrides [list loadmodule "$testmodule"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$replica replicaof $master_host $master_port
wait_for_sync $replica
$master pkmeta.reset
$replica pkmeta.reset
$master multi
$master hset hp1 f v
$master hset hp2 f v
$master hset hp3 f v
$master exec
wait_for_ofs_sync $master $replica
assert_equal 3 [$master pkmeta.firecount]
assert_equal 3 [$replica pkmeta.firecount]
foreach key {hp1 hp2 hp3} {
assert_equal "notified" [$master pkmeta.getmeta $key]
assert_equal "notified" [$replica pkmeta.getmeta $key]
}
}
}
}
}
# Negative coverage: API misuse outside a KSN handler.
#
# The only remaining runtime guard. Calling RM_AddPostNotificationJobForKey
# from a regular module command (not a KSN handler) must return
# REDISMODULE_ERR with a LL_WARNING log entry.
tags "modules external:skip" {
test "perkey-misuse: registration refused outside a KSN handler" {
start_server [list overrides [list loadmodule "$testmodule"]] {
assert_error {ERR registration refused*} {r pkmeta.try_outside any_key}
}
}
}