Merge branch 'ondrej/clear-the-uv-event-loop-before-exiting' into 'main'

Clear the libuv event loop before exiting

See merge request isc-projects/bind9!4181
This commit is contained in:
Ondřej Surý 2020-09-28 10:56:33 +00:00
commit 36195cf797
2 changed files with 142 additions and 132 deletions

View file

@ -153,6 +153,7 @@ typedef enum isc__netievent_type {
netievent_closecb,
netievent_shutdown,
netievent_stop,
netievent_pause,
netievent_prio = 0xff, /* event type values higher than this
* will be treated as high-priority
@ -161,6 +162,7 @@ typedef enum isc__netievent_type {
*/
netievent_udplisten,
netievent_tcplisten,
netievent_resume,
} isc__netievent_type;
typedef union {
@ -297,10 +299,9 @@ struct isc_nm {
isc_mempool_t *evpool;
isc_mutex_t evlock;
atomic_uint_fast32_t workers_running;
atomic_uint_fast32_t workers_paused;
uint_fast32_t workers_running;
uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
atomic_bool paused;
/*
* Active connections are being closed and new connections are

View file

@ -130,8 +130,21 @@ static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static void
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue);
static bool
process_priority_queue(isc__networker_t *worker);
static bool
process_normal_queue(isc__networker_t *worker);
static void
process_queues(isc__networker_t *worker);
static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0);
int
isc_nm_tid(void) {
@ -155,10 +168,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
isc_mutex_init(&mgr->lock);
isc_condition_init(&mgr->wkstatecond);
isc_refcount_init(&mgr->references, 1);
atomic_init(&mgr->workers_running, 0);
atomic_init(&mgr->workers_paused, 0);
atomic_init(&mgr->maxudp, 0);
atomic_init(&mgr->paused, false);
atomic_init(&mgr->interlocked, false);
#ifdef NETMGR_TRACE
@ -218,8 +228,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
* race - we could exit isc_nm_start, launch nm_destroy,
* and nm_thread would still not be up.
*/
atomic_fetch_add_explicit(&mgr->workers_running, 1,
memory_order_relaxed);
mgr->workers_running++;
isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
snprintf(name, sizeof(name), "isc-net-%04zu", i);
@ -246,17 +255,14 @@ nm_destroy(isc_nm_t **mgr0) {
mgr->magic = 0;
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__netievent_t *event = NULL;
LOCK(&mgr->workers[i].lock);
mgr->workers[i].finished = true;
UNLOCK(&mgr->workers[i].lock);
event = isc__nm_get_ievent(mgr, netievent_stop);
isc__nm_enqueue_ievent(&mgr->workers[i], event);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_stop);
isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
while (atomic_load(&mgr->workers_running) > 0) {
while (mgr->workers_running > 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
@ -315,28 +321,17 @@ isc_nm_pause(isc_nm_t *mgr) {
REQUIRE(VALID_NM(mgr));
REQUIRE(!isc__nm_in_netthread());
atomic_store(&mgr->paused, true);
isc__nm_acquire_interlocked_force(mgr);
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__netievent_t *event = NULL;
LOCK(&mgr->workers[i].lock);
mgr->workers[i].paused = true;
UNLOCK(&mgr->workers[i].lock);
/*
* We have to issue a stop, otherwise the uv_run loop will
* run indefinitely!
*/
event = isc__nm_get_ievent(mgr, netievent_stop);
isc__nm_enqueue_ievent(&mgr->workers[i], event);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_pause);
isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
while (atomic_load_relaxed(&mgr->workers_paused) !=
atomic_load_relaxed(&mgr->workers_running))
{
while (mgr->workers_paused != mgr->workers_running) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
@ -348,17 +343,19 @@ isc_nm_resume(isc_nm_t *mgr) {
REQUIRE(!isc__nm_in_netthread());
for (size_t i = 0; i < mgr->nworkers; i++) {
LOCK(&mgr->workers[i].lock);
mgr->workers[i].paused = false;
SIGNAL(&mgr->workers[i].cond);
UNLOCK(&mgr->workers[i].lock);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_resume);
isc__nm_enqueue_ievent(worker, event);
}
isc__nm_drop_interlocked(mgr);
/*
* We're not waiting for all the workers to come back to life;
* they eventually will, we don't care.
*/
LOCK(&mgr->lock);
while (mgr->workers_paused != 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
isc__nm_drop_interlocked(mgr);
}
void
@ -402,6 +399,7 @@ void
isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_t *mgr = NULL;
int counter = 0;
uint_fast32_t references;
REQUIRE(mgr0 != NULL);
REQUIRE(VALID_NM(*mgr0));
@ -416,28 +414,17 @@ isc_nm_destroy(isc_nm_t **mgr0) {
/*
* Wait for the manager to be dereferenced elsewhere.
*/
while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
/*
* Sometimes libuv gets stuck, pausing and unpausing
* netmgr goes over all events in async queue for all
* the workers, and since it's done only on shutdown it
* doesn't cost us anything.
*/
isc_nm_pause(mgr);
isc_nm_resume(mgr);
while ((references = isc_refcount_current(&mgr->references)) > 1 &&
counter++ < 1000)
{
#ifdef WIN32
_sleep(10);
#else /* ifdef WIN32 */
usleep(10000);
#endif /* ifdef WIN32 */
}
#ifdef NETMGR_TRACE
if (!ISC_LIST_EMPTY(mgr->active_sockets)) {
isc__nm_dump_active(mgr);
INSIST(ISC_LIST_EMPTY(mgr->active_sockets));
}
#endif
INSIST(counter <= 1000);
INSIST(references == 1);
/*
* Detach final reference.
@ -492,93 +479,59 @@ isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
isc__networker_t *worker = (isc__networker_t *)worker0;
isc_nm_t *mgr = worker->mgr;
isc__nm_tid_v = worker->id;
isc_thread_setaffinity(isc__nm_tid_v);
while (true) {
int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
bool pausing = false;
/* There's always the async handle until we are done */
INSIST(r > 0 || worker->finished);
/*
* or there's nothing to do. In the first case - wait
* for condition. In the latter - timedwait
*/
LOCK(&worker->lock);
while (worker->paused) {
LOCK(&worker->mgr->lock);
if (!pausing) {
atomic_fetch_add_explicit(
&worker->mgr->workers_paused, 1,
memory_order_acquire);
pausing = true;
}
SIGNAL(&worker->mgr->wkstatecond);
UNLOCK(&worker->mgr->lock);
WAIT(&worker->cond, &worker->lock);
/* Process priority events */
process_queue(worker, worker->ievents_prio);
}
if (pausing) {
uint32_t wp = atomic_fetch_sub_explicit(
&worker->mgr->workers_paused, 1,
memory_order_release);
if (wp == 1) {
atomic_store(&worker->mgr->paused, false);
}
}
bool finished = worker->finished;
UNLOCK(&worker->lock);
if (finished) {
/*
* We need to launch the loop one more time
* in UV_RUN_NOWAIT mode to make sure that
* worker->async is closed, so that we can
* close the loop cleanly. We don't care
* about the callback, as in this case we can
* be certain that uv_run() will eat the event.
*
* XXX: We may need to take steps here to ensure
* that all netmgr handles are freed.
if (worker->paused) {
LOCK(&worker->lock);
/* We need to lock the worker first; otherwise,
* isc_nm_resume() might slip in before the WAIT() in the
* while loop starts, the signal would never be delivered,
* and we would be stuck in the paused loop forever.
*/
uv_close((uv_handle_t *)&worker->async, NULL);
uv_run(&worker->loop, UV_RUN_NOWAIT);
break;
LOCK(&mgr->lock);
mgr->workers_paused++;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
while (worker->paused) {
WAIT(&worker->cond, &worker->lock);
(void)process_priority_queue(worker);
}
LOCK(&mgr->lock);
mgr->workers_paused--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
UNLOCK(&worker->lock);
}
if (r == 0) {
/*
* XXX: uv_run() in UV_RUN_DEFAULT mode returns
* zero if there are still active uv_handles.
* This shouldn't happen, but if it does, we just
* keep checking until they're done. We nap for a
* tenth of a second on each loop so as not to burn
* CPU. (We do a conditional wait instead, but it
* seems like overkill for this case.)
*/
#ifdef WIN32
_sleep(100);
#else /* ifdef WIN32 */
usleep(100000);
#endif /* ifdef WIN32 */
INSIST(worker->finished);
break;
}
INSIST(!worker->finished);
/*
* Empty the async queue.
*/
process_queue(worker, worker->ievents_prio);
process_queue(worker, worker->ievents);
process_queues(worker);
}
LOCK(&worker->mgr->lock);
atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1,
memory_order_relaxed);
SIGNAL(&worker->mgr->wkstatecond);
UNLOCK(&worker->mgr->lock);
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
return ((isc_threadresult_t)0);
}
@ -592,21 +545,64 @@ nm_thread(isc_threadarg_t worker0) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
process_queue(worker, worker->ievents_prio);
process_queue(worker, worker->ievents);
process_queues(worker);
}
/*
 * Handler for a netievent_stop event: marks the worker as finished and
 * closes the worker's async handle.  The event itself carries no data
 * that is used here.
 */
static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
worker->finished = true;
/* Close the async handler */
uv_close((uv_handle_t *)&worker->async, NULL);
/*
 * NOTE(review): the explicit uv_stop() below is left disabled;
 * presumably the loop is expected to return on its own once its
 * handles have been closed -- confirm.
 */
/* uv_stop(&worker->loop); */
}
/*
 * Handler for a netievent_pause event: flags the worker as paused and
 * asks its libuv loop to stop.  The event itself carries no data that
 * is used here.
 *
 * Requires:
 *	The worker is not already paused.
 */
static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == false);
worker->paused = true;
/* Make uv_run() on this worker's loop return. */
uv_stop(&worker->loop);
}
/*
 * Handler for a netievent_resume event: clears the worker's paused
 * flag.  The event itself carries no data that is used here.
 *
 * Requires:
 *	The worker is currently paused.
 */
