mirror of
https://git.openldap.org/openldap/openldap.git
synced 2026-02-18 18:18:06 -05:00
ITS#8486 Switch sessionlog to use TAVL
This commit is contained in:
parent
a7fb531e56
commit
234230f286
1 changed files with 103 additions and 82 deletions
|
|
@ -117,7 +117,6 @@ typedef struct syncmatches {
|
|||
|
||||
/* Session log data */
|
||||
typedef struct slog_entry {
|
||||
struct slog_entry *se_next;
|
||||
struct berval se_uuid;
|
||||
struct berval se_csn;
|
||||
int se_sid;
|
||||
|
|
@ -131,9 +130,8 @@ typedef struct sessionlog {
|
|||
int sl_num;
|
||||
int sl_size;
|
||||
int sl_playing;
|
||||
slog_entry *sl_head;
|
||||
slog_entry *sl_tail;
|
||||
ldap_pvt_thread_mutex_t sl_mutex;
|
||||
Avlnode *sl_entries;
|
||||
ldap_pvt_thread_rdwr_t sl_mutex;
|
||||
} sessionlog;
|
||||
|
||||
/* The main state for this overlay */
|
||||
|
|
@ -402,6 +400,25 @@ sp_avl_cmp( const void *c1, const void *c2 )
|
|||
return ber_bvcmp( &m1->mt_dn, &m2->mt_dn );
|
||||
}
|
||||
|
||||
static int
|
||||
syncprov_sessionlog_cmp( const void *l, const void *r )
|
||||
{
|
||||
const slog_entry *left = l, *right = r;
|
||||
int ret = ber_bvcmp( &left->se_csn, &right->se_csn );
|
||||
if ( !ret )
|
||||
ret = ber_bvcmp( &left->se_uuid, &right->se_uuid );
|
||||
/* Only time we have two modifications with same CSN is when we detect a
|
||||
* rename during replication.
|
||||
* We invert the test here because LDAP_REQ_MODDN is
|
||||
* numerically greater than LDAP_REQ_MODIFY but we
|
||||
* want it to occur first.
|
||||
*/
|
||||
if ( !ret )
|
||||
ret = right->se_tag - left->se_tag;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* syncprov_findbase:
|
||||
* finds the true DN of the base of a search (with alias dereferencing) and
|
||||
* checks to make sure the base entry doesn't get replaced with a different
|
||||
|
|
@ -1577,6 +1594,7 @@ syncprov_add_slog( Operation *op )
|
|||
syncprov_info_t *si = on->on_bi.bi_private;
|
||||
sessionlog *sl;
|
||||
slog_entry *se;
|
||||
int rc;
|
||||
|
||||
sl = si->si_logs;
|
||||
{
|
||||
|
|
@ -1586,24 +1604,20 @@ syncprov_add_slog( Operation *op )
|
|||
* state with respect to such operations, so we ignore them and
|
||||
* wipe out anything in the log if we see them.
|
||||
*/
|
||||
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
|
||||
/* can only do this if no one else is reading the log at the moment */
|
||||
if (!sl->sl_playing) {
|
||||
while ( se = sl->sl_head ) {
|
||||
sl->sl_head = se->se_next;
|
||||
ch_free( se );
|
||||
if ( !sl->sl_playing ) {
|
||||
tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
|
||||
sl->sl_num = 0;
|
||||
sl->sl_entries = NULL;
|
||||
}
|
||||
sl->sl_tail = NULL;
|
||||
sl->sl_num = 0;
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
return;
|
||||
}
|
||||
|
||||
/* Allocate a record. UUIDs are not NUL-terminated. */
|
||||
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
|
||||
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
|
||||
op->o_csn.bv_len + 1 );
|
||||
se->se_next = NULL;
|
||||
se->se_tag = op->o_tag;
|
||||
|
||||
se->se_uuid.bv_val = (char *)(&se[1]);
|
||||
|
|
@ -1616,7 +1630,7 @@ syncprov_add_slog( Operation *op )
|
|||
se->se_csn.bv_len = op->o_csn.bv_len;
|
||||
se->se_sid = slap_parse_csn_sid( &se->se_csn );
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
|
||||
if ( LogTest( LDAP_DEBUG_SYNC ) ) {
|
||||
char uuidstr[40] = {};
|
||||
if ( !BER_BVISEMPTY( &opc->suuid ) ) {
|
||||
|
|
@ -1628,25 +1642,7 @@ syncprov_add_slog( Operation *op )
|
|||
"adding csn=%s to sessionlog, uuid=%s\n",
|
||||
op->o_log_prefix, se->se_csn.bv_val, uuidstr );
|
||||
}
|
||||
if ( sl->sl_head ) {
|
||||
/* Keep the list in csn order. */
|
||||
if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) {
|
||||
sl->sl_tail->se_next = se;
|
||||
sl->sl_tail = se;
|
||||
} else {
|
||||
slog_entry **sep;
|
||||
|
||||
for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) {
|
||||
if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) {
|
||||
se->se_next = *sep;
|
||||
*sep = se;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sl->sl_head = se;
|
||||
sl->sl_tail = se;
|
||||
if ( !sl->sl_entries ) {
|
||||
if ( !sl->sl_mincsn ) {
|
||||
sl->sl_numcsns = 1;
|
||||
sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
|
||||
|
|
@ -1656,35 +1652,40 @@ syncprov_add_slog( Operation *op )
|
|||
BER_BVZERO( &sl->sl_mincsn[1] );
|
||||
}
|
||||
}
|
||||
rc = tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, avl_dup_error );
|
||||
assert( rc == LDAP_SUCCESS );
|
||||
sl->sl_num++;
|
||||
if (!sl->sl_playing) {
|
||||
while ( sl->sl_num > sl->sl_size ) {
|
||||
int i;
|
||||
se = sl->sl_head;
|
||||
sl->sl_head = se->se_next;
|
||||
Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
|
||||
"expiring csn=%s from sessionlog (sessionlog size=%d)\n",
|
||||
op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
|
||||
for ( i=0; i<sl->sl_numcsns; i++ )
|
||||
if ( sl->sl_sids[i] >= se->se_sid )
|
||||
break;
|
||||
if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
|
||||
if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) {
|
||||
Avlnode *edge = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
|
||||
while ( sl->sl_num > sl->sl_size ) {
|
||||
int i;
|
||||
Avlnode *next = tavl_next( edge, TAVL_DIR_RIGHT );
|
||||
se = edge->avl_data;
|
||||
Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
|
||||
"adding csn=%s to mincsn\n",
|
||||
op->o_log_prefix, se->se_csn.bv_val, 0 );
|
||||
slap_insert_csn_sids( (struct sync_cookie *)sl,
|
||||
i, se->se_sid, &se->se_csn );
|
||||
} else {
|
||||
Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
|
||||
"updating mincsn for sid=%d csn=%s to %s\n",
|
||||
op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
|
||||
ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
|
||||
"expiring csn=%s from sessionlog (sessionlog size=%d)\n",
|
||||
op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
|
||||
for ( i=0; i<sl->sl_numcsns; i++ )
|
||||
if ( sl->sl_sids[i] >= se->se_sid )
|
||||
break;
|
||||
if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
|
||||
Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
|
||||
"adding csn=%s to mincsn\n",
|
||||
op->o_log_prefix, se->se_csn.bv_val, 0 );
|
||||
slap_insert_csn_sids( (struct sync_cookie *)sl,
|
||||
i, se->se_sid, &se->se_csn );
|
||||
} else {
|
||||
Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
|
||||
"updating mincsn for sid=%d csn=%s to %s\n",
|
||||
op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
|
||||
ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
|
||||
}
|
||||
tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp );
|
||||
ch_free( se );
|
||||
edge = next;
|
||||
sl->sl_num--;
|
||||
}
|
||||
ch_free( se );
|
||||
sl->sl_num--;
|
||||
}
|
||||
}
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1701,17 +1702,18 @@ playlog_cb( Operation *op, SlapReply *rs )
|
|||
/* enter with sl->sl_mutex locked, release before returning */
|
||||
static void
|
||||
syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
||||
sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
|
||||
sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids,
|
||||
struct berval *mincsn )
|
||||
{
|
||||
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
|
||||
slog_entry *se;
|
||||
int i, j, ndel, num, nmods, mmods;
|
||||
Avlnode *entry;
|
||||
char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
|
||||
BerVarray uuids;
|
||||
struct berval delcsn[2];
|
||||
|
||||
if ( !sl->sl_num ) {
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -1719,7 +1721,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
i = 0;
|
||||
nmods = 0;
|
||||
sl->sl_playing++;
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
|
||||
uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
|
||||
num * UUID_LEN, op->o_tmpmemctx );
|
||||
|
|
@ -1729,13 +1731,31 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
delcsn[0].bv_val = cbuf;
|
||||
BER_BVZERO(&delcsn[1]);
|
||||
|
||||
ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
|
||||
/* Make a copy of the relevant UUIDs. Put the Deletes up front
|
||||
* and everything else at the end. Do this first so we can
|
||||
* unlock the list mutex.
|
||||
* let the write side manage the sessionlog again.
|
||||
*/
|
||||
for ( se=sl->sl_head; se; se=se->se_next ) {
|
||||
assert( sl->sl_entries );
|
||||
|
||||
/* Find first relevant log entry. If greater than mincsn, backtrack one entry */
|
||||
{
|
||||
slog_entry te = {0};
|
||||
te.se_csn = *mincsn;
|
||||
entry = tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel );
|
||||
}
|
||||
if ( ndel > 0 && entry )
|
||||
entry = tavl_next( entry, TAVL_DIR_LEFT );
|
||||
/* if none, just start at beginning */
|
||||
if ( !entry )
|
||||
entry = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
|
||||
|
||||
do {
|
||||
slog_entry *se = entry->avl_data;
|
||||
int k;
|
||||
|
||||
/* Make sure writes can still make progress */
|
||||
ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
|
||||
ndel = 1;
|
||||
for ( k=0; k<srs->sr_state.numcsns; k++ ) {
|
||||
if ( se->se_sid == srs->sr_state.sids[k] ) {
|
||||
|
|
@ -1744,6 +1764,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
}
|
||||
}
|
||||
if ( ndel <= 0 ) {
|
||||
ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
|
||||
continue;
|
||||
}
|
||||
ndel = 0;
|
||||
|
|
@ -1756,6 +1777,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
if ( ndel > 0 ) {
|
||||
Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
|
||||
"cmp %d, too new\n", op->o_log_prefix, ndel, 0 );
|
||||
ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
|
||||
break;
|
||||
}
|
||||
if ( se->se_tag == LDAP_REQ_DELETE ) {
|
||||
|
|
@ -1765,8 +1787,10 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
delcsn[0].bv_len = se->se_csn.bv_len;
|
||||
delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
|
||||
} else {
|
||||
if ( se->se_tag == LDAP_REQ_ADD )
|
||||
if ( se->se_tag == LDAP_REQ_ADD ) {
|
||||
ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
|
||||
continue;
|
||||
}
|
||||
nmods++;
|
||||
j = num - nmods;
|
||||
}
|
||||
|
|
@ -1775,7 +1799,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
uuids[j].bv_len = UUID_LEN;
|
||||
|
||||
if ( LogTest( LDAP_DEBUG_SYNC ) ) {
|
||||
char uuidstr[40];
|
||||
char uuidstr[40] = {};
|
||||
lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len,
|
||||
uuidstr, 40 );
|
||||
Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_playlog: "
|
||||
|
|
@ -1783,10 +1807,12 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
|
|||
op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified",
|
||||
uuidstr, delcsn[0].bv_len ? delcsn[0].bv_val : "(null)" );
|
||||
}
|
||||
}
|
||||
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
|
||||
} while ( (entry = tavl_next( entry, TAVL_DIR_RIGHT )) != NULL );
|
||||
ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
|
||||
sl->sl_playing--;
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
|
||||
ndel = i;
|
||||
|
||||
|
|
@ -2826,7 +2852,7 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
|
|||
sl=si->si_logs;
|
||||
if ( sl ) {
|
||||
int do_play = 0;
|
||||
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
|
||||
/* Are there any log entries, and is the consumer state
|
||||
* present in the session log?
|
||||
*/
|
||||
|
|
@ -2853,9 +2879,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
|
|||
if ( do_play ) {
|
||||
do_present = 0;
|
||||
/* mutex is unlocked in playlog */
|
||||
syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
|
||||
syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids, &mincsn );
|
||||
} else {
|
||||
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
|
||||
}
|
||||
}
|
||||
/* Is the CSN still present in the database? */
|
||||
|
|
@ -3185,7 +3211,7 @@ sp_cf_gen(ConfigArgs *c)
|
|||
sl = si->si_logs;
|
||||
if ( !sl ) {
|
||||
sl = ch_calloc( 1, sizeof( sessionlog ));
|
||||
ldap_pvt_thread_mutex_init( &sl->sl_mutex );
|
||||
ldap_pvt_thread_rdwr_init( &sl->sl_mutex );
|
||||
si->si_logs = sl;
|
||||
}
|
||||
sl->sl_size = size;
|
||||
|
|
@ -3490,19 +3516,14 @@ syncprov_db_destroy(
|
|||
if ( si ) {
|
||||
if ( si->si_logs ) {
|
||||
sessionlog *sl = si->si_logs;
|
||||
slog_entry *se = sl->sl_head;
|
||||
|
||||
while ( se ) {
|
||||
slog_entry *se_next = se->se_next;
|
||||
ch_free( se );
|
||||
se = se_next;
|
||||
}
|
||||
tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
|
||||
if ( sl->sl_mincsn )
|
||||
ber_bvarray_free( sl->sl_mincsn );
|
||||
if ( sl->sl_sids )
|
||||
ch_free( sl->sl_sids );
|
||||
|
||||
ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
|
||||
ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex);
|
||||
ch_free( si->si_logs );
|
||||
}
|
||||
if ( si->si_ctxcsn )
|
||||
|
|
|
|||
Loading…
Reference in a new issue