This commit is contained in:
Sergei Georgiev 2026-05-28 04:00:21 +02:00 committed by GitHub
commit 3767aaa34d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1930 additions and 292 deletions

View file

@ -384,7 +384,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o flax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View file

@ -2353,7 +2353,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
while(raxNext(&ri_cons)) {
streamConsumer *consumer = ri_cons.data;
/* If there are no pending entries, just emit XGROUP CREATECONSUMER */
if (raxSize(consumer->pel) == 0) {
if (consumer->pel_count == 0) {
if (rioWriteStreamEmptyConsumer(r,key,(char*)ri.key,
ri.key_len,consumer) == 0)
{
@ -2365,22 +2365,22 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
}
/* For the current consumer, iterate all the PEL entries
* to emit the XCLAIM protocol. */
raxIterator ri_pel;
raxStart(&ri_pel,consumer->pel);
raxSeek(&ri_pel,"^",NULL,0);
while(raxNext(&ri_pel)) {
streamNACK *nack = ri_pel.data;
pelIterator pi;
pelIterStart(&pi,consumer->pel);
pelIterSeek(&pi,"^",NULL);
while (pelIterNext(&pi)) {
streamNACK *nack = pi.data;
if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
ri.key_len,consumer,
ri_pel.key,nack) == 0)
pi.key,nack) == 0)
{
raxStop(&ri_pel);
pelIterStop(&pi);
raxStop(&ri_cons);
raxStop(&ri);
return 0;
}
}
raxStop(&ri_pel);
pelIterStop(&pi);
}
raxStop(&ri_cons);

View file

@ -19,6 +19,7 @@
*/
#include "server.h"
#include "stream.h"
#include <stddef.h>
#include <math.h>
@ -881,48 +882,34 @@ void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
raxStop(&ri);
}
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
/* Walk a consumer-PEL entry and fix up the consumer back-pointer for
* every NACK. NACKs themselves are defragged during the group PEL walk
* (defragStreamCGPendingEntry) which also covers unowned NACK-zone entries;
* here we only repair the stale consumer pointer. */
void* defragStreamConsumerPelEntry(raxIterator *ri, void *privdata) {
streamConsumer *c = privdata;
streamNACK *nack = ri->data;
/* NACKs are already defragged by the CG PEL walk (defragStreamCGPendingEntry).
* cgroup_ref_node->value is also updated there for all NACKs (including
* unowned NACK-zone entries that have no consumer PEL walk).
* Here we only fix up the back-pointer to the possibly-relocated consumer. */
nack->consumer = c;
return NULL;
}
void* defragStreamCGPendingEntry(raxIterator *ri, void *privdata) {
streamCG *cg = privdata;
streamNACK *nack = ri->data, *newnack;
/* Update cgroup_ref_node to the possibly-relocated CG for every NACK.
* Consumer-owned entries will get this overwritten again redundantly by
* defragStreamConsumerPendingEntry; unowned (NACK zone) entries have no
* consumer PEL walk, so this is their only chance. */
nack->cgroup_ref_node->value = cg;
newnack = activeDefragAlloc(nack);
if (newnack) {
/* If this NACK is owned by a consumer, update the consumer's PEL. */
if (newnack->consumer) {
void *prev;
raxInsert(newnack->consumer->pel, ri->key, ri->key_len, newnack, &prev);
serverAssert(prev == nack);
}
if (newnack->pel_prev) {
newnack->pel_prev->pel_next = newnack;
} else {
cg->pel_time_head = newnack;
}
if (newnack->pel_next) {
newnack->pel_next->pel_prev = newnack;
} else {
cg->pel_time_tail = newnack;
}
if (cg->pel_nack_tail == nack) {
cg->pel_nack_tail = newnack;
}
if (ri->key_len == PEL_RAX_DIRECT_KEYLEN) {
streamNACK *nack = ri->data;
nack->consumer = c;
return NULL;
}
return newnack;
flax *f = ri->data;
flax *newflax = activeDefragAlloc(f);
if (newflax) f = newflax;
void *newdata = activeDefragAlloc(f->data);
if (newdata) f->data = newdata;
flaxIterator fi;
flaxStart(&fi, f);
if (flaxSeek(&fi, "^", 0)) {
do {
streamNACK *nack = fi.data;
nack->consumer = c;
} while (flaxNext(&fi));
}
return newflax;
}
void* defragStreamConsumer(raxIterator *ri, void *privdata) {
@ -936,29 +923,90 @@ void* defragStreamConsumer(raxIterator *ri, void *privdata) {
if (newsds)
c->name = newsds;
if (c->pel) {
/* Update pel back-pointer to new stream */
c->pel->alloc_size = &s->alloc_size;
defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, c);
pelCacheInvalidate(c->pel);
defragRadixTree(&c->pel, 0, defragStreamConsumerPelEntry, c);
}
return newc; /* returns NULL if c was not defragged */
}
/* Repair every reference to a NACK that activeDefragAlloc() has moved from
 * 'nack' (old address) to 'newnack' (new address):
 *   - the owning consumer's PEL entry, when the NACK has a consumer;
 *   - the neighbours in the doubly-linked time list, or the group's
 *     head/tail pointer when there is no neighbour on that side;
 *   - the group's NACK-zone tail, when it pointed at the old address. */
static void relinkStreamNack(streamCG *cg, streamNACK *nack, streamNACK *newnack) {
    streamConsumer *owner = newnack->consumer;
    if (owner != NULL)
        pelReplace(owner->pel, &newnack->id, newnack);

    /* Backward link: either the predecessor or the list head. */
    streamNACK *before = newnack->pel_prev;
    if (before == NULL)
        cg->pel_time_head = newnack;
    else
        before->pel_next = newnack;

    /* Forward link: either the successor or the list tail. */
    streamNACK *after = newnack->pel_next;
    if (after == NULL)
        cg->pel_time_tail = newnack;
    else
        after->pel_prev = newnack;

    /* Compare against the OLD address: that is what the group still holds. */
    if (cg->pel_nack_tail == nack)
        cg->pel_nack_tail = newnack;
}
/* Defrag callback for one entry of a consumer group's PEL rax.
 *
 * 'privdata' is the owning streamCG. Two entry shapes are handled:
 *   - direct entry (key length == PEL_RAX_DIRECT_KEYLEN, the 16-byte key
 *     form): the value is a single NACK, which is defragged;
 *   - flax bucket (any other key length, the 15-byte key form): the flax
 *     struct and its data block are defragged, then every NACK stored in it.
 *
 * For every NACK visited, cgroup_ref_node->value is pointed at 'cg' before
 * the NACK itself is moved, and relinkStreamNack() repairs the consumer PEL,
 * the time list and the NACK-zone tail when the NACK was relocated.
 *
 * Returns the new address of the rax value (NACK or flax), or NULL when the
 * value itself was not moved. */
void* defragStreamCGPendingEntry(raxIterator *ri, void *privdata) {
    streamCG *group = privdata;

    if (ri->key_len != PEL_RAX_DIRECT_KEYLEN) {
        /* Bucket form: move the flax struct first, then its data block. */
        flax *bucket = ri->data;
        flax *moved_bucket = activeDefragAlloc(bucket);
        if (moved_bucket != NULL) bucket = moved_bucket;
        void *moved_block = activeDefragAlloc(bucket->data);
        if (moved_block != NULL) bucket->data = moved_block;

        flaxIterator it;
        flaxStart(&it, bucket);
        for (int more = flaxSeek(&it, "^", 0); more; more = flaxNext(&it)) {
            streamNACK *cur = it.data;
            cur->cgroup_ref_node->value = group;
            streamNACK *moved = activeDefragAlloc(cur);
            if (moved == NULL) continue;
            /* Store the new address in the bucket slot before fixing the
             * external references. */
            flaxIterSetData(&it, moved);
            relinkStreamNack(group, cur, moved);
        }
        return moved_bucket;
    }

    /* Direct form: the rax value is the NACK itself. */
    streamNACK *cur = ri->data;
    cur->cgroup_ref_node->value = group;
    streamNACK *moved = activeDefragAlloc(cur);
    if (moved != NULL)
        relinkStreamNack(group, cur, moved);
    return moved;
}
/* Defrag callback for one consumer group stored in a stream's group rax.
 *
 * 'privdata' is the owning stream. The group struct is defragged first, then
 * its PEL and its consumers rax are walked; both have their alloc_size
 * back-pointer re-aimed at the stream's alloc_size field beforehand.
 *
 * Returns the new address of the group, or NULL when the group struct was
 * not moved (same convention as defragStreamConsumer). */
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
    stream *s = privdata;
    streamCG *newcg, *cg = ri->data;
    if ((newcg = activeDefragAlloc(cg)))
        cg = newcg;
    if (cg->pel) {
        /* Update pel back-pointer to new stream */
        cg->pel->alloc_size = &s->alloc_size;
        pelCacheInvalidate(cg->pel);
        defragRadixTree(&cg->pel, 0, defragStreamCGPendingEntry, cg);
    }
    if (cg->consumers) {
        /* Update consumers back-pointer to new stream */
        cg->consumers->alloc_size = &s->alloc_size;
        defragRadixTree(&cg->consumers, 0, defragStreamConsumer, s);
    }
    /* A single return: the previous text had an unreachable second return
     * statement after 'return cg;'. */
    return newcg;
}
/* Defrag a single idmpProducer's dict and linked list entries. */

