diff --git a/servers/lloadd/Makefile.in b/servers/lloadd/Makefile.in
index 736e2bbeba..6f34aebf5f 100644
--- a/servers/lloadd/Makefile.in
+++ b/servers/lloadd/Makefile.in
@@ -21,10 +21,10 @@ XSRCS = version.c
NT_SRCS = nt_svc.c
NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res
-SRCS = main.c globals.c backend.c config.c connection.c client.c daemon.c \
- ch_malloc.c init.c operation.c user.c sl_malloc.c upstream.c value.c \
- libevent_support.c \
- $(@PLAT@_SRCS)
+SRCS = main.c globals.c backend.c bind.c config.c connection.c client.c \
+ daemon.c ch_malloc.c init.c operation.c user.c sl_malloc.c \
+ upstream.c value.c libevent_support.c \
+ $(@PLAT@_SRCS)
OBJS = $(patsubst %.c,%.o,$(SRCS)) $(@PLAT@_OBJS)
diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c
index 895693b72b..534277b2c2 100644
--- a/servers/lloadd/backend.c
+++ b/servers/lloadd/backend.c
@@ -86,12 +86,12 @@ backend_select( Operation *op )
ldap_pvt_thread_mutex_lock( &b->b_mutex );
c = b->b_conns;
- ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return b->b_conns;
}
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c
new file mode 100644
index 0000000000..c0061f9f40
--- /dev/null
+++ b/servers/lloadd/bind.c
@@ -0,0 +1,275 @@
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software .
+ *
+ * Copyright 1998-2020 The OpenLDAP Foundation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * .
+ */
+
+#include "portable.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include "lutil.h"
+#include "slap.h"
+
+static int
+request_bind( Operation *op )
+{
+ Connection *c = op->o_upstream;
+ BerElement *ber, *copy = NULL;
+ BerValue binddn;
+ ber_tag_t tag;
+ ber_int_t version;
+
+ ber = c->c_pendingber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "request_bind: "
+ "ber_alloc failed\n" );
+ goto fail;
+ }
+ c->c_pendingber = ber;
+
+ if ( (copy = ber_alloc()) == NULL ) {
+ goto fail;
+ }
+ ber_init2( copy, &op->o_request, 0 );
+
+ tag = ber_get_int( copy, &version );
+ if ( tag == LBER_ERROR ) {
+ goto fail;
+ } else if ( version != LDAP_VERSION3 ) {
+ /* TODO: result code and message */
+ operation_send_reject(
+ op, LDAP_PROTOCOL_ERROR, "LDAP version unsupported" );
+ ber_free( copy, 0 );
+ return 0;
+ }
+
+ tag = ber_get_stringbv( copy, &binddn, LBER_BV_NOTERM );
+ if ( tag == LBER_ERROR ) {
+ goto fail;
+ }
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ if ( !BER_BVISNULL( &c->c_auth ) ) {
+ ber_memfree( c->c_auth.bv_val );
+ }
+
+ if ( !BER_BVISEMPTY( &binddn ) ) {
+ char *ptr;
+ c->c_auth.bv_len = STRLENOF("dn:") + binddn.bv_len;
+ c->c_auth.bv_val = ch_malloc( c->c_auth.bv_len + 1 );
+
+ ptr = lutil_strcopy( c->c_auth.bv_val, "dn:" );
+ ptr = lutil_strncopy( ptr, binddn.bv_val, binddn.bv_len );
+ *ptr = '\0';
+ } else {
+ BER_BVZERO( &c->c_auth );
+ }
+
+ op->o_upstream_msgid = c->c_next_msgid++;
+
+ ber_printf( ber, "t{titOtO}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, op->o_upstream_msgid,
+ LDAP_REQ_BIND, &op->o_request,
+ LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) );
+
+ if ( tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error ) ) {
+ assert(0);
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ ber_free( copy, 0 );
+ upstream_write_cb( -1, 0, c );
+ return 0;
+
+fail:
+ if ( copy ) {
+ ber_free( copy, 0 );
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ ldap_pvt_thread_mutex_lock( &op->o_client->c_mutex );
+ client_destroy( op->o_client );
+ return 1;
+}
+
+static int
+request_bind_as_vc( Operation *op )
+{
+ Connection *c = op->o_upstream;
+ BerElement *ber, *request, *copy = NULL;
+ BerValue binddn, auth, mech;
+ ber_int_t version;
+ ber_tag_t tag;
+ ber_len_t len;
+
+ if ( (request = ber_alloc()) == NULL ) {
+ goto fail;
+ }
+ ber_init2( request, &op->o_request, 0 );
+
+ tag = ber_scanf( request, "im", &version, &binddn );
+ if ( tag == LBER_ERROR || version != LDAP_VERSION3 ) {
+ goto fail;
+ }
+
+ copy = ber_dup( request );
+ if ( !copy ) {
+ goto fail;
+ }
+
+ tag = ber_skip_element( request, &auth );
+ if ( tag == LBER_ERROR ) {
+ goto fail;
+ }
+
+ ber = c->c_pendingber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "request_bind_as_vc: "
+ "ber_alloc failed\n" );
+ goto fail;
+ }
+ c->c_pendingber = ber;
+
+ op->o_upstream_msgid = c->c_next_msgid++;
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ ber_printf( ber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, op->o_upstream_msgid,
+ LDAP_REQ_EXTENDED,
+ LDAP_TAG_EXOP_REQ_OID, LDAP_EXOP_VERIFY_CREDENTIALS,
+ LDAP_TAG_EXOP_REQ_VALUE,
+ LDAP_TAG_EXOP_VERIFY_CREDENTIALS_COOKIE, BER_BV_OPTIONAL( &c->c_vc_cookie ),
+ &binddn, tag, &auth,
+ LDAP_TAG_EXOP_VERIFY_CREDENTIALS_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) );
+
+ tag = ber_peek_tag( copy, &len );
+ switch ( tag ) {
+ case LDAP_AUTH_SASL:
+ ber_get_stringbv( copy, &mech, LBER_BV_NOTERM );
+ if ( ber_bvcmp( &mech, &c->c_sasl_bind_mech ) ) {
+ ber_memfree( c->c_sasl_bind_mech.bv_val );
+ ber_dupbv( &c->c_sasl_bind_mech, &mech );
+ }
+ /* TODO: extract authzdn from the message */
+ break;
+ case LDAP_AUTH_SIMPLE:
+ if ( !BER_BVISNULL( &c->c_auth ) ) {
+ ber_memfree( c->c_auth.bv_val );
+ }
+ ber_dupbv( &c->c_auth, &binddn );
+ if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
+ ber_memfree( c->c_sasl_bind_mech.bv_val );
+ BER_BVZERO( &c->c_sasl_bind_mech );
+ }
+ break;
+ default:
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ goto fail;
+ }
+ if ( tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error ) ) {
+ assert(0);
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ ber_free( copy, 0 );
+ upstream_write_cb( -1, 0, c );
+ return 0;
+
+fail:
+ if ( copy ) {
+ ber_free( copy, 0 );
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ ldap_pvt_thread_mutex_lock( &op->o_client->c_mutex );
+ client_destroy( op->o_client );
+ return 1;
+}
+
+void *
+client_reset( void *ctx, void *arg )
+{
+ Operation *op = arg;
+ Connection *c = op->o_client;
+ TAvlnode *root;
+ int freed, destroy = 1;
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ root = c->c_ops;
+ c->c_ops = NULL;
+ c->c_state = SLAP_C_CLOSING;
+ if ( op->o_tag == LDAP_REQ_BIND ) {
+ c->c_state = SLAP_C_BINDING;
+ destroy = 0;
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ tavl_delete( &root, op, operation_client_cmp );
+ freed = tavl_free( root, (AVL_FREE)operation_abandon );
+
+ Debug( LDAP_DEBUG_TRACE, "client_reset: "
+ "dropped %d operations\n",
+ freed );
+
+ if ( destroy ) {
+ operation_destroy( op );
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ client_destroy( c );
+ }
+
+ return NULL;
+}
+
+void *
+client_bind( void *ctx, void *arg )
+{
+ Operation *op = arg;
+ Connection *upstream, *client = op->o_client;
+ int rc = 0;
+
+ client_reset( ctx, arg );
+
+ upstream = backend_select( op );
+ if ( !upstream ) {
+ Debug( LDAP_DEBUG_STATS, "client_bind: "
+ "no available connection found\n" );
+ operation_send_reject(
+ op, LDAP_UNAVAILABLE, "no connections available" );
+ return NULL;
+ }
+
+ op->o_upstream = upstream;
+ if ( upstream->c_features & SLAP_C_VC ) {
+ rc = request_bind_as_vc( op );
+ } else {
+ rc = request_bind( op );
+ }
+
+ if ( rc ) {
+ /* client doesn't exist anymore */
+ return NULL;
+ }
+
+ ldap_pvt_thread_mutex_lock( &client->c_mutex );
+ rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
+ assert( rc == LDAP_SUCCESS );
+ ldap_pvt_thread_mutex_unlock( &client->c_mutex );
+
+ return NULL;
+}
diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c
index 7c4b1eccb3..1a3565916b 100644
--- a/servers/lloadd/client.c
+++ b/servers/lloadd/client.c
@@ -24,16 +24,15 @@
#include "lutil.h"
#include "slap.h"
-static void client_destroy( Connection *c );
-
static void
client_read_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
BerElement *ber;
- Operation *op;
+ Operation *op = NULL;
ber_tag_t tag;
ber_len_t len;
+ int rc = 0;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
@@ -71,26 +70,47 @@ client_read_cb( evutil_socket_t s, short what, void *arg )
if ( !op ) {
Debug( LDAP_DEBUG_ANY, "client_read_cb: "
"operation_init failed\n" );
- goto fail;
- }
-
- if ( ldap_pvt_thread_pool_submit(
- &connection_pool, operation_process, op ) ) {
- /* what could have happened? */
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
- operation_destroy( op );
- ldap_pvt_thread_mutex_lock( &c->c_mutex );
- goto fail;
- }
-
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
- return;
-fail:
- client_destroy( c );
-
- if ( ber ) {
ber_free( ber, 1 );
+ goto fail;
}
+
+ switch ( op->o_tag ) {
+ case LDAP_REQ_UNBIND:
+ /* We do not expect anything more from the client */
+ event_del( c->c_read_event );
+
+ rc = ldap_pvt_thread_pool_submit(
+ &connection_pool, client_reset, op );
+ if ( rc ) {
+ tavl_delete( &c->c_ops, op, operation_client_cmp );
+ operation_destroy( op );
+ client_destroy( c );
+ return;
+ }
+ break;
+ case LDAP_REQ_BIND:
+ rc = ldap_pvt_thread_pool_submit(
+ &connection_pool, client_bind, op );
+ break;
+ default:
+ rc = ldap_pvt_thread_pool_submit(
+ &connection_pool, request_process, op );
+ break;
+ }
+
+ if ( !rc ) {
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return;
+ }
+
+fail:
+ if ( op ) {
+ tavl_delete( &c->c_ops, op, operation_client_cmp );
+ op->o_client = NULL;
+ operation_destroy( op );
+ }
+
+ client_destroy( c );
return;
}
@@ -171,12 +191,14 @@ fail:
return NULL;
}
-static void
+void
client_destroy( Connection *c )
{
+ assert( c->c_read_event != NULL );
event_del( c->c_read_event );
event_free( c->c_read_event );
+ assert( c->c_write_event != NULL );
event_del( c->c_write_event );
event_free( c->c_write_event );
diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c
index 4b881ad545..0502531042 100644
--- a/servers/lloadd/operation.c
+++ b/servers/lloadd/operation.c
@@ -108,6 +108,8 @@ operation_destroy( Operation *op )
{
Connection *c;
+ ber_free( op->o_ber, 1 );
+
/* TODO: this is a stopgap and there are many races here, just get
* something in to test with until we implement the freelist */
if ( op->o_client ) {
@@ -137,6 +139,7 @@ operation_init( Connection *c, BerElement *ber )
op = ch_calloc( 1, sizeof(Operation) );
op->o_client = c;
+ op->o_ber = ber;
tag = ber_get_int( ber, &op->o_client_msgid );
if ( tag != LDAP_TAG_MSGID ) {
@@ -179,8 +182,97 @@ fail:
return NULL;
}
+void
+operation_abandon( Operation *op )
+{
+ int rc;
+
+ if ( op->o_upstream ) {
+ Connection *c = op->o_upstream;
+ BerElement *ber;
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ if ( rc ) {
+ /* The operation has already been abandoned or finished */
+ goto done;
+ }
+
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+
+ ber = c->c_pendingber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "operation_abandon: "
+ "ber_alloc failed\n" );
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ return;
+ }
+ c->c_pendingber = ber;
+
+ rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, c->c_next_msgid++,
+ LDAP_REQ_ABANDON, op->o_upstream_msgid );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ if ( rc == -1 ) {
+ ber_free( ber, 1 );
+ return;
+ }
+ upstream_write_cb( -1, 0, c );
+ }
+
+done:
+ operation_destroy( op );
+}
+
+void
+operation_send_reject( Operation *op, int result, const char *msg )
+{
+ Connection *c = op->o_client;
+ BerElement *ber;
+ int found;
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ if ( !found ) {
+ return;
+ }
+
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+
+ ber = c->c_pendingber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ client_destroy( c );
+ return;
+ }
+ c->c_pendingber = ber;
+
+ ber_printf( ber, "t{tit{ess}}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, op->o_client_msgid,
+ slap_req2res( op->o_tag ), result, "", msg );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ client_write_cb( -1, 0, c );
+
+ operation_destroy( op );
+}
+
+void
+operation_lost_upstream( Operation *op )
+{
+ operation_send_reject( op, LDAP_UNAVAILABLE,
+ "connection to the remote server has been severed" );
+}
+
void *
-operation_process( void *ctx, void *arg )
+request_process( void *ctx, void *arg )
{
Operation *op = arg;
BerElement *output;
@@ -190,33 +282,34 @@ operation_process( void *ctx, void *arg )
c = backend_select( op );
if ( !c ) {
- Debug( LDAP_DEBUG_STATS, "operation_process: "
+ Debug( LDAP_DEBUG_STATS, "request_process: "
"no available connection found\n" );
goto fail;
}
op->o_upstream = c;
- c->c_pendingber = output = ber_alloc();
- if ( !output ) {
+ output = c->c_pendingber;
+ if ( output == NULL && (output = ber_alloc()) == NULL ) {
goto fail;
}
+ c->c_pendingber = output;
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
op->o_upstream_msgid = msgid = c->c_next_msgid++;
rc = tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
-
- ber_start_seq( output, LDAP_TAG_MESSAGE );
- ber_put_int( output, msgid, LDAP_TAG_MSGID );
- ber_put_berval( output, &op->o_request, op->o_tag );
- if ( !BER_BVISNULL( &op->o_ctrls ) ) {
- ber_put_berval( output, &op->o_ctrls, LDAP_TAG_CONTROLS );
- }
- ber_put_seq( output );
-
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, msgid,
+ op->o_tag, &op->o_request,
+ LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) );
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
upstream_write_cb( -1, 0, c );
return NULL;
fail:
+ operation_send_reject( op, LDAP_OTHER, "internal error" );
return NULL;
}
diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h
index 93bd8be1d7..4643f00900 100644
--- a/servers/lloadd/proto-slap.h
+++ b/servers/lloadd/proto-slap.h
@@ -58,11 +58,18 @@ LDAP_SLAPD_F (void) ch_free( void * );
#define free ch_free
#endif
+/*
+ * bind.c
+ */
+LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg );
+LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg );
+
/*
* client.c
*/
LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
+LDAP_SLAPD_F (void) client_destroy( Connection *c );
/*
* config.c
@@ -139,9 +146,12 @@ LDAP_SLAPD_V (const char *) slapd_slp_attrs;
LDAP_SLAPD_F (const char *) slap_msgtype2str( ber_tag_t tag );
LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
-LDAP_SLAPD_F (void *) operation_process( void *ctx, void *arg );
LDAP_SLAPD_F (Operation *) operation_init( Connection *c, BerElement *ber );
+LDAP_SLAPD_F (void) operation_abandon( Operation *op );
+LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg );
+LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
LDAP_SLAPD_F (void) operation_destroy( Operation *op );
+LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg );
/*
* sl_malloc.c
@@ -168,6 +178,7 @@ LDAP_SLAPD_F (void *) slap_sl_context( void *ptr );
LDAP_SLAPD_F (void) upstream_write_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) upstream_read_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (Connection *) upstream_init( ber_socket_t s, Backend *b );
+LDAP_SLAPD_F (void) upstream_destroy( Connection *c );
/*
* user.c
diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h
index b9181392e4..01250e5c3b 100644
--- a/servers/lloadd/slap.h
+++ b/servers/lloadd/slap.h
@@ -233,6 +233,7 @@ enum lload_tls_type {
LLOAD_STARTTLS,
};
+/* Can hold mutex when locking a linked connection */
struct Backend {
struct slap_bindconf b_bindconf;
ldap_pvt_thread_mutex_t b_mutex;
@@ -247,6 +248,8 @@ struct Backend {
LDAP_STAILQ_ENTRY(Backend) b_next;
};
+typedef int (*OperationHandler)( Operation *op, BerElement *ber );
+
/* connection state (protected by c_mutex) */
enum sc_state {
SLAP_C_INVALID = 0, /* MUST BE ZERO (0) */
@@ -276,8 +279,16 @@ struct Connection {
struct event *c_read_event, *c_write_event;
/* can only be changed by binding thread */
- struct berval c_sasl_bind_mech; /* mech in progress */
+ int c_features;
+#define SLAP_C_VC 1
+ struct berval c_sasl_bind_mech; /* mech in progress */
+ struct berval c_auth; /* authcDN (possibly in progress) */
+
+ struct berval c_vc_cookie;
+
+ /* Can be held while acquiring c_mutex to inject things into c_ops or
+ * destroy the connection */
ldap_pvt_thread_mutex_t c_io_mutex; /* only one pdu written at a time */
BerElement *c_currentber; /* ber we're attempting to read */
diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c
index 7d7de4f080..f376d92012 100644
--- a/servers/lloadd/upstream.c
+++ b/servers/lloadd/upstream.c
@@ -24,17 +24,441 @@
#include "lutil.h"
#include "slap.h"
-static void upstream_destroy( Connection *c );
+static int
+forward_response( Operation *op, BerElement *ber )
+{
+ Connection *c = op->o_client;
+ BerElement *output;
+ BerValue response, controls = BER_BVNULL;
+ ber_tag_t tag, response_tag;
+ ber_len_t len;
+ response_tag = ber_skip_element( ber, &response );
+
+ tag = ber_peek_tag( ber, &len );
+ if ( tag == LDAP_TAG_CONTROLS ) {
+ ber_skip_element( ber, &controls );
+ }
+
+ Debug( LDAP_DEBUG_CONNS, "forward_response: "
+ "%s to client %lu request #%d\n",
+ slap_msgtype2str( response_tag ), c->c_connid, op->o_client_msgid );
+
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+ output = c->c_pendingber;
+ if ( output == NULL && (output = ber_alloc()) == NULL ) {
+ ber_free( ber, 1 );
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ return -1;
+ }
+ c->c_pendingber = output;
+
+ ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, op->o_client_msgid,
+ response_tag, &response,
+ LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ ber_free( ber, 1 );
+ client_write_cb( -1, 0, c );
+ return 0;
+}
+
+static int
+forward_final_response( Operation *op, BerElement *ber )
+{
+ int rc;
+
+ Debug( LDAP_DEBUG_CONNS, "forward_final_response: "
+ "finishing up with request #%d for client %lu\n",
+ op->o_client_msgid, op->o_client->c_connid );
+ rc = forward_response( op, ber );
+ operation_destroy( op );
+
+ return rc;
+}
+
+static int
+handle_bind_response( Operation *op, BerElement *ber )
+{
+ Connection *c = op->o_client;
+ BerElement *copy;
+ ber_int_t msgid, result;
+ ber_tag_t tag;
+ int rc = 0;
+
+ copy = ber_dup( ber );
+ if ( !copy ) {
+ rc = -1;
+ goto done;
+ }
+
+ tag = ber_scanf( copy, "{i{e" /* "}}" */, &msgid, &result );
+ ber_free( copy, 0 );
+
+ if ( tag == LBER_ERROR ) {
+ rc = -1;
+ goto done;
+ }
+
+ Debug( LDAP_DEBUG_CONNS, "handle_bind_response: "
+ "received response for bind request by client %lu, result=%d\n",
+ c->c_connid, result );
+
+ switch ( result ) {
+ case LDAP_SASL_BIND_IN_PROGRESS:
+ break;
+ case LDAP_SUCCESS:
+ default: {
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ c->c_state = SLAP_C_READY;
+ if ( result != LDAP_SUCCESS ) {
+ ber_memfree( c->c_auth.bv_val );
+ BER_BVZERO( &c->c_auth );
+ }
+ if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
+ ber_memfree( c->c_sasl_bind_mech.bv_val );
+ BER_BVZERO( &c->c_sasl_bind_mech );
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ break;
+ }
+ }
+
+done:
+ if ( rc ) {
+ operation_destroy( op );
+ ber_free( ber, 1 );
+ return rc;
+ }
+ return forward_final_response( op, ber );
+}
+
+static int
+handle_vc_bind_response( Operation *op, BerElement *ber )
+{
+ Connection *c = op->o_client;
+ BerElement *output;
+ BerValue matched, diagmsg, creds = BER_BVNULL, controls = BER_BVNULL;
+ ber_int_t result;
+ ber_tag_t tag;
+ ber_len_t len;
+ int rc = 0;
+
+ tag = ber_scanf( ber, "{emm" /* "}" */,
+ &result, &matched, &diagmsg );
+ if ( tag == LBER_ERROR ) {
+ rc = -1;
+ goto done;
+ }
+
+ tag = ber_peek_tag( ber, &len );
+ if ( result == LDAP_PROTOCOL_ERROR ) {
+ Backend *b = op->o_upstream->c_private;
+ ldap_pvt_thread_mutex_lock( &op->o_upstream->c_mutex );
+ Debug( LDAP_DEBUG_ANY, "VC extended operation not supported on backend %s\n",
+ b->b_bindconf.sb_uri.bv_val );
+ ldap_pvt_thread_mutex_unlock( &op->o_upstream->c_mutex );
+ }
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+
+ Debug( LDAP_DEBUG_CONNS, "handle_vc_bind_response: "
+ "received response for bind request by client %lu, result=%d\n",
+ c->c_connid, result );
+
+ if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_COOKIE ) {
+ if ( !BER_BVISNULL( &c->c_vc_cookie ) ) {
+ ber_memfree( c->c_vc_cookie.bv_val );
+ }
+ tag = ber_scanf( ber, "o", &c->c_vc_cookie );
+ if ( tag == LBER_ERROR ) {
+ rc = -1;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ goto done;
+ }
+ tag = ber_peek_tag( ber, &len );
+ }
+
+ if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_SCREDS ) {
+ tag = ber_scanf( ber, "m", &creds );
+ if ( tag == LBER_ERROR ) {
+ rc = -1;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ goto done;
+ }
+ tag = ber_peek_tag( ber, &len );
+ }
+
+ if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_CONTROLS ) {
+ tag = ber_scanf( ber, "m", &controls );
+ if ( tag == LBER_ERROR ) {
+ rc = -1;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ goto done;
+ }
+ }
+
+ switch ( result ) {
+ case LDAP_SASL_BIND_IN_PROGRESS:
+ break;
+ case LDAP_SUCCESS:
+ default: {
+ c->c_state = SLAP_C_READY;
+ if ( result != LDAP_SUCCESS ) {
+ ber_memfree( c->c_auth.bv_val );
+ BER_BVZERO( &c->c_auth );
+ }
+ if ( !BER_BVISNULL( &c->c_vc_cookie ) ) {
+ ber_memfree( c->c_vc_cookie.bv_val );
+ BER_BVZERO( &c->c_vc_cookie );
+ }
+ if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
+ ber_memfree( c->c_sasl_bind_mech.bv_val );
+ BER_BVZERO( &c->c_sasl_bind_mech );
+ }
+ break;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+ output = c->c_pendingber;
+ if ( output == NULL && (output = ber_alloc()) == NULL ) {
+ rc = -1;
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ goto done;
+ }
+ c->c_pendingber = output;
+
+ rc = ber_printf( output, "t{tit{eOOtO}tO}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, op->o_client_msgid, LDAP_RES_BIND,
+ result, &matched, &diagmsg,
+ LDAP_TAG_SASL_RES_CREDS, BER_BV_OPTIONAL( &creds ),
+ LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ if ( rc >= 0 ) {
+ client_write_cb( -1, 0, c );
+ rc = 0;
+ }
+
+done:
+ operation_destroy( op );
+ ber_free( ber, 1 );
+ return rc;
+}
+
+static int
+handle_unsolicited( Connection *c, BerElement *ber )
+{
+ TAvlnode *root;
+ int freed;
+
+ Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: "
+ "teardown for upstream connection %lu\n",
+ c->c_connid );
+
+ root = c->c_ops;
+ c->c_ops = NULL;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ freed = tavl_free( root, (AVL_FREE)operation_lost_upstream );
+ Debug( LDAP_DEBUG_TRACE, "handle_unsolicited: "
+ "dropped %d operations\n",
+ freed );
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ upstream_destroy( c );
+ ber_free( ber, 1 );
+
+ return -1;
+}
+
+/*
+ * Pull c->c_currentber from the connection and try to look up the operation on
+ * the upstream.
+ *
+ * If it's a notice of disconnection, we won't find it and need to tear down
+ * the connection and tell the clients, if we can't find the operation, ignore
+ * the message (either client already disconnected/abandoned it or the upstream
+ * is pulling our leg).
+ *
+ * Some responses need special handling:
+ * - Bind response
+ * - VC response where the client requested a Bind (both need to update the
+ * client's bind status)
+ * - search entries/referrals and intermediate responses (will not trigger
+ * operation to be removed)
+ *
+ * If the worker pool is overloaded, we might be called directly from
+ * upstream_read_cb, at that point, the connection hasn't been muted.
+ *
+ * TODO: when the client already has data pending on write, we should mute the
+ * upstream.
+ * - should record the BerElement on the Op and the Op on the client
+ */
+static int
+handle_one_response( Connection *c )
+{
+ BerElement *ber;
+ Operation *op = NULL, needle = { .o_upstream = c };
+ OperationHandler handler = NULL;
+ ber_tag_t tag;
+ ber_len_t len;
+ int rc = 0;
+
+ ber = c->c_currentber;
+ c->c_currentber = NULL;
+
+ tag = ber_get_int( ber, &needle.o_upstream_msgid );
+ if ( tag != LDAP_TAG_MSGID ) {
+ rc = -1;
+ ber_free( ber, 1 );
+ goto fail;
+ }
+
+ if ( needle.o_upstream_msgid == 0 ) {
+ return handle_unsolicited( c, ber );
+ } else if ( !( op = tavl_find(
+ c->c_ops, &needle, operation_upstream_cmp ) ) ) {
+ /* Already abandoned, do nothing */
+ /*
+ } else if ( op->o_response_pending ) {
+ c->c_pendingop = op;
+ event_del( c->c_read_event );
+ */
+ } else {
+ /*
+ op->o_response_pending = ber;
+ */
+
+ tag = ber_peek_tag( ber, &len );
+ switch ( tag ) {
+ case LDAP_RES_SEARCH_ENTRY:
+ case LDAP_RES_SEARCH_REFERENCE:
+ case LDAP_RES_INTERMEDIATE:
+ handler = forward_response;
+ break;
+ case LDAP_RES_BIND:
+ handler = handle_bind_response;
+ break;
+ case LDAP_RES_EXTENDED:
+ if ( op->o_tag == LDAP_REQ_BIND ) {
+ handler = handle_vc_bind_response;
+ }
+ break;
+ }
+ if ( !handler ) {
+ handler = forward_final_response;
+ }
+ }
+ if ( op ) {
+ Debug( LDAP_DEBUG_TRACE, "handle_one_response: "
+ "upstream=%lu, processing response for client %lu, msgid=%d\n",
+ c->c_connid, op->o_client->c_connid, op->o_client_msgid );
+ } else {
+ tag = ber_peek_tag( ber, &len );
+ Debug( LDAP_DEBUG_TRACE, "handle_one_response: "
+ "upstream=%lu, %s, msgid=%d not for a pending operation\n",
+ c->c_connid, slap_msgtype2str( tag ), needle.o_upstream_msgid );
+ }
+
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ if ( handler ) {
+ rc = handler( op, ber );
+ }
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+
+fail:
+ if ( rc ) {
+ Debug( LDAP_DEBUG_ANY, "handle_one_response: "
+ "error on processing a response on upstream connection %ld\n",
+ c->c_connid );
+ upstream_destroy( c );
+ }
+ return rc;
+}
+
+/*
+ * We start off with the upstream muted and c_currentber holding the response
+ * we received.
+ *
+ * We run handle_one_response on each response, stopping once we hit an error,
+ * have to wait on reading or process slap_conn_max_pdus_per_cycle responses so
+ * as to maintain fairness and not hog the worker thread forever.
+ *
+ * If we've run out of responses from the upstream or hit the budget, we unmute
+ * the connection and run handle_one_response, it might return an 'error' when
+ * the client is blocked on writing, it's that client's job to wake us again.
+ */
+static void *
+handle_responses( void *ctx, void *arg )
+{
+ Connection *c = arg;
+ int responses_handled = 0;
+
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ for ( ; responses_handled < slap_conn_max_pdus_per_cycle;
+ responses_handled++ ) {
+ BerElement *ber;
+ ber_tag_t tag;
+ ber_len_t len;
+
+ if ( handle_one_response( c ) ) {
+ /* Error, connection might already have been destroyed */
+ return NULL;
+ }
+ /* Otherwise, handle_one_response leaves the connection locked */
+
+ if ( (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "handle_responses: "
+ "ber_alloc failed\n" );
+ upstream_destroy( c );
+ return NULL;
+ }
+ c->c_currentber = ber;
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_ANY, "handle_responses: "
+ "ber_get_next on fd %d failed errno=%d (%s)\n",
+ c->c_fd, err,
+ sock_errstr( err, ebuf, sizeof(ebuf) ) );
+
+ c->c_currentber = NULL;
+ ber_free( ber, 1 );
+ upstream_destroy( c );
+ return NULL;
+ }
+ break;
+ }
+ }
+
+ event_add( c->c_read_event, NULL );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return NULL;
+}
+
+/*
+ * Initial read on the upstream connection, if we get an LDAP PDU, submit the
+ * processing of this and successive ones to the work queue.
+ *
+ * If we can't submit it to the queue (overload), process this one and return
+ * to the event loop immediately after.
+ */
void
upstream_read_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
BerElement *ber;
ber_tag_t tag;
- Operation *op, needle = { .o_upstream = c };
ber_len_t len;
- int finished = 0;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
@@ -48,6 +472,7 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg )
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return;
}
+ c->c_currentber = ber;
tag = ber_get_next( c->c_sb, &len, ber );
if ( tag != LDAP_TAG_MESSAGE ) {
@@ -60,84 +485,29 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg )
c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
c->c_currentber = NULL;
- goto fail;
+ ber_free( ber, 1 );
+ upstream_destroy( c );
+ return;
}
- c->c_currentber = ber;
+ event_add( c->c_read_event, NULL );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return;
}
- c->c_currentber = NULL;
-
- tag = ber_get_int( ber, &needle.o_upstream_msgid );
- if ( tag != LDAP_TAG_MSGID || needle.o_upstream_msgid == 0 ) {
- goto fail;
- }
-
- op = tavl_find( c->c_ops, &needle, operation_upstream_cmp );
- if ( !op ) {
- ber_free( ber, 1 );
- } else {
- Connection *client = op->o_client;
- BerElement *output;
- BerValue response, controls;
- ber_tag_t type;
-
- type = ber_skip_element( ber, &response );
- switch ( type ) {
- case LDAP_RES_SEARCH_ENTRY:
- case LDAP_RES_SEARCH_REFERENCE:
- case LDAP_RES_INTERMEDIATE:
- break;
- default:
- finished = 1;
- tavl_delete( &c->c_ops, op, operation_upstream_cmp );
- break;
+ if ( !slap_conn_max_pdus_per_cycle ||
+ ldap_pvt_thread_pool_submit(
+ &connection_pool, handle_responses, c ) ) {
+ /* If we're overloaded or configured as such, process one and resume in
+ * the next cycle */
+ if ( handle_one_response( c ) == LDAP_SUCCESS ) {
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
}
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-
- tag = ber_peek_tag( ber, &len );
- if ( tag == LDAP_TAG_CONTROLS ) {
- tag = ber_skip_element( ber, &controls );
- }
-
- output = ber_alloc();
- if ( !output ) {
- goto fail;
- }
-
- ber_start_seq( output, LDAP_TAG_MESSAGE );
- ber_put_int( output, op->o_client_msgid, LDAP_TAG_MSGID );
- ber_put_berval( output, &response, type );
- if ( tag == LDAP_TAG_CONTROLS ) {
- ber_put_berval( output, &controls, LDAP_TAG_CONTROLS );
- }
- ber_put_seq( output );
-
- if ( finished ) {
- ldap_pvt_thread_mutex_lock( &client->c_mutex );
- tavl_delete( &client->c_ops, op, operation_client_cmp );
- ldap_pvt_thread_mutex_unlock( &client->c_mutex );
- operation_destroy( op );
- }
-
- ldap_pvt_thread_mutex_lock( &client->c_io_mutex );
- client->c_pendingber = output;
- ldap_pvt_thread_mutex_unlock( &client->c_io_mutex );
-
- client_write_cb( -1, 0, client );
return;
}
+ event_del( c->c_read_event );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-
return;
-fail:
- Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
- "error on processing a response on upstream connection %ld\n",
- c->c_connid );
- ber_free( ber, 1 );
- upstream_destroy( c );
}
void
@@ -205,6 +575,7 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg )
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return;
}
+ c->c_currentber = ber;
tag = ber_get_next( c->c_sb, &len, ber );
if ( tag != LDAP_TAG_MESSAGE ) {
@@ -219,7 +590,6 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg )
c->c_currentber = NULL;
goto fail;
}
- c->c_currentber = ber;
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return;
}
@@ -409,7 +779,7 @@ fail:
return NULL;
}
-static void
+void
upstream_destroy( Connection *c )
{
Backend *b = c->c_private;