diff --git a/src/aof.c b/src/aof.c index 9632d9c5b..17e72febb 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1939,7 +1939,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { * * The function returns 0 on error, non-zero on success. */ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { - if (hi->encoding == OBJ_ENCODING_LISTPACK) { + if ((hi->encoding == OBJ_ENCODING_LISTPACK) || (hi->encoding == OBJ_ENCODING_LISTPACK_EX)) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; @@ -1963,37 +1963,60 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ int rewriteHashObject(rio *r, robj *key, robj *o) { + int res = 0; /*fail*/ + hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o, 0); + int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID; hi = hashTypeInitIterator(o); - while (hashTypeNext(hi, 0) != C_ERR) { - if (count == 0) { - int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? - AOF_REWRITE_ITEMS_PER_CMD : items; - if (!rioWriteBulkCount(r,'*',2+cmd_items*2) || - !rioWriteBulkString(r,"HMSET",5) || - !rioWriteBulkObject(r,key)) - { - hashTypeReleaseIterator(hi); - return 0; + if (!isHFE) { + while (hashTypeNext(hi, 0) != C_ERR) { + if (count == 0) { + int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ? + AOF_REWRITE_ITEMS_PER_CMD : items; + if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) || + !rioWriteBulkString(r, "HMSET", 5) || + !rioWriteBulkObject(r, key)) + goto reHashEnd; + } + + if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || + !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) + goto reHashEnd; + + if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else { + while (hashTypeNext(hi, 0) != C_ERR) { + + char hmsetCmd[] = "*4\r\n$5\r\nHMSET\r\n"; + if ( (!rioWrite(r, hmsetCmd, sizeof(hmsetCmd) - 1)) || + (!rioWriteBulkObject(r, key)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) ) + goto reHashEnd; + + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) { + char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n"; + if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) || + (!rioWriteBulkObject(r, key)) || + (!rioWriteBulkLongLong(r, hi->expire_time)) || + (!rioWriteBulkString(r, "FIELDS", 6)) || + (!rioWriteBulkString(r, "1", 1)) || + (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) ) + goto reHashEnd; } } - - if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) || - !rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) - { - hashTypeReleaseIterator(hi); - return 0; - } - if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; - items--; } - hashTypeReleaseIterator(hi); + res = 1; /* success */ - return 1; +reHashEnd: + hashTypeReleaseIterator(hi); + return res; } /* Helper for rewriteStreamObject() that generates a bulk string into the diff --git a/src/cluster.c b/src/cluster.c index f0d25ea8f..6988d6b53 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -174,6 +174,7 @@ void dumpCommand(client *c) { /* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */ void restoreCommand(client *c) { + uint64_t minExpiredField = EB_EXPIRE_TIME_INVALID; long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; rio payload; int j, type, replace = 0, absttl = 0; @@ -237,7 +238,7 @@ void restoreCommand(client *c) { rioInitWithBuffer(&payload,c->argv[3]->ptr); if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload,key->ptr,c->db,0,NULL)) == NULL)) + ((obj = rdbLoadObject(type,&payload,key->ptr,c->db,NULL, &minExpiredField)) == NULL)) { addReplyError(c,"Bad data format"); return; @@ -263,7 +264,13 @@ void restoreCommand(client *c) { } /* Create the key and set the TTL if any */ - dbAdd(c->db,key,obj); + dictEntry *de = dbAdd(c->db,key,obj); + + /* If minExpiredField was set, then the object is hash with expiration + * on fields and need to register it in global HFE DS */ + if (minExpiredField != EB_EXPIRE_TIME_INVALID) + hashTypeAddToExpires(c->db, dictGetKey(de), obj, minExpiredField); + if (ttl) { setExpire(c,c->db,key,ttl); if (!absttl) { diff --git a/src/db.c b/src/db.c index fa41cc9dc..1991a5807 100644 --- a/src/db.c +++ b/src/db.c @@ -276,6 +276,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); + + /* if hash with HFEs, take care to remove from global HFE DS */ + if (old->type == OBJ_HASH) + hashTypeRemoveFromExpires(&db->hexpires, old); + if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { @@ -1632,7 +1637,8 @@ void copyCommand(client *c) { if (expire != -1) setExpire(c, dst, newkey, expire); - /* If hash with expiration on fields then add it to 'dst' global HFE DS */ + /* If minExpiredField was set, then the object is hash with expiration + * on fields and need to register it in global HFE DS */ if (minHashExpire != EB_EXPIRE_TIME_INVALID) hashTypeAddToExpires(dst, dictGetKey(deCopy), newobj, minHashExpire); @@ -1768,11 +1774,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) { * remain in the same DB they were. */ activedb->keys = newdb->keys; activedb->expires = newdb->expires; + activedb->hexpires = newdb->hexpires; activedb->avg_ttl = newdb->avg_ttl; activedb->expires_cursor = newdb->expires_cursor; newdb->keys = aux.keys; newdb->expires = aux.expires; + newdb->hexpires = aux.hexpires; newdb->avg_ttl = aux.avg_ttl; newdb->expires_cursor = aux.expires_cursor; diff --git a/src/debug.c b/src/debug.c index 84e96aa14..4b5f73061 100644 --- a/src/debug.c +++ b/src/debug.c @@ -204,13 +204,18 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) unsigned char eledigest[20]; sds sdsele; + /* field */ memset(eledigest,0,20); sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* val */ sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); mixDigest(eledigest,sdsele,sdslen(sdsele)); sdsfree(sdsele); + /* hash-field expiration (HFE) */ + if (hi->expire_time != EB_EXPIRE_TIME_INVALID) + xorDigest(eledigest,"!!hexpire!!",11); xorDigest(digest,eledigest,20); } hashTypeReleaseIterator(hi); diff --git a/src/ebuckets.c b/src/ebuckets.c index 723586285..387aef88c 100644 --- a/src/ebuckets.c +++ b/src/ebuckets.c @@ -23,6 +23,12 @@ * #define EB_VALIDATE_DEBUG 1 */ +#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) +#define EB_VALIDATE_STRUCTURE(eb, type) ebValidate(eb, type) +#else +#define EB_VALIDATE_STRUCTURE(eb, type) // Do nothing +#endif + /*** BENCHMARK * * To benchmark ebuckets creation and active-expire with 10 million items, apply @@ -190,7 +196,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) { * Before: [segHdr] -> {item1,..,item16} -> [..] * After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..] * - * Take care to persist `segHdr` to be the same instance after the change. + * Taken care to persist `segHdr` to be the same instance after the change. * This is important because the rax tree is pointing to it. */ static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) { /* Allocate nextSegHdr and let it take the items of first segment header */ @@ -1390,9 +1396,8 @@ int ebRemove(ebuckets *eb, EbucketsType *type, eItem item) { if (res) type->getExpireMeta(item)->trash = 1; -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1435,9 +1440,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime) { /* Add item to rax */ res = ebAddToRax(eb, type, item, EB_BUCKET_KEY(expireTime)); } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return res; } @@ -1521,9 +1526,9 @@ END_ACTEXP: ebAdd(eb, type, updateList, ebGetMetaExpTime(mItem)); updateList = next; } -#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK) - ebValidate(*eb, type); -#endif + + EB_VALIDATE_STRUCTURE(*eb, type); + return; } diff --git a/src/ebuckets.h b/src/ebuckets.h index f30337330..66954131a 100644 --- a/src/ebuckets.h +++ b/src/ebuckets.h @@ -139,7 +139,7 @@ * The idea of it is to trim the rax tree depth, avoid having too many branches, * and reduce frequent modifications of the tree to the minimum. */ -#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */ +#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */ /* From expiration time to bucket-key */ #define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION) diff --git a/src/module.c b/src/module.c index 2f3a81515..d4eefdd99 100644 --- a/src/module.c +++ b/src/module.c @@ -5271,7 +5271,10 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle XX and NX */ if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) { - int exists = hashTypeExists(key->value, field->ptr); + int isHashDeleted; + int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (((flags & REDISMODULE_HASH_XX) && !exists) || ((flags & REDISMODULE_HASH_NX) && exists)) { @@ -5282,7 +5285,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { /* Handle deletion if value is REDISMODULE_HASH_DELETE. */ if (value == REDISMODULE_HASH_DELETE) { - count += hashTypeDelete(key->value, field->ptr); + count += hashTypeDelete(key->value, field->ptr, 1); if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field); continue; } @@ -5374,14 +5377,22 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { /* Query the hash for existence or value object. */ if (flags & REDISMODULE_HASH_EXISTS) { existsptr = va_arg(ap,int*); - if (key->value) - *existsptr = hashTypeExists(key->value,field->ptr); - else + if (key->value) { + int isHashDeleted; + *existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted); + /* hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); + } else { *existsptr = 0; + } } else { + int isHashDeleted; valueptr = va_arg(ap,RedisModuleString**); if (key->value) { - *valueptr = hashTypeGetValueObject(key->value,field->ptr); + *valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted); + + /* Currently hash-field-expiration is not exposed to modules */ + serverAssert(isHashDeleted == 0); if (*valueptr) { robj *decoded = getDecodedObject(*valueptr); decrRefCount(*valueptr); diff --git a/src/mstr.h b/src/mstr.h index fa7d4b214..1613a637e 100644 --- a/src/mstr.h +++ b/src/mstr.h @@ -209,6 +209,9 @@ static inline int mstrIsMetaAttached(mstr s) { return s[-1] & MSTR_META_MASK; } /* return whether if a specific flag-index is set */ static inline int mstrGetFlag(mstr s, int flagIdx) { return *mstrFlagsRef(s) & (1 << flagIdx); } +/* DEBUG */ +void mstrPrint(mstr s, struct mstrKind *kind, int verbose); + /* See comment above about MSTR-ALIGNMENT(2) */ static_assert(sizeof(struct mstrhdr5 ) % 2 == 1, "must be odd"); static_assert(sizeof(struct mstrhdr8 ) % 2 == 1, "must be odd"); diff --git a/src/networking.c b/src/networking.c index 245e2ebdd..885863a18 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3757,7 +3757,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { * 1. Make sure there are no "holes" and all the arguments are set. * 2. If the original argument vector was longer than the one we * want to end with, it's up to the caller to set c->argc and - * free the no longer used objects on c->argv. */ + * free the no longer used objects on c->argv. + * 3. To remove argument at i'th index, pass NULL as new value + */ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; retainOriginalCommandVector(c); @@ -3775,9 +3777,18 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } oldval = c->argv[i]; if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); - if (newval) c->argv_len_sum += getStringObjectLen(newval); - c->argv[i] = newval; - incrRefCount(newval); + + if (newval) { + c->argv[i] = newval; + incrRefCount(newval); + c->argv_len_sum += getStringObjectLen(newval); + } else { + /* move the remaining arguments one step left */ + for (int j = i+1; j < c->argc; j++) { + c->argv[j-1] = c->argv[j]; + } + c->argv[--c->argc] = NULL; + } if (oldval) decrRefCount(oldval); /* If this is the command name make sure to fix c->cmd. */ diff --git a/src/rdb.c b/src/rdb.c index 6053cf40a..4330c5694 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -270,7 +270,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) { void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL); int encode = flags & RDB_LOAD_ENC; unsigned char enc[4]; long long val; @@ -305,7 +305,7 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { } else if (sdsFlag) { p = sdsnewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - p = hfieldNew(NULL, len, 0); + p = hfieldNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } memcpy(p,buf,len); return p; @@ -377,7 +377,7 @@ ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD | RDB_LOAD_HFLD_TTL); int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */ uint64_t len, clen; @@ -397,7 +397,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { } else if (sdsFlag || robjFlag) { val = sdstrynewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - val = hfieldTryNew(NULL, len, 0); + val = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } if (!val) { @@ -505,6 +505,7 @@ ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { * instead of a Redis object with an sds in it. * RDB_LOAD_SDS: Return an SDS string instead of a Redis object. * RDB_LOAD_HFLD: Return a hash field object (mstr) + * RDB_LOAD_HFLD_TTL: Return a hash field with TTL metadata reserved * * On I/O error NULL is returned. */ @@ -512,7 +513,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { void *buf; int plainFlag = flags & RDB_LOAD_PLAIN; int sdsFlag = flags & RDB_LOAD_SDS; - int hfldFlag = flags & RDB_LOAD_HFLD; + int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL); int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */ int isencoded; @@ -555,7 +556,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { } else if (sdsFlag) { buf = sdstrynewlen(SDS_NOINIT,len); } else { /* hfldFlag */ - buf = hfieldTryNew(NULL, len, 0); + buf = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1); } if (!buf) { serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len); @@ -1888,10 +1889,18 @@ int lpValidateIntegrityAndDups(unsigned char *lp, size_t size, int deep, int tup /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. - * When the function returns NULL and if 'error' is not NULL, the - * integer pointed by 'error' is set to the type of error that occurred */ -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, - int *error) { + * + * error - When the function returns NULL and if 'error' is not NULL, the + * integer pointed by 'error' is set to the type of error that occurred + * minExpiredField - If loading a hash with expiration on fields, then this value + * will be set to the minimum expire time found in the hash fields. If there are + * no fields with expiration or it is not a hash, then it will set be to + * EB_EXPIRE_TIME_INVALID. + */ +robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int *error, + uint64_t *minExpiredField) +{ + uint64_t minExpField = EB_EXPIRE_TIME_INVALID; robj *o = NULL, *ele, *dec; uint64_t len; unsigned int i; @@ -2110,7 +2119,6 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, sds value; hfield field; dict *dupSearchDict = NULL; - ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); len = rdbLoadLen(rdb, NULL); if (len == RDB_LENERR) return NULL; @@ -2120,7 +2128,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* Too many entries? Use a hash table right from the start. */ if (len > server.hash_max_listpack_entries) - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); else if (deep_integrity_validation) { /* In this mode, we need to guarantee that the server won't crash * later when the ziplist is converted to a dict. @@ -2164,7 +2172,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, sdslen(value) > server.hash_max_listpack_value || !lpSafeToAdd(o->ptr, hfieldlen(field) + sdslen(value))) { - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); dictUseStoredKeyApi((dict *)o->ptr, 1); ret = dictAdd((dict*)o->ptr, field, value); dictUseStoredKeyApi((dict *)o->ptr, 0); @@ -2233,13 +2241,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* All pairs should be read by now */ serverAssert(len == 0); } else if (rdbtype == RDB_TYPE_HASH_METADATA) { - size_t fieldLen; sds value, field; - uint64_t expire, minExpire = EB_EXPIRE_TIME_INVALID; - mstime_t now = mstime(); + uint64_t expireAt; dict *dupSearchDict = NULL; - ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL); len = rdbLoadLen(rdb, NULL); if (len == RDB_LENERR) return NULL; @@ -2248,11 +2253,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, o = createHashObject(); /* Too many entries? Use a hash table right from the start. */ if (len > server.hash_max_listpack_entries) { - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); dictTypeAddMeta((dict**)&o->ptr, &mstrHashDictTypeWithHFE); initDictExpireMetadata(key, o); } else { - hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, hexpires); + hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, NULL); if (deep_integrity_validation) { /* In this mode, we need to guarantee that the server won't crash * later when the listpack is converted to a dict. @@ -2266,20 +2271,25 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, len--; /* read the TTL */ - if (rdbLoadLenByRef(rdb, NULL, &expire) == -1) { + if (rdbLoadLenByRef(rdb, NULL, &expireAt) == -1) { serverLog(LL_WARNING, "failed reading hash TTL"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); return NULL; } - if (expire > EB_EXPIRE_TIME_MAX) { - rdbReportCorruptRDB("invalid expire time: %llu", (unsigned long long)expire); + if (expireAt > EB_EXPIRE_TIME_MAX) { + rdbReportCorruptRDB("invalid expireAt time: %llu", (unsigned long long)expireAt); decrRefCount(o); return NULL; } - /* read the field name */ - if ((field = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, &fieldLen)) == NULL) { + /* if needed create field with TTL metadata */ + if (expireAt !=0) + field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD_TTL, &fieldLen); + else + field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD, &fieldLen); + + if (field == NULL) { serverLog(LL_WARNING, "failed reading hash field"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); @@ -2291,40 +2301,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, serverLog(LL_WARNING, "failed reading hash value"); decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); - sdsfree(field); + hfieldFree(field); return NULL; } - /* Check if the hash field already expired. This function is used when - * loading an RDB file from disk, either at startup, or when an RDB was - * received from the master. In the latter case, the master is - * responsible for hash field expiry. If we would expire hash fields here, - * the snapshot taken by the master may not be reflected on the slave. - * Similarly, if the base AOF is RDB format, we want to load all - * the hash fields there are, since the log of operations in the incr AOF - * is assumed to work in the exact keyspace state. - * Expired hash fields on the master are silently discarded. - * Note that if all fields in a hash has expired, the hash would not be - * created in memory (because it is created on the first valid field), and - * thus the key would be discarded as an "empty key" */ - if (expire != 0 && iAmMaster() && ((mstime_t)expire < now) && /* note: expire was saved to RDB as unix-time in milliseconds */ - !(rdbflags & RDBFLAGS_AOF_PREAMBLE)) - { - /* TODO: consider replication (like in rdbLoadAddKeyToDb) */ - server.rdb_last_load_hash_fields_expired++; - sdsfree(field); - sdsfree(value); - continue; - } - /* keep the nearest expiration to connect listpack object to db expiry */ - if ((expire != 0) && (expire < minExpire)) minExpire = expire; + if ((expireAt != 0) && (expireAt < minExpField)) minExpField = expireAt; /* store the values read - either to listpack or dict */ if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { /* integrity - check for key duplication (if required) */ if (dupSearchDict) { - sds field_dup = sdsnewlen(field, sdslen(field)); + sds field_dup = sdsnewlen(field, hfieldlen(field)); if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) { rdbReportCorruptRDB("Hash with dup elements"); @@ -2332,18 +2320,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, decrRefCount(o); sdsfree(field_dup); sdsfree(value); - sdsfree(field); + hfieldFree(field); return NULL; } } /* check if the values can be saved to listpack (or should convert to dict encoding) */ - if (sdslen(field) > server.hash_max_listpack_value || + if (hfieldlen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value || - !lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expire))) + !lpSafeToAdd(((listpackEx*)o->ptr)->lp, hfieldlen(field) + sdslen(value) + lpEntrySizeInteger(expireAt))) { /* convert to hash */ - hashTypeConvert(o, OBJ_ENCODING_HT, hexpires); + hashTypeConvert(o, OBJ_ENCODING_HT, NULL); if (len > DICT_HT_INITIAL_SIZE) { /* TODO: this is NOT the original len, but this is also the case for simple hash, is this a bug? */ if (dictTryExpand(o->ptr, len) != DICT_OK) { @@ -2351,36 +2339,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, decrRefCount(o); if (dupSearchDict != NULL) dictRelease(dupSearchDict); sdsfree(value); - sdsfree(field); + hfieldFree(field); return NULL; } } /* don't add the values to the new hash: the next if will catch and the values will be added there */ } else { - listpackExAddNew(o, field, value, expire); - sdsfree(field); + listpackExAddNew(o, field, hfieldlen(field), + value, sdslen(value), expireAt); + hfieldFree(field); sdsfree(value); } } if (o->encoding == OBJ_ENCODING_HT) { - /* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets, - * or when no expiry was not set for this field */ - if ((db == NULL) || (expire == 0)) { - hashTypeSet(db, o, field, value, 0); - } else { - if (hashTypeSetExRdb(db, o, field, value, expire) != C_OK) { - serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key); - decrRefCount(o); - if (dupSearchDict != NULL) dictRelease(dupSearchDict); - sdsfree(value); - sdsfree(field); - return NULL; - } + /* Add pair to hash table */ + dict *d = o->ptr; + dictUseStoredKeyApi(d, 1); + int ret = dictAdd(d, field, value); + dictUseStoredKeyApi(d, 0); + + /* Attach expiry to the hash field and register in hash private HFE DS */ + if ((ret != DICT_ERR) && expireAt) { + dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(d); + ret = ebAdd(&m->hfe, &hashFieldExpireBucketsType, field, expireAt); + } + + if (ret == DICT_ERR) { + rdbReportCorruptRDB("Duplicate hash fields detected"); + sdsfree(value); + hfieldFree(field); + decrRefCount(o); + return NULL; } - sdsfree(field); - sdsfree(value); } } @@ -2389,10 +2381,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, /* check for empty key (if all fields were expired) */ if (hashTypeLength(o, 0) == 0) { decrRefCount(o); - goto emptykey; + goto expiredHash; } - if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID)) - hashTypeAddToExpires(db, key, o, minExpire); } else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if (len == 0) goto emptykey; @@ -2717,33 +2707,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, } /* for TTL listpack, find the minimum expiry */ - uint64_t minExpire = hashTypeGetNextTimeToExpire(o); + minExpField = hashTypeGetNextTimeToExpire(o); + + /* Convert listpack to hash table without registering in global HFE DS, + * if has HFEs, since the listpack is not connected yet to the DB */ + if (hashTypeLength(o, 0) > server.hash_max_listpack_entries) + hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/); - /* check if need to convert to dict encoding */ - if ((db != NULL) && - (hashTypeLength(o, 0) > server.hash_max_listpack_entries)) /* TODO: each field length is not verified against server.hash_max_listpack_value */ - { - hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); - /* - * hashTypeAddToExpires is presumably called from within - * the convert function (from listpackEx to dict), BUT, - * this call depends on the lpt->meta field to be updated, - * which is not the case here as hashTypeAddToExpires was - * not yet called for the listpack (which is what updating - * its meta). - * Instead, this "manual" call is added here. - * Another approach would be to have the conversion function - * find the minExpire by itself when iterating on the listpack - * instead of relying on the meta and use this value for the - * final ebAdd call. - */ - hashTypeAddToExpires(db, key, o, minExpire); - } else if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) { - /* connect the listpack to the DB-global expiry data structure */ - if ((minExpire != EB_EXPIRE_TIME_INVALID) && (db != NULL)) { /* DB can be NULL when checking rdb */ - hashTypeAddToExpires(db, key, o, minExpire); - } - } break; default: /* totally unreachable */ @@ -3128,12 +3098,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags, rdbReportReadError("Unknown RDB encoding type %d",rdbtype); return NULL; } + + if (minExpiredField) *minExpiredField = minExpField; + if (error) *error = 0; return o; emptykey: if (error) *error = RDB_LOAD_ERR_EMPTY_KEY; return NULL; +expiredHash: + if (error) *error = RDB_LOAD_ERR_EXPIRED_HASH; + return NULL; } /* Mark that we are loading in the global state and setup the fields @@ -3303,6 +3279,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * currently it only allow to set db object and functionLibCtx to which the data * will be loaded (in the future it might contains more such objects). */ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) { + uint64_t minExpiredField = EB_EXPIRE_TIME_INVALID; uint64_t dbid = 0; int type, rdbver; uint64_t db_size = 0, expires_size = 0; @@ -3544,7 +3521,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) goto eoferr; /* Read value */ - val = rdbLoadObject(type,rdb,key,db,rdbflags,&error); + val = rdbLoadObject(type,rdb,key,db,&error, &minExpiredField); /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was @@ -3563,6 +3540,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if(empty_keys_skipped++ < 10) serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key); sdsfree(key); + } else if (error == RDB_LOAD_ERR_EXPIRED_HASH) { + /* Valid flow. Continue. */ + sdsfree(key); } else { sdsfree(key); goto eoferr; @@ -3607,6 +3587,11 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } } + /* If minExpiredField was set, then the object is hash with expiration + * on fields and need to register it in global HFE DS */ + if (minExpiredField != EB_EXPIRE_TIME_INVALID) + hashTypeAddToExpires(db, key, val, minExpiredField); + /* Set the expire time if needed */ if (expiretime != -1) { setExpire(NULL,db,&keyobj,expiretime); @@ -3652,12 +3637,12 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (empty_keys_skipped) { serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld, empty keys skipped: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired, empty_keys_skipped); + "Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.", + server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped); } else { serverLog(LL_NOTICE, - "Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld.", - server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired); + "Done loading RDB, keys loaded: %lld, keys expired: %lld.", + server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); } return C_OK; diff --git a/src/rdb.h b/src/rdb.h index 2f4c49954..f34e139c1 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -103,11 +103,12 @@ #define RDB_MODULE_OPCODE_STRING 5 /* String. */ /* rdbLoad...() functions flags. */ -#define RDB_LOAD_NONE 0 -#define RDB_LOAD_ENC (1<<0) -#define RDB_LOAD_PLAIN (1<<1) -#define RDB_LOAD_SDS (1<<2) -#define RDB_LOAD_HFLD (1<<3) +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1<<0) +#define RDB_LOAD_PLAIN (1<<1) +#define RDB_LOAD_SDS (1<<2) +#define RDB_LOAD_HFLD (1<<3) +#define RDB_LOAD_HFLD_TTL (1<<4) /* flags on the purpose of rdb save or load */ #define RDBFLAGS_NONE 0 /* No special RDB loading or saving. */ @@ -119,8 +120,9 @@ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ -#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ -#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ +#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ +#define RDB_LOAD_ERR_EXPIRED_HASH 2 /* Expired hash since all its fields are expired */ +#define RDB_LOAD_ERR_OTHER 3 /* Any other errors */ ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len); int rdbSaveType(rio *rdb, unsigned char type); @@ -141,7 +143,7 @@ int rdbSaveToFile(const char *filename); int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int rdbflags, int *error); +robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int *error, uint64_t *minExpiredField); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index edadb7dd7..f364bf7b7 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -331,7 +331,8 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { rdbstate.keys++; /* Read value */ rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; - if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,0,NULL)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,NULL,NULL)) == NULL) + goto eoferr; /* Check if the key already expired. */ if (expiretime != -1 && expiretime < now) rdbstate.already_expired++; diff --git a/src/server.c b/src/server.c index 9e0c35412..563516879 100644 --- a/src/server.c +++ b/src/server.c @@ -1955,6 +1955,8 @@ void createSharedObjects(void) { shared.persist = createStringObject("PERSIST",7); shared.set = createStringObject("SET",3); shared.eval = createStringObject("EVAL",4); + shared.hpexpireat = createStringObject("HPEXPIREAT",10); + shared.hdel = createStringObject("HDEL",4); /* Shared command argument */ shared.left = createStringObject("left",4); @@ -2707,7 +2709,6 @@ void initServer(void) { server.rdb_save_time_start = -1; server.rdb_last_load_keys_expired = 0; server.rdb_last_load_keys_loaded = 0; - server.rdb_last_load_hash_fields_expired = 0; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ @@ -5771,7 +5772,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "rdb_last_cow_size:%zu\r\n", server.stat_rdb_cow_bytes, "rdb_last_load_keys_expired:%lld\r\n", server.rdb_last_load_keys_expired, "rdb_last_load_keys_loaded:%lld\r\n", server.rdb_last_load_keys_loaded, - "rdb_last_load_hash_fields_expired:%lld\r\n", server.rdb_last_load_hash_fields_expired, "aof_enabled:%d\r\n", server.aof_state != AOF_OFF, "aof_rewrite_in_progress:%d\r\n", server.child_type == CHILD_TYPE_AOF, "aof_rewrite_scheduled:%d\r\n", server.aof_rewrite_scheduled, diff --git a/src/server.h b/src/server.h index c433f5dab..9db58285f 100644 --- a/src/server.h +++ b/src/server.h @@ -1317,7 +1317,8 @@ struct sharedObjectsStruct { *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, - *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, + *hdel, *hpexpireat, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, @@ -1802,7 +1803,6 @@ struct redisServer { long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */ long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */ - long long rdb_last_load_hash_fields_expired; /* number of expired hash fields when loading RDB */ struct saveparam *saveparams; /* Save points array for RDB */ int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ @@ -3166,6 +3166,20 @@ typedef struct listpackEx { are ordered by ttl. */ } listpackEx; +/* Each dict of hash object that has fields with time-Expiration will have the + * following metadata attached to dict header */ +typedef struct dictExpireMetadata { + ExpireMeta expireMeta; /* embedded ExpireMeta in dict. + To be used in order to register the hash in the + global ebuckets (i.e db->hexpires) with next, + minimum, hash-field to expire */ + ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */ + sds key; /* reference to the key, same one that stored in + db->dict. Will be used from active-expiration flow + for notification and deletion of the object, if + needed. */ +} dictExpireMetadata; + /* Hash data type */ #define HASH_SET_TAKE_FIELD (1<<0) #define HASH_SET_TAKE_VALUE (1<<1) @@ -3173,8 +3187,8 @@ typedef struct listpackEx { void hashTypeConvert(robj *o, int enc, ebuckets *hexpires); void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end); -int hashTypeExists(robj *o, sds key); -int hashTypeDelete(robj *o, sds key); +int hashTypeExists(redisDb *db, robj *o, sds key, int *isHashDeleted); +int hashTypeDelete(robj *o, void *key, int isSdsField); unsigned long hashTypeLength(const robj *o, int subtractExpiredFields); hashTypeIterator *hashTypeInitIterator(robj *subject); void hashTypeReleaseIterator(hashTypeIterator *hi); @@ -3190,24 +3204,24 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll, uint64_t *expireTime); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi); -robj *hashTypeGetValueObject(robj *o, sds field); +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted); int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags); robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire); uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime); void hashTypeFree(robj *o); int hashTypeIsExpired(const robj *o, uint64_t expireAt); +uint64_t hashTypeGetMinExpire(robj *o); unsigned char *hashTypeListpackGetLp(robj *o); uint64_t hashTypeGetMinExpire(robj *o); void hashTypeUpdateKeyRef(robj *o, sds newkey); ebuckets *hashTypeGetDictMetaHFE(dict *d); -void listpackExExpire(robj *o, ExpireInfo *info); -int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at); uint64_t hashTypeGetMinExpire(robj *keyObj); uint64_t hashTypeGetNextTimeToExpire(robj *o); void initDictExpireMetadata(sds key, robj *o); struct listpackEx *listpackExCreate(void); -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt); +void listpackExAddNew(robj *o, char *field, size_t flen, + char *value, size_t vlen, uint64_t expireAt); /* Hash-Field data type (of t_hash.c) */ hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta); diff --git a/src/sort.c b/src/sort.c index 426ff0c1d..d45c380ac 100644 --- a/src/sort.c +++ b/src/sort.c @@ -94,7 +94,12 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. The returned object * is a new object with refcount already incremented. */ - o = hashTypeGetValueObject(o, fieldobj->ptr); + int isHashDeleted; + o = hashTypeGetValueObject(db, o, fieldobj->ptr, &isHashDeleted); + + if (isHashDeleted) + goto noobj; + } else { if (o->type != OBJ_STRING) goto noobj; diff --git a/src/t_hash.c b/src/t_hash.c index 8f5f0ee3f..a4b182d91 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -14,6 +14,18 @@ * update the expiration time of the hash object in global HFE DS. */ #define HASH_NEW_EXPIRE_DIFF_THRESHOLD max(4000, 1<hexpires) with next, - minimum, hash-field to expire */ - ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */ - sds key; /* reference to the key, same one that stored in - db->dict. Will be used from active-expiration flow - for notification and deletion of the object, if - needed. */ -} dictExpireMetadata; - /* ActiveExpireCtx passed to hashTypeActiveExpire() */ typedef struct ActiveExpireCtx { uint32_t fieldsToExpireQuota; redisDb *db; } ActiveExpireCtx; +/* OnFieldExpireCtx passed to OnFieldExpire() */ +typedef struct OnFieldExpireCtx { + robj *hashObj; + redisDb *db; +} OnFieldExpireCtx; + /* The implementation of hashes by dict was modified from storing fields as sds * strings to store "mstr" (Immutable string with metadata) in order to be able to * attach TTL (ExpireMeta) to the hash-field. This usage of mstr opens up the @@ -217,7 +222,6 @@ typedef struct HashTypeSetEx { /*** config ***/ FieldSetCond fieldSetCond; /* [DCF | DOF] */ ExpireSetCond expireSetCond; /* [XX | NX | GT | LT] */ - FieldGet fieldGet; /* [GETNEW | GETOLD] TODO */ /*** metadata ***/ uint64_t minExpire; /* if uninit EB_EXPIRE_TIME_INVALID */ @@ -235,12 +239,11 @@ typedef struct HashTypeSetEx { const char *cmd; } HashTypeSetEx; -static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s, - uint64_t expireAt, HashTypeSetEx *ex); +static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *setParams, + uint64_t expireAt, HashTypeSetEx *exParams); int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, - FieldSetCond fieldSetCond, FieldGet fieldGet, - ExpireSetCond expireSetCond, HashTypeSetEx *ex); + FieldSetCond fieldSetCond, ExpireSetCond expireSetCond, HashTypeSetEx *ex); SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, uint64_t expireAt, HashTypeSetEx *exInfo); @@ -406,22 +409,41 @@ static uint64_t listpackExGetMinExpire(robj *o) { } /* Walk over fields and delete the expired ones. */ -void listpackExExpire(robj *o, ExpireInfo *info) { +void listpackExExpire(redisDb *db, robj *o, ExpireInfo *info) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); - uint64_t min = EB_EXPIRE_TIME_INVALID; + uint64_t expired = 0, min = EB_EXPIRE_TIME_INVALID; + unsigned char *ptr; listpackEx *lpt = o->ptr; - struct lpFingArgs r = { - .max_to_search = info->maxToExpire, - .expire_time = info->now - }; + ptr = lpFirst(lpt->lp); - lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); - info->itemsExpired += r.expired; + while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { + long long val; + int64_t flen; + unsigned char intbuf[LP_INTBUF_SIZE], *fref; - /* Delete all the expired fields in one go */ - if (r.expired > 0) - lpt->lp = lpDeleteRange(lpt->lp, 0, r.expired * 3); + fref = lpGet(ptr, &flen, intbuf); + + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr); + ptr = lpNext(lpt->lp, ptr); + serverAssert(ptr && lpGetIntegerValue(ptr, &val)); + + /* Fields are ordered by expiry time. If we reached to a non-expired + * or a non-volatile field, we know rest is not yet expired. */ + if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) + break; + + propagateHashFieldDeletion(db, ((listpackEx *) o->ptr)->key, (char *)((fref) ? fref : intbuf), flen); + + ptr = lpNext(lpt->lp, ptr); + + info->itemsExpired++; + expired++; + } + + if (expired) + lpt->lp = lpDeleteRange(lpt->lp, 0, expired * 3); min = hashTypeGetNextTimeToExpire(o); info->nextExpireTime = (min != EB_EXPIRE_TIME_INVALID) ? min : 0; @@ -453,10 +475,11 @@ static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { } /* Add new field ordered by expire time. */ -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { +void listpackExAddNew(robj *o, char *field, size_t flen, + char *value, size_t vlen, uint64_t expireAt) { listpackEntry ent[3] = { - {.sval = (unsigned char*) field, .slen = sdslen(field)}, - {.sval = (unsigned char*) value, .slen = sdslen(value)}, + {.sval = (unsigned char*) field, .slen = flen}, + {.sval = (unsigned char*) value, .slen = vlen}, {.lval = expireAt} }; @@ -524,7 +547,8 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, } if (prevExpire == EB_EXPIRE_TIME_INVALID) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } else { if (((ex->expireSetCond == HFE_GT) && (prevExpire >= expireAt)) || @@ -539,7 +563,7 @@ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -615,13 +639,14 @@ void hashTypeTryConversion(redisDb *db, robj *o, robj **argv, int start, int end hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); } -/* Get the value from a listpack encoded hash, identified by field. - * Returns -1 when the field cannot be found. */ -int hashTypeGetFromListpack(robj *o, sds field, +/* Get the value from a listpack encoded hash, identified by field. */ +GetFieldRes hashTypeGetFromListpack(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, - long long *vll) + long long *vll, + uint64_t *expiredAt) { + *expiredAt = EB_EXPIRE_TIME_INVALID; unsigned char *zl, *fptr = NULL, *vptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { @@ -649,9 +674,8 @@ int hashTypeGetFromListpack(robj *o, sds field, h = lpNext(lpt->lp, vptr); serverAssert(h && lpGetIntegerValue(h, &expire)); - - if (hashTypeIsExpired(o, expire)) - return -1; + if (expire != HASH_LP_NO_TTL) + *expiredAt = expire; } } } else { @@ -660,96 +684,150 @@ int hashTypeGetFromListpack(robj *o, sds field, if (vptr != NULL) { *vstr = lpGetValue(vptr, vlen, vll); - return 0; + return GETF_OK; } - return -1; + return GETF_NOT_FOUND; } /* Get the value from a hash table encoded hash, identified by field. * Returns NULL when the field cannot be found, otherwise the SDS value * is returned. */ -sds hashTypeGetFromHashTable(robj *o, sds field) { +GetFieldRes hashTypeGetFromHashTable(robj *o, sds field, sds *value, uint64_t *expiredAt) { dictEntry *de; + *expiredAt = EB_EXPIRE_TIME_INVALID; + serverAssert(o->encoding == OBJ_ENCODING_HT); de = dictFind(o->ptr, field); - if (de == NULL) return NULL; + if (de == NULL) + return GETF_NOT_FOUND; - /* Check if the field is expired */ - if (hfieldIsExpired(dictGetKey(de))) return NULL; - - return dictGetVal(de); + *expiredAt = hfieldGetExpireTime(dictGetKey(de)); + *value = (sds) dictGetVal(de); + return GETF_OK; } /* Higher level function of hashTypeGet*() that returns the hash value - * associated with the specified field. If the field is found C_OK - * is returned, otherwise C_ERR. The returned object is returned by - * reference in either *vstr and *vlen if it's returned in string form, - * or stored in *vll if it's returned as a number. + * associated with the specified field. * - * If *vll is populated *vstr is set to NULL, so the caller - * can always check the function return by checking the return value - * for C_OK and checking if vll (or vstr) is NULL. */ -int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll) { + * Returned: + * - GetFieldRes: OK: Return Field's valid value + * NOT_FOUND: Field was not found. + * EXPIRED: Field is expired and Lazy deleted + * EXPIRED_HASH: Returned only if the field is the last one in the + * hash and the hash is deleted. + * - vstr, vlen : if string, ref in either *vstr and *vlen if it's + * returned in string form, + * - vll : or stored in *vll if it's returned as a number. + * If *vll is populated *vstr is set to NULL, so the caller can + * always check the function return by checking the return value + * for GETF_OK and checking if vll (or vstr) is NULL. + * + */ +GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vstr, + unsigned int *vlen, long long *vll) { + uint64_t expiredAt; + sds key; + GetFieldRes res; if (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX) - { + o->encoding == OBJ_ENCODING_LISTPACK_EX) { *vstr = NULL; - if (hashTypeGetFromListpack(o, field, vstr, vlen, vll) == 0) - return C_OK; + res = hashTypeGetFromListpack(o, field, vstr, vlen, vll, &expiredAt); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + } else if (o->encoding == OBJ_ENCODING_HT) { - sds value; - if ((value = hashTypeGetFromHashTable(o, field)) != NULL) { - *vstr = (unsigned char*) value; - *vlen = sdslen(value); - return C_OK; - } + sds value = NULL; + res = hashTypeGetFromHashTable(o, field, &value, &expiredAt); + + if (res == GETF_NOT_FOUND) + return GETF_NOT_FOUND; + + *vstr = (unsigned char*) value; + *vlen = sdslen(value); } else { serverPanic("Unknown hash encoding"); } - return C_ERR; + + /* Don't expire anything while loading. It will be done later. */ + if ( (server.loading) || + (server.lazy_expire_disabled) || + ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) || + (expiredAt >= (uint64_t) commandTimeSnapshot()) ) + return GETF_OK; + + /* Got expired. Extract attached key from LISTPACK_EX/HT */ + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) + key = ((listpackEx *) o->ptr)->key; + else + key = ((dictExpireMetadata *) dictMetadata((dict*)o->ptr))->key; + + /* delete the field and propagate the deletion */ + serverAssert(hashTypeDelete(o, field, 1) == 1); + propagateHashFieldDeletion(db, key, field, sdslen(field)); + + /* If the field is the last one in the hash, then the hash will be deleted */ + if (hashTypeLength(o, 0) == 0) { + robj *keyObj = createStringObject(key, sdslen(key)); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); + dbDelete(db,keyObj); + decrRefCount(keyObj); + return GETF_EXPIRED_HASH; + } + + return GETF_EXPIRED; } /* Like hashTypeGetValue() but returns a Redis object, which is useful for * interaction with the hash type outside t_hash.c. * The function returns NULL if the field is not found in the hash. Otherwise - * a newly allocated string object with the value is returned. */ -robj *hashTypeGetValueObject(robj *o, sds field) { + * a newly allocated string object with the value is returned. + * + * isHashDeleted - If attempted to access expired field and it's the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + */ +robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr; unsigned int vlen; long long vll; - if (hashTypeGetValue(o,field,&vstr,&vlen,&vll) == C_ERR) return NULL; - if (vstr) return createStringObject((char*)vstr,vlen); - else return createStringObjectFromLongLong(vll); + *isHashDeleted = 0; /*default*/ + GetFieldRes res = hashTypeGetValue(db,o,field,&vstr,&vlen,&vll); + + if (res == GETF_OK) { + if (vstr) return createStringObject((char*)vstr,vlen); + else return createStringObjectFromLongLong(vll); + } + + if (res == GETF_EXPIRED_HASH) + *isHashDeleted = 1; + + /* GETF_EXPIRED_HASH, GETF_EXPIRED, GETF_NOT_FOUND */ + return NULL; } -/* Higher level function using hashTypeGet*() to return the length of the - * object associated with the requested field, or 0 if the field does not - * exist. */ -size_t hashTypeGetValueLength(robj *o, sds field) { - size_t len = 0; +/* Test if the specified field exists in the given hash. If the field is + * expired (HFE), then it will be lazy deleted + * + * Returns 1 if the field exists, and 0 when it doesn't. + * + * isHashDeleted - If attempted to access expired field and it is the last field + * in the hash, then the hash will as well be deleted. In this case, + * isHashDeleted will be set to 1. + */ +int hashTypeExists(redisDb *db, robj *o, sds field, int *isHashDeleted) { unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) - len = vstr ? vlen : sdigits10(vll); - - return len; -} - -/* Test if the specified field exists in the given hash. Returns 1 if the field - * exists, and 0 when it doesn't. */ -int hashTypeExists(robj *o, sds field) { - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; - - return hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK; + GetFieldRes res = hashTypeGetValue(db, o, field, &vstr, &vlen, &vll); + *isHashDeleted = (res == GETF_EXPIRED_HASH) ? 1 : 0; + return (res == GETF_OK) ? 1 : 0; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -759,8 +837,9 @@ int hashTypeExists(robj *o, sds field) { * caller retains ownership of the strings passed. However this behavior * can be effected by passing appropriate flags (possibly bitwise OR-ed): * - * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. - * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function. + * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function. + * HASH_SET_KEEP_FIELD -- keep original field along with TTL if already exists * * When the flags are used the caller does not need to release the passed * SDS string(s). It's up to the function to use the string to create a new @@ -770,8 +849,9 @@ int hashTypeExists(robj *o, sds field) { * semantics of copying the values if needed. * */ -#define HASH_SET_TAKE_FIELD (1<<0) -#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_TAKE_FIELD (1<<0) +#define HASH_SET_TAKE_VALUE (1<<1) +#define HASH_SET_KEEP_FIELD (1<<2) #define HASH_SET_COPY 0 int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags) { HashTypeSet set = {value, flags}; @@ -817,7 +897,8 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* If field doesn't have expiry metadata attached */ if (!hfieldIsExpireAttached(hfOld)) { - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) { + /* For fields without expiry, LT condition is considered valid */ + if (ex->expireSetCond & (HFE_XX | HFE_GT)) { hfieldFree(hfNew); return HSETEX_NO_CONDITION_MET; } @@ -851,7 +932,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* field has invalid expiry. No need to ebRemove() */ /* Check XX|LT|GT */ - if (ex->expireSetCond & (HFE_XX | HFE_LT | HFE_GT)) + if (ex->expireSetCond & (HFE_XX | HFE_GT)) return HSETEX_NO_CONDITION_MET; } @@ -864,7 +945,7 @@ SetExRes hashTypeSetExpiry(HashTypeSetEx *ex, sds field, uint64_t expireAt, dict /* if expiration time is in the past */ if (unlikely(checkAlreadyExpired(expireAt))) { - hashTypeDelete(ex->hashObj, field); + hashTypeDelete(ex->hashObj, field, 1); ex->fieldDeleted++; return HSETEX_DELETED; } @@ -898,23 +979,23 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ - if (isSetKeyValue && (o->encoding == OBJ_ENCODING_LISTPACK || - o->encoding == OBJ_ENCODING_LISTPACK_EX)) - { - if (sdslen(field) > server.hash_max_listpack_value || - sdslen(setKeyVal->value) > server.hash_max_listpack_value) - hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); - } - if (o->encoding == OBJ_ENCODING_LISTPACK || o->encoding == OBJ_ENCODING_LISTPACK_EX) { - res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); - goto SetExDone; - } else if (o->encoding != OBJ_ENCODING_HT) { - serverPanic("Unknown hash encoding"); + if ( (isSetKeyValue) && + (sdslen(field) > server.hash_max_listpack_value || + sdslen(setKeyVal->value) > server.hash_max_listpack_value) ) + { + hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires); + } else { + res = hashTypeSetExListpack(db, o, field, setKeyVal, expireAt, exInfo); + goto SetExDone; /*done*/ + } } + if (o->encoding != OBJ_ENCODING_HT) + serverPanic("Unknown hash encoding"); + /*** now deal with HT ***/ hfield newField; dict *ht = o->ptr; @@ -936,13 +1017,17 @@ SetExRes hashTypeSetEx(redisDb *db, robj *o, sds field, HashTypeSet *setKeyVal, /* If field already exists, then update "field". "Value" will be set afterward */ if (de == NULL) { - /* If attached TTL to the old field, then remove it from hash's private ebuckets */ - hfield oldField = dictGetKey(existing); - hfieldPersist(o, oldField); - - hfieldFree(oldField); + if (flags & HASH_SET_KEEP_FIELD) { + /* Not keep old field along with TTL */ + hfieldFree(newField); + } else { + /* If attached TTL to the old field, then remove it from hash's private ebuckets */ + hfield oldField = dictGetKey(existing); + hfieldPersist(o, oldField); + hfieldFree(oldField); + dictSetKey(ht, existing, newField); + } sdsfree(dictGetVal(existing)); - dictSetKey(ht, existing, newField); res = HSET_UPDATE; de = existing; } @@ -966,33 +1051,6 @@ SetExDone: return res; } -/* - * hashTypeSetExRdb provide a simplified API for setting fields & expiry by RDB load - * - * It is the duty of RDB reading process to track minimal expiration time of the - * fields and eventually call hashTypeAddToExpires() to update global HFE DS with - * next expiration time. - * - * To just add a field with no expiry, use hashTypeSet instead. - */ -int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at) { - /* Dummy struct to be used in hashTypeSetEx() */ - HashTypeSetEx setEx = { - .fieldSetCond = FIELD_DONT_OVRWRT, /* Shouldn't be any duplication */ - .expireSetCond = HFE_NX, /* Should set expiry once each field */ - .minExpire = EB_EXPIRE_TIME_INVALID, /* Won't be used. Accounting made by RDB already */ - .key = NULL, /* Not going to call hashTypeSetExDone() */ - .hashObj = o, - .minExpireFields = EB_EXPIRE_TIME_INVALID, /* Not needed by RDB */ - .c = NULL, /* No notification required */ - .cmd = NULL, /* No notification required */ - }; - - HashTypeSet setKeyVal = {.value = value, .flags = 0}; - SetExRes res = hashTypeSetEx(db, o, field, &setKeyVal, expire_at, (expire_at) ? &setEx : NULL); - return (res == HSETEX_OK || res == HSET_UPDATE) ? C_OK : C_ERR; -} - void initDictExpireMetadata(sds key, robj *o) { dict *ht = o->ptr; @@ -1009,13 +1067,11 @@ void initDictExpireMetadata(sds key, robj *o) { * done by function hashTypeSetExDone(). */ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cmd, FieldSetCond fieldSetCond, - FieldGet fieldGet, ExpireSetCond expireSetCond, - HashTypeSetEx *ex) + ExpireSetCond expireSetCond, HashTypeSetEx *ex) { dict *ht = o->ptr; ex->fieldSetCond = fieldSetCond; - ex->fieldGet = fieldGet; /* TODO */ ex->expireSetCond = expireSetCond; ex->minExpire = EB_EXPIRE_TIME_INVALID; ex->c = c; @@ -1027,6 +1083,7 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm ex->fieldUpdated = 0; ex->minExpireFields = EB_EXPIRE_TIME_INVALID; + /* Take care that HASH support expiration */ if (ex->hashObj->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvert(ex->hashObj, OBJ_ENCODING_LISTPACK_EX, &c->db->hexpires); @@ -1107,13 +1164,16 @@ void hashTypeSetExDone(HashTypeSetEx *ex) { /* Check if the field is too long for listpack, and convert before adding the item. * This is needed for HINCRBY* case since in other commands this is handled early by * hashTypeTryConversion, so this check will be a NOP. */ -static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *s, - uint64_t expireAt, HashTypeSetEx *ex) +static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeSet *setParams, + uint64_t expireAt, HashTypeSetEx *exParams) { int res = HSETEX_OK; unsigned char *fptr = NULL, *vptr = NULL, *tptr = NULL; if (o->encoding == OBJ_ENCODING_LISTPACK) { + /* If reached here, then no need to set expiration. Otherwise, as precond + * listpack is converted to listpackex by hashTypeSetExInit() */ + unsigned char *zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { @@ -1125,14 +1185,14 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS res = HSET_UPDATE; /* Replace value */ - zl = lpReplace(zl, &vptr, (unsigned char *) s->value, sdslen(s->value)); + zl = lpReplace(zl, &vptr, (unsigned char *) setParams->value, sdslen(setParams->value)); } } if (res != HSET_UPDATE) { /* Push new field/value pair onto the tail of the listpack */ zl = lpAppend(zl, (unsigned char*)field, sdslen(field)); - zl = lpAppend(zl, (unsigned char*)s->value, sdslen(s->value)); + zl = lpAppend(zl, (unsigned char*)setParams->value, sdslen(setParams->value)); } o->ptr = zl; goto out; @@ -1148,11 +1208,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS vptr = lpNext(lpt->lp, fptr); serverAssert(vptr != NULL); - if (s) { + if (setParams) { /* Replace value */ lpt->lp = lpReplace(lpt->lp, &vptr, - (unsigned char *) s->value, - sdslen(s->value)); + (unsigned char *) setParams->value, + sdslen(setParams->value)); fptr = lpPrev(lpt->lp, vptr); serverAssert(fptr != NULL); @@ -1161,8 +1221,11 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS tptr = lpNext(lpt->lp, vptr); serverAssert(tptr && lpGetIntegerValue(tptr, &expireTime)); - if (ex) { - res = hashTypeSetExpiryListpack(ex, field, fptr, vptr, tptr, + /* Keep, update or clear TTL */ + if (setParams && setParams->flags & HASH_SET_KEEP_FIELD) { + /* keep old field along with TTL */ + } else if (exParams) { + res = hashTypeSetExpiryListpack(exParams, field, fptr, vptr, tptr, expireAt); if (res != HSETEX_OK) goto out; @@ -1174,9 +1237,10 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS } if (!fptr) { - if (s) { - listpackExAddNew(o, field, s->value, - ex ? expireAt : HASH_LP_NO_TTL); + if (setParams) { + listpackExAddNew(o, field, sdslen(field), + setParams->value, sdslen(setParams->value), + exParams ? expireAt : HASH_LP_NO_TTL); } else { res = HSETEX_NO_FIELD; } @@ -1191,9 +1255,12 @@ out: } /* Delete an element from a hash. - * Return 1 on deleted and 0 on not found. */ -int hashTypeDelete(robj *o, sds field) { + * + * Return 1 on deleted and 0 on not found. + * isSdsField - 1 if the field is sds, 0 if it is hfield */ +int hashTypeDelete(robj *o, void *field, int isSdsField) { int deleted = 0; + int fieldLen = (isSdsField) ? sdslen((sds)field) : hfieldlen((hfield)field); if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl, *fptr; @@ -1201,7 +1268,7 @@ int hashTypeDelete(robj *o, sds field) { zl = o->ptr; fptr = lpFirst(zl); if (fptr != NULL) { - fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1); + fptr = lpFind(zl, fptr, (unsigned char*)field, fieldLen, 1); if (fptr != NULL) { /* Delete both of the key and the value. */ zl = lpDeleteRangeWithEntry(zl,&fptr,2); @@ -1215,7 +1282,7 @@ int hashTypeDelete(robj *o, sds field) { fptr = lpFirst(lpt->lp); if (fptr != NULL) { - fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, sdslen(field), 2); + fptr = lpFind(lpt->lp, fptr, (unsigned char*)field, fieldLen, 2); if (fptr != NULL) { /* Delete field, value and ttl */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); @@ -1224,9 +1291,11 @@ int hashTypeDelete(robj *o, sds field) { } } else if (o->encoding == OBJ_ENCODING_HT) { /* dictDelete() will call dictHfieldDestructor() */ + dictUseStoredKeyApi((dict*)o->ptr, isSdsField ? 0 : 1); if (dictDelete((dict*)o->ptr, field) == C_OK) { deleted = 1; } + dictUseStoredKeyApi((dict*)o->ptr, 0); } else { serverPanic("Unknown hash encoding"); @@ -1296,6 +1365,7 @@ void hashTypeReleaseIterator(hashTypeIterator *hi) { /* Move to the next entry in the hash. Return C_OK when the next entry * could be found and C_ERR when the iterator reaches the end. */ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { + hi->expire_time = EB_EXPIRE_TIME_INVALID; if (hi->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl; unsigned char *fptr, *vptr; @@ -1363,8 +1433,11 @@ int hashTypeNext(hashTypeIterator *hi, int skipExpiredFields) { hi->tptr = tptr; hi->expire_time = (expire_time != HASH_LP_NO_TTL) ? (uint64_t) expire_time : EB_EXPIRE_TIME_INVALID; } else if (hi->encoding == OBJ_ENCODING_HT) { + while ((hi->de = dictNext(hi->di)) != NULL) { - if (skipExpiredFields && hfieldIsExpired(dictGetKey(hi->de))) + hi->expire_time = hfieldGetExpireTime(dictGetKey(hi->de)); + /* this condition still valid if expire_time equals EB_EXPIRE_TIME_INVALID */ + if (skipExpiredFields && ((mstime_t)hi->expire_time < commandTimeSnapshot())) continue; return C_OK; } @@ -1417,11 +1490,8 @@ void hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, char **str, si *len = sdslen(val); } - if (expireTime) { - if (!key) key = dictGetKey(hi->de); - *expireTime = hfieldGetExpireTime( key ); - } - + if (expireTime) + *expireTime = hi->expire_time; } /* Higher level function of hashTypeCurrent*() that returns the hash value @@ -1446,7 +1516,6 @@ void hashTypeCurrentObject(hashTypeIterator *hi, { *vstr = NULL; hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll, expireTime); - /* TODO-HFE: Handle expireTime */ } else if (hi->encoding == OBJ_ENCODING_HT) { char *ele; size_t eleLen; @@ -1534,7 +1603,6 @@ void hashTypeConvertListpack(robj *o, int enc) { dict = dictCreate(&mstrHashDictType); /* Presize the dict to avoid rehashing */ - /* TODO: activeExpire list pack. Should be small */ dictExpand(dict,hashTypeLength(o, 0)); while (hashTypeNext(hi, 0) != C_ERR) { @@ -1618,7 +1686,7 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) { } } -/* NOTE: hexpires can be NULL (Won't attempt to register in global HFE DS) */ +/* NOTE: hexpires can be NULL (Won't register in global HFE DS) */ void hashTypeConvert(robj *o, int enc, ebuckets *hexpires) { if (o->encoding == OBJ_ENCODING_LISTPACK) { hashTypeConvertListpack(o, enc); @@ -1785,11 +1853,10 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { info = (ExpireInfo){ .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .ctx = hashObj, .now = commandTimeSnapshot(), .itemsExpired = 0}; - listpackExExpire(hashObj, &info); + listpackExExpire(activeExpireCtx->db, hashObj, &info); server.stat_expired_hash_fields += info.itemsExpired; keystr = ((listpackEx*)hashObj->ptr)->key; } else { @@ -1798,11 +1865,16 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { dict *d = hashObj->ptr; dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + OnFieldExpireCtx onFieldExpireCtx = { + .hashObj = hashObj, + .db = activeExpireCtx->db + }; + info = (ExpireInfo){ - .maxToExpire = activeExpireCtx->fieldsToExpireQuota, - .onExpireItem = onFieldExpire, - .ctx = hashObj, - .now = commandTimeSnapshot() + .maxToExpire = activeExpireCtx->fieldsToExpireQuota, + .onExpireItem = onFieldExpire, + .ctx = &onFieldExpireCtx, + .now = commandTimeSnapshot() }; ebExpire(&dictExpireMeta->hfe, &hashFieldExpireBucketsType, &info); @@ -1817,7 +1889,6 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { if (hashTypeLength(hashObj, 0) == 0) { robj *key = createStringObject(keystr, sdslen(keystr)); dbDelete(activeExpireCtx->db, key); - //notifyKeyspaceEvent(NOTIFY_HASH,"xxxxxxxxx",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key, activeExpireCtx->db->id); server.dirty++; signalModifiedKey(NULL, &server.db[0], key); @@ -1901,29 +1972,33 @@ uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o) { /* Add hash to global HFE DS and update key for notifications. * - * key - must be the same instance that is stored in db->dict + * key - must be the same key instance that is persisted in db->dict + * expireTime - expiration in msec. + * If eq. 0 then the hash will be added to the global HFE DS with + * the minimum expiration time that is already written in advance + * to attached metadata (which considered as trash as long as it is + * not attached to global HFE DS). + * + * Precondition: It is a hash of type listpackex or HT with HFE metadata. */ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime) { - if (expireTime == EB_EXPIRE_TIME_INVALID) + if (expireTime > EB_EXPIRE_TIME_MAX) return; if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { listpackEx *lpt = hashObj->ptr; lpt->key = key; + expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&lpt->meta); ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); - return; + } else if (hashObj->encoding == OBJ_ENCODING_HT) { + dict *d = hashObj->ptr; + if (isDictWithMetaHFE(d)) { + dictExpireMetadata *meta = (dictExpireMetadata *) dictMetadata(d); + expireTime = (expireTime) ? expireTime : ebGetMetaExpTime(&meta->expireMeta); + meta->key = key; + ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); + } } - serverAssert(hashObj->encoding == OBJ_ENCODING_HT); - - serverAssert(isDictWithMetaHFE(hashObj->ptr)); - - /* Update hash with key for notifications */ - dict *d = hashObj->ptr; - dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); - dictExpireMeta->key = key; - - /* Add hash to global HFE DS */ - ebAdd(&db->hexpires, &hashExpireBucketsType, hashObj, expireTime); } /* DB active expire and update hashes with time-expiration on fields. @@ -1957,12 +2032,19 @@ uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire) { void hashTypeFree(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: + /* Verify hash is not registered in global HFE ds */ + if (isDictWithMetaHFE((dict*)o->ptr)) { + dictExpireMetadata *m = (dictExpireMetadata *)dictMetadata((dict*)o->ptr); + serverAssert(m->expireMeta.trash == 1); + } dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_LISTPACK: lpFree(o->ptr); break; case OBJ_ENCODING_LISTPACK_EX: + /* Verify hash is not registered in global HFE ds */ + serverAssert(((listpackEx *) o->ptr)->meta.trash == 1); listpackExFree(o->ptr); break; default: @@ -1994,19 +2076,27 @@ ebuckets *hashTypeGetDictMetaHFE(dict *d) { *----------------------------------------------------------------------------*/ void hsetnxCommand(client *c) { + int isHashDeleted; robj *o; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeExists(o, c->argv[2]->ptr)) { + if (hashTypeExists(c->db, o, c->argv[2]->ptr, &isHashDeleted)) { addReply(c, shared.czero); - } else { - hashTypeTryConversion(c->db, o,c->argv,2,3); - hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); - addReply(c, shared.cone); - signalModifiedKey(c,c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); - server.dirty++; + return; } + + /* Field expired and in turn hash deleted. Create new one! */ + if (isHashDeleted) { + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); + } + + hashTypeTryConversion(c->db, o,c->argv,2,3); + hashTypeSet(c->db, o,c->argv[2]->ptr,c->argv[3]->ptr,HASH_SET_COPY); + addReply(c, shared.cone); + signalModifiedKey(c,c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); + server.dirty++; } void hsetCommand(client *c) { @@ -2047,14 +2137,21 @@ void hincrbyCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != C_OK) return; if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&value) == C_OK) { + + GetFieldRes res = hashTypeGetValue(c->db,o,c->argv[2]->ptr,&vstr,&vlen,&value); + if (res == GETF_OK) { if (vstr) { if (string2ll((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not an integer"); return; } } /* Else hashTypeGetValue() already stored it into &value */ + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2066,7 +2163,7 @@ void hincrbyCommand(client *c) { } value += incr; new = sdsfromlonglong(value); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyLongLong(c,value); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); @@ -2087,7 +2184,8 @@ void hincrbyfloatCommand(client *c) { return; } if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if (hashTypeGetValue(o,c->argv[2]->ptr,&vstr,&vlen,&ll) == C_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o,c->argv[2]->ptr,&vstr,&vlen,&ll); + if (res == GETF_OK) { if (vstr) { if (string2ld((char*)vstr,vlen,&value) == 0) { addReplyError(c,"hash value is not a float"); @@ -2096,7 +2194,12 @@ void hincrbyfloatCommand(client *c) { } else { value = (long double)ll; } + } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { + value = 0; } else { + /* Field expired and in turn hash deleted. Create new one! */ + o = createHashObject(); + dbAdd(c->db,c->argv[1],o); value = 0; } @@ -2109,7 +2212,7 @@ void hincrbyfloatCommand(client *c) { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); - hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); + hashTypeSet(c->db, o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE | HASH_SET_KEEP_FIELD); addReplyBulkCBuffer(c,buf,len); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); @@ -2125,17 +2228,18 @@ void hincrbyfloatCommand(client *c) { decrRefCount(newobj); } -static void addHashFieldToReply(client *c, robj *o, sds field) { +static GetFieldRes addHashFieldToReply(client *c, robj *o, sds field) { if (o == NULL) { addReplyNull(c); - return; + return GETF_NOT_FOUND; } unsigned char *vstr = NULL; unsigned int vlen = UINT_MAX; long long vll = LLONG_MAX; - if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) { + GetFieldRes res = hashTypeGetValue(c->db, o, field, &vstr, &vlen, &vll); + if (res == GETF_OK) { if (vstr) { addReplyBulkCBuffer(c, vstr, vlen); } else { @@ -2144,6 +2248,7 @@ static void addHashFieldToReply(client *c, robj *o, sds field) { } else { addReplyNull(c); } + return res; } void hgetCommand(client *c) { @@ -2156,6 +2261,7 @@ void hgetCommand(client *c) { } void hmgetCommand(client *c) { + GetFieldRes res = GETF_OK; robj *o; int i; @@ -2165,8 +2271,17 @@ void hmgetCommand(client *c) { if (checkType(c,o,OBJ_HASH)) return; addReplyArrayLen(c, c->argc-2); - for (i = 2; i < c->argc; i++) { - addHashFieldToReply(c, o, c->argv[i]->ptr); + for (i = 2; i < c->argc ; i++) { + + res = addHashFieldToReply(c, o, c->argv[i]->ptr); + + /* If hash got lazy expired since all fields are expired (o is invalid), + * then fill the rest with trivial nulls and return */ + if (res == GETF_EXPIRED_HASH) { + while (++i < c->argc) + addReplyNull(c); + return; + } } } @@ -2178,7 +2293,7 @@ void hdelCommand(client *c) { checkType(c,o,OBJ_HASH)) return; for (j = 2; j < c->argc; j++) { - if (hashTypeDelete(o,c->argv[j]->ptr)) { + if (hashTypeDelete(o,c->argv[j]->ptr,1)) { deleted++; if (hashTypeLength(o, 0) == 0) { dbDelete(c->db,c->argv[1]); @@ -2209,10 +2324,22 @@ void hlenCommand(client *c) { void hstrlenCommand(client *c) { robj *o; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReplyLongLong(c,hashTypeGetValueLength(o,c->argv[2]->ptr)); + + GetFieldRes res = hashTypeGetValue(c->db, o, c->argv[2]->ptr, &vstr, &vlen, &vll); + + if (res == GETF_NOT_FOUND || res == GETF_EXPIRED || res == GETF_EXPIRED_HASH) { + addReply(c, shared.czero); + return; + } + + size_t len = vstr ? vlen : sdigits10(vll); + addReplyLongLong(c,len); } static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) { @@ -2295,10 +2422,11 @@ void hgetallCommand(client *c) { void hexistsCommand(client *c) { robj *o; + int isHashDeleted; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_HASH)) return; - addReply(c, hashTypeExists(o,c->argv[2]->ptr) ? shared.cone : shared.czero); + addReply(c,hashTypeExists(c->db,o,c->argv[2]->ptr,&isHashDeleted) ? shared.cone : shared.czero); } void hscanCommand(client *c) { @@ -2600,6 +2728,8 @@ static hfield _hfieldNew(const void *field, size_t fieldlen, int withExpireMeta, hfield hf = mstrNewWithMeta(&mstrFieldKind, field, fieldlen, (mstrFlags) 1 << HFIELD_META_EXPIRE, trymalloc); + if (!hf) return NULL; + ExpireMeta *expireMeta = mstrMetaRef(hf, &mstrFieldKind, HFIELD_META_EXPIRE); /* as long as it is not inside ebuckets, it is considered trash */ @@ -2669,14 +2799,37 @@ int hfieldIsExpired(hfield field) { /*----------------------------------------------------------------------------- * Hash Field Expiration (HFE) *----------------------------------------------------------------------------*/ -/* Called during active expiration of hash-fields */ +/* Can be called either by active-expire cron job or query from the client */ +static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen) { + robj *argv[] = { + shared.hdel, + createStringObject((char*) key, sdslen(key)), + createStringObject(field, fieldLen) + }; + + enterExecutionUnit(1, 0); + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = 1; + alsoPropagate(db->id,argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); + server.replication_allowed = prev_replication_allowed; + exitExecutionUnit(); + + /* Propagate the HDEL command */ + postExecutionUnitOperations(); + + decrRefCount(argv[1]); + decrRefCount(argv[2]); +} + +/* Called during active expiration of hash-fields. Propagate to replica & Delete. */ static ExpireAction onFieldExpire(eItem item, void *ctx) { + OnFieldExpireCtx *expCtx = ctx; hfield hf = item; - robj *hashobj = (robj *) ctx; - dictUseStoredKeyApi((dict *)hashobj->ptr, 1); - hashTypeDelete(hashobj, hf); + dict *d = expCtx->hashObj->ptr; + dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); + propagateHashFieldDeletion(expCtx->db, dictExpireMeta->key, hf, hfieldlen(hf)); + serverAssert(hashTypeDelete(expCtx->hashObj, hf, 0) == 1); server.stat_expired_hash_fields++; - dictUseStoredKeyApi((dict *)hashobj->ptr, 0); return ACT_REMOVE_EXP_ITEM; } @@ -2888,17 +3041,31 @@ static void hexpireGenericCommand(client *c, const char *cmd, long long basetime HashTypeSetEx exCtx; hashTypeSetExInit(keyArg, hashObj, c, c->db, cmd, FIELD_DONT_CREATE2, - FIELD_GET_NONE, expireSetCond, &exCtx); addReplyArrayLen(c, numFields); + for (int i = 0 ; i < numFields ; i++) { sds field = c->argv[numFieldsAt+i+1]->ptr; SetExRes res = hashTypeSetEx(c->db, hashObj, field, NULL, expire, &exCtx); addReplyLongLong(c,res); } hashTypeSetExDone(&exCtx); + + /* rewrite command for the replica sake */ + + /* Propagate as HPEXPIREAT millisecond-timestamp. Rewrite only if not already */ + if (c->cmd->proc != hpexpireatCommand) { + rewriteClientCommandArgument(c,0,shared.hpexpireat); + } + + /* rewrite expiration time to unix time in msec */ + if (basetime != 0 || unit == UNIT_SECONDS) { + robj *expireObj = createStringObjectFromLongLong(expire); + rewriteClientCommandArgument(c, 2, expireObj); + decrRefCount(expireObj); + } } /* HPEXPIRE key milliseconds [ NX | XX | GT | LT] numfields */ diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 6f7a31d21..b0d394389 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -179,6 +179,17 @@ start_server {} { $master set $j somevalue px 10 } + ##### hash-field-expiration + # Hashes of type OBJ_ENCODING_LISTPACK_EX won't be discarded during + # RDB load, even if they are expired. + $master hset myhash1 f1 v1 f2 v2 f3 v3 + $master hpexpire myhash1 10 FIELDS 3 f1 f2 f3 + # Hashes of type RDB_TYPE_HASH_METADATA will be discarded during RDB load. + $master config set hash-max-listpack-entries 0 + $master hset myhash2 f1 v1 f2 v2 + $master hpexpire myhash2 10 FIELDS 2 f1 f2 + $master config set hash-max-listpack-entries 1 + after 20 wait_for_condition 500 100 { diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index 827ecee33..9db781689 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -421,14 +421,20 @@ set server_path [tmpdir "server.partial-hfield-exp-test"] # verifies writing and reading hash key with expiring and persistent fields start_server [list overrides [list "dir" $server_path]] { foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb one expired field, ($type)" { + test "HFE - save and load expired fields, expired soon after, or long after ($type)" { r config set hash-max-listpack-entries $lp_entries r FLUSHALL - r HMSET key a 1 b 2 c 3 d 4 - r HEXPIREAT key 2524600800 FIELDS 2 a b - r HPEXPIRE key 100 FIELDS 1 d + r HMSET key a 1 b 2 c 3 d 4 e 5 + # expected to be expired long after restart + r HEXPIREAT key 2524600800 FIELDS 1 a + # expected long TTL value (6 bytes) is saved and loaded correctly + r HPEXPIREAT key 188900976391764 FIELDS 1 b + # expected to be already expired after restart + r HPEXPIRE key 80 FIELDS 1 d + # expected to be expired soon after restart + r HPEXPIRE key 200 FIELDS 1 e r save # sleep 101 ms to make sure d will expire after restart @@ -437,14 +443,14 @@ start_server [list overrides [list "dir" $server_path]] { wait_done_loading r assert_equal [lsort [r hgetall key]] "1 2 3 a b c" - assert_equal [r hexpiretime key FIELDS 3 a b c] {2524600800 2524600800 -1} + assert_equal [r hpexpiretime key FIELDS 3 a b c] {2524600800000 188900976391764 -1} assert_equal [s rdb_last_load_keys_loaded] 1 - # hash keys saved in listpack encoding are loaded as a blob, - # so individual field expiry is not verified on load - if {$type eq "dict"} { - assert_equal [s rdb_last_load_hash_fields_expired] 1 + + # wait until expired_hash_fields equals 2 + wait_for_condition 10 100 { + [s expired_hash_fields] == 2 } else { - assert_equal [s rdb_last_load_hash_fields_expired] 0 + fail "Value of expired_hash_fields is not as expected" } } } @@ -455,7 +461,7 @@ set server_path [tmpdir "server.all-hfield-exp-test"] # verifies writing hash with several expired keys, and active-expiring it on load start_server [list overrides [list "dir" $server_path]] { foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb all fields expired, ($type)" { + test "HFE - save and load rdb all fields expired, ($type)" { r config set hash-max-listpack-entries $lp_entries r FLUSHALL @@ -470,53 +476,15 @@ start_server [list overrides [list "dir" $server_path]] { restart_server 0 true false wait_done_loading r - # hash keys saved as listpack-encoded are saved and loaded as a blob - # so individual field validation is not checked during load. - # Therefore, if the key was saved as dict it is expected that - # all 4 fields were expired during load, and thus the key was - # "declared" an empty key. - # On the other hand, if the key was saved as listpack, it is - # expected that no field was expired on load and the key was loaded, - # even though all its fields are actually expired. - if {$type eq "dict"} { - assert_equal [s rdb_last_load_keys_loaded] 0 - assert_equal [s rdb_last_load_hash_fields_expired] 4 - } else { - assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 - } + # it is expected that no field was expired on load and the key was + # loaded, even though all its fields are actually expired. + assert_equal [s rdb_last_load_keys_loaded] 1 - # in listpack encoding, the fields (and key) will be expired by - # lazy expiry assert_equal [r hgetall key] {} } } } -set server_path [tmpdir "server.long-ttl-test"] - -# verifies a long TTL value (6 bytes) is saved and loaded correctly -start_server [list overrides [list "dir" $server_path]] { - foreach {type lp_entries} {listpack 512 dict 0} { - test "hash field expiration save and load rdb long TTL, ($type)" { - r config set hash-max-listpack-entries $lp_entries - - r FLUSHALL - - r HSET key a 1 - # set expiry to 0xabcdef987654 (6 bytes) - r HPEXPIREAT key 188900976391764 FIELDS 1 a - - r save - restart_server 0 true false - wait_done_loading r - - assert_equal [r hget key a ] 1 - assert_equal [r hpexpiretime key FIELDS 1 a] {188900976391764} - } - } -} - set server_path [tmpdir "server.listpack-to-dict-test"] test "save listpack, load dict" { @@ -540,7 +508,6 @@ test "save listpack, load dict" { # first verify d was not expired during load (no expiry when loading # a hash that was saved listpack-encoded) assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 # d should be lazy expired in hgetall assert_equal [lsort [r hgetall key]] "1 2 3 a b c" @@ -570,7 +537,6 @@ test "save dict, load listpack" { # verify d was expired during load assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 1 assert_equal [lsort [r hgetall key]] "1 2 3 a b c" assert_match "*encoding:listpack*" [r debug object key] @@ -602,7 +568,6 @@ foreach {type lp_entries} {listpack 512 dict 0} { } assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 # hgetall might lazy expire fields, so it's only called after the stat asserts assert_equal [lsort [r hgetall key]] "1 2 5 6 a b e f" @@ -632,7 +597,6 @@ foreach {type lp_entries} {listpack 512 dict 0} { after 500 assert_equal [s rdb_last_load_keys_loaded] 1 - assert_equal [s rdb_last_load_hash_fields_expired] 0 assert_equal [s expired_hash_fields] 0 # hgetall will lazy expire fields, so it's only called after the stat asserts diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a40c630df..fcdac8c94 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -293,6 +293,9 @@ proc findKeyWithType {r type} { proc createComplexDataset {r ops {opt {}}} { set useexpire [expr {[lsearch -exact $opt useexpire] != -1}] + # TODO: Remove usehexpire on next commit, when RDB will support replication + set usehexpire [expr {[lsearch -exact $opt usehexpire] != -1}] + if {[lsearch -exact $opt usetag] != -1} { set tag "{t}" } else { @@ -386,6 +389,10 @@ proc createComplexDataset {r ops {opt {}}} { {hash} { randpath {{*}$r hset $k $f $v} \ {{*}$r hdel $k $f} + + if { [{*}$r hexists $k $f] && $usehexpire && rand() < 0.5} { + {*}$r hexpire $k 1000 FIELDS 1 $f + } } } } diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 1ba0e62fa..52f1fa75c 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -124,7 +124,8 @@ start_server {tags {"other"}} { if {$::accurate} {set numops 10000} else {set numops 1000} test {Check consistency of different data types after a reload} { r flushdb - createComplexDataset r $numops usetag + # TODO: integrate usehexpire following next commit that will support replication + createComplexDataset r $numops {usetag usehexpire} if {$::ignoredigest} { set _ 1 } else { diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 08d0203ea..ef5217068 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -64,6 +64,19 @@ proc cmp_hrandfield_result {hash_name expected_result} { } } +proc dumpAllHashes {client} { + set keyAndFields(0,0) 0 + unset keyAndFields + # keep keys sorted for comparison + foreach key [lsort [$client keys *]] { + set fields [$client hgetall $key] + foreach f $fields { + set keyAndFields($key,$f) [$client hpexpiretime $key FIELDS 1 $f] + } + } + return [array get keyAndFields] +} + proc hrandfieldTest {activeExpireConfig} { r debug set-active-expire $activeExpireConfig r del myhash @@ -155,16 +168,16 @@ start_server {tags {"external:skip needs:debug"}} { test "HPEXPIRE(AT) - Test 'LT' flag ($type)" { r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpire myhash 1000 NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpire myhash 2000 NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpire myhash 1500 LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpire myhash 1500 LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] r del myhash - r hset myhash field1 value1 field2 value2 + r hset myhash field1 value1 field2 value2 field3 value3 assert_equal [r hpexpireat myhash [expr {([clock seconds]+1000)*1000}] NX FIELDS 1 field1] [list $E_OK] assert_equal [r hpexpireat myhash [expr {([clock seconds]+2000)*1000}] NX FIELDS 1 field2] [list $E_OK] - assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK] + assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK] } test "HPEXPIREAT - field not exists or TTL is in the past ($type)" { @@ -190,46 +203,41 @@ start_server {tags {"external:skip needs:debug"}} { assert_error {*invalid expire time*} {r hpexpire myhash [expr (1<<48) - [clock milliseconds] + 100 ] FIELDS 1 f1} } - test "Lazy - doesn't delete hash that all its fields got expired ($type)" { + test "Lazy Expire - fields are lazy deleted ($type)" { + + # TODO remove the SELECT once dbid will be embedded inside dict/listpack + r select 0 r debug set-active-expire 0 - r flushall + r del myhash - set hash_sizes {1 15 16 17 31 32 33 40} - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - # random expiration time - r hset hrand$h f$i v$i - r hpexpire hrand$h [expr {50 + int(rand() * 50)}] FIELDS 1 f$i - assert_equal 1 [r HEXISTS hrand$h f$i] + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 - # same expiration time - r hset same$h f$i v$i - r hpexpire same$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS same$h f$i] + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] - # same expiration time - r hset mix$h f$i v$i fieldWithoutExpire$i v$i - r hpexpire mix$h 100 FIELDS 1 f$i - assert_equal 1 [r HEXISTS mix$h f$i] - } - } + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] - after 150 + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] - # Verify that all fields got expired but keys wasn't lazy deleted - foreach h $hash_sizes { - for {set i 1} {$i <= $h} {incr i} { - assert_equal 0 [r HEXISTS mix$h f$i] - } - assert_equal 1 [r EXISTS hrand$h] - assert_equal 1 [r EXISTS same$h] - assert_equal [expr $h * 2] [r HLEN mix$h] - } # Restore default r debug set-active-expire 1 } - test "Active - deletes hash that all its fields got expired ($type)" { + test "Active Expire - deletes hash that all its fields got expired ($type)" { r flushall set hash_sizes {1 15 16 17 31 32 33 40} @@ -375,7 +383,7 @@ start_server {tags {"external:skip needs:debug"}} { assert_range [r httl myhash FIELDS 1 field1] 4 5 } - test "Lazy expire - delete hash with expired fields ($type)" { + test "Lazy Expire - delete hash with expired fields ($type)" { r del myhash r debug set-active-expire 0 r hset myhash k v @@ -403,7 +411,7 @@ start_server {tags {"external:skip needs:debug"}} { } - test "Lazy expire - HLEN does count expired fields ($type)" { + test "Lazy Expire - HLEN does count expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -432,7 +440,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - HSCAN does not report expired fields ($type)" { + test "Lazy Expire - HSCAN does not report expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 @@ -493,7 +501,7 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - test "Lazy expire - verify various HASH commands handling expired fields ($type)" { + test "Lazy Expire - verify various HASH commands handling expired fields ($type)" { # Enforce only lazy expire r debug set-active-expire 0 r del h1 h2 h3 h4 h5 h18 @@ -676,7 +684,21 @@ start_server {tags {"external:skip needs:debug"}} { wait_for_condition 20 10 { [r exists myhash] == 0 } else { fail "'myhash' should be expired" } } {} {singledb:skip} - test "HPERSIST - Returns empty array if key does not exist" { + test "HMGET - returns empty entries if fields or hash expired ($type)" { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 v1 f2 v2 f3 v3 + r hset h2 f1 v1 f2 v2 f3 v3 + r hpexpire h1 10000000 NX FIELDS 1 f1 + r hpexpire h1 1 NX FIELDS 2 f2 f3 + r hpexpire h2 1 NX FIELDS 3 f1 f2 f3 + after 5 + assert_equal [r hmget h1 f1 f2 f3] {v1 {} {}} + assert_equal [r hmget h2 f1 f2 f3] {{} {} {}} + r debug set-active-expire 1 + } + + test "HPERSIST - Returns empty array if key does not exist ($type)" { r del myhash # Make sure we can distinguish between an empty array and a null response r readraw 1 @@ -743,6 +765,53 @@ start_server {tags {"external:skip needs:debug"}} { assert_equal [lsort [r hgetall myhash]] "1 2 3 a b c" assert_equal [r hexpiretime myhash FIELDS 3 a b c] {-1 -1 -1} } + + test {HINCRBY - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_equal [r HINCRBY h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBY h1 f1 1] 1 + assert_equal [r HINCRBY h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBY - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBY h1 f1 2] 12 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } + + + test {HINCRBYFLOAT - discards pending expired field and reset its value} { + r debug set-active-expire 0 + r del h1 h2 + r hset h1 f1 10 f2 2 + r hset h2 f1 10 + assert_equal [r HINCRBYFLOAT h1 f1 2] 12 + assert_equal [r HINCRBYFLOAT h2 f1 2] 12 + r HPEXPIRE h1 10 FIELDS 1 f1 + r HPEXPIRE h2 10 FIELDS 1 f1 + after 15 + assert_equal [r HINCRBYFLOAT h1 f1 1] 1 + assert_equal [r HINCRBYFLOAT h2 f1 1] 1 + r debug set-active-expire 1 + } + + test {HINCRBYFLOAT - preserve expiration time of the field} { + r del h1 + r hset h1 f1 10 + r hpexpire h1 20 FIELDS 1 f1 + assert_equal [r HINCRBYFLOAT h1 f1 2.5] 12.5 + assert_range [r HPTTL h1 FIELDS 1 f1] 1 20 + } } r config set hash-max-listpack-entries 512 @@ -878,3 +947,205 @@ start_server {tags {"external:skip needs:debug"}} { } } } + +start_server {tags {"external:skip needs:debug"}} { + foreach type {listpack ht} { + if {$type eq "ht"} { + r config set hash-max-listpack-entries 0 + } else { + r config set hash-max-listpack-entries 512 + } + + test "Command rewrite and expired hash fields are propagated to replica ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + set aof [get_last_incr_aof_path r] + r hset h1 f1 v1 f2 v2 + + r hpexpire h1 20 FIELDS 1 f1 + r hpexpire h1 30 FIELDS 1 f2 + r hpexpire h1 30 FIELDS 1 non_exists_field + r hset h2 f1 v1 f2 v2 f3 v3 f4 v4 + r hpexpire h2 40 FIELDS 2 f1 non_exists_field + r hpexpire h2 50 FIELDS 1 f2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] LT FIELDS 1 f3 + r hexpireat h2 [expr [clock seconds]+10] NX FIELDS 1 f4 + + wait_for_condition 50 100 { + [r hlen h2] eq 2 + } else { + fail "Field f2 of hash h2 wasn't deleted" + } + + # Assert that each TTL-related command are persisted with absolute timestamps in AOF + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hset h2 f1 v1 f2 v2 f3 v3 f4 v4} + {hpexpireat h2 * FIELDS 2 f1 non_exists_field} + {hpexpireat h2 * FIELDS 1 f2} + {hpexpireat h2 * FIELDS 1 f3} + {hpexpireat h2 * FIELDS 1 f4} + {hdel h1 f1} + {hdel h1 f2} + {hdel h2 f1} + {hdel h2 f2} + } + } + } + + test "Lazy Expire - fields are lazy deleted and propagated to replicas ($type)" { + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + r debug set-active-expire 0 + set aof [get_last_incr_aof_path r] + + r del myhash + + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3 + after 5 + + # Verify that still exists even if all fields are expired + assert_equal 1 [r EXISTS myhash] + + # Verify that len counts also expired fields + assert_equal 3 [r HLEN myhash] + + # Trying access to expired field should delete it. Len should be updated + assert_equal 0 [r hexists myhash f1] + assert_equal 2 [r HLEN myhash] + + # Trying access another expired field should delete it. Len should be updated + assert_equal "" [r hget myhash f2] + assert_equal 1 [r HLEN myhash] + + # Trying access last expired field should delete it. hash shouldn't exists afterward. + assert_equal 0 [r hstrlen myhash f3] + assert_equal 0 [r HLEN myhash] + assert_equal 0 [r EXISTS myhash] + + wait_for_condition 50 100 { [r exists h1] == 0 } else { fail "hash h1 wasn't deleted" } + + # HDEL are propagated as expected + assert_aof_content $aof { + {select *} + {hset myhash f1 v1 f2 v2 f3 v3} + {hpexpireat myhash * NX FIELDS 3 f1 f2 f3} + {hdel myhash f1} + {hdel myhash f2} + {hdel myhash f3} + } + r debug set-active-expire 1 + } + } + + # Start a new server with empty data and AOF file. + start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} { + + # Based on test at expire.tcl: " All time-to-live(TTL) in commands are propagated as absolute ..." + test {All TTLs in commands are propagated as absolute timestamp in milliseconds in AOF} { + + set aof [get_last_incr_aof_path r] + + r hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hpexpireat h1 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hpexpire h1 100000 NX FIELDS 3 f3 f4 f5 + r hexpire h1 100000 FIELDS 1 f6 + r hset h5 f1 v1 + + assert_aof_content $aof { + {select *} + {hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6} + {hpexpireat h1 * FIELDS 1 f1} + {hpexpireat h1 * FIELDS 1 f2} + {hpexpireat h1 * NX FIELDS 3 f3 f4 f5} + {hpexpireat h1 * FIELDS 1 f6} + {hset h5 f1 v1} + } + + array set keyAndFields1 [dumpAllHashes r] + # Let some time pass and reload data from AOF + after 2000 + r debug loadaof + array set keyAndFields2 [dumpAllHashes r] + + # Assert that absolute TTLs are the same + assert_equal [array get keyAndFields1] [array get keyAndFields2] + + } {} {needs:debug} + } + + # Based on test, with same name, at expire.tcl: + test {All TTL in commands are propagated as absolute timestamp in replication stream} { + # Make sure that both relative and absolute expire commands are propagated + # Consider also comment of the test, with same name, at expire.tcl + + r flushall ; # Clean up keyspace to avoid interference by keys from other tests + set repl [attach_to_replication_stream] + + r hset h1 f1 v1 + r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 + r hset h2 f2 v2 + r hpexpireat h2 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 + r hset h3 f3 v3 f4 v4 + r hexpire h3 100 FIELDS 3 f3 f4 non_exists_field + + assert_replication_stream $repl { + {select *} + {hset h1 f1 v1} + {hpexpireat h1 * NX FIELDS 1 f1} + {hset h2 f2 v2} + {hpexpireat h2 * NX FIELDS 1 f2} + {hset h3 f3 v3 f4 v4} + {hpexpireat h3 * FIELDS 3 f3 f4 non_exists_field} + } + close_replication_stream $repl + } {} {needs:repl} + + # Start another server to test replication of TTLs + start_server {tags {needs:repl external:skip}} { + # Set the outer layer server as primary + set primary [srv -1 client] + set primary_host [srv -1 host] + set primary_port [srv -1 port] + # Set this inner layer server as replica + set replica [srv 0 client] + + # Server should have role slave + $replica replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 role] eq {slave} + } else { + fail "Replication not started." + } + + # Based on test, with same name, at expire.tcl + test {For all replicated TTL-related commands, absolute expire times are identical on primary and replica} { + # Apply each TTL-related command to a unique key on primary + $primary flushall + $primary hset h1 f v + $primary hexpireat h1 [expr [clock seconds]+10000] FIELDS 1 f + $primary hset h2 f v + $primary hpexpireat h2 [expr [clock milliseconds]+100000] FIELDS 1 f + $primary hset h3 f v + $primary hexpire h3 100 NX FIELDS 1 f + $primary hset h4 f v + $primary hpexpire h4 100000 NX FIELDS 1 f + $primary hset h5 f v + $primary hpexpireat h5 [expr [clock milliseconds]-100000] FIELDS 1 f + $primary hset h9 f v + + # Wait for replica to get the keys and TTLs + assert {[$primary wait 1 0] == 1} + + # Verify absolute TTLs are identical on primary and replica for all keys + # This is because TTLs are always replicated as absolute values + assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica] + } + } + } +} +