From 639c5912f51c85d857207fa1a7b4ccee42d2f8f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Tue, 28 Mar 2017 18:40:20 +0100 Subject: [PATCH] Client authentication --- servers/lloadd/Makefile.in | 8 +- servers/lloadd/backend.c | 4 +- servers/lloadd/bind.c | 275 +++++++++++++++++++ servers/lloadd/client.c | 66 +++-- servers/lloadd/operation.c | 119 ++++++++- servers/lloadd/proto-slap.h | 13 +- servers/lloadd/slap.h | 13 +- servers/lloadd/upstream.c | 514 +++++++++++++++++++++++++++++++----- 8 files changed, 897 insertions(+), 115 deletions(-) create mode 100644 servers/lloadd/bind.c diff --git a/servers/lloadd/Makefile.in b/servers/lloadd/Makefile.in index 736e2bbeba..6f34aebf5f 100644 --- a/servers/lloadd/Makefile.in +++ b/servers/lloadd/Makefile.in @@ -21,10 +21,10 @@ XSRCS = version.c NT_SRCS = nt_svc.c NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res -SRCS = main.c globals.c backend.c config.c connection.c client.c daemon.c \ - ch_malloc.c init.c operation.c user.c sl_malloc.c upstream.c value.c \ - libevent_support.c \ - $(@PLAT@_SRCS) +SRCS = main.c globals.c backend.c bind.c config.c connection.c client.c \ + daemon.c ch_malloc.c init.c operation.c user.c sl_malloc.c \ + upstream.c value.c libevent_support.c \ + $(@PLAT@_SRCS) OBJS = $(patsubst %.c,%.o,$(SRCS)) $(@PLAT@_OBJS) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 895693b72b..534277b2c2 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -86,12 +86,12 @@ backend_select( Operation *op ) ldap_pvt_thread_mutex_lock( &b->b_mutex ); c = b->b_conns; - ldap_pvt_thread_mutex_lock( &c->c_mutex ); + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) { ldap_pvt_thread_mutex_unlock( &b->b_mutex ); return b->b_conns; } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c new file mode 100644 index 0000000000..c0061f9f40 --- /dev/null +++ b/servers/lloadd/bind.c @@ -0,0 +1,275 @@ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software . + * + * Copyright 1998-2020 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * . + */ + +#include "portable.h" + +#include +#include +#include +#include +#include + +#include "lutil.h" +#include "slap.h" + +static int +request_bind( Operation *op ) +{ + Connection *c = op->o_upstream; + BerElement *ber, *copy = NULL; + BerValue binddn; + ber_tag_t tag; + ber_int_t version; + + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "request_bind: " + "ber_alloc failed\n" ); + goto fail; + } + c->c_pendingber = ber; + + if ( (copy = ber_alloc()) == NULL ) { + goto fail; + } + ber_init2( copy, &op->o_request, 0 ); + + tag = ber_get_int( copy, &version ); + if ( tag == LBER_ERROR ) { + goto fail; + } else if ( version != LDAP_VERSION3 ) { + /* TODO: result code and message */ + operation_send_reject( + op, LDAP_PROTOCOL_ERROR, "LDAP version unsupported" ); + ber_free( copy, 0 ); + return 0; + } + + tag = ber_get_stringbv( copy, &binddn, LBER_BV_NOTERM ); + if ( tag == LBER_ERROR ) { + goto fail; + } + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + if ( !BER_BVISNULL( &c->c_auth ) ) { + ber_memfree( c->c_auth.bv_val ); + } + + if ( !BER_BVISEMPTY( &binddn ) ) { + char *ptr; + c->c_auth.bv_len = STRLENOF("dn:") + binddn.bv_len; + c->c_auth.bv_val = ch_malloc( c->c_auth.bv_len + 1 ); + + ptr = lutil_strcopy( c->c_auth.bv_val, "dn:" ); + ptr = lutil_strncopy( ptr, binddn.bv_val, binddn.bv_len ); + *ptr = '\0'; + } else { + BER_BVZERO( &c->c_auth ); + } + + op->o_upstream_msgid = c->c_next_msgid++; + + ber_printf( ber, "t{titOtO}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_upstream_msgid, + LDAP_REQ_BIND, &op->o_request, + LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) ); + + if ( tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error ) ) { + assert(0); + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + ber_free( copy, 0 ); + upstream_write_cb( -1, 0, c ); + return 0; + +fail: + if ( copy ) { + ber_free( copy, 0 ); + } + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + ldap_pvt_thread_mutex_lock( &op->o_client->c_mutex ); + client_destroy( op->o_client ); + return 1; +} + +static int +request_bind_as_vc( Operation *op ) +{ + Connection *c = op->o_upstream; + BerElement *ber, *request, *copy = NULL; + BerValue binddn, auth, mech; + ber_int_t version; + ber_tag_t tag; + ber_len_t len; + + if ( (request = ber_alloc()) == NULL ) { + goto fail; + } + ber_init2( request, &op->o_request, 0 ); + + tag = ber_scanf( request, "im", &version, &binddn ); + if ( tag == LBER_ERROR || version != LDAP_VERSION3 ) { + goto fail; + } + + copy = ber_dup( request ); + if ( !copy ) { + goto fail; + } + + tag = ber_skip_element( request, &auth ); + if ( tag == LBER_ERROR ) { + goto fail; + } + + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "request_bind_as_vc: " + "ber_alloc failed\n" ); + goto fail; + } + c->c_pendingber = ber; + + op->o_upstream_msgid = c->c_next_msgid++; + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + ber_printf( ber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_upstream_msgid, + LDAP_REQ_EXTENDED, + LDAP_TAG_EXOP_REQ_OID, LDAP_EXOP_VERIFY_CREDENTIALS, + LDAP_TAG_EXOP_REQ_VALUE, + LDAP_TAG_EXOP_VERIFY_CREDENTIALS_COOKIE, BER_BV_OPTIONAL( &c->c_vc_cookie ), + &binddn, tag, &auth, + LDAP_TAG_EXOP_VERIFY_CREDENTIALS_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) ); + + tag = ber_peek_tag( copy, &len ); + switch ( tag ) { + case LDAP_AUTH_SASL: + ber_get_stringbv( copy, &mech, LBER_BV_NOTERM ); + if ( ber_bvcmp( &mech, &c->c_sasl_bind_mech ) ) { + ber_memfree( c->c_sasl_bind_mech.bv_val ); + ber_dupbv( &c->c_sasl_bind_mech, &mech ); + } + /* TODO: extract authzdn from the message */ + break; + case LDAP_AUTH_SIMPLE: + if ( !BER_BVISNULL( &c->c_auth ) ) { + ber_memfree( c->c_auth.bv_val ); + } + ber_dupbv( &c->c_auth, &binddn ); + if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { + ber_memfree( c->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &c->c_sasl_bind_mech ); + } + break; + default: + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + goto fail; + } + if ( tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error ) ) { + assert(0); + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + ber_free( copy, 0 ); + upstream_write_cb( -1, 0, c ); + return 0; + +fail: + if ( copy ) { + ber_free( copy, 0 ); + } + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + ldap_pvt_thread_mutex_lock( &op->o_client->c_mutex ); + client_destroy( op->o_client ); + return 1; +} + +void * +client_reset( void *ctx, void *arg ) +{ + Operation *op = arg; + Connection *c = op->o_client; + TAvlnode *root; + int freed, destroy = 1; + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + root = c->c_ops; + c->c_ops = NULL; + c->c_state = SLAP_C_CLOSING; + if ( op->o_tag == LDAP_REQ_BIND ) { + c->c_state = SLAP_C_BINDING; + destroy = 0; + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + tavl_delete( &root, op, operation_client_cmp ); + freed = tavl_free( root, (AVL_FREE)operation_abandon ); + + Debug( LDAP_DEBUG_TRACE, "client_reset: " + "dropped %d operations\n", + freed ); + + if ( destroy ) { + operation_destroy( op ); + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + client_destroy( c ); + } + + return NULL; +} + +void * +client_bind( void *ctx, void *arg ) +{ + Operation *op = arg; + Connection *upstream, *client = op->o_client; + int rc = 0; + + client_reset( ctx, arg ); + + upstream = backend_select( op ); + if ( !upstream ) { + Debug( LDAP_DEBUG_STATS, "client_bind: " + "no available connection found\n" ); + operation_send_reject( + op, LDAP_UNAVAILABLE, "no connections available" ); + return NULL; + } + + op->o_upstream = upstream; + if ( upstream->c_features & SLAP_C_VC ) { + rc = request_bind_as_vc( op ); + } else { + rc = request_bind( op ); + } + + if ( rc ) { + /* client doesn't exist anymore */ + return NULL; + } + + ldap_pvt_thread_mutex_lock( &client->c_mutex ); + rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error ); + assert( rc == LDAP_SUCCESS ); + ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + + return NULL; +} diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 7c4b1eccb3..1a3565916b 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -24,16 +24,15 @@ #include "lutil.h" #include "slap.h" -static void client_destroy( Connection *c ); - static void client_read_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; BerElement *ber; - Operation *op; + Operation *op = NULL; ber_tag_t tag; ber_len_t len; + int rc = 0; ldap_pvt_thread_mutex_lock( &c->c_mutex ); Debug( LDAP_DEBUG_CONNS, "client_read_cb: " @@ -71,26 +70,47 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) if ( !op ) { Debug( LDAP_DEBUG_ANY, "client_read_cb: " "operation_init failed\n" ); - goto fail; - } - - if ( ldap_pvt_thread_pool_submit( - &connection_pool, operation_process, op ) ) { - /* what could have happened? */ - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - operation_destroy( op ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); - goto fail; - } - - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - return; -fail: - client_destroy( c ); - - if ( ber ) { ber_free( ber, 1 ); + goto fail; } + + switch ( op->o_tag ) { + case LDAP_REQ_UNBIND: + /* We do not expect anything more from the client */ + event_del( c->c_read_event ); + + rc = ldap_pvt_thread_pool_submit( + &connection_pool, client_reset, op ); + if ( rc ) { + tavl_delete( &c->c_ops, op, operation_client_cmp ); + operation_destroy( op ); + client_destroy( c ); + return; + } + break; + case LDAP_REQ_BIND: + rc = ldap_pvt_thread_pool_submit( + &connection_pool, client_bind, op ); + break; + default: + rc = ldap_pvt_thread_pool_submit( + &connection_pool, request_process, op ); + break; + } + + if ( !rc ) { + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return; + } + +fail: + if ( op ) { + tavl_delete( &c->c_ops, op, operation_client_cmp ); + op->o_client = NULL; + operation_destroy( op ); + } + + client_destroy( c ); return; } @@ -171,12 +191,14 @@ fail: return NULL; } -static void +void client_destroy( Connection *c ) { + assert( c->c_read_event != NULL ); event_del( c->c_read_event ); event_free( c->c_read_event ); + assert( c->c_write_event != NULL ); event_del( c->c_write_event ); event_free( c->c_write_event ); diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index 4b881ad545..0502531042 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -108,6 +108,8 @@ operation_destroy( Operation *op ) { Connection *c; + ber_free( op->o_ber, 1 ); + /* TODO: this is a stopgap and there are many races here, just get * something in to test with until we implement the freelist */ if ( op->o_client ) { @@ -137,6 +139,7 @@ operation_init( Connection *c, BerElement *ber ) op = ch_calloc( 1, sizeof(Operation) ); op->o_client = c; + op->o_ber = ber; tag = ber_get_int( ber, &op->o_client_msgid ); if ( tag != LDAP_TAG_MSGID ) { @@ -179,8 +182,97 @@ fail: return NULL; } +void +operation_abandon( Operation *op ) +{ + int rc; + + if ( op->o_upstream ) { + Connection *c = op->o_upstream; + BerElement *ber; + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ); + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + if ( rc ) { + /* The operation has already been abandoned or finished */ + goto done; + } + + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "operation_abandon: " + "ber_alloc failed\n" ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + return; + } + c->c_pendingber = ber; + + rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, c->c_next_msgid++, + LDAP_REQ_ABANDON, op->o_upstream_msgid ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + if ( rc == -1 ) { + ber_free( ber, 1 ); + return; + } + upstream_write_cb( -1, 0, c ); + } + +done: + operation_destroy( op ); +} + +void +operation_send_reject( Operation *op, int result, const char *msg ) +{ + Connection *c = op->o_client; + BerElement *ber; + int found; + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op ); + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + if ( !found ) { + return; + } + + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + client_destroy( c ); + return; + } + c->c_pendingber = ber; + + ber_printf( ber, "t{tit{ess}}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_client_msgid, + slap_req2res( op->o_tag ), result, "", msg ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + client_write_cb( -1, 0, c ); + + operation_destroy( op ); +} + +void +operation_lost_upstream( Operation *op ) +{ + operation_send_reject( op, LDAP_UNAVAILABLE, + "connection to the remote server has been severed" ); +} + void * -operation_process( void *ctx, void *arg ) +request_process( void *ctx, void *arg ) { Operation *op = arg; BerElement *output; @@ -190,33 +282,34 @@ operation_process( void *ctx, void *arg ) c = backend_select( op ); if ( !c ) { - Debug( LDAP_DEBUG_STATS, "operation_process: " + Debug( LDAP_DEBUG_STATS, "request_process: " "no available connection found\n" ); goto fail; } op->o_upstream = c; - c->c_pendingber = output = ber_alloc(); - if ( !output ) { + output = c->c_pendingber; + if ( output == NULL && (output = ber_alloc()) == NULL ) { goto fail; } + c->c_pendingber = output; + ldap_pvt_thread_mutex_lock( &c->c_mutex ); op->o_upstream_msgid = msgid = c->c_next_msgid++; rc = tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); - - ber_start_seq( output, LDAP_TAG_MESSAGE ); - ber_put_int( output, msgid, LDAP_TAG_MSGID ); - ber_put_berval( output, &op->o_request, op->o_tag ); - if ( !BER_BVISNULL( &op->o_ctrls ) ) { - ber_put_berval( output, &op->o_ctrls, LDAP_TAG_CONTROLS ); - } - ber_put_seq( output ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, msgid, + op->o_tag, &op->o_request, + LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &op->o_ctrls ) ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + upstream_write_cb( -1, 0, c ); return NULL; fail: + operation_send_reject( op, LDAP_OTHER, "internal error" ); return NULL; } diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h index 93bd8be1d7..4643f00900 100644 --- a/servers/lloadd/proto-slap.h +++ b/servers/lloadd/proto-slap.h @@ -58,11 +58,18 @@ LDAP_SLAPD_F (void) ch_free( void * ); #define free ch_free #endif +/* + * bind.c + */ +LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg ); +LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg ); + /* * client.c */ LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls ); LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg ); +LDAP_SLAPD_F (void) client_destroy( Connection *c ); /* * config.c @@ -139,9 +146,12 @@ LDAP_SLAPD_V (const char *) slapd_slp_attrs; LDAP_SLAPD_F (const char *) slap_msgtype2str( ber_tag_t tag ); LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r ); LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r ); -LDAP_SLAPD_F (void *) operation_process( void *ctx, void *arg ); LDAP_SLAPD_F (Operation *) operation_init( Connection *c, BerElement *ber ); +LDAP_SLAPD_F (void) operation_abandon( Operation *op ); +LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg ); +LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op ); LDAP_SLAPD_F (void) operation_destroy( Operation *op ); +LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg ); /* * sl_malloc.c @@ -168,6 +178,7 @@ LDAP_SLAPD_F (void *) slap_sl_context( void *ptr ); LDAP_SLAPD_F (void) upstream_write_cb( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (void) upstream_read_cb( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (Connection *) upstream_init( ber_socket_t s, Backend *b ); +LDAP_SLAPD_F (void) upstream_destroy( Connection *c ); /* * user.c diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h index b9181392e4..01250e5c3b 100644 --- a/servers/lloadd/slap.h +++ b/servers/lloadd/slap.h @@ -233,6 +233,7 @@ enum lload_tls_type { LLOAD_STARTTLS, }; +/* Can hold mutex when locking a linked connection */ struct Backend { struct slap_bindconf b_bindconf; ldap_pvt_thread_mutex_t b_mutex; @@ -247,6 +248,8 @@ struct Backend { LDAP_STAILQ_ENTRY(Backend) b_next; }; +typedef int (*OperationHandler)( Operation *op, BerElement *ber ); + /* connection state (protected by c_mutex) */ enum sc_state { SLAP_C_INVALID = 0, /* MUST BE ZERO (0) */ @@ -276,8 +279,16 @@ struct Connection { struct event *c_read_event, *c_write_event; /* can only be changed by binding thread */ - struct berval c_sasl_bind_mech; /* mech in progress */ + int c_features; +#define SLAP_C_VC 1 + struct berval c_sasl_bind_mech; /* mech in progress */ + struct berval c_auth; /* authcDN (possibly in progress) */ + + struct berval c_vc_cookie; + + /* Can be held while acquiring c_mutex to inject things into c_ops or + * destroy the connection */ ldap_pvt_thread_mutex_t c_io_mutex; /* only one pdu written at a time */ BerElement *c_currentber; /* ber we're attempting to read */ diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index 7d7de4f080..f376d92012 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -24,17 +24,441 @@ #include "lutil.h" #include "slap.h" -static void upstream_destroy( Connection *c ); +static int +forward_response( Operation *op, BerElement *ber ) +{ + Connection *c = op->o_client; + BerElement *output; + BerValue response, controls = BER_BVNULL; + ber_tag_t tag, response_tag; + ber_len_t len; + response_tag = ber_skip_element( ber, &response ); + + tag = ber_peek_tag( ber, &len ); + if ( tag == LDAP_TAG_CONTROLS ) { + ber_skip_element( ber, &controls ); + } + + Debug( LDAP_DEBUG_CONNS, "forward_response: " + "%s to client %lu request #%d\n", + slap_msgtype2str( response_tag ), c->c_connid, op->o_client_msgid ); + + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + output = c->c_pendingber; + if ( output == NULL && (output = ber_alloc()) == NULL ) { + ber_free( ber, 1 ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + return -1; + } + c->c_pendingber = output; + + ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_client_msgid, + response_tag, &response, + LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + ber_free( ber, 1 ); + client_write_cb( -1, 0, c ); + return 0; +} + +static int +forward_final_response( Operation *op, BerElement *ber ) +{ + int rc; + + Debug( LDAP_DEBUG_CONNS, "forward_final_response: " + "finishing up with request #%d for client %lu\n", + op->o_client_msgid, op->o_client->c_connid ); + rc = forward_response( op, ber ); + operation_destroy( op ); + + return rc; +} + +static int +handle_bind_response( Operation *op, BerElement *ber ) +{ + Connection *c = op->o_client; + BerElement *copy; + ber_int_t msgid, result; + ber_tag_t tag; + int rc = 0; + + copy = ber_dup( ber ); + if ( !copy ) { + rc = -1; + goto done; + } + + tag = ber_scanf( copy, "{i{e" /* "}}" */, &msgid, &result ); + ber_free( copy, 0 ); + + if ( tag == LBER_ERROR ) { + rc = -1; + goto done; + } + + Debug( LDAP_DEBUG_CONNS, "handle_bind_response: " + "received response for bind request by client %lu, result=%d\n", + c->c_connid, result ); + + switch ( result ) { + case LDAP_SASL_BIND_IN_PROGRESS: + break; + case LDAP_SUCCESS: + default: { + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + c->c_state = SLAP_C_READY; + if ( result != LDAP_SUCCESS ) { + ber_memfree( c->c_auth.bv_val ); + BER_BVZERO( &c->c_auth ); + } + if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { + ber_memfree( c->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &c->c_sasl_bind_mech ); + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + break; + } + } + +done: + if ( rc ) { + operation_destroy( op ); + ber_free( ber, 1 ); + return rc; + } + return forward_final_response( op, ber ); +} + +static int +handle_vc_bind_response( Operation *op, BerElement *ber ) +{ + Connection *c = op->o_client; + BerElement *output; + BerValue matched, diagmsg, creds = BER_BVNULL, controls = BER_BVNULL; + ber_int_t result; + ber_tag_t tag; + ber_len_t len; + int rc = 0; + + tag = ber_scanf( ber, "{emm" /* "}" */, + &result, &matched, &diagmsg ); + if ( tag == LBER_ERROR ) { + rc = -1; + goto done; + } + + tag = ber_peek_tag( ber, &len ); + if ( result == LDAP_PROTOCOL_ERROR ) { + Backend *b = op->o_upstream->c_private; + ldap_pvt_thread_mutex_lock( &op->o_upstream->c_mutex ); + Debug( LDAP_DEBUG_ANY, "VC extended operation not supported on backend %s\n", + b->b_bindconf.sb_uri.bv_val ); + ldap_pvt_thread_mutex_unlock( &op->o_upstream->c_mutex ); + } + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + + Debug( LDAP_DEBUG_CONNS, "handle_vc_bind_response: " + "received response for bind request by client %lu, result=%d\n", + c->c_connid, result ); + + if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_COOKIE ) { + if ( !BER_BVISNULL( &c->c_vc_cookie ) ) { + ber_memfree( c->c_vc_cookie.bv_val ); + } + tag = ber_scanf( ber, "o", &c->c_vc_cookie ); + if ( tag == LBER_ERROR ) { + rc = -1; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + goto done; + } + tag = ber_peek_tag( ber, &len ); + } + + if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_SCREDS ) { + tag = ber_scanf( ber, "m", &creds ); + if ( tag == LBER_ERROR ) { + rc = -1; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + goto done; + } + tag = ber_peek_tag( ber, &len ); + } + + if ( tag == LDAP_TAG_EXOP_VERIFY_CREDENTIALS_CONTROLS ) { + tag = ber_scanf( ber, "m", &controls ); + if ( tag == LBER_ERROR ) { + rc = -1; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + goto done; + } + } + + switch ( result ) { + case LDAP_SASL_BIND_IN_PROGRESS: + break; + case LDAP_SUCCESS: + default: { + c->c_state = SLAP_C_READY; + if ( result != LDAP_SUCCESS ) { + ber_memfree( c->c_auth.bv_val ); + BER_BVZERO( &c->c_auth ); + } + if ( !BER_BVISNULL( &c->c_vc_cookie ) ) { + ber_memfree( c->c_vc_cookie.bv_val ); + BER_BVZERO( &c->c_vc_cookie ); + } + if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { + ber_memfree( c->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &c->c_sasl_bind_mech ); + } + break; + } + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + output = c->c_pendingber; + if ( output == NULL && (output = ber_alloc()) == NULL ) { + rc = -1; + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + goto done; + } + c->c_pendingber = output; + + rc = ber_printf( output, "t{tit{eOOtO}tO}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_client_msgid, LDAP_RES_BIND, + result, &matched, &diagmsg, + LDAP_TAG_SASL_RES_CREDS, BER_BV_OPTIONAL( &creds ), + LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) ); + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + if ( rc >= 0 ) { + client_write_cb( -1, 0, c ); + rc = 0; + } + +done: + operation_destroy( op ); + ber_free( ber, 1 ); + return rc; +} + +static int +handle_unsolicited( Connection *c, BerElement *ber ) +{ + TAvlnode *root; + int freed; + + Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: " + "teardown for upstream connection %lu\n", + c->c_connid ); + + root = c->c_ops; + c->c_ops = NULL; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + freed = tavl_free( root, (AVL_FREE)operation_lost_upstream ); + Debug( LDAP_DEBUG_TRACE, "handle_unsolicited: " + "dropped %d operations\n", + freed ); + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + upstream_destroy( c ); + ber_free( ber, 1 ); + + return -1; +} + +/* + * Pull c->c_currentber from the connection and try to look up the operation on + * the upstream. + * + * If it's a notice of disconnection, we won't find it and need to tear down + * the connection and tell the clients, if we can't find the operation, ignore + * the message (either client already disconnected/abandoned it or the upstream + * is pulling our leg). + * + * Some responses need special handling: + * - Bind response + * - VC response where the client requested a Bind (both need to update the + * client's bind status) + * - search entries/referrals and intermediate responses (will not trigger + * operation to be removed) + * + * If the worker pool is overloaded, we might be called directly from + * upstream_read_cb, at that point, the connection hasn't been muted. + * + * TODO: when the client already has data pending on write, we should mute the + * upstream. + * - should record the BerElement on the Op and the Op on the client + */ +static int +handle_one_response( Connection *c ) +{ + BerElement *ber; + Operation *op = NULL, needle = { .o_upstream = c }; + OperationHandler handler = NULL; + ber_tag_t tag; + ber_len_t len; + int rc = 0; + + ber = c->c_currentber; + c->c_currentber = NULL; + + tag = ber_get_int( ber, &needle.o_upstream_msgid ); + if ( tag != LDAP_TAG_MSGID ) { + rc = -1; + ber_free( ber, 1 ); + goto fail; + } + + if ( needle.o_upstream_msgid == 0 ) { + return handle_unsolicited( c, ber ); + } else if ( !( op = tavl_find( + c->c_ops, &needle, operation_upstream_cmp ) ) ) { + /* Already abandoned, do nothing */ + /* + } else if ( op->o_response_pending ) { + c->c_pendingop = op; + event_del( c->c_read_event ); + */ + } else { + /* + op->o_response_pending = ber; + */ + + tag = ber_peek_tag( ber, &len ); + switch ( tag ) { + case LDAP_RES_SEARCH_ENTRY: + case LDAP_RES_SEARCH_REFERENCE: + case LDAP_RES_INTERMEDIATE: + handler = forward_response; + break; + case LDAP_RES_BIND: + handler = handle_bind_response; + break; + case LDAP_RES_EXTENDED: + if ( op->o_tag == LDAP_REQ_BIND ) { + handler = handle_vc_bind_response; + } + break; + } + if ( !handler ) { + handler = forward_final_response; + } + } + if ( op ) { + Debug( LDAP_DEBUG_TRACE, "handle_one_response: " + "upstream=%lu, processing response for client %lu, msgid=%d\n", + c->c_connid, op->o_client->c_connid, op->o_client_msgid ); + } else { + tag = ber_peek_tag( ber, &len ); + Debug( LDAP_DEBUG_TRACE, "handle_one_response: " + "upstream=%lu, %s, msgid=%d not for a pending operation\n", + c->c_connid, slap_msgtype2str( tag ), needle.o_upstream_msgid ); + } + + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + if ( handler ) { + rc = handler( op, ber ); + } + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + +fail: + if ( rc ) { + Debug( LDAP_DEBUG_ANY, "handle_one_response: " + "error on processing a response on upstream connection %ld\n", + c->c_connid ); + upstream_destroy( c ); + } + return rc; +} + +/* + * We start off with the upstream muted and c_currentber holding the response + * we received. + * + * We run handle_one_response on each response, stopping once we hit an error, + * have to wait on reading or process slap_conn_max_pdus_per_cycle responses so + * as to maintain fairness and not hog the worker thread forever. + * + * If we've run out of responses from the upstream or hit the budget, we unmute + * the connection and run handle_one_response, it might return an 'error' when + * the client is blocked on writing, it's that client's job to wake us again. + */ +static void * +handle_responses( void *ctx, void *arg ) +{ + Connection *c = arg; + int responses_handled = 0; + + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + for ( ; responses_handled < slap_conn_max_pdus_per_cycle; + responses_handled++ ) { + BerElement *ber; + ber_tag_t tag; + ber_len_t len; + + if ( handle_one_response( c ) ) { + /* Error, connection might already have been destroyed */ + return NULL; + } + /* Otherwise, handle_one_response leaves the connection locked */ + + if ( (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "handle_responses: " + "ber_alloc failed\n" ); + upstream_destroy( c ); + return NULL; + } + c->c_currentber = ber; + + tag = ber_get_next( c->c_sb, &len, ber ); + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + char ebuf[128]; + Debug( LDAP_DEBUG_ANY, "handle_responses: " + "ber_get_next on fd %d failed errno=%d (%s)\n", + c->c_fd, err, + sock_errstr( err, ebuf, sizeof(ebuf) ) ); + + c->c_currentber = NULL; + ber_free( ber, 1 ); + upstream_destroy( c ); + return NULL; + } + break; + } + } + + event_add( c->c_read_event, NULL ); + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return NULL; +} + +/* + * Initial read on the upstream connection, if we get an LDAP PDU, submit the + * processing of this and successive ones to the work queue. + * + * If we can't submit it to the queue (overload), process this one and return + * to the event loop immediately after. + */ void upstream_read_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; BerElement *ber; ber_tag_t tag; - Operation *op, needle = { .o_upstream = c }; ber_len_t len; - int finished = 0; ldap_pvt_thread_mutex_lock( &c->c_mutex ); Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: " @@ -48,6 +472,7 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return; } + c->c_currentber = ber; tag = ber_get_next( c->c_sb, &len, ber ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -60,84 +485,29 @@ upstream_read_cb( evutil_socket_t s, short what, void *arg ) c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); c->c_currentber = NULL; - goto fail; + ber_free( ber, 1 ); + upstream_destroy( c ); + return; } - c->c_currentber = ber; + event_add( c->c_read_event, NULL ); ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return; } - c->c_currentber = NULL; - - tag = ber_get_int( ber, &needle.o_upstream_msgid ); - if ( tag != LDAP_TAG_MSGID || needle.o_upstream_msgid == 0 ) { - goto fail; - } - - op = tavl_find( c->c_ops, &needle, operation_upstream_cmp ); - if ( !op ) { - ber_free( ber, 1 ); - } else { - Connection *client = op->o_client; - BerElement *output; - BerValue response, controls; - ber_tag_t type; - - type = ber_skip_element( ber, &response ); - switch ( type ) { - case LDAP_RES_SEARCH_ENTRY: - case LDAP_RES_SEARCH_REFERENCE: - case LDAP_RES_INTERMEDIATE: - break; - default: - finished = 1; - tavl_delete( &c->c_ops, op, operation_upstream_cmp ); - break; + if ( !slap_conn_max_pdus_per_cycle || + ldap_pvt_thread_pool_submit( + &connection_pool, handle_responses, c ) ) { + /* If we're overloaded or configured as such, process one and resume in + * the next cycle */ + if ( handle_one_response( c ) == LDAP_SUCCESS ) { + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); } - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - - tag = ber_peek_tag( ber, &len ); - if ( tag == LDAP_TAG_CONTROLS ) { - tag = ber_skip_element( ber, &controls ); - } - - output = ber_alloc(); - if ( !output ) { - goto fail; - } - - ber_start_seq( output, LDAP_TAG_MESSAGE ); - ber_put_int( output, op->o_client_msgid, LDAP_TAG_MSGID ); - ber_put_berval( output, &response, type ); - if ( tag == LDAP_TAG_CONTROLS ) { - ber_put_berval( output, &controls, LDAP_TAG_CONTROLS ); - } - ber_put_seq( output ); - - if ( finished ) { - ldap_pvt_thread_mutex_lock( &client->c_mutex ); - tavl_delete( &client->c_ops, op, operation_client_cmp ); - ldap_pvt_thread_mutex_unlock( &client->c_mutex ); - operation_destroy( op ); - } - - ldap_pvt_thread_mutex_lock( &client->c_io_mutex ); - client->c_pendingber = output; - ldap_pvt_thread_mutex_unlock( &client->c_io_mutex ); - - client_write_cb( -1, 0, client ); return; } + event_del( c->c_read_event ); ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - return; -fail: - Debug( LDAP_DEBUG_ANY, "upstream_read_cb: " - "error on processing a response on upstream connection %ld\n", - c->c_connid ); - ber_free( ber, 1 ); - upstream_destroy( c ); } void @@ -205,6 +575,7 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return; } + c->c_currentber = ber; tag = ber_get_next( c->c_sb, &len, ber ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -219,7 +590,6 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg ) c->c_currentber = NULL; goto fail; } - c->c_currentber = ber; ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return; } @@ -409,7 +779,7 @@ fail: return NULL; } -static void +void upstream_destroy( Connection *c ) { Backend *b = c->c_private;