diff --git a/src/bio.c b/src/bio.c index 446955385..6f96ef709 100644 --- a/src/bio.c +++ b/src/bio.c @@ -1,16 +1,16 @@ /* Background I/O service for Redis. * * This file implements operations that we need to perform in the background. - * Currently there is only a single operation, that is a background close(2) - * system call. This is needed as when the process is the last owner of a - * reference to a file closing it means unlinking it, and the deletion of the - * file is slow, blocking the server. + * Currently there are 3 operations: + * 1) a background close(2) system call. This is needed when the process is + * the last owner of a reference to a file closing it means unlinking it, and + * the deletion of the file is slow, blocking the server. + * 2) AOF fsync + * 3) lazyfree of memory * * In the future we'll either continue implementing new things we need or * we'll switch to libeio. However there are probably long term uses for this - * file as we may want to put here Redis specific background tasks (for instance - * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL - * implementation). + * file as we may want to put here Redis specific background tasks. * * DESIGN * ------ @@ -26,8 +26,13 @@ * least-recently-inserted to the most-recently-inserted (older jobs processed * first). * - * Currently there is no way for the creator of the job to be notified about - * the completion of the operation, this will only be added when/if needed. + * To let the creator of the job to be notified about the completion of the + * operation, it will need to submit additional dummy job, coined as + * completion job request that will be written back eventually, by the + * background thread, into completion job response queue. This notification + * layout can simplify flows that might submit more than one job, such as + * in case of FLUSHALL which for a single command submits multiple jobs. It + * is also correct because jobs are processed in FIFO fashion. * * ---------------------------------------------------------------------------- * @@ -38,9 +43,9 @@ * (RSALv2) or the Server Side Public License v1 (SSPLv1). */ - #include "server.h" #include "bio.h" +#include static char* bio_worker_title[] = { "bio_close_file", @@ -55,6 +60,9 @@ static unsigned int bio_job_to_worker[] = { [BIO_AOF_FSYNC] = 1, [BIO_CLOSE_AOF] = 1, [BIO_LAZY_FREE] = 2, + [BIO_COMP_RQ_CLOSE_FILE] = 0, + [BIO_COMP_RQ_AOF_FSYNC] = 1, + [BIO_COMP_RQ_LAZY_FREE] = 2 }; static pthread_t bio_threads[BIO_WORKER_NUM]; @@ -63,6 +71,18 @@ static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM]; static list *bio_jobs[BIO_WORKER_NUM]; static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0}; +/* The bio_comp_list is used to hold completion job responses and to handover + * to main thread to callback as notification for job completion. Main + * thread will be triggered to read the list by signaling via writing to a pipe */ +static list *bio_comp_list; +static pthread_mutex_t bio_mutex_comp; +static int job_comp_pipe[2]; /* Pipe used to awake the event loop */ + +typedef struct bio_comp_item { + comp_fn *func; /* callback after completion job will be processed */ + uint64_t arg; /* user data to be passed to the function */ +} bio_comp_item; + /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ typedef union bio_job { @@ -86,9 +106,15 @@ typedef union bio_job { lazy_free_fn *free_fn; /* Function that will free the provided arguments */ void *free_args[]; /* List of arguments to be passed to the free function */ } free_args; + struct { + int type; /* header */ + comp_fn *fn; /* callback. Handover to main thread to cb as notify for job completion */ + uint64_t arg; /* callback arguments */ + } comp_rq; } bio_job; void *bioProcessBackgroundJobs(void *arg); +void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ @@ -108,6 +134,27 @@ void bioInit(void) { bio_jobs[j] = listCreate(); } + /* init jobs comp responses */ + bio_comp_list = listCreate(); + pthread_mutex_init(&bio_mutex_comp, NULL); + + /* Create a pipe for background thread to be able to wake up the redis main thread. + * Make the pipe non blocking. This is just a best effort aware mechanism + * and we do not want to block not in the read nor in the write half. + * Enable close-on-exec flag on pipes in case of the fork-exec system calls in + * sentinels or redis servers. */ + if (anetPipe(job_comp_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) { + serverLog(LL_WARNING, + "Can't create the pipe for bio thread: %s", strerror(errno)); + exit(1); + } + + /* Register a readable event for the pipe used to awake the event loop on job completion */ + if (aeCreateFileEvent(server.el, job_comp_pipe[0], AE_READABLE, + bioPipeReadJobCompList, NULL) == AE_ERR) { + serverPanic("Error registering the readable event for the bio pipe."); + } + /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); @@ -153,6 +200,28 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { bioSubmitJob(BIO_LAZY_FREE, job); } +void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) { + int type; + switch (assigned_worker) { + case BIO_WORKER_CLOSE_FILE: + type = BIO_COMP_RQ_CLOSE_FILE; + break; + case BIO_WORKER_AOF_FSYNC: + type = BIO_COMP_RQ_AOF_FSYNC; + break; + case BIO_WORKER_LAZY_FREE: + type = BIO_COMP_RQ_LAZY_FREE; + break; + default: + serverPanic("Invalid worker type in bioCreateCompRq()."); + } + + bio_job *job = zmalloc(sizeof(*job)); + job->comp_rq.fn = func; + job->comp_rq.arg = user_data; + bioSubmitJob(type, job); +} + void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; @@ -264,6 +333,21 @@ void *bioProcessBackgroundJobs(void *arg) { close(job->fd_args.fd); } else if (job_type == BIO_LAZY_FREE) { job->free_args.free_fn(job->free_args.free_args); + } else if ((job_type == BIO_COMP_RQ_CLOSE_FILE) || + (job_type == BIO_COMP_RQ_AOF_FSYNC) || + (job_type == BIO_COMP_RQ_LAZY_FREE)) { + bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item)); + comp_rsp->func = job->comp_rq.fn; + comp_rsp->arg = job->comp_rq.arg; + + /* just write it to completion job responses */ + pthread_mutex_lock(&bio_mutex_comp); + listAddNodeTail(bio_comp_list, comp_rsp); + pthread_mutex_unlock(&bio_mutex_comp); + + if (write(job_comp_pipe[1],"A",1) != 1) { + /* Pipe is non-blocking, write() may fail if it's full. */ + } } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } @@ -322,3 +406,34 @@ void bioKillThreads(void) { } } } + +void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) { + UNUSED(el); + UNUSED(mask); + UNUSED(privdata); + + char buf[128]; + list *tmp_list = NULL; + + while (read(fd, buf, sizeof(buf)) == sizeof(buf)); + + /* Handle event loop events if pipe was written from event loop API */ + pthread_mutex_lock(&bio_mutex_comp); + if (listLength(bio_comp_list)) { + tmp_list = bio_comp_list; + bio_comp_list = listCreate(); + } + pthread_mutex_unlock(&bio_mutex_comp); + + if (!tmp_list) return; + + /* callback to all job completions */ + while (listLength(tmp_list)) { + listNode *ln = listFirst(tmp_list); + bio_comp_item *rsp = ln->value; + listDelNode(tmp_list, ln); + rsp->func(rsp->arg); + zfree(rsp); + } + listRelease(tmp_list); +} diff --git a/src/bio.h b/src/bio.h index da6f992da..2679a2bf5 100644 --- a/src/bio.h +++ b/src/bio.h @@ -10,6 +10,26 @@ #define __BIO_H typedef void lazy_free_fn(void *args[]); +typedef void comp_fn(uint64_t user_data); + +typedef enum bio_worker_t { + BIO_WORKER_CLOSE_FILE = 0, + BIO_WORKER_AOF_FSYNC, + BIO_WORKER_LAZY_FREE, + BIO_WORKER_NUM +} bio_worker_t; + +/* Background job opcodes */ +typedef enum bio_job_type_t { + BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */ + BIO_AOF_FSYNC, /* Deferred AOF fsync. */ + BIO_LAZY_FREE, /* Deferred objects freeing. */ + BIO_CLOSE_AOF, + BIO_COMP_RQ_CLOSE_FILE, /* Job completion request, registered on close-file worker's queue */ + BIO_COMP_RQ_AOF_FSYNC, /* Job completion request, registered on aof-fsync worker's queue */ + BIO_COMP_RQ_LAZY_FREE, /* Job completion request, registered on lazy-free worker's queue */ + BIO_NUM_OPS +} bio_job_type_t; /* Exported API */ void bioInit(void); @@ -20,14 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache); void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); +void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data); -/* Background job opcodes */ -enum { - BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */ - BIO_AOF_FSYNC, /* Deferred AOF fsync. */ - BIO_LAZY_FREE, /* Deferred objects freeing. */ - BIO_CLOSE_AOF, /* Deferred close for AOF files. */ - BIO_NUM_OPS -}; #endif diff --git a/src/blocked.c b/src/blocked.c index 307ae28be..009e2557b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -68,6 +68,7 @@ void blockClient(client *c, int btype) { /* Master client should never be blocked unless pause or module */ serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && + btype != BLOCKED_LAZYFREE && btype != BLOCKED_POSTPONE)); c->flags |= CLIENT_BLOCKED; @@ -175,6 +176,8 @@ void unblockClient(client *c, int queue_for_reprocessing) { c->postponed_list_node = NULL; } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ + } else if (c->bstate.btype == BLOCKED_LAZYFREE) { + /* No special cleanup. */ } else { serverPanic("Unknown btype in unblockClient()."); } @@ -206,7 +209,9 @@ void unblockClient(client *c, int queue_for_reprocessing) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->bstate.btype == BLOCKED_LIST || + if (c->bstate.btype == BLOCKED_LAZYFREE) { + addReply(c, shared.ok); /* No reason lazy-free to fail */ + } else if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { addReplyNullArray(c); @@ -263,9 +268,16 @@ void disconnectAllBlockedClients(void) { if (c->bstate.btype == BLOCKED_POSTPONE) continue; - unblockClientOnError(c, - "-UNBLOCKED force unblock from blocking operation, " - "instance state changed (master -> replica?)"); + if (c->bstate.btype == BLOCKED_LAZYFREE) { + addReply(c, shared.ok); /* No reason lazy-free to fail */ + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c, 1); + } else { + + unblockClientOnError(c, + "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> replica?)"); + } c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } diff --git a/src/db.c b/src/db.c index b3053e8a7..f25960c0f 100644 --- a/src/db.c +++ b/src/db.c @@ -15,6 +15,7 @@ #include #include +#include "bio.h" /*----------------------------------------------------------------------------- * C-level DB API @@ -667,50 +668,110 @@ void flushAllDataAndResetRDB(int flags) { /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. * for large databases, flushdb blocks for long anyway, so a bit more won't * harm and this way the flush and purge will be synchronous. */ - if (!(flags & EMPTYDB_ASYNC)) + if (!(flags & EMPTYDB_ASYNC)) { + /* Only clear the current thread cache. + * Ignore the return call since this will fail if the tcache is disabled. */ + je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); + jemalloc_purge(); + } #endif } -/* FLUSHDB [ASYNC] - * - * Flushes the currently SELECTed Redis DB. */ -void flushdbCommand(client *c) { +/* Optimized FLUSHALL\FLUSHDB SYNC command finished to run by lazyfree thread */ +void flushallSyncBgDone(uint64_t client_id) { + + client *c = lookupClientByID(client_id); + + /* Verify that client still exists */ + if (!c) return; + + /* Update current_client (Called functions might rely on it) */ + client *old_client = server.current_client; + server.current_client = c; + + /* Don't update blocked_us since command was processed in bg by lazy_free thread */ + updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0); + + /* lazyfree bg job always succeed */ + addReply(c, shared.ok); + + /* mark client as unblocked */ + unblockClient(c, 1); + + /* FLUSH command is finished. resetClient() and update replication offset. */ + commandProcessed(c); + + /* On flush completion, update the client's memory */ + updateClientMemUsageAndBucket(c); + + /* restore current_client */ + server.current_client = old_client; +} + +void flushCommandCommon(client *c, int isFlushAll) { + int blocking_async = 0; /* FLUSHALL\FLUSHDB SYNC opt to run as blocking ASYNC */ int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - /* flushdb should not flush the functions */ - server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); - /* Without the forceCommandPropagation, when DB was already empty, - * FLUSHDB will not be replicated nor put into the AOF. */ + /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ + if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { + /* Run as ASYNC */ + flags |= EMPTYDB_ASYNC; + blocking_async = 1; + } + + if (isFlushAll) + flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); + else + server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); + + /* Without the forceCommandPropagation, when DB(s) was already empty, + * FLUSHALL\FLUSHDB will not be replicated nor put into the AOF. */ forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); - addReply(c,shared.ok); + /* if blocking ASYNC, block client and add completion job request to BIO lazyfree + * worker's queue. To be called and reply with OK only after all preceding pending + * lazyfree jobs in queue were processed */ + if (blocking_async) { + /* measure bg job till completion as elapsed time of flush command */ + elapsedStart(&c->bstate.lazyfreeStartTime); + c->bstate.timeout = 0; + blockClient(c,BLOCKED_LAZYFREE); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id); + } else { + addReply(c, shared.ok); + } #if defined(USE_JEMALLOC) /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchronous. */ - if (!(flags & EMPTYDB_ASYNC)) + * harm and this way the flush and purge will be synchronous. + * + * Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already + * applied at flushAllDataAndResetRDB. Async flow will apply only later on */ + if ((!isFlushAll) && (!(flags & EMPTYDB_ASYNC))) { + /* Only clear the current thread cache. + * Ignore the return call since this will fail if the tcache is disabled. */ + je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); + jemalloc_purge(); + } #endif } -/* FLUSHALL [ASYNC] +/* FLUSHALL [SYNC|ASYNC] * * Flushes the whole server data set. */ void flushallCommand(client *c) { - int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - /* flushall should not flush the functions */ - flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); + flushCommandCommon(c, 1); +} - /* Without the forceCommandPropagation, when DBs were already empty, - * FLUSHALL will not be replicated nor put into the AOF. */ - forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); - - addReply(c,shared.ok); +/* FLUSHDB [SYNC|ASYNC] + * + * Flushes the currently SELECTed Redis DB. */ +void flushdbCommand(client *c) { + flushCommandCommon(c, 0); } /* This command implements DEL and UNLINK. */ diff --git a/src/lazyfree.c b/src/lazyfree.c index 80c4607d3..e743cb204 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -28,6 +28,14 @@ void lazyfreeFreeDatabase(void *args[]) { kvstoreRelease(da2); atomicDecr(lazyfree_objects,numkeys); atomicIncr(lazyfreed_objects,numkeys); + +#if defined(USE_JEMALLOC) + /* Only clear the current thread cache. + * Ignore the return call since this will fail if the tcache is disabled. */ + je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); + + jemalloc_purge(); +#endif } /* Release the key tracking table. */ diff --git a/src/server.h b/src/server.h index a5f97baa4..582c83c26 100644 --- a/src/server.h +++ b/src/server.h @@ -382,6 +382,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ +/* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ +#define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) + /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ typedef enum blocking_type { @@ -394,6 +397,7 @@ typedef enum blocking_type { BLOCKED_ZSET, /* BZPOP et al. */ BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_LAZYFREE, /* LAZYFREE */ BLOCKED_NUM, /* Number of blocked states. */ BLOCKED_END /* End of enumeration */ } blocking_type; @@ -654,7 +658,7 @@ typedef enum { #define serverAssert(_e) (likely(_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),redis_unreachable())) #define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),redis_unreachable() -/* The following macros provide assertions that are only executed during test builds and should be used to add +/* The following macros provide assertions that are only executed during test builds and should be used to add * assertions that are too computationally expensive or dangerous to run during normal operations. */ #ifdef DEBUG_ASSERTIONS #define debugServerAssertWithInfo(...) serverAssertWithInfo(__VA_ARGS__) @@ -1027,6 +1031,9 @@ typedef struct blockingState { void *async_rm_call_handle; /* RedisModuleAsyncRMCallPromise structure. which is opaque for the Redis core, only handled in module.c. */ + + /* BLOCKED_LAZYFREE */ + monotime lazyfreeStartTime; } blockingState; /* The following structure represents a node in the server.ready_keys list, @@ -1305,7 +1312,7 @@ struct sharedObjectsStruct { *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, - *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, + *emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, @@ -1709,7 +1716,7 @@ struct redisServer { long long el_cmd_cnt_max; /* The sum of active-expire, active-defrag and all other tasks done by cron and beforeSleep, but excluding read, write and AOF, which are counted by other sets of metrics. */ - monotime el_cron_duration; + monotime el_cron_duration; durationStats duration_stats[EL_DURATION_TYPE_NUM]; /* Configuration */ @@ -3023,6 +3030,7 @@ size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); int processCommand(client *c); +void commandProcessed(client *c); int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); void setupSignalHandlers(void); diff --git a/tests/unit/lazyfree.tcl b/tests/unit/lazyfree.tcl index 17f460003..b4ade4031 100644 --- a/tests/unit/lazyfree.tcl +++ b/tests/unit/lazyfree.tcl @@ -87,4 +87,91 @@ start_server {tags {"lazyfree"}} { } assert_equal [s lazyfreed_objects] 0 } {} {needs:config-resetstat} + + test "FLUSHALL SYNC optimized to run in bg as blocking FLUSHALL ASYNC" { + set num_keys 1000 + r config resetstat + + # Verify at start there are no lazyfree pending objects + assert_equal [s lazyfree_pending_objects] 0 + + # Fillup DB with items + populate $num_keys + + # Run FLUSHALL SYNC command, optimized as blocking ASYNC + r flushall + + # Verify all keys counted as lazyfreed + assert_equal [s lazyfreed_objects] $num_keys + } + + test "Run consecutive blocking FLUSHALL ASYNC successfully" { + r config resetstat + set rd [redis_deferring_client] + + # Fillup DB with items + r set x 1 + r set y 2 + + $rd write "FLUSHALL\r\nFLUSHALL\r\nFLUSHDB\r\n" + $rd flush + assert_equal [$rd read] {OK} + assert_equal [$rd read] {OK} + assert_equal [$rd read] {OK} + assert_equal [s lazyfreed_objects] 2 + $rd close + } + + test "FLUSHALL SYNC in MULTI not optimized to run as blocking FLUSHALL ASYNC" { + r config resetstat + + # Fillup DB with items + r set x 11 + r set y 22 + + # FLUSHALL SYNC in multi + r multi + r flushall + r exec + + # Verify flushall not run as lazyfree + assert_equal [s lazyfree_pending_objects] 0 + assert_equal [s lazyfreed_objects] 0 + } + + test "Client closed in the middle of blocking FLUSHALL ASYNC" { + set num_keys 100000 + r config resetstat + + # Fillup DB with items + populate $num_keys + + # close client in the middle of ongoing Blocking FLUSHALL ASYNC + set rd [redis_deferring_client] + $rd flushall + $rd close + + # Wait to verify all keys counted as lazyfreed + wait_for_condition 50 100 { + [s lazyfreed_objects] == $num_keys + } else { + fail "Unexpected number of lazyfreed_objects: [s lazyfreed_objects]" + } + } + + test "Pending commands in querybuf processed once unblocking FLUSHALL ASYNC" { + r config resetstat + set rd [redis_deferring_client] + + # Fillup DB with items + r set x 1 + r set y 2 + + $rd write "FLUSHALL\r\nPING\r\n" + $rd flush + assert_equal [$rd read] {OK} + assert_equal [$rd read] {PONG} + assert_equal [s lazyfreed_objects] 2 + $rd close + } }