diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 91563ad0de..1c30c6c15b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -153,6 +153,7 @@ typedef enum isc__netievent_type { netievent_closecb, netievent_shutdown, netievent_stop, + netievent_pause, netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority @@ -161,6 +162,7 @@ typedef enum isc__netievent_type { */ netievent_udplisten, netievent_tcplisten, + netievent_resume, } isc__netievent_type; typedef union { @@ -297,10 +299,9 @@ struct isc_nm { isc_mempool_t *evpool; isc_mutex_t evlock; - atomic_uint_fast32_t workers_running; - atomic_uint_fast32_t workers_paused; + uint_fast32_t workers_running; + uint_fast32_t workers_paused; atomic_uint_fast32_t maxudp; - atomic_bool paused; /* * Active connections are being closed and new connections are diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index aa47309b11..d4e2bf29f5 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -130,8 +130,21 @@ static isc_threadresult_t nm_thread(isc_threadarg_t worker0); static void async_cb(uv_async_t *handle); -static void +static bool process_queue(isc__networker_t *worker, isc_queue_t *queue); +static bool +process_priority_queue(isc__networker_t *worker); +static bool +process_normal_queue(isc__networker_t *worker); +static void +process_queues(isc__networker_t *worker); + +static void +isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0); +static void +isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0); +static void +isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0); int isc_nm_tid(void) { @@ -155,10 +168,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_mutex_init(&mgr->lock); isc_condition_init(&mgr->wkstatecond); isc_refcount_init(&mgr->references, 1); - atomic_init(&mgr->workers_running, 0); - atomic_init(&mgr->workers_paused, 0); 
atomic_init(&mgr->maxudp, 0); - atomic_init(&mgr->paused, false); atomic_init(&mgr->interlocked, false); #ifdef NETMGR_TRACE @@ -218,8 +228,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { * race - we could exit isc_nm_start, launch nm_destroy, * and nm_thread would still not be up. */ - atomic_fetch_add_explicit(&mgr->workers_running, 1, - memory_order_relaxed); + mgr->workers_running++; isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread); snprintf(name, sizeof(name), "isc-net-%04zu", i); @@ -246,17 +255,14 @@ nm_destroy(isc_nm_t **mgr0) { mgr->magic = 0; for (size_t i = 0; i < mgr->nworkers; i++) { - isc__netievent_t *event = NULL; - - LOCK(&mgr->workers[i].lock); - mgr->workers[i].finished = true; - UNLOCK(&mgr->workers[i].lock); - event = isc__nm_get_ievent(mgr, netievent_stop); - isc__nm_enqueue_ievent(&mgr->workers[i], event); + isc__networker_t *worker = &mgr->workers[i]; + isc__netievent_t *event = isc__nm_get_ievent(mgr, + netievent_stop); + isc__nm_enqueue_ievent(worker, event); } LOCK(&mgr->lock); - while (atomic_load(&mgr->workers_running) > 0) { + while (mgr->workers_running > 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); @@ -315,28 +321,17 @@ isc_nm_pause(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); REQUIRE(!isc__nm_in_netthread()); - atomic_store(&mgr->paused, true); isc__nm_acquire_interlocked_force(mgr); for (size_t i = 0; i < mgr->nworkers; i++) { - isc__netievent_t *event = NULL; - - LOCK(&mgr->workers[i].lock); - mgr->workers[i].paused = true; - UNLOCK(&mgr->workers[i].lock); - - /* - * We have to issue a stop, otherwise the uv_run loop will - * run indefinitely! 
- */ - event = isc__nm_get_ievent(mgr, netievent_stop); - isc__nm_enqueue_ievent(&mgr->workers[i], event); + isc__networker_t *worker = &mgr->workers[i]; + isc__netievent_t *event = isc__nm_get_ievent(mgr, + netievent_pause); + isc__nm_enqueue_ievent(worker, event); } LOCK(&mgr->lock); - while (atomic_load_relaxed(&mgr->workers_paused) != - atomic_load_relaxed(&mgr->workers_running)) - { + while (mgr->workers_paused != mgr->workers_running) { WAIT(&mgr->wkstatecond, &mgr->lock); } UNLOCK(&mgr->lock); @@ -348,17 +343,19 @@ isc_nm_resume(isc_nm_t *mgr) { REQUIRE(!isc__nm_in_netthread()); for (size_t i = 0; i < mgr->nworkers; i++) { - LOCK(&mgr->workers[i].lock); - mgr->workers[i].paused = false; - SIGNAL(&mgr->workers[i].cond); - UNLOCK(&mgr->workers[i].lock); + isc__networker_t *worker = &mgr->workers[i]; + isc__netievent_t *event = isc__nm_get_ievent(mgr, + netievent_resume); + isc__nm_enqueue_ievent(worker, event); } - isc__nm_drop_interlocked(mgr); - /* - * We're not waiting for all the workers to come back to life; - * they eventually will, we don't care. - */ + LOCK(&mgr->lock); + while (mgr->workers_paused != 0) { + WAIT(&mgr->wkstatecond, &mgr->lock); + } + UNLOCK(&mgr->lock); + + isc__nm_drop_interlocked(mgr); } void @@ -402,6 +399,7 @@ void isc_nm_destroy(isc_nm_t **mgr0) { isc_nm_t *mgr = NULL; int counter = 0; + uint_fast32_t references; REQUIRE(mgr0 != NULL); REQUIRE(VALID_NM(*mgr0)); @@ -416,28 +414,17 @@ isc_nm_destroy(isc_nm_t **mgr0) { /* * Wait for the manager to be dereferenced elsewhere. */ - while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) { - /* - * Sometimes libuv gets stuck, pausing and unpausing - * netmgr goes over all events in async queue for all - * the workers, and since it's done only on shutdown it - * doesn't cost us anything. 
- */ - isc_nm_pause(mgr); - isc_nm_resume(mgr); + while ((references = isc_refcount_current(&mgr->references)) > 1 && + counter++ < 1000) + { #ifdef WIN32 _sleep(10); #else /* ifdef WIN32 */ usleep(10000); #endif /* ifdef WIN32 */ } -#ifdef NETMGR_TRACE - if (!ISC_LIST_EMPTY(mgr->active_sockets)) { - isc__nm_dump_active(mgr); - INSIST(ISC_LIST_EMPTY(mgr->active_sockets)); - } -#endif - INSIST(counter <= 1000); + + INSIST(references == 1); /* * Detach final reference. @@ -492,93 +479,59 @@ isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, static isc_threadresult_t nm_thread(isc_threadarg_t worker0) { isc__networker_t *worker = (isc__networker_t *)worker0; + isc_nm_t *mgr = worker->mgr; isc__nm_tid_v = worker->id; isc_thread_setaffinity(isc__nm_tid_v); while (true) { int r = uv_run(&worker->loop, UV_RUN_DEFAULT); - bool pausing = false; + /* There's always the async handle until we are done */ + INSIST(r > 0 || worker->finished); - /* - * or there's nothing to do. In the first case - wait - * for condition. In the latter - timedwait - */ - LOCK(&worker->lock); - while (worker->paused) { - LOCK(&worker->mgr->lock); - if (!pausing) { - atomic_fetch_add_explicit( - &worker->mgr->workers_paused, 1, - memory_order_acquire); - pausing = true; - } - - SIGNAL(&worker->mgr->wkstatecond); - UNLOCK(&worker->mgr->lock); - - WAIT(&worker->cond, &worker->lock); - - /* Process priority events */ - process_queue(worker, worker->ievents_prio); - } - if (pausing) { - uint32_t wp = atomic_fetch_sub_explicit( - &worker->mgr->workers_paused, 1, - memory_order_release); - if (wp == 1) { - atomic_store(&worker->mgr->paused, false); - } - } - bool finished = worker->finished; - UNLOCK(&worker->lock); - - if (finished) { - /* - * We need to launch the loop one more time - * in UV_RUN_NOWAIT mode to make sure that - * worker->async is closed, so that we can - * close the loop cleanly. 
We don't care - * about the callback, as in this case we can - * be certain that uv_run() will eat the event. - * - * XXX: We may need to take steps here to ensure - * that all netmgr handles are freed. + if (worker->paused) { + LOCK(&worker->lock); + /* We need to lock the worker first otherwise + * isc_nm_resume() might slip in before WAIT() in the + * while loop starts and the signal never gets delivered + * and we are forever stuck in the paused loop. */ - uv_close((uv_handle_t *)&worker->async, NULL); - uv_run(&worker->loop, UV_RUN_NOWAIT); - break; + + LOCK(&mgr->lock); + mgr->workers_paused++; + SIGNAL(&mgr->wkstatecond); + UNLOCK(&mgr->lock); + + while (worker->paused) { + WAIT(&worker->cond, &worker->lock); + (void)process_priority_queue(worker); + } + + LOCK(&mgr->lock); + mgr->workers_paused--; + SIGNAL(&mgr->wkstatecond); + UNLOCK(&mgr->lock); + + UNLOCK(&worker->lock); } if (r == 0) { - /* - * XXX: uv_run() in UV_RUN_DEFAULT mode returns - * zero if there are still active uv_handles. - * This shouldn't happen, but if it does, we just - * keep checking until they're done. We nap for a - * tenth of a second on each loop so as not to burn - * CPU. (We do a conditional wait instead, but it - * seems like overkill for this case.) - */ -#ifdef WIN32 - _sleep(100); -#else /* ifdef WIN32 */ - usleep(100000); -#endif /* ifdef WIN32 */ + INSIST(worker->finished); + break; } + INSIST(!worker->finished); + /* * Empty the async queue. 
*/ - process_queue(worker, worker->ievents_prio); - process_queue(worker, worker->ievents); + process_queues(worker); } - LOCK(&worker->mgr->lock); - atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1, - memory_order_relaxed); - SIGNAL(&worker->mgr->wkstatecond); - UNLOCK(&worker->mgr->lock); + LOCK(&mgr->lock); + mgr->workers_running--; + SIGNAL(&mgr->wkstatecond); + UNLOCK(&mgr->lock); return ((isc_threadresult_t)0); } @@ -592,21 +545,64 @@ nm_thread(isc_threadarg_t worker0) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; - process_queue(worker, worker->ievents_prio); - process_queue(worker, worker->ievents); + process_queues(worker); } static void +isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) { + UNUSED(ev0); + worker->finished = true; + /* Close the async handle */ + uv_close((uv_handle_t *)&worker->async, NULL); + /* uv_stop(&worker->loop); */ +} + +static void +isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) { + UNUSED(ev0); + REQUIRE(worker->paused == false); + worker->paused = true; + uv_stop(&worker->loop); +} + +static void +isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) { + UNUSED(ev0); + REQUIRE(worker->paused == true); + worker->paused = false; +} + +static bool +process_priority_queue(isc__networker_t *worker) { + return (process_queue(worker, worker->ievents_prio)); +} + +static bool +process_normal_queue(isc__networker_t *worker) { + return (process_queue(worker, worker->ievents)); +} + +static void +process_queues(isc__networker_t *worker) { + if (!process_priority_queue(worker)) { + return; + } + (void)process_normal_queue(worker); +} + +static bool process_queue(isc__networker_t *worker, isc_queue_t *queue) { isc__netievent_t *ievent = NULL; + bool more = true; while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL) { switch (ievent->type) { case netievent_stop: - uv_stop(&worker->loop); 
- isc_mempool_put(worker->mgr->evpool, ievent); - return; + isc__nm_async_stopcb(worker, ievent); + /* Don't process more ievents when we are stopping */ + more = false; + break; case netievent_udplisten: isc__nm_async_udplisten(worker, ievent); @@ -659,13 +655,26 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_shutdown: isc__nm_async_shutdown(worker, ievent); break; + + case netievent_resume: + isc__nm_async_resumecb(worker, ievent); + break; + case netievent_pause: + isc__nm_async_pausecb(worker, ievent); + /* Don't process more ievents when we are pausing */ + more = false; + break; default: INSIST(0); ISC_UNREACHABLE(); } isc__nm_put_ievent(worker->mgr, ievent); + if (!more) { + break; + } } + return (more); } void *