901
src/flax.c Normal file
View file

@ -0,0 +1,901 @@
/* Flax -- A flat sorted-array map for uint8_t keys.
*
* Copyright (c) 2025-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#include "flax.h"
#include "redisassert.h"
#include <stdlib.h>
#include <string.h>
#include <stdalign.h>
#ifndef flax_malloc
#ifndef FLAX_MALLOC_INCLUDE
#define FLAX_MALLOC_INCLUDE "flax_malloc.h"
#endif
#include FLAX_MALLOC_INCLUDE
#endif
/* ----------------------------------------------------------------------------
* Flax internals
*
* A flax stores a sorted array of (uint8_t key, void *value) pairs inside a
* single contiguous heap block. The block is split into two sub-arrays:
*
* [ keys: uint8_t * capacity ][ padding ][ values: void* * capacity ]
*
* The padding between keys and values ensures that the values array starts
* at a pointer-aligned offset (see flax_values_offset()).
*
* Only the first `numele` slots in each sub-array hold live data; the
* remainder up to `capacity` is unused reserved space.
*
* Lookup uses linear scan rather than binary search. The expected element
* count is small (e.g. per-consumer stream PEL), so sequential cache-friendly
* access outperforms binary search whose branch-misprediction cost dominates
* at these sizes. Fast-path checks for the head and tail positions further
* accelerate the common case of monotonically increasing keys.
*
* Growth: capacity doubles on insert when full.
* Shrink: flaxShrink() reallocates to fit exactly.
* -------------------------------------------------------------------------- */
/* ----------------------------- Internal helpers ---------------------------- */
/* Compute where the values array begins inside a data block sized for
 * 'capacity' elements: the size of the keys array (one byte per key)
 * rounded up to the next multiple of the alignment of a pointer. */
static size_t flax_values_offset(uint16_t capacity) {
    const size_t ptr_align = alignof(void *);
    const size_t keys_bytes = sizeof(uint8_t) * (size_t)capacity;
    /* Bump past the next boundary, then drop the remainder. */
    const size_t bumped = keys_bytes + (ptr_align - 1);
    return bumped - (bumped % ptr_align);
}
/* The keys array sits at the very start of the data block, so its address
 * is simply the block address viewed as bytes. */
static uint8_t *flax_keys(flax *f) {
    uint8_t *key_base = (uint8_t *)f->data;
    return key_base;
}
/* The values array starts flax_values_offset(capacity) bytes into the data
 * block; return that address typed as an array of void pointers. */
static void **flax_values(flax *f) {
    char *block = (char *)f->data;
    size_t values_at = flax_values_offset(f->capacity);
    return (void **)(block + values_at);
}
/* Locate 'key' in 'keys', an ascending array holding 'numele' entries.
 *
 * Return value: 1 when the key is present, 0 when it is not.
 * *out_idx:     the position of the key when present, otherwise the slot
 *               at which it would have to be inserted to keep the array
 *               sorted (this may be 'numele', meaning "append").
 *
 * The lookup is a plain forward walk instead of a binary search. That is a
 * deliberate choice: a flax is expected to stay small (tens of elements,
 * e.g. the PEL of one stream consumer), so the keys fit in one or two cache
 * lines and a sequential, well-predicted scan beats a binary search that
 * mispredicts at each comparison.
 *
 * Before walking, both ends are tested:
 *   - the last key first, because appending a key larger than everything
 *     stored is the dominant case for monotonically growing keys;
 *   - then the first key, which settles prepend and match-at-index-0. */
static int flax_search(const uint8_t *keys, uint16_t numele, uint8_t key, int16_t *out_idx) {
    if (numele == 0) {
        *out_idx = 0;
        return 0;
    }

    const uint16_t last = numele - 1;

    /* Tail check: at or beyond the largest stored key. */
    const uint8_t tail_key = keys[last];
    if (key >= tail_key) {
        int hit = (key == tail_key);
        *out_idx = hit ? last : numele;
        return hit;
    }

    /* Head check: at or below the smallest stored key. */
    const uint8_t head_key = keys[0];
    if (key <= head_key) {
        *out_idx = 0;
        return key == head_key;
    }

    /* The key lies strictly between the two ends: advance over every
     * interior key that is still smaller. If none stops the walk, the
     * insertion point is the last slot. */
    uint16_t pos = 1;
    while (pos < last && keys[pos] < key) pos++;
    *out_idx = pos;
    return pos < last && keys[pos] == key;
}
/* Move the flax contents into a data block sized for 'new_capacity'
 * elements (clamped to UINT8_MAX + 1).
 *
 * The values array lives at an offset that depends on the capacity, so the
 * live keys and the live values are copied separately: keys to the start
 * of the new block, values to the offset computed for the new capacity.
 * The previous block is released only after both copies are done, and the
 * tracked alloc_size is adjusted by the difference in usable size.
 *
 * IMPORTANT: only f->data is replaced; the flax struct itself never moves.
 * External code (e.g. the PEL cache in t_stream.c) depends on the struct
 * pointer staying valid across a resize. */
static void flax_resize(flax *f, uint16_t new_capacity) {
    const uint16_t cap_limit = UINT8_MAX + 1;
    if (new_capacity > cap_limit) new_capacity = cap_limit;

    size_t values_at = flax_values_offset(new_capacity);
    size_t block_bytes = values_at + (size_t)new_capacity * sizeof(void *);
    size_t fresh_usable;
    void *fresh = flax_malloc_usable(block_bytes, &fresh_usable);

    if (f->data != NULL && f->numele > 0) {
        size_t live = (size_t)f->numele;
        const char *old_block = (const char *)f->data;
        const char *old_values = old_block + flax_values_offset(f->capacity);
        memcpy(fresh, old_block, live * sizeof(uint8_t));
        memcpy((char *)fresh + values_at, old_values, live * sizeof(void *));
    }

    size_t stale_usable;
    flax_free_usable(f->data, &stale_usable);

    f->data = fresh;
    f->capacity = new_capacity;
    f->alloc_size += fresh_usable - stale_usable;
}
/* Reload the iterator's cached 'key' and 'data' from the flax slot the
 * iterator currently points at (it->idx). */
