Epoch based memory reclamation

Similar to the algorithm presented in
https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf

Not completely lock-free at the moment. Also the problems with epoch
based memory reclamation are still present - a thread actively observing
an epoch getting stuck will prevent LloadConnections and LloadOperations
being freed, potentially running out of memory.
This commit is contained in:
Ondřej Kuzník 2018-08-17 12:28:13 +01:00
parent aab6af1c4e
commit dc1961cb15
15 changed files with 1127 additions and 1120 deletions

View file

@ -20,7 +20,7 @@ NT_SRCS = nt_svc.c
NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res
SRCS = backend.c bind.c config.c connection.c client.c \
daemon.c extended.c init.c operation.c \
daemon.c epoch.c extended.c init.c operation.c \
upstream.c libevent_support.c \
$(@PLAT@_SRCS)

View file

@ -33,6 +33,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
LloadPendingConnection *conn = arg;
LloadBackend *b = conn->backend;
int error = 0, rc = -1;
epoch_t epoch;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: "
@ -44,6 +45,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
goto preempted;
}
epoch = epoch_join();
if ( what == EV_WRITE ) {
socklen_t optlen = sizeof(error);
@ -53,6 +56,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
}
if ( error == EINTR || error == EINPROGRESS || error == EWOULDBLOCK ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
epoch_leave( epoch );
return;
} else if ( error ) {
goto done;
@ -63,6 +67,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
}
done:
epoch_leave( epoch );
LDAP_LIST_REMOVE( conn, next );
if ( rc ) {
evutil_closesocket( conn->fd );
@ -93,6 +99,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
{
LloadBackend *b = arg;
ber_socket_t s = AC_SOCKET_INVALID;
epoch_t epoch;
int rc;
if ( result == EVUTIL_EAI_CANCEL ) {
@ -111,6 +118,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
}
b->b_dns_req = NULL;
epoch = epoch_join();
if ( result || !res ) {
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
"name resolution failed for backend '%s': %s\n",
@ -176,6 +184,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
evutil_freeaddrinfo( res );
epoch_leave( epoch );
return;
fail:
@ -189,6 +198,7 @@ fail:
if ( res ) {
evutil_freeaddrinfo( res );
}
epoch_leave( epoch );
}
LloadConnection *
@ -268,7 +278,6 @@ backend_select( LloadOperation *op, int *res )
}
c->c_n_ops_executing++;
c->c_counters.lc_ops_received++;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
*res = LDAP_SUCCESS;
@ -356,6 +365,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
LloadBackend *b = arg;
struct evdns_getaddrinfo_request *request, *placeholder;
char *hostname;
epoch_t epoch;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
assert( b->b_dns_req == NULL );
@ -372,6 +382,8 @@ backend_connect( evutil_socket_t s, short what, void *arg )
return;
}
epoch = epoch_join();
Debug( LDAP_DEBUG_CONNS, "backend_connect: "
"%sattempting connection to %s\n",
(what & EV_TIMEOUT) ? "retry timeout finished, " : "",
@ -438,6 +450,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
}
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
epoch_leave( epoch );
return;
}
#endif /* LDAP_PF_LOCAL */
@ -473,6 +486,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
b->b_dns_req = request;
}
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
epoch_leave( epoch );
return;
fail:
@ -480,6 +494,7 @@ fail:
b->b_failed++;
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
epoch_leave( epoch );
}
void *

View file

