diff --git a/src/aof.c b/src/aof.c index 17e72febb..ec631c0e2 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1968,7 +1968,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o, 0); - int isHFE = hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID; + int isHFE = hashTypeGetMinExpire(o, 0) != EB_EXPIRE_TIME_INVALID; hi = hashTypeInitIterator(o); if (!isHFE) { diff --git a/src/cluster.c b/src/cluster.c index fa3e6da3a..d09a455b3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -270,7 +270,7 @@ void restoreCommand(client *c) { /* If minExpiredField was set, then the object is hash with expiration * on fields and need to register it in global HFE DS */ if (obj->type == OBJ_HASH) { - uint64_t minExpiredField = hashTypeGetNextTimeToExpire(obj); + uint64_t minExpiredField = hashTypeGetMinExpire(obj, 1); if (minExpiredField != EB_EXPIRE_TIME_INVALID) hashTypeAddToExpires(c->db, dictGetKey(de), obj, minExpiredField); } diff --git a/src/defrag.c b/src/defrag.c index 122598c4a..78de72248 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -751,7 +751,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { } /* Try to defrag robj and / or string value. */ - if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob) != EB_EXPIRE_TIME_INVALID)) { + if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) { /* Update its reference in the ebucket while defragging it. 
*/ newob = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob, (ebDefragFunction *)activeDefragStringOb); diff --git a/src/rdb.c b/src/rdb.c index a4749eb22..c5c0b04f6 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -699,7 +699,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) { else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) return rdbSaveType(rdb,RDB_TYPE_HASH_LISTPACK_EX); else if (o->encoding == OBJ_ENCODING_HT) { - if (hashTypeGetMinExpire(o) == EB_EXPIRE_TIME_INVALID) + if (hashTypeGetMinExpire(o, 0) == EB_EXPIRE_TIME_INVALID) return rdbSaveType(rdb,RDB_TYPE_HASH); else return rdbSaveType(rdb,RDB_TYPE_HASH_METADATA); @@ -960,7 +960,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { * RDB_TYPE_HASH_METADATA layout, including tuples of [ttl][field][value]. * Otherwise, use the standard RDB_TYPE_HASH layout containing only * the tuples [field][value]. */ - int with_ttl = (hashTypeGetMinExpire(o) != EB_EXPIRE_TIME_INVALID); + int with_ttl = (hashTypeGetMinExpire(o, 0) != EB_EXPIRE_TIME_INVALID); /* save number of fields in hash */ if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) { @@ -3562,7 +3562,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin /* If minExpiredField was set, then the object is hash with expiration * on fields and need to register it in global HFE DS */ if (val->type == OBJ_HASH) { - uint64_t minExpiredField = hashTypeGetNextTimeToExpire(val); + uint64_t minExpiredField = hashTypeGetMinExpire(val, 1); if (minExpiredField != EB_EXPIRE_TIME_INVALID) hashTypeAddToExpires(db, key, val, minExpiredField); } diff --git a/src/server.c b/src/server.c index 2815f1010..11646e256 100644 --- a/src/server.c +++ b/src/server.c @@ -334,6 +334,10 @@ uint64_t dictObjHash(const void *key) { return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); } +uint64_t dictPtrHash(const void *key) { + return dictGenHashFunction((unsigned char*)&key,sizeof(key)); +} + uint64_t dictSdsHash(const void *key) { return 
dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); } diff --git a/src/server.h b/src/server.h index dbb1b9621..25d14ebe5 100644 --- a/src/server.h +++ b/src/server.h @@ -3164,7 +3164,9 @@ robj *setTypeDup(robj *o); typedef struct listpackEx { ExpireMeta meta; /* To be used in order to register the hash in the global ebuckets (i.e. db->hexpires) with next, - minimum, hash-field to expire. */ + minimum, hash-field to expire. TTL value might be + inaccurate by up to a few seconds due to optimization + considerations. */ sds key; /* reference to the key, same one that stored in db->dict. Will be used from active-expiration flow for notification and deletion of the object, if @@ -3179,7 +3181,9 @@ typedef struct dictExpireMetadata { ExpireMeta expireMeta; /* embedded ExpireMeta in dict. To be used in order to register the hash in the global ebuckets (i.e db->hexpires) with next, - minimum, hash-field to expire */ + minimum, hash-field to expire. TTL value might be + inaccurate by up to a few seconds due to optimization + considerations. */ ebuckets hfe; /* DS of Hash Fields Expiration, associated to each hash */ sds key; /* reference to the key, same one that stored in db->dict. 
Will be used from active-expiration flow @@ -3225,13 +3229,10 @@ uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o); void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTime); void hashTypeFree(robj *o); int hashTypeIsExpired(const robj *o, uint64_t expireAt); -uint64_t hashTypeGetMinExpire(robj *o); unsigned char *hashTypeListpackGetLp(robj *o); -uint64_t hashTypeGetMinExpire(robj *o); +uint64_t hashTypeGetMinExpire(robj *o, int accurate); void hashTypeUpdateKeyRef(robj *o, sds newkey); ebuckets *hashTypeGetDictMetaHFE(dict *d); -uint64_t hashTypeGetMinExpire(robj *keyObj); -uint64_t hashTypeGetNextTimeToExpire(robj *o); void initDictExpireMetadata(sds key, robj *o); struct listpackEx *listpackExCreate(void); void listpackExAddNew(robj *o, char *field, size_t flen, @@ -3539,6 +3540,7 @@ void startEvictionTimeProc(void); /* Keys hashing / comparison functions for dict.c hash tables. */ uint64_t dictSdsHash(const void *key); +uint64_t dictPtrHash(const void *key); uint64_t dictSdsCaseHash(const void *key); int dictSdsKeyCompare(dict *d, const void *key1, const void *key2); int dictSdsMstrKeyCompare(dict *d, const void *sdsLookup, const void *mstrStored); diff --git a/src/t_hash.c b/src/t_hash.c index 902094491..208e6775b 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -36,12 +36,21 @@ typedef enum GetFieldRes { * it was the last field in the hash. 
*/ } GetFieldRes; +/* ActiveExpireCtx passed to hashTypeActiveExpire() */ +typedef struct ExpireCtx { + uint32_t fieldsToExpireQuota; + redisDb *db; +} ExpireCtx; + +typedef listpackEntry CommonEntry; /* extend usage beyond lp */ + /* hash field expiration (HFE) funcs */ static ExpireAction onFieldExpire(eItem item, void *ctx); static ExpireMeta* hfieldGetExpireMeta(const eItem field); static ExpireMeta *hashGetExpireMeta(const eItem hash); static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit); static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx); +static uint64_t hashTypeExpire(robj *o, ExpireCtx *expireCtx, int updateGlobalHFE); static void hfieldPersist(robj *hashObj, hfield field); static void propagateHashFieldDeletion(redisDb *db, sds key, char *field, size_t fieldLen); @@ -118,12 +127,6 @@ EbucketsType hashFieldExpireBucketsType = { .itemsAddrAreOdd = 1, /* Addresses of hfield (mstr) are odd!! */ }; -/* ActiveExpireCtx passed to hashTypeActiveExpire() */ -typedef struct ActiveExpireCtx { - uint32_t fieldsToExpireQuota; - redisDb *db; -} ActiveExpireCtx; - /* OnFieldExpireCtx passed to OnFieldExpire() */ typedef struct OnFieldExpireCtx { robj *hashObj; @@ -421,7 +424,7 @@ void listpackExExpire(redisDb *db, robj *o, ExpireInfo *info) { if (expired) lpt->lp = lpDeleteRange(lpt->lp, 0, expired * 3); - min = hashTypeGetNextTimeToExpire(o); + min = hashTypeGetMinExpire(o, 1 /*accurate*/); info->nextExpireTime = min; } @@ -727,17 +730,22 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs serverPanic("Unknown hash encoding"); } - /* Don't expire anything while loading. It will be done later. 
*/ - if ( (server.loading) || - (server.lazy_expire_disabled) || - ((server.masterhost) && (server.current_client && (server.current_client->flags & CLIENT_MASTER))) || - (expiredAt >= (uint64_t) commandTimeSnapshot()) ) + if (expiredAt >= (uint64_t) commandTimeSnapshot()) return GETF_OK; - /* Field is expired */ + if (server.masterhost) { + /* If CLIENT_MASTER, assume valid as long as it didn't get deleted */ + if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) + return GETF_OK; - /* If indicated to avoid deleting expired field */ - if (hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) + /* If user client, then act as if expired, but don't delete! */ + return GETF_EXPIRED; + } + + if ((server.loading) || + (server.lazy_expire_disabled) || + (hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) || + (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) return GETF_EXPIRED; if (o->encoding == OBJ_ENCODING_LISTPACK_EX) @@ -1142,7 +1150,8 @@ int hashTypeSetExInit(robj *key, robj *o, client *c, redisDb *db, const char *cm } } - ex->minExpire = hashTypeGetMinExpire(ex->hashObj); + /* Read minExpire from the ExpireMeta attached to the hash */ + ex->minExpire = hashTypeGetMinExpire(ex->hashObj, 0); return C_OK; } @@ -1172,8 +1181,8 @@ void hashTypeSetExDone(HashTypeSetEx *ex) { if ((ex->minExpire < ex->minExpireFields)) return; - /* retrieve new expired time. It might have changed. */ - uint64_t newMinExpire = hashTypeGetNextTimeToExpire(ex->hashObj); + /* Retrieve new expired time. It might have changed. */ + uint64_t newMinExpire = hashTypeGetMinExpire(ex->hashObj, 1 /*accurate*/); /* Calculate the diff between old minExpire and newMinExpire. If it is * only few seconds, then don't have to update global HFE DS. 
At the worst @@ -1580,7 +1589,7 @@ void hashTypeConvertListpackEx(robj *o, int enc, ebuckets *hexpires) { dict *dict; dictExpireMetadata *dictExpireMeta; listpackEx *lpt = o->ptr; - uint64_t minExpire = hashTypeGetMinExpire(o); + uint64_t minExpire = hashTypeGetMinExpire(o, 0); if (hexpires && lpt->meta.trash != 1) ebRemove(hexpires, &hashExpireBucketsType, o); @@ -1745,7 +1754,7 @@ void hashReplyFromListpackEntry(client *c, listpackEntry *e) { * 'key' and 'val' will be set to hold the element. * The memory in them is not to be freed or modified by the caller. * 'val' can be NULL in which case it's not extracted. */ -void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpackEntry *key, listpackEntry *val) { +void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, CommonEntry *key, CommonEntry *val) { if (hashobj->encoding == OBJ_ENCODING_HT) { dictEntry *de = dictGetFairRandomKey(hashobj->ptr); hfield field = dictGetKey(de); @@ -1757,9 +1766,10 @@ void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpackEntry val->slen = sdslen(s); } } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK) { - lpRandomPair(hashobj->ptr, hashsize, key, val, 2); + lpRandomPair(hashobj->ptr, hashsize, (listpackEntry *) key, (listpackEntry *) val, 2); } else if (hashobj->encoding == OBJ_ENCODING_LISTPACK_EX) { - lpRandomPair(hashTypeListpackGetLp(hashobj), hashsize, key, val, 3); + lpRandomPair(hashTypeListpackGetLp(hashobj), hashsize, (listpackEntry *) key, + (listpackEntry *) val, 3); } else { serverPanic("Unknown hash encoding"); } @@ -1780,38 +1790,61 @@ void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpackEntry * by returning ACT_REMOVE_EXP_ITEM. * - If hash has no more fields afterward, it will remove the hash from keyspace. 
*/ -static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { - robj *hashObj = (robj *) _hashObj; - ActiveExpireCtx *activeExpireCtx = (ActiveExpireCtx *) ctx; +static ExpireAction hashTypeActiveExpire(eItem item, void *ctx) { + ExpireCtx *expireCtx = ctx; + + /* If no more quota left for this callback, stop */ + if (expireCtx->fieldsToExpireQuota == 0) + return ACT_STOP_ACTIVE_EXP; + + uint64_t nextExpTime = hashTypeExpire((robj *) item, expireCtx, 0); + + /* If hash has no more fields to expire or got deleted, indicate + * to remove it from HFE DB to the caller ebExpire() */ + if (nextExpTime == EB_EXPIRE_TIME_INVALID || nextExpTime == 0) { + return ACT_REMOVE_EXP_ITEM; + } else { + /* Hash has more fields to expire. Update next expiration time of the hash + * and indicate to add it back to global HFE DS */ + ebSetMetaExpTime(hashGetExpireMeta(item), nextExpTime); + return ACT_UPDATE_EXP_ITEM; + } +} + +/* Delete all expired fields from the hash and delete the hash if left empty. + * + * updateGlobalHFE - If the hash should be updated in the global HFE DS with new + * expiration time in case expired fields were deleted. 
+ * + * Return next Expire time of the hash + * - 0 if hash got deleted + * - EB_EXPIRE_TIME_INVALID if no more fields to expire + */ +static uint64_t hashTypeExpire(robj *o, ExpireCtx *expireCtx, int updateGlobalHFE) { + uint64_t noExpireLeftRes = EB_EXPIRE_TIME_INVALID; + redisDb *db = expireCtx->db; sds keystr = NULL; ExpireInfo info = {0}; - /* If no more quota left for this callback, stop */ - if (activeExpireCtx->fieldsToExpireQuota == 0) - return ACT_STOP_ACTIVE_EXP; - - if (hashObj->encoding == OBJ_ENCODING_LISTPACK_EX) { - info = (ExpireInfo){ - .maxToExpire = activeExpireCtx->fieldsToExpireQuota, + if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + info = (ExpireInfo) { + .maxToExpire = expireCtx->fieldsToExpireQuota, .now = commandTimeSnapshot(), .itemsExpired = 0}; - listpackExExpire(activeExpireCtx->db, hashObj, &info); + listpackExExpire(db, o, &info); server.stat_expired_hash_fields += info.itemsExpired; - keystr = ((listpackEx*)hashObj->ptr)->key; + keystr = ((listpackEx*)o->ptr)->key; } else { - serverAssert(hashObj->encoding == OBJ_ENCODING_HT); + serverAssert(o->encoding == OBJ_ENCODING_HT); - dict *d = hashObj->ptr; + dict *d = o->ptr; dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d); - OnFieldExpireCtx onFieldExpireCtx = { - .hashObj = hashObj, - .db = activeExpireCtx->db - }; + OnFieldExpireCtx onFieldExpireCtx = { .hashObj = o, .db = db }; info = (ExpireInfo){ - .maxToExpire = activeExpireCtx->fieldsToExpireQuota, + .maxToExpire = expireCtx->fieldsToExpireQuota, .onExpireItem = onFieldExpire, .ctx = &onFieldExpireCtx, .now = commandTimeSnapshot() @@ -1822,41 +1855,98 @@ static ExpireAction hashTypeActiveExpire(eItem _hashObj, void *ctx) { } /* Update quota left */ - activeExpireCtx->fieldsToExpireQuota -= info.itemsExpired; + expireCtx->fieldsToExpireQuota -= info.itemsExpired; /* In some cases, a field might have been deleted without updating the global DS. 
* As a result, active-expire might not expire any fields, in such cases, * we don't need to send notifications or perform other operations for this key. */ if (info.itemsExpired) { robj *key = createStringObject(keystr, sdslen(keystr)); - notifyKeyspaceEvent(NOTIFY_HASH,"hexpired",key,activeExpireCtx->db->id); - if (hashTypeLength(hashObj, 0) == 0) { - dbDelete(activeExpireCtx->db, key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,activeExpireCtx->db->id); + notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id); + + if (updateGlobalHFE) + ebRemove(&db->hexpires, &hashExpireBucketsType, o); + + if (hashTypeLength(o, 0) == 0) { + dbDelete(db, key); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); + noExpireLeftRes = 0; + } else { + if ((updateGlobalHFE) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID)) + ebAdd(&db->hexpires, &hashExpireBucketsType, o, info.nextExpireTime); } - server.dirty++; - signalModifiedKey(NULL, activeExpireCtx->db, key); + + signalModifiedKey(NULL, db, key); decrRefCount(key); } - /* If hash has no more fields to expire, remove it from HFE DB */ - if (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) { - return ACT_REMOVE_EXP_ITEM; - } else { - /* Hash has more fields to expire. Update next expiration time of the hash - * and indicate to add it back to global HFE DS */ - ebSetMetaExpTime(hashGetExpireMeta(hashObj), info.nextExpireTime); - return ACT_UPDATE_EXP_ITEM; - } + /* return 0 if hash got deleted, EB_EXPIRE_TIME_INVALID if no more fields + * with expiration. Else return next expiration time */ + return (info.nextExpireTime == EB_EXPIRE_TIME_INVALID) ? noExpireLeftRes : info.nextExpireTime; } -/* Return the next/minimum expiry time of the hash-field. This is useful if a - * field with the minimum expiry is deleted, and you want to get the next - * minimum expiry. Otherwise, consider using hashTypeGetMinExpire() which will - * be faster but less accurate. 
+/* Delete all expired fields in hash if needed (Currently used only by HRANDFIELD) * - * Return next min expiry. If none return EB_EXPIRE_TIME_INVALID */ -uint64_t hashTypeGetNextTimeToExpire(robj *o) { + * Return 1 if the entire hash was deleted, 0 otherwise. + * This function might be pricy in case there are many expired fields. + */ +static int hashTypeExpireIfNeeded(redisDb *db, robj *o) { + uint64_t nextExpireTime; + uint64_t minExpire = hashTypeGetMinExpire(o, 1 /*accurate*/); + + /* Nothing to expire */ + if ((mstime_t) minExpire >= commandTimeSnapshot()) + return 0; + + /* Follow expireIfNeeded() conditions of when not lazy-expire */ + if ( (server.loading) || + (server.lazy_expire_disabled) || + (server.masterhost) || /* master-client or user-client, don't delete */ + (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) + return 0; + + /* Take care to expire all the fields */ + ExpireCtx expireCtx = { .db = db, .fieldsToExpireQuota = UINT32_MAX }; + nextExpireTime = hashTypeExpire(o, &expireCtx, 1); + /* return 1 if the entire hash was deleted */ + return nextExpireTime == 0; +} + +/* Return the next/minimum expiry time of the hash-field. + * accurate=1 - Return the exact time by looking into the object DS. + * accurate=0 - Return the minimum expiration time maintained in expireMeta which + * might not be accurate due to optimization reasons. 
+ * + * If not found, return EB_EXPIRE_TIME_INVALID + */ +uint64_t hashTypeGetMinExpire(robj *o, int accurate) { + ExpireMeta *expireMeta = NULL; + + if (!accurate) { + if (o->encoding == OBJ_ENCODING_LISTPACK) { + return EB_EXPIRE_TIME_INVALID; + } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { + listpackEx *lpt = o->ptr; + expireMeta = &lpt->meta; + } else { + serverAssert(o->encoding == OBJ_ENCODING_HT); + + dict *d = o->ptr; + if (!isDictWithMetaHFE(d)) + return EB_EXPIRE_TIME_INVALID; + + expireMeta = &((dictExpireMetadata *) dictMetadata(d))->expireMeta; + } + + /* Keep aside next hash-field expiry before updating HFE DS. Verify it is not trash */ + if (expireMeta->trash == 1) + return EB_EXPIRE_TIME_INVALID; + + return ebGetMetaExpTime(expireMeta); + } + + /* accurate == 1 */ + if (o->encoding == OBJ_ENCODING_LISTPACK) { return EB_EXPIRE_TIME_INVALID; } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { @@ -1873,33 +1963,6 @@ uint64_t hashTypeGetNextTimeToExpire(robj *o) { } } -/* Return the next/minimum expiry time of the hash-field. - * If not found, return EB_EXPIRE_TIME_INVALID */ -uint64_t hashTypeGetMinExpire(robj *o) { - ExpireMeta *expireMeta = NULL; - - if (o->encoding == OBJ_ENCODING_LISTPACK) { - return EB_EXPIRE_TIME_INVALID; - } else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) { - listpackEx *lpt = o->ptr; - expireMeta = &lpt->meta; - } else { - serverAssert(o->encoding == OBJ_ENCODING_HT); - - dict *d = o->ptr; - if (!isDictWithMetaHFE(d)) - return EB_EXPIRE_TIME_INVALID; - - expireMeta = &((dictExpireMetadata *) dictMetadata(d))->expireMeta; - } - - /* Keep aside next hash-field expiry before updating HFE DS. 
Verify it is not trash */ - if (expireMeta->trash == 1) - return EB_EXPIRE_TIME_INVALID; - - return ebGetMetaExpTime(expireMeta); -} - uint64_t hashTypeRemoveFromExpires(ebuckets *hexpires, robj *o) { if (o->encoding == OBJ_ENCODING_LISTPACK) { return EB_EXPIRE_TIME_INVALID; @@ -1963,7 +2026,7 @@ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTi * Returns number of fields active-expired. */ uint64_t hashTypeDbActiveExpire(redisDb *db, uint32_t maxFieldsToExpire) { - ActiveExpireCtx ctx = { .db = db, .fieldsToExpireQuota = maxFieldsToExpire }; + ExpireCtx ctx = { .db = db, .fieldsToExpireQuota = maxFieldsToExpire }; ExpireInfo info = { .maxToExpire = UINT64_MAX, /* Only maxFieldsToExpire play a role */ .onExpireItem = hashTypeActiveExpire, @@ -2345,7 +2408,7 @@ void genericHgetallCommand(client *c, int flags) { /* Skip expired fields if the hash has an expire time set at global HFE DS. We could * set it to constant 1, but then it will make another lookup for each field expiration */ - int skipExpiredFields = (EB_EXPIRE_TIME_INVALID == hashTypeGetMinExpire(o)) ? 0 : 1; + int skipExpiredFields = (EB_EXPIRE_TIME_INVALID == hashTypeGetMinExpire(o, 0)) ? 0 : 1; while (hashTypeNext(hi, skipExpiredFields) != C_ERR) { if (flags & OBJ_HASH_KEY) { @@ -2431,8 +2494,6 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || checkType(c,hash,OBJ_HASH)) return; - /* TODO: Active-expire */ - size = hashTypeLength(hash, 0); if(l >= 0) { count = (unsigned long) l; @@ -2441,6 +2502,15 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { uniq = 0; } + /* Delete all expired fields. If the entire hash got deleted then return empty array. 
*/ + if (hashTypeExpireIfNeeded(c->db, hash)) { + addReply(c, shared.emptyarray); + return; + } + + /* Delete expired fields */ + size = hashTypeLength(hash, 0); + /* If count is zero, serve it ASAP to avoid special cases later. */ if (count == 0) { addReply(c,shared.emptyarray); @@ -2544,64 +2614,50 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { } /* CASE 3: - * The number of elements inside the hash is not greater than + * The number of elements inside the hash of type dict is not greater than * HRANDFIELD_SUB_STRATEGY_MUL times the number of requested elements. - * In this case we create a hash from scratch with all the elements, and - * subtract random elements to reach the requested number of elements. + * In this case we create an array of dictEntry pointers from the original hash, + * and subtract random elements to reach the requested number of elements. * * This is done because if the number of requested elements is just * a bit less than the number of elements in the hash, the natural approach * used into CASE 4 is highly inefficient. */ if (count*HRANDFIELD_SUB_STRATEGY_MUL > size) { /* Hashtable encoding (generic implementation) */ - dict *d = dictCreate(&sdsReplyDictType); /* without metadata! */ - dictExpand(d, size); - hashTypeIterator *hi = hashTypeInitIterator(hash); + dict *ht = hash->ptr; + dictIterator *di; + dictEntry *de; + unsigned long idx = 0; - /* Add all the elements into the temporary dictionary. */ - while ((hashTypeNext(hi, 0)) != C_ERR) { - int ret = DICT_ERR; - sds key, value = NULL; + /* Allocate a temporary array of pointers to stored key-values in dict and + * assist it to remove random elements to reach the right count. 
*/ + struct FieldValPair { + hfield field; + sds value; + } *pairs = zmalloc(sizeof(struct FieldValPair) * size); - key = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY); - if (withvalues) - value = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE); - ret = dictAdd(d, key, value); - - serverAssert(ret == DICT_OK); - } - serverAssert(dictSize(d) == size); - hashTypeReleaseIterator(hi); + /* Add all the elements into the temporary array. */ + di = dictGetIterator(ht); + while((de = dictNext(di)) != NULL) + pairs[idx++] = (struct FieldValPair) {dictGetKey(de), dictGetVal(de)}; + dictReleaseIterator(di); /* Remove random elements to reach the right count. */ while (size > count) { - dictEntry *de; - de = dictGetFairRandomKey(d); - dictUseStoredKeyApi(d, 1); - dictUnlink(d,dictGetKey(de)); - dictUseStoredKeyApi(d, 0); - sdsfree(dictGetKey(de)); - sdsfree(dictGetVal(de)); - dictFreeUnlinkedEntry(d,de); - size--; + unsigned long toDiscardIdx = rand() % size; + pairs[toDiscardIdx] = pairs[--size]; } - /* Reply with what's in the dict and release memory */ - dictIterator *di; - dictEntry *de; - di = dictGetIterator(d); - while ((de = dictNext(di)) != NULL) { - sds key = dictGetKey(de); - sds value = dictGetVal(de); + /* Reply with what's in the array */ + for (idx = 0; idx < size; idx++) { if (withvalues && c->resp > 2) addReplyArrayLen(c,2); - addReplyBulkSds(c, key); + addReplyBulkCBuffer(c, pairs[idx].field, hfieldlen(pairs[idx].field)); if (withvalues) - addReplyBulkSds(c, value); + addReplyBulkCBuffer(c, pairs[idx].value, sdslen(pairs[idx].value)); } - dictReleaseIterator(di); - dictRelease(d); + zfree(pairs); } /* CASE 4: We have a big hash compared to the requested number of elements. @@ -2609,43 +2665,78 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { * to the temporary hash, trying to eventually get enough unique elements * to reach the specified count. */ else { + /* Allocate temporary dictUnique to find unique elements. 
Just keep ref + * to key-value from the original hash. This dict relaxes hash function + * to be based on field's pointer */ + dictType uniqueDictType = { .hashFunction = dictPtrHash }; + dict *dictUnique = dictCreate(&uniqueDictType); + dictExpand(dictUnique, count); + /* Hashtable encoding (generic implementation) */ unsigned long added = 0; - listpackEntry key, value; - dict *d = dictCreate(&hashDictType); - dictExpand(d, count); + while(added < count) { - hashTypeRandomElement(hash, size, &key, withvalues? &value : NULL); + dictEntry *de = dictGetFairRandomKey(hash->ptr); + serverAssert(de != NULL); + hfield field = dictGetKey(de); + sds value = dictGetVal(de); /* Try to add the object to the dictionary. If it already exists * free it, otherwise increment the number of objects we have * in the result dictionary. */ - sds skey = hashSdsFromListpackEntry(&key); - if (dictAdd(d,skey,NULL) != DICT_OK) { - sdsfree(skey); + if (dictAdd(dictUnique, field, value) != DICT_OK) continue; - } + added++; /* We can reply right away, so that we don't need to store the value in the dict. */ if (withvalues && c->resp > 2) addReplyArrayLen(c,2); - hashReplyFromListpackEntry(c, &key); + + addReplyBulkCBuffer(c, field, hfieldlen(field)); if (withvalues) - hashReplyFromListpackEntry(c, &value); + addReplyBulkCBuffer(c, value, sdslen(value)); } /* Release memory */ - dictRelease(d); + dictRelease(dictUnique); } } -/* HRANDFIELD key [ [WITHVALUES]] */ +/* + * HRANDFIELD - Return a random field from the hash value stored at key. + * CLI usage: HRANDFIELD key [ [WITHVALUES]] + * + * Considerations for the current implementation of HRANDFIELD & HFE feature: + * HRANDFIELD might access any of the fields in the hash as some of them might + * be expired. And so the implementation of HRANDFIELD along with HFEs + * might be one of the two options: + * 1. Expire hash-fields before diving into handling HRANDFIELD. + * 2. Refine HRANDFIELD cases to deal with expired fields. 
+ * + * Regarding the first option, as reference, the command RANDOMKEY also declares + * O(1) complexity, yet might be stuck on a very long (but not infinite) loop + * trying to find non-expired keys. Furthermore RANDOMKEY also evicts expired keys + * along the way even though it is categorized as a read-only command. Note that + * the case of HRANDFIELD is more lightweight versus RANDOMKEY since HFEs have + * much more effective and aggressive active-expiration for fields behind. + * + * The second option introduces additional implementation complexity to HRANDFIELD. + * We could further refine HRANDFIELD cases to differentiate between scenarios + * with many expired fields versus few expired fields, and adjust based on the + * percentage of expired fields. However, this approach could still lead to long + * loops or necessitate expiring fields before selecting them. For the “lightweight” + * cases it is also expected to have a lightweight expiration. + * + * Considering the pros and cons, and the fact that HRANDFIELD is an infrequent + * command (particularly with HFEs) and the fact that we have effective active-expiration + * behind hash-fields, it is better to keep it simple and choose the option #1. + */ void hrandfieldCommand(client *c) { long l; int withvalues = 0; robj *hash; - listpackEntry ele; + CommonEntry ele; if (c->argc >= 3) { if (getRangeLongFromObjectOrReply(c,c->argv[2],-LONG_MAX,LONG_MAX,&l,NULL) != C_OK) return; @@ -2669,8 +2760,18 @@ void hrandfieldCommand(client *c) { return; } + /* Delete all expired fields. If the entire hash got deleted then return null. 
*/ + if (hashTypeExpireIfNeeded(c->db, hash)) { + addReply(c,shared.null[c->resp]); + return; + } + hashTypeRandomElement(hash,hashTypeLength(hash, 0),&ele,NULL); - hashReplyFromListpackEntry(c, &ele); + + if (ele.sval) + addReplyBulkCBuffer(c, ele.sval, ele.slen); + else + addReplyBulkLongLong(c, ele.lval); } /*----------------------------------------------------------------------------- diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index f78f2b142..cca616b61 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -32,13 +32,6 @@ proc get_hashes_with_expiry_fields {r} { return 0 } -proc create_hash {key entries} { - r del $key - foreach entry $entries { - r hset $key [lindex $entry 0] [lindex $entry 1] - } -} - proc get_keys {l} { set res {} foreach entry $l { @@ -48,22 +41,6 @@ proc get_keys {l} { return $res } -proc cmp_hrandfield_result {hash_name expected_result} { - # Accumulate hrandfield results - unset -nocomplain myhash - array set myhash {} - for {set i 0} {$i < 100} {incr i} { - set key [r hrandfield $hash_name] - set myhash($key) 1 - } - set res [lsort [array names myhash]] - if {$res eq $expected_result} { - return 1 - } else { - return $res - } -} - proc dumpAllHashes {client} { set keyAndFields(0,0) 0 unset keyAndFields @@ -77,36 +54,6 @@ proc dumpAllHashes {client} { return [array get keyAndFields] } -proc hrandfieldTest {activeExpireConfig} { - r debug set-active-expire $activeExpireConfig - r del myhash - set contents {{field1 1} {field2 2} } - create_hash myhash $contents - - set factorValgrind [expr {$::valgrind ? 
2 : 1}] - - # Set expiration time for field1 and field2 such that field1 expires first - r hpexpire myhash 1 NX FIELDS 1 field1 - r hpexpire myhash 100 NX FIELDS 1 field2 - - # On call hrandfield command lazy expire deletes field1 first - wait_for_condition 8 10 { - [cmp_hrandfield_result myhash "field2"] == 1 - } else { - fail "Expected field2 to be returned by HRANDFIELD." - } - - # On call hrandfield command lazy expire deletes field2 as well - wait_for_condition 8 20 { - [cmp_hrandfield_result myhash "{}"] == 1 - } else { - fail "Expected {} to be returned by HRANDFIELD." - } - - # restore the default value - r debug set-active-expire 1 -} - ############################### TESTS ######################################### start_server {tags {"external:skip needs:debug"}} { @@ -396,22 +343,33 @@ start_server {tags {"external:skip needs:debug"}} { r debug set-active-expire 1 } - # OPEN: To decide if to delete expired fields at start of HRANDFIELD. - # test "Test HRANDFIELD does not return expired fields ($type)" { - # hrandfieldTest 0 - # hrandfieldTest 1 - # } - - test "Test HRANDFIELD can return expired fields ($type)" { + test "Test HRANDFIELD deletes all expired fields ($type)" { r debug set-active-expire 0 - r del myhash + r flushall r hset myhash f1 v1 f2 v2 f3 v3 f4 v4 f5 v5 - r hpexpire myhash 1 NX FIELDS 4 f1 f2 f3 f4 + r hpexpire myhash 1 FIELDS 2 f1 f2 after 5 - set res [cmp_hrandfield_result myhash "f1 f2 f3 f4 f5"] - assert {$res == 1} - r debug set-active-expire 1 + assert_equal [lsort [r hrandfield myhash 5]] "f3 f4 f5" + r hpexpire myhash 1 FIELDS 3 f3 f4 f5 + after 5 + assert_equal [lsort [r hrandfield myhash 5]] "" + assert_equal [r keys *] "" + r del myhash + r hset myhash f1 v1 f2 v2 f3 v3 + r hpexpire myhash 1 FIELDS 1 f1 + after 5 + set res [r hrandfield myhash] + assert {$res == "f2" || $res == "f3"} + r hpexpire myhash 1 FIELDS 1 f2 + after 5 + assert_equal [lsort [r hrandfield myhash 5]] "f3" + r hpexpire myhash 1 FIELDS 1 f3 + after 5 + 
assert_equal [r hrandfield myhash] "" + assert_equal [r keys *] "" + + r debug set-active-expire 1 } test "Lazy Expire - HLEN does count expired fields ($type)" { @@ -1054,7 +1012,13 @@ start_server {tags {"external:skip needs:debug"}} { r hpexpireat h1 [expr [clock seconds]*1000+100000] NX FIELDS 1 f2 r hpexpire h1 100000 NX FIELDS 3 f3 f4 f5 r hexpire h1 100000 FIELDS 1 f6 - r hset h5 f1 v1 + + # Verify HRANDFIELD deletes expired fields and propagates it + r hset h2 f1 v1 f2 v2 + r hpexpire h2 1 FIELDS 1 f1 + r hpexpire h2 50 FIELDS 1 f2 + assert_equal [r hrandfield h4 2] "" + after 200 assert_aof_content $aof { {select *} @@ -1063,7 +1027,11 @@ start_server {tags {"external:skip needs:debug"}} { {hpexpireat h1 * FIELDS 1 f2} {hpexpireat h1 * NX FIELDS 3 f3 f4 f5} {hpexpireat h1 * FIELDS 1 f6} - {hset h5 f1 v1} + {hset h2 f1 v1 f2 v2} + {hpexpireat h2 * FIELDS 1 f1} + {hpexpireat h2 * FIELDS 1 f2} + {hdel h2 f1} + {hdel h2 f2} } array set keyAndFields1 [dumpAllHashes r] @@ -1086,6 +1054,7 @@ start_server {tags {"external:skip needs:debug"}} { r flushall ; # Clean up keyspace to avoid interference by keys from other tests set repl [attach_to_replication_stream] + # HEXPIRE/HPEXPIRE should be translated into HPEXPIREAT r hset h1 f1 v1 r hexpireat h1 [expr [clock seconds]+100] NX FIELDS 1 f1 r hset h2 f2 v2 @@ -1109,6 +1078,28 @@ start_server {tags {"external:skip needs:debug"}} { close_replication_stream $repl } {} {needs:repl} + test {HRANDFIELD delete expired fields and propagate DELs to replica} { + r flushall + set repl [attach_to_replication_stream] + + r hset h4 f1 v1 f2 v2 + r hpexpire h4 1 FIELDS 1 f1 + r hpexpire h4 2 FIELDS 1 f2 + after 100 + assert_equal [r hrandfield h4 2] "" + + + assert_replication_stream $repl { + {select *} + {hset h4 f1 v1 f2 v2} + {hpexpireat h4 * FIELDS 1 f1} + {hpexpireat h4 * FIELDS 1 f2} + {hdel h4 f1} + {hdel h4 f2} + } + close_replication_stream $repl + } {} {needs:repl} + # Start another server to test replication of TTLs 
start_server {tags {needs:repl external:skip}} { # Set the outer layer server as primary