From 485a169758b14640bc22254850b769298f299c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Wed, 7 Feb 2018 12:38:40 +0000 Subject: [PATCH] Implement pause handlers --- servers/lloadd/daemon.c | 96 ++++++++++++++++++++++++++++++++++-- servers/lloadd/init.c | 4 ++ servers/lloadd/lload.h | 5 ++ servers/lloadd/module_init.c | 23 +++++++++ servers/lloadd/proto-lload.h | 3 ++ 5 files changed, 128 insertions(+), 3 deletions(-) diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index ed1a7b7a56..bb8fc8e22a 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -74,6 +74,10 @@ volatile sig_atomic_t slapd_abrupt_shutdown = 0; static int emfile; +ldap_pvt_thread_mutex_t lload_wait_mutex; +ldap_pvt_thread_cond_t lload_wait_cond; +ldap_pvt_thread_cond_t lload_pause_cond; + #ifndef SLAPD_MAX_DAEMON_THREADS #define SLAPD_MAX_DAEMON_THREADS 16 #endif @@ -139,6 +143,43 @@ lloadd_close( ber_socket_t s ) tcp_close( s ); } +static int +lload_base_dispatch( struct event_base *base ) +{ + int rc; + + while ( (rc = event_base_dispatch( base )) == 0 ) { + if ( event_base_got_exit( base ) ) { + break; + } + + Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: " + "handling pause\n" ); + /* + * We are pausing, signal the pausing thread we've finished and + * wait until the thread pool resumes operation. + * + * Do this in lockstep with the pausing thread. + */ + ldap_pvt_thread_mutex_lock( &lload_wait_mutex ); + ldap_pvt_thread_cond_signal( &lload_wait_cond ); + + /* Now wait until we resume */ + ldap_pvt_thread_cond_wait( &lload_pause_cond, &lload_wait_mutex ); + ldap_pvt_thread_mutex_unlock( &lload_wait_mutex ); + + Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: " + "resuming\n" ); + } + + if ( rc ) { + Debug( LDAP_DEBUG_ANY, "lload_base_dispatch: " + "event_base_dispatch() returned an error rc=%d\n", + rc ); + } + return rc; +} + static void lload_free_listener_addresses( struct sockaddr **sal ) { @@ -946,7 +987,7 @@ lload_listener( static void * lload_listener_thread( void *ctx ) { - int rc = event_base_dispatch( listener_base ); + int rc = lload_base_dispatch( listener_base ); Debug( LDAP_DEBUG_ANY, "lload_listener_thread: " "event loop finished: rc=%d\n", rc ); @@ -1214,7 +1255,7 @@ lloadd_io_task( void *ptr ) lload_daemon[tid].wakeup_event = event; /* run */ - rc = event_base_dispatch( base ); + rc = lload_base_dispatch( base ); Debug( LDAP_DEBUG_ANY, "lloadd_io_task: " "Daemon %d, event loop finished: rc=%d\n", tid, rc ); @@ -1305,7 +1346,7 @@ lloadd_daemon( struct event_base *daemon_base ) } lloadd_inited = 1; - rc = event_base_dispatch( daemon_base ); + rc = lload_base_dispatch( daemon_base ); Debug( LDAP_DEBUG_ANY, "lloadd shutdown: " "Main event loop finished: rc=%d\n", rc ); @@ -1352,6 +1393,55 @@ daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg ) } } +#ifdef BALANCER_MODULE +/* + * Signal the event base to terminate processing as soon as it can and wait for + * lload_base_dispatch to notify us this has happened. + */ +static int +lload_pause_base( struct event_base *base ) +{ + int rc; + + ldap_pvt_thread_mutex_lock( &lload_wait_mutex ); + event_base_loopbreak( base ); + rc = ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex ); + ldap_pvt_thread_mutex_unlock( &lload_wait_mutex ); + + return rc; +} + +void +lload_pause_server( void ) +{ + int i; + + lload_pause_base( listener_base ); + lload_pause_base( daemon_base ); + + for ( i = 0; i < lload_daemon_threads; i++ ) { + lload_pause_base( lload_daemon[i].base ); + } +} + +void +lload_unpause_server( void ) +{ + /* + * Make sure lloadd is completely ready to unpause by now: + * + * After the broadcast, we handle I/O and begin filling the thread pool, in + * high load conditions, we might hit the pool limits and start processing + * operations in the I/O threads (one PDU per socket at a time for fairness + * sake) even before a pause has finished from slapd's point of view! + * + * When (max_pdus_per_cycle == 0) we don't use the pool for these at all and + * most lload processing starts immediately making this even more prominent. + */ + ldap_pvt_thread_cond_broadcast( &lload_pause_cond ); +} +#endif /* BALANCER_MODULE */ + void lload_sig_shutdown( evutil_socket_t sig, short what, void *arg ) { diff --git a/servers/lloadd/init.c b/servers/lloadd/init.c index a2f95d0d1b..96481ccc41 100644 --- a/servers/lloadd/init.c +++ b/servers/lloadd/init.c @@ -101,6 +101,10 @@ lload_init( int mode, const char *name ) LDAP_STAILQ_INIT( &slapd_rq.task_list ); LDAP_STAILQ_INIT( &slapd_rq.run_list ); + ldap_pvt_thread_mutex_init( &lload_wait_mutex ); + ldap_pvt_thread_cond_init( &lload_wait_cond ); + ldap_pvt_thread_cond_init( &lload_pause_cond ); + ldap_pvt_thread_mutex_init( &backend_mutex ); ldap_pvt_thread_mutex_init( &clients_mutex ); ldap_pvt_thread_mutex_init( &lload_pin_mutex ); diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index 2c7519aaad..73fde464fe 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -92,6 +92,11 @@ LDAP_SLAPD_V (LloadBackend *) current_backend; LDAP_SLAPD_V (struct slap_bindconf) bindconf; LDAP_SLAPD_V (struct berval) lloadd_identity; +/* Used to coordinate server (un)pause, shutdown */ +LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_wait_mutex; +LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_pause_cond; +LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_wait_cond; + typedef int lload_cf_aux_table_parse_x( struct berval *val, void *bc, slap_cf_aux_table *tab0, diff --git a/servers/lloadd/module_init.c b/servers/lloadd/module_init.c index 7caa526a3b..4f8caff575 100644 --- a/servers/lloadd/module_init.c +++ b/servers/lloadd/module_init.c @@ -64,6 +64,10 @@ lload_conn_pool_init() { int rc = 0; + ldap_pvt_thread_mutex_init( &lload_wait_mutex ); + ldap_pvt_thread_cond_init( &lload_pause_cond ); + ldap_pvt_thread_cond_init( &lload_wait_cond ); + ldap_pvt_thread_mutex_init( &backend_mutex ); ldap_pvt_thread_mutex_init( &clients_mutex ); ldap_pvt_thread_mutex_init( &lload_pin_mutex ); @@ -74,6 +78,20 @@ lload_conn_pool_init() return rc; } +static int +lload_pause_cb( BackendInfo *bi ) +{ + lload_pause_server(); + return 0; +} + +static int +lload_unpause_cb( BackendInfo *bi ) +{ + lload_unpause_server(); + return 0; +} + int lload_back_open( BackendInfo *bi ) { @@ -110,7 +128,10 @@ lload_back_close( BackendInfo *bi ) return 0; } + ldap_pvt_thread_mutex_lock( &lload_wait_mutex ); event_base_loopexit( daemon_base, NULL ); + ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex ); + ldap_pvt_thread_mutex_unlock( &lload_wait_mutex ); ldap_pvt_thread_join( lloadd_main_thread, (void *)NULL ); return 0; @@ -122,6 +143,8 @@ lload_back_initialize( BackendInfo *bi ) bi->bi_flags = SLAP_BFLAG_STANDALONE; bi->bi_open = lload_back_open; bi->bi_config = config_generic_wrapper; + bi->bi_pause = lload_pause_cb; + bi->bi_unpause = lload_unpause_cb; bi->bi_close = lload_back_close; bi->bi_destroy = 0; diff --git a/servers/lloadd/proto-lload.h b/servers/lloadd/proto-lload.h index adfe96c145..25baf8b775 100644 --- a/servers/lloadd/proto-lload.h +++ b/servers/lloadd/proto-lload.h @@ -104,6 +104,9 @@ LDAP_SLAPD_V (int) lload_daemon_mask; LDAP_SLAPD_F (void) lload_sig_shutdown( evutil_socket_t sig, short what, void *arg ); +LDAP_SLAPD_F (void) lload_pause_server( void ); +LDAP_SLAPD_F (void) lload_unpause_server( void ); + LDAP_SLAPD_V (struct event_base *) daemon_base; LDAP_SLAPD_V (struct evdns_base *) dnsbase; LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown;