Replace netmgr quantum with loop-preventing barrier

Instead of using a fixed quantum, this commit adds an atomic counter for
the number of items on each queue and uses the number of netievents
scheduled to run as the upper limit on the number of netievents
processed in a single process_queue() run.

This prevents endless loops when a netievent schedules more netievents
onto the same loop, and we no longer have to pick a "magic" number for
the quantum.
This commit is contained in:
Ondřej Surý 2021-05-07 12:58:40 +02:00
parent 9e21b80cbd
commit 9e3cb396b2
4 changed files with 157 additions and 116 deletions

View file

@ -64,7 +64,7 @@
#define ISC_R_MULTICAST 43 /*%< invalid use of multicast */
#define ISC_R_NOTFILE 44 /*%< not a file */
#define ISC_R_NOTDIRECTORY 45 /*%< not a directory */
#define ISC_R_QUEUEFULL 46 /*%< queue is full */
#define ISC_R_EMPTY 46 /*%< queue is empty */
#define ISC_R_FAMILYMISMATCH 47 /*%< address family mismatch */
#define ISC_R_FAMILYNOSUPPORT 48 /*%< AF not supported */
#define ISC_R_BADHEX 49 /*%< bad hex encoding */

View file

@ -40,8 +40,6 @@
#include "uv-compat.h"
#define ISC_NETMGR_QUANTUM_DEFAULT 1024
#define ISC_NETMGR_TID_UNKNOWN -1
/* Must be different from ISC_NETMGR_TID_UNKNOWN */
@ -165,6 +163,17 @@ isc__nm_dump_active(isc_nm_t *nm);
#define isc__nmsocket_prep_destroy(sock) isc___nmsocket_prep_destroy(sock)
#endif
/*
 * Netievent queue types, listed in the order in which the queues are
 * processed (lowest value first).  The values are also used as indices
 * into the per-worker queue arrays, so NETIEVENT_MAX must stay last: it
 * is the number of queue types, not a queue of its own.
 */
typedef enum {
	NETIEVENT_PRIORITY = 0, /* priority events, processed first */
	NETIEVENT_PRIVILEGED,	/* privileged tasks */
	NETIEVENT_TASK,		/* regular tasks */
	NETIEVENT_NORMAL,	/* all other netievents */
	NETIEVENT_MAX,		/* number of queue types */
} netievent_type_t;
/*
* Single network event loop worker.
*/
@ -178,13 +187,8 @@ typedef struct isc__networker {
bool paused;
bool finished;
isc_thread_t thread;
isc_queue_t *ievents; /* incoming async events */
isc_queue_t *ievents_priv; /* privileged async tasks */
isc_queue_t *ievents_task; /* async tasks */
isc_queue_t *ievents_prio; /* priority async events
* used for listening etc.
* can be processed while
* worker is paused */
isc_queue_t *ievents[NETIEVENT_MAX];
atomic_uint_fast32_t nievents[NETIEVENT_MAX];
isc_condition_t cond_prio;
isc_refcount_t references;
@ -192,7 +196,6 @@ typedef struct isc__networker {
char *recvbuf;
char *sendbuf;
bool recvbuf_inuse;
unsigned int quantum;
} isc__networker_t;
/*

View file

@ -137,28 +137,57 @@ static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type);
static void
wait_for_priority_queue(isc__networker_t *worker);
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);
#define drain_priority_queue(worker) \
(void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_privilege_queue(worker) \
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_task_queue(worker) \
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_normal_queue(worker) \
(void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
/*
 * Per-worker netievent queue helpers.
 *
 * 'worker' is a pointer to an isc__networker_t and 'queue' is a
 * netievent_type_t value used as an index into the worker's ievents[]
 * and nievents[] arrays.  Every macro parameter is parenthesized in the
 * expansion so that arbitrary expressions (for example
 * '&mgr->workers[i]') can be passed safely.
 */

/* Push 'event' (stored as a uintptr_t) onto the selected queue. */
#define ENQUEUE_NETIEVENT(worker, queue, event) \
	isc_queue_enqueue((worker)->ievents[(queue)], (uintptr_t)(event))
/*
 * Pop the next event from the selected queue as an isc__netievent_t
 * pointer; callers treat NULL as "nothing was dequeued".
 */
#define DEQUEUE_NETIEVENT(worker, queue) \
	((isc__netievent_t *)isc_queue_dequeue((worker)->ievents[(queue)]))

/* Per-queue-type enqueue shorthands. */
#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
	ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
	ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
#define ENQUEUE_TASK_NETIEVENT(worker, event) \
	ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
	ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)

/* Per-queue-type dequeue shorthands. */
#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
	DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
	DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
#define DEQUEUE_NORMAL_NETIEVENT(worker) \
	DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)