static void flaxIterRefresh(flaxIterator *it) {
    flax *src = it->f;
    it->key = flax_keys(src)[it->idx];
    it->data = flax_values(src)[it->idx];
}
/* ----------------------------------------------------------------------------
* Core API
* -------------------------------------------------------------------------- */
/* Allocate a new, empty flax with room for FLAX_INIT_CAPACITY elements and
 * return its pointer.
 *
 * Two allocations are made: the flax struct and its data block. alloc_size
 * is initialised to the sum of the usable sizes reported for both.
 *
 * If the configured allocator reports failure by returning NULL, the
 * function returns NULL; a struct that was already allocated is released
 * first so nothing leaks. */
flax *flaxNew(void) {
    size_t usable;
    flax *f = flax_malloc_usable(sizeof(*f), &usable);
    if (f == NULL) return NULL;
    f->alloc_size = usable;
    f->numele = 0;
    f->capacity = FLAX_INIT_CAPACITY;

    size_t voff = flax_values_offset(FLAX_INIT_CAPACITY);
    f->data = flax_malloc_usable(voff + (size_t)FLAX_INIT_CAPACITY * sizeof(void *), &usable);
    if (f->data == NULL) {
        /* Second allocation failed: undo the first one. */
        flax_free(f);
        return NULL;
    }
    f->alloc_size += usable;
    return f;
}
/* Shared implementation behind flaxInsert() and flaxTryInsert().
 *
 * Key already present: returns 0. The stored value is replaced by 'data'
 * only when 'overwrite' is true. When 'old' is not NULL it receives the
 * value that was stored before the call.
 *
 * Key not present: a new element is added at its sorted position and 1 is
 * returned. When 'old' is not NULL it is set to NULL. The data block is
 * grown (capacity doubled) first if it is full. */
static int flaxGenericInsert(flax *f, uint8_t key, void *data, void **old, int overwrite) {
    int16_t pos;
    if (flax_search(flax_keys(f), f->numele, key, &pos)) {
        void **slot = flax_values(f) + pos;
        if (old != NULL) *old = *slot;
        if (overwrite) *slot = data;
        return 0;
    }

    if (f->numele == f->capacity)
        flax_resize(f, f->capacity * 2);

    /* Fetch the array addresses only now: a resize replaces the block. */
    uint8_t *keys = flax_keys(f);
    void **vals = flax_values(f);

    /* Everything from [pos] onward moves one slot right to open a gap. */
    int16_t moving = f->numele - pos;
    if (moving > 0) {
        memmove(keys + pos + 1, keys + pos, (size_t)moving * sizeof(uint8_t));
        memmove(vals + pos + 1, vals + pos, (size_t)moving * sizeof(void *));
    }

    keys[pos] = key;
    vals[pos] = data;
    f->numele++;
    if (old != NULL) *old = NULL;
    return 1;
}
/* Insert 'key' -> 'data', replacing the stored value when the key already
 * exists. Thin wrapper around flaxGenericInsert() with overwrite enabled;
 * see that function for the return value and the meaning of 'old'. */
int flaxInsert(flax *f, uint8_t key, void *data, void **old) {
    const int overwrite = 1;
    return flaxGenericInsert(f, key, data, old, overwrite);
}
/* Insert 'key' -> 'data' only when the key is not stored yet; an existing
 * value is left untouched. Thin wrapper around flaxGenericInsert() with
 * overwrite disabled; see that function for the return value and the
 * meaning of 'old'. */
int flaxTryInsert(flax *f, uint8_t key, void *data, void **old) {
    const int overwrite = 0;
    return flaxGenericInsert(f, key, data, old, overwrite);
}
/* Delete 'key' from the flax.
 *
 * Returns 1 when the key was stored and has been removed, 0 otherwise
 * (including when 'f' is NULL or empty). When 'old' is not NULL it receives
 * the removed value, or NULL when nothing was removed.
 *
 * After a removal the data block is halved (never below
 * FLAX_INIT_CAPACITY) if the flax is non-empty, its capacity is above
 * FLAX_INIT_CAPACITY, and at most half of the capacity is in use. */
int flaxRemove(flax *f, uint8_t key, void **old) {
    int16_t pos;
    if (f == NULL || f->numele == 0 ||
        !flax_search(flax_keys(f), f->numele, key, &pos))
    {
        if (old != NULL) *old = NULL;
        return 0;
    }

    uint8_t *keys = flax_keys(f);
    void **vals = flax_values(f);
    if (old != NULL) *old = vals[pos];

    /* Slide the elements that follow [pos] one slot left over the hole. */
    int16_t following = f->numele - pos - 1;
    if (following > 0) {
        memmove(keys + pos, keys + pos + 1, (size_t)following * sizeof(uint8_t));
        memmove(vals + pos, vals + pos + 1, (size_t)following * sizeof(void *));
    }
    f->numele--;

    /* Nothing to give back unless all three shrink conditions hold. */
    if (f->numele == 0 ||
        f->capacity <= FLAX_INIT_CAPACITY ||
        f->numele > f->capacity / 2)
    {
        return 1;
    }

    uint16_t halved = f->capacity / 2;
    if (halved < FLAX_INIT_CAPACITY) halved = FLAX_INIT_CAPACITY;
    flax_resize(f, halved);
    return 1;
}
/* Look 'key' up in the flax.
 *
 * Returns 1 when the key is stored, 0 otherwise (including when 'f' is NULL
 * or empty). When 'value' is not NULL it receives the associated data
 * pointer on success and NULL on failure. */
int flaxFind(flax *f, uint8_t key, void **value) {
    void *hit = NULL;
    int found = 0;

    if (f != NULL && f->numele != 0) {
        int16_t pos;
        if (flax_search(flax_keys(f), f->numele, key, &pos)) {
            hit = flax_values(f)[pos];
            found = 1;
        }
    }

    if (value != NULL) *value = hit;
    return found;
}
/* Release a flax: first its data block, then the struct itself. The stored
 * values are not touched. Passing NULL is a no-op. */
void flaxFree(flax *f) {
    if (f != NULL) {
        flax_free(f->data);
        flax_free(f);
    }
}
/* Release a flax after handing every stored value to 'free_callback'.
 *
 * The callback is invoked as free_callback(value, ctx) once per live
 * element, in index order. It is skipped entirely when 'free_callback' is
 * NULL or the flax holds no elements. Passing a NULL flax is a no-op. */
void flaxFreeWithCallback(flax *f,
                          void (*free_callback)(void *item, void *ctx),
                          void *ctx) {
    if (f == NULL) return;

    int has_items = f->data != NULL && f->numele > 0;
    if (free_callback != NULL && has_items) {
        void **items = flax_values(f);
        uint32_t i = 0;
        while (i < f->numele) {
            free_callback(items[i], ctx);
            i++;
        }
    }

    flax_free(f->data);
    flax_free(f);
}
/* Return the number of elements stored in the flax. A NULL flax is
 * reported as empty (0), matching the NULL handling of flaxAllocSize(),
 * flaxFind() and flaxRemove(). */
uint16_t flaxSize(flax *f) {
    if (!f) return 0;
    return f->numele;
}
/* Report the heap memory accounted to the flax (struct plus data block).
 * The figure is read from the alloc_size field, which the allocation and
 * resize paths keep up to date, so no traversal is needed. A NULL flax
 * reports 0. */
