diff --git a/configure.ac b/configure.ac index 46dcc2a441..11a7b68a3f 100644 --- a/configure.ac +++ b/configure.ac @@ -244,6 +244,9 @@ AC_DEFINE_UNQUOTED([RCU_FLAVOR], ["$RCU_FLAVOR"], [Chosen Userspace-RCU flavor]) PKG_CHECK_VERSION([RCU_VERSION], [$RCU_FLAVOR]) AC_DEFINE_UNQUOTED([RCU_VERSION], ["$RCU_VERSION"], [Compile-time Userspace-RCU version]) +CFLAGS="$CFLAGS $LIBURCU_CFLAGS" +LIBS="$LIBS $LIBURCU_LIBS" + # Fuzzing is not included in pairwise testing as fuzzing tools are # not present in the relevant Docker image. # diff --git a/lib/dns/Makefile.am b/lib/dns/Makefile.am index a70d7e7381..1db6d6585a 100644 --- a/lib/dns/Makefile.am +++ b/lib/dns/Makefile.am @@ -263,7 +263,6 @@ libdns_la_CPPFLAGS = \ $(AM_CPPFLAGS) \ $(LIBDNS_CFLAGS) \ $(LIBISC_CFLAGS) \ - $(LIBURCU_CFLAGS) \ $(LIBUV_CFLAGS) \ $(OPENSSL_CFLAGS) @@ -273,7 +272,6 @@ libdns_la_LDFLAGS = \ libdns_la_LIBADD = \ $(LIBISC_LIBS) \ - $(LIBURCU_LIBS) \ $(LIBUV_LIBS) \ $(OPENSSL_LIBS) diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index df23562ac4..7d74517464 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -212,7 +212,6 @@ endif USE_ISC_RWLOCK libisc_la_CPPFLAGS = \ $(AM_CPPFLAGS) \ $(LIBISC_CFLAGS) \ - $(LIBURCU_CFLAGS) \ $(LIBUV_CFLAGS) \ $(OPENSSL_CFLAGS) \ $(ZLIB_CFLAGS) @@ -222,7 +221,6 @@ libisc_la_LDFLAGS = \ -release "$(PACKAGE_VERSION)" libisc_la_LIBADD = \ - $(LIBURCU_LIBS) \ $(LIBUV_LIBS) \ $(OPENSSL_LIBS) \ $(ZLIB_LIBS) diff --git a/lib/isc/async.c b/lib/isc/async.c index 326d3a0803..db4388964e 100644 --- a/lib/isc/async.c +++ b/lib/isc/async.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -45,42 +44,72 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); *job = (isc_job_t){ - .link = ISC_LINK_INITIALIZER, .cb = cb, .cbarg = cbarg, }; - /* - * Now send the half-initialized job to the loop queue. - */ - ISC_ASTACK_PUSH(loop->async_jobs, job, link); + cds_wfcq_node_init(&job->wfcq_node); - int r = uv_async_send(&loop->async_trigger); - UV_RUNTIME_CHECK(uv_async_send, r); + /* + * cds_wfcq_enqueue() is non-blocking and enqueues the job to async + * queue. + * + * The function returns 'false' in case the queue was empty - in such + * case we need to trigger the async callback. + */ + __tsan_release(job); + if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail, + &job->wfcq_node)) + { + int r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } } void isc__async_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); + isc_jobqueue_t jobs; REQUIRE(VALID_LOOP(loop)); - ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs); - ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER; + /* Initialize local wfcqueue */ + __cds_wfcq_init(&jobs.head, &jobs.tail); - isc_job_t *job = ISC_STACK_POP(drain, link); - isc_job_t *next = NULL; - while (job != NULL) { - ISC_LIST_PREPEND(jobs, job, link); - - job = ISC_STACK_POP(drain, link); + /* + * Move all the elements from loop->async_jobs to a local jobs queue. + * + * __cds_wfcq_splice_blocking() assumes that synchronization is + * done externally - there's no internal locking, unlike + * cds_wfcq_splice_blocking(), and we do not need to check whether + * it needs to block, unlike __cds_wfcq_splice_nonblocking(). + * + * The reason we can use __cds_wfcq_splice_blocking() is that the + * only other function we use is cds_wfcq_enqueue() which doesn't + * require any synchronization (see the table in urcu/wfcqueue.h + * for more details). + */ + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &jobs.head, &jobs.tail, &loop->async_jobs.head, + &loop->async_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + if (ret == CDS_WFCQ_RET_SRC_EMPTY) { + /* + * Nothing to do, the source queue was empty - most + * probably we were called from isc__async_close() below. + */ + return; } - for (job = ISC_LIST_HEAD(jobs), - next = (job ? ISC_LIST_NEXT(job, link) : NULL); - job != NULL; - job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL)) - { + /* + * Walk through the local queue which has now all the members copied + * locally, and call the callbacks and free all the isc_job_t(s). + */ + struct cds_wfcq_node *node, *next; + __cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) { + isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node); + __tsan_acquire(job); + job->cb(job->cbarg); isc_mem_put(loop->mctx, job, sizeof(*job)); diff --git a/lib/isc/async_p.h b/lib/isc/async_p.h index 0d854cd0ae..1ce94b99e1 100644 --- a/lib/isc/async_p.h +++ b/lib/isc/async_p.h @@ -17,11 +17,8 @@ #include #include #include -#include #include -typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t; - void isc__async_cb(uv_async_t *handle); diff --git a/lib/isc/include/isc/job.h b/lib/isc/include/isc/job.h index 8eeb60d0f4..9a44974c25 100644 --- a/lib/isc/include/isc/job.h +++ b/lib/isc/include/isc/job.h @@ -28,6 +28,7 @@ #include #include #include +#include typedef void (*isc_job_cb)(void *); typedef struct isc_job isc_job_t; @@ -35,12 +36,15 @@ typedef struct isc_job isc_job_t; struct isc_job { isc_job_cb cb; void *cbarg; - ISC_LINK(isc_job_t) link; + union { + struct cds_wfcq_node wfcq_node; + ISC_LINK(isc_job_t) link; + }; }; -#define ISC_JOB_INITIALIZER \ - { \ - .link = ISC_LINK_INITIALIZER \ +#define ISC_JOB_INITIALIZER \ + { \ + .link = ISC_LINK_INITIALIZER, \ } ISC_LANG_BEGINDECLS diff --git a/lib/isc/include/isc/loop.h b/lib/isc/include/isc/loop.h index 05440375a0..93330e60d5 100644 --- a/lib/isc/include/isc/loop.h +++ b/lib/isc/include/isc/loop.h @@ -122,11 +122,6 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg); * yet been started. */ -void -isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job); -void -isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job); - void isc_loopmgr_setup(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg); void diff --git a/lib/isc/include/isc/quota.h b/lib/isc/include/isc/quota.h index 22e653345c..85654dd66f 100644 --- a/lib/isc/include/isc/quota.h +++ b/lib/isc/include/isc/quota.h @@ -30,13 +30,16 @@ *** Imports. ***/ +#include #include #include #include #include #include +#include #include #include +#include /***** ***** Types. @@ -46,15 +49,23 @@ ISC_LANG_BEGINDECLS -/*% isc_quota structure */ +/*% + * isc_quota structure + * + * NOTE: We are using struct cds_wfcq_head which has an internal + * mutex, because we are using enqueue and dequeue, and dequeues need + * synchronization between multiple threads (see urcu/wfcqueue.h for + * detailed description). + */ struct isc_quota { int magic; atomic_uint_fast32_t max; atomic_uint_fast32_t used; atomic_uint_fast32_t soft; - atomic_uint_fast32_t waiting; - isc_mutex_t cblock; - ISC_LIST(isc_job_t) jobs; + struct { + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head; + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail; + } jobs; ISC_LINK(isc_quota_t) link; }; diff --git a/lib/isc/job.c b/lib/isc/job.c index 9f64df73e8..963248df15 100644 --- a/lib/isc/job.c +++ b/lib/isc/job.c @@ -48,6 +48,7 @@ isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) { job->cb = cb; job->cbarg = cbarg; + ISC_LINK_INIT(job, link); ISC_LIST_APPEND(loop->run_jobs, job, link); } diff --git a/lib/isc/job_p.h b/lib/isc/job_p.h index 89fa078077..06c7bc319e 100644 --- a/lib/isc/job_p.h +++ b/lib/isc/job_p.h @@ -13,10 +13,22 @@ #pragma once +#include #include #include +#include #include +/*% + * NOTE: We are using struct __cds_wfcq_head that doesn't have an internal + * mutex, because we are only using enqueue and splice, and those don't need + * any synchronization (see urcu/wfcqueue.h for detailed description). + */ +typedef struct isc_jobqueue { + alignas(ISC_OS_CACHELINE_SIZE) struct __cds_wfcq_head head; + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail; +} isc_jobqueue_t; + typedef ISC_LIST(isc_job_t) isc_joblist_t; void diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 77f3dcc52e..1cef7d35ad 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -159,7 +159,6 @@ destroy_cb(uv_async_t *handle) { static void shutdown_cb(uv_async_t *handle) { - isc_job_t *job = NULL; isc_loop_t *loop = uv_handle_get_data(handle); isc_loopmgr_t *loopmgr = loop->loopmgr; @@ -178,17 +177,12 @@ shutdown_cb(uv_async_t *handle) { isc_signal_destroy(&loopmgr->sigint); } - job = ISC_LIST_TAIL(loop->teardown_jobs); - while (job != NULL) { - isc_job_t *prev = ISC_LIST_PREV(job, link); - ISC_LIST_UNLINK(loop->teardown_jobs, job, link); - - job->cb(job->cbarg); - - isc_mem_put(loop->mctx, job, sizeof(*job)); - - job = prev; - } + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &loop->async_jobs.head, &loop->async_jobs.tail, + &loop->teardown_jobs.head, &loop->teardown_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + int r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); } static void @@ -202,12 +196,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { *loop = (isc_loop_t){ .tid = tid, .loopmgr = loopmgr, - .async_jobs = ISC_ASTACK_INITIALIZER, .run_jobs = ISC_LIST_INITIALIZER, - .setup_jobs = ISC_LIST_INITIALIZER, - .teardown_jobs = ISC_LIST_INITIALIZER, }; + __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail); + __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail); + __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail); + int r = uv_loop_init(&loop->loop); UV_RUNTIME_CHECK(uv_loop_init, r); @@ -248,23 +243,6 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { loop->magic = LOOP_MAGIC; } -static void -setup_jobs_cb(void *arg) { - isc_loop_t *loop = arg; - isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs); - - while (job != NULL) { - isc_job_t *next = ISC_LIST_NEXT(job, link); - ISC_LIST_UNLINK(loop->setup_jobs, job, link); - - job->cb(job->cbarg); - - isc_mem_put(loop->mctx, job, sizeof(*job)); - - job = next; - } -} - static void quiescent_cb(uv_prepare_t *handle) { isc__qsbr_quiescent_cb(handle); @@ -285,7 +263,7 @@ loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); UV_RUNTIME_CHECK(uv_loop_close, r); - INSIST(ISC_ASTACK_EMPTY(loop->async_jobs)); + INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); INSIST(ISC_LIST_EMPTY(loop->run_jobs)); loop->magic = 0; @@ -306,7 +284,13 @@ loop_thread(void *arg) { isc_barrier_wait(&loop->loopmgr->starting); - isc_async_run(loop, setup_jobs_cb, loop); + enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( + &loop->async_jobs.head, &loop->async_jobs.tail, + &loop->setup_jobs.head, &loop->setup_jobs.tail); + INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); + + r = uv_async_send(&loop->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); r = uv_run(&loop->loop, UV_RUN_DEFAULT); UV_RUNTIME_CHECK(uv_run, r); @@ -316,16 +300,6 @@ loop_thread(void *arg) { return (NULL); } -void -isc_loop_nosetup(isc_loop_t *loop, isc_job_t *job) { - ISC_LIST_DEQUEUE(loop->setup_jobs, job, link); -} - -void -isc_loop_noteardown(isc_loop_t *loop, isc_job_t *job) { - ISC_LIST_DEQUEUE(loop->teardown_jobs, job, link); -} - /** * Public */ @@ -406,13 +380,15 @@ isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { *job = (isc_job_t){ .cb = cb, .cbarg = cbarg, - .link = ISC_LINK_INITIALIZER, }; + cds_wfcq_node_init(&job->wfcq_node); + REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - ISC_LIST_APPEND(loop->setup_jobs, job, link); + cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail, + &job->wfcq_node); return (job); } @@ -426,13 +402,14 @@ isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { *job = (isc_job_t){ .cb = cb, .cbarg = cbarg, - .link = ISC_LINK_INITIALIZER, }; + cds_wfcq_node_init(&job->wfcq_node); REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - ISC_LIST_APPEND(loop->teardown_jobs, job, link); + cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail, + &job->wfcq_node); return (job); } diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index 9594a0f306..b9bef13b19 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -25,9 +25,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -58,7 +58,7 @@ struct isc_loop { /* Async queue */ uv_async_t async_trigger; - isc_asyncstack_t async_jobs; + isc_jobqueue_t async_jobs; /* Jobs queue */ uv_idle_t run_trigger; @@ -69,8 +69,8 @@ struct isc_loop { /* Shutdown */ uv_async_t shutdown_trigger; - isc_joblist_t setup_jobs; - isc_joblist_t teardown_jobs; + isc_jobqueue_t setup_jobs; + isc_jobqueue_t teardown_jobs; /* Destroy */ uv_async_t destroy_trigger; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 583febaa8a..c861751555 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -692,8 +692,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, .active_handles = ISC_LIST_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, .active = true, - .job = ISC_JOB_INITIALIZER, - .quotacb = ISC_JOB_INITIALIZER, }; if (iface != NULL) { @@ -716,8 +714,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, isc__networker_attach(worker, &sock->worker); sock->uv_handle.handle.data = sock; - ISC_LINK_INIT(&sock->quotacb, link); - switch (type) { case isc_nm_udpsocket: case isc_nm_udplistener: @@ -805,7 +801,6 @@ alloc_handle(isc_nmsocket_t *sock) { .magic = NMHANDLE_MAGIC, .active_link = ISC_LINK_INITIALIZER, .inactive_link = ISC_LINK_INITIALIZER, - .job = ISC_JOB_INITIALIZER, }; isc_refcount_init(&handle->references, 1); @@ -1522,7 +1517,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) { .connect_tries = 3, .link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, - .job = ISC_JOB_INITIALIZER, .magic = UVREQ_MAGIC, }; uv_handle_set_data(&req->uv_req.handle, req); diff --git a/lib/isc/quota.c b/lib/isc/quota.c index 4dbdbc5781..4816333e1c 100644 --- a/lib/isc/quota.c +++ b/lib/isc/quota.c @@ -17,6 +17,7 @@ #include #include +#include #include #define QUOTA_MAGIC ISC_MAGIC('Q', 'U', 'O', 'T') @@ -27,9 +28,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) { atomic_init("a->max, max); atomic_init("a->used, 0); atomic_init("a->soft, 0); - atomic_init("a->waiting, 0); - ISC_LIST_INIT(quota->jobs); - isc_mutex_init("a->cblock); + cds_wfcq_init("a->jobs.head, "a->jobs.tail); ISC_LINK_INIT(quota, link); quota->magic = QUOTA_MAGIC; } @@ -37,6 +36,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) { void isc_quota_soft(isc_quota_t *quota, unsigned int soft) { REQUIRE(VALID_QUOTA(quota)); + REQUIRE(atomic_load_relaxed("a->max) > soft); atomic_store_relaxed("a->soft, soft); } @@ -64,49 +64,24 @@ isc_quota_getused(isc_quota_t *quota) { return (atomic_load_relaxed("a->used)); } -/* Must be quota->cblock locked */ -static void -enqueue(isc_quota_t *quota, isc_job_t *cb) { - REQUIRE(cb != NULL); - ISC_LIST_ENQUEUE(quota->jobs, cb, link); - atomic_fetch_add_relaxed("a->waiting, 1); -} - -/* Must be quota->cblock locked */ -static isc_job_t * -dequeue(isc_quota_t *quota) { - isc_job_t *cb = ISC_LIST_HEAD(quota->jobs); - if (cb != NULL) { - ISC_LIST_DEQUEUE(quota->jobs, cb, link); - atomic_fetch_sub_relaxed("a->waiting, 1); - } - return (cb); -} - void isc_quota_release(isc_quota_t *quota) { - uint_fast32_t used; - /* - * This is opportunistic - we might race with a failing quota_attach_cb - * and not detect that something is waiting, but eventually someone will - * be releasing quota and will detect it, so we don't need to worry - - * and we're saving a lot by not locking cblock every time. + * We are using the cds_wfcq_dequeue_blocking() variant here that + * has an internal mutex because we need synchronization on + * multiple dequeues running from different threads. */ - - if (atomic_load_acquire("a->waiting) > 0) { - isc_job_t *cb = NULL; - LOCK("a->cblock); - cb = dequeue(quota); - UNLOCK("a->cblock); - if (cb != NULL) { - cb->cb(cb->cbarg); - return; - } + struct cds_wfcq_node *node = + cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail); + if (node == NULL) { + uint_fast32_t used = atomic_fetch_sub_relaxed("a->used, 1); + INSIST(used > 0); + return; } - used = atomic_fetch_sub_release("a->used, 1); - INSIST(used > 0); + isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node); + __tsan_acquire(job); + job->cb(job->cbarg); } isc_result_t @@ -115,17 +90,22 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb, REQUIRE(VALID_QUOTA(quota)); REQUIRE(job == NULL || cb != NULL); - uint_fast32_t used = atomic_fetch_add_release("a->used, 1); - + uint_fast32_t used = atomic_fetch_add_relaxed("a->used, 1); uint_fast32_t max = atomic_load_relaxed("a->max); if (max != 0 && used >= max) { (void)atomic_fetch_sub_relaxed("a->used, 1); if (job != NULL) { job->cb = cb; job->cbarg = cbarg; - LOCK("a->cblock); - enqueue(quota, job); - UNLOCK("a->cblock); + cds_wfcq_node_init(&job->wfcq_node); + + /* + * The cds_wfcq_enqueue() is non-blocking (no internal + * mutex involved), so it offers a slight advantage. + */ + __tsan_release(job); + cds_wfcq_enqueue("a->jobs.head, "a->jobs.tail, + &job->wfcq_node); } return (ISC_R_QUOTA); } @@ -144,7 +124,7 @@ isc_quota_destroy(isc_quota_t *quota) { quota->magic = 0; INSIST(atomic_load("a->used) == 0); - INSIST(atomic_load("a->waiting) == 0); - INSIST(ISC_LIST_EMPTY(quota->jobs)); - isc_mutex_destroy("a->cblock); + INSIST(cds_wfcq_empty("a->jobs.head, "a->jobs.tail)); + + cds_wfcq_destroy("a->jobs.head, "a->jobs.tail); } diff --git a/tests/isc/udp_test.c b/tests/isc/udp_test.c index c8482e2d1b..5b8a0ba527 100644 --- a/tests/isc/udp_test.c +++ b/tests/isc/udp_test.c @@ -515,6 +515,9 @@ ISC_LOOP_TEST_IMPL(udp_timeout_recovery) { udp_timeout_recovery_connect_cb, listen_sock, T_SOFT); } +static void +udp_connect_udpconnect(void *arg ISC_ATTR_UNUSED); + static void udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { @@ -523,20 +526,29 @@ udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, isc_refcount_decrement(&active_cconnects); - assert_int_equal(eresult, ISC_R_SHUTTINGDOWN); - - atomic_fetch_add(&cconnects, 1); + /* + * The first UDP connect is faster than asynchronous shutdown procedure, + * restart the UDP connect again and expect the failure only in the + * second loop. + */ + if (atomic_fetch_add(&cconnects, 1) == 0) { + assert_int_equal(eresult, ISC_R_SUCCESS); + isc_async_current(loopmgr, udp_connect_udpconnect, netmgr); + } else { + assert_int_equal(eresult, ISC_R_SHUTTINGDOWN); + } } static void udp_connect_udpconnect(void *arg ISC_ATTR_UNUSED) { + isc_refcount_increment0(&active_cconnects); isc_nm_udpconnect(netmgr, &udp_connect_addr, &udp_listen_addr, udp_shutdown_connect_connect_cb, NULL, T_SOFT); } ISC_SETUP_TEST_IMPL(udp_shutdown_connect) { setup_test(state); - expected_cconnects = 1; + expected_cconnects = 2; return (0); } @@ -548,7 +560,10 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) { ISC_LOOP_TEST_IMPL(udp_shutdown_connect) { isc_loopmgr_shutdown(loopmgr); - isc_refcount_increment0(&active_cconnects); + /* + * isc_nm_udpconnect() is synchronous, so we need to launch this on the + * async loop. + */ isc_async_current(loopmgr, udp_connect_udpconnect, netmgr); }