Expr filtering: draft of the dual ported object.

This commit is contained in:
antirez 2025-02-20 18:30:59 +01:00
parent af6fa6f732
commit 832090d821
2 changed files with 116 additions and 14 deletions

33
expr.c
View file

@ -56,6 +56,9 @@ typedef struct exprtoken {
size_t len;
} tuple; // Tuples are like [1, 2, 3] for "in" operator.
};
char *heapstr; // True if we have a private allocation for this
// string. When possible, it just references to the
// string expression we compiled, exprstate->expr.
} exprtoken;
/* Simple stack of expr tokens. This is used both to represent the stack
@ -118,6 +121,7 @@ struct {
/* ================================ Expr token ============================== */
/* Release a token together with its private string copy, if any.
 * Passing NULL is allowed and does nothing. */
void exprFreeToken(exprtoken *t) {
    if (t != NULL) {
        /* heapstr is NULL when the token has no private allocation;
         * free(NULL) is a no-op, so no extra check is needed. */
        free(t->heapstr);
        free(t);
    }
}
@ -618,9 +622,38 @@ int exprRun(exprstate *es, char *json, size_t json_len) {
if (t->token_type == EXPR_TOKEN_SELECTOR) {
exprtoken *result = malloc(sizeof(exprtoken));
if (result != NULL && json != NULL) {
cJSON *attrib = NULL;
if (parsed_json == NULL)
parsed_json = cJSON_ParseWithLength(json,json_len);
if (parsed_json) {
char item_name[128];
if (t->str.len <= sizeof(item_name)) {
memcpy(item_name,t->str.start+1,t->str.len-1);
item_name[t->str.len] = 0;
attrib = cJSON_GetObjectItem(parsed_json,item_name);
}
/* Fill the token according to the JSON type stored
* at the attribute. */
if (attrib) {
if (cJSON_IsNumber(attrib)) {
result->token_type = EXPR_TOKEN_NUM;
result->num = cJSON_GetNumberValue(attrib);
} else if (cJSON_IsString(attrib)) {
result->token_type = EXPR_TOKEN_STR;
char *strval = cJSON_GetStringValue(attrib);
result->heapstr = strdup(strval);
if (result->heapstr != NULL) {
result->str.start = result->heapstr;
result->str.len = strlen(result->heapstr);
} else {
attrib = NULL;
}
} else {
attrib = NULL; // Unsupported type.
}
}
}
if (attrib) {
exprStackPush(&es->values_stack, result);
continue;
}

97
vset.c
View file

@ -48,6 +48,15 @@ struct vsetObject {
pthread_rwlock_t in_use_lock; // Lock needed to destroy the object safely.
uint64_t id; // Unique ID used by threaded VADD to know the
// object is still the same.
uint64_t numattribs; // Number of nodes associated with an attribute.
};
/* Each node has two associated values: the associated string (the item
 * in the set) and potentially a JSON string, that is, the attributes, used
 * for hybrid search with the VSIM FILTER option.
 *
 * A pointer to this structure is what gets stored as the HNSW node value.
 * Both strings are released, together with the structure itself, by
 * vectorSetReleaseNodeValue(). */
struct vsetNodeVal {
    /* The element name: the same string used as key in the object dict. */
RedisModuleString *item;
    /* Attributes of the element, or NULL when the element has none. */
RedisModuleString *attrib;
};
/* Create a random projection matrix for dimensionality reduction.
@ -108,13 +117,16 @@ struct vsetObject *createVectorSetObject(unsigned int dim, uint32_t quant_type)
o->proj_matrix = NULL;
o->proj_input_size = 0;
o->numattribs = 0;
pthread_rwlock_init(&o->in_use_lock,NULL);
return o;
}
/* Value-free function for HNSW nodes.
 *
 * 'v' is the node value, that is, a pointer to a 'struct vsetNodeVal'.
 * The item string, the optional attribute string, and the structure
 * itself are all released. */
void vectorSetReleaseNodeValue(void *v) {
    struct vsetNodeVal *nv = v;
    RedisModule_FreeString(NULL,nv->item);
    /* The attribute is optional: nodes without attributes store NULL here,
     * and a NULL pointer must not be handed to RedisModule_FreeString(). */
    if (nv->attrib != NULL) RedisModule_FreeString(NULL,nv->attrib);
    RedisModule_Free(nv);
}
/* Free the vector set object. */
@ -142,24 +154,60 @@ const char *vectorSetGetQuantName(struct vsetObject *o) {
*
* Returns 1 if the element was added, or 0 if the element was already there
* and was just updated. */
int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange, RedisModuleString *val, RedisModuleString *attrib, int update, int ef)
{
    /* Ownership notes:
     *
     * - 'val' is retained by this function only when a new element is
     *   created (return value 1).
     * - 'attrib' may be NULL. It is retained when it gets stored in the
     *   node value: on creation of a new element, or on update when it is
     *   a non-empty string.
     *
     * In all the other cases the caller keeps its own references. */
    hnswNode *node = RedisModule_DictGet(o->dict,val,NULL);
    if (node != NULL) {
        if (update) {
            struct vsetNodeVal *nv = node->value;
            /* Pass NULL as value-free function. We want to reuse
             * the old value. */
            hnsw_delete_node(o->hnsw, node, NULL);
            /* NOTE(review): a NULL return here is not handled; the dict
             * entry would then be replaced with NULL. Confirm whether
             * hnsw_insert() can actually fail at this point. */
            node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef);
            RedisModule_DictReplace(o->dict,val,node);

            /* If attrib != NULL, the user wants that in case of an update we
             * update the attribute as well (otherwise it remains as it was).
             * Note that the order of operations is conceived so that it
             * works in case the old attrib and the new attrib pointer is the
             * same. */
            if (attrib) {
                // Empty attribute string means: unset the attribute during
                // the update.
                size_t attrlen;
                RedisModule_StringPtrLen(attrib,&attrlen);
                if (attrlen != 0) {
                    RedisModule_RetainString(NULL,attrib);
                    o->numattribs++;
                } else {
                    attrib = NULL;
                }

                if (nv->attrib) {
                    o->numattribs--;
                    RedisModule_FreeString(NULL,nv->attrib);
                }
                nv->attrib = attrib;
            }
        }
        return 0;
    }

    /* New element: the node value is the dual-ported object holding both
     * the item name and the (optional) attributes. */
    struct vsetNodeVal *nv = RedisModule_Alloc(sizeof(*nv));
    nv->item = val;
    nv->attrib = attrib;
    node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef);
    if (node == NULL) {
        // XXX Technically in Redis-land we don't have out of memory errors,
        // as we crash. However the HNSW library may fail for an error in a
        // locking libc call. It is not clear whether this can actually
        // happen or not.
        RedisModule_Free(nv);
        return 0;
    }
    RedisModule_DictSet(o->dict,val,node);

    /* The node value now references both strings: take one reference for
     * each of them. */
    RedisModule_RetainString(NULL,val);
    if (attrib != NULL) {
        o->numattribs++;
        RedisModule_RetainString(NULL,attrib);
    }
    return 1;
}
@ -268,6 +316,7 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModuleString *val = targ[4];
InsertContext *ic = targ[5];
int ef = (uint64_t)targ[6];
RedisModuleString *attrib = targ[7];
RedisModule_Free(targ);
/* Open the key: there are no guarantees it still exists, or contains
@ -300,6 +349,13 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* Otherwise try to insert the new element with the neighbors
* collected in background. If we fail, do it synchronously again
* from scratch. */
// First: allocate the dual-ported value for the node.
struct vsetNodeVal *nv = RedisModule_Alloc(sizeof(*nv));
nv->item = val;
nv->attrib = attrib;
// Then: insert the node in the HNSW data structure.
hnswNode *newnode;
if ((newnode = hnsw_try_commit_insert(vset->hnsw, ic)) == NULL) {
newnode = hnsw_insert(vset->hnsw, vec, NULL, 0, 0, val, ef);
@ -313,6 +369,7 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
// Whatever happens is a success... :D
RedisModule_ReplyWithLongLong(ctx,1);
if (val) RedisModule_FreeString(ctx,val); // Not added? Free it.
if (attrib) RedisModule_FreeString(ctx,attrib); // Not added? Free it.
RedisModule_Free(vec);
return retval;
}
@ -330,6 +387,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
int cas = 0; // Threaded check-and-set style insert.
long long ef = VSET_DEFAULT_C_EF; // HNSW creation time EF for new nodes.
float *vec = parseVector(argv, argc, 2, &dim, &reduce_dim, &consumed_args);
RedisModuleString *attrib; // Attributes if passed via ATTRIB.
if (!vec)
return RedisModule_ReplyWithError(ctx,"ERR invalid vector specification");
@ -350,7 +408,10 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_Free(vec);
return RedisModule_ReplyWithError(ctx, "ERR invalid EF");
}
j++; // skip EF argument.
j++; // skip argument.
} else if (!strcasecmp(opt, "SETATTR") && j+1 < argc) {
attrib = argv[j+1];
j++; // skip argument.
} else if (!strcasecmp(opt, "NOQUANT")) {
quant_type = HNSW_QUANT_NONE;
} else if (!strcasecmp(opt, "BIN")) {
@ -464,8 +525,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (!cas) {
/* Insert vector synchronously. */
int added = vectorSetInsert(vset,vec,NULL,0,val,1,ef);
if (added) RedisModule_RetainString(ctx,val);
int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef);
RedisModule_Free(vec);
RedisModule_ReplyWithLongLong(ctx,added);
@ -478,7 +538,7 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,VADD_CASReply,NULL,NULL,0);
pthread_t tid;
void **targ = RedisModule_Alloc(sizeof(void*)*7);
void **targ = RedisModule_Alloc(sizeof(void*)*8);
targ[0] = bc;
targ[1] = vset;
targ[2] = (void*)(unsigned long)vset->id;
@ -486,7 +546,9 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
targ[4] = val;
targ[5] = NULL; // Used later for insertion context.
targ[6] = (void*)(unsigned long)ef;
targ[7] = attrib;
RedisModule_RetainString(ctx,val);
if (attrib) RedisModule_RetainString(ctx,attrib);
if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) {
pthread_rwlock_unlock(&vset->in_use_lock);
RedisModule_AbortBlock(bc);
@ -839,6 +901,8 @@ int VREM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* Remove from HNSW graph using the high-level API that handles
* locking and cleanup. We pass RedisModule_FreeString as the value
* free function since the strings were retained at insertion time. */
struct vsetNodeVal *nv = node->value;
if (nv->attrib != NULL) vset->numattribs--;
hnsw_delete_node(vset->hnsw, node, vectorSetReleaseNodeValue);
/* Destroy empty vector set. */
@ -1035,6 +1099,9 @@ int VINFO_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
/* ============================== vset type methods ========================= */
/* Bit flags OR-ed into the 'save_flags' word that VectorSetRdbSave()
 * writes to the RDB. */
/* Set when the object has a projection matrix. */
#define SAVE_FLAG_HAS_PROJMATRIX (1<<0)
/* Set when the object's attribute counter (numattribs) is not zero. */
#define SAVE_FLAG_HAS_ATTRIBS (1<<1)
/* Save object to RDB */
void VectorSetRdbSave(RedisModuleIO *rdb, void *value) {
struct vsetObject *vset = value;
@ -1042,9 +1109,13 @@ void VectorSetRdbSave(RedisModuleIO *rdb, void *value) {
RedisModule_SaveUnsigned(rdb, vset->hnsw->node_count);
RedisModule_SaveUnsigned(rdb, vset->hnsw->quant_type);
uint32_t save_flags = 0;
if (vset->proj_matrix) save_flags |= SAVE_FLAG_HAS_PROJMATRIX;
if (vset->numattribs != 0) save_flags |= SAVE_FLAG_HAS_ATTRIBS;
RedisModule_SaveUnsigned(rdb, save_flags);
/* Save projection matrix if present */
if (vset->proj_matrix) {
RedisModule_SaveUnsigned(rdb, 1); // has projection
uint32_t input_dim = vset->proj_input_size;
uint32_t output_dim = vset->hnsw->vector_dim;
RedisModule_SaveUnsigned(rdb, input_dim);
@ -1054,8 +1125,6 @@ void VectorSetRdbSave(RedisModuleIO *rdb, void *value) {
// Save projection matrix as binary blob
size_t matrix_size = sizeof(float) * input_dim * output_dim;
RedisModule_SaveStringBuffer(rdb, (const char *)vset->proj_matrix, matrix_size);
} else {
RedisModule_SaveUnsigned(rdb, 0); // no projection
}
hnswNode *node = vset->hnsw->head;