static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == true);
worker->paused = false;
}
/*
 * Run process_queue() over the worker's priority event queue
 * (worker->ievents_prio) and hand its result back to the caller
 * unchanged.
 */
static bool
process_priority_queue(isc__networker_t *worker) {
	isc_queue_t *prio = worker->ievents_prio;

	return (process_queue(worker, prio));
}
/*
 * Run process_queue() over the worker's normal event queue
 * (worker->ievents) and hand its result back to the caller unchanged.
 */
static bool
process_normal_queue(isc__networker_t *worker) {
	isc_queue_t *normal = worker->ievents;

	return (process_queue(worker, normal));
}
/*
 * Process the worker's event queues in priority order: the priority
 * queue always runs first, and the normal queue is only touched when
 * the priority queue reports that processing may continue.
 */
static void
process_queues(isc__networker_t *worker) {
	if (process_priority_queue(worker)) {
		(void)process_normal_queue(worker);
	}
}
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__netievent_t *ievent = NULL;
bool more = true;
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
{
switch (ievent->type) {
case netievent_stop:
uv_stop(&worker->loop);
isc_mempool_put(worker->mgr->evpool, ievent);
return;
isc__nm_async_stopcb(worker, ievent);
/* Don't process more ievents when we are stopping */
more = false;
break;
case netievent_udplisten:
isc__nm_async_udplisten(worker, ievent);
@ -659,13 +655,26 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_shutdown:
isc__nm_async_shutdown(worker, ievent);
break;
case netievent_resume:
isc__nm_async_resumecb(worker, ievent);
break;
case netievent_pause:
isc__nm_async_pausecb(worker, ievent);
/* Don't process more ievents when we are pausing */
more = false;
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
isc__nm_put_ievent(worker->mgr, ievent);
if (!more) {
break;
}
}
return (more);
}
void *