diff --git a/src/Makefile b/src/Makefile index 8939f89e0..86c40b722 100644 --- a/src/Makefile +++ b/src/Makefile @@ -384,7 +384,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o entry.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_array.o sparsearray.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o 
rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o flax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o keymeta.o chk.o hotkeys.o gcra.o vector.o fast_float_strtod.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/aof.c b/src/aof.c index 9e55a78b7..16c169597 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2353,7 +2353,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { while(raxNext(&ri_cons)) { streamConsumer *consumer = ri_cons.data; /* If there are no pending entries, just emit XGROUP CREATECONSUMER */ - if (raxSize(consumer->pel) == 0) { + if (consumer->pel_count == 0) { if (rioWriteStreamEmptyConsumer(r,key,(char*)ri.key, ri.key_len,consumer) == 0) { @@ -2365,22 +2365,22 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { } /* For the current consumer, iterate all the PEL entries * to emit the XCLAIM protocol. 
*/ - raxIterator ri_pel; - raxStart(&ri_pel,consumer->pel); - raxSeek(&ri_pel,"^",NULL,0); - while(raxNext(&ri_pel)) { - streamNACK *nack = ri_pel.data; + pelIterator pi; + pelIterStart(&pi,consumer->pel); + pelIterSeek(&pi,"^",NULL); + while (pelIterNext(&pi)) { + streamNACK *nack = pi.data; if (rioWriteStreamPendingEntry(r,key,(char*)ri.key, ri.key_len,consumer, - ri_pel.key,nack) == 0) + pi.key,nack) == 0) { - raxStop(&ri_pel); + pelIterStop(&pi); raxStop(&ri_cons); raxStop(&ri); return 0; } } - raxStop(&ri_pel); + pelIterStop(&pi); } raxStop(&ri_cons); diff --git a/src/defrag.c b/src/defrag.c index 913e457c2..15df6c8f4 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -19,6 +19,7 @@ */ #include "server.h" +#include "stream.h" #include #include @@ -881,48 +882,34 @@ void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c raxStop(&ri); } -void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { +/* Walk a consumer-PEL entry and fix up the consumer back-pointer for + * every NACK. NACKs themselves are defragged during the group PEL walk + * (defragStreamCGPendingEntry) which also covers unowned NACK-zone entries; + * here we only repair the stale consumer pointer. */ +void* defragStreamConsumerPelEntry(raxIterator *ri, void *privdata) { streamConsumer *c = privdata; - streamNACK *nack = ri->data; - /* NACKs are already defragged by the CG PEL walk (defragStreamCGPendingEntry). - * cgroup_ref_node->value is also updated there for all NACKs (including - * unowned NACK-zone entries that have no consumer PEL walk). - * Here we only fix up the back-pointer to the possibly-relocated consumer. */ - nack->consumer = c; - return NULL; -} -void* defragStreamCGPendingEntry(raxIterator *ri, void *privdata) { - streamCG *cg = privdata; - streamNACK *nack = ri->data, *newnack; - /* Update cgroup_ref_node to the possibly-relocated CG for every NACK. 
- * Consumer-owned entries will get this overwritten again redundantly by - * defragStreamConsumerPendingEntry; unowned (NACK zone) entries have no - * consumer PEL walk, so this is their only chance. */ - nack->cgroup_ref_node->value = cg; - newnack = activeDefragAlloc(nack); - if (newnack) { - /* If this NACK is owned by a consumer, update the consumer's PEL. */ - if (newnack->consumer) { - void *prev; - raxInsert(newnack->consumer->pel, ri->key, ri->key_len, newnack, &prev); - serverAssert(prev == nack); - } - if (newnack->pel_prev) { - newnack->pel_prev->pel_next = newnack; - } else { - cg->pel_time_head = newnack; - } - if (newnack->pel_next) { - newnack->pel_next->pel_prev = newnack; - } else { - cg->pel_time_tail = newnack; - } - if (cg->pel_nack_tail == nack) { - cg->pel_nack_tail = newnack; - } + if (ri->key_len == PEL_RAX_DIRECT_KEYLEN) { + streamNACK *nack = ri->data; + nack->consumer = c; + return NULL; } - return newnack; + + flax *f = ri->data; + flax *newflax = activeDefragAlloc(f); + if (newflax) f = newflax; + void *newdata = activeDefragAlloc(f->data); + if (newdata) f->data = newdata; + + flaxIterator fi; + flaxStart(&fi, f); + if (flaxSeek(&fi, "^", 0)) { + do { + streamNACK *nack = fi.data; + nack->consumer = c; + } while (flaxNext(&fi)); + } + return newflax; } void* defragStreamConsumer(raxIterator *ri, void *privdata) { @@ -936,29 +923,90 @@ void* defragStreamConsumer(raxIterator *ri, void *privdata) { if (newsds) c->name = newsds; if (c->pel) { - /* Update pel back-pointer to new stream */ c->pel->alloc_size = &s->alloc_size; - defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, c); + pelCacheInvalidate(c->pel); + defragRadixTree(&c->pel, 0, defragStreamConsumerPelEntry, c); } return newc; /* returns NULL if c was not defragged */ } +/* After a NACK has been relocated by activeDefragAlloc(), fix up all + * cross-references that still point to the old address: the consumer PEL, + * the doubly-linked time list, and the NACK-zone 
tail. */ +static void relinkStreamNack(streamCG *cg, streamNACK *nack, streamNACK *newnack) { + if (newnack->consumer) { + pelReplace(newnack->consumer->pel, &newnack->id, newnack); + } + if (newnack->pel_prev) { + newnack->pel_prev->pel_next = newnack; + } else { + cg->pel_time_head = newnack; + } + if (newnack->pel_next) { + newnack->pel_next->pel_prev = newnack; + } else { + cg->pel_time_tail = newnack; + } + if (cg->pel_nack_tail == nack) { + cg->pel_nack_tail = newnack; + } +} + +/* Walk a group-PEL entry: for direct entries (16-byte key) defrag the single + * NACK; for flax buckets (15-byte key) defrag the flax struct itself and then + * every NACK inside. Update pointers in the consumer PEL, the doubly-linked + * time list, and the NACK-zone tail. cgroup_ref_node->value is also updated + * here for every NACK (including unowned NACK-zone entries). */ +void* defragStreamCGPendingEntry(raxIterator *ri, void *privdata) { + streamCG *cg = privdata; + + if (ri->key_len == PEL_RAX_DIRECT_KEYLEN) { + streamNACK *nack = ri->data, *newnack; + nack->cgroup_ref_node->value = cg; + newnack = activeDefragAlloc(nack); + if (newnack) { + relinkStreamNack(cg, nack, newnack); + } + return newnack; + } + + flax *f = ri->data; + flax *newf = activeDefragAlloc(f); + if (newf) f = newf; + void *newdata = activeDefragAlloc(f->data); + if (newdata) f->data = newdata; + + flaxIterator fi; + flaxStart(&fi, f); + if (flaxSeek(&fi, "^", 0)) { + do { + streamNACK *nack = fi.data, *newnack; + nack->cgroup_ref_node->value = cg; + newnack = activeDefragAlloc(nack); + if (newnack) { + flaxIterSetData(&fi, newnack); + relinkStreamNack(cg, nack, newnack); + } + } while (flaxNext(&fi)); + } + return newf; +} + void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { stream *s = privdata; streamCG *newcg, *cg = ri->data; if ((newcg = activeDefragAlloc(cg))) cg = newcg; if (cg->pel) { - /* Update pel back-pointer to new stream */ cg->pel->alloc_size = &s->alloc_size; + 
pelCacheInvalidate(cg->pel); defragRadixTree(&cg->pel, 0, defragStreamCGPendingEntry, cg); } if (cg->consumers) { - /* Update consumers back-pointer to new stream */ cg->consumers->alloc_size = &s->alloc_size; defragRadixTree(&cg->consumers, 0, defragStreamConsumer, s); } - return cg; + return newcg; } /* Defrag a single idmpProducer's dict and linked list entries. */ diff --git a/src/flax.c b/src/flax.c new file mode 100644 index 000000000..2aa9bffe9 --- /dev/null +++ b/src/flax.c @@ -0,0 +1,901 @@ +/* Flax -- A flat sorted-array map for uint8_t keys. + * + * Copyright (c) 2025-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "flax.h" +#include "redisassert.h" +#include +#include +#include + +#ifndef flax_malloc +#ifndef FLAX_MALLOC_INCLUDE +#define FLAX_MALLOC_INCLUDE "flax_malloc.h" +#endif +#include FLAX_MALLOC_INCLUDE +#endif + +/* ---------------------------------------------------------------------------- + * Flax internals + * + * A flax stores a sorted array of (uint8_t key, void *value) pairs inside a + * single contiguous heap block. The block is split into two sub-arrays: + * + * [ keys: uint8_t * capacity ][ padding ][ values: void* * capacity ] + * + * The padding between keys and values ensures that the values array starts + * at a pointer-aligned offset (see flax_values_offset()). + * + * Only the first `numele` slots in each sub-array hold live data; the + * remainder up to `capacity` is unused reserved space. + * + * Lookup uses linear scan rather than binary search. The expected element + * count is small (e.g. per-consumer stream PEL), so sequential cache-friendly + * access outperforms binary search whose branch-misprediction cost dominates + * at these sizes. 
Fast-path checks for the head and tail positions further + * accelerate the common case of monotonically increasing keys. + * + * Growth: capacity doubles on insert when full. + * Shrink: flaxShrink() reallocates to fit exactly. + * -------------------------------------------------------------------------- */ + +/* ----------------------------- Internal helpers ---------------------------- */ + +/* Return the byte offset where the values array starts within the data + * block for a given capacity. The offset is aligned to pointer size. */ +static size_t flax_values_offset(uint16_t capacity) { + size_t raw = (size_t)capacity * sizeof(uint8_t); + size_t align = alignof(void *); + return (raw + align - 1) & ~(align - 1); +} + +/* Return a pointer to the keys array inside the flax data block. */ +static uint8_t *flax_keys(flax *f) { + return (uint8_t *)f->data; +} + +/* Return a pointer to the values array inside the flax data block. */ +static void **flax_values(flax *f) { + return (void **)((char *)f->data + flax_values_offset(f->capacity)); +} + +/* Search for 'key' in the sorted 'keys' array of length 'numele'. + * + * Returns 1 if the key is found, storing its position in *out_idx. + * Returns 0 if the key is absent, storing the insertion point in *out_idx + * (i.e. the index where the key would be placed to keep the array sorted). + * + * The search is a linear scan rather than binary search. This is deliberate: + * flax instances are expected to be small (tens of elements -- e.g. a stream + * consumer's PEL). At these sizes, a sequential walk through a contiguous + * uint8_t array is faster than binary search because: + * 1. The entire keys array fits in one or two cache lines. + * 2. Linear access has no branch-misprediction overhead -- the branch + * predictor can reliably learn the "not found yet, keep going" pattern. + * 3. Binary search touches O(log N) *random* cache lines and suffers a + * misprediction at every comparison. 
+ *
+ * Two fast paths are checked first:
+ *   - Tail: key > keys[numele-1] is the append case, overwhelmingly common
+ *     when keys are monotonically increasing sequence numbers.
+ *   - Head: key <= keys[0] catches prepend and exact-match-at-zero. */
+static int flax_search(const uint8_t *keys, uint16_t numele, uint8_t key, int16_t *out_idx) {
+    /* Empty array: the key would become element 0. */
+    if (numele == 0) {
+        *out_idx = 0;
+        return 0;
+    }
+
+    const uint16_t last = numele - 1;
+
+    /* Tail fast path: either an exact match on the last element or an
+     * append right after it. */
+    if (key >= keys[last]) {
+        int hit = (key == keys[last]);
+        *out_idx = hit ? last : numele;
+        return hit;
+    }
+
+    /* Head fast path: exact match on the first element or a prepend. */
+    if (key <= keys[0]) {
+        *out_idx = 0;
+        return key == keys[0];
+    }
+
+    /* Here keys[0] < key < keys[last]. Walk the interior slots until the
+     * first one that is not smaller than the key; if none is, the key
+     * belongs immediately before the last element. */
+    uint16_t pos = 1;
+    while (pos < last && keys[pos] < key) pos++;
+
+    *out_idx = pos;
+    return pos < last && keys[pos] == key;
+}
+
+/* Resize the internal storage to 'new_capacity'.
+ *
+ * A fresh data block is allocated and the live keys and values are copied
+ * into it. Because the keys and values sub-arrays sit at different offsets
+ * that depend on the capacity (the values offset is re-aligned for the new
+ * capacity), we must perform two independent memcpy operations -- one for
+ * the keys at the start of the block and one for the values at the new
+ * aligned offset. The old data block is freed afterwards.
+ *
+ * IMPORTANT: this function replaces f->data but never moves the flax struct
+ * itself. External code (e.g. the PEL cache in t_stream.c) relies on the
+ * struct pointer remaining stable across resize operations.
*/ +static void flax_resize(flax *f, uint16_t new_capacity) { + if (new_capacity > UINT8_MAX + 1) new_capacity = UINT8_MAX + 1; + size_t new_voff = flax_values_offset(new_capacity); + size_t new_alloc = new_voff + (size_t)new_capacity * sizeof(void *); + size_t new_usable; + void *new_data = flax_malloc_usable(new_alloc, &new_usable); + + if (f->data && f->numele > 0) { + memcpy(new_data, f->data, (size_t)f->numele * sizeof(uint8_t)); + memcpy((char *)new_data + new_voff, + (char *)f->data + flax_values_offset(f->capacity), + (size_t)f->numele * sizeof(void *)); + } + + size_t old_usable; + flax_free_usable(f->data, &old_usable); + f->data = new_data; + f->capacity = new_capacity; + f->alloc_size += new_usable - old_usable; +} + +/* Update the iterator key and data fields from the underlying flax + * at the current index position. */ +static void flaxIterRefresh(flaxIterator *it) { + it->key = flax_keys(it->f)[it->idx]; + it->data = flax_values(it->f)[it->idx]; +} + +/* ---------------------------------------------------------------------------- + * Core API + * -------------------------------------------------------------------------- */ + +/* Allocate a new flax and return its pointer. On out of memory the function + * returns NULL. */ +flax *flaxNew(void) { + size_t usable; + flax *f = flax_malloc_usable(sizeof(flax), &usable); + f->alloc_size = usable; + f->numele = 0; + f->capacity = FLAX_INIT_CAPACITY; + size_t voff = flax_values_offset(FLAX_INIT_CAPACITY); + f->data = flax_malloc_usable(voff + (size_t)FLAX_INIT_CAPACITY * sizeof(void *), &usable); + f->alloc_size += usable; + return f; +} + +/* Generic insert. If 'overwrite' is true and the key already exists, the + * associated data is updated and 0 is returned. If 'overwrite' is false + * and the key exists, the data is left unchanged and 0 is returned. In + * both cases, if 'old' is not NULL the previous value is stored there. 
+ * When the key is new, a new element is created and 1 is returned (and
+ * *old is set to NULL if provided). */
+static int flaxGenericInsert(flax *f, uint8_t key, void *data, void **old, int overwrite) {
+    int16_t pos;
+    int found = flax_search(flax_keys(f), f->numele, key, &pos);
+
+    if (found) {
+        /* Existing key: report the stored value and optionally replace it. */
+        void **slot = &flax_values(f)[pos];
+        if (old != NULL) *old = *slot;
+        if (overwrite) *slot = data;
+        return 0;
+    }
+
+    /* Grow before taking the array pointers: resizing replaces f->data. */
+    if (f->numele == f->capacity) flax_resize(f, f->capacity * 2);
+
+    uint8_t *karr = flax_keys(f);
+    void **varr = flax_values(f);
+
+    /* Number of live elements at [pos] and beyond; they all move one slot
+     * to the right to open a gap for the new pair. */
+    int16_t nshift = f->numele - pos;
+    if (nshift > 0) {
+        memmove(karr + pos + 1, karr + pos, (size_t)nshift * sizeof(uint8_t));
+        memmove(varr + pos + 1, varr + pos, (size_t)nshift * sizeof(void *));
+    }
+
+    karr[pos] = key;
+    varr[pos] = data;
+    f->numele++;
+    if (old != NULL) *old = NULL;
+    return 1; /* new element created */
+}
+
+/* Insert 'data' under 'key', replacing the stored value when the key is
+ * already present. Thin wrapper around flaxGenericInsert(). */
+int flaxInsert(flax *f, uint8_t key, void *data, void **old) {
+    return flaxGenericInsert(f, key, data, old, /* overwrite */ 1);
+}
+
+/* Insert 'data' under 'key' only when the key is not present yet; an
+ * existing value is left untouched. Thin wrapper around
+ * flaxGenericInsert(). */
+int flaxTryInsert(flax *f, uint8_t key, void *data, void **old) {
+    return flaxGenericInsert(f, key, data, old, /* overwrite */ 0);
+}
+
+/* Remove the specified item. Returns 1 if the item was found and
+ * deleted, 0 otherwise. If 'old' is not NULL the removed value is
+ * stored at that address.
*/ +int flaxRemove(flax *f, uint8_t key, void **old) { + if (!f || f->numele == 0) { + if (old) *old = NULL; + return 0; + } + + int16_t idx; + if (!flax_search(flax_keys(f), f->numele, key, &idx)) { + if (old) *old = NULL; + return 0; + } + + uint8_t *keys = flax_keys(f); + void **vals = flax_values(f); + if (old) *old = vals[idx]; + int16_t tail = f->numele - idx - 1; /* elements after [idx] that must shift left */ + + /* Collapse the gap left by the removed element. */ + if (tail > 0) { + memmove(&keys[idx], &keys[idx + 1], (size_t)tail * sizeof(uint8_t)); + memmove(&vals[idx], &vals[idx + 1], (size_t)tail * sizeof(void *)); + } + + f->numele--; + + if (f->numele > 0 && + f->capacity > FLAX_INIT_CAPACITY && + f->numele <= f->capacity / 2) + { + uint16_t new_cap = f->capacity / 2; + if (new_cap < FLAX_INIT_CAPACITY) new_cap = FLAX_INIT_CAPACITY; + flax_resize(f, new_cap); + } + + return 1; +} + +/* Find a key in the flax, returning 1 if found, 0 otherwise. If the key + * is found and 'value' is not NULL, the associated data pointer is stored + * at that address. */ +int flaxFind(flax *f, uint8_t key, void **value) { + if (!f || f->numele == 0) { + if (value) *value = NULL; + return 0; + } + int16_t idx; + if (flax_search(flax_keys(f), f->numele, key, &idx)) { + if (value) *value = flax_values(f)[idx]; + return 1; + } + if (value) *value = NULL; + return 0; +} + +/* Free a whole flax. */ +void flaxFree(flax *f) { + if (!f) return; + flax_free(f->data); + flax_free(f); +} + +/* Free a whole flax, calling the specified callback with a context + * argument in order to free the auxiliary data. */ +void flaxFreeWithCallback(flax *f, + void (*free_callback)(void *item, void *ctx), + void *ctx) { + if (!f) return; + if (free_callback && f->data && f->numele > 0) { + void **vals = flax_values(f); + for (uint32_t i = 0; i < f->numele; i++) + free_callback(vals[i], ctx); + } + flax_free(f->data); + flax_free(f); +} + +/* Return the number of elements inside the flax. 
*/ +uint16_t flaxSize(flax *f) { + return f->numele; +} + +/* Return the total heap memory used by the flax struct and its data block. + * O(1): the value is maintained incrementally by alloc/resize operations. */ +size_t flaxAllocSize(flax *f) { + if (!f) return 0; + return f->alloc_size; +} + +/* Shrink the internal storage to fit the current number of elements, + * releasing unused memory. No-op when the flax is empty (the caller should + * flaxFree() the whole structure instead) or already at exact capacity. */ +void flaxShrink(flax *f) { + if (f->numele > 0 && f->numele < f->capacity) + flax_resize(f, f->numele); +} + +/* ------------------------------- Iterator --------------------------------- */ + +/* Initialize a flax iterator. This call should be performed a single time + * to initialize the iterator, and must be followed by a flaxSeek() call, + * otherwise the flaxNext() function will just return 0. */ +void flaxStart(flaxIterator *it, flax *f) { + it->f = f; + it->idx = -1; + it->key = 0; + it->data = NULL; +} + +/* Seek an iterator at the specified element. The 'op' argument selects the + * seek mode: "^" for the first element, "$" for the last, ">=" for greater + * or equal. Return 0 if no matching element was found, otherwise 1 is + * returned. 
*/ +int flaxSeek(flaxIterator *it, const char *op, uint8_t key) { + if (!it->f || it->f->numele == 0) { + it->idx = -1; + it->key = 0; + it->data = NULL; + return 0; + } + + if (op[0] == '^') { + it->idx = 0; + flaxIterRefresh(it); + return 1; + } + + if (op[0] == '$') { + it->idx = it->f->numele - 1; + flaxIterRefresh(it); + return 1; + } + + if (op[0] == '>' && op[1] == '=') { + int16_t idx; + flax_search(flax_keys(it->f), it->f->numele, key, &idx); + if (idx >= it->f->numele) { + it->idx = -1; + it->key = 0; + it->data = NULL; + return 0; + } + it->idx = idx; + flaxIterRefresh(it); + return 1; + } + + assert(0 && "flaxSeek: unrecognized op"); + it->idx = -1; + it->key = 0; + it->data = NULL; + return 0; +} + +/* Go to the next element in the scope of the iterator 'it'. + * If EOF is reached, 0 is returned, otherwise 1 is returned. */ +int flaxNext(flaxIterator *it) { + if (it->idx < 0) return 0; + it->idx++; + if (it->idx >= it->f->numele) { + it->idx = -1; + it->key = 0; + it->data = NULL; + return 0; + } + flaxIterRefresh(it); + return 1; +} + +/* Replace the data pointer at the current iterator position. Unlike + * flaxInsert(), this is safe to call during iteration: it only writes to + * the value slot at the current index and never touches the key layout, + * element count, or capacity. The iterator's own 'data' field is updated + * to reflect the new value. */ +void flaxIterSetData(flaxIterator *it, void *data) { + assert(it->idx >= 0 && it->idx < it->f->numele); + flax_values(it->f)[it->idx] = data; + it->data = data; +} + +/* ----------------------------- Unit tests --------------------------------- */ + +#ifdef REDIS_TEST +#include "testhelp.h" +#include +#include + +#define UNUSED(x) (void)(x) + +#define ERR(x, ...) \ + do { \ + printf("%s:%s:%d:\t", __FILE__, __func__, __LINE__); \ + printf("ERROR! 
" x "\n", __VA_ARGS__); \ + err++; \ + } while (0) + +#define TEST(name) printf("test — %s\n", name); + +static int flax_test_free_count; + +static void flax_test_counting_free(void *p, void *ctx) { + (void)ctx; + flax_test_free_count++; + flax_free(p); +} + +static void flax_test_ctx_free(void *p, void *ctx) { + (void)p; + int *cnt = ctx; + (*cnt)++; +} + +int flaxTest(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + int err = 0; + + TEST("new and free empty") { + flax *a = flaxNew(); + assert(a != NULL); + assert(a->numele == 0); + assert(a->capacity == FLAX_INIT_CAPACITY); + assert(a->data != NULL); + flaxFree(a); + } + + TEST("find on empty flax") { + flax *a = flaxNew(); + void *val; + assert(flaxFind(a, 42, &val) == 0); + assert(val == NULL); + assert(flaxFind(a, 1, &val) == 0); + flaxFree(a); + } + + TEST("insert and find") { + flax *a = flaxNew(); + void *old, *val; + + flaxInsert(a, 30, "thirty", &old); + assert(old == NULL); + flaxInsert(a, 10, "ten", &old); + assert(old == NULL); + flaxInsert(a, 50, "fifty", &old); + assert(old == NULL); + flaxInsert(a, 20, "twenty", &old); + assert(old == NULL); + flaxInsert(a, 40, "forty", &old); + assert(old == NULL); + assert(flaxSize(a) == 5); + + assert(flaxFind(a, 10, &val) == 1); + assert(strcmp(val, "ten") == 0); + assert(flaxFind(a, 20, &val) == 1); + assert(strcmp(val, "twenty") == 0); + assert(flaxFind(a, 30, &val) == 1); + assert(strcmp(val, "thirty") == 0); + assert(flaxFind(a, 40, &val) == 1); + assert(strcmp(val, "forty") == 0); + assert(flaxFind(a, 50, &val) == 1); + assert(strcmp(val, "fifty") == 0); + assert(flaxFind(a, 99, &val) == 0); + assert(flaxFind(a, 0, &val) == 0); + + flaxFree(a); + } + + TEST("insert duplicate replaces value") { + flax *a = flaxNew(); + flaxInsert(a, 5, "old_five", NULL); + flaxInsert(a, 10, "old_ten", NULL); + + void *old, *val; + flaxInsert(a, 5, "new_five", &old); + assert(old != NULL); + assert(strcmp(old, "old_five") == 0); + 
assert(flaxSize(a) == 2); + assert(flaxFind(a, 5, &val) == 1); + assert(strcmp(val, "new_five") == 0); + + flaxInsert(a, 10, "new_ten", &old); + assert(strcmp(old, "old_ten") == 0); + assert(flaxSize(a) == 2); + assert(flaxFind(a, 10, &val) == 1); + assert(strcmp(val, "new_ten") == 0); + + flaxFree(a); + } + + TEST("remove basic") { + flax *a = flaxNew(); + flaxInsert(a, 1, "one", NULL); + flaxInsert(a, 2, "two", NULL); + flaxInsert(a, 3, "three", NULL); + + void *old, *val; + assert(flaxRemove(a, 2, &old) == 1); + assert(strcmp(old, "two") == 0); + assert(flaxSize(a) == 2); + assert(flaxFind(a, 2, &val) == 0); + assert(flaxFind(a, 1, &val) == 1); + assert(strcmp(val, "one") == 0); + assert(flaxFind(a, 3, &val) == 1); + assert(strcmp(val, "three") == 0); + + flaxFree(a); + } + + TEST("remove not found") { + flax *a = flaxNew(); + flaxInsert(a, 1, "one", NULL); + void *old; + assert(flaxRemove(a, 99, &old) == 0); + assert(old == NULL); + assert(flaxSize(a) == 1); + + flax *b = flaxNew(); + assert(flaxRemove(b, 1, &old) == 0); + flaxFree(b); + + flaxFree(a); + } + + TEST("remove only element") { + flax *a = flaxNew(); + flaxInsert(a, 42, "answer", NULL); + void *old, *val; + assert(flaxRemove(a, 42, &old) == 1); + assert(strcmp(old, "answer") == 0); + assert(flaxSize(a) == 0); + assert(flaxFind(a, 42, &val) == 0); + + flaxFree(a); + } + + TEST("insert at beginning and end") { + flax *a = flaxNew(); + flaxInsert(a, 50, "middle", NULL); + flaxInsert(a, 100, "end", NULL); + flaxInsert(a, 1, "begin", NULL); + + void *val; + assert(flaxSize(a) == 3); + assert(flaxFind(a, 1, &val) == 1); + assert(strcmp(val, "begin") == 0); + assert(flaxFind(a, 50, &val) == 1); + assert(strcmp(val, "middle") == 0); + assert(flaxFind(a, 100, &val) == 1); + assert(strcmp(val, "end") == 0); + + flaxFree(a); + } + + TEST("grow beyond initial capacity") { + flax *a = flaxNew(); + for (int i = 0; i < 128; i++) { + char *buf = flax_malloc(16); + snprintf(buf, 16, "v%d", i); + flaxInsert(a, 
(uint8_t)(i * 2), buf, NULL); + } + assert(flaxSize(a) == 128); + assert(a->capacity >= 128); + + for (int i = 0; i < 128; i++) { + char expected[16]; + snprintf(expected, sizeof(expected), "v%d", i); + void *val; + assert(flaxFind(a, (uint8_t)(i * 2), &val) == 1); + if (strcmp(val, expected) != 0) { + ERR("grow: key %d expected '%s' got '%s'", + i * 2, expected, (char *)val); + } + } + + flaxFreeWithCallback(a, flax_test_counting_free, NULL); + } + + TEST("auto-shrink on remove") { + flax *a = flaxNew(); + for (int i = 0; i < 64; i++) + flaxInsert(a, (uint8_t)i, "x", NULL); + + assert(flaxSize(a) == 64); + uint16_t cap_full = a->capacity; + + for (int i = 0; i < 56; i++) + flaxRemove(a, (uint8_t)i, NULL); + + assert(flaxSize(a) == 8); + if (a->capacity >= cap_full) { + ERR("auto-shrink: capacity %u should be less than %u after removals", + a->capacity, cap_full); + } + + for (int i = 56; i < 64; i++) { + void *val; + assert(flaxFind(a, (uint8_t)i, &val) == 1); + assert(strcmp(val, "x") == 0); + } + + flaxFree(a); + } + + TEST("explicit shrink after removals") { + flax *a = flaxNew(); + for (int i = 0; i < 64; i++) + flaxInsert(a, (uint8_t)i, "x", NULL); + + for (int i = 0; i < 56; i++) + flaxRemove(a, (uint8_t)i, NULL); + + assert(flaxSize(a) == 8); + uint16_t cap_after_remove = a->capacity; + flaxShrink(a); + assert(a->capacity <= cap_after_remove); + assert(a->capacity >= a->numele); + + for (int i = 56; i < 64; i++) { + void *val; + assert(flaxFind(a, (uint8_t)i, &val) == 1); + assert(strcmp(val, "x") == 0); + } + + flaxFree(a); + } + + TEST("no shrink below FLAX_INIT_CAPACITY") { + flax *a = flaxNew(); + for (int i = 0; i < 4; i++) + flaxInsert(a, (uint8_t)i, "x", NULL); + + assert(a->capacity == FLAX_INIT_CAPACITY); + for (int i = 0; i < 3; i++) + flaxRemove(a, (uint8_t)i, NULL); + + assert(flaxSize(a) == 1); + assert(a->capacity == FLAX_INIT_CAPACITY); + + flaxFree(a); + } + + TEST("flaxFreeWithCallback invokes callback") { + flax_test_free_count = 0; + flax 
*a = flaxNew(); + for (int i = 0; i < 5; i++) { + char *s = flax_malloc(8); + snprintf(s, 8, "str%d", i); + flaxInsert(a, i, s, NULL); + } + flaxFreeWithCallback(a, flax_test_counting_free, NULL); + if (flax_test_free_count != 5) { + ERR("freeWithCallback: expected 5 frees, got %d", + flax_test_free_count); + } + } + + TEST("flaxFreeWithCallback on empty flax") { + flax_test_free_count = 0; + flax *a = flaxNew(); + flaxFreeWithCallback(a, flax_test_counting_free, NULL); + if (flax_test_free_count != 0) { + ERR("freeWithCallback empty: expected 0 frees, got %d", + flax_test_free_count); + } + } + + TEST("keys near uint8 boundaries") { + flax *a = flaxNew(); + flaxInsert(a, 0, "zero", NULL); + flaxInsert(a, 255, "max", NULL); + flaxInsert(a, 254, "max-1", NULL); + flaxInsert(a, 100, "hundred", NULL); + + void *val; + assert(flaxSize(a) == 4); + assert(flaxFind(a, 0, &val) == 1); + assert(strcmp(val, "zero") == 0); + assert(flaxFind(a, 100, &val) == 1); + assert(strcmp(val, "hundred") == 0); + assert(flaxFind(a, 254, &val) == 1); + assert(strcmp(val, "max-1") == 0); + assert(flaxFind(a, 255, &val) == 1); + assert(strcmp(val, "max") == 0); + + flaxFree(a); + } + + TEST("flaxTryInsert does not overwrite") { + flax *a = flaxNew(); + assert(flaxTryInsert(a, 10, "ten", NULL) == 1); + assert(flaxTryInsert(a, 20, "twenty", NULL) == 1); + assert(flaxSize(a) == 2); + + void *old, *val; + assert(flaxTryInsert(a, 10, "new_ten", &old) == 0); + assert(strcmp(old, "ten") == 0); + assert(flaxSize(a) == 2); + assert(flaxFind(a, 10, &val) == 1); + assert(strcmp(val, "ten") == 0); + + flaxFree(a); + } + + TEST("iterator on empty flax") { + flax *a = flaxNew(); + flaxIterator it; + flaxStart(&it, a); + assert(flaxSeek(&it, "^", 0) == 0); + assert(flaxSeek(&it, "$", 0) == 0); + assert(flaxSeek(&it, ">=", 42) == 0); + + + flaxFree(a); + } + + TEST("iterator forward") { + flax *a = flaxNew(); + flaxInsert(a, 10, "ten", NULL); + flaxInsert(a, 30, "thirty", NULL); + flaxInsert(a, 20, 
"twenty", NULL); + flaxInsert(a, 40, "forty", NULL); + + flaxIterator it; + flaxStart(&it, a); + assert(flaxSeek(&it, "^", 0)); + assert(it.key == 10); + assert(strcmp(it.data, "ten") == 0); + assert(flaxNext(&it)); + assert(it.key == 20); + assert(flaxNext(&it)); + assert(it.key == 30); + assert(flaxNext(&it)); + assert(it.key == 40); + assert(flaxNext(&it) == 0); + + + flaxFree(a); + } + + TEST("iterator seek >=") { + flax *a = flaxNew(); + flaxInsert(a, 10, "ten", NULL); + flaxInsert(a, 20, "twenty", NULL); + flaxInsert(a, 30, "thirty", NULL); + flaxInsert(a, 40, "forty", NULL); + + flaxIterator it; + flaxStart(&it, a); + + assert(flaxSeek(&it, ">=", 20)); + assert(it.key == 20); + + assert(flaxSeek(&it, ">=", 25)); + assert(it.key == 30); + + assert(flaxSeek(&it, ">=", 5)); + assert(it.key == 10); + + assert(flaxSeek(&it, ">=", 41) == 0); + + + flaxFree(a); + } + + TEST("iterator on single element") { + flax *a = flaxNew(); + flaxInsert(a, 42, "answer", NULL); + + flaxIterator it; + flaxStart(&it, a); + assert(flaxSeek(&it, "^", 0)); + assert(it.key == 42); + assert(strcmp(it.data, "answer") == 0); + assert(flaxNext(&it) == 0); + + + flaxFree(a); + } + + TEST("flaxFreeWithCallback") { + int ctx_free_count = 0; + flax *a = flaxNew(); + flaxInsert(a, 1, "one", NULL); + flaxInsert(a, 2, "two", NULL); + flaxInsert(a, 3, "three", NULL); + flaxFreeWithCallback(a, flax_test_ctx_free, &ctx_free_count); + if (ctx_free_count != 3) { + ERR("freeWithCallback: expected 3 frees, got %d", + ctx_free_count); + } + } + + TEST("flaxAllocSize tracks allocations") { + flax *a = flaxNew(); + size_t sz0 = flaxAllocSize(a); + assert(sz0 > 0); + + for (int i = 0; i < 64; i++) + flaxInsert(a, (uint8_t)i, "x", NULL); + + size_t sz1 = flaxAllocSize(a); + assert(sz1 >= sz0); + + flaxShrink(a); + size_t sz2 = flaxAllocSize(a); + assert(sz2 <= sz1); + assert(sz2 > 0); + + assert(flaxAllocSize(NULL) == 0); + + flaxFree(a); + } + + TEST("flaxFree and flaxFreeWithCallback on NULL") { + 
flaxFree(NULL); + flaxFreeWithCallback(NULL, flax_test_counting_free, NULL); + } + + TEST("flaxFind and flaxRemove on NULL flax") { + void *val; + assert(flaxFind(NULL, 42, &val) == 0); + assert(val == NULL); + assert(flaxRemove(NULL, 42, &val) == 0); + assert(val == NULL); + } + + TEST("flaxTryInsert with old=NULL on duplicate") { + flax *a = flaxNew(); + assert(flaxTryInsert(a, 10, "ten", NULL) == 1); + assert(flaxTryInsert(a, 10, "new_ten", NULL) == 0); + assert(flaxSize(a) == 1); + + void *val; + assert(flaxFind(a, 10, &val) == 1); + assert(strcmp(val, "ten") == 0); + + flaxFree(a); + } + + TEST("flaxIterSetData replaces value during iteration") { + flax *a = flaxNew(); + flaxInsert(a, 10, "ten", NULL); + flaxInsert(a, 20, "twenty", NULL); + flaxInsert(a, 30, "thirty", NULL); + + flaxIterator it; + flaxStart(&it, a); + assert(flaxSeek(&it, "^", 0)); + do { + if (it.key == 20) flaxIterSetData(&it, "TWENTY"); + } while (flaxNext(&it)); + + + void *val; + assert(flaxFind(a, 10, &val) == 1); + assert(strcmp(val, "ten") == 0); + assert(flaxFind(a, 20, &val) == 1); + assert(strcmp(val, "TWENTY") == 0); + assert(flaxFind(a, 30, &val) == 1); + assert(strcmp(val, "thirty") == 0); + assert(flaxSize(a) == 3); + + flaxFree(a); + } + + if (!err) + printf("ALL TESTS PASSED!\n"); + else + ERR("Sorry, not all tests passed! In fact, %d tests failed.", err); + + return err; +} + +#endif diff --git a/src/flax.h b/src/flax.h new file mode 100644 index 000000000..bd62eb756 --- /dev/null +++ b/src/flax.h @@ -0,0 +1,100 @@ +/* Flax -- A flat sorted-array map for uint8_t keys. + * + * Copyright (c) 2025-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). 
#ifndef FLAX_H
#define FLAX_H

#include <stddef.h> /* size_t */
#include <stdint.h> /* uint8_t, uint16_t, uint32_t, int16_t */

#define FLAX_INIT_CAPACITY 16

/* A flax is a sorted associative container that maps uint8_t keys to void*
 * values. Both arrays live in a single heap allocation ("data block") laid
 * out as follows:
 *
 *   flax struct            data block (single allocation)
 *   +------------+         +------------------------------------+
 *   | *data -----------> | keys[0..cap-1]   (uint8_t)         |
 *   | numele     |         +-- aligned to sizeof(void*) --------+
 *   | capacity   |         | values[0..cap-1] (void*)           |
 *   | alloc_size |         +------------------------------------+
 *   +------------+
 *
 * Keys are maintained in ascending sorted order. Only the first 'numele'
 * slots in each array contain live data; the remainder up to 'capacity'
 * is reserved space for future inserts.
 *
 * Lookup, insert and delete use a linear scan over the keys array rather
 * than binary search. This is intentional: the expected element count is
 * small (e.g. per-consumer stream PEL), so the sequential, cache-friendly
 * access pattern outperforms binary search whose branch-misprediction cost
 * dominates at these sizes. The scan includes fast-path checks for the
 * head and tail positions to accelerate the common case of monotonically
 * increasing keys (e.g. stream entry IDs).
 *
 * Growth strategy: the data block doubles in capacity when full (on insert)
 * and can be shrunk to fit with flaxShrink().
 */
typedef struct flax {
    void *data;          /* Packed storage: keys array followed by values array. */
    uint16_t numele;     /* Number of elements currently stored (max 256). */
    uint16_t capacity;   /* Current allocated capacity. */
    uint32_t alloc_size; /* Total usable bytes: struct allocation + data block. */
} flax;

/* Flax iterator state. The typical lifecycle is:
 *
 *   flaxIterator it;
 *   flaxStart(&it, myflax);                  -- initialize
 *   if (flaxSeek(&it, ">=", somekey)) {      -- position ON the first match
 *       do {
 *           ... use it.key / it.data ...
 *       } while (flaxNext(&it));             -- advance to the next element
 *   }
 *
 * A successful flaxSeek() already leaves it.key / it.data set to the
 * matched element, and flaxNext() moves PAST the current element. So,
 * unlike the raxSeek() + while(raxNext()) idiom, calling flaxNext() right
 * after flaxSeek() skips the element the seek landed on. The seek operators
 * exercised by the tests are "^" (first), "$" (last) and ">=".
 *
 * After flaxStart() the iterator is in EOF state until a successful
 * flaxSeek(). The iterator does not allocate heap memory.
 *
 * WARNING: the iterator is invalidated by any mutation (insert / remove /
 * resize) on the underlying flax. Do not modify the flax while iterating. */
typedef struct flaxIterator {
    flax *f;     /* Flax we are iterating. */
    uint8_t key; /* The current key. */
    void *data;  /* Data associated to this key. */
    int16_t idx; /* Current index into the flax arrays, -1 if EOF. */
} flaxIterator;

/* --- Creation and destruction --- */
flax *flaxNew(void);
/* Both free functions are called with a NULL flax by the unit tests. */
void flaxFree(flax *f);
void flaxFreeWithCallback(flax *f,
                          void (*free_callback)(void *item, void *ctx),
                          void *ctx);

/* --- Lookup and mutation --- */
/* Overwriting insert. NOTE(review): callers treat a non-zero return as
 * "a new element was added"; confirm against flax.c. */
int flaxInsert(flax *f, uint8_t key, void *data, void **old);
/* Non-overwriting insert: returns 1 when the key was added, 0 when it was
 * already present (the stored value is untouched and, if 'old' is not NULL,
 * *old receives the existing value). */
int flaxTryInsert(flax *f, uint8_t key, void *data, void **old);
/* Both return 1 on success and 0 when the key is missing; the tests also
 * call them with a NULL flax and expect 0 with the output pointer set to
 * NULL. */
int flaxRemove(flax *f, uint8_t key, void **old);
int flaxFind(flax *f, uint8_t key, void **value);

/* --- Iterator --- */
void flaxStart(flaxIterator *it, flax *f);
int flaxSeek(flaxIterator *it, const char *op, uint8_t key);
int flaxNext(flaxIterator *it);
void flaxIterSetData(flaxIterator *it, void *data);

/* --- Introspection --- */
uint16_t flaxSize(flax *f);
size_t flaxAllocSize(flax *f); /* Returns 0 for a NULL flax. */
void flaxShrink(flax *f);

#ifdef REDIS_TEST
int flaxTest(int argc, char *argv[], int flags);
#endif

#endif
/* Allocator selection.
 *
 * This header picks the allocator used by Flax at compile time by mapping
 * the four flax_* allocation names onto concrete functions. As written,
 * they map onto the zmalloc family declared in "zmalloc.h". To switch
 * allocator, point the defines below at the replacement functions and
 * include the header that declares them instead. */

#ifndef FLAX_MALLOC_H
#define FLAX_MALLOC_H
#include "zmalloc.h"
/* Plain allocate / free pair. */
#define flax_malloc zmalloc
/* NOTE(review): the *_usable variants presumably also report the usable
 * size of the allocation (used for alloc_size accounting) -- confirm the
 * exact signatures in zmalloc.h. */
#define flax_malloc_usable zmalloc_usable
#define flax_free zfree
#define flax_free_usable zfree_usable
#endif
*/ - if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) { - raxStop(&ri); + pelIterator pi; + pelIterStart(&pi,pel); + pelIterSeek(&pi,"^",NULL); + while (pelIterNext(&pi)) { + /* We store IDs in raw form as 128 big big endian numbers, + * reconstructed from the two-level structure. */ + if ((n = rdbWriteRaw(rdb,pi.key,sizeof(streamID))) == -1) { + pelIterStop(&pi); return -1; } nwritten += n; if (nacks) { - streamNACK *nack = ri.data; + streamNACK *nack = pi.data; if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) { - raxStop(&ri); + pelIterStop(&pi); return -1; } nwritten += n; if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) { - raxStop(&ri); + pelIterStop(&pi); return -1; } nwritten += n; @@ -789,7 +789,7 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { * at loading time. */ } } - raxStop(&ri); + pelIterStop(&pi); return nwritten; } @@ -1040,7 +1040,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { * passed with value of 0), at loading time we'll lookup the ID * in the consumer group global PEL and will put a reference in the * consumer local PEL. */ - if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) { + if ((n = rdbSaveStreamPEL(rdb,consumer->pel,consumer->pel_count,0)) == -1) { raxStop(&ri); return -1; } @@ -1416,7 +1416,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { nwritten += n; /* Save the global PEL. 
*/ - if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) { + if ((n = rdbSaveStreamPEL(rdb,cg->pel,cg->pel_count,1)) == -1) { raxStop(&ri); return -1; } @@ -3483,14 +3483,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) streamNACK *nack = streamCreateNACK(s, NULL, &nack_id); nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); nack->delivery_count = rdbLoadLen(rdb,NULL); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, rawid); + nack->cgroup_ref_node = streamLinkCGroupToEntry(s, cgroup, &nack_id); if (rioGetReadError(rdb)) { rdbReportReadError("Stream PEL NACK loading failed."); streamFreeNACK(s, nack); decrRefCount(o); return NULL; } - if (!raxTryInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) { + if (!pelTryInsert(cgroup->pel,&nack_id,nack,&cgroup->pel_count)) { rdbReportCorruptRDB("Duplicated global PEL entry " "loading stream consumer group"); streamFreeNACK(s, nack); @@ -3563,8 +3563,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) decrRefCount(o); return NULL; } + streamID nack_id; + streamDecodeID(rawid, &nack_id); void *result; - if (!raxFind(cgroup->pel,rawid,sizeof(rawid),&result)) { + if (!pelFind(cgroup->pel, &nack_id, &result)) { rdbReportCorruptRDB("Consumer entry not found in " "group global PEL"); decrRefCount(o); @@ -3584,7 +3586,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) * loading the global PEL. Then set the same shared * NACK structure also in the consumer-specific PEL. 
*/ nack->consumer = consumer; - if (!raxTryInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) { + if (!pelTryInsert(consumer->pel,&nack_id,nack,&consumer->pel_count)) { rdbReportCorruptRDB("Duplicated consumer PEL entry " " loading a stream consumer " "group"); @@ -3619,8 +3621,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) return NULL; } + streamID nack_id; + streamDecodeID(rawid, &nack_id); void *result; - if (!raxFind(cgroup->pel, rawid, sizeof(rawid), &result)) { + if (!pelFind(cgroup->pel, &nack_id, &result)) { rdbReportCorruptRDB("Stream NACK zone entry not found " "in group global PEL"); decrRefCount(o); diff --git a/src/server.c b/src/server.c index df660175e..88c89210b 100644 --- a/src/server.c +++ b/src/server.c @@ -7883,6 +7883,7 @@ struct redisTest { {"vector", vectorTest}, {"bitmap", bitopsTest}, {"rax", raxTest}, + {"flax", flaxTest}, {"zset", zsetTest}, {"topk", chkTopKTest}, {"fastfloat", fastFloatTest}, diff --git a/src/server.h b/src/server.h index 9318eec68..cc4d66fb0 100644 --- a/src/server.h +++ b/src/server.h @@ -64,6 +64,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "quicklist.h" /* Lists are encoded as linked lists of N-elements flat arrays */ #include "rax.h" /* Radix tree */ +#include "flax.h" /* Flat sorted array */ #include "connection.h" /* Connection abstraction */ #include "eventnotifier.h" /* Event notification */ #include "memory_prefetch.h" diff --git a/src/stream.h b/src/stream.h index 8adbeffe4..9bd893302 100644 --- a/src/stream.h +++ b/src/stream.h @@ -5,6 +5,18 @@ #include "listpack.h" #include "dict.h" #include "xxhash.h" +#include "flax.h" +#include + +/* Two-level PEL key-length convention: + * + * Single-entry ("direct") buckets use a 16-byte rax key (the complete + * big-endian streamID) and store the data pointer directly as the rax + * value. 
Multi-entry ("flax") buckets use a 15-byte rax key (ms + upper + * 7 bytes of seq) and store a flax* mapping the low byte of seq to data + * pointers. The key length in rax (16 vs 15) distinguishes the two. */ +#define PEL_RAX_DIRECT_KEYLEN sizeof(streamID) /* 16 */ +#define PEL_RAX_FLAX_KEYLEN 15 /* Stream item ID: a 128 bit number composed of a milliseconds time and * a sequence counter. IDs generated in the same millisecond (or in a past @@ -42,7 +54,11 @@ typedef struct stream { uint64_t entries_added; /* All time count of elements added. */ size_t alloc_size; /* Total allocated memory (in bytes) by this stream. */ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ - rax *cgroups_ref; /* Index mapping message IDs to their consumer groups. */ + rax *cgroups_ref; /* Two-level index mapping message IDs to their + consumer groups. Same key scheme as PEL: + outer rax -> flax(low byte of seq -> list* + of streamCG pointers). Direct / flax key + lengths follow the PEL convention (16/15). */ streamID min_cgroup_last_id; /* The minimum ID of consume group. */ unsigned int min_cgroup_last_id_valid: 1; uint64_t idmp_duration; /* IDMP duration in seconds. */ @@ -99,12 +115,13 @@ typedef struct streamCG { group reads. In the real world, the reasoning behind this value is detailed at the top comment of streamEstimateDistanceFromFirstEverEntry(). */ - rax *pel; /* Pending entries list. This is a radix tree that - has every message delivered to consumers (without - the NOACK option) that was yet not acknowledged - as processed. The key of the radix tree is the - ID as a 64 bit big endian number, while the - associated value is a streamNACK structure.*/ + rax *pel; /* Two-level pending entries list. Single-entry + buckets (direct) use a 16-byte rax key (full + big-endian streamID). Multi-entry buckets use + a 15-byte key (ms + upper 7 bytes of seq) and + store a flax* mapping the low byte of seq to + streamNACK pointers. Max 256 per bucket. 
*/ + uint64_t pel_count; /* Total number of NACK entries in this PEL (direct + flax). */ streamNACK *pel_time_head; /* Head of time-ordered doubly-linked list of pending entries (oldest delivery_time). Used for efficient CLAIM operations. O(1) access to oldest entries. */ @@ -127,13 +144,11 @@ typedef struct streamConsumer { sds name; /* Consumer name. This is how the consumer will be identified in the consumer group protocol. Case sensitive. */ - rax *pel; /* Consumer specific pending entries list: all - the pending messages delivered to this - consumer not yet acknowledged. Keys are - big endian message IDs, while values are - the same streamNACK structure referenced - in the "pel" of the consumer group structure - itself, so the value is shared. */ + rax *pel; /* Two-level consumer PEL: same structure as + streamCG.pel — 16-byte key for direct, + 15-byte key for flax buckets. NACKs are + shared with group PEL. */ + uint64_t pel_count; /* Total NACK count for this consumer. */ } streamConsumer; /* Pending (yet not acknowledged) message in a consumer group. */ @@ -183,7 +198,7 @@ void streamEncodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(stream *s, streamNACK *na); -void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key); +void streamDestroyNACK(stream *s, streamNACK *na); int streamIncrID(streamID *id); int streamDecrID(streamID *id); void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); @@ -201,7 +216,37 @@ int streamEntryExists(stream *s, streamID *id); void streamKeyLoaded(redisDb *db, robj *key, robj *val); void streamKeyRemoved(redisDb *db, robj *key, robj *val); -listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key); +listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, streamID *id); + +/* Two-level PEL iterator: walks outer rax and inner flax. 
/* Two-level PEL iterator: walks the outer rax and, for flax buckets, the
 * inner flax. Direct entries (16-byte rax key) have no flax allocation;
 * 'direct' tracks whether the current entry is direct.
 *
 * Usage, as in rdbSaveStreamPEL():
 *
 *   pelIterator pi;
 *   pelIterStart(&pi, pel);
 *   pelIterSeek(&pi, "^", NULL);
 *   while (pelIterNext(&pi)) { ... pi.id / pi.key / pi.data ... }
 *   pelIterStop(&pi);
 *
 * The first pelIterNext() after a successful seek returns the entry the
 * seek landed on (see 'just_seeked'), so the loop above visits it. */
