Reduce memory allocation overhead (#15096)

While profiling command execution, I noticed that command argv object
alloc/free overhead is quite high for workloads with many small
arguments (e.g. `HSET` with many fields). The effect is much more
visible with pipelining when Redis becomes CPU bound.

I experimented with replacing argv object alloc/free with a simple
object pool and saw significant speedups.
(Note: related effort around this topic:
https://github.com/redis/redis/pull/13726)

In this PR, I tried to improve the main hotspots in the memory
allocation path (focusing on command arg allocations) to close the gap
with custom pool performance, so we can avoid dedicated memory pools
and let the whole codebase benefit from these optimizations.

## Changes

### 1) Faster dealloc via passing size hint to jemalloc (separate PR
#15071)
Jemalloc does more work than an object pool on free (a lookup on a tree
to find the allocation's size class). For some deallocations, we can
reduce free path overhead by passing a size hint to jemalloc (i.e.
`sdallocx()`) which can skip metadata lookup in the common case. This PR
introduces `zfree_with_size()` and uses it where we can know the
allocation size i.e. `OBJ_ENCODING_EMBSTR` objects in `decrRefCount()`
and SDS free path.

### 2) Reduce atomic operation cost for stat updates
`update_zmalloc_stat_alloc()` / `update_zmalloc_stat_free()` previously
used atomic read-modify-write (RMW) operations (`atomicIncrGet` /
`atomicDecr`) which can emit expensive locked instructions on x86.

When we can guarantee a single writer to a counter, we can use a cheaper
load+add+store sequence instead of a locked RMW. This PR gives the first
`DEDICATED_ENTRIES` threads (8 in this patch) dedicated slots for
used_memory stats (intended to cover the main thread and I/O threads) so
they can use this single-writer fast path. Threads beyond that fall back
to a shared pool of slots and continue to use full atomic RMW.

### 3) Improve jemalloc tcache hit rate 

With the default `lookahead=16` config, a pipelined HSET with ~20 fields
does ~40 small allocations per command (fields + values), so a single
batch can produce 16 x 40 = ~640 allocations. When args are small, many
of these land in the 32 byte size class (often `EMBSTR`). Jemalloc’s
default per-bin tcache cap is 200, so this kind of burst overflows the
cache and causes frequent flushes. I raised the small-bin tcache limits
(lg_tcache_nslots_mul:3, tcache_nslots_small_max:1000) to handle these
bursts better. In the worst case, the tcache may use more memory because
of this change. Another option would have been to lower `lookahead`
instead.

### 4) Inlining
When you have a simple pool, it has a few small functions and it is easy
for the compiler to inline them. Compared to that, the jemalloc
alloc/free path has a deeper call stack. Also, jemalloc was not compiled
with `-flto`, which prevented its functions from being inlined. As part
of this PR, I added the `-flto` flag to the jemalloc build when LTO is
enabled for Redis.

The compiler also chooses not to inline some hot path functions in Redis.
This suggests PGO (profile-guided optimization) could provide additional
wins, and perhaps we can start experimenting with it sometime. We could
try to force inlining with attributes like `always_inline`, but that is
hard to apply across a deep call stack and misuse can cause code bloat.
So, rather than going in this direction, I added the `inline` keyword to
some functions for now. This doesn't make the compiler inline all hot
path functions, but at least it is a step forward. (If we can improve
this further in the future, performance gets very close to a custom
memory pool implementation.)

## Benchmark results

Commands were like:

```
memtier_benchmark   --command="HSET __key__ username john_doe email john@example.com password hashed_pwd_123 created_at 1709125200 updated_at 1709125200 first_name John last_name Doe phone_number +1234567890 address 123_Main_St city NewYork country USA postal_code 10001 company Acme_Corp job_title Engineer bio Loves_coding"   --command-ratio=1   --command-key-pattern=P   --key-prefix="hsetkey"   --key-minimum=1   --key-maximum=100000   -n 1000000   -c 50   -t 2   --hide-histogram --pipeline 50
```

| Benchmark | Improvement |
| --- | ---: |
| SET | +0% |
| SET (pipeline) | +8% |
| HSET 15 fields | +2% |
| HSET 15 fields (pipeline) | +17% |
| ZADD 15 elements | +3% |
| ZADD 15 elements (pipeline) | +15% |
This commit is contained in:
Ozan Tezcan 2026-05-09 11:48:45 +03:00 committed by GitHub
parent 7cf63635f0
commit 7bdab45ff1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 144 additions and 26 deletions

4
deps/Makefile vendored
View file

@ -129,8 +129,8 @@ lua: .make-prerequisites
.PHONY: lua
JEMALLOC_CFLAGS=$(CFLAGS)
JEMALLOC_LDFLAGS=$(LDFLAGS)
JEMALLOC_CFLAGS=$(ENABLE_LTO) $(CFLAGS)
JEMALLOC_LDFLAGS=$(ENABLE_LTO) $(LDFLAGS)
ifneq ($(DEB_HOST_GNU_TYPE),)
JEMALLOC_CONFIGURE_OPTS += --host=$(DEB_HOST_GNU_TYPE)

View file

@ -25,12 +25,14 @@ CLANG := $(findstring clang,$(shell sh -c '$(CC) --version | head -1'))
# some automatic defaults are added to it. To specify optimization flags
# explicitly without any defaults added, pass the OPT variable instead.
OPTIMIZATION?=-O3
ENABLE_LTO?=
ifeq ($(OPTIMIZATION),-O3)
ifeq (clang,$(CLANG))
OPTIMIZATION+=-flto
ENABLE_LTO=-flto
else
OPTIMIZATION+=-flto=auto
ENABLE_LTO=-flto=auto
endif
OPTIMIZATION+=$(ENABLE_LTO)
endif
ifneq ($(OPTIMIZATION),-O0)
OPTIMIZATION+=-fno-omit-frame-pointer
@ -423,7 +425,7 @@ persist-settings: distclean
echo REDIS_LDFLAGS=$(REDIS_LDFLAGS) >> .make-settings
echo PREV_FINAL_CFLAGS=$(FINAL_CFLAGS) >> .make-settings
echo PREV_FINAL_LDFLAGS=$(FINAL_LDFLAGS) >> .make-settings
-(cd ../deps && $(MAKE) $(DEPENDENCY_TARGETS))
-(cd ../deps && $(MAKE) $(DEPENDENCY_TARGETS) ENABLE_LTO="$(ENABLE_LTO)")
.PHONY: persist-settings

View file

@ -183,4 +183,23 @@
#error "Unable to determine atomic operations for your platform"
#endif
/* atomicIncrGetSingleWriter(var, delta, newvalue_var)
 *
 * Adds `delta` to `var` and writes the resulting value to `newvalue_var`.
 * Same end result as atomicIncrGet() but implemented as an atomic load, a
 * plain add on the local copy, and an atomic store, instead of a single
 * atomic read-modify-write. This avoids the `lock` prefix on x86
 * (roughly ~20-40 cycles vs ~2-3 for plain load+store).
 *
 * Arguments:
 *   var          - the shared counter to update.
 *   delta        - the amount to add (pass a negative value to subtract).
 *   newvalue_var - an lvalue that receives the updated value.
 *
 * NOTE: the argument order is (var, delta, newvalue_var), which differs from
 * atomicIncrGet(var, newvalue_var, delta).
 *
 * NOTE: `var` and `newvalue_var` are expanded more than once, so neither
 * expression may have side effects.
 *
 * SAFETY: the caller MUST guarantee that no other thread ever writes to `var`
 * (no atomicIncr, no atomicSet, no other call to this macro from a different
 * thread). Concurrent writers cause silent lost updates, because the store
 * overwrites whatever was written after the load. Readers on other threads
 * using atomicGet are fine: they will observe either the pre or post update
 * value. */
#define atomicIncrGetSingleWriter(var, delta, newvalue_var) do { \
atomicGet((var), (newvalue_var)); \
(newvalue_var) += (delta); \
atomicSet((var), (newvalue_var)); \
} while(0)
#endif /* __ATOMIC_VAR_H */

View file

@ -859,6 +859,8 @@ int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
* and IO thread will communicate through event notifier. */
void *IOThreadMain(void *ptr) {
IOThread *t = ptr;
/* Claim a reserved used_memory accounting slot before any allocation. */
zmalloc_register_reserved_slot();
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%d", t->id);
redis_set_thread_title(thdname);

View file

@ -36,7 +36,7 @@ static inline int _clientHasPendingRepliesNonSlave(client *c);
static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten);
static inline int _writeToClientSlave(client *c, ssize_t *nwritten);
static pendingCommand *acquirePendingCommand(void);
static void reclaimPendingCommand(client *c, pendingCommand *pcmd);
static inline void reclaimPendingCommand(client *c, pendingCommand *pcmd);
static size_t getClientOutputBufferLogicalSize(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
@ -1813,7 +1813,7 @@ void freeClientArgv(client *c) {
freeClientArgvInternal(c, 1);
}
void freeClientPendingCommands(client *c, int num_pcmds_to_free) {
static inline void freeClientPendingCommands(client *c, int num_pcmds_to_free) {
/* (-1) means free all pending commands */
if (num_pcmds_to_free == -1)
num_pcmds_to_free = c->pending_cmds.len;
@ -5708,7 +5708,7 @@ static int tryExpandPendingCommandPool(void) {
* The shared pool is only used when IO threads are inactive to avoid race conditions
* between multiple clients. Additionally, pool reuse provides minimal benefit in
* multi-threaded scenarios, so we only use it in single-threaded mode. */
static void reclaimPendingCommand(client *c, pendingCommand *pcmd) {
static inline void reclaimPendingCommand(client *c, pendingCommand *pcmd) {
if (!server.io_threads_active) {
/* Try to add to shared pool for reuse if argv isn't too large */
if (likely(pcmd->argv_len < 64)) {

View file

@ -218,7 +218,7 @@ static kvobj *kvobjCreateEmbedString(const char *val_ptr, size_t val_len,
* | robj (16) | key-hdr-size (1) | sdshdr8 "myvalue" \0 (11) |
* +-----------+------------------+----------------------------+
*/
robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) {
static inline robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) {
/* Calculate size for embedded value (always SDS_TYPE_8) */
size_t val_sds_size = sdsReqSize(val_len, SDS_TYPE_8);
@ -635,6 +635,14 @@ void decrRefCount(robj *o) {
}
if (--(o->refcount) == 0) {
/* Fast path for embedded strings: no inner allocation to free, and we
* can compute the alloc size to hint jemalloc for a faster deallocation. */
if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_EMBSTR && !o->iskvobj) {
serverAssert(sdsType(o->ptr) == SDS_TYPE_8); /* embstr always type_8 */
zfree_with_size(o, sizeof(robj) + sdsAllocSize(o->ptr));
return;
}
void *alloc = o;
if (o->iskvobj) {

View file

@ -8114,6 +8114,10 @@ int main(int argc, char **argv) {
}
if (server.sentinel_mode) sentinelCheckConfigFile();
/* Reserve dedicated used_memory slots for main + IO threads (single-writer
* fast path). See zmalloc_reserve_thread_slots(). */
zmalloc_reserve_thread_slots(server.io_threads_num);
/* Do system checks */
#ifdef __linux__
linuxMemoryWarnings();

View file

@ -3161,7 +3161,6 @@ void resetClient(client *c, int num_pcmds_to_free);
void resetClientQbufState(client *c);
void freeClientOriginalArgv(client *c);
void freeClientArgv(client *c);
void freeClientPendingCommands(client *c, int num_pcmds_to_free);
void tryDeferFreeClientObject(client *c, int type, void *ptr);
void freeClientDeferredObjects(client *c, int free_array);
void freeClientIODeferredObjects(client *c, int free_array);

View file

@ -80,10 +80,27 @@ void je_free_with_usize(void *ptr, size_t *usize);
#define realloc_with_usize(ptr,size,old_usize,new_usize) je_realloc_with_usize(ptr,size,old_usize,new_usize)
#define free_with_usize(ptr,usize) je_free_with_usize(ptr,usize)
#endif
/* Compile-time jemalloc tuning: raise per-bin tcache limits for small size
* classes so bursts of same-size small allocations don't spill into the
* arena, which reduces performance.
*
* lg_tcache_nslots_mul:3 default slot count log2 multiplier: 1 (2x) -> 3 (8x).
* tcache_nslots_small_max:1000 per-bin hard cap: 200 -> 1000.
*/
const char *je_malloc_conf =
"lg_tcache_nslots_mul:3,tcache_nslots_small_max:1000";
#endif
#define MAX_THREADS 16 /* Keep it a power of 2 so we can use '&' instead of '%'. */
#define THREAD_MASK (MAX_THREADS - 1)
/* Per-thread memory accounting slots. The first DEDICATED_ENTRIES threads
* (typically the main thread plus io threads) each get a private slot and can
* use the cheap single-writer atomic operation (plain load+store).
* Threads beyond that share a pool hashed by thread index and pay the cost of
* a full atomic RMW. */
#define DEDICATED_ENTRIES 8
#define SHARED_ENTRIES 8 /* Must be a power of 2 for modulo */
#define SHARED_ENTRIES_MASK (SHARED_ENTRIES - 1)
#define MAX_ENTRIES (DEDICATED_ENTRIES + SHARED_ENTRIES)
#define PEAK_CHECK_THRESHOLD (1024 * 100) /* 100KB */
typedef struct used_memory_entry {
@ -92,7 +109,7 @@ typedef struct used_memory_entry {
char padding[CACHE_LINE_SIZE - sizeof(long long) - sizeof(long long)];
} used_memory_entry;
static __attribute__((aligned(CACHE_LINE_SIZE))) used_memory_entry used_memory[MAX_THREADS];
static __attribute__((aligned(CACHE_LINE_SIZE))) used_memory_entry used_memory[MAX_ENTRIES];
static redisAtomic size_t num_active_threads = 0;
static redisAtomic size_t zmalloc_peak = 0;
static redisAtomic time_t zmalloc_peak_time = 0;
@ -100,19 +117,75 @@ static __thread long my_thread_index = -1;
static inline void init_my_thread_index(void) {
if (unlikely(my_thread_index == -1)) {
atomicGetIncr(num_active_threads, my_thread_index, 1);
my_thread_index &= THREAD_MASK;
long idx;
atomicGetIncr(num_active_threads, idx, 1);
if (idx < DEDICATED_ENTRIES) {
my_thread_index = idx;
} else {
/* Overflow threads share the shared pool entries (atomic RMW). */
my_thread_index = DEDICATED_ENTRIES + (idx & SHARED_ENTRIES_MASK);
}
}
}
static void update_zmalloc_stat_alloc(long long bytes_delta) {
/* Reserve the first `n` used_memory accounting indices: index 0 for the
 * calling (main) thread, and the rest for threads that will later call
 * zmalloc_register_reserved_slot(). This is done by moving the thread index
 * counter forward to `n`, so threads that register lazily on their first
 * allocation draw indices from `n` upwards.
 *
 * Must be called once by main() before any other thread can allocate via
 * zmalloc(); otherwise a background thread could auto-register into the
 * reserved range. See the DEDICATED_ENTRIES comment for details. */
void zmalloc_reserve_thread_slots(int n) {
    size_t registered;

    assert(n >= 1);
    atomicGet(num_active_threads, registered);

    /* Only two states are legal here: nothing has registered yet, or the
     * calling thread alone has already auto-registered as index 0. */
    assert((registered == 0 && my_thread_index == -1) ||
           (registered == 1 && my_thread_index == 0));

    if (my_thread_index == -1) {
        /* Claim entry 0 for the main thread. */
        my_thread_index = 0;
    }
    atomicSet(num_active_threads, (size_t)n);
}
/* Called once by a reserved thread (e.g. an IO thread) at startup, before it
 * performs its first allocation. Each caller draws the next number from a
 * private atomic counter. Numbers below DEDICATED_ENTRIES become the thread's
 * own dedicated slot; once those are exhausted the thread is mapped onto one
 * of the shared pool slots instead. */
void zmalloc_register_reserved_slot(void) {
    /* Numbering starts at 1 because slot 0 is reserved for the main thread. */
    static redisAtomic int next_reserved_slot = 1;
    int ticket;

    assert(my_thread_index == -1);
    atomicGetIncr(next_reserved_slot, ticket, 1);

    if (ticket >= DEDICATED_ENTRIES) {
        /* No dedicated slot left: fold into the shared pool range. */
        my_thread_index = DEDICATED_ENTRIES + (ticket & SHARED_ENTRIES_MASK);
        return;
    }

    /* A dedicated slot must lie inside the range that was reserved up front
     * through zmalloc_reserve_thread_slots(). */
    size_t reserved_count;
    atomicGet(num_active_threads, reserved_count);
    assert((size_t)ticket < reserved_count);
    my_thread_index = ticket;
}
/* Add `bytes_delta` (which may be negative) to `entry->used_memory` and
 * return the updated counter value. The update strategy depends on the kind
 * of slot the calling thread owns, as indicated by my_thread_index. */
static inline long long update_used_memory_entry(used_memory_entry *entry, long long bytes_delta) {
    long long updated;

    if (my_thread_index >= DEDICATED_ENTRIES) {
        /* Shared pool slot: several threads may write it, so a full atomic
         * read-modify-write is required. */
        atomicIncrGet(entry->used_memory, updated, bytes_delta);
        return updated;
    }

    /* Dedicated slot: this thread is the only writer, so the cheaper
     * load+add+store sequence is sufficient. */
    atomicIncrGetSingleWriter(entry->used_memory, bytes_delta, updated);
    return updated;
}
static inline void update_zmalloc_stat_alloc(long long bytes_delta) {
init_my_thread_index();
/* Per-thread allocation counter and the last counter value at which we ran a
* global peak check (throttles how often we call zmalloc_used_memory()). */
long long thread_used, thread_last_peak_check_used;
atomicIncrGet(used_memory[my_thread_index].used_memory, thread_used, bytes_delta);
atomicGet(used_memory[my_thread_index].last_peak_check, thread_last_peak_check_used);
used_memory_entry *entry = &used_memory[my_thread_index];
long long thread_used = update_used_memory_entry(entry, bytes_delta);
long long thread_last_peak_check_used;
atomicGet(entry->last_peak_check, thread_last_peak_check_used);
/* Only run the (expensive) global used/peak check after this thread's
* allocation counter has advanced enough since the last check. */
@ -143,13 +216,13 @@ static void update_zmalloc_stat_alloc(long long bytes_delta) {
/* Record the thread counter value at which we last ran a global peak check,
* to throttle future checks for this thread. */
atomicSet(used_memory[my_thread_index].last_peak_check, thread_used);
atomicSet(entry->last_peak_check, thread_used);
}
}
static void update_zmalloc_stat_free(long long num) {
static inline void update_zmalloc_stat_free(long long num) {
init_my_thread_index();
atomicDecr(used_memory[my_thread_index].used_memory, num);
update_used_memory_entry(&used_memory[my_thread_index], -num);
}
static void zmalloc_default_oom(size_t size) {
@ -586,8 +659,8 @@ size_t zmalloc_used_memory(void) {
size_t local_num_active_threads;
long long total_mem = 0;
atomicGet(num_active_threads,local_num_active_threads);
if (local_num_active_threads > MAX_THREADS) {
local_num_active_threads = MAX_THREADS;
if (local_num_active_threads > MAX_ENTRIES) {
local_num_active_threads = MAX_ENTRIES;
}
for (size_t i = 0; i < local_num_active_threads; ++i) {
long long thread_used_mem;

View file

@ -114,6 +114,8 @@ void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable, size_t *old_usa
__attribute__((malloc)) char *zstrdup(const char *s);
__attribute__((malloc)) char *zstrdup_usable(const char *s, size_t *usable);
size_t zmalloc_used_memory(void);
void zmalloc_reserve_thread_slots(int n);
void zmalloc_register_reserved_slot(void);
size_t zmalloc_get_peak_memory(void);
time_t zmalloc_get_peak_memory_time(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));

View file

@ -30,6 +30,15 @@ start_server {tags {"other"}} {
assert_equal {OK} [r memory purge]
}
}
test {je_malloc_conf compile-time tuning is active} {
# Verify je_malloc_conf in src/zmalloc.c overrides jemalloc defaults:
# (tcache_nslots_small_max: 200, lg_tcache_nslots_mul: 1).
if {[string match {*jemalloc*} [s mem_allocator]]} {
assert_equal 1000 [r debug mallctl opt.tcache_nslots_small_max]
assert_equal 3 [r debug mallctl opt.lg_tcache_nslots_mul]
}
} {} {needs:debug}
test {SAVE - make sure there are all the types as values} {
# Wait for a background saving in progress to terminate