diff --git a/lib/isc/include/isc/atomic.h b/lib/isc/include/isc/atomic.h index e39e07880b..2d1a09fb33 100644 --- a/lib/isc/include/isc/atomic.h +++ b/lib/isc/include/isc/atomic.h @@ -60,6 +60,8 @@ atomic_exchange_explicit((o), (v), memory_order_acquire) #define atomic_exchange_acq_rel(o, v) \ atomic_exchange_explicit((o), (v), memory_order_acq_rel) +#define atomic_fetch_add_acq_rel(o, v) \ + atomic_fetch_add_explicit((o), (v), memory_order_acq_rel) #define atomic_fetch_sub_acq_rel(o, v) \ atomic_fetch_sub_explicit((o), (v), memory_order_acq_rel) #define atomic_compare_exchange_weak_acq_rel(o, e, d) \ diff --git a/lib/isc/quota.c b/lib/isc/quota.c index 06ad1e4627..2f5b04e8b9 100644 --- a/lib/isc/quota.c +++ b/lib/isc/quota.c @@ -60,11 +60,12 @@ isc_quota_getsoft(isc_quota_t *quota) { unsigned int isc_quota_getused(isc_quota_t *quota) { REQUIRE(VALID_QUOTA(quota)); - return atomic_load_relaxed("a->used); + return atomic_load_acquire("a->used); } void isc_quota_release(isc_quota_t *quota) { + struct cds_wfcq_node *node; /* * We are using the cds_wfcq_dequeue_blocking() variant here that * has an internal mutex because we need synchronization on @@ -74,11 +75,25 @@ isc_quota_release(isc_quota_t *quota) { * with cds_wfcq_empty() before acquiring the internal lock, so if * there's nothing queued, the call should be very lightweight. */ - struct cds_wfcq_node *node = - cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail); +again: + node = cds_wfcq_dequeue_blocking("a->jobs.head, "a->jobs.tail); if (node == NULL) { - uint_fast32_t used = atomic_fetch_sub_relaxed("a->used, 1); + uint_fast32_t used = atomic_fetch_sub_acq_rel("a->used, 1); INSIST(used > 0); + + /* + * If this was the last quota released and in the meantime a + * new job has appeared in the queue, then give it a chance + * to run, otherwise it could get stuck there until a new quota + * is acquired and released again. + */ + if (used == 1 && + !cds_wfcq_empty("a->jobs.head, "a->jobs.tail)) + { + atomic_fetch_add_acq_rel("a->used, 1); + goto again; + } + return; } @@ -92,10 +107,10 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb, REQUIRE(VALID_QUOTA(quota)); REQUIRE(job == NULL || cb != NULL); - uint_fast32_t used = atomic_fetch_add_relaxed("a->used, 1); + uint_fast32_t used = atomic_fetch_add_acq_rel("a->used, 1); uint_fast32_t max = atomic_load_relaxed("a->max); if (max != 0 && used >= max) { - (void)atomic_fetch_sub_relaxed("a->used, 1); + (void)atomic_fetch_sub_acq_rel("a->used, 1); if (job != NULL) { job->cb = cb; job->cbarg = cbarg; @@ -107,6 +122,23 @@ isc_quota_acquire_cb(isc_quota_t *quota, isc_job_t *job, isc_job_cb cb, */ cds_wfcq_enqueue("a->jobs.head, "a->jobs.tail, &job->wfcq_node); + + /* + * While we were initializing and enqueuing a new node, + * quotas might have been released, and if no quota is + * used any more, then our newly enqueued job won't + * have a chance to get running until a new quota is + * acquired and released. To avoid a hangup, check + * quota->used again, if it's 0 then simulate a quota + * acquire/release for the current job to run as soon as + * possible, although we will still return ISC_R_QUOTA + * to the caller. + */ + if (atomic_compare_exchange_strong_acq_rel( + "a->used, &(uint_fast32_t){ 0 }, 1)) + { + isc_quota_release(quota); + } } return ISC_R_QUOTA; } @@ -124,7 +156,7 @@ isc_quota_destroy(isc_quota_t *quota) { REQUIRE(VALID_QUOTA(quota)); quota->magic = 0; - INSIST(atomic_load("a->used) == 0); + INSIST(atomic_load_acquire("a->used) == 0); INSIST(cds_wfcq_empty("a->jobs.head, "a->jobs.tail)); cds_wfcq_destroy("a->jobs.head, "a->jobs.tail);