From aecc62c08e7944689af624eb07cb581a226c5ce2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?=
Date: Wed, 22 Nov 2017 13:05:11 +0000
Subject: [PATCH] Introduce operation timeout machinery

---
 servers/lloadd/config.c     |  17 +++-
 servers/lloadd/daemon.c     |  25 ++++-
 servers/lloadd/operation.c  | 220 ++++++++++++++++++++++++++++++++++++
 servers/lloadd/proto-slap.h |   4 +
 4 files changed, 260 insertions(+), 6 deletions(-)

diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c
index 0431489e22..949158b559 100644
--- a/servers/lloadd/config.c
+++ b/servers/lloadd/config.c
@@ -69,7 +69,8 @@ char *global_host = NULL;
 static FILE *logfile;
 static char *logfileName;
 
-static struct timeval timeout_net_tv, timeout_write_tv = { 10, 0 };
+static struct timeval timeout_api_tv, timeout_net_tv,
+        timeout_write_tv = { 10, 0 };
 
 lload_features_t lload_features;
 
@@ -78,6 +79,7 @@ ber_len_t sockbuf_max_incoming_upstream = LLOAD_SB_MAX_INCOMING_UPSTREAM;
 
 int slap_conn_max_pdus_per_cycle = LLOAD_CONN_MAX_PDUS_PER_CYCLE_DEFAULT;
 
+struct timeval *lload_timeout_api = NULL;
 struct timeval *lload_timeout_net = NULL;
 struct timeval *lload_write_timeout = &timeout_write_tv;
 
@@ -665,6 +667,19 @@ config_bindconf( ConfigArgs *c )
         *ptr = '\0';
     }
 
+    if ( bindconf.sb_timeout_api ) {
+        timeout_api_tv.tv_sec = bindconf.sb_timeout_api;
+        lload_timeout_api = &timeout_api_tv;
+        if ( lload_timeout_event ) {
+            event_add( lload_timeout_event, lload_timeout_api );
+        }
+    } else {
+        lload_timeout_api = NULL;
+        if ( lload_timeout_event ) {
+            event_del( lload_timeout_event );
+        }
+    }
+
     if ( bindconf.sb_timeout_net ) {
         timeout_net_tv.tv_sec = bindconf.sb_timeout_net;
         lload_timeout_net = &timeout_net_tv;
diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c
index ec5b0453a5..5a7e33b9fd 100644
--- a/servers/lloadd/daemon.c
+++ b/servers/lloadd/daemon.c
@@ -83,6 +83,8 @@ static ldap_pvt_thread_t listener_tid, *daemon_tid;
 
 struct evdns_base *dnsbase;
 
+struct event *lload_timeout_event;
+
 #ifndef SLAPD_LISTEN_BACKLOG
 #define SLAPD_LISTEN_BACKLOG 1024
 #endif /* ! SLAPD_LISTEN_BACKLOG */
@@ -1234,6 +1236,7 @@ slapd_daemon( struct event_base *daemon_base )
     int i, rc;
     Backend *b;
     struct event_base *base;
+    struct event *event;
 
     assert( daemon_base != NULL );
 
@@ -1280,19 +1283,31 @@ slapd_daemon( struct event_base *daemon_base )
 
     current_backend = LDAP_CIRCLEQ_FIRST( &backend );
     LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
-        struct event *retry_event =
-                evtimer_new( daemon_base, backend_connect, b );
-
-        if ( !retry_event ) {
+        event = evtimer_new( daemon_base, backend_connect, b );
+        if ( !event ) {
             Debug( LDAP_DEBUG_ANY, "lloadd: "
                     "failed to allocate retry event\n" );
             return -1;
         }
-        b->b_retry_event = retry_event;
+        b->b_retry_event = event;
 
         backend_retry( b );
     }
 
+    event = evtimer_new( daemon_base, operations_timeout, event_self_cbarg() );
+    if ( !event ) {
+        Debug( LDAP_DEBUG_ANY, "lloadd: "
+                "failed to allocate timeout event\n" );
+        return -1;
+    }
+    lload_timeout_event = event;
+
+    /* TODO: should we just add it with any timeout and re-add when the timeout
+     * changes? */
+    if ( lload_timeout_api ) {
+        event_add( event, lload_timeout_api );
+    }
+
     lloadd_inited = 1;
     rc = event_base_dispatch( daemon_base );
     Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c
index a52d880dd1..f65c030ea0 100644
--- a/servers/lloadd/operation.c
+++ b/servers/lloadd/operation.c
@@ -739,3 +739,223 @@ operation_lost_upstream( Operation *op )
     operation_destroy_from_upstream( op );
     CONNECTION_UNLOCK(c);
 }
