From 832090d82163086c7492733b366dca3d8bea7a35 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 20 Feb 2025 18:30:59 +0100 Subject: [PATCH] Expr filtering: draft of the dual ported object. --- expr.c | 33 ++++++++++++++++++++ vset.c | 97 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/expr.c b/expr.c index 408a1b4b3a..07ec1ec0a6 100644 --- a/expr.c +++ b/expr.c @@ -56,6 +56,9 @@ typedef struct exprtoken { size_t len; } tuple; // Tuples are like [1, 2, 3] for "in" operator. }; + char *heapstr; // True if we have a private allocation for this + // string. When possible, it just references to the + // string expression we compiled, exprstate->expr. } exprtoken; /* Simple stack of expr tokens. This is used both to represent the stack @@ -118,6 +121,7 @@ struct { /* ================================ Expr token ============================== */ void exprFreeToken(exprtoken *t) { if (t == NULL) return; + if (t->heapstr != NULL) free(t->heapstr); free(t); } @@ -618,9 +622,38 @@ int exprRun(exprstate *es, char *json, size_t json_len) { if (t->token_type == EXPR_TOKEN_SELECTOR) { exprtoken *result = malloc(sizeof(exprtoken)); if (result != NULL && json != NULL) { + cJSON *attrib = NULL; if (parsed_json == NULL) parsed_json = cJSON_ParseWithLength(json,json_len); if (parsed_json) { + char item_name[128]; + if (t->str.len <= sizeof(item_name)) { + memcpy(item_name,t->str.start+1,t->str.len-1); + item_name[t->str.len] = 0; + attrib = cJSON_GetObjectItem(parsed_json,item_name); + } + /* Fill the token according to the JSON type stored + * at the attribute. 
*/ + if (attrib) { + if (cJSON_IsNumber(attrib)) { + result->token_type = EXPR_TOKEN_NUM; + result->num = cJSON_GetNumberValue(attrib); + } else if (cJSON_IsString(attrib)) { + result->token_type = EXPR_TOKEN_STR; + char *strval = cJSON_GetStringValue(attrib); + result->heapstr = strdup(strval); + if (result->heapstr != NULL) { + result->str.start = result->heapstr; + result->str.len = strlen(result->heapstr); + } else { + attrib = NULL; + } + } else { + attrib = NULL; // Unsupported type. + } + } + } + if (attrib) { exprStackPush(&es->values_stack, result); continue; } diff --git a/vset.c b/vset.c index 6d9971c46a..e14537202e 100644 --- a/vset.c +++ b/vset.c @@ -48,6 +48,15 @@ struct vsetObject { pthread_rwlock_t in_use_lock; // Lock needed to destroy the object safely. uint64_t id; // Unique ID used by threaded VADD to know the // object is still the same. + uint64_t numattribs; // Number of nodes associated with an attribute. +}; + +/* Each node has two associated values: the associated string (the item + * in the set) and potentially a JSON string, that is, the attributes, used + * for hybrid search with the VSIM FILTER option. */ +struct vsetNodeVal { + RedisModuleString *item; + RedisModuleString *attrib; }; /* Create a random projection matrix for dimensionality reduction. @@ -108,13 +117,16 @@ struct vsetObject *createVectorSetObject(unsigned int dim, uint32_t quant_type) o->proj_matrix = NULL; o->proj_input_size = 0; + o->numattribs = 0; pthread_rwlock_init(&o->in_use_lock,NULL); - return o; } void vectorSetReleaseNodeValue(void *v) { - RedisModule_FreeString(NULL,v); + struct vsetNodeVal *nv = v; + RedisModule_FreeString(NULL,nv->item); + RedisModule_FreeString(NULL,nv->attrib); + RedisModule_Free(nv); } /* Free the vector set object. */ @@ -142,24 +154,60 @@ const char *vectorSetGetQuantName(struct vsetObject *o) { * * Returns 1 if the element was added, or 0 if the element was already there * and was just updated. 
*/ -int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange, RedisModuleString *val, int update, int ef) +int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange, RedisModuleString *val, RedisModuleString *attrib, int update, int ef) { hnswNode *node = RedisModule_DictGet(o->dict,val,NULL); if (node != NULL) { if (update) { - void *old_val = node->value; + struct vsetNodeVal *nv = node->value; /* Pass NULL as value-free function. We want to reuse * the old value. */ hnsw_delete_node(o->hnsw, node, NULL); - node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,old_val,ef); + node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef); RedisModule_DictReplace(o->dict,val,node); + + /* If attrib != NULL, the user wants that in case of an update we + * update the attribute as well (otherwise it remains as it was). + * Note that the order of operations is conceived so that it + * works in case the old attrib and the new attrib pointer is the + * same. */ + if (attrib) { + // Empty attribute string means: unset the attribute during + // the update. + size_t attrlen; + RedisModule_StringPtrLen(attrib,&attrlen); + if (attrlen != 0) { + RedisModule_RetainString(NULL,attrib); + o->numattribs++; + } else { + attrib = NULL; + } + + if (nv->attrib) { + o->numattribs--; + RedisModule_FreeString(NULL,nv->attrib); + } + nv->attrib = attrib; + } } return 0; } - node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,val,ef); - if (!node) return 0; + struct vsetNodeVal *nv = RedisModule_Alloc(sizeof(*nv)); + nv->item = val; + nv->attrib = attrib; + node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef); + if (node == NULL) { + // XXX Technically in Redis-land we don't have out of memories as we + // crash. However the HNSW library may fail for error in the locking + // libc call. There is to understand if this may actually happen or not.
+ RedisModule_Free(nv); + return 0; + } + if (attrib != NULL) o->numattribs++; RedisModule_DictSet(o->dict,val,node); + RedisModule_RetainString(NULL,val); + if (attrib) RedisModule_RetainString(NULL,val); return 1; } @@ -268,6 +316,7 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleString *val = targ[4]; InsertContext *ic = targ[5]; int ef = (uint64_t)targ[6]; + RedisModuleString *attrib = targ[7]; RedisModule_Free(targ); /* Open the key: there are no guarantees it still exists, or contains @@ -300,6 +349,13 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { /* Otherwise try to insert the new element with the neighbors * collected in background. If we fail, do it synchronously again * from scratch. */ + + // First: allocate the dual-ported value for the node. + struct vsetNodeVal *nv = RedisModule_Alloc(sizeof(*nv)); + nv->item = val; + nv->attrib = attrib; + + // Then: insert the node in the HNSW data structure. hnswNode *newnode; if ((newnode = hnsw_try_commit_insert(vset->hnsw, ic)) == NULL) { newnode = hnsw_insert(vset->hnsw, vec, NULL, 0, 0, val, ef); @@ -313,6 +369,7 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { // Whatever happens is a success... :D RedisModule_ReplyWithLongLong(ctx,1); if (val) RedisModule_FreeString(ctx,val); // Not added? Free it. + if (attrib) RedisModule_FreeString(ctx,attrib); // Not added? Free it. RedisModule_Free(vec); return retval; } @@ -330,6 +387,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int cas = 0; // Threaded check-and-set style insert. long long ef = VSET_DEFAULT_C_EF; // HNSW creation time EF for new nodes. float *vec = parseVector(argv, argc, 2, &dim, &reduce_dim, &consumed_args); + RedisModuleString *attrib; // Attributes if passed via ATTRIB. 
if (!vec) return RedisModule_ReplyWithError(ctx,"ERR invalid vector specification"); @@ -350,7 +408,10 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_Free(vec); return RedisModule_ReplyWithError(ctx, "ERR invalid EF"); } - j++; // skip EF argument. + j++; // skip argument. + } else if (!strcasecmp(opt, "SETATTR") && j+1 < argc) { + attrib = argv[j+1]; + j++; // skip argument. } else if (!strcasecmp(opt, "NOQUANT")) { quant_type = HNSW_QUANT_NONE; } else if (!strcasecmp(opt, "BIN")) { @@ -464,8 +525,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!cas) { /* Insert vector synchronously. */ - int added = vectorSetInsert(vset,vec,NULL,0,val,1,ef); - if (added) RedisModule_RetainString(ctx,val); + int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef); RedisModule_Free(vec); RedisModule_ReplyWithLongLong(ctx,added); @@ -478,7 +538,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,VADD_CASReply,NULL,NULL,0); pthread_t tid; - void **targ = RedisModule_Alloc(sizeof(void*)*7); + void **targ = RedisModule_Alloc(sizeof(void*)*8); targ[0] = bc; targ[1] = vset; targ[2] = (void*)(unsigned long)vset->id; @@ -486,7 +546,9 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { targ[4] = val; targ[5] = NULL; // Used later for insertion context. targ[6] = (void*)(unsigned long)ef; + targ[7] = attrib; RedisModule_RetainString(ctx,val); + if (attrib) RedisModule_RetainString(ctx,attrib); if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) { pthread_rwlock_unlock(&vset->in_use_lock); RedisModule_AbortBlock(bc); @@ -839,6 +901,8 @@ int VREM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { /* Remove from HNSW graph using the high-level API that handles * locking and cleanup. 
We pass RedisModule_FreeString as the value * free function since the strings were retained at insertion time. */ + struct vsetNodeVal *nv = node->value; + if (nv->attrib != NULL) vset->numattribs--; hnsw_delete_node(vset->hnsw, node, vectorSetReleaseNodeValue); /* Destroy empty vector set. */ @@ -1035,6 +1099,9 @@ int VINFO_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /* ============================== vset type methods ========================= */ +#define SAVE_FLAG_HAS_PROJMATRIX (1<<0) +#define SAVE_FLAG_HAS_ATTRIBS (1<<1) + /* Save object to RDB */ void VectorSetRdbSave(RedisModuleIO *rdb, void *value) { struct vsetObject *vset = value; @@ -1042,9 +1109,13 @@ void VectorSetRdbSave(RedisModuleIO *rdb, void *value) { RedisModule_SaveUnsigned(rdb, vset->hnsw->node_count); RedisModule_SaveUnsigned(rdb, vset->hnsw->quant_type); + uint32_t save_flags = 0; + if (vset->proj_matrix) save_flags |= SAVE_FLAG_HAS_PROJMATRIX; + if (vset->numattribs != 0) save_flags |= SAVE_FLAG_HAS_ATTRIBS; + RedisModule_SaveUnsigned(rdb, save_flags); + /* Save projection matrix if present */ if (vset->proj_matrix) { - RedisModule_SaveUnsigned(rdb, 1); // has projection uint32_t input_dim = vset->proj_input_size; uint32_t output_dim = vset->hnsw->vector_dim; RedisModule_SaveUnsigned(rdb, input_dim); @@ -1054,8 +1125,6 @@ void VectorSetRdbSave(RedisModuleIO *rdb, void *value) { // Save projection matrix as binary blob size_t matrix_size = sizeof(float) * input_dim * output_dim; RedisModule_SaveStringBuffer(rdb, (const char *)vset->proj_matrix, matrix_size); - } else { - RedisModule_SaveUnsigned(rdb, 0); // no projection } hnswNode *node = vset->hnsw->head;