From 1f6d8611a3dfa6662a50a5e8ea33fd405796e7b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Fri, 29 Mar 2019 12:56:24 +0000 Subject: [PATCH] Implement read throttling when writes backlog Reject operations in such a case with LDAP_BUSY. If read_event feature is on, just stop reading from the connection. However this could still result in deadlocks in reasonable situations. Need to figure out better ways to make it safe and still protect ourselves. --- servers/lloadd/client.c | 5 +++ servers/lloadd/config.c | 1 + servers/lloadd/connection.c | 68 +++++++++++++++++++++++++++++++------ servers/lloadd/daemon.c | 5 +++ servers/lloadd/extended.c | 1 + servers/lloadd/lload.h | 14 +++++++- servers/lloadd/upstream.c | 11 ++++++ 7 files changed, 94 insertions(+), 11 deletions(-) diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index f3e2870b66..19c15af30e 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -266,6 +266,11 @@ handle_one_request( LloadConnection *c ) 0 ); return LDAP_SUCCESS; } + if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { + operation_send_reject( op, LDAP_BUSY, + "writing side backlogged, please keep reading", 0 ); + return LDAP_SUCCESS; + } if ( op->o_tag == LDAP_REQ_EXTENDED ) { handler = request_extended; } else { diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index e9ea231850..638c167b70 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -1856,6 +1856,7 @@ config_feature( ConfigArgs *c ) { BER_BVC("vc"), LLOAD_FEATURE_VC }, #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ { BER_BVC("proxyauthz"), LLOAD_FEATURE_PROXYAUTHZ }, + { BER_BVC("read_pause"), LLOAD_FEATURE_PAUSE }, { BER_BVNULL, 0 } }; slap_mask_t mask = 0; diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c index 0bcbc73fcb..3bfa2d8820 100644 --- a/servers/lloadd/connection.c +++ b/servers/lloadd/connection.c @@ -102,6 +102,10 @@ handle_pdus( void *ctx, void *arg ) c->c_currentber = ber; checked_lock( &c->c_io_mutex ); + if ( (lload_features & LLOAD_FEATURE_PAUSE) && + (c->c_io_state & LLOAD_C_READ_PAUSE) ) { + goto pause; + } tag = ber_get_next( c->c_sb, &len, ber ); checked_unlock( &c->c_io_mutex ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -135,10 +139,18 @@ handle_pdus( void *ctx, void *arg ) assert( IS_ALIVE( c, c_refcnt ) ); } - event_add( c->c_read_event, c->c_read_timeout ); - Debug( LDAP_DEBUG_CONNS, "handle_pdus: " - "re-enabled read event on connid=%lu\n", - c->c_connid ); + checked_lock( &c->c_io_mutex ); + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "handle_pdus: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } +pause: + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + done: RELEASE_REF( c, c_refcnt, c->c_destroy ); epoch_leave( epoch ); @@ -160,6 +172,7 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) ber_tag_t tag; ber_len_t len; epoch_t epoch; + int pause; if ( !IS_ALIVE( c, c_live ) ) { event_del( c->c_read_event ); @@ -199,7 +212,9 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) c->c_currentber = ber; checked_lock( &c->c_io_mutex ); + assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ); tag = ber_get_next( c->c_sb, &len, ber ); + pause = c->c_io_state & LLOAD_C_READ_PAUSE; checked_unlock( &c->c_io_mutex ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -229,20 +244,34 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) CONNECTION_LOCK_DESTROY(c); goto out; } - event_add( c->c_read_event, c->c_read_timeout ); - Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " - "re-enabled read event on connid=%lu\n", - c->c_connid ); + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } goto out; } + checked_lock( &c->c_io_mutex ); + c->c_io_state |= LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); event_del( c->c_read_event ); + if ( !lload_conn_max_pdus_per_cycle || ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { /* If we're overloaded or configured as such, process one and resume in * the next cycle. */ - event_add( c->c_read_event, c->c_read_timeout ); - c->c_pdu_cb( c ); + int rc = c->c_pdu_cb( c ); + + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + if ( rc == LDAP_SUCCESS && + ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + checked_unlock( &c->c_io_mutex ); goto out; } @@ -313,9 +342,28 @@ connection_write_cb( evutil_socket_t s, short what, void *arg ) CONNECTION_LOCK_DESTROY(c); goto done; } + + if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "connection connid=%lu blocked on writing, marking " + "paused\n", + c->c_connid ); + } + c->c_io_state |= LLOAD_C_READ_PAUSE; + + /* TODO: Do not reset write timeout unless we wrote something */ event_add( c->c_write_event, lload_write_timeout ); } else { c->c_pendingber = NULL; + if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { + c->c_io_state ^= LLOAD_C_READ_PAUSE; + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "Unpausing connection connid=%lu\n", + c->c_connid ); + if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + } } checked_unlock( &c->c_io_mutex ); diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index 853b380ba5..f03e1ab48b 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -1637,6 +1637,8 @@ lload_handle_global_invalidation( LloadChange *change ) * - ProxyAuthz: * - on: nothing needed * - off: clear c_auth/privileged on each client + * - read pause (WIP): + * - nothing needed? */ assert( change->target ); @@ -1644,6 +1646,9 @@ lload_handle_global_invalidation( LloadChange *change ) assert(0); feature_diff &= ~LLOAD_FEATURE_VC; } + if ( feature_diff & LLOAD_FEATURE_PAUSE ) { + feature_diff &= ~LLOAD_FEATURE_PAUSE; + } if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) { if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) { LloadConnection *c; diff --git a/servers/lloadd/extended.c b/servers/lloadd/extended.c index 330523c076..f8e491f950 100644 --- a/servers/lloadd/extended.c +++ b/servers/lloadd/extended.c @@ -89,6 +89,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) ber_printf( output, "t{tit{ess}}", LDAP_TAG_MESSAGE, LDAP_TAG_MSGID, op->o_client_msgid, LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; checked_unlock( &c->c_io_mutex ); CONNECTION_LOCK(c); diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index 0de410bebe..72c3451172 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -174,6 +174,7 @@ typedef enum { LLOAD_FEATURE_VC = 1 << 0, #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ LLOAD_FEATURE_PROXYAUTHZ = 1 << 1, + LLOAD_FEATURE_PAUSE = 1 << 2, } lload_features_t; #ifdef BALANCER_MODULE @@ -272,7 +273,8 @@ enum sc_state { LLOAD_C_CLOSING, /* closing */ LLOAD_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */ LLOAD_C_BINDING, /* binding */ - LLOAD_C_DYING, /* part-processed dead but someone still holds a reference */ + LLOAD_C_DYING, /* part-processed dead waiting to be freed, someone + * might still be observing it */ }; enum sc_type { LLOAD_C_OPEN = 0, /* regular connection */ @@ -280,12 +282,22 @@ enum sc_type { LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */ LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */ }; +enum sc_io_state { + LLOAD_C_OPERATIONAL = 0, /* all is good */ + LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or + * running, do not re-enable c_read_event */ + LLOAD_C_READ_PAUSE = 1 << 1, /* We want to pause reading until the client + * has sufficiently caught up with what we + * sent */ +}; + /* * represents a connection from an ldap client/to ldap server */ struct LloadConnection { enum sc_state c_state; /* connection state */ enum sc_type c_type; + enum sc_io_state c_io_state; ber_socket_t c_fd; /* diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index dd153923b5..352adec18a 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -513,6 +513,9 @@ upstream_bind_cb( LloadConnection *c ) goto fail; } + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); event_add( c->c_read_event, c->c_read_timeout ); ber_free( ber, 1 ); return -1; @@ -578,6 +581,9 @@ upstream_bind( void *ctx, void *arg ) } #endif /* HAVE_CYRUS_SASL */ } + /* TODO: can we be paused at this point? Then we'd have to move this line + * after connection_write_cb */ + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; checked_unlock( &c->c_io_mutex ); connection_write_cb( -1, 0, c ); @@ -832,11 +838,16 @@ upstream_starttls( LloadConnection *c ) ber_free( ber, 1 ); CONNECTION_UNLOCK(c); + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + return rc; } base = event_get_base( c->c_read_event ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; event_del( c->c_read_event ); event_del( c->c_write_event );