ITS#5364, reduce work with ltp_mutex locked:

Replace ltp_state with ltp_finishing.  Drop state
LDAP_INT_THREAD_POOL_STOPPING, flush pending list instead.
ltp_max_pending = default value instead of 0, and negative when finishing.
This commit is contained in:
Hallvard Furuseth 2008-02-10 17:28:20 +00:00
parent 84c2a01c3b
commit ee73fca523

View file

@ -32,12 +32,6 @@
#ifndef LDAP_THREAD_HAVE_TPOOL
typedef enum ldap_int_thread_pool_state_e {
LDAP_INT_THREAD_POOL_RUNNING,
LDAP_INT_THREAD_POOL_FINISHING,
LDAP_INT_THREAD_POOL_STOPPING
} ldap_int_thread_pool_state_t;
/* Thread-specific key with data and optional free function */
typedef struct ldap_int_tpool_key_s {
void *ltk_key;
@ -53,6 +47,9 @@ typedef struct ldap_int_tpool_key_s {
/* Max number of threads */
#define LDAP_MAXTHR 1024 /* must be a power of 2 */
/* (Theoretical) max number of pending requests */
#define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */
/* Context: thread ID and thread-specific key/data pairs */
typedef struct ldap_int_thread_userctx_s {
ldap_pvt_thread_t ltu_id;
@ -105,15 +102,23 @@ struct ldap_int_thread_pool_s {
LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ltp_pending_list;
LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
ldap_int_thread_pool_state_t ltp_state;
/* Some active task needs to be the sole active task.
* Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
/* The pool is finishing, waiting for its threads to close.
* They close when ltp_pending_list is done. pool_submit()
* rejects new tasks. ltp_max_pending = -(its old value).
*/
volatile sig_atomic_t ltp_pause;
int ltp_finishing;
/* Some active task needs to be the sole active task.
* Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
*/
volatile sig_atomic_t ltp_pause;
/* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
int ltp_max_count;
/* Max number of pending + paused requests, negated when ltp_finishing */
int ltp_max_pending;
int ltp_max_count; /* Max number of threads in pool, or 0 */
int ltp_max_pending; /* Max pending or paused requests, or 0 */
int ltp_pending_count; /* Pending or paused requests */
int ltp_active_count; /* Active, not paused requests */
int ltp_open_count; /* Number of threads */
@ -171,8 +176,8 @@ ldap_pvt_thread_pool_init (
if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
max_threads = 0;
if (max_pending < 0)
max_pending = 0;
if (! (1 <= max_pending && max_pending <= MAX_PENDING))
max_pending = MAX_PENDING;
*tpool = NULL;
pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
@ -191,7 +196,6 @@ ldap_pvt_thread_pool_init (
return(rc);
ldap_int_has_thread_pool = 1;
pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
pool->ltp_max_count = max_threads;
pool->ltp_max_pending = max_pending;
LDAP_STAILQ_INIT(&pool->ltp_pending_list);
@ -259,9 +263,8 @@ ldap_pvt_thread_pool_submit (
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
|| (pool->ltp_max_pending
&& pool->ltp_pending_count >= pool->ltp_max_pending))
if (pool->ltp_pending_count >= pool->ltp_max_pending)
goto failed;
task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
@ -389,6 +392,10 @@ ldap_pvt_thread_pool_query(
case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
count = pool->ltp_max_pending;
if (count < 0)
count = -count;
if (count == MAX_PENDING)
count = 0;
break;
case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
@ -424,32 +431,12 @@ ldap_pvt_thread_pool_query(
case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
break;
case LDAP_PVT_THREAD_POOL_PARAM_STATE: {
static struct {
char *name;
ldap_int_thread_pool_state_t state;
} str2state[] = {
{ "running", LDAP_INT_THREAD_POOL_RUNNING },
{ "finishing", LDAP_INT_THREAD_POOL_FINISHING },
{ "stopping", LDAP_INT_THREAD_POOL_STOPPING },
{ NULL }
};
int i;
if ( pool->ltp_pause ) {
*((char **)value) = "pausing";
} else {
for ( i = 0; str2state[ i ].name != NULL; i++ ) {
if ( str2state[ i ].state == pool->ltp_state ) {
break;
}
}
*((char **)value) = str2state[ i ].name;
}
if ( *((char **)value) != NULL ) {
count = -2;
}
} break;
case LDAP_PVT_THREAD_POOL_PARAM_STATE:
*((char **)value) =
pool->ltp_pause ? "pausing" :
!pool->ltp_finishing ? "running" :
pool->ltp_pending_count ? "finishing" : "stopping";
break;
case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
break;
@ -499,6 +486,17 @@ ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
return rc;
}
static void
flush_pending_list( struct ldap_int_thread_pool_s *pool )
{
ldap_int_thread_task_t *task;
while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
LDAP_FREE(task);
}
}
/* Destroy the pool after making its threads finish */
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
@ -524,9 +522,12 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
if (pool != pptr) return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_state = run_pending
? LDAP_INT_THREAD_POOL_FINISHING
: LDAP_INT_THREAD_POOL_STOPPING;
pool->ltp_finishing = 1;
if (pool->ltp_max_pending > 0)
pool->ltp_max_pending = -pool->ltp_max_pending;
if (!run_pending)
flush_pending_list(pool);
while (pool->ltp_open_count) {
if (!pool->ltp_pause)
@ -535,11 +536,7 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
{
LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
LDAP_FREE(task);
}
flush_pending_list(pool);
while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
{
@ -600,12 +597,9 @@ ldap_int_thread_pool_wrapper (
while (pool->ltp_pause)
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
if (pool->ltp_state == LDAP_INT_THREAD_POOL_STOPPING)
break;
task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
if (task == NULL) {
if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
if (pool->ltp_finishing)
break;
if (pool->ltp_open_count >
@ -631,7 +625,6 @@ ldap_int_thread_pool_wrapper (
* check idle time.
*/
assert(pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING);
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
continue;
}
@ -728,7 +721,7 @@ ldap_pvt_thread_pool_resume (
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_pause = 0;
if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING)
if (!pool->ltp_finishing)
ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(0);