diff --git a/src/Makefile b/src/Makefile index 75d18f5e6..acdd671bd 100644 --- a/src/Makefile +++ b/src/Makefile @@ -345,7 +345,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o 
timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/acl.c b/src/acl.c index e2ec11e55..26827f97a 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1903,12 +1903,6 @@ int ACLCheckAllPerm(client *c, int *idxptr) { return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr); } -int totalSubscriptions(void) { - return dictSize(server.pubsub_patterns) + - dictSize(server.pubsub_channels) + - server.shard_channel_count; -} - /* If 'new' can access all channels 'original' could then return NULL; Otherwise return a list of channels that the new user can access */ list *getUpcomingChannelList(user *new, user *original) { @@ -2017,7 +2011,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) { * permissions specified via the upcoming argument, and kill them if so. */ void ACLKillPubsubClientsIfNeeded(user *new, user *original) { /* Do nothing if there are no subscribers. */ - if (totalSubscriptions() == 0) + if (pubsubTotalSubscriptions() == 0) return; list *channels = getUpcomingChannelList(new, original); @@ -2450,7 +2444,7 @@ sds ACLLoadFromFile(const char *filename) { /* If there are some subscribers, we need to check if we need to drop some clients. 
*/ rax *user_channels = NULL; - if (totalSubscriptions() > 0) { + if (pubsubTotalSubscriptions() > 0) { user_channels = raxNew(); } diff --git a/src/aof.c b/src/aof.c index cb9d899bb..dfd58853c 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2244,7 +2244,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { int j; long key_count = 0; long long updated_time = 0; - dbIterator *dbit = NULL; + kvstoreIterator *kvs_it = NULL; /* Record timestamp at the beginning of rewriting AOF. */ if (server.aof_timestamp_enabled) { @@ -2258,15 +2258,15 @@ int rewriteAppendOnlyFileRio(rio *aof) { for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db + j; - if (dbSize(db, DB_MAIN) == 0) continue; + if (kvstoreSize(db->keys) == 0) continue; /* SELECT the new DB */ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr; - dbit = dbIteratorInit(db, DB_MAIN); + kvs_it = kvstoreIteratorInit(db->keys); /* Iterate this DB writing every entry */ - while((de = dbIteratorNext(dbit)) != NULL) { + while((de = kvstoreIteratorNext(kvs_it)) != NULL) { sds keystr; robj key, *o; long long expiretime; @@ -2331,12 +2331,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay); } - dbReleaseIterator(dbit); + kvstoreIteratorRelease(kvs_it); } return C_OK; werr: - if (dbit) dbReleaseIterator(dbit); + if (kvs_it) kvstoreIteratorRelease(kvs_it); return C_ERR; } diff --git a/src/cluster.c b/src/cluster.c index 90c1291de..c52db6b7a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -817,7 +817,7 @@ static int shouldReturnTlsInfo(void) { } unsigned int countKeysInSlot(unsigned int slot) { - return dictSize(server.db->dict[slot]); + return kvstoreDictSize(server.db->keys, slot); } void clusterCommandHelp(client *c) { @@ -919,7 +919,7 @@ void clusterCommand(client *c) { addReplyArrayLen(c,numkeys); dictIterator *iter = NULL; dictEntry *de = NULL; - iter = 
dictGetIterator(server.db->dict[slot]); + iter = kvstoreDictGetIterator(server.db->keys, slot); for (unsigned int i = 0; i < numkeys; i++) { de = dictNext(iter); serverAssert(de != NULL); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 3f5073f50..f33a7d5a9 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -5104,7 +5104,7 @@ int verifyClusterConfigWithData(void) { /* Make sure we only have keys in DB0. */ for (j = 1; j < server.dbnum; j++) { - if (dbSize(&server.db[j], DB_MAIN)) return C_ERR; + if (kvstoreSize(server.db[j].keys)) return C_ERR; } /* Check that all the slots we see populated memory have a corresponding @@ -5140,7 +5140,7 @@ int verifyClusterConfigWithData(void) { /* Remove all the shard channel related information not owned by the current shard. */ static inline void removeAllNotOwnedShardChannelSubscriptions(void) { - if (!server.shard_channel_count) return; + if (!kvstoreSize(server.pubsubshard_channels)) return; clusterNode *currmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof; for (int j = 0; j < CLUSTER_SLOTS; j++) { if (server.cluster->slots[j] != currmaster) { @@ -5734,16 +5734,17 @@ void removeChannelsInSlot(unsigned int slot) { pubsubShardUnsubscribeAllChannelsInSlot(slot); } - - /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { + if (!kvstoreDictSize(server.db->keys, hashslot)) + return 0; + unsigned int j = 0; dictIterator *iter = NULL; dictEntry *de = NULL; - iter = dictGetSafeIterator(server.db->dict[hashslot]); + iter = kvstoreDictGetSafeIterator(server.db->keys, hashslot); while((de = dictNext(iter)) != NULL) { enterExecutionUnit(1, 0); sds sdskey = dictGetKey(de); @@ -5768,8 +5769,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { /* Get the count of the channels for a given slot. 
*/ unsigned int countChannelsInSlot(unsigned int hashslot) { - dict *d = server.pubsubshard_channels[hashslot]; - return d ? dictSize(d) : 0; + return kvstoreDictSize(server.pubsubshard_channels, hashslot); } int clusterNodeIsMyself(clusterNode *n) { @@ -5939,7 +5939,7 @@ int clusterCommandSpecial(client *c) { } } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ - if (dbSize(&server.db[0], DB_MAIN) != 0) { + if (kvstoreSize(server.db[0].keys) != 0) { addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); return 1; } @@ -6205,7 +6205,7 @@ int clusterCommandSpecial(client *c) { * slots nor keys to accept to replicate some other node. * Slaves can switch to another master without issues. */ if (clusterNodeIsMaster(myself) && - (myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) { + (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); @@ -6339,7 +6339,7 @@ int clusterCommandSpecial(client *c) { /* Slaves can be reset while containing data, but not master nodes * that must be empty. */ - if (clusterNodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) { + if (clusterNodeIsMaster(myself) && kvstoreSize(c->db->keys) != 0) { addReplyError(c,"CLUSTER RESET can't be called with " "master nodes containing keys"); return 1; diff --git a/src/db.c b/src/db.c index e5b7387d0..ff62c51e7 100644 --- a/src/db.c +++ b/src/db.c @@ -33,20 +33,10 @@ #include "latency.h" #include "script.h" #include "functions.h" -#include "cluster.h" #include #include -/* Structure for DB iterator that allows iterating across multiple slot specific dictionaries in cluster mode. 
*/ -struct dbIterator { - redisDb *db; - int slot; - int next_slot; - dictIterator di; - dbKeyType keyType; -}; - /*----------------------------------------------------------------------------- * C-level DB API *----------------------------------------------------------------------------*/ @@ -57,79 +47,8 @@ struct dbIterator { int expireIfNeeded(redisDb *db, robj *key, int flags); int keyIsExpired(redisDb *db, robj *key); -void cumulativeKeyCountAdd(redisDb *db, int idx, long delta, dbKeyType keyType); static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de); -dict *dbGetDictFromIterator(dbIterator *dbit) { - if (dbit->keyType == DB_MAIN) - return dbit->db->dict[dbit->slot]; - else if (dbit->keyType == DB_EXPIRES) - return dbit->db->expires[dbit->slot]; - else - serverPanic("Unknown keyType"); -} - -/* Returns next dictionary from the iterator, or NULL if iteration is complete. */ -dict *dbIteratorNextDict(dbIterator *dbit) { - if (dbit->next_slot == -1) return NULL; - dbit->slot = dbit->next_slot; - dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType); - return dbGetDictFromIterator(dbit); -} - -int dbIteratorGetCurrentSlot(dbIterator *dbit) { - serverAssert(dbit->slot >= 0 && dbit->slot < CLUSTER_SLOTS); - return dbit->slot; -} - -/* Returns next entry from the multi slot db. */ -dictEntry *dbIteratorNext(dbIterator *dbit) { - dictEntry *de = dbit->di.d ? dictNext(&dbit->di) : NULL; - if (!de) { /* No current dict or reached the end of the dictionary. */ - dict *d = dbIteratorNextDict(dbit); - if (!d) return NULL; - - if (dbit->di.d) { - /* Before we move to the next dict, reset the iter of the previous dict. */ - dictIterator *iter = &dbit->di; - dictResetIterator(iter); - } - dictInitSafeIterator(&dbit->di, d); - de = dictNext(&dbit->di); - } - return de; -} - -/* Returns DB iterator that can be used to iterate through sub-dictionaries. 
- * Primary database contains only one dictionary when node runs without cluster mode, - * or 16k dictionaries (one per slot) when node runs with cluster mode enabled. - * - * The caller should free the resulting dbit with dbReleaseIterator. */ -dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) { - dbIterator *dbit = zmalloc(sizeof(*dbit)); - dbit->db = db; - dbit->slot = -1; - dbit->keyType = keyType; - dbit->next_slot = findSlotByKeyIndex(dbit->db, 1, dbit->keyType); /* Finds first non-empty slot. */ - dictInitSafeIterator(&dbit->di, NULL); - return dbit; -} - -/* Free the dbit returned by dbIteratorInit. */ -void dbReleaseIterator(dbIterator *dbit) { - dictIterator *iter = &dbit->di; - dictResetIterator(iter); - - zfree(dbit); -} - -/* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */ -int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) { - unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1; - return next_key <= dbSize(db, keyType) ? findSlotByKeyIndex(db, next_key, keyType) : -1; -} - - /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -167,7 +86,7 @@ void updateLFU(robj *val) { * expired on replicas even if the master is lagging expiring our key via DELs * in the replication link. 
*/ robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dbFind(db, key->ptr, DB_MAIN); + dictEntry *de = dbFind(db, key->ptr); robj *val = NULL; if (de) { val = dictGetVal(de); @@ -274,17 +193,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) { dictEntry *existing; int slot = getKeySlot(key->ptr); - dict *d = db->dict[slot]; - dictEntry *de = dictAddRaw(d, key->ptr, &existing); + dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing); if (update_if_existing && existing) { dbSetValue(db, key, val, 1, existing); return; } serverAssertWithInfo(NULL, key, de != NULL); - dictSetKey(d, de, sdsdup(key->ptr)); + kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr)); initObjectLRUOrLFU(val); - dictSetVal(d, de, val); - cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); + kvstoreDictSetVal(db->keys, slot, de, val); signalKeyAsReady(db, key, val->type); notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); } @@ -329,12 +246,10 @@ int getKeySlot(sds key) { * caller to free the SDS string. */ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { int slot = getKeySlot(key); - dict *d = db->dict[slot]; - dictEntry *de = dictAddRaw(d, key, NULL); + dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL); if (de == NULL) return 0; initObjectLRUOrLFU(val); - dictSetVal(d, de, val); - cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); + kvstoreDictSetVal(db->keys, slot, de, val); return 1; } @@ -351,7 +266,8 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { * * The program is aborted if the key was not already present. 
*/ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de) { - if (!de) de = dbFind(db, key->ptr, DB_MAIN); + int slot = getKeySlot(key->ptr); + if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr); serverAssertWithInfo(NULL,key,de != NULL); robj *old = dictGetVal(de); @@ -371,14 +287,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt /* Because of RM_StringDMA, old may be changed, so we need get old again */ old = dictGetVal(de); } - dict *d = db->dict[getKeySlot(key->ptr)]; - dictSetVal(d, de, val); - + kvstoreDictSetVal(db->keys, slot, de, val); if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); } else { - /* This is just decrRefCount(old); */ - d->type->valDestructor(d, old); + decrRefCount(old); } } @@ -430,18 +343,18 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dbSize(db, DB_MAIN) == dbSize(db, DB_EXPIRES); + int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires); while(1) { sds key; robj *keyobj; - int randomSlot = getFairRandomSlot(db, DB_MAIN); - de = dictGetFairRandomKey(db->dict[randomSlot]); + int randomSlot = kvstoreGetFairRandomDictIndex(db->keys); + de = kvstoreDictGetFairRandomKey(db->keys, randomSlot); if (de == NULL) return NULL; key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); - if (dbFind(db, key, DB_EXPIRES)) { + if (dbFindExpires(db, key)) { if (allvolatile && server.masterhost && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically @@ -462,102 +375,12 @@ robj *dbRandomKey(redisDb *db) { } } -/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given slot. - * You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree - * Time complexity is O(log(CLUSTER_SLOTS)). 
*/ -void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) { - db->sub_dict[keyType].key_count += delta; - dict *d = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]); - if (dictSize(d) == 1) - db->sub_dict[keyType].non_empty_slots++; - if (dictSize(d) == 0) - db->sub_dict[keyType].non_empty_slots--; - - /* BIT does not need to be calculated when the cluster is turned off. */ - if (!server.cluster_enabled) return; - int idx = slot + 1; /* Unlike slots, BIT is 1-based, so we need to add 1. */ - while (idx <= CLUSTER_SLOTS) { - if (delta < 0) { - serverAssert(db->sub_dict[keyType].slot_size_index[idx] >= (unsigned long long)labs(delta)); - } - db->sub_dict[keyType].slot_size_index[idx] += delta; - idx += (idx & -idx); - } -} - -/* Returns total (cumulative) number of keys up until given slot (inclusive). - * Time complexity is O(log(CLUSTER_SLOTS)). */ -unsigned long long cumulativeKeyCountRead(redisDb *db, int slot, dbKeyType keyType) { - if (!server.cluster_enabled) { - serverAssert(slot == 0); - return dbSize(db, keyType); - } - int idx = slot + 1; - unsigned long long sum = 0; - while (idx > 0) { - sum += db->sub_dict[keyType].slot_size_index[idx]; - idx -= (idx & -idx); - } - return sum; -} - -/* Returns fair random slot, probability of each slot being returned is proportional to the number of elements that slot dictionary holds. - * This function guarantees that it returns a slot whose dict is non-empty, unless the entire db is empty. - * Time complexity of this function is O(log(CLUSTER_SLOTS)). */ -int getFairRandomSlot(redisDb *db, dbKeyType keyType) { - unsigned long target = dbSize(db, keyType) ? (randomULong() % dbSize(db, keyType)) + 1 : 0; - int slot = findSlotByKeyIndex(db, target, keyType); - return slot; -} - -/* Finds a slot containing target element in a key space ordered by slot id. - * Consider this example. Slots are represented by brackets and keys by dots: - * #0 #1 #2 #3 #4 - * [..][....][...][.......][.] 
- * ^ - * target - * - * In this case slot #3 contains key that we are trying to find. - * - * The return value is 0 based slot, and the range of the target is [1..dbSize], dbSize inclusive. - * - * To find the slot, we start with the root node of the binary index tree and search through its children - * from the highest index (2^14 in our case) to the lowest index. At each node, we check if the target - * value is greater than the node's value. If it is, we remove the node's value from the target and recursively - * search for the new target using the current node as the parent. - * Time complexity of this function is O(log(CLUSTER_SLOTS)) - */ -int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType) { - if (!server.cluster_enabled || dbSize(db, keyType) == 0) return 0; - serverAssert(target <= dbSize(db, keyType)); - - int result = 0, bit_mask = 1 << CLUSTER_SLOT_MASK_BITS; - for (int i = bit_mask; i != 0; i >>= 1) { - int current = result + i; - /* When the target index is greater than 'current' node value the we will update - * the target and search in the 'current' node tree. */ - if (target > db->sub_dict[keyType].slot_size_index[current]) { - target -= db->sub_dict[keyType].slot_size_index[current]; - result = current; - } - } - /* Adjust the result to get the correct slot: - * 1. result += 1; - * After the calculations, the index of target in slot_size_index should be the next one, - * so we should add 1. - * 2. result -= 1; - * Unlike BIT(slot_size_index is 1-based), slots are 0-based, so we need to subtract 1. - * As the addition and subtraction cancel each other out, we can simply return the result. */ - return result; -} - /* Helper for sync and async delete. 
*/ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { dictEntry **plink; int table; int slot = getKeySlot(key->ptr); - dict *d = db->dict[slot]; - dictEntry *de = dictTwoPhaseUnlinkFind(d,key->ptr,&plink,&table); + dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table); if (de) { robj *val = dictGetVal(de); /* RM_StringDMA may call dbUnshareStringValue which may free val, so we @@ -573,17 +396,13 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { if (async) { /* Because of dbUnshareStringValue, the val in de may change. */ freeObjAsync(key, dictGetVal(de), db->id); - dictSetVal(d, de, NULL); + kvstoreDictSetVal(db->keys, slot, de, NULL); } /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires[slot]) > 0) { - if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) { - cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES); - } - } - dictTwoPhaseUnlinkFree(d,de,plink,table); - cumulativeKeyCountAdd(db, slot, -1, DB_MAIN); + kvstoreDictDelete(db->expires, slot, key->ptr); + + kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table); return 1; } else { return 0; @@ -665,40 +484,16 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, } for (int j = startdb; j <= enddb; j++) { - removed += dbSize(&dbarray[j], DB_MAIN); + removed += kvstoreSize(dbarray[j].keys); if (async) { emptyDbAsync(&dbarray[j]); } else { - dbDictMetadata *metadata; - for (int k = 0; k < dbarray[j].dict_count; k++) { - dictEmpty(dbarray[j].dict[k],callback); - metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]); - if (metadata->rehashing_node) { - listDelNode(server.rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - - dictEmpty(dbarray[j].expires[k],callback); - metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]); - if (metadata->rehashing_node) { - 
listDelNode(server.rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - } + kvstoreEmpty(dbarray[j].keys, callback); + kvstoreEmpty(dbarray[j].expires, callback); } /* Because all keys of database are removed, reset average ttl. */ dbarray[j].avg_ttl = 0; dbarray[j].expires_cursor = 0; - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - dbarray[j].sub_dict[subdict].non_empty_slots = 0; - dbarray[j].sub_dict[subdict].key_count = 0; - dbarray[j].sub_dict[subdict].resize_cursor = 0; - if (server.cluster_enabled) { - dbarray[j].sub_dict[subdict].bucket_count = 0; - unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index; - memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)); - } - } } return removed; @@ -764,12 +559,9 @@ redisDb *initTempDb(void) { redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); for (int i=0; iargv[1]->ptr; int plen = sdslen(pattern), allkeys, pslot = -1; - long numkeys = 0; + unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); allkeys = (pattern[0] == '*' && plen == 1); if (server.cluster_enabled && !allkeys) { pslot = patternHashSlot(pattern, plen); } dictIterator *di = NULL; - dbIterator *dbit = NULL; + kvstoreIterator *kvs_it = NULL; if (pslot != -1) { - di = dictGetSafeIterator(c->db->dict[pslot]); + if (!kvstoreDictSize(c->db->keys, pslot)) { + /* Requested slot is empty */ + setDeferredArrayLen(c,replylen,0); + return; + } + di = kvstoreDictGetSafeIterator(c->db->keys, pslot); } else { - dbit = dbIteratorInit(c->db, DB_MAIN); + kvs_it = kvstoreIteratorInit(c->db->keys); } robj keyobj; - while ((de = di ? dictNext(di) : dbIteratorNext(dbit)) != NULL) { + while ((de = di ? 
dictNext(di) : kvstoreIteratorNext(kvs_it)) != NULL) { sds key = dictGetKey(de); if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { @@ -1040,8 +828,8 @@ void keysCommand(client *c) { } if (di) dictReleaseIterator(di); - if (dbit) - dbReleaseIterator(dbit); + if (kvs_it) + kvstoreIteratorRelease(kvs_it); setDeferredArrayLen(c,replylen,numkeys); } @@ -1312,15 +1100,15 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { }; /* A pattern may restrict all matching keys to one cluster slot. */ - int onlyslot = -1; + int onlydidx = -1; if (o == NULL && use_pattern && server.cluster_enabled) { - onlyslot = patternHashSlot(pat, patlen); + onlydidx = patternHashSlot(pat, patlen); } do { /* In cluster mode there is a separate dictionary for each slot. * If cursor is empty, we should try exploring next non-empty slot. */ if (o == NULL) { - cursor = dbScan(c->db, DB_MAIN, cursor, onlyslot, scanCallback, NULL, &data); + cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data); } else { cursor = dictScan(ht, cursor, scanCallback, &data); } @@ -1412,20 +1200,6 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { listRelease(keys); } -void addSlotIdToCursor(int slot, unsigned long long *cursor) { - if (!server.cluster_enabled) return; - /* Slot id can be -1 when iteration is over and there are no more slots to visit. */ - if (slot < 0) return; - *cursor = (*cursor << CLUSTER_SLOT_MASK_BITS) | slot; -} - -int getAndClearSlotIdFromCursor(unsigned long long *cursor) { - if (!server.cluster_enabled) return 0; - int slot = (int) (*cursor & CLUSTER_SLOT_MASK); - *cursor = *cursor >> CLUSTER_SLOT_MASK_BITS; - return slot; -} - /* The SCAN command completely relies on scanGenericCommand. 
*/ void scanCommand(client *c) { unsigned long long cursor; @@ -1434,113 +1208,7 @@ void scanCommand(client *c) { } void dbsizeCommand(client *c) { - redisDb *db = c->db; - unsigned long long int size = dbSize(db, DB_MAIN); - addReplyLongLong(c, size); -} - -unsigned long long int dbSize(redisDb *db, dbKeyType keyType) { - return db->sub_dict[keyType].key_count; -} - -int dbNonEmptySlots(redisDb *db, dbKeyType keyType) { - return db->sub_dict[keyType].non_empty_slots; -} - -/* This method provides the cumulative sum of all the dictionary buckets - * across dictionaries in a database. */ -unsigned long dbBuckets(redisDb *db, dbKeyType keyType) { - if (server.cluster_enabled) { - return db->sub_dict[keyType].bucket_count; - } else { - if (keyType == DB_MAIN) - return dictBuckets(db->dict[0]); - else if (keyType == DB_EXPIRES) - return dictBuckets(db->expires[0]); - else - serverPanic("Unknown keyType"); - } -} - -size_t dbMemUsage(redisDb *db, dbKeyType keyType) { - size_t mem = 0; - unsigned long long keys_count = dbSize(db, keyType); - mem += keys_count * dictEntryMemUsage() + - dbBuckets(db, keyType) * sizeof(dictEntry*) + - db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0])); - if (keyType == DB_MAIN) { - mem+=keys_count * sizeof(robj); - } - return mem; -} - -dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){ - int slot = getKeySlot(key); - if (keyType == DB_MAIN) - return dictFind(db->dict[slot], key); - else if (keyType == DB_EXPIRES) - return dictFind(db->expires[slot], key); - else - serverPanic("Unknown keyType"); -} - -/* - * This method is used to iterate over the elements of the entire database specifically across slots. - * It's a three pronged approach. - * - * 1. It uses the provided cursor `v` to retrieve the slot from it. - * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`, - * it performs a dictScan over the appropriate `keyType` dictionary of `db`. - * 3. 
If the slot is entirely scanned i.e. the cursor has reached 0, the next non empty slot is discovered. - * The slot information is embedded into the cursor and returned. - * - * To restrict the scan to a single cluster slot, pass a valid slot as - * 'onlyslot', otherwise pass -1. - */ -unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v, - int onlyslot, dictScanFunction *fn, - int (dictScanValidFunction)(dict *d), void *privdata) { - dict *d; - unsigned long long cursor = 0; - /* During main dictionary traversal in cluster mode, 48 lower bits in the cursor are used for positioning in the HT. - * Following 14 bits are used for the slot number, ranging from 0 to 2^14-1. - * Slot is always 0 at the start of iteration and can be incremented only in cluster mode. */ - int slot = getAndClearSlotIdFromCursor(&v); - if (onlyslot >= 0) { - if (slot < onlyslot) { - /* Fast-forward to onlyslot. */ - serverAssert(onlyslot < CLUSTER_SLOTS); - slot = onlyslot; - v = 0; - } else if (slot > onlyslot) { - /* The cursor is already past onlyslot. */ - return 0; - } - } - if (keyType == DB_MAIN) - d = db->dict[slot]; - else if (keyType == DB_EXPIRES) - d = db->expires[slot]; - else - serverPanic("Unknown keyType"); - - int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK); - if (is_dict_valid) { - cursor = dictScan(d, v, fn, privdata); - } else { - serverLog(LL_DEBUG, "Slot [%d] not valid for scanning, skipping.", slot); - } - /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next slot. 
*/ - if (cursor == 0 || !is_dict_valid) { - if (onlyslot >= 0) - return 0; - slot = dbGetNextNonEmptySlot(db, slot, keyType); - } - if (slot == -1) { - return 0; - } - addSlotIdToCursor(slot, &cursor); - return cursor; + addReplyLongLong(c,kvstoreSize(c->db->keys)); } void lastsaveCommand(client *c) { @@ -1837,7 +1505,7 @@ void scanDatabaseForReadyKeys(redisDb *db) { dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - dictEntry *kde = dbFind(db, key->ptr, DB_MAIN); + dictEntry *kde = dbFind(db, key->ptr); if (kde) { robj *value = dictGetVal(kde); signalKeyAsReady(db, key, value->type); @@ -1857,7 +1525,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { int existed = 0, exists = 0; int original_type = -1, curr_type = -1; - dictEntry *kde = dbFind(emptied, key->ptr, DB_MAIN); + dictEntry *kde = dbFind(emptied, key->ptr); if (kde) { robj *value = dictGetVal(kde); original_type = value->type; @@ -1865,7 +1533,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { } if (replaced_with) { - kde = dbFind(replaced_with, key->ptr, DB_MAIN); + kde = dbFind(replaced_with, key->ptr); if (kde) { robj *value = dictGetVal(kde); curr_type = value->type; @@ -1906,31 +1574,15 @@ int dbSwapDatabases(int id1, int id2) { /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to * remain in the same DB they were. 
*/ - db1->dict = db2->dict; + db1->keys = db2->keys; db1->expires = db2->expires; db1->avg_ttl = db2->avg_ttl; db1->expires_cursor = db2->expires_cursor; - db1->dict_count = db2->dict_count; - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count; - db1->sub_dict[subdict].bucket_count = db2->sub_dict[subdict].bucket_count; - db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots; - db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor; - db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index; - } - db2->dict = aux.dict; + db2->keys = aux.keys; db2->expires = aux.expires; db2->avg_ttl = aux.avg_ttl; db2->expires_cursor = aux.expires_cursor; - db2->dict_count = aux.dict_count; - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; - db2->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count; - db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots; - db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; - db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index; - } /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1964,31 +1616,16 @@ void swapMainDbWithTempDb(redisDb *tempDb) { /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since clients * remain in the same DB they were. 
*/ - activedb->dict = newdb->dict; + activedb->keys = newdb->keys; activedb->expires = newdb->expires; activedb->avg_ttl = newdb->avg_ttl; activedb->expires_cursor = newdb->expires_cursor; - activedb->dict_count = newdb->dict_count; - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count; - activedb->sub_dict[subdict].bucket_count = newdb->sub_dict[subdict].bucket_count; - activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots; - activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor; - activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index; - } - newdb->dict = aux.dict; + newdb->keys = aux.keys; newdb->expires = aux.expires; newdb->avg_ttl = aux.avg_ttl; newdb->expires_cursor = aux.expires_cursor; - newdb->dict_count = aux.dict_count; - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; - newdb->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count; - newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots; - newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; - newdb->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index; - } + /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list * X in a given DB, may now actually be unblocked if X happens @@ -2041,13 +1678,7 @@ void swapdbCommand(client *c) { *----------------------------------------------------------------------------*/ int removeExpire(redisDb *db, robj *key) { - int slot = getKeySlot(key->ptr); - if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) { - cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES); - return 1; - } else { - return 0; - } + return kvstoreDictDelete(db->expires, 
getKeySlot(key->ptr), key->ptr) == DICT_OK; } /* Set an expire to the specified key. If the expire is set in the context @@ -2058,15 +1689,14 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de, *existing; /* Reuse the sds from the main dict in the expire dict */ - kde = dbFind(db, key->ptr, DB_MAIN); - serverAssertWithInfo(NULL,key,kde != NULL); int slot = getKeySlot(key->ptr); - de = dictAddRaw(db->expires[slot], dictGetKey(kde), &existing); + kde = kvstoreDictFind(db->keys, slot, key->ptr); + serverAssertWithInfo(NULL,key,kde != NULL); + de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing); if (existing) { dictSetSignedIntegerVal(existing, when); } else { dictSetSignedIntegerVal(de, when); - cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES); } int writable_slave = server.masterhost && server.repl_slave_ro == 0; @@ -2079,9 +1709,8 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { long long getExpire(redisDb *db, robj *key) { dictEntry *de; - /* No expire? return ASAP */ - if (dictSize(db->expires[getKeySlot(key->ptr)]) == 0 || - (de = dbFind(db,key->ptr, DB_EXPIRES)) == NULL) return -1; + if ((de = dbFindExpires(db, key->ptr)) == NULL) + return -1; return dictGetSignedIntegerVal(de); } @@ -2228,6 +1857,13 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) { return 1; } +/* CB passed to kvstoreExpand. + * The purpose is to skip expansion of unused dicts in cluster mode (all + * dicts not mapped to *my* slots) */ +static int dbExpandSkipSlot(int slot) { + return !clusterNodeCoversSlot(getMyClusterNode(), slot); +} + /* * This functions increases size of the main/expires db to match desired number. * In cluster mode resizes all individual dictionaries for slots that this node owns. @@ -2238,47 +1874,48 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) { * `DICT_OK` response is for successful expansion. 
However, `DICT_ERR` response signifies failure in allocation in * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. */ -int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand) { - dict *d; +static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) { + int ret; if (server.cluster_enabled) { /* We don't know exact number of keys that would fall into each slot, but we can * approximate it, assuming even distribution, divide it by the number of slots. */ int slots = getMyShardSlotCount(); if (slots == 0) return C_OK; db_size = db_size / slots; - - for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (clusterNodeCoversSlot(getMyClusterNode(), i)) { - if (keyType == DB_MAIN) { - d = db->dict[i]; - } else { - d = db->expires[i]; - } - int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size); - if (try_expand && result == DICT_ERR) { - serverLog(LL_WARNING, "Dict expansion failed for db type: %s, slot: %d", - keyType == DB_MAIN ? "main" : "expires", i); - return C_ERR; - } else if (result == DICT_ERR) { - serverLog(LL_DEBUG, "Dict expansion skipped for db type: %s, slot: %d", - keyType == DB_MAIN ? "main" : "expires", i); - } - } - } + ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot); } else { - if (keyType == DB_MAIN) { - d = db->dict[0]; - } else { - d = db->expires[0]; - } - int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size); - if (try_expand && result == DICT_ERR) { - serverLog(LL_WARNING, "Dict expansion failed for db type: %s", - keyType == DB_MAIN ? "main" : "expires"); - return C_ERR; - } + ret = kvstoreExpand(kvs, db_size, try_expand, NULL); } - return C_OK; + + return ret?
C_OK : C_ERR; +} + +int dbExpand(redisDb *db, uint64_t db_size, int try_expand) { + return dbExpandGeneric(db->keys, db_size, try_expand); +} + +int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) { + return dbExpandGeneric(db->expires, db_size, try_expand); +} + +static dictEntry *dbFindGeneric(kvstore *kvs, void *key) { + return kvstoreDictFind(kvs, getKeySlot(key), key); +} + +dictEntry *dbFind(redisDb *db, void *key) { + return dbFindGeneric(db->keys, key); +} + +dictEntry *dbFindExpires(redisDb *db, void *key) { + return dbFindGeneric(db->expires, key); +} + +unsigned long long dbSize(redisDb *db) { + return kvstoreSize(db->keys); +} + +unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) { + return kvstoreScan(db->keys, cursor, -1, scan_cb, NULL, privdata); } /* ----------------------------------------------------------------------------- @@ -3020,42 +2657,3 @@ int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResu } return 1; } - -void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType) { - size_t l; - char *orig_buf = buf; - size_t orig_bufsize = bufsize; - dictStats *mainHtStats = NULL; - dictStats *rehashHtStats = NULL; - dict *d; - dbIterator *dbit = dbIteratorInit(db, keyType); - while ((d = dbIteratorNextDict(dbit))) { - dictStats *stats = dictGetStatsHt(d, 0, full); - if (!mainHtStats) { - mainHtStats = stats; - } else { - dictCombineStats(stats, mainHtStats); - dictFreeStats(stats); - } - if (dictIsRehashing(d)) { - stats = dictGetStatsHt(d, 1, full); - if (!rehashHtStats) { - rehashHtStats = stats; - } else { - dictCombineStats(stats, rehashHtStats); - dictFreeStats(stats); - } - } - } - dbReleaseIterator(dbit); - l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); - dictFreeStats(mainHtStats); - buf += l; - bufsize -= l; - if (rehashHtStats && bufsize > 0) { - dictGetStatsMsg(buf, bufsize, rehashHtStats, full); - 
dictFreeStats(rehashHtStats); - } - /* Make sure there is a NULL term at the end. */ - if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0'; -} diff --git a/src/debug.c b/src/debug.c index 192b69d9c..1b89a7f6e 100644 --- a/src/debug.c +++ b/src/debug.c @@ -76,7 +76,6 @@ int bugReportStart(void); void printCrashReport(void); void bugReportEnd(int killViaSignal, int sig); void logStackTrace(void *eip, int uplevel, int current_thread); -void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType); void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret); /* ================================= Debugging ============================== */ @@ -290,15 +289,16 @@ void computeDatasetDigest(unsigned char *final) { for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; - if (dbSize(db, DB_MAIN) == 0) continue; - dbIterator *dbit = dbIteratorInit(db, DB_MAIN); + if (kvstoreSize(db->keys) == 0) + continue; + kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys); /* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */ aux = htonl(j); mixDigest(final,&aux,sizeof(aux)); /* Iterate this DB writing every entry */ - while((de = dbIteratorNext(dbit)) != NULL) { + while((de = kvstoreIteratorNext(kvs_it)) != NULL) { sds key; robj *keyobj, *o; @@ -315,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) { xorDigest(final,digest,20); decrRefCount(keyobj); } - dbReleaseIterator(dbit); + kvstoreIteratorRelease(kvs_it); } } @@ -606,7 +606,7 @@ NULL robj *val; char *strenc; - if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) { addReplyErrorObject(c,shared.nokeyerr); return; } @@ -658,7 +658,7 @@ NULL robj *val; sds key; - if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) { addReplyErrorObject(c,shared.nokeyerr); return; } @@ -719,7 +719,7 @@ NULL return; } - if (dbExpand(c->db, 
keys, DB_MAIN, 1) == C_ERR) { + if (dbExpand(c->db, keys, 1) == C_ERR) { addReplyError(c, "OOM in dictTryExpand"); return; } @@ -767,7 +767,7 @@ NULL /* We don't use lookupKey because a debug command should * work on logically expired keys */ dictEntry *de; - robj *o = ((de = dbFind(c->db, c->argv[j]->ptr, DB_MAIN)) == NULL) ? NULL : dictGetVal(de); + robj *o = ((de = dbFind(c->db, c->argv[j]->ptr)) == NULL) ? NULL : dictGetVal(de); if (o) xorObjectDigest(c->db,c->argv[j],digest,o); sds d = sdsempty(); @@ -911,11 +911,11 @@ NULL full = 1; stats = sdscatprintf(stats,"[Dictionary HT]\n"); - dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_MAIN); + kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full); stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires HT]\n"); - dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_EXPIRES); + kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full); stats = sdscat(stats,buf); addReplyVerbatim(c,stats,sdslen(stats),"txt"); @@ -2051,7 +2051,7 @@ void logCurrentClient(client *cc, const char *title) { dictEntry *de; key = getDecodedObject(cc->argv[1]); - de = dbFind(cc->db, key->ptr, DB_MAIN); + de = dbFind(cc->db, key->ptr); if (de) { val = dictGetVal(de); serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr); diff --git a/src/defrag.c b/src/defrag.c index 130913339..6ef3c6496 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -684,21 +684,21 @@ void defragKey(defragCtx *ctx, dictEntry *de) { /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) { - dictSetKey(db->dict[slot], de, newsds); - if (dbSize(db, DB_EXPIRES)) { + kvstoreDictSetKey(db->keys, slot, de, newsds); + if (kvstoreSize(db->expires)) { /* We can't search in db->expires for that key after we've released * the pointer it holds, since it won't be able to do the string * compare, but we can find the entry using key hash and pointer. 
*/ - uint64_t hash = dictGetHash(db->dict[slot], newsds); - dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires[slot], keysds, hash); - if (expire_de) dictSetKey(db->expires[slot], expire_de, newsds); + uint64_t hash = kvstoreGetHash(db->keys, newsds); + dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash); + if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds); } } /* Try to defrag robj and / or string value. */ ob = dictGetVal(de); if ((newob = activeDefragStringOb(ob))) { - dictSetVal(db->dict[slot], de, newob); + kvstoreDictSetVal(db->keys, slot, de, newob); ob = newob; } @@ -856,7 +856,7 @@ int defragLaterStep(redisDb *db, int slot, long long endtime) { } /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = dictFind(db->dict[slot], defrag_later_current_key); + dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key); key_defragged = server.stat_active_defrag_hits; do { int quit = 0; @@ -1022,13 +1022,12 @@ void activeDefragCycle(void) { db = &server.db[current_db]; cursor = 0; expires_cursor = 0; - slot = findSlotByKeyIndex(db, 1, DB_MAIN); + slot = kvstoreFindDictIndexByKeyIndex(db->keys, 1); defrag_later_item_in_progress = 0; ctx.db = db; ctx.slot = slot; } do { - dict *d = db->dict[slot]; /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ if (defragLaterStep(db, slot, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ @@ -1038,13 +1037,14 @@ void activeDefragCycle(void) { if (!defrag_later_item_in_progress) { /* Scan the keyspace dict unless we're scanning the expire dict. */ if (!expires_cursor) - cursor = dictScanDefrag(d, cursor, defragScanCallback, - &defragfns, &ctx); + cursor = kvstoreDictScanDefrag(db->keys, slot, cursor, + defragScanCallback, + &defragfns, &ctx); /* When done scanning the keyspace dict, we scan the expire dict. 
*/ if (!cursor) - expires_cursor = dictScanDefrag(db->expires[slot], expires_cursor, - scanCallbackCountScanned, - &defragfns, NULL); + expires_cursor = kvstoreDictScanDefrag(db->expires, slot, expires_cursor, + scanCallbackCountScanned, + &defragfns, NULL); } if (!(cursor || expires_cursor)) { /* Move to the next slot only if regular and large item scanning has been completed. */ @@ -1052,7 +1052,7 @@ void activeDefragCycle(void) { defrag_later_item_in_progress = 1; continue; } - slot = dbGetNextNonEmptySlot(db, slot, DB_MAIN); + slot = kvstoreGetNextNonEmptyDictIndex(db->keys, slot); defrag_later_item_in_progress = 0; ctx.slot = slot; } diff --git a/src/dict.c b/src/dict.c index 03659381c..6e9b13150 100644 --- a/src/dict.c +++ b/src/dict.c @@ -194,16 +194,6 @@ dict *dictCreate(dictType *type) return d; } -/* Create an array of dictionaries */ -dict **dictCreateMultiple(dictType *type, int count) -{ - dict **d = zmalloc(sizeof(dict*) * count); - for (int i = 0; i < count; i++) { - d[i] = dictCreate(type); - } - return d; -} - /* Initialize the hash table */ int _dictInit(dict *d, dictType *type) { diff --git a/src/dict.h b/src/dict.h index 7a7338279..324e7512f 100644 --- a/src/dict.h +++ b/src/dict.h @@ -51,6 +51,7 @@ typedef struct dictEntry dictEntry; /* opaque */ typedef struct dict dict; typedef struct dictType { + /* Callbacks */ uint64_t (*hashFunction)(const void *key); void *(*keyDup)(dict *d, const void *key); void *(*valDup)(dict *d, const void *obj); @@ -66,6 +67,10 @@ typedef struct dictType { /* Allow a dict to carry extra caller-defined metadata. The * extra memory is initialized to 0 when a dict is allocated. */ size_t (*dictMetadataBytes)(dict *d); + + /* Data */ + void *userdata; + /* Flags */ /* The 'no_value' flag, if set, indicates that values are not used, i.e. the * dict is a set. 
When this flag is set, it's not possible to access the @@ -177,7 +182,6 @@ typedef enum { /* API */ dict *dictCreate(dictType *type); -dict **dictCreateMultiple(dictType *type, int count); int dictExpand(dict *d, unsigned long size); int dictTryExpand(dict *d, unsigned long size); int dictShrink(dict *d, unsigned long size); diff --git a/src/evict.c b/src/evict.c index c31d578c4..f40612627 100644 --- a/src/evict.c +++ b/src/evict.c @@ -143,11 +143,12 @@ void evictionPoolAlloc(void) { * We insert keys on place in ascending order, so keys with the smaller * idle time are on the left, and keys with the higher idle time on the * right. */ -int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { +int evictionPoolPopulate(redisDb *db, kvstore *samplekvs, struct evictionPoolEntry *pool) { int j, k, count; dictEntry *samples[server.maxmemory_samples]; - count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples); + int slot = kvstoreGetFairRandomDictIndex(samplekvs); + count = kvstoreDictGetSomeKeys(samplekvs,slot,samples,server.maxmemory_samples); for (j = 0; j < count; j++) { unsigned long long idle; sds key; @@ -161,7 +162,8 @@ int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, st * dictionary (but the expires one) we need to lookup the key * again in the key dictionary to obtain the value object. 
*/ if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { - if (sampledict != keydict) de = dictFind(keydict, key); + if (samplekvs != db->keys) + de = kvstoreDictFind(db->keys, slot, key); o = dictGetVal(de); } @@ -236,7 +238,7 @@ int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, st pool[k].key = pool[k].cached; } pool[k].idle = idle; - pool[k].dbid = dbid; + pool[k].dbid = db->id; pool[k].slot = slot; } @@ -578,16 +580,12 @@ int performEvictions(void) { sds bestkey = NULL; int bestdbid; redisDb *db; - dict *dict; dictEntry *de; if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; - dbKeyType keyType = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS ? - DB_MAIN : DB_EXPIRES); - while (bestkey == NULL) { unsigned long total_keys = 0; @@ -596,17 +594,21 @@ int performEvictions(void) { * every DB. */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; + kvstore *kvs; + if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { + kvs = db->keys; + } else { + kvs = db->expires; + } unsigned long sampled_keys = 0; - unsigned long current_db_keys = dbSize(db, keyType); + unsigned long current_db_keys = kvstoreSize(kvs); if (current_db_keys == 0) continue; total_keys += current_db_keys; - int l = dbNonEmptySlots(db, keyType); + int l = kvstoreNumNonEmptyDicts(kvs); /* Do not exceed the number of non-empty slots when looping. */ while (l--) { - int slot = getFairRandomSlot(db, keyType); - dict = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]); - sampled_keys += evictionPoolPopulate(i, slot, dict, db->dict[slot], pool); + sampled_keys += evictionPoolPopulate(db, kvs, pool); /* We have sampled enough keys in the current db, exit the loop. 
*/ if (sampled_keys >= (unsigned long) server.maxmemory_samples) break; @@ -624,13 +626,13 @@ int performEvictions(void) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; + kvstore *kvs; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - de = dictFind(server.db[bestdbid].dict[pool[k].slot], - pool[k].key); + kvs = server.db[bestdbid].keys; } else { - de = dictFind(server.db[bestdbid].expires[pool[k].slot], - pool[k].key); + kvs = server.db[bestdbid].expires; } + de = kvstoreDictFind(kvs, pool[k].slot, pool[k].key); /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -660,10 +662,15 @@ int performEvictions(void) { for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; - dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? - db->dict[getFairRandomSlot(db, DB_MAIN)] : db->expires[getFairRandomSlot(db, DB_EXPIRES)]; - if (dictSize(dict) != 0) { - de = dictGetRandomKey(dict); + kvstore *kvs; + if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { + kvs = db->keys; + } else { + kvs = db->expires; + } + int slot = kvstoreGetFairRandomDictIndex(kvs); + de = kvstoreDictGetRandomKey(kvs, slot); + if (de) { bestkey = dictGetKey(de); bestdbid = j; break; diff --git a/src/expire.c b/src/expire.c index a1006809c..2092c5a09 100644 --- a/src/expire.c +++ b/src/expire.c @@ -253,7 +253,8 @@ void activeExpireCycle(int type) { * distribute the time evenly across DBs. */ current_db++; - if (dbSize(db, DB_EXPIRES)) dbs_performed++; + if (kvstoreSize(db->expires)) + dbs_performed++; /* Continue to expire if at the end of the cycle there are still * a big percentage of keys to expire, compared to the number of keys @@ -264,7 +265,7 @@ void activeExpireCycle(int type) { iteration++; /* If there is nothing to expire try next DB ASAP. 
*/ - if ((num = dbSize(db, DB_EXPIRES)) == 0) { + if ((num = kvstoreSize(db->expires)) == 0) { db->avg_ttl = 0; break; } @@ -294,7 +295,7 @@ void activeExpireCycle(int type) { int origin_ttl_samples = data.ttl_samples; while (data.sampled < num && checked_buckets < max_buckets) { - db->expires_cursor = dbScan(db, DB_EXPIRES, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data); + db->expires_cursor = kvstoreScan(db->expires, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data); if (db->expires_cursor == 0) { db_done = 1; break; @@ -429,7 +430,7 @@ void expireSlaveKeys(void) { while(dbids && dbid < server.dbnum) { if ((dbids & 1) != 0) { redisDb *db = server.db+dbid; - dictEntry *expire = dictFind(db->expires[getKeySlot(keyname)],keyname); + dictEntry *expire = dbFindExpires(db, keyname); int expired = 0; if (expire && diff --git a/src/kvstore.c b/src/kvstore.c new file mode 100644 index 000000000..d9ca45adb --- /dev/null +++ b/src/kvstore.c @@ -0,0 +1,749 @@ +/* + * Index-based KV store implementation + * This file implements a KV store comprised of an array of dicts (see dict.c) + * The purpose of this KV store is to have easy access to all keys that belong + * in the same dict (i.e. are in the same dict-index) + * + * For example, when Redis is running in cluster mode, we use kvstore to save + * all keys that map to the same hash-slot in a separate dict within the kvstore + * struct. + * This enables us to easily access all keys that map to a specific hash-slot. + * + * Copyright (c) Redis contributors. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. 
+ * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#include "fmacros.h" + +#include <string.h> +#include <stddef.h> + +#include "zmalloc.h" +#include "kvstore.h" +#include "redisassert.h" +#include "monotonic.h" + +#define UNUSED(V) ((void) V) + +struct _kvstore { + int flags; + dictType dtype; + dict **dicts; + long long num_dicts; + long long num_dicts_bits; + list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */ + int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */ + int allocated_dicts; /* The number of allocated dicts. */ + int non_empty_dicts; /* The number of non-empty dicts. */ + unsigned long long key_count; /* Total number of keys in this kvstore. 
*/ + unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */ + unsigned long long *dict_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */ +}; + +/* Structure for kvstore iterator that allows iterating across multiple dicts. */ +struct _kvstoreIterator { + kvstore *kvs; + long long didx; + long long next_didx; + dictIterator di; +}; + +/* Dict metadata for database, used for record the position in rehashing list. */ +typedef struct { + listNode *rehashing_node; /* list node in rehashing list */ +} kvstoreDictMetadata; + +/**********************************/ +/*** Helpers **********************/ +/**********************************/ + +static dict *kvstoreGetDict(kvstore *kvs, int didx) { + return kvs->dicts[didx]; +} + +/* Returns total (cumulative) number of keys up until given dict-index (inclusive). + * Time complexity is O(log(kvs->num_dicts)). */ +static unsigned long long cumulativeKeyCountRead(kvstore *kvs, int didx) { + if (kvs->num_dicts == 1) { + assert(didx == 0); + return kvstoreSize(kvs); + } + int idx = didx + 1; + unsigned long long sum = 0; + while (idx > 0) { + sum += kvs->dict_size_index[idx]; + idx -= (idx & -idx); + } + return sum; +} + +static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) { + if (kvs->num_dicts == 1) + return; + /* didx can be -1 when iteration is over and there are no more dicts to visit. */ + if (didx < 0) + return; + *cursor = (*cursor << kvs->num_dicts_bits) | didx; +} + +static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) { + if (kvs->num_dicts == 1) + return 0; + int didx = (int) (*cursor & (kvs->num_dicts-1)); + *cursor = *cursor >> kvs->num_dicts_bits; + return didx; +} + +/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given dict. 
+ * You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree + * Time complexity is O(log(kvs->num_dicts)). */ +static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) { + kvs->key_count += delta; + + dict *d = kvstoreGetDict(kvs, didx); + size_t dsize = dictSize(d); + int non_empty_dicts_delta = dsize == 1? 1 : dsize == 0? -1 : 0; + kvs->non_empty_dicts += non_empty_dicts_delta; + + /* BIT does not need to be calculated when there's only one dict. */ + if (kvs->num_dicts == 1) + return; + + /* Update the BIT */ + int idx = didx + 1; /* Unlike dict indices, BIT is 1-based, so we need to add 1. */ + while (idx <= kvs->num_dicts) { + if (delta < 0) { + assert(kvs->dict_size_index[idx] >= (unsigned long long)labs(delta)); + } + kvs->dict_size_index[idx] += delta; + idx += (idx & -idx); + } +} + +static void createDictIfNeeded(kvstore *kvs, int didx) { + if (kvstoreGetDict(kvs, didx)) + return; + kvs->dicts[didx] = dictCreate(&kvs->dtype); + kvs->allocated_dicts++; +} + +static void freeDictIfNeeded(kvstore *kvs, int didx) { + if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) || + !kvstoreGetDict(kvs, didx) || + kvstoreDictSize(kvs, didx) != 0) + return; + dictRelease(kvs->dicts[didx]); + kvs->dicts[didx] = NULL; + kvs->allocated_dicts--; +} + +/**********************************/ +/*** dict callbacks ***************/ +/**********************************/ + +/* Adds dictionary to the rehashing list, which allows us + * to quickly find rehash targets during incremental rehashing. + * + * If there are multiple dicts, updates the bucket count for the given dictionary + * in a DB, bucket count incremented with the new ht size during the rehashing phase. + * If there's one dict, bucket count can be retrieved directly from single dict bucket. 
*/ +static void kvstoreDictRehashingStarted(dict *d) { + kvstore *kvs = d->type->userdata; + kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d); + listAddNodeTail(kvs->rehashing, d); + metadata->rehashing_node = listLast(kvs->rehashing); + + if (kvs->num_dicts == 1) + return; + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + kvs->bucket_count += to; /* Started rehashing (Add the new ht size) */ +} + +/* Remove dictionary from the rehashing list. + * + * Updates the bucket count for the given dictionary in a DB. It removes + * the old ht size of the dictionary from the total sum of buckets for a DB. */ +static void kvstoreDictRehashingCompleted(dict *d) { + kvstore *kvs = d->type->userdata; + kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d); + if (metadata->rehashing_node) { + listDelNode(kvs->rehashing, metadata->rehashing_node); + metadata->rehashing_node = NULL; + } + + if (kvs->num_dicts == 1) + return; + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + kvs->bucket_count -= from; /* Finished rehashing (Remove the old ht size) */ +} + +/* Returns the size of the DB dict metadata in bytes. */ +static size_t kvstoreDictMetadataSize(dict *d) { + UNUSED(d); + return sizeof(kvstoreDictMetadata); +} + +/**********************************/ +/*** API **************************/ +/**********************************/ + +/* Create an array of dictionaries + * num_dicts_bits is the log2 of the amount of dictionaries needed (e.g. 0 for 1 dict, + * 3 for 8 dicts, etc.) 
*/ +kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags) { + /* We can't support more than 2^16 dicts because we want to save 48 bits + * for the dict cursor, see kvstoreScan */ + assert(num_dicts_bits <= 16); + + kvstore *kvs = zcalloc(sizeof(*kvs)); + memcpy(&kvs->dtype, type, sizeof(kvs->dtype)); + kvs->flags = flags; + + /* kvstore must be the one to set these callbacks, so we make sure the + * caller didn't do it */ + assert(!type->userdata); + assert(!type->dictMetadataBytes); + assert(!type->rehashingStarted); + assert(!type->rehashingCompleted); + kvs->dtype.userdata = kvs; + kvs->dtype.dictMetadataBytes = kvstoreDictMetadataSize; + kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted; + kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted; + + kvs->num_dicts_bits = num_dicts_bits; + kvs->num_dicts = 1 << kvs->num_dicts_bits; + kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts); + if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) { + for (int i = 0; i < kvs->num_dicts; i++) + createDictIfNeeded(kvs, i); + } + + kvs->rehashing = listCreate(); + kvs->key_count = 0; + kvs->non_empty_dicts = 0; + kvs->resize_cursor = 0; + kvs->dict_size_index = kvs->num_dicts > 1?
zcalloc(sizeof(unsigned long long) * (kvs->num_dicts + 1)) : NULL; + kvs->bucket_count = 0; + + return kvs; +} + +void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) { + for (int didx = 0; didx < kvs->num_dicts; didx++) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + continue; + kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d); + if (metadata->rehashing_node) + metadata->rehashing_node = NULL; + dictEmpty(d, callback); + } + + listEmpty(kvs->rehashing); + + kvs->key_count = 0; + kvs->non_empty_dicts = 0; + kvs->resize_cursor = 0; + kvs->bucket_count = 0; + if (kvs->dict_size_index) + memset(kvs->dict_size_index, 0, sizeof(unsigned long long) * (kvs->num_dicts + 1)); +} + +void kvstoreRelease(kvstore *kvs) { + for (int didx = 0; didx < kvs->num_dicts; didx++) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + continue; + kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d); + if (metadata->rehashing_node) + metadata->rehashing_node = NULL; + dictRelease(d); + } + zfree(kvs->dicts); + + listRelease(kvs->rehashing); + if (kvs->dict_size_index) + zfree(kvs->dict_size_index); + + zfree(kvs); +} + +unsigned long long int kvstoreSize(kvstore *kvs) { + if (kvs->num_dicts != 1) { + return kvs->key_count; + } else { + return kvs->dicts[0]? dictSize(kvs->dicts[0]) : 0; + } +} + +/* This method provides the cumulative sum of all the dictionary buckets + * across dictionaries in a database. */ +unsigned long kvstoreBuckets(kvstore *kvs) { + if (kvs->num_dicts != 1) { + return kvs->bucket_count; + } else { + return kvs->dicts[0]? 
dictBuckets(kvs->dicts[0]) : 0; + } +} + +size_t kvstoreMemUsage(kvstore *kvs) { + size_t mem = sizeof(*kvs); + + unsigned long long keys_count = kvstoreSize(kvs); + mem += keys_count * dictEntryMemUsage() + + kvstoreBuckets(kvs) * sizeof(dictEntry*) + + kvs->allocated_dicts * (sizeof(dict) + kvstoreDictMetadataSize(NULL)); + + /* Values are dict* shared with kvs->dicts */ + mem += listLength(kvs->rehashing) * sizeof(listNode); + + if (kvs->dict_size_index) + mem += sizeof(unsigned long long) * (kvs->num_dicts + 1); + + return mem; +} + +/* + * This method is used to iterate over the elements of the entire kvstore specifically across dicts. + * It's a three pronged approach. + * + * 1. It uses the provided cursor `cursor` to retrieve the dict index from it. + * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`, + * it performs a dictScan over the appropriate `keyType` dictionary of `db`. + * 3. If the dict is entirely scanned i.e. the cursor has reached 0, the next non empty dict is discovered. + * The dict information is embedded into the cursor and returned. + * + * To restrict the scan to a single dict, pass a valid dict index as + * 'onlydidx', otherwise pass -1. + */ +unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor, + int onlydidx, dictScanFunction *scan_cb, + kvstoreScanShouldSkipDict *skip_cb, + void *privdata) +{ + unsigned long long _cursor = 0; + /* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT. + * Following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1. + * Dict index is always 0 at the start of iteration and can be incremented only if there are + * multiple dicts. */ + int didx = getAndClearDictIndexFromCursor(kvs, &cursor); + if (onlydidx >= 0) { + if (didx < onlydidx) { + /* Fast-forward to onlydidx. 
*/ + assert(onlydidx < kvs->num_dicts); + didx = onlydidx; + cursor = 0; + } else if (didx > onlydidx) { + /* The cursor is already past onlydidx. */ + return 0; + } + } + + dict *d = kvstoreGetDict(kvs, didx); + + int skip = !d || (skip_cb && skip_cb(d)); + if (!skip) { + _cursor = dictScan(d, cursor, scan_cb, privdata); + } + /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next dict index. */ + if (_cursor == 0 || skip) { + if (onlydidx >= 0) + return 0; + didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx); + } + if (didx == -1) { + return 0; + } + addDictIndexToCursor(kvs, didx, &_cursor); + return _cursor; +} + +/* + * This functions increases size of kvstore to match desired number. + * It resizes all individual dictionaries, unless skip_cb indicates otherwise. + * + * Based on the parameter `try_expand`, appropriate dict expand API is invoked. + * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`. + * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s). + * `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in + * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. + */ +int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) { + for (int i = 0; i < kvs->num_dicts; i++) { + dict *d = kvstoreGetDict(kvs, i); + if (!d || (skip_cb && skip_cb(i))) + continue; + int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize); + if (try_expand && result == DICT_ERR) + return 0; + } + + return 1; +} + +/* Returns fair random dict index, probability of each dict being returned is proportional to the number of elements that dictionary holds. + * This function guarantees that it returns a dict-index of a non-empty dict, unless the entire kvstore is empty. + * Time complexity of this function is O(log(kvs->num_dicts)). 
*/ +int kvstoreGetFairRandomDictIndex(kvstore *kvs) { + unsigned long target = kvstoreSize(kvs) ? (randomULong() % kvstoreSize(kvs)) + 1 : 0; + return kvstoreFindDictIndexByKeyIndex(kvs, target); +} + +void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) { + buf[0] = '\0'; + + size_t l; + char *orig_buf = buf; + size_t orig_bufsize = bufsize; + dictStats *mainHtStats = NULL; + dictStats *rehashHtStats = NULL; + dict *d; + kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs); + while ((d = kvstoreIteratorNextDict(kvs_it))) { + dictStats *stats = dictGetStatsHt(d, 0, full); + if (!mainHtStats) { + mainHtStats = stats; + } else { + dictCombineStats(stats, mainHtStats); + dictFreeStats(stats); + } + if (dictIsRehashing(d)) { + stats = dictGetStatsHt(d, 1, full); + if (!rehashHtStats) { + rehashHtStats = stats; + } else { + dictCombineStats(stats, rehashHtStats); + dictFreeStats(stats); + } + } + } + kvstoreIteratorRelease(kvs_it); + + if (mainHtStats && bufsize > 0) { + l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); + dictFreeStats(mainHtStats); + buf += l; + bufsize -= l; + } + + if (rehashHtStats && bufsize > 0) { + l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full); + dictFreeStats(rehashHtStats); + buf += l; + bufsize -= l; + } + /* Make sure there is a NULL term at the end. */ + if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0'; +} + +/* Finds a dict containing target element in a key space ordered by dict index. + * Consider this example. Dictionaries are represented by brackets and keys by dots: + * #0 #1 #2 #3 #4 + * [..][....][...][.......][.] + * ^ + * target + * + * In this case dict #3 contains key that we are trying to find. + * + * The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive. 
+ * + * To find the dict, we start with the root node of the binary index tree and search through its children + * from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target + * value is greater than the node's value. If it is, we remove the node's value from the target and recursively + * search for the new target using the current node as the parent. + * Time complexity of this function is O(log(kvs->num_dicts)) + */ +int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) { + if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0) + return 0; + assert(target <= kvstoreSize(kvs)); + + int result = 0, bit_mask = 1 << kvs->num_dicts_bits; + for (int i = bit_mask; i != 0; i >>= 1) { + int current = result + i; + /* When the target index is greater than 'current' node value the we will update + * the target and search in the 'current' node tree. */ + if (target > kvs->dict_size_index[current]) { + target -= kvs->dict_size_index[current]; + result = current; + } + } + /* Adjust the result to get the correct dict: + * 1. result += 1; + * After the calculations, the index of target in dict_size_index should be the next one, + * so we should add 1. + * 2. result -= 1; + * Unlike BIT(dict_size_index is 1-based), dict indices are 0-based, so we need to subtract 1. + * As the addition and subtraction cancel each other out, we can simply return the result. */ + return result; +} + +/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */ +int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) { + unsigned long long next_key = cumulativeKeyCountRead(kvs, didx) + 1; + return next_key <= kvstoreSize(kvs) ? 
kvstoreFindDictIndexByKeyIndex(kvs, next_key) : -1; +} + +int kvstoreNumNonEmptyDicts(kvstore *kvs) { + return kvs->non_empty_dicts; +} + +int kvstoreNumDicts(kvstore *kvs) { + return kvs->num_dicts; +} + +/* Returns kvstore iterator that can be used to iterate through sub-dictionaries. + * + * The caller should free the resulting kvs_it with kvstoreIteratorRelease. */ +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) { + kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it)); + kvs_it->kvs = kvs; + kvs_it->didx = -1; + kvs_it->next_didx = kvstoreFindDictIndexByKeyIndex(kvs_it->kvs, 1); /* Finds first non-empty dict index. */ + dictInitSafeIterator(&kvs_it->di, NULL); + return kvs_it; +} + +/* Free the dbit returned by dbIteratorInit. */ +void kvstoreIteratorRelease(kvstoreIterator *kvs_it) { + dictIterator *iter = &kvs_it->di; + dictResetIterator(iter); + + zfree(kvs_it); +} + +/* Returns next dictionary from the iterator, or NULL if iteration is complete. */ +dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) { + if (kvs_it->next_didx == -1) + return NULL; + kvs_it->didx = kvs_it->next_didx; + kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx); + return kvs_it->kvs->dicts[kvs_it->didx]; +} + +int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) { + assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts); + return kvs_it->didx; +} + +/* Returns next entry. */ +dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) { + dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL; + if (!de) { /* No current dict or reached the end of the dictionary. */ + dict *d = kvstoreIteratorNextDict(kvs_it); + if (!d) + return NULL; + if (kvs_it->di.d) { + /* Before we move to the next dict, reset the iter of the previous dict. 
*/ + dictIterator *iter = &kvs_it->di; + dictResetIterator(iter); + } + dictInitSafeIterator(&kvs_it->di, d); + de = dictNext(&kvs_it->di); + } + return de; +} + +/* This method traverses through kvstore dictionaries and triggers a resize . + * It first tries to shrink if needed, and if it isn't, it tries to expand. */ +void kvstoreTryResizeDicts(kvstore *kvs, int limit) { + if (limit > kvs->num_dicts) + limit = kvs->num_dicts; + + for (int i = 0; i < limit; i++) { + int didx = kvs->resize_cursor; + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + continue; + if (dictShrinkIfNeeded(d) == DICT_ERR) { + dictExpandIfNeeded(d); + } + kvs->resize_cursor = (didx + 1) % kvs->num_dicts; + } +} + +/* Our hash table implementation performs rehashing incrementally while + * we write/read from the hash table. Still if the server is idle, the hash + * table will use two tables for a long time. So we try to use 1 millisecond + * of CPU time at every call of this function to perform some rehashing. + * + * The function returns the amount of microsecs spent if some rehashing was + * performed, otherwise 0 is returned. */ +uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) { + if (listLength(kvs->rehashing) == 0) + return 0; + + /* Our goal is to rehash as many dictionaries as we can before reaching predefined threshold, + * after each dictionary completes rehashing, it removes itself from the list. */ + listNode *node; + monotime timer; + uint64_t elapsed_us = UINT64_MAX; + elapsedStart(&timer); + while ((node = listFirst(kvs->rehashing))) { + elapsed_us = elapsedUs(timer); + if (elapsed_us >= threshold_us) { + break; /* Reached the time limit. 
*/ + } + dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us); + } + assert(elapsed_us != UINT64_MAX); + return elapsed_us; +} + +unsigned long kvstoreDictSize(kvstore *kvs, int didx) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return 0; + return dictSize(d); +} + +dictIterator *kvstoreDictGetIterator(kvstore *kvs, int didx) +{ + dict *d = kvstoreGetDict(kvs, didx); + return dictGetIterator(d); +} + +dictIterator *kvstoreDictGetSafeIterator(kvstore *kvs, int didx) +{ + dict *d = kvstoreGetDict(kvs, didx); + return dictGetSafeIterator(d); +} + +dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return NULL; + return dictGetRandomKey(d); +} + +dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return NULL; + return dictGetFairRandomKey(d); +} + +dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void *oldptr, uint64_t hash) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return NULL; + return dictFindEntryByPtrAndHash(d, oldptr, hash); +} + +unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return 0; + return dictGetSomeKeys(d, des, count); +} + +int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return DICT_ERR; + return dictExpand(d, size); +} + +unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) +{ + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return 0; + return dictScanDefrag(d, v, fn, defragfns, privdata); +} + +uint64_t kvstoreGetHash(kvstore *kvs, const void *key) +{ + return kvs->dtype.hashFunction(key); +} + +void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key) +{ + dict *d = kvstoreGetDict(kvs, 
didx); + if (!d) + return NULL; + return dictFetchValue(d, key); +} + +dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return NULL; + return dictFind(d, key); +} + +dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) { + createDictIfNeeded(kvs, didx); + dict *d = kvstoreGetDict(kvs, didx); + dictEntry *ret = dictAddRaw(d, key, existing); + if (ret) + cumulativeKeyCountAdd(kvs, didx, 1); + return ret; +} + +void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) { + dict *d = kvstoreGetDict(kvs, didx); + dictSetKey(d, de, key); +} + +void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) { + dict *d = kvstoreGetDict(kvs, didx); + dictSetVal(d, de, val); +} + +dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return NULL; + return dictTwoPhaseUnlinkFind(kvstoreGetDict(kvs, didx), key, plink, table_index); +} + +void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index) { + dict *d = kvstoreGetDict(kvs, didx); + dictTwoPhaseUnlinkFree(d, he, plink, table_index); + cumulativeKeyCountAdd(kvs, didx, -1); + freeDictIfNeeded(kvs, didx); +} + +int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { + dict *d = kvstoreGetDict(kvs, didx); + if (!d) + return DICT_ERR; + int ret = dictDelete(kvstoreGetDict(kvs, didx), key); + if (ret == DICT_OK) { + cumulativeKeyCountAdd(kvs, didx, -1); + freeDictIfNeeded(kvs, didx); + } + return ret; +} diff --git a/src/kvstore.h b/src/kvstore.h new file mode 100644 index 000000000..2e5508be8 --- /dev/null +++ b/src/kvstore.h @@ -0,0 +1,65 @@ +#ifndef DICTARRAY_H_ +#define DICTARRAY_H_ + +#include "dict.h" +#include "adlist.h" + +typedef struct _kvstore kvstore; +typedef struct _kvstoreIterator kvstoreIterator; + +typedef int 
(kvstoreScanShouldSkipDict)(dict *d); +typedef int (kvstoreExpandShouldSkipDictIndex)(int didx); + +#define KVSTORE_ALLOCATE_DICTS_ON_DEMAND (1<<0) +#define KVSTORE_FREE_EMPTY_DICTS (1<<1) +kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags); +void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)); +void kvstoreRelease(kvstore *kvs); +unsigned long long kvstoreSize(kvstore *kvs); +unsigned long kvstoreBuckets(kvstore *kvs); +size_t kvstoreMemUsage(kvstore *kvs); +unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor, + int onlydidx, dictScanFunction *scan_cb, + kvstoreScanShouldSkipDict *skip_cb, + void *privdata); +int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb); +int kvstoreGetFairRandomDictIndex(kvstore *kvs); +void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full); + +int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target); +int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx); +int kvstoreNumNonEmptyDicts(kvstore *kvs); +int kvstoreNumDicts(kvstore *kvs); +uint64_t kvstoreGetHash(kvstore *kvs, const void *key); + +/* kvstore iterator specific functions */ +kvstoreIterator *kvstoreIteratorInit(kvstore *kvs); +void kvstoreIteratorRelease(kvstoreIterator *kvs_it); +dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it); +int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it); +dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it); + +/* Rehashing */ +void kvstoreTryResizeDicts(kvstore *kvs, int limit); +uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_ms); + +/* Specific dict access by dict-index */ +unsigned long kvstoreDictSize(kvstore *kvs, int didx); +dictIterator *kvstoreDictGetIterator(kvstore *kvs, int didx); +dictIterator *kvstoreDictGetSafeIterator(kvstore *kvs, int didx); +dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx); +dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int 
didx); +dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void *oldptr, uint64_t hash); +unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count); +int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); +unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); +void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); +dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); +dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing); +void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key); +void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val); +dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index); +void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index); +int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); + +#endif /* DICTARRAY_H_ */ diff --git a/src/lazyfree.c b/src/lazyfree.c index 5e7a1cb34..5344c24ab 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -2,6 +2,7 @@ #include "bio.h" #include "atomicvar.h" #include "functions.h" +#include "cluster.h" static redisAtomic size_t lazyfree_objects = 0; static redisAtomic size_t lazyfreed_objects = 0; @@ -19,19 +20,14 @@ void lazyfreeFreeObject(void *args[]) { * database which was substituted with a fresh one in the main thread * when the database was logically deleted. 
*/ void lazyfreeFreeDatabase(void *args[]) { - dict **ht1 = (dict **) args[0]; - dict **ht2 = (dict **) args[1]; - int *dictCount = (int *) args[2]; - for (int i=0; i<*dictCount; i++) { - size_t numkeys = dictSize(ht1[i]); - dictRelease(ht1[i]); - dictRelease(ht2[i]); - atomicDecr(lazyfree_objects,numkeys); - atomicIncr(lazyfreed_objects,numkeys); - } - zfree(ht1); - zfree(ht2); - zfree(dictCount); + kvstore *da1 = args[0]; + kvstore *da2 = args[1]; + + size_t numkeys = kvstoreSize(da1); + kvstoreRelease(da1); + kvstoreRelease(da2); + atomicDecr(lazyfree_objects,numkeys); + atomicIncr(lazyfreed_objects,numkeys); } /* Release the key tracking table. */ @@ -179,28 +175,12 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void emptyDbAsync(redisDb *db) { - dbDictMetadata *metadata; - for (int i = 0; i < db->dict_count; i++) { - metadata = (dbDictMetadata *)dictMetadata(db->dict[i]); - if (metadata->rehashing_node) { - listDelNode(server.rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - - metadata = (dbDictMetadata *)dictMetadata(db->expires[i]); - if (metadata->rehashing_node) { - listDelNode(server.rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - } - dict **oldDict = db->dict; - dict **oldExpires = db->expires; - atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN)); - db->dict = dictCreateMultiple(&dbDictType, db->dict_count); - db->expires = dictCreateMultiple(&dbExpiresDictType, db->dict_count); - int *count = zmalloc(sizeof(int)); - *count = db->dict_count; - bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldDict, oldExpires, count); + int slotCountBits = server.cluster_enabled? 
CLUSTER_SLOT_MASK_BITS : 0; + kvstore *oldkeys = db->keys, *oldexpires = db->expires; + db->keys = kvstoreCreate(&dbDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); + db->expires = kvstoreCreate(&dbExpiresDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); + atomicIncr(lazyfree_objects, kvstoreSize(oldkeys)); + bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires); } /* Free the key tracking table. diff --git a/src/module.c b/src/module.c index 2fe2be8c1..19e50bda1 100644 --- a/src/module.c +++ b/src/module.c @@ -4295,7 +4295,7 @@ void RM_ResetDataset(int restart_aof, int async) { /* Returns the number of keys in the current db. */ unsigned long long RM_DbSize(RedisModuleCtx *ctx) { - return dbSize(ctx->client->db, DB_MAIN); + return dbSize(ctx->client->db); } /* Returns a name of a random key, or NULL if current db is empty. */ @@ -11058,7 +11058,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dbScan(ctx->client->db, DB_MAIN, cursor->cursor, -1, moduleScanCallback, NULL, &data); + cursor->cursor = dbScan(ctx->client->db, cursor->cursor, moduleScanCallback, &data); if (cursor->cursor == 0) { cursor->done = 1; ret = 0; diff --git a/src/multi.c b/src/multi.c index 5a0bd7ebf..1e331b36f 100644 --- a/src/multi.c +++ b/src/multi.c @@ -394,7 +394,7 @@ void touchWatchedKey(redisDb *db, robj *key) { /* The key was already expired when WATCH was called. */ if (db == wk->db && equalStringObjects(key, wk->key) && - dictFind(db->dict[calculateKeySlot(key->ptr)], key->ptr) == NULL) + dbFind(db, key->ptr) == NULL) { /* Already expired key is deleted, so logically no change. Clear * the flag. Deleted keys are not flagged as expired. 
*/ @@ -432,9 +432,9 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) { dictIterator *di = dictGetSafeIterator(emptied->watched_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - int exists_in_emptied = dictFind(emptied->dict[calculateKeySlot(key->ptr)], key->ptr) != NULL; + int exists_in_emptied = dbFind(emptied, key->ptr) != NULL; if (exists_in_emptied || - (replaced_with && dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr))) + (replaced_with && dbFind(replaced_with, key->ptr) != NULL)) { list *clients = dictGetVal(de); if (!clients) continue; @@ -442,7 +442,7 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) { while((ln = listNext(&li))) { watchedKey *wk = redis_member2struct(watchedKey, node, ln); if (wk->expired) { - if (!replaced_with || !dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)) { + if (!replaced_with || !dbFind(replaced_with, key->ptr)) { /* Expired key now deleted. No logical change. Clear the * flag. Deleted keys are not flagged as expired. 
*/ wk->expired = 0; diff --git a/src/object.c b/src/object.c index 48f4820b1..054f7d348 100644 --- a/src/object.c +++ b/src/object.c @@ -1246,18 +1246,19 @@ struct redisMemOverhead *getMemoryOverheadData(void) { for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; - unsigned long long keyscount = dbSize(db, DB_MAIN); + unsigned long long keyscount = kvstoreSize(db->keys); if (keyscount == 0) continue; mh->total_keys += keyscount; mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1)); mh->db[mh->num_dbs].dbid = j; - mem = dbMemUsage(db, DB_MAIN); + mem = kvstoreMemUsage(db->keys) + + keyscount * sizeof(robj); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dbMemUsage(db, DB_EXPIRES); + mem = kvstoreMemUsage(db->expires); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1544,7 +1545,7 @@ NULL return; } } - if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) { + if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) { addReplyNull(c); return; } diff --git a/src/pubsub.c b/src/pubsub.c index afaf0832f..a2c28ddcb 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -36,7 +36,7 @@ typedef struct pubsubtype { int shard; dict *(*clientPubSubChannels)(client*); int (*subscriptionCount)(client*); - dict **(*serverPubSubChannels)(unsigned int); + kvstore **serverPubSubChannels; robj **subscribeMsg; robj **unsubscribeMsg; robj **messageBulk; @@ -62,22 +62,12 @@ dict* getClientPubSubChannels(client *c); */ dict* getClientPubSubShardChannels(client *c); -/* - * Get server's global Pub/Sub channels dict. - */ -dict **getServerPubSubChannels(unsigned int slot); - -/* - * Get server's shard level Pub/Sub channels dict. - */ -dict **getServerPubSubShardChannels(unsigned int slot); - /* * Get list of channels client is subscribed to. * If a pattern is provided, the subset of channels is returned * matching the pattern. 
*/ -void channelList(client *c, sds pat, dict** pubsub_channels, int is_sharded); +void channelList(client *c, sds pat, kvstore *pubsub_channels); /* * Pub/Sub type for global channels. @@ -86,7 +76,7 @@ pubsubtype pubSubType = { .shard = 0, .clientPubSubChannels = getClientPubSubChannels, .subscriptionCount = clientSubscriptionsCount, - .serverPubSubChannels = getServerPubSubChannels, + .serverPubSubChannels = &server.pubsub_channels, .subscribeMsg = &shared.subscribebulk, .unsubscribeMsg = &shared.unsubscribebulk, .messageBulk = &shared.messagebulk, @@ -99,7 +89,7 @@ pubsubtype pubSubShardType = { .shard = 1, .clientPubSubChannels = getClientPubSubShardChannels, .subscriptionCount = clientShardSubscriptionsCount, - .serverPubSubChannels = getServerPubSubShardChannels, + .serverPubSubChannels = &server.pubsubshard_channels, .subscribeMsg = &shared.ssubscribebulk, .unsubscribeMsg = &shared.sunsubscribebulk, .messageBulk = &shared.smessagebulk, @@ -218,15 +208,14 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { /* Return the number of pubsub channels + patterns is handled. */ int serverPubsubSubscriptionCount(void) { - return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns); + return kvstoreSize(server.pubsub_channels) + dictSize(server.pubsub_patterns); } /* Return the number of pubsub shard level channels is handled. */ int serverPubsubShardSubscriptionCount(void) { - return server.shard_channel_count; + return kvstoreSize(server.pubsubshard_channels); } - /* Return the number of channels + patterns a client is subscribed to. 
*/ int clientSubscriptionsCount(client *c) { return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns); @@ -245,16 +234,6 @@ dict* getClientPubSubShardChannels(client *c) { return c->pubsubshard_channels; } -dict **getServerPubSubChannels(unsigned int slot) { - UNUSED(slot); - return &server.pubsub_channels; -} - -dict **getServerPubSubShardChannels(unsigned int slot) { - serverAssert(server.cluster_enabled || slot == 0); - return &server.pubsubshard_channels[slot]; -} - /* Return the number of pubsub + pubsub shard level channels * a client is subscribed to. */ int clientTotalPubSubSubscriptionCount(client *c) { @@ -278,8 +257,7 @@ void unmarkClientAsPubSub(client *c) { /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { - dict **d_ptr; - dictEntry *de; + dictEntry *de, *existing; dict *clients = NULL; int retval = 0; unsigned int slot = 0; @@ -292,23 +270,17 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { if (server.cluster_enabled && type.shard) { slot = getKeySlot(channel->ptr); } - d_ptr = type.serverPubSubChannels(slot); - if (*d_ptr == NULL) { - *d_ptr = dictCreate(&objToDictDictType); - de = NULL; + + de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing); + + if (existing) { + clients = dictGetVal(existing); } else { - de = dictFind(*d_ptr, channel); - } - if (de == NULL) { clients = dictCreate(&clientDictType); - dictAdd(*d_ptr, channel, clients); + kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients); incrRefCount(channel); - if (type.shard) { - server.shard_channel_count++; - } - } else { - clients = dictGetVal(de); } + serverAssert(dictAdd(clients, c, NULL) != DICT_ERR); } /* Notify the client */ @@ -319,7 +291,6 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { /* Unsubscribe a client from a channel. 
Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) { - dict *d; dictEntry *de; dict *clients; int retval = 0; @@ -334,9 +305,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty if (server.cluster_enabled && type.shard) { slot = getKeySlot(channel->ptr); } - d = *type.serverPubSubChannels(slot); - serverAssertWithInfo(c,NULL,d != NULL); - de = dictFind(d, channel); + de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK); @@ -344,15 +313,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty /* Free the dict and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ - dictDelete(d, channel); - if (type.shard) { - if (dictSize(d) == 0) { - dictRelease(d); - dict **d_ptr = type.serverPubSubChannels(slot); - *d_ptr = NULL; - } - server.shard_channel_count--; - } + kvstoreDictDelete(*type.serverPubSubChannels, slot, channel); } } /* Notify the client */ @@ -365,11 +326,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty /* Unsubscribe all shard channels in a slot. 
*/ void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) { - dict *d = server.pubsubshard_channels[slot]; - if (!d) { + if (!kvstoreDictSize(server.pubsubshard_channels, slot)) return; - } - dictIterator *di = dictGetSafeIterator(d); + + dictIterator *di = kvstoreDictGetSafeIterator(server.pubsubshard_channels, slot); dictEntry *de; while ((de = dictNext(di)) != NULL) { robj *channel = dictGetKey(de); @@ -389,12 +349,9 @@ void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) { } } dictReleaseIterator(iter); - server.shard_channel_count--; - dictDelete(d, channel); + kvstoreDictDelete(server.pubsubshard_channels, slot, channel); } dictReleaseIterator(di); - dictRelease(d); - server.pubsubshard_channels[slot] = NULL; } /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ @@ -513,7 +470,6 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { */ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) { int receivers = 0; - dict *d; dictEntry *de; dictIterator *di; unsigned int slot = 0; @@ -522,8 +478,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) if (server.cluster_enabled && type.shard) { slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); } - d = *type.serverPubSubChannels(slot); - de = d ? dictFind(d, channel) : NULL; + de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); if (de) { dict *clients = dictGetVal(de); dictEntry *entry; @@ -693,14 +648,14 @@ NULL { /* PUBSUB CHANNELS [] */ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; - channelList(c, pat, &server.pubsub_channels, 0); + channelList(c, pat, server.pubsub_channels); } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) { /* PUBSUB NUMSUB [Channel_1 ... 
Channel_N] */ int j; addReplyArrayLen(c,(c->argc-2)*2); for (j = 2; j < c->argc; j++) { - dict *d = dictFetchValue(server.pubsub_channels, c->argv[j]); + dict *d = kvstoreDictFetchValue(server.pubsub_channels, 0, c->argv[j]); addReplyBulk(c,c->argv[j]); addReplyLongLong(c, d ? dictSize(d) : 0); @@ -713,35 +668,33 @@ NULL { /* PUBSUB SHARDCHANNELS */ sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; - channelList(c,pat,server.pubsubshard_channels,server.cluster_enabled); + channelList(c,pat,server.pubsubshard_channels); } else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) { /* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */ int j; addReplyArrayLen(c, (c->argc-2)*2); for (j = 2; j < c->argc; j++) { unsigned int slot = calculateKeySlot(c->argv[j]->ptr); - dict *d = server.pubsubshard_channels[slot]; - dict *clients = d ? dictFetchValue(d, c->argv[j]) : NULL; + dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]); addReplyBulk(c,c->argv[j]); - addReplyLongLong(c, d ? dictSize(clients) : 0); + addReplyLongLong(c, clients ? dictSize(clients) : 0); } } else { addReplySubcommandSyntaxError(c); } } -void channelList(client *c, sds pat, dict **pubsub_channels, int is_sharded) { +void channelList(client *c, sds pat, kvstore *pubsub_channels) { long mblen = 0; void *replylen; - unsigned int slot_cnt = is_sharded ? 
CLUSTER_SLOTS : 1; + unsigned int slot_cnt = kvstoreNumDicts(pubsub_channels); replylen = addReplyDeferredLen(c); for (unsigned int i = 0; i < slot_cnt; i++) { - if (pubsub_channels[i] == NULL) { + if (!kvstoreDictSize(pubsub_channels, i)) continue; - } - dictIterator *di = dictGetIterator(pubsub_channels[i]); + dictIterator *di = kvstoreDictGetIterator(pubsub_channels, i); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); @@ -805,3 +758,9 @@ size_t pubsubMemOverhead(client *c) { mem += dictMemUsage(c->pubsubshard_channels); return mem; } + +int pubsubTotalSubscriptions(void) { + return dictSize(server.pubsub_patterns) + + kvstoreSize(server.pubsub_channels) + + kvstoreSize(server.pubsubshard_channels); +} diff --git a/src/rdb.c b/src/rdb.c index a8d58c08f..8bd630bf5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1301,12 +1301,12 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { dictEntry *de; ssize_t written = 0; ssize_t res; - dbIterator *dbit = NULL; + kvstoreIterator *kvs_it = NULL; static long long info_updated_time = 0; char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; redisDb *db = server.db + dbid; - unsigned long long int db_size = dbSize(db, DB_MAIN); + unsigned long long int db_size = kvstoreSize(db->keys); if (db_size == 0) return 0; /* Write the SELECT DB opcode */ @@ -1316,7 +1316,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { written += res; /* Write the RESIZE DB opcode. 
*/ - unsigned long long expires_size = dbSize(db, DB_EXPIRES); + unsigned long long expires_size = kvstoreSize(db->expires); if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr; written += res; if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr; @@ -1324,20 +1324,20 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr; written += res; - dbit = dbIteratorInit(db, DB_MAIN); + kvs_it = kvstoreIteratorInit(db->keys); int last_slot = -1; /* Iterate this DB writing every entry */ - while ((de = dbIteratorNext(dbit)) != NULL) { - int curr_slot = dbIteratorGetCurrentSlot(dbit); + while ((de = kvstoreIteratorNext(kvs_it)) != NULL) { + int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it); /* Save slot info. */ if (server.cluster_enabled && curr_slot != last_slot) { if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr; written += res; if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr; written += res; - if ((res = rdbSaveLen(rdb, dictSize(db->dict[curr_slot]))) < 0) goto werr; + if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr; written += res; - if ((res = rdbSaveLen(rdb, dictSize(db->expires[curr_slot]))) < 0) goto werr; + if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr; written += res; last_slot = curr_slot; } @@ -1368,11 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { } } } - dbReleaseIterator(dbit); + kvstoreIteratorRelease(kvs_it); return written; werr: - if (dbit) dbReleaseIterator(dbit); + if (kvs_it) kvstoreIteratorRelease(kvs_it); return -1; } @@ -3027,7 +3027,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { return retval; } - /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned. 
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, @@ -3131,8 +3130,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; /* Ignore gracefully. */ } /* In cluster mode we resize individual slot specific dictionaries based on the number of keys that slot holds. */ - dictExpand(db->dict[slot_id], slot_size); - dictExpand(db->expires[slot_id], expires_slot_size); + kvstoreDictExpand(db->keys, slot_id, slot_size); + kvstoreDictExpand(db->expires, slot_id, expires_slot_size); should_expand_db = 0; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { @@ -3266,8 +3265,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin /* If there is no slot info, it means that it's either not cluster mode or we are trying to load legacy RDB file. * In this case we want to estimate number of keys per slot and resize accordingly. */ if (should_expand_db) { - dbExpand(db, db_size, DB_MAIN, 0); - dbExpand(db, expires_size, DB_EXPIRES, 0); + dbExpand(db, db_size, 0); + dbExpandExpires(db, expires_size, 0); should_expand_db = 0; } diff --git a/src/server.c b/src/server.c index f53d583d5..3e600a948 100644 --- a/src/server.c +++ b/src/server.c @@ -436,63 +436,6 @@ int dictResizeAllowed(size_t moreMem, double usedRatio) { } } -/* Adds dictionary to the rehashing list, which allows us - * to quickly find rehash targets during incremental rehashing. - * - * Updates the bucket count in cluster-mode for the given dictionary in a DB, bucket count - * incremented with the new ht size during the rehashing phase. In non-cluster mode, - * bucket count can be retrieved directly from single dict bucket.
*/ -void dictRehashingStarted(dict *d, dbKeyType keyType) { - dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d); - listAddNodeTail(server.rehashing, d); - metadata->rehashing_node = listLast(server.rehashing); - - if (!server.cluster_enabled) return; - unsigned long long from, to; - dictRehashingInfo(d, &from, &to); - server.db[0].sub_dict[keyType].bucket_count += to; /* Started rehashing (Add the new ht size) */ -} - -/* Remove dictionary from the rehashing list. - * - * Updates the bucket count for the given dictionary in a DB. It removes - * the old ht size of the dictionary from the total sum of buckets for a DB. */ -void dictRehashingCompleted(dict *d, dbKeyType keyType) { - dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d); - if (metadata->rehashing_node) { - listDelNode(server.rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - - if (!server.cluster_enabled) return; - unsigned long long from, to; - dictRehashingInfo(d, &from, &to); - server.db[0].sub_dict[keyType].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */ -} - -void dbDictRehashingStarted(dict *d) { - dictRehashingStarted(d, DB_MAIN); -} - -void dbDictRehashingCompleted(dict *d) { - dictRehashingCompleted(d, DB_MAIN); -} - -void dbExpiresRehashingStarted(dict *d) { - dictRehashingStarted(d, DB_EXPIRES); -} - -void dbExpiresRehashingCompleted(dict *d) { - dictRehashingCompleted(d, DB_EXPIRES); -} - -/* Returns the size of the DB dict metadata in bytes. */ -size_t dbDictMetadataSize(dict *d) { - UNUSED(d); - /* NOTICE: this also affects overhead_ht_main and overhead_ht_expires in getMemoryOverheadData. */ - return sizeof(dbDictMetadata); -} - /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. 
*/ dictType objectKeyPointerValueDictType = { @@ -524,6 +467,8 @@ dictType setDictType = { NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL, /* allow to expand */ .no_value = 1, /* no values in this dict */ .keys_are_odd = 1 /* an SDS string is always an odd pointer */ }; @@ -536,7 +481,7 @@ dictType zsetDictType = { dictSdsKeyCompare, /* key compare */ NULL, /* Note: SDS string shared & freed by skiplist */ NULL, /* val destructor */ - NULL /* allow to expand */ + NULL, /* allow to expand */ }; /* Db->dict, keys are sds strings, vals are Redis objects. */ @@ -548,9 +493,6 @@ dictType dbDictType = { dictSdsDestructor, /* key destructor */ dictObjectDestructor, /* val destructor */ dictResizeAllowed, /* allow to resize */ - dbDictRehashingStarted, - dbDictRehashingCompleted, - dbDictMetadataSize, }; /* Db->expires */ @@ -562,9 +504,6 @@ dictType dbExpiresDictType = { NULL, /* key destructor */ NULL, /* val destructor */ dictResizeAllowed, /* allow to resize */ - dbExpiresRehashingStarted, - dbExpiresRehashingCompleted, - dbDictMetadataSize, }; /* Command table. sds string -> command struct pointer. */ @@ -586,7 +525,7 @@ dictType hashDictType = { dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ dictSdsDestructor, /* val destructor */ - NULL /* allow to expand */ + NULL, /* allow to expand */ }; /* Dict type without destructor */ @@ -693,53 +632,6 @@ dictType clientDictType = { .no_value = 1 /* no values in this dict */ }; -/* In cluster-enabled setup, this method traverses through all main/expires dictionaries (CLUSTER_SLOTS) - * and triggers a resize if the percentage of used buckets in the HT reaches (100 / HASHTABLE_MIN_FILL) - * we shrink the hash table to save memory, or expand the hash when the percentage of used buckets reached - * 100. 
- * - * In non cluster-enabled setup, it resize main/expires dictionary based on the same condition described above. */ -void tryResizeHashTables(int dbid) { - redisDb *db = &server.db[dbid]; - int dicts_per_call = min(CRON_DICTS_PER_DB, db->dict_count); - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - for (int i = 0; i < dicts_per_call; i++) { - int slot = db->sub_dict[subdict].resize_cursor; - dict *d = (subdict == DB_MAIN ? db->dict[slot] : db->expires[slot]); - if (dictShrinkIfNeeded(d) == DICT_ERR) { - dictExpandIfNeeded(d); - } - db->sub_dict[subdict].resize_cursor = (slot + 1) % db->dict_count; - } - } -} - -/* Our hash table implementation performs rehashing incrementally while - * we write/read from the hash table. Still if the server is idle, the hash - * table will use two tables for a long time. So we try to use 1 millisecond - * of CPU time at every call of this function to perform some rehashing. - * - * The function returns 1 if some rehashing was performed, otherwise 0 - * is returned. */ -int incrementallyRehash(void) { - if (listLength(server.rehashing) == 0) return 0; - serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.rehashing)); - - /* Our goal is to rehash as many dictionaries as we can before reaching predefined threshold, - * after each dictionary completes rehashing, it removes itself from the list. */ - listNode *node; - monotime timer; - elapsedStart(&timer); - while ((node = listFirst(server.rehashing))) { - uint64_t elapsed_us = elapsedUs(timer); - if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) { - break; /* Reached the time limit. 
*/ - } - dictRehashMicroseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us); - } - return 1; -} - /* This function is called once a background process of some kind terminates, * as we want to avoid resizing the hash tables when there is a child in order * to play well with copy-on-write (otherwise when a resize happens lots of @@ -1179,21 +1071,33 @@ void databasesCron(void) { * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; + static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; - /* Resize */ for (j = 0; j < dbs_per_call; j++) { - tryResizeHashTables(resize_db % server.dbnum); + redisDb *db = &server.db[resize_db % server.dbnum]; + kvstoreTryResizeDicts(db->keys, CRON_DICTS_PER_DB); + kvstoreTryResizeDicts(db->expires, CRON_DICTS_PER_DB); resize_db++; } /* Rehash */ if (server.activerehashing) { - incrementallyRehash(); + uint64_t elapsed_us = 0; + for (j = 0; j < dbs_per_call; j++) { + redisDb *db = &server.db[rehash_db % server.dbnum]; + elapsed_us += kvstoreIncrementallyRehash(db->keys, INCREMENTAL_REHASHING_THRESHOLD_US); + if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) + break; + elapsed_us += kvstoreIncrementallyRehash(db->expires, INCREMENTAL_REHASHING_THRESHOLD_US); + if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) + break; + rehash_db++; + } } } } @@ -1449,9 +1353,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; - size = dbBuckets(&server.db[j], DB_MAIN); - used = dbSize(&server.db[j], DB_MAIN); - vkeys = dbSize(&server.db[j], DB_EXPIRES); + size = kvstoreBuckets(server.db[j].keys); + used = kvstoreSize(server.db[j].keys); + vkeys = kvstoreSize(server.db[j].expires); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: 
%lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); } @@ -2669,17 +2573,6 @@ void makeThreadKillable(void) { pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); } -/* When adding fields, please check the initTempDb related logic. */ -void initDbState(redisDb *db){ - for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { - db->sub_dict[subdict].non_empty_slots = 0; - db->sub_dict[subdict].key_count = 0; - db->sub_dict[subdict].resize_cursor = 0; - db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL; - db->sub_dict[subdict].bucket_count = 0; - } -} - void initServer(void) { int j; @@ -2755,10 +2648,10 @@ void initServer(void) { server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Create the Redis databases, and initialize other internal state. */ - int slot_count = (server.cluster_enabled) ? CLUSTER_SLOTS : 1; - for (j = 0; j < server.dbnum; j++) { - server.db[j].dict = dictCreateMultiple(&dbDictType, slot_count); - server.db[j].expires = dictCreateMultiple(&dbExpiresDictType,slot_count); + int slot_count_bits = (server.cluster_enabled) ? CLUSTER_SLOT_MASK_BITS : 0; + for (j = 0; j < server.dbnum; j++) { + server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); + server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType); server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); @@ -2767,16 +2660,15 @@ void initServer(void) { server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); - server.db[j].dict_count = slot_count; - initDbState(&server.db[j]); listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree); } - server.rehashing = listCreate(); evictionPoolAlloc(); /* Initialize the LRU keys pool. 
*/ - server.pubsub_channels = dictCreate(&objToDictDictType); + /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which + * seems odd) just to make the code cleaner by making it be the same type as server.pubsubshard_channels + * (which has to be kvstore), see pubsubtype.serverPubSubChannels */ + server.pubsub_channels = kvstoreCreate(&objToDictDictType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); server.pubsub_patterns = dictCreate(&objToDictDictType); - server.pubsubshard_channels = zcalloc(sizeof(dict *) * slot_count); - server.shard_channel_count = 0; + server.pubsubshard_channels = kvstoreCreate(&objToDictDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS); server.pubsub_clients = 0; server.cronloops = 0; server.in_exec = 0; @@ -5906,9 +5798,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "current_eviction_exceeded_time:%lld\r\n", current_eviction_exceeded_time / 1000, "keyspace_hits:%lld\r\n", server.stat_keyspace_hits, "keyspace_misses:%lld\r\n", server.stat_keyspace_misses, - "pubsub_channels:%ld\r\n", dictSize(server.pubsub_channels), + "pubsub_channels:%llu\r\n", kvstoreSize(server.pubsub_channels), "pubsub_patterns:%lu\r\n", dictSize(server.pubsub_patterns), - "pubsubshard_channels:%llu\r\n", server.shard_channel_count, + "pubsubshard_channels:%llu\r\n", kvstoreSize(server.pubsubshard_channels), "latest_fork_usec:%lld\r\n", server.stat_fork_time, "total_forks:%lld\r\n", server.stat_total_forks, "migrate_cached_sockets:%ld\r\n", dictSize(server.migrate_cached_sockets), @@ -6135,8 +6027,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { for (j = 0; j < server.dbnum; j++) { long long keys, vkeys; - keys = dbSize(&server.db[j], DB_MAIN); - vkeys = dbSize(&server.db[j], DB_EXPIRES); + keys = kvstoreSize(server.db[j].keys); + vkeys = kvstoreSize(server.db[j].expires); if (keys || vkeys) { info = sdscatprintf(info, 
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", diff --git a/src/server.h b/src/server.h index b1e851f34..23802fe68 100644 --- a/src/server.h +++ b/src/server.h @@ -67,6 +67,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ #include "dict.h" /* Hash tables */ +#include "kvstore.h" /* Slot-based hash table */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ #include "anet.h" /* Networking the easy way */ @@ -970,31 +971,12 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; -/* When adding fields, please check the swap db related logic. */ -typedef struct dbDictState { - int resize_cursor; /* Cron job uses this cursor to gradually resize all dictionaries. */ - int non_empty_slots; /* The number of non-empty slots. */ - unsigned long long key_count; /* Total number of keys in this DB. */ - unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */ - unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */ -} dbDictState; - -typedef enum dbKeyType { - DB_MAIN, - DB_EXPIRES -} dbKeyType; - -/* Dict metadata for database, used for record the position in rehashing list. */ -typedef struct dbDictMetadata { - listNode *rehashing_node; /* list node in rehashing list */ -} dbDictMetadata; - /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. 
*/ typedef struct redisDb { - dict **dict; /* The keyspace for this DB */ - dict **expires; /* Timeout of keys with a timeout set */ + kvstore *keys; /* The keyspace for this DB */ + kvstore *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for * data, and should be unblocked if key is deleted (XREADEDGROUP). @@ -1005,8 +987,6 @@ typedef struct redisDb { long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ - int dict_count; /* Indicates total number of dictionaries owned by this DB, 1 dict per slot in cluster mode. */ - dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */ } redisDb; /* forward declaration for functions ctx */ @@ -1574,7 +1554,6 @@ struct redisServer { int hz; /* serverCron() calls frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ redisDb *db; - list *rehashing; /* List of dictionaries in DBs that are currently rehashing. */ dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; @@ -1994,12 +1973,11 @@ struct redisServer { size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */ long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ - dict *pubsub_channels; /* Map channels to list of subscribed clients */ + kvstore *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_patterns; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. 
*/ - dict **pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */ - unsigned long long shard_channel_count; + kvstore *pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */ unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ @@ -2445,20 +2423,6 @@ typedef struct { unsigned char *lpi; /* listpack iterator */ } setTypeIterator; -typedef struct dbIterator dbIterator; - -/* DB iterator specific functions */ -dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType); -void dbReleaseIterator(dbIterator *dbit); -dict *dbIteratorNextDict(dbIterator *dbit); -dict *dbGetDictFromIterator(dbIterator *dbit); -int dbIteratorGetCurrentSlot(dbIterator *dbit); -dictEntry *dbIteratorNext(dbIterator *iter); - -/* SCAN specific commands for easy cursor manipulation, shared between main code and modules. */ -int getAndClearSlotIdFromCursor(unsigned long long *cursor); -void addSlotIdToCursor(int slot, unsigned long long *cursor); - /* Structure to hold hash iteration abstraction. Note that iteration over * hashes involves both fields and values. Because it is possible that * not both are required, store pointers in the iterator to avoid @@ -3143,21 +3107,16 @@ void dismissMemoryInChild(void); #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. 
*/ #define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/ int restartServer(int flags, mstime_t delay); -unsigned long long int dbSize(redisDb *db, dbKeyType keyType); -int dbNonEmptySlots(redisDb *db, dbKeyType keyType); int getKeySlot(sds key); int calculateKeySlot(sds key); -unsigned long dbBuckets(redisDb *db, dbKeyType keyType); -size_t dbMemUsage(redisDb *db, dbKeyType keyType); -dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType); -unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long cursor, - int onlyslot, dictScanFunction *fn, - int (dictScanValidFunction)(dict *d), void *privdata); -int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand); -unsigned long long cumulativeKeyCountRead(redisDb *db, int idx, dbKeyType keyType); -int getFairRandomSlot(redisDb *db, dbKeyType keyType); -int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType); -int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType); + +/* kvstore wrappers */ +int dbExpand(redisDb *db, uint64_t db_size, int try_expand); +int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand); +dictEntry *dbFind(redisDb *db, void *key); +dictEntry *dbFindExpires(redisDb *db, void *key); +unsigned long long dbSize(redisDb *db); +unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata); /* Set data type */ robj *setTypeCreate(sds value, size_t size_hint); @@ -3214,6 +3173,7 @@ int serverPubsubSubscriptionCount(void); int serverPubsubShardSubscriptionCount(void); size_t pubsubMemOverhead(client *c); void unmarkClientAsPubSub(client *c); +int pubsubTotalSubscriptions(void); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 3fe808f56..823675033 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -527,9 +527,9 @@ 
start_server {tags {"other external:skip"}} { # Set a key to enable overhead display of db 0 r set a b # The dict containing 128 keys must have expanded, - # its hash table itself takes a lot more than 200 bytes + # its hash table itself takes a lot more than 400 bytes wait_for_condition 100 50 { - [get_overhead_hashtable_main] < 200 + [get_overhead_hashtable_main] < 400 } else { fail "dict did not resize in time" }