ITS#6577 real fix: keep dn2id cursor open until we re-acquire lock

on entryInfo, otherwise BDB locks in reader txn are released too soon
This commit is contained in:
Howard Chu 2010-07-01 16:38:32 +00:00
parent c4f0c2cc2b
commit 3b2cef17eb
3 changed files with 14 additions and 67 deletions

View file

@ -442,7 +442,7 @@ bdb_cache_find_ndn(
ei.bei_parent = eip;
ei2 = (EntryInfo *)avl_find( eip->bei_kids, &ei, bdb_rdn_cmp );
if ( !ei2 ) {
DB_LOCK lock;
DBC *cursor;
int len = ei.bei_nrdn.bv_len;
if ( BER_BVISEMPTY( ndn )) {
@ -458,12 +458,12 @@ bdb_cache_find_ndn(
BDB_LOG_PRINTF( bdb->bi_dbenv, NULL, "slapd Reading %s",
ei.bei_nrdn.bv_val );
lock.mode = DB_LOCK_NG;
rc = bdb_dn2id( op, &ei.bei_nrdn, &ei, txn, &lock );
cursor = NULL;
rc = bdb_dn2id( op, &ei.bei_nrdn, &ei, txn, &cursor );
if (rc) {
bdb_cache_entryinfo_lock( eip );
eip->bei_finders--;
bdb_cache_entry_db_unlock( bdb, &lock );
if ( cursor ) cursor->c_close( cursor );
*res = eip;
return rc;
}
@ -477,7 +477,7 @@ bdb_cache_find_ndn(
/* add_internal left eip and c_rwlock locked */
eip->bei_finders--;
ldap_pvt_thread_rdwr_wunlock( &bdb->bi_cache.c_rwlock );
bdb_cache_entry_db_unlock( bdb, &lock );
if ( cursor ) cursor->c_close( cursor );
if ( rc ) {
*res = eip;
return rc;

View file

@ -23,36 +23,6 @@
#include "idl.h"
#include "lutil.h"
#if 0
#define bdb_dn2id_lock BDB_SYMBOL(dn2id_lock)
static int
bdb_dn2id_lock( struct bdb_info *bdb, struct berval *dn,
int rw, DB_TXN *txn, DB_LOCK *lock )
{
int rc;
DBT lockobj;
int db_rw;
if (!txn)
return 0;
if (rw)
db_rw = DB_LOCK_WRITE;
else
db_rw = DB_LOCK_READ;
lockobj.data = dn->bv_val;
lockobj.size = dn->bv_len;
rc = LOCK_GET(bdb->bi_dbenv, TXN_ID(txn), DB_LOCK_NOWAIT,
&lockobj, db_rw, lock);
return rc;
}
#else
#define bdb_dn2id_lock(a,b,c,d,e) 0
#endif
#ifndef BDB_HIER
int
bdb_dn2id_add(
@ -199,10 +169,6 @@ bdb_dn2id_delete(
AC_MEMCPY( ptr.bv_val, e->e_nname.bv_val, e->e_nname.bv_len );
ptr.bv_val[ptr.bv_len] = '\0';
/* We hold this lock until the TXN completes */
rc = bdb_dn2id_lock( bdb, &e->e_nname, 1, txn, &lock );
if ( rc ) goto done;
/* delete it */
rc = db->del( db, txn, &key, 0 );
if( rc != 0 ) {
@ -285,11 +251,10 @@ bdb_dn2id(
struct berval *dn,
EntryInfo *ei,
DB_TXN *txn,
DB_LOCK *lock )
DBC **cursor )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
DB *db = bdb->bi_dn2id->bdi_db;
DBC *cursor;
int rc;
DBT key, data;
ID nid;
@ -308,18 +273,11 @@ bdb_dn2id(
data.ulen = sizeof(ID);
data.flags = DB_DBT_USERMEM;
rc = db->cursor( db, txn, &cursor, bdb->bi_db_opflags );
if ( rc ) goto func_leave;
rc = bdb_dn2id_lock( bdb, dn, 0, txn, lock );
if ( rc ) goto nolock;
rc = db->cursor( db, txn, cursor, bdb->bi_db_opflags );
/* fetch it */
rc = cursor->c_get( cursor, &key, &data, DB_SET );
nolock:
cursor->c_close( cursor );
func_leave:
if ( !rc )
rc = (*cursor)->c_get( *cursor, &key, &data, DB_SET );
if( rc != 0 ) {
Debug( LDAP_DEBUG_TRACE, "<= bdb_dn2id: get failed: %s (%d)\n",
@ -686,10 +644,6 @@ hdb_dn2id_delete(
rc = db->cursor( db, txn, &cursor, bdb->bi_db_opflags );
if ( rc ) goto func_leave;
/* We hold this lock until the TXN completes */
rc = bdb_dn2id_lock( bdb, &e->e_nname, 1, txn, &lock );
if ( rc ) goto nolock;
/* Delete our ID from the parent's list */
rc = cursor->c_get( cursor, &key, &data, DB_GET_BOTH_RANGE );
if ( rc == 0 ) {
@ -711,7 +665,6 @@ hdb_dn2id_delete(
rc = cursor->c_del( cursor, 0 );
}
nolock:
cursor->c_close( cursor );
func_leave:
op->o_tmpfree( d, op->o_tmpmemctx );
@ -749,12 +702,11 @@ hdb_dn2id(
struct berval *in,
EntryInfo *ei,
DB_TXN *txn,
DB_LOCK *lock )
DBC **cursor )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
DB *db = bdb->bi_dn2id->bdi_db;
DBT key, data;
DBC *cursor;
int rc = 0, nrlen;
diskNode *d;
char *ptr;
@ -780,7 +732,7 @@ hdb_dn2id(
data.dlen = data.ulen;
data.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
rc = db->cursor( db, txn, &cursor, bdb->bi_db_opflags );
rc = db->cursor( db, txn, cursor, bdb->bi_db_opflags );
if ( rc ) return rc;
d = op->o_tmpalloc( data.size * 3, op->o_tmpmemctx );
@ -792,10 +744,7 @@ hdb_dn2id(
*ptr = '\0';
data.data = d;
rc = bdb_dn2id_lock( bdb, in, 0, txn, lock );
if ( rc ) goto func_leave;
rc = cursor->c_get( cursor, &key, &data, DB_GET_BOTH_RANGE );
rc = (*cursor)->c_get( *cursor, &key, &data, DB_GET_BOTH_RANGE );
if ( rc == 0 && (dlen[1] != d->nrdnlen[1] || dlen[0] != d->nrdnlen[0] ||
strncmp( d->nrdn, in->bv_val, nrlen ))) {
rc = DB_NOTFOUND;
@ -812,13 +761,11 @@ hdb_dn2id(
/* FIXME: do we need to lock the parent
* entryinfo? Seems safe...
*/
cursor->c_count( cursor, &dkids, 0 );
(*cursor)->c_count( *cursor, &dkids, 0 );
ei->bei_parent->bei_dkids = dkids;
}
}
func_leave:
cursor->c_close( cursor );
op->o_tmpfree( d, op->o_tmpmemctx );
if( rc != 0 ) {
Debug( LDAP_DEBUG_TRACE, "<= hdb_dn2id: get failed: %s (%d)\n",

View file

@ -106,7 +106,7 @@ int bdb_dn2id(
struct berval *dn,
EntryInfo *ei,
DB_TXN *txn,
DB_LOCK *lock );
DBC **cursor );
int bdb_dn2id_add(
Operation *op,