KEYSIZES/ASM: simplify histograms, fix background trim, and refactor debug assertions (#14877)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run

- Simplify KEYSIZES tracking and saving memory by removing per‑slot histogram state in kvstoreDictMetadata and routing all updates through kvsUpdateHistogram + updateKeysizesHist(db, type, ...) (per‑DB only).
- Fix KEYSIZES consistency during ASM background trim by introducing asmTrimCtx, passing it through emptyDbDataAsync/BIO lazy‑free, computing histogram deltas in the background, and applying them on completion; add bg_trim_running to coordinate with validation.
- Refactor and relax debug assertions into a unified dbg_assert_flags bitmask and dbgRunAssertions(db), and skip KEYSIZES/ALLOCSIZE checks during nested execution, RDB load, ASM import, and ASM background trim.
- Update commands, module APIs, tests (including UNLINK async deletion coverage), and daily CI workflow to reflect the new histogram behavior and re‑enable ASM/cluster tests.
- Revise the daily CI “debug-assert-keyspace” workflow to run ASM and slot-stats unit tests again

Potential edge case with histogram accuracy:
The histogram could become inaccurate if the database is flushed while ASM trim is running in the background. I considered adding a generation counter to detect this, but decided against it since this is purely an INFO/diagnostic feature and the edge case is quite rare. The target_kvstore pointer check prevents applying stale deltas to the wrong kvstore, and if the histogram does become incorrect we have debugServerAssert() to catch negative values - it won't cause crashes in production. This is a known limitation we can document and revisit if it becomes a real issue in practice.
This commit is contained in:
Moti Cohen 2026-03-22 15:26:06 +02:00 committed by GitHub
parent 9accf8bd24
commit f4d176b3b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 433 additions and 205 deletions

View file

@ -185,7 +185,6 @@ jobs:
# - Info keysizes histogram
# - Cluster slot stats
# Skips slow and defrag tests to avoid timeouts while maintaining good coverage.
# TODO: Fix ASM (atomic slot migration) compatibility with keysizes histogram tracking
test-debug-assert-keyspace:
runs-on: ubuntu-latest
if: |
@ -207,15 +206,13 @@ jobs:
repository: ${{ env.GITHUB_REPOSITORY }}
ref: ${{ env.GITHUB_HEAD_REF }}
- name: make
run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERT_KEYSPACE'
run: make REDIS_CFLAGS='-Werror -DREDIS_TEST -DDEBUG_ASSERT_KEYSPACE -DDEBUG_ASSERTIONS'
- name: testprep
run: sudo apt-get install tcl8.6 tclx
- name: test
if: true && !contains(github.event.inputs.skiptests, 'redis')
run: |
./runtest --verbose --tags "-slow -defrag" \
--skipunit unit/cluster/slot-stats \
--skipunit unit/cluster/atomic-slot-migration \
--dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')

View file

@ -885,8 +885,7 @@ void setbitCommand(client *c) {
* update the keysizes histogram. Otherwise, the histogram already
* updated in lookupStringForBitCommand() by calling dbAdd(). */
if ((strOldSize > 0) && (strGrowSize != 0))
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING,
strOldSize, strOldSize + strGrowSize);
updateKeysizesHist(c->db, OBJ_STRING, strOldSize, strOldSize + strGrowSize);
}
/* Return original value. */
@ -2108,8 +2107,7 @@ void bitfieldGeneric(client *c, int flags) {
* update the keysizes histogram. Otherwise, the histogram already
* updated in lookupStringForBitCommand() by calling dbAdd(). */
if ((strOldSize > 0) && (strGrowSize != 0))
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING,
strOldSize, strOldSize + strGrowSize);
updateKeysizesHist(c->db, OBJ_STRING, strOldSize, strOldSize + strGrowSize);
keyModified(c,c->db,c->argv[1],o,1);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);

View file

@ -1769,14 +1769,13 @@ int clusterIsMySlot(int slot) {
return getMyClusterNode() == getNodeBySlot(slot);
}
void replySlotsFlushAndFree(client *c, slotRangeArray *slots) {
void replySlotsFlush(client *c, slotRangeArray *slots) {
addReplyArrayLen(c, slots->num_ranges);
for (int i = 0 ; i < slots->num_ranges ; i++) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, slots->ranges[i].start);
addReplyLongLong(c, slots->ranges[i].end);
}
slotRangeArrayFree(slots);
}
/* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are
@ -2190,6 +2189,9 @@ void sflushCommand(client *c) {
return;
}
slotRangeArrayFree(slots);
/* takes ownership of myslots */
asmTrimCtx *trim_ctx = asmTrimCtxCreate(myslots, server.db[0].keys);
/* If the selected slots are exactly the same as the local slots, we can
* simply flush the entire DB by flushCommandCommon. */
@ -2198,9 +2200,10 @@ void sflushCommand(client *c) {
slotRangeArrayFree(local_slots);
if (all_slots_covered) {
/* If not flush as blocking async, then reply immediately */
if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) {
replySlotsFlushAndFree(c, myslots);
if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, trim_ctx) == 0) {
replySlotsFlush(c, trim_ctx->slots);
}
asmTrimCtxRelease(trim_ctx);
return;
}
@ -2220,7 +2223,7 @@ void sflushCommand(client *c) {
/* Update dirty stats before trimming. */
server.dirty += getKeyCountInSlotRangeArray(myslots);
/* Pass client id for active trim to unblock client when trim completes. */
trim_method = asmTrimSlots(myslots, blocking_async ? c->id : 0, 0);
trim_method = asmTrimSlots(trim_ctx, blocking_async ? c->id : CLIENT_ID_NONE, 0);
} else {
clusterDelKeysInSlotRangeArray(myslots, 1);
}
@ -2237,15 +2240,13 @@ void sflushCommand(client *c) {
* unblock client and reply in active trim completion. */
if (blocking_async && trim_method != ASM_TRIM_METHOD_NONE) {
blockClientForAsyncFlush(c);
if (trim_method == ASM_TRIM_METHOD_BG)
bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, myslots);
else /* ASM_TRIM_METHOD_ACTIVE, just free the slot ranges */
slotRangeArrayFree(myslots);
} else {
/* Reply with slot ranges that were flushed. SYNC and ASYNC mode will be
* replied here immediately. */
replySlotsFlushAndFree(c, myslots);
replySlotsFlush(c, trim_ctx->slots);
}
asmTrimCtxRelease(trim_ctx); /* if bg trim, released later by kvsAsyncFreeDoneCB() */
}
/* The READWRITE command just clears the READONLY command state. */

View file

