From 2df35f9fb0ac21fbc57e3b3d784c65d6928e6ef3 Mon Sep 17 00:00:00 2001 From: Moti Cohen Date: Tue, 2 Jun 2026 18:01:16 +0300 Subject: [PATCH] Avoid double-walk on find-then-insert via raxFindLink + raxInsertAt (#15252) Add `raxFindLink()` + `raxInsertAt()`: a two-step commit API that lets a caller walk the rax once to find a key, then commit an insert at the recorded position without re-walking. Today's "lookup then insert" pattern (`raxFind` + `raxInsert` on miss) walks the tree twice; this change collapses the worst case to a single walk. --------- Co-authored-by: Claude Opus 4.7 (1M context) Co-authored-by: debing.sun --- src/module.c | 5 +- src/rax.c | 211 ++++++++++++++++++++++++++++++++++++++++--------- src/rax.h | 32 ++++++++ src/t_stream.c | 28 ++++--- src/tracking.c | 12 +-- 5 files changed, 233 insertions(+), 55 deletions(-) diff --git a/src/module.c b/src/module.c index e1ed45c46..7bf6f5835 100644 --- a/src/module.c +++ b/src/module.c @@ -10108,8 +10108,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod while(1) { key = htonu64(expiretime); - if (!raxFind(Timers, (unsigned char*)&key,sizeof(key),NULL)) { - raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL); + raxNodeLink link; + if (!raxFindLink(Timers, (unsigned char*)&key, sizeof(key), NULL, &link)) { + raxInsertAt(Timers, (unsigned char*)&key, sizeof(key), timer, NULL, &link); break; } else { expiretime++; diff --git a/src/rax.c b/src/rax.c index 1db3f1cc2..bc4dab798 100644 --- a/src/rax.c +++ b/src/rax.c @@ -524,25 +524,54 @@ static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode return i; } -/* Insert the element 's' of size 'len', setting as auxiliary data - * the pointer 'data'. If the element is already present, the associated - * data is updated (only if 'overwrite' is set to 1), and 0 is returned, - * otherwise the element is inserted and 1 is returned. On out of memory the - * function returns 0 as well but sets errno to ENOMEM, otherwise errno will - * be set to 0. - */ -int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite) { - size_t i, usable; - int j = 0; /* Split position. If raxLowWalk() stops in a compressed - node, the index 'j' represents the char we stopped within the - compressed node, that is, the position where to split the - node for insertion. */ - raxNode *h, **parentlink; +#ifdef DEBUG_ASSERTIONS +/* Re-walk the tree and verify `link` still matches the current state, + * i.e. no rax mutation happened between raxFindLink() and the current + * raxInsertAt(). Returns 1 if the link is still valid, 0 otherwise. + * Used only from a debugAssert(). */ +static int raxLinkStillValid(rax *rax, unsigned char *s, size_t len, raxNodeLink *link) { + raxNode *stopnode, **parentlink; + int splitpos = 0; + size_t consumed = raxLowWalk(rax,s,len,&stopnode,&parentlink,&splitpos,NULL); + return stopnode == link->stopnode && + parentlink == link->parentlink && + consumed == link->consumed && + splitpos == link->splitpos; +} +#endif + +/* Commit an insert at the position recorded in `link`. The link must + * have come from an immediately-preceding raxFindLink() on (rax, s, len) + * with no intervening rax mutation. + * + * If the link lands on an existing key, the associated data is + * overwritten with `data`, the prior value is stored at *old (when old + * is non-NULL), and 0 is returned. Otherwise the element is inserted + * and 1 is returned. Callers wanting try-insert semantics (preserve + * existing) should check raxFindLink's return first and skip this call + * when it reports 1. + * + * On out of memory the function returns 0 and sets errno to ENOMEM; + * otherwise errno is set to 0. */ +int raxInsertAt(rax *rax, unsigned char *s, size_t len, void *data, void **old, raxNodeLink *link) { + size_t usable; + /* Pull walk state from `link`. */ + size_t i = link->consumed; + int j = link->splitpos; /* Split position. If raxLowWalk() stopped in + a compressed node, 'j' is the char index + within the compressed node where we + stopped; i.e. the position where to split + the node for insertion. Only meaningful + when h->iscompr. */ + raxNode *h = link->stopnode, **parentlink = link->parentlink; size_t dummy, *alloc_size = &dummy; + /* The link must reflect the current tree: no rax mutation is allowed + * between the raxFindLink() that produced it and this commit. */ + debugAssert(raxLinkStillValid(rax,s,len,link)); + if (rax->alloc_size) alloc_size = rax->alloc_size; debugf("### Insert %.*s with value %p\n", (int)len, s, data); - i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL); /* If i == len we walked following the whole string. If we are not * in the middle of a compressed node, the string is either already @@ -552,7 +581,7 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void ** if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */)) { debugf("### Insert: node representing key exists\n"); /* Make space for the value pointer if needed. */ - if (!h->iskey || (h->isnull && overwrite)) { + if (!h->iskey || h->isnull) { h = raxReallocForData(rax,h,data); if (h) memcpy(parentlink,&h,sizeof(h)); } @@ -564,9 +593,9 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void ** /* Update the existing key if there is already one. */ if (h->iskey) { if (old) *old = raxGetData(h); - if (overwrite) raxSetData(h,data); + raxSetData(h,data); errno = 0; - return 0; /* Element already exists. */ + return 0; /* Element already exists, overwritten. */ } /* Otherwise set the node as a key. Note that raxSetData() @@ -931,32 +960,65 @@ oom: return 0; } -/* Overwriting insert. Just a wrapper for raxGenericInsert() that will - * update the element if there is already one for the same key. */ -int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) { - return raxGenericInsert(rax,s,len,data,old,1); +/* Walk the rax once and record the stop position in `link`. Returns 1 if + * `s` is an existing key (and, if `value` is non-NULL, stores the value + * at *value); 0 otherwise. The link is populated either way so a caller + * that meant "find or insert" can commit via raxInsertAt() without a + * second walk. + * + * Invalidation contract: `link->h` and `link->parentlink` are interior + * pointers into the tree. They become stale on ANY intervening rax + * mutation. Callers MUST commit (or discard) immediately after the + * find; do not interleave other rax calls on the same tree, do not + * retain across yield points. */ +int raxFindLink(rax *rax, unsigned char *s, size_t len, + void **value, raxNodeLink *link) { + debugf("### FindLink: %.*s\n", (int)len, s); + link->splitpos = 0; + link->consumed = raxLowWalk(rax,s,len, + &link->stopnode,&link->parentlink, + &link->splitpos,NULL); + /* Match condition: query fully consumed, stopped at a clean node + * boundary (not mid-prefix on a compressed node), and the stop node + * is a key. */ + if (link->consumed != len || + (link->stopnode->iscompr && link->splitpos != 0) || + !link->stopnode->iskey) + return 0; + if (value != NULL) *value = raxGetData(link->stopnode); + return 1; } -/* Non overwriting insert function: if an element with the same key - * exists, the value is not updated and the function returns 0. - * This is just a wrapper for raxGenericInsert(). */ +/* Overwriting insert. One walk via raxFindLink, then commit at the + * recorded position. Existing element is updated. */ +int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) { + raxNodeLink link; + raxFindLink(rax, s, len, NULL, &link); + return raxInsertAt(rax,s,len,data,old,&link); +} + +/* Non-overwriting insert. If an element with the same key exists, the + * value is not updated and 0 is returned (with *old set, if non-NULL, + * to the existing value). raxFindLink already tells us whether the key + * exists, so we skip raxInsertAt's overwrite path entirely in that + * case. */ int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old) { - return raxGenericInsert(rax,s,len,data,old,0); + raxNodeLink link; + void *existing; + if (raxFindLink(rax, s, len, &existing, &link)) { + if (old) *old = existing; + errno = 0; + return 0; + } + return raxInsertAt(rax,s,len,data,old,&link); } /* Find a key in the rax: return 1 if the item is found, 0 otherwise. - * If there is an item and 'value' is passed in a non-NULL pointer, - * the value associated with the item is set at that address. */ + * If there is an item and 'value' is passed in a non-NULL pointer, the + * value associated with the item is set at that address. */ int raxFind(rax *rax, unsigned char *s, size_t len, void **value) { - raxNode *h; - - debugf("### Lookup: %.*s\n", (int)len, s); - int splitpos = 0; - size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL); - if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) - return 0; - if (value != NULL) *value = raxGetData(h); - return 1; + raxNodeLink link; + return raxFindLink(rax, s, len, value, &link); } /* Return the memory address where the 'parent' node stores the specified @@ -2106,6 +2168,81 @@ int raxTest(int argc, char **argv, int flags) { raxFreeWithCallback(r, rax_free); } + TEST("raxNodeLink: insert paths via raxFindLink + raxInsertAt") { + /* Three not-found scenarios in one tree: empty insert, ALGO 1 + * mid-prefix split, ALGO 2 end-of-prefix split. */ + rax *r = raxNew(); + void *vK = createTestValue(8); + void *vAB = createTestValue(8); + void *vXX = createTestValue(8); + void *vAN = createTestValue(8); + raxNodeLink link; + void *val; + + /* (a) Empty tree: link points at head, InsertAt commits. */ + assert(raxFindLink(r, (unsigned char*)"K", 1, NULL, &link) == 0); + assert(raxInsertAt(r, (unsigned char*)"K", 1, vK, NULL, &link) == 1); + assert(raxFind(r, (unsigned char*)"K", 1, &val) == 1 && val == vK); + + /* Seed compressed node "ANNIBALE". */ + assert(raxInsert(r, (unsigned char*)"ANNIBALE", 8, vAB, NULL) == 1); + + /* (b) ALGO 1: mismatch mid-prefix. "ANXX" stops at splitpos > 0 + * inside the compressed "ANNIBALE" node. */ + assert(raxFindLink(r, (unsigned char*)"ANXX", 4, NULL, &link) == 0); + assert(link.stopnode->iscompr && link.splitpos > 0); + assert(raxInsertAt(r, (unsigned char*)"ANXX", 4, vXX, NULL, &link) == 1); + + /* (c) ALGO 2: query exhausts mid-prefix (i == len, splitpos > 0). */ + assert(raxFindLink(r, (unsigned char*)"ANNI", 4, NULL, &link) == 0); + assert(raxInsertAt(r, (unsigned char*)"ANNI", 4, vAN, NULL, &link) == 1); + + /* All four keys reachable with the correct values. */ + assert(raxFind(r, (unsigned char*)"K", 1, &val) == 1 && val == vK); + assert(raxFind(r, (unsigned char*)"ANNIBALE", 8, &val) == 1 && val == vAB); + assert(raxFind(r, (unsigned char*)"ANXX", 4, &val) == 1 && val == vXX); + assert(raxFind(r, (unsigned char*)"ANNI", 4, &val) == 1 && val == vAN); + raxFreeWithCallback(r, rax_free); + } + + TEST("raxNodeLink: existing-key paths (overwrite vs try-insert vs not-found)") { + /* Three exists/not-exists scenarios on the same tree. */ + rax *r = raxNew(); + void *v1 = createTestValue(8), *v2 = createTestValue(8); + void *v3 = createTestValue(8), *vNew = createTestValue(8); + assert(raxInsert(r, (unsigned char*)"FOO", 3, v1, NULL) == 1); + assert(raxInsert(r, (unsigned char*)"BAR", 3, v3, NULL) == 1); + + raxNodeLink link; + void *val, *existing = NULL; + + /* (a) Overwrite path: FindLink returns 1, caller calls InsertAt + * anyway -- value replaced, *old carries the prior pointer. */ + assert(raxFindLink(r, (unsigned char*)"FOO", 3, &existing, &link) == 1); + assert(existing == v1); + void *old = NULL; + assert(raxInsertAt(r, (unsigned char*)"FOO", 3, v2, &old, &link) == 0); + assert(old == v1); + assert(raxFind(r, (unsigned char*)"FOO", 3, &val) == 1 && val == v2); + rax_free(v1); + + /* (b) Try-insert path: FindLink reports existing -- caller + * skips InsertAt entirely. No overwrite flag needed. */ + existing = NULL; + assert(raxFindLink(r, (unsigned char*)"BAR", 3, &existing, &link) == 1); + assert(existing == v3); + /* Deliberately skip raxInsertAt -- v3 must survive. */ + rax_free(vNew); + assert(raxFind(r, (unsigned char*)"BAR", 3, &val) == 1 && val == v3); + + /* (c) Not-found: FindLink on a missing key returns 0 and leaves + * the tree untouched (caller didn't commit). */ + assert(raxFindLink(r, (unsigned char*)"BAZ", 3, NULL, &link) == 0); + assert(raxFind(r, (unsigned char*)"BAZ", 3, NULL) == 0); + assert(raxSize(r) == 2); + raxFreeWithCallback(r, rax_free); + } + if (!err) printf("ALL TESTS PASSED!\n"); else diff --git a/src/rax.h b/src/rax.h index db8c4632f..20902f49b 100644 --- a/src/rax.h +++ b/src/rax.h @@ -168,6 +168,32 @@ typedef struct raxIterator { void *privdata; /* Optional private data for node callback. */ } raxIterator; +/* Result of a rax walk, used to commit a find-then-insert pair without + * re-walking the tree. + * + * raxFindLink() produces a link. + * raxInsertAt() consumes it. + * + * Invalidation contract: `stopnode` and `parentlink` are interior pointers into + * the tree. They become stale on ANY intervening rax mutation. Callers + * MUST commit (or discard) immediately after the find; do not interleave + * other rax calls on the same tree, do not retain across yield points. + * The commit itself is allowed to realloc `stopnode` (raxReallocForData, + * raxAddChild) and update *parentlink in-place -- the link's own fields + * survive the commit, but a second commit on the same link is undefined. */ +typedef struct raxNodeLink { + raxNode *stopnode; /* Stop node. */ + raxNode **parentlink; /* Slot in stopnode's parent that holds h. */ + size_t consumed; /* Bytes of key consumed at stop. */ + int splitpos; /* Split position inside stopnode's compressed + * prefix. Same semantic as raxLowWalk(): + * only meaningful when stopnode->iscompr; 0 with + * i == len means clean arrival at stopnode, 0 with + * i < len means the first prefix byte + * mismatched the next key byte, > 0 means + * the walk stopped mid-prefix. */ +} raxNodeLink; + /* Exported API. */ rax *raxNew(void); rax *raxNewWithMetadata(int metaSize, size_t *alloc_size); @@ -175,6 +201,12 @@ int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old); int raxTryInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old); int raxRemove(rax *rax, unsigned char *s, size_t len, void **old); int raxFind(rax *rax, unsigned char *s, size_t len, void **value); + +int raxFindLink(rax *rax, unsigned char *s, size_t len, + void **value, raxNodeLink *link); +int raxInsertAt(rax *rax, unsigned char *s, size_t len, + void *data, void **old, raxNodeLink *link); + void raxFree(rax *rax); void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)); void raxFreeWithCbAndContext(rax *rax, diff --git a/src/t_stream.c b/src/t_stream.c index d77625a47..4346f4cf7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -3126,11 +3126,15 @@ listNode *streamLinkCGroupToEntry(stream *s, streamCG *cg, unsigned char *key) { if (!s->cgroups_ref) s->cgroups_ref = raxNewWithMetadata(0, &s->alloc_size); - - /* Try to find the list for this stream ID, create it if it doesn't exist */ - if (!raxFind(s->cgroups_ref, key, sizeof(streamID), (void**)&cglist)) { + + /* Find-or-insert in a single rax walk: raxFindLink stashes the stop + * position so raxInsertAt commits without re-walking the tree. */ + raxNodeLink link; + if (!raxFindLink(s->cgroups_ref, key, sizeof(streamID), + (void**)&cglist, &link)) { cglist = listCreate(); - serverAssert(raxInsert(s->cgroups_ref, key, sizeof(streamID), cglist, NULL)); + serverAssert(raxInsertAt(s->cgroups_ref, key, sizeof(streamID), + cglist, NULL, &link)); } /* Add the consumer group to the list and return the list node */ @@ -3303,7 +3307,8 @@ void streamFreeConsumerGeneric(void *sc, void *s) { streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { if (s->cgroups == NULL) s->cgroups = raxNewWithMetadata(0, &s->alloc_size); - if (raxFind(s->cgroups,(unsigned char*)name,namelen,NULL)) + raxNodeLink link; + if (raxFindLink(s->cgroups,(unsigned char*)name,namelen,NULL,&link)) return NULL; size_t usable; @@ -3318,7 +3323,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, lo cg->last_id.seq = 0; streamUpdateCGroupLastId(s, cg, id); cg->entries_read = entries_read; - raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); + raxInsertAt(s->cgroups,(unsigned char*)name,namelen,cg,NULL,&link); return cg; } @@ -3939,7 +3944,8 @@ void xnackCommand(client *c) { streamEncodeID(buf,&ids[j]); void *result; - int found = raxFind(group->pel,buf,sizeof(buf),&result); + raxNodeLink link; + int found = raxFindLink(group->pel,buf,sizeof(buf),&result,&link); if (found) { streamNACK *nack = result; nackSetDeliveryCount(nack, mode, retrycount); @@ -3966,7 +3972,7 @@ void xnackCommand(client *c) { nack->delivery_count = 0; nackSetDeliveryCount(nack, mode, retrycount); - raxInsert(group->pel, buf, sizeof(buf), nack, NULL); + raxInsertAt(group->pel, buf, sizeof(buf), nack, NULL, &link); pelListInsertNacked(group, nack); nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf); } else { @@ -5942,14 +5948,14 @@ static idmpProducer *idmpGetOrCreateProducer(stream *s, const char *pid, size_t s->idmp_producers = raxNewWithMetadata(0, &s->alloc_size); } - /* Look up the producer */ idmpProducer *producer = NULL; - int found = raxFind(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer); + raxNodeLink link; + int found = raxFindLink(s->idmp_producers, (unsigned char *)pid, pid_len, (void **)&producer, &link); if (!found) { /* Create a new producer */ producer = idmpProducerCreate(&s->alloc_size); /* Insert into the rax tree - must succeed since we checked it doesn't exist */ - serverAssert(raxInsert(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL)); + serverAssert(raxInsertAt(s->idmp_producers, (unsigned char *)pid, pid_len, producer, NULL, &link)); } return producer; diff --git a/src/tracking.c b/src/tracking.c index c235d5812..12fb86903 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -139,11 +139,12 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { bcastState *bs; /* If this is the first client subscribing to such prefix, create * the prefix in the table. */ - if (!raxFind(PrefixTable,(unsigned char*)prefix,plen,&result)) { + raxNodeLink link; + if (!raxFindLink(PrefixTable,(unsigned char*)prefix,plen,&result,&link)) { bs = zmalloc(sizeof(*bs)); bs->keys = raxNew(); bs->clients = raxNew(); - raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + raxInsertAt(PrefixTable,(unsigned char*)prefix,plen,bs,NULL,&link); } else { bs = result; } @@ -228,10 +229,11 @@ void trackingRememberKeys(client *tracking, client *executing) { sds sdskey = executing->argv[idx]->ptr; void *result; rax *ids; - if (!raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),&result)) { + raxNodeLink link; + if (!raxFindLink(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),&result,&link)) { ids = raxNew(); - int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey, - sdslen(sdskey),ids, NULL); + int inserted = raxInsertAt(TrackingTable,(unsigned char*)sdskey, + sdslen(sdskey),ids,NULL,&link); serverAssert(inserted == 1); } else { ids = result;