Avoid double-walk on find-then-insert via raxFindLink + raxInsertAt (#15252)

Add `raxFindLink()` + `raxInsertAt()`: a two-step commit API that
lets a caller walk the rax once to find a key, then commit an insert
at the recorded position without re-walking. Today's
"lookup then insert" pattern (`raxFind` + `raxInsert` on miss) walks
the tree twice; this change collapses the worst case to a single
walk.

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
This commit is contained in:
Moti Cohen 2026-06-02 18:01:16 +03:00 committed by GitHub
parent 1b3721eb22
commit 2df35f9fb0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 233 additions and 55 deletions

View file

@ -10108,8 +10108,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
while(1) {
key = htonu64(expiretime);
if (!raxFind(Timers, (unsigned char*)&key,sizeof(key),NULL)) {
raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL);
raxNodeLink link;
if (!raxFindLink(Timers, (unsigned char*)&key, sizeof(key), NULL, &link)) {
raxInsertAt(Timers, (unsigned char*)&key, sizeof(key), timer, NULL, &link);
break;
} else {
expiretime++;

211
src/rax.c
View file

@ -524,25 +524,54 @@ static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode
return i;
}
/* Insert the element 's' of size 'len', setting as auxiliary data
* the pointer 'data'. If the element is already present, the associated
* data is updated (only if 'overwrite' is set to 1), and 0 is returned,
* otherwise the element is inserted and 1 is returned. On out of memory the
* function returns 0 as well but sets errno to ENOMEM, otherwise errno will
* be set to 0.
*/
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite) {
size_t i, usable;
int j = 0; /* Split position. If raxLowWalk() stops in a compressed
node, the index 'j' represents the char we stopped within the
compressed node, that is, the position where to split the
node for insertion. */
raxNode *h, **parentlink;
#ifdef DEBUG_ASSERTIONS
/* Re-walk the tree and verify `link` still matches the current state,
* i.e. no rax mutation happened between raxFindLink() and the current
* raxInsertAt(). Returns 1 if the link is still valid, 0 otherwise.
* Used only from a debugAssert(). */
static int raxLinkStillValid(rax *rax, unsigned char *s, size_t len, raxNodeLink *link) {
raxNode *stopnode, **parentlink;
int splitpos = 0;
size_t consumed = raxLowWalk(rax,s,len,&stopnode,&parentlink,&splitpos,NULL);
return stopnode == link->stopnode &&
parentlink == link->parentlink &&
consumed == link->consumed &&
splitpos == link->splitpos;
}
#endif
/* Commit an insert at the position recorded in `link`. The link must
* have come from an immediately-preceding raxFindLink() on (rax, s, len)
* with no intervening rax mutation.
*
* If the link lands on an existing key, the associated data is
* overwritten with `data`, the prior value is stored at *old (when old
* is non-NULL), and 0 is returned. Otherwise the element is inserted
* and 1 is returned. Callers wanting try-insert semantics (preserve
* existing) should check raxFindLink's return first and skip this call
* when it reports 1.
*
* On out of memory the function returns 0 and sets errno to ENOMEM;
* otherwise errno is set to 0. */
int raxInsertAt(rax *rax, unsigned char *s, size_t len, void *data, void **old, raxNodeLink *link) {
size_t usable;
/* Pull walk state from `link`. */
size_t i = link->consumed;
int j = link->splitpos; /* Split position. If raxLowWalk() stopped in
a compressed node, 'j' is the char index
within the compressed node where we
stopped; i.e. the position where to split
the node for insertion. Only meaningful
when h->iscompr. */
raxNode *h = link->stopnode, **parentlink = link->parentlink;
size_t dummy, *alloc_size = &dummy;
/* The link must reflect the current tree: no rax mutation is allowed
* between the raxFindLink() that produced it and this commit. */
debugAssert(raxLinkStillValid(rax,s,len,link));
if (rax->alloc_size) alloc_size = rax->alloc_size;
debugf("### Insert %.*s with value %p\n", (int)len, s, data);
i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);
/* If i == len we walked following the whole string. If we are not
* in the middle of a compressed node, the string is either already
@ -552,7 +581,7 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) {
debugf("### Insert: node representing key exists\n");
/* Make space for the value pointer if needed. */
if (!h->iskey || (h->isnull && overwrite)) {
if (!h->iskey || h->isnull) {
h = raxReallocForData(rax,h,data);
if (h) memcpy(parentlink,&h,sizeof(h));
}
@ -564,9 +593,9 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
/* Update the existing key if there is already one. */
if (h->iskey) {
if (old) *old = raxGetData(h);
if (overwrite) raxSetData(h,data);
raxSetData(h,data);
errno = 0;
return 0; /* Element already exists. */
return 0; /* Element already exists, overwritten. */
}
/* Otherwise set the node as a key. Note that raxSetData()
@ -931,32 +960,65 @@ oom:
return 0;
}
/* Overwriting insert. Just a wrapper for raxGenericInsert() that will
* update the element if there is already one for the same key. */
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,1);
/* Walk the rax once and record the stop position in `link`. Returns 1 if
* `s` is an existing key (and, if `value` is non-NULL, stores the value
* at *value); 0 otherwise. The link is populated either way so a caller
* that meant "find or insert" can commit via raxInsertAt() without a
* second walk.
*
* Invalidation contract: `link->h` and `link->parentlink` are interior
* pointers into the tree. They become stale on ANY intervening rax
* mutation. Callers MUST commit (or discard) immediately after the
* find; do not interleave other rax calls on the same tree, do not
* retain across yield points. */
int raxFindLink(rax *rax, unsigned char *s, size_t len,
void **value, raxNodeLink *link) {
debugf("### FindLink: %.*s\n", (int)len, s);
link->splitpos = 0;
link->consumed = raxLowWalk(rax,s,len,
&link->stopnode,&link->parentlink,
&link->splitpos,NULL);
/* Match condition: query fully consumed, stopped at a clean node
* boundary (not mid-prefix on a compressed node), and the stop node
* is a key. */
if (link->consumed != len ||
(link->stopnode->iscompr && link->splitpos != 0) ||
!link->stopnode->iskey)
return 0;
if (value != NULL) *value = raxGetData(link->stopnode);
return 1;
}
/* Non overwriting insert function: if an element with the same key
* exists, the value is not updated and the function returns 0.
* This is just a wrapper for raxGenericInsert(). */
/* Overwriting insert. One walk via raxFindLink, then commit at the
* recorded position. Existing element is updated. */
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
raxNodeLink link;
raxFindLink(rax, s, len, NULL, &link);
return raxInsertAt(rax,s,len,data,old,&link);
}
/* Non-overwriting insert. If an element with the same key exists, the
* value is not updated and 0 is returned (with *old set, if non-NULL,
* to the existing value). raxFindLink already tells us whether the key
* exists, so we skip raxInsertAt's overwrite path entirely in that
* case. */
int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) {
return raxGenericInsert(rax,s,len,data,old,0);
raxNodeLink link;
void *existing;
if (raxFindLink(rax, s, len, &existing, &link)) {
if (old) *old = existing;
errno = 0;
return 0;
}
return raxInsertAt(rax,s,len,data,old,&link);
}
/* Find a key in the rax: return 1 if the item is found, 0 otherwise.
* If there is an item and 'value' is passed in a non-NULL pointer,
* the value associated with the item is set at that address. */
* If there is an item and 'value' is passed in a non-NULL pointer, the
* value associated with the item is set at that address. */
int raxFind(rax *rax, unsigned char *s, size_t len, void **value) {
raxNode *h;
debugf("### Lookup: %.*s\n", (int)len, s);
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
return 0;
if (value != NULL) *value = raxGetData(h);
return 1;
raxNodeLink link;
return raxFindLink(rax, s, len, value, &link);
}
/* Return the memory address where the 'parent' node stores the specified
@ -2106,6 +2168,81 @@ int raxTest(int argc, char **argv, int flags) {
raxFreeWithCallback(r, rax_free);
}
TEST("raxNodeLink: insert paths via raxFindLink + raxInsertAt") {
/* Three not-found scenarios in one tree: empty insert, ALGO 1
* mid-prefix split, ALGO 2 end-of-prefix split. */
rax *r = raxNew();
void *vK = createTestValue(8);
void *vAB = createTestValue(8);
void *vXX = createTestValue(8);
void *vAN = createTestValue(8);
raxNodeLink link;
void *val;
/* (a) Empty tree: link points at head, InsertAt commits. */
assert(raxFindLink(r, (unsigned char*)"K", 1, NULL, &link) == 0);
assert(raxInsertAt(r, (unsigned char*)"K", 1, vK, NULL, &link) == 1);
assert(raxFind(r, (unsigned char*)"K", 1, &val) == 1 && val == vK);
/* Seed compressed node "ANNIBALE". */
assert(raxInsert(r, (unsigned char*)"ANNIBALE", 8, vAB, NULL) == 1);
/* (b) ALGO 1: mismatch mid-prefix. "ANXX" stops at splitpos > 0
* inside the compressed "ANNIBALE" node. */
assert(raxFindLink(r, (unsigned char*)"ANXX", 4, NULL, &link) == 0);
assert(link.stopnode->iscompr && link.splitpos > 0);
assert(raxInsertAt(r, (unsigned char*)"ANXX", 4, vXX, NULL, &link) == 1);
/* (c) ALGO 2: query exhausts mid-prefix (i == len, splitpos > 0). */
assert(raxFindLink(r, (unsigned char*)"ANNI", 4, NULL, &link) == 0);
assert(raxInsertAt(r, (unsigned char*)"ANNI", 4, vAN, NULL, &link) == 1);
/* All four keys reachable with the correct values. */
assert(raxFind(r, (unsigned char*)"K", 1, &val) == 1 && val == vK);
assert(raxFind(r, (unsigned char*)"ANNIBALE", 8, &val) == 1 && val == vAB);
assert(raxFind(r, (unsigned char*)"ANXX", 4, &val) == 1 && val == vXX);
assert(raxFind(r, (unsigned char*)"ANNI", 4, &val) == 1 && val == vAN);
raxFreeWithCallback(r, rax_free);
}
TEST("raxNodeLink: existing-key paths (overwrite vs try-insert vs not-found)") {
/* Three exists/not-exists scenarios on the same tree. */
rax *r = raxNew();
void *v1 = createTestValue(8), *v2 = createTestValue(8);
void *v3 = createTestValue(8), *vNew = createTestValue(8);
assert(raxInsert(r, (unsigned char*)"FOO", 3, v1, NULL) == 1);
assert(raxInsert(r, (unsigned char*)"BAR", 3, v3, NULL) == 1);
raxNodeLink link;
void *val, *existing = NULL;
/* (a) Overwrite path: FindLink returns 1, caller calls InsertAt
* anyway -- value replaced, *old carries the prior pointer. */
assert(raxFindLink(r, (unsigned char*)"FOO", 3, &existing, &link) == 1);
assert(existing == v1);
void *old = NULL;
assert(raxInsertAt(r, (unsigned char*)"FOO", 3, v2, &old, &link) == 0);
assert(old == v1);
assert(raxFind(r, (unsigned char*)"FOO", 3, &val) == 1 && val == v2);
rax_free(v1);
/* (b) Try-insert path: FindLink reports existing -- caller
* skips InsertAt entirely. No overwrite flag needed. */
existing = NULL;
assert(raxFindLink(r, (unsigned char*)"BAR", 3, &existing, &link) == 1);
assert(existing == v3);
/* Deliberately skip raxInsertAt -- v3 must survive. */
rax_free(vNew);
assert(raxFind(r, (unsigned char*)"BAR", 3, &val) == 1 && val == v3);
/* (c) Not-found: FindLink on a missing key returns 0 and leaves
* the tree untouched (caller didn't commit). */
assert(raxFindLink(r, (unsigned char*)"BAZ", 3, NULL, &link) == 0);
assert(raxFind(r, (unsigned char*)"BAZ", 3, NULL) == 0);
assert(raxSize(r) == 2);
raxFreeWithCallback(r, rax_free);
}
if (!err)
printf("ALL TESTS PASSED!\n");
else