@ -1,11 +1,46 @@
/* cluster_asm.c -- Atomic slot migration implementation for cluster
*
/*
* Copyright (c) 2025-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* cluster_asm.c -- Atomic slot migration implementation for cluster
*
* TERMINOLOGY:
* - SOURCE: The node that currently owns the slots (sending data away)
* - DESTINATION: The node that will own the slots (receiving data)
*
* Example: Moving slots 0-100 from Node A to Node B
* - Node A = SOURCE (has the data, will lose ownership)
* - Node B = DESTINATION (will receive data, will gain ownership)
*
* Migration Flow:
* 1. DESTINATION initiates: CLUSTER MIGRATION IMPORT <slots>
* (Operator runs command on Node B, the receiving node)
*
* 2. SOURCE forks and sends slot snapshot (RESTORE commands) via RDB channel
* (Node A creates snapshot of slots 0-100)
*
* 3. SOURCE streams incremental changes via main channel
* (Node A forwards new writes to Node B while snapshot is being sent)
*
* 4. DESTINATION applies snapshot and buffers incremental changes
* (Node B receives snapshot, buffers ongoing writes)
*
* 5. SOURCE pauses writes when destination catches up
* (Node A stops accepting writes for slots 0-100 when Node B is nearly caught up)
*
* 6. DESTINATION drains buffer and takes ownership
* (Node B applies final buffered commands, updates config to own slots 0-100)
*
* 7. Config updated atomically via cluster bus
* (All nodes learn: slots 0-100 now belong to Node B)
*
* 8. SOURCE trims migrated keys (background or active)
* (Node A deletes keys from slots 0-100 since it no longer owns them)
*
*/
#include "server.h"
@ -13,17 +48,25 @@
#include "functions.h"
#include "cluster_asm.h"
#include "cluster_slot_stats.h"
#include "bio.h"
/* Operation types: import (destination side) or migrate (source side) */
#define ASM_IMPORT (1 << 1)
#define ASM_MIGRATE (1 << 2)
#define ASM_DEBUG_TRIM_DEFAULT 0
#define ASM_DEBUG_TRIM_NONE 1
#define ASM_DEBUG_TRIM_BG 2
#define ASM_DEBUG_TRIM_ACTIVE 3
/* Trimming methods for cleaning up migrated keys */
#define ASM_DEBUG_TRIM_DEFAULT 0 /* Auto-select based on module subscriptions and client tracking */
#define ASM_DEBUG_TRIM_NONE 1 /* No trimming (for testing) */
#define ASM_DEBUG_TRIM_BG 2 /* Background trim: hand off to BIO thread (fast, non-blocking) */
#define ASM_DEBUG_TRIM_ACTIVE 3 /* Active trim: delete in main thread cron (slow, fires notifications) */
#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */
/* ASM Task: Represents a single slot migration operation.
* Each task tracks the complete lifecycle of migrating one or more slot ranges
* from a source node to a destination node. The task exists on both sides but
* with different states (import states on destination, migrate states on source).
*/
typedef struct asmTask {
sds id; /* Task ID */
int operation; /* Either ASM_IMPORT or ASM_MIGRATE */
@ -59,6 +102,8 @@ typedef struct activeTrimJob {
int migration_cleanup; /* Whether this is a migration cleanup of slots no longer owned */
} activeTrimJob;
/* ASM Manager: Global singleton that manages all ASM operations.
* Coordinates migration tasks, trim jobs, and maintains statistics. */
struct asmManager {
list *tasks; /* List of asmTask to be processed */
list *archived_tasks; /* List of archived asmTask */
@ -74,6 +119,9 @@ struct asmManager {
int debug_trim_method; /* Method to trim the buffer */
int debug_active_trim_delay; /* Sleep before trimming each key */
/* Background trim tracking */
size_t bg_trim_running; /* Number of bg trim jobs in progress */
/* Active trim stats */
unsigned long long active_trim_started; /* Number of times active trim was started */
unsigned long long active_trim_completed; /* Number of times active trim was completed */
@ -2953,9 +3001,16 @@ void asmUnblockMasterAfterTrim(void) {
}
}
/* Trim the slots asynchronously in the BIO thread. migration_cleanup is true if this
* is a migration cleanup of slots no longer owned. */
void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) {
/* Background Trim: Delete migrated keys asynchronously in BIO thread.
*
* It works by moving entire slot data structures (dictionaries) to temporary
* kvstores, then handing them off to BIO thread for deletion.
*
* @param trim_ctx Context for slot ranges and histogram tracking
* @param migration_cleanup True if this is post-migration cleanup (fires module events)
*/
void asmTriggerBackgroundTrim(asmTrimCtx *trim_ctx, int migration_cleanup) {
slotRangeArray *slots = trim_ctx->slots;
RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
(RedisModuleSlotRangeArray *) slots
@ -2969,8 +3024,8 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) {
signalFlushedDb(0, 1, slots);
/* Create temp kvstores and estore, move relevant slot dicts/ebuckets into them,
* and delete them in BIO thread asynchronously. */
/* Create temporary kvstores to hold the slot data we're about to move.
* These will be deleted in the BIO thread. */
kvstore *keys = kvstoreCreate(&kvstoreBaseType, &dbDictType,
CLUSTER_SLOT_MASK_BITS,
KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
@ -2982,6 +3037,7 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) {
size_t total_keys = 0;
/* Move slot dictionaries from main DB to temp kvstores (O(1) per slot) */
for (int i = 0; i < slots->num_ranges; i++) {
for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) {
total_keys += kvstoreDictSize(server.db[0].keys, slot);
@ -2992,7 +3048,7 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) {
}
}
emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys);
emptyDbDataAsync(keys, expires, subexpires, stream_idmp_keys, trim_ctx);
sds str = slotRangeArrayToString(slots);
serverLog(LL_NOTICE, "Background trim started for slots: %s to trim %zu keys.", str, total_keys);
@ -3049,7 +3105,39 @@ void asmTriggerBackgroundTrim(slotRangeArray *slots, int migration_cleanup) {
* Trim the slots, return the trim method used.
* If client_id is non-zero, the client will be unblocked when trim completes.
* If migration_cleanup is true, this is a migration cleanup of slots no longer owned. */
int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup) {
/* Create ASM trim context with refcount=1 */
asmTrimCtx *asmTrimCtxCreate(slotRangeArray *slots, kvstore *target_kvstore) {
asmTrimCtx *ctx = zcalloc(sizeof(asmTrimCtx));
ctx->refcount = 1;
ctx->slots = slots;
ctx->target_kvstore = target_kvstore;
/* delta histograms are zero-initialized by zcalloc */
return ctx;
}
/* Increment refcount */
void asmTrimCtxRetain(asmTrimCtx *ctx) {
if (!ctx) return;
ctx->refcount++;
}
/* Decrement refcount, free if reaches 0 */
void asmTrimCtxRelease(asmTrimCtx *ctx) {
if (!ctx) return;
serverAssert(ctx->refcount > 0);
ctx->refcount--;
if (ctx->refcount == 0) {
slotRangeArrayFree(ctx->slots);
zfree(ctx);
}
}
int asmTrimSlots(asmTrimCtx *ctx, uint64_t client_id, int migration_cleanup) {
serverAssert(ctx != NULL);
if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE)
return ASM_TRIM_METHOD_NONE;
@ -3065,10 +3153,18 @@ int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration
(asmManager->debug_trim_method == ASM_DEBUG_TRIM_ACTIVE) ||
(asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT &&
moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED));
if (activetrim)
asmTriggerActiveTrim(slots, client_id, migration_cleanup);
else
asmTriggerBackgroundTrim(slots, migration_cleanup);
if (activetrim) {
asmTriggerActiveTrim(ctx->slots, client_id, migration_cleanup);
} else {
/* Background trim:
* - Retain ctx for kvsAsyncFreeDoneCB() to release ctx later
* - Trigger background trim. Also updates ctx delta histogram.
* - Schedule completion cb to deduce delta histogram from DB */
asmBgTrimCounterIncr();
asmTrimCtxRetain(ctx);
asmTriggerBackgroundTrim(ctx, migration_cleanup);
bioCreateCompRq(BIO_WORKER_LAZY_FREE, kvsAsyncFreeDoneCB, client_id, ctx);
}
return activetrim ? ASM_TRIM_METHOD_ACTIVE : ASM_TRIM_METHOD_BG;
}
@ -3126,10 +3222,11 @@ void asmTrimJobProcessPending(void) {
listRewind(asmManager->pending_trim_jobs, &li);
while ((ln = listNext(&li)) != NULL) {
slotRangeArray *slots = listNodeValue(ln);
asmTrimSlots(slots, 0, 1);
asmTrimCtx *ctx = asmTrimCtxCreate(slots, server.db[0].keys);
asmTrimSlots(ctx, CLIENT_ID_NONE, 1);
propagateTrimSlots(slots);
listDelNode(asmManager->pending_trim_jobs, ln);
slotRangeArrayFree(slots);
asmTrimCtxRelease(ctx); /* Release ctx (if bg trim, released later by kvsAsyncFreeDoneCB) */
}
}
@ -3299,7 +3396,6 @@ void activeTrimJobFreeMethod(void *ptr) {
/* Reply with the slot ranges that requested to be trimmed. Generally we
* cancel trim jobs as the dataset is reset, no need to trim anymore. */
unblockClientForAsyncFlush(job->client_id, job->slots);
job->slots = NULL; /* slots were freed in unblockClientForAsyncFlush */
}
if (job->slots) slotRangeArrayFree(job->slots);
zfree(job);
@ -3366,6 +3462,7 @@ void trimslotsCommand(client *c) {
* command may have an update for the same key that is supposed to be
* trimmed. We have to trim the keys synchronously. */
clusterDelKeysInSlotRangeArray(slots, 1);
slotRangeArrayFree(slots);
} else {
/* We cannot trim any slot served by this node. */
if (clusterNodeIsMaster(getMyClusterNode())) {
@ -3379,14 +3476,15 @@ void trimslotsCommand(client *c) {
}
}
}
asmTrimSlots(slots, 0, 1);
asmTrimCtx *ctx = asmTrimCtxCreate(slots, server.db[0].keys);
asmTrimSlots(ctx, CLIENT_ID_NONE, 1);
/* Release ctx - if bg trim, will be freed when BIO completes */
asmTrimCtxRelease(ctx);
}
/* Command will not be propagated automatically since it does not modify
* the dataset. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
slotRangeArrayFree(slots);
addReply(c, shared.ok);
}
@ -3483,6 +3581,25 @@ int asmIsAnyTrimJobOverlaps(slotRangeArray *slots) {
return 0;
}
/* Decrement background trim counter. Called from completion callback. */
void asmBgTrimCounterDecr(void) {
if (!asmManager) return;
debugServerAssert(asmManager->bg_trim_running > 0);
asmManager->bg_trim_running--;
}
/* Increment background trim counter. */
void asmBgTrimCounterIncr(void) {
if (!asmManager) return;
asmManager->bg_trim_running++;
}
/* Check if background trim is running (for skipping debug assertions). */
int asmIsBgTrimRunning(void) {
if (!asmManager) return 0;
return asmManager->bg_trim_running > 0;
}
/* Check if there is any trim job in progress. */
int asmIsTrimInProgress(void) {
if (!server.cluster_enabled) return 0;

View file

@ -57,6 +57,14 @@ int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc
void asmActiveTrimCycle(void);
int asmIsKeyInTrimJob(sds keyname);
int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc);
int asmTrimSlots(struct slotRangeArray *slots, uint64_t client_id, int migration_cleanup);
int asmTrimSlots(struct asmTrimCtx *ctx, uint64_t client_id, int migration_cleanup);
int asmIsBgTrimRunning(void);
void asmBgTrimCounterDecr(void);
void asmBgTrimCounterIncr(void);
/* Context for ASM background trim */
struct asmTrimCtx *asmTrimCtxCreate(struct slotRangeArray *slots, kvstore *target_kvstore);
void asmTrimCtxRetain(struct asmTrimCtx *ctx);
void asmTrimCtxRelease(struct asmTrimCtx *ctx);
#endif

