HFE to support AOF and replicas (#13285)

* For replica sake, rewrite commands `H*EXPIRE*` , `HSETF`, `HGETF` to
have absolute unix time in msec.
* On active-expiration of field, propagate HDEL to replica
(`propagateHashFieldDeletion()`)
* On lazy-expiration, propagate HDEL to replica (`hashTypeGetValue()`
now calls `hashTypeDelete()`. It also takes care to call
`propagateHashFieldDeletion()`).
* Fix `H*EXPIRE*` command such that if it gets flag `LT` and it doesn’t
have any expiration on the field then it will considered as valid
condition.

Note, replicas doesn’t make any active expiration, and should avoid lazy
expiration. On `hashTypeGetValue()` it doesn't check expiration (As long
as the master didn’t request to delete the field, it is valid)

TODO: 
* Attach `dbid` to HASH metadata. See
[here](https://github.com/redis/redis/pull/13209#discussion_r1593385850)

---------

Co-authored-by: debing.sun <debing.sun@redis.com>
This commit is contained in:
Moti Cohen 2024-05-29 14:47:48 +03:00 committed by GitHub
parent 6a11d458be
commit 33fc0fbfae
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 988 additions and 487 deletions

View file

@ -1939,7 +1939,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
*
* The function returns 0 on error, non-zero on success. */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
if ((hi->encoding == OBJ_ENCODING_LISTPACK) || (hi->encoding == OBJ_ENCODING_LISTPACK_EX)) {
unsigned char *vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
@ -1963,37 +1963,60 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
/* Emit the commands needed to rebuild a hash object.
* The function returns 0 on error, 1 on success. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
int res = 0; /*fail*/
hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o, 0);
int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID;
hi = hashTypeInitIterator(o);
while (hashTypeNext(hi, 0) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (!rioWriteBulkCount(r,'*',2+cmd_items*2) ||
!rioWriteBulkString(r,"HMSET",5) ||
!rioWriteBulkObject(r,key))
{
hashTypeReleaseIterator(hi);
return 0;
if (!isHFE) {
while (hashTypeNext(hi, 0) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (!rioWriteBulkCount(r, '*', 2 + cmd_items * 2) ||
!rioWriteBulkString(r, "HMSET", 5) ||
!rioWriteBulkObject(r, key))
goto reHashEnd;
}
if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE))
goto reHashEnd;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
} else {
while (hashTypeNext(hi, 0) != C_ERR) {
char hmsetCmd[] = "*4\r\n$5\r\nHMSET\r\n";
if ( (!rioWrite(r, hmsetCmd, sizeof(hmsetCmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE)) )
goto reHashEnd;
if (hi->expire_time != EB_EXPIRE_TIME_INVALID) {
char cmd[] = "*6\r\n$10\r\nHPEXPIREAT\r\n";
if ( (!rioWrite(r, cmd, sizeof(cmd) - 1)) ||
(!rioWriteBulkObject(r, key)) ||
(!rioWriteBulkLongLong(r, hi->expire_time)) ||
(!rioWriteBulkString(r, "FIELDS", 6)) ||
(!rioWriteBulkString(r, "1", 1)) ||
(!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY)) )
goto reHashEnd;
}
}
if (!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) ||
!rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE))
{
hashTypeReleaseIterator(hi);
return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
hashTypeReleaseIterator(hi);
res = 1; /* success */
return 1;
reHashEnd:
hashTypeReleaseIterator(hi);
return res;
}
/* Helper for rewriteStreamObject() that generates a bulk string into the

View file

@ -174,6 +174,7 @@ void dumpCommand(client *c) {
/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
uint64_t minExpiredField = EB_EXPIRE_TIME_INVALID;
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
@ -237,7 +238,7 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr,c->db,0,NULL)) == NULL))
((obj = rdbLoadObject(type,&payload,key->ptr,c->db,NULL, &minExpiredField)) == NULL))
{
addReplyError(c,"Bad data format");
return;
@ -263,7 +264,13 @@ void restoreCommand(client *c) {
}
/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
dictEntry *de = dbAdd(c->db,key,obj);
/* If minExpiredField was set, then the object is hash with expiration
* on fields and need to register it in global HFE DS */
if (minExpiredField != EB_EXPIRE_TIME_INVALID)
hashTypeAddToExpires(c->db, dictGetKey(de), obj, minExpiredField);
if (ttl) {
setExpire(c,c->db,key,ttl);
if (!absttl) {

View file

@ -276,6 +276,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
/* if hash with HFEs, take care to remove from global HFE DS */
if (old->type == OBJ_HASH)
hashTypeRemoveFromExpires(&db->hexpires, old);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old,db->id);
} else {
@ -1632,7 +1637,8 @@ void copyCommand(client *c) {
if (expire != -1)
setExpire(c, dst, newkey, expire);
/* If hash with expiration on fields then add it to 'dst' global HFE DS */
/* If minExpiredField was set, then the object is hash with expiration
* on fields and need to register it in global HFE DS */
if (minHashExpire != EB_EXPIRE_TIME_INVALID)
hashTypeAddToExpires(dst, dictGetKey(deCopy), newobj, minHashExpire);
@ -1768,11 +1774,13 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
* remain in the same DB they were. */
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->hexpires = newdb->hexpires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->hexpires = aux.hexpires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;

View file

@ -204,13 +204,18 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
unsigned char eledigest[20];
sds sdsele;
/* field */
memset(eledigest,0,20);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
/* val */
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
/* hash-field expiration (HFE) */
if (hi->expire_time != EB_EXPIRE_TIME_INVALID)
xorDigest(eledigest,"!!hexpire!!",11);
xorDigest(digest,eledigest,20);
}
hashTypeReleaseIterator(hi);

View file

@ -23,6 +23,12 @@
* #define EB_VALIDATE_DEBUG 1
*/
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
#define EB_VALIDATE_STRUCTURE(eb, type) ebValidate(eb, type)
#else
#define EB_VALIDATE_STRUCTURE(eb, type) // Do nothing
#endif
/*** BENCHMARK
*
* To benchmark ebuckets creation and active-expire with 10 million items, apply
@ -190,7 +196,7 @@ static inline uint64_t raxKey2BucketKey(unsigned char *raxKey) {
* Before: [segHdr] -> {item1,..,item16} -> [..]
* After: [segHdr] -> {newItem} -> [nextSegHdr] -> {item1,..,item16} -> [..]
*
* Take care to persist `segHdr` to be the same instance after the change.
* Taken care to persist `segHdr` to be the same instance after the change.
* This is important because the rax tree is pointing to it. */
static int ebSegAddExtended(EbucketsType *type, FirstSegHdr *firstSegHdr, eItem newItem) {
/* Allocate nextSegHdr and let it take the items of first segment header */
@ -1390,9 +1396,8 @@ int ebRemove(ebuckets *eb, EbucketsType *type, eItem item) {
if (res)
type->getExpireMeta(item)->trash = 1;
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif
EB_VALIDATE_STRUCTURE(*eb, type);
return res;
}
@ -1435,9 +1440,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime) {
/* Add item to rax */
res = ebAddToRax(eb, type, item, EB_BUCKET_KEY(expireTime));
}
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif
EB_VALIDATE_STRUCTURE(*eb, type);
return res;
}
@ -1521,9 +1526,9 @@ END_ACTEXP:
ebAdd(eb, type, updateList, ebGetMetaExpTime(mItem));
updateList = next;
}
#if (REDIS_TEST || EB_VALIDATE_DEBUG) && !defined(EB_TEST_BENCHMARK)
ebValidate(*eb, type);
#endif
EB_VALIDATE_STRUCTURE(*eb, type);
return;
}

