mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-12-26 09:39:45 -05:00
Handle asynchronous connect properly
This commit is contained in:
parent
887c266120
commit
baf1feab82
4 changed files with 151 additions and 9 deletions
|
|
@ -27,6 +27,56 @@
|
|||
#include "lutil.h"
|
||||
#include "slap.h"
|
||||
|
||||
static void
|
||||
upstream_connect_cb( evutil_socket_t s, short what, void *arg )
|
||||
{
|
||||
PendingConnection *conn = arg;
|
||||
Backend *b = conn->backend;
|
||||
int rc = -1;
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &b->b_mutex );
|
||||
Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: "
|
||||
"fd=%d connection callback for backend uri='%s'\n",
|
||||
s, b->b_uri.bv_val );
|
||||
if ( what == EV_WRITE ) {
|
||||
int error;
|
||||
socklen_t optlen = sizeof(error);
|
||||
|
||||
if ( getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, (void *)&error,
|
||||
&optlen ) < 0 ) {
|
||||
goto done;
|
||||
}
|
||||
if ( error == EINTR || error == EINPROGRESS || error == EWOULDBLOCK ) {
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
return;
|
||||
} else if ( error ) {
|
||||
goto done;
|
||||
} else if ( !upstream_init( s, conn->backend ) ) {
|
||||
goto done;
|
||||
}
|
||||
rc = LDAP_SUCCESS;
|
||||
}
|
||||
|
||||
done:
|
||||
if ( rc ) {
|
||||
evutil_closesocket( conn->fd );
|
||||
b->b_failed++;
|
||||
Debug( LDAP_DEBUG_ANY, "upstream_connect_cb: "
|
||||
"fd=%d connection set up failed\n",
|
||||
s );
|
||||
} else {
|
||||
b->b_failed = 0;
|
||||
}
|
||||
b->b_opening--;
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
|
||||
LDAP_LIST_REMOVE( conn, next );
|
||||
event_free( conn->event );
|
||||
ch_free( conn );
|
||||
|
||||
backend_retry( b );
|
||||
}
|
||||
|
||||
static void
|
||||
upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
|
||||
{
|
||||
|
|
@ -62,11 +112,45 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
|
|||
ai->sin6_port = htons( b->b_port );
|
||||
rc = connect( s, (struct sockaddr *)ai, res->ai_addrlen );
|
||||
}
|
||||
if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) {
|
||||
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
|
||||
"failed to connect to server '%s'\n",
|
||||
/* Asynchronous connect */
|
||||
if ( rc ) {
|
||||
struct timeval tv = { slap_write_timeout / 1000,
|
||||
1000 * ( slap_write_timeout % 1000 ) };
|
||||
PendingConnection *conn;
|
||||
|
||||
if ( errno != EINPROGRESS && errno != EWOULDBLOCK ) {
|
||||
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
|
||||
"failed to connect to server '%s'\n",
|
||||
b->b_uri.bv_val );
|
||||
evutil_closesocket( s );
|
||||
goto fail;
|
||||
}
|
||||
|
||||
conn = ch_calloc( 1, sizeof(PendingConnection) );
|
||||
LDAP_LIST_ENTRY_INIT( conn, next );
|
||||
conn->backend = b;
|
||||
conn->fd = s;
|
||||
|
||||
conn->event = event_new( slap_get_base( s ), s, EV_WRITE|EV_PERSIST,
|
||||
upstream_connect_cb, conn );
|
||||
if ( !conn->event ) {
|
||||
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
|
||||
"failed to acquire an event to finish upstream "
|
||||
"connection setup.\n" );
|
||||
ch_free( conn );
|
||||
evutil_closesocket( s );
|
||||
goto fail;
|
||||
}
|
||||
|
||||
event_add( conn->event, &tv );
|
||||
LDAP_LIST_INSERT_HEAD( &b->b_connecting, conn, next );
|
||||
Debug( LDAP_DEBUG_CONNS, "upstream_name_cb: "
|
||||
"connection to backend uri=%s in progress\n",
|
||||
b->b_uri.bv_val );
|
||||
goto fail;
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
free( res );
|
||||
return;
|
||||
}
|
||||
|
||||
if ( !upstream_init( s, b ) ) {
|
||||
|
|
@ -268,14 +352,45 @@ backend_connect( evutil_socket_t s, short what, void *arg )
|
|||
|
||||
rc = connect(
|
||||
s, (struct sockaddr *)&addr, sizeof(struct sockaddr_un) );
|
||||
if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) {
|
||||
evutil_closesocket( s );
|
||||
/* Asynchronous connect */
|
||||
if ( rc ) {
|
||||
struct timeval tv = { slap_write_timeout / 1000,
|
||||
1000 * ( slap_write_timeout % 1000 ) };
|
||||
PendingConnection *conn;
|
||||
|
||||
if ( errno != EINPROGRESS && errno != EWOULDBLOCK ) {
|
||||
evutil_closesocket( s );
|
||||
goto fail;
|
||||
}
|
||||
|
||||
conn = ch_calloc( 1, sizeof(PendingConnection) );
|
||||
LDAP_LIST_ENTRY_INIT( conn, next );
|
||||
conn->backend = b;
|
||||
conn->fd = s;
|
||||
|
||||
conn->event = event_new( slap_get_base( s ), s,
|
||||
EV_WRITE|EV_PERSIST, upstream_connect_cb, conn );
|
||||
if ( !conn->event ) {
|
||||
Debug( LDAP_DEBUG_ANY, "backend_connect: "
|
||||
"failed to acquire an event to finish upstream "
|
||||
"connection setup.\n" );
|
||||
ch_free( conn );
|
||||
evutil_closesocket( s );
|
||||
goto fail;
|
||||
}
|
||||
|
||||
event_add( conn->event, &tv );
|
||||
LDAP_LIST_INSERT_HEAD( &b->b_connecting, conn, next );
|
||||
Debug( LDAP_DEBUG_CONNS, "backend_connect: "
|
||||
"connection to backend uri=%s in progress\n",
|
||||
b->b_uri.bv_val );
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
return;
|
||||
} else if ( !upstream_init( s, b ) ) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if ( !upstream_init( s, b ) ) {
|
||||
goto fail;
|
||||
}
|
||||
b->b_opening--;
|
||||
b->b_failed = 0;
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
|
|
@ -319,6 +434,18 @@ backends_destroy( void )
|
|||
"destroying backend uri='%s', numconns=%d, numbindconns=%d\n",
|
||||
b->b_uri.bv_val, b->b_numconns, b->b_numbindconns );
|
||||
|
||||
while ( !LDAP_LIST_EMPTY( &b->b_connecting ) ) {
|
||||
PendingConnection *pending = LDAP_LIST_FIRST( &b->b_connecting );
|
||||
|
||||
Debug( LDAP_DEBUG_CONNS, "backends_destroy: "
|
||||
"destroying socket pending connect() fd=%d\n",
|
||||
pending->fd );
|
||||
|
||||
event_free( pending->event );
|
||||
evutil_closesocket( pending->fd );
|
||||
LDAP_LIST_REMOVE( pending, next );
|
||||
ch_free( pending );
|
||||
}
|
||||
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) {
|
||||
Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns );
|
||||
|
||||
|
|
|
|||
|
|
@ -76,6 +76,8 @@ ber_len_t sockbuf_max_incoming_upstream = SLAP_SB_MAX_INCOMING_UPSTREAM;
|
|||
|
||||
int slap_conn_max_pdus_per_cycle = SLAP_CONN_MAX_PDUS_PER_CYCLE_DEFAULT;
|
||||
|
||||
int slap_write_timeout = 10000;
|
||||
|
||||
char *slapd_pid_file = NULL;
|
||||
char *slapd_args_file = NULL;
|
||||
|
||||
|
|
|
|||
|
|
@ -231,6 +231,8 @@ LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_client;
|
|||
LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_upstream;
|
||||
LDAP_SLAPD_V (int) slap_conn_max_pdus_per_cycle;
|
||||
|
||||
LDAP_SLAPD_V (int) slap_write_timeout;
|
||||
|
||||
LDAP_SLAPD_V (lload_features_t) lload_features;
|
||||
|
||||
LDAP_SLAPD_V (slap_mask_t) global_allows;
|
||||
|
|
|
|||
|
|
@ -101,6 +101,7 @@ LDAP_SLAPD_V (int) slap_debug;
|
|||
typedef unsigned long slap_mask_t;
|
||||
|
||||
typedef struct Backend Backend;
|
||||
typedef struct PendingConnection PendingConnection;
|
||||
typedef struct Connection Connection;
|
||||
typedef struct Operation Operation;
|
||||
/* end of forward declarations */
|
||||
|
|
@ -246,6 +247,15 @@ enum lload_tls_type {
|
|||
LLOAD_STARTTLS,
|
||||
};
|
||||
|
||||
struct PendingConnection {
|
||||
Backend *backend;
|
||||
|
||||
struct event *event;
|
||||
ber_socket_t fd;
|
||||
|
||||
LDAP_LIST_ENTRY(PendingConnection) next;
|
||||
};
|
||||
|
||||
/* Can hold mutex when locking a linked connection */
|
||||
struct Backend {
|
||||
ldap_pvt_thread_mutex_t b_mutex;
|
||||
|
|
@ -262,6 +272,7 @@ struct Backend {
|
|||
int b_numconns, b_numbindconns;
|
||||
int b_bindavail, b_active, b_opening;
|
||||
LDAP_CIRCLEQ_HEAD(ConnSt, Connection) b_conns, b_bindconns;
|
||||
LDAP_LIST_HEAD(ConnectingSt, PendingConnection) b_connecting;
|
||||
|
||||
long b_max_pending, b_max_conn_pending;
|
||||
long b_n_ops_executing;
|
||||
|
|
|
|||
Loading…
Reference in a new issue