152
src/db.c
View file

@ -76,8 +76,7 @@ void updateLRM(robj *o) {
* represents the number of keys with a size in the range 2^i and 2^(i+1)
* exclusive. oldLen/newLen must be smaller than 2^48, and if their value
* equals -1, it means that the key is being created/deleted, respectively. Each
* data type has its own histogram and it is per database (In addition, there is
* histogram per slot for future cluster use).
* data type has its own histogram and it is maintained per database.
*
* Example mapping of key lengths to bins:
* [1,2)->1 [2,4)->2 [4,8)->3 [8,16)->4 ...
@ -85,19 +84,13 @@ void updateLRM(robj *o) {
* Since strings can be zero length, the histogram also tracks:
* [0,1)->0
*/
void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t type, int64_t oldLen, int64_t newLen) {
void kvsUpdateHistogram(keysizesHist kvstoreHist, uint32_t type, int64_t oldLen, int64_t newLen) {
if(unlikely(type >= OBJ_TYPE_BASIC_MAX))
return;
if (oldLen > 0) {
int old_bin = log2ceil(oldLen) + 1;
debugServerAssert(old_bin < MAX_KEYSIZES_BINS);
/* If following a key deletion it is last one in slot's dict, then
* slot's dict might get released as well. Verify if metadata is not NULL. */
if (dictHist) {
dictHist[type][old_bin]--;
debugServerAssert(dictHist[type][old_bin] >= 0);
}
kvstoreHist[type][old_bin]--;
debugServerAssert(kvstoreHist[type][old_bin] >= 0);
} else {
@ -105,11 +98,6 @@ void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t ty
if (oldLen == 0) {
/* Only strings can be empty. Yet, a command flow might temporarily
* dbAdd() empty collection, and only after add elements. */
if (dictHist) {
dictHist[type][0]--;
debugServerAssert(dictHist[type][0] >= 0);
}
kvstoreHist[type][0]--;
debugServerAssert(kvstoreHist[type][0] >= 0);
}
@ -118,31 +106,24 @@ void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t ty
if (newLen > 0) {
int new_bin = log2ceil(newLen) + 1;
debugServerAssert(new_bin < MAX_KEYSIZES_BINS);
/* If following a key deletion it is last one in slot's dict, then
* slot's dict might get released as well. Verify if metadata is not NULL. */
if (dictHist) dictHist[type][new_bin]++;
kvstoreHist[type][new_bin]++;
} else {
/* here, newLen can be either 0 or -1 */
if (newLen == 0) {
/* Only strings can be empty. Yet, a command flow might temporarily
* dbAdd() empty collection, and only after add elements. */
if (dictHist) dictHist[type][0]++;
kvstoreHist[type][0]++;
}
}
}
void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen) {
void updateKeysizesHist(redisDb *db, uint32_t type, int64_t oldLen, int64_t newLen) {
kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys);
kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0);
updateSlotHist(kvstoreMeta->keysizes_hist, dictMeta ? dictMeta->keysizes_hist : NULL, type, oldLen, newLen);
kvsUpdateHistogram(kvstoreMeta->keysizes_hist, type, oldLen, newLen);
}
void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int64_t newsize) {
debugServerAssert(server.memory_tracking_enabled);
kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys);
kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0);
/* Early return if nothing changed */
@ -160,7 +141,8 @@ void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int6
}
/* Update allocation size histogram */
updateSlotHist(kvstoreMeta->allocsizes_hist, NULL, kv->type, oldsize, newsize);
kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys);
kvsUpdateHistogram(kvstoreMeta->allocsizes_hist, kv->type, oldsize, newsize);
}
static void dbgAssertHist(kvstore *kvs, keysizesHist hist,
@ -201,35 +183,21 @@ static void dbgAssertHist(kvstore *kvs, keysizesHist hist,
}
}
/* Assert keysizes histogram (For debugging only)
*
* Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 and tested after each command.
*/
void dbgAssertKeysizesHist(redisDb *db) {
/* Don't assert during nested calls. Intermediate state may be inconsistent. */
if (server.execution_nesting) return;
/* Don't assert during RDB loading. Database may be in inconsistent state. */
if (server.loading || server.async_loading) return;
/* Assert keysizes histogram (For debugging only) */
static void dbgAssertKeysizesHist(redisDb *db) {
kvstoreMetadata *meta = kvstoreGetMetadata(db->keys);
dbgAssertHist(db->keys, meta->keysizes_hist, getObjectLength, "dbgAssertKeysizesHist");
if (server.memory_tracking_enabled)
dbgAssertHist(db->keys, meta->allocsizes_hist, kvobjAllocSize, "dbgAssertAllocsizesHist");
}
/* Assert per-slot alloc_size (For debugging only)
*
* Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 and tested after each command.
*/
void dbgAssertAllocSizePerSlot(redisDb *db) {
/* Don't assert during nested calls. Intermediate state may be inconsistent. */
if (server.execution_nesting) return;
/* Don't assert during RDB loading. Database may be in inconsistent state. */
if (server.loading || server.async_loading) return;
/* Assert per-slot alloc_size (For debugging only) */
static void dbgAssertAllocSizePerSlot(redisDb *db) {
if (!server.memory_tracking_enabled) return;
/* Check allocsizes histogram per db */
kvstoreMetadata *meta = kvstoreGetMetadata(db->keys);
dbgAssertHist(db->keys, meta->allocsizes_hist, kvobjAllocSize, "dbgAssertAllocsizesHist");
/* Check alloc_size per slot */
size_t slot_sizes[CLUSTER_SLOTS] = {0};
dictEntry *de;
kvstoreIterator kvs_it;
@ -252,6 +220,28 @@ void dbgAssertAllocSizePerSlot(redisDb *db) {
}
}
/* Run debug assertions based on server.dbg_assert_flags.
*
* DBG_ASSERT_KEYSIZES: Triggered by DEBUG KEYSIZES-HIST-ASSERT 1
* DBG_ASSERT_ALLOC_SLOT: Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1
*/
void dbgRunAssertions(redisDb *db) {
/* Don't assert during nested calls. Intermediate state may be inconsistent. */
if (server.execution_nesting) return;
/* Don't assert during RDB loading. Database may be in inconsistent state. */
if (server.loading || server.async_loading) return;
/* Don't assert during ASM background trim. Histogram delta hasn't been applied yet. */
if (asmIsBgTrimRunning()) return;
if (server.dbg_assert_flags & DBG_ASSERT_KEYSIZES)
dbgAssertKeysizesHist(db);
if (server.dbg_assert_flags & DBG_ASSERT_ALLOC_SLOT)
dbgAssertAllocSizePerSlot(db);
}
/* Lookup a kvobj for read or write operations, or return NULL if the it is not
* found in the specified DB. This function implements the functionality of
* lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
@ -450,7 +440,7 @@ kvobj *dbAddInternal(redisDb *db, robj *key, robj **valref, dictEntryLink *link,
signalKeyAsReady(db, key, kv->type);
notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
updateKeysizesHist(db, slot, kv->type, -1, getObjectLength(kv)); /* add hist */
updateKeysizesHist(db, kv->type, -1, getObjectLength(kv)); /* add hist */
if (server.memory_tracking_enabled)
updateSlotAllocSize(db, slot, kv, -1, kvobjAllocSize(kv));
*valref = kv;
@ -555,7 +545,7 @@ kvobj *dbAddRDBLoad(redisDb *db, sds key, robj **valref, const KeyMetaSpec *keyM
keyMetaSpec->numMeta * sizeof(uint64_t));
}
updateKeysizesHist(db, slot, kv->type, -1, (int64_t) getObjectLength(kv));
updateKeysizesHist(db, kv->type, -1, (int64_t) getObjectLength(kv));
if (server.memory_tracking_enabled)
updateSlotAllocSize(db, slot, kv, -1, kvobjAllocSize(kv));
return *valref = kv;
@ -685,10 +675,10 @@ static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link
if (updateKeySizes) {
/* Save one call if old and new are the same type */
if (oldtype == kvNew->type) {
updateKeysizesHist(db, slot, oldtype, oldlen, newlen);
updateKeysizesHist(db, oldtype, oldlen, newlen);
} else {
updateKeysizesHist(db, slot, oldtype, oldlen, -1);
updateKeysizesHist(db, slot, kvNew->type, -1, newlen);
updateKeysizesHist(db, oldtype, oldlen, -1);
updateKeysizesHist(db, kvNew->type, -1, newlen);
}
}
@ -899,7 +889,7 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
/* remove key from histogram */
if(!(flags & DB_FLAG_NO_UPDATE_KEYSIZES))
updateKeysizesHist(db, slot, type, oldlen, -1);
updateKeysizesHist(db, type, oldlen, -1);
return 1;
} else {
return 0;
@ -1263,15 +1253,41 @@ void blockClientForAsyncFlush(client *c) {
blockClient(c, BLOCKED_LAZYFREE);
}
/* CB function on blocking ASYNC FLUSH completion.
* We will unblock the client and send the proper reply. */
void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) {
slotRangeArray *slots = userdata;
/* CB function on blocking ASYNC FLUSH/TRIM completion.
* We will unblock the client and send the proper reply if provided. */
void kvsAsyncFreeDoneCB(uint64_t client_id, void *userdata) {
/* If ASM Trim context provided, apply histogram delta */
asmTrimCtx *ctx = userdata;
if (ctx) {
kvstoreMetadata *meta = kvstoreGetMetadata(server.db[0].keys);
/* Apply histogram delta only if target_kvstore hasn't changed */
if (ctx->target_kvstore == server.db[0].keys && meta) {
for (int type = 0; type < MAX_KEYSIZES_TYPES; type++) {
for (int bin = 0; bin < MAX_KEYSIZES_BINS; bin++) {
meta->keysizes_hist[type][bin] -= ctx->delta_keysizes_hist[type][bin];
meta->allocsizes_hist[type][bin] -= ctx->delta_allocsizes_hist[type][bin];
}
}
}
/* Decrement counter unconditionally to track job completion. If kvstore was
* replaced (e.g., by FLUSHALL), the new histogram is already consistent (reset
* to 0 for empty DB), so it's safe to resume assertions when counter reaches 0. */
asmBgTrimCounterDecr();
}
unblockClientForAsyncFlush(client_id, (ctx) ? ctx->slots : NULL);
/* Release context and slots */
asmTrimCtxRelease(ctx);
}
/* Unblock client on async flush/trim completion */
void unblockClientForAsyncFlush(uint64_t client_id, struct slotRangeArray *slots) {
client *c = lookupClientByID(client_id);
/* Verify that client still exists and being blocked. */
if (!(c && c->flags & CLIENT_BLOCKED)) {
slotRangeArrayFree(slots);
return;
}
@ -1284,7 +1300,7 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) {
/* Only SFLUSH command pass user data pointer. */
if (slots)
replySlotsFlushAndFree(c, slots);
replySlotsFlush(c, slots);
else
addReply(c, shared.ok);
@ -1311,11 +1327,10 @@ void unblockClientForAsyncFlush(uint64_t client_id, void *userdata) {
* Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC
* Return 0 otherwise
*
* slots - provided only by SFLUSH command, otherwise NULL. Will be used on
* completion to reply with the slots flush result. Ownership is passed
* to the completion job in case of `blocking_async`.
* trim_ctx - provided only by SFLUSH command, otherwise NULL. Contains slots to
* be used on completion to reply with the slots flush result.
*/
int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *slots) {
int flushCommandCommon(client *c, int type, int flags, asmTrimCtx *trim_ctx) {
int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */
/* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
@ -1326,7 +1341,7 @@ int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *sl
}
/* Cancel all ASM tasks that overlap with the given slot ranges. */
clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr);
clusterAsmCancelBySlotRangeArray(trim_ctx ? trim_ctx->slots : NULL, c->argv[0]->ptr);
if (type == FLUSH_TYPE_ALL)
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
@ -1342,7 +1357,12 @@ int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *sl
* lazyfree jobs in queue were processed */
if (blocking_async) {
blockClientForAsyncFlush(c);
bioCreateCompRq(BIO_WORKER_LAZY_FREE, unblockClientForAsyncFlush, c->id, slots);
/* Retain trim_ctx if provided so kvsAsyncFreeDoneCB can release it later */
if (trim_ctx) {
asmBgTrimCounterIncr();
asmTrimCtxRetain(trim_ctx);
}
bioCreateCompRq(BIO_WORKER_LAZY_FREE, kvsAsyncFreeDoneCB, c->id, trim_ctx);
}
#if defined(USE_JEMALLOC)

View file

@ -553,13 +553,19 @@ NULL
long long flag;
if (getLongLongFromObjectOrReply(c, c->argv[2], &flag, NULL) != C_OK)
return;
server.dbg_assert_keysizes = (flag != 0);
if (flag)
server.dbg_assert_flags |= DBG_ASSERT_KEYSIZES;
else
server.dbg_assert_flags &= ~DBG_ASSERT_KEYSIZES;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"ALLOCSIZE-SLOTS-ASSERT") && c->argc == 3) {
long long flag;
if (getLongLongFromObjectOrReply(c, c->argv[2], &flag, NULL) != C_OK)
return;
server.dbg_assert_alloc_per_slot = (flag != 0);
if (flag)
server.dbg_assert_flags |= DBG_ASSERT_ALLOC_SLOT;
else
server.dbg_assert_flags &= ~DBG_ASSERT_ALLOC_SLOT;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) {
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);

View file

@ -1650,7 +1650,7 @@ void pfaddCommand(client *c) {
}
hdr = kv->ptr;
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, stringObjectLen(kv));
updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(kv));
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv));
if (updated) {
@ -1841,8 +1841,7 @@ void pfmergeCommand(client *c) {
* since in theory this is a mass-add of elements. */
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr),
OBJ_STRING, oldLen, stringObjectLen(kv));
updateKeysizesHist(c->db, OBJ_STRING, oldLen, stringObjectLen(kv));
server.dirty++;
addReply(c,shared.ok);
}
@ -2011,7 +2010,7 @@ void pfdebugCommand(client *c) {
addReplyError(c,invalid_hll_err);
return;
}
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_STRING, oldlen, stringObjectLen(o));
updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(o));
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[2]->ptr), o, oldsize, kvobjAllocSize(o));
server.dirty++; /* Force propagation on encoding change. */
@ -2080,7 +2079,7 @@ void pfdebugCommand(client *c) {
addReplyError(c,invalid_hll_err);
return;
}
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_STRING, oldlen, stringObjectLen(o));
updateKeysizesHist(c->db, OBJ_STRING, oldlen, stringObjectLen(o));
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[2]->ptr), o, oldsize, kvobjAllocSize(o));
conv = 1;