@ -34,7 +34,8 @@ bind_mech_external(
{
BerValue binddn;
void *ssl;
char *ptr;
char *ptr, *message = "";
int result = LDAP_SUCCESS;
client->c_state = LLOAD_C_READY;
client->c_type = LLOAD_C_OPEN;
@ -49,14 +50,16 @@ bind_mech_external(
* allow that.
*/
if ( !BER_BVISEMPTY( credentials ) ) {
return operation_send_reject_locked( op, LDAP_UNWILLING_TO_PERFORM,
"proxy authorization is not supported", 1 );
result = LDAP_UNWILLING_TO_PERFORM;
message = "proxy authorization is not supported";
goto done;
}
ssl = ldap_pvt_tls_sb_ctx( client->c_sb );
if ( !ssl || ldap_pvt_tls_get_peer_dn( ssl, &binddn, NULL, 0 ) ) {
return operation_send_reject_locked( op, LDAP_INVALID_CREDENTIALS,
"no externally negotiated identity", 1 );
result = LDAP_INVALID_CREDENTIALS;
message = "no externally negotiated identity";
goto done;
}
client->c_auth.bv_len = binddn.bv_len + STRLENOF("dn:");
client->c_auth.bv_val = ch_malloc( client->c_auth.bv_len + 1 );
@ -71,22 +74,20 @@ bind_mech_external(
client->c_type = LLOAD_C_PRIVILEGED;
}
return operation_send_reject_locked( op, LDAP_SUCCESS, "", 1 );
done:
CONNECTION_UNLOCK(client);
operation_send_reject( op, result, message, 1 );
return LDAP_SUCCESS;
}
/*
* On entering the function, we've put a reference on both connections and hold
* upstream's c_io_mutex.
*/
static int
client_bind(
LloadOperation *op,
LloadConnection *upstream,
struct berval *binddn,
ber_tag_t tag,
struct berval *auth )
{
LloadConnection *upstream = op->o_upstream;
ber_printf( upstream->c_pendingber, "t{titOtO}", LDAP_TAG_MESSAGE,
LDAP_TAG_MSGID, op->o_upstream_msgid,
LDAP_REQ_BIND, &op->o_request,
@ -96,19 +97,14 @@ client_bind(
}
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
/*
* On entering the function, we've put a reference on both connections and hold
* upstream's c_io_mutex.
*/
static int
client_bind_as_vc(
LloadOperation *op,
LloadConnection *upstream,
struct berval *binddn,
ber_tag_t tag,
struct berval *auth )
{
LloadConnection *upstream = op->o_upstream;
CONNECTION_LOCK(upstream);
ber_printf( upstream->c_pendingber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE,
LDAP_TAG_MSGID, op->o_upstream_msgid,
@ -192,9 +188,12 @@ request_bind( LloadConnection *client, LloadOperation *op )
struct berval binddn, auth, mech = BER_BVNULL;
ber_int_t version;
ber_tag_t tag;
unsigned long pin = client->c_pin_id;
unsigned long pin;
int res, rc = LDAP_SUCCESS;
CONNECTION_LOCK(client);
pin = client->c_pin_id;
if ( pin ) {
LloadOperation *pinned_op, needle = {
.o_client_connid = client->c_connid,
@ -222,25 +221,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
pinned_op->o_request = op->o_request;
pinned_op->o_ctrls = op->o_ctrls;
/*
* pinned_op is accessible from the upstream, protect it since we
* lose the client lock in operation_destroy_from_client temporarily
*/
pinned_op->o_client_refcnt++;
/* Noone has seen this operation yet, plant the pin back in its stead */
client->c_n_ops_executing--;
op->o_res = LLOAD_OP_COMPLETED;
tavl_delete( &client->c_ops, op, operation_client_cmp );
op->o_client = NULL;
assert( op->o_upstream == NULL );
rc = tavl_insert( &client->c_ops, pinned_op, operation_client_cmp,
avl_dup_error );
assert( rc == LDAP_SUCCESS );
/* Noone has seen this operation yet */
op->o_refcnt--;
operation_destroy( op );
/* We didn't start a new operation, just continuing an existing one */
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received--;
operation_destroy_from_client( op );
pinned_op->o_client_refcnt--;
op = pinned_op;
}
}
/* protect the Bind operation */
op->o_client_refcnt++;
tavl_delete( &client->c_ops, op, operation_client_cmp );
client_reset( client );
@ -259,10 +261,11 @@ request_bind( LloadConnection *client, LloadOperation *op )
"failed to parse version field\n" );
goto fail;
} else if ( version != LDAP_VERSION3 ) {
operation_send_reject_locked(
CONNECTION_UNLOCK(client);
operation_send_reject(
op, LDAP_PROTOCOL_ERROR, "LDAP version unsupported", 1 );
ber_free( copy, 0 );
return LDAP_SUCCESS;
CONNECTION_LOCK(client);
goto fail;
}
tag = ber_get_stringbv( copy, &binddn, LBER_BV_NOTERM );
@ -307,10 +310,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
/* terminate the upstream side if client switched mechanisms */
if ( pin ) {
op->o_client_refcnt++;
CONNECTION_UNLOCK_INCREF(client);
operation_abandon( op );
CONNECTION_LOCK_DECREF(client);
}
ber_free( copy, 0 );
@ -326,26 +326,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
if ( pin ) {
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
upstream = op->o_upstream;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( upstream ) {
ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
CONNECTION_LOCK(upstream);
if ( !upstream->c_live ) {
CONNECTION_UNLOCK(upstream);
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
upstream = NULL;
}
}
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
}
/* If we were pinned but lost the link, don't look for a new upstream, we
* have to reject the op and clear pin */
if ( upstream ) {
CONNECTION_UNLOCK_INCREF(upstream);
ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
/* No need to do anything */
} else if ( !pin ) {
upstream = backend_select( op, &res );
} else {
@ -377,18 +379,27 @@ request_bind( LloadConnection *client, LloadOperation *op )
ber = upstream->c_pendingber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
if ( !pin ) {
LloadBackend *b = upstream->c_private;
upstream->c_n_ops_executing--;
CONNECTION_UNLOCK(upstream);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
} else {
CONNECTION_UNLOCK(upstream);
}
Debug( LDAP_DEBUG_ANY, "request_bind: "
"ber_alloc failed\n" );
CONNECTION_LOCK_DECREF(upstream);
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
upstream->c_state = LLOAD_C_READY;
if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
ber_memfree( upstream->c_sasl_bind_mech.bv_val );
BER_BVZERO( &upstream->c_sasl_bind_mech );
}
CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
operation_unlink( op );
CONNECTION_LOCK(client);
goto fail;
}
upstream->c_pendingber = ber;
@ -397,7 +408,6 @@ request_bind( LloadConnection *client, LloadOperation *op )
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_forwarded++;
}
CONNECTION_LOCK(upstream);
if ( pin ) {
tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
if ( tag == LDAP_AUTH_SIMPLE ) {
@ -440,52 +450,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
if ( lload_features & LLOAD_FEATURE_VC ) {
rc = client_bind_as_vc( op, &binddn, tag, &auth );
rc = client_bind_as_vc( op, upstream, &binddn, tag, &auth );
} else
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
{
rc = client_bind( op, &binddn, tag, &auth );
rc = client_bind( op, upstream, &binddn, tag, &auth );
}
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
done:
if ( rc == LDAP_SUCCESS ) {
CONNECTION_LOCK(client);
if ( upstream ) {
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
}
CONNECTION_LOCK(client);
if ( rc == LDAP_SUCCESS ) {
client->c_pin_id = pin;
if ( !--op->o_client_refcnt || !upstream ) {
operation_destroy_from_client( op );
if ( client->c_state == LLOAD_C_BINDING ) {
client->c_state = LLOAD_C_READY;
client->c_type = LLOAD_C_OPEN;
client->c_pin_id = 0;
if ( !BER_BVISNULL( &client->c_auth ) ) {
ch_free( client->c_auth.bv_val );
BER_BVZERO( &client->c_auth );
}
if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) {
ber_memfree( client->c_sasl_bind_mech.bv_val );
BER_BVZERO( &client->c_sasl_bind_mech );
}
}
}
CONNECTION_UNLOCK(client);
if ( upstream ) {
connection_write_cb( -1, 0, upstream );
CONNECTION_LOCK_DECREF(upstream);
CONNECTION_UNLOCK_OR_DESTROY(upstream);
}
CONNECTION_LOCK_DECREF(client);
} else {
fail:
rc = -1;
CONNECTION_LOCK_DECREF(client);
op->o_client_refcnt--;
operation_destroy_from_client( op );
client->c_pin_id = 0;
CONNECTION_DESTROY(client);
}
@ -508,42 +494,26 @@ finish_sasl_bind(
LloadOperation *op,
BerElement *ber )
{
LloadConnection *client = op->o_client;
BerElement *output;
LloadOperation *removed;
ber_int_t msgid;
int rc;
if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
"connid=%lu not configured to do proxyauthz, making no "
"attempt to resolve final authzid name\n",
op->o_client_connid );
CONNECTION_UNLOCK(upstream);
return forward_final_response( client, op, ber );
}
removed = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
if ( !removed ) {
assert( upstream->c_state != LLOAD_C_BINDING );
/* FIXME: has client replaced this bind since? */
assert(0);
operation_destroy_from_upstream( op );
}
assert( removed == op && upstream->c_state == LLOAD_C_BINDING );
CONNECTION_UNLOCK(upstream);
Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
"SASL exchange in lieu of client connid=%lu to upstream "
"connid=%lu finished, resolving final authzid name\n",
op->o_client_connid, op->o_upstream_connid );
ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
output = upstream->c_pendingber;
if ( output == NULL && (output = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
CONNECTION_LOCK_DESTROY(upstream);
return -1;
}
upstream->c_pendingber = output;
@ -564,12 +534,18 @@ finish_sasl_bind(
ber_free( op->o_ber, 1 );
op->o_ber = ber;
/* Could we have been unlinked in the meantime? */
rc = tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
CONNECTION_UNLOCK(upstream);
Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
"SASL exchange in lieu of client connid=%lu to upstream "
"connid=%lu finished, resolving final authzid name msgid=%d\n",
op->o_client_connid, op->o_upstream_connid, op->o_upstream_msgid );
connection_write_cb( -1, 0, upstream );
return LDAP_SUCCESS;
}
@ -580,7 +556,7 @@ handle_bind_response(
LloadOperation *op,
BerElement *ber )
{
LloadConnection *upstream = op->o_upstream;
LloadConnection *upstream;
BerValue response;
BerElement *copy;
LloadOperation *removed;
@ -611,6 +587,13 @@ handle_bind_response(
"connid=%lu, result=%d\n",
op->o_client_msgid, op->o_client_connid, result );
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
upstream = op->o_upstream;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( !upstream ) {
return LDAP_SUCCESS;
}
CONNECTION_LOCK(upstream);
if ( !tavl_find( upstream->c_ops, op, operation_upstream_cmp ) ) {
/*
@ -621,7 +604,6 @@ handle_bind_response(
* no response is expected
* - ???
*/
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(upstream);
return LDAP_SUCCESS;
}
@ -650,7 +632,6 @@ handle_bind_response(
} else if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
op->o_upstream_msgid = 0;
op->o_upstream_refcnt++;
rc = tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
@ -665,13 +646,18 @@ handle_bind_response(
assert( op->o_client_msgid && op->o_upstream_msgid );
op->o_pin_id = 0;
if ( sasl_finished && result == LDAP_SUCCESS ) {
if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) && sasl_finished &&
result == LDAP_SUCCESS ) {
return finish_sasl_bind( upstream, op, ber );
}
upstream->c_state = LLOAD_C_READY;
op->o_res = LLOAD_OP_COMPLETED;
}
CONNECTION_UNLOCK(upstream);
if ( !op->o_pin_id ) {
operation_unlink_upstream( op, upstream );
}
CONNECTION_LOCK(client);
removed = tavl_delete( &client->c_ops, op, operation_client_cmp );
assert( !removed || op == removed );
@ -687,7 +673,6 @@ handle_bind_response(
break;
case LDAP_SUCCESS:
default: {
op->o_client = NULL;
client->c_state = LLOAD_C_READY;
client->c_type = LLOAD_C_OPEN;
client->c_pin_id = 0;
@ -708,7 +693,7 @@ handle_bind_response(
}
}
} else {
assert( client->c_state == LLOAD_C_INVALID ||
assert( client->c_state == LLOAD_C_DYING ||
client->c_state == LLOAD_C_CLOSING );
}
CONNECTION_UNLOCK(client);
@ -729,7 +714,7 @@ handle_whoami_response(
LloadOperation *op,
BerElement *ber )
{
LloadConnection *upstream = op->o_upstream;
LloadConnection *upstream;
BerValue matched, diagmsg;
BerElement *saved_response = op->o_ber;
LloadOperation *removed;
@ -739,7 +724,7 @@ handle_whoami_response(
Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: "
"connid=%ld received whoami response in lieu of connid=%ld\n",
upstream->c_connid, client->c_connid );
op->o_upstream_connid, client->c_connid );
tag = ber_scanf( ber, "{emm" /* "}" */,
&result, &matched, &diagmsg );
@ -748,38 +733,40 @@ handle_whoami_response(
return -1;
}
CONNECTION_LOCK_DECREF(upstream);
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
upstream = op->o_upstream;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( !upstream ) {
return LDAP_SUCCESS;
}
op->o_res = LLOAD_OP_COMPLETED;
/* Clear upstream status */
operation_unlink_upstream( op, upstream );
if ( result == LDAP_PROTOCOL_ERROR ) {
LloadBackend *b;
CONNECTION_LOCK(upstream);
b = (LloadBackend *)upstream->c_private;
Debug( LDAP_DEBUG_ANY, "handle_whoami_response: "
"Who Am I? extended operation not supported on backend %s, "
"proxyauthz with clients that do SASL binds will not work "
"msg=%s!\n",
b->b_uri.bv_val, diagmsg.bv_val );
CONNECTION_UNLOCK_INCREF(upstream);
CONNECTION_UNLOCK(upstream);
operation_send_reject( op, LDAP_OTHER, "upstream protocol error", 0 );
return -1;
}
if ( upstream->c_state != LLOAD_C_CLOSING ) {
assert( upstream->c_state == LLOAD_C_BINDING );
upstream->c_state = LLOAD_C_READY;
}
if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
ber_memfree( upstream->c_sasl_bind_mech.bv_val );
BER_BVZERO( &upstream->c_sasl_bind_mech );
}
CONNECTION_UNLOCK_INCREF(upstream);
tag = ber_peek_tag( ber, &len );
CONNECTION_LOCK_DECREF(client);
CONNECTION_LOCK(client);
assert( client->c_state == LLOAD_C_BINDING &&
BER_BVISNULL( &client->c_auth ) );
assert( client->c_state == LLOAD_C_BINDING ||
client->c_state == LLOAD_C_CLOSING );
assert( BER_BVISNULL( &client->c_auth ) );
if ( !BER_BVISNULL( &client->c_auth ) ) {
ber_memfree( client->c_auth.bv_val );
BER_BVZERO( &client->c_auth );
@ -788,8 +775,6 @@ handle_whoami_response(
if ( tag == LDAP_TAG_EXOP_RES_VALUE ) {
tag = ber_scanf( ber, "o", &client->c_auth );
if ( tag == LBER_ERROR ) {
operation_send_reject_locked(
op, LDAP_OTHER, "upstream protocol error", 0 );
CONNECTION_DESTROY(client);
return -1;
}
@ -797,13 +782,13 @@ handle_whoami_response(
removed = tavl_delete( &client->c_ops, op, operation_client_cmp );
assert( !removed || op == removed );
op->o_pin_id = 0;
Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: "
"connid=%ld new authid=%s\n",
client->c_connid, client->c_auth.bv_val );
if ( client->c_state == LLOAD_C_BINDING ) {
op->o_client = NULL;
client->c_state = LLOAD_C_READY;
client->c_type = LLOAD_C_OPEN;
client->c_pin_id = 0;
@ -817,10 +802,11 @@ handle_whoami_response(
}
}
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
/* defer the disposal of ber to operation_destroy_* */
/* defer the disposal of ber to operation_destroy */
op->o_ber = ber;
return forward_final_response( client, op, saved_response );
}
@ -847,7 +833,12 @@ handle_vc_bind_response(
tag = ber_peek_tag( ber, &len );
if ( result == LDAP_PROTOCOL_ERROR ) {
LloadConnection *upstream = op->o_upstream;
LloadConnection *upstream;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
upstream = op->o_upstream;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( upstream ) {
LloadBackend *b;
CONNECTION_LOCK(upstream);
@ -857,6 +848,7 @@ handle_vc_bind_response(
b->b_uri.bv_val );
CONNECTION_UNLOCK(upstream);
}
}
Debug( LDAP_DEBUG_STATS, "handle_vc_bind_response: "
"received response for bind request msgid=%d by client "
@ -872,7 +864,7 @@ handle_vc_bind_response(
tag = ber_scanf( ber, "o", &client->c_vc_cookie );
if ( tag == LBER_ERROR ) {
rc = -1;
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
goto done;
}
tag = ber_peek_tag( ber, &len );
@ -882,7 +874,7 @@ handle_vc_bind_response(
tag = ber_scanf( ber, "m", &creds );
if ( tag == LBER_ERROR ) {
rc = -1;
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
goto done;
}
tag = ber_peek_tag( ber, &len );
@ -892,7 +884,7 @@ handle_vc_bind_response(
tag = ber_scanf( ber, "m", &controls );
if ( tag == LBER_ERROR ) {
rc = -1;
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
goto done;
}
}
@ -928,7 +920,7 @@ handle_vc_bind_response(
assert( client->c_state == LLOAD_C_INVALID ||
client->c_state == LLOAD_C_CLOSING );
}
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
ldap_pvt_thread_mutex_lock( &client->c_io_mutex );
output = client->c_pendingber;
@ -952,9 +944,7 @@ handle_vc_bind_response(
}
done:
CONNECTION_LOCK_DECREF(client);
operation_destroy_from_client( op );
CONNECTION_UNLOCK_OR_DESTROY(client);
operation_unlink( op );
ber_free( ber, 1 );
return rc;
}

View file

@ -28,6 +28,8 @@ lload_c_head clients = LDAP_CIRCLEQ_HEAD_INITIALIZER( clients );
ldap_pvt_thread_mutex_t clients_mutex;
static void client_unlink( LloadConnection *upstream );
int
request_abandon( LloadConnection *c, LloadOperation *op )
{
@ -41,17 +43,19 @@ request_abandon( LloadConnection *c, LloadOperation *op )
"connid=%lu msgid=%d invalid integer sent in abandon request\n",
c->c_connid, op->o_client_msgid );
operation_destroy_from_client( op );
CONNECTION_DESTROY(c);
operation_unlink( op );
CONNECTION_LOCK_DESTROY(c);
return -1;
}
CONNECTION_LOCK(c);
request = tavl_find( c->c_ops, &needle, operation_client_cmp );
if ( !request ) {
Debug( LDAP_DEBUG_STATS, "request_abandon: "
"connid=%lu msgid=%d requests abandon of an operation "
"msgid=%d not being processed anymore\n",
c->c_connid, op->o_client_msgid, needle.o_client_msgid );
CONNECTION_UNLOCK(c);
goto done;
} else if ( request->o_tag == LDAP_REQ_BIND ) {
/* RFC 4511 states we must not allow Abandon on Binds */
@ -59,6 +63,7 @@ request_abandon( LloadConnection *c, LloadOperation *op )
"connid=%lu msgid=%d requests abandon of a bind operation "
"msgid=%d\n",
c->c_connid, op->o_client_msgid, needle.o_client_msgid );
CONNECTION_UNLOCK(c);
goto done;
}
Debug( LDAP_DEBUG_STATS, "request_abandon: "
@ -67,20 +72,14 @@ request_abandon( LloadConnection *c, LloadOperation *op )
lload_msgtype2str( request->o_tag ), needle.o_client_msgid );
if ( c->c_state == LLOAD_C_BINDING ) {
/* We have found the request and we are binding, it must be a bind
* request */
assert( request->o_tag == LDAP_REQ_BIND );
c->c_state = LLOAD_C_READY;
assert(0);
}
/* operation_abandon requires a reference since it is passed with c unlocked */
request->o_client_refcnt++;
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
operation_abandon( request );
CONNECTION_LOCK_DECREF(c);
done:
operation_destroy_from_client( op );
operation_unlink( op );
return rc;
}
@ -92,9 +91,6 @@ request_process( LloadConnection *client, LloadOperation *op )
ber_int_t msgid;
int res, rc = LDAP_SUCCESS;
op->o_client_refcnt++;
CONNECTION_UNLOCK_INCREF(client);
upstream = backend_select( op, &res );
if ( !upstream ) {
Debug( LDAP_DEBUG_STATS, "request_process: "
@ -110,16 +106,29 @@ request_process( LloadConnection *client, LloadOperation *op )
output = upstream->c_pendingber;
if ( output == NULL && (output = ber_alloc()) == NULL ) {
LloadBackend *b = upstream->c_private;
upstream->c_n_ops_executing--;
CONNECTION_UNLOCK(upstream);
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
Debug( LDAP_DEBUG_ANY, "request_process: "
"ber_alloc failed\n" );
rc = -1;
goto fail;
}
upstream->c_pendingber = output;
CONNECTION_LOCK_DECREF(upstream);
op->o_upstream_msgid = msgid = upstream->c_next_msgid++;
rc = tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
CONNECTION_UNLOCK_INCREF(upstream);
CONNECTION_UNLOCK(upstream);
Debug( LDAP_DEBUG_TRACE, "request_process: "
"client connid=%lu added %s msgid=%d to upstream connid=%lu as "
@ -132,7 +141,7 @@ request_process( LloadConnection *client, LloadOperation *op )
if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
client->c_type != LLOAD_C_PRIVILEGED ) {
CONNECTION_LOCK_DECREF(client);
CONNECTION_LOCK(client);
Debug( LDAP_DEBUG_TRACE, "request_process: "
"proxying identity %s to upstream\n",
client->c_auth.bv_val );
@ -141,7 +150,7 @@ request_process( LloadConnection *client, LloadOperation *op )
op->o_tag, &op->o_request,
LDAP_TAG_CONTROLS,
LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth );
CONNECTION_UNLOCK_INCREF(client);
CONNECTION_UNLOCK(client);
if ( !BER_BVISNULL( &op->o_ctrls ) ) {
ber_write( output, op->o_ctrls.bv_val, op->o_ctrls.bv_len, 0 );
@ -157,37 +166,18 @@ request_process( LloadConnection *client, LloadOperation *op )
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
connection_write_cb( -1, 0, upstream );
CONNECTION_LOCK_DECREF(upstream);
CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
if ( !--op->o_client_refcnt ) {
operation_destroy_from_client( op );
}
return rc;
fail:
if ( upstream ) {
LloadBackend *b;
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
CONNECTION_LOCK_DECREF(upstream);
upstream->c_n_ops_executing--;
b = (LloadBackend *)upstream->c_private;
CONNECTION_UNLOCK_OR_DESTROY(upstream);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
CONNECTION_LOCK_DESTROY(upstream);
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
}
CONNECTION_LOCK_DECREF(client);
op->o_client_refcnt--;
operation_destroy_from_client( op );
operation_unlink( op );
if ( rc ) {
CONNECTION_DESTROY(client);
CONNECTION_LOCK_DESTROY(client);
}
return rc;
}
@ -202,6 +192,7 @@ handle_one_request( LloadConnection *c )
ber = c->c_currentber;
c->c_currentber = NULL;
CONNECTION_LOCK(c);
op = operation_init( c, ber );
if ( !op ) {
Debug( LDAP_DEBUG_ANY, "handle_one_request: "
@ -211,16 +202,18 @@ handle_one_request( LloadConnection *c )
ber_free( ber, 1 );
return -1;
}
CONNECTION_UNLOCK(c);
switch ( op->o_tag ) {
case LDAP_REQ_UNBIND:
/* There is never a response for this operation */
op->o_res = LLOAD_OP_COMPLETED;
operation_destroy_from_client( op );
operation_unlink( op );
Debug( LDAP_DEBUG_STATS, "handle_one_request: "
"received unbind, closing client connid=%lu\n",
c->c_connid );
CONNECTION_DESTROY(c);
CONNECTION_LOCK_DESTROY(c);
return -1;
case LDAP_REQ_BIND:
handler = request_bind;
@ -234,16 +227,18 @@ handle_one_request( LloadConnection *c )
break;
default:
if ( c->c_state == LLOAD_C_BINDING ) {
return operation_send_reject_locked(
operation_send_reject(
op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
return LDAP_SUCCESS;
}
handler = request_process;
break;
}
if ( c->c_state == LLOAD_C_CLOSING ) {
return operation_send_reject_locked(
operation_send_reject(
op, LDAP_UNAVAILABLE, "connection is shutting down", 0 );
return LDAP_SUCCESS;
}
return handler( c, op );
@ -256,9 +251,9 @@ void
client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
{
LloadConnection *c = arg;
epoch_t epoch;
int rc = 0;
CONNECTION_LOCK_DECREF(c);
if ( what & EV_TIMEOUT ) {
Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
"connid=%lu, timeout reached, destroying\n",
@ -274,27 +269,26 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
if ( c->c_pendingber ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_UNLOCK_INCREF(c);
connection_write_cb( s, what, arg );
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
if ( !c->c_live ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_UNLOCK(c);
goto fail;
}
CONNECTION_UNLOCK(c);
/* Do we still have data pending? If so, connection_write_cb would
* already have arranged the write callback to trigger again */
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
if ( c->c_pendingber ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_UNLOCK_INCREF(c);
return;
}
}
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
rc = ldap_pvt_tls_accept( c->c_sb, LLOAD_TLS_CTX );
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
if ( rc < 0 ) {
goto fail;
}
@ -308,13 +302,16 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
* This is deadlock-safe, since both share the same base - the one
* that's just running us.
*/
CONNECTION_LOCK(c);
event_del( c->c_read_event );
event_del( c->c_write_event );
c->c_read_timeout = NULL;
event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
connection_read_cb, c );
if ( c->c_live ) {
event_add( c->c_read_event, c->c_read_timeout );
}
event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
connection_write_cb, c );
@ -323,24 +320,29 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
c->c_connid );
c->c_is_tls = LLOAD_TLS_ESTABLISHED;
/* The temporary reference established for us is no longer needed */
CONNECTION_UNLOCK_OR_DESTROY(c);
CONNECTION_UNLOCK(c);
return;
} else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
CONNECTION_LOCK(c);
if ( c->c_live ) {
event_add( c->c_write_event, lload_write_timeout );
}
CONNECTION_UNLOCK(c);
Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
"connid=%lu need write rc=%d\n",
c->c_connid, rc );
}
CONNECTION_UNLOCK_INCREF(c);
return;
fail:
Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
"connid=%lu failed rc=%d\n",
c->c_connid, rc );
CONNECTION_DESTROY(c);
assert( c->c_ops == NULL );
epoch = epoch_join();
CONNECTION_LOCK_DESTROY(c);
epoch_leave( epoch );
}
LloadConnection *
@ -379,11 +381,11 @@ client_init(
Debug( LDAP_DEBUG_CONNS, "client_init: "
"connid=%lu failed initial TLS accept rc=%d\n",
c->c_connid, rc );
CONNECTION_LOCK(c);
goto fail;
}
if ( rc ) {
c->c_refcnt++;
c->c_read_timeout = lload_timeout_net;
read_cb = write_cb = client_tls_handshake_cb;
}
@ -393,30 +395,32 @@ client_init(
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "client_init: "
"Read event could not be allocated\n" );
CONNECTION_LOCK(c);
goto fail;
}
c->c_read_event = event;
event_add( c->c_read_event, c->c_read_timeout );
event = event_new( base, s, EV_WRITE, write_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "client_init: "
"Write event could not be allocated\n" );
CONNECTION_LOCK(c);
goto fail;
}
/* We only register the write event when we have data pending */
c->c_write_event = event;
c->c_private = listener;
c->c_destroy = client_destroy;
c->c_unlink = client_unlink;
c->c_pdu_cb = handle_one_request;
/* There should be no lock inversion yet since no other thread could
* approach it from clients side */
CONNECTION_LOCK(c);
/* We only register the write event when we have data pending */
event_add( c->c_read_event, c->c_read_timeout );
ldap_pvt_thread_mutex_lock( &clients_mutex );
LDAP_CIRCLEQ_INSERT_TAIL( &clients, c, c_next );
ldap_pvt_thread_mutex_unlock( &clients_mutex );
CONNECTION_UNLOCK(c);
return c;
@ -431,8 +435,9 @@ fail:
}
c->c_state = LLOAD_C_INVALID;
CONNECTION_DESTROY(c);
assert( c == NULL );
c->c_live--;
c->c_refcnt--;
connection_destroy( c );
return NULL;
}
@ -444,19 +449,6 @@ client_reset( LloadConnection *c )
root = c->c_ops;
c->c_ops = NULL;
/* unless op->o_client_refcnt > op->o_client_live, there is noone using the
* operation from the client side and noone new will now that we've removed
* it from client's c_ops */
if ( root ) {
TAvlnode *node = tavl_end( root, TAVL_DIR_LEFT );
do {
LloadOperation *op = node->avl_data;
/* make sure it's useable after we've unlocked the connection */
op->o_client_refcnt++;
} while ( (node = tavl_next( node, TAVL_DIR_RIGHT )) );
}
if ( !BER_BVISNULL( &c->c_auth ) ) {
ch_free( c->c_auth.bv_val );
BER_BVZERO( &c->c_auth );
@ -465,7 +457,7 @@ client_reset( LloadConnection *c )
ch_free( c->c_sasl_bind_mech.bv_val );
BER_BVZERO( &c->c_sasl_bind_mech );
}
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
if ( root ) {
int freed;
@ -475,38 +467,29 @@ client_reset( LloadConnection *c )
freed );
}
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
}
void
client_destroy( LloadConnection *c )
client_unlink( LloadConnection *c )
{
enum sc_state state;
struct event *read_event, *write_event;
Debug( LDAP_DEBUG_CONNS, "client_destroy: "
"destroying client connid=%lu\n",
Debug( LDAP_DEBUG_CONNS, "client_unlink: "
"removing client connid=%lu\n",
c->c_connid );
assert( c->c_state != LLOAD_C_INVALID );
assert( c->c_state != LLOAD_C_DYING );
state = c->c_state;
c->c_state = LLOAD_C_INVALID;
c->c_state = LLOAD_C_DYING;
read_event = c->c_read_event;
write_event = c->c_write_event;
CONNECTION_UNLOCK(c);
/*
* FIXME: operation_destroy_from_upstream might copy op->o_client and bump
* c_refcnt, it is then responsible to call destroy_client again, does that
* mean that we can be triggered for recursion over all connections?
*/
CONNECTION_UNLOCK_INCREF(c);
/*
* Avoid a deadlock:
* event_del will block if the event is currently executing its callback,
* that callback might be waiting to lock c->c_mutex
*/
if ( read_event ) {
event_del( read_event );
}
@ -521,7 +504,20 @@ client_destroy( LloadConnection *c )
ldap_pvt_thread_mutex_unlock( &clients_mutex );
}
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
client_reset( c );
}
void
client_destroy( LloadConnection *c )
{
Debug( LDAP_DEBUG_CONNS, "client_destroy: "
"destroying client connid=%lu\n",
c->c_connid );
CONNECTION_LOCK(c);
assert( c->c_state == LLOAD_C_DYING );
c->c_state = LLOAD_C_INVALID;
if ( c->c_read_event ) {
event_free( c->c_read_event );
@ -533,23 +529,7 @@ client_destroy( LloadConnection *c )
c->c_write_event = NULL;
}
client_reset( c );
/*
* If we attempted to destroy any operations, we might have lent a new
* refcnt token for a thread that raced us to that, let them call us again
* later
*/
assert( c->c_refcnt >= 0 );
if ( c->c_refcnt ) {
c->c_state = LLOAD_C_DYING;
Debug( LDAP_DEBUG_CONNS, "client_destroy: "
"connid=%lu aborting with refcnt=%d\n",
c->c_connid, c->c_refcnt );
CONNECTION_UNLOCK(c);
return;
}
assert( c->c_refcnt == 0 );
connection_destroy( c );
}