/*
 * Adjust the per-queue counter of scheduled events by one, using the
 * release variants of the atomic fetch-add / fetch-sub operations.
 */
#define INCREMENT_NETIEVENT(worker, queue) \
	atomic_fetch_add_release(&(worker)->nievents[(queue)], 1)
#define DECREMENT_NETIEVENT(worker, queue) \
	atomic_fetch_sub_release(&(worker)->nievents[(queue)], 1)

/* Per-queue-type counter increment shorthands. */
#define INCREMENT_PRIORITY_NETIEVENT(worker) \
	INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
	INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define INCREMENT_TASK_NETIEVENT(worker) \
	INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define INCREMENT_NORMAL_NETIEVENT(worker) \
	INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)

/* Per-queue-type counter decrement shorthands. */
#define DECREMENT_PRIORITY_NETIEVENT(worker) \
	DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
	DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DECREMENT_TASK_NETIEVENT(worker) \
	DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define DECREMENT_NORMAL_NETIEVENT(worker) \
	DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
@ -283,7 +312,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
*worker = (isc__networker_t){
.mgr = mgr,
.id = i,
.quantum = ISC_NETMGR_QUANTUM_DEFAULT,
};
r = uv_loop_init(&worker->loop);
@ -296,11 +324,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_mutex_init(&worker->lock);
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
isc_condition_init(&worker->cond_prio);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
worker->ievents[type] = isc_queue_new(mgr->mctx, 128);
atomic_init(&worker->nievents[type], 0);
}
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@ -354,20 +381,14 @@ nm_destroy(isc_nm_t **mgr0) {
int r;
/* Empty the async event queues */
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
worker->ievents)) != NULL)
{
/*
 * Return any events still sitting on the normal queue to the event
 * pool.  (The priority queue is emptied separately below.)
 */
while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) {
	isc_mempool_put(mgr->evpool, ievent);
}
INSIST(isc_queue_dequeue(worker->ievents_priv) ==
(uintptr_t)NULL);
INSIST(isc_queue_dequeue(worker->ievents_task) ==
(uintptr_t)NULL);
INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(
worker->ievents_prio)) != NULL)
{
while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc_mempool_put(mgr->evpool, ievent);
}
isc_condition_destroy(&worker->cond_prio);
@ -375,11 +396,9 @@ nm_destroy(isc_nm_t **mgr0) {
r = uv_loop_close(&worker->loop);
INSIST(r == 0);
isc_queue_destroy(worker->ievents);
isc_queue_destroy(worker->ievents_priv);
isc_queue_destroy(worker->ievents_task);
isc_queue_destroy(worker->ievents_prio);
isc_mutex_destroy(&worker->lock);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_queue_destroy(worker->ievents[type]);
}
isc_mem_put(mgr->mctx, worker->sendbuf,
ISC_NETMGR_SENDBUF_SIZE);
@ -487,7 +506,7 @@ isc_nm_resume(isc_nm_t *mgr) {
if (isc__nm_in_netthread()) {
REQUIRE(isc_nm_tid() == 0);
drain_priority_queue(&mgr->workers[isc_nm_tid()]);
drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
}
for (int i = 0; i < mgr->nworkers; i++) {
@ -500,7 +519,7 @@ isc_nm_resume(isc_nm_t *mgr) {
}
if (isc__nm_in_netthread()) {
drain_privilege_queue(&mgr->workers[isc_nm_tid()]);
drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
isc_barrier_wait(&mgr->resuming);
@ -711,7 +730,7 @@ nm_thread(isc_threadarg_t worker0) {
* All workers must drain the privileged event
* queue before we resume from pause.
*/
drain_privilege_queue(worker);
drain_queue(worker, NETIEVENT_PRIVILEGED);
atomic_fetch_sub(&mgr->workers_paused, 1);
if (isc_barrier_wait(&mgr->resuming) != 0) {
@ -734,8 +753,8 @@ nm_thread(isc_threadarg_t worker0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
drain_privilege_queue(worker);
drain_task_queue(worker);
drain_queue(worker, NETIEVENT_PRIVILEGED);
drain_queue(worker, NETIEVENT_TASK);
LOCK(&mgr->lock);
mgr->workers_running--;
@ -746,20 +765,32 @@ nm_thread(isc_threadarg_t worker0) {
}
static bool
process_all_queues(isc__networker_t *worker, unsigned int quantum) {
process_all_queues(isc__networker_t *worker) {
bool reschedule = false;
/*
* The queue processing functions will return false when the
* system is pausing or stopping, or if we have completed
* 'quantum' events.
*
* We don't want to proceed to a new queue until the previous one
* has been fully drained, so whenever one queue is interrupted,
* we skip all the later ones.
* system is pausing or stopping and we don't want to process
* the other queues in such case, but we need the async event
* to be rescheduled in the next uv_run().
*/
return (process_priority_queue(worker, &quantum) &&
process_privilege_queue(worker, &quantum) &&
process_task_queue(worker, &quantum) &&
process_normal_queue(worker, &quantum));
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_result_t result = process_queue(worker, type);
switch (result) {
case ISC_R_SUSPEND:
return (true);
case ISC_R_EMPTY:
/* empty queue */
break;
case ISC_R_SUCCESS:
reschedule = true;
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
}
return (reschedule);
}
/*
@ -771,9 +802,8 @@ process_all_queues(isc__networker_t *worker, unsigned int quantum) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
unsigned int quantum = worker->quantum;
if (!process_all_queues(worker, quantum)) {
if (process_all_queues(worker)) {
/* If we didn't process all the events, we need to enqueue
* async_cb to be run in the next iteration of the uv_loop
*/
@ -840,19 +870,17 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
static void
wait_for_priority_queue(isc__networker_t *worker) {
isc_queue_t *queue = worker->ievents_prio;
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(
queue);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
}
}
UNLOCK(&worker->lock);
@ -861,29 +889,17 @@ wait_for_priority_queue(isc__networker_t *worker) {
if (ievent == NULL) {
return;
}
DECREMENT_PRIORITY_NETIEVENT(worker);
(void)process_netievent(worker, ievent);
}
}
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
}
static bool
process_privilege_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_priv, quantump));
}
static bool
process_task_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_task, quantump));
}
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents, quantump));
/*
 * Keep calling process_queue() on the queue of the given 'type' until it
 * reports ISC_R_EMPTY.  Any other result simply triggers another pass.
 */
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
	isc_result_t result;

	do {
		result = process_queue(worker, type);
	} while (result != ISC_R_EMPTY);
}
/*
@ -984,28 +1000,46 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
return (true);
}
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump) {
while (*quantump > 0) {
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
/*
* The counter of scheduled items is only loosely synchronized with the
* actual contents of the queue. Every item on the queue is guaranteed
* to be accounted for in the counter, but the counter may be higher
* than the number of items currently stored on the queue.
*/
uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
(*quantump)--;
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
}
if (ievent == NULL && waiting == 0) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
} else if (ievent == NULL) {
/* There's at least one item scheduled, but not on the queue yet
*/
return (ISC_R_SUCCESS);
}
/* No more quantum */
return (false);
while (ievent != NULL) {
DECREMENT_NETIEVENT(worker, type);
bool stop = !process_netievent(worker, ievent);
if (stop) {
/* Netievent told us to stop */
return (ISC_R_SUSPEND);
}
if (waiting-- == 0) {
/* We reached this round "quota" */
break;
}
ievent = DEQUEUE_NETIEVENT(worker, type);
}
/* We processed at least one */
return (ISC_R_SUCCESS);
}
void *
@ -1107,15 +1141,19 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
* the queue will be processed.
*/
LOCK(&worker->lock);
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
INCREMENT_PRIORITY_NETIEVENT(worker);
ENQUEUE_PRIORITY_NETIEVENT(worker, event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
INCREMENT_PRIVILEGED_NETIEVENT(worker);
ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
} else if (event->type == netievent_task) {
isc_queue_enqueue(worker->ievents_task, (uintptr_t)event);
INCREMENT_TASK_NETIEVENT(worker);
ENQUEUE_TASK_NETIEVENT(worker, event);
} else {
isc_queue_enqueue(worker->ievents, (uintptr_t)event);
INCREMENT_NORMAL_NETIEVENT(worker);
ENQUEUE_NORMAL_NETIEVENT(worker, event);
}
uv_async_send(&worker->async);
}

View file

@ -77,7 +77,7 @@ static const char *description[ISC_R_NRESULTS] = {
"invalid use of multicast address", /*%< 43 */
"not a file", /*%< 44 */
"not a directory", /*%< 45 */
"queue is full", /*%< 46 */
"queue is empty", /*%< 46 */
"address family mismatch", /*%< 47 */
"address family not supported", /*%< 48 */
"bad hex encoding", /*%< 49 */
@ -151,7 +151,7 @@ static const char *identifier[ISC_R_NRESULTS] = { "ISC_R_SUCCESS",
"ISC_R_MULTICAST",
"ISC_R_NOTFILE",
"ISC_R_NOTDIRECTORY",
"ISC_R_QUEUEFULL",
"ISC_R_EMPTY",
"ISC_R_FAMILYMISMATCH",
"ISC_R_FAMILYNOSUPPORT",
"ISC_R_BADHEX",