diff --git a/src/defrag.c b/src/defrag.c index 78de72248..a819eb8ac 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -54,6 +54,17 @@ void* activeDefragAlloc(void *ptr) { return newptr; } +/* Raw memory allocation for defrag, avoid using tcache. */ +void *activeDefragAllocRaw(size_t size) { + return zmalloc_no_tcache(size); +} + +/* Raw memory free for defrag, avoid using tcache. */ +void activeDefragFreeRaw(void *ptr) { + zfree_no_tcache(ptr); + server.stat_active_defrag_hits++; +} + /*Defrag helper for sds strings * * returns NULL in case the allocation wasn't moved. @@ -1078,6 +1089,7 @@ void activeDefragCycle(void) { slot = -1; defrag_later_item_in_progress = 0; db = NULL; + moduleDefragEnd(); goto update_metrics; } return; @@ -1119,6 +1131,10 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } + if (current_db == -1) { + moduleDefragStart(); + } + /* Move on to next database, and stop if we reached the last one. */ if (++current_db >= server.dbnum) { /* defrag other items not part of the db / keys */ @@ -1140,6 +1156,8 @@ void activeDefragCycle(void) { db = NULL; server.active_defrag_running = 0; + moduleDefragEnd(); + computeDefragCycles(); /* if another scan is needed, start it right away */ if (server.active_defrag_running != 0 && ustime() < endtime) continue; @@ -1259,6 +1277,16 @@ void *activeDefragAlloc(void *ptr) { return NULL; } +void *activeDefragAllocRaw(size_t size) { + /* fallback to regular allocation */ + return zmalloc(size); +} + +void activeDefragFreeRaw(void *ptr) { + /* fallback to regular free */ + zfree(ptr); +} + robj *activeDefragStringOb(robj *ob) { UNUSED(ob); return NULL; diff --git a/src/dict.h b/src/dict.h index 1c0e6accd..e78833066 100644 --- a/src/dict.h +++ b/src/dict.h @@ -257,6 +257,18 @@ dictStats* dictGetStatsHt(dict *d, int htidx, int full); void dictCombineStats(dictStats *from, dictStats *into); void dictFreeStats(dictStats *stats); +#define dictForEach(d, ty, m, ...) do { \ + dictIterator *di = dictGetIterator(d); \ + dictEntry *de; \ + while ((de = dictNext(di)) != NULL) { \ + ty *m = dictGetVal(de); \ + do { \ + __VA_ARGS__ \ + } while(0); \ + } \ + dictReleaseIterator(di); \ +} while(0); + #ifdef REDIS_TEST int dictTest(int argc, char *argv[], int flags); #endif diff --git a/src/module.c b/src/module.c index b917a28a4..d9ef7dfcc 100644 --- a/src/module.c +++ b/src/module.c @@ -2287,6 +2287,8 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->options = 0; module->info_cb = 0; module->defrag_cb = 0; + module->defrag_start_cb = 0; + module->defrag_end_cb = 0; module->loadmod = NULL; module->num_commands_with_acl_categories = 0; module->onload = 1; @@ -13456,6 +13458,16 @@ int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) { return REDISMODULE_OK; } +/* Register a defrag callbacks that will be called when defrag operation starts and ends. + * + * The callbacks are the same as `RM_RegisterDefragFunc` but the user + * can also assume the callbacks are called when the defrag operation starts and ends. */ +int RM_RegisterDefragCallbacks(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) { + ctx->module->defrag_start_cb = start; + ctx->module->defrag_end_cb = end; + return REDISMODULE_OK; +} + /* When the data type defrag callback iterates complex structures, this * function should be called periodically. A zero (false) return * indicates the callback may continue its work. A non-zero value (true) @@ -13534,6 +13546,30 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { return activeDefragAlloc(ptr); } +/* Allocate memory for defrag purposes + * + * On the common cases user simply want to reallocate a pointer with a single + * owner. For such usecase RM_DefragAlloc is enough. But on some usecases the user + * might want to replace a pointer with multiple owners in different keys. + * In such case, an in place replacement can not work because the other key still + * keep a pointer to the old value. + * + * RM_DefragAllocRaw and RM_DefragFreeRaw allows to control when the memory + * for defrag purposes will be allocated and when it will be freed, + * allow to support more complex defrag usecases. */ +void *RM_DefragAllocRaw(RedisModuleDefragCtx *ctx, size_t size) { + UNUSED(ctx); + return activeDefragAllocRaw(size); +} + +/* Free memory for defrag purposes + * + * See RM_DefragAllocRaw for more information. */ +void RM_DefragFreeRaw(RedisModuleDefragCtx *ctx, void *ptr) { + UNUSED(ctx); + activeDefragFreeRaw(ptr); +} + /* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc. * See RM_DefragAlloc() for more information on how the defragmentation process * works. @@ -13615,17 +13651,32 @@ int moduleDefragValue(robj *key, robj *value, int dbid) { /* Call registered module API defrag functions */ void moduleDefragGlobals(void) { - dictIterator *di = dictGetIterator(modules); - dictEntry *de; + dictForEach(modules, struct RedisModule, module, + if (module->defrag_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_cb(&defrag_ctx); + } + ); +} - while ((de = dictNext(di)) != NULL) { - struct RedisModule *module = dictGetVal(de); - if (!module->defrag_cb) - continue; - RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; - module->defrag_cb(&defrag_ctx); - } - dictReleaseIterator(di); +/* Call registered module API defrag start functions */ +void moduleDefragStart(void) { + dictForEach(modules, struct RedisModule, module, + if (module->defrag_start_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_start_cb(&defrag_ctx); + } + ); +} + +/* Call registered module API defrag end functions */ +void moduleDefragEnd(void) { + dictForEach(modules, struct RedisModule, module, + if (module->defrag_end_cb) { + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; + module->defrag_end_cb(&defrag_ctx); + } + ); } /* Returns the name of the key currently being processed. @@ -13985,7 +14036,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetCurrentCommandName); REGISTER_API(GetTypeMethodVersion); REGISTER_API(RegisterDefragFunc); + REGISTER_API(RegisterDefragCallbacks); REGISTER_API(DefragAlloc); + REGISTER_API(DefragAllocRaw); + REGISTER_API(DefragFreeRaw); REGISTER_API(DefragRedisModuleString); REGISTER_API(DefragShouldStop); REGISTER_API(DefragCursorSet); diff --git a/src/redismodule.h b/src/redismodule.h index 8b5d2beb6..9e0b5c5ee 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -1296,7 +1296,10 @@ REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisMod REDISMODULE_API int *(*RedisModule_GetCommandKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys, int **out_flags) REDISMODULE_ATTR; REDISMODULE_API const char *(*RedisModule_GetCurrentCommandName)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_RegisterDefragCallbacks)(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) REDISMODULE_ATTR; REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; +REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_DefragFreeRaw)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisModuleDefragCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR; @@ -1662,7 +1665,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetCommandKeysWithFlags); REDISMODULE_GET_API(GetCurrentCommandName); REDISMODULE_GET_API(RegisterDefragFunc); + REDISMODULE_GET_API(RegisterDefragCallbacks); REDISMODULE_GET_API(DefragAlloc); + REDISMODULE_GET_API(DefragAllocRaw); + REDISMODULE_GET_API(DefragFreeRaw); REDISMODULE_GET_API(DefragRedisModuleString); REDISMODULE_GET_API(DefragShouldStop); REDISMODULE_GET_API(DefragCursorSet); diff --git a/src/server.h b/src/server.h index fe2da0a4a..7d7147874 100644 --- a/src/server.h +++ b/src/server.h @@ -820,6 +820,8 @@ struct RedisModule { int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */ RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */ RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */ + RedisModuleDefragFunc defrag_start_cb; /* Callback indicating defrag started. */ + RedisModuleDefragFunc defrag_end_cb; /* Callback indicating defrag ended. */ struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */ int num_commands_with_acl_categories; /* Number of commands in this module included in acl categories */ int onload; /* Flag to identify if the call is being made from Onload (0 or 1) */ @@ -2555,6 +2557,8 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj int moduleDefragValue(robj *key, robj *obj, int dbid); int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); void moduleDefragGlobals(void); +void moduleDefragStart(void); +void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); @@ -3132,6 +3136,8 @@ void checkChildrenDone(void); int setOOMScoreAdj(int process_class); void rejectCommandFormat(client *c, const char *fmt, ...); void *activeDefragAlloc(void *ptr); +void *activeDefragAllocRaw(size_t size); +void activeDefragFreeRaw(void *ptr); robj *activeDefragStringOb(robj* ob); void dismissSds(sds s); void dismissMemory(void* ptr, size_t size_hint); diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 6a02a059f..a27b57e13 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -3,6 +3,7 @@ #include "redismodule.h" #include +#include static RedisModuleType *FragType; @@ -17,9 +18,12 @@ unsigned long int last_set_cursor = 0; unsigned long int datatype_attempts = 0; unsigned long int datatype_defragged = 0; +unsigned long int datatype_raw_defragged = 0; unsigned long int datatype_resumes = 0; unsigned long int datatype_wrong_cursor = 0; unsigned long int global_attempts = 0; +unsigned long int defrag_started = 0; +unsigned long int defrag_ended = 0; unsigned long int global_defragged = 0; int global_strings_len = 0; @@ -47,16 +51,29 @@ static void defragGlobalStrings(RedisModuleDefragCtx *ctx) } } +static void defragStart(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_started++; +} + +static void defragEnd(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_ended++; +} + static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { REDISMODULE_NOT_USED(for_crash_report); RedisModule_InfoAddSection(ctx, "stats"); RedisModule_InfoAddFieldLongLong(ctx, "datatype_attempts", datatype_attempts); RedisModule_InfoAddFieldLongLong(ctx, "datatype_defragged", datatype_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged); RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes); RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor); RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended); } struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) { @@ -79,10 +96,13 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, datatype_attempts = 0; datatype_defragged = 0; + datatype_raw_defragged = 0; datatype_resumes = 0; datatype_wrong_cursor = 0; global_attempts = 0; global_defragged = 0; + defrag_started = 0; + defrag_ended = 0; RedisModule_ReplyWithSimpleString(ctx, "OK"); return REDISMODULE_OK; @@ -188,6 +208,14 @@ int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) } } + /* Defrag the values array itself using RedisModule_DefragAllocRaw + * and RedisModule_DefragFreeRaw for testing purposes. */ + void *new_values = RedisModule_DefragAllocRaw(ctx, o->len * sizeof(void*)); + memcpy(new_values, o->values, o->len * sizeof(void*)); + RedisModule_DefragFreeRaw(ctx, o->values); + o->values = new_values; + datatype_raw_defragged++; + last_set_cursor = 0; return 0; } @@ -230,6 +258,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_RegisterInfoFunc(ctx, FragInfo); RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings); + RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd); return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/defrag.tcl b/tests/unit/moduleapi/defrag.tcl index b2e23967e..3f36ee191 100644 --- a/tests/unit/moduleapi/defrag.tcl +++ b/tests/unit/moduleapi/defrag.tcl @@ -18,6 +18,9 @@ start_server {tags {"modules"} overrides {{save ""}}} { set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_datatype_attempts] > 0} assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes] + assert_morethan [getInfoProperty $info defragtest_datatype_raw_defragged] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } test {Module defrag: late defrag with cursor works} { @@ -32,6 +35,9 @@ start_server {tags {"modules"} overrides {{save ""}}} { set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_datatype_resumes] > 10} assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor] + assert_morethan [getInfoProperty $info defragtest_datatype_raw_defragged] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } test {Module defrag: global defrag works} { @@ -41,6 +47,8 @@ start_server {tags {"modules"} overrides {{save ""}}} { after 2000 set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_global_attempts] > 0} + assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 + assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 } } }