diff --git a/lib/isc/include/isc/quota.h b/lib/isc/include/isc/quota.h index 22e653345c..85654dd66f 100644 --- a/lib/isc/include/isc/quota.h +++ b/lib/isc/include/isc/quota.h @@ -30,13 +30,16 @@ *** Imports. ***/ +#include #include #include #include #include #include +#include #include #include +#include /***** ***** Types. @@ -46,15 +49,23 @@ ISC_LANG_BEGINDECLS -/*% isc_quota structure */ +/*% + * isc_quota structure + * + * NOTE: We are using struct cds_wfcq_head which has an internal + * mutex, because we are using enqueue and dequeue, and dequeues need + * synchronization between multiple threads (see urcu/wfcqueue.h for + * detailed description). + */ struct isc_quota { int magic; atomic_uint_fast32_t max; atomic_uint_fast32_t used; atomic_uint_fast32_t soft; - atomic_uint_fast32_t waiting; - isc_mutex_t cblock; - ISC_LIST(isc_job_t) jobs; + struct { + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head; + alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail; + } jobs; ISC_LINK(isc_quota_t) link; }; diff --git a/lib/isc/quota.c b/lib/isc/quota.c index 4dbdbc5781..4816333e1c 100644 --- a/lib/isc/quota.c +++ b/lib/isc/quota.c @@ -17,6 +17,7 @@ #include #include +#include #include #define QUOTA_MAGIC ISC_MAGIC('Q', 'U', 'O', 'T') @@ -27,9 +28,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) { atomic_init("a->max, max); atomic_init("a->used, 0); atomic_init("a->soft, 0); - atomic_init("a->waiting, 0); - ISC_LIST_INIT(quota->jobs); - isc_mutex_init("a->cblock); + cds_wfcq_init("a->jobs.head, "a->jobs.tail); ISC_LINK_INIT(quota, link); quota->magic = QUOTA_MAGIC; } @@ -37,6 +36,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) { void isc_quota_soft(isc_quota_t *quota, unsigned int soft) { REQUIRE(VALID_QUOTA(quota)); + REQUIRE(atomic_load_relaxed("a->max) > soft); atomic_store_relaxed("a->soft, soft); } @@ -64,49 +64,24 @@ isc_quota_getused(isc_quota_t *quota) { return (atomic_load_relaxed("a->used)); } -/* Must be quota->cblock locked */ -static void -enqueue(isc_quota_t *quota, isc_job_t *cb) { - REQUIRE(cb != NULL); - ISC_LIST_ENQUEUE(quota->jobs, cb, link); - atomic_fetch_add_relaxed("a->waiting, 1); -} - -/* Must be quota->cblock locked */ -static isc_job_t * -dequeue(isc_quota_t *quota) { - isc_job_t *cb = ISC_LIST_HEAD(quota->jobs); - if (cb != NULL) { - ISC_LIST_DEQUEUE(quota->jobs, cb, link); - atomic_fetch_sub_relaxed("a->waiting, 1); - } - return (cb); -} - void isc_quota_release(isc_quota_t *quota) { - uint_fast32_t used; - /* - * This is opportunistic - we might race with a failing quota_attach_cb - * and not detect that something is waiting, but eventually someone will - * be releasing quota and will detect it, so we don't need to worry - - * and we're saving a lot by not locking cblock every time. + * We are using the cds_wfcq_dequeue_blocking() variant here that + * has an internal mutex because we need synchronization on + * multiple dequeues running from different threads. */ - - if (atomic_load_acquire("a->waiting) > 0) { - isc_job_t *cb = NULL; - LOCK("a->cblock); - cb = dequeue(quota); - UNLOCK("a->cblock); - if (cb != NULL) { - cb->cb(cb->cbarg); - return; - } + struct cds_wfcq_node *node = + cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail); + if (node == NULL) { + uint_fast32_t used = atomic_fetch_sub_relaxed("a->used, 1); + INSIST(used > 0); + return; } - used = atomic_fetch_sub_release("a->used, 1); - INSIST(used > 0); + isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node); + __tsan_acquire(job); + job->cb(job->cbarg); } isc_result_t @@ -115,17 +90,22 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb, REQUIRE(VALID_QUOTA(quota)); REQUIRE(job == NULL || cb != NULL); - uint_fast32_t used = atomic_fetch_add_release("a->used, 1); - + uint_fast32_t used = atomic_fetch_add_relaxed("a->used, 1); uint_fast32_t max = atomic_load_relaxed("a->max); if (max != 0 && used >= max) { (void)atomic_fetch_sub_relaxed("a->used, 1); if (job != NULL) { job->cb = cb; job->cbarg = cbarg; - LOCK("a->cblock); - enqueue(quota, job); - UNLOCK("a->cblock); + cds_wfcq_node_init(&job->wfcq_node); + + /* + * The cds_wfcq_enqueue() is non-blocking (no internal + * mutex involved), so it offers a slight advantage. + */ + __tsan_release(job); + cds_wfcq_enqueue("a->jobs.head, "a->jobs.tail, + &job->wfcq_node); } return (ISC_R_QUOTA); } @@ -144,7 +124,7 @@ isc_quota_destroy(isc_quota_t *quota) { quota->magic = 0; INSIST(atomic_load("a->used) == 0); - INSIST(atomic_load("a->waiting) == 0); - INSIST(ISC_LIST_EMPTY(quota->jobs)); - isc_mutex_destroy("a->cblock); + INSIST(cds_wfcq_empty("a->jobs.head, "a->jobs.tail)); + + cds_wfcq_destroy("a->jobs.head, "a->jobs.tail); }