typedef struct pelIterator {
    raxIterator ri;  /* Outer iterator over the rax buckets. */
    flaxIterator fi; /* Inner iterator; only meaningful while 'direct' is 0. */
    int valid;       /* 1 when positioned on an entry, 0 otherwise. */
    int just_seeked; /* Set by a successful pelIterSeek(): the next
                        pelIterNext() clears it and reports the current
                        entry instead of advancing. */
    int direct;      /* 1 if the current entry is a direct (16-byte key)
                        entry, 0 if it lives inside a flax bucket. */
    streamID id;     /* Current entry ID, decoded from 'key'. */
    void *data;      /* Value stored for the current entry. */
    unsigned char key[sizeof(streamID)]; /* Current entry ID in its encoded
                                            16-byte form: rax key bytes plus,
                                            for flax buckets, the flax key as
                                            the last byte. */
} pelIterator;
Single-entry buckets ("direct entries") use a 16-byte rax + * key (the complete big-endian streamID) and store the raw data pointer + * directly in the rax value, avoiding the flax allocation. The key + * length in rax (16 vs 15) distinguishes the two bucket types. + * ----------------------------------------------------------------------- */ + + +/* Cache embedded in rax metadata to speed up sequential PEL ops + * when consecutive operations target the same ID prefix. + * When 'dirty' is set, the cached value has been created/updated but not yet + * inserted into the rax; it will be committed on the next cache eviction + * or explicit flush. 'direct' mirrors the key-length convention: + * 1 = direct (16-byte key, raw data pointer), 0 = flax (15-byte key). + * + * KEY INVARIANT: dirty==1 means "this bucket exists ONLY in the cache, not in + * the rax". This permits type transitions (direct<->flax) while dirty without + * touching the rax: the old entry was never committed, so there is nothing + * stale to remove. For example, a dirty direct can be promoted to a dirty + * flax (pelGenericInsert), or a dirty flax can be demoted to a dirty direct + * (pelRemove), and in both cases pelCacheFlush will later insert the new + * representation using the correct key length derived from cache->direct. */ +typedef struct pelCache { + unsigned char key[PEL_RAX_DIRECT_KEYLEN]; + void *val; + int dirty; + int direct; +} pelCache; + +static void pelCacheFlush(rax *r) { + pelCache *cache = (pelCache *)r->metadata; + if (!cache->dirty) return; + size_t keylen = cache->direct ? 
PEL_RAX_DIRECT_KEYLEN : PEL_RAX_FLAX_KEYLEN; + raxInsert(r, cache->key, keylen, cache->val, NULL); + cache->dirty = 0; +} + +void pelCacheInvalidate(rax *pel) { + pelCacheFlush(pel); + pelCache *cache = (pelCache *)pel->metadata; + cache->val = NULL; + cache->dirty = 0; + cache->direct = 0; +} + +rax *pelNew(size_t *alloc_size) { + rax *pel = raxNewWithMetadata(sizeof(pelCache), alloc_size); + if (pel) { + pelCache *cache = (pelCache *)pel->metadata; + cache->val = NULL; + cache->dirty = 0; + cache->direct = 0; + } + return pel; +} + +/* Free all buckets and call nack_free for each data pointer. */ +void pelFree(rax *pel, void (*nack_free)(void *, void *), void *ctx) { + if (!pel) return; + pelCacheFlush(pel); + raxIterator ri; + raxStart(&ri, pel); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + if (ri.key_len == PEL_RAX_DIRECT_KEYLEN) { + if (nack_free) nack_free(ri.data, ctx); + } else { + flax *f = (flax *)ri.data; + if (pel->alloc_size) *pel->alloc_size -= flaxAllocSize(f); + if (nack_free) + flaxFreeWithCallback(f, nack_free, ctx); + else + flaxFree(f); + } + } + raxStop(&ri); + raxFreeWithCbAndContext(pel, NULL, NULL); +} + +/* Free buckets without freeing data (for consumer PEL where NACKs are shared). */ +void pelFreeShallow(rax *pel) { + if (!pel) return; + pelCacheFlush(pel); + raxIterator ri; + raxStart(&ri, pel); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + if (ri.key_len == PEL_RAX_FLAX_KEYLEN) { + flax *f = (flax *)ri.data; + if (pel->alloc_size) *pel->alloc_size -= flaxAllocSize(f); + flaxFree(f); + } + } + raxStop(&ri); + raxFree(pel); +} + +/* Generic insert into two-level PEL. If 'overwrite' is true, an existing + * entry's value is replaced; otherwise the insert is skipped when the key + * already exists. Returns 1 if a new entry was created, 0 otherwise. + * + * New buckets are created as direct entries (16-byte rax key). 
/* Generic insert into the two-level PEL.
 *
 * pel       - PEL created with pelNew() (its metadata holds the pelCache).
 * id        - stream ID to insert; encoded to 16 bytes, of which the first
 *             15 select the bucket and the last one ('fkey') is the key
 *             inside a flax bucket.
 * data      - value to store.
 * count     - optional entry counter, incremented when a new entry is
 *             created (may be NULL).
 * overwrite - if true an existing entry's value is replaced, otherwise an
 *             existing entry is left untouched.
 *
 * Returns 1 if a new entry was created. For an already existing direct
 * entry it returns 0; for a flax bucket it returns whatever flaxInsert() /
 * flaxTryInsert() return.
 *
 * New buckets are created as direct entries (16-byte rax key) that live
 * only in the cache (dirty) until flushed. When a second entry with a
 * different fkey is inserted into a direct bucket, the bucket is promoted
 * to a flax (15-byte rax key). */
