From baf1feab82fdccd4d430685007390fbd378d6016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Mon, 26 Jun 2017 15:48:32 +0100 Subject: [PATCH] Handle asynchronous connect properly --- servers/lloadd/backend.c | 145 +++++++++++++++++++++++++++++++++--- servers/lloadd/config.c | 2 + servers/lloadd/proto-slap.h | 2 + servers/lloadd/slap.h | 11 +++ 4 files changed, 151 insertions(+), 9 deletions(-) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 3c2ee2190c..2e68994e6d 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -27,6 +27,56 @@ #include "lutil.h" #include "slap.h" +static void +upstream_connect_cb( evutil_socket_t s, short what, void *arg ) +{ + PendingConnection *conn = arg; + Backend *b = conn->backend; + int rc = -1; + + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: " + "fd=%d connection callback for backend uri='%s'\n", + s, b->b_uri.bv_val ); + if ( what == EV_WRITE ) { + int error; + socklen_t optlen = sizeof(error); + + if ( getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, (void *)&error, + &optlen ) < 0 ) { + goto done; + } + if ( error == EINTR || error == EINPROGRESS || error == EWOULDBLOCK ) { + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + return; + } else if ( error ) { + goto done; + } else if ( !upstream_init( s, conn->backend ) ) { + goto done; + } + rc = LDAP_SUCCESS; + } + +done: + if ( rc ) { + evutil_closesocket( conn->fd ); + b->b_failed++; + Debug( LDAP_DEBUG_ANY, "upstream_connect_cb: " + "fd=%d connection set up failed\n", + s ); + } else { + b->b_failed = 0; + } + b->b_opening--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + + LDAP_LIST_REMOVE( conn, next ); + event_free( conn->event ); + ch_free( conn ); + + backend_retry( b ); +} + static void upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) { @@ -62,11 +112,45 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg ) ai->sin6_port = htons( b->b_port ); rc = connect( s, (struct sockaddr *)ai, res->ai_addrlen ); } - if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) { - Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " - "failed to connect to server '%s'\n", + /* Asynchronous connect */ + if ( rc ) { + struct timeval tv = { slap_write_timeout / 1000, + 1000 * ( slap_write_timeout % 1000 ) }; + PendingConnection *conn; + + if ( errno != EINPROGRESS && errno != EWOULDBLOCK ) { + Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " + "failed to connect to server '%s'\n", + b->b_uri.bv_val ); + evutil_closesocket( s ); + goto fail; + } + + conn = ch_calloc( 1, sizeof(PendingConnection) ); + LDAP_LIST_ENTRY_INIT( conn, next ); + conn->backend = b; + conn->fd = s; + + conn->event = event_new( slap_get_base( s ), s, EV_WRITE|EV_PERSIST, + upstream_connect_cb, conn ); + if ( !conn->event ) { + Debug( LDAP_DEBUG_ANY, "upstream_name_cb: " + "failed to acquire an event to finish upstream " + "connection setup.\n" ); + ch_free( conn ); + evutil_closesocket( s ); + goto fail; + } + + event_add( conn->event, &tv ); + LDAP_LIST_INSERT_HEAD( &b->b_connecting, conn, next ); + Debug( LDAP_DEBUG_CONNS, "upstream_name_cb: " + "connection to backend uri=%s in progress\n", b->b_uri.bv_val ); - goto fail; + + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + free( res ); + return; } if ( !upstream_init( s, b ) ) { @@ -268,14 +352,45 @@ backend_connect( evutil_socket_t s, short what, void *arg ) rc = connect( s, (struct sockaddr *)&addr, sizeof(struct sockaddr_un) ); - if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) { - evutil_closesocket( s ); + /* Asynchronous connect */ + if ( rc ) { + struct timeval tv = { slap_write_timeout / 1000, + 1000 * ( slap_write_timeout % 1000 ) }; + PendingConnection *conn; + + if ( errno != EINPROGRESS && errno != EWOULDBLOCK ) { + evutil_closesocket( s ); + goto fail; + } + + conn = ch_calloc( 1, sizeof(PendingConnection) ); + LDAP_LIST_ENTRY_INIT( conn, next ); + conn->backend = b; + conn->fd = s; + + conn->event = event_new( slap_get_base( s ), s, + EV_WRITE|EV_PERSIST, upstream_connect_cb, conn ); + if ( !conn->event ) { + Debug( LDAP_DEBUG_ANY, "backend_connect: " + "failed to acquire an event to finish upstream " + "connection setup.\n" ); + ch_free( conn ); + evutil_closesocket( s ); + goto fail; + } + + event_add( conn->event, &tv ); + LDAP_LIST_INSERT_HEAD( &b->b_connecting, conn, next ); + Debug( LDAP_DEBUG_CONNS, "backend_connect: " + "connection to backend uri=%s in progress\n", + b->b_uri.bv_val ); + + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + return; + } else if ( !upstream_init( s, b ) ) { goto fail; } - if ( !upstream_init( s, b ) ) { - goto fail; - } b->b_opening--; b->b_failed = 0; ldap_pvt_thread_mutex_unlock( &b->b_mutex ); @@ -319,6 +434,18 @@ backends_destroy( void ) "destroying backend uri='%s', numconns=%d, numbindconns=%d\n", b->b_uri.bv_val, b->b_numconns, b->b_numbindconns ); + while ( !LDAP_LIST_EMPTY( &b->b_connecting ) ) { + PendingConnection *pending = LDAP_LIST_FIRST( &b->b_connecting ); + + Debug( LDAP_DEBUG_CONNS, "backends_destroy: " + "destroying socket pending connect() fd=%d\n", + pending->fd ); + + event_free( pending->event ); + evutil_closesocket( pending->fd ); + LDAP_LIST_REMOVE( pending, next ); + ch_free( pending ); + } while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) { Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns ); diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index dd62fb2881..f432bbabb8 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -76,6 +76,8 @@ ber_len_t sockbuf_max_incoming_upstream = SLAP_SB_MAX_INCOMING_UPSTREAM; int slap_conn_max_pdus_per_cycle = SLAP_CONN_MAX_PDUS_PER_CYCLE_DEFAULT; +int slap_write_timeout = 10000; + char *slapd_pid_file = NULL; char *slapd_args_file = NULL; diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h index bcad45dcc5..c6e9c90930 100644 --- a/servers/lloadd/proto-slap.h +++ b/servers/lloadd/proto-slap.h @@ -231,6 +231,8 @@ LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_client; LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_upstream; LDAP_SLAPD_V (int) slap_conn_max_pdus_per_cycle; +LDAP_SLAPD_V (int) slap_write_timeout; + LDAP_SLAPD_V (lload_features_t) lload_features; LDAP_SLAPD_V (slap_mask_t) global_allows; diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h index 06a7647e1d..a524cf7d19 100644 --- a/servers/lloadd/slap.h +++ b/servers/lloadd/slap.h @@ -101,6 +101,7 @@ LDAP_SLAPD_V (int) slap_debug; typedef unsigned long slap_mask_t; typedef struct Backend Backend; +typedef struct PendingConnection PendingConnection; typedef struct Connection Connection; typedef struct Operation Operation; /* end of forward declarations */ @@ -246,6 +247,15 @@ enum lload_tls_type { LLOAD_STARTTLS, }; +struct PendingConnection { + Backend *backend; + + struct event *event; + ber_socket_t fd; + + LDAP_LIST_ENTRY(PendingConnection) next; +}; + /* Can hold mutex when locking a linked connection */ struct Backend { ldap_pvt_thread_mutex_t b_mutex; @@ -262,6 +272,7 @@ struct Backend { int b_numconns, b_numbindconns; int b_bindavail, b_active, b_opening; LDAP_CIRCLEQ_HEAD(ConnSt, Connection) b_conns, b_bindconns; + LDAP_LIST_HEAD(ConnectingSt, PendingConnection) b_connecting; long b_max_pending, b_max_conn_pending; long b_n_ops_executing;