Fixes for multiple threadpool queues

Remove poolq_hash, it wasn't distributing work evenly to the queues.
Just walk through all queues and use the one with smallest
active+pending count. Since pool_retract also relied on the hash,
a different means of locating the thread to retract was needed.
Add pool_submit2 which returns the threadpool task structure,
and record which poolq this task lives on.
This commit is contained in:
Howard Chu 2017-03-15 11:13:09 +00:00
parent 53c6c9d16b
commit e12ca8b6fe
5 changed files with 49 additions and 37 deletions

View file

@ -227,10 +227,15 @@ ldap_pvt_thread_pool_submit LDAP_P((
void *arg )); void *arg ));
LDAP_F( int ) LDAP_F( int )
ldap_pvt_thread_pool_retract LDAP_P(( ldap_pvt_thread_pool_submit2 LDAP_P((
ldap_pvt_thread_pool_t *pool, ldap_pvt_thread_pool_t *pool,
ldap_pvt_thread_start_t *start, ldap_pvt_thread_start_t *start,
void *arg )); void *arg,
void **cookie ));
LDAP_F( int )
ldap_pvt_thread_pool_retract LDAP_P((
void *cookie ));
LDAP_F( int ) LDAP_F( int )
ldap_pvt_thread_pool_maxthreads LDAP_P(( ldap_pvt_thread_pool_maxthreads LDAP_P((

View file

@ -29,6 +29,7 @@ typedef struct re_s {
void *arg; void *arg;
char *tname; char *tname;
char *tspec; char *tspec;
void *pool_cookie;
} re_t; } re_t;
typedef struct runqueue_s { typedef struct runqueue_s {

View file

@ -90,6 +90,7 @@ typedef struct ldap_int_thread_task_s {
} ltt_next; } ltt_next;
ldap_pvt_thread_start_t *ltt_start_routine; ldap_pvt_thread_start_t *ltt_start_routine;
void *ltt_arg; void *ltt_arg;
struct ldap_int_thread_poolq_s *ltt_queue;
} ldap_int_thread_task_t; } ldap_int_thread_task_t;
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t; typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
@ -337,25 +338,21 @@ ldap_pvt_thread_pool_init (
return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 ); return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
} }
static int
ldap_int_poolq_hash(
struct ldap_int_thread_pool_s *pool,
void *arg )
{
int i = 0, j;
unsigned char *ptr = (unsigned char *)&arg;
/* dumb hash of arg to choose a queue */
for (j=0; j<sizeof(arg); j++)
i += *ptr++;
i %= pool->ltp_numqs;
return i;
}
/* Submit a task to be performed by the thread pool */ /* Submit a task to be performed by the thread pool */
int int
ldap_pvt_thread_pool_submit ( ldap_pvt_thread_pool_submit (
ldap_pvt_thread_pool_t *tpool, ldap_pvt_thread_pool_t *tpool,
ldap_pvt_thread_start_t *start_routine, void *arg ) ldap_pvt_thread_start_t *start_routine, void *arg )
{
return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
}
/* Submit a task to be performed by the thread pool */
int
ldap_pvt_thread_pool_submit2 (
ldap_pvt_thread_pool_t *tpool,
ldap_pvt_thread_start_t *start_routine, void *arg,
void **cookie )
{ {
struct ldap_int_thread_pool_s *pool; struct ldap_int_thread_pool_s *pool;
struct ldap_int_thread_poolq_s *pq; struct ldap_int_thread_poolq_s *pq;
@ -371,9 +368,23 @@ ldap_pvt_thread_pool_submit (
if (pool == NULL) if (pool == NULL)
return(-1); return(-1);
if ( pool->ltp_numqs > 1 ) if ( pool->ltp_numqs > 1 ) {
i = ldap_int_poolq_hash( pool, arg ); int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
else int min_x = 0, cnt;
for ( i = 0; i < pool->ltp_numqs; i++ ) {
/* take first queue that has nothing active */
if ( !pool->ltp_wqs[i]->ltp_active_count ) {
min_x = i;
break;
}
cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
if ( cnt < min ) {
min = cnt;
min_x = i;
}
}
i = min_x;
} else
i = 0; i = 0;
j = i; j = i;
@ -401,6 +412,9 @@ ldap_pvt_thread_pool_submit (
task->ltt_start_routine = start_routine; task->ltt_start_routine = start_routine;
task->ltt_arg = arg; task->ltt_arg = arg;
task->ltt_queue = pq;
if ( cookie )
*cookie = task;
pq->ltp_pending_count++; pq->ltp_pending_count++;
LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q); LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
@ -475,29 +489,22 @@ no_task( void *ctx, void *arg )
*/ */
int int
ldap_pvt_thread_pool_retract ( ldap_pvt_thread_pool_retract (
ldap_pvt_thread_pool_t *tpool, void *cookie )
ldap_pvt_thread_start_t *start_routine, void *arg )
{ {
struct ldap_int_thread_pool_s *pool; ldap_int_thread_task_t *task, *ttmp;
struct ldap_int_thread_poolq_s *pq; struct ldap_int_thread_poolq_s *pq;
ldap_int_thread_task_t *task;
int i;
if (tpool == NULL) if (cookie == NULL)
return(-1); return(-1);
pool = *tpool; ttmp = cookie;
pq = ttmp->ltt_queue;
if (pool == NULL) if (pq == NULL)
return(-1); return(-1);
i = ldap_int_poolq_hash( pool, arg );
pq = pool->ltp_wqs[i];
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex); ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
if (task->ltt_start_routine == start_routine && if (task == ttmp) {
task->ltt_arg == arg) {
/* Could LDAP_STAILQ_REMOVE the task, but that /* Could LDAP_STAILQ_REMOVE the task, but that
* walks ltp_pending_list again to find it. * walks ltp_pending_list again to find it.
*/ */

View file

@ -2460,8 +2460,8 @@ loop:
ldap_pvt_runqueue_runtask( &slapd_rq, rtask ); ldap_pvt_runqueue_runtask( &slapd_rq, rtask );
ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 ); ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
ldap_pvt_thread_pool_submit( &connection_pool, ldap_pvt_thread_pool_submit2( &connection_pool,
rtask->routine, (void *) rtask ); rtask->routine, (void *) rtask, &rtask->pool_cookie );
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
} }
rtask = ldap_pvt_runqueue_next_sched( &slapd_rq, &cat ); rtask = ldap_pvt_runqueue_next_sched( &slapd_rq, &cat );

View file

@ -5769,8 +5769,7 @@ syncrepl_config( ConfigArgs *c )
ldap_pvt_runqueue_stoptask( &slapd_rq, re ); ldap_pvt_runqueue_stoptask( &slapd_rq, re );
isrunning = 1; isrunning = 1;
} }
if ( ldap_pvt_thread_pool_retract( &connection_pool, if ( ldap_pvt_thread_pool_retract( re->pool_cookie ) > 0 )
re->routine, re ) > 0 )
isrunning = 0; isrunning = 0;
ldap_pvt_runqueue_remove( &slapd_rq, re ); ldap_pvt_runqueue_remove( &slapd_rq, re );