mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-05 08:52:04 -04:00
Merge branch 'ondrej/convert-isc_async-to-use-urcu-wfcqueue' into 'main'
Change the isc_async and isc_quota API to use cds_wfcqueue internally See merge request isc-projects/bind9!7894
This commit is contained in:
commit
dee7b46a8d
15 changed files with 166 additions and 152 deletions
|
|
@ -244,6 +244,9 @@ AC_DEFINE_UNQUOTED([RCU_FLAVOR], ["$RCU_FLAVOR"], [Chosen Userspace-RCU flavor])
|
|||
PKG_CHECK_VERSION([RCU_VERSION], [$RCU_FLAVOR])
|
||||
AC_DEFINE_UNQUOTED([RCU_VERSION], ["$RCU_VERSION"], [Compile-time Userspace-RCU version])
|
||||
|
||||
CFLAGS="$CFLAGS $LIBURCU_CFLAGS"
|
||||
LIBS="$LIBS $LIBURCU_LIBS"
|
||||
|
||||
# Fuzzing is not included in pairwise testing as fuzzing tools are
|
||||
# not present in the relevant Docker image.
|
||||
#
|
||||
|
|
|
|||
|
|
@ -263,7 +263,6 @@ libdns_la_CPPFLAGS = \
|
|||
$(AM_CPPFLAGS) \
|
||||
$(LIBDNS_CFLAGS) \
|
||||
$(LIBISC_CFLAGS) \
|
||||
$(LIBURCU_CFLAGS) \
|
||||
$(LIBUV_CFLAGS) \
|
||||
$(OPENSSL_CFLAGS)
|
||||
|
||||
|
|
@ -273,7 +272,6 @@ libdns_la_LDFLAGS = \
|
|||
|
||||
libdns_la_LIBADD = \
|
||||
$(LIBISC_LIBS) \
|
||||
$(LIBURCU_LIBS) \
|
||||
$(LIBUV_LIBS) \
|
||||
$(OPENSSL_LIBS)
|
||||
|
||||
|
|
|
|||
|
|
@ -212,7 +212,6 @@ endif USE_ISC_RWLOCK
|
|||
libisc_la_CPPFLAGS = \
|
||||
$(AM_CPPFLAGS) \
|
||||
$(LIBISC_CFLAGS) \
|
||||
$(LIBURCU_CFLAGS) \
|
||||
$(LIBUV_CFLAGS) \
|
||||
$(OPENSSL_CFLAGS) \
|
||||
$(ZLIB_CFLAGS)
|
||||
|
|
@ -222,7 +221,6 @@ libisc_la_LDFLAGS = \
|
|||
-release "$(PACKAGE_VERSION)"
|
||||
|
||||
libisc_la_LIBADD = \
|
||||
$(LIBURCU_LIBS) \
|
||||
$(LIBUV_LIBS) \
|
||||
$(OPENSSL_LIBS) \
|
||||
$(ZLIB_LIBS)
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@
|
|||
#include <isc/refcount.h>
|
||||
#include <isc/result.h>
|
||||
#include <isc/signal.h>
|
||||
#include <isc/stack.h>
|
||||
#include <isc/strerr.h>
|
||||
#include <isc/thread.h>
|
||||
#include <isc/util.h>
|
||||
|
|
@ -45,42 +44,72 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
|||
|
||||
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
|
||||
*job = (isc_job_t){
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
};
|
||||
|
||||
/*
|
||||
* Now send the half-initialized job to the loop queue.
|
||||
*/
|
||||
ISC_ASTACK_PUSH(loop->async_jobs, job, link);
|
||||
cds_wfcq_node_init(&job->wfcq_node);
|
||||
|
||||
int r = uv_async_send(&loop->async_trigger);
|
||||
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||
/*
|
||||
* cds_wfcq_enqueue() is non-blocking and enqueues the job to async
|
||||
* queue.
|
||||
*
|
||||
* The function returns 'false' in case the queue was empty - in such
|
||||
* case we need to trigger the async callback.
|
||||
*/
|
||||
__tsan_release(job);
|
||||
if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail,
|
||||
&job->wfcq_node))
|
||||
{
|
||||
int r = uv_async_send(&loop->async_trigger);
|
||||
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
isc__async_cb(uv_async_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
isc_jobqueue_t jobs;
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
|
||||
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
|
||||
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
|
||||
/* Initialize local wfcqueue */
|
||||
__cds_wfcq_init(&jobs.head, &jobs.tail);
|
||||
|
||||
isc_job_t *job = ISC_STACK_POP(drain, link);
|
||||
isc_job_t *next = NULL;
|
||||
while (job != NULL) {
|
||||
ISC_LIST_PREPEND(jobs, job, link);
|
||||
|
||||
job = ISC_STACK_POP(drain, link);
|
||||
/*
|
||||
* Move all the elements from loop->async_jobs to a local jobs queue.
|
||||
*
|
||||
* __cds_wfcq_splice_blocking() assumes that synchronization is
|
||||
* done externally - there's no internal locking, unlike
|
||||
* cds_wfcq_splice_blocking(), and we do not need to check whether
|
||||
* it needs to block, unlike __cds_wfcq_splice_nonblocking().
|
||||
*
|
||||
* The reason we can use __cds_wfcq_splice_blocking() is that the
|
||||
* only other function we use is cds_wfcq_enqueue() which doesn't
|
||||
* require any synchronization (see the table in urcu/wfcqueue.h
|
||||
* for more details).
|
||||
*/
|
||||
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
|
||||
&jobs.head, &jobs.tail, &loop->async_jobs.head,
|
||||
&loop->async_jobs.tail);
|
||||
INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
|
||||
if (ret == CDS_WFCQ_RET_SRC_EMPTY) {
|
||||
/*
|
||||
* Nothing to do, the source queue was empty - most
|
||||
* probably we were called from isc__async_close() below.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
for (job = ISC_LIST_HEAD(jobs),
|
||||
next = (job ? ISC_LIST_NEXT(job, link) : NULL);
|
||||
job != NULL;
|
||||
job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
|
||||
{
|
||||
/*
|
||||
* Walk through the local queue which has now all the members copied
|
||||
* locally, and call the callbacks and free all the isc_job_t(s).
|
||||
*/
|
||||
struct cds_wfcq_node *node, *next;
|
||||
__cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) {
|
||||
isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
|
||||
__tsan_acquire(job);
|
||||
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
|
|
|||
|
|
@ -17,11 +17,8 @@
|
|||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/mem.h>
|
||||
#include <isc/stack.h>
|
||||
#include <isc/uv.h>
|
||||
|
||||
typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
|
||||
|
||||
void
|
||||
isc__async_cb(uv_async_t *handle);
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@
|
|||
#include <isc/mem.h>
|
||||
#include <isc/refcount.h>
|
||||
#include <isc/types.h>
|
||||
#include <isc/urcu.h>
|
||||
|
||||
typedef void (*isc_job_cb)(void *);
|
||||
typedef struct isc_job isc_job_t;
|
||||
|
|
@ -35,12 +36,15 @@ typedef struct isc_job isc_job_t;
|
|||
struct isc_job {
|
||||
isc_job_cb cb;
|
||||
void *cbarg;
|
||||
ISC_LINK(isc_job_t) link;
|
||||
union {
|
||||
struct cds_wfcq_node wfcq_node;
|
||||
ISC_LINK(isc_job_t) link;
|
||||
};
|
||||
};
|
||||
|
||||
#define ISC_JOB_INITIALIZER \
|
||||
{ \
|
||||
.link = ISC_LINK_INITIALIZER \
|
||||
#define ISC_JOB_INITIALIZER \
|
||||
{ \
|
||||
.link = ISC_LINK_INITIALIZER, \
|
||||
}
|
||||
|
||||
ISC_LANG_BEGINDECLS
|
||||
|
|
|
|||
|
|
@ -122,11 +122,6 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
|
|||
* yet been started.
|
||||
*/
|
||||
|
||||
void
|
||||
isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job);
|
||||
void
|
||||
isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job);
|
||||
|
||||
void
|
||||
isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
|
||||
void
|
||||
|
|
|
|||
|
|
@ -30,13 +30,16 @@
|
|||
*** Imports.
|
||||
***/
|
||||
|
||||
#include <isc/align.h>
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/lang.h>
|
||||
#include <isc/magic.h>
|
||||
#include <isc/mutex.h>
|
||||
#include <isc/os.h>
|
||||
#include <isc/refcount.h>
|
||||
#include <isc/types.h>
|
||||
#include <isc/urcu.h>
|
||||
|
||||
/*****
|
||||
***** Types.
|
||||
|
|
@ -46,15 +49,23 @@
|
|||
|
||||
ISC_LANG_BEGINDECLS
|
||||
|
||||
/*% isc_quota structure */
|
||||
/*%
|
||||
* isc_quota structure
|
||||
*
|
||||
* NOTE: We are using struct cds_wfcq_head which has an internal
|
||||
* mutex, because we are using enqueue and dequeue, and dequeues need
|
||||
* synchronization between multiple threads (see urcu/wfcqueue.h for
|
||||
* detailed description).
|
||||
*/
|
||||
struct isc_quota {
|
||||
int magic;
|
||||
atomic_uint_fast32_t max;
|
||||
atomic_uint_fast32_t used;
|
||||
atomic_uint_fast32_t soft;
|
||||
atomic_uint_fast32_t waiting;
|
||||
isc_mutex_t cblock;
|
||||
ISC_LIST(isc_job_t) jobs;
|
||||
struct {
|
||||
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head;
|
||||
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
|
||||
} jobs;
|
||||
ISC_LINK(isc_quota_t) link;
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
|
|||
|
||||
job->cb = cb;
|
||||
job->cbarg = cbarg;
|
||||
ISC_LINK_INIT(job, link);
|
||||
|
||||
ISC_LIST_APPEND(loop->run_jobs, job, link);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,10 +13,22 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <isc/align.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/os.h>
|
||||
#include <isc/uv.h>
|
||||
|
||||
/*%
|
||||
* NOTE: We are using struct __cds_wfcq_head that doesn't have an internal
|
||||
* mutex, because we are only using enqueue and splice, and those don't need
|
||||
* any synchronization (see urcu/wfcqueue.h for detailed description).
|
||||
*/
|
||||
typedef struct isc_jobqueue {
|
||||
alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head;
|
||||
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
|
||||
} isc_jobqueue_t;
|
||||
|
||||
typedef ISC_LIST(isc_job_t) isc_joblist_t;
|
||||
|
||||
void
|
||||
|
|
|
|||
|
|
@ -159,7 +159,6 @@ destroy_cb(uv_async_t *handle) {
|
|||
|
||||
static void
|
||||
shutdown_cb(uv_async_t *handle) {
|
||||
isc_job_t *job = NULL;
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
isc_loopmgr_t *loopmgr = loop->loopmgr;
|
||||
|
||||
|
|
@ -178,17 +177,12 @@ shutdown_cb(uv_async_t *handle) {
|
|||
isc_signal_destroy(&loopmgr->sigint);
|
||||
}
|
||||
|
||||
job = ISC_LIST_TAIL(loop->teardown_jobs);
|
||||
while (job != NULL) {
|
||||
isc_job_t *prev = ISC_LIST_PREV(job, link);
|
||||
ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
|
||||
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
||||
job = prev;
|
||||
}
|
||||
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
|
||||
&loop->async_jobs.head, &loop->async_jobs.tail,
|
||||
&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
|
||||
INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
|
||||
int r = uv_async_send(&loop->async_trigger);
|
||||
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -202,12 +196,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||
*loop = (isc_loop_t){
|
||||
.tid = tid,
|
||||
.loopmgr = loopmgr,
|
||||
.async_jobs = ISC_ASTACK_INITIALIZER,
|
||||
.run_jobs = ISC_LIST_INITIALIZER,
|
||||
.setup_jobs = ISC_LIST_INITIALIZER,
|
||||
.teardown_jobs = ISC_LIST_INITIALIZER,
|
||||
};
|
||||
|
||||
__cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail);
|
||||
__cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail);
|
||||
__cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail);
|
||||
|
||||
int r = uv_loop_init(&loop->loop);
|
||||
UV_RUNTIME_CHECK(uv_loop_init, r);
|
||||
|
||||
|
|
@ -248,23 +243,6 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||
loop->magic = LOOP_MAGIC;
|
||||
}
|
||||
|
||||
static void
|
||||
setup_jobs_cb(void *arg) {
|
||||
isc_loop_t *loop = arg;
|
||||
isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
|
||||
|
||||
while (job != NULL) {
|
||||
isc_job_t *next = ISC_LIST_NEXT(job, link);
|
||||
ISC_LIST_UNLINK(loop->setup_jobs, job, link);
|
||||
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
||||
job = next;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
quiescent_cb(uv_prepare_t *handle) {
|
||||
isc__qsbr_quiescent_cb(handle);
|
||||
|
|
@ -285,7 +263,7 @@ loop_close(isc_loop_t *loop) {
|
|||
int r = uv_loop_close(&loop->loop);
|
||||
UV_RUNTIME_CHECK(uv_loop_close, r);
|
||||
|
||||
INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
|
||||
INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
|
||||
INSIST(ISC_LIST_EMPTY(loop->run_jobs));
|
||||
|
||||
loop->magic = 0;
|
||||
|
|
@ -306,7 +284,13 @@ loop_thread(void *arg) {
|
|||
|
||||
isc_barrier_wait(&loop->loopmgr->starting);
|
||||
|
||||
isc_async_run(loop, setup_jobs_cb, loop);
|
||||
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
|
||||
&loop->async_jobs.head, &loop->async_jobs.tail,
|
||||
&loop->setup_jobs.head, &loop->setup_jobs.tail);
|
||||
INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
|
||||
|
||||
r = uv_async_send(&loop->async_trigger);
|
||||
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||
|
||||
r = uv_run(&loop->loop, UV_RUN_DEFAULT);
|
||||
UV_RUNTIME_CHECK(uv_run, r);
|
||||
|
|
@ -316,16 +300,6 @@ loop_thread(void *arg) {
|
|||
return (NULL);
|
||||
}
|
||||
|
||||
void
|
||||
isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) {
|
||||
ISC_LIST_DEQUEUE(loop->setup_jobs, job, link);
|
||||
}
|
||||
|
||||
void
|
||||
isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) {
|
||||
ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link);
|
||||
}
|
||||
|
||||
/**
|
||||
* Public
|
||||
*/
|
||||
|
|
@ -406,13 +380,15 @@ isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
|||
*job = (isc_job_t){
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
};
|
||||
|
||||
cds_wfcq_node_init(&job->wfcq_node);
|
||||
|
||||
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
|
||||
atomic_load(&loopmgr->paused));
|
||||
|
||||
ISC_LIST_APPEND(loop->setup_jobs, job, link);
|
||||
cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail,
|
||||
&job->wfcq_node);
|
||||
|
||||
return (job);
|
||||
}
|
||||
|
|
@ -426,13 +402,14 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
|||
*job = (isc_job_t){
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
};
|
||||
cds_wfcq_node_init(&job->wfcq_node);
|
||||
|
||||
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
|
||||
atomic_load(&loopmgr->paused));
|
||||
|
||||
ISC_LIST_APPEND(loop->teardown_jobs, job, link);
|
||||
cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail,
|
||||
&job->wfcq_node);
|
||||
|
||||
return (job);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,9 +25,9 @@
|
|||
#include <isc/refcount.h>
|
||||
#include <isc/result.h>
|
||||
#include <isc/signal.h>
|
||||
#include <isc/stack.h>
|
||||
#include <isc/thread.h>
|
||||
#include <isc/types.h>
|
||||
#include <isc/urcu.h>
|
||||
#include <isc/uv.h>
|
||||
#include <isc/work.h>
|
||||
|
||||
|
|
@ -58,7 +58,7 @@ struct isc_loop {
|
|||
|
||||
/* Async queue */
|
||||
uv_async_t async_trigger;
|
||||
isc_asyncstack_t async_jobs;
|
||||
isc_jobqueue_t async_jobs;
|
||||
|
||||
/* Jobs queue */
|
||||
uv_idle_t run_trigger;
|
||||
|
|
@ -69,8 +69,8 @@ struct isc_loop {
|
|||
|
||||
/* Shutdown */
|
||||
uv_async_t shutdown_trigger;
|
||||
isc_joblist_t setup_jobs;
|
||||
isc_joblist_t teardown_jobs;
|
||||
isc_jobqueue_t setup_jobs;
|
||||
isc_jobqueue_t teardown_jobs;
|
||||
|
||||
/* Destroy */
|
||||
uv_async_t destroy_trigger;
|
||||
|
|
|
|||
|
|
@ -692,8 +692,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
|
|||
.active_handles = ISC_LIST_INITIALIZER,
|
||||
.active_link = ISC_LINK_INITIALIZER,
|
||||
.active = true,
|
||||
.job = ISC_JOB_INITIALIZER,
|
||||
.quotacb = ISC_JOB_INITIALIZER,
|
||||
};
|
||||
|
||||
if (iface != NULL) {
|
||||
|
|
@ -716,8 +714,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
|
|||
isc__networker_attach(worker, &sock->worker);
|
||||
sock->uv_handle.handle.data = sock;
|
||||
|
||||
ISC_LINK_INIT(&sock->quotacb, link);
|
||||
|
||||
switch (type) {
|
||||
case isc_nm_udpsocket:
|
||||
case isc_nm_udplistener:
|
||||
|
|
@ -805,7 +801,6 @@ alloc_handle(isc_nmsocket_t *sock) {
|
|||
.magic = NMHANDLE_MAGIC,
|
||||
.active_link = ISC_LINK_INITIALIZER,
|
||||
.inactive_link = ISC_LINK_INITIALIZER,
|
||||
.job = ISC_JOB_INITIALIZER,
|
||||
};
|
||||
isc_refcount_init(&handle->references, 1);
|
||||
|
||||
|
|
@ -1522,7 +1517,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
|
|||
.connect_tries = 3,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
.active_link = ISC_LINK_INITIALIZER,
|
||||
.job = ISC_JOB_INITIALIZER,
|
||||
.magic = UVREQ_MAGIC,
|
||||
};
|
||||
uv_handle_set_data(&req->uv_req.handle, req);
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/quota.h>
|
||||
#include <isc/urcu.h>
|
||||
#include <isc/util.h>
|
||||
|
||||
#define QUOTA_MAGIC ISC_MAGIC('Q', 'U', 'O', 'T')
|
||||
|
|
@ -27,9 +28,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
|
|||
atomic_init("a->max, max);
|
||||
atomic_init("a->used, 0);
|
||||
atomic_init("a->soft, 0);
|
||||
atomic_init("a->waiting, 0);
|
||||
ISC_LIST_INIT(quota->jobs);
|
||||
isc_mutex_init("a->cblock);
|
||||
cds_wfcq_init("a->jobs.head, "a->jobs.tail);
|
||||
ISC_LINK_INIT(quota, link);
|
||||
quota->magic = QUOTA_MAGIC;
|
||||
}
|
||||
|
|
@ -37,6 +36,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
|
|||
void
|
||||
isc_quota_soft(isc_quota_t *quota, unsigned int soft) {
|
||||
REQUIRE(VALID_QUOTA(quota));
|
||||
REQUIRE(atomic_load_relaxed("a->max) > soft);
|
||||
atomic_store_relaxed("a->soft, soft);
|
||||
}
|
||||
|
||||
|
|
@ -64,49 +64,24 @@ isc_quota_getused(isc_quota_t *quota) {
|
|||
return (atomic_load_relaxed("a->used));
|
||||
}
|
||||
|
||||
/* Must be quota->cblock locked */
|
||||
static void
|
||||
enqueue(isc_quota_t *quota, isc_job_t *cb) {
|
||||
REQUIRE(cb != NULL);
|
||||
ISC_LIST_ENQUEUE(quota->jobs, cb, link);
|
||||
atomic_fetch_add_relaxed("a->waiting, 1);
|
||||
}
|
||||
|
||||
/* Must be quota->cblock locked */
|
||||
static isc_job_t *
|
||||
dequeue(isc_quota_t *quota) {
|
||||
isc_job_t *cb = ISC_LIST_HEAD(quota->jobs);
|
||||
if (cb != NULL) {
|
||||
ISC_LIST_DEQUEUE(quota->jobs, cb, link);
|
||||
atomic_fetch_sub_relaxed("a->waiting, 1);
|
||||
}
|
||||
return (cb);
|
||||
}
|
||||
|
||||
void
|
||||
isc_quota_release(isc_quota_t *quota) {
|
||||
uint_fast32_t used;
|
||||
|
||||
/*
|
||||
* This is opportunistic - we might race with a failing quota_attach_cb
|
||||
* and not detect that something is waiting, but eventually someone will
|
||||
* be releasing quota and will detect it, so we don't need to worry -
|
||||
* and we're saving a lot by not locking cblock every time.
|
||||
* We are using the cds_wfcq_dequeue_blocking() variant here that
|
||||
* has an internal mutex because we need synchronization on
|
||||
* multiple dequeues running from different threads.
|
||||
*/
|
||||
|
||||
if (atomic_load_acquire("a->waiting) > 0) {
|
||||
isc_job_t *cb = NULL;
|
||||
LOCK("a->cblock);
|
||||
cb = dequeue(quota);
|
||||
UNLOCK("a->cblock);
|
||||
if (cb != NULL) {
|
||||
cb->cb(cb->cbarg);
|
||||
return;
|
||||
}
|
||||
struct cds_wfcq_node *node =
|
||||
cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail);
|
||||
if (node == NULL) {
|
||||
uint_fast32_t used = atomic_fetch_sub_relaxed("a->used, 1);
|
||||
INSIST(used > 0);
|
||||
return;
|
||||
}
|
||||
|
||||
used = atomic_fetch_sub_release("a->used, 1);
|
||||
INSIST(used > 0);
|
||||
isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
|
||||
__tsan_acquire(job);
|
||||
job->cb(job->cbarg);
|
||||
}
|
||||
|
||||
isc_result_t
|
||||
|
|
@ -115,17 +90,22 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb,
|
|||
REQUIRE(VALID_QUOTA(quota));
|
||||
REQUIRE(job == NULL || cb != NULL);
|
||||
|
||||
uint_fast32_t used = atomic_fetch_add_release("a->used, 1);
|
||||
|
||||
uint_fast32_t used = atomic_fetch_add_relaxed("a->used, 1);
|
||||
uint_fast32_t max = atomic_load_relaxed("a->max);
|
||||
if (max != 0 && used >= max) {
|
||||
(void)atomic_fetch_sub_relaxed("a->used, 1);
|
||||
if (job != NULL) {
|
||||
job->cb = cb;
|
||||
job->cbarg = cbarg;
|
||||
LOCK("a->cblock);
|
||||
enqueue(quota, job);
|
||||
UNLOCK("a->cblock);
|
||||
cds_wfcq_node_init(&job->wfcq_node);
|
||||
|
||||
/*
|
||||
* The cds_wfcq_enqueue() is non-blocking (no internal
|
||||
* mutex involved), so it offers a slight advantage.
|
||||
*/
|
||||
__tsan_release(job);
|
||||
cds_wfcq_enqueue("a->jobs.head, "a->jobs.tail,
|
||||
&job->wfcq_node);
|
||||
}
|
||||
return (ISC_R_QUOTA);
|
||||
}
|
||||
|
|
@ -144,7 +124,7 @@ isc_quota_destroy(isc_quota_t *quota) {
|
|||
quota->magic = 0;
|
||||
|
||||
INSIST(atomic_load("a->used) == 0);
|
||||
INSIST(atomic_load("a->waiting) == 0);
|
||||
INSIST(ISC_LIST_EMPTY(quota->jobs));
|
||||
isc_mutex_destroy("a->cblock);
|
||||
INSIST(cds_wfcq_empty("a->jobs.head, "a->jobs.tail));
|
||||
|
||||
cds_wfcq_destroy("a->jobs.head, "a->jobs.tail);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -515,6 +515,9 @@ ISC_LOOP_TEST_IMPL(udp_timeout_recovery) {
|
|||
udp_timeout_recovery_connect_cb, listen_sock, T_SOFT);
|
||||
}
|
||||
|
||||
static void
|
||||
udp_connect_udpconnect(void *arg ISC_ATTR_UNUSED);
|
||||
|
||||
static void
|
||||
udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
|
||||
void *cbarg) {
|
||||
|
|
@ -523,20 +526,29 @@ udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
|
|||
|
||||
isc_refcount_decrement(&active_cconnects);
|
||||
|
||||
assert_int_equal(eresult, ISC_R_SHUTTINGDOWN);
|
||||
|
||||
atomic_fetch_add(&cconnects, 1);
|
||||
/*
|
||||
* The first UDP connect is faster than asynchronous shutdown procedure,
|
||||
* restart the UDP connect again and expect the failure only in the
|
||||
* second loop.
|
||||
*/
|
||||
if (atomic_fetch_add(&cconnects, 1) == 0) {
|
||||
assert_int_equal(eresult, ISC_R_SUCCESS);
|
||||
isc_async_current(loopmgr, udp_connect_udpconnect, netmgr);
|
||||
} else {
|
||||
assert_int_equal(eresult, ISC_R_SHUTTINGDOWN);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
udp_connect_udpconnect(void *arg ISC_ATTR_UNUSED) {
|
||||
isc_refcount_increment0(&active_cconnects);
|
||||
isc_nm_udpconnect(netmgr, &udp_connect_addr, &udp_listen_addr,
|
||||
udp_shutdown_connect_connect_cb, NULL, T_SOFT);
|
||||
}
|
||||
|
||||
ISC_SETUP_TEST_IMPL(udp_shutdown_connect) {
|
||||
setup_test(state);
|
||||
expected_cconnects = 1;
|
||||
expected_cconnects = 2;
|
||||
return (0);
|
||||
}
|
||||
|
||||
|
|
@ -548,7 +560,10 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) {
|
|||
|
||||
ISC_LOOP_TEST_IMPL(udp_shutdown_connect) {
|
||||
isc_loopmgr_shutdown(loopmgr);
|
||||
isc_refcount_increment0(&active_cconnects);
|
||||
/*
|
||||
* isc_nm_udpconnect() is synchronous, so we need to launch this on the
|
||||
* async loop.
|
||||
*/
|
||||
isc_async_current(loopmgr, udp_connect_udpconnect, netmgr);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue