mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-12-27 01:59:38 -05:00
Simplify write waiter handling
Writer threads do their own wait using select/poll instead of asking the listener thread. Eliminates one mutex+one condvar per conn plus multiple wakeups of the listener thread. Also fixes writetimeout to wait an exact time, instead of the approximation used in the listener thread.
This commit is contained in:
parent
7d6d6944c5
commit
88d22a1ca3
4 changed files with 75 additions and 82 deletions
|
|
@ -154,9 +154,7 @@ int connections_destroy(void)
|
|||
ber_sockbuf_free( connections[i].c_sb );
|
||||
ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
|
||||
ldap_pvt_thread_mutex_destroy( &connections[i].c_write1_mutex );
|
||||
ldap_pvt_thread_mutex_destroy( &connections[i].c_write2_mutex );
|
||||
ldap_pvt_thread_cond_destroy( &connections[i].c_write1_cv );
|
||||
ldap_pvt_thread_cond_destroy( &connections[i].c_write2_cv );
|
||||
#ifdef LDAP_SLAPI
|
||||
if ( slapi_plugins_used ) {
|
||||
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
|
||||
|
|
@ -211,17 +209,13 @@ int connections_timeout_idle(time_t now)
|
|||
int i = 0, writers = 0;
|
||||
ber_socket_t connindex;
|
||||
Connection* c;
|
||||
time_t old;
|
||||
|
||||
old = slapd_get_writetime();
|
||||
|
||||
for( c = connection_first( &connindex );
|
||||
c != NULL;
|
||||
c = connection_next( c, &connindex ) )
|
||||
{
|
||||
/* Don't timeout a slow-running request or a persistent
|
||||
* outbound connection. But if it has a writewaiter, see
|
||||
* if the waiter has been there too long.
|
||||
* outbound connection.
|
||||
*/
|
||||
if(( c->c_n_ops_executing && !c->c_writewaiter)
|
||||
|| c->c_conn_state == SLAP_C_CLIENT ) {
|
||||
|
|
@ -236,20 +230,8 @@ int connections_timeout_idle(time_t now)
|
|||
i++;
|
||||
continue;
|
||||
}
|
||||
if ( c->c_writewaiter && global_writetimeout ) {
|
||||
writers = 1;
|
||||
if( difftime( c->c_activitytime+global_writetimeout, now) < 0 ) {
|
||||
/* close it */
|
||||
connection_closing( c, "writetimeout" );
|
||||
connection_close( c );
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
connection_done( c );
|
||||
if ( old && !writers )
|
||||
slapd_clr_writetime( old );
|
||||
|
||||
return i;
|
||||
}
|
||||
|
|
@ -420,9 +402,7 @@ Connection * connection_init(
|
|||
/* should check status of thread calls */
|
||||
ldap_pvt_thread_mutex_init( &c->c_mutex );
|
||||
ldap_pvt_thread_mutex_init( &c->c_write1_mutex );
|
||||
ldap_pvt_thread_mutex_init( &c->c_write2_mutex );
|
||||
ldap_pvt_thread_cond_init( &c->c_write1_cv );
|
||||
ldap_pvt_thread_cond_init( &c->c_write2_cv );
|
||||
|
||||
#ifdef LDAP_SLAPI
|
||||
if ( slapi_plugins_used ) {
|
||||
|
|
@ -780,10 +760,7 @@ connection_wake_writers( Connection *c )
|
|||
ldap_pvt_thread_cond_broadcast( &c->c_write1_cv );
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex );
|
||||
if ( c->c_writewaiter ) {
|
||||
ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
|
||||
ldap_pvt_thread_cond_signal( &c->c_write2_cv );
|
||||
slapd_clr_write( c->c_sd, 1 );
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
|
||||
slapd_shutsock( c->c_sd );
|
||||
}
|
||||
ldap_pvt_thread_mutex_lock( &c->c_write1_mutex );
|
||||
while ( c->c_writers ) {
|
||||
|
|
@ -1311,11 +1288,6 @@ int connection_read_activate( ber_socket_t s )
|
|||
if ( rc )
|
||||
return rc;
|
||||
|
||||
/* Don't let blocked writers block a pause request */
|
||||
if ( connections[s].c_writewaiter &&
|
||||
ldap_pvt_thread_pool_pausing( &connection_pool ))
|
||||
connection_wake_writers( &connections[s] );
|
||||
|
||||
rc = ldap_pvt_thread_pool_submit( &connection_pool,
|
||||
connection_read_thread, (void *)(long)s );
|
||||
|
||||
|
|
@ -1918,6 +1890,7 @@ int connection_write(ber_socket_t s)
|
|||
{
|
||||
Connection *c;
|
||||
Operation *op;
|
||||
int wantwrite;
|
||||
|
||||
assert( connections != NULL );
|
||||
|
||||
|
|
@ -1944,14 +1917,13 @@ int connection_write(ber_socket_t s)
|
|||
Debug( LDAP_DEBUG_TRACE,
|
||||
"connection_write(%d): waking output for id=%lu\n",
|
||||
s, c->c_connid, 0 );
|
||||
ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
|
||||
ldap_pvt_thread_cond_signal( &c->c_write2_cv );
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
|
||||
|
||||
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) {
|
||||
slapd_set_read( s, 1 );
|
||||
wantwrite = ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL );
|
||||
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL )) {
|
||||
/* don't wakeup twice */
|
||||
slapd_set_read( s, !wantwrite );
|
||||
}
|
||||
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
|
||||
if ( wantwrite ) {
|
||||
slapd_set_write( s, 1 );
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,6 +41,10 @@
|
|||
|
||||
#include "ldap_rq.h"
|
||||
|
||||
#ifdef HAVE_POLL
|
||||
#include <poll.h>
|
||||
#endif
|
||||
|
||||
#if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL)
|
||||
# include <sys/epoll.h>
|
||||
#elif defined(SLAP_X_DEVPOLL) && defined(HAVE_SYS_DEVPOLL_H) && defined(HAVE_DEVPOLL)
|
||||
|
|
@ -97,8 +101,6 @@ static ldap_pvt_thread_t *listener_tid;
|
|||
static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2];
|
||||
static int emfile;
|
||||
|
||||
static time_t chk_writetime;
|
||||
|
||||
static volatile int waking;
|
||||
#ifdef NO_THREADS
|
||||
#define WAKE_LISTENER(l,w) do { \
|
||||
|
|
@ -964,14 +966,6 @@ slapd_set_write( ber_socket_t s, int wake )
|
|||
SLAP_SOCK_SET_WRITE( id, s );
|
||||
slap_daemon[id].sd_nwriters++;
|
||||
}
|
||||
if (( wake & 2 ) && global_writetimeout && !chk_writetime ) {
|
||||
if (id)
|
||||
ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
|
||||
if (!chk_writetime)
|
||||
chk_writetime = slap_get_time();
|
||||
if (id)
|
||||
ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
|
||||
}
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &slap_daemon[id].sd_mutex );
|
||||
WAKE_LISTENER(id,wake);
|
||||
|
|
@ -1011,25 +1005,6 @@ slapd_set_read( ber_socket_t s, int wake )
|
|||
WAKE_LISTENER(id,wake);
|
||||
}
|
||||
|
||||
time_t
|
||||
slapd_get_writetime()
|
||||
{
|
||||
time_t cur;
|
||||
ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
|
||||
cur = chk_writetime;
|
||||
ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
|
||||
return cur;
|
||||
}
|
||||
|
||||
void
|
||||
slapd_clr_writetime( time_t old )
|
||||
{
|
||||
ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
|
||||
if ( chk_writetime == old )
|
||||
chk_writetime = 0;
|
||||
ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
|
||||
}
|
||||
|
||||
static void
|
||||
slapd_close( ber_socket_t s )
|
||||
{
|
||||
|
|
@ -1041,6 +1016,14 @@ slapd_close( ber_socket_t s )
|
|||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
slapd_shutsock( ber_socket_t s )
|
||||
{
|
||||
Debug( LDAP_DEBUG_CONNS, "daemon: shutdown socket %ld\n",
|
||||
(long) s, 0, 0 );
|
||||
shutdown( SLAP_FD2SOCK(s), 2 );
|
||||
}
|
||||
|
||||
static void
|
||||
slap_free_listener_addresses( struct sockaddr **sal )
|
||||
{
|
||||
|
|
@ -2365,18 +2348,13 @@ loop:
|
|||
|
||||
now = slap_get_time();
|
||||
|
||||
if ( !tid && ( global_idletimeout > 0 || chk_writetime )) {
|
||||
if ( !tid && ( global_idletimeout > 0 )) {
|
||||
int check = 0;
|
||||
/* Set the select timeout.
|
||||
* Don't just truncate, preserve the fractions of
|
||||
* seconds to prevent sleeping for zero time.
|
||||
*/
|
||||
if ( chk_writetime ) {
|
||||
tv.tv_sec = global_writetimeout;
|
||||
tv.tv_usec = 0;
|
||||
if ( difftime( chk_writetime, now ) < 0 )
|
||||
check = 2;
|
||||
} else {
|
||||
{
|
||||
tv.tv_sec = global_idletimeout / SLAPD_IDLE_CHECK_LIMIT;
|
||||
tv.tv_usec = global_idletimeout - \
|
||||
( tv.tv_sec * SLAPD_IDLE_CHECK_LIMIT );
|
||||
|
|
@ -2453,7 +2431,7 @@ loop:
|
|||
|
||||
nfds = SLAP_EVENT_MAX(tid);
|
||||
|
||||
if (( chk_writetime || global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1;
|
||||
if (( global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1;
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &slap_daemon[tid].sd_mutex );
|
||||
|
||||
|
|
@ -3083,3 +3061,35 @@ slap_wake_listener()
|
|||
{
|
||||
WAKE_LISTENER(0,1);
|
||||
}
|
||||
|
||||
/* return 0 on timeout, 1 on writer ready
|
||||
* -1 on general error
|
||||
*/
|
||||
int
|
||||
slapd_wait_writer( ber_socket_t sd )
|
||||
{
|
||||
#ifdef HAVE_WINSOCK
|
||||
fd_set writefds;
|
||||
struct timeval tv, *tvp;
|
||||
int i;
|
||||
|
||||
FD_ZERO( &writefds );
|
||||
FD_SET( slapd_ws_sockets[sd], &writefds );
|
||||
if ( global_writetimeout ) {
|
||||
tv.tv_sec = global_writetimeout;
|
||||
tv.tv_usec = 0;
|
||||
tvp = &tv;
|
||||
} else {
|
||||
tv = NULL;
|
||||
}
|
||||
return select( 0, NULL, &writefds, NULL, tvp );
|
||||
#else
|
||||
struct pollfd fds;
|
||||
int timeout = global_writetimeout ? global_writetimeout * 1000 : -1;
|
||||
|
||||
fds.fd = sd;
|
||||
fds.events = POLLOUT;
|
||||
|
||||
return poll( &fds, 1, timeout );
|
||||
#endif
|
||||
}
|
||||
|
|
|
|||
|
|
@ -877,8 +877,8 @@ LDAP_SLAPD_F (void) slapd_set_write LDAP_P((ber_socket_t s, int wake));
|
|||
LDAP_SLAPD_F (void) slapd_clr_write LDAP_P((ber_socket_t s, int wake));
|
||||
LDAP_SLAPD_F (void) slapd_set_read LDAP_P((ber_socket_t s, int wake));
|
||||
LDAP_SLAPD_F (int) slapd_clr_read LDAP_P((ber_socket_t s, int wake));
|
||||
LDAP_SLAPD_F (void) slapd_clr_writetime LDAP_P((time_t old));
|
||||
LDAP_SLAPD_F (time_t) slapd_get_writetime LDAP_P((void));
|
||||
LDAP_SLAPD_F (int) slapd_wait_writer( ber_socket_t sd );
|
||||
LDAP_SLAPD_F (void) slapd_shutsock( ber_socket_t sd );
|
||||
|
||||
LDAP_SLAPD_V (volatile sig_atomic_t) slapd_abrupt_shutdown;
|
||||
LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown;
|
||||
|
|
|
|||
|
|
@ -286,6 +286,7 @@ static long send_ldap_ber(
|
|||
Connection *conn = op->o_conn;
|
||||
ber_len_t bytes;
|
||||
long ret = 0;
|
||||
char *close_reason;
|
||||
|
||||
ber_get_option( ber, LBER_OPT_BER_BYTES_TO_WRITE, &bytes );
|
||||
|
||||
|
|
@ -300,7 +301,9 @@ static long send_ldap_ber(
|
|||
conn->c_writers++;
|
||||
|
||||
while ( conn->c_writers > 0 && conn->c_writing ) {
|
||||
ldap_pvt_thread_pool_idle( &connection_pool );
|
||||
ldap_pvt_thread_cond_wait( &conn->c_write1_cv, &conn->c_write1_mutex );
|
||||
ldap_pvt_thread_pool_unidle( &connection_pool );
|
||||
}
|
||||
|
||||
/* connection was closed under us */
|
||||
|
|
@ -337,28 +340,36 @@ static long send_ldap_ber(
|
|||
err, sock_errstr(err), 0 );
|
||||
|
||||
if ( err != EWOULDBLOCK && err != EAGAIN ) {
|
||||
close_reason = "connection lost on write";
|
||||
fail:
|
||||
conn->c_writers--;
|
||||
conn->c_writing = 0;
|
||||
ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
|
||||
ldap_pvt_thread_mutex_lock( &conn->c_mutex );
|
||||
connection_closing( conn, "connection lost on write" );
|
||||
|
||||
connection_closing( conn, close_reason );
|
||||
ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* wait for socket to be write-ready */
|
||||
ldap_pvt_thread_mutex_lock( &conn->c_write2_mutex );
|
||||
conn->c_writewaiter = 1;
|
||||
slapd_set_write( conn->c_sd, 2 );
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
|
||||
ldap_pvt_thread_pool_idle( &connection_pool );
|
||||
ldap_pvt_thread_cond_wait( &conn->c_write2_cv, &conn->c_write2_mutex );
|
||||
err = slapd_wait_writer( conn->c_sd );
|
||||
conn->c_writewaiter = 0;
|
||||
ldap_pvt_thread_mutex_unlock( &conn->c_write2_mutex );
|
||||
ldap_pvt_thread_pool_unidle( &connection_pool );
|
||||
ldap_pvt_thread_mutex_lock( &conn->c_write1_mutex );
|
||||
/* 0 is timeout, so we close it.
|
||||
* -1 is an error, close it.
|
||||
*/
|
||||
if ( err <= 0 ) {
|
||||
if ( err == 0 )
|
||||
close_reason = "writetimeout";
|
||||
else
|
||||
close_reason = "connection lost on writewait";
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if ( conn->c_writers < 0 ) {
|
||||
ret = 0;
|
||||
break;
|
||||
|
|
|
|||
Loading…
Reference in a new issue