From ffff7fea7cd98e1573db000c51f4d80af89c453c Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Wed, 10 Jul 2024 17:35:36 +0800 Subject: [PATCH] Rebuild function engines for function flush command (#13383) ### Issue The current implementation of `FUNCTION FLUSH` command uses `lua_unref()` to unreference script closures in Lua vm. However, invoking `lua_unref()` during lazy free (`ASYNC` argument) is risky since it is not thread-safe. Another issue is that using `lua_unref()` to unreference references does not trigger GC, This can result in the Lua VM leaves a significant amount of garbage, which may never be cleaned up if not properly GC. ### Solution The proposed solution is to completely rebuild the engines, resulting in a brand new Lua VM. --------- Co-authored-by: meir --- src/function_lua.c | 7 +++++++ src/functions.c | 21 +++++++++++++++++---- src/functions.h | 4 ++++ src/lazyfree.c | 10 +++++++--- src/server.h | 2 +- tests/unit/functions.tcl | 25 +++++++++++++++++++++++++ 6 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/function_lua.c b/src/function_lua.c index 61a20a4c6..acb16ce52 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -183,6 +183,12 @@ static void luaEngineFreeFunction(void *engine_ctx, void *compiled_function) { zfree(f_ctx); } +static void luaEngineFreeCtx(void *engine_ctx) { + luaEngineCtx *lua_engine_ctx = engine_ctx; + lua_close(lua_engine_ctx->lua); + zfree(lua_engine_ctx); +} + static void luaRegisterFunctionArgsInitialize(registerFunctionArgs *register_f_args, sds name, sds desc, @@ -480,6 +486,7 @@ int luaEngineInitEngine(void) { .get_function_memory_overhead = luaEngineFunctionMemoryOverhead, .get_engine_memory_overhead = luaEngineMemoryOverhead, .free_function = luaEngineFreeFunction, + .free_ctx = luaEngineFreeCtx, }; return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine); } diff --git a/src/functions.c b/src/functions.c index 427cda8d0..7f8d2a1d4 100644 --- a/src/functions.c +++ b/src/functions.c @@ -24,6 +24,7 @@ static size_t engine_cache_memory = 0; static void engineFunctionDispose(dict *d, void *obj); static void engineStatsDispose(dict *d, void *obj); static void engineLibraryDispose(dict *d, void *obj); +static void engineDispose(dict *d, void *obj); static int functionsVerifyName(sds name); typedef struct functionsLibEngineStats { @@ -50,7 +51,7 @@ dictType engineDictType = { NULL, /* val dup */ dictSdsKeyCaseCompare, /* key compare */ dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ + engineDispose, /* val destructor */ NULL /* allow to expand */ }; @@ -148,6 +149,16 @@ static void engineLibraryDispose(dict *d, void *obj) { engineLibraryFree(obj); } +static void engineDispose(dict *d, void *obj) { + UNUSED(d); + engineInfo *ei = obj; + freeClient(ei->c); + sdsfree(ei->name); + ei->engine->free_ctx(ei->engine->engine_ctx); + zfree(ei->engine); + zfree(ei); +} + /* Clear all the functions from the given library ctx */ void functionsLibCtxClear(functionsLibCtx *lib_ctx) { dictEmpty(lib_ctx->functions, NULL); @@ -166,11 +177,13 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx) { void functionsLibCtxClearCurrent(int async) { if (async) { functionsLibCtx *old_l_ctx = curr_functions_lib_ctx; - curr_functions_lib_ctx = functionsLibCtxCreate(); - freeFunctionsAsync(old_l_ctx); + dict *old_engines = engines; + freeFunctionsAsync(old_l_ctx, old_engines); } else { - functionsLibCtxClear(curr_functions_lib_ctx); + functionsLibCtxFree(curr_functions_lib_ctx); + dictRelease(engines); } + functionsInit(); } /* Free the given functions ctx */ diff --git a/src/functions.h b/src/functions.h index 1d69e3794..ed4392db3 100644 --- a/src/functions.h +++ b/src/functions.h @@ -67,6 +67,9 @@ typedef struct engine { /* free the given function */ void (*free_function)(void *engine_ctx, void *compiled_function); + + /* Free the engine context. */ + void (*free_ctx)(void *engine_ctx); } engine; /* Hold information about an engine. @@ -116,5 +119,6 @@ int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds int luaEngineInitEngine(void); int functionsInit(void); +void functionsFree(functionsLibCtx *lib_ctx, dict *engs); #endif /* __FUNCTIONS_H_ */ diff --git a/src/lazyfree.c b/src/lazyfree.c index 2b98f9a06..858751757 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -72,8 +72,11 @@ void lazyFreeLuaScripts(void *args[]) { /* Release the functions ctx. */ void lazyFreeFunctionsCtx(void *args[]) { functionsLibCtx *functions_lib_ctx = args[0]; + dict *engs = args[1]; size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx); functionsLibCtxFree(functions_lib_ctx); + len += dictSize(engs); + dictRelease(engs); atomicDecr(lazyfree_objects,len); atomicIncr(lazyfreed_objects,len); } @@ -247,12 +250,13 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat } /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ -void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) { +void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) { if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { - atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)); - bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx); + atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)+dictSize(engs)); + bioCreateLazyFreeJob(lazyFreeFunctionsCtx,2,functions_lib_ctx,engs); } else { functionsLibCtxFree(functions_lib_ctx); + dictRelease(engs); } } diff --git a/src/server.h b/src/server.h index df629d364..a0e8ee556 100644 --- a/src/server.h +++ b/src/server.h @@ -3470,7 +3470,7 @@ int ldbPendingChildren(void); void luaLdbLineHook(lua_State *lua, lua_Debug *ar); void freeLuaScriptsSync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua); void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua); -void freeFunctionsAsync(functionsLibCtx *lib_ctx); +void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engines); int ldbIsEnabled(void); void ldbLog(sds entry); void ldbLogRedisReply(char *reply); diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl index 90d4bb801..ce47eb50d 100644 --- a/tests/unit/functions.tcl +++ b/tests/unit/functions.tcl @@ -294,6 +294,31 @@ start_server {tags {"scripting"}} { assert_match {} [r function list] } + test {FUNCTION - async function flush rebuilds Lua VM without causing race condition between main and lazyfree thread} { + # LAZYFREE_THRESHOLD is 64 + for {set i 0} {$i < 100} {incr i} { + r function load REPLACE [get_function_code lua test$i test$i {local a = 1 while true do a = a + 1 end}] + } + assert_morethan [s used_memory_vm_functions] 70000 + r config resetstat + r function flush async + assert_lessthan [s used_memory_vm_functions] 40000 + + # Wait for the completion of lazy free for both functions and engines. + set start_time [clock seconds] + while {1} { + # Tests for race conditions between async function flushes and main thread Lua VM operations. + r function load REPLACE [get_function_code lua test test {local a = 1 while true do a = a + 1 end}] + if {[s lazyfreed_objects] == 101 || [expr {[clock seconds] - $start_time}] > 5} { + break + } + } + if {[s lazyfreed_objects] != 101} { + error "Timeout or unexpected number of lazyfreed_objects: [s lazyfreed_objects]" + } + assert_match {{library_name test engine LUA functions {{name test description {} flags {}}}}} [r function list] + } + test {FUNCTION - test function wrong argument} { catch {r function flush bad_arg} e assert_match {*only supports SYNC|ASYNC*} $e