diff --git a/src/module.c b/src/module.c index 9843e6cccb..0d2c7f3f86 100644 --- a/src/module.c +++ b/src/module.c @@ -308,6 +308,7 @@ typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int /* Function pointer type for post jobs */ typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); +typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd); /* Keyspace notification subscriber information. * See RM_SubscribeToKeyspaceEvents() for more information. */ @@ -327,15 +328,44 @@ typedef struct RedisModuleKeyspaceSubscriber { int active; } RedisModuleKeyspaceSubscriber; -typedef struct RedisModulePostExecUnitJob { - /* The module subscribed to the event */ - RedisModule *module; - RedisModulePostNotificationJobFunc callback; +typedef enum { + POST_EXEC_UNIT_JOB_SINGLE = 0, + POST_EXEC_UNIT_JOB_KEYED = 1, +} PostExecUnitJobType; + +/* Per-key entry inside a keyed PostExecUnit job. The keyed job fans out and + * invokes its callback once per entry, in submission order. */ +typedef struct RedisModulePostExecUnitKeyedEntry { + RedisModuleString *key; /* Owned reference; freed after the callback runs. */ void *pd; void (*free_pd)(void*); +} RedisModulePostExecUnitKeyedEntry; + +typedef struct RedisModulePostExecUnitJob { + PostExecUnitJobType type; + /* The module subscribed to the event */ + RedisModule *module; int dbid; + union { + struct { + RedisModulePostNotificationJobFunc callback; + void *pd; + void (*free_pd)(void*); + } single; + struct { + RedisModulePostNotificationJobPerKeyFunc callback; + list *entries; /* list of RedisModulePostExecUnitKeyedEntry* */ + } keyed; + } u; } RedisModulePostExecUnitJob; +/* Index key used to coalesce keyed post-exec-unit jobs by (module, callback) + * while the current execution unit is still in progress. */ +typedef struct { + RedisModule *module; + RedisModulePostNotificationJobPerKeyFunc callback; +} PostExecUnitKeyedJobIndexKey; + /* The module keyspace notification subscribers list */ static list *moduleKeyspaceSubscribers; @@ -347,6 +377,12 @@ static int moduleKeyspaceSubscribersWithSubkeysTypes = 0; /* The module post keyspace jobs list */ static list *modulePostExecUnitJobs; +/* Index from (module, per-key callback) -> listNode* in modulePostExecUnitJobs. + * Used so that submitting a second key for an already-pending keyed job appends + * to that job's entry list rather than enqueuing a new job. Reset to empty at + * the end of every drain. */ +static dict *modulePostExecUnitKeyedJobIndex; + /* Data structures related to the exported dictionary data structure. */ typedef struct RedisModuleDict { rax *rax; /* The radix tree. */ @@ -9434,12 +9470,40 @@ void firePostExecutionUnitJobs(void) { moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, job->dbid); - job->callback(&ctx, job->pd); - if (job->free_pd) job->free_pd(job->pd); + if (job->type == POST_EXEC_UNIT_JOB_SINGLE) { + job->u.single.callback(&ctx, job->u.single.pd); + if (job->u.single.free_pd) job->u.single.free_pd(job->u.single.pd); + } else { + /* Keyed job: fan out the callback over each collected key, sharing + * the same context. The index entry is removed so that any keys + * submitted *after* this point (e.g. from a nested KSN triggered + * inside the callback) start a fresh keyed job. */ + PostExecUnitKeyedJobIndexKey idx = { + .module = job->module, + .callback = job->u.keyed.callback, + }; + dictDelete(modulePostExecUnitKeyedJobIndex, &idx); + + listIter li; + listNode *eln; + listRewind(job->u.keyed.entries, &li); + while ((eln = listNext(&li)) != NULL) { + RedisModulePostExecUnitKeyedEntry *e = listNodeValue(eln); + job->u.keyed.callback(&ctx, e->key, e->pd); + if (e->free_pd) e->free_pd(e->pd); + decrRefCount(e->key); + zfree(e); + } + listRelease(job->u.keyed.entries); + } moduleFreeContext(&ctx); zfree(job); } + /* Defensive: any stale index entries (shouldn't be possible since every + * keyed job we enqueued got drained above, but cheap insurance). */ + if (dictSize(modulePostExecUnitKeyedJobIndex) > 0) + dictEmpty(modulePostExecUnitKeyedJobIndex, NULL); exitExecutionUnit(); } @@ -9465,16 +9529,69 @@ int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJo return REDISMODULE_ERR; } RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job)); + job->type = POST_EXEC_UNIT_JOB_SINGLE; job->module = ctx->module; - job->callback = callback; - job->pd = privdata; - job->free_pd = free_privdata; job->dbid = ctx->client->db->id; + job->u.single.callback = callback; + job->u.single.pd = privdata; + job->u.single.free_pd = free_privdata; listAddNodeTail(modulePostExecUnitJobs, job); return REDISMODULE_OK; } +/* Sibling of `RM_AddPostNotificationJob` that fans out per-key. Multiple + * submissions of the same (module, callback) pair within a single execution + * unit are coalesced into a single job whose callback is invoked once per + * collected key, in submission order. This lets a module react to several + * keys touched by the same MULTI/EXEC in one atomic propagation block. + * + * Refusal rules, dbid pinning, replication atomicity, and re-entrancy + * semantics are identical to `RM_AddPostNotificationJob`. + * + * `key` must be a valid RedisModuleString. The implementation takes its own + * reference; the caller retains ownership of its own reference and may free + * it at any time. `free_pd` may be NULL. + * + * Return REDISMODULE_OK on success and REDISMODULE_ERR if called while loading + * data from disk (AOF or RDB) or on a read-only replica. */ +int RM_AddPostNotificationJobForKey(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *privdata, void (*free_privdata)(void*)) { + if (server.loading || (server.masterhost && server.repl_slave_ro)) { + return REDISMODULE_ERR; + } + + RedisModulePostExecUnitKeyedEntry *entry = zmalloc(sizeof(*entry)); + entry->key = key; + incrRefCount(entry->key); + entry->pd = privdata; + entry->free_pd = free_privdata; + + PostExecUnitKeyedJobIndexKey idx_key = { + .module = ctx->module, + .callback = callback, + }; + dictEntry *de = dictFind(modulePostExecUnitKeyedJobIndex, &idx_key); + if (de) { + RedisModulePostExecUnitJob *job = dictGetVal(de); + listAddNodeTail(job->u.keyed.entries, entry); + return REDISMODULE_OK; + } + + RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job)); + job->type = POST_EXEC_UNIT_JOB_KEYED; + job->module = ctx->module; + job->dbid = ctx->client->db->id; + job->u.keyed.callback = callback; + job->u.keyed.entries = listCreate(); + listAddNodeTail(job->u.keyed.entries, entry); + listAddNodeTail(modulePostExecUnitJobs, job); + + PostExecUnitKeyedJobIndexKey *idx = zmalloc(sizeof(*idx)); + *idx = idx_key; + dictAdd(modulePostExecUnitKeyedJobIndex, idx, job); + return REDISMODULE_OK; +} + /* Get the configured bitmap of notify-keyspace-events (Could be used * for additional filtering in RedisModuleNotificationFunc) */ int RM_GetNotifyKeyspaceEvents(void) { @@ -12994,6 +13111,31 @@ dictType moduleAPIDictType = { NULL /* allow to expand */ }; +static uint64_t postExecUnitKeyedJobIndexHash(const void *key) { + return dictGenHashFunction(key, sizeof(PostExecUnitKeyedJobIndexKey)); +} + +static int postExecUnitKeyedJobIndexCompare(dictCmpCache *cache, const void *k1, const void *k2) { + UNUSED(cache); + const PostExecUnitKeyedJobIndexKey *a = k1, *b = k2; + return a->module == b->module && a->callback == b->callback; +} + +static void postExecUnitKeyedJobIndexKeyDtor(dict *d, void *k) { + UNUSED(d); + zfree(k); +} + +dictType postExecUnitKeyedJobIndexDictType = { + postExecUnitKeyedJobIndexHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + postExecUnitKeyedJobIndexCompare, /* key compare */ + postExecUnitKeyedJobIndexKeyDtor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + int moduleRegisterApi(const char *funcname, void *funcptr) { return dictAdd(server.moduleapi, (char*)funcname, funcptr); } @@ -13034,6 +13176,7 @@ void moduleInitModulesSystem(void) { moduleKeyspaceSubscribers = listCreate(); modulePostExecUnitJobs = listCreate(); + modulePostExecUnitKeyedJobIndex = dictCreate(&postExecUnitKeyedJobIndexDictType); /* Set up filter list */ moduleCommandFilters = listCreate(); @@ -15573,6 +15716,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SubscribeToKeyspaceEventsWithSubkeys); REGISTER_API(UnsubscribeFromKeyspaceEventsWithSubkeys); REGISTER_API(AddPostNotificationJob); + REGISTER_API(AddPostNotificationJobForKey); REGISTER_API(RegisterClusterMessageReceiver); REGISTER_API(SendClusterMessage); REGISTER_API(GetClusterNodeInfo); diff --git a/src/redismodule.h b/src/redismodule.h index f0d9e8aa60..55557c1e50 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -996,6 +996,7 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void (*RedisModuleNotificationWithSubkeysFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count); typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); +typedef void (*RedisModulePostNotificationJobPerKeyFunc) (RedisModuleCtx *ctx, RedisModuleString *key, void *pd); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); @@ -1383,6 +1384,7 @@ REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEventsWithSubkeys)(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_AddPostNotificationJobForKey)(RedisModuleCtx *ctx, RedisModulePostNotificationJobPerKeyFunc callback, RedisModuleString *key, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEventWithSubkeys)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key, RedisModuleString **subkeys, int count) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; @@ -1788,6 +1790,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SubscribeToKeyspaceEventsWithSubkeys); REDISMODULE_GET_API(UnsubscribeFromKeyspaceEventsWithSubkeys); REDISMODULE_GET_API(AddPostNotificationJob); + REDISMODULE_GET_API(AddPostNotificationJobForKey); REDISMODULE_GET_API(NotifyKeyspaceEvent); REDISMODULE_GET_API(NotifyKeyspaceEventWithSubkeys); REDISMODULE_GET_API(GetNotifyKeyspaceEvents); diff --git a/tests/modules/postnotifications.c b/tests/modules/postnotifications.c index 96fb859184..9ae316363e 100644 --- a/tests/modules/postnotifications.c +++ b/tests/modules/postnotifications.c @@ -49,6 +49,27 @@ static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) { RedisModule_FreeCallReply(rep); } +/* Per-key post-notification callback: appends each batched key to a single + * list, so the test can assert all keys touched in one execution unit fan + * out into the same MULTI/EXEC replication block. */ +static void KeySpace_PostNotificationBatchedKey(RedisModuleCtx *ctx, RedisModuleString *key, void *pd) { + REDISMODULE_NOT_USED(pd); + RedisModuleCallReply *rep = RedisModule_Call(ctx, "lpush", "!cs", "batched_keys", key); + RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationBatched(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "batched_", 8) != 0) return REDISMODULE_OK; + if (strcmp(key_str, "batched_keys") == 0) return REDISMODULE_OK; /* skip our sink list */ + + RedisModule_AddPostNotificationJobForKey(ctx, KeySpace_PostNotificationBatchedKey, key, NULL, NULL); + return REDISMODULE_OK; +} + static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ REDISMODULE_NOT_USED(type); REDISMODULE_NOT_USED(event); @@ -269,6 +290,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationBatched) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + if (with_key_events) { if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){ return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/ksn_notify_coverage.md b/tests/unit/moduleapi/ksn_notify_coverage.md new file mode 100644 index 0000000000..68f723411d --- /dev/null +++ b/tests/unit/moduleapi/ksn_notify_coverage.md @@ -0,0 +1,173 @@ +# Keyspace Notification Test Coverage Analysis + +This document analyzes the test coverage for keyspace notifications that affect +modules like RediSearch, which use `SetKeyMeta` in notification callbacks. + +## RediSearch Notification Events + +RediSearch's `HashNotificationCallback` handles the following events (from +`modules/redisearch/src/src/notifications.c`): + +```c +typedef enum { + _null_cmd, + hset_cmd, + hmset_cmd, + hsetnx_cmd, + hincrby_cmd, + hincrbyfloat_cmd, + hdel_cmd, + del_cmd, + set_cmd, + rename_from_cmd, + rename_to_cmd, + trimmed_cmd, + restore_cmd, + expire_cmd, + persist_cmd, + expired_cmd, + hexpire_cmd, + hpersist_cmd, + hexpired_cmd, + evicted_cmd, + change_cmd, + loaded_cmd, + copy_to_cmd, +} RedisCmd; +``` + +## Test Coverage Matrix + +Legend: +- ✅ = Already covered (before this PR) +- 🆕 = Added in this PR +- ❌ = Not covered + +### Hash Events (NOTIFY_HASH) + +| Event | Commands That Trigger | Coverage | Notes | +|-------|----------------------|----------|-------| +| `hset` | HSET, HMSET, HSETNX | ✅ Already covered | HSET, HMSET, HSETNX tests | +| `hset` | HSETEX | 🆕 **This PR** | HSETEX test | +| `hdel` | HDEL | 🆕 **This PR** | HDEL test | +| `hdel` | HGETDEL | 🆕 **This PR** | HGETDEL test | +| `hdel` | HGETEX (past timestamp), HSETEX (past timestamp), HEXPIRE (past timestamp) | 🆕 **This PR** | Covered via HGETEX/HSETEX/HEXPIRE tests | +| `hincrby` | HINCRBY | ✅ Already covered | HINCRBY test | +| `hincrbyfloat` | HINCRBYFLOAT | ✅ Already covered | HINCRBYFLOAT test | +| `hexpire` | HEXPIRE, HPEXPIRE, HEXPIREAT, HPEXPIREAT | 🆕 **This PR** | HEXPIRE test | +| `hexpire` | HGETEX (with EX/PX), HSETEX (with EX/PX) | 🆕 **This PR** | HGETEX/HSETEX tests | +| `hpersist` | HPERSIST | 🆕 **This PR** | HPERSIST test | +| `hpersist` | HGETEX (with PERSIST) | 🆕 **This PR** | HGETEX test | +| `hexpired` | Lazy field expiration, Active field expiration | 🆕 **This PR** | Hash field expiry test | + +### Generic Events (NOTIFY_GENERIC) + +| Event | Commands That Trigger | Coverage | Notes | +|-------|----------------------|----------|-------| +| `del` | DEL, UNLINK | ✅ Already covered | DEL test | +| `del` | Hash becomes empty | 🆕 **This PR** | Via HDEL/HGETDEL tests | +| `rename_from` | RENAME, RENAMENX | ✅ Already covered | RENAME test | +| `rename_to` | RENAME, RENAMENX | ✅ Already covered | RENAME test | +| `restore` | RESTORE | ✅ Already covered | RESTORE test | +| `expire` | EXPIRE, PEXPIRE, EXPIREAT, PEXPIREAT | ✅ Already covered | EXPIRE test | +| `expire` | SET (with EX/PX), GETEX (with EX/PX), SETEX, PSETEX | ⚠️ Partial | SET test covers some | +| `persist` | PERSIST | 🆕 **This PR** | PERSIST test | +| `copy_to` | COPY | 🆕 **This PR** | COPY test | +| `loaded` | RDB load (DEBUG RELOAD, server restart) | ✅ Already covered | DEBUG RELOAD test | + +### String Events (NOTIFY_STRING) + +| Event | Commands That Trigger | Coverage | Notes | +|-------|----------------------|----------|-------| +| `set` | SET, SETEX, PSETEX, SETNX, SETRANGE, etc. | ✅ Already covered | SET test | + +### Expired/Evicted Events + +| Event | Commands That Trigger | Coverage | Notes | +|-------|----------------------|----------|-------| +| `expired` | Key expiration (lazy or active) | ✅ Already covered | EXPIRE test waits for expiry | +| `evicted` | Memory eviction | ❌ Not tested | Requires maxmemory config | + +## Summary: What This PR Adds + +### New Tests Added in This PR + +| Test | Events Covered | Status Without Fix | +|------|----------------|-------------------| +| HSETEX | `hset`, `hexpire`, `hdel` | ❌ **CRASHES** | +| HGETDEL | `hdel`, `hexpired` | ❌ **CRASHES** | +| HGETEX | `hexpire`, `hpersist`, `hdel` | ❌ **CRASHES** | +| HDEL | `hdel` | ❌ **CRASHES** | +| HEXPIRE | `hexpire`, `hdel` | ❌ **CRASHES** | +| HPERSIST | `hpersist` | ✅ Passes | +| (field expiry) | `hexpired` | ✅ Passes | +| PERSIST | `persist` | ✅ Passes | +| COPY | `copy_to` | ✅ Passes | + +### Bug Fixes Required + +Commands that need fixing for use-after-reallocation when `SetKeyMeta` is called: +- `hsetexCommand` - accesses `o` after `notifyKeyspaceEvent` +- `hgetdelCommand` - accesses `o` after `notifyKeyspaceEvent` +- `hgetexCommand` - accesses `o` after `notifyKeyspaceEvent` +- `hdelCommand` - accesses `o` after `notifyKeyspaceEvent` +- `hexpireGenericCommand` - accesses `hashObj` after `notifyKeyspaceEvent` + +## Still Not Covered (Future Work) + +| Event | Command/Trigger | Reason | +|-------|-----------------|--------| +| `evicted` | Memory eviction | Requires maxmemory configuration | + +## Command to Event Mapping + +### Commands Already Covered (Before This PR) + +| Command | Event(s) Triggered | +|---------|-------------------| +| HSETNX | `hset` | +| HSET | `hset` | +| HMSET | `hset` | +| HINCRBY | `hincrby` | +| HINCRBYFLOAT | `hincrbyfloat` | +| SET | `set` | +| APPEND | `append` | +| INCR | `incrby` | +| INCRBY | `incrby` | +| INCRBYFLOAT | `incrbyfloat` | +| GETSET | `set` | +| SETRANGE | `setrange` | +| DEL | `del` | +| RENAME | `rename_from`, `rename_to` | +| RESTORE | `restore` | +| EXPIRE/PEXPIRE | `expire` | +| DEBUG RELOAD | `loaded` | + +### Commands Added in This PR + +| Command | Event(s) Triggered | +|---------|-------------------| +| HSETEX | `hset`, `hexpire`, `hdel` | +| HGETDEL | `hdel`, `hexpired` | +| HGETEX | `hexpire`, `hpersist`, `hdel` | +| HDEL | `hdel` | +| HEXPIRE/HPEXPIRE | `hexpire`, `hdel` | +| HPERSIST | `hpersist` | + +### Commands NOT Covered + +| Command | Event(s) Triggered | +|---------|-------------------| +| PERSIST | `persist` | +| COPY | `copy_to` | +| (field expiry wait) | `hexpired` | + +## Source Files Reference + +- Notification events: `src/notify.c` +- Hash notifications: `src/t_hash.c` +- String notifications: `src/t_string.c` +- Generic notifications: `src/db.c`, `src/expire.c` +- RediSearch handler: `modules/redisearch/src/src/notifications.c` +- Test module: `tests/modules/keymeta_notify.c` +- Test file: `tests/unit/moduleapi/ksn_notify_side_effect.tcl` diff --git a/tests/unit/moduleapi/postnotifications.tcl b/tests/unit/moduleapi/postnotifications.tcl index 31a4669413..5636acfa07 100644 --- a/tests/unit/moduleapi/postnotifications.tcl +++ b/tests/unit/moduleapi/postnotifications.tcl @@ -148,6 +148,54 @@ tags "modules external:skip" { close_replication_stream $repl } + test {Test per-key post notification job fans out within a MULTI/EXEC} { + r flushall + set repl [attach_to_replication_stream] + + r multi + r set batched_a 1 + r set batched_b 2 + r set batched_c 3 + r exec + + assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1] + + # The three SETs are one execution unit; the keyed post-notification + # job coalesces them and the callback fans out into three LPUSHes + # inside the same MULTI/EXEC propagation block. + assert_replication_stream $repl { + {multi} + {select *} + {set batched_a 1} + {set batched_b 2} + {set batched_c 3} + {lpush batched_keys batched_a} + {lpush batched_keys batched_b} + {lpush batched_keys batched_c} + {exec} + } + close_replication_stream $repl + } + + test {Test per-key post notification job from a multi-key command} { + r flushall + set repl [attach_to_replication_stream] + + r mset batched_a 1 batched_b 2 batched_c 3 + assert_equal {batched_c batched_b batched_a} [r lrange batched_keys 0 -1] + + assert_replication_stream $repl { + {multi} + {select *} + {mset batched_a 1 batched_b 2 batched_c 3} + {lpush batched_keys batched_a} + {lpush batched_keys batched_b} + {lpush batched_keys batched_c} + {exec} + } + close_replication_stream $repl + } + test {Test eviction} { r flushall set repl [attach_to_replication_stream]