diff --git a/include/haproxy/task.h b/include/haproxy/task.h index 0209d6c20..a5b4c691d 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -333,7 +333,8 @@ static inline void task_drop_running(struct task *t, unsigned int f) new_state |= TASK_QUEUED; - if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t)) + if ((new_state & TASK_QUEUED) || cur_tid >= 0 || task_in_wq(t) || + __task_get_current_owner(cur_tid) != tid) new_tid = cur_tid; else new_tid = -1; diff --git a/src/task.c b/src/task.c index 0806689d6..1b736d863 100644 --- a/src/task.c +++ b/src/task.c @@ -372,9 +372,53 @@ void wake_expired_tasks() task = eb32_entry(eb, struct task, wq); if (tick_is_expired(task->expire, now_ms)) { + int set_running = 0; + /* expired task, wake it up */ __task_unlink_wq(task); - _task_wakeup(task, TASK_WOKEN_TIMER, 0); + /* + * If it's a shared task, see whether we should hand it + * to a less loaded thread. + */ + if (task->tid < 0) { + int attempts = MIN(global.nbthread, 3); + while (attempts-- > 0) { + uint new_tid = statistical_prng_range(global.nbthread); + + if (new_tid == tid) + continue; + if (ha_thread_ctx[new_tid].rq_total * 2 < th_ctx->rq_total) { + int cur_state; + do { + cur_state = _HA_ATOMIC_LOAD(&task->state); + /* + * Okay the task is already in our runqueue, + * or somebody owns the + * TASK_RUNNING flag because + * it is calling task_schedule(), give up. + */ + if (cur_state & (TASK_QUEUED | TASK_RUNNING)) + break; + /* + * Make sure we have TASK_RUNNING set + * so that the task don't + * immediately run on the + * new thread and gets + * freed. + */ + if (__task_set_state_and_tid(task, task->tid, -2 - new_tid, cur_state, cur_state | TASK_RUNNING)) { + set_running = 1; + break; + } + } while (1); + break; + } + } + } + if (set_running) + task_drop_running(task, TASK_WOKEN_TIMER); + else + _task_wakeup(task, TASK_WOKEN_TIMER, 0); } else if (task->expire != eb->key) { /* task is not expired but its key doesn't match so let's