diff --git a/src/listpack.c b/src/listpack.c index 21b30eaee..5d9028e13 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -681,50 +681,35 @@ int lpGetIntegerValue(unsigned char *p, long long *lval) { return 0; } -/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries - * between every comparison. Returns NULL when the field could not be found. */ -unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, - uint32_t slen, unsigned int skip) { +/* Find pointer to the entry with a comparator callback. + * + * 'cmp' is a comparator callback. If it returns zero, current entry pointer + * will be returned. 'user' is passed to this callback. + * Skip 'skip' entries between every comparison. + * Returns NULL when the field could not be found. */ +unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, + void *user, lpCmp cmp, unsigned int skip) +{ int skipcnt = 0; - unsigned char vencoding = 0; unsigned char *value; - int64_t ll, vll; + int64_t ll; uint64_t entry_size = 123456789; /* initialized to avoid warning. */ uint32_t lp_bytes = lpBytes(lp); - assert(p); + if (!p) + p = lpFirst(lp); + while (p) { if (skipcnt == 0) { value = lpGetWithSize(p, &ll, NULL, &entry_size); if (value) { /* check the value doesn't reach outside the listpack before accessing it */ assert(p >= lp + LP_HDR_SIZE && p + entry_size < lp + lp_bytes); - if (slen == ll && memcmp(value, s, slen) == 0) { - return p; - } - } else { - /* Find out if the searched field can be encoded. Note that - * we do it only the first time, once done vencoding is set - * to non-zero and vll is set to the integer value. */ - if (vencoding == 0) { - /* If the entry can be encoded as integer we set it to - * 1, else set it to UCHAR_MAX, so that we don't retry - * again the next time. */ - if (slen >= 32 || slen == 0 || !lpStringToInt64((const char*)s, slen, &vll)) { - vencoding = UCHAR_MAX; - } else { - vencoding = 1; - } - } - - /* Compare current entry with specified entry, do it only - * if vencoding != UCHAR_MAX because if there is no encoding - * possible for the field it can't be a valid integer. */ - if (vencoding != UCHAR_MAX && ll == vll) { - return p; - } } + if (cmp(lp, p, user, value, ll) == 0) + return p; + /* Reset skip count */ skipcnt = skip; p += entry_size; @@ -749,6 +734,62 @@ unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, return NULL; } +struct lpFindArg { + unsigned char *s; /* Item to search */ + uint32_t slen; /* Item len */ + int vencoding; + int64_t vll; +}; + +/* Comparator function to find item */ +static inline int lpFindCmp(const unsigned char *lp, unsigned char *p, + void *user, unsigned char *s, long long slen) { + (void) lp; + (void) p; + struct lpFindArg *arg = user; + + if (s) { + if (slen == arg->slen && memcmp(arg->s, s, slen) == 0) { + return 0; + } + } else { + /* Find out if the searched field can be encoded. Note that + * we do it only the first time, once done vencoding is set + * to non-zero and vll is set to the integer value. */ + if (arg->vencoding == 0) { + /* If the entry can be encoded as integer we set it to + * 1, else set it to UCHAR_MAX, so that we don't retry + * again the next time. */ + if (arg->slen >= 32 || arg->slen == 0 || !lpStringToInt64((const char*)arg->s, arg->slen, &arg->vll)) { + arg->vencoding = UCHAR_MAX; + } else { + arg->vencoding = 1; + } + } + + /* Compare current entry with specified entry, do it only + * if vencoding != UCHAR_MAX because if there is no encoding + * possible for the field it can't be a valid integer. */ + if (arg->vencoding != UCHAR_MAX && slen == arg->vll) { + return 0; + } + } + + return 1; +} + +/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries + * between every comparison. Returns NULL when the field could not be found. */ +unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, + uint32_t slen, unsigned int skip) +{ + struct lpFindArg arg = { + .s = s, + .slen = slen + }; + return lpFindCb(lp, p, &arg, lpFindCmp, skip); +} + /* Insert, delete or replace the specified string element 'elestr' of length * 'size' or integer element 'eleint' at the specified position 'p', with 'p' * being a listpack element pointer obtained with lpFirst(), lpLast(), lpNext(), @@ -926,6 +967,140 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char return lp; } +/* Insert the specified elements with 'entries' and 'len' at the specified + * position 'p', with 'p' being a listpack element pointer obtained with + * lpFirst(), lpLast(), lpNext(), lpPrev() or lpSeek(). + * + * This is similar to lpInsert() but allows you to insert batch of entries in + * one call. This function is more efficient than inserting entries one by one + * as it does single realloc()/memmove() calls for all the entries. + * + * In each listpackEntry, if 'sval' is not null, it is assumed entry is string + * and 'sval' and 'slen' will be used. Otherwise, 'lval' will be used to append + * the integer entry. + * + * The elements are inserted before or after the element pointed by 'p' + * depending on the 'where' argument, that can be LP_BEFORE or LP_AFTER. + * + * If 'newp' is not NULL, at the end of a successful call '*newp' will be set + * to the address of the element just added, so that it will be possible to + * continue an interaction with lpNext() and lpPrev(). + * + * Returns NULL on out of memory or when the listpack total length would exceed + * the max allowed size of 2^32-1, otherwise the new pointer to the listpack + * holding the new element is returned (and the old pointer passed is no longer + * considered valid). */ +unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, + listpackEntry *entries, unsigned int len, + unsigned char **newp) +{ + assert(where == LP_BEFORE || where == LP_AFTER); + assert(entries != NULL && len > 0); + + struct listpackInsertEntry { + int enctype; + uint64_t enclen; + unsigned char intenc[LP_MAX_INT_ENCODING_LEN]; + unsigned char backlen[LP_MAX_BACKLEN_SIZE]; + unsigned long backlen_size; + }; + + uint64_t addedlen = 0; /* The encoded length of the added elements. */ + struct listpackInsertEntry tmp[3]; /* Encoded entries */ + struct listpackInsertEntry *enc = tmp; + + if (len > sizeof(tmp) / sizeof(struct listpackInsertEntry)) { + /* If 'len' is larger than local buffer size, allocate on heap. */ + enc = zmalloc(len * sizeof(struct listpackInsertEntry)); + } + + /* If we need to insert after the current element, we just jump to the + * next element (that could be the EOF one) and handle the case of + * inserting before. So the function will actually deal with just one + * case: LP_BEFORE. */ + if (where == LP_AFTER) { + p = lpSkip(p); + where = LP_BEFORE; + ASSERT_INTEGRITY(lp, p); + } + + for (unsigned int i = 0; i < len; i++) { + listpackEntry *e = &entries[i]; + if (e->sval) { + /* Calling lpEncodeGetType() results into the encoded version of the + * element to be stored into 'intenc' in case it is representable as + * an integer: in that case, the function returns LP_ENCODING_INT. + * Otherwise, if LP_ENCODING_STR is returned, we'll have to call + * lpEncodeString() to actually write the encoded string on place + * later. + * + * Whatever the returned encoding is, 'enclen' is populated with the + * length of the encoded element. */ + enc[i].enctype = lpEncodeGetType(e->sval, e->slen, + enc[i].intenc, &enc[i].enclen); + } else { + enc[i].enctype = LP_ENCODING_INT; + lpEncodeIntegerGetType(e->lval, enc[i].intenc, &enc[i].enclen); + } + addedlen += enc[i].enclen; + + /* We need to also encode the backward-parsable length of the element + * and append it to the end: this allows to traverse the listpack from + * the end to the start. */ + enc[i].backlen_size = lpEncodeBacklen(enc[i].backlen, enc[i].enclen); + addedlen += enc[i].backlen_size; + } + + uint64_t old_listpack_bytes = lpGetTotalBytes(lp); + uint64_t new_listpack_bytes = old_listpack_bytes + addedlen; + if (new_listpack_bytes > UINT32_MAX) return NULL; + + /* Store the offset of the element 'p', so that we can obtain its + * address again after a reallocation. */ + unsigned long poff = p-lp; + unsigned char *dst = lp + poff; /* May be updated after reallocation. */ + + /* Realloc before: we need more room. */ + if (new_listpack_bytes > old_listpack_bytes && + new_listpack_bytes > lp_malloc_size(lp)) { + if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; + dst = lp + poff; + } + + /* Setup the listpack relocating the elements to make the exact room + * we need to store the new ones. */ + memmove(dst+addedlen,dst,old_listpack_bytes-poff); + + for (unsigned int i = 0; i < len; i++) { + listpackEntry *ent = &entries[i]; + + if (newp) + *newp = dst; + + if (enc[i].enctype == LP_ENCODING_INT) + memcpy(dst, enc[i].intenc, enc[i].enclen); + else + lpEncodeString(dst, ent->sval, ent->slen); + + dst += enc[i].enclen; + memcpy(dst, enc[i].backlen, enc[i].backlen_size); + dst += enc[i].backlen_size; + } + + /* Update header. */ + uint32_t num_elements = lpGetNumElements(lp); + if (num_elements != LP_HDR_NUMELE_UNKNOWN) { + if ((int64_t) len > (int64_t) LP_HDR_NUMELE_UNKNOWN - (int64_t) num_elements) + lpSetNumElements(lp, LP_HDR_NUMELE_UNKNOWN); + else + lpSetNumElements(lp,num_elements + len); + } + lpSetTotalBytes(lp,new_listpack_bytes); + if (enc != tmp) lp_free(enc); + + return lp; +} + /* This is just a wrapper for lpInsert() to directly use a string. */ unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen, unsigned char *p, int where, unsigned char **newp) @@ -973,6 +1148,20 @@ unsigned char *lpAppendInteger(unsigned char *lp, long long lval) { return lpInsertInteger(lp, lval, eofptr, LP_BEFORE, NULL); } +/* Append batch of entries to the listpack. + * + * This call is more efficient than multiple lpAppend() calls as it only does + * a single realloc() for all the given entries. + * + * In each listpackEntry, if 'sval' is not null, it is assumed entry is string + * and 'sval' and 'slen' will be used. Otherwise, 'lval' will be used to append + * the integer entry. */ +unsigned char *lpBatchAppend(unsigned char *lp, listpackEntry *entries, unsigned long len) { + uint64_t listpack_bytes = lpGetTotalBytes(lp); + unsigned char *eofptr = lp + listpack_bytes - 1; + return lpBatchInsert(lp, eofptr, LP_BEFORE, entries, len, NULL); +} + /* This is just a wrapper for lpInsert() to directly use a string to replace * the current element. The function returns the new listpack as return * value, and also updates the current cursor by updating '*p'. */ @@ -1834,6 +2023,24 @@ static int lpValidation(unsigned char *p, unsigned int head_count, void *userdat return ret; } +static int lpFindCbCmp(const unsigned char *lp, unsigned char *p, void *user, unsigned char *s, long long slen) { + assert(lp); + assert(p); + + char *n = user; + + if (!s) { + int64_t sval; + if (lpStringToInt64((const char*)n, strlen(n), &sval)) + return slen == sval ? 0 : 1; + } else { + if (strlen(n) == (size_t) slen && memcmp(n, s, slen) == 0) + return 0; + } + + return 1; +} + int listpackTest(int argc, char *argv[], int flags) { UNUSED(argc); UNUSED(argv); @@ -2078,6 +2285,111 @@ int listpackTest(int argc, char *argv[], int flags) { zfree(lp); } + TEST("Batch append") { + listpackEntry ent[6] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], .slen = strlen(mixlist[1])}, + {.sval = (unsigned char*)mixlist[2], .slen = strlen(mixlist[2])}, + {.lval = 4294967296}, + {.sval = (unsigned char*)mixlist[3], .slen = strlen(mixlist[3])}, + {.lval = -100} + }; + + lp = lpNew(0); + lp = lpBatchAppend(lp, ent, 2); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + assert(lpLength(lp) == 2); + + lp = lpBatchAppend(lp, &ent[2], 1); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 3); + + lp = lpDeleteRange(lp, 1, 1); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 2); + + lp = lpBatchAppend(lp, &ent[3], 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 2), (unsigned char*) "4294967296", 10); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*) "-100", 4); + assert(lpLength(lp) == 5); + + lp = lpDeleteRange(lp, 1, 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), (unsigned char*) "-100", 4); + assert(lpLength(lp) == 2); + + lpFree(lp); + } + + TEST("Batch insert") { + lp = lpNew(0); + listpackEntry ent[6] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], .slen = strlen(mixlist[1])}, + {.sval = (unsigned char*)mixlist[2], .slen = strlen(mixlist[2])}, + {.lval = 4294967296}, + {.sval = (unsigned char*)mixlist[3], .slen = strlen(mixlist[3])}, + {.lval = -100} + }; + + lp = lpBatchAppend(lp, ent, 4); + assert(lpLength(lp) == 4); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), (unsigned char*)"4294967296", 10); + + /* Insert with LP_BEFORE */ + p = lpSeek(lp, 3); + lp = lpBatchInsert(lp, p, LP_BEFORE, &ent[4], 2, &p); + verifyEntry(p, (unsigned char*)"-100", 4); + assert(lpLength(lp) == 6); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 5), (unsigned char*)"4294967296", 10); + + lp = lpDeleteRange(lp, 1, 2); + assert(lpLength(lp) == 4); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 2), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 3), (unsigned char*)"4294967296", 10); + + /* Insert with LP_AFTER */ + p = lpSeek(lp, 0); + lp = lpBatchInsert(lp, p, LP_AFTER, &ent[1], 2, &p); + verifyEntry(p, ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 6); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 5), (unsigned char*)"4294967296", 10); + + lp = lpDeleteRange(lp, 2, 4); + assert(lpLength(lp) == 2); + p = lpSeek(lp, 1); + lp = lpBatchInsert(lp, p, LP_AFTER, &ent[2], 1, &p); + verifyEntry(p, ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + + lpFree(lp); + } + TEST("Batch delete") { unsigned char *lp = createList(); /* char *mixlist[] = {"hello", "foo", "quux", "1024"} */ assert(lpLength(lp) == 4); /* Pre-condition */ @@ -2614,6 +2926,21 @@ int listpackTest(int argc, char *argv[], int flags) { lpFree(lp); } + TEST("Test lpFindCb") { + lp = createList(); /* "hello", "foo", "quux", "1024" */ + assert(lpFindCb(lp, lpFirst(lp), "abc", lpFindCbCmp, 0) == NULL); + verifyEntry(lpFindCb(lp, NULL, "hello", lpFindCbCmp, 0), (unsigned char*)"hello", 5); + verifyEntry(lpFindCb(lp, NULL, "1024", lpFindCbCmp, 0), (unsigned char*)"1024", 4); + verifyEntry(lpFindCb(lp, NULL, "quux", lpFindCbCmp, 0), (unsigned char*)"quux", 4); + verifyEntry(lpFindCb(lp, NULL, "foo", lpFindCbCmp, 0), (unsigned char*)"foo", 3); + lpFree(lp); + + lp = lpNew(0); + assert(lpFindCb(lp, lpFirst(lp), "hello", lpFindCbCmp, 0) == NULL); + assert(lpFindCb(lp, lpFirst(lp), "1024", lpFindCbCmp, 0) == NULL); + lpFree(lp); + } + TEST("Test lpValidateIntegrity") { lp = createList(); long count = 0; @@ -2636,6 +2963,26 @@ int listpackTest(int argc, char *argv[], int flags) { lpFree(lp); } + TEST("Test number of elements exceeds LP_HDR_NUMELE_UNKNOWN with batch insert") { + listpackEntry ent[2] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], .slen = strlen(mixlist[1])} + }; + + lp = lpNew(0); + for (int i = 0; i < (LP_HDR_NUMELE_UNKNOWN/2) + 1; i++) + lp = lpBatchAppend(lp, ent, 2); + + assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN); + assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN+1); + + lp = lpDeleteRange(lp, -2, 2); + assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN); + assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN-1); + assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN-1); /* update length after lpLength */ + lpFree(lp); + } + TEST("Stress with random payloads of different encoding") { unsigned long long start = usec(); int i,j,len,where; diff --git a/src/listpack.h b/src/listpack.h index df492a44c..c9fbc5624 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -49,6 +49,9 @@ unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **p, long long unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp); unsigned char *lpDeleteRangeWithEntry(unsigned char *lp, unsigned char **p, unsigned long num); unsigned char *lpDeleteRange(unsigned char *lp, long index, unsigned long num); +unsigned char *lpBatchAppend(unsigned char *lp, listpackEntry *entries, unsigned long len); +unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, + listpackEntry *entries, unsigned int len, unsigned char **newp); unsigned char *lpBatchDelete(unsigned char *lp, unsigned char **ps, unsigned long count); unsigned char *lpMerge(unsigned char **first, unsigned char **second); unsigned char *lpDup(unsigned char *lp); @@ -57,6 +60,8 @@ unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf); unsigned char *lpGetValue(unsigned char *p, unsigned int *slen, long long *lval); int lpGetIntegerValue(unsigned char *p, long long *lval); unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, uint32_t slen, unsigned int skip); +typedef int (*lpCmp)(const unsigned char *lp, unsigned char *p, void *user, unsigned char *s, long long slen); +unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, void *user, lpCmp cmp, unsigned int skip); unsigned char *lpFirst(unsigned char *lp); unsigned char *lpLast(unsigned char *lp); unsigned char *lpNext(unsigned char *lp, unsigned char *p); diff --git a/src/t_hash.c b/src/t_hash.c index f39459178..299bea8aa 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -329,31 +329,59 @@ static void listpackExFree(listpackEx *lpt) { zfree(lpt); } +struct lpFingArgs { + uint64_t max_to_search; /* [in] Max number of tuples to search */ + uint64_t expire_time; /* [in] Find the tuple that has a TTL larger than expire_time */ + unsigned char *p; /* [out] First item of the tuple that has a TTL larger than expire_time */ + int expired; /* [out] Number of tuples that have TTLs less than expire_time */ + int index; /* Internally used */ + unsigned char *fptr; /* Internally used, temp ptr */ +}; + +/* Callback for lpFindCb(). Used to find number of expired fields as part of + * active expiry or when trying to find the position for the new field according + * to its expiry time.*/ +static int cbFindInListpack(const unsigned char *lp, unsigned char *p, + void *user, unsigned char *s, long long slen) +{ + (void) lp; + struct lpFingArgs *r = user; + + r->index++; + + if (r->max_to_search == 0) + return 0; /* Break the loop and return */ + + if (r->index % 3 == 1) { + r->fptr = p; /* First item of the tuple. */ + } else if (r->index % 3 == 0) { + serverAssert(!s); + + /* Third item of a tuple is expiry time */ + if (slen == HASH_LP_NO_TTL || (uint64_t) slen >= r->expire_time) { + r->p = r->fptr; + return 0; /* Break the loop and return */ + } + r->expired++; + r->max_to_search--; + } + + return 1; +} + /* Returns number of expired fields. */ static uint64_t listpackExExpireDryRun(const robj *o) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); - uint64_t expired = 0; - unsigned char *fptr; listpackEx *lpt = o->ptr; - fptr = lpFirst(lpt->lp); - while (fptr != NULL) { - long long val; + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = commandTimeSnapshot(), + }; - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &val)); - - if (!hashTypeIsExpired(o, val)) - break; - - expired++; - fptr = lpNext(lpt->lp, fptr); - } - - return expired; + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + return r.expired; } /* Returns the expiration time of the item with the nearest expiration. */ @@ -382,74 +410,58 @@ static uint64_t listpackExGetMinExpire(robj *o) { void listpackExExpire(robj *o, ExpireInfo *info) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); uint64_t min = EB_EXPIRE_TIME_INVALID; - unsigned char *ptr, *field; listpackEx *lpt = o->ptr; - ptr = lpFirst(lpt->lp); - while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { - long long val; + struct lpFingArgs r = { + .max_to_search = info->maxToExpire, + .expire_time = info->now + }; - field = ptr; - ptr = lpNext(lpt->lp, ptr); - serverAssert(ptr); - ptr = lpNext(lpt->lp, ptr); - serverAssert(ptr && lpGetIntegerValue(ptr, &val)); + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + info->itemsExpired += r.expired; - /* Fields are ordered by expiry time. If we reached to a non-expired - * field or a non-volatile field, we know rest is not yet expired. */ - if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) - break; - - lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &field, 3); - ptr = field; - info->itemsExpired++; - } + /* Delete all the expired fields in one go */ + if (r.expired > 0) + lpt->lp = lpDeleteRange(lpt->lp, 0, r.expired * 3); min = hashTypeGetNextTimeToExpire(o); info->nextExpireTime = (min != EB_EXPIRE_TIME_INVALID) ? min : 0; } -/* Remove TTL from the field. */ -static void listpackExPersist(robj *o, sds field, unsigned char *fptr, - unsigned char *vptr) -{ - serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); - - unsigned char tmp[512]; - unsigned int slen; - long long val; - unsigned char *s; - sds p = NULL; +static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { listpackEx *lpt = o->ptr; - /* To persist a field, we have to delete it first and append to the end as - * we want to maintain order by expiry time. Before deleting it, copy the - * value if it is stored as string. */ - s = lpGetValue(vptr, &slen, &val); - if (s) { - /* Normally, item length in the listpack is limited by - * 'hash-max-listpack-value' config. It is unlikely, but it might be - * larger than sizeof(tmp). */ - if (slen > sizeof(tmp)) - p = sdsnewlen(s, slen); - else - memcpy(tmp, s, slen); + /* Shortcut, just append at the end if this is a non-volatile field. */ + if (ent[2].lval == HASH_LP_NO_TTL) { + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); + return; } - /* Delete field name, value and expiry time. */ - lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = ent[2].lval, + }; - /* Append field to the end as it does not have expiry time. */ - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); + /* Check if there is a field with a larger TTL. */ + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); - if (s) - lpt->lp = lpAppend(lpt->lp, p ? (unsigned char*) p : tmp, slen); + /* If list is empty or there is no field with a larger TTL, result will be + * NULL. Otherwise, just insert before the found item.*/ + if (r.p) + lpt->lp = lpBatchInsert(lpt->lp, r.p, LP_BEFORE, ent, 3, NULL); else - lpt->lp = lpAppendInteger(lpt->lp, val); + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); +} - lpt->lp = lpAppendInteger(lpt->lp, HASH_LP_NO_TTL); +/* Add new field ordered by expire time. */ +void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { + listpackEntry ent[3] = { + {.sval = (unsigned char*) field, .slen = sdslen(field)}, + {.sval = (unsigned char*) value, .slen = sdslen(value)}, + {.lval = expireAt} + }; - sdsfree(p); + listpackExAddInternal(o, ent); } /* If expiry time is changed, this function will place field into the correct @@ -458,13 +470,13 @@ static void listpackExPersist(robj *o, sds field, unsigned char *fptr, static void listpackExUpdateExpiry(robj *o, sds field, unsigned char *fptr, unsigned char *vptr, - uint64_t expireAt) { - unsigned int slen; - long long val; + uint64_t expire_at) { + unsigned int slen = 0; + long long val = 0; unsigned char tmp[512] = {0}; - unsigned char *valstr, *elem; - listpackEx *lpt = o->ptr; + unsigned char *valstr; sds tmpval = NULL; + listpackEx *lpt = o->ptr; /* Copy value */ valstr = lpGetValue(vptr, &slen, &val); @@ -481,99 +493,23 @@ static void listpackExUpdateExpiry(robj *o, sds field, /* Delete field name, value and expiry time */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); - /* Insert to the listpack */ - fptr = lpFirst(lpt->lp); - while (fptr) { - long long currExpiry; + listpackEntry ent[3] = {{0}}; - elem = fptr; /* Keep a pointer to field name */ - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &currExpiry)); + ent[0].sval = (unsigned char*) field; + ent[0].slen = sdslen(field); - if (currExpiry == HASH_LP_NO_TTL || (uint64_t) currExpiry >= expireAt) { - /* Found a field with no expiry time or with a higher expiry time. - * Insert new field just before it. */ - lpt->lp = lpInsertString(lpt->lp, (unsigned char*) field, - sdslen(field), elem, LP_BEFORE, &fptr); - - /* Insert value after field name */ - if (valstr) { - lpt->lp = lpInsertString(lpt->lp, - tmpval ? (unsigned char*) tmpval : tmp, - slen, fptr, LP_AFTER, &fptr); - } else { - lpt->lp = lpInsertInteger(lpt->lp, val, fptr, LP_AFTER, &fptr); - } - - /* Insert expiry time after value. */ - lpt->lp = lpInsertInteger(lpt->lp, (long long) expireAt, fptr, - LP_AFTER, NULL); - goto out; - } - - fptr = lpNext(lpt->lp, fptr); + if (valstr) { + ent[1].sval = tmpval ? (unsigned char *) tmpval : tmp; + ent[1].slen = slen; + } else { + ent[1].lval = val; } + ent[2].lval = expire_at; - /* Listpack is empty, append new item */ - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); - if (valstr) - lpt->lp = lpAppend(lpt->lp, tmpval ? (unsigned char*) tmpval : tmp, slen); - else - lpt->lp = lpAppendInteger(lpt->lp, val); - - lpt->lp = lpAppendInteger(lpt->lp, (long long) expireAt); - -out: + listpackExAddInternal(o, ent); sdsfree(tmpval); } -/* Add new field ordered by expire time. */ -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { - unsigned char *fptr, *elem; - listpackEx *lpt = o->ptr; - - /* Shortcut, just append at the end if this is a non-volatile field. */ - if (expireAt == HASH_LP_NO_TTL) { - goto append; - } - - fptr = lpFirst(lpt->lp); - while (fptr) { - long long currExpiry; - - elem = fptr; /* Keep a pointer to field name */ - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &currExpiry)); - - if (currExpiry == HASH_LP_NO_TTL || (uint64_t) currExpiry >= expireAt) { - /* Found a field with no expiry time or with a higher expiry time. - * Insert new field just before it. */ - lpt->lp = lpInsertString(lpt->lp, (unsigned char*) field, - sdslen(field), elem, LP_BEFORE, &fptr); - - lpt->lp = lpInsertString(lpt->lp,(unsigned char*) value, sdslen(value), - fptr, LP_AFTER, &fptr); - - /* Insert expiry time after value. */ - lpt->lp = lpInsertInteger(lpt->lp, (long long) expireAt, fptr, - LP_AFTER, NULL); - return; - } - - fptr = lpNext(lpt->lp, fptr); - } - - /* Either listpack is empty or field expiry time is HASH_LP_NO_TTL */ -append: - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); - lpt->lp = lpAppend(lpt->lp, (unsigned char*)value, sdslen(value)); - lpt->lp = lpAppendInteger(lpt->lp, (long long) expireAt); -} - /* Update field expire time. */ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, unsigned char *fptr, unsigned char *vptr, @@ -1209,7 +1145,7 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS goto out; } else if (res == HSET_UPDATE && expireTime != HASH_LP_NO_TTL) { /* Clear TTL */ - listpackExPersist(o, field, fptr, vptr); + listpackExUpdateExpiry(o, field, fptr, vptr, HASH_LP_NO_TTL); } } } @@ -3058,7 +2994,7 @@ void hpersistCommand(client *c) { continue; } - listpackExPersist(hashObj, field, fptr, vptr); + listpackExUpdateExpiry(hashObj, field, fptr, vptr, HASH_LP_NO_TTL); addReplyLongLong(c, HFE_PERSIST_OK); changed = 1; } @@ -3473,10 +3409,8 @@ static int hgetfReplyValueAndSetExpiry(client *c, robj *o, sds field, int flag, ebAdd(&meta->hfe, &hashFieldExpireBucketsType, hf, expireAt); } } else { - if (flag & HFE_CMD_PERSIST) - listpackExPersist(o, field, fptr, vptr); - else - listpackExUpdateExpiry(o, field, fptr, vptr, expireAt); + uint64_t exp = flag & HFE_CMD_PERSIST ? HASH_LP_NO_TTL : expireAt; + listpackExUpdateExpiry(o, field, fptr, vptr, exp); } return 1; @@ -3659,25 +3593,26 @@ static int hsetfSetFieldAndReply(client *c, robj *o, sds field, sds value, } } } else { - lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value)); - fptr = lpPrev(lpt->lp, vptr); /* Update fptr as above line invalidates it. */ - serverAssert(fptr != NULL); + if (ret != HSETF_FIELD_AND_TTL) { + /* We just set the field value without updating the TTL */ + lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value)); + } else { + /* We are going to update TTL. Delete the field first and then + * insert again according to new TTL if necessary. */ + lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); - if (ret == HSETF_FIELD_AND_TTL) { if (*minPrevExp > prevExpire) *minPrevExp = prevExpire; if (!(flag & HFE_CMD_EXPIRY_MASK)) { /* If none of EX,EXAT,PX,PXAT,KEEPTTL is specified, TTL is * discarded. */ - listpackExPersist(o, field, fptr, vptr); - } else if (checkAlreadyExpired(expireAt)) { - hashTypeDelete(o, field); - } else { + listpackExAddNew(o, field, value, HASH_LP_NO_TTL); + } else if (!checkAlreadyExpired(expireAt)){ if (*minPrevExp > expireAt) *minPrevExp = expireAt; - listpackExUpdateExpiry(o, field, fptr, vptr, expireAt); + listpackExAddNew(o, field, value, expireAt); } } }