MEDIUM: tasks: Remove the per-thread group wait queue

Totally remove the per-thread group wait queue. This was potentially a
source of contention, because there was only one global lock for all
those wait queues.
Instead, for shared tasks, there is now the concept of ownership for the
task. When a task is in a thread's wait queue or run queue, or is running
on that thread, the task's tid is set to -2 - thread_tid, and only that
thread will be responsible for it until it is no longer running and is
in none of that thread's queues.
When a shared task is scheduled to be run at a later time, if its
current tid is -1, then the current thread will take ownership, and put
it in its own wait queue. If it is already owned by another thread, then
TASK_WOKEN_WQ is added to the task's state, and a task_wakeup() is done,
so that the owner thread will add it to its own wait queue.
If there is any owner, then a task_wakeup() will just add the task to
the owner's runqueue, otherwise the current thread will become the
owner.
This commit is contained in:
Olivier Houchard 2026-04-28 18:46:54 +02:00 committed by Olivier Houchard
parent c9f3ddcb1e
commit 0988b9c773
3 changed files with 89 additions and 167 deletions

View file

@ -53,7 +53,7 @@
/* use this to check a task state or to clean it up before queueing */
#define TASK_WOKEN_ANY (TASK_WOKEN_OTHER|TASK_WOKEN_INIT|TASK_WOKEN_TIMER| \
TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \
TASK_WOKEN_RES)
TASK_WOKEN_RES|TASK_WOKEN_WQ)
#define TASK_F_TASKLET 0x00008000 /* nature of this task: 0=task 1=tasklet */
#define TASK_F_USR1 0x00010000 /* preserved user flag 1, application-specific, def:0 */
@ -61,7 +61,8 @@
#define TASK_F_UEVT2 0x00040000 /* one-shot user event type 2, application specific, def:0 */
#define TASK_F_WANTS_TIME 0x00080000 /* task/tasklet wants th_ctx->sched_call_date to be set */
#define TASK_F_UEVT3 0x00100000 /* one-shot user event type 3, application specific, def:0 */
/* unused: 0x200000..0x80000000 */
#define TASK_WOKEN_WQ 0x00200000 /* The task has been woken up only to be put in the wait queue, because its expire changed */
/* unused: 0x400000..0x80000000 */
/* These flags are persistent across scheduler calls */
#define TASK_PERSISTENT (TASK_SELF_WAKING | TASK_KILLED | \
@ -82,7 +83,7 @@ static forceinline char *task_show_state(char *buf, size_t len, const char *deli
_(TASK_KILLED, _(TASK_HEAVY, _(TASK_WOKEN_INIT,
_(TASK_WOKEN_TIMER, _(TASK_WOKEN_IO, _(TASK_WOKEN_SIGNAL,
_(TASK_WOKEN_MSG, _(TASK_WOKEN_RES, _(TASK_WOKEN_OTHER,
_(TASK_F_TASKLET, _(TASK_F_USR1))))))))))))));
_(TASK_F_TASKLET, _(TASK_F_USR1, _(TASK_WOKEN_WQ)))))))))))))));
/* epilogue */
_(~0U);
return buf;

View file

