diff --git a/servers/lloadd/Makefile.in b/servers/lloadd/Makefile.in index c78bb3acd9..e99bad2770 100644 --- a/servers/lloadd/Makefile.in +++ b/servers/lloadd/Makefile.in @@ -20,7 +20,7 @@ NT_SRCS = nt_svc.c NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res SRCS = backend.c bind.c config.c connection.c client.c \ - daemon.c extended.c init.c operation.c \ + daemon.c epoch.c extended.c init.c operation.c \ upstream.c libevent_support.c \ $(@PLAT@_SRCS) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 711cfaf4ef..a2cff0c525 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -33,6 +33,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg ) LloadPendingConnection *conn = arg; LloadBackend *b = conn->backend; int error = 0, rc = -1; + epoch_t epoch; ldap_pvt_thread_mutex_lock( &b->b_mutex ); Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: " @@ -44,6 +45,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg ) goto preempted; } + epoch = epoch_join(); + if ( what == EV_WRITE ) { socklen_t optlen = sizeof(error); @@ -53,6 +56,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg ) } if ( error == EINTR || error == EINPROGRESS || error == EWOULDBLOCK ) { ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + epoch_leave( epoch ); return; } else if ( error ) { goto done; @@ -63,6 +67,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg ) } done: + epoch_leave( epoch ); + LDAP_LIST_REMOVE( conn, next ); if ( rc ) { evutil_closesocket( conn->fd ); @@ -93,6 +99,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) { LloadBackend *b = arg; ber_socket_t s = AC_SOCKET_INVALID; + epoch_t epoch; int rc; if ( result == EVUTIL_EAI_CANCEL ) { @@ -111,6 +118,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) } b->b_dns_req = NULL; + epoch = epoch_join(); if ( result || !res ) { Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " "name resolution failed for 
backend '%s': %s\n", @@ -176,6 +184,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) ldap_pvt_thread_mutex_unlock( &b->b_mutex ); evutil_freeaddrinfo( res ); + epoch_leave( epoch ); return; fail: @@ -189,6 +198,7 @@ fail: if ( res ) { evutil_freeaddrinfo( res ); } + epoch_leave( epoch ); } LloadConnection * @@ -268,7 +278,6 @@ backend_select( LloadOperation *op, int *res ) } c->c_n_ops_executing++; c->c_counters.lc_ops_received++; - CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); *res = LDAP_SUCCESS; @@ -356,6 +365,7 @@ backend_connect( evutil_socket_t s, short what, void *arg ) LloadBackend *b = arg; struct evdns_getaddrinfo_request *request, *placeholder; char *hostname; + epoch_t epoch; ldap_pvt_thread_mutex_lock( &b->b_mutex ); assert( b->b_dns_req == NULL ); @@ -372,6 +382,8 @@ backend_connect( evutil_socket_t s, short what, void *arg ) return; } + epoch = epoch_join(); + Debug( LDAP_DEBUG_CONNS, "backend_connect: " "%sattempting connection to %s\n", (what & EV_TIMEOUT) ? 
"retry timeout finished, " : "", @@ -438,6 +450,7 @@ backend_connect( evutil_socket_t s, short what, void *arg ) } ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + epoch_leave( epoch ); return; } #endif /* LDAP_PF_LOCAL */ @@ -473,6 +486,7 @@ backend_connect( evutil_socket_t s, short what, void *arg ) b->b_dns_req = request; } ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + epoch_leave( epoch ); return; fail: @@ -480,6 +494,7 @@ fail: b->b_failed++; backend_retry( b ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + epoch_leave( epoch ); } void * diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c index 3f0993a63e..4f429602f9 100644 --- a/servers/lloadd/bind.c +++ b/servers/lloadd/bind.c @@ -34,7 +34,8 @@ bind_mech_external( { BerValue binddn; void *ssl; - char *ptr; + char *ptr, *message = ""; + int result = LDAP_SUCCESS; client->c_state = LLOAD_C_READY; client->c_type = LLOAD_C_OPEN; @@ -49,14 +50,16 @@ bind_mech_external( * allow that. */ if ( !BER_BVISEMPTY( credentials ) ) { - return operation_send_reject_locked( op, LDAP_UNWILLING_TO_PERFORM, - "proxy authorization is not supported", 1 ); + result = LDAP_UNWILLING_TO_PERFORM; + message = "proxy authorization is not supported"; + goto done; } ssl = ldap_pvt_tls_sb_ctx( client->c_sb ); if ( !ssl || ldap_pvt_tls_get_peer_dn( ssl, &binddn, NULL, 0 ) ) { - return operation_send_reject_locked( op, LDAP_INVALID_CREDENTIALS, - "no externally negotiated identity", 1 ); + result = LDAP_INVALID_CREDENTIALS; + message = "no externally negotiated identity"; + goto done; } client->c_auth.bv_len = binddn.bv_len + STRLENOF("dn:"); client->c_auth.bv_val = ch_malloc( client->c_auth.bv_len + 1 ); @@ -71,22 +74,20 @@ bind_mech_external( client->c_type = LLOAD_C_PRIVILEGED; } - return operation_send_reject_locked( op, LDAP_SUCCESS, "", 1 ); +done: + CONNECTION_UNLOCK(client); + operation_send_reject( op, result, message, 1 ); + return LDAP_SUCCESS; } -/* - * On entering the function, we've put a reference on both connections 
and hold - * upstream's c_io_mutex. - */ static int client_bind( LloadOperation *op, + LloadConnection *upstream, struct berval *binddn, ber_tag_t tag, struct berval *auth ) { - LloadConnection *upstream = op->o_upstream; - ber_printf( upstream->c_pendingber, "t{titOtO}", LDAP_TAG_MESSAGE, LDAP_TAG_MSGID, op->o_upstream_msgid, LDAP_REQ_BIND, &op->o_request, @@ -96,19 +97,14 @@ client_bind( } #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS -/* - * On entering the function, we've put a reference on both connections and hold - * upstream's c_io_mutex. - */ static int client_bind_as_vc( LloadOperation *op, + LloadConnection *upstream, struct berval *binddn, ber_tag_t tag, struct berval *auth ) { - LloadConnection *upstream = op->o_upstream; - CONNECTION_LOCK(upstream); ber_printf( upstream->c_pendingber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE, LDAP_TAG_MSGID, op->o_upstream_msgid, @@ -192,9 +188,12 @@ request_bind( LloadConnection *client, LloadOperation *op ) struct berval binddn, auth, mech = BER_BVNULL; ber_int_t version; ber_tag_t tag; - unsigned long pin = client->c_pin_id; + unsigned long pin; int res, rc = LDAP_SUCCESS; + CONNECTION_LOCK(client); + pin = client->c_pin_id; + if ( pin ) { LloadOperation *pinned_op, needle = { .o_client_connid = client->c_connid, @@ -222,25 +221,28 @@ request_bind( LloadConnection *client, LloadOperation *op ) pinned_op->o_request = op->o_request; pinned_op->o_ctrls = op->o_ctrls; - /* - * pinned_op is accessible from the upstream, protect it since we - * lose the client lock in operation_destroy_from_client temporarily - */ - pinned_op->o_client_refcnt++; + /* Noone has seen this operation yet, plant the pin back in its stead */ + client->c_n_ops_executing--; op->o_res = LLOAD_OP_COMPLETED; + tavl_delete( &client->c_ops, op, operation_client_cmp ); + op->o_client = NULL; + assert( op->o_upstream == NULL ); + + rc = tavl_insert( &client->c_ops, pinned_op, operation_client_cmp, + avl_dup_error ); + assert( rc == LDAP_SUCCESS ); + + 
/* Noone has seen this operation yet */ + op->o_refcnt--; + operation_destroy( op ); /* We didn't start a new operation, just continuing an existing one */ lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received--; - operation_destroy_from_client( op ); - pinned_op->o_client_refcnt--; - op = pinned_op; } } - /* protect the Bind operation */ - op->o_client_refcnt++; tavl_delete( &client->c_ops, op, operation_client_cmp ); client_reset( client ); @@ -259,10 +261,11 @@ request_bind( LloadConnection *client, LloadOperation *op ) "failed to parse version field\n" ); goto fail; } else if ( version != LDAP_VERSION3 ) { - operation_send_reject_locked( + CONNECTION_UNLOCK(client); + operation_send_reject( op, LDAP_PROTOCOL_ERROR, "LDAP version unsupported", 1 ); - ber_free( copy, 0 ); - return LDAP_SUCCESS; + CONNECTION_LOCK(client); + goto fail; } tag = ber_get_stringbv( copy, &binddn, LBER_BV_NOTERM ); @@ -307,10 +310,7 @@ request_bind( LloadConnection *client, LloadOperation *op ) /* terminate the upstream side if client switched mechanisms */ if ( pin ) { - op->o_client_refcnt++; - CONNECTION_UNLOCK_INCREF(client); operation_abandon( op ); - CONNECTION_LOCK_DECREF(client); } ber_free( copy, 0 ); @@ -326,26 +326,28 @@ request_bind( LloadConnection *client, LloadOperation *op ) rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); if ( pin ) { ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); upstream = op->o_upstream; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( upstream ) { + ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex ); CONNECTION_LOCK(upstream); if ( !upstream->c_live ) { CONNECTION_UNLOCK(upstream); + ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); upstream = NULL; } } - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); } /* If we were pinned but lost the link, don't look for a new upstream, we * have to reject the op 
and clear pin */ if ( upstream ) { - CONNECTION_UNLOCK_INCREF(upstream); - ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex ); + /* No need to do anything */ } else if ( !pin ) { upstream = backend_select( op, &res ); } else { @@ -377,18 +379,27 @@ request_bind( LloadConnection *client, LloadOperation *op ) ber = upstream->c_pendingber; if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); + if ( !pin ) { + LloadBackend *b = upstream->c_private; + + upstream->c_n_ops_executing--; + CONNECTION_UNLOCK(upstream); + + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + operation_update_backend_counters( op, b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + } else { + CONNECTION_UNLOCK(upstream); + } + Debug( LDAP_DEBUG_ANY, "request_bind: " "ber_alloc failed\n" ); - CONNECTION_LOCK_DECREF(upstream); - ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); - upstream->c_state = LLOAD_C_READY; - if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) { - ber_memfree( upstream->c_sasl_bind_mech.bv_val ); - BER_BVZERO( &upstream->c_sasl_bind_mech ); - } - CONNECTION_UNLOCK_OR_DESTROY(upstream); - CONNECTION_LOCK_DECREF(client); + operation_unlink( op ); + + CONNECTION_LOCK(client); goto fail; } upstream->c_pendingber = ber; @@ -397,7 +408,6 @@ request_bind( LloadConnection *client, LloadOperation *op ) lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_forwarded++; } - CONNECTION_LOCK(upstream); if ( pin ) { tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ); if ( tag == LDAP_AUTH_SIMPLE ) { @@ -440,52 +450,28 @@ request_bind( LloadConnection *client, LloadOperation *op ) #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS if ( lload_features & LLOAD_FEATURE_VC ) { - rc = client_bind_as_vc( op, &binddn, tag, &auth ); + rc = client_bind_as_vc( op, upstream, &binddn, tag, &auth ); } else #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ { - rc = client_bind( op, &binddn, tag, &auth ); + rc = 
client_bind( op, upstream, &binddn, tag, &auth ); } + ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); done: - if ( rc == LDAP_SUCCESS ) { - CONNECTION_LOCK(client); - if ( upstream ) { - ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); - } + CONNECTION_LOCK(client); + if ( rc == LDAP_SUCCESS ) { client->c_pin_id = pin; - if ( !--op->o_client_refcnt || !upstream ) { - operation_destroy_from_client( op ); - if ( client->c_state == LLOAD_C_BINDING ) { - client->c_state = LLOAD_C_READY; - client->c_type = LLOAD_C_OPEN; - client->c_pin_id = 0; - if ( !BER_BVISNULL( &client->c_auth ) ) { - ch_free( client->c_auth.bv_val ); - BER_BVZERO( &client->c_auth ); - } - if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) { - ber_memfree( client->c_sasl_bind_mech.bv_val ); - BER_BVZERO( &client->c_sasl_bind_mech ); - } - } - } CONNECTION_UNLOCK(client); if ( upstream ) { connection_write_cb( -1, 0, upstream ); - CONNECTION_LOCK_DECREF(upstream); - CONNECTION_UNLOCK_OR_DESTROY(upstream); } - CONNECTION_LOCK_DECREF(client); } else { fail: rc = -1; - CONNECTION_LOCK_DECREF(client); - op->o_client_refcnt--; - operation_destroy_from_client( op ); client->c_pin_id = 0; CONNECTION_DESTROY(client); } @@ -508,42 +494,26 @@ finish_sasl_bind( LloadOperation *op, BerElement *ber ) { - LloadConnection *client = op->o_client; BerElement *output; LloadOperation *removed; ber_int_t msgid; int rc; - if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) { - Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: " - "connid=%lu not configured to do proxyauthz, making no " - "attempt to resolve final authzid name\n", - op->o_client_connid ); - CONNECTION_UNLOCK(upstream); - return forward_final_response( client, op, ber ); - } - removed = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ); if ( !removed ) { assert( upstream->c_state != LLOAD_C_BINDING ); /* FIXME: has client replaced this bind since? 
*/ assert(0); - - operation_destroy_from_upstream( op ); } assert( removed == op && upstream->c_state == LLOAD_C_BINDING ); CONNECTION_UNLOCK(upstream); - Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: " - "SASL exchange in lieu of client connid=%lu to upstream " - "connid=%lu finished, resolving final authzid name\n", - op->o_client_connid, op->o_upstream_connid ); - ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex ); output = upstream->c_pendingber; if ( output == NULL && (output = ber_alloc()) == NULL ) { ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); + CONNECTION_LOCK_DESTROY(upstream); return -1; } upstream->c_pendingber = output; @@ -564,12 +534,18 @@ finish_sasl_bind( ber_free( op->o_ber, 1 ); op->o_ber = ber; + /* Could we have been unlinked in the meantime? */ rc = tavl_insert( &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); CONNECTION_UNLOCK(upstream); + Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: " + "SASL exchange in lieu of client connid=%lu to upstream " + "connid=%lu finished, resolving final authzid name msgid=%d\n", + op->o_client_connid, op->o_upstream_connid, op->o_upstream_msgid ); + connection_write_cb( -1, 0, upstream ); return LDAP_SUCCESS; } @@ -580,7 +556,7 @@ handle_bind_response( LloadOperation *op, BerElement *ber ) { - LloadConnection *upstream = op->o_upstream; + LloadConnection *upstream; BerValue response; BerElement *copy; LloadOperation *removed; @@ -611,6 +587,13 @@ handle_bind_response( "connid=%lu, result=%d\n", op->o_client_msgid, op->o_client_connid, result ); + ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); + upstream = op->o_upstream; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( !upstream ) { + return LDAP_SUCCESS; + } + CONNECTION_LOCK(upstream); if ( !tavl_find( upstream->c_ops, op, operation_upstream_cmp ) ) { /* @@ -621,7 +604,6 @@ handle_bind_response( * no response is expected * - ??? 
*/ - operation_destroy_from_upstream( op ); CONNECTION_UNLOCK(upstream); return LDAP_SUCCESS; } @@ -650,7 +632,6 @@ handle_bind_response( } else if ( result == LDAP_SASL_BIND_IN_PROGRESS ) { tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ); op->o_upstream_msgid = 0; - op->o_upstream_refcnt++; rc = tavl_insert( &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); @@ -665,13 +646,18 @@ handle_bind_response( assert( op->o_client_msgid && op->o_upstream_msgid ); op->o_pin_id = 0; - if ( sasl_finished && result == LDAP_SUCCESS ) { + if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) && sasl_finished && + result == LDAP_SUCCESS ) { return finish_sasl_bind( upstream, op, ber ); } - upstream->c_state = LLOAD_C_READY; + op->o_res = LLOAD_OP_COMPLETED; } CONNECTION_UNLOCK(upstream); + if ( !op->o_pin_id ) { + operation_unlink_upstream( op, upstream ); + } + CONNECTION_LOCK(client); removed = tavl_delete( &client->c_ops, op, operation_client_cmp ); assert( !removed || op == removed ); @@ -687,7 +673,6 @@ handle_bind_response( break; case LDAP_SUCCESS: default: { - op->o_client = NULL; client->c_state = LLOAD_C_READY; client->c_type = LLOAD_C_OPEN; client->c_pin_id = 0; @@ -708,7 +693,7 @@ handle_bind_response( } } } else { - assert( client->c_state == LLOAD_C_INVALID || + assert( client->c_state == LLOAD_C_DYING || client->c_state == LLOAD_C_CLOSING ); } CONNECTION_UNLOCK(client); @@ -729,7 +714,7 @@ handle_whoami_response( LloadOperation *op, BerElement *ber ) { - LloadConnection *upstream = op->o_upstream; + LloadConnection *upstream; BerValue matched, diagmsg; BerElement *saved_response = op->o_ber; LloadOperation *removed; @@ -739,7 +724,7 @@ handle_whoami_response( Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: " "connid=%ld received whoami response in lieu of connid=%ld\n", - upstream->c_connid, client->c_connid ); + op->o_upstream_connid, client->c_connid ); tag = ber_scanf( ber, "{emm" /* "}" */, &result, &matched, 
&diagmsg ); @@ -748,38 +733,40 @@ handle_whoami_response( return -1; } - CONNECTION_LOCK_DECREF(upstream); + ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); + upstream = op->o_upstream; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( !upstream ) { + return LDAP_SUCCESS; + } + + op->o_res = LLOAD_OP_COMPLETED; + /* Clear upstream status */ + operation_unlink_upstream( op, upstream ); + if ( result == LDAP_PROTOCOL_ERROR ) { LloadBackend *b; + CONNECTION_LOCK(upstream); b = (LloadBackend *)upstream->c_private; Debug( LDAP_DEBUG_ANY, "handle_whoami_response: " "Who Am I? extended operation not supported on backend %s, " "proxyauthz with clients that do SASL binds will not work " "msg=%s!\n", b->b_uri.bv_val, diagmsg.bv_val ); - CONNECTION_UNLOCK_INCREF(upstream); + CONNECTION_UNLOCK(upstream); operation_send_reject( op, LDAP_OTHER, "upstream protocol error", 0 ); return -1; } - if ( upstream->c_state != LLOAD_C_CLOSING ) { - assert( upstream->c_state == LLOAD_C_BINDING ); - upstream->c_state = LLOAD_C_READY; - } - if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) { - ber_memfree( upstream->c_sasl_bind_mech.bv_val ); - BER_BVZERO( &upstream->c_sasl_bind_mech ); - } - - CONNECTION_UNLOCK_INCREF(upstream); - tag = ber_peek_tag( ber, &len ); - CONNECTION_LOCK_DECREF(client); + CONNECTION_LOCK(client); - assert( client->c_state == LLOAD_C_BINDING && - BER_BVISNULL( &client->c_auth ) ); + assert( client->c_state == LLOAD_C_BINDING || + client->c_state == LLOAD_C_CLOSING ); + + assert( BER_BVISNULL( &client->c_auth ) ); if ( !BER_BVISNULL( &client->c_auth ) ) { ber_memfree( client->c_auth.bv_val ); BER_BVZERO( &client->c_auth ); @@ -788,8 +775,6 @@ handle_whoami_response( if ( tag == LDAP_TAG_EXOP_RES_VALUE ) { tag = ber_scanf( ber, "o", &client->c_auth ); if ( tag == LBER_ERROR ) { - operation_send_reject_locked( - op, LDAP_OTHER, "upstream protocol error", 0 ); CONNECTION_DESTROY(client); return -1; } @@ -797,13 +782,13 @@ handle_whoami_response( removed = 
tavl_delete( &client->c_ops, op, operation_client_cmp ); assert( !removed || op == removed ); + op->o_pin_id = 0; Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: " "connid=%ld new authid=%s\n", client->c_connid, client->c_auth.bv_val ); if ( client->c_state == LLOAD_C_BINDING ) { - op->o_client = NULL; client->c_state = LLOAD_C_READY; client->c_type = LLOAD_C_OPEN; client->c_pin_id = 0; @@ -817,10 +802,11 @@ handle_whoami_response( } } - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); - /* defer the disposal of ber to operation_destroy_* */ + /* defer the disposal of ber to operation_destroy */ op->o_ber = ber; + return forward_final_response( client, op, saved_response ); } @@ -847,15 +833,21 @@ handle_vc_bind_response( tag = ber_peek_tag( ber, &len ); if ( result == LDAP_PROTOCOL_ERROR ) { - LloadConnection *upstream = op->o_upstream; - LloadBackend *b; + LloadConnection *upstream; - CONNECTION_LOCK(upstream); - b = (LloadBackend *)upstream->c_private; - Debug( LDAP_DEBUG_ANY, "handle_vc_bind_response: " - "VC extended operation not supported on backend %s\n", - b->b_uri.bv_val ); - CONNECTION_UNLOCK(upstream); + ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); + upstream = op->o_upstream; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( upstream ) { + LloadBackend *b; + + CONNECTION_LOCK(upstream); + b = (LloadBackend *)upstream->c_private; + Debug( LDAP_DEBUG_ANY, "handle_vc_bind_response: " + "VC extended operation not supported on backend %s\n", + b->b_uri.bv_val ); + CONNECTION_UNLOCK(upstream); + } } Debug( LDAP_DEBUG_STATS, "handle_vc_bind_response: " @@ -872,7 +864,7 @@ handle_vc_bind_response( tag = ber_scanf( ber, "o", &client->c_vc_cookie ); if ( tag == LBER_ERROR ) { rc = -1; - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); goto done; } tag = ber_peek_tag( ber, &len ); @@ -882,7 +874,7 @@ handle_vc_bind_response( tag = ber_scanf( ber, "m", &creds ); if ( tag == LBER_ERROR ) { rc = -1; - 
CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); goto done; } tag = ber_peek_tag( ber, &len ); @@ -892,7 +884,7 @@ handle_vc_bind_response( tag = ber_scanf( ber, "m", &controls ); if ( tag == LBER_ERROR ) { rc = -1; - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); goto done; } } @@ -928,7 +920,7 @@ handle_vc_bind_response( assert( client->c_state == LLOAD_C_INVALID || client->c_state == LLOAD_C_CLOSING ); } - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); ldap_pvt_thread_mutex_lock( &client->c_io_mutex ); output = client->c_pendingber; @@ -952,9 +944,7 @@ handle_vc_bind_response( } done: - CONNECTION_LOCK_DECREF(client); - operation_destroy_from_client( op ); - CONNECTION_UNLOCK_OR_DESTROY(client); + operation_unlink( op ); ber_free( ber, 1 ); return rc; } diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index bc1248488f..f03810a3b0 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -28,6 +28,8 @@ lload_c_head clients = LDAP_CIRCLEQ_HEAD_INITIALIZER( clients ); ldap_pvt_thread_mutex_t clients_mutex; +static void client_unlink( LloadConnection *upstream ); + int request_abandon( LloadConnection *c, LloadOperation *op ) { @@ -41,17 +43,19 @@ request_abandon( LloadConnection *c, LloadOperation *op ) "connid=%lu msgid=%d invalid integer sent in abandon request\n", c->c_connid, op->o_client_msgid ); - operation_destroy_from_client( op ); - CONNECTION_DESTROY(c); + operation_unlink( op ); + CONNECTION_LOCK_DESTROY(c); return -1; } + CONNECTION_LOCK(c); request = tavl_find( c->c_ops, &needle, operation_client_cmp ); if ( !request ) { Debug( LDAP_DEBUG_STATS, "request_abandon: " "connid=%lu msgid=%d requests abandon of an operation " "msgid=%d not being processed anymore\n", c->c_connid, op->o_client_msgid, needle.o_client_msgid ); + CONNECTION_UNLOCK(c); goto done; } else if ( request->o_tag == LDAP_REQ_BIND ) { /* RFC 4511 states we must not allow Abandon on Binds */ @@ -59,6 +63,7 @@ 
request_abandon( LloadConnection *c, LloadOperation *op ) "connid=%lu msgid=%d requests abandon of a bind operation " "msgid=%d\n", c->c_connid, op->o_client_msgid, needle.o_client_msgid ); + CONNECTION_UNLOCK(c); goto done; } Debug( LDAP_DEBUG_STATS, "request_abandon: " @@ -67,20 +72,14 @@ request_abandon( LloadConnection *c, LloadOperation *op ) lload_msgtype2str( request->o_tag ), needle.o_client_msgid ); if ( c->c_state == LLOAD_C_BINDING ) { - /* We have found the request and we are binding, it must be a bind - * request */ - assert( request->o_tag == LDAP_REQ_BIND ); - c->c_state = LLOAD_C_READY; + assert(0); } - /* operation_abandon requires a reference since it is passed with c unlocked */ - request->o_client_refcnt++; - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); operation_abandon( request ); - CONNECTION_LOCK_DECREF(c); done: - operation_destroy_from_client( op ); + operation_unlink( op ); return rc; } @@ -92,9 +91,6 @@ request_process( LloadConnection *client, LloadOperation *op ) ber_int_t msgid; int res, rc = LDAP_SUCCESS; - op->o_client_refcnt++; - CONNECTION_UNLOCK_INCREF(client); - upstream = backend_select( op, &res ); if ( !upstream ) { Debug( LDAP_DEBUG_STATS, "request_process: " @@ -110,16 +106,29 @@ request_process( LloadConnection *client, LloadOperation *op ) output = upstream->c_pendingber; if ( output == NULL && (output = ber_alloc()) == NULL ) { + LloadBackend *b = upstream->c_private; + + upstream->c_n_ops_executing--; + CONNECTION_UNLOCK(upstream); + ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); + + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + operation_update_backend_counters( op, b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + + Debug( LDAP_DEBUG_ANY, "request_process: " + "ber_alloc failed\n" ); + rc = -1; goto fail; } upstream->c_pendingber = output; - CONNECTION_LOCK_DECREF(upstream); op->o_upstream_msgid = msgid = upstream->c_next_msgid++; rc = tavl_insert( &upstream->c_ops, op, 
operation_upstream_cmp, avl_dup_error ); - CONNECTION_UNLOCK_INCREF(upstream); + CONNECTION_UNLOCK(upstream); Debug( LDAP_DEBUG_TRACE, "request_process: " "client connid=%lu added %s msgid=%d to upstream connid=%lu as " @@ -132,7 +141,7 @@ request_process( LloadConnection *client, LloadOperation *op ) if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) && client->c_type != LLOAD_C_PRIVILEGED ) { - CONNECTION_LOCK_DECREF(client); + CONNECTION_LOCK(client); Debug( LDAP_DEBUG_TRACE, "request_process: " "proxying identity %s to upstream\n", client->c_auth.bv_val ); @@ -141,7 +150,7 @@ request_process( LloadConnection *client, LloadOperation *op ) op->o_tag, &op->o_request, LDAP_TAG_CONTROLS, LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth ); - CONNECTION_UNLOCK_INCREF(client); + CONNECTION_UNLOCK(client); if ( !BER_BVISNULL( &op->o_ctrls ) ) { ber_write( output, op->o_ctrls.bv_val, op->o_ctrls.bv_len, 0 ); @@ -157,37 +166,18 @@ request_process( LloadConnection *client, LloadOperation *op ) ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); connection_write_cb( -1, 0, upstream ); - - CONNECTION_LOCK_DECREF(upstream); - CONNECTION_UNLOCK_OR_DESTROY(upstream); - - CONNECTION_LOCK_DECREF(client); - if ( !--op->o_client_refcnt ) { - operation_destroy_from_client( op ); - } return rc; fail: if ( upstream ) { - LloadBackend *b; - - ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); - CONNECTION_LOCK_DECREF(upstream); - upstream->c_n_ops_executing--; - b = (LloadBackend *)upstream->c_private; - CONNECTION_UNLOCK_OR_DESTROY(upstream); - - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing--; - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + CONNECTION_LOCK_DESTROY(upstream); operation_send_reject( op, LDAP_OTHER, "internal error", 0 ); } - CONNECTION_LOCK_DECREF(client); - op->o_client_refcnt--; - operation_destroy_from_client( op ); + + operation_unlink( op ); if ( rc ) { - CONNECTION_DESTROY(client); + CONNECTION_LOCK_DESTROY(client); } return rc; } @@ -202,6 
+192,7 @@ handle_one_request( LloadConnection *c ) ber = c->c_currentber; c->c_currentber = NULL; + CONNECTION_LOCK(c); op = operation_init( c, ber ); if ( !op ) { Debug( LDAP_DEBUG_ANY, "handle_one_request: " @@ -211,16 +202,18 @@ handle_one_request( LloadConnection *c ) ber_free( ber, 1 ); return -1; } + CONNECTION_UNLOCK(c); switch ( op->o_tag ) { case LDAP_REQ_UNBIND: /* There is never a response for this operation */ op->o_res = LLOAD_OP_COMPLETED; - operation_destroy_from_client( op ); + operation_unlink( op ); + Debug( LDAP_DEBUG_STATS, "handle_one_request: " "received unbind, closing client connid=%lu\n", c->c_connid ); - CONNECTION_DESTROY(c); + CONNECTION_LOCK_DESTROY(c); return -1; case LDAP_REQ_BIND: handler = request_bind; @@ -234,16 +227,18 @@ handle_one_request( LloadConnection *c ) break; default: if ( c->c_state == LLOAD_C_BINDING ) { - return operation_send_reject_locked( + operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 ); + return LDAP_SUCCESS; } handler = request_process; break; } if ( c->c_state == LLOAD_C_CLOSING ) { - return operation_send_reject_locked( + operation_send_reject( op, LDAP_UNAVAILABLE, "connection is shutting down", 0 ); + return LDAP_SUCCESS; } return handler( c, op ); @@ -256,9 +251,9 @@ void client_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) { LloadConnection *c = arg; + epoch_t epoch; int rc = 0; - CONNECTION_LOCK_DECREF(c); if ( what & EV_TIMEOUT ) { Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: " "connid=%lu, timeout reached, destroying\n", @@ -274,27 +269,26 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); if ( c->c_pendingber ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_UNLOCK_INCREF(c); connection_write_cb( s, what, arg ); - ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); if ( !c->c_live ) { - ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); 
+ CONNECTION_UNLOCK(c); goto fail; } + CONNECTION_UNLOCK(c); /* Do we still have data pending? If so, connection_write_cb would * already have arranged the write callback to trigger again */ + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); if ( c->c_pendingber ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_UNLOCK_INCREF(c); return; } } - ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); rc = ldap_pvt_tls_accept( c->c_sb, LLOAD_TLS_CTX ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); if ( rc < 0 ) { goto fail; } @@ -308,13 +302,16 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) * This is deadlock-safe, since both share the same base - the one * that's just running us. */ + CONNECTION_LOCK(c); event_del( c->c_read_event ); event_del( c->c_write_event ); c->c_read_timeout = NULL; event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST, connection_read_cb, c ); - event_add( c->c_read_event, c->c_read_timeout ); + if ( c->c_live ) { + event_add( c->c_read_event, c->c_read_timeout ); + } event_assign( c->c_write_event, base, c->c_fd, EV_WRITE, connection_write_cb, c ); @@ -323,24 +320,29 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) c->c_connid ); c->c_is_tls = LLOAD_TLS_ESTABLISHED; - - /* The temporary reference established for us is no longer needed */ - CONNECTION_UNLOCK_OR_DESTROY(c); + CONNECTION_UNLOCK(c); return; } else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) { - event_add( c->c_write_event, lload_write_timeout ); + CONNECTION_LOCK(c); + if ( c->c_live ) { + event_add( c->c_write_event, lload_write_timeout ); + } + CONNECTION_UNLOCK(c); Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: " "connid=%lu need write rc=%d\n", c->c_connid, rc ); } - CONNECTION_UNLOCK_INCREF(c); return; fail: Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: " "connid=%lu failed rc=%d\n", c->c_connid, rc ); - CONNECTION_DESTROY(c); + + assert( c->c_ops == NULL ); + epoch = 
epoch_join(); + CONNECTION_LOCK_DESTROY(c); + epoch_leave( epoch ); } LloadConnection * @@ -379,11 +381,11 @@ client_init( Debug( LDAP_DEBUG_CONNS, "client_init: " "connid=%lu failed initial TLS accept rc=%d\n", c->c_connid, rc ); + CONNECTION_LOCK(c); goto fail; } if ( rc ) { - c->c_refcnt++; c->c_read_timeout = lload_timeout_net; read_cb = write_cb = client_tls_handshake_cb; } @@ -393,30 +395,32 @@ client_init( if ( !event ) { Debug( LDAP_DEBUG_ANY, "client_init: " "Read event could not be allocated\n" ); + CONNECTION_LOCK(c); goto fail; } c->c_read_event = event; - event_add( c->c_read_event, c->c_read_timeout ); event = event_new( base, s, EV_WRITE, write_cb, c ); if ( !event ) { Debug( LDAP_DEBUG_ANY, "client_init: " "Write event could not be allocated\n" ); + CONNECTION_LOCK(c); goto fail; } - /* We only register the write event when we have data pending */ c->c_write_event = event; c->c_private = listener; c->c_destroy = client_destroy; + c->c_unlink = client_unlink; c->c_pdu_cb = handle_one_request; - /* There should be no lock inversion yet since no other thread could - * approach it from clients side */ + CONNECTION_LOCK(c); + /* We only register the write event when we have data pending */ + event_add( c->c_read_event, c->c_read_timeout ); + ldap_pvt_thread_mutex_lock( &clients_mutex ); LDAP_CIRCLEQ_INSERT_TAIL( &clients, c, c_next ); ldap_pvt_thread_mutex_unlock( &clients_mutex ); - CONNECTION_UNLOCK(c); return c; @@ -431,8 +435,9 @@ fail: } c->c_state = LLOAD_C_INVALID; - CONNECTION_DESTROY(c); - assert( c == NULL ); + c->c_live--; + c->c_refcnt--; + connection_destroy( c ); return NULL; } @@ -444,19 +449,6 @@ client_reset( LloadConnection *c ) root = c->c_ops; c->c_ops = NULL; - /* unless op->o_client_refcnt > op->o_client_live, there is noone using the - * operation from the client side and noone new will now that we've removed - * it from client's c_ops */ - if ( root ) { - TAvlnode *node = tavl_end( root, TAVL_DIR_LEFT ); - do { - LloadOperation 
*op = node->avl_data; - - /* make sure it's useable after we've unlocked the connection */ - op->o_client_refcnt++; - } while ( (node = tavl_next( node, TAVL_DIR_RIGHT )) ); - } - if ( !BER_BVISNULL( &c->c_auth ) ) { ch_free( c->c_auth.bv_val ); BER_BVZERO( &c->c_auth ); @@ -465,7 +457,7 @@ client_reset( LloadConnection *c ) ch_free( c->c_sasl_bind_mech.bv_val ); BER_BVZERO( &c->c_sasl_bind_mech ); } - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); if ( root ) { int freed; @@ -475,38 +467,29 @@ client_reset( LloadConnection *c ) freed ); } - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); } void -client_destroy( LloadConnection *c ) +client_unlink( LloadConnection *c ) { enum sc_state state; struct event *read_event, *write_event; - Debug( LDAP_DEBUG_CONNS, "client_destroy: " - "destroying client connid=%lu\n", + Debug( LDAP_DEBUG_CONNS, "client_unlink: " + "removing client connid=%lu\n", c->c_connid ); assert( c->c_state != LLOAD_C_INVALID ); + assert( c->c_state != LLOAD_C_DYING ); + state = c->c_state; - c->c_state = LLOAD_C_INVALID; + c->c_state = LLOAD_C_DYING; read_event = c->c_read_event; write_event = c->c_write_event; + CONNECTION_UNLOCK(c); - /* - * FIXME: operation_destroy_from_upstream might copy op->o_client and bump - * c_refcnt, it is then responsible to call destroy_client again, does that - * mean that we can be triggered for recursion over all connections? 
- */ - CONNECTION_UNLOCK_INCREF(c); - - /* - * Avoid a deadlock: - * event_del will block if the event is currently executing its callback, - * that callback might be waiting to lock c->c_mutex - */ if ( read_event ) { event_del( read_event ); } @@ -521,7 +504,20 @@ client_destroy( LloadConnection *c ) ldap_pvt_thread_mutex_unlock( &clients_mutex ); } - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); + client_reset( c ); +} + +void +client_destroy( LloadConnection *c ) +{ + Debug( LDAP_DEBUG_CONNS, "client_destroy: " + "destroying client connid=%lu\n", + c->c_connid ); + + CONNECTION_LOCK(c); + assert( c->c_state == LLOAD_C_DYING ); + c->c_state = LLOAD_C_INVALID; if ( c->c_read_event ) { event_free( c->c_read_event ); @@ -533,23 +529,7 @@ client_destroy( LloadConnection *c ) c->c_write_event = NULL; } - client_reset( c ); - - /* - * If we attempted to destroy any operations, we might have lent a new - * refcnt token for a thread that raced us to that, let them call us again - * later - */ - assert( c->c_refcnt >= 0 ); - if ( c->c_refcnt ) { - c->c_state = LLOAD_C_DYING; - Debug( LDAP_DEBUG_CONNS, "client_destroy: " - "connid=%lu aborting with refcnt=%d\n", - c->c_connid, c->c_refcnt ); - CONNECTION_UNLOCK(c); - return; - } - + assert( c->c_refcnt == 0 ); connection_destroy( c ); } diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c index e8e805c467..e1b4c39019 100644 --- a/servers/lloadd/connection.c +++ b/servers/lloadd/connection.c @@ -41,15 +41,12 @@ #include "lutil.h" #include "lutil_ldap.h" -static ldap_pvt_thread_mutex_t conn_nextid_mutex; static unsigned long conn_nextid = 0; static void lload_connection_assign_nextid( LloadConnection *conn ) { - ldap_pvt_thread_mutex_lock( &conn_nextid_mutex ); - conn->c_connid = conn_nextid++; - ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex ); + conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED ); } /* @@ -73,37 +70,40 @@ handle_pdus( void *ctx, void *arg ) { LloadConnection *c 
= arg; int pdus_handled = 0; + epoch_t epoch; - CONNECTION_LOCK_DECREF(c); + /* A reference was passed on to us */ + assert( IS_ALIVE( c, c_refcnt ) ); + + epoch = epoch_join(); for ( ;; ) { BerElement *ber; ber_tag_t tag; ber_len_t len; - /* handle_one_response may unlock the connection in the process, we - * need to expect that might be our responsibility to destroy it */ if ( c->c_pdu_cb( c ) ) { - /* Error, connection is unlocked and might already have been - * destroyed */ - return NULL; + /* Error/reset, get rid of our reference and bail */ + goto done; } - /* Otherwise, handle_one_request leaves the connection locked */ if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) { /* Do not read now, re-enable read event instead */ break; } - if ( (ber = ber_alloc()) == NULL ) { + ber = c->c_currentber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "handle_pdus: " "connid=%lu, ber_alloc failed\n", c->c_connid ); - CONNECTION_DESTROY(c); - return NULL; + CONNECTION_LOCK_DESTROY(c); + goto done; } c->c_currentber = ber; + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); tag = ber_get_next( c->c_sb, &len, ber ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); if ( tag != LDAP_TAG_MESSAGE ) { int err = sock_errno(); @@ -123,8 +123,8 @@ handle_pdus( void *ctx, void *arg ) c->c_currentber = NULL; ber_free( ber, 1 ); - CONNECTION_DESTROY(c); - return NULL; + CONNECTION_LOCK_DESTROY(c); + goto done; } break; } @@ -134,7 +134,9 @@ handle_pdus( void *ctx, void *arg ) Debug( LDAP_DEBUG_CONNS, "handle_pdus: " "re-enabled read event on connid=%lu\n", c->c_connid ); - CONNECTION_UNLOCK_OR_DESTROY(c); +done: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + epoch_leave( epoch ); return NULL; } @@ -152,25 +154,35 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) BerElement *ber; ber_tag_t tag; ber_len_t len; + epoch_t epoch; CONNECTION_LOCK(c); if ( !c->c_live ) { event_del( c->c_read_event ); + CONNECTION_UNLOCK(c); Debug( 
LDAP_DEBUG_CONNS, "connection_read_cb: " "suspended read event on a dead connid=%lu\n", c->c_connid ); - CONNECTION_UNLOCK(c); return; } + CONNECTION_UNLOCK(c); if ( what & EV_TIMEOUT ) { Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " "connid=%lu, timeout reached, destroying\n", c->c_connid ); - CONNECTION_DESTROY(c); + /* Make sure the connection stays around for us to unlock it */ + epoch = epoch_join(); + CONNECTION_LOCK_DESTROY(c); + epoch_leave( epoch ); return; } + if ( !acquire_ref( &c->c_refcnt ) ) { + return; + } + epoch = epoch_join(); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " "connection connid=%lu ready to read\n", c->c_connid ); @@ -180,12 +192,14 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) Debug( LDAP_DEBUG_ANY, "connection_read_cb: " "connid=%lu, ber_alloc failed\n", c->c_connid ); - CONNECTION_DESTROY(c); - return; + goto out; } c->c_currentber = ber; + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); tag = ber_get_next( c->c_sb, &len, ber ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + if ( tag != LDAP_TAG_MESSAGE ) { int err = sock_errno(); @@ -210,65 +224,78 @@ connection_read_cb( evutil_socket_t s, short what, void *arg ) Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " "suspended read event on dying connid=%lu\n", c->c_connid ); - CONNECTION_DESTROY(c); - return; + CONNECTION_LOCK_DESTROY(c); + goto out; } event_add( c->c_read_event, c->c_read_timeout ); Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " "re-enabled read event on connid=%lu\n", c->c_connid ); - CONNECTION_UNLOCK(c); - return; - } - - if ( !lload_conn_max_pdus_per_cycle || - ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { - /* If we're overloaded or configured as such, process one and resume in - * the next cycle. 
- * - * handle_one_request re-locks the mutex in the - * process, need to test it's still alive */ - if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) { - CONNECTION_UNLOCK_OR_DESTROY(c); - } - return; + goto out; } event_del( c->c_read_event ); + if ( !lload_conn_max_pdus_per_cycle || + ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { + /* If we're overloaded or configured as such, process one and resume in + * the next cycle. */ + event_add( c->c_read_event, c->c_read_timeout ); + c->c_pdu_cb( c ); + goto out; + } + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " "suspended read event on connid=%lu\n", c->c_connid ); - /* We have scheduled a call to handle_requests which takes care of - * handling further requests, just make sure the connection sticks around - * for that */ - CONNECTION_UNLOCK_INCREF(c); + /* + * We have scheduled a call to handle_pdus to take care of handling this + * and further requests, its reference is now owned by that task. + */ + epoch_leave( epoch ); return; + +out: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + epoch_leave( epoch ); } void connection_write_cb( evutil_socket_t s, short what, void *arg ) { LloadConnection *c = arg; + epoch_t epoch; CONNECTION_LOCK(c); + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "considering writing to%s connid=%lu what=%hd\n", + c->c_live ? 
" live" : " dead", c->c_connid, what ); if ( !c->c_live ) { CONNECTION_UNLOCK(c); return; } + CONNECTION_UNLOCK(c); if ( what & EV_TIMEOUT ) { Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " "connid=%lu, timeout reached, destroying\n", c->c_connid ); - CONNECTION_DESTROY(c); + /* Make sure the connection stays around for us to unlock it */ + epoch = epoch_join(); + CONNECTION_LOCK_DESTROY(c); + epoch_leave( epoch ); return; } - CONNECTION_UNLOCK_INCREF(c); /* Before we acquire any locks */ event_del( c->c_write_event ); + if ( !acquire_ref( &c->c_refcnt ) ) { + return; + } + + epoch = epoch_join(); + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " "have something to write to connection connid=%lu\n", @@ -285,7 +312,7 @@ connection_write_cb( evutil_socket_t s, short what, void *arg ) "ber_flush on fd=%d failed errno=%d (%s)\n", c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); CONNECTION_LOCK_DESTROY(c); - return; + goto done; } event_add( c->c_write_event, lload_write_timeout ); } else { @@ -293,8 +320,9 @@ connection_write_cb( evutil_socket_t s, short what, void *arg ) } ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_LOCK_DECREF(c); - CONNECTION_UNLOCK_OR_DESTROY(c); +done: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + epoch_leave( epoch ); } void @@ -356,85 +384,54 @@ connections_walk_last( CONNCB cb, void *arg ) { - LloadConnection *c, *old; - unsigned long last_connid; + LloadConnection *c = cq_last; + uintptr_t last_connid; if ( LDAP_CIRCLEQ_EMPTY( cq ) ) { return; } - last_connid = cq_last->c_connid; - c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next ); - assert( c->c_connid <= last_connid ); + last_connid = c->c_connid; + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); - CONNECTION_LOCK(c); - ldap_pvt_thread_mutex_unlock( cq_mutex ); + while ( !acquire_ref( &c->c_refcnt ) ) { + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + if ( c->c_connid >= last_connid ) { + return; + } + } /* - * Ugh... 
concurrency is annoying: + * Notes: * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid * order - * - the connection with the highest c_connid is maintained at cq_last + * - the connection with the highest c_connid is passed in cq_last * - we can only use cq when we hold cq_mutex * - connections might be added to or removed from cq while we're busy * processing connections - * - connection_destroy touches cq - * - we can't even hold locks of two different connections * - we need a way to detect we've finished looping around cq for some * definition of looping around - * - * So as a result, 90% of the code below is spent navigating that... */ - while ( c->c_connid <= last_connid ) { - /* Do not permit the callback to actually free the connection even if - * it wants to, we need it to traverse cq */ - c->c_refcnt++; - if ( cb( c, arg ) ) { - c->c_refcnt--; - break; - } - c->c_refcnt--; + do { + int rc; - if ( c->c_connid == last_connid ) { - break; - } - - CONNECTION_UNLOCK_INCREF(c); - - ldap_pvt_thread_mutex_lock( cq_mutex ); - old = c; -retry: - c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); - - if ( c->c_connid <= old->c_connid ) { - ldap_pvt_thread_mutex_unlock( cq_mutex ); - - CONNECTION_LOCK_DECREF(old); - CONNECTION_UNLOCK_OR_DESTROY(old); - - ldap_pvt_thread_mutex_lock( cq_mutex ); - return; - } - - CONNECTION_LOCK(c); - assert( c->c_state != LLOAD_C_DYING ); - if ( c->c_state == LLOAD_C_INVALID ) { - /* This dying connection will be unlinked once we release cq_mutex - * and it wouldn't be safe to iterate further, skip over it */ - CONNECTION_UNLOCK(c); - goto retry; - } - CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_unlock( cq_mutex ); - CONNECTION_LOCK_DECREF(old); - CONNECTION_UNLOCK_OR_DESTROY(old); + rc = cb( c, arg ); + RELEASE_REF( c, c_refcnt, c->c_destroy ); - CONNECTION_LOCK_DECREF(c); - assert( c->c_state != LLOAD_C_DYING ); - assert( c->c_state != LLOAD_C_INVALID ); - } - CONNECTION_UNLOCK_OR_DESTROY(c); - 
ldap_pvt_thread_mutex_lock( cq_mutex ); + ldap_pvt_thread_mutex_lock( cq_mutex ); + if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) { + break; + } + + do { + LloadConnection *old = c; + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) { + return; + } + } while ( !acquire_ref( &c->c_refcnt ) ); + } while ( c->c_connid <= last_connid ); } void @@ -448,44 +445,44 @@ connections_walk( return connections_walk_last( cq_mutex, cq, cq_last, cb, arg ); } -/* - * Caller is expected to hold the lock. - */ int lload_connection_close( LloadConnection *c, void *arg ) { - TAvlnode *node; int gentle = *(int *)arg; + LloadOperation *op; - if ( !c->c_live ) { - return LDAP_SUCCESS; - } + Debug( LDAP_DEBUG_CONNS, "lload_connection_close: " + "marking connection connid=%lu closing\n", + c->c_connid ); - if ( !gentle ) { - /* Caller has a reference on this connection, - * it doesn't actually die here */ + /* We were approached from the connection list */ + assert( IS_ALIVE( c, c_refcnt ) ); + + CONNECTION_LOCK(c); + if ( !gentle || !c->c_ops ) { CONNECTION_DESTROY(c); - assert( c ); - CONNECTION_LOCK(c); return LDAP_SUCCESS; } /* The first thing we do is make sure we don't get new Operations in */ c->c_state = LLOAD_C_CLOSING; - for ( node = tavl_end( c->c_ops, TAVL_DIR_LEFT ); node; - node = tavl_next( node, TAVL_DIR_RIGHT ) ) { - LloadOperation *op = node->avl_data; + do { + TAvlnode *node = tavl_end( c->c_ops, TAVL_DIR_LEFT ); + op = node->avl_data; - if ( op->o_client_msgid == 0 ) { - if ( op->o_client == c ) { - operation_destroy_from_client( op ); - } else { - assert( op->o_upstream == c ); - operation_destroy_from_upstream( op ); - } + /* Close operations that would need client action to resolve, + * only SASL binds in progress do that right now */ + if ( op->o_client_msgid || op->o_upstream_msgid ) { + break; } - } + + CONNECTION_UNLOCK(c); + operation_unlink( op ); + CONNECTION_LOCK(c); + } while ( c->c_ops ); + + 
+ CONNECTION_UNLOCK(c); return LDAP_SUCCESS; } @@ -550,7 +547,6 @@ lload_connection_init( ber_socket_t s, const char *peername, int flags ) "connection connid=%lu allocated for socket fd=%d peername=%s\n", c->c_connid, s, peername ); - CONNECTION_LOCK(c); c->c_state = LLOAD_C_ACTIVE; return c; diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index 80c61972ff..21847f21e8 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -759,6 +759,7 @@ lloadd_listeners_init( const char *urls ) int lloadd_daemon_destroy( void ) { + epoch_shutdown(); if ( lloadd_inited ) { int i; @@ -1674,8 +1675,7 @@ lload_handle_global_invalidation( LloadChange *change ) LloadConnection *next = LDAP_CIRCLEQ_LOOP_NEXT( &clients, c, c_next ); if ( c->c_is_tls ) { - CONNECTION_LOCK(c); - CONNECTION_DESTROY(c); + CONNECTION_LOCK_DESTROY(c); assert( c == NULL ); } c = next; diff --git a/servers/lloadd/epoch.c b/servers/lloadd/epoch.c new file mode 100644 index 0000000000..790a296c86 --- /dev/null +++ b/servers/lloadd/epoch.c @@ -0,0 +1,228 @@ +/* epoch.c - epoch based memory reclamation */ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software <http://www.openldap.org/>. + * + * Copyright 2018-2020 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. + */ + +/** @file epoch.c + * + * Implementation of epoch based memory reclamation, in principle + * similar to the algorithm presented in + * https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf + * + * Not completely lock-free at the moment. 
+ * + * Also the problems with epoch based memory reclamation are still + * present - a thread actively observing an epoch getting stuck will + * prevent managed objects (in our case connections and operations) + * from being freed, potentially running out of memory. + */ + +#include "portable.h" + +#include "lload.h" +#include <epoch.h> + +/* Has to be >= 3 */ +#define EPOCH_MASK ( 1 << 2 ) +#define EPOCH_PREV(epoch) ( ( (epoch) + EPOCH_MASK - 1 ) % EPOCH_MASK ) +#define EPOCH_NEXT(epoch) ( ( (epoch) + 1 ) % EPOCH_MASK ) + +struct pending_ref { + void *object; + dispose_cb *dispose; + struct pending_ref *next; +}; + +ldap_pvt_thread_rdwr_t epoch_mutex; + +static epoch_t current_epoch; +static uintptr_t epoch_threads[EPOCH_MASK]; +static struct pending_ref *references[EPOCH_MASK]; + +void +epoch_init( void ) +{ + epoch_t epoch; + + current_epoch = 0; + for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) { + assert( !epoch_threads[epoch] ); + assert( !references[epoch] ); + } + + ldap_pvt_thread_rdwr_init( &epoch_mutex ); +} + +void +epoch_shutdown( void ) +{ + epoch_t epoch; + struct pending_ref *old, *next; + + for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) { + assert( !epoch_threads[epoch] ); + } + + /* Free pending references */ + epoch = EPOCH_PREV(current_epoch); + next = references[epoch]; + references[epoch] = NULL; + for ( old = next; old; old = next ) { + next = old->next; + + old->dispose( old->object ); + ch_free( old ); + } + + epoch = current_epoch; + next = references[epoch]; + references[epoch] = NULL; + for ( old = next; old; old = next ) { + next = old->next; + + old->dispose( old->object ); + ch_free( old ); + } + + /* No references should exist anywhere now */ + for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) { + assert( !references[epoch] ); + } + + ldap_pvt_thread_rdwr_destroy( &epoch_mutex ); +} + +epoch_t +epoch_join( void ) +{ + epoch_t epoch; + struct pending_ref *old, *ref = NULL; + + /* TODO: make this completely lock-free */ + 
ldap_pvt_thread_rdwr_rlock( &epoch_mutex ); + epoch = current_epoch; + __atomic_add_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL ); + ldap_pvt_thread_rdwr_runlock( &epoch_mutex ); + + if ( __atomic_load_n( + &epoch_threads[EPOCH_PREV(epoch)], __ATOMIC_ACQUIRE ) ) { + return epoch; + } + + __atomic_exchange( + &references[EPOCH_PREV(epoch)], &ref, &ref, __ATOMIC_ACQ_REL ); + + Debug( LDAP_DEBUG_TRACE, "epoch_join: " + "advancing epoch to %zu with %s objects to free\n", + EPOCH_NEXT(epoch), ref ? "some" : "no" ); + + ldap_pvt_thread_rdwr_wlock( &epoch_mutex ); + current_epoch = EPOCH_NEXT(epoch); + ldap_pvt_thread_rdwr_wunlock( &epoch_mutex ); + + for ( old = ref; old; old = ref ) { + ref = old->next; + + old->dispose( old->object ); + ch_free( old ); + } + + return epoch; +} + +void +epoch_leave( epoch_t epoch ) +{ + __atomic_sub_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL ); +} + +/* + * Add the object to the "current global epoch", not the epoch our thread + * entered. + */ +void +epoch_append( void *ptr, dispose_cb *cb ) +{ + struct pending_ref *new; + epoch_t epoch = __atomic_load_n( &current_epoch, __ATOMIC_ACQUIRE ); + + /* + * BTW, the following is not appropriate here: + * assert( __atomic_load_n( &epoch_threads[epoch], __ATOMIC_RELAXED ) ); + * + * We might be a thread lagging behind in the "previous epoch" with no + * other threads executing at all. + */ + + new = ch_malloc( sizeof(struct pending_ref) ); + new->object = ptr; + new->dispose = cb; + new->next = __atomic_load_n( &references[epoch], __ATOMIC_ACQUIRE ); + + while ( !__atomic_compare_exchange( &references[epoch], &new->next, &new, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED ) ) + /* iterate until we succeed */; +} + +int +acquire_ref( uintptr_t *refp ) +{ + uintptr_t refcnt, new_refcnt; + + refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE ); + + /* + * If we just incremented the refcnt and checked for zero after, another + * thread might falsely believe the object was going to stick around. 
+ * + * Checking whether the object is still dead at disposal time might not be + * able to distinguish it from being freed in a later epoch. + */ + do { + if ( !refcnt ) { + return refcnt; + } + + new_refcnt = refcnt + 1; + } while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED ) ); + assert( new_refcnt == refcnt + 1 ); + + return refcnt; +} + +int +try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb ) +{ + uintptr_t refcnt, new_refcnt; + + refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE ); + + /* We promise the caller that we won't decrease refcnt below 0 */ + do { + if ( !refcnt ) { + return refcnt; + } + + new_refcnt = refcnt - 1; + } while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED ) ); + assert( new_refcnt == refcnt - 1 ); + + if ( !new_refcnt ) { + epoch_append( object, cb ); + } + + return refcnt; +} diff --git a/servers/lloadd/epoch.h b/servers/lloadd/epoch.h new file mode 100644 index 0000000000..b5ae045008 --- /dev/null +++ b/servers/lloadd/epoch.h @@ -0,0 +1,143 @@ +/* epoch.h - epoch based memory reclamation */ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software <http://www.openldap.org/>. + * + * Copyright 2018-2020 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. 
+ */ + +#ifndef __LLOAD_EPOCH_H +#define __LLOAD_EPOCH_H + +/** @file epoch.h + * + * Implementation of epoch based memory reclamation, in principle + * similar to the algorithm presented in + * https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf + */ + +typedef uintptr_t epoch_t; + +/** @brief A callback function used to free object and associated data */ +typedef void (dispose_cb)( void *object ); + +/** @brief Initiate global state */ +void epoch_init( void ); + +/** @brief Finalise global state and free any objects still pending */ +void epoch_shutdown( void ); + +/** @brief Register thread as active + * + * In order to safely access managed objects, a thread should call + * this function or make sure no other thread is running (e.g. config + * pause, late shutdown). After calling this, it is guaranteed that no + * reachable objects will be freed before all threads have called + * `epoch_leave( current_epoch + 1 )` so it is essential that there + * is an upper limit to the amount of time between #epoch_join and + * corresponding #epoch_leave or the number of unfreed objects might + * grow without bounds. + * + * To simplify locking, memory is only freed when the current epoch + * is advanced rather than on leaving it. + * + * Can be safely called multiple times by the same thread as long as + * a matching #epoch_leave() call is made eventually. + * + * @return The observed epoch, to be passed to #epoch_leave() + */ +epoch_t epoch_join( void ); + +/** @brief Register thread as inactive + * + * A thread should call this after they are finished with work + * performed since matching call to #epoch_join(). It is not safe + * to keep a local reference to managed objects after this call + * unless other precautions have been made to prevent it being + * released. + * + * @param[in] epoch Epoch identifier returned by a previous call to + * #epoch_join(). 
+ */ +void epoch_leave( epoch_t epoch ); + +/** @brief Return an unreachable object to be freed + * + * The object should already be unreachable at the point of call and + * cb will be invoked when no other thread that could have seen it + * is active any more. This happens when we have advanced by two + * epochs. + * + * @param[in] ptr Object to be released/freed + * @param[in] cb Callback to invoke when safe to do so + */ +void epoch_append( void *ptr, dispose_cb *cb ); + +/** + * \defgroup Reference counting helpers + */ +/**@{*/ + +/** @brief Acquire a reference if possible + * + * Atomically, check reference count is non-zero and increment if so. + * Returns old reference count. + * + * @param[in] refp Pointer to a reference counter + * @return 0 if reference was already zero, non-zero if reference + * count was successfully incremented + */ +int acquire_ref( uintptr_t *refp ); + +/** @brief Check reference count and try to decrement + * + * Atomically, decrement reference count if non-zero and register + * object if decremented to zero. Returning previous reference count. + * + * @param[in] refp Pointer to a reference counter + * @param[in] object The managed object + * @param[in] cb Callback to invoke when safe to do so + * @return 0 if reference was already zero, non-zero if reference + * count was non-zero at the time of call + */ +int try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb ); + +/** @brief Read reference count + * + * @param[in] object Pointer to the managed object + * @param[in] ref_field Member where reference count is stored in + * the object + * @return Current value of reference counter + */ +#define IS_ALIVE( object, ref_field ) \ + __atomic_load_n( &(object)->ref_field, __ATOMIC_ACQUIRE ) + +/** @brief Release reference + * + * A cheaper alternative to #try_release_ref(), safe only when we know + * reference count was already non-zero. 
+ * + * @param[in] object The managed object + * @param[in] ref_field Member where reference count is stored in + * the object + * @param[in] cb Callback to invoke when safe to do so + */ +#define RELEASE_REF( object, ref_field, cb ) \ + do { \ + if ( !__atomic_sub_fetch( \ + &(object)->ref_field, 1, __ATOMIC_ACQ_REL ) ) { \ + epoch_append( object, (dispose_cb *)cb ); \ + } \ + } while (0) + +/**@}*/ + +#endif /* __LLOAD_EPOCH_H */ diff --git a/servers/lloadd/extended.c b/servers/lloadd/extended.c index 74ffcdf38c..1f22a71968 100644 --- a/servers/lloadd/extended.c +++ b/servers/lloadd/extended.c @@ -36,6 +36,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) char *msg = NULL; int rc = LDAP_SUCCESS; + CONNECTION_LOCK(c); tavl_delete( &c->c_ops, op, operation_client_cmp ); if ( c->c_is_tls == LLOAD_TLS_ESTABLISHED ) { @@ -51,6 +52,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) rc = LDAP_UNAVAILABLE; msg = "Could not initialize TLS"; } + CONNECTION_UNLOCK(c); Debug( LDAP_DEBUG_STATS, "handle_starttls: " "handling StartTLS exop connid=%lu rc=%d msg=%s\n", @@ -58,11 +60,10 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) if ( rc ) { /* We've already removed the operation from the queue */ - return operation_send_reject_locked( op, rc, msg, 1 ); + operation_send_reject( op, rc, msg, 1 ); + return LDAP_SUCCESS; } - CONNECTION_UNLOCK_INCREF(c); - event_del( c->c_read_event ); event_del( c->c_write_event ); /* @@ -77,9 +78,8 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) output = c->c_pendingber; if ( output == NULL && (output = ber_alloc()) == NULL ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_LOCK_DECREF(c); - operation_destroy_from_client( op ); - CONNECTION_DESTROY(c); + operation_unlink( op ); + CONNECTION_LOCK_DESTROY(c); return -1; } c->c_pendingber = output; @@ -88,7 +88,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" ); 
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); c->c_read_timeout = lload_timeout_net; event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST, client_tls_handshake_cb, c ); @@ -100,8 +100,9 @@ handle_starttls( LloadConnection *c, LloadOperation *op ) event_add( c->c_write_event, lload_write_timeout ); op->o_res = LLOAD_OP_COMPLETED; - operation_destroy_from_client( op ); - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); + + operation_unlink( op ); return -1; } @@ -115,10 +116,8 @@ request_extended( LloadConnection *c, LloadOperation *op ) ber_tag_t tag; if ( (copy = ber_alloc()) == NULL ) { - if ( operation_send_reject_locked( - op, LDAP_OTHER, "internal error", 0 ) == LDAP_SUCCESS ) { - CONNECTION_DESTROY(c); - } + operation_send_reject( op, LDAP_OTHER, "internal error", 0 ); + CONNECTION_LOCK_DESTROY(c); return -1; } @@ -128,8 +127,9 @@ request_extended( LloadConnection *c, LloadOperation *op ) if ( tag != LDAP_TAG_EXOP_REQ_OID ) { Debug( LDAP_DEBUG_STATS, "request_extended: " "no OID present in extended request\n" ); - return operation_send_reject_locked( - op, LDAP_PROTOCOL_ERROR, "decoding error", 0 ); + operation_send_reject( op, LDAP_PROTOCOL_ERROR, "decoding error", 0 ); + CONNECTION_LOCK_DESTROY(c); + return -1; } needle.oid = bv; @@ -145,8 +145,8 @@ request_extended( LloadConnection *c, LloadOperation *op ) ber_free( copy, 0 ); if ( c->c_state == LLOAD_C_BINDING ) { - return operation_send_reject_locked( - op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 ); + operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 ); + return LDAP_SUCCESS; } return request_process( c, op ); } diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index d787a462d7..5a96fb1782 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -84,6 +84,8 @@ LDAP_BEGIN_DECL #define BER_BV_OPTIONAL( bv ) ( BER_BVISNULL( bv ) ? 
NULL : ( bv ) ) +#include <epoch.h> + typedef struct LloadBackend LloadBackend; typedef struct LloadPendingConnection LloadPendingConnection; typedef struct LloadConnection LloadConnection; @@ -280,58 +282,39 @@ struct LloadConnection { * - also a liveness/validity token is added to c_refcnt during * lload_connection_init, its existence is tracked in c_live and is usually the * only one that prevents it from being destroyed - * - anyone who needs to be able to lock the connection after unlocking it has - * to use CONNECTION_UNLOCK_INCREF, they are then responsible that - * CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are - * done with it + * - anyone who needs to be able to relock the connection after unlocking it has + * to use acquire_ref(), they need to make sure a matching + * RELEASE_REF( c, c_refcnt, c->c_destroy ); is run eventually * - when a connection is considered dead, use CONNECTION_DESTROY on a locked - * connection, it might get disposed of or if anyone still holds a token, it - * just gets unlocked and it's the last token holder's responsibility to run - * CONNECTION_UNLOCK_OR_DESTROY - * - CONNECTION_LOCK_DESTROY is a shorthand for locking, decreasing refcount - * and CONNECTION_DESTROY + * connection, it will be made unreachable from normal places and either + * scheduled for reclamation when safe to do so or if anyone still holds a + * reference, it just gets unlocked and reclaimed after the last ref is + * released + * - CONNECTION_LOCK_DESTROY is a shorthand for locking and CONNECTION_DESTROY */ ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */ - int c_refcnt, c_live; + uintptr_t c_refcnt, c_live; + CONNECTION_DESTROY_CB c_unlink; CONNECTION_DESTROY_CB c_destroy; CONNECTION_PDU_CB c_pdu_cb; #define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex ) #define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex ) -#define CONNECTION_LOCK_DECREF(c) \ +#define CONNECTION_UNLINK_(c) \ do { 
\ - 
CONNECTION_LOCK(c); \ - (c)->c_refcnt--; \ - } while (0) -#define CONNECTION_UNLOCK_INCREF(c) \ - do { \ - (c)->c_refcnt++; \ - CONNECTION_UNLOCK(c); \ - } while (0) -#define CONNECTION_UNLOCK_OR_DESTROY(c) \ - do { \ - assert( (c)->c_refcnt >= 0 ); \ - if ( (c)->c_state == LLOAD_C_CLOSING && !( c )->c_ops ) { \ - (c)->c_refcnt -= (c)->c_live; \ + if ( (c)->c_live ) { \ (c)->c_live = 0; \ - } \ - if ( !( c )->c_refcnt ) { \ - Debug( LDAP_DEBUG_TRACE, "%s: destroying connection connid=%lu\n", \ - __func__, (c)->c_connid ); \ - (c)->c_destroy( (c) ); \ - (c) = NULL; \ - } else { \ - CONNECTION_UNLOCK(c); \ + RELEASE_REF( (c), c_refcnt, c->c_destroy ); \ + (c)->c_unlink( (c) ); \ } \ } while (0) #define CONNECTION_DESTROY(c) \ do { \ - (c)->c_refcnt -= (c)->c_live; \ - (c)->c_live = 0; \ - CONNECTION_UNLOCK_OR_DESTROY(c); \ + CONNECTION_UNLINK_(c); \ + CONNECTION_UNLOCK(c); \ } while (0) #define CONNECTION_LOCK_DESTROY(c) \ do { \ - CONNECTION_LOCK_DECREF(c); \ + CONNECTION_LOCK(c); \ CONNECTION_DESTROY(c); \ } while (0); @@ -393,12 +376,13 @@ struct LloadConnection { enum op_state { LLOAD_OP_NOT_FREEING = 0, - LLOAD_OP_FREEING_UPSTREAM = 1 << 0, - LLOAD_OP_FREEING_CLIENT = 1 << 1, - LLOAD_OP_DETACHING_UPSTREAM = 1 << 2, - LLOAD_OP_DETACHING_CLIENT = 1 << 3, + LLOAD_OP_DETACHING_CLIENT = 1 << 1, + LLOAD_OP_DETACHING_UPSTREAM = 1 << 0, }; +#define LLOAD_OP_DETACHING_MASK \ + ( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT ) + /* operation result for monitoring purposes */ enum op_result { LLOAD_OP_REJECTED, /* operation was not forwarded */ @@ -406,32 +390,28 @@ enum op_result { LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */ }; -#define LLOAD_OP_FREEING_MASK \ - ( LLOAD_OP_FREEING_UPSTREAM | LLOAD_OP_FREEING_CLIENT ) -#define LLOAD_OP_DETACHING_MASK \ - ( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT ) - +/* + * Operation reference tracking: + * - o_refcnt is set to 1, never incremented + * - operation_unlink sets it to 
0 and on transition from 1 clears both + * connection links (o_client, o_upstream) + */ struct LloadOperation { + uintptr_t o_refcnt; + LloadConnection *o_client; unsigned long o_client_connid; - int o_client_live, o_client_refcnt; ber_int_t o_client_msgid; ber_int_t o_saved_msgid; LloadConnection *o_upstream; unsigned long o_upstream_connid; - int o_upstream_live, o_upstream_refcnt; ber_int_t o_upstream_msgid; time_t o_last_response; - /* Protects o_client, o_upstream pointers before we lock their c_mutex if - * we don't know they are still alive */ + /* Protects o_client, o_upstream links */ ldap_pvt_thread_mutex_t o_link_mutex; - /* Protects o_freeing, can be locked while holding c_mutex */ - ldap_pvt_thread_mutex_t o_mutex; - /* Consistent w.r.t. o_mutex, only written to while holding - * op->o_{client,upstream}->c_mutex */ - enum op_state o_freeing; + ber_tag_t o_tag; time_t o_start; unsigned long o_pin_id; diff --git a/servers/lloadd/main.c b/servers/lloadd/main.c index 84fbbd8bdc..21125635d0 100644 --- a/servers/lloadd/main.c +++ b/servers/lloadd/main.c @@ -424,6 +424,8 @@ main( int argc, char **argv ) } #endif + epoch_init(); + while ( (i = getopt( argc, argv, "c:d:f:F:h:n:o:s:tV" #ifdef LDAP_PF_INET6 diff --git a/servers/lloadd/module_init.c b/servers/lloadd/module_init.c index 61eed4fdbe..9ba216d59c 100644 --- a/servers/lloadd/module_init.c +++ b/servers/lloadd/module_init.c @@ -85,6 +85,10 @@ lload_back_open( BackendInfo *bi ) return 0; } + /* This will fail if we ever try to instantiate more than one lloadd within + * the process */ + epoch_init(); + if ( lload_tls_init() != 0 ) { return -1; } diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index f8f559cb40..dd845a3379 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -116,363 +116,6 @@ operation_upstream_cmp( const void *left, const void *right ) } } -/* - * Free the operation, subject to there being noone else holding a reference - * to it. 
- * - * Both operation_destroy_from_* functions are the same, two implementations - * exist to cater for the fact that either side (client or upstream) might - * decide to destroy it and each holds a different mutex. - * - * Due to the fact that we rely on mutexes on both connections which have a - * different timespan from the operation, we have to take the following race - * into account: - * - * Trigger - * - both operation_destroy_from_client and operation_destroy_from_upstream - * are called at the same time (each holding its mutex), several times - * before one of them finishes - * - either or both connections might have started the process of being - * destroyed - * - * We need to detect that the race has happened and only allow one of them to - * free the operation (we use o_freeing != 0 to announce+detect that). - * - * In case the caller was in the process of destroying the connection and the - * race had been won by the mirror caller, it will increment c_refcnt on its - * connection and make sure to postpone the final step in - * client/upstream_destroy(). Testing o_freeing for the mirror side's token - * allows the winner to detect that it has been a party to the race and a token - * in c_refcnt has been deposited on its behalf. - * - * Beware! This widget really touches all the mutexes we have and showcases the - * issues with maintaining so many mutex ordering restrictions. - */ -void -operation_destroy_from_client( LloadOperation *op ) -{ - LloadConnection *upstream = NULL, *client = op->o_client; - LloadBackend *b = NULL; - int race_state, detach_client = !client->c_live; - - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p attempting to release operation%s\n", - op, detach_client ? " and detach client" : "" ); - - /* 1. 
liveness/refcnt adjustment and test */ - op->o_client_refcnt -= op->o_client_live; - op->o_client_live = 0; - - assert( op->o_client_refcnt <= client->c_refcnt ); - if ( op->o_client_refcnt ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p not dead yet\n", - op ); - return; - } - - /* 2. Remove from the operation map and TODO adjust the pending op count */ - tavl_delete( &client->c_ops, op, operation_client_cmp ); - - /* 3. Detect whether we entered a race to free op and indicate that to any - * others */ - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - race_state = op->o_freeing; - op->o_freeing |= LLOAD_OP_FREEING_CLIENT; - if ( detach_client ) { - op->o_freeing |= LLOAD_OP_DETACHING_CLIENT; - } - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - CONNECTION_UNLOCK_INCREF(client); - - if ( detach_client ) { - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - op->o_client = NULL; - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - } - - /* 4. If we lost the race, deal with it straight away */ - if ( race_state ) { - /* - * We have raced to destroy op and the first one to lose on this side, - * leave a refcnt token on client so we don't destroy it before the - * other side has finished (it knows we did that when it examines - * o_freeing again). 
- */ - if ( detach_client ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p lost race but client connid=%lu is going down\n", - op, client->c_connid ); - CONNECTION_LOCK_DECREF(client); - } else if ( (race_state & LLOAD_OP_FREEING_MASK) == - LLOAD_OP_FREEING_UPSTREAM ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p lost race, increased client refcnt connid=%lu " - "to refcnt=%d\n", - op, client->c_connid, client->c_refcnt ); - CONNECTION_LOCK(client); - } else { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p lost race with another " - "operation_destroy_from_client, " - "client connid=%lu\n", - op, client->c_connid ); - CONNECTION_LOCK_DECREF(client); - } - return; - } - - /* it seems we will be destroying the operation, - * so update the global rejected cunter if needed */ - operation_update_global_rejected( op ); - /* 5. If we raced the upstream side and won, reclaim the token */ - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - if ( !(race_state & LLOAD_OP_DETACHING_UPSTREAM) ) { - upstream = op->o_upstream; - if ( upstream ) { - CONNECTION_LOCK(upstream); - } - } - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - /* We don't actually resolve the race in full until we grab the other's - * c_mutex+op->o_mutex here */ - if ( upstream && ( op->o_freeing & LLOAD_OP_FREEING_UPSTREAM ) ) { - if ( op->o_freeing & LLOAD_OP_DETACHING_UPSTREAM ) { - CONNECTION_UNLOCK(upstream); - upstream = NULL; - } else { - /* - * We have raced to destroy op and won. 
To avoid freeing the connection - * under us, a refcnt token has been left over for us on the upstream, - * decref and see whether we are in charge of freeing it - */ - upstream->c_refcnt--; - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p other side lost race with us, upstream connid=%lu\n", - op, upstream->c_connid ); - } - } - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - /* 6. liveness/refcnt adjustment and test */ - op->o_upstream_refcnt -= op->o_upstream_live; - op->o_upstream_live = 0; - if ( op->o_upstream_refcnt ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p other side still alive, refcnt=%d\n", - op, op->o_upstream_refcnt ); - - /* There must have been no race if op is still alive */ - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - op->o_freeing &= ~LLOAD_OP_FREEING_CLIENT; - if ( detach_client ) { - op->o_freeing &= ~LLOAD_OP_DETACHING_CLIENT; - } - assert( op->o_freeing == 0 ); - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - assert( upstream != NULL ); - CONNECTION_UNLOCK_OR_DESTROY(upstream); - CONNECTION_LOCK_DECREF(client); - return; - } - - /* 7. Remove from the operation map and adjust the pending op count */ - if ( upstream ) { - if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) { - upstream->c_n_ops_executing--; - operation_update_conn_counters( op ); - b = (LloadBackend *)upstream->c_private; - } - CONNECTION_UNLOCK_OR_DESTROY(upstream); - - if ( b ) { - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing--; - operation_update_backend_counters( op, b ); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - } - } - - /* 8. 
Release the operation */ - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: " - "op=%p destroyed operation from client connid=%lu, " - "client msgid=%d\n", - op, op->o_client_connid, op->o_client_msgid ); - ber_free( op->o_ber, 1 ); - ldap_pvt_thread_mutex_destroy( &op->o_mutex ); - ldap_pvt_thread_mutex_destroy( &op->o_link_mutex ); - ch_free( op ); - - CONNECTION_LOCK_DECREF(client); -} - -/* - * See operation_destroy_from_client. - */ -void -operation_destroy_from_upstream( LloadOperation *op ) -{ - LloadConnection *client = NULL, *upstream = op->o_upstream; - LloadBackend *b = NULL; - int race_state, detach_upstream = !upstream->c_live; - - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p attempting to release operation%s\n", - op, detach_upstream ? " and detach upstream" : "" ); - - /* 1. liveness/refcnt adjustment and test */ - op->o_upstream_refcnt -= op->o_upstream_live; - op->o_upstream_live = 0; - - assert( op->o_upstream_refcnt <= upstream->c_refcnt ); - if ( op->o_upstream_refcnt ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p not dead yet\n", - op ); - return; - } - - /* it seems we will be destroying the operation, - * so update the global rejected cunter if needed */ - operation_update_global_rejected( op ); - /* 2. Remove from the operation map and adjust the pending op count */ - if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) { - upstream->c_n_ops_executing--; - operation_update_conn_counters( op ); - b = (LloadBackend *)upstream->c_private; - } - - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - race_state = op->o_freeing; - op->o_freeing |= LLOAD_OP_FREEING_UPSTREAM; - if ( detach_upstream ) { - op->o_freeing |= LLOAD_OP_DETACHING_UPSTREAM; - } - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - CONNECTION_UNLOCK_INCREF(upstream); - - /* 3. 
Detect whether we entered a race to free op */ - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - if ( detach_upstream ) { - op->o_upstream = NULL; - } - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - if ( b ) { - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing--; - operation_update_backend_counters( op, b ); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - } - - /* 4. If we lost the race, deal with it straight away */ - if ( race_state ) { - /* - * We have raced to destroy op and the first one to lose on this side, - * leave a refcnt token on upstream so we don't destroy it before the - * other side has finished (it knows we did that when it examines - * o_freeing again). - */ - if ( detach_upstream ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p lost race but upstream connid=%lu is going down\n", - op, upstream->c_connid ); - CONNECTION_LOCK_DECREF(upstream); - } else if ( (race_state & LLOAD_OP_FREEING_MASK) == - LLOAD_OP_FREEING_CLIENT ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p lost race, increased upstream refcnt connid=%lu " - "to refcnt=%d\n", - op, upstream->c_connid, upstream->c_refcnt ); - CONNECTION_LOCK(upstream); - } else { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p lost race with another " - "operation_destroy_from_upstream, " - "upstream connid=%lu\n", - op, upstream->c_connid ); - CONNECTION_LOCK_DECREF(upstream); - } - return; - } - - /* 5. 
If we raced the client side and won, reclaim the token */ - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - if ( !(race_state & LLOAD_OP_DETACHING_CLIENT) ) { - client = op->o_client; - if ( client ) { - CONNECTION_LOCK(client); - } - } - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - /* We don't actually resolve the race in full until we grab the other's - * c_mutex+op->o_mutex here */ - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - if ( client && ( op->o_freeing & LLOAD_OP_FREEING_CLIENT ) ) { - if ( op->o_freeing & LLOAD_OP_DETACHING_CLIENT ) { - CONNECTION_UNLOCK(client); - client = NULL; - } else { - /* - * We have raced to destroy op and won. To avoid freeing the connection - * under us, a refcnt token has been left over for us on the client, - * decref and see whether we are in charge of freeing it - */ - client->c_refcnt--; - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p other side lost race with us, client connid=%lu\n", - op, client->c_connid ); - } - } - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - /* 6. liveness/refcnt adjustment and test */ - op->o_client_refcnt -= op->o_client_live; - op->o_client_live = 0; - if ( op->o_client_refcnt ) { - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p other side still alive, refcnt=%d\n", - op, op->o_client_refcnt ); - /* There must have been no race if op is still alive */ - ldap_pvt_thread_mutex_lock( &op->o_mutex ); - op->o_freeing &= ~LLOAD_OP_FREEING_UPSTREAM; - if ( detach_upstream ) { - op->o_freeing &= ~LLOAD_OP_DETACHING_UPSTREAM; - } - assert( op->o_freeing == 0 ); - ldap_pvt_thread_mutex_unlock( &op->o_mutex ); - - assert( client != NULL ); - CONNECTION_UNLOCK_OR_DESTROY(client); - CONNECTION_LOCK_DECREF(upstream); - return; - } - - /* 7. Remove from the operation map and TODO adjust the pending op count */ - if ( client ) { - tavl_delete( &client->c_ops, op, operation_client_cmp ); - CONNECTION_UNLOCK_OR_DESTROY(client); - } - - /* 8. 
Release the operation */ - Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: " - "op=%p destroyed operation from client connid=%lu, " - "client msgid=%d\n", - op, op->o_client_connid, op->o_client_msgid ); - ber_free( op->o_ber, 1 ); - ldap_pvt_thread_mutex_destroy( &op->o_mutex ); - ldap_pvt_thread_mutex_destroy( &op->o_link_mutex ); - ch_free( op ); - - CONNECTION_LOCK_DECREF(upstream); -} - /* * Entered holding c_mutex for now. */ @@ -490,11 +133,9 @@ operation_init( LloadConnection *c, BerElement *ber ) op->o_ber = ber; op->o_start = slap_get_time(); - ldap_pvt_thread_mutex_init( &op->o_mutex ); ldap_pvt_thread_mutex_init( &op->o_link_mutex ); - op->o_client_live = op->o_client_refcnt = 1; - op->o_upstream_live = op->o_upstream_refcnt = 1; + op->o_refcnt = 1; tag = ber_get_int( ber, &op->o_client_msgid ); if ( tag != LDAP_TAG_MSGID ) { @@ -549,13 +190,163 @@ fail: return NULL; } -int -operation_send_abandon( LloadOperation *op ) +void +operation_destroy( LloadOperation *op ) +{ + Debug( LDAP_DEBUG_TRACE, "operation_destroy: " + "op=%p destroyed operation from client connid=%lu, " + "client msgid=%d\n", + op, op->o_client_connid, op->o_client_msgid ); + + assert( op->o_refcnt == 0 ); + assert( op->o_client == NULL ); + assert( op->o_upstream == NULL ); + + ber_free( op->o_ber, 1 ); + ldap_pvt_thread_mutex_destroy( &op->o_link_mutex ); + ch_free( op ); +} + +int +operation_unlink( LloadOperation *op ) +{ + LloadConnection *client, *upstream; + uintptr_t prev_refcnt; + int result = 0; + + if ( !( prev_refcnt = try_release_ref( + &op->o_refcnt, op, (dispose_cb *)operation_destroy ) ) ) { + return result; + } + + assert( prev_refcnt == 1 ); + + Debug( LDAP_DEBUG_TRACE, "operation_unlink: " + "unlinking operation between client connid=%lu and upstream " + "connid=%lu " + "client msgid=%d\n", + op->o_client_connid, op->o_upstream_connid, op->o_client_msgid ); + + ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); + client = op->o_client; + upstream = 
op->o_upstream; + + op->o_client = NULL; + op->o_upstream = NULL; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + + assert( client || upstream ); + + if ( client ) { + result |= operation_unlink_client( op, client ); + operation_update_global_rejected( op ); + } + + if ( upstream ) { + result |= operation_unlink_upstream( op, upstream ); + } + + return result; +} + +int +operation_unlink_client( LloadOperation *op, LloadConnection *client ) +{ + LloadOperation *removed; + int result = 0; + + Debug( LDAP_DEBUG_TRACE, "operation_unlink_client: " + "unlinking operation op=%p msgid=%d client connid=%lu\n", + op, op->o_client_msgid, op->o_client_connid ); + + CONNECTION_LOCK(client); + if ( (removed = tavl_delete( + &client->c_ops, op, operation_client_cmp )) ) { + result = LLOAD_OP_DETACHING_CLIENT; + + assert( op == removed ); + client->c_n_ops_executing--; + + if ( client->c_state == LLOAD_C_BINDING ) { + client->c_state = LLOAD_C_READY; + if ( !BER_BVISNULL( &client->c_auth ) ) { + ber_memfree( client->c_auth.bv_val ); + BER_BVZERO( &client->c_auth ); + } + if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) { + ber_memfree( client->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &client->c_sasl_bind_mech ); + } + if ( op->o_pin_id ) { + client->c_pin_id = 0; + } + } + } + if ( client->c_state == LLOAD_C_CLOSING && !client->c_ops ) { + CONNECTION_DESTROY(client); + } else { + CONNECTION_UNLOCK(client); + } + + return result; +} + +int +operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream ) +{ + LloadOperation *removed; + LloadBackend *b = NULL; + int result = 0; + + Debug( LDAP_DEBUG_TRACE, "operation_unlink_upstream: " + "unlinking operation op=%p msgid=%d upstream connid=%lu\n", + op, op->o_upstream_msgid, op->o_upstream_connid ); + + CONNECTION_LOCK(upstream); + if ( (removed = tavl_delete( + &upstream->c_ops, op, operation_upstream_cmp )) ) { + result |= LLOAD_OP_DETACHING_UPSTREAM; + + assert( op == removed ); + upstream->c_n_ops_executing--; 
+ + if ( upstream->c_state == LLOAD_C_BINDING ) { + assert( op->o_tag == LDAP_REQ_BIND && upstream->c_ops == NULL ); + upstream->c_state = LLOAD_C_READY; + if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) { + ber_memfree( upstream->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &upstream->c_sasl_bind_mech ); + } + } + operation_update_conn_counters( op, upstream ); + b = (LloadBackend *)upstream->c_private; + } + if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) { + CONNECTION_DESTROY(upstream); + } else { + CONNECTION_UNLOCK(upstream); + } + + if ( b ) { + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + operation_update_backend_counters( op, b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + } + + return result; +} + +int +operation_send_abandon( LloadOperation *op, LloadConnection *upstream ) { - LloadConnection *upstream = op->o_upstream; BerElement *ber; int rc = -1; + if ( !IS_ALIVE( upstream, c_refcnt ) ) { + return rc; + } + ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex ); ber = upstream->c_pendingber; if ( ber == NULL && (ber = ber_alloc()) == NULL ) { @@ -605,114 +396,64 @@ void operation_abandon( LloadOperation *op ) { LloadConnection *c; - LloadBackend *b; - int rc = LDAP_SUCCESS; ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); c = op->o_upstream; - if ( !c || !c->c_live ) { - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( !c || !IS_ALIVE( c, c_refcnt ) ) { goto done; } - CONNECTION_LOCK(c); - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - /* for now consider all abandoned operations completed, * perhaps add a separate counter later */ op->o_res = LLOAD_OP_COMPLETED; - - if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) { + if ( !operation_unlink_upstream( op, c ) ) { /* The operation has already been abandoned or finished */ Debug( LDAP_DEBUG_TRACE, "operation_abandon: " "%s from connid=%lu msgid=%d not present in connid=%lu 
any " "more\n", lload_msgtype2str( op->o_tag ), op->o_client_connid, op->o_client_msgid, op->o_upstream_connid ); - goto unlock; + goto done; } - if ( c->c_state == LLOAD_C_BINDING ) { - c->c_state = LLOAD_C_READY; - if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { - ber_memfree( c->c_sasl_bind_mech.bv_val ); - BER_BVZERO( &c->c_sasl_bind_mech ); - } - } - c->c_n_ops_executing--; - b = (LloadBackend *)c->c_private; - op->o_upstream_refcnt++; - CONNECTION_UNLOCK_INCREF(c); - - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing--; - operation_update_backend_counters( op, b ); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - - if ( operation_send_abandon( op ) == LDAP_SUCCESS ) { + if ( operation_send_abandon( op, c ) == LDAP_SUCCESS ) { connection_write_cb( -1, 0, c ); } - CONNECTION_LOCK_DECREF(c); - op->o_upstream_refcnt--; - -unlock: - if ( !c->c_live || !op->o_upstream_refcnt ) { - operation_destroy_from_upstream( op ); - } - if ( rc ) { - CONNECTION_DESTROY(c); - } else { - CONNECTION_UNLOCK_OR_DESTROY(c); - } - done: - c = op->o_client; - assert( c ); - - /* Caller should hold a reference on client */ - CONNECTION_LOCK(c); - if ( c->c_state == LLOAD_C_BINDING ) { - c->c_state = LLOAD_C_READY; - if ( !BER_BVISNULL( &c->c_auth ) ) { - ber_memfree( c->c_auth.bv_val ); - BER_BVZERO( &c->c_auth ); - } - if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { - ber_memfree( c->c_sasl_bind_mech.bv_val ); - BER_BVZERO( &c->c_sasl_bind_mech ); - } - if ( op->o_pin_id ) { - c->c_pin_id = 0; - } - } - assert( op->o_client_refcnt > op->o_client_live ); - op->o_client_refcnt--; - operation_destroy_from_client( op ); - CONNECTION_UNLOCK(c); + operation_unlink( op ); } -/* - * Called with op->o_client non-NULL and already locked. 
- */ -int -operation_send_reject_locked( +void +operation_send_reject( LloadOperation *op, int result, const char *msg, int send_anyway ) { - LloadConnection *c = op->o_client; + LloadConnection *c; BerElement *ber; int found; - Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: " + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " "rejecting %s from client connid=%lu with message: \"%s\"\n", - lload_msgtype2str( op->o_tag ), c->c_connid, msg ); + lload_msgtype2str( op->o_tag ), op->o_client_connid, msg ); - found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op ); + ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); + c = op->o_client; + ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); + if ( !c || !IS_ALIVE( c, c_refcnt ) ) { + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " + "not sending msgid=%d, client connid=%lu is dead\n", + op->o_client_msgid, op->o_client_connid ); + + goto done; + } + + found = operation_unlink_client( op, c ); if ( !found && !send_anyway ) { - Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: " + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " "msgid=%d not scheduled for client connid=%lu anymore, " "not sending\n", op->o_client_msgid, c->c_connid ); @@ -721,25 +462,21 @@ operation_send_reject_locked( if ( op->o_client_msgid == 0 ) { assert( op->o_saved_msgid == 0 && op->o_pin_id ); - Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: " + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " "operation pin=%lu is just a pin, not sending\n", op->o_pin_id ); goto done; } - CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); - ber = c->c_pendingber; if ( ber == NULL && (ber = ber_alloc()) == NULL ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - Debug( LDAP_DEBUG_ANY, "operation_send_reject_locked: " + Debug( LDAP_DEBUG_ANY, "operation_send_reject: " "ber_alloc failed, closing connid=%lu\n", c->c_connid ); - CONNECTION_LOCK_DECREF(c); - operation_destroy_from_client( op ); - 
CONNECTION_DESTROY(c); - return -1; + CONNECTION_LOCK_DESTROY(c); + goto done; } c->c_pendingber = ber; @@ -751,46 +488,8 @@ operation_send_reject_locked( connection_write_cb( -1, 0, c ); - CONNECTION_LOCK_DECREF(c); done: - operation_destroy_from_client( op ); - return LDAP_SUCCESS; -} - -void -operation_send_reject( - LloadOperation *op, - int result, - const char *msg, - int send_anyway ) -{ - LloadConnection *c; - - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - c = op->o_client; - if ( !c ) { - c = op->o_upstream; - /* One of the connections has initiated this and keeps a reference, if - * client is dead, it must have been the upstream */ - assert( c ); - CONNECTION_LOCK(c); - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " - "not sending msgid=%d, client connid=%lu is dead\n", - op->o_client_msgid, op->o_client_connid ); - operation_destroy_from_upstream( op ); - CONNECTION_UNLOCK_OR_DESTROY(c); - return; - } - CONNECTION_LOCK(c); - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - /* Non-zero return means connection has been unlocked and might be - * destroyed */ - if ( operation_send_reject_locked( op, result, msg, send_anyway ) == - LDAP_SUCCESS ) { - CONNECTION_UNLOCK_OR_DESTROY(c); - } + operation_unlink( op ); } /* @@ -803,32 +502,27 @@ operation_send_reject( void operation_lost_upstream( LloadOperation *op ) { - LloadConnection *c = op->o_upstream; - operation_send_reject( op, LDAP_OTHER, "connection to the remote server has been severed", 0 ); - - CONNECTION_LOCK(c); - op->o_upstream_refcnt--; - operation_destroy_from_upstream( op ); - CONNECTION_UNLOCK(c); } int connection_timeout( LloadConnection *upstream, void *arg ) { LloadOperation *op; - TAvlnode *ops = NULL, *node; + TAvlnode *ops = NULL, *node, *next; LloadBackend *b = upstream->c_private; time_t threshold = *(time_t *)arg; int rc, nops = 0; + CONNECTION_LOCK(upstream); for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); 
node && ((LloadOperation *)node->avl_data)->o_start < threshold; /* shortcut */ - node = tavl_next( node, TAVL_DIR_RIGHT ) ) { + node = next ) { LloadOperation *found_op; + next = tavl_next( node, TAVL_DIR_RIGHT ); op = node->avl_data; /* Have we received another response since? */ @@ -836,7 +530,6 @@ connection_timeout( LloadConnection *upstream, void *arg ) continue; } - op->o_upstream_refcnt++; op->o_res = LLOAD_OP_FAILED; found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ); assert( op == found_op ); @@ -863,13 +556,15 @@ connection_timeout( LloadConnection *upstream, void *arg ) } if ( nops == 0 ) { + CONNECTION_UNLOCK(upstream); return LDAP_SUCCESS; } upstream->c_n_ops_executing -= nops; + upstream->c_counters.lc_ops_failed += nops; Debug( LDAP_DEBUG_STATS, "connection_timeout: " "timing out %d operations for connid=%lu\n", nops, upstream->c_connid ); - CONNECTION_UNLOCK_INCREF(upstream); + CONNECTION_UNLOCK(upstream); ldap_pvt_thread_mutex_lock( &b->b_mutex ); b->b_n_ops_executing -= nops; @@ -877,36 +572,17 @@ connection_timeout( LloadConnection *upstream, void *arg ) for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node; node = tavl_next( node, TAVL_DIR_RIGHT ) ) { - LloadConnection *client; - op = node->avl_data; - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); - client = op->o_client; - if ( !client ) { - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - continue; - } - CONNECTION_LOCK(client); - ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - /* operation_send_reject_locked unlocks and destroys client on - * failure */ - if ( operation_send_reject_locked( op, - op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED : - LDAP_ADMINLIMIT_EXCEEDED, - "upstream did not respond in time", 0 ) == LDAP_SUCCESS ) { - CONNECTION_UNLOCK_OR_DESTROY(client); - } + operation_send_reject( op, + op->o_tag == LDAP_REQ_SEARCH ? 
LDAP_TIMELIMIT_EXCEEDED : + LDAP_ADMINLIMIT_EXCEEDED, + "upstream did not respond in time", 0 ); if ( rc == LDAP_SUCCESS ) { - rc = operation_send_abandon( op ); + rc = operation_send_abandon( op, upstream ); } - - CONNECTION_LOCK(upstream); - op->o_upstream_refcnt--; - operation_destroy_from_upstream( op ); - CONNECTION_UNLOCK(upstream); + operation_unlink( op ); } /* TODO: if operation_send_abandon failed, we need to kill the upstream */ @@ -914,7 +590,13 @@ connection_timeout( LloadConnection *upstream, void *arg ) connection_write_cb( -1, 0, upstream ); } - CONNECTION_LOCK_DECREF(upstream); + CONNECTION_LOCK(upstream); + if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) { + CONNECTION_DESTROY(upstream); + } else { + CONNECTION_UNLOCK(upstream); + } + /* just dispose of the AVL, most operations should already be gone */ tavl_free( ops, NULL ); return LDAP_SUCCESS; @@ -933,12 +615,16 @@ operations_timeout( evutil_socket_t s, short what, void *arg ) threshold = slap_get_time() - lload_timeout_api->tv_sec; LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) { + epoch_t epoch; + ldap_pvt_thread_mutex_lock( &b->b_mutex ); if ( b->b_n_ops_executing == 0 ) { ldap_pvt_thread_mutex_unlock( &b->b_mutex ); continue; } + epoch = epoch_join(); + Debug( LDAP_DEBUG_TRACE, "operations_timeout: " "timing out binds for backend uri=%s\n", b->b_uri.bv_val ); @@ -951,6 +637,7 @@ operations_timeout( evutil_socket_t s, short what, void *arg ) connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn, connection_timeout, &threshold ); + epoch_leave( epoch ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } done: @@ -976,13 +663,12 @@ operation_update_global_rejected( LloadOperation *op ) } void -operation_update_conn_counters( LloadOperation *op ) +operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream ) { - assert( op->o_upstream != NULL ); if ( op->o_res == LLOAD_OP_COMPLETED ) { - op->o_upstream->c_counters.lc_ops_completed++; + 
upstream->c_counters.lc_ops_completed++; } else { - op->o_upstream->c_counters.lc_ops_failed++; + upstream->c_counters.lc_ops_failed++; } } diff --git a/servers/lloadd/proto-lload.h b/servers/lloadd/proto-lload.h index f95419e991..21d070313a 100644 --- a/servers/lloadd/proto-lload.h +++ b/servers/lloadd/proto-lload.h @@ -172,15 +172,17 @@ LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag ); LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r ); LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r ); LDAP_SLAPD_F (LloadOperation *) operation_init( LloadConnection *c, BerElement *ber ); -LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op ); +LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op, LloadConnection *c ); LDAP_SLAPD_F (void) operation_abandon( LloadOperation *op ); LDAP_SLAPD_F (void) operation_send_reject( LloadOperation *op, int result, const char *msg, int send_anyway ); LDAP_SLAPD_F (int) operation_send_reject_locked( LloadOperation *op, int result, const char *msg, int send_anyway ); LDAP_SLAPD_F (void) operation_lost_upstream( LloadOperation *op ); -LDAP_SLAPD_F (void) operation_destroy_from_client( LloadOperation *op ); -LDAP_SLAPD_F (void) operation_destroy_from_upstream( LloadOperation *op ); +LDAP_SLAPD_F (void) operation_destroy( LloadOperation *op ); +LDAP_SLAPD_F (int) operation_unlink( LloadOperation *op ); +LDAP_SLAPD_F (int) operation_unlink_client( LloadOperation *op, LloadConnection *client ); +LDAP_SLAPD_F (int) operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream ); LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg ); -LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op ); +LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream ); LDAP_SLAPD_F (void) operation_update_backend_counters( LloadOperation *op, LloadBackend *b ); LDAP_SLAPD_F (void) 
operation_update_global_rejected( LloadOperation *op ); /* diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index f19fb1f1f9..4037bb0716 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -38,6 +38,8 @@ static const sasl_callback_t client_callbacks[] = { }; #endif /* HAVE_CYRUS_SASL */ +static void upstream_unlink( LloadConnection *upstream ); + int forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber ) { @@ -101,13 +103,13 @@ forward_final_response( "connid=%lu msgid=%d finishing up with a request for " "client connid=%lu\n", op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid ); + rc = forward_response( client, op, ber ); - CONNECTION_LOCK(op->o_upstream); + op->o_res = LLOAD_OP_COMPLETED; - if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) { - operation_destroy_from_upstream( op ); + if ( !op->o_pin_id ) { + operation_unlink( op ); } - CONNECTION_UNLOCK(op->o_upstream); return rc; } @@ -177,11 +179,13 @@ handle_one_response( LloadConnection *c ) goto fail; } + CONNECTION_LOCK(c); if ( needle.o_upstream_msgid == 0 ) { return handle_unsolicited( c, ber ); } else if ( !( op = tavl_find( c->c_ops, &needle, operation_upstream_cmp ) ) ) { /* Already abandoned, do nothing */ + CONNECTION_UNLOCK(c); ber_free( ber, 1 ); return rc; /* @@ -190,6 +194,7 @@ handle_one_response( LloadConnection *c ) event_del( c->c_read_event ); */ } else { + CONNECTION_UNLOCK(c); /* op->o_response_pending = ber; */ @@ -239,40 +244,14 @@ handle_one_response( LloadConnection *c ) if ( handler ) { LloadConnection *client; - op->o_upstream_refcnt++; - CONNECTION_UNLOCK_INCREF(c); - ldap_pvt_thread_mutex_lock( &op->o_link_mutex ); client = op->o_client; - if ( client ) { - CONNECTION_LOCK(client); - if ( client->c_live ) { - op->o_client_refcnt++; - CONNECTION_UNLOCK_INCREF(client); - } else { - CONNECTION_UNLOCK(client); - client = NULL; - } - } ldap_pvt_thread_mutex_unlock( &op->o_link_mutex ); - - if ( client ) { + 
if ( client && IS_ALIVE( client, c_refcnt ) ) { rc = handler( client, op, ber ); - CONNECTION_LOCK_DECREF(client); - op->o_client_refcnt--; - if ( !op->o_client_refcnt ) { - operation_destroy_from_client( op ); - } - CONNECTION_UNLOCK_OR_DESTROY(client); } else { ber_free( ber, 1 ); } - - CONNECTION_LOCK_DECREF(c); - op->o_upstream_refcnt--; - if ( !client || !op->o_upstream_refcnt ) { - operation_destroy_from_upstream( op ); - } } else { assert(0); ber_free( ber, 1 ); @@ -284,9 +263,8 @@ fail: "error on processing a response (%s) on upstream connection " "connid=%ld, tag=%lx\n", lload_msgtype2str( tag ), c->c_connid, tag ); - CONNECTION_DESTROY(c); + CONNECTION_LOCK_DESTROY(c); } - /* We leave the connection locked */ return rc; } @@ -459,19 +437,16 @@ upstream_bind_cb( LloadConnection *c ) ber_len_t len; int rc; - CONNECTION_UNLOCK_INCREF(c); if ( ber_peek_tag( ber, &len ) == LDAP_TAG_SASL_RES_CREDS && ber_scanf( ber, "m", &scred ) == LBER_ERROR ) { Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " "sasl bind response malformed\n" ); - CONNECTION_LOCK_DECREF(c); goto fail; } rc = sasl_bind_step( c, &scred, &ccred ); if ( rc != SASL_OK && ( rc != SASL_CONTINUE || result == LDAP_SUCCESS ) ) { - CONNECTION_LOCK_DECREF(c); goto fail; } @@ -482,7 +457,6 @@ upstream_bind_cb( LloadConnection *c ) outber = c->c_pendingber; if ( outber == NULL && (outber = ber_alloc()) == NULL ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - CONNECTION_LOCK_DECREF(c); goto fail; } c->c_pendingber = outber; @@ -496,27 +470,25 @@ upstream_bind_cb( LloadConnection *c ) connection_write_cb( -1, 0, c ); - CONNECTION_LOCK_DECREF(c); if ( rc == SASL_OK ) { BER_BVZERO( &c->c_sasl_bind_mech ); } break; } - CONNECTION_LOCK_DECREF(c); } if ( result == LDAP_SASL_BIND_IN_PROGRESS ) { goto fail; } #endif /* HAVE_CYRUS_SASL */ + CONNECTION_LOCK(c); c->c_pdu_cb = handle_one_response; c->c_state = LLOAD_C_READY; c->c_type = LLOAD_C_OPEN; c->c_read_timeout = NULL; - event_add( c->c_read_event, 
c->c_read_timeout ); Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: " "connid=%lu finished binding, now active\n", c->c_connid ); - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_lock( &b->b_mutex ); LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); b->b_active++; @@ -531,7 +503,6 @@ upstream_bind_cb( LloadConnection *c ) b->b_last_conn = c; backend_retry( b ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - CONNECTION_LOCK_DECREF(c); break; default: Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " @@ -540,12 +511,13 @@ upstream_bind_cb( LloadConnection *c ) goto fail; } + event_add( c->c_read_event, c->c_read_timeout ); ber_free( ber, 1 ); - return LDAP_SUCCESS; + return -1; fail: + CONNECTION_LOCK_DESTROY(c); ber_free( ber, 1 ); - CONNECTION_DESTROY(c); return -1; } @@ -556,9 +528,17 @@ upstream_bind( void *ctx, void *arg ) BerElement *ber; ber_int_t msgid; - CONNECTION_LOCK_DECREF(c); + /* A reference was passed on to us */ + assert( IS_ALIVE( c, c_refcnt ) ); + + if ( !IS_ALIVE( c, c_live ) ) { + RELEASE_REF( c, c_refcnt, c->c_destroy ); + return NULL; + } + + CONNECTION_LOCK(c); c->c_pdu_cb = upstream_bind_cb; - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); ber = c->c_pendingber; @@ -599,16 +579,18 @@ upstream_bind( void *ctx, void *arg ) connection_write_cb( -1, 0, c ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); c->c_read_timeout = lload_timeout_net; event_add( c->c_read_event, c->c_read_timeout ); - CONNECTION_UNLOCK_OR_DESTROY(c); + CONNECTION_UNLOCK(c); + RELEASE_REF( c, c_refcnt, c->c_destroy ); return NULL; fail: ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); CONNECTION_LOCK_DESTROY(c); + RELEASE_REF( c, c_refcnt, c->c_destroy ); return NULL; } @@ -621,6 +603,7 @@ upstream_finish( LloadConnection *c ) LloadBackend *b = c->c_private; int is_bindconn = 0; + assert( c->c_live ); c->c_pdu_cb = handle_one_response; /* Unless we are configured to use the VC exop, consider 
allocating the @@ -675,7 +658,8 @@ upstream_finish( LloadConnection *c ) c->c_connid ); return -1; } - c->c_refcnt++; + /* keep a reference for upstream_bind */ + acquire_ref( &c->c_refcnt ); Debug( LDAP_DEBUG_CONNS, "upstream_finish: " "scheduled a bind callback for connid=%lu\n", @@ -697,6 +681,7 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) { LloadConnection *c = arg; LloadBackend *b; + epoch_t epoch; int rc = LDAP_SUCCESS; CONNECTION_LOCK(c); @@ -737,9 +722,9 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) c->c_connid ); c->c_is_tls = LLOAD_TLS_ESTABLISHED; - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_lock( &b->b_mutex ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); rc = upstream_finish( c ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); @@ -753,14 +738,18 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) "connid=%lu need write rc=%d\n", c->c_connid, rc ); } - CONNECTION_UNLOCK_OR_DESTROY(c); + CONNECTION_UNLOCK(c); return; fail: Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: " "connid=%lu failed rc=%d\n", c->c_connid, rc ); + + assert( c->c_ops == NULL ); + epoch = epoch_join(); CONNECTION_DESTROY(c); + epoch_leave( epoch ); } static int @@ -774,6 +763,7 @@ upstream_starttls( LloadConnection *c ) ber_tag_t tag; c->c_currentber = NULL; + CONNECTION_LOCK(c); if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) { Debug( LDAP_DEBUG_ANY, "upstream_starttls: " @@ -824,9 +814,9 @@ upstream_starttls( LloadConnection *c ) } c->c_is_tls = LLOAD_CLEARTEXT; - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_lock( &b->b_mutex ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); rc = upstream_finish( c ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); @@ -836,7 +826,7 @@ upstream_starttls( LloadConnection *c ) } ber_free( ber, 1 ); - CONNECTION_UNLOCK_OR_DESTROY(c); + CONNECTION_UNLOCK(c); return rc; } @@ -884,6 +874,7 @@ 
upstream_init( ber_socket_t s, LloadBackend *b ) return NULL; } + CONNECTION_LOCK(c); c->c_private = b; c->c_is_tls = b->b_tls; c->c_pdu_cb = handle_one_response; @@ -939,14 +930,15 @@ upstream_init( ber_socket_t s, LloadBackend *b ) ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); c->c_pdu_cb = upstream_starttls; - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); connection_write_cb( s, 0, c ); - CONNECTION_LOCK_DECREF(c); + CONNECTION_LOCK(c); } event_add( c->c_read_event, c->c_read_timeout ); c->c_destroy = upstream_destroy; - CONNECTION_UNLOCK_OR_DESTROY(c); + c->c_unlink = upstream_unlink; + CONNECTION_UNLOCK(c); return c; @@ -961,46 +953,39 @@ fail: } c->c_state = LLOAD_C_INVALID; - CONNECTION_DESTROY(c); - assert( c == NULL ); + c->c_live--; + c->c_refcnt--; + connection_destroy( c ); return NULL; } -void -upstream_destroy( LloadConnection *c ) +static void +upstream_unlink( LloadConnection *c ) { LloadBackend *b = c->c_private; struct event *read_event, *write_event; - TAvlnode *root, *node; + TAvlnode *root; long freed, executing; - enum sc_state state; - Debug( LDAP_DEBUG_CONNS, "upstream_destroy: " - "freeing connection connid=%lu\n", + Debug( LDAP_DEBUG_CONNS, "upstream_unlink: " + "removing upstream connid=%lu\n", c->c_connid ); assert( c->c_state != LLOAD_C_INVALID ); - state = c->c_state; - c->c_state = LLOAD_C_INVALID; + assert( c->c_state != LLOAD_C_DYING ); + + c->c_state = LLOAD_C_DYING; + + read_event = c->c_read_event; + write_event = c->c_write_event; root = c->c_ops; c->c_ops = NULL; executing = c->c_n_ops_executing; c->c_n_ops_executing = 0; - read_event = c->c_read_event; - write_event = c->c_write_event; - - for ( node = tavl_end( root, TAVL_DIR_LEFT ); node; - node = tavl_next( node, TAVL_DIR_RIGHT ) ) { - LloadOperation *op = node->avl_data; - - op->o_res = LLOAD_OP_FAILED; - op->o_upstream_refcnt++; - } - - CONNECTION_UNLOCK_INCREF(c); + CONNECTION_UNLOCK(c); freed = tavl_free( root, (AVL_FREE)operation_lost_upstream ); assert( freed 
== executing ); @@ -1018,44 +1003,53 @@ upstream_destroy( LloadConnection *c ) event_del( write_event ); } - /* Remove from the backend on first pass */ - if ( state != LLOAD_C_DYING ) { - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - if ( c->c_type == LLOAD_C_PREPARING ) { - LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); - b->b_opening--; - b->b_failed++; - } else if ( c->c_type == LLOAD_C_BIND ) { - if ( c == b->b_last_bindconn ) { - LloadConnection *prev = - LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next ); - if ( prev == c ) { - b->b_last_bindconn = NULL; - } else { - b->b_last_bindconn = prev; - } + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + if ( c->c_type == LLOAD_C_PREPARING ) { + LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); + b->b_opening--; + b->b_failed++; + } else if ( c->c_type == LLOAD_C_BIND ) { + if ( c == b->b_last_bindconn ) { + LloadConnection *prev = + LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next ); + if ( prev == c ) { + b->b_last_bindconn = NULL; + } else { + b->b_last_bindconn = prev; } - LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next ); - b->b_bindavail--; - } else { - if ( c == b->b_last_conn ) { - LloadConnection *prev = - LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next ); - if ( prev == c ) { - b->b_last_conn = NULL; - } else { - b->b_last_conn = prev; - } - } - LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next ); - b->b_active--; } - b->b_n_ops_executing -= executing; - backend_retry( b ); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next ); + b->b_bindavail--; + } else { + if ( c == b->b_last_conn ) { + LloadConnection *prev = + LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next ); + if ( prev == c ) { + b->b_last_conn = NULL; + } else { + b->b_last_conn = prev; + } + } + LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next ); + b->b_active--; } + b->b_n_ops_executing -= executing; + backend_retry( b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - CONNECTION_LOCK_DECREF(c); + 
CONNECTION_LOCK(c); +} + +void +upstream_destroy( LloadConnection *c ) +{ + Debug( LDAP_DEBUG_CONNS, "upstream_destroy: " + "freeing connection connid=%lu\n", + c->c_connid ); + + CONNECTION_LOCK(c); + assert( c->c_state == LLOAD_C_DYING ); + c->c_state = LLOAD_C_INVALID; if ( c->c_read_event ) { event_free( c->c_read_event ); @@ -1067,21 +1061,8 @@ upstream_destroy( LloadConnection *c ) c->c_write_event = NULL; } - /* - * If we attempted to destroy any operations, we might have lent a new - * refcnt token for a thread that raced us to that, let them call us again - * later - */ - assert( c->c_refcnt >= 0 ); - if ( c->c_refcnt ) { - c->c_state = LLOAD_C_DYING; - Debug( LDAP_DEBUG_CONNS, "upstream_destroy: " - "connid=%lu aborting with refcnt=%d\n", - c->c_connid, c->c_refcnt ); - CONNECTION_UNLOCK(c); - return; + if ( c->c_type != LLOAD_C_BIND ) { + BER_BVZERO( &c->c_sasl_bind_mech ); } - - BER_BVZERO( &c->c_sasl_bind_mech ); connection_destroy( c ); }