diff --git a/include/ldap_pvt_thread.h b/include/ldap_pvt_thread.h index 1a0a012c31..67f7ede3ee 100644 --- a/include/ldap_pvt_thread.h +++ b/include/ldap_pvt_thread.h @@ -282,6 +282,10 @@ LDAP_F( int ) ldap_pvt_thread_pool_pausecheck LDAP_P(( ldap_pvt_thread_pool_t *pool )); +LDAP_F( int ) +ldap_pvt_thread_pool_pausecheck_native LDAP_P(( + ldap_pvt_thread_pool_t *pool )); + LDAP_F( int ) ldap_pvt_thread_pool_pause LDAP_P(( ldap_pvt_thread_pool_t *pool )); diff --git a/libraries/libldap/tpool.c b/libraries/libldap/tpool.c index 5df3dd76af..67507c0ae6 100644 --- a/libraries/libldap/tpool.c +++ b/libraries/libldap/tpool.c @@ -1239,6 +1239,32 @@ ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool ) return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE)); } +/* + * Wait for a pause, from a non-pooled thread. + */ +int +ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool ) +{ + struct ldap_int_thread_pool_s *pool; + + if (tpool == NULL) + return(-1); + + pool = *tpool; + + if (pool == NULL) + return(0); + + if (!pool->ltp_pause) + return(0); + + ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); + while (pool->ltp_pause) + ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); + return 1; +} + /* * Pause the pool. The calling task must be active, not idle. * Return when all other tasks are paused or idle. diff --git a/servers/slapd/bconfig.c b/servers/slapd/bconfig.c index ead975b3be..c8bcdbadd2 100644 --- a/servers/slapd/bconfig.c +++ b/servers/slapd/bconfig.c @@ -1043,6 +1043,14 @@ typedef struct ADlist { static ADlist *sortVals; +static int new_daemon_threads; + +static int +config_resize_lthreads(ConfigArgs *c) +{ + return slapd_daemon_resize( new_daemon_threads ); +} + static int config_generic(ConfigArgs *c) { int i; @@ -1806,7 +1814,7 @@ config_generic(ConfigArgs *c) { case CFG_THREADQS: if ( c->value_int < 1 ) { snprintf( c->cr_msg, sizeof( c->cr_msg ), - "threadqueuess=%d smaller than minimum value 1", + "threadqueues=%d smaller than minimum value 1", c->value_int ); Debug(LDAP_DEBUG_ANY, "%s: %s.\n", c->log, c->cr_msg ); @@ -1824,6 +1832,14 @@ config_generic(ConfigArgs *c) { break; case CFG_LTHREADS: + if ( c->value_uint < 1 ) { + snprintf( c->cr_msg, sizeof( c->cr_msg ), + "listenerthreads=%u smaller than minimum value 1", + c->value_uint ); + Debug(LDAP_DEBUG_ANY, "%s: %s.\n", + c->log, c->cr_msg ); + return 1; + } { int mask = 0; /* use a power of two */ while (c->value_uint > 1) { @@ -1831,8 +1847,8 @@ config_generic(ConfigArgs *c) { mask <<= 1; mask |= 1; } - slapd_daemon_mask = mask; - slapd_daemon_threads = mask+1; + new_daemon_threads = mask+1; + config_push_cleanup( c, config_resize_lthreads ); } break; @@ -4195,11 +4211,11 @@ config_tls_option(ConfigArgs *c) { if (c->op == SLAP_CONFIG_EMIT) { return ldap_pvt_tls_get_option( ld, flag, berval ? (void *)&c->value_bv : (void *)&c->value_string ); } else if ( c->op == LDAP_MOD_DELETE ) { - c->cleanup = config_tls_cleanup; + config_push_cleanup( c, config_tls_cleanup ); return ldap_pvt_tls_set_option( ld, flag, NULL ); } if ( !berval ) ch_free(c->value_string); - c->cleanup = config_tls_cleanup; + config_push_cleanup( c, config_tls_cleanup ); rc = ldap_pvt_tls_set_option(ld, flag, berval ? (void *)&c->value_bv : (void *)c->argv[1]); if ( berval ) ch_free(c->value_bv.bv_val); return rc; @@ -4223,11 +4239,11 @@ config_tls_config(ConfigArgs *c) { return slap_tls_get_config( slap_tls_ld, flag, &c->value_string ); } else if ( c->op == LDAP_MOD_DELETE ) { int i = 0; - c->cleanup = config_tls_cleanup; + config_push_cleanup( c, config_tls_cleanup ); return ldap_pvt_tls_set_option( slap_tls_ld, flag, &i ); } ch_free( c->value_string ); - c->cleanup = config_tls_cleanup; + config_push_cleanup( c, config_tls_cleanup ); if ( isdigit( (unsigned char)c->argv[1][0] ) && c->type != CFG_TLS_PROTOCOL_MIN ) { if ( lutil_atoi( &i, c->argv[1] ) != 0 ) { Debug(LDAP_DEBUG_ANY, "%s: " @@ -5613,8 +5629,8 @@ ok: rc = ca->bi->bi_db_open( ca->be, &ca->reply ); ca->be->bd_info = bi_orig; } - } else if ( ca->cleanup ) { - rc = ca->cleanup( ca ); + } else if ( ca->num_cleanups ) { + rc = config_run_cleanup( ca ); } if ( rc ) { if (ca->cr_msg[0] == '\0') @@ -5684,8 +5700,8 @@ done: overlay_destroy_one( ca->be, (slap_overinst *)ca->bi ); } else if ( coptr->co_type == Cft_Schema ) { schema_destroy_one( ca, colst, nocs, last ); - } else if ( ca->cleanup ) { - ca->cleanup( ca ); + } else if ( ca->num_cleanups ) { + config_run_cleanup( ca ); } } done_noop: @@ -6224,8 +6240,8 @@ out: ca->reply = msg; } - if ( ca->cleanup ) { - i = ca->cleanup( ca ); + if ( ca->num_cleanups ) { + i = config_run_cleanup( ca ); if (rc == LDAP_SUCCESS) rc = i; } diff --git a/servers/slapd/connection.c b/servers/slapd/connection.c index 6bb48b87b9..e9130a9d0b 100644 --- a/servers/slapd/connection.c +++ b/servers/slapd/connection.c @@ -706,6 +706,17 @@ connection_destroy( Connection *c ) } } +int connection_is_active( ber_socket_t s ) +{ + Connection *c; + + assert( s < dtblsize ); + c = &connections[s]; + return c->c_conn_state == SLAP_C_CLOSING || + c->c_conn_state == SLAP_C_BINDING || + c->c_conn_state == SLAP_C_ACTIVE ; +} + int connection_valid( Connection *c ) { /* c_mutex must be locked by caller */ diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index 050a62ce3e..48d502136e 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -81,9 +81,6 @@ ber_socket_t dtblsize; slap_ssf_t local_ssf = LDAP_PVT_SASL_LOCAL_SSF; struct runqueue_s slapd_rq; -#ifndef SLAPD_MAX_DAEMON_THREADS -#define SLAPD_MAX_DAEMON_THREADS 16 -#endif int slapd_daemon_threads = 1; int slapd_daemon_mask; @@ -94,7 +91,6 @@ int slapd_tcp_wmem; Listener **slap_listeners = NULL; static volatile sig_atomic_t listening = 1; /* 0 when slap_listeners closed */ -static ldap_pvt_thread_t *listener_tid; #ifndef SLAPD_LISTEN_BACKLOG #define SLAPD_LISTEN_BACKLOG 2048 @@ -102,7 +98,9 @@ static ldap_pvt_thread_t *listener_tid; #define DAEMON_ID(fd) (fd & slapd_daemon_mask) -static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2]; +typedef ber_socket_t sdpair[2]; + +static sdpair *wake_sds; static ldap_pvt_thread_mutex_t emfile_mutex; static int emfile; @@ -136,6 +134,7 @@ typedef struct slap_daemon_st { ber_socket_t sd_nactives; int sd_nwriters; int sd_nfds; + ldap_pvt_thread_t sd_tid; #if defined(HAVE_KQUEUE) uint8_t* sd_fdmodes; /* indexed by fd */ @@ -173,7 +172,7 @@ typedef struct slap_daemon_st { #endif /* ! kqueue && ! epoll && ! /dev/poll */ } slap_daemon_st; -static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS]; +static slap_daemon_st *slap_daemon; /* * NOTE: naming convention for macros: @@ -1881,11 +1880,13 @@ slapd_daemon_init( const char *urls ) Debug( LDAP_DEBUG_ARGS, "daemon_init: %s\n", urls ? urls : "" ); - for ( i=0; isl_sd == i ) { + sl = slap_listeners[j]; + num_listeners--; + break; + } + } + } + SLAP_SOCK_ADD( newid, i, sl ); + if ( SLAP_SOCK_IS_READ( oldid, i )) { + SLAP_SOCK_SET_READ( newid, i ); + } + if ( SLAP_SOCK_IS_WRITE( oldid, i )) { + SLAP_SOCK_SET_WRITE( newid, i ); + slap_daemon[oldid].sd_nwriters--; + slap_daemon[newid].sd_nwriters++; + } + if ( connection_is_active( i )) { + slap_daemon[oldid].sd_nactives--; + slap_daemon[newid].sd_nactives++; + } + SLAP_SOCK_DEL( oldid, i ); + } +} + int slapd_daemon_destroy( void ) @@ -2409,7 +2465,8 @@ slapd_daemon_task( int l; time_t last_idle_check = 0; int ebadf = 0; - int tid = (ldap_pvt_thread_t *) ptr - listener_tid; + int tid = (slap_daemon_st *) ptr - slap_daemon; + int old_threads = slapd_daemon_threads; #define SLAPD_IDLE_CHECK_LIMIT 4 @@ -2783,6 +2840,8 @@ loop: continue; } + if ( DAEMON_ID( lr->sl_sd ) != tid ) continue; + if ( lr->sl_mute ) { Debug( LDAP_DEBUG_CONNS, "daemon: " SLAP_EVENT_FNAME ": " @@ -2870,6 +2929,7 @@ loop: if ( ns <= 0 ) break; if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue; + if ( DAEMON_ID( slap_listeners[l]->sl_sd ) != tid ) continue; #ifdef LDAP_CONNECTIONLESS if ( slap_listeners[l]->sl_is_udp ) continue; #endif /* LDAP_CONNECTIONLESS */ @@ -3088,6 +3148,13 @@ loop: } #endif /* SLAP_EVENTS_ARE_INDEXED */ + /* Was number of listener threads decreased? */ + if ( ldap_pvt_thread_pool_pausecheck_native( &connection_pool )) { + /* decreased, let this thread finish */ + if ( tid >= slapd_daemon_threads ) + break; + } + #ifndef HAVE_YIELDING_SELECT ldap_pvt_thread_yield(); #endif /* ! HAVE_YIELDING_SELECT */ @@ -3136,6 +3203,107 @@ loop: return NULL; } +typedef struct slap_tid_waiter { + int num_tids; + ldap_pvt_thread_t tids[0]; +} slap_tid_waiter; + +static void * +slapd_daemon_tid_cleanup( + void *ctx, + void *ptr ) +{ + slap_tid_waiter *tids = ptr; + int i; + + for ( i=0; inum_tids; i++ ) + ldap_pvt_thread_join( tids->tids[i], (void *)NULL ); + ch_free( ptr ); + return NULL; +} + +int +slapd_daemon_resize( int newnum ) +{ + int i, rc; + + if ( newnum == slapd_daemon_threads ) + return 0; + + /* wake up all current listener threads */ + for ( i=0; i slapd_daemon_threads ) { + wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair )); + slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st )); + + for ( i=slapd_daemon_threads; inum_tids = slapd_daemon_threads - newnum; + for ( i=newnum, j=0; itids[j] = slap_daemon[i].sd_tid; +#ifdef HAVE_WINSOCK + if ( wake_sds[i][1] != INVALID_SOCKET && + SLAP_FD2SOCK( wake_sds[i][1] ) != SLAP_FD2SOCK( wake_sds[i][0] )) +#endif /* HAVE_WINSOCK */ + tcp_close( SLAP_FD2SOCK(wake_sds[i][1]) ); +#ifdef HAVE_WINSOCK + if ( wake_sds[i][0] != INVALID_SOCKET ) +#endif /* HAVE_WINSOCK */ + tcp_close( SLAP_FD2SOCK(wake_sds[i][0]) ); + + SLAP_SOCK_DESTROY( i ); + } + + wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair )); + slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st )); + for ( i=0; i SLAPD_MAX_DAEMON_THREADS ) - slapd_daemon_threads = SLAPD_MAX_DAEMON_THREADS; - - listener_tid = ch_malloc(slapd_daemon_threads * sizeof(ldap_pvt_thread_t)); - SLAP_SOCK_INIT2(); /* daemon_init only inits element 0 */ @@ -3202,8 +3365,8 @@ slapd_daemon( void ) for ( i=0; i