+
+/*
+ * Time out operations sent over this upstream connection that were started
+ * before threshold and have not seen a response since.
+ *
+ * Matching operations are moved from upstream->c_ops into a private tree; the
+ * client of each one (if it is still linked) is then sent a rejection and
+ * operation_send_abandon() is called for the operation.
+ *
+ * The caller (backend_timeout) invokes this with the upstream locked; the
+ * lock is released while the collected operations are processed and taken
+ * again before returning.
+ */
+void
+connection_timeout( Connection *upstream, time_t threshold )
+{
+    Operation *op;
+    TAvlnode *ops = NULL, *node, *next;
+    Backend *b = upstream->c_private;
+    int rc, nops = 0;
+
+    for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
+            ((Operation *)node->avl_data)->o_start < threshold; /* shortcut */
+            node = next ) {
+        Operation *found_op;
+
+        /* tavl_delete() below takes this node out of the tree, so find its
+         * successor while the node is still linked */
+        next = tavl_next( node, TAVL_DIR_RIGHT );
+        op = node->avl_data;
+
+        /* Have we received another response since? */
+        if ( op->o_last_response && op->o_last_response >= threshold ) {
+            continue;
+        }
+
+        /* Hold a reference so the operation stays around until we are done
+         * with it in the second loop */
+        op->o_upstream_refcnt++;
+        found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
+        assert( op == found_op );
+
+        rc = tavl_insert( &ops, op, operation_upstream_cmp, avl_dup_error );
+        assert( rc == LDAP_SUCCESS );
+
+        Debug( LDAP_DEBUG_STATS2, "connection_timeout: "
+                "timing out %s from connid=%lu msgid=%d sent to connid=%lu as "
+                "msgid=%d\n",
+                slap_msgtype2str( op->o_tag ), op->o_client_connid,
+                op->o_client_msgid, op->o_upstream_connid,
+                op->o_upstream_msgid );
+        nops++;
+    }
+
+    if ( nops == 0 ) {
+        return;
+    }
+    upstream->c_n_ops_executing -= nops;
+    Debug( LDAP_DEBUG_STATS, "connection_timeout: "
+            "timing out %d operations for connid=%lu\n",
+            nops, upstream->c_connid );
+    CONNECTION_UNLOCK_INCREF(upstream);
+
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    b->b_n_ops_executing -= nops;
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+    for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node;
+            node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
+        Connection *client;
+
+        op = node->avl_data;
+
+        ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+        client = op->o_client;
+        if ( client ) {
+            CONNECTION_LOCK(client);
+            ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+
+            /* operation_send_reject_locked unlocks and destroys client on
+             * failure */
+            if ( operation_send_reject_locked( op,
+                         op->o_tag == LDAP_REQ_SEARCH ?
+                                 LDAP_TIMELIMIT_EXCEEDED :
+                                 LDAP_ADMINLIMIT_EXCEEDED,
+                         "upstream did not respond in time",
+                         0 ) == LDAP_SUCCESS ) {
+                CONNECTION_UNLOCK_OR_DESTROY(client);
+            }
+
+            if ( rc == LDAP_SUCCESS ) {
+                rc = operation_send_abandon( op );
+            }
+        } else {
+            ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+        }
+
+        /* Give back the reference taken in the first loop even when the
+         * client is gone, otherwise it would be held forever */
+        CONNECTION_LOCK(upstream);
+        op->o_upstream_refcnt--;
+        operation_destroy_from_upstream( op );
+        CONNECTION_UNLOCK(upstream);
+    }
+
+    /* TODO: if operation_send_abandon failed, we need to kill the upstream */
+    if ( rc == LDAP_SUCCESS ) {
+        connection_write_cb( -1, 0, upstream );
+    }
+
+    CONNECTION_LOCK_DECREF(upstream);
+    /* just dispose of the AVL, most operations should already be gone */
+    tavl_free( ops, NULL );
+}
+
+/*
+ * Run connection_timeout() over the connections linked in cq, starting with
+ * the one following *lastp and finishing with the c_connid that *lastp had
+ * when we started. Returns immediately if *lastp is not set.
+ */
+static void
+backend_timeout(
+        Backend *b,
+        struct ConnSt *cq,
+        Connection **lastp,
+        time_t threshold )
+{
+    Connection *c, *old;
+    unsigned long last_connid;
+
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    if ( !*lastp ) {
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        return;
+    }
+    last_connid = (*lastp)->c_connid;
+    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
+    CONNECTION_LOCK(c);
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+    /*
+     * Ugh... concurrency is annoying:
+     * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
+     *   order
+     * - the connection with the highest c_connid is maintained at *lastp
+     * - we can only use cq when we hold b->b_mutex
+     * - connections might be added to or removed from cq while we're busy
+     *   processing connections
+     * - connection_destroy touches cq
+     * - we can't even hold locks of two different connections
+     * - we need a way to detect we've finished looping around cq for some
+     *   definition of looping around
+     *
+     * So as a result, 90% of the code below is spent navigating that...
+     */
+    while ( c->c_connid <= last_connid ) {
+        Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
+                "timing out operations for connid=%lu which has %ld "
+                "pending ops\n",
+                c->c_connid, c->c_n_ops_executing );
+        connection_timeout( c, threshold );
+        if ( c->c_connid == last_connid ) {
+            break;
+        }
+
+        CONNECTION_UNLOCK_INCREF(c);
+
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        old = c;
+        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+        CONNECTION_LOCK(c);
+        CONNECTION_UNLOCK_INCREF(c);
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+        CONNECTION_LOCK_DECREF(old);
+        CONNECTION_UNLOCK_OR_DESTROY(old);
+
+        CONNECTION_LOCK_DECREF(c);
+    }
+    CONNECTION_UNLOCK_OR_DESTROY(c);
+}
+
+/*
+ * Timer callback: when lload_timeout_api is set, walks every backend that has
+ * operations executing and times out those older than its tv_sec seconds, on
+ * the bind connections first and then on the regular ones.
+ *
+ * arg is the timer event itself, which is re-added on the way out.
+ */
+void
+operations_timeout( evutil_socket_t s, short what, void *arg )
+{
+    struct event *self = arg;
+    Backend *b;
+    time_t threshold;
+
+    Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+            "running timeout task\n" );
+    if ( !lload_timeout_api ) goto done;
+
+    threshold = slap_get_time() - lload_timeout_api->tv_sec;
+
+    LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+        if ( b->b_n_ops_executing == 0 ) continue;
+
+        Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+                "timing out binds for backend uri=%s\n",
+                b->b_uri.bv_val );
+        backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
+
+        Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+                "timing out other operations for backend uri=%s\n",
+                b->b_uri.bv_val );
+        backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
+    }
+done:
+    Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+            "timeout task finished\n" );
+    evtimer_add( self, lload_timeout_api );
+}
diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h
index 5f797cac78..144cbb4590 100644
--- a/servers/lloadd/proto-slap.h
+++ b/servers/lloadd/proto-slap.h
@@ -122,6 +122,8 @@ LDAP_SLAPD_V (int) slapd_tcp_rmem;
 LDAP_SLAPD_V (int) slapd_tcp_wmem;
 #endif /* LDAP_TCP_BUFFER */
 
+LDAP_SLAPD_V (struct event *) lload_timeout_event;
+
 #define bvmatch( bv1, bv2 ) \
     ( ( (bv1)->bv_len == (bv2)->bv_len ) && \
             ( memcmp( (bv1)->bv_val, (bv2)->bv_val, (bv1)->bv_len ) == 0 ) )
@@ -178,6 +180,7 @@ LDAP_SLAPD_F (int) operation_send_reject_locked( Operation *op, int result, cons
 LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
 LDAP_SLAPD_F (void) operation_destroy_from_client( Operation *op );
 LDAP_SLAPD_F (void) operation_destroy_from_upstream( Operation *op );
+LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
 
 /*
  * sl_malloc.c
@@ -255,6 +258,7 @@ LDAP_SLAPD_V (const char) Versionstr[];
 
 LDAP_SLAPD_V (int) global_gentlehup;
 LDAP_SLAPD_V (int) global_idletimeout;
+LDAP_SLAPD_V (struct timeval *) lload_timeout_api;
 LDAP_SLAPD_V (struct timeval *) lload_timeout_net;
 LDAP_SLAPD_V (struct timeval *) lload_write_timeout;
 