View file

@ -139,7 +139,7 @@
* The idea of it is to trim the rax tree depth, avoid having too many branches,
* and reduce frequent modifications of the tree to the minimum.
*/
#define EB_BUCKET_KEY_PRECISION 0 /* 1024msec */
#define EB_BUCKET_KEY_PRECISION 0 /* TBD: modify to 10 */
/* From expiration time to bucket-key */
#define EB_BUCKET_KEY(exptime) ((exptime) >> EB_BUCKET_KEY_PRECISION)

View file

@ -5271,7 +5271,10 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
/* Handle XX and NX */
if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
int exists = hashTypeExists(key->value, field->ptr);
int isHashDeleted;
int exists = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
if (((flags & REDISMODULE_HASH_XX) && !exists) ||
((flags & REDISMODULE_HASH_NX) && exists))
{
@ -5282,7 +5285,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
/* Handle deletion if value is REDISMODULE_HASH_DELETE. */
if (value == REDISMODULE_HASH_DELETE) {
count += hashTypeDelete(key->value, field->ptr);
count += hashTypeDelete(key->value, field->ptr, 1);
if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
continue;
}
@ -5374,14 +5377,22 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) {
/* Query the hash for existence or value object. */
if (flags & REDISMODULE_HASH_EXISTS) {
existsptr = va_arg(ap,int*);
if (key->value)
*existsptr = hashTypeExists(key->value,field->ptr);
else
if (key->value) {
int isHashDeleted;
*existsptr = hashTypeExists(key->db, key->value, field->ptr, &isHashDeleted);
/* hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
} else {
*existsptr = 0;
}
} else {
int isHashDeleted;
valueptr = va_arg(ap,RedisModuleString**);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->value,field->ptr);
*valueptr = hashTypeGetValueObject(key->db,key->value,field->ptr, &isHashDeleted);
/* Currently hash-field-expiration is not exposed to modules */
serverAssert(isHashDeleted == 0);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);

View file

@ -209,6 +209,9 @@ static inline int mstrIsMetaAttached(mstr s) { return s[-1] & MSTR_META_MASK; }
/* return whether if a specific flag-index is set */
static inline int mstrGetFlag(mstr s, int flagIdx) { return *mstrFlagsRef(s) & (1 << flagIdx); }
/* DEBUG */
void mstrPrint(mstr s, struct mstrKind *kind, int verbose);
/* See comment above about MSTR-ALIGNMENT(2) */
static_assert(sizeof(struct mstrhdr5 ) % 2 == 1, "must be odd");
static_assert(sizeof(struct mstrhdr8 ) % 2 == 1, "must be odd");

View file

@ -3757,7 +3757,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
* 1. Make sure there are no "holes" and all the arguments are set.
* 2. If the original argument vector was longer than the one we
* want to end with, it's up to the caller to set c->argc and
* free the no longer used objects on c->argv. */
* free the no longer used objects on c->argv.
* 3. To remove argument at i'th index, pass NULL as new value
*/
void rewriteClientCommandArgument(client *c, int i, robj *newval) {
robj *oldval;
retainOriginalCommandVector(c);
@ -3775,9 +3777,18 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
}
oldval = c->argv[i];
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
if (newval) c->argv_len_sum += getStringObjectLen(newval);
c->argv[i] = newval;
incrRefCount(newval);
if (newval) {
c->argv[i] = newval;
incrRefCount(newval);
c->argv_len_sum += getStringObjectLen(newval);
} else {
/* move the remaining arguments one step left */
for (int j = i+1; j < c->argc; j++) {
c->argv[j-1] = c->argv[j];
}
c->argv[--c->argc] = NULL;
}
if (oldval) decrRefCount(oldval);
/* If this is the command name make sure to fix c->cmd. */

195
src/rdb.c
View file

@ -270,7 +270,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) {
void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
int plainFlag = flags & RDB_LOAD_PLAIN;
int sdsFlag = flags & RDB_LOAD_SDS;
int hfldFlag = flags & RDB_LOAD_HFLD;
int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL);
int encode = flags & RDB_LOAD_ENC;
unsigned char enc[4];
long long val;
@ -305,7 +305,7 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
} else if (sdsFlag) {
p = sdsnewlen(SDS_NOINIT,len);
} else { /* hfldFlag */
p = hfieldNew(NULL, len, 0);
p = hfieldNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1);
}
memcpy(p,buf,len);
return p;
@ -377,7 +377,7 @@ ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
int plainFlag = flags & RDB_LOAD_PLAIN;
int sdsFlag = flags & RDB_LOAD_SDS;
int hfldFlag = flags & RDB_LOAD_HFLD;
int hfldFlag = flags & (RDB_LOAD_HFLD | RDB_LOAD_HFLD_TTL);
int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */
uint64_t len, clen;
@ -397,7 +397,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
} else if (sdsFlag || robjFlag) {
val = sdstrynewlen(SDS_NOINIT,len);
} else { /* hfldFlag */
val = hfieldTryNew(NULL, len, 0);
val = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1);
}
if (!val) {
@ -505,6 +505,7 @@ ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
* instead of a Redis object with an sds in it.
* RDB_LOAD_SDS: Return an SDS string instead of a Redis object.
* RDB_LOAD_HFLD: Return a hash field object (mstr)
* RDB_LOAD_HFLD_TTL: Return a hash field with TTL metadata reserved
*
* On I/O error NULL is returned.
*/
@ -512,7 +513,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
void *buf;
int plainFlag = flags & RDB_LOAD_PLAIN;
int sdsFlag = flags & RDB_LOAD_SDS;
int hfldFlag = flags & RDB_LOAD_HFLD;
int hfldFlag = flags & (RDB_LOAD_HFLD|RDB_LOAD_HFLD_TTL);
int robjFlag = (!(plainFlag || sdsFlag || hfldFlag)); /* not plain/sds/hfld */
int isencoded;
@ -555,7 +556,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
} else if (sdsFlag) {
buf = sdstrynewlen(SDS_NOINIT,len);
} else { /* hfldFlag */
buf = hfieldTryNew(NULL, len, 0);
buf = hfieldTryNew(NULL, len, (flags&RDB_LOAD_HFLD) ? 0 : 1);
}
if (!buf) {
serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
@ -1888,10 +1889,18 @@ int lpValidateIntegrityAndDups(unsigned char *lp, size_t size, int deep, int tup
/* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL.
* When the function returns NULL and if 'error' is not NULL, the
* integer pointed by 'error' is set to the type of error that occurred */
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
int *error) {
*
* error - When the function returns NULL and if 'error' is not NULL, the
* integer pointed by 'error' is set to the type of error that occurred
* minExpiredField - If loading a hash with expiration on fields, then this value
* will be set to the minimum expire time found in the hash fields. If there are
* no fields with expiration or it is not a hash, then it will set be to
* EB_EXPIRE_TIME_INVALID.
*/
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int *error,
uint64_t *minExpiredField)
{
uint64_t minExpField = EB_EXPIRE_TIME_INVALID;
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
@ -2110,7 +2119,6 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
sds value;
hfield field;
dict *dupSearchDict = NULL;
ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL);
len = rdbLoadLen(rdb, NULL);
if (len == RDB_LENERR) return NULL;
@ -2120,7 +2128,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* Too many entries? Use a hash table right from the start. */
if (len > server.hash_max_listpack_entries)
hashTypeConvert(o, OBJ_ENCODING_HT, hexpires);
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
else if (deep_integrity_validation) {
/* In this mode, we need to guarantee that the server won't crash
* later when the ziplist is converted to a dict.
@ -2164,7 +2172,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
sdslen(value) > server.hash_max_listpack_value ||
!lpSafeToAdd(o->ptr, hfieldlen(field) + sdslen(value)))
{
hashTypeConvert(o, OBJ_ENCODING_HT, hexpires);
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
dictUseStoredKeyApi((dict *)o->ptr, 1);
ret = dictAdd((dict*)o->ptr, field, value);
dictUseStoredKeyApi((dict *)o->ptr, 0);
@ -2233,13 +2241,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* All pairs should be read by now */
serverAssert(len == 0);
} else if (rdbtype == RDB_TYPE_HASH_METADATA) {
size_t fieldLen;
sds value, field;
uint64_t expire, minExpire = EB_EXPIRE_TIME_INVALID;
mstime_t now = mstime();
uint64_t expireAt;
dict *dupSearchDict = NULL;
ebuckets *hexpires = (db != NULL ? &db->hexpires : NULL);
len = rdbLoadLen(rdb, NULL);
if (len == RDB_LENERR) return NULL;
@ -2248,11 +2253,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
o = createHashObject();
/* Too many entries? Use a hash table right from the start. */
if (len > server.hash_max_listpack_entries) {
hashTypeConvert(o, OBJ_ENCODING_HT, hexpires);
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
dictTypeAddMeta((dict**)&o->ptr, &mstrHashDictTypeWithHFE);
initDictExpireMetadata(key, o);
} else {
hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, hexpires);
hashTypeConvert(o, OBJ_ENCODING_LISTPACK_EX, NULL);
if (deep_integrity_validation) {
/* In this mode, we need to guarantee that the server won't crash
* later when the listpack is converted to a dict.
@ -2266,20 +2271,25 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
len--;
/* read the TTL */
if (rdbLoadLenByRef(rdb, NULL, &expire) == -1) {
if (rdbLoadLenByRef(rdb, NULL, &expireAt) == -1) {
serverLog(LL_WARNING, "failed reading hash TTL");
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
return NULL;
}
if (expire > EB_EXPIRE_TIME_MAX) {
rdbReportCorruptRDB("invalid expire time: %llu", (unsigned long long)expire);
if (expireAt > EB_EXPIRE_TIME_MAX) {
rdbReportCorruptRDB("invalid expireAt time: %llu", (unsigned long long)expireAt);
decrRefCount(o);
return NULL;
}
/* read the field name */
if ((field = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, &fieldLen)) == NULL) {
/* if needed create field with TTL metadata */
if (expireAt !=0)
field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD_TTL, &fieldLen);
else
field = rdbGenericLoadStringObject(rdb, RDB_LOAD_HFLD, &fieldLen);
if (field == NULL) {
serverLog(LL_WARNING, "failed reading hash field");
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
@ -2291,40 +2301,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
serverLog(LL_WARNING, "failed reading hash value");
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
sdsfree(field);
hfieldFree(field);
return NULL;
}
/* Check if the hash field already expired. This function is used when
* loading an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for hash field expiry. If we would expire hash fields here,
* the snapshot taken by the master may not be reflected on the slave.
* Similarly, if the base AOF is RDB format, we want to load all
* the hash fields there are, since the log of operations in the incr AOF
* is assumed to work in the exact keyspace state.
* Expired hash fields on the master are silently discarded.
* Note that if all fields in a hash has expired, the hash would not be
* created in memory (because it is created on the first valid field), and
* thus the key would be discarded as an "empty key" */
if (expire != 0 && iAmMaster() && ((mstime_t)expire < now) && /* note: expire was saved to RDB as unix-time in milliseconds */
!(rdbflags & RDBFLAGS_AOF_PREAMBLE))
{
/* TODO: consider replication (like in rdbLoadAddKeyToDb) */
server.rdb_last_load_hash_fields_expired++;
sdsfree(field);
sdsfree(value);
continue;
}
/* keep the nearest expiration to connect listpack object to db expiry */
if ((expire != 0) && (expire < minExpire)) minExpire = expire;
if ((expireAt != 0) && (expireAt < minExpField)) minExpField = expireAt;
/* store the values read - either to listpack or dict */
if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
/* integrity - check for key duplication (if required) */
if (dupSearchDict) {
sds field_dup = sdsnewlen(field, sdslen(field));
sds field_dup = sdsnewlen(field, hfieldlen(field));
if (dictAdd(dupSearchDict, field_dup, NULL) != DICT_OK) {
rdbReportCorruptRDB("Hash with dup elements");
@ -2332,18 +2320,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
decrRefCount(o);
sdsfree(field_dup);
sdsfree(value);
sdsfree(field);
hfieldFree(field);
return NULL;
}
}
/* check if the values can be saved to listpack (or should convert to dict encoding) */
if (sdslen(field) > server.hash_max_listpack_value ||
if (hfieldlen(field) > server.hash_max_listpack_value ||
sdslen(value) > server.hash_max_listpack_value ||
!lpSafeToAdd(((listpackEx*)o->ptr)->lp, sdslen(field) + sdslen(value) + lpEntrySizeInteger(expire)))
!lpSafeToAdd(((listpackEx*)o->ptr)->lp, hfieldlen(field) + sdslen(value) + lpEntrySizeInteger(expireAt)))
{
/* convert to hash */
hashTypeConvert(o, OBJ_ENCODING_HT, hexpires);
hashTypeConvert(o, OBJ_ENCODING_HT, NULL);
if (len > DICT_HT_INITIAL_SIZE) { /* TODO: this is NOT the original len, but this is also the case for simple hash, is this a bug? */
if (dictTryExpand(o->ptr, len) != DICT_OK) {
@ -2351,36 +2339,40 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
sdsfree(value);
sdsfree(field);
hfieldFree(field);
return NULL;
}
}
/* don't add the values to the new hash: the next if will catch and the values will be added there */
} else {
listpackExAddNew(o, field, value, expire);
sdsfree(field);
listpackExAddNew(o, field, hfieldlen(field),
value, sdslen(value), expireAt);
hfieldFree(field);
sdsfree(value);
}
}
if (o->encoding == OBJ_ENCODING_HT) {
/* WA for check-rdb mode, when there's no DB so can't attach expired items to ebuckets,
* or when no expiry was not set for this field */
if ((db == NULL) || (expire == 0)) {
hashTypeSet(db, o, field, value, 0);
} else {
if (hashTypeSetExRdb(db, o, field, value, expire) != C_OK) {
serverLog(LL_WARNING, "failed adding hash field %s to key %s", field, key);
decrRefCount(o);
if (dupSearchDict != NULL) dictRelease(dupSearchDict);
sdsfree(value);
sdsfree(field);
return NULL;
}
/* Add pair to hash table */
dict *d = o->ptr;
dictUseStoredKeyApi(d, 1);
int ret = dictAdd(d, field, value);
dictUseStoredKeyApi(d, 0);
/* Attach expiry to the hash field and register in hash private HFE DS */
if ((ret != DICT_ERR) && expireAt) {
dictExpireMetadata *m = (dictExpireMetadata *) dictMetadata(d);
ret = ebAdd(&m->hfe, &hashFieldExpireBucketsType, field, expireAt);
}
if (ret == DICT_ERR) {
rdbReportCorruptRDB("Duplicate hash fields detected");
sdsfree(value);
hfieldFree(field);
decrRefCount(o);
return NULL;
}
sdsfree(field);
sdsfree(value);
}
}
@ -2389,10 +2381,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
/* check for empty key (if all fields were expired) */
if (hashTypeLength(o, 0) == 0) {
decrRefCount(o);
goto emptykey;
goto expiredHash;
}
if ((db != NULL) && (minExpire != EB_EXPIRE_TIME_INVALID))
hashTypeAddToExpires(db, key, o, minExpire);
} else if (rdbtype == RDB_TYPE_LIST_QUICKLIST || rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if (len == 0) goto emptykey;
@ -2717,33 +2707,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
}
/* for TTL listpack, find the minimum expiry */
uint64_t minExpire = hashTypeGetNextTimeToExpire(o);
minExpField = hashTypeGetNextTimeToExpire(o);
/* Convert listpack to hash table without registering in global HFE DS,
* if has HFEs, since the listpack is not connected yet to the DB */
if (hashTypeLength(o, 0) > server.hash_max_listpack_entries)
hashTypeConvert(o, OBJ_ENCODING_HT, NULL /*db->hexpires*/);
/* check if need to convert to dict encoding */
if ((db != NULL) &&
(hashTypeLength(o, 0) > server.hash_max_listpack_entries)) /* TODO: each field length is not verified against server.hash_max_listpack_value */
{
hashTypeConvert(o, OBJ_ENCODING_HT, &db->hexpires);
/*
* hashTypeAddToExpires is presumably called from within
* the convert function (from listpackEx to dict), BUT,
* this call depends on the lpt->meta field to be updated,
* which is not the case here as hashTypeAddToExpires was
* not yet called for the listpack (which is what updating
* its meta).
* Instead, this "manual" call is added here.
* Another approach would be to have the conversion function
* find the minExpire by itself when iterating on the listpack
* instead of relying on the meta and use this value for the
* final ebAdd call.
*/
hashTypeAddToExpires(db, key, o, minExpire);
} else if (rdbtype == RDB_TYPE_HASH_LISTPACK_EX) {
/* connect the listpack to the DB-global expiry data structure */
if ((minExpire != EB_EXPIRE_TIME_INVALID) && (db != NULL)) { /* DB can be NULL when checking rdb */
hashTypeAddToExpires(db, key, o, minExpire);
}
}
break;
default:
/* totally unreachable */
@ -3128,12 +3098,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb* db, int rdbflags,
rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
return NULL;
}
if (minExpiredField) *minExpiredField = minExpField;
if (error) *error = 0;
return o;
emptykey:
if (error) *error = RDB_LOAD_ERR_EMPTY_KEY;
return NULL;
expiredHash:
if (error) *error = RDB_LOAD_ERR_EXPIRED_HASH;
return NULL;
}
/* Mark that we are loading in the global state and setup the fields
@ -3303,6 +3279,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* currently it only allow to set db object and functionLibCtx to which the data
* will be loaded (in the future it might contains more such objects). */
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
uint64_t minExpiredField = EB_EXPIRE_TIME_INVALID;
uint64_t dbid = 0;
int type, rdbver;
uint64_t db_size = 0, expires_size = 0;
@ -3544,7 +3521,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;
/* Read value */
val = rdbLoadObject(type,rdb,key,db,rdbflags,&error);
val = rdbLoadObject(type,rdb,key,db,&error, &minExpiredField);
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
@ -3563,6 +3540,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if(empty_keys_skipped++ < 10)
serverLog(LL_NOTICE, "rdbLoadObject skipping empty key: %s", key);
sdsfree(key);
} else if (error == RDB_LOAD_ERR_EXPIRED_HASH) {
/* Valid flow. Continue. */
sdsfree(key);
} else {
sdsfree(key);
goto eoferr;
@ -3607,6 +3587,11 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
}
}
/* If minExpiredField was set, then the object is hash with expiration
* on fields and need to register it in global HFE DS */
if (minExpiredField != EB_EXPIRE_TIME_INVALID)
hashTypeAddToExpires(db, key, val, minExpiredField);
/* Set the expire time if needed */
if (expiretime != -1) {
setExpire(NULL,db,&keyobj,expiretime);
@ -3652,12 +3637,12 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (empty_keys_skipped) {
serverLog(LL_NOTICE,
"Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld, empty keys skipped: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired, empty_keys_skipped);
"Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped);
} else {
serverLog(LL_NOTICE,
"Done loading RDB, keys loaded: %lld, keys expired: %lld, hash fields expired: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, server.rdb_last_load_hash_fields_expired);
"Done loading RDB, keys loaded: %lld, keys expired: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
}
return C_OK;

View file

@ -103,11 +103,12 @@
#define RDB_MODULE_OPCODE_STRING 5 /* String. */
/* rdbLoad...() functions flags. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_HFLD (1<<3)
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_HFLD (1<<3)
#define RDB_LOAD_HFLD_TTL (1<<4)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0 /* No special RDB loading or saving. */
@ -119,8 +120,9 @@
/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_EXPIRED_HASH 2 /* Expired hash since all its fields are expired */
#define RDB_LOAD_ERR_OTHER 3 /* Any other errors */
ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
@ -141,7 +143,7 @@ int rdbSaveToFile(const char *filename);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int rdbflags, int *error);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, redisDb *db, int *error, uint64_t *minExpiredField);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);