View file

@ -41,15 +41,12 @@
#include "lutil.h"
#include "lutil_ldap.h"
static ldap_pvt_thread_mutex_t conn_nextid_mutex;
static unsigned long conn_nextid = 0;
static void
lload_connection_assign_nextid( LloadConnection *conn )
{
ldap_pvt_thread_mutex_lock( &conn_nextid_mutex );
conn->c_connid = conn_nextid++;
ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
}
/*
@ -73,37 +70,40 @@ handle_pdus( void *ctx, void *arg )
{
LloadConnection *c = arg;
int pdus_handled = 0;
epoch_t epoch;
CONNECTION_LOCK_DECREF(c);
/* A reference was passed on to us */
assert( IS_ALIVE( c, c_refcnt ) );
epoch = epoch_join();
for ( ;; ) {
BerElement *ber;
ber_tag_t tag;
ber_len_t len;
/* handle_one_response may unlock the connection in the process, we
* need to expect that might be our responsibility to destroy it */
if ( c->c_pdu_cb( c ) ) {
/* Error, connection is unlocked and might already have been
* destroyed */
return NULL;
/* Error/reset, get rid ouf our reference and bail */
goto done;
}
/* Otherwise, handle_one_request leaves the connection locked */
if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
/* Do not read now, re-enable read event instead */
break;
}
if ( (ber = ber_alloc()) == NULL ) {
ber = c->c_currentber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
Debug( LDAP_DEBUG_ANY, "handle_pdus: "
"connid=%lu, ber_alloc failed\n",
c->c_connid );
CONNECTION_DESTROY(c);
return NULL;
CONNECTION_LOCK_DESTROY(c);
goto done;
}
c->c_currentber = ber;
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
tag = ber_get_next( c->c_sb, &len, ber );
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
int err = sock_errno();
@ -123,8 +123,8 @@ handle_pdus( void *ctx, void *arg )
c->c_currentber = NULL;
ber_free( ber, 1 );
CONNECTION_DESTROY(c);
return NULL;
CONNECTION_LOCK_DESTROY(c);
goto done;
}
break;
}
@ -134,7 +134,9 @@ handle_pdus( void *ctx, void *arg )
Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
CONNECTION_UNLOCK_OR_DESTROY(c);
done:
RELEASE_REF( c, c_refcnt, c->c_destroy );
epoch_leave( epoch );
return NULL;
}
@ -152,25 +154,35 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
BerElement *ber;
ber_tag_t tag;
ber_len_t len;
epoch_t epoch;
CONNECTION_LOCK(c);
if ( !c->c_live ) {
event_del( c->c_read_event );
CONNECTION_UNLOCK(c);
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"suspended read event on a dead connid=%lu\n",
c->c_connid );
CONNECTION_UNLOCK(c);
return;
}
CONNECTION_UNLOCK(c);
if ( what & EV_TIMEOUT ) {
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"connid=%lu, timeout reached, destroying\n",
c->c_connid );
CONNECTION_DESTROY(c);
/* Make sure the connection stays around for us to unlock it */
epoch = epoch_join();
CONNECTION_LOCK_DESTROY(c);
epoch_leave( epoch );
return;
}
if ( !acquire_ref( &c->c_refcnt ) ) {
return;
}
epoch = epoch_join();
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"connection connid=%lu ready to read\n",
c->c_connid );
@ -180,12 +192,14 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
"connid=%lu, ber_alloc failed\n",
c->c_connid );
CONNECTION_DESTROY(c);
return;
goto out;
}
c->c_currentber = ber;
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
tag = ber_get_next( c->c_sb, &len, ber );
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
int err = sock_errno();
@ -210,65 +224,78 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"suspended read event on dying connid=%lu\n",
c->c_connid );
CONNECTION_DESTROY(c);
return;
CONNECTION_LOCK_DESTROY(c);
goto out;
}
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
CONNECTION_UNLOCK(c);
return;
}
if ( !lload_conn_max_pdus_per_cycle ||
ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
/* If we're overloaded or configured as such, process one and resume in
* the next cycle.
*
* handle_one_request re-locks the mutex in the
* process, need to test it's still alive */
if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) {
CONNECTION_UNLOCK_OR_DESTROY(c);
}
return;
goto out;
}
event_del( c->c_read_event );
if ( !lload_conn_max_pdus_per_cycle ||
ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
/* If we're overloaded or configured as such, process one and resume in
* the next cycle. */
event_add( c->c_read_event, c->c_read_timeout );
c->c_pdu_cb( c );
goto out;
}
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"suspended read event on connid=%lu\n",
c->c_connid );
/* We have scheduled a call to handle_requests which takes care of
* handling further requests, just make sure the connection sticks around
* for that */
CONNECTION_UNLOCK_INCREF(c);
/*
* We have scheduled a call to handle_pdus to take care of handling this
* and further requests, its reference is now owned by that task.
*/
epoch_leave( epoch );
return;
out:
RELEASE_REF( c, c_refcnt, c->c_destroy );
epoch_leave( epoch );
}
void
connection_write_cb( evutil_socket_t s, short what, void *arg )
{
LloadConnection *c = arg;
epoch_t epoch;
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
"considering writing to%s connid=%lu what=%hd\n",
c->c_live ? " live" : " dead", c->c_connid, what );
if ( !c->c_live ) {
CONNECTION_UNLOCK(c);
return;
}
CONNECTION_UNLOCK(c);
if ( what & EV_TIMEOUT ) {
Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
"connid=%lu, timeout reached, destroying\n",
c->c_connid );
CONNECTION_DESTROY(c);
/* Make sure the connection stays around for us to unlock it */
epoch = epoch_join();
CONNECTION_LOCK_DESTROY(c);
epoch_leave( epoch );
return;
}
CONNECTION_UNLOCK_INCREF(c);
/* Before we acquire any locks */
event_del( c->c_write_event );
if ( !acquire_ref( &c->c_refcnt ) ) {
return;
}
epoch = epoch_join();
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
"have something to write to connection connid=%lu\n",
@ -285,7 +312,7 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
"ber_flush on fd=%d failed errno=%d (%s)\n",
c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
CONNECTION_LOCK_DESTROY(c);
return;
goto done;
}
event_add( c->c_write_event, lload_write_timeout );
} else {
@ -293,8 +320,9 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
}
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DECREF(c);
CONNECTION_UNLOCK_OR_DESTROY(c);
done:
RELEASE_REF( c, c_refcnt, c->c_destroy );
epoch_leave( epoch );
}
void
@ -356,85 +384,54 @@ connections_walk_last(
CONNCB cb,
void *arg )
{
LloadConnection *c, *old;
unsigned long last_connid;
LloadConnection *c = cq_last;
uintptr_t last_connid;
if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
return;
}
last_connid = cq_last->c_connid;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next );
assert( c->c_connid <= last_connid );
last_connid = c->c_connid;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( cq_mutex );
while ( !acquire_ref( &c->c_refcnt ) ) {
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
if ( c->c_connid >= last_connid ) {
return;
}
}
/*
* Ugh... concurrency is annoying:
* Notes:
* - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
* order
* - the connection with the highest c_connid is maintained at cq_last
* - the connection with the highest c_connid is passed in cq_last
* - we can only use cq when we hold cq_mutex
* - connections might be added to or removed from cq while we're busy
* processing connections
* - connection_destroy touches cq
* - we can't even hold locks of two different connections
* - we need a way to detect we've finished looping around cq for some
* definition of looping around
*
* So as a result, 90% of the code below is spent navigating that...
*/
while ( c->c_connid <= last_connid ) {
/* Do not permit the callback to actually free the connection even if
* it wants to, we need it to traverse cq */
c->c_refcnt++;
if ( cb( c, arg ) ) {
c->c_refcnt--;
break;
}
c->c_refcnt--;
do {
int rc;
if ( c->c_connid == last_connid ) {
break;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( cq_mutex );
old = c;
retry:
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
if ( c->c_connid <= old->c_connid ) {
ldap_pvt_thread_mutex_unlock( cq_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
rc = cb( c, arg );
RELEASE_REF( c, c_refcnt, c->c_destroy );
ldap_pvt_thread_mutex_lock( cq_mutex );
if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
break;
}
do {
LloadConnection *old = c;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
return;
}
CONNECTION_LOCK(c);
assert( c->c_state != LLOAD_C_DYING );
if ( c->c_state == LLOAD_C_INVALID ) {
/* This dying connection will be unlinked once we release cq_mutex
* and it wouldn't be safe to iterate further, skip over it */
CONNECTION_UNLOCK(c);
goto retry;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( cq_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
CONNECTION_LOCK_DECREF(c);
assert( c->c_state != LLOAD_C_DYING );
assert( c->c_state != LLOAD_C_INVALID );
}
CONNECTION_UNLOCK_OR_DESTROY(c);
ldap_pvt_thread_mutex_lock( cq_mutex );
} while ( !acquire_ref( &c->c_refcnt ) );
} while ( c->c_connid <= last_connid );
}
void
@ -448,44 +445,44 @@ connections_walk(
return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
}
/*
* Caller is expected to hold the lock.
*/
int
lload_connection_close( LloadConnection *c, void *arg )
{
TAvlnode *node;
int gentle = *(int *)arg;
LloadOperation *op;
if ( !c->c_live ) {
return LDAP_SUCCESS;
}
Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
"marking connection connid=%lu closing\n",
c->c_connid );
/* We were approached from the connection list */
assert( IS_ALIVE( c, c_refcnt ) );
if ( !gentle ) {
/* Caller has a reference on this connection,
* it doesn't actually die here */
CONNECTION_DESTROY(c);
assert( c );
CONNECTION_LOCK(c);
if ( !gentle || !c->c_ops ) {
CONNECTION_DESTROY(c);
return LDAP_SUCCESS;
}
/* The first thing we do is make sure we don't get new Operations in */
c->c_state = LLOAD_C_CLOSING;
for ( node = tavl_end( c->c_ops, TAVL_DIR_LEFT ); node;
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
LloadOperation *op = node->avl_data;
do {
TAvlnode *node = tavl_end( c->c_ops, TAVL_DIR_LEFT );
op = node->avl_data;
if ( op->o_client_msgid == 0 ) {
if ( op->o_client == c ) {
operation_destroy_from_client( op );
} else {
assert( op->o_upstream == c );
operation_destroy_from_upstream( op );
}
}
/* Close operations that would need client action to resolve,
* only SASL binds in progress do that right now */
if ( op->o_client_msgid || op->o_upstream_msgid ) {
break;
}
CONNECTION_UNLOCK(c);
operation_unlink( op );
CONNECTION_LOCK(c);
} while ( c->c_ops );
CONNECTION_UNLOCK(c);
return LDAP_SUCCESS;
}
@ -550,7 +547,6 @@ lload_connection_init( ber_socket_t s, const char *peername, int flags )
"connection connid=%lu allocated for socket fd=%d peername=%s\n",
c->c_connid, s, peername );
CONNECTION_LOCK(c);
c->c_state = LLOAD_C_ACTIVE;
return c;

View file

@ -759,6 +759,7 @@ lloadd_listeners_init( const char *urls )
int
lloadd_daemon_destroy( void )
{
epoch_shutdown();
if ( lloadd_inited ) {
int i;
@ -1674,8 +1675,7 @@ lload_handle_global_invalidation( LloadChange *change )
LloadConnection *next =
LDAP_CIRCLEQ_LOOP_NEXT( &clients, c, c_next );
if ( c->c_is_tls ) {
CONNECTION_LOCK(c);
CONNECTION_DESTROY(c);
CONNECTION_LOCK_DESTROY(c);
assert( c == NULL );
}
c = next;

228
servers/lloadd/epoch.c Normal file
View file

@ -0,0 +1,228 @@
/* epoch.c - epoch based memory reclamation */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 2018-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in the file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/** @file epoch.c
*
* Implementation of epoch based memory reclamation, in principle
* similar to the algorithm presented in
* https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf
*
* Not completely lock-free at the moment.
*
* Also the problems with epoch based memory reclamation are still
* present - a thread actively observing an epoch getting stuck will
* prevent managed objects (in our case connections and operations)
* from being freed, potentially running out of memory.
*/
#include "portable.h"
#include "lload.h"
#include <epoch.h>
/* Has to be >= 3 */
#define EPOCH_MASK ( 1 << 2 )
#define EPOCH_PREV(epoch) ( ( (epoch) + EPOCH_MASK - 1 ) % EPOCH_MASK )
#define EPOCH_NEXT(epoch) ( ( (epoch) + 1 ) % EPOCH_MASK )
struct pending_ref {
void *object;
dispose_cb *dispose;
struct pending_ref *next;
};
ldap_pvt_thread_rdwr_t epoch_mutex;
static epoch_t current_epoch;
static uintptr_t epoch_threads[EPOCH_MASK];
static struct pending_ref *references[EPOCH_MASK];
void
epoch_init( void )
{
epoch_t epoch;
current_epoch = 0;
for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
assert( !epoch_threads[epoch] );
assert( !references[epoch] );
}
ldap_pvt_thread_rdwr_init( &epoch_mutex );
}
void
epoch_shutdown( void )
{
epoch_t epoch;
struct pending_ref *old, *next;
for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
assert( !epoch_threads[epoch] );
}
/* Free pending references */
epoch = EPOCH_PREV(current_epoch);
next = references[epoch];
references[epoch] = NULL;
for ( old = next; old; old = next ) {
next = old->next;
old->dispose( old->object );
ch_free( old );
}
epoch = current_epoch;
next = references[epoch];
references[epoch] = NULL;
for ( old = next; old; old = next ) {
next = old->next;
old->dispose( old->object );
ch_free( old );
}
/* No references should exist anywhere now */
for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
assert( !references[epoch] );
}
ldap_pvt_thread_rdwr_destroy( &epoch_mutex );
}
epoch_t
epoch_join( void )
{
epoch_t epoch;
struct pending_ref *old, *ref = NULL;
/* TODO: make this completely lock-free */
ldap_pvt_thread_rdwr_rlock( &epoch_mutex );
epoch = current_epoch;
__atomic_add_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL );
ldap_pvt_thread_rdwr_runlock( &epoch_mutex );
if ( __atomic_load_n(
&epoch_threads[EPOCH_PREV(epoch)], __ATOMIC_ACQUIRE ) ) {
return epoch;
}
__atomic_exchange(
&references[EPOCH_PREV(epoch)], &ref, &ref, __ATOMIC_ACQ_REL );
Debug( LDAP_DEBUG_TRACE, "epoch_join: "
"advancing epoch to %zu with %s objects to free\n",
EPOCH_NEXT(epoch), ref ? "some" : "no" );
ldap_pvt_thread_rdwr_wlock( &epoch_mutex );
current_epoch = EPOCH_NEXT(epoch);
ldap_pvt_thread_rdwr_wunlock( &epoch_mutex );
for ( old = ref; old; old = ref ) {
ref = old->next;
old->dispose( old->object );
ch_free( old );
}
return epoch;
}
void
epoch_leave( epoch_t epoch )
{
__atomic_sub_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL );
}
/*
* Add the object to the "current global epoch", not the epoch our thread
* entered.
*/
void
epoch_append( void *ptr, dispose_cb *cb )
{
struct pending_ref *new;
epoch_t epoch = __atomic_load_n( &current_epoch, __ATOMIC_ACQUIRE );
/*
* BTW, the following is not appropriate here:
* assert( __atomic_load_n( &epoch_threads[epoch], __ATOMIC_RELAXED ) );
*
* We might be a thread lagging behind in the "previous epoch" with no
* other threads executing at all.
*/
new = ch_malloc( sizeof(struct pending_ref) );
new->object = ptr;
new->dispose = cb;
new->next = __atomic_load_n( &references[epoch], __ATOMIC_ACQUIRE );
while ( !__atomic_compare_exchange( &references[epoch], &new->next, &new, 0,
__ATOMIC_RELEASE, __ATOMIC_RELAXED ) )
/* iterate until we succeed */;
}
int
acquire_ref( uintptr_t *refp )
{
uintptr_t refcnt, new_refcnt;
refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE );
/*
* If we just incremented the refcnt and checked for zero after, another
* thread might falsely believe the object was going to stick around.
*
* Checking whether the object is still dead at disposal time might not be
* able to distinguish it from being freed in a later epoch.
*/
do {
if ( !refcnt ) {
return refcnt;
}
new_refcnt = refcnt + 1;
} while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0,
__ATOMIC_RELEASE, __ATOMIC_RELAXED ) );
assert( new_refcnt == refcnt + 1 );
return refcnt;
}
int
try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb )
{
uintptr_t refcnt, new_refcnt;
refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE );
/* We promise the caller that we won't decrease refcnt below 0 */
do {
if ( !refcnt ) {
return refcnt;
}
new_refcnt = refcnt - 1;
} while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0,
__ATOMIC_RELEASE, __ATOMIC_RELAXED ) );
assert( new_refcnt == refcnt - 1 );
if ( !new_refcnt ) {
epoch_append( object, cb );
}
return refcnt;
}