static int pelGenericInsert(rax *pel, streamID *id, void *data, uint64_t *count, int overwrite) {
    unsigned char fullkey[PEL_RAX_DIRECT_KEYLEN];
    streamEncodeID(fullkey, id);
    uint8_t fkey = fullkey[PEL_RAX_FLAX_KEYLEN];
    pelCache *cache = (pelCache *)pel->metadata;

    /* Cache lookup: compare the 15-byte prefix only, so that a cached
     * direct bucket with a different low byte still counts as a hit (it is
     * the bucket this ID belongs to). */
    int cache_hit = (cache->val != NULL &&
                     memcmp(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN) == 0);

    int direct = 0;
    void *bucket = NULL;
    if (cache_hit) {
        bucket = cache->val;
        direct = cache->direct;
    } else {
        /* Switching away from previous bucket: shrink it if it was a flax,
         * and account for the bytes given back. */
        if (cache->val && !cache->direct) {
            flax *prev = (flax *)cache->val;
            size_t before = flaxAllocSize(prev);
            flaxShrink(prev);
            if (pel->alloc_size) *pel->alloc_size -= before - flaxAllocSize(prev);
        }
        /* Commit the previous bucket if it only existed in the cache; the
         * cache slot is about to be reused. */
        pelCacheFlush(pel);

        /* Look for an existing bucket: try 15-byte flax first, then
         * prefix-scan for a 16-byte direct entry with any fkey. */
        void *raxval;
        if (raxFind(pel, fullkey, PEL_RAX_FLAX_KEYLEN, &raxval)) {
            bucket = raxval;
            cache->val = bucket;
            cache->direct = 0;
            memcpy(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN);
        } else {
            /* Seek to the first key >= the 15-byte prefix: if a direct
             * entry exists for this prefix it is the element found here. */
            raxIterator ri;
            raxStart(&ri, pel);
            raxSeek(&ri, ">=", fullkey, PEL_RAX_FLAX_KEYLEN);
            if (raxNext(&ri) && ri.key_len == PEL_RAX_DIRECT_KEYLEN &&
                memcmp(ri.key, fullkey, PEL_RAX_FLAX_KEYLEN) == 0) {
                bucket = ri.data;
                direct = 1;
                cache->val = bucket;
                cache->direct = 1;
                /* Cache the EXISTING entry's full key (its low byte may
                 * differ from fkey); it is read back below as 'efkey'. */
                memcpy(cache->key, ri.key, PEL_RAX_DIRECT_KEYLEN);
            }
            raxStop(&ri);
        }
    }

    /* No existing bucket: store as direct entry with 16-byte key. The entry
     * is only recorded in the cache and marked dirty; pelCacheFlush() will
     * insert it into the rax later. */
    if (!bucket) {
        cache->val = data;
        cache->direct = 1;
        memcpy(cache->key, fullkey, PEL_RAX_DIRECT_KEYLEN);
        cache->dirty = 1;
        if (count) (*count)++;
        return 1;
    }

    /* Existing direct entry. */
    if (direct) {
        uint8_t efkey = cache->key[PEL_RAX_FLAX_KEYLEN];
        if (efkey == fkey) {
            /* Same ID already present: optionally replace the value, in the
             * cache and (if the entry was already committed) in the rax. */
            if (overwrite) {
                cache->val = data;
                if (!cache->dirty)
                    raxInsert(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, data, NULL);
            }
            return 0;
        }

        /* Different fkey: promote direct -> flax.
         * When dirty, the old direct was never committed to rax, so we
         * skip rax ops and just overwrite the cache (see pelCache invariant).
         *
         * When non-dirty, we commit the new flax entry to rax immediately
         * and leave dirty==0. Subsequent inserts into the same flax bucket
         * modify the flax object in-place (including possible flax_resize of
         * its internal data block), but the flax *struct pointer* stored in
         * the rax never changes -- flax_resize only replaces f->data, not f
         * itself. So the rax entry remains valid without needing a dirty
         * flush. This invariant depends on flax using a separate heap
         * allocation for the struct vs. its data block. */
        flax *f = flaxNew();
        flaxInsert(f, efkey, bucket, NULL);
        flaxInsert(f, fkey, data, NULL);
        if (pel->alloc_size) *pel->alloc_size += flaxAllocSize(f);
        if (!cache->dirty) {
            raxRemove(pel, cache->key, PEL_RAX_DIRECT_KEYLEN, NULL);
            raxInsert(pel, fullkey, PEL_RAX_FLAX_KEYLEN, f, NULL);
        }
        cache->val = f;
        cache->direct = 0;
        memcpy(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN);
        if (count) (*count)++;
        return 1;
    }

    /* Existing flax bucket: insert in place and charge any growth of the
     * flax to the PEL allocation counter. */
    flax *f = (flax *)bucket;
    size_t before = flaxAllocSize(f);
    int inserted = overwrite ? flaxInsert(f, fkey, data, NULL)
                             : flaxTryInsert(f, fkey, data, NULL);
    if (pel->alloc_size) *pel->alloc_size += flaxAllocSize(f) - before;
    if (inserted && count) (*count)++;
    return inserted;
}
*/ +int pelInsert(rax *pel, streamID *id, void *data, uint64_t *count) { + return pelGenericInsert(pel, id, data, count, 1); +} + +/* Non overwriting insert function: if an element with the same key + * exists, the value is not updated and the function returns 0. + * This is just a wrapper for pelGenericInsert(). */ +int pelTryInsert(rax *pel, streamID *id, void *data, uint64_t *count) { + return pelGenericInsert(pel, id, data, count, 0); +} + +/* Replace the data pointer for an existing entry without cache interaction or + * flax shrink side-effects. Intended for defrag, where the key is guaranteed + * to exist and we must avoid allocations that would increase fragmentation. */ +void pelReplace(rax *pel, streamID *id, void *data) { + unsigned char fullkey[PEL_RAX_DIRECT_KEYLEN]; + streamEncodeID(fullkey, id); + uint8_t fkey = fullkey[PEL_RAX_FLAX_KEYLEN]; + pelCache *cache = (pelCache *)pel->metadata; + + int cache_hit = (cache->val != NULL && + memcmp(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN) == 0); + int direct; + void *bucket; + if (cache_hit) { + bucket = cache->val; + direct = cache->direct; + } else { + if (raxFind(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, &bucket)) { + direct = 1; + } else { + int found = raxFind(pel, fullkey, PEL_RAX_FLAX_KEYLEN, &bucket); + serverAssert(found); + direct = 0; + } + } + + if (direct) { + if (cache_hit) { + cache->val = data; + if (!cache->dirty) + raxInsert(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, data, NULL); + } else { + raxInsert(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, data, NULL); + } + } else { + flaxInsert((flax *)bucket, fkey, data, NULL); + } +} + +/* Find a value by streamID. Returns 1 if found (setting *data), 0 if not. 
/* Find a value by streamID.
 *
 * pel  - PEL created with pelNew().
 * id   - ID to look up.
 * data - optional output: receives the stored value when found.
 *
 * Returns 1 if found (setting *data), 0 if not. On a cache miss any pending
 * cached bucket is flushed to the rax first, but the cache is not
 * re-targeted to the bucket that was looked up. */