View file

@ -3,6 +3,7 @@
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"
#include "cluster_asm.h"
#include "ebuckets.h"
static redisAtomic size_t lazyfree_objects = 0;
@ -17,14 +18,50 @@ void lazyfreeFreeObject(void *args[]) {
atomicIncr(lazyfreed_objects,1);
}
/* Populate delta histograms by iterating through keys in the kvstore. To be
* deduced from the main db histogram later on kvsAsyncFreeDoneCB */
static void populateDeltaHistograms(kvstore *kvs, asmTrimCtx *ctx) {
kvstoreIterator kvs_it;
kvstoreIteratorInit(&kvs_it, kvs);
dictEntry *de;
while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
kvobj *kv = dictGetKV(de);
if ((!kv) || (kv->type >= OBJ_TYPE_BASIC_MAX)) continue;
/* Update keysizes_hist delta */
size_t len = getObjectLength(kv);
int sizeBin = (len == 0) ? 0 : log2ceil(len) + 1; /* Only strings can be empty */
debugServerAssert(sizeBin < MAX_KEYSIZES_BINS);
ctx->delta_keysizes_hist[kv->type][sizeBin]++;
/* Update allocsizes_hist delta */
if (server.memory_tracking_enabled) {
size_t alloc_size = kvobjAllocSize(kv);
int allocBin = (alloc_size == 0) ? 0 : log2ceil(alloc_size) + 1;
debugServerAssert(allocBin < MAX_KEYSIZES_BINS);
ctx->delta_allocsizes_hist[kv->type][allocBin]++;
}
}
kvstoreIteratorReset(&kvs_it);
}
/* Release a database from the lazyfree thread. The 'db' pointer is the
* database which was substituted with a fresh one in the main thread
* when the database was logically deleted. */
void lazyfreeFreeDatabase(void *args[]) {
* when the database was logically deleted.
*
* If args[3] is provided, it's an asmTrimCtx for tracking histogram deltas
* during ASM background trim. */
void kvsLazyfreeFree(void *args[]) {
kvstore *da1 = args[0];
kvstore *da2 = args[1];
estore *subexpires = args[2];
dict *stream_idmp_keys = args[3];
asmTrimCtx *ctx = args[4]; /* Optional: ASM trim context */
/* If ASM context provided, populate delta histograms */
if (ctx) populateDeltaHistograms(da1, ctx);
estoreRelease(subexpires);
dictRelease(stream_idmp_keys);
size_t numkeys = kvstoreSize(da1);
@ -297,13 +334,13 @@ void emptyDbAsync(redisDb *db) {
db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
db->stream_idmp_keys = dictCreate(&objectKeyPointerValueDictType);
protectClientReplyObjects(); /* Protect client reply objects before async free. */
emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires, old_stream_idmp_keys);
emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires, old_stream_idmp_keys, NULL);
}
/* Empty a Redis DB data asynchronously. */
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys) {
/* Empty a kvstore data asynchronously. */
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, asmTrimCtx *ctx) {
atomicIncr(lazyfree_objects, kvstoreSize(keys));
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 4, keys, expires, hexpires, stream_idmp_keys);
bioCreateLazyFreeJob(kvsLazyfreeFree, 5, keys, expires, hexpires, stream_idmp_keys, ctx);
}
/* Free the key tracking table.

View file

@ -4746,7 +4746,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
if (curlen != newlen)
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_STRING, curlen, newlen);
updateKeysizesHist(key->db, OBJ_STRING, curlen, newlen);
}
return REDISMODULE_OK;
}
@ -4861,7 +4861,7 @@ int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
listTypePush(key->kv, ele,
(where == REDISMODULE_LIST_HEAD) ? LIST_HEAD : LIST_TAIL);
int64_t l = listTypeLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l-1, l);
updateKeysizesHist(key->db, OBJ_LIST, l-1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
return REDISMODULE_OK;
@ -4899,7 +4899,7 @@ RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
robj *decoded = getDecodedObject(ele);
decrRefCount(ele);
int64_t l = (int64_t) listTypeLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l+1, l);
updateKeysizesHist(key->db, OBJ_LIST, l+1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
if (!moduleDelKeyIfEmpty(key)) {
@ -5027,7 +5027,7 @@ int RM_ListInsert(RedisModuleKey *key, long index, RedisModuleString *value) {
int where = index < 0 ? LIST_TAIL : LIST_HEAD;
listTypeInsert(&key->u.list.entry, value, where);
int64_t l = (int64_t) listTypeLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l-1, l);
updateKeysizesHist(key->db, OBJ_LIST, l-1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
/* A note in quicklist.c forbids use of iterator after insert. */
@ -5058,7 +5058,7 @@ int RM_ListDelete(RedisModuleKey *key, long index) {
oldsize = kvobjAllocSize(key->kv);
listTypeDelete(key->iter, &key->u.list.entry);
int64_t l = (int64_t) listTypeLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_LIST, l+1, l);
updateKeysizesHist(key->db, OBJ_LIST, l+1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
if (moduleDelKeyIfEmpty(key)) return REDISMODULE_OK;
@ -5167,7 +5167,7 @@ int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *f
}
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
int64_t l = (int64_t) zsetLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l-1, l);
updateKeysizesHist(key->db, OBJ_ZSET, l-1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
return REDISMODULE_OK;
@ -5207,7 +5207,7 @@ int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
if (out_flags & ZADD_OUT_ADDED) {
int64_t l = (int64_t) zsetLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l-1, l);
updateKeysizesHist(key->db, OBJ_ZSET, l-1, l);
}
if (flagsptr) *flagsptr = moduleZsetAddFlagsFromCoreFlags(out_flags);
return REDISMODULE_OK;
@ -5244,7 +5244,7 @@ int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) {
if (zsetDel(key->kv,ele->ptr)) {
if (deleted) *deleted = 1;
int64_t l = (int64_t) zsetLength(key->kv);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_ZSET, l+1, l);
updateKeysizesHist(key->db, OBJ_ZSET, l+1, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(key->db, getKeySlot(key->key->ptr), key->kv, oldsize, kvobjAllocSize(key->kv));
moduleDelKeyIfEmpty(key);
@ -5747,7 +5747,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
}
}
va_end(ap);
updateKeysizesHist(key->db, getKeySlot(key->key->ptr), OBJ_HASH, oldlen,
updateKeysizesHist(key->db, OBJ_HASH, oldlen,
(int64_t) hashTypeLength(key->kv, 0));
moduleDelKeyIfEmpty(key);

View file

@ -350,8 +350,13 @@ static void dictDestructorKV(dict *d, void *key) {
meta->alloc_size -= alloc_size;
/* kvstoreMeta may be NULL when freeing kvstore created with kvstoreBaseType
* (e.g. in lazy free context). */
if (kvstoreMeta)
updateSlotHist(kvstoreMeta->allocsizes_hist, NULL, kv->type, alloc_size, -1);
if (kvstoreMeta && kv->type < OBJ_TYPE_BASIC_MAX) {
/* we don't call kvsUpdateHistogram() because it contains debugServerAssert
* that may fail in bg thread as kvstore might not being fully initialized */
int old_bin = (alloc_size == 0) ? 0 : log2ceil(alloc_size) + 1;
debugServerAssert(old_bin < MAX_KEYSIZES_BINS);
kvstoreMeta->allocsizes_hist[kv->type][old_bin]--;
}
}
decrRefCount(kv);
}
@ -540,11 +545,11 @@ static void kvstoreOnEmpty(kvstore *kvs) {
static void kvstoreOnDictEmpty(kvstore *kvs, int didx) {
kvstoreDictMetadata *meta = kvstoreGetDictMeta(kvs, didx, 0);
UNUSED(meta);
#ifdef DEBUG_ASSERTIONS
dictEmpty(kvstoreGetDict(kvs, didx), NULL);
#endif
debugServerAssert(meta->alloc_size == 0);
memset(&meta->keysizes_hist, 0, sizeof(meta->keysizes_hist));
}
/* Return 1 if currently we allow dict to expand. Dict may allocate huge
@ -2325,11 +2330,9 @@ void initServerConfig(void) {
server.executable = NULL;
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
#if DEBUG_ASSERT_KEYSPACE
server.dbg_assert_keysizes = 1;
server.dbg_assert_alloc_per_slot = 1;
server.dbg_assert_flags = DBG_ASSERT_KEYSIZES | DBG_ASSERT_ALLOC_SLOT;
#else
server.dbg_assert_keysizes = 0;
server.dbg_assert_alloc_per_slot = 0;
server.dbg_assert_flags = 0;
#endif
server.bindaddr_count = CONFIG_DEFAULT_BINDADDR_COUNT;
for (j = 0; j < CONFIG_DEFAULT_BINDADDR_COUNT; j++)
@ -4194,13 +4197,9 @@ void afterCommand(client *c) {
if (!server.execution_nesting)
listJoin(c->reply, server.pending_push_messages);
/* Assert keysizes histogram if enabled */
if (unlikely(server.dbg_assert_keysizes))
dbgAssertKeysizesHist(c->db);
/* Assert per-slot alloc_size if enabled */
if (unlikely(server.dbg_assert_alloc_per_slot))
dbgAssertAllocSizePerSlot(c->db);
/* Run debug assertions if any are enabled */
if (unlikely(server.dbg_assert_flags))
dbgRunAssertions(c->db);
}
/* Check if c->cmd exists, fills `err` with details in case it doesn't.

View file

@ -1199,9 +1199,17 @@ typedef struct {
uint64_t cpu_usec; /* CPU time (in microseconds) spent on given slot */
uint64_t network_bytes_in; /* Network ingress (in bytes) received for given slot */
uint64_t network_bytes_out; /* Network egress (in bytes) sent for given slot */
keysizesHist keysizes_hist;
} kvstoreDictMetadata;
/* Context for ASM background trim with delta histogram tracking */
typedef struct asmTrimCtx {
int refcount; /* For shared bg/main thread ownership */
struct slotRangeArray *slots; /* Slot ranges being trimmed */
kvstore *target_kvstore; /* Target kvstore to update (for validation) */
keysizesHist delta_keysizes_hist; /* Delta populated by BIO thread */
keysizesHist delta_allocsizes_hist;/* Delta populated by BIO thread */
} asmTrimCtx;
/* forward declaration for functions ctx */
typedef struct functionsLibCtx functionsLibCtx;
@ -1339,6 +1347,8 @@ typedef struct {
#define CLIENT_ID_AOF (UINT64_MAX) /* Reserved ID for the AOF client. If you
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
#define CLIENT_ID_NONE (0) /* Non-existent client ID, used when no client
is associated with an operation. */
/* Replication backlog is not a separate memory, it just is one consumer of
* the global replication buffer. This structure records the reference of
@ -2512,10 +2522,13 @@ struct redisServer {
int reply_copy_avoidance_enabled; /* Is reply copy avoidance enabled (1 by default) */
/* Local environment */
char *locale_collate;
int dbg_assert_keysizes; /* Assert keysizes histogram after each command */
int dbg_assert_alloc_per_slot; /* Assert per-slot alloc_size after each command */
unsigned int dbg_assert_flags; /* Bitmask of debug assertions to run after each command */
};
/* Debug assertion flags for server.dbg_assert_flags */
#define DBG_ASSERT_KEYSIZES (1 << 0) /* Assert keysizes histogram */
#define DBG_ASSERT_ALLOC_SLOT (1 << 1) /* Assert per-slot alloc_size */
/* we use 6 so that all getKeyResult fits a cacheline */
#define MAX_KEYS_BUFFER 6
@ -3897,11 +3910,10 @@ int moduleSetEnumConfig(client *c, sds name, sds *vals, int vals_cnt, const char
int moduleSetNumericConfig(client *c, sds name, long long val, const char **err);
/* db.c -- Keyspace access API */
void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen);
void kvsUpdateHistogram(keysizesHist kvstoreHist, uint32_t type, int64_t oldLen, int64_t newLen);
void updateKeysizesHist(redisDb *db, uint32_t type, int64_t oldLen, int64_t newLen);
void updateSlotAllocSize(redisDb *db, int didx, kvobj *kv, int64_t oldsize, int64_t newsize);
void updateSlotHist(keysizesHist kvstoreHist, keysizesHist dictHist, uint32_t type, int64_t oldLen, int64_t newLen);
void dbgAssertKeysizesHist(redisDb *db);
void dbgAssertAllocSizePerSlot(redisDb *db);
void dbgRunAssertions(redisDb *db);
int removeExpire(redisDb *db, robj *key);
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj);
void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed);
@ -3959,9 +3971,10 @@ kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *kv, dictEntryLi
#define FLUSH_TYPE_ALL 0
#define FLUSH_TYPE_DB 1
#define FLUSH_TYPE_SLOTS 2
void replySlotsFlushAndFree(client *c, struct slotRangeArray *slots);
int flushCommandCommon(client *c, int type, int flags, struct slotRangeArray *ranges);
void unblockClientForAsyncFlush(uint64_t client_id, void *userdata);
void replySlotsFlush(client *c, struct slotRangeArray *slots);
int flushCommandCommon(client *c, int type, int flags, struct asmTrimCtx *trim_ctx);
void kvsAsyncFreeDoneCB(uint64_t client_id, void *userdata);
void unblockClientForAsyncFlush(uint64_t client_id, struct slotRangeArray *slots);
void blockClientForAsyncFlush(client *c);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
@ -3982,7 +3995,7 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
void streamMoveIdmpKeys(dict *src, dict *dst, int slot);
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys);
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires, dict *stream_idmp_keys, struct asmTrimCtx *ctx);
size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);