143
servers/lloadd/epoch.h Normal file
View file

@ -0,0 +1,143 @@
/* epoch.h - epoch based memory reclamation */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 2018-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in the file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
#ifndef __LLOAD_EPOCH_H
#define __LLOAD_EPOCH_H
/** @file epoch.h
*
* Implementation of epoch based memory reclamation, in principle
* similar to the algorithm presented in
* https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf
*/
typedef uintptr_t epoch_t;
/** @brief A callback function used to free object and associated data */
typedef void (dispose_cb)( void *object );
/** @brief Initiate global state */
void epoch_init( void );
/** @brief Finalise global state and free any objects still pending */
void epoch_shutdown( void );
/** @brief Register thread as active
*
* In order to safely access managed objects, a thread should call
* this function or make sure no other thread is running (e.g. config
* pause, late shutdown). After calling this, it is guaranteed that no
* reachable objects will be freed before all threads have called
* `epoch_leave( current_epoch + 1 )` so it is essential that there
* is an upper limit to the amount of time between #epoch_join and
* corresponding #epoch_leave or the number of unfreed objects might
* grow without bounds.
*
* To simplify locking, memory is only freed when the current epoch
* is advanced rather than on leaving it.
*
* Can be safely called multiple times by the same thread as long as
* a matching #epoch_leave() call is made eventually.
*
* @return The observed epoch, to be passed to #epoch_leave()
*/
epoch_t epoch_join( void );
/** @brief Register thread as inactive
*
* A thread should call this after they are finished with work
* performed since matching call to #epoch_join(). It is not safe
* to keep a local reference to managed objects after this call
* unless other precautions have been made to prevent it being
* released.
*
* @param[in] epoch Epoch identifier returned by a previous call to
* #epoch_join().
*/
void epoch_leave( epoch_t epoch );
/** @brief Return an unreachable object to be freed
*
* The object should already be unreachable at the point of call and
* cb will be invoked when no other thread that could have seen it
* is active any more. This happens when we have advanced by two
* epochs.
*
* @param[in] ptr Object to be released/freed
* @param[in] cb Callback to invoke when safe to do so
*/
void epoch_append( void *ptr, dispose_cb *cb );
/**
* \defgroup Reference counting helpers
*/
/**@{*/
/** @brief Acquire a reference if possible
*
* Atomically, check reference count is non-zero and increment if so.
* Returns old reference count.
*
* @param[in] refp Pointer to a reference counter
* @return 0 if reference was already zero, non-zero if reference
* count was successfully incremented
*/
int acquire_ref( uintptr_t *refp );
/** @brief Check reference count and try to decrement
*
* Atomically, decrement reference count if non-zero and register
* object if decremented to zero. Returning previous reference count.
*
* @param[in] refp Pointer to a reference counter
* @param[in] object The managed object
* @param[in] cb Callback to invoke when safe to do so
* @return 0 if reference was already zero, non-zero if reference
* count was non-zero at the time of call
*/
int try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb );
/** @brief Read reference count
*
* @param[in] object Pointer to the managed object
* @param[in] ref_field Member where reference count is stored in
* the object
* @return Current value of reference counter
*/
#define IS_ALIVE( object, ref_field ) \
__atomic_load_n( &(object)->ref_field, __ATOMIC_ACQUIRE )
/** @brief Release reference
*
* A cheaper alternative to #try_release_ref(), safe only when we know
* reference count was already non-zero.
*
* @param[in] object The managed object
* @param[in] ref_field Member where reference count is stored in
* the object
* @param[in] cb Callback to invoke when safe to do so
*/
#define RELEASE_REF( object, ref_field, cb ) \
do { \
if ( !__atomic_sub_fetch( \
&(object)->ref_field, 1, __ATOMIC_ACQ_REL ) ) { \
epoch_append( object, (dispose_cb *)cb ); \
} \
} while (0)
/**@}*/
#endif /* __LLOAD_EPOCH_H */

