diff --git a/src/aof.c b/src/aof.c index 9e55a78b7..df80861b9 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1681,6 +1681,12 @@ int loadSingleAppendOnlyFile(char *filename) { queueMultiCommand(fakeClient, cmd->flags); } else { cmd->proc(fakeClient); + /* AOF replay bypasses call()/afterCommand(); drain the per-key + * post-notification queue here so module callbacks fire once per + * replayed single command, mirroring the normal-execution path. + * EXEC's sub-commands run through call() above and are drained + * by afterCommand() for each sub-command. */ + firePostKeyedNotificationJobs(); fakeClient->all_argv_len_sum = 0; /* Otherwise no one cleans this up and we reach cleanup with it non-zero */ } @@ -1760,6 +1766,12 @@ fmterr: /* Format error. */ /* fall through to cleanup. */ cleanup: + /* Drain any per-key post-notification jobs left over from a partially + * applied command before tearing down the fake client. In the success + * path the per-iteration drain after cmd->proc() has already emptied + * the queue; this catches stragglers from any aborted-mid-execution + * path so the next caller doesn't observe stale jobs. */ + firePostKeyedNotificationJobs(); if (fakeClient) freeClient(fakeClient); server.current_client = old_cur_client; server.executing_client = old_exec_client; diff --git a/src/module.c b/src/module.c index 50a594987..134dba25e 100644 --- a/src/module.c +++ b/src/module.c @@ -148,7 +148,7 @@ struct RedisModuleCtx { gets called for clients blocked on keys. */ - /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST or + /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST or * REDISMODULE_CTX_CHANNEL_POS_REQUEST flag set. */ getKeysResult *keys_result; @@ -308,6 +308,7 @@ typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int /* Function pointer type for post jobs */ typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); +typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd); /* Keyspace notification subscriber information. * See RM_SubscribeToKeyspaceEvents() for more information. */ @@ -336,6 +337,17 @@ typedef struct RedisModulePostExecUnitJob { int dbid; } RedisModulePostExecUnitJob; +/* A keyed post-notification job. Fires at the tail of every call() so each + * sub-command boundary is observed; jobs fire in submission order. */ +typedef struct RedisModulePostKeyedNotificationJob { + RedisModule *module; + RedisModulePostNotificationJobPerKeyFunc callback; + RedisModuleString *key; /* Owned reference; freed after the callback runs. */ + void *pd; + void (*free_pd)(void*); + int dbid; +} RedisModulePostKeyedNotificationJob; + /* The module keyspace notification subscribers list */ static list *moduleKeyspaceSubscribers; @@ -344,9 +356,14 @@ static list *moduleKeyspaceSubscribers; static int moduleKeyspaceSubscribersTypes = 0; static int moduleKeyspaceSubscribersWithSubkeysTypes = 0; -/* The module post keyspace jobs list */ +/* The module post keyspace jobs list (single-shot jobs, fired once at the end + * of the outermost execution unit). */ static list *modulePostExecUnitJobs; +/* The module per-key post-notification jobs list (fired at the tail of every + * call(), so each sub-command boundary inside MULTI/EXEC is observed). */ +static list *modulePostKeyedNotificationJobs; + /* Data structures related to the exported dictionary data structure. */ typedef struct RedisModuleDict { rax *rax; /* The radix tree. */ @@ -459,8 +476,8 @@ struct ModuleConfig { sds name; /* Fullname of the config (as it appears in the config file) */ sds alias; /* Optional alias for the configuration. NULL if none exists */ - int unprefixedFlag; /* Indicates if the REDISMODULE_CONFIG_UNPREFIXED flag was set. - * If the configuration name was prefixed,during get_fn/set_fn + int unprefixedFlag; /* Indicates if the REDISMODULE_CONFIG_UNPREFIXED flag was set. + * If the configuration name was prefixed,during get_fn/set_fn * callbacks, it should be reported without the prefix */ void *privdata; /* Optional data passed into the module config callbacks */ @@ -1099,14 +1116,14 @@ int RM_IsChannelsPositionRequest(RedisModuleCtx *ctx) { * registration, the command implementation checks for this special call * using the RedisModule_IsChannelsPositionRequest() API and uses this * function in order to report the channels. - * + * * The supported flags are: * * REDISMODULE_CMD_CHANNEL_SUBSCRIBE: This command will subscribe to the channel. * * REDISMODULE_CMD_CHANNEL_UNSUBSCRIBE: This command will unsubscribe from this channel. * * REDISMODULE_CMD_CHANNEL_PUBLISH: This command will publish to this channel. - * * REDISMODULE_CMD_CHANNEL_PATTERN: Instead of acting on a specific channel, will act on any + * * REDISMODULE_CMD_CHANNEL_PATTERN: Instead of acting on a specific channel, will act on any * channel specified by the pattern. This is the same access - * used by the PSUBSCRIBE and PUNSUBSCRIBE commands available + * used by the PSUBSCRIBE and PUNSUBSCRIBE commands available * in Redis. Not intended to be used with PUBLISH permissions. * * The following is an example of how it could be used: @@ -1514,13 +1531,13 @@ int populateArgsStructure(struct redisCommandArg *args) { /* RedisModule_AddACLCategory can be used to add new ACL command categories. Category names * can only contain alphanumeric characters, underscores, or dashes. Categories can only be added - * during the RedisModule_OnLoad function. Once a category has been added, it can not be removed. + * during the RedisModule_OnLoad function. Once a category has been added, it can not be removed. * Any module can register a command to any added categories using RedisModule_SetCommandACLCategories. - * + * * Returns: - * - REDISMODULE_OK on successfully adding the new ACL category. + * - REDISMODULE_OK on successfully adding the new ACL category. * - REDISMODULE_ERR on failure. - * + * * On error the errno is set to: * - EINVAL if the name contains invalid characters. * - EBUSY if the category name already exists. @@ -1566,9 +1583,9 @@ int matchAclCategoryFlag(char *flag, int64_t *acl_categories_flags) { } /* Helper for RM_SetCommandACLCategories(). Turns a string representing acl category - * flags into the acl category flags used by Redis ACL which allows users to access + * flags into the acl category flags used by Redis ACL which allows users to access * the module commands by acl categories. - * + * * It returns the set of acl flags, or -1 if unknown flags are found. */ int64_t categoryFlagsFromString(char *aclflags) { int count, j; @@ -1589,12 +1606,12 @@ int64_t categoryFlagsFromString(char *aclflags) { /* RedisModule_SetCommandACLCategories can be used to set ACL categories to module * commands and subcommands. The set of ACL categories should be passed as * a space separated C string 'aclflags'. - * - * Example, the acl flags 'write slow' marks the command as part of the write and + * + * Example, the acl flags 'write slow' marks the command as part of the write and * slow ACL categories. - * + * * On success REDISMODULE_OK is returned. On error REDISMODULE_ERR is returned. - * + * * This function can only be called during the RedisModule_OnLoad function. If called * outside of this function, an error is returned. */ @@ -1826,7 +1843,7 @@ int RM_SetCommandACLCategories(RedisModuleCommand *command, const char *aclflags * * Other flags: * - * * `REDISMODULE_CMD_KEY_NOT_KEY`: The key is not actually a key, but + * * `REDISMODULE_CMD_KEY_NOT_KEY`: The key is not actually a key, but * should be routed in cluster mode as if it was a key. * * * `REDISMODULE_CMD_KEY_INCOMPLETE`: The keyspec might not point out all @@ -2407,7 +2424,7 @@ ustime_t RM_CachedMicroseconds(void) { * RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() * to accumulate independent time intervals to the background duration. * This method always return REDISMODULE_OK. - * + * * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { @@ -2420,7 +2437,7 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { * On success REDISMODULE_OK is returned. * This method only returns REDISMODULE_ERR if no start time was * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). - * + * * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { @@ -2534,7 +2551,7 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { * * REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED: * See RM_SignalModifiedKey(). - * + * * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD: * Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb) * and that redis could be serving reads during replication instead of blocking with LOADING status. @@ -3271,9 +3288,9 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { * * If the connected client is using RESP2, the reply will be converted to a flat * array. - * + * * Use RM_ReplySetMapLength() to set deferred length. - * + * * The function always returns REDISMODULE_OK. */ int RM_ReplyWithMap(RedisModuleCtx *ctx, long len) { return moduleReplyWithCollection(ctx, len, COLLECTION_REPLY_MAP); @@ -3290,7 +3307,7 @@ int RM_ReplyWithMap(RedisModuleCtx *ctx, long len) { * array type. * * Use RM_ReplySetSetLength() to set deferred length. - * + * * The function always returns REDISMODULE_OK. */ int RM_ReplyWithSet(RedisModuleCtx *ctx, long len) { return moduleReplyWithCollection(ctx, len, COLLECTION_REPLY_SET); @@ -3305,12 +3322,12 @@ int RM_ReplyWithSet(RedisModuleCtx *ctx, long len) { * See Reply APIs section for more details. * * Use RM_ReplySetAttributeLength() to set deferred length. - * + * * Not supported by RESP2 and will return REDISMODULE_ERR, otherwise * the function always returns REDISMODULE_OK. */ int RM_ReplyWithAttribute(RedisModuleCtx *ctx, long len) { if (ctx->client->resp == 2) return REDISMODULE_ERR; - + return moduleReplyWithCollection(ctx, len, COLLECTION_REPLY_ATTRIBUTE); } @@ -3557,7 +3574,7 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { * a string into a C buffer, and then calling the function * RedisModule_ReplyWithStringBuffer() with the buffer and length. * - * In RESP3 the string is tagged as a double, while in RESP2 it's just a plain string + * In RESP3 the string is tagged as a double, while in RESP2 it's just a plain string * that the user will have to parse. * * The function always returns REDISMODULE_OK. */ @@ -3571,7 +3588,7 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { /* Reply with a RESP3 BigNumber type. * Visit https://github.com/antirez/RESP3/blob/master/spec.md for more info about RESP3. * - * In RESP3, this is a string of length `len` that is tagged as a BigNumber, + * In RESP3, this is a string of length `len` that is tagged as a BigNumber, * however, it's up to the caller to ensure that it's a valid BigNumber. * In RESP2, this is just a plain bulk string response. * @@ -3727,7 +3744,7 @@ RedisModuleString *RM_GetClientUserNameById(RedisModuleCtx *ctx, uint64_t id) { errno = ENOENT; return NULL; } - + if (client->user == NULL) { errno = ENOTSUP; return NULL; @@ -4324,7 +4341,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += commandTimeSnapshot(); - /* setExpire() might realloc kvobj */ + /* setExpire() might realloc kvobj */ key->kv = setExpire(key->ctx->client,key->db,key->key,expire); } else { removeExpire(key->db,key->key); @@ -4345,7 +4362,7 @@ mstime_t RM_GetAbsExpire(RedisModuleKey *key) { /* Set a new expire for the key. If the special expire * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was * one (the same as the PERSIST command). - * + * * Note that the expire must be provided as a positive integer representing * the absolute Unix timestamp the key should have. * @@ -4409,8 +4426,8 @@ int RM_SetAbsExpire(RedisModuleKey *key, mstime_t expire) { * .free_effort = myMeta_FreeEffortCallback * } * - * Redis does NOT take ownership of the config structure itself. The `confPtr` - * parameter only needs to remain valid during the RM_CreateKeyMetaClass() call + * Redis does NOT take ownership of the config structure itself. The `confPtr` + * parameter only needs to remain valid during the RM_CreateKeyMetaClass() call * and can be freed immediately after. * * * **version**: Module must set it to REDISMODULE_KEY_META_VERSION. This field is @@ -4536,7 +4553,7 @@ RedisModuleKeyMetaClassId RM_CreateKeyMetaClass(RedisModuleCtx *ctx, void *confPtr) { RedisModuleKeyMetaClassId id; - + /* Allow registration during OnLoad, server startup, or when debug flag is set */ int ctx_flags = RM_GetContextFlags(ctx); if (!ctx->module->onload && @@ -4546,7 +4563,7 @@ RedisModuleKeyMetaClassId RM_CreateKeyMetaClass(RedisModuleCtx *ctx, if (!confPtr) return -2; - + /* This structure supposed to evolve over time and defines the superset of all * module type methods supported across different Redis module API versions */ struct KeyMetaConfAllVersions { @@ -4562,11 +4579,11 @@ RedisModuleKeyMetaClassId RM_CreateKeyMetaClass(RedisModuleCtx *ctx, KeyMetaLoadFunc rdb_load; KeyMetaSaveFunc rdb_save; KeyMetaAOFRewriteFunc aof_rewrite; - KeyMetaDefragFunc defrag; + KeyMetaDefragFunc defrag; KeyMetaMemUsageFunc mem_usage; KeyMetaFreeEffortFunc free_effort; } *legacy = (struct KeyMetaConfAllVersions *)confPtr; - + if (legacy->version == 0 || legacy->version > REDISMODULE_KEY_META_VERSION) return -3; @@ -4590,7 +4607,7 @@ RedisModuleKeyMetaClassId RM_CreateKeyMetaClass(RedisModuleCtx *ctx, id = keyMetaClassCreate(ctx->module, metaname, metaver, &conf); if (id == 0) return -4; - + return id; } @@ -4620,10 +4637,10 @@ int RM_SetKeyMeta(RedisModuleKeyMetaClassId id, RedisModuleKey *key, uint64_t me int RM_GetKeyMeta(RedisModuleKeyMetaClassId id, RedisModuleKey *key, uint64_t *metadata) { if ((!key) || (key->kv == NULL) || (!metadata)) return REDISMODULE_ERR; - + if (keyMetaGetMetadata(id, key->kv, metadata) == 0) return REDISMODULE_ERR; - + return REDISMODULE_OK; } @@ -5168,7 +5185,7 @@ int moduleZsetAddFlagsFromCoreFlags(int flags) { * * REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise. * REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise. - * REDISMODULE_ZADD_GT: If element exists, new score must be greater than the current score. + * REDISMODULE_ZADD_GT: If element exists, new score must be greater than the current score. * Do nothing otherwise. Can optionally be combined with XX. * REDISMODULE_ZADD_LT: If element exists, new score must be less than the current score. * Do nothing otherwise. Can optionally be combined with XX. @@ -5814,12 +5831,12 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * expecting a RedisModuleString pointer to pointer, the function just * reports if the field exists or not and expects an integer pointer * as the second element of each pair. - * + * * REDISMODULE_HASH_EXPIRE_TIME: retrieves the expiration time of a field in the hash. * The function expects a `mstime_t` pointer as the second element of each pair. - * If the field does not exist or has no expiration, the value is set to + * If the field does not exist or has no expiration, the value is set to * `REDISMODULE_NO_EXPIRE`. This flag must not be used with `REDISMODULE_HASH_EXISTS`. - * + * * Example of REDISMODULE_HASH_CFIELDS: * * RedisModuleString *username, *hashedpass; @@ -5832,9 +5849,9 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * * Example of REDISMODULE_HASH_EXPIRE_TIME: * - * mstime_t hpExpireTime; + * mstime_t hpExpireTime; * RedisModule_HashGet(mykey,REDISMODULE_HASH_EXPIRE_TIME,"hp",&hpExpireTime,NULL); - * + * * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if * the key is not a hash value. * @@ -5852,8 +5869,8 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { hfeFlags = HFE_LAZY_ACCESS_EXPIRED; /* allow read also expired fields */ /* Verify flag HASH_EXISTS is not set together with HASH_EXPIRE_TIME */ - if ((flags & REDISMODULE_HASH_EXISTS) && (flags & REDISMODULE_HASH_EXPIRE_TIME)) - return REDISMODULE_ERR; + if ((flags & REDISMODULE_HASH_EXISTS) && (flags & REDISMODULE_HASH_EXPIRE_TIME)) + return REDISMODULE_ERR; va_start(ap, flags); while(1) { @@ -5878,7 +5895,7 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { *existsptr = 0; } } else if (flags & REDISMODULE_HASH_EXPIRE_TIME) { - mstime_t *expireptr = va_arg(ap,mstime_t*); + mstime_t *expireptr = va_arg(ap,mstime_t*); *expireptr = REDISMODULE_NO_EXPIRE; if (key->kv) { uint64_t expireTime = 0; @@ -5915,7 +5932,7 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { /** * Retrieves the minimum expiration time of fields in a hash. - * + * * Return: * - The minimum expiration time (in milliseconds) of the hash fields if at * least one field has an expiration set. @@ -5925,7 +5942,7 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { mstime_t RM_HashFieldMinExpire(RedisModuleKey *key) { if ((!key->kv) || (key->kv->type != OBJ_HASH)) return REDISMODULE_NO_EXPIRE; - + mstime_t min = hashTypeGetMinExpire(key->kv, 1); return (min == EB_EXPIRE_TIME_INVALID) ? REDISMODULE_NO_EXPIRE : min; } @@ -7350,7 +7367,7 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj } else { newval = mt->copy(fromkey, tokey, mv->value); } - + if (!newval) { addReplyError(c, "module key failed to copy"); return NULL; @@ -7400,7 +7417,7 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj * .unlink = myType_UnlinkCallBack, * .copy = myType_CopyCallback, * .defrag = myType_DefragCallback - * + * * // Enhanced optional fields * .mem_usage2 = myType_MemUsageCallBack2, * .free_effort2 = myType_FreeEffortCallBack2, @@ -7419,11 +7436,11 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj * Similar to aux_save, returns REDISMODULE_OK on success, and ERR otherwise. * * **free_effort**: A callback function pointer that used to determine whether the module's * memory needs to be lazy reclaimed. The module should return the complexity involved by - * freeing the value. for example: how many pointers are gonna be freed. Note that if it + * freeing the value. for example: how many pointers are gonna be freed. Note that if it * returns 0, we'll always do an async free. - * * **unlink**: A callback function pointer that used to notifies the module that the key has - * been removed from the DB by redis, and may soon be freed by a background thread. Note that - * it won't be called on FLUSHALL/FLUSHDB (both sync and async), and the module can use the + * * **unlink**: A callback function pointer that used to notifies the module that the key has + * been removed from the DB by redis, and may soon be freed by a background thread. Note that + * it won't be called on FLUSHALL/FLUSHDB (both sync and async), and the module can use the * RedisModuleEvent_FlushDB to hook into that. * * **copy**: A callback function pointer that is used to make a copy of the specified key. * The module is expected to perform a deep copy of the specified value and return it. @@ -7431,7 +7448,7 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj * A NULL return value is considered an error and the copy operation fails. * Note: if the target key exists and is being overwritten, the copy callback will be * called first, followed by a free callback to the value that is being replaced. - * + * * * **defrag**: A callback function pointer that is used to request the module to defrag * a key. The module should then iterate pointers and call the relevant RM_Defrag*() * functions to defragment pointers or complex types. The module should continue @@ -7459,7 +7476,7 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj * * **aux_save2**: Similar to `aux_save`, but with small semantic change, if the module * saves nothing on this callback then no data about this aux field will be written to the * RDB and it will be possible to load the RDB even if the module is not loaded. - * + * * Note: the module name "AAAAAAAAA" is reserved and produces an error, it * happens to be pretty lame as well. * @@ -8016,7 +8033,7 @@ void *RM_LoadDataTypeFromStringEncver(const RedisModuleString *str, const module void *ret; rioInitWithBuffer(&payload, str->ptr); - moduleType *mt_non_const = (moduleType *)mt; /*cast const away*/ + moduleType *mt_non_const = (moduleType *)mt; /*cast const away*/ moduleInitIOContext(&io, &mt_non_const->entity, &payload, NULL, -1); /* All RM_Save*() calls always write a version 2 compatible format, so we @@ -8031,7 +8048,7 @@ void *RM_LoadDataTypeFromStringEncver(const RedisModuleString *str, const module } /* Similar to RM_LoadDataTypeFromStringEncver, original version of the API, kept - * for backward compatibility. + * for backward compatibility. */ void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) { return RM_LoadDataTypeFromStringEncver(str, mt, 0); @@ -8982,7 +8999,7 @@ int moduleBlockedClientMayTimeout(client *c) { /* Called when our client timed out. After this function unblockClient() * is called, and it will invalidate the blocked client. So this function * does not need to do any cleanup. Eventually the module will call the - * API to unblock the client and the memory will be released. + * API to unblock the client and the memory will be released. * * This function should only be called from the main thread, we must handle the unblocking * of the client synchronously. This ensures that we can reply to the client before @@ -9443,6 +9460,52 @@ void firePostExecutionUnitJobs(void) { exitExecutionUnit(); } +/* Drain the keyed post-notification jobs queued during the current call(). + * Invoked at the tail of every call() (see afterCommand), so callbacks fire + * between sub-commands inside MULTI/EXEC. Uses server.firing_keyed_post_notif_jobs + * as a reentrance guard, since the per-call() hook bypasses the execution_nesting + * gating used by firePostExecutionUnitJobs. + * + * Also invoked from the AOF replay loop in loadSingleAppendOnlyFile after + * each single command (sub-commands inside MULTI/EXEC are drained by + * afterCommand() since EXEC's processor goes through call()). This is what + * makes the firing pattern during AOF replay match normal command + * execution: one drain per single command, one per MULTI/EXEC sub-command. + * + * Contract for the callback: it MUST only touch non-replicated, + * non-AOF-persisted state — module-attached key metadata is the canonical + * case. The same callback fires on the master, on every replica receiving + * master-propagated commands, and during AOF replay; consistency relies on + * each instance running the same callback over the same KSN stream, not + * on replication of the callback's side-effects. Writes back into the + * keyspace via RM_Call(..., "!...") will duplicate effects already in the + * AOF on replay and diverge the replica from its master in steady state. + * The runtime does not enforce this; it is a documented contract. */ +void firePostKeyedNotificationJobs(void) { + if (server.firing_keyed_post_notif_jobs) return; + if (listLength(modulePostKeyedNotificationJobs) == 0) return; + server.firing_keyed_post_notif_jobs = 1; + enterExecutionUnit(0, 0); + while (listLength(modulePostKeyedNotificationJobs) > 0) { + listNode *ln = listFirst(modulePostKeyedNotificationJobs); + RedisModulePostKeyedNotificationJob *job = listNodeValue(ln); + listDelNode(modulePostKeyedNotificationJobs, ln); + + RedisModuleCtx ctx; + moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT); + selectDb(ctx.client, job->dbid); + + job->callback(&ctx, job->key, job->pd); + if (job->free_pd) job->free_pd(job->pd); + decrRefCount(job->key); + + moduleFreeContext(&ctx); + zfree(job); + } + exitExecutionUnit(); + server.firing_keyed_post_notif_jobs = 0; +} + /* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write * operation (See `RM_SubscribeToKeyspaceEvents`). In order to still perform write actions in this scenario, * Redis provides `RM_AddPostNotificationJob` API. The API allows to register a job callback which Redis will call @@ -9475,6 +9538,91 @@ int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJo return REDISMODULE_OK; } +/* Sibling of `RM_AddPostNotificationJob` that fires per-key. May only be + * called from within a keyspace-notification handler (the single-key context + * the API is scoped to). Each call binds the job to the caller-supplied + * `key`, so multi-key commands such as MSET (which emit one notification per + * key) may register one job per affected key. + * + * Firing schedule (superset of `RM_AddPostNotificationJob`): + * - At the tail of every `call()`, so per-key effects are observable between + * MULTI/EXEC sub-commands. + * - At the end of the outermost execution unit, alongside the regular + * post-notification jobs. This covers the active-expire and eviction + * paths (both call `postExecutionUnitOperations` themselves). + * - During AOF replay, at the tail of every replayed command — single + * commands and each sub-command of MULTI/EXEC — so per-key state that + * lives outside the AOF (e.g. module-attached key metadata) is rebuilt + * in the same pattern as during normal execution. + * + * Jobs fire in submission order. `key` must be a valid RedisModuleString; the + * implementation takes its own reference and the caller retains ownership of + * its own reference. `free_pd` may be NULL. + * + * Cross-phase contract (enforced by documentation, not by the runtime): + * - The callback MUST only touch non-replicated, non-AOF-persisted state, + * such as module-attached key metadata via `RM_SetKeyMeta` / + * `RM_GetKeyMeta`. The same callback fires on the master, on every + * replica receiving master-propagated commands, and during AOF replay; + * each instance maintains its own per-key state independently and they + * converge because they all run the same callback over the same KSN + * stream. + * - Touching the keyspace from inside the callback (e.g. + * `RM_Call(..., "!...")`) is a contract violation. On AOF replay it + * amplifies the AOF; on a replica it diverges the replica from its + * master. The runtime does not suppress propagation inside the per-key + * drain and does not enforce this contract today — modules registering + * per-key jobs are responsible for upholding it. A future change may + * add runtime checks if reviewers want stronger enforcement. + * - Causing a regular post-notification job to be queued from inside a + * per-key callback is unsupported. The outermost drain in + * `postExecutionUnitOperations` runs the regular queue first and the + * per-key queue second; a regular job appended during the per-key drain + * (for example via an `RM_Call` whose KSN handler in another module + * calls `RM_AddPostNotificationJob`) is not drained before + * `propagatePendingCommands`, so its writes land in a separate + * replication transaction from the originating command. This falls out + * naturally if the keyspace-mutation contract above is upheld, since + * keyed callbacks would not be issuing the writes that surface the + * cross-queueing KSN in the first place. + * + * Return REDISMODULE_OK on success and REDISMODULE_ERR if called outside a + * keyspace-notification handler. The API is permitted on read-only replicas + * and during AOF replay, so per-key state stays continuously in sync with + * the keyspace events the instance observes. */ +int RM_AddPostNotificationJobForKey(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *privdata, void (*free_privdata)(void*)) { + + /* The API is only meaningful from inside a keyspace-notification handler: + * that is the single-key context the per-key contract is scoped to. */ + if (!server.in_keyspace_notification) { + serverLog(LL_WARNING, + "API misuse detected in module %s: " + "RedisModule_AddPostNotificationJobForKey called outside a " + "keyspace-notification handler.", + ctx->module->name); + return REDISMODULE_ERR; + } + + if (!callback || !key) { + serverLog(LL_WARNING, + "API misuse detected in module %s: " + "RedisModule_AddPostNotificationJobForKey called with NULL callback or key.", + ctx->module->name); + return REDISMODULE_ERR; + } + + RedisModulePostKeyedNotificationJob *job = zmalloc(sizeof(*job)); + job->module = ctx->module; + job->callback = callback; + job->key = key; + incrRefCount(job->key); + job->pd = privdata; + job->free_pd = free_privdata; + job->dbid = ctx->client->db->id; + listAddNodeTail(modulePostKeyedNotificationJobs, job); + return REDISMODULE_OK; +} + /* Get the configured bitmap of notify-keyspace-events (Could be used * for additional filtering in RedisModuleNotificationFunc) */ int RM_GetNotifyKeyspaceEvents(void) { @@ -9534,6 +9682,12 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid, * but we do not want to update the cached time */ enterExecutionUnit(0, 0); + /* Mark that we are inside a keyspace-notification dispatch. This is the + * single-key context that RM_AddPostNotificationJobForKey requires; the + * counter form lets nested notifications (a callback that triggers + * another KSN) nest cleanly. */ + server.in_keyspace_notification++; + listIter li; listNode *ln; listRewind(moduleKeyspaceSubscribers,&li); @@ -9581,6 +9735,7 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid, } } + server.in_keyspace_notification--; exitExecutionUnit(); } @@ -10506,9 +10661,9 @@ int RM_FreeModuleUser(RedisModuleUser *user) { * The returned string must be freed by the caller with RedisModule_FreeString() * or by enabling automatic memory management on a context. */ RedisModuleString *RM_GetUserUsername(RedisModuleCtx *ctx, const RedisModuleUser *user) { - if(user == NULL || user->user == NULL || user->user->name == NULL) + if(user == NULL || user->user == NULL || user->user->name == NULL) return NULL; - + return RM_CreateString(ctx, user->user->name, sdslen(user->user->name)); } @@ -10640,13 +10795,13 @@ int RM_ACLCheckCommandPermissions(RedisModuleUser *user, RedisModuleString **arg * keyspec for logical operations. These flags are documented in RedisModule_SetCommandInfo as * the REDISMODULE_CMD_KEY_ACCESS, REDISMODULE_CMD_KEY_UPDATE, REDISMODULE_CMD_KEY_INSERT, * and REDISMODULE_CMD_KEY_DELETE flags. - * + * * If no flags are supplied, the user is still required to have some access to the key for * this command to return successfully. * * If the user is able to access the key then REDISMODULE_OK is returned, otherwise * REDISMODULE_ERR is returned and errno is set to one of the following values: - * + * * * EINVAL: The provided flags are invalid. * * EACCESS: The user does not have permission to access the key. */ @@ -10670,18 +10825,18 @@ int RM_ACLCheckKeyPermissions(RedisModuleUser *user, RedisModuleString *key, int return REDISMODULE_OK; } -/* Check if the user can access keys matching the given key prefix according to the ACLs - * attached to the user and the flags representing key access. The flags are the same that - * are used in the keyspec for logical operations. These flags are documented in - * RedisModule_SetCommandInfo as the REDISMODULE_CMD_KEY_ACCESS, +/* Check if the user can access keys matching the given key prefix according to the ACLs + * attached to the user and the flags representing key access. The flags are the same that + * are used in the keyspec for logical operations. These flags are documented in + * RedisModule_SetCommandInfo as the REDISMODULE_CMD_KEY_ACCESS, * REDISMODULE_CMD_KEY_UPDATE, REDISMODULE_CMD_KEY_INSERT, and REDISMODULE_CMD_KEY_DELETE flags. - * - * If no flags are supplied, the user is still required to have some access to keys matching + * + * If no flags are supplied, the user is still required to have some access to keys matching * the prefix for this command to return successfully. * * If the user is able to access keys matching the prefix, then REDISMODULE_OK is returned. * Otherwise, REDISMODULE_ERR is returned and errno is set to one of the following values: - * + * * * EINVAL: The provided flags are invalid. * * EACCES: The user does not have permission to access keys matching the prefix. */ @@ -10715,9 +10870,9 @@ int RM_ACLCheckKeyPrefixPermissions(RedisModuleUser *user, RedisModuleString *pr * * If the user is able to access the pubsub channel then REDISMODULE_OK is returned, otherwise * REDISMODULE_ERR is returned and errno is set to one of the following values: - * + * * * EINVAL: The provided flags are invalid. - * * EACCESS: The user does not have permission to access the pubsub channel. + * * EACCESS: The user does not have permission to access the pubsub channel. */ int RM_ACLCheckChannelPermissions(RedisModuleUser *user, RedisModuleString *ch, int flags) { const int allow_mask = (REDISMODULE_CMD_CHANNEL_PUBLISH @@ -10875,15 +11030,15 @@ int RM_DeauthenticateAndCloseClient(RedisModuleCtx *ctx, uint64_t client_id) { return REDISMODULE_OK; } -/* Redact the client command argument specified at the given position. Redacted arguments +/* Redact the client command argument specified at the given position. Redacted arguments * are obfuscated in user facing commands such as SLOWLOG or MONITOR, as well as * never being written to server logs. This command may be called multiple times on the * same position. - * - * Note that the command name, position 0, can not be redacted. - * - * Returns REDISMODULE_OK if the argument was redacted and REDISMODULE_ERR if there - * was an invalid parameter passed in or the position is outside the client + * + * Note that the command name, position 0, can not be redacted. + * + * Returns REDISMODULE_OK if the argument was redacted and REDISMODULE_ERR if there + * was an invalid parameter passed in or the position is outside the client * argument range. */ int RM_RedactClientCommandArgument(RedisModuleCtx *ctx, int pos) { if (!ctx || !ctx->client || pos <= 0 || ctx->client->argc <= pos) { @@ -12074,7 +12229,7 @@ static void moduleScanKeyCallback(void *privdata, const dictEntry *de, dictEntry field = createStringObject(fieldStr, sdslen(fieldStr)); value = createStringObjectFromLongDouble(znode->score, 0); } - + serverAssert(field != NULL); data->fn(data->key, field, value, data->user_data); decrRefCount(field); @@ -12562,7 +12717,7 @@ static uint64_t moduleEventVersions[] = { * int32_t dbnum_second; // Swap Db second dbnum * * * RedisModuleEvent_ReplBackup - * + * * WARNING: Replication Backup events are deprecated since Redis 7.0 and are never fired. * See RedisModuleEvent_ReplAsyncLoad for understanding how Async Replication Loading events * are now triggered when repl-diskless-load is set to swapdb. @@ -12577,7 +12732,7 @@ static uint64_t moduleEventVersions[] = { * * `REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE` * * `REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE` * * `REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD` - * + * * * RedisModuleEvent_ReplAsyncLoad * * Called when repl-diskless-load config is set to swapdb and a replication with a master of same @@ -12619,7 +12774,7 @@ static uint64_t moduleEventVersions[] = { * structure with the following fields: * * const char **config_names; // An array of C string pointers containing the - * // name of each modified configuration item + * // name of each modified configuration item * uint32_t num_changes; // The number of elements in the config_names array * * * RedisModule_Event_Key @@ -12773,7 +12928,7 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) { case REDISMODULE_EVENT_EVENTLOOP: return subevent < _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT; case REDISMODULE_EVENT_CONFIG: - return subevent < _REDISMODULE_SUBEVENT_CONFIG_NEXT; + return subevent < _REDISMODULE_SUBEVENT_CONFIG_NEXT; case REDISMODULE_EVENT_KEY: return subevent < _REDISMODULE_SUBEVENT_KEY_NEXT; case REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION: @@ -12955,7 +13110,7 @@ void moduleNotifyKeyUnlink(robj *key, kvobj *kv, int dbid, int flags) { server.allow_access_trimmed--; } -/* Return the free_effort of the module, it will automatically choose to call +/* Return the free_effort of the module, it will automatically choose to call * `free_effort` or `free_effort2`, and the default return value is 1. * value of 0 means very high effort (always asynchronous freeing). */ size_t moduleGetFreeEffort(robj *key, robj *val, int dbid) { @@ -12968,12 +13123,12 @@ size_t moduleGetFreeEffort(robj *key, robj *val, int dbid) { effort = mt->free_effort2(&ctx,mv->value); } else if (mt->free_effort != NULL) { effort = mt->free_effort(key,mv->value); - } + } return effort; } -/* Return the memory usage of the module, it will automatically choose to call +/* Return the memory usage of the module, it will automatically choose to call * `mem_usage` or `mem_usage2`, and the default return value is 0. */ size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid) { moduleValue *mv = val->ptr; @@ -12985,7 +13140,7 @@ size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid) { size = mt->mem_usage2(&ctx, mv->value, sample_size); } else if (mt->mem_usage != NULL) { size = mt->mem_usage(mv->value); - } + } return size; } @@ -13056,6 +13211,7 @@ void moduleInitModulesSystem(void) { moduleKeyspaceSubscribers = listCreate(); modulePostExecUnitJobs = listCreate(); + modulePostKeyedNotificationJobs = listCreate(); /* Set up filter list */ moduleCommandFilters = listCreate(); @@ -13456,7 +13612,7 @@ int moduleOnLoad(int (*onload)(void *, void **, int), const char *path, void *ha /* Unload the module registered with the specified name. On success * C_OK is returned, otherwise C_ERR is returned and errmsg is set * with an appropriate message. - * Only forcefully unload this module, passing forced_unload != 0, + * Only forcefully unload this module, passing forced_unload != 0, * if it is certain that it has not yet been in use (e.g., immediate * unload on failed load). */ int moduleUnload(sds name, const char **errmsg, int forced_unload) { @@ -13639,7 +13795,7 @@ sds genModulesInfoString(sds info) { /* -------------------------------------------------------------------------- * Module Configurations API internals * -------------------------------------------------------------------------- */ - + /* Check if the configuration name is already registered */ int isModuleConfigNameRegistered(RedisModule *module, const char *name) { listNode *match = listSearchKey(module->module_configs, (void *) name); @@ -13693,11 +13849,11 @@ int moduleVerifyResourceName(const char *name) { return REDISMODULE_OK; } -/* Verify unprefixed name config might be a single "" or in the form - * "|". Unlike moduleVerifyResourceName(), unprefixed name config - * allows a single dot in the name or alias. - * - * delim - Updates to point to "|" if it exists, NULL otherwise. +/* Verify unprefixed name config might be a single "" or in the form + * "|". Unlike moduleVerifyResourceName(), unprefixed name config + * allows a single dot in the name or alias. + * + * delim - Updates to point to "|" if it exists, NULL otherwise. */ int moduleVerifyUnprefixedName(const char *nameAlias, const char **delim) { if (nameAlias[0] == '\0') @@ -13708,7 +13864,7 @@ int moduleVerifyUnprefixedName(const char *nameAlias, const char **delim) { for (size_t i = 0; nameAlias[i] != '\0'; i++) { char ch = nameAlias[i]; - + if (((*delim) == NULL) && (ch == '|')) { /* Handle single separator between name and alias */ if (!lname) { @@ -13723,7 +13879,7 @@ int moduleVerifyUnprefixedName(const char *nameAlias, const char **delim) { ++lname; } else if (ch == '.') { /* Allow only one dot per section (name or alias) */ - if (++dot_count > 1) { + if (++dot_count > 1) { serverLog(LL_WARNING, "Invalid character sequence in Module configuration name or alias: %s", nameAlias); return REDISMODULE_ERR; } @@ -13732,7 +13888,7 @@ int moduleVerifyUnprefixedName(const char *nameAlias, const char **delim) { return REDISMODULE_ERR; } } - + if (!lname) { serverLog(LL_WARNING, "Module configuration name or alias is empty : %s", nameAlias); return REDISMODULE_ERR; @@ -13741,7 +13897,7 @@ int moduleVerifyUnprefixedName(const char *nameAlias, const char **delim) { return REDISMODULE_OK; } -/* This is a series of set functions for each type that act as dispatchers for +/* This is a series of set functions for each type that act as dispatchers for * config.c to call module set callbacks. */ #define CONFIG_ERR_SIZE 256 static char configerr[CONFIG_ERR_SIZE]; @@ -13753,8 +13909,8 @@ static void propagateErrorString(RedisModuleString *err_in, const char **err) { } } -/* If configuration was originally registered with indication to prefix the name, - * return the name without the prefix by skipping prefix ".". +/* If configuration was originally registered with indication to prefix the name, + * return the name without the prefix by skipping prefix ".". * Otherwise, return the stored name as is. */ static char *getRegisteredConfigName(ModuleConfig *config) { if (config->unprefixedFlag) @@ -13762,7 +13918,7 @@ static char *getRegisteredConfigName(ModuleConfig *config) { /* For prefixed configuration, find the '.' indicating the end of the prefix */ char *endOfPrefix = strchr(config->name, '.'); - serverAssert(endOfPrefix != NULL); + serverAssert(endOfPrefix != NULL); return endOfPrefix + 1; } @@ -13778,7 +13934,7 @@ int setModuleBoolConfig(ModuleConfig *config, int val, const char **err) { int setModuleStringConfig(ModuleConfig *config, sds strval, const char **err) { RedisModuleString *error = NULL; RedisModuleString *new = createStringObject(strval, sdslen(strval)); - + char *rname = getRegisteredConfigName(config); int return_code = config->set_fn.set_string(rname, new, config->privdata, &error); propagateErrorString(error, err); @@ -13802,7 +13958,7 @@ int setModuleNumericConfig(ModuleConfig *config, long long val, const char **err return return_code == REDISMODULE_OK ? 1 : 0; } -/* This is a series of get functions for each type that act as dispatchers for +/* This is a series of get functions for each type that act as dispatchers for * config.c to call module set callbacks. */ int getModuleBoolConfig(ModuleConfig *module_config) { char *rname = getRegisteredConfigName(module_config); @@ -13941,19 +14097,19 @@ int moduleConfigApplyConfig(list *module_configs, const char **err, const char * * -------------------------------------------------------------------------- */ /* Resolve config name and create a module config object */ -ModuleConfig *createModuleConfig(const char *name, RedisModuleConfigApplyFunc apply_fn, - void *privdata, RedisModule *module, unsigned int flags) +ModuleConfig *createModuleConfig(const char *name, RedisModuleConfigApplyFunc apply_fn, + void *privdata, RedisModule *module, unsigned int flags) { sds cname, alias = NULL; /* Determine the configuration name: * - If the unprefixed flag is set, the "." prefix is omitted. * - An optional alias can be specified using "|". - * + * * Examples: * - Unprefixed: "bf.initial_size" or "bf-initial-size|bf.initial_size". * - Prefixed: "initial_size" becomes ".initial_size". - */ + */ if (flags & REDISMODULE_CONFIG_UNPREFIXED) { const char *delim = strchr(name, '|'); cname = sdsnew(name); @@ -13965,7 +14121,7 @@ ModuleConfig *createModuleConfig(const char *name, RedisModuleConfigApplyFunc ap /* Add the module name prefix */ cname = sdscatfmt(sdsempty(), "%s.%s", module->name, name); } - + ModuleConfig *new_config = zmalloc(sizeof(ModuleConfig)); new_config->unprefixedFlag = flags & REDISMODULE_CONFIG_UNPREFIXED; new_config->name = cname; @@ -13977,7 +14133,7 @@ ModuleConfig *createModuleConfig(const char *name, RedisModuleConfigApplyFunc ap } /* Verify the configuration name and check for duplicates. - * + * * - If the configuration is flagged as unprefixed, it checks for duplicate * names and optional aliases in the format |. * - If the configuration is prefixed, it ensures the name is unique with @@ -13992,22 +14148,22 @@ int moduleConfigValidityCheck(RedisModule *module, const char *name, unsigned in errno = EINVAL; return REDISMODULE_ERR; } - - int isdup = 0; + + int isdup = 0; if (flags & REDISMODULE_CONFIG_UNPREFIXED) { const char *delim = NULL; /* Pointer to the '|' delimiter in | */ if (moduleVerifyUnprefixedName(name, &delim)){ errno = EINVAL; return REDISMODULE_ERR; } - - if (delim) { + + if (delim) { /* Temporary split the "|" for the check */ int count; sds *ar = sdssplitlen(name, strlen(name), "|", 1, &count); serverAssert(count == 2); /* Already validated */ - isdup = configExists(ar[0]) || - configExists(ar[1]) || + isdup = configExists(ar[0]) || + configExists(ar[1]) || (sdscmp(ar[0], ar[1]) == 0); sdsfreesplitres(ar, count); } else { @@ -14025,7 +14181,7 @@ int moduleConfigValidityCheck(RedisModule *module, const char *name, unsigned in isdup = configExists(fullname); sdsfree(fullname); } - + if (isdup) { serverLog(LL_WARNING, "Configuration by the name: %s already registered", name); errno = EALREADY; @@ -14136,19 +14292,19 @@ int RM_RegisterStringConfig(RedisModuleCtx *ctx, const char *name, const char *d if (moduleConfigValidityCheck(module, name, flags, NUMERIC_CONFIG)) { return REDISMODULE_ERR; } - + ModuleConfig *mc = createModuleConfig(name, applyfn, privdata, module, flags); mc->get_fn.get_string = getfn; mc->set_fn.set_string = setfn; listAddNodeTail(module->module_configs, mc); unsigned int cflags = maskModuleConfigFlags(flags); - addModuleStringConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, + addModuleStringConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, cflags, mc, default_val ? sdsnew(default_val) : NULL); return REDISMODULE_OK; } -/* Create a bool config that server clients can interact with via the - * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. See +/* Create a bool config that server clients can interact with via the + * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. See * RedisModule_RegisterStringConfig for detailed information about configs. */ int RM_RegisterBoolConfig(RedisModuleCtx *ctx, const char *name, int default_val, unsigned int flags, RedisModuleConfigGetBoolFunc getfn, RedisModuleConfigSetBoolFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) { RedisModule *module = ctx->module; @@ -14160,15 +14316,15 @@ int RM_RegisterBoolConfig(RedisModuleCtx *ctx, const char *name, int default_val mc->set_fn.set_bool = setfn; listAddNodeTail(module->module_configs, mc); unsigned int cflags = maskModuleConfigFlags(flags); - addModuleBoolConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, + addModuleBoolConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, cflags, mc, default_val); return REDISMODULE_OK; } -/* - * Create an enum config that server clients can interact with via the - * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. - * Enum configs are a set of string tokens to corresponding integer values, where +/* + * Create an enum config that server clients can interact with via the + * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. + * Enum configs are a set of string tokens to corresponding integer values, where * the string value is exposed to Redis clients but the value passed Redis and the * module is the integer value. These values are defined in enum_values, an array * of null-terminated c strings, and int_vals, an array of enum values who has an @@ -14181,7 +14337,7 @@ int RM_RegisterBoolConfig(RedisModuleCtx *ctx, const char *name, int default_val * int getEnumConfigCommand(const char *name, void *privdata) { * return enum_val; * } - * + * * int setEnumConfigCommand(const char *name, int val, void *privdata, const char **err) { * enum_val = val; * return REDISMODULE_OK; @@ -14212,14 +14368,14 @@ int RM_RegisterEnumConfig(RedisModuleCtx *ctx, const char *name, int default_val listAddNodeTail(module->module_configs, mc); unsigned int cflags = maskModuleConfigFlags(flags) | maskModuleEnumConfigFlags(flags); - addModuleEnumConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, + addModuleEnumConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, cflags, mc, default_val, enum_vals, num_enum_vals); return REDISMODULE_OK; } /* - * Create an integer config that server clients can interact with via the - * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. See + * Create an integer config that server clients can interact with via the + * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. See * RedisModule_RegisterStringConfig for detailed information about configs. */ int RM_RegisterNumericConfig(RedisModuleCtx *ctx, const char *name, long long default_val, unsigned int flags, long long min, long long max, RedisModuleConfigGetNumericFunc getfn, RedisModuleConfigSetNumericFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) { RedisModule *module = ctx->module; @@ -14233,7 +14389,7 @@ int RM_RegisterNumericConfig(RedisModuleCtx *ctx, const char *name, long long de unsigned int numeric_flags = maskModuleNumericConfigFlags(flags); unsigned int cflags = maskModuleConfigFlags(flags); - addModuleNumericConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, + addModuleNumericConfig(sdsdup(mc->name), (mc->alias) ? sdsdup(mc->alias) : NULL, cflags, mc, default_val, numeric_flags, min, max); return REDISMODULE_OK; } @@ -14741,7 +14897,7 @@ NULL argc = c->argc - 3; argv = &c->argv[3]; } - /* If this is a loadex command we want to populate server.module_configs_queue with + /* If this is a loadex command we want to populate server.module_configs_queue with * sds NAME VALUE pairs. We also want to increment argv to just after ARGS, if supplied. */ if (parseLoadexArguments((RedisModuleString ***) &argv, &argc) == REDISMODULE_OK && moduleLoad(c->argv[2]->ptr, (void **)argv, argc, 1) == C_OK) @@ -15146,8 +15302,8 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { * owner. For such usecase RM_DefragAlloc is enough. But on some usecases the user * might want to replace a pointer with multiple owners in different keys. * In such case, an in place replacement can not work because the other key still - * keep a pointer to the old value. - * + * keep a pointer to the old value. + * * RM_DefragAllocRaw and RM_DefragFreeRaw allows to control when the memory * for defrag purposes will be allocated and when it will be freed, * allow to support more complex defrag usecases. */ @@ -15157,7 +15313,7 @@ void *RM_DefragAllocRaw(RedisModuleDefragCtx *ctx, size_t size) { } /* Free memory for defrag purposes - * + * * See RM_DefragAllocRaw for more information. */ void RM_DefragFreeRaw(RedisModuleDefragCtx *ctx, void *ptr) { UNUSED(ctx); @@ -15333,7 +15489,7 @@ int moduleDefragValue(robj *key, robj *value, int dbid) { /* Call registered module API defrag start functions */ void moduleDefragStart(void) { - dictForEach(modules, struct RedisModule, module, + dictForEach(modules, struct RedisModule, module, if (module->defrag_start_cb) { RedisModuleDefragCtx defrag_ctx = INIT_MODULE_DEFRAG_CTX(0, NULL, NULL, -1); module->defrag_start_cb(&defrag_ctx); @@ -15343,7 +15499,7 @@ void moduleDefragStart(void) { /* Call registered module API defrag end functions */ void moduleDefragEnd(void) { - dictForEach(modules, struct RedisModule, module, + dictForEach(modules, struct RedisModule, module, if (module->defrag_end_cb) { RedisModuleDefragCtx defrag_ctx = INIT_MODULE_DEFRAG_CTX(0, NULL, NULL, -1); module->defrag_end_cb(&defrag_ctx); @@ -15595,6 +15751,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SubscribeToKeyspaceEventsWithSubkeys); REGISTER_API(UnsubscribeFromKeyspaceEventsWithSubkeys); REGISTER_API(AddPostNotificationJob); + REGISTER_API(AddPostNotificationJobForKey); REGISTER_API(RegisterClusterMessageReceiver); REGISTER_API(SendClusterMessage); REGISTER_API(GetClusterNodeInfo); diff --git a/src/redismodule.h b/src/redismodule.h index 59a592c4a..0c1d146f9 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -996,6 +996,7 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count); typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); +typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); @@ -1383,6 +1384,7 @@ REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_AddPostNotificationJobForKey)(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEventWithSubkeys)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; @@ -1789,6 +1791,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SubscribeToKeyspaceEventsWithSubkeys); REDISMODULE_GET_API(UnsubscribeFromKeyspaceEventsWithSubkeys); REDISMODULE_GET_API(AddPostNotificationJob); + REDISMODULE_GET_API(AddPostNotificationJobForKey); REDISMODULE_GET_API(NotifyKeyspaceEvent); REDISMODULE_GET_API(NotifyKeyspaceEventWithSubkeys); REDISMODULE_GET_API(GetNotifyKeyspaceEvents); diff --git a/src/server.c b/src/server.c index df660175e..4c0ba8841 100644 --- a/src/server.c +++ b/src/server.c @@ -186,7 +186,7 @@ void _serverLog(int level, const char *fmt, ...) { serverLogRaw(level,msg); } -/* Low level logging from signal handler. Should be used with pre-formatted strings. +/* Low level logging from signal handler. Should be used with pre-formatted strings. See serverLogFromHandler. */ void serverLogRawFromHandler(int level, const char *msg) { int fd; @@ -274,7 +274,7 @@ mstime_t commandTimeSnapshot(void) { /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of * exit(), because the latter may interact with the same file objects used by * the parent process. However if we are testing the coverage normal exit() is - * used in order to obtain the right coverage information. + * used in order to obtain the right coverage information. * There is a caveat for when we exit due to a signal. * In this case we want the function to be async signal safe, so we can't use exit() */ @@ -793,7 +793,7 @@ dictType clientDictType = { NULL, /* val dup */ dictClientKeyCompare, /* key compare */ .no_value = 1, /* no values in this dict */ - .keys_are_odd = 0 /* a client pointer is not an odd pointer */ + .keys_are_odd = 0 /* a client pointer is not an odd pointer */ }; kvstoreType kvstoreBaseType = { @@ -1735,7 +1735,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * a higher frequency. */ run_with_period(1000) { if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) && - server.aof_last_write_status == C_ERR) + server.aof_last_write_status == C_ERR) { flushAppendOnlyFile(0); } @@ -1745,8 +1745,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { updatePausedActions(); /* Replication cron function -- used to reconnect to master, - * detect transfer failures, start background RDB transfers and so forth. - * + * detect transfer failures, start background RDB transfers and so forth. + * * If Redis is trying to failover then run the replication cron faster so * progress on the handshake happens more quickly. */ if (server.failover_state != NO_FAILOVER) { @@ -1986,7 +1986,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * processUnblockedClients(), so if there are multiple pipelined WAITs * and the just unblocked WAIT gets blocked again, we don't have to wait * a server cron cycle in absence of other event loop events. See #6623. - * + * * We also don't send the ACKs while clients are paused, since it can * increment the replication backlog, they'll be sent after the pause * if we are still the master. */ @@ -1996,7 +1996,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } /* We may have received updates from clients about their current offset. NOTE: - * this can't be done where the ACK is received since failover will disconnect + * this can't be done where the ACK is received since failover will disconnect * our clients. */ updateFailoverStatus(); @@ -2972,6 +2972,8 @@ void initServer(void) { server.errors = raxNew(); server.errors_enabled = 1; server.execution_nesting = 0; + server.firing_keyed_post_notif_jobs = 0; + server.in_keyspace_notification = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3847,7 +3849,25 @@ void postExecutionUnitOperations(void) { if (server.execution_nesting) return; + /* At this site the regular queue drains first and the keyed queue runs + * after it. We're past every command's afterCommand drain by now (no + * more sub-commands coming), so the per-sub-command ordering that + * afterCommand needs doesn't apply here. Keeping regular first matches + * the in-process registration order of the existing API and gives + * identical propagated streams between the two APIs for the cron-driven + * paths (active-expire in expire.c, eviction in evict.c — both call us + * directly after their own enter/exitExecutionUnit). + * + * Cross-queue registration during these drains is unsupported: a keyed + * callback that causes a regular job to be queued (e.g. via a KSN + * surfaced from its RM_Call landing in another module's handler) leaves + * that regular job in the queue past propagatePendingCommands, so its + * writes land in a separate replication transaction. The contract + * around what keyed callbacks may do (documented on + * RM_AddPostNotificationJobForKey in module.c) is what keeps this from + * triggering in practice; modules are responsible for upholding it. */ firePostExecutionUnitJobs(); + firePostKeyedNotificationJobs(); /* If we are at the top-most call() and not inside a an active module * context (e.g. within a module timer) we can propagate what we accumulated. */ @@ -4248,6 +4268,12 @@ void rejectCommandFormat(client *c, const char *fmt, ...) { /* This is called after a command in call, we can do some maintenance job in it. */ void afterCommand(client *c) { + /* Fire keyed post-notification jobs first, before any propagation. These + * fire after every command (including each sub-command inside MULTI/EXEC), + * regardless of execution-unit nesting, so a module can react to a key + * before the next sub-command observes it. */ + firePostKeyedNotificationJobs(); + /* Should be done before trackingHandlePendingKeyInvalidations so that we * reply to client before invalidating cache (makes more sense) */ postExecutionUnitOperations(); @@ -4743,12 +4769,12 @@ int processCommand(client *c) { /* If the server is paused, block the client until * the pause has ended. Replicas are never paused. */ - if (!(c->flags & CLIENT_SLAVE) && + if (!(c->flags & CLIENT_SLAVE) && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { blockPostponeClient(c); - return C_OK; + return C_OK; } /* Exec the command */ @@ -6913,7 +6939,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { /* Hotkeys */ if (all_sections || (dictFind(section_dict,"hotkeys") != NULL)) { - if (sections++) info = sdscat(info,"\r\n"); + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Hotkeys\r\n"); if (server.hotkeys) { diff --git a/src/server.h b/src/server.h index 9318eec68..ce26c91fe 100644 --- a/src/server.h +++ b/src/server.h @@ -2066,6 +2066,12 @@ struct redisServer { int execution_nesting; /* Execution nesting level. * e.g. call(), async module stuff (timers, events, etc.), * cron stuff (active expire, eviction) */ + int firing_keyed_post_notif_jobs; /* Re-entrance guard while + * firePostKeyedNotificationJobs is draining. */ + int in_keyspace_notification; /* >0 while inside a moduleNotifyKeyspaceEvent + * dispatch. Defines the scope from which + * RM_AddPostNotificationJobForKey may be called; + * a counter so nested notifications nest cleanly. */ rax *clients_index; /* Active clients dictionary by client ID. */ uint32_t paused_actions; /* Bitmask of actions that are currently paused */ list *postponed_clients; /* List of postponed clients */ @@ -3123,6 +3129,7 @@ int moduleTryAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid, robj **subkeys, int count); void firePostExecutionUnitJobs(void); +void firePostKeyedNotificationJobs(void); void moduleCallCommandFilters(client *c); void modulePostExecutionUnitOperations(void); void ModuleForkDoneHandler(int exitcode, int bysignal); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 0659497cf..a20d9530a 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -86,6 +86,7 @@ TEST_MODULES = \ configaccess.so \ test_keymeta.so \ keymeta_notify.so \ + postnotifications_perkey_metadata.so \ atomicslotmigration.so .PHONY: all diff --git a/tests/modules/postnotifications.c b/tests/modules/postnotifications.c index 96fb85918..4f8a5727b 100644 --- a/tests/modules/postnotifications.c +++ b/tests/modules/postnotifications.c @@ -10,20 +10,13 @@ * GNU Affero General Public License v3 (AGPLv3). */ -/* This module allow to verify 'RedisModule_AddPostNotificationJob' by registering to 3 - * key space event: - * * STRINGS - the module register to all strings notifications and set post notification job - * that increase a counter indicating how many times the string key was changed. - * In addition, it increase another counter that counts the total changes that - * was made on all strings keys. - * * EXPIRED - the module register to expired event and set post notification job that that - * counts the total number of expired events. - * * EVICTED - the module register to evicted event and set post notification job that that - * counts the total number of evicted events. - * - * In addition, the module register a new command, 'postnotification.async_set', that performs a set - * command from a background thread. This allows to check the 'RedisModule_AddPostNotificationJob' on - * notifications that was triggered on a background thread. */ +/* This module supports both the regular post-notification API + * (RedisModule_AddPostNotificationJob) and the per-key API + * (RedisModule_AddPostNotificationJobForKey). A load arg — + * "regular" or "perkey" — selects which API the keyspace handlers register + * against (defaults to "regular" if omitted). The keyspace handlers and post-job + * side effects are otherwise unchanged: only the registration call differs + * between modes. */ #define _BSD_SOURCE #define _DEFAULT_SOURCE /* For usleep */ @@ -34,6 +27,9 @@ #include #include +enum api_mode { MODE_REGULAR, MODE_PERKEY }; +static int g_api_mode = MODE_REGULAR; + static void KeySpace_PostNotificationStringFreePD(void *pd) { RedisModule_FreeString(NULL, pd); } @@ -49,13 +45,42 @@ static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) { RedisModule_FreeCallReply(rep); } +/* Per-key-API trampolines: the per-key API's callback takes an extra `key` + * argument; we ignore it and delegate to the regular-API callback so the + * post-job effect stays identical across modes. */ +static void KeySpace_PostNotificationStringPerKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(key); + KeySpace_PostNotificationString(ctx, pd); +} + +static void KeySpace_PostNotificationReadKeyPerKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(key); + KeySpace_PostNotificationReadKey(ctx, pd); +} + +/* Register a post-notification job through the API selected by g_api_mode. */ +static int RegisterIncrJob(RedisModuleCtx *ctx, RedisModuleString *trigger_key, RedisModuleString *target) { + if (g_api_mode == MODE_REGULAR) { + return RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, target, KeySpace_PostNotificationStringFreePD); + } else { + return RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationStringPerKey, trigger_key, target, KeySpace_PostNotificationStringFreePD); + } +} + +static int RegisterGetJob(RedisModuleCtx *ctx, RedisModuleString *trigger_key, RedisModuleString *target) { + if (g_api_mode == MODE_REGULAR) { + return RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, target, KeySpace_PostNotificationStringFreePD); + } else { + return RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationReadKeyPerKey, trigger_key, target, KeySpace_PostNotificationStringFreePD); + } +} + static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ REDISMODULE_NOT_USED(type); REDISMODULE_NOT_USED(event); - REDISMODULE_NOT_USED(key); RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7); - int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + int res = RegisterIncrJob(ctx, key, new_key); if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDISMODULE_OK; } @@ -76,7 +101,7 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha } RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7); - int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + int res = RegisterIncrJob(ctx, key, new_key); if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDISMODULE_OK; } @@ -103,7 +128,7 @@ static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str); } - int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + int res = RegisterIncrJob(ctx, key, new_key); if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDISMODULE_OK; } @@ -120,7 +145,7 @@ static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int } RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);; - int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD); + int res = RegisterGetJob(ctx, key, new_key); if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDISMODULE_OK; } @@ -145,30 +170,40 @@ static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char return REDISMODULE_OK; } +typedef struct AsyncSetArgs { + RedisModuleBlockedClient *bc; + RedisModuleString *key; +} AsyncSetArgs; + static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) { - RedisModuleBlockedClient *bc = arg; - RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + AsyncSetArgs *args = arg; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(args->bc); RedisModule_ThreadSafeContextLock(ctx); - RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1"); + RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!sc", args->key, "1"); RedisModule_ThreadSafeContextUnlock(ctx); RedisModule_ReplyWithCallReply(ctx, rep); RedisModule_FreeCallReply(rep); - RedisModule_UnblockClient(bc, NULL); + RedisModule_UnblockClient(args->bc, NULL); RedisModule_FreeThreadSafeContext(ctx); + RedisModule_FreeString(NULL, args->key); + RedisModule_Free(args); return NULL; } static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - REDISMODULE_NOT_USED(argv); - if (argc != 1) + if (argc != 2) return RedisModule_WrongArity(ctx); - pthread_t tid; - RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + AsyncSetArgs *args = RedisModule_Alloc(sizeof(*args)); + args->bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + args->key = RedisModule_HoldString(NULL, argv[1]); - if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) { - RedisModule_AbortBlock(bc); + pthread_t tid; + if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,args) != 0) { + RedisModule_AbortBlock(args->bc); + RedisModule_FreeString(NULL, args->key); + RedisModule_Free(args); return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); } pthread_detach(tid); @@ -225,12 +260,96 @@ static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e if (res == REDISMODULE_ERR) KeySpace_ServerEventPostNotificationFree(pn_ctx); } +/* Per-key-only fixtures: behaviors with no regular-API equivalent. Registered + * only when the module is loaded in "perkey" mode. */ + +static void KeySpace_PostNotificationBatchedKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "batched_keys", key); + if (rep) RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationBatched(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "batched_", 8) != 0) return REDISMODULE_OK; + if (strcmp(key_str, "batched_keys") == 0) return REDISMODULE_OK; /* skip our sink list */ + + RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationBatchedKey, key, NULL, NULL); + return REDISMODULE_OK; +} + +static void KeySpace_PostNotificationHashKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "hash_keys", key); + if (rep) RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationHash(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "hash_", 5) != 0) return REDISMODULE_OK; + + RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationHashKey, key, NULL, NULL); + return REDISMODULE_OK; +} + +static int reentrance_in_outer_callback = 0; + +static void KeySpace_PostNotificationReentranceProbe(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + const char *key_str = RedisModule_StringPtrLen(key, NULL); + RedisModuleCallReply *rep; + + if (strcmp(key_str, "reentrant_outer") == 0) { + reentrance_in_outer_callback = 1; + rep = RedisModule_Call(ctx, "set", "!cc", "reentrant_inner", "1"); + if (rep) RedisModule_FreeCallReply(rep); + reentrance_in_outer_callback = 0; + rep = RedisModule_Call(ctx, "lpush", "!cc", "reentrance_log", "outer_done"); + if (rep) RedisModule_FreeCallReply(rep); + } else if (strcmp(key_str, "reentrant_inner") == 0) { + const char *marker = reentrance_in_outer_callback ? "REENTRANCE_DETECTED" : "inner_after_outer"; + rep = RedisModule_Call(ctx, "lpush", "!cc", "reentrance_log", marker); + if (rep) RedisModule_FreeCallReply(rep); + } +} + +static int KeySpace_NotificationReentrance(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "reentrant_", 10) != 0) return REDISMODULE_OK; + + RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationReentranceProbe, key, NULL, NULL); + return REDISMODULE_OK; +} + +static void KeySpace_PostNotificationMissKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "mget_misses", key); + if (rep) RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "miss_", 5) != 0) return REDISMODULE_OK; + + RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationMissKey, key, NULL, NULL); + return REDISMODULE_OK; +} + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - REDISMODULE_NOT_USED(argv); - REDISMODULE_NOT_USED(argc); - if (RedisModule_Init(ctx,"postnotifications",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){ return REDISMODULE_ERR; } @@ -240,10 +359,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } int with_key_events = 0; - if (argc >= 1) { - const char *arg = RedisModule_StringPtrLen(argv[0], 0); + for (int i = 0; i < argc; i++) { + const char *arg = RedisModule_StringPtrLen(argv[i], 0); if (strcmp(arg, "with_key_events") == 0) { with_key_events = 1; + } else if (strcmp(arg, "perkey") == 0) { + g_api_mode = MODE_PERKEY; + } else if (strcmp(arg, "regular") == 0) { + g_api_mode = MODE_REGULAR; } } @@ -275,8 +398,24 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } } + /* Per-key-only fixtures (behaviors with no regular API equivalent). */ + if (g_api_mode == MODE_PERKEY) { + if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationBatched) != REDISMODULE_OK) { + return REDISMODULE_ERR; + } + if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_HASH, KeySpace_NotificationHash) != REDISMODULE_OK) { + return REDISMODULE_ERR; + } + if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationReentrance) != REDISMODULE_OK) { + return REDISMODULE_ERR; + } + if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationMiss) != REDISMODULE_OK) { + return REDISMODULE_ERR; + } + } + if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet, - "write", 0, 0, 0) == REDISMODULE_ERR){ + "write", 1, 1, 1) == REDISMODULE_ERR){ return REDISMODULE_ERR; } diff --git a/tests/modules/postnotifications_perkey_metadata.c b/tests/modules/postnotifications_perkey_metadata.c new file mode 100644 index 000000000..41aa4b6fa --- /dev/null +++ b/tests/modules/postnotifications_perkey_metadata.c @@ -0,0 +1,175 @@ +/* Test module for RM_AddPostNotificationJobForKey firing across phases. + * + * Used by tests/unit/moduleapi/postnotifications_perkey_aof_repl.tcl. The + * module exercises the contract that a per-key post-notification callback + * MUST only touch non-replicated, non-AOF-persisted state — here, module + * key metadata. This lets the same callback fire on a master, on a replica + * receiving master-propagated commands, and during AOF replay, with each + * instance maintaining its per-key state independently. + * + * Subscribes to KSN events for STRING / HASH / GENERIC / EXPIRED / EVICTED. + * For each notification the KSN handler enqueues a per-key job; the job + * later attaches metadata via RM_SetKeyMeta. A module-internal counter + * (NOT a Redis key — to avoid AOF / replication pollution) records how + * many times the per-key job actually ran. + * + * Commands: + * pkmeta.getmeta - Return the metadata string, or nil. + * pkmeta.firecount - Return the module-internal fire counter. + * pkmeta.reset - Zero the fire counter. + * pkmeta.try_outside - Call RM_AddPostNotificationJobForKey from + * outside a KSN handler; reply OK/ERR for the + * negative-coverage test. + * + * Copyright (c) 2006-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "redismodule.h" +#include +#include + +static RedisModuleKeyMetaClassId meta_class_id = -1; + +/* Module-internal counter — kept out of the keyspace on purpose so it is + * neither replicated nor AOF-persisted. Tests assert on it to confirm the + * per-key callback actually ran. */ +static long long fire_count = 0; + +static void MetaFreeCallback(const char *keyname, uint64_t meta) { + REDISMODULE_NOT_USED(keyname); + if (meta != 0) free((char *)meta); +} + +/* Per-key post-notification job: attaches a "notified" string as metadata. + * Runs at the tail of the originating command (or sub-command for + * MULTI/EXEC), outside the KSN handler stack. */ +static void PerKeyMetadataJob(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + if (meta_class_id < 0) return; + + RedisModuleKey *k = RedisModule_OpenKey(ctx, key, REDISMODULE_WRITE); + if (!k) return; + if (RedisModule_KeyType(k) == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(k); + return; + } + + uint64_t existing = 0; + if (RedisModule_GetKeyMeta(meta_class_id, k, &existing) == REDISMODULE_OK && + existing != 0) { + free((char *)existing); + } + + char *new_str = strdup("notified"); + if (RedisModule_SetKeyMeta(meta_class_id, k, (uint64_t)new_str) == REDISMODULE_OK) { + fire_count++; + } else { + free(new_str); + } + RedisModule_CloseKey(k); +} + +/* KSN handler: defers SetKeyMeta into a per-key job rather than calling it + * inline. This is the path under test — the per-key API is what makes the + * write happen at a safe firing point, including during AOF replay and on + * a replica receiving propagated commands. */ +static int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, + RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + RedisModule_AddPostNotificationJobForKey(ctx, PerKeyMetadataJob, key, NULL, NULL); + return REDISMODULE_OK; +} + +static int GetMetaCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) return RedisModule_WrongArity(ctx); + RedisModuleKey *k = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (!k || RedisModule_KeyType(k) == REDISMODULE_KEYTYPE_EMPTY) { + if (k) RedisModule_CloseKey(k); + return RedisModule_ReplyWithNull(ctx); + } + uint64_t meta = 0; + if (RedisModule_GetKeyMeta(meta_class_id, k, &meta) == REDISMODULE_OK && meta != 0) { + RedisModule_ReplyWithCString(ctx, (const char *)meta); + } else { + RedisModule_ReplyWithNull(ctx); + } + RedisModule_CloseKey(k); + return REDISMODULE_OK; +} + +static int FireCountCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithLongLong(ctx, fire_count); +} + +static int ResetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + fire_count = 0; + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +/* Calls RM_AddPostNotificationJobForKey from outside a KSN handler — must + * return REDISMODULE_ERR. Used by the negative coverage test. */ +static int TryOutsideCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) return RedisModule_WrongArity(ctx); + int rc = RedisModule_AddPostNotificationJobForKey(ctx, PerKeyMetadataJob, + argv[1], NULL, NULL); + if (rc == REDISMODULE_OK) { + return RedisModule_ReplyWithSimpleString(ctx, "OK"); + } + return RedisModule_ReplyWithError(ctx, "ERR registration refused"); +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "pkmeta", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleKeyMetaClassConfig config = {0}; + config.version = REDISMODULE_KEY_META_VERSION; + config.flags = (1 << REDISMODULE_META_ALLOW_IGNORE); + config.reset_value = (uint64_t)NULL; + config.free = MetaFreeCallback; + meta_class_id = RedisModule_CreateKeyMetaClass(ctx, "pkmc", 1, &config); + if (meta_class_id < 0) return REDISMODULE_ERR; + + int notifyFlags = REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_HASH | + REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_EXPIRED | + REDISMODULE_NOTIFY_EVICTED; + if (RedisModule_SubscribeToKeyspaceEvents(ctx, notifyFlags, NotifyCallback) != REDISMODULE_OK) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "pkmeta.getmeta", GetMetaCommand, + "readonly", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "pkmeta.firecount", FireCountCommand, + "readonly", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "pkmeta.reset", ResetCommand, + "readonly", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "pkmeta.try_outside", TryOutsideCommand, + "readonly", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + if (meta_class_id >= 0) { + RedisModule_ReleaseKeyMetaClass(meta_class_id); + meta_class_id = -1; + } + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/postnotifications.tcl b/tests/unit/moduleapi/postnotifications.tcl index 31a466941..012ce3479 100644 --- a/tests/unit/moduleapi/postnotifications.tcl +++ b/tests/unit/moduleapi/postnotifications.tcl @@ -1,174 +1,344 @@ set testmodule [file normalize tests/modules/postnotifications.so] +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api + + test "Test write on post notification callback ($api API)" { + set repl [attach_to_replication_stream] + + r set string_x 1 + assert_equal {1} [r get string_changed{string_x}] + assert_equal {1} [r get string_total] + + r set string_x 2 + assert_equal {2} [r get string_changed{string_x}] + assert_equal {2} [r get string_total] + + assert_replication_stream $repl { + {multi} + {select *} + {set string_x 1} + {incr string_changed{string_x}} + {incr string_total} + {exec} + {multi} + {set string_x 2} + {incr string_changed{string_x}} + {incr string_total} + {exec} + } + close_replication_stream $repl + } + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api + + test "Test write on post notification callback from module thread ($api API)" { + r flushall + set repl [attach_to_replication_stream] + + assert_equal {OK} [r postnotification.async_set string_x] + assert_equal {1} [r get string_changed{string_x}] + assert_equal {1} [r get string_total] + + assert_replication_stream $repl { + {multi} + {select *} + {set string_x 1} + {incr string_changed{string_x}} + {incr string_total} + {exec} + } + close_replication_stream $repl + } + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api with_key_events + + test "Test active expire ($api API)" { + r flushall + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 10 + + wait_for_condition 100 50 { + [r keys expired] == {expired} + } else { + puts [r keys *] + fail "Failed waiting for x to expire" + } + + # {lpush before_expired x} comes from the RedisModuleEvent_Key + # server event (always uses the regular post-notif queue). + # {incr expired} comes from the keyspace handler (regular or + # per-key queue depending on $api). Both APIs propagate the + # same stream: postExecutionUnitOperations drains regular + # before per-key, so the ordering between the two side-effects + # matches their in-process registration order. + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {del x} + {lpush before_expired x} + {incr expired} + {exec} + } + close_replication_stream $repl + } + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api + + test "Test lazy expire ($api API)" { + r flushall + r DEBUG SET-ACTIVE-EXPIRE 0 + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 1 + after 10 + assert_equal {} [r get x] + + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {del x} + {incr expired} + {exec} + } + close_replication_stream $repl + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api + + test "Test lazy expire inside post job notification ($api API)" { + r flushall + r DEBUG SET-ACTIVE-EXPIRE 0 + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 1 + after 10 + assert_equal {OK} [r set read_x 1] + + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {set read_x 1} + {del x} + {incr expired} + {exec} + } + close_replication_stream $repl + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api with_key_events + + test "Test nested keyspace notification ($api API)" { + r flushall + set repl [attach_to_replication_stream] + + assert_equal {OK} [r set write_sync_write_sync_x 1] + + assert_replication_stream $repl { + {multi} + {select *} + {set x 1} + {set write_sync_x 1} + {set write_sync_write_sync_x 1} + {exec} + } + close_replication_stream $repl + } + } + } +} + +foreach api {regular perkey} { + tags "modules external:skip" { + start_server {} { + r module load $testmodule $api with_key_events + + test "Test eviction ($api API)" { + r flushall + set repl [attach_to_replication_stream] + r set x 1 + r config set maxmemory-policy allkeys-random + r config set maxmemory 1 + + assert_error {OOM *} {r set y 1} + + # {lpush before_evicted x} comes from the + # RedisModuleEvent_Key/before_evicted server event (always uses + # the regular post-notif queue). {incr evicted} comes from the + # keyspace handler (regular or per-key queue depending on + # $api). Both APIs propagate the same stream: regular drains + # before per-key inside postExecutionUnitOperations. + assert_replication_stream $repl { + {select *} + {set x 1} + {multi} + {del x} + {lpush before_evicted x} + {incr evicted} + {exec} + } + close_replication_stream $repl + } {} {needs:config-maxmemory} + } + } +} + +# Per-key-only tests (no regular-API equivalent). tags "modules external:skip" { start_server {} { - r module load $testmodule with_key_events + r module load $testmodule perkey - test {Test write on post notification callback} { + test {Per-key post notification job fires between MULTI/EXEC sub-commands} { + r flushall set repl [attach_to_replication_stream] - r set string_x 1 - assert_equal {1} [r get string_changed{string_x}] - assert_equal {1} [r get string_total] + r multi + r set batched_a 1 + r set batched_b 2 + r set batched_c 3 + r exec - r set string_x 2 - assert_equal {2} [r get string_changed{string_x}] - assert_equal {2} [r get string_total] + assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1] - # the {lpush before_overwritten string_x} is a post notification job registered when 'string_x' was overwritten assert_replication_stream $repl { {multi} {select *} - {set string_x 1} - {incr string_changed{string_x}} - {incr string_total} - {exec} - {multi} - {set string_x 2} - {lpush before_overwritten string_x} - {incr string_changed{string_x}} - {incr string_total} + {set batched_a 1} + {lpush batched_keys batched_a} + {set batched_b 2} + {lpush batched_keys batched_b} + {set batched_c 3} + {lpush batched_keys batched_c} {exec} } close_replication_stream $repl } - test {Test write on post notification callback from module thread} { + test {Per-key callback does not re-enter firing while a nested RM_Call is in flight} { + r flushall + + r set reentrant_outer 1 + + set log [r lrange reentrance_log 0 -1] + assert_equal -1 [lsearch $log "REENTRANCE_DETECTED"] + assert_equal {inner_after_outer outer_done} $log + } + + test {Per-key post notification job fires per affected key on multi-key commands} { r flushall set repl [attach_to_replication_stream] - assert_equal {OK} [r postnotification.async_set] - assert_equal {1} [r get string_changed{string_x}] - assert_equal {1} [r get string_total] + # MSET emits one notifyKeyspaceEvent per key. Each dispatch sets + # server.in_keyspace_notification, so the keyspace handler can + # register one per-key job per affected key. All three jobs fire + # at the tail of MSET's call() and propagate inside the same + # multi/exec the propagation buffer flushes. + r mset batched_a 1 batched_b 2 batched_c 3 + assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1] assert_replication_stream $repl { {multi} {select *} - {set string_x 1} - {incr string_changed{string_x}} - {incr string_total} + {mset batched_a 1 batched_b 2 batched_c 3} + {lpush batched_keys batched_a} + {lpush batched_keys batched_b} + {lpush batched_keys batched_c} {exec} } close_replication_stream $repl } - test {Test active expire} { + test {Per-key post notification job fires per missing key on MGET (multi-key read)} { r flushall set repl [attach_to_replication_stream] - r set x 1 - r pexpire x 10 + # MGET emits one NOTIFY_KEY_MISS notification per missing key. Each + # dispatch sets server.in_keyspace_notification, so the per-key + # handler registers one job per miss. The jobs drain at the tail of + # MGET's call(), propagating as a multi/exec after the read. + assert_equal {{} {} {}} [r mget miss_a miss_b miss_c] + assert_equal {miss_c miss_b miss_a} [r lrange mget_misses 0 -1] - wait_for_condition 100 50 { - [r keys expired] == {expired} - } else { - puts [r keys *] - fail "Failed waiting for x to expired" - } - - # the {lpush before_expired x} is a post notification job registered before 'x' got expired assert_replication_stream $repl { - {select *} - {set x 1} - {pexpireat x *} {multi} - {del x} - {lpush before_expired x} - {incr expired} + {select *} + {lpush mget_misses miss_a} + {lpush mget_misses miss_b} + {lpush mget_misses miss_c} {exec} } close_replication_stream $repl } - test {Test lazy expire} { - r flushall - r DEBUG SET-ACTIVE-EXPIRE 0 - set repl [attach_to_replication_stream] - - r set x 1 - r pexpire x 1 - after 10 - assert_equal {} [r get x] - - # the {lpush before_expired x} is a post notification job registered before 'x' got expired - assert_replication_stream $repl { - {select *} - {set x 1} - {pexpireat x *} - {multi} - {del x} - {lpush before_expired x} - {incr expired} - {exec} - } - close_replication_stream $repl - r DEBUG SET-ACTIVE-EXPIRE 1 - } {OK} {needs:debug} - - test {Test lazy expire inside post job notification} { - r flushall - r DEBUG SET-ACTIVE-EXPIRE 0 - set repl [attach_to_replication_stream] - - r set x 1 - r pexpire x 1 - after 10 - assert_equal {OK} [r set read_x 1] - - # the {lpush before_expired x} is a post notification job registered before 'x' got expired - assert_replication_stream $repl { - {select *} - {set x 1} - {pexpireat x *} - {multi} - {set read_x 1} - {del x} - {lpush before_expired x} - {incr expired} - {exec} - } - close_replication_stream $repl - r DEBUG SET-ACTIVE-EXPIRE 1 - } {OK} {needs:debug} - - test {Test nested keyspace notification} { + test {Per-key post notification job fires between HSET and HEXPIRE on the same hash inside MULTI/EXEC} { r flushall set repl [attach_to_replication_stream] - assert_equal {OK} [r set write_sync_write_sync_x 1] + r multi + r hset hash_h f1 v1 + r hset hash_h f2 v2 + r hexpire hash_h 100 FIELDS 1 f1 + r exec + + assert_equal {hash_h hash_h hash_h} [r lrange hash_keys 0 -1] assert_replication_stream $repl { {multi} {select *} - {set x 1} - {set write_sync_x 1} - {set write_sync_write_sync_x 1} + {hset hash_h f1 v1} + {lpush hash_keys hash_h} + {hset hash_h f2 v2} + {lpush hash_keys hash_h} + {hpexpireat hash_h * FIELDS 1 f1} + {lpush hash_keys hash_h} {exec} } close_replication_stream $repl } - - test {Test eviction} { - r flushall - set repl [attach_to_replication_stream] - r set x 1 - r config set maxmemory-policy allkeys-random - r config set maxmemory 1 - - assert_error {OOM *} {r set y 1} - - # the {lpush before_evicted x} is a post notification job registered before 'x' got evicted - assert_replication_stream $repl { - {select *} - {set x 1} - {multi} - {del x} - {lpush before_evicted x} - {incr evicted} - {exec} - } - close_replication_stream $repl - } {} {needs:config-maxmemory} } } @@ -207,11 +377,6 @@ tags "modules external:skip" { {incr string_changed{string_x}} {incr string_total} {exec} - {multi} - {set string1_x 1} - {incr string_changed{string1_x}} - {incr string_total} - {exec} } close_replication_stream $repl } diff --git a/tests/unit/moduleapi/postnotifications_perkey_aof_repl.tcl b/tests/unit/moduleapi/postnotifications_perkey_aof_repl.tcl new file mode 100644 index 000000000..858066ae1 --- /dev/null +++ b/tests/unit/moduleapi/postnotifications_perkey_aof_repl.tcl @@ -0,0 +1,272 @@ +set testmodule [file normalize tests/modules/postnotifications_perkey_metadata.so] + +# AOF replay on a standalone master. +# +# Asserts that per-key post-notification jobs fire during AOF replay with the +# same pattern as during normal execution: once per single command, once per +# MULTI/EXEC sub-command. The per-key callback attaches module key metadata, +# which is NOT in the AOF — so its presence after reload is direct evidence +# that the callback re-ran during replay. The module-internal fire counter +# (read via `pkmeta.firecount`) is the load-bearing assertion. +tags "modules aof external:skip" { + foreach aofload_type {debug_cmd startup} { + test "perkey-aof: single command rebuilds metadata via AOF reload (load=$aofload_type)" { + start_server [list overrides [list loadmodule "$testmodule"]] { + r config set appendonly yes + r config set auto-aof-rewrite-percentage 0 + waitForBgrewriteaof r + + r hset h1 f v + assert_equal "notified" [r pkmeta.getmeta h1] + assert_equal 1 [r pkmeta.firecount] + + # Reset the counter so the post-reload count reflects only + # what the AOF replay path produced. + r pkmeta.reset + + if {$aofload_type == "debug_cmd"} { + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + assert_equal "notified" [r pkmeta.getmeta h1] + assert_equal 1 [r pkmeta.firecount] + } + } + + test "perkey-aof: MULTI/EXEC fires once per sub-command during AOF reload (load=$aofload_type)" { + start_server [list overrides [list loadmodule "$testmodule"]] { + r config set appendonly yes + r config set auto-aof-rewrite-percentage 0 + waitForBgrewriteaof r + + r multi + r hset h1 f v + r hset h2 f v + r hset h3 f v + r exec + + assert_equal 3 [r pkmeta.firecount] + r pkmeta.reset + + if {$aofload_type == "debug_cmd"} { + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + # Each sub-command's per-key drain must have fired during + # replay — three HSETs → three callback invocations. + assert_equal 3 [r pkmeta.firecount] + assert_equal "notified" [r pkmeta.getmeta h1] + assert_equal "notified" [r pkmeta.getmeta h2] + assert_equal "notified" [r pkmeta.getmeta h3] + } + } + + test "perkey-aof: HSET + HEXPIRE in MULTI/EXEC fires twice during AOF reload (load=$aofload_type)" { + start_server [list overrides [list loadmodule "$testmodule"]] { + r config set appendonly yes + r config set auto-aof-rewrite-percentage 0 + waitForBgrewriteaof r + + r multi + r hset h_hexp f v + r hexpire h_hexp 100 FIELDS 1 f + r exec + + assert_equal 2 [r pkmeta.firecount] + r pkmeta.reset + + if {$aofload_type == "debug_cmd"} { + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + # HSET + HEXPIRE on the same key — two KSN events, two + # per-key job firings. This was the original motivating + # scenario (RED-197766). + assert_equal 2 [r pkmeta.firecount] + assert_equal "notified" [r pkmeta.getmeta h_hexp] + } + } + } +} + +# RDB load is intentionally outside the firing pattern. +# +# RDB load decodes keys directly without running commands, so no KSN fires, +# so per-key callbacks do not run. This test pins that design boundary: if +# anyone later "fixes" RDB load to fire KSN, this assertion will break and +# force the change to be considered explicitly. +tags "modules external:skip" { + test "perkey-rdb: RDB-only restart does NOT rebuild metadata (no KSN on RDB load)" { + start_server [list overrides [list loadmodule "$testmodule" appendonly no]] { + r hset h_rdb f v + assert_equal "notified" [r pkmeta.getmeta h_rdb] + assert_equal 1 [r pkmeta.firecount] + + r pkmeta.reset + r debug reload + + # RDB roundtrip: the key is back, but its metadata is not (the + # metadata class doesn't persist via rdb_save/rdb_load in this + # module), and the per-key job did NOT fire during load. + assert_equal "hash" [r type h_rdb] + assert_equal {} [r pkmeta.getmeta h_rdb] + assert_equal 0 [r pkmeta.firecount] + } + } {} {needs:debug} +} + +# AOF replay on a replica at startup. +# +# Exercises the carve-out in RM_AddPostNotificationJobForKey that permits +# registration during loading even when masterhost is set. Without that +# carve-out the per-key job would be refused on a replica's own AOF replay +# and metadata would not be rebuilt at startup. +tags "modules aof external:skip" { + test "perkey-aof-replica: AOF replay on a replica at startup rebuilds metadata" { + # Master is loaded with the module too so its propagation path is + # comparable to a real deployment. The point under test is the AOF + # replay step on the replica at restart, not the initial sync. + start_server [list overrides [list loadmodule "$testmodule"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + start_server [list overrides [list \ + loadmodule $testmodule \ + appendonly yes \ + auto-aof-rewrite-percentage 0 \ + replicaof "$master_host $master_port"]] { + set replica [srv 0 client] + wait_for_sync $replica + # The replica boots with appendonly=yes and replicaof, so + # post-sync it kicks off a background AOF rewrite. Until that + # child finishes, propagated commands land in a temp incr + # file that `debug loadaof` won't see — wait it out before + # driving the write under test. + waitForBgrewriteaof $replica + + # Drive a write on the master; replica receives it via + # propagation and writes it to its own AOF. + $master hset h_repl f v + wait_for_ofs_sync $master $replica + + # Sanity: key did propagate + assert_equal 1 [$replica hexists h_repl f] + assert_equal "notified" [$replica pkmeta.getmeta h_repl] + $replica pkmeta.reset + + # Use debug loadaof to exercise the AOF replay path + # specifically on a configured replica (masterhost set, + # repl_slave_ro true, server.loading=1). A full restart + # would re-sync from master via RDB and wipe metadata — + # that is a separate code path. We deliberately do not + # rewrite the AOF here: rewriting converts the HSET into + # the RDB-encoded base AOF, and RDB load (preamble or + # otherwise) intentionally does not fire KSN. The + # incremental AOF — which is what propagated commands + # land in — is what the per-key drain runs against. + $replica debug loadaof + + # The AOF reload fires the per-key job on the replica; the + # callback runs with masterhost set, repl_slave_ro on, and + # server.loading == 1, which is exactly the carve-out. + assert_equal "notified" [$replica pkmeta.getmeta h_repl] + assert {[$replica pkmeta.firecount] >= 1} + } + } + } +} + +# Master → replica steady-state propagation. +# +# With the replica check dropped, per-key jobs fire on the replica too: +# both sides run the same KSN over the same command stream and maintain +# their per-key state independently. No metadata traffic on the wire. +tags "modules external:skip" { + test "perkey-repl: replica builds metadata from master-propagated single command" { + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $replica replicaof $master_host $master_port + wait_for_sync $replica + + $master pkmeta.reset + $replica pkmeta.reset + + $master hset h_prop f v + wait_for_ofs_sync $master $replica + + # Both sides ran the per-key job locally — no metadata + # crossed the replication stream. + assert_equal "notified" [$master pkmeta.getmeta h_prop] + assert_equal "notified" [$replica pkmeta.getmeta h_prop] + assert_equal 1 [$master pkmeta.firecount] + assert_equal 1 [$replica pkmeta.firecount] + } + } + } + + test "perkey-repl: replica fires per sub-command for propagated MULTI/EXEC" { + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + $replica replicaof $master_host $master_port + wait_for_sync $replica + + $master pkmeta.reset + $replica pkmeta.reset + + $master multi + $master hset hp1 f v + $master hset hp2 f v + $master hset hp3 f v + $master exec + wait_for_ofs_sync $master $replica + + assert_equal 3 [$master pkmeta.firecount] + assert_equal 3 [$replica pkmeta.firecount] + foreach key {hp1 hp2 hp3} { + assert_equal "notified" [$master pkmeta.getmeta $key] + assert_equal "notified" [$replica pkmeta.getmeta $key] + } + } + } + } +} + +# Negative coverage: API misuse outside a KSN handler. +# +# The only remaining runtime guard. Calling RM_AddPostNotificationJobForKey +# from a regular module command (not a KSN handler) must return +# REDISMODULE_ERR with a LL_WARNING log entry. +tags "modules external:skip" { + test "perkey-misuse: registration refused outside a KSN handler" { + start_server [list overrides [list loadmodule "$testmodule"]] { + assert_error {ERR registration refused*} {r pkmeta.try_outside any_key} + } + } +}