View file

@ -407,7 +407,7 @@ void listpackExExpire(redisDb *db, kvobj *kv, ExpireInfo *info, int activeEx) {
/* update keysizes */
unsigned long l = lpLength(lpt->lp) / 3;
updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l + expired, l);
updateKeysizesHist(db, OBJ_HASH, l + expired, l);
}
min = hashTypeGetMinExpire(kv, 1 /*accurate*/);
@ -774,7 +774,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **v
if (!(hfeFlags & HFE_LAZY_NO_UPDATE_KEYSIZES)) {
uint64_t l = hashTypeLength(o, 0);
updateKeysizesHist(db, getKeySlot(key), OBJ_HASH, l+1, l);
updateKeysizesHist(db, OBJ_HASH, l+1, l);
}
/* If the field is the last one in the hash, then the hash will be deleted */
@ -2102,7 +2102,7 @@ void hsetnxCommand(client *c) {
keyModified(c,c->db,c->argv[1], kv, 1);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
hlen = hashTypeLength(kv, 0);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen);
updateKeysizesHist(c->db, OBJ_HASH, hlen - 1, hlen);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv));
server.dirty++;
@ -2138,7 +2138,7 @@ void hsetCommand(client *c) {
}
keyModified(c,c->db,c->argv[1],kv,1);
unsigned long l = hashTypeLength(kv, 0);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l);
updateKeysizesHist(c->db, OBJ_HASH, l - created, l);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kv, oldsize, kvobjAllocSize(kv));
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
@ -2489,8 +2489,7 @@ out:
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
}
if (oldlen != newlen)
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH,
oldlen, newlen);
updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen);
}
void hincrbyCommand(client *c) {
@ -2516,13 +2515,13 @@ void hincrbyCommand(client *c) {
} else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) {
value = 0;
unsigned long l = hashTypeLength(o, 0);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1);
updateKeysizesHist(c->db, OBJ_HASH, l, l + 1);
} else {
/* Field expired and in turn hash deleted. Create new one! */
o = createHashObject();
dbAdd(c->db,c->argv[1],&o);
value = 0;
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1);
updateKeysizesHist(c->db, OBJ_HASH, 0, 1);
}
oldvalue = value;
@ -2573,13 +2572,13 @@ void hincrbyfloatCommand(client *c) {
} else if ((res == GETF_NOT_FOUND) || (res == GETF_EXPIRED)) {
value = 0;
unsigned long l = hashTypeLength(o, 0);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l, l + 1);
updateKeysizesHist(c->db, OBJ_HASH, l, l + 1);
} else {
/* Field expired and in turn hash deleted. Create new one! */
o = createHashObject();
dbAdd(c->db, c->argv[1], &o);
value = 0;
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, 0, 1);
updateKeysizesHist(c->db, OBJ_HASH, 0, 1);
}
value += incr;
@ -2770,8 +2769,7 @@ void hgetdelCommand(client *c) {
}
if (oldlen != newlen)
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH,
oldlen, newlen);
updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen);
}
/* Get the value of one or more fields of a given hash key and optionally set
@ -2907,7 +2905,7 @@ void hgetexCommand(client *c) {
* or the new expiration time is in the past.*/
newlen = hashTypeLength(o, 0);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldlen, newlen);
updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen);
if (newlen == 0) {
dbDelete(c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
@ -2966,7 +2964,7 @@ void hdelCommand(client *c) {
estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o);
newLen = oldLen - deleted;
}
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, oldLen, newLen);
updateKeysizesHist(c->db, OBJ_HASH, oldLen, newLen);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
@ -3514,7 +3512,7 @@ static ExpireAction onFieldExpire(eItem item, void *ctx) {
/* update keysizes */
unsigned long l = hashTypeLength(expCtx->hashObj, 0);
updateKeysizesHist(expCtx->db, getKeySlot(key), OBJ_HASH, l, l - 1);
updateKeysizesHist(expCtx->db, OBJ_HASH, l, l - 1);
serverAssert(hashTypeDelete(expCtx->hashObj, field) == 1);
if (server.memory_tracking_enabled)
@ -3877,8 +3875,7 @@ static void hexpireGenericCommand(client *c, long long basetime, int unit) {
}
if (oldlen != newlen)
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH,
oldlen, newlen);
updateKeysizesHist(c->db, OBJ_HASH, oldlen, newlen);
/* Avoid propagating command if not even one field was updated (Either because
* the time is in the past, and corresponding HDELs were sent, or conditions

View file

@ -513,7 +513,7 @@ void pushGenericCommand(client *c, int where, int xx) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
keyModified(c,c->db,c->argv[1],lobj,1);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen - (c->argc - 2), llen);
updateKeysizesHist(c->db, OBJ_LIST, llen - (c->argc - 2), llen);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), lobj, oldsize, kvobjAllocSize(lobj));
}
@ -590,7 +590,7 @@ void linsertCommand(client *c) {
c->argv[1],c->db->id);
server.dirty++;
unsigned long ll = listTypeLength(subject);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, ll-1, ll);
updateKeysizesHist(c->db, OBJ_LIST, ll-1, ll);
} else {
/* Notify client of a failed insert */
addReplyLongLong(c,-1);
@ -786,7 +786,7 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, s
unsigned long llen = listTypeLength(o);
notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_LIST, llen + count, llen);
updateKeysizesHist(c->db, OBJ_LIST, llen + count, llen);
if (llen == 0) {
if (deleted) *deleted = 1;
@ -979,7 +979,7 @@ void ltrimCommand(client *c) {
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), o, oldsize, kvobjAllocSize(o));
}
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen, llenNew);
updateKeysizesHist(c->db, OBJ_LIST, llen, llenNew);
keyModified(c, c->db, c->argv[1], (llenNew > 0) ? o : NULL, 1);
server.dirty += (ltrim + rtrim);
addReply(c,shared.ok);
@ -1141,7 +1141,7 @@ void lremCommand(client *c) {
if (removed) {
long ll = listTypeLength(subject);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, ll + removed, ll);
updateKeysizesHist(c->db, OBJ_LIST, ll + removed, ll);
notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
if (ll == 0) {
@ -1233,7 +1233,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), kvsrc, oldsize, kvobjAllocSize(kvsrc));
lmoveHandlePush(c, c->argv[2], kvdst, value, whereto);
/* Update dst obj cardinality in KEYSIZES */
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_LIST, oldlen, newlen);
updateKeysizesHist(c->db, OBJ_LIST, oldlen, newlen);
/* Update src obj cardinality in KEYSIZES by listElementsRemoved() */
size_t srcsize = server.memory_tracking_enabled ? kvobjAllocSize(kvsrc) : 0;
listElementsRemoved(c, skey, wherefrom, kvsrc, 1, srcsize, 1, NULL);

