Align Operation buffers using LBER_ALIGNED_BUFFER

This commit is contained in:
Kurt Zeilenga 2005-10-12 23:17:28 +00:00
parent 0ea43c9d7d
commit 6db7e605ee
16 changed files with 1167 additions and 366 deletions

View file

@ -29,6 +29,10 @@
#define LDAP_THREAD_POOL_IMPLEMENTATION
#include "ldap_thr_debug.h" /* May rename symbols defined below */
#ifdef LDAP_DEVEL
#define SLAP_SEM_LOAD_CONTROL
#endif
#ifndef LDAP_THREAD_HAVE_TPOOL
typedef enum ldap_int_thread_pool_state_e {
@ -118,6 +122,123 @@ ldap_int_thread_pool_shutdown ( void )
return(0);
}
typedef struct ldap_lazy_sem_t {
ldap_pvt_thread_mutex_t ls_mutex;
ldap_pvt_thread_cond_t ls_cond;
int ls_sem_value;
/*
* when more than ls_lazy_count number of resources
* becmoes available, the thread wating for the resources will
* be waken up in order to prevent frequent blocking/waking-up
*/
unsigned int ls_lazy_count;
/*
* only one thread(listener) will wait on this semaphore
* using a flag instead of a list
*/
int ls_wait;
} ldap_lazy_sem_t;
ldap_lazy_sem_t* thread_pool_sem = NULL;
int
ldap_lazy_sem_init( unsigned int value, unsigned int lazyness ) {
thread_pool_sem = (ldap_lazy_sem_t*) LDAP_CALLOC(1,
sizeof( ldap_lazy_sem_t ));
if( thread_pool_sem == NULL ) return -1;
ldap_pvt_thread_mutex_init( &thread_pool_sem->ls_mutex );
ldap_pvt_thread_cond_init( &thread_pool_sem->ls_cond );
thread_pool_sem->ls_sem_value = value;
thread_pool_sem->ls_lazy_count = lazyness;
thread_pool_sem->ls_wait = 0;
return 0;
}
/*
* ldap_lazy_sem_wait is used if a caller is blockable(listener).
* Otherwise use ldap_lazy_sem_dec (worker)
*/
int
ldap_lazy_sem_op_submit( ldap_lazy_sem_t* ls ) {
if ( ls == NULL ) return -1;
/* only worker thread has its thread ctx */
if ( ldap_pvt_thread_pool_context() ) {
/* worker thread */
return ldap_lazy_sem_dec( ls );
} else {
/* listener */
return ldap_lazy_sem_wait( ls );
}
}
/*
* test if given semaphore's count is zero.
* If 0, the caller is blocked
* If not, the count is decremented.
*/
int
ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls ) {
ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
lazy_sem_retry:
if ( ls->ls_sem_value <= 0 ) {
/* no more avaliable resources */
ls->ls_wait = 1;
ldap_pvt_thread_cond_wait( &ls->ls_cond, &ls->ls_mutex );
goto lazy_sem_retry;
} else {
/* avaliable resources */
ls->ls_sem_value--;
}
ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
return 0;
}
/*
* decrement the count without blocking
* even when the count becomes less than or equal to 0
*/
int
ldap_lazy_sem_dec ( ldap_lazy_sem_t* ls ) {
ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
ls->ls_sem_value--;
ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
return 0;
}
/*
* Increment the count by one and test if it is greater or
* equal to lazyness. If it is, wake up a blocked thread.
*/
int
ldap_lazy_sem_post( ldap_lazy_sem_t* ls ) {
if( ls == NULL ) return (-1);
ldap_pvt_thread_mutex_lock( &ls->ls_mutex );
ls->ls_sem_value++;
if ( ls->ls_wait ) {
if ( ls->ls_sem_value >= ls->ls_lazy_count ) {
ls->ls_wait = 0;
ldap_pvt_thread_cond_signal( &ls->ls_cond );
}
}
ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );
return 0;
}
int
ldap_pvt_thread_pool_init (
ldap_pvt_thread_pool_t *tpool,
@ -257,6 +378,10 @@ ldap_pvt_thread_pool_submit (
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
#ifdef SLAP_SEM_LOAD_CONTROL
ldap_lazy_sem_op_submit( thread_pool_sem );
#endif
if (need_thread) {
int rc;
@ -408,6 +533,11 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
LDAP_FREE(pool);
#ifdef SLAP_SEM_LOAD_CONTROL
if ( thread_pool_sem ) {
LDAP_FREE( thread_pool_sem );
}
#endif
return(0);
}
@ -484,6 +614,9 @@ ldap_int_thread_pool_wrapper (
ctx->ltc_start_routine(ltc_key, ctx->ltc_arg);
#ifdef SLAP_SEM_LOAD_CONTROL
ldap_lazy_sem_post( thread_pool_sem );
#endif
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
ldap_int_thread_ctx_s, ltc_next.al);

View file

@ -172,8 +172,8 @@ bdb_online_index( void *ctx, void *arg )
struct bdb_info *bdb = be->be_private;
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op = (Operation *)opbuf;
OperationBuffer opbuf;
Operation *op = (Operation *) &opbuf;
DBC *curs;
DBT key, data;

View file

@ -642,7 +642,7 @@ monitor_filter2ndn(
struct berval *ndn )
{
Connection conn = { 0 };
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
SlapReply rs = { 0 };
slap_callback cb = { NULL, monitor_filter2ndn_cb, NULL, NULL };
@ -654,7 +654,7 @@ monitor_filter2ndn(
return -1;
}
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, &conn );
op->o_tag = LDAP_REQ_SEARCH;

View file

@ -224,8 +224,8 @@ backsql_db_open(
SQLHDBC dbh = SQL_NULL_HDBC;
struct berbuf bb = BB_NULL;
char opbuf[ OPERATION_BUFFER_SIZE ];
Operation* op = (Operation *)opbuf;
OperationBuffer opbuf;
Operation* op = (Operation *) &opbuf;
Debug( LDAP_DEBUG_TRACE, "==>backsql_db_open(): "
"testing RDBMS connection\n", 0, 0, 0 );
@ -581,8 +581,8 @@ backsql_db_close(
int
backsql_connection_destroy( Backend *bd, Connection *c )
{
char opbuf[ OPERATION_BUFFER_SIZE ];
Operation* op = (Operation *)opbuf;
OperationBuffer opbuf;
Operation* op = (Operation *) &opbuf;
op->o_hdr = (Opheader *)&op[ 1 ];
op->o_connid = c->c_connid;

View file

@ -2623,7 +2623,7 @@ config_setup_ldif( BackendDB *be, const char *dir, int readit ) {
setup_cookie sc;
slap_callback cb = { NULL, config_ldif_resp, NULL, NULL };
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
SlapReply rs = {REP_RESULT};
Filter filter = { LDAP_FILTER_PRESENT };
@ -2675,7 +2675,7 @@ config_setup_ldif( BackendDB *be, const char *dir, int readit ) {
if ( readit ) {
void *thrctx = ldap_pvt_thread_pool_context();
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, thrctx );
filter.f_desc = slap_schema.si_ad_objectClass;
@ -4063,7 +4063,7 @@ config_back_db_open( BackendDB *be )
BackendInfo *bi;
ConfigArgs c;
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
slap_callback cb = { NULL, slap_null_cb, NULL, NULL };
SlapReply rs = {REP_RESULT};
@ -4075,7 +4075,7 @@ config_back_db_open( BackendDB *be )
if ( cfb->cb_use_ldif ) {
thrctx = ldap_pvt_thread_pool_context();
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, thrctx );
op->o_dn = be->be_rootdn;

View file

@ -43,9 +43,26 @@
#include "slapi/slapi.h"
#endif
#ifdef SLAP_MULTI_CONN_ARRAY
/* for Multiple Connection Arrary (MCA) Support */
static ldap_pvt_thread_mutex_t* connections_mutex;
static Connection **connections = NULL;
/* set to the number of processors */
#define NUM_CONNECTION_ARRAY 2
/* partition the array in a modulo manner */
#define MCA_conn_array_id( fd ) ((int)fd%NUM_CONNECTION_ARRAY)
#define MCA_conn_array_element_id( fd ) ((int)fd/NUM_CONNECTION_ARRAY)
#define MCA_GET_CONNECTION(fd) &(connections[MCA_conn_array_id(fd)])[MCA_conn_array_element_id( fd )]
#define MCA_GET_CONN_MUTEX(fd) &connections_mutex[MCA_conn_array_id(fd)]
#else
/* protected by connections_mutex */
static ldap_pvt_thread_mutex_t connections_mutex;
static Connection *connections = NULL;
#endif
static ldap_pvt_thread_mutex_t conn_nextid_mutex;
static unsigned long conn_nextid = 0;
@ -82,10 +99,17 @@ connection_state2str( int state )
static Connection* connection_get( ber_socket_t s );
#ifdef SLAP_LIGHTWEIGHT_LISTENER
static int connection_input( Connection *c, Operation** op );
#else
static int connection_input( Connection *c );
#endif
static void connection_close( Connection *c );
static int connection_op_activate( Operation *op );
#ifdef SLAP_LIGHTWEIGHT_LISTENER
static void connection_op_queue( Operation *op );
#endif
static int connection_resched( Connection *conn );
static void connection_abandon( Connection *conn );
static void connection_destroy( Connection *c );
@ -96,6 +120,65 @@ static ldap_pvt_thread_start_t connection_operation;
* Initialize connection management infrastructure.
*/
int connections_init(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i, j;
Connection* conn;
assert( connections == NULL );
if( connections != NULL) {
Debug( LDAP_DEBUG_ANY, "connections_init: already initialized.\n",
0, 0, 0 );
return -1;
}
connections_mutex = (ldap_pvt_thread_mutex_t*) ch_calloc( NUM_CONNECTION_ARRAY, sizeof(ldap_pvt_thread_mutex_t) );
if( connections_mutex == NULL ) {
Debug( LDAP_DEBUG_ANY,
"connections_init: allocation of connection mutex[%d] failed\n", i, 0, 0 );
return -1;
}
connections = (Connection**) ch_calloc( NUM_CONNECTION_ARRAY, sizeof(Connection*));
if( connections == NULL ) {
Debug( LDAP_DEBUG_ANY,
"connections_init: allocation of connection[%d] failed\n", 0, 0, 0 );
return -1;
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
ldap_pvt_thread_mutex_init( connections_mutex+i );
connections[i] = (Connection*) ch_calloc( (dtblsize/NUM_CONNECTION_ARRAY), sizeof(Connection) );
if( connections[i] == NULL ) {
Debug( LDAP_DEBUG_ANY,
"connections_init: allocation (%d*%ld) of connection array[%d] failed\n",
dtblsize, (long) sizeof(Connection), i );
return -1;
}
}
/* should check return of every call */
ldap_pvt_thread_mutex_init( &conn_nextid_mutex );
assert( connections[0]->c_struct_state == SLAP_C_UNINITIALIZED );
assert( connections[NUM_CONNECTION_ARRAY-1]->c_struct_state == SLAP_C_UNINITIALIZED );
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
conn = connections[i];
for ( j = 0; j < (dtblsize/NUM_CONNECTION_ARRAY); j++ ) {
conn[j].c_conn_idx = j;
}
}
/*
* per entry initialization of the Connection array initialization
* will be done by connection_init()
*/
return 0;
}
#else
{
int i;
@ -132,11 +215,55 @@ int connections_init(void)
return 0;
}
#endif
/*
* Destroy connection management infrastructure.
*/
int connections_destroy(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i;
ber_socket_t j;
if( connections == NULL) {
Debug( LDAP_DEBUG_ANY, "connections_destroy: nothing to destroy.\n",
0, 0, 0 );
return -1;
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
Connection* conn = connections[i];
for ( j = 0; j < (dtblsize/NUM_CONNECTION_ARRAY); j++ ) {
if( conn[j].c_struct_state != SLAP_C_UNINITIALIZED ) {
ber_sockbuf_free( conn[j].c_sb );
ldap_pvt_thread_mutex_destroy( &conn[j].c_mutex );
ldap_pvt_thread_mutex_destroy( &conn[j].c_write_mutex );
ldap_pvt_thread_cond_destroy( &conn[j].c_write_cv );
#ifdef LDAP_SLAPI
/* FIX ME!! */
if ( slapi_plugins_used ) {
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
&connections[i] );
}
#endif
}
}
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
free( connections[i] );
connections[i] = NULL;
ldap_pvt_thread_mutex_destroy( &connections_mutex[i] );
}
ldap_pvt_thread_mutex_destroy( &conn_nextid_mutex );
return 0;
}
#else
{
ber_socket_t i;
@ -156,7 +283,8 @@ int connections_destroy(void)
ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv );
#ifdef LDAP_SLAPI
if ( slapi_plugins_used ) {
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION, &connections[i] );
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
&connections[i] );
}
#endif
}
@ -169,11 +297,45 @@ int connections_destroy(void)
ldap_pvt_thread_mutex_destroy( &conn_nextid_mutex );
return 0;
}
#endif
/*
* shutdown all connections
*/
int connections_shutdown(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i;
ber_socket_t j;
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
Connection* conn = connections[i];
ldap_pvt_thread_mutex_lock( &connections_mutex[i] );
for ( j = 0; j < (dtblsize/NUM_CONNECTION_ARRAY); j++ ) {
if( conn[j].c_struct_state != SLAP_C_USED ) {
continue;
}
/* give persistent clients a chance to cleanup */
if( conn[j].c_conn_state == SLAP_C_CLIENT ) {
ldap_pvt_thread_pool_submit( &connection_pool,
conn[j].c_clientfunc, conn[j].c_clientarg );
continue;
}
ldap_pvt_thread_mutex_lock( &conn[j].c_mutex );
/* connections_mutex and c_mutex are locked */
connection_closing( &conn[j], "connection shutdown" );
connection_close( &conn[j] );
ldap_pvt_thread_mutex_unlock( &conn[j].c_mutex );
}
ldap_pvt_thread_mutex_unlock( &connections_mutex[i] );
}
return 0;
}
#else
{
ber_socket_t i;
@ -203,6 +365,7 @@ int connections_shutdown(void)
return 0;
}
#endif
/*
* Timeout idle connections.
@ -251,7 +414,11 @@ static Connection* connection_get( ber_socket_t s )
}
#ifndef HAVE_WINSOCK
#ifdef SLAP_MULTI_CONN_ARRAY
c = MCA_GET_CONNECTION(s);
#else
c = &connections[s];
#endif
assert( c->c_struct_state != SLAP_C_UNINITIALIZED );
@ -367,10 +534,18 @@ long connection_init(
assert( s < dtblsize );
#endif
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
#ifndef HAVE_WINSOCK
#ifdef SLAP_MULTI_CONN_ARRAY
c = MCA_GET_CONNECTION(s);
#else
c = &connections[s];
#endif
#else
{
@ -497,7 +672,11 @@ long connection_init(
c->c_close_reason = "?"; /* should never be needed */
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
@ -585,7 +764,11 @@ long connection_init(
slap_sasl_external( c, ssf, authid );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
backend_connection_init(c);
@ -611,10 +794,12 @@ void connection2anonymous( Connection *c )
ch_free(c->c_dn.bv_val);
}
BER_BVZERO( &c->c_dn );
if ( !BER_BVISNULL( &c->c_ndn ) ) {
ch_free(c->c_ndn.bv_val);
}
BER_BVZERO( &c->c_ndn );
if ( !BER_BVISNULL( &c->c_sasl_authz_dn ) ) {
ber_memfree_x( c->c_sasl_authz_dn.bv_val, NULL );
}
@ -817,21 +1002,34 @@ unsigned long connections_nextid(void)
unsigned long id;
assert( connections != NULL );
ldap_pvt_thread_mutex_lock( &connections_mutex );
ldap_pvt_thread_mutex_lock( &conn_nextid_mutex );
id = conn_nextid;
ldap_pvt_thread_mutex_unlock( &connections_mutex );
ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
return id;
}
Connection* connection_first( ber_socket_t *index )
{
#ifdef SLAP_MULTI_CONN_ARRAY
int conn_array_id;
#endif
assert( connections != NULL );
assert( index != NULL );
#ifdef SLAP_MULTI_CONN_ARRAY
for ( conn_array_id = 0;
conn_array_id < NUM_CONNECTION_ARRAY;
conn_array_id++ )
{
ldap_pvt_thread_mutex_lock( &connections_mutex[ conn_array_id ] );
}
#else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
*index = 0;
@ -839,6 +1037,50 @@ Connection* connection_first( ber_socket_t *index )
}
Connection* connection_next( Connection *c, ber_socket_t *index )
#ifdef SLAP_MULTI_CONN_ARRAY
{
Connection* conn;
assert( connections != NULL );
assert( index != NULL );
assert( *index <= (dtblsize/NUM_CONNECTION_ARRAY) );
if( c != NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
}
c = NULL;
for(; *index < dtblsize; (*index)++) {
conn = MCA_GET_CONNECTION(*index);
if( conn->c_struct_state == SLAP_C_UNINITIALIZED ) {
assert( conn->c_conn_state == SLAP_C_INVALID );
#ifndef HAVE_WINSOCK
continue;
#else
break;
#endif
}
if( conn->c_struct_state == SLAP_C_USED ) {
assert( conn->c_conn_state != SLAP_C_INVALID );
c = conn;
(*index)++;
break;
}
assert( conn->c_struct_state == SLAP_C_UNUSED );
assert( conn->c_conn_state == SLAP_C_INVALID );
}
if( c != NULL ) {
ldap_pvt_thread_mutex_lock( &c->c_mutex );
}
return c;
}
#else
{
assert( connections != NULL );
assert( index != NULL );
@ -876,16 +1118,30 @@ Connection* connection_next( Connection *c, ber_socket_t *index )
return c;
}
#endif
void connection_done( Connection *c )
{
#ifdef SLAP_MULTI_CONN_ARRAY
int conn_array_id;
#endif
assert( connections != NULL );
if( c != NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
}
#ifdef SLAP_MULTI_CONN_ARRAY
for ( conn_array_id = 0;
conn_array_id < NUM_CONNECTION_ARRAY;
conn_array_id++ )
{
ldap_pvt_thread_mutex_unlock( &connections_mutex[ conn_array_id ] );
}
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
}
/*
@ -910,7 +1166,7 @@ void connection_done( Connection *c )
ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex ); \
} while (0)
#else /* !SLAPD_MONITOR */
#define INCR_OP_INITIATED(index)
#define INCR_OP_INITIATED(index) do { } while (0)
#define INCR_OP_COMPLETED(index) \
do { \
ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex ); \
@ -1169,14 +1425,76 @@ void connection_client_stop(
slapd_remove( s, 0, 1 );
}
#ifdef SLAP_LIGHTWEIGHT_LISTENER
void* connection_processing_thread( void* ctx, void* argv )
{
int rc ;
Operation* new_op = NULL;
ber_socket_t s = (ber_socket_t)argv;
/*
* read incoming LDAP requests. If there is more than one,
* the first one is returned with new_op
*/
if( ( rc = connection_read( s, &new_op ) ) < 0 ) {
Debug( LDAP_DEBUG_TRACE, "connection_read(%d) error\n", s, 0, 0 );
tcp_close( s );
return (void*)rc;
}
/* execute the queued request in the same thread */
if( new_op ) {
rc = (int)connection_operation(
ldap_pvt_thread_pool_context(), new_op );
}
return (void*)rc;
}
int connection_processing_activate( ber_socket_t s )
{
int status;
/*
* suspend reading on this file descriptor until a connection processing
* thread reads data on it. Otherwise the listener thread will repeatedly
* submit the same event on it to the pool.
*/
if( !slapd_suspend( s ) ) return 0;
status = ldap_pvt_thread_pool_submit( &connection_pool,
connection_processing_thread, (void *) s );
if( status != 0 ) {
Debug( LDAP_DEBUG_ANY, "connection_processing_activiate(%d): "
"ldap_pvt_thread_pool_submit failed\n",
s, 0, 0 );
return -1;
}
return 1;
}
#endif
#ifdef SLAP_LIGHTWEIGHT_LISTENER
int connection_read( ber_socket_t s, Operation** op )
#else
int connection_read(ber_socket_t s)
#endif
{
int rc = 0;
Connection *c;
#ifdef SLAP_LIGHTWEIGHT_LISTENER
int need_resume = 1;
#endif
assert( connections != NULL );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
/* get (locked) connection */
c = connection_get( s );
@ -1187,7 +1505,11 @@ int connection_read(ber_socket_t s)
(long) s, 0, 0 );
slapd_remove(s, 1, 0);
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return -1;
}
@ -1198,16 +1520,32 @@ int connection_read(ber_socket_t s)
"connection_read(%d): closing, ignoring input for id=%lu\n",
s, c->c_connid, 0 );
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
#ifdef SLAP_LIGHTWEIGHT_LISTENER
slapd_resume( s );
#endif
return 0;
}
if ( c->c_conn_state == SLAP_C_CLIENT ) {
#ifdef SLAP_LIGHTWEIGHT_LISTENER
slapd_resume( s );
#endif
slapd_clr_read( s, 0 );
ldap_pvt_thread_pool_submit( &connection_pool,
c->c_clientfunc, c->c_clientarg );
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
@ -1219,33 +1557,34 @@ int connection_read(ber_socket_t s)
if ( c->c_is_tls && c->c_needs_tls_accept ) {
rc = ldap_pvt_tls_accept( c->c_sb, slap_tls_ctx );
if ( rc < 0 ) {
#if 0 /* required by next #if 0 */
struct timeval tv;
fd_set rfd;
#endif
Debug( LDAP_DEBUG_TRACE,
"connection_read(%d): TLS accept error "
"error=%d id=%lu, closing\n",
s, rc, c->c_connid );
c->c_needs_tls_accept = 0;
/* connections_mutex and c_mutex are locked */
connection_closing( c, "TLS negotiation failure" );
#if 0
/* Drain input before close, to allow SSL error codes
* to propagate to client. */
FD_ZERO(&rfd);
FD_SET(s, &rfd);
for (rc=1; rc>0;) {
tv.tv_sec = 1;
tv.tv_usec = 0;
rc = select(s+1, &rfd, NULL, NULL, &tv);
if (rc == 1) {
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DRAIN, NULL);
{
struct timeval tv;
fd_set rfd;
/* Drain input before close, to allow SSL error codes
* to propagate to client. */
FD_ZERO(&rfd);
FD_SET(s, &rfd);
for (rc=1; rc>0;) {
tv.tv_sec = 1;
tv.tv_usec = 0;
rc = select(s+1, &rfd, NULL, NULL, &tv);
if (rc == 1) {
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DRAIN, NULL);
}
}
}
#endif
connection_close( c );
} else if ( rc == 0 ) {
@ -1280,7 +1619,15 @@ int connection_read(ber_socket_t s)
!ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DATA_READY, NULL ) )
{
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
#ifdef SLAP_LIGHTWEIGHT_LISTENER
slapd_resume( s );
#endif
return 0;
}
}
@ -1291,24 +1638,38 @@ int connection_read(ber_socket_t s)
/* If previous layer is not removed yet, give up for now */
if ( !c->c_sasl_sockctx ) {
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
#ifdef SLAP_LIGHTWEIGHT_LISTENER
slapd_resume( s );
#endif
return 0;
}
c->c_sasl_layers = 0;
rc = ldap_pvt_sasl_install( c->c_sb, c->c_sasl_sockctx );
if( rc != LDAP_SUCCESS ) {
Debug( LDAP_DEBUG_TRACE,
"connection_read(%d): SASL install error "
"error=%d id=%lu, closing\n",
s, rc, c->c_connid );
/* connections_mutex and c_mutex are locked */
#ifdef SLAP_LIGHTWEIGHT_LISTENER
slapd_resume( s );
#endif
connection_closing( c, "SASL layer install failure" );
connection_close( c );
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
}
@ -1319,7 +1680,21 @@ int connection_read(ber_socket_t s)
do {
/* How do we do this without getting into a busy loop ? */
#ifdef SLAP_LIGHTWEIGHT_LISTENER
rc = connection_input( c, op );
#else
rc = connection_input( c );
#endif
#ifdef SLAP_LIGHTWEIGHT_LISTENER
if( *op && (*op)->o_tag == LDAP_REQ_UNBIND ) {
/*
* After the reception of an unbind request,
* no more incoming requests via the connection
* is expected. Therefore, don't resume connection reading.
*/
need_resume = 0;
}
#endif
}
#ifdef DATA_READY_LOOP
while( !rc && ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DATA_READY, NULL ));
@ -1333,14 +1708,23 @@ int connection_read(ber_socket_t s)
Debug( LDAP_DEBUG_TRACE,
"connection_read(%d): input error=%d id=%lu, closing.\n",
s, rc, c->c_connid );
/* connections_mutex and c_mutex are locked */
connection_closing( c, conn_lost_str );
connection_close( c );
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
#ifdef SLAP_LIGHTWEIGHT_LISTENER
if ( need_resume ) slapd_resume( s );
#endif
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) {
slapd_set_read( s, 1 );
}
@ -1350,13 +1734,22 @@ int connection_read(ber_socket_t s)
}
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
#ifdef SLAP_LIGHTWEIGHT_LISTENER
static int
connection_input(
Connection *conn )
connection_input( Connection *conn, Operation** c_op )
#else
static int
connection_input( Connection *conn )
#endif
{
Operation *op;
ber_tag_t tag;
@ -1382,10 +1775,11 @@ connection_input(
#ifdef LDAP_CONNECTIONLESS
if ( conn->c_is_udp ) {
char peername[sizeof("IP=255.255.255.255:65336")];
len = ber_int_sb_read(conn->c_sb, &peeraddr,
sizeof(struct sockaddr));
if (len != sizeof(struct sockaddr))
return 1;
if (len != sizeof(struct sockaddr)) return 1;
sprintf( peername, "IP=%s:%d",
inet_ntoa( peeraddr.sa_in_addr.sin_addr ),
(unsigned) ntohs( peeraddr.sa_in_addr.sin_port ) );
@ -1394,6 +1788,7 @@ connection_input(
conn->c_connid, peername, conn->c_sock_name.bv_val, 0, 0 );
}
#endif
tag = ber_get_next( conn->c_sb, &len, conn->c_currentber );
if ( tag != LDAP_TAG_MESSAGE ) {
int err = errno;
@ -1447,6 +1842,7 @@ connection_input(
}
}
#endif
if(tag == LDAP_REQ_BIND) {
/* immediately abandon all existing operations upon BIND */
connection_abandon( conn );
@ -1543,7 +1939,21 @@ connection_input(
}
} else {
conn->c_n_ops_executing++;
connection_op_activate( op );
#ifdef SLAP_LIGHTWEIGHT_LISTENER
/*
* The first op will be processed in the same thread context,
* Subsequent ops will be submitted to the pool by
* calling connection_op_activate()
*/
if ( *c_op == NULL ) {
/* the first incoming request */
connection_op_queue( op );
*c_op = op;
} else
#endif
{
connection_op_activate( op );
}
}
#ifdef NO_THREADS
@ -1568,7 +1978,11 @@ connection_resched( Connection *conn )
ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd );
/* use trylock to avoid possible deadlock */
#ifdef SLAP_MULTI_CONN_ARRAY
rc = ldap_pvt_thread_mutex_trylock( MCA_GET_CONN_MUTEX( sd ) );
#else
rc = ldap_pvt_thread_mutex_trylock( &connections_mutex );
#endif
if( rc ) {
Debug( LDAP_DEBUG_TRACE,
@ -1580,7 +1994,11 @@ connection_resched( Connection *conn )
* so recheck state below.
*/
ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX ( sd ) );
#else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
ldap_pvt_thread_mutex_lock( &conn->c_mutex );
}
@ -1595,7 +2013,11 @@ connection_resched( Connection *conn )
connection_close( conn );
}
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX( sd ) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
@ -1630,15 +2052,15 @@ connection_init_log_prefix( Operation *op )
{
if ( op->o_connid == (unsigned long)(-1) ) {
snprintf( op->o_log_prefix, sizeof( op->o_log_prefix ),
"conn=-1 op=%lu", op->o_opid );
"conn=-1 op=%lu", op->o_opid );
} else {
snprintf( op->o_log_prefix, sizeof( op->o_log_prefix ),
"conn=%lu op=%lu", op->o_connid, op->o_opid );
"conn=%lu op=%lu", op->o_connid, op->o_opid );
}
}
static int connection_op_activate( Operation *op )
static void connection_op_queue( Operation *op )
{
int status;
ber_tag_t tag = op->o_tag;
@ -1657,6 +2079,7 @@ static int connection_op_activate( Operation *op )
ber_dupbv( &op->o_ndn, &op->o_conn->c_sasl_authz_dn );
}
}
op->o_authtype = op->o_conn->c_authtype;
ber_dupbv( &op->o_authmech, &op->o_conn->c_authmech );
@ -1664,6 +2087,7 @@ static int connection_op_activate( Operation *op )
op->o_protocol = op->o_conn->c_protocol
? op->o_conn->c_protocol : LDAP_VERSION3;
}
if (op->o_conn->c_conn_state == SLAP_C_INACTIVE
&& op->o_protocol > LDAP_VERSION2)
{
@ -1674,6 +2098,14 @@ static int connection_op_activate( Operation *op )
connection_init_log_prefix( op );
LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op, o_next );
}
static int connection_op_activate( Operation *op )
{
int status;
ber_tag_t tag = op->o_tag;
connection_op_queue( op );
status = ldap_pvt_thread_pool_submit( &connection_pool,
connection_operation, (void *) op );
@ -1695,21 +2127,27 @@ int connection_write(ber_socket_t s)
assert( connections != NULL );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX( s ) );
#else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
c = connection_get( s );
if( c == NULL ) {
Debug( LDAP_DEBUG_ANY,
"connection_write(%ld): no connection!\n",
(long)s, 0, 0 );
slapd_remove(s, 1, 0);
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX( s ) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return -1;
}
slapd_clr_write( s, 0);
c->c_n_write++;
Debug( LDAP_DEBUG_TRACE,
@ -1723,8 +2161,9 @@ int connection_write(ber_socket_t s)
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
slapd_set_write( s, 1 );
}
/* If there are ops pending because of a writewaiter, start
* one up.
/* If there are ops pending because of a writewaiter,
* start one up.
*/
while ((op = LDAP_STAILQ_FIRST( &c->c_pending_ops )) != NULL) {
if ( !c->c_writewaiter ) break;
@ -1744,7 +2183,13 @@ int connection_write(ber_socket_t s)
break;
}
connection_return( c );
#ifdef SLAP_MULTI_CONN_ARRAY
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
#else
ldap_pvt_thread_mutex_unlock( &connections_mutex );
#endif
return 0;
}
@ -1784,4 +2229,3 @@ connection_assign_nextid( Connection *conn )
conn->c_connid = conn_nextid++;
ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
}

