Require isc_timer to be manipulated on the timer loop

Each isc_timer needs to be created, started and destroyed on the current
loop.  The isc_timer_stop() can be run on any loop, but when run from
different loop than the one associated with the timer, the request to
stop the timer will be recorded in atomic variable and the underlying
uv_timer_t will be stopped on next uv_timer_t callback call.  This
allows any thread to stop the timer.
This commit is contained in:
Ondřej Surý 2022-09-12 11:06:09 +02:00 committed by Evan Hunt
parent 5a473b305d
commit 5319d4f6c5

View file

@ -39,24 +39,15 @@
struct isc_timer {
unsigned int magic;
isc_refcount_t references;
isc_loop_t *loop;
uv_timer_t timer;
isc_job_cb cb;
void *cbarg;
/*
* We are locking the values here for now, but this needs to go away
* when the timers are pinned to the respective loops.
*/
isc_mutex_t lock;
uint64_t timeout;
uint64_t repeat;
atomic_bool running;
};
static void
isc__timer_detach(isc_timer_t **timerp);
void
isc_timer_create(isc_loop_t *loop, isc_job_cb cb, void *cbarg,
isc_timer_t **timerp) {
@ -73,47 +64,38 @@ isc_timer_create(isc_loop_t *loop, isc_job_cb cb, void *cbarg,
REQUIRE(VALID_LOOPMGR(loopmgr));
REQUIRE(loop == isc_loop_current(loopmgr) ||
!atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused) > 0);
REQUIRE(loop == isc_loop_current(loopmgr));
timer = isc_mem_get(loop->mctx, sizeof(*timer));
*timer = (isc_timer_t){
.cb = cb,
.cbarg = cbarg,
.magic = TIMER_MAGIC,
};
isc_loop_attach(loop, &timer->loop);
isc_refcount_init(&timer->references, 1);
isc_mutex_init(&timer->lock);
timer->magic = TIMER_MAGIC;
r = uv_timer_init(&timer->loop->loop, &timer->timer);
r = uv_timer_init(&loop->loop, &timer->timer);
UV_RUNTIME_CHECK(uv_timer_init, r);
uv_handle_set_data(&timer->timer, timer);
*timerp = timer;
}
static void
isc__timer_stop(void *arg) {
isc_timer_t *timer = (isc_timer_t *)arg;
uv_timer_stop(&timer->timer);
isc__timer_detach(&timer);
}
void
isc_timer_stop(isc_timer_t *timer) {
REQUIRE(VALID_TIMER(timer));
isc_refcount_increment(&timer->references);
if (!atomic_compare_exchange_strong_acq_rel(&timer->running,
&(bool){ true }, false))
{
/* Timer was already stopped */
return;
}
/* Stop the timer, if the loops are matching */
if (timer->loop == isc_loop_current(timer->loop->loopmgr)) {
isc__timer_stop(timer);
} else {
isc_async_run(timer->loop, isc__timer_stop, timer);
uv_timer_stop(&timer->timer);
}
}
@ -123,30 +105,24 @@ timer_cb(uv_timer_t *handle) {
REQUIRE(VALID_TIMER(timer));
if (!atomic_load_acquire(&timer->running)) {
uv_timer_stop(&timer->timer);
return;
}
timer->cb(timer->cbarg);
}
static void
isc__timer_start(void *arg) {
isc_timer_t *timer = (isc_timer_t *)arg;
LOCK(&timer->lock);
int r = uv_timer_start(&timer->timer, timer_cb, timer->timeout,
timer->repeat);
UV_RUNTIME_CHECK(uv_timer_start, r);
UNLOCK(&timer->lock);
isc__timer_detach(&timer);
}
void
isc_timer_start(isc_timer_t *timer, isc_timertype_t type,
const isc_interval_t *interval) {
isc_loopmgr_t *loopmgr = NULL;
isc_loop_t *loop = NULL;
int r;
REQUIRE(VALID_TIMER(timer));
REQUIRE(type == isc_timertype_ticker || type == isc_timertype_once);
REQUIRE(timer->loop == isc_loop_current(timer->loop->loopmgr));
loop = timer->loop;
@ -156,7 +132,6 @@ isc_timer_start(isc_timer_t *timer, isc_timertype_t type,
REQUIRE(VALID_LOOPMGR(loopmgr));
LOCK(&timer->lock);
switch (type) {
case isc_timertype_once:
timer->timeout = isc_interval_ms(interval);
@ -168,18 +143,15 @@ isc_timer_start(isc_timer_t *timer, isc_timertype_t type,
default:
UNREACHABLE();
}
UNLOCK(&timer->lock);
isc_refcount_increment(&timer->references);
if (timer->loop == isc_loop_current(timer->loop->loopmgr)) {
isc__timer_start(timer);
} else {
isc_async_run(timer->loop, isc__timer_start, timer);
}
atomic_store_release(&timer->running, true);
r = uv_timer_start(&timer->timer, timer_cb, timer->timeout,
timer->repeat);
UV_RUNTIME_CHECK(uv_timer_start, r);
}
static void
timer_destroy(uv_handle_t *handle) {
timer_close(uv_handle_t *handle) {
isc_timer_t *timer = uv_handle_get_data(handle);
isc_loop_t *loop;
@ -187,24 +159,22 @@ timer_destroy(uv_handle_t *handle) {
loop = timer->loop;
isc_refcount_destroy(&timer->references);
isc_mutex_destroy(&timer->lock);
isc_mem_put(loop->mctx, timer, sizeof(*timer));
isc_loop_detach(&loop);
}
static void
isc__timer_destroy(void *arg) {
isc_timer_t *timer = (isc_timer_t *)arg;
timer_destroy(void *arg) {
isc_timer_t *timer = arg;
atomic_store_release(&timer->running, false);
uv_timer_stop(&timer->timer);
uv_close(&timer->timer, timer_destroy);
uv_close(&timer->timer, timer_close);
}
static void
isc__timer_detach(isc_timer_t **timerp) {
void
isc_timer_destroy(isc_timer_t **timerp) {
isc_timer_t *timer = NULL;
REQUIRE(timerp != NULL && VALID_TIMER(*timerp));
@ -212,18 +182,9 @@ isc__timer_detach(isc_timer_t **timerp) {
timer = *timerp;
*timerp = NULL;
if (isc_refcount_decrement(&timer->references) == 1) {
if (timer->loop == isc_loop_current(timer->loop->loopmgr)) {
isc__timer_destroy(timer);
} else {
isc_async_run(timer->loop, isc__timer_destroy, timer);
}
}
}
REQUIRE(timer->loop == isc_loop_current(timer->loop->loopmgr));
void
isc_timer_destroy(isc_timer_t **timerp) {
isc__timer_detach(timerp);
timer_destroy(timer);
}
void
@ -236,5 +197,5 @@ isc_timer_async_destroy(isc_timer_t **timerp) {
*timerp = NULL;
isc_timer_stop(timer);
isc_async_run(timer->loop, isc__timer_destroy, timer);
isc_async_run(timer->loop, timer_destroy, timer);
}