View file

@ -36,6 +36,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
char *msg = NULL;
int rc = LDAP_SUCCESS;
CONNECTION_LOCK(c);
tavl_delete( &c->c_ops, op, operation_client_cmp );
if ( c->c_is_tls == LLOAD_TLS_ESTABLISHED ) {
@ -51,6 +52,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
rc = LDAP_UNAVAILABLE;
msg = "Could not initialize TLS";
}
CONNECTION_UNLOCK(c);
Debug( LDAP_DEBUG_STATS, "handle_starttls: "
"handling StartTLS exop connid=%lu rc=%d msg=%s\n",
@ -58,11 +60,10 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
if ( rc ) {
/* We've already removed the operation from the queue */
return operation_send_reject_locked( op, rc, msg, 1 );
operation_send_reject( op, rc, msg, 1 );
return LDAP_SUCCESS;
}
CONNECTION_UNLOCK_INCREF(c);
event_del( c->c_read_event );
event_del( c->c_write_event );
/*
@ -77,9 +78,8 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
output = c->c_pendingber;
if ( output == NULL && (output = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DECREF(c);
operation_destroy_from_client( op );
CONNECTION_DESTROY(c);
operation_unlink( op );
CONNECTION_LOCK_DESTROY(c);
return -1;
}
c->c_pendingber = output;
@ -88,7 +88,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" );
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
c->c_read_timeout = lload_timeout_net;
event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
client_tls_handshake_cb, c );
@ -100,8 +100,9 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
event_add( c->c_write_event, lload_write_timeout );
op->o_res = LLOAD_OP_COMPLETED;
operation_destroy_from_client( op );
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
operation_unlink( op );
return -1;
}
@ -115,10 +116,8 @@ request_extended( LloadConnection *c, LloadOperation *op )
ber_tag_t tag;
if ( (copy = ber_alloc()) == NULL ) {
if ( operation_send_reject_locked(
op, LDAP_OTHER, "internal error", 0 ) == LDAP_SUCCESS ) {
CONNECTION_DESTROY(c);
}
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
CONNECTION_LOCK_DESTROY(c);
return -1;
}
@ -128,8 +127,9 @@ request_extended( LloadConnection *c, LloadOperation *op )
if ( tag != LDAP_TAG_EXOP_REQ_OID ) {
Debug( LDAP_DEBUG_STATS, "request_extended: "
"no OID present in extended request\n" );
return operation_send_reject_locked(
op, LDAP_PROTOCOL_ERROR, "decoding error", 0 );
operation_send_reject( op, LDAP_PROTOCOL_ERROR, "decoding error", 0 );
CONNECTION_LOCK_DESTROY(c);
return -1;
}
needle.oid = bv;
@ -145,8 +145,8 @@ request_extended( LloadConnection *c, LloadOperation *op )
ber_free( copy, 0 );
if ( c->c_state == LLOAD_C_BINDING ) {
return operation_send_reject_locked(
op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
return LDAP_SUCCESS;
}
return request_process( c, op );
}

View file

@ -84,6 +84,8 @@ LDAP_BEGIN_DECL
#define BER_BV_OPTIONAL( bv ) ( BER_BVISNULL( bv ) ? NULL : ( bv ) )
#include <epoch.h>
typedef struct LloadBackend LloadBackend;
typedef struct LloadPendingConnection LloadPendingConnection;
typedef struct LloadConnection LloadConnection;
@ -280,58 +282,39 @@ struct LloadConnection {
* - also a liveness/validity token is added to c_refcnt during
* lload_connection_init, its existence is tracked in c_live and is usually the
* only one that prevents it from being destroyed
* - anyone who needs to be able to lock the connection after unlocking it has
* to use CONNECTION_UNLOCK_INCREF, they are then responsible that
* CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are
* done with it
* - anyone who needs to be able to relock the connection after unlocking it has
* to use acquire_ref(), they need to make sure a matching
* RELEASE_REF( c, c_refcnt, c->c_destroy ); is run eventually
* - when a connection is considered dead, use CONNECTION_DESTROY on a locked
* connection, it might get disposed of or if anyone still holds a token, it
* just gets unlocked and it's the last token holder's responsibility to run
* CONNECTION_UNLOCK_OR_DESTROY
* - CONNECTION_LOCK_DESTROY is a shorthand for locking, decreasing refcount
* and CONNECTION_DESTROY
* connection, it will be made unreachable from normal places and either
* scheduled for reclamation when safe to do so or if anyone still holds a
* reference, it just gets unlocked and reclaimed after the last ref is
* released
* - CONNECTION_LOCK_DESTROY is a shorthand for locking and CONNECTION_DESTROY
*/
ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
int c_refcnt, c_live;
uintptr_t c_refcnt, c_live;
CONNECTION_DESTROY_CB c_unlink;
CONNECTION_DESTROY_CB c_destroy;
CONNECTION_PDU_CB c_pdu_cb;
#define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex )
#define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex )
#define CONNECTION_LOCK_DECREF(c) \
#define CONNECTION_UNLINK_(c) \
do { \
CONNECTION_LOCK(c); \
(c)->c_refcnt--; \
} while (0)
#define CONNECTION_UNLOCK_INCREF(c) \
do { \
(c)->c_refcnt++; \
CONNECTION_UNLOCK(c); \
} while (0)
#define CONNECTION_UNLOCK_OR_DESTROY(c) \
do { \
assert( (c)->c_refcnt >= 0 ); \
if ( (c)->c_state == LLOAD_C_CLOSING && !( c )->c_ops ) { \
(c)->c_refcnt -= (c)->c_live; \
if ( (c)->c_live ) { \
(c)->c_live = 0; \
} \
if ( !( c )->c_refcnt ) { \
Debug( LDAP_DEBUG_TRACE, "%s: destroying connection connid=%lu\n", \
__func__, (c)->c_connid ); \
(c)->c_destroy( (c) ); \
(c) = NULL; \
} else { \
CONNECTION_UNLOCK(c); \
RELEASE_REF( (c), c_refcnt, c->c_destroy ); \
(c)->c_unlink( (c) ); \
} \
} while (0)
#define CONNECTION_DESTROY(c) \
do { \
(c)->c_refcnt -= (c)->c_live; \
(c)->c_live = 0; \
CONNECTION_UNLOCK_OR_DESTROY(c); \
CONNECTION_UNLINK_(c); \
CONNECTION_UNLOCK(c); \
} while (0)
#define CONNECTION_LOCK_DESTROY(c) \
do { \
CONNECTION_LOCK_DECREF(c); \
CONNECTION_LOCK(c); \
CONNECTION_DESTROY(c); \
} while (0);
@ -393,12 +376,13 @@ struct LloadConnection {
enum op_state {
LLOAD_OP_NOT_FREEING = 0,
LLOAD_OP_FREEING_UPSTREAM = 1 << 0,
LLOAD_OP_FREEING_CLIENT = 1 << 1,
LLOAD_OP_DETACHING_UPSTREAM = 1 << 2,
LLOAD_OP_DETACHING_CLIENT = 1 << 3,
LLOAD_OP_DETACHING_CLIENT = 1 << 1,
LLOAD_OP_DETACHING_UPSTREAM = 1 << 0,
};
#define LLOAD_OP_DETACHING_MASK \
( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT )
/* operation result for monitoring purposes */
enum op_result {
LLOAD_OP_REJECTED, /* operation was not forwarded */
@ -406,32 +390,28 @@ enum op_result {
LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */
};
#define LLOAD_OP_FREEING_MASK \
( LLOAD_OP_FREEING_UPSTREAM | LLOAD_OP_FREEING_CLIENT )
#define LLOAD_OP_DETACHING_MASK \
( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT )
/*
* Operation reference tracking:
* - o_refcnt is set to 1, never incremented
* - operation_unlink sets it to 0 and on transition from 1 clears both
* connection links (o_client, o_upstream)
*/
struct LloadOperation {
uintptr_t o_refcnt;
LloadConnection *o_client;
unsigned long o_client_connid;
int o_client_live, o_client_refcnt;
ber_int_t o_client_msgid;
ber_int_t o_saved_msgid;
LloadConnection *o_upstream;
unsigned long o_upstream_connid;
int o_upstream_live, o_upstream_refcnt;
ber_int_t o_upstream_msgid;
time_t o_last_response;
/* Protects o_client, o_upstream pointers before we lock their c_mutex if
* we don't know they are still alive */
/* Protects o_client, o_upstream links */
ldap_pvt_thread_mutex_t o_link_mutex;
/* Protects o_freeing, can be locked while holding c_mutex */
ldap_pvt_thread_mutex_t o_mutex;
/* Consistent w.r.t. o_mutex, only written to while holding
* op->o_{client,upstream}->c_mutex */
enum op_state o_freeing;
ber_tag_t o_tag;
time_t o_start;
unsigned long o_pin_id;

View file

@ -424,6 +424,8 @@ main( int argc, char **argv )
}
#endif
epoch_init();
while ( (i = getopt( argc, argv,
"c:d:f:F:h:n:o:s:tV"
#ifdef LDAP_PF_INET6

View file

@ -85,6 +85,10 @@ lload_back_open( BackendInfo *bi )
return 0;
}
/* This will fail if we ever try to instantiate more than one lloadd within
* the process */
epoch_init();
if ( lload_tls_init() != 0 ) {
return -1;
}

View file

@ -116,363 +116,6 @@ operation_upstream_cmp( const void *left, const void *right )
}
}
/*
* Free the operation, subject to there being noone else holding a reference
* to it.
*
* Both operation_destroy_from_* functions are the same, two implementations
* exist to cater for the fact that either side (client or upstream) might
* decide to destroy it and each holds a different mutex.
*
* Due to the fact that we rely on mutexes on both connections which have a
* different timespan from the operation, we have to take the following race
* into account:
*
* Trigger
* - both operation_destroy_from_client and operation_destroy_from_upstream
* are called at the same time (each holding its mutex), several times
* before one of them finishes
* - either or both connections might have started the process of being
* destroyed
*
* We need to detect that the race has happened and only allow one of them to
* free the operation (we use o_freeing != 0 to announce+detect that).
*
* In case the caller was in the process of destroying the connection and the
* race had been won by the mirror caller, it will increment c_refcnt on its
* connection and make sure to postpone the final step in
* client/upstream_destroy(). Testing o_freeing for the mirror side's token
* allows the winner to detect that it has been a party to the race and a token
* in c_refcnt has been deposited on its behalf.
*
* Beware! This widget really touches all the mutexes we have and showcases the
* issues with maintaining so many mutex ordering restrictions.
*/
void
operation_destroy_from_client( LloadOperation *op )
{
LloadConnection *upstream = NULL, *client = op->o_client;
LloadBackend *b = NULL;
int race_state, detach_client = !client->c_live;
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p attempting to release operation%s\n",
op, detach_client ? " and detach client" : "" );
/* 1. liveness/refcnt adjustment and test */
op->o_client_refcnt -= op->o_client_live;
op->o_client_live = 0;
assert( op->o_client_refcnt <= client->c_refcnt );
if ( op->o_client_refcnt ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p not dead yet\n",
op );
return;
}
/* 2. Remove from the operation map and TODO adjust the pending op count */
tavl_delete( &client->c_ops, op, operation_client_cmp );
/* 3. Detect whether we entered a race to free op and indicate that to any
* others */
ldap_pvt_thread_mutex_lock( &op->o_mutex );
race_state = op->o_freeing;
op->o_freeing |= LLOAD_OP_FREEING_CLIENT;
if ( detach_client ) {
op->o_freeing |= LLOAD_OP_DETACHING_CLIENT;
}
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
CONNECTION_UNLOCK_INCREF(client);
if ( detach_client ) {
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
op->o_client = NULL;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
}
/* 4. If we lost the race, deal with it straight away */
if ( race_state ) {
/*
* We have raced to destroy op and the first one to lose on this side,
* leave a refcnt token on client so we don't destroy it before the
* other side has finished (it knows we did that when it examines
* o_freeing again).
*/
if ( detach_client ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p lost race but client connid=%lu is going down\n",
op, client->c_connid );
CONNECTION_LOCK_DECREF(client);
} else if ( (race_state & LLOAD_OP_FREEING_MASK) ==
LLOAD_OP_FREEING_UPSTREAM ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p lost race, increased client refcnt connid=%lu "
"to refcnt=%d\n",
op, client->c_connid, client->c_refcnt );
CONNECTION_LOCK(client);
} else {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p lost race with another "
"operation_destroy_from_client, "
"client connid=%lu\n",
op, client->c_connid );
CONNECTION_LOCK_DECREF(client);
}
return;
}
/* it seems we will be destroying the operation,
* so update the global rejected cunter if needed */
operation_update_global_rejected( op );
/* 5. If we raced the upstream side and won, reclaim the token */
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
if ( !(race_state & LLOAD_OP_DETACHING_UPSTREAM) ) {
upstream = op->o_upstream;
if ( upstream ) {
CONNECTION_LOCK(upstream);
}
}
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
ldap_pvt_thread_mutex_lock( &op->o_mutex );
/* We don't actually resolve the race in full until we grab the other's
* c_mutex+op->o_mutex here */
if ( upstream && ( op->o_freeing & LLOAD_OP_FREEING_UPSTREAM ) ) {
if ( op->o_freeing & LLOAD_OP_DETACHING_UPSTREAM ) {
CONNECTION_UNLOCK(upstream);
upstream = NULL;
} else {
/*
* We have raced to destroy op and won. To avoid freeing the connection
* under us, a refcnt token has been left over for us on the upstream,
* decref and see whether we are in charge of freeing it
*/
upstream->c_refcnt--;
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p other side lost race with us, upstream connid=%lu\n",
op, upstream->c_connid );
}
}
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
/* 6. liveness/refcnt adjustment and test */
op->o_upstream_refcnt -= op->o_upstream_live;
op->o_upstream_live = 0;
if ( op->o_upstream_refcnt ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p other side still alive, refcnt=%d\n",
op, op->o_upstream_refcnt );
/* There must have been no race if op is still alive */
ldap_pvt_thread_mutex_lock( &op->o_mutex );
op->o_freeing &= ~LLOAD_OP_FREEING_CLIENT;
if ( detach_client ) {
op->o_freeing &= ~LLOAD_OP_DETACHING_CLIENT;
}
assert( op->o_freeing == 0 );
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
assert( upstream != NULL );
CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
return;
}
/* 7. Remove from the operation map and adjust the pending op count */
if ( upstream ) {
if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
upstream->c_n_ops_executing--;
operation_update_conn_counters( op );
b = (LloadBackend *)upstream->c_private;
}
CONNECTION_UNLOCK_OR_DESTROY(upstream);
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
}
/* 8. Release the operation */
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
"op=%p destroyed operation from client connid=%lu, "
"client msgid=%d\n",
op, op->o_client_connid, op->o_client_msgid );
ber_free( op->o_ber, 1 );
ldap_pvt_thread_mutex_destroy( &op->o_mutex );
ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
ch_free( op );
CONNECTION_LOCK_DECREF(client);
}
/*
* See operation_destroy_from_client.
*/
void
operation_destroy_from_upstream( LloadOperation *op )
{
LloadConnection *client = NULL, *upstream = op->o_upstream;
LloadBackend *b = NULL;
int race_state, detach_upstream = !upstream->c_live;
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p attempting to release operation%s\n",
op, detach_upstream ? " and detach upstream" : "" );
/* 1. liveness/refcnt adjustment and test */
op->o_upstream_refcnt -= op->o_upstream_live;
op->o_upstream_live = 0;
assert( op->o_upstream_refcnt <= upstream->c_refcnt );
if ( op->o_upstream_refcnt ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p not dead yet\n",
op );
return;
}
/* it seems we will be destroying the operation,
* so update the global rejected cunter if needed */
operation_update_global_rejected( op );
/* 2. Remove from the operation map and adjust the pending op count */
if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
upstream->c_n_ops_executing--;
operation_update_conn_counters( op );
b = (LloadBackend *)upstream->c_private;
}
ldap_pvt_thread_mutex_lock( &op->o_mutex );
race_state = op->o_freeing;
op->o_freeing |= LLOAD_OP_FREEING_UPSTREAM;
if ( detach_upstream ) {
op->o_freeing |= LLOAD_OP_DETACHING_UPSTREAM;
}
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
CONNECTION_UNLOCK_INCREF(upstream);
/* 3. Detect whether we entered a race to free op */
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
if ( detach_upstream ) {
op->o_upstream = NULL;
}
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
/* 4. If we lost the race, deal with it straight away */
if ( race_state ) {
/*
* We have raced to destroy op and the first one to lose on this side,
* leave a refcnt token on upstream so we don't destroy it before the
* other side has finished (it knows we did that when it examines
* o_freeing again).
*/
if ( detach_upstream ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p lost race but upstream connid=%lu is going down\n",
op, upstream->c_connid );
CONNECTION_LOCK_DECREF(upstream);
} else if ( (race_state & LLOAD_OP_FREEING_MASK) ==
LLOAD_OP_FREEING_CLIENT ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p lost race, increased upstream refcnt connid=%lu "
"to refcnt=%d\n",
op, upstream->c_connid, upstream->c_refcnt );
CONNECTION_LOCK(upstream);
} else {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p lost race with another "
"operation_destroy_from_upstream, "
"upstream connid=%lu\n",
op, upstream->c_connid );
CONNECTION_LOCK_DECREF(upstream);
}
return;
}
/* 5. If we raced the client side and won, reclaim the token */
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
if ( !(race_state & LLOAD_OP_DETACHING_CLIENT) ) {
client = op->o_client;
if ( client ) {
CONNECTION_LOCK(client);
}
}
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
/* We don't actually resolve the race in full until we grab the other's
* c_mutex+op->o_mutex here */
ldap_pvt_thread_mutex_lock( &op->o_mutex );
if ( client && ( op->o_freeing & LLOAD_OP_FREEING_CLIENT ) ) {
if ( op->o_freeing & LLOAD_OP_DETACHING_CLIENT ) {
CONNECTION_UNLOCK(client);
client = NULL;
} else {
/*
* We have raced to destroy op and won. To avoid freeing the connection
* under us, a refcnt token has been left over for us on the client,
* decref and see whether we are in charge of freeing it
*/
client->c_refcnt--;
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p other side lost race with us, client connid=%lu\n",
op, client->c_connid );
}
}
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
/* 6. liveness/refcnt adjustment and test */
op->o_client_refcnt -= op->o_client_live;
op->o_client_live = 0;
if ( op->o_client_refcnt ) {
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p other side still alive, refcnt=%d\n",
op, op->o_client_refcnt );
/* There must have been no race if op is still alive */
ldap_pvt_thread_mutex_lock( &op->o_mutex );
op->o_freeing &= ~LLOAD_OP_FREEING_UPSTREAM;
if ( detach_upstream ) {
op->o_freeing &= ~LLOAD_OP_DETACHING_UPSTREAM;
}
assert( op->o_freeing == 0 );
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
assert( client != NULL );
CONNECTION_UNLOCK_OR_DESTROY(client);
CONNECTION_LOCK_DECREF(upstream);
return;
}
/* 7. Remove from the operation map and TODO adjust the pending op count */
if ( client ) {
tavl_delete( &client->c_ops, op, operation_client_cmp );
CONNECTION_UNLOCK_OR_DESTROY(client);
}
/* 8. Release the operation */
Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
"op=%p destroyed operation from client connid=%lu, "
"client msgid=%d\n",
op, op->o_client_connid, op->o_client_msgid );
ber_free( op->o_ber, 1 );
ldap_pvt_thread_mutex_destroy( &op->o_mutex );
ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
ch_free( op );
CONNECTION_LOCK_DECREF(upstream);
}
/*
* Entered holding c_mutex for now.
*/
@ -490,11 +133,9 @@ operation_init( LloadConnection *c, BerElement *ber )
op->o_ber = ber;
op->o_start = slap_get_time();
ldap_pvt_thread_mutex_init( &op->o_mutex );
ldap_pvt_thread_mutex_init( &op->o_link_mutex );
op->o_client_live = op->o_client_refcnt = 1;
op->o_upstream_live = op->o_upstream_refcnt = 1;
op->o_refcnt = 1;
tag = ber_get_int( ber, &op->o_client_msgid );
if ( tag != LDAP_TAG_MSGID ) {
@ -549,13 +190,163 @@ fail:
return NULL;
}
int
operation_send_abandon( LloadOperation *op )
void
operation_destroy( LloadOperation *op )
{
Debug( LDAP_DEBUG_TRACE, "operation_destroy: "
"op=%p destroyed operation from client connid=%lu, "
"client msgid=%d\n",
op, op->o_client_connid, op->o_client_msgid );
assert( op->o_refcnt == 0 );
assert( op->o_client == NULL );
assert( op->o_upstream == NULL );
ber_free( op->o_ber, 1 );
ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
ch_free( op );
}
int
operation_unlink( LloadOperation *op )
{
LloadConnection *client, *upstream;
uintptr_t prev_refcnt;
int result = 0;
if ( !( prev_refcnt = try_release_ref(
&op->o_refcnt, op, (dispose_cb *)operation_destroy ) ) ) {
return result;
}
assert( prev_refcnt == 1 );
Debug( LDAP_DEBUG_TRACE, "operation_unlink: "
"unlinking operation between client connid=%lu and upstream "
"connid=%lu "
"client msgid=%d\n",
op->o_client_connid, op->o_upstream_connid, op->o_client_msgid );
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
client = op->o_client;
upstream = op->o_upstream;
op->o_client = NULL;
op->o_upstream = NULL;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
assert( client || upstream );
if ( client ) {
result |= operation_unlink_client( op, client );
operation_update_global_rejected( op );
}
if ( upstream ) {
result |= operation_unlink_upstream( op, upstream );
}
return result;
}
int
operation_unlink_client( LloadOperation *op, LloadConnection *client )
{
LloadOperation *removed;
int result = 0;
Debug( LDAP_DEBUG_TRACE, "operation_unlink_client: "
"unlinking operation op=%p msgid=%d client connid=%lu\n",
op, op->o_client_msgid, op->o_client_connid );
CONNECTION_LOCK(client);
if ( (removed = tavl_delete(
&client->c_ops, op, operation_client_cmp )) ) {
result = LLOAD_OP_DETACHING_CLIENT;
assert( op == removed );
client->c_n_ops_executing--;
if ( client->c_state == LLOAD_C_BINDING ) {
client->c_state = LLOAD_C_READY;
if ( !BER_BVISNULL( &client->c_auth ) ) {
ber_memfree( client->c_auth.bv_val );
BER_BVZERO( &client->c_auth );
}
if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) {
ber_memfree( client->c_sasl_bind_mech.bv_val );
BER_BVZERO( &client->c_sasl_bind_mech );
}
if ( op->o_pin_id ) {
client->c_pin_id = 0;
}
}
}
if ( client->c_state == LLOAD_C_CLOSING && !client->c_ops ) {
CONNECTION_DESTROY(client);
} else {
CONNECTION_UNLOCK(client);
}
return result;
}
int
operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream )
{
LloadOperation *removed;
LloadBackend *b = NULL;
int result = 0;
Debug( LDAP_DEBUG_TRACE, "operation_unlink_upstream: "
"unlinking operation op=%p msgid=%d upstream connid=%lu\n",
op, op->o_upstream_msgid, op->o_upstream_connid );
CONNECTION_LOCK(upstream);
if ( (removed = tavl_delete(
&upstream->c_ops, op, operation_upstream_cmp )) ) {
result |= LLOAD_OP_DETACHING_UPSTREAM;
assert( op == removed );
upstream->c_n_ops_executing--;
if ( upstream->c_state == LLOAD_C_BINDING ) {
assert( op->o_tag == LDAP_REQ_BIND && upstream->c_ops == NULL );
upstream->c_state = LLOAD_C_READY;
if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
ber_memfree( upstream->c_sasl_bind_mech.bv_val );
BER_BVZERO( &upstream->c_sasl_bind_mech );
}
}
operation_update_conn_counters( op, upstream );
b = (LloadBackend *)upstream->c_private;
}
if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) {
CONNECTION_DESTROY(upstream);
} else {
CONNECTION_UNLOCK(upstream);
}
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
return result;
}
int
operation_send_abandon( LloadOperation *op, LloadConnection *upstream )
{
LloadConnection *upstream = op->o_upstream;
BerElement *ber;
int rc = -1;
if ( !IS_ALIVE( upstream, c_refcnt ) ) {
return rc;
}
ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
ber = upstream->c_pendingber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
@ -605,114 +396,64 @@ void
operation_abandon( LloadOperation *op )
{
LloadConnection *c;
LloadBackend *b;
int rc = LDAP_SUCCESS;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
c = op->o_upstream;
if ( !c || !c->c_live ) {
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( !c || !IS_ALIVE( c, c_refcnt ) ) {
goto done;
}
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
/* for now consider all abandoned operations completed,
* perhaps add a separate counter later */
op->o_res = LLOAD_OP_COMPLETED;
if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) {
if ( !operation_unlink_upstream( op, c ) ) {
/* The operation has already been abandoned or finished */
Debug( LDAP_DEBUG_TRACE, "operation_abandon: "
"%s from connid=%lu msgid=%d not present in connid=%lu any "
"more\n",
lload_msgtype2str( op->o_tag ), op->o_client_connid,
op->o_client_msgid, op->o_upstream_connid );
goto unlock;
goto done;
}
if ( c->c_state == LLOAD_C_BINDING ) {
c->c_state = LLOAD_C_READY;
if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
ber_memfree( c->c_sasl_bind_mech.bv_val );
BER_BVZERO( &c->c_sasl_bind_mech );
}
}
c->c_n_ops_executing--;
b = (LloadBackend *)c->c_private;
op->o_upstream_refcnt++;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
if ( operation_send_abandon( op ) == LDAP_SUCCESS ) {
if ( operation_send_abandon( op, c ) == LDAP_SUCCESS ) {
connection_write_cb( -1, 0, c );
}
CONNECTION_LOCK_DECREF(c);
op->o_upstream_refcnt--;
unlock:
if ( !c->c_live || !op->o_upstream_refcnt ) {
operation_destroy_from_upstream( op );
}
if ( rc ) {
CONNECTION_DESTROY(c);
} else {
CONNECTION_UNLOCK_OR_DESTROY(c);
}
done:
c = op->o_client;
assert( c );
/* Caller should hold a reference on client */
CONNECTION_LOCK(c);
if ( c->c_state == LLOAD_C_BINDING ) {
c->c_state = LLOAD_C_READY;
if ( !BER_BVISNULL( &c->c_auth ) ) {
ber_memfree( c->c_auth.bv_val );
BER_BVZERO( &c->c_auth );
}
if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
ber_memfree( c->c_sasl_bind_mech.bv_val );
BER_BVZERO( &c->c_sasl_bind_mech );
}
if ( op->o_pin_id ) {
c->c_pin_id = 0;
}
}
assert( op->o_client_refcnt > op->o_client_live );
op->o_client_refcnt--;
operation_destroy_from_client( op );
CONNECTION_UNLOCK(c);
operation_unlink( op );
}
/*
* Called with op->o_client non-NULL and already locked.
*/
int
operation_send_reject_locked(
void
operation_send_reject(
LloadOperation *op,
int result,
const char *msg,
int send_anyway )
{
LloadConnection *c = op->o_client;
LloadConnection *c;
BerElement *ber;
int found;
Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
"rejecting %s from client connid=%lu with message: \"%s\"\n",
lload_msgtype2str( op->o_tag ), c->c_connid, msg );
lload_msgtype2str( op->o_tag ), op->o_client_connid, msg );
found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op );
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
c = op->o_client;
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( !c || !IS_ALIVE( c, c_refcnt ) ) {
Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
"not sending msgid=%d, client connid=%lu is dead\n",
op->o_client_msgid, op->o_client_connid );
goto done;
}
found = operation_unlink_client( op, c );
if ( !found && !send_anyway ) {
Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
"msgid=%d not scheduled for client connid=%lu anymore, "
"not sending\n",
op->o_client_msgid, c->c_connid );
@ -721,25 +462,21 @@ operation_send_reject_locked(
if ( op->o_client_msgid == 0 ) {
assert( op->o_saved_msgid == 0 && op->o_pin_id );
Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
"operation pin=%lu is just a pin, not sending\n",
op->o_pin_id );
goto done;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
ber = c->c_pendingber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
Debug( LDAP_DEBUG_ANY, "operation_send_reject_locked: "
Debug( LDAP_DEBUG_ANY, "operation_send_reject: "
"ber_alloc failed, closing connid=%lu\n",
c->c_connid );
CONNECTION_LOCK_DECREF(c);
operation_destroy_from_client( op );
CONNECTION_DESTROY(c);
return -1;
CONNECTION_LOCK_DESTROY(c);
goto done;
}
c->c_pendingber = ber;
@ -751,46 +488,8 @@ operation_send_reject_locked(
connection_write_cb( -1, 0, c );
CONNECTION_LOCK_DECREF(c);
done:
operation_destroy_from_client( op );
return LDAP_SUCCESS;
}
void
operation_send_reject(
LloadOperation *op,
int result,
const char *msg,
int send_anyway )
{
LloadConnection *c;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
c = op->o_client;
if ( !c ) {
c = op->o_upstream;
/* One of the connections has initiated this and keeps a reference, if
* client is dead, it must have been the upstream */
assert( c );
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
"not sending msgid=%d, client connid=%lu is dead\n",
op->o_client_msgid, op->o_client_connid );
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK_OR_DESTROY(c);
return;
}
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
/* Non-zero return means connection has been unlocked and might be
* destroyed */
if ( operation_send_reject_locked( op, result, msg, send_anyway ) ==
LDAP_SUCCESS ) {
CONNECTION_UNLOCK_OR_DESTROY(c);
}
operation_unlink( op );
}
/*
@ -803,32 +502,27 @@ operation_send_reject(
void
operation_lost_upstream( LloadOperation *op )
{
LloadConnection *c = op->o_upstream;
operation_send_reject( op, LDAP_OTHER,
"connection to the remote server has been severed", 0 );
CONNECTION_LOCK(c);
op->o_upstream_refcnt--;
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(c);
}
int
connection_timeout( LloadConnection *upstream, void *arg )
{
LloadOperation *op;
TAvlnode *ops = NULL, *node;
TAvlnode *ops = NULL, *node, *next;
LloadBackend *b = upstream->c_private;
time_t threshold = *(time_t *)arg;
int rc, nops = 0;
CONNECTION_LOCK(upstream);
for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
((LloadOperation *)node->avl_data)->o_start <
threshold; /* shortcut */
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
node = next ) {
LloadOperation *found_op;
next = tavl_next( node, TAVL_DIR_RIGHT );
op = node->avl_data;
/* Have we received another response since? */
@ -836,7 +530,6 @@ connection_timeout( LloadConnection *upstream, void *arg )
continue;
}
op->o_upstream_refcnt++;
op->o_res = LLOAD_OP_FAILED;
found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
assert( op == found_op );
@ -863,13 +556,15 @@ connection_timeout( LloadConnection *upstream, void *arg )
}
if ( nops == 0 ) {
CONNECTION_UNLOCK(upstream);
return LDAP_SUCCESS;
}
upstream->c_n_ops_executing -= nops;
upstream->c_counters.lc_ops_failed += nops;
Debug( LDAP_DEBUG_STATS, "connection_timeout: "
"timing out %d operations for connid=%lu\n",
nops, upstream->c_connid );
CONNECTION_UNLOCK_INCREF(upstream);
CONNECTION_UNLOCK(upstream);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing -= nops;
@ -877,36 +572,17 @@ connection_timeout( LloadConnection *upstream, void *arg )
for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node;
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
LloadConnection *client;
op = node->avl_data;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
client = op->o_client;
if ( !client ) {
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
continue;
}
CONNECTION_LOCK(client);
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
/* operation_send_reject_locked unlocks and destroys client on
* failure */
if ( operation_send_reject_locked( op,
operation_send_reject( op,
op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED :
LDAP_ADMINLIMIT_EXCEEDED,
"upstream did not respond in time", 0 ) == LDAP_SUCCESS ) {
CONNECTION_UNLOCK_OR_DESTROY(client);
}
"upstream did not respond in time", 0 );
if ( rc == LDAP_SUCCESS ) {
rc = operation_send_abandon( op );
rc = operation_send_abandon( op, upstream );
}
CONNECTION_LOCK(upstream);
op->o_upstream_refcnt--;
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(upstream);
operation_unlink( op );
}
/* TODO: if operation_send_abandon failed, we need to kill the upstream */
@ -914,7 +590,13 @@ connection_timeout( LloadConnection *upstream, void *arg )
connection_write_cb( -1, 0, upstream );
}
CONNECTION_LOCK_DECREF(upstream);
CONNECTION_LOCK(upstream);
if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) {
CONNECTION_DESTROY(upstream);
} else {
CONNECTION_UNLOCK(upstream);
}
/* just dispose of the AVL, most operations should already be gone */
tavl_free( ops, NULL );
return LDAP_SUCCESS;
@ -933,12 +615,16 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
threshold = slap_get_time() - lload_timeout_api->tv_sec;
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
epoch_t epoch;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( b->b_n_ops_executing == 0 ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
continue;
}
epoch = epoch_join();
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out binds for backend uri=%s\n",
b->b_uri.bv_val );
@ -951,6 +637,7 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
connection_timeout, &threshold );
epoch_leave( epoch );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
done:
@ -976,13 +663,12 @@ operation_update_global_rejected( LloadOperation *op )
}
void
operation_update_conn_counters( LloadOperation *op )
operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream )
{
assert( op->o_upstream != NULL );
if ( op->o_res == LLOAD_OP_COMPLETED ) {
op->o_upstream->c_counters.lc_ops_completed++;
upstream->c_counters.lc_ops_completed++;
} else {
op->o_upstream->c_counters.lc_ops_failed++;
upstream->c_counters.lc_ops_failed++;
}
}