File diff suppressed because it is too large Load diff

View file

@ -464,8 +464,8 @@ accesslog_purge( void *ctx, void *arg )
struct log_info *li = rtask->arg;
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op = (Operation *)opbuf;
OperationBuffer opbuf;
Operation *op = (Operation *) &opbuf;
SlapReply rs = {REP_RESULT};
slap_callback cb = { NULL, log_old_lookup, NULL, NULL };
Filter f;

View file

@ -1462,7 +1462,7 @@ consistency_check(
cache_manager *cm = on->on_bi.bi_private;
query_manager *qm = cm->qm;
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
SlapReply rs = {REP_RESULT};
@ -1470,7 +1470,7 @@ consistency_check(
int i, return_val, pause = 1;
QueryTemplate* templ;
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, ctx );
op->o_bd = &cm->db;

View file

@ -861,11 +861,11 @@ syncprov_qtask( void *ctx, void *arg )
struct re_s *rtask = arg;
syncops *so = rtask->arg;
slap_overinst *on = so->s_op->o_private;
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
BackendDB be;
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
*op = *so->s_op;
op->o_hdr = (Opheader *)(op+1);
op->o_controls = (void **)(op->o_hdr+1);
@ -2217,9 +2217,9 @@ syncprov_db_open(
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
Connection conn;
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
char ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
Operation *op = (Operation *)opbuf;
Operation *op = (Operation *) &opbuf;
Entry *e;
Attribute *a;
int rc;
@ -2307,8 +2307,8 @@ syncprov_db_close(
}
if ( si->si_numops ) {
Connection conn;
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op = (Operation *)opbuf;
OperationBuffer opbuf;
Operation *op = (Operation *) &opbuf;
SlapReply rs = {REP_RESULT};
void *thrctx;

View file

@ -646,7 +646,11 @@ LDAP_SLAPD_F (const char *) connection_state2str LDAP_P(( int state ))
LDAP_GCCATTR((const));
LDAP_SLAPD_F (int) connection_write LDAP_P((ber_socket_t s));
#ifdef SLAP_LIGHTWEIGHT_LISTENER
LDAP_SLAPD_F (int) connection_read LDAP_P((ber_socket_t, Operation **));
#else
LDAP_SLAPD_F (int) connection_read LDAP_P((ber_socket_t s));
#endif
LDAP_SLAPD_F (unsigned long) connections_nextid(void);

View file

@ -60,6 +60,10 @@ LDAP_BEGIN_DECL
#ifdef LDAP_DEVEL
#define SLAP_LIGHTWEIGHT_LISTENER /* experimental slapd architecture */
#define SLAP_SEM_LOAD_CONTROL /* must also be defined in libldap_r/tpool.c */
#define SLAP_MULTI_CONN_ARRAY
#define SLAP_ACL_HONOR_DISCLOSE /* partially implemented */
#define SLAP_ACL_HONOR_MANAGE /* not yet implemented */
#define SLAP_DYNACL
@ -2555,7 +2559,11 @@ typedef struct slap_op {
LDAP_STAILQ_ENTRY(slap_op) o_next; /* next operation in list */
} Operation;
#define OPERATION_BUFFER_SIZE (sizeof(Operation)+sizeof(Opheader)+SLAP_MAX_CIDS*sizeof(void *))
#define OPERATION_BUFFER_SIZE ( sizeof(Operation) + sizeof(Opheader) + \
SLAP_MAX_CIDS*sizeof(void *) )
typedef LBER_ALIGNED_BUFFER(operation_buffer_u,OPERATION_BUFFER_SIZE)
OperationBuffer;
#define send_ldap_error( op, rs, err, text ) do { \
(rs)->sr_err = err; (rs)->sr_text = text; \

View file

@ -65,7 +65,7 @@ slapacl( int argc, char **argv )
const char *progname = "slapacl";
Connection conn = { 0 };
Listener listener;
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op = NULL;
Entry e = { 0 }, *ep = &e;
char *attr = NULL;
@ -94,7 +94,7 @@ slapacl( int argc, char **argv )
argv = &argv[ optind ];
argc -= optind;
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, &conn );
conn.c_listener = &listener;

View file

@ -56,8 +56,8 @@ slapadd( int argc, char **argv )
Attribute *attr;
Entry *ctxcsn_e;
ID ctxcsn_id, id;
OperationBuffer opbuf;
Operation *op;
char opbuf[OPERATION_BUFFER_SIZE];
int match;
int ret;
@ -69,8 +69,8 @@ slapadd( int argc, char **argv )
slap_tool_init( progname, SLAPADD, argc, argv );
memset( opbuf, 0, sizeof(opbuf) );
op = (Operation *)opbuf;
memset( &opbuf, 0, sizeof(opbuf) );
op = (Operation *) &opbuf;
if( !be->be_entry_open ||
!be->be_entry_close ||

View file

@ -80,7 +80,7 @@ slapauth( int argc, char **argv )
int rc = EXIT_SUCCESS;
const char *progname = "slapauth";
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
slap_tool_init( progname, SLAPAUTH, argc, argv );
@ -88,7 +88,7 @@ slapauth( int argc, char **argv )
argv = &argv[ optind ];
argc -= optind;
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, &conn );
conn.c_sasl_bind_mech = mech;

View file

@ -902,7 +902,7 @@ do_syncrepl(
struct re_s* rtask = arg;
syncinfo_t *si = ( syncinfo_t * ) rtask->arg;
Connection conn = {0};
char opbuf[OPERATION_BUFFER_SIZE];
OperationBuffer opbuf;
Operation *op;
int rc = LDAP_SUCCESS;
int first = 0;
@ -938,7 +938,7 @@ do_syncrepl(
return NULL;
}
op = (Operation *)opbuf;
op = (Operation *) &opbuf;
connection_fake_init( &conn, op, ctx );
/* use global malloc for now */