Add sc_writewait callback

Invoked before a blocked writer waits for socket to be writable.
Use in back-mdb to release reader txn while waiting.
This commit is contained in:
Howard Chu 2014-07-22 19:17:15 -07:00
parent a773a0cc14
commit be792fae2c
3 changed files with 75 additions and 0 deletions

View file

@ -316,6 +316,21 @@ static void scope_chunk_ret( Operation *op, ID2 *scopes )
static void *search_stack( Operation *op );
typedef struct ww_ctx {
MDB_txn *txn;
int flag;
} ww_ctx;
static void
mdb_writewait( Operation *op, slap_callback *sc )
{
ww_ctx *ww = sc->sc_private;
if ( !ww->flag ) {
mdb_txn_reset( ww->txn );
ww->flag = 1;
}
}
int
mdb_search( Operation *op, SlapReply *rs )
{
@ -335,6 +350,8 @@ mdb_search( Operation *op, SlapReply *rs )
int tentries = 0;
IdScopes isc;
MDB_cursor *mci, *mcd;
ww_ctx wwctx;
slap_callback cb = { 0 };
mdb_op_info opinfo = {{{0}}}, *moi = &opinfo;
MDB_txn *ltid = NULL;
@ -673,6 +690,13 @@ dn2entry_retry:
id = mdb_idl_first( candidates, &cursor );
}
cb.sc_writewait = mdb_writewait;
cb.sc_private = &wwctx;
wwctx.flag = 0;
wwctx.txn = ltid;
cb.sc_next = op->o_callback;
op->o_callback = &cb;
while (id != NOID)
{
int scopeok;
@ -935,6 +959,10 @@ notfound:
rs->sr_flags = 0;
send_search_reference( op, rs );
if ( wwctx.flag ) {
wwctx.flag = 0;
mdb_txn_renew( ltid );
}
mdb_entry_return( op, e );
rs->sr_entry = NULL;
@ -972,6 +1000,10 @@ notfound:
rs->sr_flags = 0;
rs->sr_err = LDAP_SUCCESS;
rs->sr_err = send_search_entry( op, rs );
if ( wwctx.flag ) {
wwctx.flag = 0;
mdb_txn_renew( ltid );
}
rs->sr_attrs = NULL;
rs->sr_entry = NULL;
if (e != base)
@ -1047,6 +1079,17 @@ loop_continue:
id = mdb_idl_next( candidates, &cursor );
}
}
/* remove our writewait callback */
{
slap_callback **scp = &op->o_callback;
while ( *scp ) {
if ( *scp == &cb ) {
*scp = cb.sc_next;
cb.sc_private = NULL;
break;
}
}
}
nochange:
rs->sr_ctrls = NULL;
@ -1062,6 +1105,17 @@ nochange:
rs->sr_err = LDAP_SUCCESS;
done:
if ( cb.sc_private ) {
/* remove our writewait callback */
slap_callback **scp = &op->o_callback;
while ( *scp ) {
if ( *scp == &cb ) {
*scp = cb.sc_next;
cb.sc_private = NULL;
break;
}
}
}
mdb_cursor_close( mcd );
mdb_cursor_close( mci );
if ( moi == &opinfo ) {

View file

@ -293,6 +293,22 @@ rs_entry2modifiable( Operation *op, SlapReply *rs, slap_overinst *on )
return 1;
}
/* Check for any callbacks that want to be informed about being blocked
* on output. These callbacks are expected to leave the callback list
* unmodified. Their result is ignored.
*/
static void
slap_writewait_play(
Operation *op )
{
slap_callback *sc = op->o_callback;
for ( ; sc; sc = sc->sc_next ) {
if ( sc->sc_writewait )
sc->sc_writewait( op, sc );
}
}
static long send_ldap_ber(
Operation *op,
BerElement *ber )
@ -369,6 +385,7 @@ fail:
conn->c_writewaiter = 1;
ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
ldap_pvt_thread_pool_idle( &connection_pool );
slap_writewait_play( op );
err = slapd_wait_writer( conn->c_sd );
conn->c_writewaiter = 0;
ldap_pvt_thread_pool_unidle( &connection_pool );

View file

@ -2374,10 +2374,14 @@ struct BackendInfo {
typedef int (slap_response)( Operation *, SlapReply * );
struct slap_callback;
typedef void (slap_writewait)( Operation *, struct slap_callback * );
typedef struct slap_callback {
struct slap_callback *sc_next;
slap_response *sc_response;
slap_response *sc_cleanup;
slap_writewait *sc_writewait;
void *sc_private;
} slap_callback;