diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 741805bb43..7d416c980d 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -118,7 +118,6 @@ typedef struct syncmatches { /* Session log data */ typedef struct slog_entry { - struct slog_entry *se_next; struct berval se_uuid; struct berval se_csn; int se_sid; @@ -132,8 +131,7 @@ typedef struct sessionlog { int sl_num; int sl_size; int sl_playing; - slog_entry *sl_head; - slog_entry *sl_tail; + TAvlnode *sl_entries; ldap_pvt_thread_mutex_t sl_mutex; } sessionlog; @@ -428,6 +426,28 @@ sp_uuid_cmp( const void *l, const void *r ) return ber_bvcmp( left, right ); } +static int +syncprov_sessionlog_cmp( const void *l, const void *r ) +{ + const slog_entry *left = l, *right = r; + + return ber_bvcmp( &left->se_csn, &right->se_csn ); +} + +static int +syncprov_sessionlog_dup( void *o, void *n ) +{ + slog_entry *old = o, *new = n; + + /* Only time we have two modifications with same CSN is when we detect a + * rename during replication */ + /* FIXME: Does that imply a consumer coming just between we apply the mod + * and the modify might only receive the former and never hear of the + * latter? */ + return old->se_tag != LDAP_REQ_MODRDN || + new->se_tag != LDAP_REQ_MODIFY; +} + /* syncprov_findbase: * finds the true DN of the base of a search (with alias dereferencing) and * checks to make sure the base entry doesn't get replaced with a different @@ -1618,6 +1638,7 @@ syncprov_add_slog( Operation *op ) syncprov_info_t *si = on->on_bi.bi_private; sessionlog *sl; slog_entry *se; + int rc; sl = si->si_logs; { @@ -1629,13 +1650,10 @@ syncprov_add_slog( Operation *op ) */ ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); /* can only do this if no one else is reading the log at the moment */ - if (!sl->sl_playing) { - while ( (se = sl->sl_head) ) { - sl->sl_head = se->se_next; - ch_free( se ); - } - sl->sl_tail = NULL; - sl->sl_num = 0; + if ( !sl->sl_playing ) { + tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); + sl->sl_num = 0; + sl->sl_entries = NULL; } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); return; @@ -1644,7 +1662,6 @@ syncprov_add_slog( Operation *op ) /* Allocate a record. UUIDs are not NUL-terminated. */ se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + op->o_csn.bv_len + 1 ); - se->se_next = NULL; se->se_tag = op->o_tag; se->se_uuid.bv_val = (char *)(&se[1]); @@ -1669,25 +1686,7 @@ syncprov_add_slog( Operation *op ) "adding csn=%s to sessionlog, uuid=%s\n", op->o_log_prefix, se->se_csn.bv_val, uuidstr ); } - if ( sl->sl_head ) { - /* Keep the list in csn order. */ - if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) { - sl->sl_tail->se_next = se; - sl->sl_tail = se; - } else { - slog_entry **sep; - - for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) { - if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) { - se->se_next = *sep; - *sep = se; - break; - } - } - } - } else { - sl->sl_head = se; - sl->sl_tail = se; + if ( !sl->sl_entries ) { if ( !sl->sl_mincsn ) { sl->sl_numcsns = 1; sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval )); @@ -1697,33 +1696,36 @@ syncprov_add_slog( Operation *op ) BER_BVZERO( &sl->sl_mincsn[1] ); } } + rc = tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, syncprov_sessionlog_dup ); + assert( rc == LDAP_SUCCESS ); sl->sl_num++; - if (!sl->sl_playing) { - while ( sl->sl_num > sl->sl_size ) { - int i; - se = sl->sl_head; - sl->sl_head = se->se_next; - Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " - "expiring csn=%s from sessionlog (sessionlog size=%d)\n", - op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); - for ( i=0; isl_numcsns; i++ ) - if ( sl->sl_sids[i] >= se->se_sid ) - break; - if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { + if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) { + TAvlnode *edge = tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); + while ( --sl->sl_num > sl->sl_size ) { + int i; + TAvlnode *next = tavl_next( edge, TAVL_DIR_RIGHT ); + se = edge->avl_data; Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " - "adding csn=%s to mincsn\n", - op->o_log_prefix, se->se_csn.bv_val ); - slap_insert_csn_sids( (struct sync_cookie *)sl, - i, se->se_sid, &se->se_csn ); - } else { - Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " - "updating mincsn for sid=%d csn=%s to %s\n", - op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); - ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); + "expiring csn=%s from sessionlog (sessionlog size=%d)\n", + op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); + for ( i=0; isl_numcsns; i++ ) + if ( sl->sl_sids[i] >= se->se_sid ) + break; + if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { + Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " + "adding csn=%s to mincsn\n", + op->o_log_prefix, se->se_csn.bv_val ); + slap_insert_csn_sids( (struct sync_cookie *)sl, + i, se->se_sid, &se->se_csn ); + } else { + Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " + "updating mincsn for sid=%d csn=%s to %s\n", + op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); + ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); + } + tavl_delete( &sl->sl_entries, edge, syncprov_sessionlog_cmp ); + edge = next; } - ch_free( se ); - sl->sl_num--; - } } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); } @@ -1958,6 +1960,8 @@ syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs, struct berval uuid[2] = {}, csn[2] = {}; slog_entry *se; TAvlnode *entry; + char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; + struct berval delcsn[2]; ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); /* Are there any log entries, and is the consumer state @@ -2009,11 +2013,13 @@ syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs, * and everything else at the end. Do this first so we can * unlock the list mutex. */ - for ( se=sl->sl_head; se; se=se->se_next ) { + assert( sl->sl_entries ); + entry = tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); + do { char uuidstr[40] = {}; + slog_entry *se = entry->avl_data; int k; - ndel = 1; for ( k=0; ksr_state.numcsns; k++ ) { if ( se->se_sid == srs->sr_state.sids[k] ) { @@ -2062,7 +2068,7 @@ syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs, op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified", uuidstr, csns[j].bv_val ); } - } + } while ( (entry = tavl_next( entry, TAVL_DIR_RIGHT )) != NULL ); ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); sl->sl_playing--; ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); @@ -3984,13 +3990,8 @@ syncprov_db_destroy( if ( si ) { if ( si->si_logs ) { sessionlog *sl = si->si_logs; - slog_entry *se = sl->sl_head; - while ( se ) { - slog_entry *se_next = se->se_next; - ch_free( se ); - se = se_next; - } + tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); if ( sl->sl_mincsn ) ber_bvarray_free( sl->sl_mincsn ); if ( sl->sl_sids )