View file

@ -331,7 +331,8 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbstate.keys++;
/* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,0,NULL)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,&rdb,key->ptr,NULL,NULL,NULL)) == NULL)
goto eoferr;
/* Check if the key already expired. */
if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++;

View file

@ -1955,6 +1955,8 @@ void createSharedObjects(void) {
shared.persist = createStringObject("PERSIST",7);
shared.set = createStringObject("SET",3);
shared.eval = createStringObject("EVAL",4);
shared.hpexpireat = createStringObject("HPEXPIREAT",10);
shared.hdel = createStringObject("HDEL",4);
/* Shared command argument */
shared.left = createStringObject("left",4);
@ -2707,7 +2709,6 @@ void initServer(void) {
server.rdb_save_time_start = -1;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.rdb_last_load_hash_fields_expired = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
@ -5771,7 +5772,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"rdb_last_cow_size:%zu\r\n", server.stat_rdb_cow_bytes,
"rdb_last_load_keys_expired:%lld\r\n", server.rdb_last_load_keys_expired,
"rdb_last_load_keys_loaded:%lld\r\n", server.rdb_last_load_keys_loaded,
"rdb_last_load_hash_fields_expired:%lld\r\n", server.rdb_last_load_hash_fields_expired,
"aof_enabled:%d\r\n", server.aof_state != AOF_OFF,
"aof_rewrite_in_progress:%d\r\n", server.child_type == CHILD_TYPE_AOF,
"aof_rewrite_scheduled:%d\r\n", server.aof_rewrite_scheduled,

View file

@ -1317,7 +1317,8 @@ struct sharedObjectsStruct {
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
*hdel, *hpexpireat,
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
@ -1802,7 +1803,6 @@ struct redisServer {
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
long long rdb_last_load_hash_fields_expired; /* number of expired hash fields when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
@ -3166,6 +3166,20 @@ typedef struct listpackEx {
are ordered by ttl. */
} listpackEx;
/* Each dict of hash object that has fields with time-Expiration will have the
* following metadata attached to dict header */
typedef struct dictExpireMetadata {
ExpireMeta expireMeta; /* embedded ExpireMeta in dict.
To be used in order to register the hash in the
global ebuckets (i.e db->hexpires) with next,
minimum, hash-field to expire */
ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */
sds key; /* reference to the key, same one that stored in
db->dict. Will be used from active-expiration flow
for notification and deletion of the object, if
needed. */
} dictExpireMetadata;
/* Hash data type */
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
@ -3173,8 +3187,8 @@ typedef struct listpackEx {
void hashTypeConvert(robj *o, int enc, ebuckets *hexpires);
void hashTypeTryConversion(redisDb *db, robj *subject, robj **argv, int start, int end);
int hashTypeExists(robj *o, sds key);
int hashTypeDelete(robj *o, sds key);
int hashTypeExists(redisDb *db, robj *o, sds key, int *isHashDeleted);
int hashTypeDelete(robj *o, void *key, int isSdsField);
unsigned long hashTypeLength(const robj *o, int subtractExpiredFields);
hashTypeIterator *hashTypeInitIterator(robj *subject);
void hashTypeReleaseIterator(hashTypeIterator *hi);
@ -3190,24 +3204,24 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr,
unsigned int *vlen, long long *vll, uint64_t *expireTime);
sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
hfield hashTypeCurrentObjectNewHfield(hashTypeIterator *hi);
robj *hashTypeGetValueObject(robj *o, sds field);
robj *hashTypeGetValueObject(redisDb *db, robj *o, sds field, int *isHashDeleted);
int hashTypeSet(redisDb *db, robj *o, sds field, sds value, int flags);
robj *hashTypeDup(robj *o, sds newkey, uint64_t *minHashExpire);
uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o);
void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime);
void hashTypeFree(robj *o);
int hashTypeIsExpired(const robj *o, uint64_t expireAt);
uint64_t hashTypeGetMinExpire(robj *o);
unsigned char *hashTypeListpackGetLp(robj *o);
uint64_t hashTypeGetMinExpire(robj *o);
void hashTypeUpdateKeyRef(robj *o, sds newkey);
ebuckets *hashTypeGetDictMetaHFE(dict *d);
void listpackExExpire(robj *o, ExpireInfo *info);
int hashTypeSetExRdb(redisDb *db, robj *o, sds field, sds value, uint64_t expire_at);
uint64_t hashTypeGetMinExpire(robj *keyObj);
uint64_t hashTypeGetNextTimeToExpire(robj *o);
void initDictExpireMetadata(sds key, robj *o);
struct listpackEx *listpackExCreate(void);
void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt);
void listpackExAddNew(robj *o, char *field, size_t flen,
char *value, size_t vlen, uint64_t expireAt);
/* Hash-Field data type (of t_hash.c) */
hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta);

