From 837a6068e0170fe24fec904eb06ae0fe6ed7c662 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Wed, 3 May 2017 10:14:19 +0100 Subject: [PATCH] Rework client_read_cb along the lines of upstream --- servers/lloadd/bind.c | 53 +++++-------- servers/lloadd/client.c | 149 ++++++++++++++++++++++++---------- servers/lloadd/operation.c | 154 +++++++++++++++++++++++------------- servers/lloadd/proto-slap.h | 9 ++- 4 files changed, 227 insertions(+), 138 deletions(-) diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c index 077167a594..53af3895d8 100644 --- a/servers/lloadd/bind.c +++ b/servers/lloadd/bind.c @@ -246,22 +246,14 @@ fail: return 1; } -void * -client_reset( void *ctx, void *arg ) +void +client_reset( Connection *c ) { - Operation *op = arg; - Connection *c = op->o_client; TAvlnode *root; - int freed, destroy = 1; + int freed; - CONNECTION_LOCK(c); root = c->c_ops; c->c_ops = NULL; - c->c_state = SLAP_C_CLOSING; - if ( op->o_tag == LDAP_REQ_BIND ) { - c->c_state = SLAP_C_BINDING; - destroy = 0; - } if ( !BER_BVISNULL( &c->c_auth ) ) { ch_free( c->c_auth.bv_val ); BER_BVZERO( &c->c_auth ); @@ -272,36 +264,28 @@ client_reset( void *ctx, void *arg ) } CONNECTION_UNLOCK_INCREF(c); - tavl_delete( &root, op, operation_client_cmp ); freed = tavl_free( root, (AVL_FREE)operation_abandon ); Debug( LDAP_DEBUG_TRACE, "client_reset: " "dropped %d operations\n", freed ); - if ( destroy ) { - operation_destroy( op ); - CLIENT_LOCK_DESTROY(c); - } else { - CONNECTION_LOCK_DECREF(c); - CLIENT_UNLOCK_OR_DESTROY(c); - } - - return NULL; + CONNECTION_LOCK_DECREF(c); } -void * -client_bind( void *ctx, void *arg ) +int +client_bind( Connection *client, Operation *op ) { - Operation *op = arg; - Connection *upstream, *client = op->o_client; - int rc = 0; + Connection *upstream; + int rc = LDAP_SUCCESS; - CONNECTION_LOCK(client); + /* protect the Bind operation */ + tavl_delete( &client->c_ops, op, operation_client_cmp ); + client->c_state = SLAP_C_BINDING; + + client_reset( client ); CONNECTION_UNLOCK_INCREF(client); - client_reset( ctx, arg ); - upstream = backend_select( op ); if ( !upstream ) { Debug( LDAP_DEBUG_STATS, "client_bind: " @@ -309,8 +293,7 @@ client_bind( void *ctx, void *arg ) operation_send_reject( op, LDAP_UNAVAILABLE, "no connections available", 1 ); CONNECTION_LOCK_DECREF(client); - CLIENT_UNLOCK_OR_DESTROY(client); - return NULL; + return rc; } op->o_upstream = upstream; @@ -324,15 +307,15 @@ client_bind( void *ctx, void *arg ) CONNECTION_LOCK_DECREF(upstream); UPSTREAM_UNLOCK_OR_DESTROY(upstream); + CONNECTION_LOCK_DECREF(client); if ( rc ) { - CLIENT_LOCK_DESTROY(client); - return NULL; + CLIENT_DESTROY(client); + return -1; } - CONNECTION_LOCK_DECREF(client); rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error ); assert( rc == LDAP_SUCCESS ); CLIENT_UNLOCK_OR_DESTROY(client); - return NULL; + return rc; } diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 4775dbb38c..f2dac9c7d5 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -24,15 +24,15 @@ #include "lutil.h" #include "slap.h" +typedef int (*RequestHandler)( Connection *c, Operation *op ); + static void client_read_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; BerElement *ber; - Operation *op = NULL; ber_tag_t tag; ber_len_t len; - int rc = 0; /* What if the shutdown is already in progress and we get to lock the * connection? */ @@ -47,8 +47,9 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) Debug( LDAP_DEBUG_ANY, "client_read_cb: " "ber_alloc failed\n" ); CLIENT_DESTROY(c); - goto fail; + return; } + c->c_currentber = ber; tag = ber_get_next( c->c_sb, &len, ber ); if ( tag != LDAP_TAG_MESSAGE ) { @@ -61,72 +62,134 @@ client_read_cb( evutil_socket_t s, short what, void *arg ) c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); c->c_currentber = NULL; + ber_free( ber, 1 ); CLIENT_DESTROY(c); - goto fail; + return; } - c->c_currentber = ber; + event_add( c->c_read_event, NULL ); CONNECTION_UNLOCK(c); return; } + if ( !slap_conn_max_pdus_per_cycle || + ldap_pvt_thread_pool_submit( + &connection_pool, handle_requests, c ) ) { + /* If we're overloaded or configured as such, process one and resume in + * the next cycle. + * + * handle_one_request re-locks the mutex in the + * process, need to test it's still alive */ + if ( handle_one_request( c ) == LDAP_SUCCESS ) { + CLIENT_UNLOCK_OR_DESTROY(c); + } + return; + } + event_del( c->c_read_event ); + + CONNECTION_UNLOCK(c); + return; +} + +void * +handle_requests( void *ctx, void *arg ) +{ + Connection *c = arg; + int requests_handled = 0; + + CONNECTION_LOCK(c); + for ( ; requests_handled < slap_conn_max_pdus_per_cycle; + requests_handled++ ) { + BerElement *ber; + ber_tag_t tag; + ber_len_t len; + + /* handle_one_response may unlock the connection in the process, we + * need to expect that might be our responsibility to destroy it */ + if ( handle_one_request( c ) ) { + /* Error, connection is unlocked and might already have been + * destroyed */ + return NULL; + } + /* Otherwise, handle_one_request leaves the connection locked */ + + if ( (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "client_read_cb: " + "ber_alloc failed\n" ); + CLIENT_DESTROY(c); + return NULL; + } + c->c_currentber = ber; + + tag = ber_get_next( c->c_sb, &len, ber ); + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + char ebuf[128]; + Debug( LDAP_DEBUG_ANY, "handle_requests: " + "ber_get_next on fd %d failed errno=%d (%s)\n", + c->c_fd, err, + sock_errstr( err, ebuf, sizeof(ebuf) ) ); + + c->c_currentber = NULL; + ber_free( ber, 1 ); + CLIENT_DESTROY(c); + return NULL; + } + break; + } + } + + event_add( c->c_read_event, NULL ); + CLIENT_UNLOCK_OR_DESTROY(c); + return NULL; +} + +int +handle_one_request( Connection *c ) +{ + BerElement *ber; + Operation *op = NULL; + RequestHandler handler = NULL; + + ber = c->c_currentber; c->c_currentber = NULL; op = operation_init( c, ber ); if ( !op ) { - Debug( LDAP_DEBUG_ANY, "client_read_cb: " + Debug( LDAP_DEBUG_ANY, "handle_one_request: " "operation_init failed\n" ); CLIENT_DESTROY(c); - goto fail; + ber_free( ber, 1 ); + return -1; } switch ( op->o_tag ) { case LDAP_REQ_UNBIND: - /* We do not expect anything more from the client. Also, we are the - * read event, so don't need to unlock */ - event_del( c->c_read_event ); - - rc = ldap_pvt_thread_pool_submit( - &connection_pool, client_reset, op ); - if ( rc ) { - CONNECTION_UNLOCK(c); - client_reset( NULL, op ); - return; - } - break; + c->c_state = SLAP_C_CLOSING; + CLIENT_DESTROY(c); + return -1; case LDAP_REQ_BIND: - rc = ldap_pvt_thread_pool_submit( - &connection_pool, client_bind, op ); + handler = client_bind; + break; + case LDAP_REQ_ABANDON: + /* FIXME: We need to be able to abandon a Bind request, handling + * ExOps (esp. Cancel) will be different */ + handler = request_abandon; break; default: if ( c->c_state == SLAP_C_BINDING ) { - CONNECTION_UNLOCK(c); + CONNECTION_UNLOCK_INCREF(c); operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 ); - return; + CONNECTION_LOCK_DECREF(c); + return LDAP_SUCCESS; } - rc = ldap_pvt_thread_pool_submit( - &connection_pool, request_process, op ); + handler = request_process; break; } - /* FIXME: unlocks in this function need more thought when we refcount - * operations */ - CONNECTION_UNLOCK(c); - - if ( !rc ) { - return; - } - -fail: - if ( op ) { - operation_send_reject( - op, LDAP_OTHER, "server error or overloaded", 1 ); - operation_destroy( op ); - } else if ( ber ) { - ber_free( ber, 1 ); - } - - return; + return handler( c, op ); } void diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index c0af76e767..b0996b2ff5 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -204,62 +204,93 @@ fail: void operation_abandon( Operation *op ) { + Connection *c = op->o_upstream; + BerElement *ber; + Backend *b; int rc; - if ( op->o_upstream ) { - Connection *c = op->o_upstream; - BerElement *ber; - Backend *b; + if ( !c ) { + c = op->o_client; CONNECTION_LOCK(c); - rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ); - if ( !rc ) { - c->c_n_ops_executing--; - } - b = (Backend *)c->c_private; - CONNECTION_UNLOCK_INCREF(c); - - if ( rc ) { - /* The operation has already been abandoned or finished */ - goto done; - } - - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - b->b_n_ops_executing--; - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - - ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); - - ber = c->c_pendingber; - if ( ber == NULL && (ber = ber_alloc()) == NULL ) { - Debug( LDAP_DEBUG_ANY, "operation_abandon: " - "ber_alloc failed\n" ); - CONNECTION_LOCK_DECREF(c); - ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - UPSTREAM_UNLOCK_OR_DESTROY(c); - goto done; - } - c->c_pendingber = ber; - - rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE, - LDAP_TAG_MSGID, c->c_next_msgid++, - LDAP_REQ_ABANDON, op->o_upstream_msgid ); - - if ( rc == -1 ) { - ber_free( ber, 1 ); - } - - CONNECTION_LOCK_DECREF(c); - ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); - UPSTREAM_UNLOCK_OR_DESTROY(c); - - if ( rc != -1 ) { - upstream_write_cb( -1, 0, c ); - } + CLIENT_UNLOCK_OR_DESTROY(c); + operation_destroy( op ); + return; } + CONNECTION_LOCK(c); + if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) { + /* The operation has already been abandoned or finished */ + goto done; + } + c->c_n_ops_executing--; + b = (Backend *)c->c_private; + CONNECTION_UNLOCK_INCREF(c); + + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + b->b_n_ops_executing--; + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + + ldap_pvt_thread_mutex_lock( &c->c_io_mutex ); + + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "operation_abandon: " + "ber_alloc failed\n" ); + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + CONNECTION_LOCK_DECREF(c); + goto done; + } + c->c_pendingber = ber; + + rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, c->c_next_msgid++, + LDAP_REQ_ABANDON, op->o_upstream_msgid ); + + if ( rc == -1 ) { + ber_free( ber, 1 ); + c->c_pendingber = NULL; + } + + ldap_pvt_thread_mutex_unlock( &c->c_io_mutex ); + + if ( rc != -1 ) { + upstream_write_cb( -1, 0, c ); + } + + CONNECTION_LOCK_DECREF(c); +done: + UPSTREAM_UNLOCK_OR_DESTROY(c); + operation_destroy( op ); +} + +int +request_abandon( Connection *c, Operation *op ) +{ + Operation *request, needle = { .o_client = c }; + ber_tag_t tag; + int rc = -1; + + tag = ber_get_int( op->o_ber, &needle.o_client_msgid ); + if ( tag != LDAP_REQ_ABANDON ) { + /* How would that happen if we already got the tag for the op? */ + assert(0); + goto done; + } + + request = tavl_find( c->c_ops, &needle, operation_client_cmp ); + if ( !request ) { + goto done; + } + + CONNECTION_UNLOCK_INCREF(c); + operation_abandon( request ); + CONNECTION_LOCK_DECREF(c); + + rc = LDAP_SUCCESS; done: operation_destroy( op ); + return rc; } void @@ -317,14 +348,15 @@ operation_lost_upstream( Operation *op ) operation_destroy( op ); } -void * -request_process( void *ctx, void *arg ) +int +request_process( Connection *client, Operation *op ) { - Operation *op = arg; BerElement *output; - Connection *client = op->o_client, *upstream; + Connection *upstream; ber_int_t msgid; - int rc; + int rc = LDAP_SUCCESS; + + CONNECTION_UNLOCK_INCREF(client); upstream = backend_select( op ); if ( !upstream ) { @@ -346,10 +378,16 @@ request_process( void *ctx, void *arg ) rc = tavl_insert( &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error ); CONNECTION_UNLOCK_INCREF(upstream); + + Debug( LDAP_DEBUG_TRACE, "request_process: " + "client connid=%lu added %s msgid=%d to upstream connid=%lu as " + "msgid=%d\n", + op->o_client_connid, slap_msgtype2str( op->o_tag ), + op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid ); assert( rc == LDAP_SUCCESS ); if ( lload_features & LLOAD_FEATURE_PROXYAUTHZ ) { - CONNECTION_LOCK(client); + CONNECTION_LOCK_DECREF(client); Debug( LDAP_DEBUG_TRACE, "request_process: " "proxying identity %s to upstream\n", client->c_auth.bv_val ); @@ -358,7 +396,7 @@ request_process( void *ctx, void *arg ) op->o_tag, &op->o_request, LDAP_TAG_CONTROLS, LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth ); - CONNECTION_UNLOCK(client); + CONNECTION_UNLOCK_INCREF(client); if ( !BER_BVISNULL( &op->o_ctrls ) ) { BerElement *control_ber = ber_alloc(); @@ -387,7 +425,8 @@ request_process( void *ctx, void *arg ) CONNECTION_LOCK_DECREF(upstream); UPSTREAM_UNLOCK_OR_DESTROY(upstream); - return NULL; + CONNECTION_LOCK_DECREF(client); + return rc; fail: if ( upstream ) { @@ -396,5 +435,6 @@ fail: UPSTREAM_UNLOCK_OR_DESTROY(upstream); } operation_send_reject( op, LDAP_OTHER, "internal error", 0 ); - return NULL; + CONNECTION_LOCK_DECREF(client); + return rc; } diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h index a945b797b8..f949ef546e 100644 --- a/servers/lloadd/proto-slap.h +++ b/servers/lloadd/proto-slap.h @@ -63,12 +63,14 @@ LDAP_SLAPD_F (void) ch_free( void * ); /* * bind.c */ -LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg ); -LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg ); +LDAP_SLAPD_F (void) client_reset( Connection *c ); +LDAP_SLAPD_F (int) client_bind( Connection *c, Operation *op ); /* * client.c */ +LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg ); +LDAP_SLAPD_F (int) handle_one_request( Connection *c ); LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls ); LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (void) client_destroy( Connection *c ); @@ -154,7 +156,8 @@ LDAP_SLAPD_F (void) operation_abandon( Operation *op ); LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg, int send_anyway ); LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op ); LDAP_SLAPD_F (void) operation_destroy( Operation *op ); -LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg ); +LDAP_SLAPD_F (int) request_abandon( Connection *c, Operation *op ); +LDAP_SLAPD_F (int) request_process( Connection *c, Operation *op ); /* * sl_malloc.c