diff --git a/deps/Makefile b/deps/Makefile index 60e0e569e..ef6168bbd 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -129,8 +129,8 @@ lua: .make-prerequisites .PHONY: lua -JEMALLOC_CFLAGS=$(CFLAGS) -JEMALLOC_LDFLAGS=$(LDFLAGS) +JEMALLOC_CFLAGS=$(ENABLE_LTO) $(CFLAGS) +JEMALLOC_LDFLAGS=$(ENABLE_LTO) $(LDFLAGS) ifneq ($(DEB_HOST_GNU_TYPE),) JEMALLOC_CONFIGURE_OPTS += --host=$(DEB_HOST_GNU_TYPE) diff --git a/src/Makefile b/src/Makefile index bb69f5dae..cf0395d1c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -25,12 +25,14 @@ CLANG := $(findstring clang,$(shell sh -c '$(CC) --version | head -1')) # some automatic defaults are added to it. To specify optimization flags # explicitly without any defaults added, pass the OPT variable instead. OPTIMIZATION?=-O3 +ENABLE_LTO?= ifeq ($(OPTIMIZATION),-O3) ifeq (clang,$(CLANG)) - OPTIMIZATION+=-flto + ENABLE_LTO=-flto else - OPTIMIZATION+=-flto=auto + ENABLE_LTO=-flto=auto endif + OPTIMIZATION+=$(ENABLE_LTO) endif ifneq ($(OPTIMIZATION),-O0) OPTIMIZATION+=-fno-omit-frame-pointer @@ -423,7 +425,7 @@ persist-settings: distclean echo REDIS_LDFLAGS=$(REDIS_LDFLAGS) >> .make-settings echo PREV_FINAL_CFLAGS=$(FINAL_CFLAGS) >> .make-settings echo PREV_FINAL_LDFLAGS=$(FINAL_LDFLAGS) >> .make-settings - -(cd ../deps && $(MAKE) $(DEPENDENCY_TARGETS)) + -(cd ../deps && $(MAKE) $(DEPENDENCY_TARGETS) ENABLE_LTO="$(ENABLE_LTO)") .PHONY: persist-settings diff --git a/src/atomicvar.h b/src/atomicvar.h index 3c332ee69..43227639b 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -183,4 +183,23 @@ #error "Unable to determine atomic operations for your platform" #endif + +/* atomicIncrGetSingleWriter(var, delta, newvalue_var) + * + * Adds `delta` to `var` and writes the resulting value to `newvalue_var`. Note: the argument order differs from atomicIncrGet(var, newvalue_var, delta). + * Same end result as atomicIncrGet() but implemented as load+add+store instead + * of an atomic read-modify-write. This avoids the `lock` prefix on x86 + * (~20-40 cycles vs ~2-3 for plain load+store). 
+ * + * SAFETY: the caller MUST guarantee that no other thread ever writes to `var` + * (no atomicIncr, no atomicSet, no other call to this macro from a different + * thread). Concurrent writers cause silent lost updates. Readers on other + * threads using atomicGet are fine: they will observe either the pre- or + * post-update value. */ +#define atomicIncrGetSingleWriter(var, delta, newvalue_var) do { \ + atomicGet((var), (newvalue_var)); \ + (newvalue_var) += (delta); \ + atomicSet((var), (newvalue_var)); \ +} while(0) + #endif /* __ATOMIC_VAR_H */ diff --git a/src/iothread.c b/src/iothread.c index 981edb951..73919cce1 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -859,6 +859,8 @@ int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) * and IO thread will communicate through event notifier. */ void *IOThreadMain(void *ptr) { IOThread *t = ptr; + /* Claim a reserved used_memory accounting slot before any allocation. */ + zmalloc_register_reserved_slot(); char thdname[16]; snprintf(thdname, sizeof(thdname), "io_thd_%d", t->id); redis_set_thread_title(thdname); diff --git a/src/networking.c b/src/networking.c index 5c0b87e8f..0030078e7 100644 --- a/src/networking.c +++ b/src/networking.c @@ -36,7 +36,7 @@ static inline int _clientHasPendingRepliesNonSlave(client *c); static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten); static inline int _writeToClientSlave(client *c, ssize_t *nwritten); static pendingCommand *acquirePendingCommand(void); -static void reclaimPendingCommand(client *c, pendingCommand *pcmd); +static inline void reclaimPendingCommand(client *c, pendingCommand *pcmd); static size_t getClientOutputBufferLogicalSize(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ @@ -1813,7 +1813,7 @@ void freeClientArgv(client *c) { freeClientArgvInternal(c, 1); } -void freeClientPendingCommands(client *c, int num_pcmds_to_free) { +static inline void freeClientPendingCommands(client *c, int num_pcmds_to_free) { /* (-1) means free all pending commands */ if (num_pcmds_to_free == -1) num_pcmds_to_free = c->pending_cmds.len; @@ -5708,7 +5708,7 @@ static int tryExpandPendingCommandPool(void) { * The shared pool is only used when IO threads are inactive to avoid race conditions * between multiple clients. Additionally, pool reuse provides minimal benefit in * multi-threaded scenarios, so we only use it in single-threaded mode. */ -static void reclaimPendingCommand(client *c, pendingCommand *pcmd) { +static inline void reclaimPendingCommand(client *c, pendingCommand *pcmd) { if (!server.io_threads_active) { /* Try to add to shared pool for reuse if argv isn't too large */ if (likely(pcmd->argv_len < 64)) { diff --git a/src/object.c b/src/object.c index 3af9e6c6c..44778014b 100644 --- a/src/object.c +++ b/src/object.c @@ -218,7 +218,7 @@ static kvobj *kvobjCreateEmbedString(const char *val_ptr, size_t val_len, * | robj (16) | key-hdr-size (1) | sdshdr8 "myvalue" \0 (11) | * +-----------+------------------+----------------------------+ */ -robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { +static inline robj *createEmbeddedStringObject(const char *val_ptr, size_t val_len) { /* Calculate size for embedded value (always SDS_TYPE_8) */ size_t val_sds_size = sdsReqSize(val_len, SDS_TYPE_8); @@ -635,6 +635,14 @@ void decrRefCount(robj *o) { } if (--(o->refcount) == 0) { + /* Fast path for embedded strings: no inner allocation to free, and we + * can compute the alloc size to hint jemalloc for a faster deallocation. NOTE(review): the layout comment above createEmbeddedStringObject() lists a 1-byte key-hdr-size between robj and the sds header; confirm that sizeof(robj) + sdsAllocSize(o->ptr) matches the size this object was allocated with. 
*/ + if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_EMBSTR && !o->iskvobj) { + serverAssert(sdsType(o->ptr) == SDS_TYPE_8); /* embstr always type_8 */ + zfree_with_size(o, sizeof(robj) + sdsAllocSize(o->ptr)); + return; + } + void *alloc = o; if (o->iskvobj) { diff --git a/src/server.c b/src/server.c index 87ef3066c..eb110dfbd 100644 --- a/src/server.c +++ b/src/server.c @@ -8114,6 +8114,10 @@ int main(int argc, char **argv) { } if (server.sentinel_mode) sentinelCheckConfigFile(); + /* Reserve dedicated used_memory slots for main + IO threads (single-writer + * fast path). See zmalloc_reserve_thread_slots(). */ + zmalloc_reserve_thread_slots(server.io_threads_num); + /* Do system checks */ #ifdef __linux__ linuxMemoryWarnings(); diff --git a/src/server.h b/src/server.h index 123201869..59d653484 100644 --- a/src/server.h +++ b/src/server.h @@ -3161,7 +3161,6 @@ void resetClient(client *c, int num_pcmds_to_free); void resetClientQbufState(client *c); void freeClientOriginalArgv(client *c); void freeClientArgv(client *c); -void freeClientPendingCommands(client *c, int num_pcmds_to_free); void tryDeferFreeClientObject(client *c, int type, void *ptr); void freeClientDeferredObjects(client *c, int free_array); void freeClientIODeferredObjects(client *c, int free_array); diff --git a/src/zmalloc.c b/src/zmalloc.c index d32d7309f..5b84ccb07 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -80,10 +80,27 @@ void je_free_with_usize(void *ptr, size_t *usize); #define realloc_with_usize(ptr,size,old_usize,new_usize) je_realloc_with_usize(ptr,size,old_usize,new_usize) #define free_with_usize(ptr,usize) je_free_with_usize(ptr,usize) #endif + +/* Compile-time jemalloc tuning: raise per-bin tcache limits for small size + * classes so bursts of same-size small allocations don't spill into the + * arena, which would reduce performance. + * + * lg_tcache_nslots_mul:3 default slot count log2 multiplier: 1 (2x) -> 3 (8x). 
+ * tcache_nslots_small_max:1000 per-bin hard cap 200 -> 1000. + */ +const char *je_malloc_conf = + "lg_tcache_nslots_mul:3,tcache_nslots_small_max:1000"; #endif -#define MAX_THREADS 16 /* Keep it a power of 2 so we can use '&' instead of '%'. */ -#define THREAD_MASK (MAX_THREADS - 1) +/* Per-thread memory accounting slots. The first DEDICATED_ENTRIES threads + * (typically the main thread plus io threads) each get a private slot and can + * use the cheap single-writer atomic operation (plain load+store). + * Threads beyond that share a pool hashed by thread index and pay the cost of + * a full atomic RMW. */ +#define DEDICATED_ENTRIES 8 +#define SHARED_ENTRIES 8 /* Must be a power of 2 so '&' with SHARED_ENTRIES_MASK can replace '%' */ +#define SHARED_ENTRIES_MASK (SHARED_ENTRIES - 1) +#define MAX_ENTRIES (DEDICATED_ENTRIES + SHARED_ENTRIES) #define PEAK_CHECK_THRESHOLD (1024 * 100) /* 100KB */ typedef struct used_memory_entry { @@ -92,7 +109,7 @@ typedef struct used_memory_entry { char padding[CACHE_LINE_SIZE - sizeof(long long) - sizeof(long long)]; } used_memory_entry; -static __attribute__((aligned(CACHE_LINE_SIZE))) used_memory_entry used_memory[MAX_THREADS]; +static __attribute__((aligned(CACHE_LINE_SIZE))) used_memory_entry used_memory[MAX_ENTRIES]; static redisAtomic size_t num_active_threads = 0; static redisAtomic size_t zmalloc_peak = 0; static redisAtomic time_t zmalloc_peak_time = 0; @@ -100,19 +117,75 @@ static __thread long my_thread_index = -1; static inline void init_my_thread_index(void) { if (unlikely(my_thread_index == -1)) { - atomicGetIncr(num_active_threads, my_thread_index, 1); - my_thread_index &= THREAD_MASK; + long idx; + atomicGetIncr(num_active_threads, idx, 1); + if (idx < DEDICATED_ENTRIES) { + my_thread_index = idx; + } else { + /* Overflow threads share the shared pool entries (atomic RMW). 
*/ + my_thread_index = DEDICATED_ENTRIES + (idx & SHARED_ENTRIES_MASK); + } } } -static void update_zmalloc_stat_alloc(long long bytes_delta) { +/* Pre-advance the thread index counter so reserved threads that call + * zmalloc_register_reserved_slot() can claim dedicated used_memory accounting + * slots. Must be called once by main() before any other thread can allocate via + * zmalloc(), otherwise background threads could auto-register into the + * reserved slots [0, n). See DEDICATED_ENTRIES comment for details. */ +void zmalloc_reserve_thread_slots(int n) { + assert(n >= 1); + + size_t cur; + atomicGet(num_active_threads, cur); + assert((my_thread_index == -1 && cur == 0) || + (my_thread_index == 0 && cur == 1)); + + if (my_thread_index == -1) my_thread_index = 0; /* claim entry 0 for main thread */ + atomicSet(num_active_threads, (size_t)n); +} + +/* A reserved thread, e.g. an IO thread, calls this once at startup, before its + * first allocation. Claims the next dedicated slot via a private atomic counter, + * falls back to the shared pool if all dedicated slots have been taken. */ +void zmalloc_register_reserved_slot(void) { + assert(my_thread_index == -1); + static redisAtomic int reserved_slot_counter = 1; /* Slot 0 is reserved for main thread. */ + + int slot; + atomicGetIncr(reserved_slot_counter, slot, 1); + if (slot < DEDICATED_ENTRIES) { + size_t reserved; + atomicGet(num_active_threads, reserved); + assert((size_t)slot < reserved); + + my_thread_index = slot; + } else { + my_thread_index = DEDICATED_ENTRIES + (slot & SHARED_ENTRIES_MASK); + } +} + +static inline long long update_used_memory_entry(used_memory_entry *entry, long long bytes_delta) { + long long thread_used; + + if (my_thread_index < DEDICATED_ENTRIES) { + /* Dedicated slot: single writer, plain load+store (no lock prefix). */ + atomicIncrGetSingleWriter(entry->used_memory, bytes_delta, thread_used); + } else { + /* Shared pool slots: multiple writers, atomic RMW required. 
*/ + atomicIncrGet(entry->used_memory, thread_used, bytes_delta); + } + return thread_used; +} + +static inline void update_zmalloc_stat_alloc(long long bytes_delta) { init_my_thread_index(); - /* Per-thread allocation counter and the last counter value at which we ran a - * global peak check (throttles how often we call zmalloc_used_memory()). */ - long long thread_used, thread_last_peak_check_used; - atomicIncrGet(used_memory[my_thread_index].used_memory, thread_used, bytes_delta); - atomicGet(used_memory[my_thread_index].last_peak_check, thread_last_peak_check_used); + used_memory_entry *entry = &used_memory[my_thread_index]; + long long thread_used = update_used_memory_entry(entry, bytes_delta); /* slot counter after adding bytes_delta */ + + long long thread_last_peak_check_used; /* slot counter value at the last global peak check */ + atomicGet(entry->last_peak_check, thread_last_peak_check_used); /* Only run the (expensive) global used/peak check after this thread's * allocation counter has advanced enough since the last check. */ @@ -143,13 +216,13 @@ static void update_zmalloc_stat_alloc(long long bytes_delta) { /* Record the thread counter value at which we last ran a global peak check, * to throttle future checks for this thread. 
*/ - atomicSet(used_memory[my_thread_index].last_peak_check, thread_used); + atomicSet(entry->last_peak_check, thread_used); } } -static void update_zmalloc_stat_free(long long num) { +static inline void update_zmalloc_stat_free(long long num) { init_my_thread_index(); - atomicDecr(used_memory[my_thread_index].used_memory, num); + update_used_memory_entry(&used_memory[my_thread_index], -num); /* negated delta; the returned total is not needed here */ } static void zmalloc_default_oom(size_t size) { @@ -586,8 +659,8 @@ size_t zmalloc_used_memory(void) { size_t local_num_active_threads; long long total_mem = 0; atomicGet(num_active_threads,local_num_active_threads); - if (local_num_active_threads > MAX_THREADS) { - local_num_active_threads = MAX_THREADS; + if (local_num_active_threads > MAX_ENTRIES) { + local_num_active_threads = MAX_ENTRIES; } for (size_t i = 0; i < local_num_active_threads; ++i) { long long thread_used_mem; diff --git a/src/zmalloc.h b/src/zmalloc.h index f24d3e996..40d16c547 100644 --- a/src/zmalloc.h +++ b/src/zmalloc.h @@ -114,6 +114,8 @@ void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable, size_t *old_usa __attribute__((malloc)) char *zstrdup(const char *s); __attribute__((malloc)) char *zstrdup_usable(const char *s, size_t *usable); size_t zmalloc_used_memory(void); +void zmalloc_reserve_thread_slots(int n); /* call once from main() before any other thread can allocate */ +void zmalloc_register_reserved_slot(void); /* call once per reserved (e.g. IO) thread, before its first allocation */ size_t zmalloc_get_peak_memory(void); time_t zmalloc_get_peak_memory_time(void); void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 7ab9ab89b..9fdd576df 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -30,6 +30,15 @@ start_server {tags {"other"}} { assert_equal {OK} [r memory purge] } } + + test {je_malloc_conf compile-time tuning is active} { + # Verify je_malloc_conf in src/zmalloc.c overrides jemalloc defaults: + # (tcache_nslots_small_max: 200, lg_tcache_nslots_mul: 1). 
+ if {[string match {*jemalloc*} [s mem_allocator]]} { + assert_equal 1000 [r debug mallctl opt.tcache_nslots_small_max] + assert_equal 3 [r debug mallctl opt.lg_tcache_nslots_mul] + } + } {} {needs:debug} test {SAVE - make sure there are all the types as values} { # Wait for a background saving in progress to terminate