diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index f3e2870b66..19c15af30e 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -266,6 +266,11 @@ handle_one_request( LloadConnection *c ) 0 ); return LDAP_SUCCESS; } + if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { + operation_send_reject( op, LDAP_BUSY, + "writing side backlogged, please keep reading", 0 ); + return LDAP_SUCCESS; + } if ( op->o_tag == LDAP_REQ_EXTENDED ) { handler = request_extended; } else { diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index e9ea231850..638c167b70 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -1856,6 +1856,7 @@ config_feature( ConfigArgs *c ) { BER_BVC("vc"), LLOAD_FEATURE_VC }, #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ { BER_BVC("proxyauthz"), LLOAD_FEATURE_PROXYAUTHZ }, + { BER_BVC("read_pause"), LLOAD_FEATURE_PAUSE }, { BER_BVNULL, 0 } }; slap_mask_t mask = 0; diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c index 0bcbc73fcb..3bfa2d8820 100644 --- a/servers/lloadd/connection.c +++ b/servers/lloadd/connection.c @@ -102,6 +102,10 @@ handle_pdus( void *ctx, void *arg ) c->c_currentber = ber; checked_lock( &c->c_io_mutex ); + if ( (lload_features & LLOAD_FEATURE_PAUSE) && + (c->c_io_state & LLOAD_C_READ_PAUSE) ) { + goto pause; + } tag = ber_get_next( c->c_sb, &len, ber ); checked_unlock( &c->c_io_mutex ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -135,10 +139,18 @@ handle_pdus( void *ctx, void *arg ) assert( IS_ALIVE( c, c_refcnt ) ); } - event_add( c->c_read_event, c->c_read_timeout ); - Debug( LDAP_DEBUG_CONNS, "handle_pdus: " - "re-enabled read event on connid=%lu\n", - c->c_connid ); + checked_lock( &c->c_io_mutex ); + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "handle_pdus: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } +pause: + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + done: RELEASE_REF( c, c_refcnt, c->c_destroy ); epoch_leave( epoch ); @@ -160,6 +172,7 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) ber_tag_t tag; ber_len_t len; epoch_t epoch; + int pause; if ( !IS_ALIVE( c, c_live ) ) { event_del( c->c_read_event ); @@ -199,7 +212,9 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) c->c_currentber = ber; checked_lock( &c->c_io_mutex ); + assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ); tag = ber_get_next( c->c_sb, &len, ber ); + pause = c->c_io_state & LLOAD_C_READ_PAUSE; checked_unlock( &c->c_io_mutex ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -229,20 +244,34 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) CONNECTION_LOCK_DESTROY(c); goto out; } - event_add( c->c_read_event, c->c_read_timeout ); - Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " - "re-enabled read event on connid=%lu\n", - c->c_connid ); + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } goto out; } + checked_lock( &c->c_io_mutex ); + c->c_io_state |= LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); event_del( c->c_read_event ); + if ( !lload_conn_max_pdus_per_cycle || ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { /* If we're overloaded or configured as such, process one and resume in * the next cycle. */ - event_add( c->c_read_event, c->c_read_timeout ); - c->c_pdu_cb( c ); + int rc = c->c_pdu_cb( c ); + + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + if ( rc == LDAP_SUCCESS && + ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + checked_unlock( &c->c_io_mutex ); goto out; } @@ -313,9 +342,28 @@ connection_write_cb( evutil_socket_t s, short what, void *arg ) CONNECTION_LOCK_DESTROY(c); goto done; } + + if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "connection connid=%lu blocked on writing, marking " + "paused\n", + c->c_connid ); + } + c->c_io_state |= LLOAD_C_READ_PAUSE; + + /* TODO: Do not reset write timeout unless we wrote something */ event_add( c->c_write_event, lload_write_timeout ); } else { c->c_pendingber = NULL; + if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { + c->c_io_state ^= LLOAD_C_READ_PAUSE; + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "Unpausing connection connid=%lu\n", + c->c_connid ); + if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + } } checked_unlock( &c->c_io_mutex ); diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index 853b380ba5..f03e1ab48b 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -1637,6 +1637,8 @@ lload_handle_global_invalidation( LloadChange *change ) * - ProxyAuthz: * - on: nothing needed * - off: clear c_auth/privileged on each client + * - read pause (WIP): + * - nothing needed? */ assert( change->target ); @@ -1644,6 +1646,9 @@ lload_handle_global_invalidation( LloadChange *change ) assert(0); feature_diff &= ~LLOAD_FEATURE_VC; } + if ( feature_diff & LLOAD_FEATURE_PAUSE ) { + feature_diff &= ~LLOAD_FEATURE_PAUSE; + } if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) { if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) { LloadConnection *c; diff --git a/servers/lloadd/extended.c b/servers/lloadd/extended.c index 330523c076..f8e491f950 100644 --- a/servers/lloadd/extended.c +++ b/servers/lloadd/extended.c @@ -89,6 +89,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) ber_printf( output, "t{tit{ess}}", LDAP_TAG_MESSAGE, LDAP_TAG_MSGID, op->o_client_msgid, LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; checked_unlock( &c->c_io_mutex ); CONNECTION_LOCK(c); diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index 0de410bebe..72c3451172 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -174,6 +174,7 @@ typedef enum { LLOAD_FEATURE_VC = 1 << 0, #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ LLOAD_FEATURE_PROXYAUTHZ = 1 << 1, + LLOAD_FEATURE_PAUSE = 1 << 2, } lload_features_t; #ifdef BALANCER_MODULE @@ -272,7 +273,8 @@ enum sc_state { LLOAD_C_CLOSING, /* closing */ LLOAD_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */ LLOAD_C_BINDING, /* binding */ - LLOAD_C_DYING, /* part-processed dead but someone still holds a reference */ + LLOAD_C_DYING, /* part-processed dead waiting to be freed, someone + * might still be observing it */ }; enum sc_type { LLOAD_C_OPEN = 0, /* regular connection */ @@ -280,12 +282,22 @@ enum sc_type { LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */ LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */ }; +enum sc_io_state { + LLOAD_C_OPERATIONAL = 0, /* all is good */ + LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or + * running, do not re-enable c_read_event */ + LLOAD_C_READ_PAUSE = 1 << 1, /* We want to pause reading until the client + * has sufficiently caught up with what we + * sent */ +}; + /* * represents a connection from an ldap client/to ldap server */ struct LloadConnection { enum sc_state c_state; /* connection state */ enum sc_type c_type; + enum sc_io_state c_io_state; ber_socket_t c_fd; /* diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index dd153923b5..352adec18a 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -513,6 +513,9 @@ upstream_bind_cb( LloadConnection *c ) goto fail; } + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); event_add( c->c_read_event, c->c_read_timeout ); ber_free( ber, 1 ); return -1; @@ -578,6 +581,9 @@ upstream_bind( void *ctx, void *arg ) } #endif /* HAVE_CYRUS_SASL */ } + /* TODO: can we be paused at this point? Then we'd have to move this line + * after connection_write_cb */ + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; checked_unlock( &c->c_io_mutex ); connection_write_cb( -1, 0, c ); @@ -832,11 +838,16 @@ upstream_starttls( LloadConnection *c ) ber_free( ber, 1 ); CONNECTION_UNLOCK(c); + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + return rc; } base = event_get_base( c->c_read_event ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; event_del( c->c_read_event ); event_del( c->c_write_event );