View file

@ -172,15 +172,17 @@ LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag );
LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
LDAP_SLAPD_F (LloadOperation *) operation_init( LloadConnection *c, BerElement *ber );
LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op );
LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op, LloadConnection *c );
LDAP_SLAPD_F (void) operation_abandon( LloadOperation *op );
LDAP_SLAPD_F (void) operation_send_reject( LloadOperation *op, int result, const char *msg, int send_anyway );
LDAP_SLAPD_F (int) operation_send_reject_locked( LloadOperation *op, int result, const char *msg, int send_anyway );
LDAP_SLAPD_F (void) operation_lost_upstream( LloadOperation *op );
LDAP_SLAPD_F (void) operation_destroy_from_client( LloadOperation *op );
LDAP_SLAPD_F (void) operation_destroy_from_upstream( LloadOperation *op );
LDAP_SLAPD_F (void) operation_destroy( LloadOperation *op );
LDAP_SLAPD_F (int) operation_unlink( LloadOperation *op );
LDAP_SLAPD_F (int) operation_unlink_client( LloadOperation *op, LloadConnection *client );
LDAP_SLAPD_F (int) operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream );
LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op );
LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream );
LDAP_SLAPD_F (void) operation_update_backend_counters( LloadOperation *op, LloadBackend *b );
LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
/*

View file

@ -38,6 +38,8 @@ static const sasl_callback_t client_callbacks[] = {
};
#endif /* HAVE_CYRUS_SASL */
static void upstream_unlink( LloadConnection *upstream );
int
forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
{
@ -101,13 +103,13 @@ forward_final_response(
"connid=%lu msgid=%d finishing up with a request for "
"client connid=%lu\n",
op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
rc = forward_response( client, op, ber );
CONNECTION_LOCK(op->o_upstream);
op->o_res = LLOAD_OP_COMPLETED;
if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) {
operation_destroy_from_upstream( op );
if ( !op->o_pin_id ) {
operation_unlink( op );
}
CONNECTION_UNLOCK(op->o_upstream);
return rc;
}
@ -177,11 +179,13 @@ handle_one_response( LloadConnection *c )
goto fail;
}
CONNECTION_LOCK(c);
if ( needle.o_upstream_msgid == 0 ) {
return handle_unsolicited( c, ber );
} else if ( !( op = tavl_find(
c->c_ops, &needle, operation_upstream_cmp ) ) ) {
/* Already abandoned, do nothing */
CONNECTION_UNLOCK(c);
ber_free( ber, 1 );
return rc;
/*
@ -190,6 +194,7 @@ handle_one_response( LloadConnection *c )
event_del( c->c_read_event );
*/
} else {
CONNECTION_UNLOCK(c);
/*
op->o_response_pending = ber;
*/
@ -239,40 +244,14 @@ handle_one_response( LloadConnection *c )
if ( handler ) {
LloadConnection *client;
op->o_upstream_refcnt++;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
client = op->o_client;
if ( client ) {
CONNECTION_LOCK(client);
if ( client->c_live ) {
op->o_client_refcnt++;
CONNECTION_UNLOCK_INCREF(client);
} else {
CONNECTION_UNLOCK(client);
client = NULL;
}
}
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
if ( client ) {
if ( client && IS_ALIVE( client, c_refcnt ) ) {
rc = handler( client, op, ber );
CONNECTION_LOCK_DECREF(client);
op->o_client_refcnt--;
if ( !op->o_client_refcnt ) {
operation_destroy_from_client( op );
}
CONNECTION_UNLOCK_OR_DESTROY(client);
} else {
ber_free( ber, 1 );
}
CONNECTION_LOCK_DECREF(c);
op->o_upstream_refcnt--;
if ( !client || !op->o_upstream_refcnt ) {
operation_destroy_from_upstream( op );
}
} else {
assert(0);
ber_free( ber, 1 );
@ -284,9 +263,8 @@ fail:
"error on processing a response (%s) on upstream connection "
"connid=%ld, tag=%lx\n",
lload_msgtype2str( tag ), c->c_connid, tag );
CONNECTION_DESTROY(c);
CONNECTION_LOCK_DESTROY(c);
}
/* We leave the connection locked */
return rc;
}
@ -459,19 +437,16 @@ upstream_bind_cb( LloadConnection *c )
ber_len_t len;
int rc;
CONNECTION_UNLOCK_INCREF(c);
if ( ber_peek_tag( ber, &len ) == LDAP_TAG_SASL_RES_CREDS &&
ber_scanf( ber, "m", &scred ) == LBER_ERROR ) {
Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
"sasl bind response malformed\n" );
CONNECTION_LOCK_DECREF(c);
goto fail;
}
rc = sasl_bind_step( c, &scred, &ccred );
if ( rc != SASL_OK &&
( rc != SASL_CONTINUE || result == LDAP_SUCCESS ) ) {
CONNECTION_LOCK_DECREF(c);
goto fail;
}
@ -482,7 +457,6 @@ upstream_bind_cb( LloadConnection *c )
outber = c->c_pendingber;
if ( outber == NULL && (outber = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DECREF(c);
goto fail;
}
c->c_pendingber = outber;
@ -496,27 +470,25 @@ upstream_bind_cb( LloadConnection *c )
connection_write_cb( -1, 0, c );
CONNECTION_LOCK_DECREF(c);
if ( rc == SASL_OK ) {
BER_BVZERO( &c->c_sasl_bind_mech );
}
break;
}
CONNECTION_LOCK_DECREF(c);
}
if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
goto fail;
}
#endif /* HAVE_CYRUS_SASL */
CONNECTION_LOCK(c);
c->c_pdu_cb = handle_one_response;
c->c_state = LLOAD_C_READY;
c->c_type = LLOAD_C_OPEN;
c->c_read_timeout = NULL;
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
"connid=%lu finished binding, now active\n",
c->c_connid );
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
b->b_active++;
@ -531,7 +503,6 @@ upstream_bind_cb( LloadConnection *c )
b->b_last_conn = c;
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
CONNECTION_LOCK_DECREF(c);
break;
default:
Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
@ -540,12 +511,13 @@ upstream_bind_cb( LloadConnection *c )
goto fail;
}
event_add( c->c_read_event, c->c_read_timeout );
ber_free( ber, 1 );
return LDAP_SUCCESS;
return -1;
fail:
CONNECTION_LOCK_DESTROY(c);
ber_free( ber, 1 );
CONNECTION_DESTROY(c);
return -1;
}
@ -556,9 +528,17 @@ upstream_bind( void *ctx, void *arg )
BerElement *ber;
ber_int_t msgid;
CONNECTION_LOCK_DECREF(c);
/* A reference was passed on to us */
assert( IS_ALIVE( c, c_refcnt ) );
if ( !IS_ALIVE( c, c_live ) ) {
RELEASE_REF( c, c_refcnt, c->c_destroy );
return NULL;
}
CONNECTION_LOCK(c);
c->c_pdu_cb = upstream_bind_cb;
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
ber = c->c_pendingber;
@ -599,16 +579,18 @@ upstream_bind( void *ctx, void *arg )
connection_write_cb( -1, 0, c );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
c->c_read_timeout = lload_timeout_net;
event_add( c->c_read_event, c->c_read_timeout );
CONNECTION_UNLOCK_OR_DESTROY(c);
CONNECTION_UNLOCK(c);
RELEASE_REF( c, c_refcnt, c->c_destroy );
return NULL;
fail:
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DESTROY(c);
RELEASE_REF( c, c_refcnt, c->c_destroy );
return NULL;
}
@ -621,6 +603,7 @@ upstream_finish( LloadConnection *c )
LloadBackend *b = c->c_private;
int is_bindconn = 0;
assert( c->c_live );
c->c_pdu_cb = handle_one_response;
/* Unless we are configured to use the VC exop, consider allocating the
@ -675,7 +658,8 @@ upstream_finish( LloadConnection *c )
c->c_connid );
return -1;
}
c->c_refcnt++;
/* keep a reference for upstream_bind */
acquire_ref( &c->c_refcnt );
Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
"scheduled a bind callback for connid=%lu\n",
@ -697,6 +681,7 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
{
LloadConnection *c = arg;
LloadBackend *b;
epoch_t epoch;
int rc = LDAP_SUCCESS;
CONNECTION_LOCK(c);
@ -737,9 +722,9 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
c->c_connid );
c->c_is_tls = LLOAD_TLS_ESTABLISHED;
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
rc = upstream_finish( c );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
@ -753,14 +738,18 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
"connid=%lu need write rc=%d\n",
c->c_connid, rc );
}
CONNECTION_UNLOCK_OR_DESTROY(c);
CONNECTION_UNLOCK(c);
return;
fail:
Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
"connid=%lu failed rc=%d\n",
c->c_connid, rc );
assert( c->c_ops == NULL );
epoch = epoch_join();
CONNECTION_DESTROY(c);
epoch_leave( epoch );
}
static int
@ -774,6 +763,7 @@ upstream_starttls( LloadConnection *c )
ber_tag_t tag;
c->c_currentber = NULL;
CONNECTION_LOCK(c);
if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
@ -824,9 +814,9 @@ upstream_starttls( LloadConnection *c )
}
c->c_is_tls = LLOAD_CLEARTEXT;
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
rc = upstream_finish( c );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
@ -836,7 +826,7 @@ upstream_starttls( LloadConnection *c )
}
ber_free( ber, 1 );
CONNECTION_UNLOCK_OR_DESTROY(c);
CONNECTION_UNLOCK(c);
return rc;
}
@ -884,6 +874,7 @@ upstream_init( ber_socket_t s, LloadBackend *b )
return NULL;
}
CONNECTION_LOCK(c);
c->c_private = b;
c->c_is_tls = b->b_tls;
c->c_pdu_cb = handle_one_response;
@ -939,14 +930,15 @@ upstream_init( ber_socket_t s, LloadBackend *b )
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
c->c_pdu_cb = upstream_starttls;
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
connection_write_cb( s, 0, c );
CONNECTION_LOCK_DECREF(c);
CONNECTION_LOCK(c);
}
event_add( c->c_read_event, c->c_read_timeout );
c->c_destroy = upstream_destroy;
CONNECTION_UNLOCK_OR_DESTROY(c);
c->c_unlink = upstream_unlink;
CONNECTION_UNLOCK(c);
return c;
@ -961,46 +953,39 @@ fail:
}
c->c_state = LLOAD_C_INVALID;
CONNECTION_DESTROY(c);
assert( c == NULL );
c->c_live--;
c->c_refcnt--;
connection_destroy( c );
return NULL;
}
void
upstream_destroy( LloadConnection *c )
static void
upstream_unlink( LloadConnection *c )
{
LloadBackend *b = c->c_private;
struct event *read_event, *write_event;
TAvlnode *root, *node;
TAvlnode *root;
long freed, executing;
enum sc_state state;
Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
"freeing connection connid=%lu\n",
Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
"removing upstream connid=%lu\n",
c->c_connid );
assert( c->c_state != LLOAD_C_INVALID );
state = c->c_state;
c->c_state = LLOAD_C_INVALID;
assert( c->c_state != LLOAD_C_DYING );
c->c_state = LLOAD_C_DYING;
read_event = c->c_read_event;
write_event = c->c_write_event;
root = c->c_ops;
c->c_ops = NULL;
executing = c->c_n_ops_executing;
c->c_n_ops_executing = 0;
read_event = c->c_read_event;
write_event = c->c_write_event;
for ( node = tavl_end( root, TAVL_DIR_LEFT ); node;
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
LloadOperation *op = node->avl_data;
op->o_res = LLOAD_OP_FAILED;
op->o_upstream_refcnt++;
}
CONNECTION_UNLOCK_INCREF(c);
CONNECTION_UNLOCK(c);
freed = tavl_free( root, (AVL_FREE)operation_lost_upstream );
assert( freed == executing );
@ -1018,8 +1003,6 @@ upstream_destroy( LloadConnection *c )
event_del( write_event );
}
/* Remove from the backend on first pass */
if ( state != LLOAD_C_DYING ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( c->c_type == LLOAD_C_PREPARING ) {
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
@ -1053,9 +1036,20 @@ upstream_destroy( LloadConnection *c )
b->b_n_ops_executing -= executing;
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
CONNECTION_LOCK(c);
}
CONNECTION_LOCK_DECREF(c);
void
upstream_destroy( LloadConnection *c )
{
Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
"freeing connection connid=%lu\n",
c->c_connid );
CONNECTION_LOCK(c);
assert( c->c_state == LLOAD_C_DYING );
c->c_state = LLOAD_C_INVALID;
if ( c->c_read_event ) {
event_free( c->c_read_event );
@ -1067,21 +1061,8 @@ upstream_destroy( LloadConnection *c )
c->c_write_event = NULL;
}
/*
* If we attempted to destroy any operations, we might have lent a new
* refcnt token for a thread that raced us to that, let them call us again
* later
*/
assert( c->c_refcnt >= 0 );
if ( c->c_refcnt ) {
c->c_state = LLOAD_C_DYING;
Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
"connid=%lu aborting with refcnt=%d\n",
c->c_connid, c->c_refcnt );
CONNECTION_UNLOCK(c);
return;
}
if ( c->c_type != LLOAD_C_BIND ) {
BER_BVZERO( &c->c_sasl_bind_mech );
}
connection_destroy( c );
}