diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 534277b2c2..3a6d96234b 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -32,25 +32,26 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) { Backend *b = arg; Connection *c; - ber_socket_t s; + ber_socket_t s = AC_SOCKET_INVALID; int rc; + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + if ( result || !res ) { Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " "name resolution failed for backend '%s': %s\n", b->b_bindconf.sb_uri.bv_val, evutil_gai_strerror( result ) ); - return; + goto fail; } - s = socket( res->ai_family, SOCK_STREAM, 0 ); - if ( s == AC_SOCKET_INVALID ) { - return; + /* TODO: if we get failures, try the other addrinfos */ + if ( (s = socket( res->ai_family, SOCK_STREAM, 0 )) == + AC_SOCKET_INVALID ) { + goto fail; } - rc = ber_pvt_socket_set_nonblock( s, 1 ); - if ( rc ) { - evutil_closesocket( s ); - return; + if ( ber_pvt_socket_set_nonblock( s, 1 ) ) { + goto fail; } if ( res->ai_family == PF_INET ) { @@ -66,14 +67,21 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " "failed to connect to server '%s'\n", b->b_bindconf.sb_uri.bv_val ); - evutil_closesocket( s ); - return; + goto fail; } c = upstream_init( s, b ); - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_conns = c; ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + backend_retry( b ); + return; + +fail: + if ( s != AC_SOCKET_INVALID ) { + evutil_closesocket( s ); + } + b->b_opening--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + backend_retry( b ); } Connection * @@ -81,48 +89,95 @@ backend_select( Operation *op ) { Backend *b; + /* TODO: Two runs, one with trylock, then one actually locked if we don't + * find anything? */ LDAP_STAILQ_FOREACH ( b, &backend, b_next ) { + struct ConnSt *head; Connection *c; ldap_pvt_thread_mutex_lock( &b->b_mutex ); - c = b->b_conns; - ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); - if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) { - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - return b->b_conns; + if ( op->o_tag == LDAP_REQ_BIND && + !(lload_features & LLOAD_FEATURE_VC) ) { + head = &b->b_bindconns; + } else { + head = &b->b_conns; + } + + /* TODO: Use CIRCLEQ so that we can do a natural round robin over the + * backend's connections? */ + LDAP_LIST_FOREACH( c, head, c_next ) + { + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) { + Debug( LDAP_DEBUG_CONNS, "backend_select: " + "selected connection %lu for client %lu msgid=%d\n", + c->c_connid, op->o_client->c_connid, + op->o_client_msgid ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + return c; + } + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); } - ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } return NULL; } +void +backend_retry( Backend *b ) +{ + int rc, requested; + + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + /* TODO: timeout regime */ + + requested = b->b_numconns; + if ( !(lload_features & LLOAD_FEATURE_VC) ) { + requested += b->b_numbindconns; + } + if ( b->b_active + b->b_bindavail + b->b_opening < requested ) { + b->b_opening++; + rc = ldap_pvt_thread_pool_submit( + &connection_pool, backend_connect, b ); + /* TODO check we're not shutting down */ + if ( rc ) { + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + backend_connect( NULL, b ); + return; + } + } + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); +} + void * backend_connect( void *ctx, void *arg ) { struct evutil_addrinfo hints = {}; Backend *b = arg; + char *hostname; + ldap_pvt_thread_mutex_lock( &b->b_mutex ); #ifdef LDAP_PF_LOCAL if ( b->b_proto == LDAP_PROTO_IPC ) { struct sockaddr_un addr; + Connection *c; ber_socket_t s = socket( PF_LOCAL, SOCK_STREAM, 0 ); int rc; if ( s == AC_SOCKET_INVALID ) { - return (void *)-1; + goto fail; } rc = ber_pvt_socket_set_nonblock( s, 1 ); if ( rc ) { evutil_closesocket( s ); - return (void *)-1; + goto fail; } if ( strlen( b->b_host ) > ( sizeof(addr.sun_path) - 1 ) ) { evutil_closesocket( s ); - return (void *)-1; + goto fail; } memset( &addr, '\0', sizeof(addr) ); addr.sun_family = AF_LOCAL; @@ -132,10 +187,12 @@ backend_connect( void *ctx, void *arg ) s, (struct sockaddr *)&addr, sizeof(struct sockaddr_un) ); if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) { evutil_closesocket( s ); - return (void *)-1; + goto fail; } - b->b_conns = upstream_init( s, b ); + c = upstream_init( s, b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + backend_retry( b ); return NULL; } #endif /* LDAP_PF_LOCAL */ @@ -145,6 +202,15 @@ backend_connect( void *ctx, void *arg ) hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; - evdns_getaddrinfo( dnsbase, b->b_host, NULL, &hints, upstream_name_cb, b ); + hostname = b->b_host; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + + evdns_getaddrinfo( dnsbase, hostname, NULL, &hints, upstream_name_cb, b ); return NULL; + +fail: + b->b_opening--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + backend_retry( b ); + return (void *)-1; } diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index 0f13807d3f..84e1ba92d9 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -462,6 +462,9 @@ config_backend( ConfigArgs *c ) b = ch_calloc( 1, sizeof(Backend) ); + LDAP_LIST_INIT( &b->b_conns ); + LDAP_LIST_INIT( &b->b_bindconns ); + b->b_numconns = 1; b->b_numbindconns = 1; diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index 31382e594b..fbf8621500 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -1300,6 +1300,7 @@ slapd_daemon( struct event_base *daemon_base ) } LDAP_STAILQ_FOREACH ( b, &backend, b_next ) { + b->b_opening++; ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b ); } diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h index b55b155de3..6c14fd8ddf 100644 --- a/servers/lloadd/proto-slap.h +++ b/servers/lloadd/proto-slap.h @@ -41,6 +41,7 @@ struct config_reply_s; /* config.h */ */ LDAP_SLAPD_F (void *) backend_connect( void *ctx, void *arg ); +LDAP_SLAPD_F (void) backend_retry( Backend *b ); LDAP_SLAPD_F (Connection *) backend_select( Operation *op ); /* diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h index 39e97a0467..1ae43d3df2 100644 --- a/servers/lloadd/slap.h +++ b/servers/lloadd/slap.h @@ -248,7 +248,8 @@ struct Backend { char *b_host; int b_numconns, b_numbindconns; - Connection *b_conns, *b_bindconns; + int b_bindavail, b_active, b_opening; + LDAP_LIST_HEAD(ConnSt, Connection) b_conns, b_bindconns; LDAP_STAILQ_ENTRY(Backend) b_next; }; @@ -263,11 +264,16 @@ enum sc_state { SLAP_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */ SLAP_C_BINDING, /* binding */ }; +enum sc_type { + SLAP_C_OPEN = 0, /* regular connection */ + SLAP_C_BIND, /* connection used to handle bind client requests if VC not enabled */ +}; /* * represents a connection from an ldap client/to ldap server */ struct Connection { enum sc_state c_state; /* connection state */ + enum sc_type c_type; ber_socket_t c_fd; ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */ @@ -301,7 +307,7 @@ struct Connection { TAvlnode *c_ops; /* Operations pending on the connection */ #define CONN_IS_TLS 1 -#define CONN_IS_CLIENT 4 +#define CONN_IS_BIND 4 #define CONN_IS_IPC 8 #ifdef HAVE_TLS @@ -312,6 +318,9 @@ struct Connection { long c_n_ops_executing; /* num of ops currently executing */ long c_n_ops_completed; /* num of ops completed */ + /* Upstream: Protected by its backend's mutex */ + LDAP_LIST_ENTRY( Connection ) c_next; + void *c_private; }; diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index f376d92012..b11afca2ca 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -735,16 +735,22 @@ upstream_bind( void *ctx, void *arg ) return NULL; } +/* + * We must already hold b->b_mutex when called. + */ Connection * upstream_init( ber_socket_t s, Backend *b ) { Connection *c; struct event_base *base = slap_get_base( s ); struct event *event; - int flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0; + int flags, is_bindconn = 0; assert( b != NULL ); + b->b_opening--; + + flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0; c = connection_init( s, b->b_host, flags ); c->c_private = b; @@ -757,15 +763,38 @@ upstream_init( ber_socket_t s, Backend *b ) /* We only register the write event when we have data pending */ c->c_write_event = event; - if ( b->b_bindconf.sb_method == LDAP_AUTH_NONE ) { + /* Unless we are configured to use the VC exop, consider allocating the + * connection into the bind conn pool. Start off by allocating one for + * general use, then one for binds, then we start filling up the general + * connection pool, finally the bind pool */ + if ( !(lload_features & LLOAD_FEATURE_VC) && b->b_active && + b->b_numbindconns ) { + if ( !b->b_bindavail ) { + is_bindconn = 1; + } else if ( b->b_active >= b->b_numconns && + b->b_bindavail < b->b_numbindconns ) { + is_bindconn = 1; + } + } + + if ( is_bindconn || b->b_bindconf.sb_method == LDAP_AUTH_NONE ) { upstream_finish( c ); } else { ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + if ( is_bindconn ) { + LDAP_LIST_INSERT_HEAD( &b->b_bindconns, c, c_next ); + c->c_type = SLAP_C_BIND; + b->b_bindavail++; + } else { + LDAP_LIST_INSERT_HEAD( &b->b_conns, c, c_next ); + b->b_active++; + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return c; + fail: if ( c->c_write_event ) { event_del( c->c_write_event ); @@ -784,18 +813,22 @@ upstream_destroy( Connection *c ) { Backend *b = c->c_private; + Debug( LDAP_DEBUG_CONNS, "upstream_destroy: freeing connection %lu\n", + c->c_connid ); + + assert( c->c_state != SLAP_C_INVALID ); c->c_state = SLAP_C_INVALID; ldap_pvt_thread_mutex_unlock( &c->c_mutex ); ldap_pvt_thread_mutex_lock( &b->b_mutex ); - if ( !(b->b_conns == c) ) { - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - return; + LDAP_LIST_REMOVE( c, c_next ); + if ( c->c_type == SLAP_C_BIND ) { + b->b_bindavail--; + } else { + b->b_active--; } - b->b_conns = NULL; ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - - ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b ); + backend_retry( b ); ldap_pvt_thread_mutex_lock( &c->c_mutex );