Change the isc_quota API to use cds_wfcqueue internally

The isc_quota API was using locked list of isc_job_t objects to keep the
waiting TCP accepts.  Change the isc_quota implementation to use
cds_wfcqueue internally - the enqueue is wait-free and only dequeue
needs to be locked.
This commit is contained in:
Ondřej Surý 2023-05-08 23:31:54 +02:00
parent 0759612418
commit 00f1823366
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41
2 changed files with 43 additions and 52 deletions

View file

@ -30,13 +30,16 @@
*** Imports.
***/
#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/job.h>
#include <isc/lang.h>
#include <isc/magic.h>
#include <isc/mutex.h>
#include <isc/os.h>
#include <isc/refcount.h>
#include <isc/types.h>
#include <isc/urcu.h>
/*****
***** Types.
@ -46,15 +49,23 @@
ISC_LANG_BEGINDECLS
/*% isc_quota structure */
/*%
* isc_quota structure
*
* NOTE: We are using struct cds_wfcq_head which has an internal
* mutex, because we are using enqueue and dequeue, and dequeues need
* synchronization between multiple threads (see urcu/wfcqueue.h for
* detailed description).
*/
struct isc_quota {
int magic;
atomic_uint_fast32_t max;
atomic_uint_fast32_t used;
atomic_uint_fast32_t soft;
atomic_uint_fast32_t waiting;
isc_mutex_t cblock;
ISC_LIST(isc_job_t) jobs;
struct {
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_head head;
alignas(ISC_OS_CACHELINE_SIZE) struct cds_wfcq_tail tail;
} jobs;
ISC_LINK(isc_quota_t) link;
};

View file

@ -17,6 +17,7 @@
#include <isc/atomic.h>
#include <isc/quota.h>
#include <isc/urcu.h>
#include <isc/util.h>
#define QUOTA_MAGIC ISC_MAGIC('Q', 'U', 'O', 'T')
@ -27,9 +28,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
atomic_init(&quota->max, max);
atomic_init(&quota->used, 0);
atomic_init(&quota->soft, 0);
atomic_init(&quota->waiting, 0);
ISC_LIST_INIT(quota->jobs);
isc_mutex_init(&quota->cblock);
cds_wfcq_init(&quota->jobs.head, &quota->jobs.tail);
ISC_LINK_INIT(quota, link);
quota->magic = QUOTA_MAGIC;
}
@ -37,6 +36,7 @@ isc_quota_init(isc_quota_t *quota, unsigned int max) {
void
isc_quota_soft(isc_quota_t *quota, unsigned int soft) {
REQUIRE(VALID_QUOTA(quota));
REQUIRE(atomic_load_relaxed(&quota->max) > soft);
atomic_store_relaxed(&quota->soft, soft);
}
@ -64,49 +64,24 @@ isc_quota_getused(isc_quota_t *quota) {
return (atomic_load_relaxed(&quota->used));
}
/* Must be quota->cblock locked */
static void
enqueue(isc_quota_t *quota, isc_job_t *cb) {
REQUIRE(cb != NULL);
ISC_LIST_ENQUEUE(quota->jobs, cb, link);
atomic_fetch_add_relaxed(&quota->waiting, 1);
}
/* Must be quota->cblock locked */
static isc_job_t *
dequeue(isc_quota_t *quota) {
isc_job_t *cb = ISC_LIST_HEAD(quota->jobs);
if (cb != NULL) {
ISC_LIST_DEQUEUE(quota->jobs, cb, link);
atomic_fetch_sub_relaxed(&quota->waiting, 1);
}
return (cb);
}
void
isc_quota_release(isc_quota_t *quota) {
uint_fast32_t used;
/*
* This is opportunistic - we might race with a failing quota_attach_cb
* and not detect that something is waiting, but eventually someone will
* be releasing quota and will detect it, so we don't need to worry -
* and we're saving a lot by not locking cblock every time.
* We are using the cds_wfcq_dequeue_blocking() variant here that
* has an internal mutex because we need synchronization on
* multiple dequeues running from different threads.
*/
if (atomic_load_acquire(&quota->waiting) > 0) {
isc_job_t *cb = NULL;
LOCK(&quota->cblock);
cb = dequeue(quota);
UNLOCK(&quota->cblock);
if (cb != NULL) {
cb->cb(cb->cbarg);
return;
}
struct cds_wfcq_node *node =
cds_wfcq_dequeue_blocking(&quota->jobs.head, &quota->jobs.tail);
if (node == NULL) {
uint_fast32_t used = atomic_fetch_sub_relaxed(&quota->used, 1);
INSIST(used > 0);
return;
}
used = atomic_fetch_sub_release(&quota->used, 1);
INSIST(used > 0);
isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
__tsan_acquire(job);
job->cb(job->cbarg);
}
isc_result_t
@ -115,17 +90,22 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb,
REQUIRE(VALID_QUOTA(quota));
REQUIRE(job == NULL || cb != NULL);
uint_fast32_t used = atomic_fetch_add_release(&quota->used, 1);
uint_fast32_t used = atomic_fetch_add_relaxed(&quota->used, 1);
uint_fast32_t max = atomic_load_relaxed(&quota->max);
if (max != 0 && used >= max) {
(void)atomic_fetch_sub_relaxed(&quota->used, 1);
if (job != NULL) {
job->cb = cb;
job->cbarg = cbarg;
LOCK(&quota->cblock);
enqueue(quota, job);
UNLOCK(&quota->cblock);
cds_wfcq_node_init(&job->wfcq_node);
/*
* The cds_wfcq_enqueue() is non-blocking (no internal
* mutex involved), so it offers a slight advantage.
*/
__tsan_release(job);
cds_wfcq_enqueue(&quota->jobs.head, &quota->jobs.tail,
&job->wfcq_node);
}
return (ISC_R_QUOTA);
}
@ -144,7 +124,7 @@ isc_quota_destroy(isc_quota_t *quota) {
quota->magic = 0;
INSIST(atomic_load(&quota->used) == 0);
INSIST(atomic_load(&quota->waiting) == 0);
INSIST(ISC_LIST_EMPTY(quota->jobs));
isc_mutex_destroy(&quota->cblock);
INSIST(cds_wfcq_empty(&quota->jobs.head, &quota->jobs.tail));
cds_wfcq_destroy(&quota->jobs.head, &quota->jobs.tail);
}