size_t flaxAllocSize(flax *f) {
    return f ? f->alloc_size : 0;
}
/* Trim the data block so that capacity equals the current element count.
 * Does nothing when the flax is empty (the caller is expected to
 * flaxFree() it instead) or when no spare capacity exists. */
void flaxShrink(flax *f) {
    if (f->numele == 0 || f->numele >= f->capacity) return;
    flax_resize(f, f->numele);
}
/* ------------------------------- Iterator --------------------------------- */
/* Bind iterator 'it' to flax 'f' and put it in the "not positioned" state
 * (idx == -1, no key, no data). Call this once per iterator, then call
 * flaxSeek(); until a seek succeeds flaxNext() just returns 0. */
void flaxStart(flaxIterator *it, flax *f) {
    it->data = NULL;
    it->key = 0;
    it->idx = -1;
    it->f = f;
}
/* Position the iterator according to 'op':
 *   "^"   first element ('key' is ignored),
 *   "$"   last element ('key' is ignored),
 *   ">="  first element whose key is greater than or equal to 'key'.
 *
 * Returns 1 and loads it->key / it->data when an element was selected.
 * Returns 0 and leaves the iterator unpositioned when the flax is NULL or
 * empty, or when no element satisfies the request. An unknown 'op' trips
 * an assertion; if assertions do not stop the program, 0 is returned. */
int flaxSeek(flaxIterator *it, const char *op, uint8_t key) {
    int16_t target;

    if (it->f == NULL || it->f->numele == 0) goto nomatch;

    if (op[0] == '^') {
        target = 0;
    } else if (op[0] == '$') {
        target = it->f->numele - 1;
    } else if (op[0] == '>' && op[1] == '=') {
        /* The search yields either the match or the insertion point; the
         * latter is exactly the first key larger than 'key', unless it
         * falls past the end. */
        flax_search(flax_keys(it->f), it->f->numele, key, &target);
        if (target >= it->f->numele) goto nomatch;
    } else {
        assert(0 && "flaxSeek: unrecognized op");
        goto nomatch;
    }

    it->idx = target;
    flaxIterRefresh(it);
    return 1;

nomatch:
    it->idx = -1;
    it->key = 0;
    it->data = NULL;
    return 0;
}
/* Advance the iterator by one element.
 *
 * Returns 1 and refreshes it->key / it->data when another element exists.
 * Returns 0 when the iterator was not positioned, or when the end has been
 * reached; in the latter case the iterator is reset to unpositioned. */
int flaxNext(flaxIterator *it) {
    if (it->idx < 0) return 0;

    it->idx++;
    if (it->idx < it->f->numele) {
        flaxIterRefresh(it);
        return 1;
    }

    /* Walked off the end. */
    it->idx = -1;
    it->key = 0;
    it->data = NULL;
    return 0;
}
/* Overwrite the value stored at the iterator's current position.
 *
 * Only the value slot at it->idx is written; keys, element count and
 * capacity are untouched, which is why this can be used in the middle of
 * an iteration where flaxInsert() could not. The iterator's cached 'data'
 * is updated to the new value as well. The iterator must be positioned on
 * a valid element (asserted). */
void flaxIterSetData(flaxIterator *it, void *data) {
    assert(it->idx >= 0 && it->idx < it->f->numele);
    it->data = data;
    flax_values(it->f)[it->idx] = data;
}
/* ----------------------------- Unit tests --------------------------------- */
#ifdef REDIS_TEST
#include "testhelp.h"
#include <stdio.h>
#include <string.h>
#define UNUSED(x) (void)(x)
#define ERR(x, ...) \
do { \
printf("%s:%s:%d:\t", __FILE__, __func__, __LINE__); \
printf("ERROR! " x "\n", __VA_ARGS__); \
err++; \
} while (0)
#define TEST(name) printf("test — %s\n", name);
static int flax_test_free_count;
/* Test free-callback: releases the item with flax_free() and bumps the
 * file-level counter flax_test_free_count. 'ctx' is not used. */
static void flax_test_counting_free(void *p, void *ctx) {
    flax_free(p);
    flax_test_free_count += 1;
    (void)ctx;
}
/* Test free-callback: treats 'ctx' as a pointer to an int call counter and
 * increments it. The item 'p' is neither read nor freed. */
static void flax_test_ctx_free(void *p, void *ctx) {
    int *calls = (int *)ctx;
    *calls += 1;
    (void)p;
}
/* Unit-test entry point for the flax container.
 *
 * The arguments are unused. Each TEST() section builds its own flax,
 * exercises one aspect of the API and frees it. Hard expectations use
 * assert(); softer ones report through ERR(), which increments 'err'.
 *
 * Returns the number of ERR() reports raised by the test sections (0 when
 * everything passed). The closing summary line is also printed through
 * ERR(), so the count is saved and restored around it to keep the return
 * value equal to the number printed. */