@ -98,7 +98,8 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
void task_kill(struct task *t);
void tasklet_kill(struct tasklet *t);
void __task_wakeup(struct task *t);
void __task_queue(struct task *task, struct eb_root *wq);
void __task_queue(struct task *task);
static inline void _task_queue(struct task *task, const struct ha_caller *caller);
unsigned int run_tasks_from_lists(unsigned int budgets[]);
@ -325,12 +326,16 @@ static inline void task_drop_running(struct task *t, unsigned int f)
state = _HA_ATOMIC_LOAD(&t->state);
new_state = (state | f) &~ TASK_RUNNING;
cur_tid = t->tid;
if ((new_state & TASK_WOKEN_WQ) && __task_get_current_owner(cur_tid) == tid) {
_task_queue(t, NULL);
new_state &= ~TASK_WOKEN_WQ;
}
if (new_state & TASK_WOKEN_ANY)
new_state |= TASK_QUEUED;
cur_tid = t->tid;
if ((new_state & TASK_QUEUED) || cur_tid >= 0)
if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t))
new_tid = cur_tid;
else
new_tid = -1;
@ -399,34 +404,17 @@ static inline void _task_queue(struct task *task, const struct ha_caller *caller
if (!tick_isset(task->expire))
return;
#ifdef USE_THREAD
if (task->tid < 0) {
HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
BUG_ON(task->tid >= 0 && task->tid != tid);
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
}
__task_queue(task, &tg_ctx->timers);
}
HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
} else
#endif
{
BUG_ON(task->tid != tid);
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
}
__task_queue(task, &th_ctx->timers);
}
__task_queue(task);
}
}
@ -446,6 +434,11 @@ static inline void task_set_thread(struct task *t, int thr)
/* no shared queue without threads */
thr = 0;
#endif
/*
* Nothing to do, the task is only temporarily owned
*/
if (thr == -1 && t->tid == -2 - tid)
return;
if (unlikely(task_in_wq(t))) {
task_unlink_wq(t);
t->tid = thr;
@ -790,11 +783,11 @@ static inline void tasklet_set_tid(struct tasklet *tl, int tid)
static inline void _task_schedule(struct task *task, int when, const struct ha_caller *caller)
{
int did_lock = 0;
/* TODO: mthread, check if there is no task with this test */
if (task_in_rq(task))
return;
#ifdef USE_THREAD
if (task->tid < 0) {
/*
* If the task is already running, then just wake it up, just
@ -812,44 +805,26 @@ static inline void _task_schedule(struct task *task, int when, const struct ha_c
task_wakeup(task, TASK_WOKEN_OTHER);
return;
}
/* FIXME: is it really needed to lock the WQ during the check ? */
HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
if (task_in_wq(task))
when = tick_first(when, task->expire);
task->expire = when;
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
}
__task_queue(task, &tg_ctx->timers);
}
task_drop_running(task, 0);
HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
did_lock = 1;
} else
#endif
{
BUG_ON(task->tid != tid);
if (task_in_wq(task))
when = tick_first(when, task->expire);
task->expire = when;
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
if (task_in_wq(task))
when = tick_first(when, task->expire);
task->expire = when;
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) {
if (likely(caller)) {
caller = HA_ATOMIC_XCHG(&task->caller, caller);
BUG_ON((ulong)caller & 1);
#ifdef DEBUG_TASK
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
HA_ATOMIC_STORE(&task->debug.prev_caller, caller);
#endif
}
__task_queue(task, &th_ctx->timers);
}
__task_queue(task);
}
if (did_lock)
task_drop_running(task, 0);
}
/* returns the string corresponding to a task type as found in the task caller

View file

@ -222,7 +222,13 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl)
void __task_wakeup(struct task *t)
{
struct eb_root *root = &th_ctx->rqueue;
int thr __maybe_unused = t->tid >= 0 ? t->tid : tid;
/*
* At this point the task tid should always be set to the relevant
* thread, so we can just use __task_get_current_owner();
*/
int thr __maybe_unused = __task_get_current_owner(t->tid);
BUG_ON(t->tid == -1);
#ifdef USE_THREAD
if (thr != tid) {
@ -281,18 +287,38 @@ void __task_wakeup(struct task *t)
* at all about locking so the caller must be careful when deciding whether to
* lock or not around this call.
*/
void __task_queue(struct task *task, struct eb_root *wq)
void __task_queue(struct task *task)
{
#ifdef USE_THREAD
BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
(wq == &th_ctx->timers && task->tid < 0) ||
(wq != &tg_ctx->timers && wq != &th_ctx->timers));
#endif
int old_state, new_state;
int old_tid;
int cur_owner;
/* if this happens the process is doomed anyway, so better catch it now
* so that we have the caller in the stack.
*/
BUG_ON(task->expire == TICK_ETERNITY);
do {
new_state = old_state = _HA_ATOMIC_LOAD(&task->state);
if (old_state & TASK_KILLED)
return;
old_tid = _HA_ATOMIC_LOAD(&task->tid);
cur_owner = __task_get_current_owner(old_tid);
if (old_tid != -1 && cur_owner != tid)
new_state |= TASK_WOKEN_WQ;
} while (!(__task_set_state_and_tid(task, old_tid, __task_get_new_tid_field(old_tid), old_state, new_state)));
if (cur_owner != tid && cur_owner != -1) {
/*
* If the task has already been woken up to be added in the
* wait queue, nothing left to do, the target thread will
* eventually do the right thing.
*/
if (!(old_state & TASK_WOKEN_WQ))
_task_wakeup(task, 0, NULL);
return;
}
if (likely(task_in_wq(task)))
__task_unlink_wq(task);
@ -304,7 +330,7 @@ void __task_queue(struct task *task, struct eb_root *wq)
return;
#endif
eb32_insert(wq, &task->wq);
eb32_insert(&th_ctx->timers, &task->wq);
}
/*
@ -317,7 +343,6 @@ void wake_expired_tasks()
int max_processed = global.tune.runqueue_depth;
struct task *task;
struct eb32_node *eb;
__decl_thread(int key);
while (1) {
if (max_processed-- <= 0)
@ -364,7 +389,7 @@ void wake_expired_tasks()
*/
__task_unlink_wq(task);
if (tick_isset(task->expire))
__task_queue(task, &tt->timers);
__task_queue(task);
}
else {
/* task not expired and correctly placed. It may not be eternal. */
@ -372,99 +397,6 @@ void wake_expired_tasks()
break;
}
}
#ifdef USE_THREAD
if (eb_is_empty(&tg_ctx->timers))
goto leave;
HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
if (!eb) {
eb = eb32_first(&tg_ctx->timers);
if (likely(!eb)) {
HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
goto leave;
}
}
key = eb->key;
if (tick_is_lt(now_ms, key)) {
HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
goto leave;
}
/* There's really something of interest here, let's visit the queue */
if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
/* if we failed to grab the lock it means another thread is
* already doing the same here, so let it do the job.
*/
HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
goto leave;
}
while (1) {
lookup_next:
if (max_processed-- <= 0)
break;
eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
if (!eb) {
/* we might have reached the end of the tree, typically because
* <now_ms> is in the first half and we're first scanning the last
* half. Let's loop back to the beginning of the tree now.
*/
eb = eb32_first(&tg_ctx->timers);
if (likely(!eb))
break;
}
task = eb32_entry(eb, struct task, wq);
/* Check for any competing run of the task (quite rare but may
* involve a dangerous concurrent access on task->expire). In
* order to protect against this, we'll take an exclusive access
* on TASK_RUNNING before checking/touching task->expire. If the
* task is already RUNNING on another thread, it will deal by
* itself with the requeuing so we must not do anything and
* simply quit the loop for now, because we cannot wait with the
* WQ lock held as this would prevent the running thread from
* requeuing the task. One annoying effect of holding RUNNING
* here is that a concurrent task_wakeup() will refrain from
* waking it up. This forces us to check for a wakeup after
* releasing the flag.
*/
if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
break;
if (tick_is_expired(task->expire, now_ms)) {
/* expired task, wake it up */
HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
__task_unlink_wq(task);
HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
task_drop_running(task, TASK_WOKEN_TIMER);
}
else if (task->expire != eb->key) {
/* task is not expired but its key doesn't match so let's
* update it and skip to next apparently expired task.
*/
HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
__task_unlink_wq(task);
if (tick_isset(task->expire))
__task_queue(task, &tg_ctx->timers);
HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
task_drop_running(task, 0);
goto lookup_next;
}
else {
/* task not expired and correctly placed. It may not be eternal. */
BUG_ON(task->expire == TICK_ETERNITY);
task_drop_running(task, 0);
break;
}
}
HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
#endif
leave:
return;
}
@ -658,6 +590,19 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
goto next;
}
if (state & TASK_WOKEN_WQ) {
/* We should add this task to our wait queue */
task_queue(t);
/*
* If this is the only reason the task got scheduled,
* then we don't actually have to run it.
*/
if ((state & TASK_WOKEN_ANY) == TASK_WOKEN_WQ) {
task_drop_running(t, 0);
goto next;
}
state &= ~TASK_WOKEN_WQ;
}
/* OK now the task or tasklet is well alive and is going to be run */
if (state & TASK_F_TASKLET) {
/* this is a tasklet */
@ -686,7 +631,8 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
__task_free(t);
}
else {
task_queue(t);
if (__task_get_current_owner(t->tid) == tid)
task_queue(t);
task_drop_running(t, 0);
}
}