View file

@ -168,6 +168,32 @@ typedef struct raxIterator {
void *privdata; /* Optional private data for node callback. */
} raxIterator;
/* Result of a rax walk, used to commit a find-then-insert pair without
* re-walking the tree.
*
* raxFindLink() produces a link.
* raxInsertAt() consumes it.
*
* Invalidation contract: `stopnode` and `parentlink` are interior pointers into
* the tree. They become stale on ANY intervening rax mutation. Callers
* MUST commit (or discard) immediately after the find; do not interleave
* other rax calls on the same tree, do not retain across yield points.
* The commit itself is allowed to realloc `stopnode` (raxReallocForData,
* raxAddChild) and update *parentlink in-place -- the link's own fields
* survive the commit, but a second commit on the same link is undefined. */
typedef struct raxNodeLink {
raxNode *stopnode; /* Stop node. */
raxNode **parentlink; /* Slot in stopnode's parent that holds h. */
size_t consumed; /* Bytes of key consumed at stop. */
int splitpos; /* Split position inside stopnode's compressed
* prefix. Same semantic as raxLowWalk():
* only meaningful when stopnode->iscompr; 0 with
* i == len means clean arrival at stopnode, 0 with
* i < len means the first prefix byte
* mismatched the next key byte, > 0 means
* the walk stopped mid-prefix. */
} raxNodeLink;
/* Exported API. */
rax *raxNew(void);
rax *raxNewWithMetadata(int metaSize, size_t *alloc_size);
@ -175,6 +201,12 @@ int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
int raxFind(rax *rax, unsigned char *s, size_t len, void **value);
int raxFindLink(rax *rax, unsigned char *s, size_t len,
void **value, raxNodeLink *link);
int raxInsertAt(rax *rax, unsigned char *s, size_t len,
void *data, void **old, raxNodeLink *link);
void raxFree(rax *rax);
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
void raxFreeWithCbAndContext(rax *rax,

View file

@ -3126,11 +3126,15 @@ listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) {
if (!s->cgroups_ref)
s->cgroups_ref = raxNewWithMetadata(0, &s->alloc_size);
/* Try to find the list for this stream ID, create it if it doesn't exist */
if (!raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) {
/* Find-or-insert in a single rax walk: raxFindLink stashes the stop
* position so raxInsertAt commits without re-walking the tree. */
raxNodeLink link;
if (!raxFindLink(s->cgroups_ref, key, sizeof(streamID),
(void**)&cglist, &link)) {
cglist = listCreate();
serverAssert(raxInsert(s->cgroups_ref, key, sizeof(streamID), cglist, NULL));
serverAssert(raxInsertAt(s->cgroups_ref, key, sizeof(streamID),
cglist, NULL, &link));
}
/* Add the consumer group to the list and return the list node */
@ -3303,7 +3307,8 @@ void streamFreeConsumerGeneric(void *sc, void *s) {
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
if (s->cgroups == NULL)
s->cgroups = raxNewWithMetadata(0, &s->alloc_size);
if (raxFind(s->cgroups,(unsigned char*)name,namelen,NULL))
raxNodeLink link;
if (raxFindLink(s->cgroups,(unsigned char*)name,namelen,NULL,&link))
return NULL;
size_t usable;
@ -3318,7 +3323,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo
cg->last_id.seq = 0;
streamUpdateCGroupLastId(s, cg, id);
cg->entries_read = entries_read;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
raxInsertAt(s->cgroups,(unsigned char*)name,namelen,cg,NULL,&link);
return cg;
}
@ -3939,7 +3944,8 @@ void xnackCommand(client *c) {
streamEncodeID(buf,&ids[j]);
void *result;
int found = raxFind(group->pel,buf,sizeof(buf),&result);
raxNodeLink link;
int found = raxFindLink(group->pel,buf,sizeof(buf),&result,&link);
if (found) {
streamNACK *nack = result;
nackSetDeliveryCount(nack, mode, retrycount);
@ -3966,7 +3972,7 @@ void xnackCommand(client *c) {
nack->delivery_count = 0;
nackSetDeliveryCount(nack, mode, retrycount);
raxInsert(group->pel, buf, sizeof(buf), nack, NULL);
raxInsertAt(group->pel, buf, sizeof(buf), nack, NULL, &link);
pelListInsertNacked(group, nack);
nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
} else {
@ -5942,14 +5948,14 @@ static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t
s->idmp_producers = raxNewWithMetadata(0, &s->alloc_size);
}
/* Look up the producer */
idmpProducer *producer = NULL;
int found = raxFind(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer);
raxNodeLink link;
int found = raxFindLink(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer, &link);
if (!found) {
/* Create a new producer */
producer = idmpProducerCreate(&s->alloc_size);
/* Insert into the rax tree - must succeed since we checked it doesn't exist */
serverAssert(raxInsert(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL));
serverAssert(raxInsertAt(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL, &link));
}
return producer;

View file

@ -139,11 +139,12 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
bcastState *bs;
/* If this is the first client subscribing to such prefix, create
* the prefix in the table. */
if (!raxFind(PrefixTable,(unsigned char*)prefix,plen,&result)) {
raxNodeLink link;
if (!raxFindLink(PrefixTable,(unsigned char*)prefix,plen,&result,&link)) {
bs = zmalloc(sizeof(*bs));
bs->keys = raxNew();
bs->clients = raxNew();
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
raxInsertAt(PrefixTable,(unsigned char*)prefix,plen,bs,NULL,&link);
} else {
bs = result;
}
@ -228,10 +229,11 @@ void trackingRememberKeys(client *tracking, client *executing) {
sds sdskey = executing->argv[idx]->ptr;
void *result;
rax *ids;
if (!raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),&result)) {
raxNodeLink link;
if (!raxFindLink(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),&result,&link)) {
ids = raxNew();
int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
sdslen(sdskey),ids, NULL);
int inserted = raxInsertAt(TrackingTable,(unsigned char*)sdskey,
sdslen(sdskey),ids,NULL,&link);
serverAssert(inserted == 1);
} else {
ids = result;