mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-12-25 09:09:54 -05:00
parent
f947f789f3
commit
eaa3469736
1 changed files with 62 additions and 19 deletions
|
|
@ -101,8 +101,10 @@ struct ldap_int_thread_poolq_s {
|
|||
/* not paused and something to do for pool_<wrapper/pause/destroy>() */
|
||||
ldap_pvt_thread_cond_t ltp_cond;
|
||||
|
||||
/* ltp_active_count <= 1 && ltp_pause */
|
||||
ldap_pvt_thread_cond_t ltp_pcond;
|
||||
/* Some active task needs to be the sole active task.
|
||||
* Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
|
||||
*/
|
||||
volatile sig_atomic_t ltp_pause;
|
||||
|
||||
/* ltp_pause == 0 ? <p_pending_list : &empty_pending_list,
|
||||
* maintaned to reduce work for pool_wrapper()
|
||||
|
|
@ -136,6 +138,12 @@ struct ldap_int_thread_pool_s {
|
|||
/* protect members below, and protect thread_keys[] during pauses */
|
||||
ldap_pvt_thread_mutex_t ltp_mutex;
|
||||
|
||||
/* ltp_active_count <= 1 && ltp_pause */
|
||||
ldap_pvt_thread_cond_t ltp_pcond;
|
||||
|
||||
/* number of active queues */
|
||||
int ltp_active_queues;
|
||||
|
||||
/* The pool is finishing, waiting for its threads to close.
|
||||
* They close when ltp_pending_list is done. pool_submit()
|
||||
* rejects new tasks. ltp_max_pending = -(its old value).
|
||||
|
|
@ -233,6 +241,10 @@ ldap_pvt_thread_pool_init_q (
|
|||
if (rc != 0)
|
||||
return(rc);
|
||||
|
||||
rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
|
||||
if (rc != 0)
|
||||
return(rc);
|
||||
|
||||
rem_thr = max_threads % numqs;
|
||||
rem_pend = max_pending % numqs;
|
||||
for ( i=0; i<numqs; i++ ) {
|
||||
|
|
@ -242,9 +254,6 @@ ldap_pvt_thread_pool_init_q (
|
|||
if (rc != 0)
|
||||
return(rc);
|
||||
rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
|
||||
if (rc != 0)
|
||||
return(rc);
|
||||
rc = ldap_pvt_thread_cond_init(&pq->ltp_pcond);
|
||||
if (rc != 0)
|
||||
return(rc);
|
||||
LDAP_STAILQ_INIT(&pq->ltp_pending_list);
|
||||
|
|
@ -684,6 +693,8 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
|
|||
if (pool->ltp_max_pending > 0)
|
||||
pool->ltp_max_pending = -pool->ltp_max_pending;
|
||||
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
|
||||
for (i=0; i<pool->ltp_numqs; i++) {
|
||||
pq = &pool->ltp_wqs[i];
|
||||
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
|
||||
|
|
@ -709,12 +720,11 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
|
|||
LDAP_FREE(task);
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
ldap_pvt_thread_cond_destroy(&pq->ltp_pcond);
|
||||
ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
|
||||
ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
|
||||
}
|
||||
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
|
||||
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
|
||||
LDAP_FREE(pool);
|
||||
*tpool = NULL;
|
||||
|
|
@ -770,9 +780,15 @@ ldap_int_thread_pool_wrapper (
|
|||
work_list = pq->ltp_work_list; /* help the compiler a bit */
|
||||
task = LDAP_STAILQ_FIRST(work_list);
|
||||
if (task == NULL) { /* paused or no pending tasks */
|
||||
if (--(pq->ltp_active_count) < 2) {
|
||||
/* Notify pool_pause it is the sole active thread. */
|
||||
ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
|
||||
if (--(pq->ltp_active_count) < 1) {
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
|
||||
if (--(pool->ltp_active_queues) < 1) {
|
||||
/* Notify pool_pause it is the sole active thread. */
|
||||
ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
|
||||
}
|
||||
|
||||
do {
|
||||
|
|
@ -879,31 +895,46 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
|
|||
/* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
|
||||
pause_type -= pause;
|
||||
|
||||
if (pause_type & GO_IDLE) {
|
||||
if (!(pause_type & DO_PAUSE))
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
|
||||
if (pause_type & GO_IDLE) {
|
||||
int do_pool = 0;
|
||||
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
|
||||
pq->ltp_pending_count++;
|
||||
pq->ltp_active_count--;
|
||||
if (pause && pq->ltp_active_count < 1) {
|
||||
/* Tell the task waiting to DO_PAUSE it can proceed */
|
||||
ldap_pvt_thread_cond_signal(&pq->ltp_pcond);
|
||||
do_pool = 1;
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
if (do_pool) {
|
||||
if (!(pause_type & DO_PAUSE))
|
||||
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
|
||||
pool->ltp_active_queues--;
|
||||
if (pool->ltp_active_queues < 1)
|
||||
/* Tell the task waiting to DO_PAUSE it can proceed */
|
||||
ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
|
||||
if (!(pause_type & DO_PAUSE))
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
if (pause_type & GO_UNIDLE) {
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
if (pause_type & DO_PAUSE)
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
|
||||
/* Wait out pause if any, then cancel GO_IDLE */
|
||||
if (pause > max_ltp_pause) {
|
||||
ret = 1;
|
||||
do {
|
||||
ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
|
||||
} while (pool->ltp_pause > max_ltp_pause);
|
||||
} while (pq->ltp_pause > max_ltp_pause);
|
||||
}
|
||||
pq->ltp_pending_count--;
|
||||
pq->ltp_active_count++;
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
if (pause_type & DO_PAUSE)
|
||||
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
|
||||
}
|
||||
|
||||
if (pause_type & DO_PAUSE) {
|
||||
|
|
@ -912,6 +943,7 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
|
|||
ret = 0;
|
||||
assert(!pool->ltp_pause);
|
||||
pool->ltp_pause = WANT_PAUSE;
|
||||
pool->ltp_active_queues = 0;
|
||||
|
||||
for (i=0; i<pool->ltp_numqs; i++)
|
||||
if (&pool->ltp_wqs[i] == pq) break;
|
||||
|
|
@ -922,17 +954,21 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
|
|||
|
||||
j=i;
|
||||
do {
|
||||
pq = &pool->ltp_wqs[j];
|
||||
if (j != i)
|
||||
ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
|
||||
|
||||
pq->ltp_pause = WANT_PAUSE;
|
||||
|
||||
/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
|
||||
* and do not finish threads in ldap_pvt_thread_pool_wrapper() */
|
||||
pq->ltp_open_count = -pq->ltp_open_count;
|
||||
/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
|
||||
pq->ltp_work_list = &empty_pending_list;
|
||||
/* Wait for this task to become the sole active task */
|
||||
while (pq->ltp_active_count > 0) {
|
||||
ldap_pvt_thread_cond_wait(&pq->ltp_pcond, &pq->ltp_mutex);
|
||||
}
|
||||
|
||||
if (pq->ltp_active_count > 0)
|
||||
pool->ltp_active_queues++;
|
||||
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
if (pool->ltp_numqs > 1) {
|
||||
j++;
|
||||
|
|
@ -940,11 +976,17 @@ handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
|
|||
}
|
||||
} while (j != i);
|
||||
|
||||
/* Wait for this task to become the sole active task */
|
||||
while (pool->ltp_active_queues > 0)
|
||||
ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
|
||||
|
||||
/* restore us to active count */
|
||||
pool->ltp_wqs[i].ltp_active_count++;
|
||||
|
||||
assert(pool->ltp_pause == WANT_PAUSE);
|
||||
pool->ltp_pause = PAUSED;
|
||||
for (i=0; i<pool->ltp_numqs; i++)
|
||||
pool->ltp_wqs[i].ltp_pause = PAUSED;
|
||||
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
|
||||
}
|
||||
|
||||
|
|
@ -1015,6 +1057,7 @@ ldap_pvt_thread_pool_resume (
|
|||
pq->ltp_open_count = -pq->ltp_open_count;
|
||||
pq->ltp_work_list = &pq->ltp_pending_list;
|
||||
|
||||
pq->ltp_pause = 0;
|
||||
ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
|
||||
ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue