diff --git a/lib/isc/include/isc/result.h b/lib/isc/include/isc/result.h index 22969775df..21071c7dad 100644 --- a/lib/isc/include/isc/result.h +++ b/lib/isc/include/isc/result.h @@ -64,7 +64,7 @@ #define ISC_R_MULTICAST 43 /*%< invalid use of multicast */ #define ISC_R_NOTFILE 44 /*%< not a file */ #define ISC_R_NOTDIRECTORY 45 /*%< not a directory */ -#define ISC_R_QUEUEFULL 46 /*%< queue is full */ +#define ISC_R_EMPTY 46 /*%< queue is empty */ #define ISC_R_FAMILYMISMATCH 47 /*%< address family mismatch */ #define ISC_R_FAMILYNOSUPPORT 48 /*%< AF not supported */ #define ISC_R_BADHEX 49 /*%< bad hex encoding */ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index f131400152..0ad9bd42c8 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -40,8 +40,6 @@ #include "uv-compat.h" -#define ISC_NETMGR_QUANTUM_DEFAULT 1024 - #define ISC_NETMGR_TID_UNKNOWN -1 /* Must be different from ISC_NETMGR_TID_UNKNOWN */ @@ -165,6 +163,17 @@ isc__nm_dump_active(isc_nm_t *nm); #define isc__nmsocket_prep_destroy(sock) isc___nmsocket_prep_destroy(sock) #endif +/* + * Queue types in the order of processing priority. + */ +typedef enum { + NETIEVENT_PRIORITY = 0, + NETIEVENT_PRIVILEGED = 1, + NETIEVENT_TASK = 2, + NETIEVENT_NORMAL = 3, + NETIEVENT_MAX = 4, +} netievent_type_t; + /* * Single network event loop worker. */ @@ -178,13 +187,8 @@ typedef struct isc__networker { bool paused; bool finished; isc_thread_t thread; - isc_queue_t *ievents; /* incoming async events */ - isc_queue_t *ievents_priv; /* privileged async tasks */ - isc_queue_t *ievents_task; /* async tasks */ - isc_queue_t *ievents_prio; /* priority async events - * used for listening etc. - * can be processed while - * worker is paused */ + isc_queue_t *ievents[NETIEVENT_MAX]; + atomic_uint_fast32_t nievents[NETIEVENT_MAX]; isc_condition_t cond_prio; isc_refcount_t references; @@ -192,7 +196,6 @@ typedef struct isc__networker { char *recvbuf; char *sendbuf; bool recvbuf_inuse; - unsigned int quantum; } isc__networker_t; /* diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index ac2f38992a..f9affb2c04 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -137,28 +137,57 @@ static void async_cb(uv_async_t *handle); static bool process_netievent(isc__networker_t *worker, isc__netievent_t *ievent); -static bool -process_queue(isc__networker_t *worker, isc_queue_t *queue, - unsigned int *quantump); +static isc_result_t +process_queue(isc__networker_t *worker, netievent_type_t type); static void wait_for_priority_queue(isc__networker_t *worker); -static bool -process_priority_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_privilege_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_task_queue(isc__networker_t *worker, unsigned int *quantump); -static bool -process_normal_queue(isc__networker_t *worker, unsigned int *quantump); +static void +drain_queue(isc__networker_t *worker, netievent_type_t type); -#define drain_priority_queue(worker) \ - (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_privilege_queue(worker) \ - (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_task_queue(worker) \ - (void)process_task_queue(worker, &(unsigned int){ UINT_MAX }) -#define drain_normal_queue(worker) \ - (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX }) +#define ENQUEUE_NETIEVENT(worker, queue, event) \ + isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event) +#define DEQUEUE_NETIEVENT(worker, queue) \ + (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue]) + +#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event) +#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event) +#define ENQUEUE_TASK_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event) +#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \ + ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event) + +#define DEQUEUE_PRIORITY_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED) +#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK) +#define DEQUEUE_NORMAL_NETIEVENT(worker) \ + DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL) + +#define INCREMENT_NETIEVENT(worker, queue) \ + atomic_fetch_add_release(&worker->nievents[queue], 1) +#define DECREMENT_NETIEVENT(worker, queue) \ + atomic_fetch_sub_release(&worker->nievents[queue], 1) + +#define INCREMENT_PRIORITY_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) +#define INCREMENT_TASK_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_TASK) +#define INCREMENT_NORMAL_NETIEVENT(worker) \ + INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL) + +#define DECREMENT_PRIORITY_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) +#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) +#define DECREMENT_TASK_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_TASK) +#define DECREMENT_NORMAL_NETIEVENT(worker) \ + DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL) static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); @@ -283,7 +312,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { *worker = (isc__networker_t){ .mgr = mgr, .id = i, - .quantum = ISC_NETMGR_QUANTUM_DEFAULT, }; r = uv_loop_init(&worker->loop); @@ -296,11 +324,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc_mutex_init(&worker->lock); - worker->ievents = isc_queue_new(mgr->mctx, 128); - worker->ievents_task = isc_queue_new(mgr->mctx, 128); - worker->ievents_priv = isc_queue_new(mgr->mctx, 128); - worker->ievents_prio = isc_queue_new(mgr->mctx, 128); - isc_condition_init(&worker->cond_prio); + for (size_t type = 0; type < NETIEVENT_MAX; type++) { + worker->ievents[type] = isc_queue_new(mgr->mctx, 128); + atomic_init(&worker->nievents[type], 0); + } worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); @@ -354,20 +381,14 @@ nm_destroy(isc_nm_t **mgr0) { int r; /* Empty the async event queues */ - while ((ievent = (isc__netievent_t *)isc_queue_dequeue( - worker->ievents)) != NULL) - { + while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - INSIST(isc_queue_dequeue(worker->ievents_priv) == - (uintptr_t)NULL); - INSIST(isc_queue_dequeue(worker->ievents_task) == - (uintptr_t)NULL); + INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL); + INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL); - while ((ievent = (isc__netievent_t *)isc_queue_dequeue( - worker->ievents_prio)) != NULL) - { + while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } isc_condition_destroy(&worker->cond_prio); @@ -375,11 +396,9 @@ nm_destroy(isc_nm_t **mgr0) { r = uv_loop_close(&worker->loop); INSIST(r == 0); - isc_queue_destroy(worker->ievents); - isc_queue_destroy(worker->ievents_priv); - isc_queue_destroy(worker->ievents_task); - isc_queue_destroy(worker->ievents_prio); - isc_mutex_destroy(&worker->lock); + for (size_t type = 0; type < NETIEVENT_MAX; type++) { + isc_queue_destroy(worker->ievents[type]); + } isc_mem_put(mgr->mctx, worker->sendbuf, ISC_NETMGR_SENDBUF_SIZE); @@ -487,7 +506,7 @@ isc_nm_resume(isc_nm_t *mgr) { if (isc__nm_in_netthread()) { REQUIRE(isc_nm_tid() == 0); - drain_priority_queue(&mgr->workers[isc_nm_tid()]); + drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY); } for (int i = 0; i < mgr->nworkers; i++) { @@ -500,7 +519,7 @@ isc_nm_resume(isc_nm_t *mgr) { } if (isc__nm_in_netthread()) { - drain_privilege_queue(&mgr->workers[isc_nm_tid()]); + drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED); atomic_fetch_sub(&mgr->workers_paused, 1); isc_barrier_wait(&mgr->resuming); @@ -711,7 +730,7 @@ nm_thread(isc_threadarg_t worker0) { * All workers must drain the privileged event * queue before we resume from pause. */ - drain_privilege_queue(worker); + drain_queue(worker, NETIEVENT_PRIVILEGED); atomic_fetch_sub(&mgr->workers_paused, 1); if (isc_barrier_wait(&mgr->resuming) != 0) { @@ -734,8 +753,8 @@ nm_thread(isc_threadarg_t worker0) { * (they may include shutdown events) but do not process * the netmgr event queue. */ - drain_privilege_queue(worker); - drain_task_queue(worker); + drain_queue(worker, NETIEVENT_PRIVILEGED); + drain_queue(worker, NETIEVENT_TASK); LOCK(&mgr->lock); mgr->workers_running--; @@ -746,20 +765,32 @@ nm_thread(isc_threadarg_t worker0) { } static bool -process_all_queues(isc__networker_t *worker, unsigned int quantum) { +process_all_queues(isc__networker_t *worker) { + bool reschedule = false; /* * The queue processing functions will return false when the - * system is pausing or stopping, or if we have completed - * 'quantum' events. - * - * We don't want to proceed to a new queue until the previous one - * has been fully drained, so whenever one queue is interrupted, - * we skip all the later ones. + * system is pausing or stopping and we don't want to process + * the other queues in such case, but we need the async event + * to be rescheduled in the next uv_run(). */ - return (process_priority_queue(worker, &quantum) && - process_privilege_queue(worker, &quantum) && - process_task_queue(worker, &quantum) && - process_normal_queue(worker, &quantum)); + for (size_t type = 0; type < NETIEVENT_MAX; type++) { + isc_result_t result = process_queue(worker, type); + switch (result) { + case ISC_R_SUSPEND: + return (true); + case ISC_R_EMPTY: + /* empty queue */ + break; + case ISC_R_SUCCESS: + reschedule = true; + break; + default: + INSIST(0); + ISC_UNREACHABLE(); + } + } + + return (reschedule); } /* @@ -771,9 +802,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; - unsigned int quantum = worker->quantum; - if (!process_all_queues(worker, quantum)) { + if (process_all_queues(worker)) { /* If we didn't process all the events, we need to enqueue * async_cb to be run in the next iteration of the uv_loop */ @@ -840,19 +870,17 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { static void wait_for_priority_queue(isc__networker_t *worker) { - isc_queue_t *queue = worker->ievents_prio; isc_condition_t *cond = &worker->cond_prio; bool wait_for_work = true; while (true) { isc__netievent_t *ievent; LOCK(&worker->lock); - ievent = (isc__netievent_t *)isc_queue_dequeue(queue); + ievent = DEQUEUE_PRIORITY_NETIEVENT(worker); if (wait_for_work) { while (ievent == NULL) { WAIT(cond, &worker->lock); - ievent = (isc__netievent_t *)isc_queue_dequeue( - queue); + ievent = DEQUEUE_PRIORITY_NETIEVENT(worker); } } UNLOCK(&worker->lock); @@ -861,29 +889,17 @@ wait_for_priority_queue(isc__networker_t *worker) { if (ievent == NULL) { return; } + DECREMENT_PRIORITY_NETIEVENT(worker); (void)process_netievent(worker, ievent); } } -static bool -process_priority_queue(isc__networker_t *worker, unsigned int *quantump) { - return (process_queue(worker, worker->ievents_prio, quantump)); -} - -static bool -process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) { - return (process_queue(worker, worker->ievents_priv, quantump)); -} - -static bool -process_task_queue(isc__networker_t *worker, unsigned int *quantump) { - return (process_queue(worker, worker->ievents_task, quantump)); -} - -static bool -process_normal_queue(isc__networker_t *worker, unsigned int *quantump) { - return (process_queue(worker, worker->ievents, quantump)); +static void +drain_queue(isc__networker_t *worker, netievent_type_t type) { + while (process_queue(worker, type) != ISC_R_EMPTY) { + ; + } } /* @@ -984,28 +1000,46 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { return (true); } -static bool -process_queue(isc__networker_t *worker, isc_queue_t *queue, - unsigned int *quantump) { - while (*quantump > 0) { - isc__netievent_t *ievent = - (isc__netievent_t *)isc_queue_dequeue(queue); +static isc_result_t +process_queue(isc__networker_t *worker, netievent_type_t type) { + /* + * The number of items on the queue is only loosely synchronized with + * the items on the queue. But there's a guarantee that if there's an + * item on the queue, it will be accounted for. However there's a + * possibility that the counter might be higher than the items on the + * queue stored. + */ + uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]); + isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type); - if (ievent == NULL) { - /* We fully drained this queue */ - return (true); - } - - (*quantump)--; - - if (!process_netievent(worker, ievent)) { - /* Netievent told us to stop */ - return (false); - } + if (ievent == NULL && waiting == 0) { + /* There's nothing scheduled */ + return (ISC_R_EMPTY); + } else if (ievent == NULL) { + /* There's at least one item scheduled, but not on the queue yet + */ + return (ISC_R_SUCCESS); } - /* No more quantum */ - return (false); + while (ievent != NULL) { + DECREMENT_NETIEVENT(worker, type); + bool stop = !process_netievent(worker, ievent); + + if (stop) { + /* Netievent told us to stop */ + return (ISC_R_SUSPEND); + } + + if (waiting-- == 0) { + /* We reached this round "quota" */ + break; + } + + ievent = DEQUEUE_NETIEVENT(worker, type); + } + + /* We processed at least one */ + return (ISC_R_SUCCESS); } void * @@ -1107,15 +1141,19 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { * the queue will be processed. */ LOCK(&worker->lock); - isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); + INCREMENT_PRIORITY_NETIEVENT(worker); + ENQUEUE_PRIORITY_NETIEVENT(worker, event); SIGNAL(&worker->cond_prio); UNLOCK(&worker->lock); } else if (event->type == netievent_privilegedtask) { - isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event); + INCREMENT_PRIVILEGED_NETIEVENT(worker); + ENQUEUE_PRIVILEGED_NETIEVENT(worker, event); } else if (event->type == netievent_task) { - isc_queue_enqueue(worker->ievents_task, (uintptr_t)event); + INCREMENT_TASK_NETIEVENT(worker); + ENQUEUE_TASK_NETIEVENT(worker, event); } else { - isc_queue_enqueue(worker->ievents, (uintptr_t)event); + INCREMENT_NORMAL_NETIEVENT(worker); + ENQUEUE_NORMAL_NETIEVENT(worker, event); } uv_async_send(&worker->async); } diff --git a/lib/isc/result.c b/lib/isc/result.c index 565e8406d3..72e7a3c28e 100644 --- a/lib/isc/result.c +++ b/lib/isc/result.c @@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = { "invalid use of multicast address", /*%< 43 */ "not a file", /*%< 44 */ "not a directory", /*%< 45 */ - "queue is full", /*%< 46 */ + "queue is empty", /*%< 46 */ "address family mismatch", /*%< 47 */ "address family not supported", /*%< 48 */ "bad hex encoding", /*%< 49 */ @@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = { "ISC_R_SUCCESS", "ISC_R_MULTICAST", "ISC_R_NOTFILE", "ISC_R_NOTDIRECTORY", - "ISC_R_QUEUEFULL", + "ISC_R_EMPTY", "ISC_R_FAMILYMISMATCH", "ISC_R_FAMILYNOSUPPORT", "ISC_R_BADHEX",