From 8f5bae921e87fa2a16e46c8213bccc32d0fc8e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Fri, 14 Apr 2017 09:45:18 +0100 Subject: [PATCH] Pending operation tracking and limiting --- servers/lloadd/backend.c | 19 ++++++++++++++++--- servers/lloadd/client.c | 25 +++++++++++++++++++++++++ servers/lloadd/config.c | 3 +++ servers/lloadd/operation.c | 27 +++++++++++++++++++++++++-- servers/lloadd/slap.h | 3 +++ servers/lloadd/upstream.c | 15 +++++++++++++-- 6 files changed, 85 insertions(+), 7 deletions(-) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 89123f9c76..b4614e58a6 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -100,6 +100,15 @@ backend_select( Operation *op ) Connection *c; ldap_pvt_thread_mutex_lock( &b->b_mutex ); + + if ( b->b_max_pending && b->b_n_ops_executing >= b->b_max_pending ) { + Debug( LDAP_DEBUG_CONNS, "backend_select: " + "backend %s too busy\n", + b->b_bindconf.sb_uri.bv_val ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + continue; + } + if ( op->o_tag == LDAP_REQ_BIND && !(lload_features & LLOAD_FEATURE_VC) ) { head = &b->b_bindconns; @@ -112,11 +121,15 @@ backend_select( Operation *op ) LDAP_LIST_FOREACH( c, head, c_next ) { ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); - if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) { + if ( c->c_state == SLAP_C_READY && !c->c_pendingber && + ( b->b_max_conn_pending == 0 || + c->c_n_ops_executing < b->b_max_conn_pending ) ) { Debug( LDAP_DEBUG_CONNS, "backend_select: " "selected connection %lu for client %lu msgid=%d\n", - c->c_connid, op->o_client->c_connid, - op->o_client_msgid ); + c->c_connid, op->o_client_connid, op->o_client_msgid ); + + b->b_n_ops_executing++; + c->c_n_ops_executing++; ldap_pvt_thread_mutex_unlock( &b->b_mutex ); return c; } diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 9220eb51e9..bc21baeb15 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -201,6 +201,12 @@ fail: void client_destroy( Connection *c ) { + TAvlnode *root, *node; + + Debug( LDAP_DEBUG_CONNS, "client_destroy: " + "destroying client %lu\n", + c->c_connid ); + assert( c->c_read_event != NULL ); event_del( c->c_read_event ); event_free( c->c_read_event ); @@ -209,6 +215,25 @@ client_destroy( Connection *c ) event_del( c->c_write_event ); event_free( c->c_write_event ); + root = c->c_ops; + c->c_ops = NULL; + + if ( !BER_BVISNULL( &c->c_auth ) ) { + ch_free( c->c_auth.bv_val ); + } + c->c_state = SLAP_C_INVALID; connection_destroy( c ); + + if ( !root ) return; + + /* We don't hold c_mutex anymore */ + node = tavl_end( root, TAVL_DIR_LEFT ); + do { + Operation *op = node->avl_data; + + op->o_client = NULL; + operation_abandon( op ); + } while ( (node = tavl_next( node, TAVL_DIR_RIGHT )) ); + tavl_free( root, NULL ); } diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index 531e2196d9..e3cb01cf6f 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -1917,6 +1917,9 @@ static slap_cf_aux_table bindkey[] = { { BER_BVC("numconns="), offsetof(Backend, b_numconns), 'i', 0, NULL }, { BER_BVC("bindconns="), offsetof(Backend, b_numbindconns), 'i', 0, NULL }, { BER_BVC("retry="), offsetof(Backend, b_retry_timeout), 'i', 0, NULL }, + + { BER_BVC("max-pending-ops="), offsetof(Backend, b_max_pending), 'i', 0, NULL }, + { BER_BVC("conn-max-pending="), offsetof(Backend, b_max_conn_pending), 'i', 0, NULL }, #ifdef HAVE_TLS { BER_BVC("starttls="), offsetof(Backend, b_bindconf.sb_tls), 'i', 0, tlskey }, { BER_BVC("tls_cert="), offsetof(Backend, b_bindconf.sb_tls_cert), 's', 1, NULL }, diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index 185d07a3fa..3f4465dae3 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -115,15 +115,28 @@ operation_destroy( Operation *op ) if ( op->o_client ) { c = op->o_client; ldap_pvt_thread_mutex_lock( &c->c_mutex ); - tavl_delete( &c->c_ops, op, operation_client_cmp ); + if ( tavl_delete( &c->c_ops, op, operation_client_cmp ) ) { + c->c_n_ops_executing--; + } ldap_pvt_thread_mutex_unlock( &c->c_mutex ); } if ( op->o_upstream ) { + Backend *b = NULL; + c = op->o_upstream; ldap_pvt_thread_mutex_lock( &c->c_mutex ); - tavl_delete( &c->c_ops, op, operation_upstream_cmp ); + if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) ) { + c->c_n_ops_executing--; + b = (Backend *)c->c_private; + } ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + if ( b ) { + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + } } ch_free( op ); @@ -177,6 +190,7 @@ operation_init( Connection *c, BerElement *ber ) slap_msgtype2str( op->o_tag ), op->o_client_msgid, op->o_client_connid ); + c->c_n_ops_executing++; return op; fail: @@ -192,9 +206,14 @@ operation_abandon( Operation *op ) if ( op->o_upstream ) { Connection *c = op->o_upstream; BerElement *ber; + Backend *b; ldap_pvt_thread_mutex_lock( &c->c_mutex ); rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ); + if ( !rc ) { + c->c_n_ops_executing--; + } + b = (Backend *)c->c_private; ldap_pvt_thread_mutex_unlock( &c->c_mutex ); if ( rc ) { @@ -202,6 +221,10 @@ operation_abandon( Operation *op ) goto done; } + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); ber = c->c_pendingber; diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h index 9e06ed51b7..fba2a68907 100644 --- a/servers/lloadd/slap.h +++ b/servers/lloadd/slap.h @@ -255,6 +255,9 @@ struct Backend { int b_bindavail, b_active, b_opening; LDAP_LIST_HEAD(ConnSt, Connection) b_conns, b_bindconns; + long b_max_pending, b_max_conn_pending; + long b_n_ops_executing; + LDAP_STAILQ_ENTRY(Backend) b_next; }; diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index bd77b6a9a3..c52e872ffa 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -258,7 +258,10 @@ static int handle_unsolicited( Connection *c, BerElement *ber ) { TAvlnode *root; - int freed; + Backend *b; + long freed, executing; + + b = (Backend *)c->c_private; Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: " "teardown for upstream connection %lu\n", @@ -266,17 +269,24 @@ handle_unsolicited( Connection *c, BerElement *ber ) root = c->c_ops; c->c_ops = NULL; + executing = c->c_n_ops_executing; + c->c_n_ops_executing = 0; ldap_pvt_thread_mutex_unlock( &c->c_mutex ); freed = tavl_free( root, (AVL_FREE)operation_lost_upstream ); + assert( freed == executing ); Debug( LDAP_DEBUG_TRACE, "handle_unsolicited: " - "dropped %d operations\n", + "dropped %ld operations\n", freed ); ldap_pvt_thread_mutex_lock( &c->c_mutex ); upstream_destroy( c ); ber_free( ber, 1 ); + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing -= executing; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + return -1; } @@ -830,6 +840,7 @@ upstream_destroy( Connection *c ) } else { b->b_active--; } + b->b_n_ops_executing -= c->c_n_ops_executing; ldap_pvt_thread_mutex_unlock( &b->b_mutex ); backend_retry( b );