int flaxTest(int argc, char **argv, int flags) {
    UNUSED(argc);
    UNUSED(argv);
    UNUSED(flags);
    int err = 0;

    TEST("new and free empty") {
        flax *a = flaxNew();
        assert(a != NULL);
        assert(a->numele == 0);
        assert(a->capacity == FLAX_INIT_CAPACITY);
        assert(a->data != NULL);
        flaxFree(a);
    }

    TEST("find on empty flax") {
        flax *a = flaxNew();
        void *val;
        assert(flaxFind(a, 42, &val) == 0);
        assert(val == NULL);
        assert(flaxFind(a, 1, &val) == 0);
        flaxFree(a);
    }

    TEST("insert and find") {
        flax *a = flaxNew();
        void *old, *val;
        /* Inserted out of order on purpose: the flax must sort them. */
        flaxInsert(a, 30, "thirty", &old);
        assert(old == NULL);
        flaxInsert(a, 10, "ten", &old);
        assert(old == NULL);
        flaxInsert(a, 50, "fifty", &old);
        assert(old == NULL);
        flaxInsert(a, 20, "twenty", &old);
        assert(old == NULL);
        flaxInsert(a, 40, "forty", &old);
        assert(old == NULL);
        assert(flaxSize(a) == 5);
        assert(flaxFind(a, 10, &val) == 1);
        assert(strcmp(val, "ten") == 0);
        assert(flaxFind(a, 20, &val) == 1);
        assert(strcmp(val, "twenty") == 0);
        assert(flaxFind(a, 30, &val) == 1);
        assert(strcmp(val, "thirty") == 0);
        assert(flaxFind(a, 40, &val) == 1);
        assert(strcmp(val, "forty") == 0);
        assert(flaxFind(a, 50, &val) == 1);
        assert(strcmp(val, "fifty") == 0);
        assert(flaxFind(a, 99, &val) == 0);
        assert(flaxFind(a, 0, &val) == 0);
        flaxFree(a);
    }

    TEST("insert duplicate replaces value") {
        flax *a = flaxNew();
        flaxInsert(a, 5, "old_five", NULL);
        flaxInsert(a, 10, "old_ten", NULL);
        void *old, *val;
        flaxInsert(a, 5, "new_five", &old);
        assert(old != NULL);
        assert(strcmp(old, "old_five") == 0);
        assert(flaxSize(a) == 2);
        assert(flaxFind(a, 5, &val) == 1);
        assert(strcmp(val, "new_five") == 0);
        flaxInsert(a, 10, "new_ten", &old);
        assert(strcmp(old, "old_ten") == 0);
        assert(flaxSize(a) == 2);
        assert(flaxFind(a, 10, &val) == 1);
        assert(strcmp(val, "new_ten") == 0);
        flaxFree(a);
    }

    TEST("remove basic") {
        flax *a = flaxNew();
        flaxInsert(a, 1, "one", NULL);
        flaxInsert(a, 2, "two", NULL);
        flaxInsert(a, 3, "three", NULL);
        void *old, *val;
        assert(flaxRemove(a, 2, &old) == 1);
        assert(strcmp(old, "two") == 0);
        assert(flaxSize(a) == 2);
        assert(flaxFind(a, 2, &val) == 0);
        assert(flaxFind(a, 1, &val) == 1);
        assert(strcmp(val, "one") == 0);
        assert(flaxFind(a, 3, &val) == 1);
        assert(strcmp(val, "three") == 0);
        flaxFree(a);
    }

    TEST("remove not found") {
        flax *a = flaxNew();
        flaxInsert(a, 1, "one", NULL);
        void *old;
        assert(flaxRemove(a, 99, &old) == 0);
        assert(old == NULL);
        assert(flaxSize(a) == 1);
        flax *b = flaxNew();
        assert(flaxRemove(b, 1, &old) == 0);
        flaxFree(b);
        flaxFree(a);
    }

    TEST("remove only element") {
        flax *a = flaxNew();
        flaxInsert(a, 42, "answer", NULL);
        void *old, *val;
        assert(flaxRemove(a, 42, &old) == 1);
        assert(strcmp(old, "answer") == 0);
        assert(flaxSize(a) == 0);
        assert(flaxFind(a, 42, &val) == 0);
        flaxFree(a);
    }

    TEST("insert at beginning and end") {
        flax *a = flaxNew();
        flaxInsert(a, 50, "middle", NULL);
        flaxInsert(a, 100, "end", NULL);
        flaxInsert(a, 1, "begin", NULL);
        void *val;
        assert(flaxSize(a) == 3);
        assert(flaxFind(a, 1, &val) == 1);
        assert(strcmp(val, "begin") == 0);
        assert(flaxFind(a, 50, &val) == 1);
        assert(strcmp(val, "middle") == 0);
        assert(flaxFind(a, 100, &val) == 1);
        assert(strcmp(val, "end") == 0);
        flaxFree(a);
    }

    TEST("grow beyond initial capacity") {
        flax *a = flaxNew();
        /* 128 distinct even keys (0, 2, ..., 254), each with a heap value. */
        for (int i = 0; i < 128; i++) {
            char *buf = flax_malloc(16);
            snprintf(buf, 16, "v%d", i);
            flaxInsert(a, (uint8_t)(i * 2), buf, NULL);
        }
        assert(flaxSize(a) == 128);
        assert(a->capacity >= 128);
        for (int i = 0; i < 128; i++) {
            char expected[16];
            snprintf(expected, sizeof(expected), "v%d", i);
            void *val;
            assert(flaxFind(a, (uint8_t)(i * 2), &val) == 1);
            if (strcmp(val, expected) != 0) {
                ERR("grow: key %d expected '%s' got '%s'",
                    i * 2, expected, (char *)val);
            }
        }
        flaxFreeWithCallback(a, flax_test_counting_free, NULL);
    }

    TEST("auto-shrink on remove") {
        flax *a = flaxNew();
        for (int i = 0; i < 64; i++)
            flaxInsert(a, (uint8_t)i, "x", NULL);
        assert(flaxSize(a) == 64);
        uint16_t cap_full = a->capacity;
        for (int i = 0; i < 56; i++)
            flaxRemove(a, (uint8_t)i, NULL);
        assert(flaxSize(a) == 8);
        if (a->capacity >= cap_full) {
            ERR("auto-shrink: capacity %u should be less than %u after removals",
                a->capacity, cap_full);
        }
        for (int i = 56; i < 64; i++) {
            void *val;
            assert(flaxFind(a, (uint8_t)i, &val) == 1);
            assert(strcmp(val, "x") == 0);
        }
        flaxFree(a);
    }

    TEST("explicit shrink after removals") {
        flax *a = flaxNew();
        for (int i = 0; i < 64; i++)
            flaxInsert(a, (uint8_t)i, "x", NULL);
        for (int i = 0; i < 56; i++)
            flaxRemove(a, (uint8_t)i, NULL);
        assert(flaxSize(a) == 8);
        uint16_t cap_after_remove = a->capacity;
        flaxShrink(a);
        assert(a->capacity <= cap_after_remove);
        assert(a->capacity >= a->numele);
        for (int i = 56; i < 64; i++) {
            void *val;
            assert(flaxFind(a, (uint8_t)i, &val) == 1);
            assert(strcmp(val, "x") == 0);
        }
        flaxFree(a);
    }

    TEST("no shrink below FLAX_INIT_CAPACITY") {
        flax *a = flaxNew();
        for (int i = 0; i < 4; i++)
            flaxInsert(a, (uint8_t)i, "x", NULL);
        assert(a->capacity == FLAX_INIT_CAPACITY);
        for (int i = 0; i < 3; i++)
            flaxRemove(a, (uint8_t)i, NULL);
        assert(flaxSize(a) == 1);
        assert(a->capacity == FLAX_INIT_CAPACITY);
        flaxFree(a);
    }

    TEST("flaxFreeWithCallback invokes callback") {
        flax_test_free_count = 0;
        flax *a = flaxNew();
        for (int i = 0; i < 5; i++) {
            char *s = flax_malloc(8);
            snprintf(s, 8, "str%d", i);
            flaxInsert(a, i, s, NULL);
        }
        flaxFreeWithCallback(a, flax_test_counting_free, NULL);
        if (flax_test_free_count != 5) {
            ERR("freeWithCallback: expected 5 frees, got %d",
                flax_test_free_count);
        }
    }

    TEST("flaxFreeWithCallback on empty flax") {
        flax_test_free_count = 0;
        flax *a = flaxNew();
        flaxFreeWithCallback(a, flax_test_counting_free, NULL);
        if (flax_test_free_count != 0) {
            ERR("freeWithCallback empty: expected 0 frees, got %d",
                flax_test_free_count);
        }
    }

    TEST("keys near uint8 boundaries") {
        flax *a = flaxNew();
        flaxInsert(a, 0, "zero", NULL);
        flaxInsert(a, 255, "max", NULL);
        flaxInsert(a, 254, "max-1", NULL);
        flaxInsert(a, 100, "hundred", NULL);
        void *val;
        assert(flaxSize(a) == 4);
        assert(flaxFind(a, 0, &val) == 1);
        assert(strcmp(val, "zero") == 0);
        assert(flaxFind(a, 100, &val) == 1);
        assert(strcmp(val, "hundred") == 0);
        assert(flaxFind(a, 254, &val) == 1);
        assert(strcmp(val, "max-1") == 0);
        assert(flaxFind(a, 255, &val) == 1);
        assert(strcmp(val, "max") == 0);
        flaxFree(a);
    }

    TEST("flaxTryInsert does not overwrite") {
        flax *a = flaxNew();
        assert(flaxTryInsert(a, 10, "ten", NULL) == 1);
        assert(flaxTryInsert(a, 20, "twenty", NULL) == 1);
        assert(flaxSize(a) == 2);
        void *old, *val;
        assert(flaxTryInsert(a, 10, "new_ten", &old) == 0);
        assert(strcmp(old, "ten") == 0);
        assert(flaxSize(a) == 2);
        assert(flaxFind(a, 10, &val) == 1);
        assert(strcmp(val, "ten") == 0);
        flaxFree(a);
    }

    TEST("iterator on empty flax") {
        flax *a = flaxNew();
        flaxIterator it;
        flaxStart(&it, a);
        assert(flaxSeek(&it, "^", 0) == 0);
        assert(flaxSeek(&it, "$", 0) == 0);
        assert(flaxSeek(&it, ">=", 42) == 0);
        flaxFree(a);
    }

    TEST("iterator forward") {
        flax *a = flaxNew();
        flaxInsert(a, 10, "ten", NULL);
        flaxInsert(a, 30, "thirty", NULL);
        flaxInsert(a, 20, "twenty", NULL);
        flaxInsert(a, 40, "forty", NULL);
        flaxIterator it;
        flaxStart(&it, a);
        assert(flaxSeek(&it, "^", 0));
        assert(it.key == 10);
        assert(strcmp(it.data, "ten") == 0);
        assert(flaxNext(&it));
        assert(it.key == 20);
        assert(flaxNext(&it));
        assert(it.key == 30);
        assert(flaxNext(&it));
        assert(it.key == 40);
        assert(flaxNext(&it) == 0);
        flaxFree(a);
    }

    TEST("iterator seek >=") {
        flax *a = flaxNew();
        flaxInsert(a, 10, "ten", NULL);
        flaxInsert(a, 20, "twenty", NULL);
        flaxInsert(a, 30, "thirty", NULL);
        flaxInsert(a, 40, "forty", NULL);
        flaxIterator it;
        flaxStart(&it, a);
        assert(flaxSeek(&it, ">=", 20));
        assert(it.key == 20);
        assert(flaxSeek(&it, ">=", 25));
        assert(it.key == 30);
        assert(flaxSeek(&it, ">=", 5));
        assert(it.key == 10);
        assert(flaxSeek(&it, ">=", 41) == 0);
        flaxFree(a);
    }

    TEST("iterator on single element") {
        flax *a = flaxNew();
        flaxInsert(a, 42, "answer", NULL);
        flaxIterator it;
        flaxStart(&it, a);
        assert(flaxSeek(&it, "^", 0));
        assert(it.key == 42);
        assert(strcmp(it.data, "answer") == 0);
        assert(flaxNext(&it) == 0);
        flaxFree(a);
    }

    TEST("flaxFreeWithCallback") {
        int ctx_free_count = 0;
        flax *a = flaxNew();
        flaxInsert(a, 1, "one", NULL);
        flaxInsert(a, 2, "two", NULL);
        flaxInsert(a, 3, "three", NULL);
        flaxFreeWithCallback(a, flax_test_ctx_free, &ctx_free_count);
        if (ctx_free_count != 3) {
            ERR("freeWithCallback: expected 3 frees, got %d",
                ctx_free_count);
        }
    }

    TEST("flaxAllocSize tracks allocations") {
        flax *a = flaxNew();
        size_t sz0 = flaxAllocSize(a);
        assert(sz0 > 0);
        for (int i = 0; i < 64; i++)
            flaxInsert(a, (uint8_t)i, "x", NULL);
        size_t sz1 = flaxAllocSize(a);
        assert(sz1 >= sz0);
        flaxShrink(a);
        size_t sz2 = flaxAllocSize(a);
        assert(sz2 <= sz1);
        assert(sz2 > 0);
        assert(flaxAllocSize(NULL) == 0);
        flaxFree(a);
    }

    TEST("flaxFree and flaxFreeWithCallback on NULL") {
        flaxFree(NULL);
        flaxFreeWithCallback(NULL, flax_test_counting_free, NULL);
    }

    TEST("flaxFind and flaxRemove on NULL flax") {
        void *val;
        assert(flaxFind(NULL, 42, &val) == 0);
        assert(val == NULL);
        assert(flaxRemove(NULL, 42, &val) == 0);
        assert(val == NULL);
    }

    TEST("flaxTryInsert with old=NULL on duplicate") {
        flax *a = flaxNew();
        assert(flaxTryInsert(a, 10, "ten", NULL) == 1);
        assert(flaxTryInsert(a, 10, "new_ten", NULL) == 0);
        assert(flaxSize(a) == 1);
        void *val;
        assert(flaxFind(a, 10, &val) == 1);
        assert(strcmp(val, "ten") == 0);
        flaxFree(a);
    }

    TEST("flaxIterSetData replaces value during iteration") {
        flax *a = flaxNew();
        flaxInsert(a, 10, "ten", NULL);
        flaxInsert(a, 20, "twenty", NULL);
        flaxInsert(a, 30, "thirty", NULL);
        flaxIterator it;
        flaxStart(&it, a);
        assert(flaxSeek(&it, "^", 0));
        do {
            if (it.key == 20) flaxIterSetData(&it, "TWENTY");
        } while (flaxNext(&it));
        void *val;
        assert(flaxFind(a, 10, &val) == 1);
        assert(strcmp(val, "ten") == 0);
        assert(flaxFind(a, 20, &val) == 1);
        assert(strcmp(val, "TWENTY") == 0);
        assert(flaxFind(a, 30, &val) == 1);
        assert(strcmp(val, "thirty") == 0);
        assert(flaxSize(a) == 3);
        flaxFree(a);
    }

    if (!err) {
        printf("ALL TESTS PASSED!\n");
    } else {
        /* ERR() bumps 'err' as a side effect; keep the real failure count
         * so the value returned matches the one reported here. */
        int failed = err;
        ERR("Sorry, not all tests passed! In fact, %d tests failed.", failed);
        err = failed;
    }
    return err;
}
#endif

100
src/flax.h Normal file
View file

@ -0,0 +1,100 @@
/* Flax -- A flat sorted-array map for uint8_t keys.
*
* Copyright (c) 2025-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
#ifndef FLAX_H
#define FLAX_H
#include <stdint.h>
#include <stddef.h>
#define FLAX_INIT_CAPACITY 16
/* A flax is a sorted associative container that maps uint8_t keys to void*
* values. Both arrays live in a single heap allocation ("data block") laid
* out as follows:
*
* flax struct data block (single allocation)
* +------------+ +------------------------------------+
* | *data -----------> | keys[0..cap-1] (uint8_t) |
* | numele | +-- aligned to sizeof(void*) --------+
* | capacity | | values[0..cap-1] (void*) |
* | alloc_size | +------------------------------------+
* +------------+
*
* Keys are maintained in ascending sorted order. Only the first 'numele'
* slots in each array contain live data; the remainder up to 'capacity'
* is reserved space for future inserts.
*
* Lookup, insert and delete use a linear scan over the keys array rather
* than binary search. This is intentional: the expected element count is
* small (e.g. per-consumer stream PEL), so the sequential, cache-friendly
* access pattern outperforms binary search whose branch-misprediction cost
* dominates at these sizes. The scan includes fast-path checks for the
* head and tail positions to accelerate the common case of monotonically
* increasing keys (e.g. stream entry IDs).
*
* Growth strategy: the data block doubles in capacity when full (on insert)
* and can be shrunk to fit with flaxShrink().
*/
/* Handle for one flax instance. 'numele' and 'capacity' are 16 bit wide
* because a completely full map holds 256 entries (one per possible
* uint8_t key), a count that does not fit in a uint8_t. */
typedef struct flax {
void *data; /* Packed storage: keys array followed by values array. */
uint16_t numele; /* Number of elements currently stored (max 256). */
uint16_t capacity; /* Current allocated capacity. */
uint32_t alloc_size; /* Total usable bytes: struct allocation + data block. */
} flax;
/* Flax iterator state. The typical lifecycle is:
*
* flaxIterator it;
* flaxStart(&it, myflax); -- initialize
* if (flaxSeek(&it, ">=", somekey)) { -- position on the first match
*     do {
*         ... use it.key / it.data ...
*     } while (flaxNext(&it)); -- advance to the following element
* }
*
* A successful flaxSeek() already leaves the iterator positioned on the
* matching element: 'key' and 'data' are valid right after it returns 1.
* flaxNext() then moves to the following element and returns 0 once the
* last element has been passed. Calling flaxNext() in a plain
* "while (flaxNext(&it))" loop right after the seek (the pattern used with
* raxSeek() / raxNext()) would therefore skip the first matching element.
*
* After flaxStart() the iterator is in EOF state until a successful
* flaxSeek(). The iterator does not allocate heap memory.
*
* WARNING: the iterator is invalidated by any mutation (insert / remove /
* resize) on the underlying flax. Do not modify the flax while iterating.
* Replacing the value of the current element via flaxIterSetData() is the
* supported way to update data during iteration. */
typedef struct flaxIterator {
flax *f; /* Flax we are iterating. */
uint8_t key; /* The current key. */
void *data; /* Data associated to this key. */
int16_t idx; /* Current index into the flax arrays, -1 if EOF. */
} flaxIterator;
/* --- Creation and destruction ---
*
* flaxNew() returns a new, empty flax. flaxFree() and
* flaxFreeWithCallback() both accept a NULL flax. With the callback
* variant, free_callback(item, ctx) is called for the stored values
* (the unit test expects exactly one call per element) and 'ctx' is
* passed through untouched. */
flax *flaxNew(void);
void flaxFree(flax *f);
void flaxFreeWithCallback(flax *f,
void (*free_callback)(void *item, void *ctx),
void *ctx);
/* --- Lookup and mutation ---
*
* In all the calls below the output pointer ('old' / 'value') may be used
* to receive a stored value; 'old' may be NULL for the insert functions.
*
* flaxTryInsert() returns 1 if the key was added and 0 if the key already
* exists, in which case the existing value is left in place.
* flaxInsert(): presumably overwrites an existing key, reporting the
* previous value through 'old' -- TODO confirm against flax.c.
* flaxFind() returns 1 and stores the value in '*value' when the key is
* present, 0 otherwise.
* flaxFind() and flaxRemove() called with a NULL flax return 0 and set the
* output pointer to NULL. */
int flaxInsert(flax *f, uint8_t key, void *data, void **old);
int flaxTryInsert(flax *f, uint8_t key, void *data, void **old);
int flaxRemove(flax *f, uint8_t key, void **old);
int flaxFind(flax *f, uint8_t key, void **value);
/* --- Iterator ---
*
* flaxSeek() returns 1 and positions the iterator on an element when one
* satisfies the operator, 0 otherwise. Operators exercised by the unit
* tests are "^" (first element; the tests pass 0 as 'key') and ">="
* (first element whose key is >= 'key'); other operators may be supported,
* see flax.c. flaxNext() steps to the next element and returns 0 when
* there is none. flaxIterSetData() replaces the value stored for the
* element the iterator is currently positioned on. */
void flaxStart(flaxIterator *it, flax *f);
int flaxSeek(flaxIterator *it, const char *op, uint8_t key);
int flaxNext(flaxIterator *it);
void flaxIterSetData(flaxIterator *it, void *data);
/* --- Introspection ---
*
* flaxSize() returns the number of stored elements. flaxAllocSize()
* returns 0 for a NULL flax and a non-zero value even for an empty one.
* flaxShrink() never makes the reported allocation size grow. */
uint16_t flaxSize(flax *f);
size_t flaxAllocSize(flax *f);
void flaxShrink(flax *f);
#ifdef REDIS_TEST
/* Unit test entry point; the visible test body returns the number of
* failed tests (0 when all of them pass). */
int flaxTest(int argc, char *argv[], int flags);
#endif
#endif