int pelFind(rax *pel, streamID *id, void **data) {
    unsigned char fullkey[PEL_RAX_DIRECT_KEYLEN];
    streamEncodeID(fullkey, id);
    uint8_t fkey = fullkey[PEL_RAX_FLAX_KEYLEN];
    pelCache *cache = (pelCache *)pel->metadata;

    void *bucket;
    if (cache->val && memcmp(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN) == 0) {
        /* Cached bucket has the same 15-byte prefix. */
        if (cache->direct) {
            /* A direct bucket holds a single entry: it matches only if the
             * low byte is the same too. */
            if (cache->key[PEL_RAX_FLAX_KEYLEN] != fkey) return 0;
            if (data) *data = cache->val;
            return 1;
        }
        bucket = cache->val;
    } else {
        /* The rax is about to be queried: make sure a dirty bucket is
         * visible there. */
        pelCacheFlush(pel);
        /* Try 16-byte key (direct entry): exact match guarantees fkey. */
        if (raxFind(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, &bucket)) {
            if (data) *data = bucket;
            return 1;
        }
        /* Try 15-byte key (flax bucket). */
        if (!raxFind(pel, fullkey, PEL_RAX_FLAX_KEYLEN, &bucket))
            return 0;
    }

    /* Flax bucket: look up the low byte inside it. */
    void *val;
    if (!flaxFind((flax *)bucket, fkey, &val)) return 0;
    if (data) *data = val;
    return 1;
}