View file

@ -637,7 +637,7 @@ void saddCommand(client *c) {
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set));
if (added) {
unsigned long size = setTypeSize(set);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size - added, size);
updateKeysizesHist(c->db, OBJ_SET, size - added, size);
keyModified(c,c->db,c->argv[1],set,1);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
@ -687,7 +687,7 @@ void sremCommand(client *c) {
c->db->id);
newSize = -1; /* removed */
}
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, oldSize, newSize);
updateKeysizesHist(c->db, OBJ_SET, oldSize, newSize);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
@ -739,7 +739,7 @@ void smoveCommand(client *c) {
srcNewLen = -1; /* removed */
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, srcOldLen, srcNewLen);
updateKeysizesHist(c->db, OBJ_SET, srcOldLen, srcNewLen);
/* Create the destination set when it doesn't exist */
if (!dstset) {
@ -755,7 +755,7 @@ void smoveCommand(client *c) {
/* An extra key has changed when ele was successfully added to dstset */
if (setTypeAdd(dstset,ele->ptr)) {
unsigned long dstLen = setTypeSize(dstset);
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_SET, dstLen - 1, dstLen);
updateKeysizesHist(c->db, OBJ_SET, dstLen - 1, dstLen);
server.dirty++;
keyModified(c,c->db,c->argv[2],dstset,1);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);
@ -930,7 +930,7 @@ void spopWithCountCommand(client *c) {
lp = lpBatchDelete(lp, ps, count);
zfree(ps);
set->ptr = lp;
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size - count);
updateKeysizesHist(c->db, OBJ_SET, size, size - count);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set));
} else if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {
@ -949,7 +949,7 @@ void spopWithCountCommand(client *c) {
propindex = 2;
}
}
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size - count);
updateKeysizesHist(c->db, OBJ_SET, size, size - count);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set));
} else {
@ -1022,7 +1022,7 @@ void spopWithCountCommand(client *c) {
* since function dbReplaceValue() assumes the entire set is being replaced,
* but here we're building the new set from the existing one. As a result,
* the size of the old set has already changed by the time we reach this point. */
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size-count);
updateKeysizesHist(c->db, OBJ_SET, size, size-count);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), set, oldsize, kvobjAllocSize(set));
dbReplaceValue(c->db, c->argv[1], &newset, 0);
@ -1066,7 +1066,7 @@ void spopCommand(client *c) {
if (kv == NULL || checkType(c, kv, OBJ_SET)) return;
size = setTypeSize(kv);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size, size-1);
updateKeysizesHist(c->db, OBJ_SET, size, size-1);
if (server.memory_tracking_enabled)
oldsize = kvobjAllocSize(kv);

