redis/tests/unit/moduleapi/keyspace_events.tcl
Yuan Wang 4757561861
Subkey notification for hash fields (#14958)
## Motivation

Redis's existing keyspace notification system operates at the **key
level** only — when a hash field is modified via `HSET`, `HDEL`, or
`HEXPIRE`, the subscriber receives the key name and the event type, but
not **which fields** were affected. As a result, these notifications
have very little practical value for consumers that need to track
individual fields.

This PR introduces a subkey notification system that extends keyspace
events to include field-level (subkey) details for hash operations,
through both Pub/Sub channels and the Module API.

## New Pub/Sub Notification Channels

Four new channels are added:

| Channel Format | Payload |
|---------------|---------|
| `__subkeyspace@<db>__:<key>` | `<event>\|<len>:<subkey>[,...]` |
| `__subkeyevent@<db>__:<event>` | `<key_len>:<key>\|<len>:<subkey>[,...]` |
| `__subkeyspaceitem@<db>__:<key>\n<subkey>` | `<event>` |
| `__subkeyspaceevent@<db>__:<event>\|<key>` | `<len>:<subkey>[,...]` |

**Design rationale for 4 channels:**
- **Subkeyspace**: Subscribe to a specific key, receive all field
changes in a single message — efficient for key-centric consumers.
- **Subkeyevent**: Subscribe to a specific event type, receive
key+fields — efficient for event-centric consumers.
- **Subkeyspaceitem**: Subscribe to a specific key+field combination —
the most selective, one message per field, no parsing needed.
- **Subkeyspaceevent**: Subscribe to event+key combination, receiving
only the affected fields — server-side filtering on both dimensions.

Subkeys are encoded in a length-prefixed format (`<len>:<subkey>`) to
support binary-safe field names containing delimiters.
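
A subscriber has to decode this format by reading lengths rather than splitting on commas. The following is a minimal consumer-side sketch in Python, not code from this PR. It assumes `<len>` is the subkey's length in bytes, written in decimal ASCII, and the helper names (`encode_subkeys`, `decode_subkeys`, `parse_subkeyspace_payload`) are hypothetical.

```python
def encode_subkeys(subkeys):
    """Encode byte strings as '<len>:<subkey>' items joined by commas."""
    return b",".join(str(len(s)).encode("ascii") + b":" + s for s in subkeys)


def decode_subkeys(data):
    """Decode '<len>:<subkey>[,...]' into a list of byte strings.

    Lengths are trusted over delimiters, so a subkey may itself
    contain ',' or ':' without confusing the parser.
    """
    subkeys = []
    pos = 0
    while pos < len(data):
        colon = data.index(b":", pos)  # raises ValueError if missing
        length = int(data[pos:colon].decode("ascii"))
        start = colon + 1
        end = start + length
        if end > len(data):
            raise ValueError("truncated subkey")
        subkeys.append(data[start:end])
        pos = end
        if pos < len(data):
            if data[pos:pos + 1] != b",":
                raise ValueError("expected ',' between subkeys")
            pos += 1
    return subkeys


def parse_subkeyspace_payload(payload):
    """Split '<event>|<len>:<subkey>[,...]' into (event, subkeys)."""
    event, sep, rest = payload.partition(b"|")
    if not sep:
        raise ValueError("missing '|' separator")
    return event, decode_subkeys(rest)
```

For example, `decode_subkeys(b"2:f1,3:a,b")` yields `[b"f1", b"a,b"]`: the second subkey keeps its embedded comma because the parser consumes exactly three bytes after the colon.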

**Safety guards:**
- Events containing `|` are skipped for the `__subkeyspace` and
`__subkeyspaceevent` channels (to avoid parsing ambiguity).
- Keys containing `\n` are skipped for the `__subkeyspaceitem` channel
(newline is the key/subkey separator).
- Subkey channels are published only when `subkeys != NULL && count >
0`.
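
To make the channel formats and guards concrete, here is an illustrative Python model of which messages a single event would produce. It is a sketch of the rules described above, not the Redis implementation. It assumes all four channel types are enabled, that lengths are byte counts, and that the item channel emits one message per entry in the subkey list. The function name `subkey_messages` is hypothetical.

```python
def subkey_messages(db, key, event, subkeys):
    """Return (channel, payload) pairs for one event, applying the guards."""
    # Guard: nothing is published when there are no subkeys.
    if not subkeys:
        return []

    encoded = b",".join(b"%d:%s" % (len(s), s) for s in subkeys)
    prefix = b"@%d__:" % db
    pipe_in_event = b"|" in event
    messages = []

    # __subkeyspace: skipped when the event contains '|'.
    if not pipe_in_event:
        messages.append((b"__subkeyspace" + prefix + key,
                         event + b"|" + encoded))

    # __subkeyevent: the key is length-prefixed in the payload.
    messages.append((b"__subkeyevent" + prefix + event,
                     b"%d:%s|%s" % (len(key), key, encoded)))

    # __subkeyspaceitem: skipped when the key contains a newline.
    if b"\n" not in key:
        for s in subkeys:
            messages.append((b"__subkeyspaceitem" + prefix + key + b"\n" + s,
                             event))

    # __subkeyspaceevent: skipped when the event contains '|'.
    if not pipe_in_event:
        messages.append((b"__subkeyspaceevent" + prefix + event + b"|" + key,
                         encoded))
    return messages
```

Calling `subkey_messages(0, b"myhash", b"hset", [b"f1", b"f2"])` returns five messages: one each for the subkeyspace, subkeyevent, and subkeyspaceevent channels, plus one item message per field.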

## Hash Command Integration

The following hash operations now emit subkey-level notifications with
the affected field names:

| Command | Event | Subkeys |
|---------|-------|---------|
| `HSET` / `HMSET` | `hset` | All fields being set |
| `HSETNX` | `hset` | The field (if set) |
| `HDEL` | `hdel` | All fields deleted |
| `HGETDEL` | `hdel` / `hexpired` | Deleted or lazily expired fields |
| `HGETEX` | `hexpire` / `hpersist` / `hdel` / `hexpired` | Affected fields per event |
| `HINCRBY` | `hincrby` | The field |
| `HINCRBYFLOAT` | `hincrbyfloat` | The field |
| `HEXPIRE` / `HPEXPIRE` / `HEXPIREAT` / `HPEXPIREAT` | `hexpire` | Updated fields |
| `HPERSIST` | `hpersist` | Persisted fields |
| `HSETEX` | `hset` / `hdel` / `hexpire` / `hexpired` | Affected fields per event |
| Field expiration (active/lazy) | `hexpired` | All expired fields (batched) |

For field expiration, expired fields are collected into a dynamic array
and sent as a single batched notification after the expiration loop,
rather than one notification per field.

## Module API

Three new APIs and one new callback type:

```c
/* Function pointer type for keyspace event notifications with subkeys from modules. */
typedef void (*RedisModuleNotificationWithSubkeysFunc)(
    RedisModuleCtx *ctx, int type, const char *event,
    RedisModuleString *key, RedisModuleString **subkeys, int count);

/* Subscribe to keyspace notifications with subkey information.
 *
 * This is the extended version of RM_SubscribeToKeyspaceEvents. When subkeys
 * are available, the `subkeys` array and `count` are passed to the callback.
 * `subkeys` contains only the names of affected subkeys (values are not included),
 * and `count` is the number of elements. The array may contain duplicates when
 * the same subkey appears more than once in a command (e.g. HSET key f1 v1 f1 v2
 * produces subkeys=["f1","f1"], count=2). When no subkeys are present, `subkeys`
 * will be NULL and `count` will be 0. Whether events without subkeys are delivered
 * depends on the `flags` parameter (see below).
 *
 * `types` is a bit mask of event types the module is interested in
 * (using the same REDISMODULE_NOTIFY_* flags as RM_SubscribeToKeyspaceEvents).
 *
 * `flags` controls delivery filtering:
 *  - REDISMODULE_NOTIFY_FLAG_NONE: The callback is invoked for all matching
 *    events regardless of whether subkeys are present, so a separate
 *    RM_SubscribeToKeyspaceEvents registration can be omitted.
 *  - REDISMODULE_NOTIFY_FLAG_SUBKEYS_REQUIRED: The callback is only invoked
 *    when subkeys are not empty. Events without subkey information (e.g. SET,
 *    EXPIRE, DEL) are skipped.
 *
 * The callback signature is:
 *   void callback(RedisModuleCtx *ctx, int type, const char *event,
 *                 RedisModuleString *key, RedisModuleString **subkeys, int count);
 *
 * The subkeys array and its contents are only valid during the callback.
 * The underlying objects may be stack-allocated or temporary, so
 * RM_RetainString must NOT be used on them. To keep a subkey beyond
 * the callback (e.g. in a RM_AddPostNotificationJob callback), use
 * RM_HoldString (which handles static objects by copying) or
 * RM_CreateStringFromString to make a deep copy before returning.
 */
int RM_SubscribeToKeyspaceEventsWithSubkeys(RedisModuleCtx *ctx, int types, int flags, RedisModuleNotificationWithSubkeysFunc callback);

/* Unregister a module's callback from keyspace notifications with subkeys
 * for specific event types.
 *
 * This function removes a previously registered subscription identified by
 * the event mask, delivery flags, and the callback function.
 *
 * Parameters:
 *  - ctx: The RedisModuleCtx associated with the calling module.
 *  - types: The event mask representing the notification types to unsubscribe from.
 *  - flags: The delivery flags that were used during registration.
 *  - callback: The callback function pointer that was originally registered.
 *
 * Returns:
 *  - REDISMODULE_OK on successful removal of the subscription.
 *  - REDISMODULE_ERR if no matching subscription was found. */ 
int RM_UnsubscribeFromKeyspaceEventsWithSubkeys(
    RedisModuleCtx *ctx, int types, int flags,
    RedisModuleNotificationWithSubkeysFunc cb);

/* Like RM_NotifyKeyspaceEvent, but also triggers subkey-level notifications
 * when subkeys are provided. Both key-level (keyspace/keyevent) and
 * subkey-level (subkeyspace/subkeyevent/subkeyspaceitem/subkeyspaceevent)
 * channels are published to, depending on the server configuration.
 *
 * This is the extended version of RM_NotifyKeyspaceEvent and can actually
 * replace it. When called with subkeys=NULL and count=0, it behaves
 * identically to RM_NotifyKeyspaceEvent. */
int RM_NotifyKeyspaceEventWithSubkeys(
    RedisModuleCtx *ctx, int type, const char *event,
    RedisModuleString *key, RedisModuleString **subkeys, int count);
```

## Configuration

Subkey notifications are controlled via the existing
`notify-keyspace-events` configuration string, which gains four new
characters, `S`, `T`, `I`, and `V` (as in
`notify-keyspace-events "STIV"`):

- **S** -> Subkeyspace events, published with the
`__subkeyspace@<db>__:<key>` prefix.
- **T** -> Subkeyevent events, published with the
`__subkeyevent@<db>__:<event>` prefix.
- **I** -> Subkeyspaceitem events, published per subkey with the
`__subkeyspaceitem@<db>__:<key>\n<subkey>` prefix.
- **V** -> Subkeyspaceevent events, published with the
`__subkeyspaceevent@<db>__:<event>|<key>` prefix.

These flags are **independent** from the existing key-level flags (`K`,
`E`, etc.). Enabling subkey notifications does **not** implicitly enable
or depend on keyspace/keyevent notifications, and vice versa.
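
As a rough illustration of this independence, the Python sketch below maps a configuration string to the subkey channel types it turns on. It only models the four new characters and makes no claim about how Redis parses the rest of the string or about aliases. The names `SUBKEY_FLAGS` and `enabled_subkey_channels` are hypothetical. The lookup is case-sensitive on purpose, because lowercase `s` and `t` are already used by the existing event-class flags for Set and Stream commands.

```python
# The four new flag characters and the channel type each one controls.
SUBKEY_FLAGS = {
    "S": "subkeyspace",
    "T": "subkeyevent",
    "I": "subkeyspaceitem",
    "V": "subkeyspaceevent",
}


def enabled_subkey_channels(config_value):
    """Return the set of subkey channel types named in a flag string."""
    return {name for flag, name in SUBKEY_FLAGS.items() if flag in config_value}
```

Under this model, `enabled_subkey_channels("KE")` is empty, while `enabled_subkey_channels("KS")` contains only `"subkeyspace"`: the key-level `K` flag neither enables nor blocks the subkey channel.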

## Known Limitations

- **Duplicate fields in subkey notifications**: Subkey notification
payloads may contain duplicate field names when the same field is
affected more than once within a single command. Since duplicate fields
are not the common case and deduplication would introduce significant
overhead on every notification, we chose not to deduplicate at this
time.
- **Subkeys must be sds-encoded objects**: The implementation assumes
each subkey is an sds-encoded object and accesses it through
`subkey->ptr`. An assertion enforces this, so Redis will crash if a
subkey uses any other encoding.
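
For the duplicate-field limitation listed above, a consumer that needs each field only once can deduplicate on its own side. The following Python sketch shows one order-preserving way to do that. The helper name `unique_subkeys` is hypothetical, and the input is assumed to be the already-decoded list of subkeys.

```python
def unique_subkeys(subkeys):
    """Drop repeated subkeys while keeping first-seen order."""
    # dict preserves insertion order, so fromkeys() acts as an ordered set.
    return list(dict.fromkeys(subkeys))
```

With the `HSET key f1 v1 f1 v2` case, where the notification carries `f1` twice, `unique_subkeys([b"f1", b"f1"])` reduces the list to a single `b"f1"`.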
2026-04-17 13:39:04 +08:00

305 lines
11 KiB
Tcl

set testmodule [file normalize tests/modules/keyspace_events.so]
tags "modules external:skip" {
    start_server [list overrides [list loadmodule "$testmodule"]] {
        # avoid using shared integers, to increase the chance of detecting heap issues
        r config set maxmemory-policy allkeys-lru
        r config set maxmemory 1gb
        test {Test loaded key space event} {
            r set x 1
            r hset y f v
            r lpush z 1 2 3
            r sadd p 1 2 3
            r zadd t 1 f1 2 f2
            r xadd s * f v
            r debug reload
            assert_equal {1 x} [r keyspace.is_key_loaded x]
            assert_equal {1 y} [r keyspace.is_key_loaded y]
            assert_equal {1 z} [r keyspace.is_key_loaded z]
            assert_equal {1 p} [r keyspace.is_key_loaded p]
            assert_equal {1 t} [r keyspace.is_key_loaded t]
            assert_equal {1 s} [r keyspace.is_key_loaded s]
        }
        test {Nested multi due to RM_Call} {
            r del multi
            r del lua
            r set x 1
            r set x_copy 1
            r keyspace.del_key_copy x
            r keyspace.incr_case1 x
            r keyspace.incr_case2 x
            r keyspace.incr_case3 x
            assert_equal {} [r get multi]
            assert_equal {} [r get lua]
            r get x
        } {3}
        test {Nested multi due to RM_Call, with client MULTI} {
            r del multi
            r del lua
            r set x 1
            r set x_copy 1
            r multi
            r keyspace.del_key_copy x
            r keyspace.incr_case1 x
            r keyspace.incr_case2 x
            r keyspace.incr_case3 x
            r exec
            assert_equal {1} [r get multi]
            assert_equal {} [r get lua]
            r get x
        } {3}
        test {Nested multi due to RM_Call, with EVAL} {
            r del multi
            r del lua
            r set x 1
            r set x_copy 1
            r eval {
                redis.pcall('keyspace.del_key_copy', KEYS[1])
                redis.pcall('keyspace.incr_case1', KEYS[1])
                redis.pcall('keyspace.incr_case2', KEYS[1])
                redis.pcall('keyspace.incr_case3', KEYS[1])
            } 1 x
            assert_equal {} [r get multi]
            assert_equal {1} [r get lua]
            r get x
        } {3}
        test {Test module key space event} {
            r keyspace.notify x
            assert_equal {1 x} [r keyspace.is_module_key_notified x]
        }
        test "Keyspace notifications: module events test" {
            r config set notify-keyspace-events Kd
            r del x
            set rd1 [redis_deferring_client]
            assert_equal {1} [psubscribe $rd1 *]
            r keyspace.notify x
            assert_equal {pmessage * __keyspace@9__:x notify} [$rd1 read]
            $rd1 close
        }
        test "Keyspace notifications: unsubscribe removes handler" {
            r config set notify-keyspace-events KEA
            set before [r keyspace.callback_count]
            r set a 1
            r del a
            wait_for_condition 100 10 {
                [r keyspace.callback_count] > $before
            } else {
                fail "callback did not trigger"
            }
            set before_unsub [r keyspace.callback_count]
            r keyspace.unsubscribe 4 ;# REDISMODULE_NOTIFY_GENERIC
            r set a 1
            r del a
            set after_unsub [r keyspace.callback_count]
            assert_equal $before_unsub $after_unsub
        }
        test {Test expired key space event} {
            set prev_expired [s expired_keys]
            r set exp 1 PX 10
            wait_for_condition 100 10 {
                [s expired_keys] eq $prev_expired + 1
            } else {
                fail "key not expired"
            }
            assert_equal [r get testkeyspace:expired] 1
        }
        test "Subkey notification: subscribe starts callback" {
            r keyspace.subscribe_subkeys
            r keyspace.reset_subkey_events
            r config set notify-keyspace-events ""
        }
        test "Subkey notification: HSET triggers module subkey callback" {
            r keyspace.reset_subkey_events
            r hset myhash f1 v1 f2 v2
            set events [r keyspace.get_subkey_events]
            assert_equal 1 [llength $events]
            assert_equal "hset myhash 2 f1 f2" [lindex $events 0]
            r del myhash
        }
        test "Subkey notification: HDEL triggers module subkey callback" {
            r hset myhash f1 v1 f2 v2
            r keyspace.reset_subkey_events
            r hdel myhash f1
            set events [r keyspace.get_subkey_events]
            assert_equal 1 [llength $events]
            assert_equal "hdel myhash 1 f1" [lindex $events 0]
            r del myhash
        }
        test "Subkey notification: non-subkey event calls subkey callback with count=0" {
            r hset myhash f1 v1
            r keyspace.reset_subkey_events
            r del myhash
            set events [r keyspace.get_subkey_events]
            # DEL is NOTIFY_GENERIC. Our callback is registered for
            # HASH|GENERIC, so it should be called with subkeys=NULL, count=0.
            assert_equal 1 [llength $events]
            assert_equal "del myhash 0" [lindex $events 0]
        }
        test "Subkey notification: module-triggered NotifyKeyspaceEventWithSubkeys" {
            r keyspace.reset_subkey_events
            r keyspace.notify_with_subkeys mykey sk1 sk2 sk3
            set events [r keyspace.get_subkey_events]
            assert_equal 1 [llength $events]
            assert_equal "module_subkey_event mykey 3 sk1 sk2 sk3" [lindex $events 0]
        }
        test "Subkey notification: lazy hash field expiry triggers hexpired with subkeys" {
            r debug set-active-expire 0
            r del myhash
            r hset myhash f1 v1 f2 v2 f3 v3
            r hpexpire myhash 10 FIELDS 2 f1 f2
            r keyspace.reset_subkey_events
            after 100
            r hmget myhash f1 f2
            assert_equal "hexpired myhash 2 f1 f2" [lindex [r keyspace.get_subkey_events] 0]
            r debug set-active-expire 1
        } {OK} {needs:debug}
        test "Subkey notification: active hash field expiry triggers hexpired with subkeys" {
            r del myhash
            r hset myhash f1 v1 f2 v2
            r keyspace.reset_subkey_events
            r hpexpire myhash 10 FIELDS 2 f1 f2
            # wait for active expiry to kick in
            wait_for_condition 50 100 {
                [r exists myhash] == 0
            } else {
                fail "Fields not expired by active expiry"
            }
            # fields order is undefined
            assert_match "hexpired myhash 2 f* f*" [lindex [r keyspace.get_subkey_events] 1]
            r del myhash
        }
        test "Subkey notification: unsubscribe stops callback and resubscribe resumes" {
            r keyspace.reset_subkey_events
            r hset myhash f1 v1
            set events [r keyspace.get_subkey_events]
            assert_equal 1 [llength $events]
            # Unsubscribe: events should stop
            r keyspace.unsubscribe_subkeys
            r keyspace.reset_subkey_events
            r hset myhash f2 v2
            set events [r keyspace.get_subkey_events]
            assert_equal 0 [llength $events]
            # active expire should not trigger subkey callback
            r hpexpire myhash 10 FIELDS 2 f1 f2
            wait_for_condition 50 100 {
                [r exists myhash] == 0
            } else {
                fail "Fields not expired by active expiry"
            }
            set events [r keyspace.get_subkey_events]
            assert_equal 0 [llength $events]
            # Re-subscribe: events should resume
            r keyspace.subscribe_subkeys
            r del myhash
            r hset myhash f1 v1 f2 v2
            r keyspace.reset_subkey_events
            r hpexpire myhash 10 FIELDS 2 f1 f2
            assert_match "hexpire myhash 2 f* f*" [lindex [r keyspace.get_subkey_events] 0]
            # active expire should also resume subkey callback
            wait_for_condition 50 100 {
                [r exists myhash] == 0
            } else {
                fail "Fields not expired by active expiry"
            }
            assert_match "hexpired myhash 2 f* f*" [lindex [r keyspace.get_subkey_events] 1]
            r keyspace.unsubscribe_subkeys
            r keyspace.reset_subkey_events
            r del myhash
        }
        test "Subkey notification: SUBKEYS_REQUIRED flag skips events without subkeys" {
            r keyspace.subscribe_require_subkeys
            r keyspace.reset_subkey_events
            # HSET has subkeys, so the callback should be triggered.
            r hset myhash f1 v1 f2 v2
            set events [r keyspace.get_subkey_events]
            assert_equal 1 [llength $events]
            assert_equal "hset myhash 2 f1 f2" [lindex $events 0]
            # DEL has no subkeys, so the callback should be skipped.
            r keyspace.reset_subkey_events
            r del myhash
            set events [r keyspace.get_subkey_events]
            assert_equal 0 [llength $events]
            r keyspace.unsubscribe_require_subkeys
        }
        test "Unload the module - testkeyspace" {
            assert_equal {OK} [r module unload testkeyspace]
        }
        test "Verify RM_StringDMA with expiration are not causing invalid memory access" {
            assert_equal {OK} [r set x 1 EX 1]
        }
    }
    # Replication test: replica module receives subkey notifications
    start_server [list overrides [list loadmodule "$testmodule"]] {
        set master [srv 0 client]
        set master_host [srv 0 host]
        set master_port [srv 0 port]
        start_server [list overrides [list loadmodule "$testmodule"]] {
            set replica [srv 0 client]
            $replica replicaof $master_host $master_port
            wait_for_sync $replica
            test "Subkey notification: replica module receives subkey callback after replication" {
                $master keyspace.subscribe_subkeys
                $replica keyspace.subscribe_subkeys
                $replica keyspace.reset_subkey_events
                $master hset myhash f1 v1 f2 v2
                wait_for_ofs_sync $master $replica
                set events [$replica keyspace.get_subkey_events]
                assert_equal 1 [llength $events]
                assert_equal "hset myhash 2 f1 f2" [lindex $events 0]
                $master del myhash
                $master keyspace.unsubscribe_subkeys
                $replica keyspace.unsubscribe_subkeys
            }
        }
    }
    start_server {} {
        test {OnLoad failure will handle un-registration} {
            catch {r module load $testmodule noload}
            r set x 1
            r hset y f v
            r lpush z 1 2 3
            r sadd p 1 2 3
            r zadd t 1 f1 2 f2
            r xadd s * f v
            r ping
        }
    }
}