diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 9309abbe5d..43bd05e386 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -123,6 +123,7 @@ backend_select( Operation *op ) LDAP_LIST_FOREACH( c, head, c_next ) { ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + CONNECTION_LOCK(c); if ( c->c_state == SLAP_C_READY && !c->c_pendingber && ( b->b_max_conn_pending == 0 || c->c_n_ops_executing < b->b_max_conn_pending ) ) { @@ -132,9 +133,11 @@ backend_select( Operation *op ) b->b_n_ops_executing++; c->c_n_ops_executing++; + CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); return c; } + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); } ldap_pvt_thread_mutex_unlock( &b->b_mutex ); diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c index 09a4b5661b..077167a594 100644 --- a/servers/lloadd/bind.c +++ b/servers/lloadd/bind.c @@ -25,7 +25,8 @@ #include "slap.h" /* - * We hold op->o_upstream->c_io_mutex on entering the function. + * On entering the function, we've put a reference on both connections and hold + * upstream's c_io_mutex. */ static int request_bind( Operation *op ) @@ -65,7 +66,7 @@ request_bind( Operation *op ) goto fail; } - ldap_pvt_thread_mutex_lock( &client->c_mutex ); + CONNECTION_LOCK(client); if ( !BER_BVISNULL( &client->c_auth ) ) { ch_free( client->c_auth.bv_val ); } @@ -81,9 +82,9 @@ request_bind( Operation *op ) } else { BER_BVZERO( &client->c_auth ); } - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + CONNECTION_UNLOCK(client); - ldap_pvt_thread_mutex_lock( &upstream->c_mutex ); + CONNECTION_LOCK(upstream); op->o_upstream_msgid = upstream->c_next_msgid++; ber_printf( ber, "t{titOtO}", LDAP_TAG_MESSAGE, @@ -99,7 +100,7 @@ request_bind( Operation *op ) avl_dup_error ) ) { assert(0); } - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_UNLOCK(upstream); ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); @@ -112,13 +113,12 @@ fail: ber_free( copy, 0 ); } ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); - ldap_pvt_thread_mutex_lock( &op->o_client->c_mutex ); - client_destroy( op->o_client ); return 1; } /* - * We hold op->o_upstream->c_io_mutex on entering the function. + * On entering the function, we've put a reference on both connections and hold + * upstream's c_io_mutex. */ static int request_bind_as_vc( Operation *op ) @@ -166,7 +166,7 @@ request_bind_as_vc( Operation *op ) op->o_upstream_msgid = upstream->c_next_msgid++; - ldap_pvt_thread_mutex_lock( &upstream->c_mutex ); + CONNECTION_LOCK(upstream); ber_printf( ber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE, LDAP_TAG_MSGID, op->o_upstream_msgid, LDAP_REQ_EXTENDED, @@ -175,23 +175,23 @@ request_bind_as_vc( Operation *op ) LDAP_TAG_EXOP_VERIFY_CREDENTIALS_COOKIE, BER_BV_OPTIONAL( &upstream->c_vc_cookie ), &binddn, tag, &auth, LDAP_TAG_EXOP_VERIFY_CREDENTIALS_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) ); - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_UNLOCK(upstream); tag = ber_peek_tag( copy, &len ); switch ( tag ) { case LDAP_AUTH_SASL: ber_get_stringbv( copy, &mech, LBER_BV_NOTERM ); - ldap_pvt_thread_mutex_lock( &client->c_mutex ); + CONNECTION_LOCK(client); if ( ber_bvcmp( &mech, &client->c_sasl_bind_mech ) ) { ber_memfree( client->c_sasl_bind_mech.bv_val ); ber_dupbv( &client->c_sasl_bind_mech, &mech ); } - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + CONNECTION_UNLOCK(client); /* TODO: extract authzdn from the message */ break; case LDAP_AUTH_SIMPLE: - ldap_pvt_thread_mutex_lock( &client->c_mutex ); + CONNECTION_LOCK(client); if ( !BER_BVISNULL( &client->c_auth ) ) { ch_free( client->c_auth.bv_val ); } @@ -211,7 +211,7 @@ request_bind_as_vc( Operation *op ) ber_memfree( client->c_sasl_bind_mech.bv_val ); BER_BVZERO( &client->c_sasl_bind_mech ); } - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + CONNECTION_UNLOCK(client); break; default: result = LDAP_PROTOCOL_ERROR; @@ -219,7 +219,7 @@ request_bind_as_vc( Operation *op ) goto fail; } - ldap_pvt_thread_mutex_lock( &upstream->c_mutex ); + CONNECTION_LOCK(upstream); Debug( LDAP_DEBUG_TRACE, "request_bind_as_vc: " "added bind from client connid=%lu to upstream connid=%lu as VC " "exop msgid=%d\n", @@ -228,12 +228,13 @@ request_bind_as_vc( Operation *op ) avl_dup_error ) ) { assert(0); } - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_UNLOCK(upstream); ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); ber_free( copy, 0 ); upstream_write_cb( -1, 0, upstream ); + return 0; fail: @@ -242,8 +243,6 @@ fail: } ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); operation_send_reject( op, result, msg, 1 ); - ldap_pvt_thread_mutex_lock( &client->c_mutex ); - client_destroy( client ); return 1; } @@ -255,7 +254,7 @@ client_reset( void *ctx, void *arg ) TAvlnode *root; int freed, destroy = 1; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); root = c->c_ops; c->c_ops = NULL; c->c_state = SLAP_C_CLOSING; @@ -271,7 +270,7 @@ client_reset( void *ctx, void *arg ) ch_free( c->c_sasl_bind_mech.bv_val ); BER_BVZERO( &c->c_sasl_bind_mech ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK_INCREF(c); tavl_delete( &root, op, operation_client_cmp ); freed = tavl_free( root, (AVL_FREE)operation_abandon ); @@ -282,8 +281,10 @@ client_reset( void *ctx, void *arg ) if ( destroy ) { operation_destroy( op ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); - client_destroy( c ); + CLIENT_LOCK_DESTROY(c); + } else { + CONNECTION_LOCK_DECREF(c); + CLIENT_UNLOCK_OR_DESTROY(c); } return NULL; @@ -296,6 +297,9 @@ client_bind( void *ctx, void *arg ) Connection *upstream, *client = op->o_client; int rc = 0; + CONNECTION_LOCK(client); + CONNECTION_UNLOCK_INCREF(client); + client_reset( ctx, arg ); upstream = backend_select( op ); @@ -304,6 +308,8 @@ client_bind( void *ctx, void *arg ) "no available connection found\n" ); operation_send_reject( op, LDAP_UNAVAILABLE, "no connections available", 1 ); + CONNECTION_LOCK_DECREF(client); + CLIENT_UNLOCK_OR_DESTROY(client); return NULL; } @@ -315,15 +321,18 @@ client_bind( void *ctx, void *arg ) rc = request_bind( op ); } + CONNECTION_LOCK_DECREF(upstream); + UPSTREAM_UNLOCK_OR_DESTROY(upstream); + if ( rc ) { - /* client doesn't exist anymore */ + CLIENT_LOCK_DESTROY(client); return NULL; } - ldap_pvt_thread_mutex_lock( &client->c_mutex ); + CONNECTION_LOCK_DECREF(client); rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + CLIENT_UNLOCK_OR_DESTROY(client); return NULL; } diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 986c5e8563..6838508707 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -34,7 +34,10 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) ber_len_t len; int rc = 0; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + /* What if the shutdown is already in progress and we get to lock the + * connection? */ + CONNECTION_LOCK(c); + Debug( LDAP_DEBUG_CONNS, "client_read_cb: " "connection %lu ready to read\n", c->c_connid ); @@ -43,6 +46,7 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) if ( ber == NULL && (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "client_read_cb: " "ber_alloc failed\n" ); + CLIENT_DESTROY(c); goto fail; } @@ -57,10 +61,11 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); c->c_currentber = NULL; + CLIENT_DESTROY(c); goto fail; } c->c_currentber = ber; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return; } @@ -70,21 +75,21 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) if ( !op ) { Debug( LDAP_DEBUG_ANY, "client_read_cb: " "operation_init failed\n" ); - ber_free( ber, 1 ); + CLIENT_DESTROY(c); goto fail; } switch ( op->o_tag ) { case LDAP_REQ_UNBIND: - /* We do not expect anything more from the client */ + /* We do not expect anything more from the client. Also, we are the + * read event, so don't need to unlock */ event_del( c->c_read_event ); rc = ldap_pvt_thread_pool_submit( &connection_pool, client_reset, op ); if ( rc ) { - tavl_delete( &c->c_ops, op, operation_client_cmp ); - operation_destroy( op ); - client_destroy( c ); + CONNECTION_UNLOCK(c); + client_reset( NULL, op ); return; } break; @@ -94,7 +99,7 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) break; default: if ( c->c_state == SLAP_C_BINDING ) { - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 ); return; @@ -104,8 +109,11 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) break; } + /* FIXME: unlocks in this function need more thought when we refcount + * operations */ + CONNECTION_UNLOCK(c); + if ( !rc ) { - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return; } @@ -113,13 +121,11 @@ fail: if ( op ) { operation_send_reject( op, LDAP_OTHER, "server error or overloaded", 1 ); - op->o_client = NULL; operation_destroy( op ); } else if ( ber ) { ber_free( ber, 1 ); } - client_destroy( c ); return; } @@ -128,6 +134,11 @@ client_write_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; + /* What if the shutdown is already in progress and we get to lock the + * connection? */ + CONNECTION_LOCK(c); + CONNECTION_UNLOCK_INCREF(c); + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); Debug( LDAP_DEBUG_CONNS, "client_write_cb: " "have something to write to client %lu\n", @@ -136,15 +147,17 @@ client_write_cb( evutil_socket_t s, short what, void *arg ) if ( ber_flush( c->c_sb, c->c_pendingber, 1 ) ) { int err = sock_errno(); if ( err != EWOULDBLOCK && err != EAGAIN ) { - ldap_pvt_thread_mutex_lock( &c->c_mutex ); ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - client_destroy( c ); + CLIENT_LOCK_DESTROY(c); return; } event_add( c->c_write_event, NULL ); } c->c_pendingber = NULL; ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + CONNECTION_LOCK_DECREF(c); + CLIENT_UNLOCK_OR_DESTROY(c); } Connection * @@ -183,7 +196,7 @@ client_init( c->c_write_event = event; c->c_private = listener; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return c; fail: diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c index 47752eefec..b1720c1efe 100644 --- a/servers/lloadd/connection.c +++ b/servers/lloadd/connection.c @@ -58,7 +58,10 @@ connection_destroy( Connection *c ) "destroying connection %lu.\n", c->c_connid ); + assert( c->c_live == 0 ); + assert( c->c_refcnt == 0 ); assert( c->c_state == SLAP_C_INVALID ); + ber_sockbuf_free( c->c_sb ); if ( c->c_currentber ) { @@ -68,7 +71,7 @@ connection_destroy( Connection *c ) ber_free( c->c_pendingber, 1 ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_destroy( &c->c_io_mutex ); ldap_pvt_thread_mutex_destroy( &c->c_mutex ); @@ -148,6 +151,7 @@ connection_init( ber_socket_t s, const char *peername, int flags ) #endif c->c_next_msgid = 1; + c->c_refcnt = c->c_live = 1; ldap_pvt_thread_mutex_init( &c->c_mutex ); ldap_pvt_thread_mutex_init( &c->c_io_mutex ); @@ -158,7 +162,7 @@ connection_init( ber_socket_t s, const char *peername, int flags ) "connection connid=%lu allocated for socket fd=%d\n", c->c_connid, s ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); c->c_state = SLAP_C_ACTIVE; return c; diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index b8f01d4066..c0af76e767 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -114,23 +114,23 @@ operation_destroy( Operation *op ) * something in to test with until we implement the freelist */ if ( op->o_client ) { c = op->o_client; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); if ( tavl_delete( &c->c_ops, op, operation_client_cmp ) ) { c->c_n_ops_executing--; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); } if ( op->o_upstream ) { Backend *b = NULL; c = op->o_upstream; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) ) { c->c_n_ops_executing--; b = (Backend *)c->c_private; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); if ( b ) { ldap_pvt_thread_mutex_lock( &b->b_mutex ); @@ -142,6 +142,9 @@ operation_destroy( Operation *op ) ch_free( op ); } +/* + * Entered holding c_mutex for now. + */ Operation * operation_init( Connection *c, BerElement *ber ) { @@ -208,13 +211,13 @@ operation_abandon( Operation *op ) BerElement *ber; Backend *b; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ); if ( !rc ) { c->c_n_ops_executing--; } b = (Backend *)c->c_private; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK_INCREF(c); if ( rc ) { /* The operation has already been abandoned or finished */ @@ -231,7 +234,9 @@ operation_abandon( Operation *op ) if ( ber == NULL && (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "operation_abandon: " "ber_alloc failed\n" ); + CONNECTION_LOCK_DECREF(c); ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + UPSTREAM_UNLOCK_OR_DESTROY(c); goto done; } c->c_pendingber = ber; @@ -244,7 +249,9 @@ operation_abandon( Operation *op ) ber_free( ber, 1 ); } + CONNECTION_LOCK_DECREF(c); ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + UPSTREAM_UNLOCK_OR_DESTROY(c); if ( rc != -1 ) { upstream_write_cb( -1, 0, c ); @@ -270,20 +277,20 @@ operation_send_reject( "rejecting %s from client %lu with message: \"%s\"\n", slap_msgtype2str( op->o_tag ), op->o_client_connid, msg ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - if ( !found && !send_anyway ) { + CONNECTION_UNLOCK(c); return; } + CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); ber = c->c_pendingber; if ( ber == NULL && (ber = ber_alloc()) == NULL ) { ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - client_destroy( c ); + CLIENT_LOCK_DESTROY(c); return; } c->c_pendingber = ber; @@ -297,6 +304,9 @@ operation_send_reject( client_write_cb( -1, 0, c ); operation_destroy( op ); + + CONNECTION_LOCK_DECREF(c); + CLIENT_UNLOCK_OR_DESTROY(c); } void @@ -304,6 +314,7 @@ operation_lost_upstream( Operation *op ) { operation_send_reject( op, LDAP_UNAVAILABLE, "connection to the remote server has been severed", 0 ); + operation_destroy( op ); } void * @@ -330,15 +341,15 @@ request_process( void *ctx, void *arg ) } upstream->c_pendingber = output; - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_LOCK_DECREF(upstream); op->o_upstream_msgid = msgid = upstream->c_next_msgid++; rc = tavl_insert( &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error ); - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_UNLOCK_INCREF(upstream); assert( rc == LDAP_SUCCESS ); if ( lload_features & LLOAD_FEATURE_PROXYAUTHZ ) { - ldap_pvt_thread_mutex_lock( &client->c_mutex ); + CONNECTION_LOCK(client); Debug( LDAP_DEBUG_TRACE, "request_process: " "proxying identity %s to upstream\n", client->c_auth.bv_val ); @@ -347,7 +358,7 @@ request_process( void *ctx, void *arg ) op->o_tag, &op->o_request, LDAP_TAG_CONTROLS, LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth ); - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + CONNECTION_UNLOCK(client); if ( !BER_BVISNULL( &op->o_ctrls ) ) { BerElement *control_ber = ber_alloc(); @@ -373,11 +384,16 @@ request_process( void *ctx, void *arg ) upstream_write_cb( -1, 0, upstream ); + CONNECTION_LOCK_DECREF(upstream); + UPSTREAM_UNLOCK_OR_DESTROY(upstream); + return NULL; fail: if ( upstream ) { ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex ); + CONNECTION_LOCK_DECREF(upstream); + UPSTREAM_UNLOCK_OR_DESTROY(upstream); } operation_send_reject( op, LDAP_OTHER, "internal error", 0 ); return NULL; diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h index fba2a68907..ef7dd51507 100644 --- a/servers/lloadd/slap.h +++ b/servers/lloadd/slap.h @@ -283,8 +283,74 @@ struct Connection { enum sc_type c_type; ber_socket_t c_fd; +/* + * Connection reference counting: + * - connection has a reference counter in c_refcnt + * - also a liveness/validity token is added to c_refcnt during + * connection_init, its existence is tracked in c_live and is usually the + * only one that prevents it from being destroyed + * - anyone who needs to be able to lock the connection after unlocking it has + * to use CONNECTION_UNLOCK_INCREF, they are then responsible that + * CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are + * done with it + * - when a connection is considered dead, use (UPSTREAM|CLIENT)_DESTROY on a + * locked connection, it might get disposed of or if anyone still holds a + * token, it just gets unlocked and it's the last token holder's + * responsibility to run *_UNLOCK_OR_DESTROY + * - (UPSTREAM|CLIENT)_LOCK_DESTROY is a shorthand for locking, decreasing + * refcount and (UPSTREAM|CLIENT)_DESTROY + */ ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */ - Sockbuf *c_sb; /* ber connection stuff */ + int c_refcnt, c_live; +#define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex ) +#define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex ) +#define CONNECTION_LOCK_DECREF(c) \ + do { \ + CONNECTION_LOCK(c); \ + (c)->c_refcnt--; \ + } while (0) +#define CONNECTION_UNLOCK_INCREF(c) \ + do { \ + (c)->c_refcnt++; \ + CONNECTION_UNLOCK(c); \ + } while (0) +#define CONNECTION_UNLOCK_OR_DESTROY(type, c) \ + do { \ + assert( (c)->c_refcnt >= 0 ); \ + if ( !( c )->c_refcnt ) { \ + Debug( LDAP_DEBUG_TRACE, "%s: destroying " #type " connection connid=%lu\n", \ + __func__, (c)->c_connid ); \ + type##_destroy( (c) ); \ + (c) = NULL; \ + } else { \ + CONNECTION_UNLOCK(c); \ + } \ + } while (0) +#define CONNECTION_DESTROY(type, c) \ + do { \ + (c)->c_refcnt -= (c)->c_live; \ + (c)->c_live = 0; \ + CONNECTION_UNLOCK_OR_DESTROY(type, c); \ + } while (0) + +#define UPSTREAM_UNLOCK_OR_DESTROY(c) \ + CONNECTION_UNLOCK_OR_DESTROY(upstream, c); +#define UPSTREAM_DESTROY(c) CONNECTION_DESTROY(upstream, c) +#define UPSTREAM_LOCK_DESTROY(c) \ + do { \ + CONNECTION_LOCK_DECREF(c); \ + UPSTREAM_DESTROY(c); \ + } while (0); + +#define CLIENT_UNLOCK_OR_DESTROY(c) CONNECTION_UNLOCK_OR_DESTROY(client, c); +#define CLIENT_DESTROY(c) CONNECTION_DESTROY(client, c) +#define CLIENT_LOCK_DESTROY(c) \ + do { \ + CONNECTION_LOCK_DECREF(c); \ + CLIENT_DESTROY(c); \ + } while (0); + + Sockbuf *c_sb; /* ber connection stuff */ /* set by connection_init */ unsigned long c_connid; /* unique id of this connection */ @@ -294,6 +360,7 @@ struct Connection { time_t c_activitytime; /* when the connection was last used */ ber_int_t c_next_msgid; /* msgid of the next message */ + /* must not be used while holding either mutex */ struct event *c_read_event, *c_write_event; /* can only be changed by binding thread */ diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index a37fe1c3f2..2f898545dc 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -112,7 +112,7 @@ handle_bind_response( Operation *op, BerElement *ber ) break; case LDAP_SUCCESS: default: { - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); c->c_state = SLAP_C_READY; if ( result != LDAP_SUCCESS ) { ber_memfree( c->c_auth.bv_val ); @@ -122,7 +122,7 @@ handle_bind_response( Operation *op, BerElement *ber ) ber_memfree( c->c_sasl_bind_mech.bv_val ); BER_BVZERO( &c->c_sasl_bind_mech ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); break; } } @@ -159,14 +159,14 @@ handle_vc_bind_response( Operation *op, BerElement *ber ) Connection *upstream = op->o_upstream; Backend *b; - ldap_pvt_thread_mutex_lock( &upstream->c_mutex ); + CONNECTION_LOCK(upstream); b = (Backend *)upstream->c_private; Debug( LDAP_DEBUG_ANY, "VC extended operation not supported on backend %s\n", b->b_bindconf.sb_uri.bv_val ); - ldap_pvt_thread_mutex_unlock( &upstream->c_mutex ); + CONNECTION_UNLOCK(upstream); } - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); Debug( LDAP_DEBUG_CONNS, "handle_vc_bind_response: " "received response for bind request by client %lu, result=%d\n", @@ -179,7 +179,7 @@ handle_vc_bind_response( Operation *op, BerElement *ber ) tag = ber_scanf( ber, "o", &c->c_vc_cookie ); if ( tag == LBER_ERROR ) { rc = -1; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); goto done; } tag = ber_peek_tag( ber, &len ); @@ -189,7 +189,7 @@ handle_vc_bind_response( Operation *op, BerElement *ber ) tag = ber_scanf( ber, "m", &creds ); if ( tag == LBER_ERROR ) { rc = -1; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); goto done; } tag = ber_peek_tag( ber, &len ); @@ -199,7 +199,7 @@ handle_vc_bind_response( Operation *op, BerElement *ber ) tag = ber_scanf( ber, "m", &controls ); if ( tag == LBER_ERROR ) { rc = -1; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); goto done; } } @@ -225,7 +225,7 @@ handle_vc_bind_response( Operation *op, BerElement *ber ) break; } } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); output = c->c_pendingber; @@ -258,11 +258,8 @@ static int handle_unsolicited( Connection *c, BerElement *ber ) { TAvlnode *root; - Backend *b; long freed, executing; - b = (Backend *)c->c_private; - Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: " "teardown for upstream connection %lu\n", c->c_connid ); @@ -270,8 +267,7 @@ handle_unsolicited( Connection *c, BerElement *ber ) root = c->c_ops; c->c_ops = NULL; executing = c->c_n_ops_executing; - c->c_n_ops_executing = 0; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK_INCREF(c); freed = tavl_free( root, (AVL_FREE)operation_lost_upstream ); assert( freed == executing ); @@ -279,14 +275,9 @@ handle_unsolicited( Connection *c, BerElement *ber ) "dropped %ld operations\n", freed ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); - upstream_destroy( c ); + UPSTREAM_LOCK_DESTROY(c); ber_free( ber, 1 ); - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing -= executing; - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - return -1; } @@ -321,7 +312,7 @@ handle_one_response( Connection *c ) OperationHandler handler = NULL; ber_tag_t tag; ber_len_t len; - int rc = 0; + int rc = LDAP_SUCCESS; ber = c->c_currentber; c->c_currentber = NULL; @@ -379,19 +370,20 @@ handle_one_response( Connection *c ) c->c_connid, slap_msgtype2str( tag ), needle.o_upstream_msgid ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK_INCREF(c); if ( handler ) { rc = handler( op, ber ); } - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK_DECREF(c); fail: if ( rc ) { Debug( LDAP_DEBUG_ANY, "handle_one_response: " "error on processing a response on upstream connection %ld\n", c->c_connid ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); } + /* We leave the connection locked */ return rc; } @@ -413,15 +405,18 @@ handle_responses( void *ctx, void *arg ) Connection *c = arg; int responses_handled = 0; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); for ( ; responses_handled < slap_conn_max_pdus_per_cycle; responses_handled++ ) { BerElement *ber; ber_tag_t tag; ber_len_t len; + /* handle_one_response may unlock the connection in the process, we + * need to expect that might be our responsibility to destroy it */ if ( handle_one_response( c ) ) { - /* Error, connection might already have been destroyed */ + /* Error, connection is unlocked and might already have been + * destroyed */ return NULL; } /* Otherwise, handle_one_response leaves the connection locked */ @@ -429,7 +424,7 @@ handle_responses( void *ctx, void *arg ) if ( (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "handle_responses: " "ber_alloc failed\n" ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); return NULL; } c->c_currentber = ber; @@ -447,7 +442,7 @@ handle_responses( void *ctx, void *arg ) c->c_currentber = NULL; ber_free( ber, 1 ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); return NULL; } break; @@ -455,7 +450,7 @@ handle_responses( void *ctx, void *arg ) } event_add( c->c_read_event, NULL ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + UPSTREAM_UNLOCK_OR_DESTROY(c); return NULL; } @@ -474,7 +469,7 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) ber_tag_t tag; ber_len_t len; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: " "connection %lu ready to read\n", c->c_connid ); @@ -483,7 +478,7 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) if ( ber == NULL && (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "upstream_read_cb: " "ber_alloc failed\n" ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + UPSTREAM_DESTROY(c); return; } c->c_currentber = ber; @@ -500,11 +495,11 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) c->c_currentber = NULL; ber_free( ber, 1 ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); return; } event_add( c->c_read_event, NULL ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return; } @@ -512,19 +507,22 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) ldap_pvt_thread_pool_submit( &connection_pool, handle_responses, c ) ) { /* If we're overloaded or configured as such, process one and resume in - * the next cycle */ + * the next cycle. + * + * handle_one_response re-locks the mutex in the + * process, need to test it's still alive */ if ( handle_one_response( c ) == LDAP_SUCCESS ) { - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + UPSTREAM_UNLOCK_OR_DESTROY(c); } return; } event_del( c->c_read_event ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return; } -void +int upstream_finish( Connection *c ) { struct event_base *base; @@ -541,7 +539,7 @@ upstream_finish( Connection *c ) if ( !event ) { Debug( LDAP_DEBUG_ANY, "upstream_finish: " "Read event could not be allocated\n" ); - goto fail; + return -1; } event_add( event, NULL ); if ( c->c_read_event ) { @@ -552,19 +550,7 @@ upstream_finish( Connection *c ) c->c_state = SLAP_C_READY; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - return; -fail: - if ( c->c_write_event ) { - event_del( c->c_write_event ); - event_free( c->c_write_event ); - } - if ( c->c_read_event ) { - event_del( c->c_read_event ); - event_free( c->c_read_event ); - } - upstream_destroy( c ); - return; + return 0; } void @@ -577,7 +563,7 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) ber_len_t len; ber_int_t msgid, result; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: " "connection %lu ready to read\n", c->c_connid ); @@ -586,7 +572,7 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) if ( ber == NULL && (ber = ber_alloc()) == NULL ) { Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " "ber_alloc failed\n" ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return; } c->c_currentber = ber; @@ -604,7 +590,7 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) c->c_currentber = NULL; goto fail; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return; } c->c_currentber = NULL; @@ -631,7 +617,9 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) switch ( result ) { case LDAP_SUCCESS: - upstream_finish( c ); + if ( upstream_finish( c ) ) { + goto fail; + } break; #ifdef HAVE_CYRUS_SASL case LDAP_SASL_BIND_IN_PROGRESS: @@ -644,14 +632,14 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) goto fail; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); ber_free( ber, 1 ); return; fail: ber_free( ber, 1 ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); } void @@ -659,6 +647,9 @@ upstream_write_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; + CONNECTION_LOCK(c); + CONNECTION_UNLOCK_INCREF(c); + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); Debug( LDAP_DEBUG_CONNS, "upstream_write_cb: " "have something to write to upstream %lu\n", @@ -667,18 +658,20 @@ upstream_write_cb( evutil_socket_t s, short what, void *arg ) if ( ber_flush( c->c_sb, c->c_pendingber, 1 ) ) { int err = sock_errno(); if ( err != EWOULDBLOCK && err != EAGAIN ) { - ldap_pvt_thread_mutex_lock( &c->c_mutex ); Debug( LDAP_DEBUG_ANY, "upstream_write_cb: " "error writing to connection %ld\n", c->c_connid ); ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - upstream_destroy( c ); + UPSTREAM_LOCK_DESTROY(c); return; } event_add( c->c_write_event, NULL ); } c->c_pendingber = NULL; ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + CONNECTION_LOCK_DECREF(c); + UPSTREAM_UNLOCK_OR_DESTROY(c); } void * @@ -694,7 +687,7 @@ upstream_bind( void *ctx, void *arg ) assert( ber ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + CONNECTION_LOCK(c); b = c->c_private; s = c->c_fd; base = slap_get_base( s ); @@ -703,7 +696,7 @@ upstream_bind( void *ctx, void *arg ) if ( !event ) { Debug( LDAP_DEBUG_ANY, "upstream_bind: " "Read event could not be allocated\n" ); - upstream_destroy( c ); + UPSTREAM_DESTROY(c); return NULL; } event_add( event, NULL ); @@ -715,7 +708,7 @@ upstream_bind( void *ctx, void *arg ) msgid = c->c_next_msgid++; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK_INCREF(c); ldap_pvt_thread_mutex_lock( &b->b_mutex ); if ( b->b_bindconf.sb_method == LDAP_AUTH_SIMPLE ) { @@ -742,6 +735,9 @@ upstream_bind( void *ctx, void *arg ) upstream_write_cb( -1, 0, c ); + CONNECTION_LOCK_DECREF(c); + UPSTREAM_UNLOCK_OR_DESTROY(c); + return NULL; } @@ -786,7 +782,9 @@ upstream_init( ber_socket_t s, Backend *b ) } if ( is_bindconn || b->b_bindconf.sb_method == LDAP_AUTH_NONE ) { - upstream_finish( c ); + if ( upstream_finish( c ) ) { + goto fail; + } } else { ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c ); } @@ -800,7 +798,7 @@ upstream_init( ber_socket_t s, Backend *b ) b->b_active++; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + CONNECTION_UNLOCK(c); return c; fail: @@ -812,7 +810,7 @@ fail: event_del( c->c_read_event ); event_free( c->c_read_event ); } - connection_destroy( c ); + UPSTREAM_DESTROY(c); return NULL; } @@ -820,6 +818,7 @@ void upstream_destroy( Connection *c ) { Backend *b = c->c_private; + struct event *read_event, *write_event; Debug( LDAP_DEBUG_CONNS, "upstream_destroy: " "freeing connection %lu\n", @@ -827,7 +826,21 @@ upstream_destroy( Connection *c ) assert( c->c_state != SLAP_C_INVALID ); c->c_state = SLAP_C_INVALID; - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + read_event = c->c_read_event; + write_event = c->c_write_event; + CONNECTION_UNLOCK_INCREF(c); + + /* + * Avoid a deadlock: + * event_del will block if the event is currently executing its callback, + * that callback might be waiting to lock c->c_mutex + */ + event_del( read_event ); + event_free( read_event ); + + event_del( write_event ); + event_free( write_event ); ldap_pvt_thread_mutex_lock( &b->b_mutex ); LDAP_LIST_REMOVE( c, c_next ); @@ -840,13 +853,7 @@ upstream_destroy( Connection *c ) ldap_pvt_thread_mutex_unlock( &b->b_mutex ); backend_retry( b ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); - - event_del( c->c_read_event ); - event_free( c->c_read_event ); - - event_del( c->c_write_event ); - event_free( c->c_write_event ); + CONNECTION_LOCK_DECREF(c); connection_destroy( c ); }