View file

@ -627,7 +627,7 @@ void setrangeCommand(client *c) {
kv = dbUnshareStringValueByLink(c->db, c->argv[1], kv, link);
newLen = max(oldLen, (int64_t) (offset + value_len));
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldLen, newLen);
updateKeysizesHist(c->db, OBJ_STRING, oldLen, newLen);
}
if (value_len > 0) {
@ -844,8 +844,7 @@ void incrDecrCommand(client *c, long long incr) {
{
new = o;
o->ptr = (void*)((long)value);
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr),
OBJ_STRING,
updateKeysizesHist(c->db, OBJ_STRING,
(int64_t) sdigits10(oldvalue),
(int64_t) sdigits10(value));
} else {
@ -958,7 +957,7 @@ void appendCommand(client *c) {
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), o, oldsize, kvobjAllocSize(o));
totlen = sdslen(o->ptr);
int64_t oldlen = totlen - append_len;
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, totlen);
updateKeysizesHist(c->db, OBJ_STRING, oldlen, totlen);
}
keyModified(c,c->db,c->argv[1],o,1);
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);

View file

@ -2078,7 +2078,7 @@ void zaddGenericCommand(client *c, int flags) {
server.dirty += (added+updated);
if (server.memory_tracking_enabled)
updateSlotAllocSize(c->db, getKeySlot(key->ptr), zobj, oldsize, kvobjAllocSize(zobj));
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, llen, llen+added);
updateKeysizesHist(c->db, OBJ_ZSET, llen, llen+added);
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
@ -2146,7 +2146,7 @@ void zremCommand(client *c) {
newlen = -1; /* means key got deleted */
}
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen);
updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen);
keyModified(c, c->db, key, keyremoved ? NULL : zobj, 1);
server.dirty += deleted;
}
@ -2278,7 +2278,7 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) {
newlen = zsetLength(zobj);
oldlen = newlen + deleted;
}
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen);
updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen);
}
server.dirty += deleted;
addReplyLongLong(c,deleted);
@ -4342,7 +4342,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
newlen = -1;
}
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen);
updateKeysizesHist(c->db, OBJ_ZSET, oldlen, newlen);
keyModified(c, c->db, key, (newlen > 0) ? zobj : NULL, 1);
if (c->cmd->proc == zmpopCommand) {

View file

@ -112,7 +112,7 @@ proc asm_all_instances_idle {total} {
proc wait_for_asm_done {} {
set total_instances [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
wait_for_condition 1000 10 {
wait_for_condition 3000 10 {
[asm_all_instances_idle $total_instances] == 1
} else {
# Print the number of active tasks on each instance

View file

@ -201,14 +201,16 @@ proc test_all_keysizes { {replMode 0} } {
run_cmd_verify_hist {$server FLUSHALL} {}
# PFADD (sparse & dense)
for {set i 1} {$i <= 3000} {incr i} {
run_cmd_verify_hist {$server PFADD hll1 a$i b$i c$i} {__EVAL_DB_HIST__ 0}
run_cmd_verify_hist {$server PFADD hll2 x$i y$i z$i} {__EVAL_DB_HIST__ 0}
$server PFADD hll1 a$i b$i c$i
$server PFADD hll2 x$i y$i z$i
run_cmd_verify_hist {} {__EVAL_DB_HIST__ 0}
}
# PFMERGE, PFCOUNT (sparse & dense)
for {set i 1} {$i <= 3000} {incr i} {
run_cmd_verify_hist {$server PFADD hll3 x$i y$i z$i} {__EVAL_DB_HIST__ 0}
run_cmd_verify_hist {$server PFMERGE hll4 hll1 hll2 hll3} {__EVAL_DB_HIST__ 0}
run_cmd_verify_hist {$server PFCOUNT hll1 hll2 hll3 hll4} {__EVAL_DB_HIST__ 0}
$server PFADD hll3 x$i y$i z$i
$server PFMERGE hll4 hll1 hll2 hll3
$server PFCOUNT hll1 hll2 hll3 hll4
run_cmd_verify_hist {} {__EVAL_DB_HIST__ 0}
}
# DEL
run_cmd_verify_hist {$server DEL hll4} {__EVAL_DB_HIST__ 0}
@ -242,8 +244,8 @@ proc test_all_keysizes { {replMode 0} } {
# RPOP
run_cmd_verify_hist {$server RPOP l1} {db0_LIST:4=1,8=1}
run_cmd_verify_hist {$server RPOP l1} {db0_LIST:4=2}
# DEL
run_cmd_verify_hist {$server DEL l1} {db0_LIST:4=1}
# DEL
run_cmd_verify_hist {$server DEL l1} {db0_LIST:4=1}
# LINSERT, LTRIM
run_cmd_verify_hist {$server RPUSH l3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14} {db0_LIST:4=1,8=1}
run_cmd_verify_hist {$server LINSERT l3 AFTER 9 10} {db0_LIST:4=1,16=1}
@ -420,7 +422,7 @@ proc test_all_keysizes { {replMode 0} } {
} {} {cluster:skip}
test "KEYSIZES - Test STRING $suffixRepl" {
test "KEYSIZES - Test STRING $suffixRepl" {
# SETRANGE
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server SET s2 1234567890} {db0_STR:8=1}
@ -464,7 +466,42 @@ proc test_all_keysizes { {replMode 0} } {
run_cmd_verify_hist {$server APPEND s2 y} {db0_STR:1=2}
} {} {cluster:skip}
test "KEYSIZES - Test UNLINK (async deletion) $suffixRepl" {
# UNLINK on STRING
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server SET s1 1234567890} {db0_STR:8=1}
run_cmd_verify_hist {$server UNLINK s1} {} 1
# UNLINK on LIST
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server RPUSH l1 1 2 3 4 5 6 7 8} {db0_LIST:8=1}
run_cmd_verify_hist {$server UNLINK l1} {} 1
# UNLINK on SET
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server SADD s1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16} {db0_SET:16=1}
run_cmd_verify_hist {$server UNLINK s1} {} 1
# UNLINK on ZSET
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server ZADD z1 1 a 2 b 3 c 4 d 5 e 6 f 7 g 8 h} {db0_ZSET:8=1}
run_cmd_verify_hist {$server UNLINK z1} {} 1
# UNLINK on HASH
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server HSET h1 f1 v1 f2 v2 f3 v3 f4 v4} {db0_HASH:4=1}
run_cmd_verify_hist {$server UNLINK h1} {} 1
# UNLINK multiple keys of different types
run_cmd_verify_hist {$server FLUSHALL} {}
run_cmd_verify_hist {$server SET s1 12345678} {db0_STR:8=1}
run_cmd_verify_hist {$server RPUSH l1 1 2 3 4} {db0_STR:8=1 db0_LIST:4=1}
run_cmd_verify_hist {$server SADD set1 a b c d e f g h} {db0_STR:8=1 db0_LIST:4=1 db0_SET:8=1}
run_cmd_verify_hist {$server ZADD z1 1 x 2 y} {db0_STR:8=1 db0_LIST:4=1 db0_SET:8=1 db0_ZSET:2=1}
run_cmd_verify_hist {$server UNLINK s1 l1 set1 z1} {} 1
} {} {cluster:skip}
test "KEYSIZES - Test complex dataset $suffixRepl" {
run_cmd_verify_hist {$server FLUSHALL} {}
createComplexDataset $server 1000

View file

@ -296,7 +296,7 @@ start_server {tags {"external:skip needs:debug"}} {
test "HPEXPIRETIME persists after RDB reload ($type)" {
r del myhash
r hset myhash field1 value1 field2 value2
r hpexpire myhash 150 NX FIELDS 1 field1
r hpexpire myhash 300 NX FIELDS 1 field1
set before [r HPEXPIRETIME myhash FIELDS 1 field1]
r debug reload
set after [r HPEXPIRETIME myhash FIELDS 1 field1]