/* Remove a value by streamID.
 *
 * pel   - PEL created with pelNew().
 * id    - ID to remove.
 * count - optional entry counter, decremented on successful removal.
 *
 * Returns the removed value, or NULL if the ID was not found.
 * When a flax bucket drops to 1 entry, it is demoted to a direct entry. */
void *pelRemove(rax *pel, streamID *id, uint64_t *count) {
    unsigned char fullkey[PEL_RAX_DIRECT_KEYLEN];
    streamEncodeID(fullkey, id);
    uint8_t fkey = fullkey[PEL_RAX_FLAX_KEYLEN];
    pelCache *cache = (pelCache *)pel->metadata;

    int cache_hit = (cache->val != NULL &&
                     memcmp(cache->key, fullkey, PEL_RAX_FLAX_KEYLEN) == 0);
    int direct;
    void *bucket;
    if (cache_hit) {
        bucket = cache->val;
        direct = cache->direct;
    } else {
        /* Commit a pending bucket before querying the rax. */
        pelCacheFlush(pel);
        if (raxFind(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, &bucket)) {
            direct = 1;
        } else if (raxFind(pel, fullkey, PEL_RAX_FLAX_KEYLEN, &bucket)) {
            direct = 0;
        } else {
            return NULL;
        }
    }

    /* Direct entry. */
    if (direct) {
        /* A cache hit only proves the prefix matches: reject a different
         * low byte. (On a cache miss the 16-byte raxFind was exact.) */
        if (cache_hit && cache->key[PEL_RAX_FLAX_KEYLEN] != fkey) return NULL;
        void *old = bucket;
        if (count) (*count)--;
        if (cache_hit && cache->dirty) {
            /* Never committed to the rax: dropping the cache slot is all
             * that is needed. */
            cache->val = NULL;
            cache->dirty = 0;
            cache->direct = 0;
        } else {
            raxRemove(pel, fullkey, PEL_RAX_DIRECT_KEYLEN, NULL);
            pelCacheInvalidate(pel);
        }
        return old;
    }

    /* Flax bucket. */
    flax *f = (flax *)bucket;
    void *old;
    size_t before = flaxAllocSize(f);
    if (!flaxRemove(f, fkey, &old)) return NULL;
    /* Account for any memory the flax released while removing. */
    if (pel->alloc_size) *pel->alloc_size -= before - flaxAllocSize(f);
    if (count) (*count)--;

    if (f->numele == 0) {
        /* Bucket is now empty: release it and drop it from cache / rax. */
        if (pel->alloc_size) *pel->alloc_size -= flaxAllocSize(f);
        flaxFree(f);
        if (cache_hit && cache->dirty) {
            cache->val = NULL;
            cache->dirty = 0;
            cache->direct = 0;
        } else {
            raxRemove(pel, fullkey, PEL_RAX_FLAX_KEYLEN, NULL);
            pelCacheInvalidate(pel);
        }
    } else if (f->numele == 1) {
        /* Demote flax -> direct entry with 16-byte key.
         * When dirty, the flax was never committed to rax, so we just
         * replace the cache contents (see pelCache invariant). */
        flaxIterator fi;
        flaxStart(&fi, f);
        flaxSeek(&fi, "^", 0);
        /* Grab the surviving element before the flax is freed, and rebuild
         * its full 16-byte key from the shared prefix plus its flax key. */
        void *directval = fi.data;
        unsigned char directkey[PEL_RAX_DIRECT_KEYLEN];
        memcpy(directkey, fullkey, PEL_RAX_FLAX_KEYLEN);
        directkey[PEL_RAX_FLAX_KEYLEN] = (unsigned char)fi.key;
        if (pel->alloc_size) *pel->alloc_size -= flaxAllocSize(f);
        flaxFree(f);
        if (cache_hit) {
            /* Keep the cache pointing at the bucket in its new form. */
            cache->val = directval;
            cache->direct = 1;
            memcpy(cache->key, directkey, PEL_RAX_DIRECT_KEYLEN);
            if (!cache->dirty) {
                raxRemove(pel, fullkey, PEL_RAX_FLAX_KEYLEN, NULL);
                raxInsert(pel, directkey, PEL_RAX_DIRECT_KEYLEN, directval, NULL);
            }
        } else {
            /* The cache refers to another bucket: only the rax changes. */
            raxRemove(pel, fullkey, PEL_RAX_FLAX_KEYLEN, NULL);
            raxInsert(pel, directkey, PEL_RAX_DIRECT_KEYLEN, directval, NULL);
        }
    }

    return old;
}
*/ +static void pelIterRefresh(pelIterator *pi) { + memcpy(pi->key, pi->ri.key, PEL_RAX_FLAX_KEYLEN); + pi->key[PEL_RAX_FLAX_KEYLEN] = (unsigned char)pi->fi.key; + streamDecodeID(pi->key, &pi->id); + pi->data = pi->fi.data; + pi->valid = 1; + pi->direct = 0; +} + +/* Refresh iterator fields from a direct entry in the current rax node. + * Direct entries have a 16-byte rax key (full streamID). */ +static void pelIterRefreshDirect(pelIterator *pi) { + memcpy(pi->key, pi->ri.key, PEL_RAX_DIRECT_KEYLEN); + streamDecodeID(pi->key, &pi->id); + pi->data = pi->ri.data; + pi->valid = 1; + pi->direct = 1; +} + +/* Position the iterator on the first valid entry starting from the current + * rax position (calling raxNext first). Returns 1 on success, 0 if no more. */ +static int pelIterAdvanceRax(pelIterator *pi) { + while (raxNext(&pi->ri)) { + if (pi->ri.key_len == PEL_RAX_DIRECT_KEYLEN) { + pelIterRefreshDirect(pi); + return 1; + } + flaxStart(&pi->fi, (flax *)pi->ri.data); + if (flaxSeek(&pi->fi, "^", 0)) { + pelIterRefresh(pi); + return 1; + } + } + pi->valid = 0; + return 0; +} + +/* Position the iterator on the first entry of the current rax node's bucket. + * Returns 1 on success, 0 if bucket is empty (should not happen). */ +static int pelIterEnterBucketHead(pelIterator *pi) { + if (pi->ri.key_len == PEL_RAX_DIRECT_KEYLEN) { + pelIterRefreshDirect(pi); + return 1; + } + flaxStart(&pi->fi, (flax *)pi->ri.data); + if (flaxSeek(&pi->fi, "^", 0)) { + pelIterRefresh(pi); + return 1; + } + return 0; +} + +/* Position the iterator on the last entry of the current rax node's bucket. + * Returns 1 on success, 0 if bucket is empty. 
*/ +static int pelIterEnterBucketTail(pelIterator *pi) { + if (pi->ri.key_len == PEL_RAX_DIRECT_KEYLEN) { + pelIterRefreshDirect(pi); + return 1; + } + flaxStart(&pi->fi, (flax *)pi->ri.data); + if (flaxSeek(&pi->fi, "$", 0)) { + pelIterRefresh(pi); + return 1; + } + return 0; +} + +void pelIterStart(pelIterator *pi, rax *pel) { + pelCacheFlush(pel); + raxStart(&pi->ri, pel); + pi->valid = 0; + pi->just_seeked = 0; + pi->direct = 0; + memset(&pi->fi, 0, sizeof(pi->fi)); + memset(&pi->id, 0, sizeof(pi->id)); + pi->data = NULL; +} + +int pelIterSeek(pelIterator *pi, const char *op, streamID *id) { + pelCacheFlush(pi->ri.rt); + pi->valid = 0; + pi->just_seeked = 0; + if (op[0] == '^') { + raxSeek(&pi->ri, "^", NULL, 0); + if (!raxNext(&pi->ri)) return 0; + if (!pelIterEnterBucketHead(pi)) return 0; + pi->just_seeked = 1; + return 1; + } else if (op[0] == '$') { + raxSeek(&pi->ri, "$", NULL, 0); + if (!raxNext(&pi->ri)) return 0; + if (!pelIterEnterBucketTail(pi)) return 0; + pi->just_seeked = 1; + return 1; + } else if (op[0] == '>' && op[1] == '=') { + unsigned char fullkey[PEL_RAX_DIRECT_KEYLEN]; + streamEncodeID(fullkey, id); + uint8_t fkey = fullkey[PEL_RAX_FLAX_KEYLEN]; + + raxSeek(&pi->ri, "<=", fullkey, PEL_RAX_DIRECT_KEYLEN); + if (!raxNext(&pi->ri)) { + raxSeek(&pi->ri, "^", NULL, 0); + if (!raxNext(&pi->ri)) return 0; + if (!pelIterEnterBucketHead(pi)) return 0; + pi->just_seeked = 1; + return 1; + } + + int prefix_cmp = memcmp(pi->ri.key, fullkey, PEL_RAX_FLAX_KEYLEN); + + /* raxSeek("<=", fullkey, 16) guarantees the returned key is <= fullkey, + * so its first 15 bytes cannot exceed fullkey's prefix. 
*/ + serverAssert(prefix_cmp <= 0); + if (prefix_cmp == 0) { + if (pi->ri.key_len == PEL_RAX_DIRECT_KEYLEN) { + if (pi->ri.key[PEL_RAX_FLAX_KEYLEN] >= fkey) { + pelIterRefreshDirect(pi); + } else { + if (!pelIterAdvanceRax(pi)) return 0; + } + } else { + flaxStart(&pi->fi, (flax *)pi->ri.data); + if (!flaxSeek(&pi->fi, ">=", fkey)) { + if (!pelIterAdvanceRax(pi)) return 0; + } else { + pelIterRefresh(pi); + } + } + } else { + if (!pelIterAdvanceRax(pi)) return 0; + } + pi->just_seeked = 1; + return 1; + } + return 0; +} + +int pelIterNext(pelIterator *pi) { + if (pi->just_seeked) { + pi->just_seeked = 0; + return pi->valid; + } + if (!pi->valid) return 0; + + if (!pi->direct && flaxNext(&pi->fi)) { + pelIterRefresh(pi); + return 1; + } + + return pelIterAdvanceRax(pi); +} + +void pelIterStop(pelIterator *pi) { + raxStop(&pi->ri); + pi->valid = 0; +} + /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. * ----------------------------------------------------------------------- */ @@ -98,6 +652,11 @@ static void streamLpFreeGeneric(void *lp, void *strm) { lpFree(lp); } +static void listReleaseGenericCb(void *val, void *ctx) { + (void)ctx; + listReleaseGeneric(val); +} + void streamFreeIdmpProducerGeneric(void *producer, void *strm) { stream *s = strm; idmpProducerFree((idmpProducer *)producer, &s->alloc_size); @@ -109,7 +668,7 @@ void freeStream(stream *s) { if (s->cgroups) raxFreeWithCbAndContext(s->cgroups, streamFreeCGGeneric, s); if (s->cgroups_ref) - raxFreeWithCallback(s->cgroups_ref, listReleaseGeneric); + pelFree(s->cgroups_ref, listReleaseGenericCb, NULL); /* Free IDMP producers rax tree */ if (s->idmp_producers) raxFreeWithCbAndContext(s->idmp_producers, streamFreeIdmpProducerGeneric, s); @@ -284,13 +843,13 @@ robj *streamDup(robj *o) { /* Consumer Group PEL -- walk the time-ordered list so we can * append directly and preserve NACK zone structure. 
*/ for (streamNACK *nack = cg->pel_time_head; nack; nack = nack->pel_next) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, &nack->id); streamNACK *new_nack = streamCreateNACK(new_s, NULL, &nack->id); new_nack->delivery_time = nack->delivery_time; new_nack->delivery_count = nack->delivery_count; - new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, buf); - raxInsert(new_cg->pel, buf, sizeof(streamID), new_nack, NULL); + new_nack->cgroup_ref_node = streamLinkCGroupToEntry(new_s, new_cg, &nack->id); + pelInsert(new_cg->pel, &nack->id, new_nack, &new_cg->pel_count); + + /* Insert in sorted order to preserve ordering */ pelListInsertAtTail(new_cg, new_nack); if (nack == cg->pel_nack_tail) new_cg->pel_nack_tail = new_nack; } @@ -307,27 +866,26 @@ robj *streamDup(robj *o) { new_s->alloc_size += usable; new_consumer->name = sdsdup(consumer->name); new_s->alloc_size += sdsAllocSize(new_consumer->name); - new_consumer->pel = raxNewWithMetadata(0, &new_s->alloc_size); + new_consumer->pel = pelNew(&new_s->alloc_size); + new_consumer->pel_count = 0; raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, sdslen(new_consumer->name), new_consumer, NULL); new_consumer->seen_time = consumer->seen_time; new_consumer->active_time = consumer->active_time; /* Consumer PEL */ - raxIterator ri_cpel; - raxStart(&ri_cpel, consumer->pel); - raxSeek(&ri_cpel, "^", NULL, 0); - while (raxNext(&ri_cpel)) { + pelIterator pi_cpel; + pelIterStart(&pi_cpel, consumer->pel); + pelIterSeek(&pi_cpel, "^", NULL); + while(pelIterNext(&pi_cpel)) { void *result; - int found = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID),&result); - + int found = pelFind(new_cg->pel, &pi_cpel.id, &result); serverAssert(found); - streamNACK *new_nack = result; new_nack->consumer = new_consumer; - raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL); + pelInsert(new_consumer->pel, &pi_cpel.id, new_nack, &new_consumer->pel_count); } - raxStop(&ri_cpel); + 
pelIterStop(&pi_cpel); } raxStop(&ri_consumers); } @@ -2097,12 +2655,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Transfer ownership if needed */ if (nack->consumer != consumer) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, &nack->id); if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + pelRemove(nack->consumer->pel, &nack->id, &nack->consumer->pel_count); nack->consumer = consumer; - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + pelInsert(consumer->pel, &nack->id, nack, &consumer->pel_count); } nack->delivery_count += nack->delivery_count == LLONG_MAX ? 0 : 1; pelListUpdate(group, nack, cmd_time_snapshot); /* Moves element from beginning to end of list */ @@ -2221,15 +2777,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * a NACK for the entry, we need to associate it to the new * consumer. */ if (group && !noack) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&id); - /* Try to add a new NACK. Most of the time this will work and * will not require extra lookups. We'll fix the problem later * if we find that there is already an entry for this ID. 
*/ streamNACK *nack = streamCreateNACK(s, consumer, &id); int group_inserted = - raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); + pelTryInsert(group->pel,&id,nack,&group->pel_count); /* Now we can check if the entry was already busy, and * in that case reassign the entry to the new consumer, @@ -2237,23 +2790,23 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end if (group_inserted == 0) { streamFreeNACK(s,nack); void *result; - int found = raxFind(group->pel,buf,sizeof(buf),&result); + int found = pelFind(group->pel, &id, &result); serverAssert(found); nack = result; /* Only transfer between consumers if they're different */ if (nack->consumer != consumer) { - if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + if (nack->consumer) + pelRemove(nack->consumer->pel, &id, &nack->consumer->pel_count); nack->consumer = consumer; - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + pelInsert(consumer->pel, &id, nack, &consumer->pel_count); } nack->delivery_count = 1; /* Update delivery time and reposition in time list */ pelListUpdate(group, nack, cmd_time_snapshot); } else { /* New NACK - insert into consumer's PEL and time list */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); + pelInsert(consumer->pel, &id, nack, &consumer->pel_count); + nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, &id); pelListInsertAtTail(group, nack); } @@ -2302,21 +2855,14 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * to the client. However clients only reach this code path when they are * fetching the history of already retrieved messages, which is rare. 
*/ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) { - raxIterator ri; - unsigned char startkey[sizeof(streamID)]; - unsigned char endkey[sizeof(streamID)]; - streamEncodeID(startkey,start); - if (end) streamEncodeID(endkey,end); - size_t arraylen = 0; void *arraylen_ptr = addReplyDeferredLen(c); - raxStart(&ri,consumer->pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); - while(raxNext(&ri) && (!count || arraylen < count)) { - if (end && memcmp(ri.key,endkey,ri.key_len) > 0) break; - streamID thisid; - streamDecodeID(ri.key,&thisid); - if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,-1,NULL,NULL, + pelIterator pi; + pelIterStart(&pi, consumer->pel); + pelIterSeek(&pi, ">=", start); + while (pelIterNext(&pi) && (!count || arraylen < count)) { + if (end && streamCompareID(&pi.id, end) > 0) break; + if (streamReplyWithRange(c,s,&pi.id,&pi.id,1,0,-1,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL @@ -2324,16 +2870,16 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start * by the user by other means. In that case we signal it emitting * the ID but then a NULL entry for the fields. */ addReplyArrayLen(c,2); - addReplyStreamID(c,&thisid); + addReplyStreamID(c,&pi.id); addReplyNullArray(c); } else { - streamNACK *nack = ri.data; + streamNACK *nack = pi.data; nack->delivery_count += nack->delivery_count == LLONG_MAX ? 0 : 1; pelListUpdate(group, nack, commandTimeSnapshot()); } arraylen++; } - raxStop(&ri); + pelIterStop(&pi); setDeferredArrayLen(c,arraylen_ptr,arraylen); return arraylen; } @@ -3121,18 +3667,17 @@ void streamUpdateCGroupLastId(stream *s, streamCG *cg, streamID *id) { /* Link a consumer group to a stream entry in the cgroups_ref index. * Returns a pointer to the list node, so that it can be used for future deletion. 
*/ -listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) { +listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, streamID *id) { list *cglist; if (!s->cgroups_ref) - s->cgroups_ref = raxNewWithMetadata(0, &s->alloc_size); - + s->cgroups_ref = pelNew(&s->alloc_size); + /* Try to find the list for this stream ID, create it if it doesn't exist */ - if (!raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) { + if (!pelFind(s->cgroups_ref, id, (void**)&cglist)) { cglist = listCreate(); - serverAssert(raxInsert(s->cgroups_ref, key, sizeof(streamID), cglist, NULL)); + serverAssert(pelInsert(s->cgroups_ref, id, cglist, NULL)); } - /* Add the consumer group to the list and return the list node */ listAddNodeTail(cglist, cg); return listLast(cglist); @@ -3140,15 +3685,15 @@ listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) { /* Unlink a consumer group reference from the entry index for a specific stream ID. * This is called when a message is acknowledged or when a consumer group is deleted. */ -void streamUnlinkEntryFromCGroupRef(stream *s, streamNACK *na, unsigned char *key) { +void streamUnlinkEntryFromCGroupRef(stream *s, streamNACK *na) { list *cglist; if (!s->cgroups_ref) return; - if (raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) { + if (pelFind(s->cgroups_ref, &na->id, (void**)&cglist)) { listDelNode(cglist, na->cgroup_ref_node); - + /* If the list is now empty, remove it from the index. 
*/ if (listLength(cglist) == 0) { - raxRemove(s->cgroups_ref, key, sizeof(streamID), NULL); + pelRemove(s->cgroups_ref, &na->id, NULL); listRelease(cglist); } } @@ -3160,32 +3705,30 @@ void streamCleanupEntryCGroupRefs(stream *s, streamID *id) { list *cglist; listIter li; listNode *ln; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, id); - + /* If message is not in any consumer group, nothing to do */ - if (!raxFind(s->cgroups_ref, buf, sizeof(streamID), (void **)&cglist)) + if (!pelFind(s->cgroups_ref, id, (void**)&cglist)) return; listRewind(cglist, &li); while ((ln = listNext(&li))) { streamNACK *nack; streamCG *group = listNodeValue(ln); - + /* Find the message in this consumer group's PEL */ - serverAssert(raxFind(group->pel, buf, sizeof(buf), (void **)&nack)); - + serverAssert(pelFind(group->pel, id, (void**)&nack)); + /* Remove from group and consumer PELs */ pelListUnlink(group, nack); - raxRemove(group->pel, buf, sizeof(buf), NULL); - if (nack->consumer) - raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL); /* Since we're removing all references from the cgroups_ref, we can directly * free the NACK without unlinking it from the cgroups_ref. 
*/ + pelRemove(group->pel, id, &group->pel_count); + if (nack->consumer) + pelRemove(nack->consumer->pel, id, &nack->consumer->pel_count); streamFreeNACK(s, nack); } - raxRemove(s->cgroups_ref, buf, sizeof(streamID), NULL); + pelRemove(s->cgroups_ref, id, NULL); listRelease(cglist); } @@ -3222,9 +3765,7 @@ int streamEntryIsReferenced(stream *s, streamID *id) { /* Check if the message is in any consumer group's PEL */ if (!s->cgroups_ref) return 0; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf, id); - return raxFind(s->cgroups_ref, buf, sizeof(streamID), NULL); + return pelFind(s->cgroups_ref, id, NULL); } /* Create a NACK entry setting the delivery count to 1 and the delivery @@ -3254,10 +3795,10 @@ void streamFreeNACK(stream *s, streamNACK *na) { /* Free a NACK entry and remove its reference from the cgroups_ref. * This ensures proper cleanup of the consumer group list associated with the message ID. * Note: Caller must ensure NACK is unlinked from pel_time list before calling. */ -void streamDestroyNACK(stream *s, streamNACK *na, unsigned char *key) { +void streamDestroyNACK(stream *s, streamNACK *na) { size_t usable; serverAssert(na->pel_prev == NULL && na->pel_next == NULL); - streamUnlinkEntryFromCGroupRef(s, na, key); + streamUnlinkEntryFromCGroupRef(s, na); zfree_usable(na, &usable); s->alloc_size -= usable; } @@ -3283,8 +3824,7 @@ void streamFreeNACKGeneric(void *na, void *ctx) { * should do some work before. */ void streamFreeConsumer(stream *s, streamConsumer *sc) { size_t usable; - raxFree(sc->pel); /* No value free callback: the PEL entries are shared - between the consumer and the main stream PEL. 
*/ + pelFreeShallow(sc->pel); s->alloc_size -= sdsAllocSize(sc->name); sdsfree(sc->name); zfree_usable(sc, &usable); @@ -3309,7 +3849,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo size_t usable; streamCG *cg = zmalloc_usable(sizeof(*cg), &usable); s->alloc_size += usable; - cg->pel = raxNewWithMetadata(0, &s->alloc_size); + cg->pel = pelNew(&s->alloc_size); + cg->pel_count = 0; cg->pel_time_head = NULL; cg->pel_time_tail = NULL; cg->pel_nack_tail = NULL; @@ -3326,7 +3867,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo static void streamFreeCG(stream *s, streamCG *cg) { /* Free the pel, unlinking each NACK from the time list in the callback */ streamFreeNACKCtx ctx = {s, cg}; - raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, &ctx); + pelFree(cg->pel, streamFreeNACKGeneric, &ctx); /* pel_time_head/tail/pel_nack_tail should now be NULL after unlinking all NACKs */ serverAssert(cg->pel_time_head == NULL && cg->pel_time_tail == NULL && cg->pel_nack_tail == NULL); @@ -3340,14 +3881,13 @@ static void streamFreeCG(stream *s, streamCG *cg) { /* Destroy a consumer group and clean up all associated references. */ void streamDestroyCG(stream *s, streamCG *cg) { /* Remove all references from the cgroups_ref. 
*/ - raxIterator it; - raxStart(&it, cg->pel); - raxSeek(&it, "^", NULL, 0); - while (raxNext(&it)) { - streamNACK *nack = it.data; - streamUnlinkEntryFromCGroupRef(s, nack, it.key); + pelIterator pi; + pelIterStart(&pi, cg->pel); + pelIterSeek(&pi, "^", NULL); + while (pelIterNext(&pi)) { + streamUnlinkEntryFromCGroupRef(s,pi.data); } - raxStop(&it); + pelIterStop(&pi); /* If we're destroying the group with the minimum last_id, the cached * minimum is no longer valid and needs to be recalculated from the @@ -3391,7 +3931,8 @@ streamConsumer *streamCreateConsumer(stream *s, streamCG *cg, sds name, robj *ke s->alloc_size += usable; consumer->name = sdsdup(name); s->alloc_size += sdsAllocSize(consumer->name); - consumer->pel = raxNewWithMetadata(0, &s->alloc_size); + consumer->pel = pelNew(&s->alloc_size); + consumer->pel_count = 0; consumer->active_time = -1; consumer->seen_time = commandTimeSnapshot(); if (dirty) server.dirty++; @@ -3411,22 +3952,19 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { void streamDelConsumer(stream *s, streamCG *cg, streamConsumer *consumer) { /* Iterate all the consumer pending messages, deleting every corresponding * entry from the global entry. */ - raxIterator ri; - raxStart(&ri,consumer->pel); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - streamNACK *nack = ri.data; - streamUnlinkEntryFromCGroupRef(s, nack, ri.key); + pelIterator pi; + pelIterStart(&pi, consumer->pel); + pelIterSeek(&pi, "^", NULL); + while (pelIterNext(&pi)) { + streamNACK *nack = pi.data; + streamUnlinkEntryFromCGroupRef(s,nack); - streamID id; - streamDecodeID(ri.key, &id); + pelListUnlink(cg,nack); + pelRemove(cg->pel,&pi.id,&cg->pel_count); - pelListUnlink(cg, nack); - raxRemove(cg->pel,ri.key,ri.key_len,NULL); - - streamFreeNACK(s, nack); + streamFreeNACK(s,nack); } - raxStop(&ri); + pelIterStop(&pi); /* Deallocate the consumer. 
*/ raxRemove(cg->consumers,(unsigned char*)consumer->name, @@ -3622,7 +4160,7 @@ NULL * that were yet associated with such a consumer. */ if (server.memory_tracking_enabled) old_alloc = kvobjAllocSize(o); - pending = raxSize(consumer->pel); + pending = consumer->pel_count; streamDelConsumer(s,cg,consumer); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),o,old_alloc,kvobjAllocSize(o)); @@ -3809,20 +4347,17 @@ void xackCommand(client *c) { int acknowledged = 0; size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0; for (int j = 3; j < c->argc; j++) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&ids[j-3]); - /* Lookup the ID in the group PEL: it will have a reference to the * NACK structure that will have a reference to the consumer, so that * we are able to remove the entry from both PELs. */ void *result; - if (raxFind(group->pel,buf,sizeof(buf),&result)) { + if (pelFind(group->pel, &ids[j-3], &result)) { streamNACK *nack = result; pelListUnlink(group, nack); - raxRemove(group->pel,buf,sizeof(buf),NULL); + pelRemove(group->pel, &ids[j-3], &group->pel_count); if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(kv->ptr, nack, buf); + pelRemove(nack->consumer->pel, &ids[j-3], &nack->consumer->pel_count); + streamDestroyNACK(kv->ptr, nack); acknowledged++; server.dirty++; keyModified(c,c->db,c->argv[1],kv,0); @@ -3935,16 +4470,13 @@ void xnackCommand(client *c) { int nacked = 0; size_t old_alloc = server.memory_tracking_enabled ? 
kvobjAllocSize(kv) : 0; for (int j = 0; j < numids; j++) { - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&ids[j]); - void *result; - int found = raxFind(group->pel,buf,sizeof(buf),&result); + int found = pelFind(group->pel,&ids[j],&result); if (found) { streamNACK *nack = result; nackSetDeliveryCount(nack, mode, retrycount); if (nack->consumer != NULL) { - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + pelRemove(nack->consumer->pel,&ids[j],&nack->consumer->pel_count); nack->consumer = NULL; } @@ -3966,9 +4498,9 @@ void xnackCommand(client *c) { nack->delivery_count = 0; nackSetDeliveryCount(nack, mode, retrycount); - raxInsert(group->pel, buf, sizeof(buf), nack, NULL); + pelInsert(group->pel, &ids[j], nack, &group->pel_count); pelListInsertNacked(group, nack); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); + nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, &ids[j]); } else { continue; } @@ -4041,20 +4573,18 @@ void xackdelCommand(client *c) { for (int j = 0; j < args.numids; j++) { int res = XACKDEL_NO_ID; streamID *id = &ids[j]; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,id); /* Lookup the ID in the group PEL: it will have a reference to the * NACK structure that will have a reference to the consumer, so that * we are able to remove the entry from both PELs. */ void *result; - if (raxFind(group->pel,buf,sizeof(buf),&result)) { + if (pelFind(group->pel, id, &result)) { streamNACK *nack = result; pelListUnlink(group, nack); - raxRemove(group->pel,buf,sizeof(buf),NULL); + pelRemove(group->pel, id, &group->pel_count); if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(s, nack, buf); + pelRemove(nack->consumer->pel, id, &nack->consumer->pel_count); + streamDestroyNACK(s, nack); server.dirty++; int can_delete = 1; @@ -4196,39 +4726,38 @@ void xpendingCommand(client *c) { if (justinfo) { addReplyArrayLen(c,4); /* Total number of messages in the PEL. 
*/ - addReplyLongLong(c,raxSize(group->pel)); + addReplyLongLong(c,group->pel_count); /* First and last IDs. */ - if (raxSize(group->pel) == 0) { + if (group->pel_count == 0) { addReplyNull(c); /* Start. */ addReplyNull(c); /* End. */ addReplyNullArray(c); /* Clients. */ } else { /* Start. */ - raxIterator ri; - raxStart(&ri,group->pel); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - streamDecodeID(ri.key,&startid); - addReplyStreamID(c,&startid); + pelIterator pi; + pelIterStart(&pi,group->pel); + pelIterSeek(&pi,"^",NULL); + pelIterNext(&pi); + addReplyStreamID(c,&pi.id); /* End. */ - raxSeek(&ri,"$",NULL,0); - raxNext(&ri); - streamDecodeID(ri.key,&endid); - addReplyStreamID(c,&endid); - raxStop(&ri); + pelIterSeek(&pi,"$",NULL); + pelIterNext(&pi); + addReplyStreamID(c,&pi.id); + pelIterStop(&pi); /* Consumers with pending messages. */ + raxIterator ri; raxStart(&ri,group->consumers); raxSeek(&ri,"^",NULL,0); void *arraylen_ptr = addReplyDeferredLen(c); size_t arraylen = 0; while(raxNext(&ri)) { streamConsumer *consumer = ri.data; - if (raxSize(consumer->pel) == 0) continue; + if (consumer->pel_count == 0) continue; addReplyArrayLen(c,2); addReplyBulkCBuffer(c,ri.key,ri.key_len); - addReplyBulkLongLong(c,raxSize(consumer->pel)); + addReplyBulkLongLong(c,consumer->pel_count); arraylen++; } setDeferredArrayLen(c,arraylen_ptr,arraylen); @@ -4248,20 +4777,16 @@ void xpendingCommand(client *c) { } rax *pel = consumer ? 
consumer->pel : group->pel; - unsigned char startkey[sizeof(streamID)]; - unsigned char endkey[sizeof(streamID)]; - raxIterator ri; mstime_t now = commandTimeSnapshot(); - streamEncodeID(startkey,&startid); - streamEncodeID(endkey,&endid); - raxStart(&ri,pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); + pelIterator pi; + pelIterStart(&pi,pel); + pelIterSeek(&pi,">=",&startid); void *arraylen_ptr = addReplyDeferredLen(c); size_t arraylen = 0; - while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { - streamNACK *nack = ri.data; + while (count && pelIterNext(&pi) && streamCompareID(&pi.id,&endid) <= 0) { + streamNACK *nack = pi.data; if (nack->consumer && minidle) { mstime_t this_idle = now - nack->delivery_time; @@ -4273,9 +4798,7 @@ void xpendingCommand(client *c) { addReplyArrayLen(c,4); /* Entry ID. */ - streamID id; - streamDecodeID(ri.key,&id); - addReplyStreamID(c,&id); + addReplyStreamID(c,&pi.id); /* Consumer name (empty string if NACKed / unowned). */ if (nack->consumer) { @@ -4298,7 +4821,7 @@ void xpendingCommand(client *c) { /* Number of deliveries. */ addReplyLongLong(c,nack->delivery_count); } - raxStop(&ri); + pelIterStop(&pi); setDeferredArrayLen(c,arraylen_ptr,arraylen); } } @@ -4483,12 +5006,10 @@ void xclaimCommand(client *c) { size_t arraylen = 0; for (int j = 5; j <= last_id_arg; j++) { streamID id = ids[j-5]; - unsigned char buf[sizeof(streamID)]; - streamEncodeID(buf,&id); /* Lookup the ID in the group PEL. */ void *result = NULL; - raxFind(group->pel,buf,sizeof(buf),&result); + pelFind(group->pel,&id,&result); streamNACK *nack = result; /* Item must exist for us to transfer it to another consumer. 
*/ @@ -4506,11 +5027,11 @@ void xclaimCommand(client *c) { } server.dirty++; /* Release the NACK */ - pelListUnlink(group, nack); - raxRemove(group->pel,buf,sizeof(buf),NULL); + pelListUnlink(group,nack); + pelRemove(group->pel,&id,&group->pel_count); if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - streamDestroyNACK(s, nack, buf); + pelRemove(nack->consumer->pel,&id,&nack->consumer->pel_count); + streamDestroyNACK(s,nack); } continue; } @@ -4522,10 +5043,10 @@ void xclaimCommand(client *c) { * and replication of consumer groups. */ if (force && nack == NULL) { /* Create the NACK. */ - nack = streamCreateNACK(s, NULL, &id); - raxInsert(group->pel,buf,sizeof(buf),nack,NULL); + nack = streamCreateNACK(s,NULL,&id); + pelInsert(group->pel,&id,nack,&group->pel_count); pelListInsertAtTail(group, nack); - nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); + nack->cgroup_ref_node = streamLinkCGroupToEntry(s,group,&id); } if (nack != NULL) { @@ -4545,7 +5066,7 @@ void xclaimCommand(client *c) { * Note that nack->consumer is NULL if we created the * NACK above because of the FORCE option. */ if (nack->consumer) { - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + pelRemove(nack->consumer->pel,&id,&nack->consumer->pel_count); } } @@ -4560,7 +5081,7 @@ void xclaimCommand(client *c) { } if (nack->consumer != consumer) { /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + pelInsert(consumer->pel,&id,nack,&consumer->pel_count); nack->consumer = consumer; } /* Send the reply for this entry. 
*/ @@ -4685,19 +5206,15 @@ void xautoclaimCommand(client *c) { void *endidptr = addReplyDeferredLen(c); /* reply[0] */ void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ - unsigned char startkey[sizeof(streamID)]; - streamEncodeID(startkey,&startid); - raxIterator ri; - raxStart(&ri,group->pel); - raxSeek(&ri,">=",startkey,sizeof(startkey)); + pelIterator pi; + pelIterStart(&pi,group->pel); + pelIterSeek(&pi,">=",&startid); size_t arraylen = 0; mstime_t now = commandTimeSnapshot(); int deleted_id_num = 0; - while (attempts-- && count && raxNext(&ri)) { - streamNACK *nack = ri.data; - - streamID id; - streamDecodeID(ri.key, &id); + while (attempts-- && count && pelIterNext(&pi)) { + streamNACK *nack = pi.data; + streamID id = pi.id; /* Item must exist for us to transfer it to another consumer. */ if (!streamEntryExists(s,&id)) { @@ -4715,14 +5232,14 @@ void xautoclaimCommand(client *c) { } server.dirty++; /* Clear this entry from the PEL, it no longer exists */ - pelListUnlink(group, nack); - raxRemove(group->pel,ri.key,ri.key_len,NULL); + pelListUnlink(group,nack); + pelRemove(group->pel,&id,&group->pel_count); if (nack->consumer) - raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); - streamDestroyNACK(s, nack, ri.key); + pelRemove(nack->consumer->pel,&id,&nack->consumer->pel_count); + streamDestroyNACK(s,nack); /* Remember the ID for later */ deleted_ids[deleted_id_num++] = id; - raxSeek(&ri,">=",ri.key,ri.key_len); + pelIterSeek(&pi,">=",&id); count--; /* Count is a limit of the command response size. */ continue; } @@ -4738,7 +5255,7 @@ void xautoclaimCommand(client *c) { * Note that nack->consumer is NULL if we created the * NACK above because of the FORCE option. */ if (nack->consumer) { - raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + pelRemove(nack->consumer->pel,&id,&nack->consumer->pel_count); } } @@ -4751,7 +5268,7 @@ void xautoclaimCommand(client *c) { if (nack->consumer != consumer) { /* Add the entry in the new consumer local PEL. 
*/ - raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); + pelInsert(consumer->pel,&id,nack,&consumer->pel_count); nack->consumer = consumer; } @@ -4774,18 +5291,18 @@ void xautoclaimCommand(client *c) { } /* We need to return the next entry as a cursor for the next XAUTOCLAIM call */ - raxNext(&ri); + pelIterNext(&pi); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),o,old_alloc,kvobjAllocSize(o)); streamID endid; - if (raxEOF(&ri)) { + if (!pi.valid) { endid.ms = endid.seq = 0; } else { - streamDecodeID(ri.key, &endid); + endid = pi.id; } - raxStop(&ri); + pelIterStop(&pi); setDeferredArrayLen(c,arraylenptr,arraylen); setDeferredReplyStreamID(c,endidptr,&endid); @@ -5175,7 +5692,7 @@ void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { /* Group PEL count */ addReplyBulkCString(c,"pel-count"); - addReplyLongLong(c,raxSize(cg->pel)); + addReplyLongLong(c,cg->pel_count); /* NACKed entries count (entries in the NACK zone) */ addReplyBulkCString(c,"nacked-count"); @@ -5185,17 +5702,15 @@ void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { addReplyBulkCString(c,"pending"); long long arraylen_cg_pel = 0; void *arrayptr_cg_pel = addReplyDeferredLen(c); - raxIterator ri_cg_pel; - raxStart(&ri_cg_pel,cg->pel); - raxSeek(&ri_cg_pel,"^",NULL,0); - while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { - streamNACK *nack = ri_cg_pel.data; + pelIterator pi_cg; + pelIterStart(&pi_cg,cg->pel); + pelIterSeek(&pi_cg,"^",NULL); + while (pelIterNext(&pi_cg) && (!count || arraylen_cg_pel < count)) { + streamNACK *nack = pi_cg.data; addReplyArrayLen(c,4); /* Entry ID. */ - streamID id; - streamDecodeID(ri_cg_pel.key,&id); - addReplyStreamID(c,&id); + addReplyStreamID(c,&pi_cg.id); /* Consumer name (empty string if NACKed / unowned). 
*/ if (nack->consumer) { @@ -5214,7 +5729,7 @@ void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { arraylen_cg_pel++; } setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); - raxStop(&ri_cg_pel); + pelIterStop(&pi_cg); /* Consumers */ addReplyBulkCString(c,"consumers"); @@ -5240,23 +5755,21 @@ void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { /* Consumer PEL count */ addReplyBulkCString(c,"pel-count"); - addReplyLongLong(c,raxSize(consumer->pel)); + addReplyLongLong(c,consumer->pel_count); /* Consumer PEL */ addReplyBulkCString(c,"pending"); long long arraylen_cpel = 0; void *arrayptr_cpel = addReplyDeferredLen(c); - raxIterator ri_cpel; - raxStart(&ri_cpel,consumer->pel); - raxSeek(&ri_cpel,"^",NULL,0); - while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { - streamNACK *nack = ri_cpel.data; + pelIterator pi_cpel; + pelIterStart(&pi_cpel,consumer->pel); + pelIterSeek(&pi_cpel,"^",NULL); + while (pelIterNext(&pi_cpel) && (!count || arraylen_cpel < count)) { + streamNACK *nack = pi_cpel.data; addReplyArrayLen(c,3); /* Entry ID. */ - streamID id; - streamDecodeID(ri_cpel.key,&id); - addReplyStreamID(c,&id); + addReplyStreamID(c,&pi_cpel.id); /* Last delivery. 
*/ addReplyLongLong(c,nack->delivery_time); @@ -5267,7 +5780,7 @@ void xinfoReplyWithStreamInfo(client *c, robj *key, kvobj *kv) { arraylen_cpel++; } setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); - raxStop(&ri_cpel); + pelIterStop(&pi_cpel); } raxStop(&ri_consumers); } @@ -5338,7 +5851,7 @@ NULL addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); addReplyBulkCString(c,"pending"); - addReplyLongLong(c,raxSize(consumer->pel)); + addReplyLongLong(c,consumer->pel_count); addReplyBulkCString(c,"idle"); addReplyLongLong(c,idle); addReplyBulkCString(c,"inactive"); @@ -5364,7 +5877,7 @@ NULL addReplyBulkCString(c,"consumers"); addReplyLongLong(c,raxSize(cg->consumers)); addReplyBulkCString(c,"pending"); - addReplyLongLong(c,raxSize(cg->pel)); + addReplyLongLong(c,cg->pel_count); addReplyBulkCString(c,"last-delivered-id"); addReplyStreamID(c,&cg->last_id); addReplyBulkCString(c,"entries-read");