From 0988b9c7730663e24c0809879e40ffcdb245c1e9 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Tue, 28 Apr 2026 18:46:54 +0200 Subject: [PATCH] MEDIUM: tasks: Remove the per-thread group wait queue Totally remove the per-thread group wait queue. This was potentially a source of contention, because there were only a global lock for all those wait queues. Instead, for shared tasks, there is now the concept of ownership for the task. When a task is in the wait queue, run queue, or is running on that particular thread, the task's tid is set to -2 - thread_tid, and only that thread will be responsible for it until it is no longer running, and in none of its queue. When a shared task is scheduled to be run at a later time, if its current tid is -1, then the current thread will take ownership, and put it in its own wait queue. If it is already owned, then TASK_WOKEN_WQ is added to the task's state, and a task_wakeup() is done, so that the owner thread will add it in its wait queue. If there is any owner, then a task_wakeup() will just add the task to the owner's runqueue, otherwise the current thread will become the owner. --- include/haproxy/task-t.h | 7 +- include/haproxy/task.h | 95 +++++++++--------------- src/task.c | 154 +++++++++++++-------------------------- 3 files changed, 89 insertions(+), 167 deletions(-) diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h index 45d68a85a..787409f26 100644 --- a/include/haproxy/task-t.h +++ b/include/haproxy/task-t.h @@ -53,7 +53,7 @@ /* use this to check a task state or to clean it up before queueing */ #define TASK_WOKEN_ANY (TASK_WOKEN_OTHER|TASK_WOKEN_INIT|TASK_WOKEN_TIMER| \ TASK_WOKEN_IO|TASK_WOKEN_SIGNAL|TASK_WOKEN_MSG| \ - TASK_WOKEN_RES) + TASK_WOKEN_RES|TASK_WOKEN_WQ) #define TASK_F_TASKLET 0x00008000 /* nature of this task: 0=task 1=tasklet */ #define TASK_F_USR1 0x00010000 /* preserved user flag 1, application-specific, def:0 */ @@ -61,7 +61,8 @@ #define TASK_F_UEVT2 0x00040000 /* one-shot user event type 2, application specific, def:0 */ #define TASK_F_WANTS_TIME 0x00080000 /* task/tasklet wants th_ctx->sched_call_date to be set */ #define TASK_F_UEVT3 0x00100000 /* one-shot user event type 3, application specific, def:0 */ -/* unused: 0x200000..0x80000000 */ +#define TASK_WOKEN_WQ 0x00200000 /* The task has been waken up only to be put in the wait queue, because its expire changed */ +/* unused: 0x400000..0x80000000 */ /* These flags are persistent across scheduler calls */ #define TASK_PERSISTENT (TASK_SELF_WAKING | TASK_KILLED | \ @@ -82,7 +83,7 @@ static forceinline char *task_show_state(char *buf, size_t len, const char *deli _(TASK_KILLED, _(TASK_HEAVY, _(TASK_WOKEN_INIT, _(TASK_WOKEN_TIMER, _(TASK_WOKEN_IO, _(TASK_WOKEN_SIGNAL, _(TASK_WOKEN_MSG, _(TASK_WOKEN_RES, _(TASK_WOKEN_OTHER, - _(TASK_F_TASKLET, _(TASK_F_USR1)))))))))))))); + _(TASK_F_TASKLET, _(TASK_F_USR1, _(TASK_WOKEN_WQ))))))))))))))); /* epilogue */ _(~0U); return buf; diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 6f76e43a8..3ced8a9b2 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -98,7 +98,8 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl); void task_kill(struct task *t); void tasklet_kill(struct tasklet *t); void __task_wakeup(struct task *t); -void __task_queue(struct task *task, struct eb_root *wq); +void __task_queue(struct task *task); +static inline void _task_queue(struct task *task, const struct ha_caller *caller); unsigned int run_tasks_from_lists(unsigned int budgets[]); @@ -325,12 +326,16 @@ static inline void task_drop_running(struct task *t, unsigned int f) state = _HA_ATOMIC_LOAD(&t->state); new_state = (state | f) &~ TASK_RUNNING; + cur_tid = t->tid; + if ((new_state & TASK_WOKEN_WQ) && __task_get_current_owner(cur_tid) == tid) { + _task_queue(t, NULL); + new_state &= ~TASK_WOKEN_WQ; + } if (new_state & TASK_WOKEN_ANY) new_state |= TASK_QUEUED; - cur_tid = t->tid; - if ((new_state & TASK_QUEUED) || cur_tid >= 0) + if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t)) new_tid = cur_tid; else new_tid = -1; @@ -399,34 +404,17 @@ static inline void _task_queue(struct task *task, const struct ha_caller *caller if (!tick_isset(task->expire)) return; -#ifdef USE_THREAD - if (task->tid < 0) { - HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock); - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { - if (likely(caller)) { - caller = HA_ATOMIC_XCHG(&task->caller, caller); - BUG_ON((ulong)caller & 1); + BUG_ON(task->tid >= 0 && task->tid != tid); + + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { + if (likely(caller)) { + caller = HA_ATOMIC_XCHG(&task->caller, caller); + BUG_ON((ulong)caller & 1); #ifdef DEBUG_TASK - HA_ATOMIC_STORE(&task->debug.prev_caller, caller); + HA_ATOMIC_STORE(&task->debug.prev_caller, caller); #endif - } - __task_queue(task, &tg_ctx->timers); - } - HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock); - } else -#endif - { - BUG_ON(task->tid != tid); - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { - if (likely(caller)) { - caller = HA_ATOMIC_XCHG(&task->caller, caller); - BUG_ON((ulong)caller & 1); -#ifdef DEBUG_TASK - HA_ATOMIC_STORE(&task->debug.prev_caller, caller); -#endif - } - __task_queue(task, &th_ctx->timers); } + __task_queue(task); } } @@ -446,6 +434,11 @@ static inline void task_set_thread(struct task *t, int thr) /* no shared queue without threads */ thr = 0; #endif + /* + * Nothing to do, the task is only temporarily owned + */ + if (thr == -1 && t->tid == -2 - tid) + return; if (unlikely(task_in_wq(t))) { task_unlink_wq(t); t->tid = thr; @@ -790,11 +783,11 @@ static inline void tasklet_set_tid(struct tasklet *tl, int tid) static inline void _task_schedule(struct task *task, int when, const struct ha_caller *caller) { + int did_lock = 0; /* TODO: mthread, check if there is no task with this test */ if (task_in_rq(task)) return; -#ifdef USE_THREAD if (task->tid < 0) { /* * If the task is already running, then just wake it up, just @@ -812,44 +805,26 @@ static inline void _task_schedule(struct task *task, int when, const struct ha_c task_wakeup(task, TASK_WOKEN_OTHER); return; } - - /* FIXME: is it really needed to lock the WQ during the check ? */ - HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock); - if (task_in_wq(task)) - when = tick_first(when, task->expire); - - task->expire = when; - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { - if (likely(caller)) { - caller = HA_ATOMIC_XCHG(&task->caller, caller); - BUG_ON((ulong)caller & 1); -#ifdef DEBUG_TASK - HA_ATOMIC_STORE(&task->debug.prev_caller, caller); -#endif - } - __task_queue(task, &tg_ctx->timers); - } - task_drop_running(task, 0); - HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock); + did_lock = 1; } else -#endif - { BUG_ON(task->tid != tid); - if (task_in_wq(task)) - when = tick_first(when, task->expire); - task->expire = when; - if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { - if (likely(caller)) { - caller = HA_ATOMIC_XCHG(&task->caller, caller); - BUG_ON((ulong)caller & 1); + if (task_in_wq(task)) + when = tick_first(when, task->expire); + + task->expire = when; + if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key)) { + if (likely(caller)) { + caller = HA_ATOMIC_XCHG(&task->caller, caller); + BUG_ON((ulong)caller & 1); #ifdef DEBUG_TASK - HA_ATOMIC_STORE(&task->debug.prev_caller, caller); + HA_ATOMIC_STORE(&task->debug.prev_caller, caller); #endif - } - __task_queue(task, &th_ctx->timers); } + __task_queue(task); } + if (did_lock) + task_drop_running(task, 0); } /* returns the string corresponding to a task type as found in the task caller diff --git a/src/task.c b/src/task.c index 80f336803..de5849938 100644 --- a/src/task.c +++ b/src/task.c @@ -222,7 +222,13 @@ struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl) void __task_wakeup(struct task *t) { struct eb_root *root = &th_ctx->rqueue; - int thr __maybe_unused = t->tid >= 0 ? t->tid : tid; + /* + * At this point the task tid should always be set to the relevant + * thread, so we can just use __task_get_current_owner(); + */ + int thr __maybe_unused = __task_get_current_owner(t->tid); + + BUG_ON(t->tid == -1); #ifdef USE_THREAD if (thr != tid) { @@ -281,18 +287,38 @@ void __task_wakeup(struct task *t) * at all about locking so the caller must be careful when deciding whether to * lock or not around this call. */ -void __task_queue(struct task *task, struct eb_root *wq) +void __task_queue(struct task *task) { -#ifdef USE_THREAD - BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) || - (wq == &th_ctx->timers && task->tid < 0) || - (wq != &tg_ctx->timers && wq != &th_ctx->timers)); -#endif + int old_state, new_state; + int old_tid; + int cur_owner; + /* if this happens the process is doomed anyway, so better catch it now * so that we have the caller in the stack. */ BUG_ON(task->expire == TICK_ETERNITY); + do { + new_state = old_state = _HA_ATOMIC_LOAD(&task->state); + if (old_state & TASK_KILLED) + return; + old_tid = _HA_ATOMIC_LOAD(&task->tid); + cur_owner = __task_get_current_owner(old_tid); + if (old_tid != -1 && cur_owner != tid) + new_state |= TASK_WOKEN_WQ; + } while (!(__task_set_state_and_tid(task, old_tid, __task_get_new_tid_field(old_tid), old_state, new_state))); + + if (cur_owner != tid && cur_owner != -1) { + /* + * If the task has already been woken up to be added in the + * wait queue, nothing left to do, the target thread will + * eventually do the right thing. + */ + if (!(old_state & TASK_WOKEN_WQ)) + _task_wakeup(task, 0, NULL); + return; + } + if (likely(task_in_wq(task))) __task_unlink_wq(task); @@ -304,7 +330,7 @@ void __task_queue(struct task *task, struct eb_root *wq) return; #endif - eb32_insert(wq, &task->wq); + eb32_insert(&th_ctx->timers, &task->wq); } /* @@ -317,7 +343,6 @@ void wake_expired_tasks() int max_processed = global.tune.runqueue_depth; struct task *task; struct eb32_node *eb; - __decl_thread(int key); while (1) { if (max_processed-- <= 0) @@ -364,7 +389,7 @@ void wake_expired_tasks() */ __task_unlink_wq(task); if (tick_isset(task->expire)) - __task_queue(task, &tt->timers); + __task_queue(task); } else { /* task not expired and correctly placed. It may not be eternal. */ @@ -372,99 +397,6 @@ void wake_expired_tasks() break; } } - -#ifdef USE_THREAD - if (eb_is_empty(&tg_ctx->timers)) - goto leave; - - HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock); - eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK); - if (!eb) { - eb = eb32_first(&tg_ctx->timers); - if (likely(!eb)) { - HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock); - goto leave; - } - } - key = eb->key; - - if (tick_is_lt(now_ms, key)) { - HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock); - goto leave; - } - - /* There's really something of interest here, let's visit the queue */ - - if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) { - /* if we failed to grab the lock it means another thread is - * already doing the same here, so let it do the job. - */ - HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock); - goto leave; - } - - while (1) { - lookup_next: - if (max_processed-- <= 0) - break; - eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK); - if (!eb) { - /* we might have reached the end of the tree, typically because - * is in the first half and we're first scanning the last - * half. Let's loop back to the beginning of the tree now. - */ - eb = eb32_first(&tg_ctx->timers); - if (likely(!eb)) - break; - } - - task = eb32_entry(eb, struct task, wq); - - /* Check for any competing run of the task (quite rare but may - * involve a dangerous concurrent access on task->expire). In - * order to protect against this, we'll take an exclusive access - * on TASK_RUNNING before checking/touching task->expire. If the - * task is already RUNNING on another thread, it will deal by - * itself with the requeuing so we must not do anything and - * simply quit the loop for now, because we cannot wait with the - * WQ lock held as this would prevent the running thread from - * requeuing the task. One annoying effect of holding RUNNING - * here is that a concurrent task_wakeup() will refrain from - * waking it up. This forces us to check for a wakeup after - * releasing the flag. - */ - if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING) - break; - - if (tick_is_expired(task->expire, now_ms)) { - /* expired task, wake it up */ - HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock); - __task_unlink_wq(task); - HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock); - task_drop_running(task, TASK_WOKEN_TIMER); - } - else if (task->expire != eb->key) { - /* task is not expired but its key doesn't match so let's - * update it and skip to next apparently expired task. - */ - HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock); - __task_unlink_wq(task); - if (tick_isset(task->expire)) - __task_queue(task, &tg_ctx->timers); - HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock); - task_drop_running(task, 0); - goto lookup_next; - } - else { - /* task not expired and correctly placed. It may not be eternal. */ - BUG_ON(task->expire == TICK_ETERNITY); - task_drop_running(task, 0); - break; - } - } - - HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock); -#endif leave: return; } @@ -658,6 +590,19 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) goto next; } + if (state & TASK_WOKEN_WQ) { + /* We should add this task to our wait queue */ + task_queue(t); + /* + * If this is the only reason the task got scheduled, + * then we don't actually have ot run it. + */ + if ((state & TASK_WOKEN_ANY) == TASK_WOKEN_WQ) { + task_drop_running(t, 0); + goto next; + } + state &= ~TASK_WOKEN_WQ; + } /* OK now the task or tasklet is well alive and is going to be run */ if (state & TASK_F_TASKLET) { /* this is a tasklet */ @@ -686,7 +631,8 @@ unsigned int run_tasks_from_lists(unsigned int budgets[]) __task_free(t); } else { - task_queue(t); + if (__task_get_current_owner(t->tid) == tid) + task_queue(t); task_drop_running(t, 0); } }