diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 8b4547c4b..b70f98618 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -185,7 +185,6 @@ jobs: # - Info keysizes histogram # - Cluster slot stats # Skips slow and defrag tests to avoid timeouts while maintaining good coverage. - # TODO: Fix ASM (atomic slot migration) compatibility with keysizes histogram tracking test-debug-assert-keyspace: runs-on: ubuntu-latest if: | @@ -207,15 +206,13 @@ jobs: repository: ${{ env.GITHUB_REPOSITORY }} ref: ${{ env.GITHUB_HEAD_REF }} - name: make - run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERT_KEYSPACE' + run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERT_KEYSPACE -DDEBUG_ASSERTIONS' - name: testprep run: sudo apt-get install tcl8.6 tclx - name: test if: true && !contains(github.event.inputs.skiptests, 'redis') run: | ./runtest --verbose --tags "-slow -defrag" \ - --skipunit unit/cluster/slot-stats \ - --skipunit unit/cluster/atomic-slot-migration \ --dump-logs ${{github.event.inputs.test_args}} - name: cluster tests if: true && !contains(github.event.inputs.skiptests, 'cluster') diff --git a/src/bitops.c b/src/bitops.c index f705bb65f..26a08acfa 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -885,8 +885,7 @@ void setbitCommand(client *c) { * update the keysizes histogram. Otherwise, the histogram already * updated in lookupStringForBitCommand() by calling dbAdd(). */ if ((strOldSize > 0) && (strGrowSize != 0)) - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, - strOldSize, strOldSize + strGrowSize); + updateKeysizesHist(c->db, OBJ_STRING, strOldSize, strOldSize + strGrowSize); } /* Return original value. */ @@ -2108,8 +2107,7 @@ void bitfieldGeneric(client *c, int flags) { * update the keysizes histogram. Otherwise, the histogram already * updated in lookupStringForBitCommand() by calling dbAdd(). */ if ((strOldSize > 0) && (strGrowSize != 0)) - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, - strOldSize, strOldSize + strGrowSize); + updateKeysizesHist(c->db, OBJ_STRING, strOldSize, strOldSize + strGrowSize); keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); diff --git a/src/cluster.c b/src/cluster.c index e5c8509dc..f77b068b6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1769,14 +1769,13 @@ int clusterIsMySlot(int slot) { return getMyClusterNode() == getNodeBySlot(slot); } -void replySlotsFlushAndFree(client *c, slotRangeArray *slots) { +void replySlotsFlush(client *c, slotRangeArray *slots) { addReplyArrayLen(c, slots->num_ranges); for (int i = 0 ; i < slots->num_ranges ; i++) { addReplyArrayLen(c, 2); addReplyLongLong(c, slots->ranges[i].start); addReplyLongLong(c, slots->ranges[i].end); } - slotRangeArrayFree(slots); } /* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are @@ -2190,6 +2189,9 @@ void sflushCommand(client *c) { return; } slotRangeArrayFree(slots); + + /* takes ownership of myslots */ + asmTrimCtx *trim_ctx = asmTrimCtxCreate(myslots, server.db[0].keys); /* If the selected slots are exactly the same as the local slots, we can * simply flush the entire DB by flushCommandCommon. */ @@ -2198,9 +2200,10 @@ void sflushCommand(client *c) { slotRangeArrayFree(local_slots); if (all_slots_covered) { /* If not flush as blocking async, then reply immediately */ - if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) { - replySlotsFlushAndFree(c, myslots); + if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, trim_ctx) == 0) { + replySlotsFlush(c, trim_ctx->slots); } + asmTrimCtxRelease(trim_ctx); return; } @@ -2220,7 +2223,7 @@ void sflushCommand(client *c) { /* Update dirty stats before trimming. */ server.dirty += getKeyCountInSlotRangeArray(myslots); /* Pass client id for active trim to unblock client when trim completes. */ - trim_method = asmTrimSlots(myslots, blocking_async ? c->id : 0, 0); + trim_method = asmTrimSlots(trim_ctx, blocking_async ? c->id : CLIENT_ID_NONE, 0); } else { clusterDelKeysInSlotRangeArray(myslots, 1); } @@ -2237,15 +2240,13 @@ void sflushCommand(client *c) { * unblock client and reply in active trim completion. */ if (blocking_async && trim_method != ASM_TRIM_METHOD_NONE) { blockClientForAsyncFlush(c); - if (trim_method == ASM_TRIM_METHOD_BG) - bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, myslots); - else /* ASM_TRIM_METHOD_ACTIVE, just free the slot ranges */ - slotRangeArrayFree(myslots); } else { /* Reply with slot ranges that were flushed. SYNC and ASYNC mode will be * replied here immediately. */ - replySlotsFlushAndFree(c, myslots); + replySlotsFlush(c, trim_ctx->slots); } + + asmTrimCtxRelease(trim_ctx); /* if bg trim, released later by kvsAsyncFreeDoneCB() */ } /* The READWRITE command just clears the READONLY command state. */ diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 1756ffa18..db801a9a1 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -1,11 +1,46 @@ -/* cluster_asm.c -- Atomic slot migration implementation for cluster - * +/* * Copyright (c) 2025-Present, Redis Ltd. * All rights reserved. * * Licensed under your choice of (a) the Redis Source Available License 2.0 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the * GNU Affero General Public License v3 (AGPLv3). + * + * cluster_asm.c -- Atomic slot migration implementation for cluster + * + * TERMINOLOGY: + * - SOURCE: The node that currently owns the slots (sending data away) + * - DESTINATION: The node that will own the slots (receiving data) + * + * Example: Moving slots 0-100 from Node A to Node B + * - Node A = SOURCE (has the data, will lose ownership) + * - Node B = DESTINATION (will receive data, will gain ownership) + * + * Migration Flow: + * 1. DESTINATION initiates: CLUSTER MIGRATION IMPORT + * (Operator runs command on Node B, the receiving node) + * + * 2. SOURCE forks and sends slot snapshot (RESTORE commands) via RDB channel + * (Node A creates snapshot of slots 0-100) + * + * 3. SOURCE streams incremental changes via main channel + * (Node A forwards new writes to Node B while snapshot is being sent) + * + * 4. DESTINATION applies snapshot and buffers incremental changes + * (Node B receives snapshot, buffers ongoing writes) + * + * 5. SOURCE pauses writes when destination catches up + * (Node A stops accepting writes for slots 0-100 when Node B is nearly caught up) + * + * 6. DESTINATION drains buffer and takes ownership + * (Node B applies final buffered commands, updates config to own slots 0-100) + * + * 7. Config updated atomically via cluster bus + * (All nodes learn: slots 0-100 now belong to Node B) + * + * 8. SOURCE trims migrated keys (background or active) + * (Node A deletes keys from slots 0-100 since it no longer owns them) + * */ #include "server.h" @@ -13,17 +48,25 @@ #include "functions.h" #include "cluster_asm.h" #include "cluster_slot_stats.h" +#include "bio.h" +/* Operation types: import (destination side) or migrate (source side) */ #define ASM_IMPORT (1 << 1) #define ASM_MIGRATE (1 << 2) -#define ASM_DEBUG_TRIM_DEFAULT 0 -#define ASM_DEBUG_TRIM_NONE 1 -#define ASM_DEBUG_TRIM_BG 2 -#define ASM_DEBUG_TRIM_ACTIVE 3 +/* Trimming methods for cleaning up migrated keys */ +#define ASM_DEBUG_TRIM_DEFAULT 0 /* Auto-select based on module subscriptions and client tracking */ +#define ASM_DEBUG_TRIM_NONE 1 /* No trimming (for testing) */ +#define ASM_DEBUG_TRIM_BG 2 /* Background trim: hand off to BIO thread (fast, non-blocking) */ +#define ASM_DEBUG_TRIM_ACTIVE 3 /* Active trim: delete in main thread cron (slow, fires notifications) */ #define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ +/* ASM Task: Represents a single slot migration operation. + * Each task tracks the complete lifecycle of migrating one or more slot ranges + * from a source node to a destination node. The task exists on both sides but + * with different states (import states on destination, migrate states on source). + */ typedef struct asmTask { sds id; /* Task ID */ int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ @@ -59,6 +102,8 @@ typedef struct activeTrimJob { int migration_cleanup; /* Whether this is a migration cleanup of slots no longer owned */ } activeTrimJob; +/* ASM Manager: Global singleton that manages all ASM operations. + * Coordinates migration tasks, trim jobs, and maintains statistics. */ struct asmManager { list *tasks; /* List of asmTask to be processed */ list *archived_tasks; /* List of archived asmTask */ @@ -74,6 +119,9 @@ struct asmManager { int debug_trim_method; /* Method to trim the buffer */ int debug_active_trim_delay; /* Sleep before trimming each key */ + /* Background trim tracking */ + size_t bg_trim_running; /* Number of bg trim jobs in progress */ + /* Active trim stats */ unsigned long long active_trim_started; /* Number of times active trim was started */ unsigned long long active_trim_completed; /* Number of times active trim was completed */ @@ -2953,9 +3001,16 @@ void asmUnblockMasterAfterTrim(void) { } } -/* Trim the slots asynchronously in the BIO thread. migration_cleanup is true if this - * is a migration cleanup of slots no longer owned. */ -void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { +/* Background Trim: Delete migrated keys asynchronously in BIO thread. + * + * It works by moving entire slot data structures (dictionaries) to temporary + * kvstores, then handing them off to BIO thread for deletion. + * + * @param trim_ctx Context for slot ranges and histogram tracking + * @param migration_cleanup True if this is post-migration cleanup (fires module events) + */ +void asmTriggerBackgroundTrim(asmTrimCtx *trim_ctx, int migration_cleanup) { + slotRangeArray *slots = trim_ctx->slots; RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, (RedisModuleSlotRangeArray *) slots @@ -2969,8 +3024,8 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { signalFlushedDb(0, 1, slots); - /* Create temp kvstores and estore, move relevant slot dicts/ebuckets into them, - * and delete them in BIO thread asynchronously. */ + /* Create temporary kvstores to hold the slot data we're about to move. + * These will be deleted in the BIO thread. */ kvstore *keys = kvstoreCreate(&kvstoreBaseType, &dbDictType, CLUSTER_SLOT_MASK_BITS, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); @@ -2982,6 +3037,7 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { size_t total_keys = 0; + /* Move slot dictionaries from main DB to temp kvstores (O(1) per slot) */ for (int i = 0; i < slots->num_ranges; i++) { for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) { total_keys += kvstoreDictSize(server.db[0].keys, slot); @@ -2992,7 +3048,7 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { } } - emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys); + emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys, trim_ctx); sds str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Background trim started for slots: %s to trim %zu keys.", str, total_keys); @@ -3049,7 +3105,39 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) { * Trim the slots, return the trim method used. * If client_id is non-zero, the client will be unblocked when trim completes. * If migration_cleanup is true, this is a migration cleanup of slots no longer owned. */ -int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup) { + +/* Create ASM trim context with refcount=1 */ +asmTrimCtx *asmTrimCtxCreate(slotRangeArray *slots, kvstore *target_kvstore) { + asmTrimCtx *ctx = zcalloc(sizeof(asmTrimCtx)); + ctx->refcount = 1; + ctx->slots = slots; + ctx->target_kvstore = target_kvstore; + /* delta histograms are zero-initialized by zcalloc */ + return ctx; +} + +/* Increment refcount */ +void asmTrimCtxRetain(asmTrimCtx *ctx) { + if (!ctx) return; + ctx->refcount++; +} + +/* Decrement refcount, free if reaches 0 */ +void asmTrimCtxRelease(asmTrimCtx *ctx) { + if (!ctx) return; + + serverAssert(ctx->refcount > 0); + ctx->refcount--; + + if (ctx->refcount == 0) { + slotRangeArrayFree(ctx->slots); + zfree(ctx); + } +} + +int asmTrimSlots(asmTrimCtx *ctx, uint64_t client_id, int migration_cleanup) { + serverAssert(ctx != NULL); + if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE) return ASM_TRIM_METHOD_NONE; @@ -3065,10 +3153,18 @@ int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration (asmManager->debug_trim_method == ASM_DEBUG_TRIM_ACTIVE) || (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT && moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED)); - if (activetrim) - asmTriggerActiveTrim(slots, client_id, migration_cleanup); - else - asmTriggerBackgroundTrim(slots, migration_cleanup); + if (activetrim) { + asmTriggerActiveTrim(ctx->slots, client_id, migration_cleanup); + } else { + /* Background trim: + * - Retain ctx for kvsAsyncFreeDoneCB() to release ctx later + * - Trigger background trim. Also updates ctx delta histogram. + * - Schedule completion cb to deduce delta histogram from DB */ + asmBgTrimCounterIncr(); + asmTrimCtxRetain(ctx); + asmTriggerBackgroundTrim(ctx, migration_cleanup); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, kvsAsyncFreeDoneCB, client_id, ctx); + } return activetrim ? ASM_TRIM_METHOD_ACTIVE : ASM_TRIM_METHOD_BG; } @@ -3126,10 +3222,11 @@ void asmTrimJobProcessPending(void) { listRewind(asmManager->pending_trim_jobs, &li); while ((ln = listNext(&li)) != NULL) { slotRangeArray *slots = listNodeValue(ln); - asmTrimSlots(slots, 0, 1); + asmTrimCtx *ctx = asmTrimCtxCreate(slots, server.db[0].keys); + asmTrimSlots(ctx, CLIENT_ID_NONE, 1); propagateTrimSlots(slots); listDelNode(asmManager->pending_trim_jobs, ln); - slotRangeArrayFree(slots); + asmTrimCtxRelease(ctx); /* Release ctx (if bg trim, released later by kvsAsyncFreeDoneCB) */ } } @@ -3299,7 +3396,6 @@ void activeTrimJobFreeMethod(void *ptr) { /* Reply with the slot ranges that requested to be trimmed. Generally we * cancel trim jobs as the dataset is reset, no need to trim anymore. */ unblockClientForAsyncFlush(job->client_id, job->slots); - job->slots = NULL; /* slots were freed in unblockClientForAsyncFlush */ } if (job->slots) slotRangeArrayFree(job->slots); zfree(job); @@ -3366,6 +3462,7 @@ void trimslotsCommand(client *c) { * command may have an update for the same key that is supposed to be * trimmed. We have to trim the keys synchronously. */ clusterDelKeysInSlotRangeArray(slots, 1); + slotRangeArrayFree(slots); } else { /* We cannot trim any slot served by this node. */ if (clusterNodeIsMaster(getMyClusterNode())) { @@ -3379,14 +3476,15 @@ void trimslotsCommand(client *c) { } } } - asmTrimSlots(slots, 0, 1); + asmTrimCtx *ctx = asmTrimCtxCreate(slots, server.db[0].keys); + asmTrimSlots(ctx, CLIENT_ID_NONE, 1); + /* Release ctx - if bg trim, will be freed when BIO completes */ + asmTrimCtxRelease(ctx); } /* Command will not be propagated automatically since it does not modify * the dataset. */ forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); - - slotRangeArrayFree(slots); addReply(c, shared.ok); } @@ -3483,6 +3581,25 @@ int asmIsAnyTrimJobOverlaps(slotRangeArray *slots) { return 0; } +/* Decrement background trim counter. Called from completion callback. */ +void asmBgTrimCounterDecr(void) { + if (!asmManager) return; + debugServerAssert(asmManager->bg_trim_running > 0); + asmManager->bg_trim_running--; +} + +/* Increment background trim counter. */ +void asmBgTrimCounterIncr(void) { + if (!asmManager) return; + asmManager->bg_trim_running++; +} + +/* Check if background trim is running (for skipping debug assertions). */ +int asmIsBgTrimRunning(void) { + if (!asmManager) return 0; + return asmManager->bg_trim_running > 0; +} + /* Check if there is any trim job in progress. */ int asmIsTrimInProgress(void) { if (!server.cluster_enabled) return 0; diff --git a/src/cluster_asm.h b/src/cluster_asm.h index c1e7ccab1..475c97113 100644 --- a/src/cluster_asm.h +++ b/src/cluster_asm.h @@ -57,6 +57,14 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc void asmActiveTrimCycle(void); int asmIsKeyInTrimJob(sds keyname); int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc); -int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup); +int asmTrimSlots(struct asmTrimCtx *ctx, uint64_t client_id, int migration_cleanup); +int asmIsBgTrimRunning(void); +void asmBgTrimCounterDecr(void); +void asmBgTrimCounterIncr(void); + +/* Context for ASM background trim */ +struct asmTrimCtx *asmTrimCtxCreate(struct slotRangeArray *slots, kvstore *target_kvstore); +void asmTrimCtxRetain(struct asmTrimCtx *ctx); +void asmTrimCtxRelease(struct asmTrimCtx *ctx); #endif diff --git a/src/db.c b/src/db.c index 943af270c..3bb6d2823 100644 --- a/src/db.c +++ b/src/db.c @@ -76,8 +76,7 @@ void updateLRM(robj *o) { * represents the number of keys with a size in the range 2^i and 2^(i+1) * exclusive. oldLen/newLen must be smaller than 2^48, and if their value * equals -1, it means that the key is being created/deleted, respectively. Each - * data type has its own histogram and it is per database (In addition, there is - * histogram per slot for future cluster use). + * data type has its own histogram and it is maintained per database. * * Example mapping of key lengths to bins: * [1,2)->1 [2,4)->2 [4,8)->3 [8,16)->4 ... @@ -85,19 +84,13 @@ void updateLRM(robj *o) { * Since strings can be zero length, the histogram also tracks: * [0,1)->0 */ -void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t type, int64_t oldLen, int64_t newLen) { +void kvsUpdateHistogram(keysizesHist kvstoreHist, uint32_t type, int64_t oldLen, int64_t newLen) { if(unlikely(type >= OBJ_TYPE_BASIC_MAX)) return; if (oldLen > 0) { int old_bin = log2ceil(oldLen) + 1; debugServerAssert(old_bin < MAX_KEYSIZES_BINS); - /* If following a key deletion it is last one in slot's dict, then - * slot's dict might get released as well. Verify if metadata is not NULL. */ - if (dictHist) { - dictHist[type][old_bin]--; - debugServerAssert(dictHist[type][old_bin] >= 0); - } kvstoreHist[type][old_bin]--; debugServerAssert(kvstoreHist[type][old_bin] >= 0); } else { @@ -105,11 +98,6 @@ void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t ty if (oldLen == 0) { /* Only strings can be empty. Yet, a command flow might temporarily * dbAdd() empty collection, and only after add elements. */ - - if (dictHist) { - dictHist[type][0]--; - debugServerAssert(dictHist[type][0] >= 0); - } kvstoreHist[type][0]--; debugServerAssert(kvstoreHist[type][0] >= 0); } @@ -118,31 +106,24 @@ void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t ty if (newLen > 0) { int new_bin = log2ceil(newLen) + 1; debugServerAssert(new_bin < MAX_KEYSIZES_BINS); - /* If following a key deletion it is last one in slot's dict, then - * slot's dict might get released as well. Verify if metadata is not NULL. */ - if (dictHist) dictHist[type][new_bin]++; kvstoreHist[type][new_bin]++; } else { /* here, newLen can be either 0 or -1 */ if (newLen == 0) { /* Only strings can be empty. Yet, a command flow might temporarily * dbAdd() empty collection, and only after add elements. */ - - if (dictHist) dictHist[type][0]++; kvstoreHist[type][0]++; } } } -void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen) { +void updateKeysizesHist(redisDb *db, uint32_t type, int64_t oldLen, int64_t newLen) { kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys); - kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); - updateSlotHist(kvstoreMeta->keysizes_hist, dictMeta ? dictMeta->keysizes_hist : NULL, type, oldLen, newLen); + kvsUpdateHistogram(kvstoreMeta->keysizes_hist, type, oldLen, newLen); } void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int64_t newsize) { debugServerAssert(server.memory_tracking_enabled); - kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys); kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); /* Early return if nothing changed */ @@ -160,7 +141,8 @@ void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int6 } /* Update allocation size histogram */ - updateSlotHist(kvstoreMeta->allocsizes_hist, NULL, kv->type, oldsize, newsize); + kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys); + kvsUpdateHistogram(kvstoreMeta->allocsizes_hist, kv->type, oldsize, newsize); } static void dbgAssertHist(kvstore *kvs, keysizesHist hist, @@ -201,35 +183,21 @@ static void dbgAssertHist(kvstore *kvs, keysizesHist hist, } } -/* Assert keysizes histogram (For debugging only) - * - * Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 and tested after each command. - */ -void dbgAssertKeysizesHist(redisDb *db) { - /* Don't assert during nested calls. Intermediate state may be inconsistent. */ - if (server.execution_nesting) return; - - /* Don't assert during RDB loading. Database may be in inconsistent state. */ - if (server.loading || server.async_loading) return; - +/* Assert keysizes histogram (For debugging only) */ +static void dbgAssertKeysizesHist(redisDb *db) { kvstoreMetadata *meta = kvstoreGetMetadata(db->keys); dbgAssertHist(db->keys, meta->keysizes_hist, getObjectLength, "dbgAssertKeysizesHist"); - if (server.memory_tracking_enabled) - dbgAssertHist(db->keys, meta->allocsizes_hist, kvobjAllocSize, "dbgAssertAllocsizesHist"); } -/* Assert per-slot alloc_size (For debugging only) - * - * Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 and tested after each command. - */ -void dbgAssertAllocSizePerSlot(redisDb *db) { - /* Don't assert during nested calls. Intermediate state may be inconsistent. */ - if (server.execution_nesting) return; - - /* Don't assert during RDB loading. Database may be in inconsistent state. */ - if (server.loading || server.async_loading) return; - +/* Assert per-slot alloc_size (For debugging only) */ +static void dbgAssertAllocSizePerSlot(redisDb *db) { if (!server.memory_tracking_enabled) return; + + /* Check allocsizes histogram per db */ + kvstoreMetadata *meta = kvstoreGetMetadata(db->keys); + dbgAssertHist(db->keys, meta->allocsizes_hist, kvobjAllocSize, "dbgAssertAllocsizesHist"); + + /* Check alloc_size per slot */ size_t slot_sizes[CLUSTER_SLOTS] = {0}; dictEntry *de; kvstoreIterator kvs_it; @@ -252,6 +220,28 @@ void dbgAssertAllocSizePerSlot(redisDb *db) { } } +/* Run debug assertions based on server.dbg_assert_flags. + * + * DBG_ASSERT_KEYSIZES: Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 + * DBG_ASSERT_ALLOC_SLOT: Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 + */ +void dbgRunAssertions(redisDb *db) { + /* Don't assert during nested calls. Intermediate state may be inconsistent. */ + if (server.execution_nesting) return; + + /* Don't assert during RDB loading. Database may be in inconsistent state. */ + if (server.loading || server.async_loading) return; + + /* Don't assert during ASM background trim. Histogram delta hasn't been applied yet. */ + if (asmIsBgTrimRunning()) return; + + if (server.dbg_assert_flags & DBG_ASSERT_KEYSIZES) + dbgAssertKeysizesHist(db); + + if (server.dbg_assert_flags & DBG_ASSERT_ALLOC_SLOT) + dbgAssertAllocSizePerSlot(db); +} + /* Lookup a kvobj for read or write operations, or return NULL if the it is not * found in the specified DB. This function implements the functionality of * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants. @@ -450,7 +440,7 @@ kvobj *dbAddInternal(redisDb *db, robj *key, robj **valref, dictEntryLink *link, signalKeyAsReady(db, key, kv->type); notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); - updateKeysizesHist(db, slot, kv->type, -1, getObjectLength(kv)); /* add hist */ + updateKeysizesHist(db, kv->type, -1, getObjectLength(kv)); /* add hist */ if (server.memory_tracking_enabled) updateSlotAllocSize(db, slot, kv, -1, kvobjAllocSize(kv)); *valref = kv; @@ -555,7 +545,7 @@ kvobj *dbAddRDBLoad(redisDb *db, sds key, robj **valref, const KeyMetaSpec *keyM keyMetaSpec->numMeta * sizeof(uint64_t)); } - updateKeysizesHist(db, slot, kv->type, -1, (int64_t) getObjectLength(kv)); + updateKeysizesHist(db, kv->type, -1, (int64_t) getObjectLength(kv)); if (server.memory_tracking_enabled) updateSlotAllocSize(db, slot, kv, -1, kvobjAllocSize(kv)); return *valref = kv; @@ -685,10 +675,10 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link if (updateKeySizes) { /* Save one call if old and new are the same type */ if (oldtype == kvNew->type) { - updateKeysizesHist(db, slot, oldtype, oldlen, newlen); + updateKeysizesHist(db, oldtype, oldlen, newlen); } else { - updateKeysizesHist(db, slot, oldtype, oldlen, -1); - updateKeysizesHist(db, slot, kvNew->type, -1, newlen); + updateKeysizesHist(db, oldtype, oldlen, -1); + updateKeysizesHist(db, kvNew->type, -1, newlen); } } @@ -899,7 +889,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { /* remove key from histogram */ if(!(flags & DB_FLAG_NO_UPDATE_KEYSIZES)) - updateKeysizesHist(db, slot, type, oldlen, -1); + updateKeysizesHist(db, type, oldlen, -1); return 1; } else { return 0; @@ -1263,15 +1253,41 @@ void blockClientForAsyncFlush(client *c) { blockClient(c, BLOCKED_LAZYFREE); } -/* CB function on blocking ASYNC FLUSH completion. - * We will unblock the client and send the proper reply. */ -void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { - slotRangeArray *slots = userdata; +/* CB function on blocking ASYNC FLUSH/TRIM completion. + * We will unblock the client and send the proper reply if provided. */ +void kvsAsyncFreeDoneCB(uint64_t client_id, void *userdata) { + + /* If ASM Trim context provided, apply histogram delta */ + asmTrimCtx *ctx = userdata; + if (ctx) { + kvstoreMetadata *meta = kvstoreGetMetadata(server.db[0].keys); + /* Apply histogram delta only if target_kvstore hasn't changed */ + if (ctx->target_kvstore == server.db[0].keys && meta) { + for (int type = 0; type < MAX_KEYSIZES_TYPES; type++) { + for (int bin = 0; bin < MAX_KEYSIZES_BINS; bin++) { + meta->keysizes_hist[type][bin] -= ctx->delta_keysizes_hist[type][bin]; + meta->allocsizes_hist[type][bin] -= ctx->delta_allocsizes_hist[type][bin]; + } + } + } + /* Decrement counter unconditionally to track job completion. If kvstore was + * replaced (e.g., by FLUSHALL), the new histogram is already consistent (reset + * to 0 for empty DB), so it's safe to resume assertions when counter reaches 0. */ + asmBgTrimCounterDecr(); + } + + unblockClientForAsyncFlush(client_id, (ctx) ? ctx->slots : NULL); + + /* Release context and slots */ + asmTrimCtxRelease(ctx); +} + +/* Unblock client on async flush/trim completion */ +void unblockClientForAsyncFlush(uint64_t client_id, struct slotRangeArray *slots) { client *c = lookupClientByID(client_id); /* Verify that client still exists and being blocked. */ if (!(c && c->flags & CLIENT_BLOCKED)) { - slotRangeArrayFree(slots); return; } @@ -1284,7 +1300,7 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { /* Only SFLUSH command pass user data pointer. */ if (slots) - replySlotsFlushAndFree(c, slots); + replySlotsFlush(c, slots); else addReply(c, shared.ok); @@ -1311,11 +1327,10 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) { * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC * Return 0 otherwise * - * slots - provided only by SFLUSH command, otherwise NULL. Will be used on - * completion to reply with the slots flush result. Ownership is passed - * to the completion job in case of `blocking_async`. + * trim_ctx - provided only by SFLUSH command, otherwise NULL. Contains slots to + * be used on completion to reply with the slots flush result. */ -int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *slots) { +int flushCommandCommon(client *c, int type, int flags, asmTrimCtx *trim_ctx) { int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ @@ -1326,7 +1341,7 @@ int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *sl } /* Cancel all ASM tasks that overlap with the given slot ranges. */ - clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); + clusterAsmCancelBySlotRangeArray(trim_ctx ? trim_ctx->slots : NULL, c->argv[0]->ptr); if (type == FLUSH_TYPE_ALL) flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); @@ -1342,7 +1357,12 @@ int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *sl * lazyfree jobs in queue were processed */ if (blocking_async) { blockClientForAsyncFlush(c); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, slots); + /* Retain trim_ctx if provided so kvsAsyncFreeDoneCB can release it later */ + if (trim_ctx) { + asmBgTrimCounterIncr(); + asmTrimCtxRetain(trim_ctx); + } + bioCreateCompRq(BIO_WORKER_LAZY_FREE, kvsAsyncFreeDoneCB, c->id, trim_ctx); } #if defined(USE_JEMALLOC) diff --git a/src/debug.c b/src/debug.c index e2aec912e..370b572b3 100644 --- a/src/debug.c +++ b/src/debug.c @@ -553,13 +553,19 @@ NULL long long flag; if (getLongLongFromObjectOrReply(c, c->argv[2], &flag, NULL) != C_OK) return; - server.dbg_assert_keysizes = (flag != 0); + if (flag) + server.dbg_assert_flags |= DBG_ASSERT_KEYSIZES; + else + server.dbg_assert_flags &= ~DBG_ASSERT_KEYSIZES; addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"ALLOCSIZE-SLOTS-ASSERT") && c->argc == 3) { long long flag; if (getLongLongFromObjectOrReply(c, c->argv[2], &flag, NULL) != C_OK) return; - server.dbg_assert_alloc_per_slot = (flag != 0); + if (flag) + server.dbg_assert_flags |= DBG_ASSERT_ALLOC_SLOT; + else + server.dbg_assert_flags &= ~DBG_ASSERT_ALLOC_SLOT; addReply(c, shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) { serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr); diff --git a/src/hyperloglog.c b/src/hyperloglog.c index c01b297b6..66a7f5e2a 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1650,7 +1650,7 @@ void pfaddCommand(client *c) { } hdr = kv->ptr; - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, stringObjectLen(kv)); + updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(kv)); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv)); if (updated) { @@ -1841,8 +1841,7 @@ void pfmergeCommand(client *c) { * since in theory this is a mass-add of elements. */ notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), - OBJ_STRING, oldLen, stringObjectLen(kv)); + updateKeysizesHist(c->db, OBJ_STRING, oldLen, stringObjectLen(kv)); server.dirty++; addReply(c,shared.ok); } @@ -2011,7 +2010,7 @@ void pfdebugCommand(client *c) { addReplyError(c,invalid_hll_err); return; } - updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_STRING, oldlen, stringObjectLen(o)); + updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(o)); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[2]->ptr), o, oldsize, kvobjAllocSize(o)); server.dirty++; /* Force propagation on encoding change. */ @@ -2080,7 +2079,7 @@ void pfdebugCommand(client *c) { addReplyError(c,invalid_hll_err); return; } - updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_STRING, oldlen, stringObjectLen(o)); + updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(o)); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[2]->ptr), o, oldsize, kvobjAllocSize(o)); conv = 1; diff --git a/src/lazyfree.c b/src/lazyfree.c index e3f125ff9..5d89d00aa 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -3,6 +3,7 @@ #include "atomicvar.h" #include "functions.h" #include "cluster.h" +#include "cluster_asm.h" #include "ebuckets.h" static redisAtomic size_t lazyfree_objects = 0; @@ -17,14 +18,50 @@ void lazyfreeFreeObject(void *args[]) { atomicIncr(lazyfreed_objects,1); } +/* Populate delta histograms by iterating through keys in the kvstore. To be + * deduced from the main db histogram later on kvsAsyncFreeDoneCB */ +static void populateDeltaHistograms(kvstore *kvs, asmTrimCtx *ctx) { + kvstoreIterator kvs_it; + kvstoreIteratorInit(&kvs_it, kvs); + dictEntry *de; + + while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { + kvobj *kv = dictGetKV(de); + if ((!kv) || (kv->type >= OBJ_TYPE_BASIC_MAX)) continue; + + /* Update keysizes_hist delta */ + size_t len = getObjectLength(kv); + int sizeBin = (len == 0) ? 0 : log2ceil(len) + 1; /* Only strings can be empty */ + debugServerAssert(sizeBin < MAX_KEYSIZES_BINS); + ctx->delta_keysizes_hist[kv->type][sizeBin]++; + + /* Update allocsizes_hist delta */ + if (server.memory_tracking_enabled) { + size_t alloc_size = kvobjAllocSize(kv); + int allocBin = (alloc_size == 0) ? 0 : log2ceil(alloc_size) + 1; + debugServerAssert(allocBin < MAX_KEYSIZES_BINS); + ctx->delta_allocsizes_hist[kv->type][allocBin]++; + } + } + kvstoreIteratorReset(&kvs_it); +} + /* Release a database from the lazyfree thread. The 'db' pointer is the * database which was substituted with a fresh one in the main thread - * when the database was logically deleted. */ -void lazyfreeFreeDatabase(void *args[]) { + * when the database was logically deleted. + * + * If args[3] is provided, it's an asmTrimCtx for tracking histogram deltas + * during ASM background trim. */ +void kvsLazyfreeFree(void *args[]) { kvstore *da1 = args[0]; kvstore *da2 = args[1]; estore *subexpires = args[2]; dict *stream_idmp_keys = args[3]; + asmTrimCtx *ctx = args[4]; /* Optional: ASM trim context */ + + /* If ASM context provided, populate delta histograms */ + if (ctx) populateDeltaHistograms(da1, ctx); + estoreRelease(subexpires); dictRelease(stream_idmp_keys); size_t numkeys = kvstoreSize(da1); @@ -297,13 +334,13 @@ void emptyDbAsync(redisDb *db) { db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits); db->stream_idmp_keys = dictCreate(&objectKeyPointerValueDictType); protectClientReplyObjects(); /* Protect client reply objects before async free. */ - emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires, old_stream_idmp_keys); + emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires, old_stream_idmp_keys, NULL); } -/* Empty a Redis DB data asynchronously. */ -void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys) { +/* Empty a kvstore data asynchronously. */ +void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, asmTrimCtx *ctx) { atomicIncr(lazyfree_objects, kvstoreSize(keys)); - bioCreateLazyFreeJob(lazyfreeFreeDatabase, 4, keys, expires, hexpires, stream_idmp_keys); + bioCreateLazyFreeJob(kvsLazyfreeFree, 5, keys, expires, hexpires, stream_idmp_keys, ctx); } /* Free the key tracking table. diff --git a/src/module.c b/src/module.c index dda2022f0..ff04842e4 100644 --- a/src/module.c +++ b/src/module.c @@ -4746,7 +4746,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) { if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); if (curlen != newlen) - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_STRING, curlen, newlen); + updateKeysizesHist(key->db, OBJ_STRING, curlen, newlen); } return REDISMODULE_OK; } @@ -4861,7 +4861,7 @@ int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) { listTypePush(key->kv, ele, (where == REDISMODULE_LIST_HEAD) ? LIST_HEAD : LIST_TAIL); int64_t l = listTypeLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l-1, l); + updateKeysizesHist(key->db, OBJ_LIST, l-1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); return REDISMODULE_OK; @@ -4899,7 +4899,7 @@ RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) { robj *decoded = getDecodedObject(ele); decrRefCount(ele); int64_t l = (int64_t) listTypeLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l+1, l); + updateKeysizesHist(key->db, OBJ_LIST, l+1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); if (!moduleDelKeyIfEmpty(key)) { @@ -5027,7 +5027,7 @@ int RM_ListInsert(RedisModuleKey *key, long index, RedisModuleString *value) { int where = index < 0 ? LIST_TAIL : LIST_HEAD; listTypeInsert(&key->u.list.entry, value, where); int64_t l = (int64_t) listTypeLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l-1, l); + updateKeysizesHist(key->db, OBJ_LIST, l-1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); /* A note in quicklist.c forbids use of iterator after insert. */ @@ -5058,7 +5058,7 @@ int RM_ListDelete(RedisModuleKey *key, long index) { oldsize = kvobjAllocSize(key->kv); listTypeDelete(key->iter, &key->u.list.entry); int64_t l = (int64_t) listTypeLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l+1, l); + updateKeysizesHist(key->db, OBJ_LIST, l+1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); if (moduleDelKeyIfEmpty(key)) return REDISMODULE_OK; @@ -5167,7 +5167,7 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f } if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); int64_t l = (int64_t) zsetLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l-1, l); + updateKeysizesHist(key->db, OBJ_ZSET, l-1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); return REDISMODULE_OK; @@ -5207,7 +5207,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); if (out_flags & ZADD_OUT_ADDED) { int64_t l = (int64_t) zsetLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l-1, l); + updateKeysizesHist(key->db, OBJ_ZSET, l-1, l); } if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags); return REDISMODULE_OK; @@ -5244,7 +5244,7 @@ int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) { if (zsetDel(key->kv,ele->ptr)) { if (deleted) *deleted = 1; int64_t l = (int64_t) zsetLength(key->kv); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l+1, l); + updateKeysizesHist(key->db, OBJ_ZSET, l+1, l); if (server.memory_tracking_enabled) updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv)); moduleDelKeyIfEmpty(key); @@ -5747,7 +5747,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { } } va_end(ap); - updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_HASH, oldlen, + updateKeysizesHist(key->db, OBJ_HASH, oldlen, (int64_t) hashTypeLength(key->kv, 0)); moduleDelKeyIfEmpty(key); diff --git a/src/server.c b/src/server.c index a0245d710..7c5caf1a6 100644 --- a/src/server.c +++ b/src/server.c @@ -350,8 +350,13 @@ static void dictDestructorKV(dict *d, void *key) { meta->alloc_size -= alloc_size; /* kvstoreMeta may be NULL when freeing kvstore created with kvstoreBaseType * (e.g. in lazy free context). */ - if (kvstoreMeta) - updateSlotHist(kvstoreMeta->allocsizes_hist, NULL, kv->type, alloc_size, -1); + if (kvstoreMeta && kv->type < OBJ_TYPE_BASIC_MAX) { + /* we don't call kvsUpdateHistogram() because it contains debugServerAssert + * that may fail in bg thread as kvstore might not being fully initialized */ + int old_bin = (alloc_size == 0) ? 0 : log2ceil(alloc_size) + 1; + debugServerAssert(old_bin < MAX_KEYSIZES_BINS); + kvstoreMeta->allocsizes_hist[kv->type][old_bin]--; + } } decrRefCount(kv); } @@ -540,11 +545,11 @@ static void kvstoreOnEmpty(kvstore *kvs) { static void kvstoreOnDictEmpty(kvstore *kvs, int didx) { kvstoreDictMetadata *meta = kvstoreGetDictMeta(kvs, didx, 0); + UNUSED(meta); #ifdef DEBUG_ASSERTIONS dictEmpty(kvstoreGetDict(kvs, didx), NULL); #endif debugServerAssert(meta->alloc_size == 0); - memset(&meta->keysizes_hist, 0, sizeof(meta->keysizes_hist)); } /* Return 1 if currently we allow dict to expand. Dict may allocate huge @@ -2325,11 +2330,9 @@ void initServerConfig(void) { server.executable = NULL; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; #if DEBUG_ASSERT_KEYSPACE - server.dbg_assert_keysizes = 1; - server.dbg_assert_alloc_per_slot = 1; + server.dbg_assert_flags = DBG_ASSERT_KEYSIZES | DBG_ASSERT_ALLOC_SLOT; #else - server.dbg_assert_keysizes = 0; - server.dbg_assert_alloc_per_slot = 0; + server.dbg_assert_flags = 0; #endif server.bindaddr_count = CONFIG_DEFAULT_BINDADDR_COUNT; for (j = 0; j < CONFIG_DEFAULT_BINDADDR_COUNT; j++) @@ -4194,13 +4197,9 @@ void afterCommand(client *c) { if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages); - /* Assert keysizes histogram if enabled */ - if (unlikely(server.dbg_assert_keysizes)) - dbgAssertKeysizesHist(c->db); - - /* Assert per-slot alloc_size if enabled */ - if (unlikely(server.dbg_assert_alloc_per_slot)) - dbgAssertAllocSizePerSlot(c->db); + /* Run debug assertions if any are enabled */ + if (unlikely(server.dbg_assert_flags)) + dbgRunAssertions(c->db); } /* Check if c->cmd exists, fills `err` with details in case it doesn't. diff --git a/src/server.h b/src/server.h index bd1545072..e1a19d97a 100644 --- a/src/server.h +++ b/src/server.h @@ -1199,9 +1199,17 @@ typedef struct { uint64_t cpu_usec; /* CPU time (in microseconds) spent on given slot */ uint64_t network_bytes_in; /* Network ingress (in bytes) received for given slot */ uint64_t network_bytes_out; /* Network egress (in bytes) sent for given slot */ - keysizesHist keysizes_hist; } kvstoreDictMetadata; +/* Context for ASM background trim with delta histogram tracking */ +typedef struct asmTrimCtx { + int refcount; /* For shared bg/main thread ownership */ + struct slotRangeArray *slots; /* Slot ranges being trimmed */ + kvstore *target_kvstore; /* Target kvstore to update (for validation) */ + keysizesHist delta_keysizes_hist; /* Delta populated by BIO thread */ + keysizesHist delta_allocsizes_hist;/* Delta populated by BIO thread */ +} asmTrimCtx; + /* forward declaration for functions ctx */ typedef struct functionsLibCtx functionsLibCtx; @@ -1339,6 +1347,8 @@ typedef struct { #define CLIENT_ID_AOF (UINT64_MAX) /* Reserved ID for the AOF client. If you need more reserved IDs use UINT64_MAX-1, -2, ... and so forth. */ +#define CLIENT_ID_NONE (0) /* Non-existent client ID, used when no client + is associated with an operation. */ /* Replication backlog is not a separate memory, it just is one consumer of * the global replication buffer. This structure records the reference of @@ -2512,10 +2522,13 @@ struct redisServer { int reply_copy_avoidance_enabled; /* Is reply copy avoidance enabled (1 by default) */ /* Local environment */ char *locale_collate; - int dbg_assert_keysizes; /* Assert keysizes histogram after each command */ - int dbg_assert_alloc_per_slot; /* Assert per-slot alloc_size after each command */ + unsigned int dbg_assert_flags; /* Bitmask of debug assertions to run after each command */ }; +/* Debug assertion flags for server.dbg_assert_flags */ +#define DBG_ASSERT_KEYSIZES (1 << 0) /* Assert keysizes histogram */ +#define DBG_ASSERT_ALLOC_SLOT (1 << 1) /* Assert per-slot alloc_size */ + /* we use 6 so that all getKeyResult fits a cacheline */ #define MAX_KEYS_BUFFER 6 @@ -3897,11 +3910,10 @@ int moduleSetEnumConfig(client *c, sds name, sds *vals, int vals_cnt, const char int moduleSetNumericConfig(client *c, sds name, long long val, const char **err); /* db.c -- Keyspace access API */ -void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen); +void kvsUpdateHistogram(keysizesHist kvstoreHist, uint32_t type, int64_t oldLen, int64_t newLen); +void updateKeysizesHist(redisDb *db, uint32_t type, int64_t oldLen, int64_t newLen); void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int64_t newsize); -void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t type, int64_t oldLen, int64_t newLen); -void dbgAssertKeysizesHist(redisDb *db); -void dbgAssertAllocSizePerSlot(redisDb *db); +void dbgRunAssertions(redisDb *db); int removeExpire(redisDb *db, robj *key); void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj); void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed); @@ -3959,9 +3971,10 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi #define FLUSH_TYPE_ALL 0 #define FLUSH_TYPE_DB 1 #define FLUSH_TYPE_SLOTS 2 -void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots); -int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges); -void unblockClientForAsyncFlush(uint64_t client_id, void *userdata); +void replySlotsFlush(client *c, struct slotRangeArray *slots); +int flushCommandCommon(client *c, int type, int flags, struct asmTrimCtx *trim_ctx); +void kvsAsyncFreeDoneCB(uint64_t client_id, void *userdata); +void unblockClientForAsyncFlush(uint64_t client_id, struct slotRangeArray *slots); void blockClientForAsyncFlush(client *c); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ @@ -3982,7 +3995,7 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor); int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); void streamMoveIdmpKeys(dict *src, dict *dst, int slot); -void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys); +void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, struct asmTrimCtx *ctx); size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); void lazyfreeResetStats(void); diff --git a/src/t_hash.c b/src/t_hash.c index 9552fca27..3cdf4270f 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -407,7 +407,7 @@ void listpackExExpire(redisDb *db, kvobj *kv, ExpireInfo *info, int activeEx) { /* update keysizes */ unsigned long l = lpLength(lpt->lp) / 3; - updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l + expired, l); + updateKeysizesHist(db, OBJ_HASH, l + expired, l); } min = hashTypeGetMinExpire(kv, 1 /*accurate*/); @@ -774,7 +774,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **v if (!(hfeFlags & HFE_LAZY_NO_UPDATE_KEYSIZES)) { uint64_t l = hashTypeLength(o, 0); - updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l+1, l); + updateKeysizesHist(db, OBJ_HASH, l+1, l); } /* If the field is the last one in the hash, then the hash will be deleted */ @@ -2102,7 +2102,7 @@ void hsetnxCommand(client *c) { keyModified(c,c->db,c->argv[1], kv, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); hlen = hashTypeLength(kv, 0); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen); + updateKeysizesHist(c->db, OBJ_HASH, hlen - 1, hlen); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv)); server.dirty++; @@ -2138,7 +2138,7 @@ void hsetCommand(client *c) { } keyModified(c,c->db,c->argv[1],kv,1); unsigned long l = hashTypeLength(kv, 0); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l); + updateKeysizesHist(c->db, OBJ_HASH, l - created, l); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv)); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); @@ -2489,8 +2489,7 @@ out: notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); } if (oldlen != newlen) - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, - oldlen, newlen); + updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen); } void hincrbyCommand(client *c) { @@ -2516,13 +2515,13 @@ void hincrbyCommand(client *c) { } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; unsigned long l = hashTypeLength(o, 0); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1); + updateKeysizesHist(c->db, OBJ_HASH, l, l + 1); } else { /* Field expired and in turn hash deleted. Create new one! */ o = createHashObject(); dbAdd(c->db,c->argv[1],&o); value = 0; - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); + updateKeysizesHist(c->db, OBJ_HASH, 0, 1); } oldvalue = value; @@ -2573,13 +2572,13 @@ void hincrbyfloatCommand(client *c) { } else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) { value = 0; unsigned long l = hashTypeLength(o, 0); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1); + updateKeysizesHist(c->db, OBJ_HASH, l, l + 1); } else { /* Field expired and in turn hash deleted. Create new one! */ o = createHashObject(); dbAdd(c->db, c->argv[1], &o); value = 0; - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1); + updateKeysizesHist(c->db, OBJ_HASH, 0, 1); } value += incr; @@ -2770,8 +2769,7 @@ void hgetdelCommand(client *c) { } if (oldlen != newlen) - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, - oldlen, newlen); + updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen); } /* Get the value of one or more fields of a given hash key and optionally set @@ -2907,7 +2905,7 @@ void hgetexCommand(client *c) { * or the new expiration time is in the past.*/ newlen = hashTypeLength(o, 0); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldlen, newlen); + updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen); if (newlen == 0) { dbDelete(c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); @@ -2966,7 +2964,7 @@ void hdelCommand(client *c) { estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o); newLen = oldLen - deleted; } - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldLen, newLen); + updateKeysizesHist(c->db, OBJ_HASH, oldLen, newLen); server.dirty += deleted; } addReplyLongLong(c,deleted); @@ -3514,7 +3512,7 @@ static ExpireAction onFieldExpire(eItem item, void *ctx) { /* update keysizes */ unsigned long l = hashTypeLength(expCtx->hashObj, 0); - updateKeysizesHist(expCtx->db, getKeySlot(key), OBJ_HASH, l, l - 1); + updateKeysizesHist(expCtx->db, OBJ_HASH, l, l - 1); serverAssert(hashTypeDelete(expCtx->hashObj, field) == 1); if (server.memory_tracking_enabled) @@ -3877,8 +3875,7 @@ static void hexpireGenericCommand(client *c, long long basetime, int unit) { } if (oldlen != newlen) - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, - oldlen, newlen); + updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen); /* Avoid propagating command if not even one field was updated (Either because * the time is in the past, and corresponding HDELs were sent, or conditions diff --git a/src/t_list.c b/src/t_list.c index bfba02faa..48bcf3003 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -513,7 +513,7 @@ void pushGenericCommand(client *c, int where, int xx) { char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; keyModified(c,c->db,c->argv[1],lobj,1); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen - (c->argc - 2), llen); + updateKeysizesHist(c->db, OBJ_LIST, llen - (c->argc - 2), llen); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), lobj, oldsize, kvobjAllocSize(lobj)); } @@ -590,7 +590,7 @@ void linsertCommand(client *c) { c->argv[1],c->db->id); server.dirty++; unsigned long ll = listTypeLength(subject); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, ll-1, ll); + updateKeysizesHist(c->db, OBJ_LIST, ll-1, ll); } else { /* Notify client of a failed insert */ addReplyLongLong(c,-1); @@ -786,7 +786,7 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, s unsigned long llen = listTypeLength(o); notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id); - updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_LIST, llen + count, llen); + updateKeysizesHist(c->db, OBJ_LIST, llen + count, llen); if (llen == 0) { if (deleted) *deleted = 1; @@ -979,7 +979,7 @@ void ltrimCommand(client *c) { if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), o, oldsize, kvobjAllocSize(o)); } - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen, llenNew); + updateKeysizesHist(c->db, OBJ_LIST, llen, llenNew); keyModified(c, c->db, c->argv[1], (llenNew > 0) ? o : NULL, 1); server.dirty += (ltrim + rtrim); addReply(c,shared.ok); @@ -1141,7 +1141,7 @@ void lremCommand(client *c) { if (removed) { long ll = listTypeLength(subject); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, ll + removed, ll); + updateKeysizesHist(c->db, OBJ_LIST, ll + removed, ll); notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id); if (ll == 0) { @@ -1233,7 +1233,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kvsrc, oldsize, kvobjAllocSize(kvsrc)); lmoveHandlePush(c, c->argv[2], kvdst, value, whereto); /* Update dst obj cardinality in KEYSIZES */ - updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_LIST, oldlen, newlen); + updateKeysizesHist(c->db, OBJ_LIST, oldlen, newlen); /* Update src obj cardinality in KEYSIZES by listElementsRemoved() */ size_t srcsize = server.memory_tracking_enabled ? kvobjAllocSize(kvsrc) : 0; listElementsRemoved(c, skey, wherefrom, kvsrc, 1, srcsize, 1, NULL); diff --git a/src/t_set.c b/src/t_set.c index 709b6ffec..12bab25a4 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -637,7 +637,7 @@ void saddCommand(client *c) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set)); if (added) { unsigned long size = setTypeSize(set); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size - added, size); + updateKeysizesHist(c->db, OBJ_SET, size - added, size); keyModified(c,c->db,c->argv[1],set,1); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); } @@ -687,7 +687,7 @@ void sremCommand(client *c) { c->db->id); newSize = -1; /* removed */ } - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, oldSize, newSize); + updateKeysizesHist(c->db, OBJ_SET, oldSize, newSize); server.dirty += deleted; } addReplyLongLong(c,deleted); @@ -739,7 +739,7 @@ void smoveCommand(client *c) { srcNewLen = -1; /* removed */ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, srcOldLen, srcNewLen); + updateKeysizesHist(c->db, OBJ_SET, srcOldLen, srcNewLen); /* Create the destination set when it doesn't exist */ if (!dstset) { @@ -755,7 +755,7 @@ void smoveCommand(client *c) { /* An extra key has changed when ele was successfully added to dstset */ if (setTypeAdd(dstset,ele->ptr)) { unsigned long dstLen = setTypeSize(dstset); - updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_SET, dstLen - 1, dstLen); + updateKeysizesHist(c->db, OBJ_SET, dstLen - 1, dstLen); server.dirty++; keyModified(c,c->db,c->argv[2],dstset,1); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id); @@ -930,7 +930,7 @@ void spopWithCountCommand(client *c) { lp = lpBatchDelete(lp, ps, count); zfree(ps); set->ptr = lp; - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size - count); + updateKeysizesHist(c->db, OBJ_SET, size, size - count); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set)); } else if (remaining*SPOP_MOVE_STRATEGY_MUL > count) { @@ -949,7 +949,7 @@ void spopWithCountCommand(client *c) { propindex = 2; } } - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size - count); + updateKeysizesHist(c->db, OBJ_SET, size, size - count); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set)); } else { @@ -1022,7 +1022,7 @@ void spopWithCountCommand(client *c) { * since function dbReplaceValue() assumes the entire set is being replaced, * but here we're building the new set from the existing one. As a result, * the size of the old set has already changed by the time we reach this point. */ - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size-count); + updateKeysizesHist(c->db, OBJ_SET, size, size-count); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set)); dbReplaceValue(c->db, c->argv[1], &newset, 0); @@ -1066,7 +1066,7 @@ void spopCommand(client *c) { if (kv == NULL || checkType(c, kv, OBJ_SET)) return; size = setTypeSize(kv); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size-1); + updateKeysizesHist(c->db, OBJ_SET, size, size-1); if (server.memory_tracking_enabled) oldsize = kvobjAllocSize(kv); diff --git a/src/t_string.c b/src/t_string.c index 4ba22cae7..b9fc3ad5c 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -627,7 +627,7 @@ void setrangeCommand(client *c) { kv = dbUnshareStringValueByLink(c->db, c->argv[1], kv, link); newLen = max(oldLen, (int64_t) (offset + value_len)); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldLen, newLen); + updateKeysizesHist(c->db, OBJ_STRING, oldLen, newLen); } if (value_len > 0) { @@ -844,8 +844,7 @@ void incrDecrCommand(client *c, long long incr) { { new = o; o->ptr = (void*)((long)value); - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), - OBJ_STRING, + updateKeysizesHist(c->db, OBJ_STRING, (int64_t) sdigits10(oldvalue), (int64_t) sdigits10(value)); } else { @@ -958,7 +957,7 @@ void appendCommand(client *c) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), o, oldsize, kvobjAllocSize(o)); totlen = sdslen(o->ptr); int64_t oldlen = totlen - append_len; - updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, totlen); + updateKeysizesHist(c->db, OBJ_STRING, oldlen, totlen); } keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id); diff --git a/src/t_zset.c b/src/t_zset.c index bfc104f52..3ce602e1d 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -2078,7 +2078,7 @@ void zaddGenericCommand(client *c, int flags) { server.dirty += (added+updated); if (server.memory_tracking_enabled) updateSlotAllocSize(c->db, getKeySlot(key->ptr), zobj, oldsize, kvobjAllocSize(zobj)); - updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, llen, llen+added); + updateKeysizesHist(c->db, OBJ_ZSET, llen, llen+added); reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ @@ -2146,7 +2146,7 @@ void zremCommand(client *c) { newlen = -1; /* means key got deleted */ } - updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen); + updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen); keyModified(c, c->db, key, keyremoved ? NULL : zobj, 1); server.dirty += deleted; } @@ -2278,7 +2278,7 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) { newlen = zsetLength(zobj); oldlen = newlen + deleted; } - updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen); + updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen); } server.dirty += deleted; addReplyLongLong(c,deleted); @@ -4342,7 +4342,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey newlen = -1; } - updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen); + updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen); keyModified(c, c->db, key, (newlen > 0) ? zobj : NULL, 1); if (c->cmd->proc == zmpopCommand) { diff --git a/tests/unit/cluster/atomic-slot-migration.tcl b/tests/unit/cluster/atomic-slot-migration.tcl index ada32c35c..826f0d69c 100644 --- a/tests/unit/cluster/atomic-slot-migration.tcl +++ b/tests/unit/cluster/atomic-slot-migration.tcl @@ -112,7 +112,7 @@ proc asm_all_instances_idle {total} { proc wait_for_asm_done {} { set total_instances [expr {$::cluster_master_nodes + $::cluster_replica_nodes}] - wait_for_condition 1000 10 { + wait_for_condition 3000 10 { [asm_all_instances_idle $total_instances] == 1 } else { # Print the number of active tasks on each instance diff --git a/tests/unit/info-keysizes.tcl b/tests/unit/info-keysizes.tcl index 4070711a9..065e3ee41 100644 --- a/tests/unit/info-keysizes.tcl +++ b/tests/unit/info-keysizes.tcl @@ -201,14 +201,16 @@ proc test_all_keysizes { {replMode 0} } { run_cmd_verify_hist {$server FLUSHALL} {} # PFADD (sparse & dense) for {set i 1} {$i <= 3000} {incr i} { - run_cmd_verify_hist {$server PFADD hll1 a$i b$i c$i} {__EVAL_DB_HIST__ 0} - run_cmd_verify_hist {$server PFADD hll2 x$i y$i z$i} {__EVAL_DB_HIST__ 0} + $server PFADD hll1 a$i b$i c$i + $server PFADD hll2 x$i y$i z$i + run_cmd_verify_hist {} {__EVAL_DB_HIST__ 0} } # PFMERGE, PFCOUNT (sparse & dense) for {set i 1} {$i <= 3000} {incr i} { - run_cmd_verify_hist {$server PFADD hll3 x$i y$i z$i} {__EVAL_DB_HIST__ 0} - run_cmd_verify_hist {$server PFMERGE hll4 hll1 hll2 hll3} {__EVAL_DB_HIST__ 0} - run_cmd_verify_hist {$server PFCOUNT hll1 hll2 hll3 hll4} {__EVAL_DB_HIST__ 0} + $server PFADD hll3 x$i y$i z$i + $server PFMERGE hll4 hll1 hll2 hll3 + $server PFCOUNT hll1 hll2 hll3 hll4 + run_cmd_verify_hist {} {__EVAL_DB_HIST__ 0} } # DEL run_cmd_verify_hist {$server DEL hll4} {__EVAL_DB_HIST__ 0} @@ -242,8 +244,8 @@ proc test_all_keysizes { {replMode 0} } { # RPOP run_cmd_verify_hist {$server RPOP l1} {db0_LIST:4=1,8=1} run_cmd_verify_hist {$server RPOP l1} {db0_LIST:4=2} - # DEL - run_cmd_verify_hist {$server DEL l1} {db0_LIST:4=1} + # DEL + run_cmd_verify_hist {$server DEL l1} {db0_LIST:4=1} # LINSERT, LTRIM run_cmd_verify_hist {$server RPUSH l3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14} {db0_LIST:4=1,8=1} run_cmd_verify_hist {$server LINSERT l3 AFTER 9 10} {db0_LIST:4=1,16=1} @@ -420,7 +422,7 @@ proc test_all_keysizes { {replMode 0} } { } {} {cluster:skip} - test "KEYSIZES - Test STRING $suffixRepl" { + test "KEYSIZES - Test STRING $suffixRepl" { # SETRANGE run_cmd_verify_hist {$server FLUSHALL} {} run_cmd_verify_hist {$server SET s2 1234567890} {db0_STR:8=1} @@ -464,7 +466,42 @@ proc test_all_keysizes { {replMode 0} } { run_cmd_verify_hist {$server APPEND s2 y} {db0_STR:1=2} } {} {cluster:skip} - + + test "KEYSIZES - Test UNLINK (async deletion) $suffixRepl" { + # UNLINK on STRING + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server SET s1 1234567890} {db0_STR:8=1} + run_cmd_verify_hist {$server UNLINK s1} {} 1 + + # UNLINK on LIST + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server RPUSH l1 1 2 3 4 5 6 7 8} {db0_LIST:8=1} + run_cmd_verify_hist {$server UNLINK l1} {} 1 + + # UNLINK on SET + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server SADD s1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16} {db0_SET:16=1} + run_cmd_verify_hist {$server UNLINK s1} {} 1 + + # UNLINK on ZSET + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server ZADD z1 1 a 2 b 3 c 4 d 5 e 6 f 7 g 8 h} {db0_ZSET:8=1} + run_cmd_verify_hist {$server UNLINK z1} {} 1 + + # UNLINK on HASH + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server HSET h1 f1 v1 f2 v2 f3 v3 f4 v4} {db0_HASH:4=1} + run_cmd_verify_hist {$server UNLINK h1} {} 1 + + # UNLINK multiple keys of different types + run_cmd_verify_hist {$server FLUSHALL} {} + run_cmd_verify_hist {$server SET s1 12345678} {db0_STR:8=1} + run_cmd_verify_hist {$server RPUSH l1 1 2 3 4} {db0_STR:8=1 db0_LIST:4=1} + run_cmd_verify_hist {$server SADD set1 a b c d e f g h} {db0_STR:8=1 db0_LIST:4=1 db0_SET:8=1} + run_cmd_verify_hist {$server ZADD z1 1 x 2 y} {db0_STR:8=1 db0_LIST:4=1 db0_SET:8=1 db0_ZSET:2=1} + run_cmd_verify_hist {$server UNLINK s1 l1 set1 z1} {} 1 + } {} {cluster:skip} + test "KEYSIZES - Test complex dataset $suffixRepl" { run_cmd_verify_hist {$server FLUSHALL} {} createComplexDataset $server 1000 diff --git a/tests/unit/type/hash-field-expire.tcl b/tests/unit/type/hash-field-expire.tcl index 841b283eb..7f3520e80 100644 --- a/tests/unit/type/hash-field-expire.tcl +++ b/tests/unit/type/hash-field-expire.tcl @@ -296,7 +296,7 @@ start_server {tags {"external:skip needs:debug"}} { test "HPEXPIRETIME persists after RDB reload ($type)" { r del myhash r hset myhash field1 value1 field2 value2 - r hpexpire myhash 150 NX FIELDS 1 field1 + r hpexpire myhash 300 NX FIELDS 1 field1 set before [r HPEXPIRETIME myhash FIELDS 1 field1] r debug reload set after [r HPEXPIRETIME myhash FIELDS 1 field1]