View file

@ -94,7 +94,12 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
/* Retrieve value from hash by the field name. The returned object
* is a new object with refcount already incremented. */
o = hashTypeGetValueObject(o, fieldobj->ptr);
int isHashDeleted;
o = hashTypeGetValueObject(db, o, fieldobj->ptr, &isHashDeleted);
if (isHashDeleted)
goto noobj;
} else {
if (o->type != OBJ_STRING) goto noobj;

File diff suppressed because it is too large Load diff

View file

@ -179,6 +179,17 @@ start_server {} {
$master set $j somevalue px 10
}
##### hash-field-expiration
# Hashes of type OBJ_ENCODING_LISTPACK_EX won't be discarded during
# RDB load, even if they are expired.
$master hset myhash1 f1 v1 f2 v2 f3 v3
$master hpexpire myhash1 10 FIELDS 3 f1 f2 f3
# Hashes of type RDB_TYPE_HASH_METADATA will be discarded during RDB load.
$master config set hash-max-listpack-entries 0
$master hset myhash2 f1 v1 f2 v2
$master hpexpire myhash2 10 FIELDS 2 f1 f2
$master config set hash-max-listpack-entries 1
after 20
wait_for_condition 500 100 {

View file

@ -421,14 +421,20 @@ set server_path [tmpdir "server.partial-hfield-exp-test"]
# verifies writing and reading hash key with expiring and persistent fields
start_server [list overrides [list "dir" $server_path]] {
foreach {type lp_entries} {listpack 512 dict 0} {
test "hash field expiration save and load rdb one expired field, ($type)" {
test "HFE - save and load expired fields, expired soon after, or long after ($type)" {
r config set hash-max-listpack-entries $lp_entries
r FLUSHALL
r HMSET key a 1 b 2 c 3 d 4
r HEXPIREAT key 2524600800 FIELDS 2 a b
r HPEXPIRE key 100 FIELDS 1 d
r HMSET key a 1 b 2 c 3 d 4 e 5
# expected to be expired long after restart
r HEXPIREAT key 2524600800 FIELDS 1 a
# expected long TTL value (6 bytes) is saved and loaded correctly
r HPEXPIREAT key 188900976391764 FIELDS 1 b
# expected to be already expired after restart
r HPEXPIRE key 80 FIELDS 1 d
# expected to be expired soon after restart
r HPEXPIRE key 200 FIELDS 1 e
r save
# sleep 101 ms to make sure d will expire after restart
@ -437,14 +443,14 @@ start_server [list overrides [list "dir" $server_path]] {
wait_done_loading r
assert_equal [lsort [r hgetall key]] "1 2 3 a b c"
assert_equal [r hexpiretime key FIELDS 3 a b c] {2524600800 2524600800 -1}
assert_equal [r hpexpiretime key FIELDS 3 a b c] {2524600800000 188900976391764 -1}
assert_equal [s rdb_last_load_keys_loaded] 1
# hash keys saved in listpack encoding are loaded as a blob,
# so individual field expiry is not verified on load
if {$type eq "dict"} {
assert_equal [s rdb_last_load_hash_fields_expired] 1
# wait until expired_hash_fields equals 2
wait_for_condition 10 100 {
[s expired_hash_fields] == 2
} else {
assert_equal [s rdb_last_load_hash_fields_expired] 0
fail "Value of expired_hash_fields is not as expected"
}
}
}
@ -455,7 +461,7 @@ set server_path [tmpdir "server.all-hfield-exp-test"]
# verifies writing hash with several expired keys, and active-expiring it on load
start_server [list overrides [list "dir" $server_path]] {
foreach {type lp_entries} {listpack 512 dict 0} {
test "hash field expiration save and load rdb all fields expired, ($type)" {
test "HFE - save and load rdb all fields expired, ($type)" {
r config set hash-max-listpack-entries $lp_entries
r FLUSHALL
@ -470,53 +476,15 @@ start_server [list overrides [list "dir" $server_path]] {
restart_server 0 true false
wait_done_loading r
# hash keys saved as listpack-encoded are saved and loaded as a blob
# so individual field validation is not checked during load.
# Therefore, if the key was saved as dict it is expected that
# all 4 fields were expired during load, and thus the key was
# "declared" an empty key.
# On the other hand, if the key was saved as listpack, it is
# expected that no field was expired on load and the key was loaded,
# even though all its fields are actually expired.
if {$type eq "dict"} {
assert_equal [s rdb_last_load_keys_loaded] 0
assert_equal [s rdb_last_load_hash_fields_expired] 4
} else {
assert_equal [s rdb_last_load_keys_loaded] 1
assert_equal [s rdb_last_load_hash_fields_expired] 0
}
# it is expected that no field was expired on load and the key was
# loaded, even though all its fields are actually expired.
assert_equal [s rdb_last_load_keys_loaded] 1
# in listpack encoding, the fields (and key) will be expired by
# lazy expiry
assert_equal [r hgetall key] {}
}
}
}
set server_path [tmpdir "server.long-ttl-test"]
# verifies a long TTL value (6 bytes) is saved and loaded correctly
start_server [list overrides [list "dir" $server_path]] {
foreach {type lp_entries} {listpack 512 dict 0} {
test "hash field expiration save and load rdb long TTL, ($type)" {
r config set hash-max-listpack-entries $lp_entries
r FLUSHALL
r HSET key a 1
# set expiry to 0xabcdef987654 (6 bytes)
r HPEXPIREAT key 188900976391764 FIELDS 1 a
r save
restart_server 0 true false
wait_done_loading r
assert_equal [r hget key a ] 1
assert_equal [r hpexpiretime key FIELDS 1 a] {188900976391764}
}
}
}
set server_path [tmpdir "server.listpack-to-dict-test"]
test "save listpack, load dict" {
@ -540,7 +508,6 @@ test "save listpack, load dict" {
# first verify d was not expired during load (no expiry when loading
# a hash that was saved listpack-encoded)
assert_equal [s rdb_last_load_keys_loaded] 1
assert_equal [s rdb_last_load_hash_fields_expired] 0
# d should be lazy expired in hgetall
assert_equal [lsort [r hgetall key]] "1 2 3 a b c"
@ -570,7 +537,6 @@ test "save dict, load listpack" {
# verify d was expired during load
assert_equal [s rdb_last_load_keys_loaded] 1
assert_equal [s rdb_last_load_hash_fields_expired] 1
assert_equal [lsort [r hgetall key]] "1 2 3 a b c"
assert_match "*encoding:listpack*" [r debug object key]
@ -602,7 +568,6 @@ foreach {type lp_entries} {listpack 512 dict 0} {
}
assert_equal [s rdb_last_load_keys_loaded] 1
assert_equal [s rdb_last_load_hash_fields_expired] 0
# hgetall might lazy expire fields, so it's only called after the stat asserts
assert_equal [lsort [r hgetall key]] "1 2 5 6 a b e f"
@ -632,7 +597,6 @@ foreach {type lp_entries} {listpack 512 dict 0} {
after 500
assert_equal [s rdb_last_load_keys_loaded] 1
assert_equal [s rdb_last_load_hash_fields_expired] 0
assert_equal [s expired_hash_fields] 0
# hgetall will lazy expire fields, so it's only called after the stat asserts

View file

@ -293,6 +293,9 @@ proc findKeyWithType {r type} {
proc createComplexDataset {r ops {opt {}}} {
set useexpire [expr {[lsearch -exact $opt useexpire] != -1}]
# TODO: Remove usehexpire on next commit, when RDB will support replication
set usehexpire [expr {[lsearch -exact $opt usehexpire] != -1}]
if {[lsearch -exact $opt usetag] != -1} {
set tag "{t}"
} else {
@ -386,6 +389,10 @@ proc createComplexDataset {r ops {opt {}}} {
{hash} {
randpath {{*}$r hset $k $f $v} \
{{*}$r hdel $k $f}
if { [{*}$r hexists $k $f] && $usehexpire && rand() < 0.5} {
{*}$r hexpire $k 1000 FIELDS 1 $f
}
}
}
}

View file

@ -124,7 +124,8 @@ start_server {tags {"other"}} {
if {$::accurate} {set numops 10000} else {set numops 1000}
test {Check consistency of different data types after a reload} {
r flushdb
createComplexDataset r $numops usetag
# TODO: integrate usehexpire following next commit that will support replication
createComplexDataset r $numops {usetag usehexpire}
if {$::ignoredigest} {
set _ 1
} else {

View file

@ -64,6 +64,19 @@ proc cmp_hrandfield_result {hash_name expected_result} {
}
}
proc dumpAllHashes {client} {
set keyAndFields(0,0) 0
unset keyAndFields
# keep keys sorted for comparison
foreach key [lsort [$client keys *]] {
set fields [$client hgetall $key]
foreach f $fields {
set keyAndFields($key,$f) [$client hpexpiretime $key FIELDS 1 $f]
}
}
return [array get keyAndFields]
}
proc hrandfieldTest {activeExpireConfig} {
r debug set-active-expire $activeExpireConfig
r del myhash
@ -155,16 +168,16 @@ start_server {tags {"external:skip needs:debug"}} {
test "HPEXPIRE(AT) - Test 'LT' flag ($type)" {
r del myhash
r hset myhash field1 value1 field2 value2
r hset myhash field1 value1 field2 value2 field3 value3
assert_equal [r hpexpire myhash 1000 NX FIELDS 1 field1] [list $E_OK]
assert_equal [r hpexpire myhash 2000 NX FIELDS 1 field2] [list $E_OK]
assert_equal [r hpexpire myhash 1500 LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK]
assert_equal [r hpexpire myhash 1500 LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK]
r del myhash
r hset myhash field1 value1 field2 value2
r hset myhash field1 value1 field2 value2 field3 value3
assert_equal [r hpexpireat myhash [expr {([clock seconds]+1000)*1000}] NX FIELDS 1 field1] [list $E_OK]
assert_equal [r hpexpireat myhash [expr {([clock seconds]+2000)*1000}] NX FIELDS 1 field2] [list $E_OK]
assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 2 field1 field2] [list $E_FAIL $E_OK]
assert_equal [r hpexpireat myhash [expr {([clock seconds]+1500)*1000}] LT FIELDS 3 field1 field2 field3] [list $E_FAIL $E_OK $E_OK]
}
test "HPEXPIREAT - field not exists or TTL is in the past ($type)" {
@ -190,46 +203,41 @@ start_server {tags {"external:skip needs:debug"}} {
assert_error {*invalid expire time*} {r hpexpire myhash [expr (1<<48) - [clock milliseconds] + 100 ] FIELDS 1 f1}
}
test "Lazy - doesn't delete hash that all its fields got expired ($type)" {
test "Lazy Expire - fields are lazy deleted ($type)" {
# TODO remove the SELECT once dbid will be embedded inside dict/listpack
r select 0
r debug set-active-expire 0
r flushall
r del myhash
set hash_sizes {1 15 16 17 31 32 33 40}
foreach h $hash_sizes {
for {set i 1} {$i <= $h} {incr i} {
# random expiration time
r hset hrand$h f$i v$i
r hpexpire hrand$h [expr {50 + int(rand() * 50)}] FIELDS 1 f$i
assert_equal 1 [r HEXISTS hrand$h f$i]
r hset myhash f1 v1 f2 v2 f3 v3
r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3
after 5
# same expiration time
r hset same$h f$i v$i
r hpexpire same$h 100 FIELDS 1 f$i
assert_equal 1 [r HEXISTS same$h f$i]
# Verify that still exists even if all fields are expired
assert_equal 1 [r EXISTS myhash]
# same expiration time
r hset mix$h f$i v$i fieldWithoutExpire$i v$i
r hpexpire mix$h 100 FIELDS 1 f$i
assert_equal 1 [r HEXISTS mix$h f$i]
}
}
# Verify that len counts also expired fields
assert_equal 3 [r HLEN myhash]
after 150
# Trying access to expired field should delete it. Len should be updated
assert_equal 0 [r hexists myhash f1]
assert_equal 2 [r HLEN myhash]
# Trying access another expired field should delete it. Len should be updated
assert_equal "" [r hget myhash f2]
assert_equal 1 [r HLEN myhash]
# Trying access last expired field should delete it. hash shouldn't exists afterward.
assert_equal 0 [r hstrlen myhash f3]
assert_equal 0 [r HLEN myhash]
assert_equal 0 [r EXISTS myhash]
# Verify that all fields got expired but keys wasn't lazy deleted
foreach h $hash_sizes {
for {set i 1} {$i <= $h} {incr i} {
assert_equal 0 [r HEXISTS mix$h f$i]
}
assert_equal 1 [r EXISTS hrand$h]
assert_equal 1 [r EXISTS same$h]
assert_equal [expr $h * 2] [r HLEN mix$h]
}
# Restore default
r debug set-active-expire 1
}
test "Active - deletes hash that all its fields got expired ($type)" {
test "Active Expire - deletes hash that all its fields got expired ($type)" {
r flushall
set hash_sizes {1 15 16 17 31 32 33 40}
@ -375,7 +383,7 @@ start_server {tags {"external:skip needs:debug"}} {
assert_range [r httl myhash FIELDS 1 field1] 4 5
}
test "Lazy expire - delete hash with expired fields ($type)" {
test "Lazy Expire - delete hash with expired fields ($type)" {
r del myhash
r debug set-active-expire 0
r hset myhash k v
@ -403,7 +411,7 @@ start_server {tags {"external:skip needs:debug"}} {
}
test "Lazy expire - HLEN does count expired fields ($type)" {
test "Lazy Expire - HLEN does count expired fields ($type)" {
# Enforce only lazy expire
r debug set-active-expire 0
@ -432,7 +440,7 @@ start_server {tags {"external:skip needs:debug"}} {
r debug set-active-expire 1
}
test "Lazy expire - HSCAN does not report expired fields ($type)" {
test "Lazy Expire - HSCAN does not report expired fields ($type)" {
# Enforce only lazy expire
r debug set-active-expire 0
@ -493,7 +501,7 @@ start_server {tags {"external:skip needs:debug"}} {
r debug set-active-expire 1
}
test "Lazy expire - verify various HASH commands handling expired fields ($type)" {
test "Lazy Expire - verify various HASH commands handling expired fields ($type)" {
# Enforce only lazy expire
r debug set-active-expire 0
r del h1 h2 h3 h4 h5 h18
@ -676,7 +684,21 @@ start_server {tags {"external:skip needs:debug"}} {
wait_for_condition 20 10 { [r exists myhash] == 0 } else { fail "'myhash' should be expired" }
} {} {singledb:skip}
test "HPERSIST - Returns empty array if key does not exist" {
test "HMGET - returns empty entries if fields or hash expired ($type)" {
r debug set-active-expire 0
r del h1 h2
r hset h1 f1 v1 f2 v2 f3 v3
r hset h2 f1 v1 f2 v2 f3 v3
r hpexpire h1 10000000 NX FIELDS 1 f1
r hpexpire h1 1 NX FIELDS 2 f2 f3
r hpexpire h2 1 NX FIELDS 3 f1 f2 f3
after 5
assert_equal [r hmget h1 f1 f2 f3] {v1 {} {}}
assert_equal [r hmget h2 f1 f2 f3] {{} {} {}}
r debug set-active-expire 1
}
test "HPERSIST - Returns empty array if key does not exist ($type)" {
r del myhash
# Make sure we can distinguish between an empty array and a null response
r readraw 1
@ -743,6 +765,53 @@ start_server {tags {"external:skip needs:debug"}} {
assert_equal [lsort [r hgetall myhash]] "1 2 3 a b c"
assert_equal [r hexpiretime myhash FIELDS 3 a b c] {-1 -1 -1}
}
test {HINCRBY - discards pending expired field and reset its value} {
r debug set-active-expire 0
r del h1 h2
r hset h1 f1 10 f2 2
r hset h2 f1 10
assert_equal [r HINCRBY h1 f1 2] 12
assert_equal [r HINCRBY h2 f1 2] 12
r HPEXPIRE h1 10 FIELDS 1 f1
r HPEXPIRE h2 10 FIELDS 1 f1
after 15
assert_equal [r HINCRBY h1 f1 1] 1
assert_equal [r HINCRBY h2 f1 1] 1
r debug set-active-expire 1
}
test {HINCRBY - preserve expiration time of the field} {
r del h1
r hset h1 f1 10
r hpexpire h1 20 FIELDS 1 f1
assert_equal [r HINCRBY h1 f1 2] 12
assert_range [r HPTTL h1 FIELDS 1 f1] 1 20
}
test {HINCRBYFLOAT - discards pending expired field and reset its value} {
r debug set-active-expire 0
r del h1 h2
r hset h1 f1 10 f2 2
r hset h2 f1 10
assert_equal [r HINCRBYFLOAT h1 f1 2] 12
assert_equal [r HINCRBYFLOAT h2 f1 2] 12
r HPEXPIRE h1 10 FIELDS 1 f1
r HPEXPIRE h2 10 FIELDS 1 f1
after 15
assert_equal [r HINCRBYFLOAT h1 f1 1] 1
assert_equal [r HINCRBYFLOAT h2 f1 1] 1
r debug set-active-expire 1
}
test {HINCRBYFLOAT - preserve expiration time of the field} {
r del h1
r hset h1 f1 10
r hpexpire h1 20 FIELDS 1 f1
assert_equal [r HINCRBYFLOAT h1 f1 2.5] 12.5
assert_range [r HPTTL h1 FIELDS 1 f1] 1 20
}
}
r config set hash-max-listpack-entries 512
@ -878,3 +947,205 @@ start_server {tags {"external:skip needs:debug"}} {
}
}
}
start_server {tags {"external:skip needs:debug"}} {
foreach type {listpack ht} {
if {$type eq "ht"} {
r config set hash-max-listpack-entries 0
} else {
r config set hash-max-listpack-entries 512
}
test "Command rewrite and expired hash fields are propagated to replica ($type)" {
start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} {
set aof [get_last_incr_aof_path r]
r hset h1 f1 v1 f2 v2
r hpexpire h1 20 FIELDS 1 f1
r hpexpire h1 30 FIELDS 1 f2
r hpexpire h1 30 FIELDS 1 non_exists_field
r hset h2 f1 v1 f2 v2 f3 v3 f4 v4
r hpexpire h2 40 FIELDS 2 f1 non_exists_field
r hpexpire h2 50 FIELDS 1 f2
r hpexpireat h2 [expr [clock seconds]*1000+100000] LT FIELDS 1 f3
r hexpireat h2 [expr [clock seconds]+10] NX FIELDS 1 f4
wait_for_condition 50 100 {
[r hlen h2] eq 2
} else {
fail "Field f2 of hash h2 wasn't deleted"
}
# Assert that each TTL-related command are persisted with absolute timestamps in AOF
assert_aof_content $aof {
{select *}
{hset h1 f1 v1 f2 v2}
{hpexpireat h1 * FIELDS 1 f1}
{hpexpireat h1 * FIELDS 1 f2}
{hset h2 f1 v1 f2 v2 f3 v3 f4 v4}
{hpexpireat h2 * FIELDS 2 f1 non_exists_field}
{hpexpireat h2 * FIELDS 1 f2}
{hpexpireat h2 * FIELDS 1 f3}
{hpexpireat h2 * FIELDS 1 f4}
{hdel h1 f1}
{hdel h1 f2}
{hdel h2 f1}
{hdel h2 f2}
}
}
}
test "Lazy Expire - fields are lazy deleted and propagated to replicas ($type)" {
start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} {
r debug set-active-expire 0
set aof [get_last_incr_aof_path r]
r del myhash
r hset myhash f1 v1 f2 v2 f3 v3
r hpexpire myhash 1 NX FIELDS 3 f1 f2 f3
after 5
# Verify that still exists even if all fields are expired
assert_equal 1 [r EXISTS myhash]
# Verify that len counts also expired fields
assert_equal 3 [r HLEN myhash]
# Trying access to expired field should delete it. Len should be updated
assert_equal 0 [r hexists myhash f1]
assert_equal 2 [r HLEN myhash]
# Trying access another expired field should delete it. Len should be updated
assert_equal "" [r hget myhash f2]
assert_equal 1 [r HLEN myhash]
# Trying access last expired field should delete it. hash shouldn't exists afterward.
assert_equal 0 [r hstrlen myhash f3]
assert_equal 0 [r HLEN myhash]
assert_equal 0 [r EXISTS myhash]
wait_for_condition 50 100 { [r exists h1] == 0 } else { fail "hash h1 wasn't deleted" }
# HDEL are propagated as expected
assert_aof_content $aof {
{select *}
{hset myhash f1 v1 f2 v2 f3 v3}
{hpexpireat myhash * NX FIELDS 3 f1 f2 f3}
{hdel myhash f1}
{hdel myhash f2}
{hdel myhash f3}
}
r debug set-active-expire 1
}
}
# Start a new server with empty data and AOF file.
start_server {overrides {appendonly {yes} appendfsync always} tags {external:skip}} {
# Based on test at expire.tcl: " All time-to-live(TTL) in commands are propagated as absolute ..."
test {All TTLs in commands are propagated as absolute timestamp in milliseconds in AOF} {
set aof [get_last_incr_aof_path r]
r hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6
r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1
r hpexpireat h1 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2
r hpexpire h1 100000 NX FIELDS 3 f3 f4 f5
r hexpire h1 100000 FIELDS 1 f6
r hset h5 f1 v1
assert_aof_content $aof {
{select *}
{hset h1 f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 f6 v6}
{hpexpireat h1 * FIELDS 1 f1}
{hpexpireat h1 * FIELDS 1 f2}
{hpexpireat h1 * NX FIELDS 3 f3 f4 f5}
{hpexpireat h1 * FIELDS 1 f6}
{hset h5 f1 v1}
}
array set keyAndFields1 [dumpAllHashes r]
# Let some time pass and reload data from AOF
after 2000
r debug loadaof
array set keyAndFields2 [dumpAllHashes r]
# Assert that absolute TTLs are the same
assert_equal [array get keyAndFields1] [array get keyAndFields2]
} {} {needs:debug}
}
# Based on test, with same name, at expire.tcl:
test {All TTL in commands are propagated as absolute timestamp in replication stream} {
# Make sure that both relative and absolute expire commands are propagated
# Consider also comment of the test, with same name, at expire.tcl
r flushall ; # Clean up keyspace to avoid interference by keys from other tests
set repl [attach_to_replication_stream]
r hset h1 f1 v1
r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1
r hset h2 f2 v2
r hpexpireat h2 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2
r hset h3 f3 v3 f4 v4
r hexpire h3 100 FIELDS 3 f3 f4 non_exists_field
assert_replication_stream $repl {
{select *}
{hset h1 f1 v1}
{hpexpireat h1 * NX FIELDS 1 f1}
{hset h2 f2 v2}
{hpexpireat h2 * NX FIELDS 1 f2}
{hset h3 f3 v3 f4 v4}
{hpexpireat h3 * FIELDS 3 f3 f4 non_exists_field}
}
close_replication_stream $repl
} {} {needs:repl}
# Start another server to test replication of TTLs
start_server {tags {needs:repl external:skip}} {
# Set the outer layer server as primary
set primary [srv -1 client]
set primary_host [srv -1 host]
set primary_port [srv -1 port]
# Set this inner layer server as replica
set replica [srv 0 client]
# Server should have role slave
$replica replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 role] eq {slave}
} else {
fail "Replication not started."
}
# Based on test, with same name, at expire.tcl
test {For all replicated TTL-related commands, absolute expire times are identical on primary and replica} {
# Apply each TTL-related command to a unique key on primary
$primary flushall
$primary hset h1 f v
$primary hexpireat h1 [expr [clock seconds]+10000] FIELDS 1 f
$primary hset h2 f v
$primary hpexpireat h2 [expr [clock milliseconds]+100000] FIELDS 1 f
$primary hset h3 f v
$primary hexpire h3 100 NX FIELDS 1 f
$primary hset h4 f v
$primary hpexpire h4 100000 NX FIELDS 1 f
$primary hset h5 f v
$primary hpexpireat h5 [expr [clock milliseconds]-100000] FIELDS 1 f
$primary hset h9 f v
# Wait for replica to get the keys and TTLs
assert {[$primary wait 1 0] == 1}
# Verify absolute TTLs are identical on primary and replica for all keys
# This is because TTLs are always replicated as absolute values
assert_equal [dumpAllHashes $primary] [dumpAllHashes $replica]
}
}
}
}