Use long-lived per-thread TXNs for loading entries from DB.

Make sure only one thread tries to load a given entry at a time.
Bump up the DB environment's max TXNs to accommodate the new TXNs.
Howard Chu 2004-07-09 17:00:07 +00:00
parent b8113c5df9
commit ebc8bb0d83
3 changed files with 121 additions and 22 deletions
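
The core pattern here, in isolation: each worker thread lazily opens one transaction that is never committed and lives until the thread exits, so that all of the thread's DB page locks share one locker ID. Below is a minimal sketch of that idea, assuming BDB 4.x's method-style API and plain pthread TLS in place of slapd's thread-pool keys (txn_key, txn_key_free, and thread_txn_get are illustrative names, not part of this patch):

    /* Sketch: one long-lived BDB transaction per thread, created lazily
     * and cached in thread-local storage.  Error handling trimmed.
     */
    #include <db.h>
    #include <pthread.h>

    static pthread_key_t txn_key;

    static void
    txn_key_free( void *data )
    {
        DB_TXN *txn = data;
        /* The txn never commits; it only anchors lock ownership. */
        txn->abort( txn );
    }

    static void
    txn_key_init( void )
    {
        pthread_key_create( &txn_key, txn_key_free );
    }

    /* Return this thread's long-lived txn, creating it on first use. */
    static int
    thread_txn_get( DB_ENV *env, DB_TXN **txnp )
    {
        static pthread_once_t once = PTHREAD_ONCE_INIT;
        DB_TXN *txn;
        int rc;

        pthread_once( &once, txn_key_init );

        txn = pthread_getspecific( txn_key );
        if ( txn == NULL ) {
            rc = env->txn_begin( env, NULL, &txn, 0 );
            if ( rc != 0 )
                return rc;
            pthread_setspecific( txn_key, txn );
        }
        *txnp = txn;
        return 0;
    }

The real bdb_txn_get added in cache.c below keys the cached txn off the thread-pool context instead, and retries TXN_BEGIN a few times before giving up.
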

servers/slapd/back-bdb/back-bdb.h

@@ -98,6 +98,7 @@ typedef struct bdb_entry_info {
 #define CACHE_ENTRY_NO_KIDS	2
 #define CACHE_ENTRY_NOT_LINKED	4
 #define CACHE_ENTRY_NO_GRANDKIDS	8
+#define CACHE_ENTRY_LOADING	0x10
 
 /*
  * remaining fields require backend cache lock to access

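The new CACHE_ENTRY_LOADING bit implements the "only one thread tries to load a given entry" rule from the commit message: a thread may load only if it atomically observes the entry missing and the bit clear, and it must clear the bit when done. A simplified sketch of that handshake, with a plain pthread mutex standing in for the entryinfo lock (entry_info, try_claim_load, and finish_load are illustrative, not from the patch):

    #include <pthread.h>

    #define CACHE_ENTRY_LOADING 0x10

    typedef struct entry_info {
        pthread_mutex_t ei_mutex;
        void *ei_entry;         /* NULL until loaded */
        unsigned ei_state;      /* flag bits */
    } entry_info;

    /* Returns nonzero if the caller won the right to load the entry. */
    static int
    try_claim_load( entry_info *ei )
    {
        int load = 0;

        pthread_mutex_lock( &ei->ei_mutex );
        if ( ei->ei_entry == NULL && !(ei->ei_state & CACHE_ENTRY_LOADING) ) {
            ei->ei_state |= CACHE_ENTRY_LOADING;
            load = 1;
        }
        pthread_mutex_unlock( &ei->ei_mutex );
        return load;
    }

    /* Publish the loaded entry and clear the flag; this mirrors the
     * (*eip)->bei_state ^= CACHE_ENTRY_LOADING step in the patch.
     */
    static void
    finish_load( entry_info *ei, void *entry )
    {
        pthread_mutex_lock( &ei->ei_mutex );
        ei->ei_entry = entry;
        ei->ei_state &= ~CACHE_ENTRY_LOADING;
        pthread_mutex_unlock( &ei->ei_mutex );
    }

Losers of the race yield and re-check instead of blocking, which is exactly what the goto load1 retry path in cache.c does.
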
servers/slapd/back-bdb/cache.c

@@ -31,6 +31,8 @@ static int bdb_cache_delete_internal(Cache *cache, EntryInfo *e);
 static void	bdb_lru_print(Cache *cache);
 #endif
 
+static int bdb_txn_get( Operation *op, DB_ENV *env, DB_TXN **txn );
+
 static EntryInfo *
 bdb_cache_entryinfo_new( Cache *cache )
 {
@@ -655,6 +657,12 @@ again:	ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
 		if ( (*eip)->bei_state & CACHE_ENTRY_DELETED ) {
 			rc = DB_NOTFOUND;
 		} else {
+			int load = 0;
+			/* Make sure only one thread tries to load the entry */
+load1:			if ( !(*eip)->bei_e && !((*eip)->bei_state & CACHE_ENTRY_LOADING)) {
+				load = 1;
+				(*eip)->bei_state |= CACHE_ENTRY_LOADING;
+			}
 			if ( islocked ) {
 				bdb_cache_entryinfo_unlock( *eip );
 				islocked = 0;
@@ -664,33 +672,67 @@ again:	ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
 			rc = DB_NOTFOUND;
 			bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
 		} else if ( rc == 0 ) {
-			if ( !(*eip)->bei_e ) {
-				bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
+			if ( load ) {
+				DB_TXN *ltid;
+				u_int32_t locker2 = locker;
+
+				/* We don't wrap entire read operations in txn's, but
+				 * we need our cache entry lock and any DB page locks
+				 * to be associated, in order for deadlock detection
+				 * to work properly. So we use a long-lived per-thread
+				 * txn for this step.
+				 */
+				if ( !tid ) {
+					rc = bdb_txn_get( op, bdb->bi_dbenv, &ltid );
+					if ( ltid )
+						locker2 = TXN_ID( ltid );
+				} else {
+					ltid = tid;
+				}
+				/* Give up original read lock, obtain write lock with
+				 * (possibly) new locker ID.
+				 */
+				bdb_cache_entry_db_relock( bdb->bi_dbenv, locker2,
 					*eip, 1, 0, lock );
 				if (!ep) {
-					rc = bdb_id2entry( op->o_bd, tid, id, &ep );
+					rc = bdb_id2entry( op->o_bd, ltid, id, &ep );
 				}
 				if ( rc == 0 ) {
 					/* Make sure no other modifier beat us to it */
 					if ( (*eip)->bei_e ) {
 						bdb_entry_return( ep );
 						ep = NULL;
-					}
-					ep->e_private = *eip;
-#ifdef BDB_HIER
-					/* Check for subtree renames */
-					rc = bdb_fix_dn( (*eip)->bei_e, 1 );
-					if ( rc ) rc = bdb_fix_dn( (*eip)->bei_e, 2 );
-					bdb_fix_dn( ep, 0 );
-#endif
+					} else {
+						ep->e_private = *eip;
+#ifdef BDB_HIER
+						bdb_fix_dn( ep, 0 );
+#endif
+						(*eip)->bei_e = ep;
+						ep = NULL;
+					}
-					(*eip)->bei_e = ep;
-					ep = NULL;
 				}
-				bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
-					*eip, 0, 0, lock );
+				(*eip)->bei_state ^= CACHE_ENTRY_LOADING;
+				if ( rc == 0 ) {
+					/* If we succeeded, downgrade back to a readlock. */
+					bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
+						*eip, 0, 0, lock );
+				} else {
+					/* Otherwise, release the lock. */
+					bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
+				}
+				if ( !tid ) {
+					/* If we're using the per-thread txn, release all
+					 * of its page locks now.
+					 */
+					DB_LOCKREQ list;
+					list.op = DB_LOCK_PUT_ALL;
+					list.obj = NULL;
+					bdb->bi_dbenv->lock_vec( bdb->bi_dbenv, locker2,
+						0, &list, 1, NULL );
+				}
+			} else if ( !(*eip)->bei_e ) {
+				/* Some other thread is trying to load the entry,
+				 * give it a chance to finish.
+				 */
+				bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
+				ldap_pvt_thread_yield();
+				bdb_cache_entryinfo_lock( *eip );
+				islocked = 1;
+				goto load1;
 #ifdef BDB_HIER
 			} else {
 				/* Check for subtree renames
@@ -1113,6 +1155,60 @@ bdb_lru_print( Cache *cache )
 }
 #endif
 
+static void
+bdb_txn_free( void *key, void *data )
+{
+	DB_TXN *txn = data;
+
+	TXN_ABORT( txn );
+}
+
+/* Obtain a long-lived transaction for the current thread */
+static int
+bdb_txn_get( Operation *op, DB_ENV *env, DB_TXN **txn )
+{
+	int i, rc, lockid;
+	void *ctx, *data;
+
+	/* If no op was provided, try to find the ctx anyway... */
+	if ( op ) {
+		ctx = op->o_threadctx;
+	} else {
+		ctx = ldap_pvt_thread_pool_context();
+	}
+
+	/* Shouldn't happen unless we're single-threaded */
+	if ( !ctx ) {
+		*txn = NULL;
+		return -1;
+	}
+
+	if ( ldap_pvt_thread_pool_getkey( ctx, ((char *)env)+1, &data, NULL ) ) {
+		for ( i=0, rc=1; rc != 0 && i<4; i++ ) {
+			rc = TXN_BEGIN( env, NULL, txn, 0 );
+			if ( rc ) ldap_pvt_thread_yield();
+		}
+		if ( rc != 0 ) {
+			return rc;
+		}
+		if ( ( rc = ldap_pvt_thread_pool_setkey( ctx, ((char *)env)+1,
+			*txn, bdb_txn_free ) ) ) {
+			TXN_ABORT( *txn );
+#ifdef NEW_LOGGING
+			LDAP_LOG( BACK_BDB, ERR, "bdb_txn_get: err %s(%d)\n",
+				db_strerror(rc), rc, 0 );
+#else
+			Debug( LDAP_DEBUG_ANY, "bdb_txn_get: err %s(%d)\n",
+				db_strerror(rc), rc, 0 );
+#endif
+			return rc;
+		}
+	} else {
+		*txn = data;
+	}
+	return 0;
+}
+
 #ifdef BDB_REUSE_LOCKERS
 static void
 bdb_locker_id_free( void *key, void *data )
@@ -1133,10 +1229,9 @@ bdb_locker_id_free( void *key, void *data )
 			"bdb_locker_id_free: %d err %s(%d)\n",
 			lockid, db_strerror(rc), rc );
 #endif
-		memset( &lr, 0, sizeof(lr) );
-
+		/* release all locks held by this locker. */
 		lr.op = DB_LOCK_PUT_ALL;
 		lr.obj = NULL;
 		env->lock_vec( env, lockid, 0, &lr, 1, NULL );
 		XLOCK_ID_FREE( env, lockid );
 	}
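
One subtlety in the cache.c changes above: since the per-thread txn never commits or aborts during normal operation, the page locks it accumulates while fetching an entry must be dropped explicitly after each load, or they would pile up for the life of the thread. The patch does this inline with lock_vec; isolated, the step looks like this (release_all_locks is an illustrative wrapper, not part of the patch):

    #include <db.h>

    /* Drop every lock owned by `locker` while leaving the long-lived
     * transaction itself alive for reuse on the next entry load.
     */
    static int
    release_all_locks( DB_ENV *env, u_int32_t locker )
    {
        DB_LOCKREQ list;

        list.op = DB_LOCK_PUT_ALL;
        list.obj = NULL;
        return env->lock_vec( env, locker, 0, &list, 1, NULL );
    }
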

servers/slapd/back-bdb/init.c

@@ -196,6 +196,9 @@ bdb_db_open( BackendDB *be )
 	bdb->bi_dbenv->set_errcall( bdb->bi_dbenv, bdb_errcall );
 	bdb->bi_dbenv->set_lk_detect( bdb->bi_dbenv, bdb->bi_lock_detect );
 
+	/* One long-lived TXN per thread, two TXNs per write op */
+	bdb->bi_dbenv->set_tx_max( bdb->bi_dbenv, connection_pool_max * 3 );
+
 	if ( bdb->bi_idl_cache_max_size ) {
 		bdb->bi_idl_tree = NULL;
 		ldap_pvt_thread_rdwr_init( &bdb->bi_idl_tree_rwlock );
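
As a worked example of the sizing: with slapd's default thread pool of 16 threads, this reserves 16 * 3 = 48 concurrent transactions, covering one long-lived per-thread txn for entry loads plus the two txns a single write operation can hold at once (in back-bdb, a write uses a parent txn and a nested child txn).
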