25
src/flax_malloc.h Normal file
View file

@ -0,0 +1,25 @@
/* Flax -- A flat sorted-array map for uint8_t keys.
*
* Copyright (c) 2025-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*/
/* Allocator selection.
*
* This file is used in order to change the Flax allocator at compile time.
* Just define the following defines to what you want to use. Also add
* the include of your alternate allocator if needed (not needed in order
* to use the default libc allocator).
*
* As shipped, the four hooks below are routed to the zmalloc family
* declared in zmalloc.h: the plain allocate / free pair plus their
* "_usable" counterparts. A replacement allocator has to provide all four
* names. */
#ifndef FLAX_MALLOC_H
#define FLAX_MALLOC_H
#include "zmalloc.h"
#define flax_malloc zmalloc
#define flax_malloc_usable zmalloc_usable
#define flax_free zfree
#define flax_free_usable zfree_usable
#endif

View file

@ -198,7 +198,7 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
* work. */
serverAssert(raxNext(&ri));
cg = ri.data;
effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
effort += raxSize(s->cgroups)*(1+cg->pel_count);
raxStop(&ri);
}
return effort;

View file

@ -752,35 +752,35 @@ int rdbLoadObjectType(rio *rdb) {
* we serialized the NACKs as well, but when serializing the local consumer
* PELs we just add the ID, that will be resolved inside the global PEL to
* put a reference to the same structure. */
ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, uint64_t pel_count, int nacks) {
ssize_t n, nwritten = 0;
/* Number of entries in the PEL. */
if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
if ((n = rdbSaveLen(rdb,pel_count)) == -1) return -1;
nwritten += n;
/* Save each entry. */
raxIterator ri;
raxStart(&ri,pel);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
/* We store IDs in raw form as 128 big big endian numbers, like
* they are inside the radix tree key. */
if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) {
raxStop(&ri);
pelIterator pi;
pelIterStart(&pi,pel);
pelIterSeek(&pi,"^",NULL);
while (pelIterNext(&pi)) {
/* We store IDs in raw form as 128 bit big endian numbers,
* reconstructed from the two-level structure. */
if ((n = rdbWriteRaw(rdb,pi.key,sizeof(streamID))) == -1) {
pelIterStop(&pi);
return -1;
}
nwritten += n;
if (nacks) {
streamNACK *nack = ri.data;
streamNACK *nack = pi.data;
if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) {
raxStop(&ri);
pelIterStop(&pi);
return -1;
}
nwritten += n;
if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) {
raxStop(&ri);
pelIterStop(&pi);
return -1;
}
nwritten += n;
@ -789,7 +789,7 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
* at loading time. */
}
}
raxStop(&ri);
pelIterStop(&pi);
return nwritten;
}
@ -1040,7 +1040,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
* consumer local PEL. */
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) {
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,consumer->pel_count,0)) == -1) {
raxStop(&ri);
return -1;
}
@ -1416,7 +1416,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
if ((n = rdbSaveStreamPEL(rdb,cg->pel,cg->pel_count,1)) == -1) {
raxStop(&ri);
return -1;
}
@ -3483,14 +3483,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
streamNACK *nack = streamCreateNACK(s, NULL, &nack_id);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, rawid);
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, &nack_id);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream PEL NACK loading failed.");
streamFreeNACK(s, nack);
decrRefCount(o);
return NULL;
}
if (!raxTryInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) {
if (!pelTryInsert(cgroup->pel,&nack_id,nack,&cgroup->pel_count)) {
rdbReportCorruptRDB("Duplicated global PEL entry "
"loading stream consumer group");
streamFreeNACK(s, nack);
@ -3563,8 +3563,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
decrRefCount(o);
return NULL;
}
streamID nack_id;
streamDecodeID(rawid, &nack_id);
void *result;
if (!raxFind(cgroup->pel,rawid,sizeof(rawid),&result)) {
if (!pelFind(cgroup->pel, &nack_id, &result)) {
rdbReportCorruptRDB("Consumer entry not found in "
"group global PEL");
decrRefCount(o);
@ -3584,7 +3586,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
nack->consumer = consumer;
if (!raxTryInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) {
if (!pelTryInsert(consumer->pel,&nack_id,nack,&consumer->pel_count)) {
rdbReportCorruptRDB("Duplicated consumer PEL entry "
" loading a stream consumer "
"group");
@ -3619,8 +3621,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error)
return NULL;
}
streamID nack_id;
streamDecodeID(rawid, &nack_id);
void *result;
if (!raxFind(cgroup->pel, rawid, sizeof(rawid), &result)) {
if (!pelFind(cgroup->pel, &nack_id, &result)) {
rdbReportCorruptRDB("Stream NACK zone entry not found "
"in group global PEL");
decrRefCount(o);

View file

@ -7883,6 +7883,7 @@ struct redisTest {
{"vector", vectorTest},
{"bitmap", bitopsTest},
{"rax", raxTest},
{"flax", flaxTest},
{"zset", zsetTest},
{"topk", chkTopKTest},
{"fastfloat", fastFloatTest},

View file

@ -64,6 +64,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "quicklist.h" /* Lists are encoded as linked lists of
N-elements flat arrays */
#include "rax.h" /* Radix tree */
#include "flax.h" /* Flat sorted array */
#include "connection.h" /* Connection abstraction */
#include "eventnotifier.h" /* Event notification */
#include "memory_prefetch.h"

View file

@ -5,6 +5,18 @@
#include "listpack.h"
#include "dict.h"
#include "xxhash.h"
#include "flax.h"
#include <stdint.h>
/* Two-level PEL key-length convention:
*
* Single-entry ("direct") buckets use a 16-byte rax key (the complete
* big-endian streamID) and store the data pointer directly as the rax
* value. Multi-entry ("flax") buckets use a 15-byte rax key (ms + upper
* 7 bytes of seq) and store a flax* mapping the low byte of seq to data
* pointers. The key length in rax (16 vs 15) distinguishes the two. */
#define PEL_RAX_DIRECT_KEYLEN sizeof(streamID) /* 16 */
#define PEL_RAX_FLAX_KEYLEN 15
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
@ -42,7 +54,11 @@ typedef struct stream {
uint64_t entries_added; /* All time count of elements added. */
size_t alloc_size; /* Total allocated memory (in bytes) by this stream. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
rax *cgroups_ref; /* Index mapping message IDs to their consumer groups. */
rax *cgroups_ref; /* Two-level index mapping message IDs to their
consumer groups. Same key scheme as PEL:
outer rax -> flax(low byte of seq -> list*
of streamCG pointers). Direct / flax key
lengths follow the PEL convention (16/15). */
streamID min_cgroup_last_id; /* The minimum ID of consume group. */
unsigned int min_cgroup_last_id_valid: 1;
uint64_t idmp_duration; /* IDMP duration in seconds. */
@ -99,12 +115,13 @@ typedef struct streamCG {
group reads. In the real world, the reasoning behind
this value is detailed at the top comment of
streamEstimateDistanceFromFirstEverEntry(). */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNACK structure.*/
rax *pel; /* Two-level pending entries list. Single-entry
buckets (direct) use a 16-byte rax key (full
big-endian streamID). Multi-entry buckets use
a 15-byte key (ms + upper 7 bytes of seq) and
store a flax* mapping the low byte of seq to
streamNACK pointers. Max 256 per bucket. */
uint64_t pel_count; /* Total number of NACK entries in this PEL (direct + flax). */
streamNACK *pel_time_head; /* Head of time-ordered doubly-linked list of pending
entries (oldest delivery_time). Used for efficient
CLAIM operations. O(1) access to oldest entries. */
@ -127,13 +144,11 @@ typedef struct streamConsumer {
sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
rax *pel; /* Consumer specific pending entries list: all
the pending messages delivered to this
consumer not yet acknowledged. Keys are
big endian message IDs, while values are
the same streamNACK structure referenced
in the "pel" of the consumer group structure
itself, so the value is shared. */
rax *pel; /* Two-level consumer PEL: same structure as
streamCG.pel -- 16-byte key for direct,
15-byte key for flax buckets. NACKs are
shared with group PEL. */
uint64_t pel_count; /* Total NACK count for this consumer. */
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
@ -183,7 +198,7 @@ void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(stream *s, streamNACK *na);
void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key);
void streamDestroyNACK(stream *s, streamNACK *na);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
@ -201,7 +216,37 @@ int streamEntryExists(stream *s, streamID *id);
void streamKeyLoaded(redisDb *db, robj *key, robj *val);
void streamKeyRemoved(redisDb *db, robj *key, robj *val);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key);
listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, streamID *id);
/* Two-level PEL iterator: walks outer rax and inner flax.
* Direct entries (16-byte rax key) have no flax allocation;
* direct tracks whether the current entry is direct.
*
* Usage as seen in rdbSaveStreamPEL():
*
* pelIterator pi;
* pelIterStart(&pi, pel);
* pelIterSeek(&pi, "^", NULL);
* while (pelIterNext(&pi)) { ... pi.key / pi.data ... }
* pelIterStop(&pi);
*
* Every exit path, including early error returns, must call
* pelIterStop(). */
typedef struct pelIterator {
raxIterator ri; /* Iterator over the outer rax. */
flaxIterator fi; /* Iterator over the inner flax of the current bucket. */
int valid; /* NOTE(review): presumably non-zero while the iterator
            * points at an entry -- confirm in the implementation. */
int just_seeked; /* NOTE(review): presumably set by pelIterSeek() so that
                  * the first pelIterNext() yields the seeked entry
                  * instead of skipping it -- confirm. */
int direct; /* Non-zero when the current entry is a direct one. */
streamID id; /* NOTE(review): presumably the decoded ID of the current
              * entry -- confirm. */
void *data; /* Value of the current entry (a streamNACK* when walking
             * a PEL, as used by the RDB saving code). */
unsigned char key[sizeof(streamID)]; /* Raw 16-byte form of the current
                                      * ID; written verbatim to the RDB. */
} pelIterator;
/* Two-level PEL operations.
*
* Contracts visible from the RDB loading / saving call sites:
*
* - pelTryInsert() returns 0 when an entry with the same ID is already
*   present (the loader reports this as a duplicated PEL entry). Callers
*   pass the address of the owning structure's pel_count as 'count';
*   presumably it is updated on successful insert / remove -- TODO confirm.
* - pelFind() returns 0 when the ID is not in the PEL; otherwise the stored
*   pointer is returned through 'data'.
* - Iteration follows start / seek / next / stop, with pelIterSeek(pi, "^",
*   NULL) positioning at the beginning and pelIterNext() used as the loop
*   condition. pelIterStop() must be called on every exit path.
*
* NOTE(review): the exact semantics of pelInsert(), pelReplace(),
* pelRemove(), pelFreeShallow() and pelCacheInvalidate() are not visible
* from this header; see the implementation before relying on them. */
void pelCacheInvalidate(rax *pel);
rax *pelNew(size_t *alloc_size);
void pelFree(rax *pel, void (*nack_free)(void *, void *), void *ctx);
void pelFreeShallow(rax *pel);
int pelInsert(rax *pel, streamID *id, void *data, uint64_t *count);
int pelTryInsert(rax *pel, streamID *id, void *data, uint64_t *count);
void pelReplace(rax *pel, streamID *id, void *data);
int pelFind(rax *pel, streamID *id, void **data);
void *pelRemove(rax *pel, streamID *id, uint64_t *count);
void pelIterStart(pelIterator *pi, rax *pel);
int pelIterSeek(pelIterator *pi, const char *op, streamID *id);
int pelIterNext(pelIterator *pi);
void pelIterStop(pelIterator *pi);
/* PEL time list management (used by RDB loading) */
void pelListInsertSorted(streamCG *cg, streamNACK *nack);

File diff suppressed because it is too large Load diff