Support multiple sync replication at the consumer :

1) simultaneous operation of multiple active sync replication threads
2) cookie management for individual sync replication thread
   (include rid=%3d to the slapd cookie command line option (-c))
This commit is contained in:
Jong Hyuk Choi 2003-11-26 19:49:47 +00:00
parent 944f6a9903
commit 1fdda703e6
23 changed files with 445 additions and 195 deletions

View file

@ -1164,7 +1164,7 @@ replication engine.
.B id
identifies the current
.B syncrepl
directive within the database.
directive within the replication consumer site.
It is a non-negative integer having no more than three digits.
.B provider
specifies the replication provider site containing the master content

View file

@ -244,10 +244,10 @@ do_add( Operation *op, SlapReply *rs )
/* do the update here */
int repl_user = be_isupdate(op->o_bd, &op->o_ndn );
#ifndef SLAPD_MULTIMASTER
if ( !op->o_bd->be_syncinfo &&
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ) &&
( !op->o_bd->be_update_ndn.bv_len || repl_user ))
#else
if ( !op->o_bd->be_syncinfo )
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ))
#endif
{
int update = op->o_bd->be_update_ndn.bv_len;
@ -328,8 +328,13 @@ do_add( Operation *op, SlapReply *rs )
}
#endif /* LDAP_SLAPI */
if ( op->o_bd->be_syncinfo ) {
defref = op->o_bd->be_syncinfo->si_provideruri_bv;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &defref, &tmpbv );
}
} else {
defref = op->o_bd->be_update_refs
? op->o_bd->be_update_refs : default_referral;

View file

@ -436,7 +436,7 @@ retry: /* transaction retry */
goto return_results;
}
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
rc = bdb_csn_commit( op, rs, ltid, ei, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch ( rc ) {
@ -481,9 +481,10 @@ retry: /* transaction retry */
suffix_ei = op->oq_add.rs_e->e_private;
}
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
if ( ctxcsn_added ) {
bdb_cache_add( bdb, suffix_ei, ctxcsn_e, (struct berval *)&slap_ldapsync_cn_bv, locker );
bdb_cache_add( bdb, suffix_ei, ctxcsn_e,
(struct berval *)&slap_ldapsync_cn_bv, locker );
}
}

View file

@ -263,20 +263,88 @@ bdb_get_commit_csn(
int ctxcsn_added = 0;
int rc;
struct sync_cookie syncCookie = { NULL, -1, NULL};
syncinfo_t *si;
if ( op->o_sync_mode != SLAP_SYNC_NONE ) {
if ( op->o_bd->be_syncinfo ) {
char substr[67];
struct berval bv;
sprintf( substr, "cn=syncrepl%d", op->o_bd->be_syncinfo->si_id );
if ( op->o_sync_mode != SLAP_SYNC_NONE &&
!LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
char substr[67];
struct berval bv;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
sprintf( substr, "cn=syncrepl%d", si->si_id );
ber_str2bv( substr, 0, 0, &bv );
build_new_dn( &ctxcsn_ndn, &op->o_bd->be_nsuffix[0], &bv, NULL );
} else {
build_new_dn( &ctxcsn_ndn, &op->o_bd->be_nsuffix[0],
(struct berval *)&slap_ldapsync_cn_bv, NULL );
}
ctxcsn_retry :
consumer_ctxcsn_retry :
rs->sr_err = bdb_dn2entry( op, NULL, &ctxcsn_ndn, &ctxcsn_ei,
0, locker, ctxcsn_lock );
switch(rs->sr_err) {
case 0:
ch_free( ctxcsn_ndn.bv_val );
ctxcsn_ndn.bv_val = NULL;
if ( ctxcsn_ei ) {
ctxcsn_e = ctxcsn_ei->bei_e;
}
break;
case LDAP_BUSY:
ch_free( ctxcsn_ndn.bv_val );
LOCK_ID_FREE (bdb->bi_dbenv, locker );
return LDAP_BUSY;
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
goto consumer_ctxcsn_retry;
case DB_NOTFOUND:
ch_free( ctxcsn_ndn.bv_val );
LOCK_ID_FREE( bdb->bi_dbenv, locker );
return LDAP_OTHER;
default:
ch_free( ctxcsn_ndn.bv_val );
ctxcsn_ndn.bv_val = NULL;
LOCK_ID_FREE (bdb->bi_dbenv, locker );
return LDAP_OTHER;
}
if ( ctxcsn_e ) {
csn_a = attr_find( ctxcsn_e->e_attrs,
slap_schema.si_ad_syncreplCookie );
if ( csn_a ) {
struct berval cookie;
const char *text;
int match = -1;
ber_dupbv( &cookie, &csn_a->a_vals[0] );
ber_bvarray_add( &syncCookie.octet_str, &cookie );
slap_parse_sync_cookie( &syncCookie );
if ( *search_context_csn &&
(*search_context_csn)->bv_val != NULL ) {
value_match( &match, slap_schema.si_ad_entryCSN,
slap_schema.si_ad_entryCSN->ad_type->sat_ordering,
SLAP_MR_VALUE_OF_ATTRIBUTE_SYNTAX,
&syncCookie.ctxcsn, *search_context_csn, &text );
}
if ( match < 0 ) {
/* set search_context_csn to the
smallest syncrepl cookie value */
if ( *search_context_csn ) {
ch_free( (*search_context_csn)->bv_val );
ch_free( *search_context_csn );
}
*search_context_csn = ber_dupbv( NULL,
syncCookie.ctxcsn );
}
slap_sync_cookie_free( &syncCookie, 0 );
} else {
*search_context_csn = NULL;
}
} else {
*search_context_csn = NULL;
}
}
} else if ( op->o_sync_mode != SLAP_SYNC_NONE &&
LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
build_new_dn( &ctxcsn_ndn, &op->o_bd->be_nsuffix[0],
(struct berval *)&slap_ldapsync_cn_bv, NULL );
provider_ctxcsn_retry :
rs->sr_err = bdb_dn2entry( op, NULL, &ctxcsn_ndn, &ctxcsn_ei,
0, locker, ctxcsn_lock );
switch(rs->sr_err) {
@ -286,102 +354,83 @@ ctxcsn_retry :
ctxcsn_e = ctxcsn_ei->bei_e;
}
break;
case LDAP_BUSY:
case LDAP_BUSY:
ch_free( ctxcsn_ndn.bv_val );
LOCK_ID_FREE (bdb->bi_dbenv, locker );
return LDAP_BUSY;
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
goto ctxcsn_retry;
case DB_NOTFOUND:
if ( !op->o_bd->be_syncinfo ) {
snprintf( gid, sizeof( gid ), "%s-%08lx-%08lx",
bdb_uuid.bv_val, (long) op->o_connid, (long) op->o_opid );
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
goto consumer_ctxcsn_retry;
case DB_NOTFOUND:
snprintf( gid, sizeof( gid ), "%s-%08lx-%08lx",
bdb_uuid.bv_val, (long) op->o_connid, (long) op->o_opid );
slap_get_csn( op, csnbuf, sizeof(csnbuf), &csn, 1 );
slap_get_csn( op, csnbuf, sizeof(csnbuf), &csn, 1 );
if ( 0 ) {
if ( 0 ) {
txn_retry:
rs->sr_err = TXN_ABORT( ltid );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
ldap_pvt_thread_yield();
bdb_trans_backoff( ++num_retries );
}
rs->sr_err = TXN_BEGIN( bdb->bi_dbenv, NULL, &ltid, bdb->bi_db_opflags );
rs->sr_err = TXN_ABORT( ltid );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
ldap_pvt_thread_yield();
bdb_trans_backoff( ++num_retries );
}
rs->sr_err = TXN_BEGIN( bdb->bi_dbenv, NULL,
&ltid, bdb->bi_db_opflags );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
rs->sr_err = bdb_csn_commit( op, rs, ltid, NULL, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch( rs->sr_err ) {
case BDB_CSN_ABORT:
LOCK_ID_FREE( bdb->bi_dbenv, locker );
return LDAP_OTHER;
case BDB_CSN_RETRY:
goto txn_retry;
}
rs->sr_err = TXN_PREPARE( ltid, gid );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
bdb_cache_add( bdb, suffix_ei, ctxcsn_e,
(struct berval *)&slap_ldapsync_cn_bv, locker );
rs->sr_err = TXN_COMMIT( ltid, 0 );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
ctxcsn_ei = NULL;
rs->sr_err = bdb_dn2entry( op, NULL, &ctxcsn_ndn, &ctxcsn_ei,
0, locker, ctxcsn_lock );
ch_free( ctxcsn_ndn.bv_val );
if ( ctxcsn_ei ) {
ctxcsn_e = ctxcsn_ei->bei_e;
}
} else {
rs->sr_err = bdb_csn_commit( op, rs, ltid, NULL, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch( rs->sr_err ) {
case BDB_CSN_ABORT:
LOCK_ID_FREE( bdb->bi_dbenv, locker );
return LDAP_OTHER;
case BDB_CSN_RETRY:
goto txn_retry;
}
rs->sr_err = TXN_PREPARE( ltid, gid );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
bdb_cache_add( bdb, suffix_ei, ctxcsn_e,
(struct berval *)&slap_ldapsync_cn_bv, locker );
rs->sr_err = TXN_COMMIT( ltid, 0 );
if ( rs->sr_err != 0 ) {
rs->sr_err = LDAP_OTHER;
return rs->sr_err;
}
rs->sr_err = bdb_dn2entry( op, NULL, &ctxcsn_ndn, &ctxcsn_ei,
0, locker, ctxcsn_lock );
ch_free( ctxcsn_ndn.bv_val );
if ( ctxcsn_ei ) {
ctxcsn_e = ctxcsn_ei->bei_e;
}
break;
default:
ch_free( ctxcsn_ndn.bv_val );
LOCK_ID_FREE (bdb->bi_dbenv, locker );
return LDAP_OTHER;
}
if ( ctxcsn_e ) {
if ( op->o_bd->be_syncinfo ) {
csn_a = attr_find( ctxcsn_e->e_attrs,
slap_schema.si_ad_syncreplCookie );
if ( csn_a ) {
struct berval cookie;
ber_dupbv( &cookie, &csn_a->a_vals[0] );
ber_bvarray_add( &syncCookie.octet_str, &cookie );
slap_parse_sync_cookie( &syncCookie );
*search_context_csn = ber_dupbv( NULL, syncCookie.ctxcsn );
slap_sync_cookie_free( &syncCookie, 0 );
} else {
*search_context_csn = NULL;
}
csn_a = attr_find( ctxcsn_e->e_attrs,
slap_schema.si_ad_contextCSN );
if ( csn_a ) {
*search_context_csn = ber_dupbv( NULL, &csn_a->a_vals[0] );
} else {
csn_a = attr_find( ctxcsn_e->e_attrs,
slap_schema.si_ad_contextCSN );
if ( csn_a ) {
*search_context_csn = ber_dupbv( NULL, &csn_a->a_vals[0] );
} else {
*search_context_csn = NULL;
}
*search_context_csn = NULL;
}
} else {
*search_context_csn = NULL;

View file

@ -251,8 +251,17 @@ retry: /* transaction retry */
matched = NULL;
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
}
@ -260,7 +269,9 @@ retry: /* transaction retry */
rs->sr_err = LDAP_REFERRAL;
send_ldap_result( op, rs );
ber_bvarray_free( rs->sr_ref );
if ( rs->sr_ref != default_referral ) {
ber_bvarray_free( rs->sr_ref );
}
free( (char *)rs->sr_matched );
rs->sr_ref = NULL;
rs->sr_matched = NULL;
@ -472,7 +483,7 @@ retry: /* transaction retry */
ldap_pvt_thread_mutex_unlock( &bdb->bi_lastid_mutex );
#endif
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
rc = bdb_csn_commit( op, rs, ltid, ei, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch ( rc ) {
@ -494,7 +505,7 @@ retry: /* transaction retry */
bdb_cache_delete( &bdb->bi_cache, e, bdb->bi_dbenv,
locker, &lock );
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
if ( ctxcsn_added ) {
bdb_cache_add( bdb, suffix_ei,
ctxcsn_e, (struct berval *)&slap_ldapsync_cn_bv, locker );

View file

@ -421,8 +421,17 @@ retry: /* transaction retry */
e = NULL;
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
}
@ -430,7 +439,9 @@ retry: /* transaction retry */
rs->sr_err = LDAP_REFERRAL;
send_ldap_result( op, rs );
ber_bvarray_free( rs->sr_ref );
if ( rs->sr_ref != default_referral ) {
ber_bvarray_free( rs->sr_ref );
}
free( (char *)rs->sr_matched );
rs->sr_ref = NULL;
rs->sr_matched = NULL;
@ -576,7 +587,7 @@ retry: /* transaction retry */
goto return_results;
}
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
rc = bdb_csn_commit( op, rs, ltid, ei, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch ( rc ) {
@ -599,7 +610,7 @@ retry: /* transaction retry */
bdb_cache_modify( e, dummy.e_attrs, bdb->bi_dbenv, locker, &lock );
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
if ( ctxcsn_added ) {
bdb_cache_add( bdb, suffix_ei, ctxcsn_e,
(struct berval *)&slap_ldapsync_cn_bv, locker );

View file

@ -173,8 +173,17 @@ retry: /* transaction retry */
e = NULL;
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
}
@ -940,7 +949,7 @@ retry: /* transaction retry */
goto return_results;
}
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
rc = bdb_csn_commit( op, rs, ltid, ei, &suffix_ei,
&ctxcsn_e, &ctxcsn_added, locker );
switch ( rc ) {
@ -972,7 +981,7 @@ retry: /* transaction retry */
bdb_cache_modrdn( save, &op->orr_nnewrdn, e, neip,
bdb->bi_dbenv, locker, &lock );
if ( !op->o_bd->be_syncinfo ) {
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
if ( ctxcsn_added ) {
bdb_cache_add( bdb, suffix_ei, ctxcsn_e,
(struct berval *)&slap_ldapsync_cn_bv, locker );

View file

@ -1243,7 +1243,8 @@ id2entry_retry:
struct berval cookie;
slap_compose_sync_cookie( sop, &cookie,
search_context_csn,
sop->o_sync_state.sid );
sop->o_sync_state.sid,
sop->o_sync_state.rid );
rs->sr_err = slap_build_sync_state_ctrl( sop,
rs, e, entry_sync_state, ctrls,
num_ctrls++, 1, &cookie );
@ -1388,7 +1389,8 @@ nochange:
struct berval cookie;
slap_compose_sync_cookie( sop, &cookie,
search_context_csn,
sop->o_sync_state.sid );
sop->o_sync_state.sid,
sop->o_sync_state.rid );
if ( sync_send_present_mode ) {
rs->sr_err = LDAP_SUCCESS;
@ -1432,7 +1434,8 @@ nochange:
struct berval cookie;
slap_compose_sync_cookie( sop, &cookie,
search_context_csn,
sop->o_sync_state.sid );
sop->o_sync_state.sid,
sop->o_sync_state.rid );
if ( sync_send_present_mode ) {
slap_build_sync_done_ctrl( sop, rs, ctrls,

View file

@ -60,10 +60,19 @@ ldbm_back_delete(
cache_return_entry_r( &li->li_cache, matched );
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
LDAP_SCOPE_DEFAULT );
}
ldap_pvt_thread_rdwr_wunlock(&li->li_giant_rwlock);

View file

@ -324,10 +324,19 @@ ldbm_back_modify(
: NULL;
cache_return_entry_r( &li->li_cache, matched );
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
LDAP_SCOPE_DEFAULT );
}
ldap_pvt_thread_rdwr_wunlock(&li->li_giant_rwlock);

View file

@ -92,10 +92,19 @@ ldbm_back_modrdn(
: NULL;
cache_return_entry_r( &li->li_cache, matched );
} else {
BerVarray deref = op->o_bd->be_syncinfo ?
op->o_bd->be_syncinfo->si_provideruri_bv : default_referral;
BerVarray deref = NULL;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &deref, &tmpbv );
}
} else {
deref = default_referral;
}
rs->sr_ref = referral_rewrite( deref, NULL, &op->o_req_dn,
LDAP_SCOPE_DEFAULT );
LDAP_SCOPE_DEFAULT );
}
ldap_pvt_thread_rdwr_wunlock(&li->li_giant_rwlock);

View file

@ -337,7 +337,7 @@ int backend_startup(Backend *be)
#ifndef SLAPD_MULTIMASTER
if ( backendDB[i].be_update_ndn.bv_val && (
!backendDB[i].be_update_refs &&
!backendDB[i].be_syncinfo &&
LDAP_STAILQ_EMPTY( &backendDB[i].be_syncinfo ) &&
!default_referral ) )
{
#ifdef NEW_LOGGING
@ -375,14 +375,16 @@ int backend_startup(Backend *be)
}
}
if ( backendDB[i].be_syncinfo != NULL ) {
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].be_syncinfo;
si->si_be = &backendDB[i];
init_syncrepl(si);
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq, si->si_interval,
do_syncrepl, (void *) backendDB[i].be_syncinfo );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
if ( !LDAP_STAILQ_EMPTY( &backendDB[i].be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &backendDB[i].be_syncinfo, si_next ) {
si->si_be = &backendDB[i];
init_syncrepl( si );
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq,
si->si_interval, do_syncrepl, (void *) si );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
}
}
@ -552,7 +554,7 @@ backend_db_init(
ldap_pvt_thread_mutex_init( &be->be_pcl_mutex );
ldap_pvt_thread_mutex_init( &be->be_context_csn_mutex );
be->be_syncinfo = NULL;
LDAP_STAILQ_INIT( &be->be_syncinfo );
/* assign a default depth limit for alias deref */
be->be_max_deref_depth = SLAPD_DEFAULT_MAXDEREFDEPTH;

View file

@ -2755,21 +2755,11 @@ add_syncrepl(
)
{
syncinfo_t *si;
syncinfo_t *si_entry;
int rc = 0;
int duplicated_replica_id = 0;
if ( be->be_syncinfo ) {
#ifdef NEW_LOGGING
LDAP_LOG( CONFIG, INFO,
"add_syncrepl: multiple syncrepl lines in a database "
"definition are yet to be supported.\n", 0, 0, 0 );
#else
Debug( LDAP_DEBUG_ANY,
"add_syncrepl: multiple syncrepl lines in a database "
"definition are yet to be supported.\n", 0, 0, 0 );
#endif
return 1;
}
si = be->be_syncinfo = (syncinfo_t *) ch_calloc( 1, sizeof( syncinfo_t ) );
si = (syncinfo_t *) ch_calloc( 1, sizeof( syncinfo_t ) );
if ( si == NULL ) {
#ifdef NEW_LOGGING
@ -2807,15 +2797,55 @@ add_syncrepl(
si->si_presentlist = NULL;
LDAP_LIST_INIT( &si->si_nonpresentlist );
if ( parse_syncrepl_line( cargv, cargc, si ) < 0 ) {
rc = parse_syncrepl_line( cargv, cargc, si );
LDAP_STAILQ_FOREACH( si_entry, &be->be_syncinfo, si_next ) {
if ( si->si_id == si_entry->si_id ) {
#ifdef NEW_LOGGING
LDAP_LOG( CONFIG, ERR,
"add_syncrepl: duplicaetd replica id\n", 0, 0,0 );
#else
Debug( LDAP_DEBUG_ANY,
"add_syncrepl: duplicated replica id\n",0, 0, 0 );
#endif
duplicated_replica_id = 1;
break;
}
}
if ( rc < 0 || duplicated_replica_id ) {
syncinfo_t *si_entry;
/* Something bad happened - back out */
#ifdef NEW_LOGGING
LDAP_LOG( CONFIG, ERR, "failed to add syncinfo\n", 0, 0,0 );
#else
Debug( LDAP_DEBUG_ANY, "failed to add syncinfo\n", 0, 0, 0 );
#endif
free( si );
be->be_syncinfo = NULL;
/* If error, remove all syncinfo */
LDAP_STAILQ_FOREACH( si_entry, &be->be_syncinfo, si_next ) {
if ( si_entry->si_updatedn.bv_val ) {
ch_free( si->si_updatedn.bv_val );
}
if ( si_entry->si_filterstr.bv_val ) {
ch_free( si->si_filterstr.bv_val );
}
if ( si_entry->si_attrs ) {
int i;
while ( si_entry->si_attrs[i] != NULL ) {
ch_free( si_entry->si_attrs[i] );
i++;
}
ch_free( si_entry->si_attrs );
}
}
while ( !LDAP_STAILQ_EMPTY( &be->be_syncinfo )) {
si_entry = LDAP_STAILQ_FIRST( &be->be_syncinfo );
LDAP_STAILQ_REMOVE_HEAD( &be->be_syncinfo, si_next );
ch_free( si_entry );
}
LDAP_STAILQ_INIT( &be->be_syncinfo );
return 1;
} else {
#ifdef NEW_LOGGING
@ -2831,6 +2861,7 @@ add_syncrepl(
be->be_flags |= SLAP_BFLAG_NO_SCHEMA_CHECK;
}
si->si_be = be;
LDAP_STAILQ_INSERT_TAIL( &be->be_syncinfo, si, si_next );
return 0;
}
}

View file

@ -192,10 +192,10 @@ do_delete(
/* do the update here */
int repl_user = be_isupdate( op->o_bd, &op->o_ndn );
#ifndef SLAPD_MULTIMASTER
if ( !op->o_bd->be_syncinfo &&
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ) &&
( !op->o_bd->be_update_ndn.bv_len || repl_user ))
#else
if ( !op->o_bd->be_syncinfo )
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ))
#endif
{
@ -216,8 +216,13 @@ do_delete(
#ifndef SLAPD_MULTIMASTER
} else {
BerVarray defref = NULL;
if ( op->o_bd->be_syncinfo ) {
defref = op->o_bd->be_syncinfo->si_provideruri_bv;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &defref, &tmpbv );
}
} else {
defref = op->o_bd->be_update_refs
? op->o_bd->be_update_refs : default_referral;

View file

@ -26,4 +26,6 @@ const struct berval slap_unknown_bv = BER_BVC("unknown");
const struct berval slap_true_bv = BER_BVC("TRUE");
const struct berval slap_false_bv = BER_BVC("FALSE");
struct sync_cookie *slap_sync_cookie = NULL;
//struct sync_cookie *slap_sync_cookie = NULL;
struct slap_sync_cookie_s slap_sync_cookie =
LDAP_STAILQ_HEAD_INITIALIZER( slap_sync_cookie );

View file

@ -306,23 +306,46 @@ slap_compose_sync_cookie(
Operation *op,
struct berval *cookie,
struct berval *csn,
int sid )
int sid,
int rid )
{
char cookiestr[ LDAP_LUTIL_CSNSTR_BUFSIZE + 10 ];
char cookiestr[ LDAP_LUTIL_CSNSTR_BUFSIZE + 20 ];
if ( csn->bv_val == NULL ) {
if ( sid == -1 ) {
cookiestr[0] = '\0';
if ( rid == -1 ) {
cookiestr[0] = '\0';
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"rid=%03d", rid );
}
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 10,
if ( rid == -1 ) {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"sid=%03d", sid );
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"sid=%03d,rid=%03d", sid, rid );
}
}
} else if ( sid == -1 ) {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 10,
"csn=%s", csn->bv_val );
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 10,
if ( sid == -1 ) {
if ( rid == -1 ) {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"csn=%s", csn->bv_val );
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"csn=%s,rid=%03d", csn->bv_val, rid );
}
} else {
if ( rid == -1 ) {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"csn=%s,sid=%03d", csn->bv_val, sid );
} else {
snprintf( cookiestr, LDAP_LUTIL_CSNSTR_BUFSIZE + 20,
"csn=%s,sid=%03d,rid=%03d", csn->bv_val, sid, rid );
}
}
}
ber_str2bv( cookiestr, strlen(cookiestr), 1, cookie );
}
@ -363,6 +386,8 @@ slap_parse_sync_cookie(
int csn_str_len;
char *sid_ptr;
char *sid_str;
char *rid_ptr;
char *rid_str;
char *cval;
struct berval *ctxcsn;
@ -399,6 +424,18 @@ slap_parse_sync_cookie(
} else {
cookie->sid = -1;
}
if (( rid_ptr = strstr( cookie->octet_str->bv_val, "rid=" )) != NULL ) {
rid_str = (char *) SLAP_STRNDUP( rid_ptr,
SLAP_SYNC_RID_SIZE + sizeof("rid=") - 1 );
if ( cval = strchr( rid_str, ',' )) {
*cval = '\0';
}
cookie->rid = atoi( rid_str + sizeof("rid=") - 1 );
ch_free( rid_str );
} else {
cookie->rid = -1;
}
}
int
@ -457,6 +494,7 @@ slap_dup_sync_cookie(
}
new->sid = src->sid;
new->rid = src->rid;
if ( src->ctxcsn ) {
for ( i=0; src->ctxcsn[i].bv_val; i++ ) {

View file

@ -146,6 +146,8 @@ int main( int argc, char **argv )
int serverMode = SLAP_SERVER_MODE;
struct berval cookie = { 0, NULL };
struct sync_cookie *scp = NULL;
struct sync_cookie *scp_entry = NULL;
#ifdef CSRIMALLOC
FILE *leakfile;
@ -244,16 +246,31 @@ int main( int argc, char **argv )
case 'h': /* listen URLs */
if ( urls != NULL ) free( urls );
urls = ch_strdup( optarg );
break;
break;
case 'c': /* provide sync cookie, override if exist in replica */
if ( slap_sync_cookie ) {
slap_sync_cookie_free( slap_sync_cookie, 1 );
}
slap_sync_cookie = (struct sync_cookie *) ch_calloc( 1,
sizeof( struct sync_cookie ));
scp = (struct sync_cookie *) ch_calloc( 1,
sizeof( struct sync_cookie ));
ber_str2bv( optarg, strlen( optarg ), 1, &cookie );
ber_bvarray_add( &slap_sync_cookie->octet_str, &cookie );
ber_bvarray_add( &scp->octet_str, &cookie );
slap_parse_sync_cookie( scp );
LDAP_STAILQ_FOREACH( scp_entry, &slap_sync_cookie, sc_next ) {
if ( scp->rid == scp_entry->rid ) {
#ifdef NEW_LOGGING
LDAP_LOG( OPERATION, CRIT,
"main: duplicated replica id in cookies\n",
0, 0, 0 );
#else
Debug( LDAP_DEBUG_ANY,
"main: duplicated replica id in cookies\n",
0, 0, 0 );
#endif
slap_sync_cookie_free( scp, 1 );
goto destroy;
}
}
LDAP_STAILQ_INSERT_TAIL( &slap_sync_cookie, scp, sc_next );
break;
case 'd': /* set debug level and 'do not detach' flag */
@ -605,6 +622,12 @@ destroy:
/* remember an error during destroy */
rc |= slap_destroy();
while ( !LDAP_STAILQ_EMPTY( &slap_sync_cookie )) {
scp = LDAP_STAILQ_FIRST( &slap_sync_cookie );
LDAP_STAILQ_REMOVE_HEAD( &slap_sync_cookie, sc_next );
ch_free( scp );
}
#ifdef SLAPD_MODULES
module_kill();
#endif

View file

@ -455,10 +455,10 @@ do_modify(
* because it accepts each modify request
*/
#ifndef SLAPD_MULTIMASTER
if ( !op->o_bd->be_syncinfo &&
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ) &&
( !op->o_bd->be_update_ndn.bv_len || repl_user ))
#else
if ( !op->o_bd->be_syncinfo )
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ))
#endif
{
int update = op->o_bd->be_update_ndn.bv_len;
@ -503,8 +503,13 @@ do_modify(
/* send a referral */
} else {
BerVarray defref = NULL;
if ( op->o_bd->be_syncinfo ) {
defref = op->o_bd->be_syncinfo->si_provideruri_bv;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &defref, &tmpbv );
}
} else {
defref = op->o_bd->be_update_refs
? op->o_bd->be_update_refs : default_referral;

View file

@ -353,10 +353,10 @@ do_modrdn(
/* do the update here */
int repl_user = be_isupdate( op->o_bd, &op->o_ndn );
#ifndef SLAPD_MULTIMASTER
if ( !op->o_bd->be_syncinfo &&
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ) &&
( !op->o_bd->be_update_ndn.bv_len || repl_user ))
#else
if ( !op->o_bd->be_syncinfo )
if ( LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo ))
#endif
{
op->orr_deleteoldrdn = deloldrdn;
@ -370,8 +370,13 @@ do_modrdn(
#ifndef SLAPD_MULTIMASTER
} else {
BerVarray defref = NULL;
if ( op->o_bd->be_syncinfo ) {
defref = op->o_bd->be_syncinfo->si_provideruri_bv;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &defref, &tmpbv );
}
} else {
defref = op->o_bd->be_update_refs
? op->o_bd->be_update_refs : default_referral;

View file

@ -57,8 +57,13 @@ int passwd_extop(
} else if( op->o_bd->be_update_ndn.bv_len ) {
/* we SHOULD return a referral in this case */
BerVarray defref = NULL;
if ( op->o_bd->be_syncinfo ) {
defref = op->o_bd->be_syncinfo->si_provideruri_bv;
if ( !LDAP_STAILQ_EMPTY( &op->o_bd->be_syncinfo )) {
syncinfo_t *si;
LDAP_STAILQ_FOREACH( si, &op->o_bd->be_syncinfo, si_next ) {
struct berval tmpbv;
ber_dupbv( &tmpbv, &si->si_provideruri_bv[0] );
ber_bvarray_add( &defref, &tmpbv );
}
} else {
defref = referral_rewrite( op->o_bd->be_update_refs,
NULL, NULL, LDAP_SCOPE_DEFAULT );

View file

@ -551,7 +551,7 @@ LDAP_SLAPD_V( const struct berval ) slap_empty_bv;
LDAP_SLAPD_V( const struct berval ) slap_unknown_bv;
LDAP_SLAPD_V( const struct berval ) slap_true_bv;
LDAP_SLAPD_V( const struct berval ) slap_false_bv;
LDAP_SLAPD_V( struct sync_cookie * ) slap_sync_cookie;
LDAP_SLAPD_V( struct slap_sync_cookie_s ) slap_sync_cookie;
/*
* index.c
@ -592,7 +592,7 @@ LDAP_SLAPD_F (int) slap_send_syncinfo LDAP_P((
Operation *, SlapReply *, int,
struct berval *, int, BerVarray, int ));
LDAP_SLAPD_F (void) slap_compose_sync_cookie LDAP_P((
Operation *, struct berval *, struct berval *, int ));
Operation *, struct berval *, struct berval *, int, int ));
LDAP_SLAPD_F (void) slap_sync_cookie_free LDAP_P((
struct sync_cookie *, int free_cookie ));
LDAP_SLAPD_F (int) slap_parse_sync_cookie LDAP_P((

View file

@ -1292,6 +1292,7 @@ typedef BackendDB Backend;
*/
#define SLAP_SYNC_SID_SIZE 3
#define SLAP_SYNC_RID_SIZE 3
#define SLAP_SYNCUUID_SET_SIZE 256
struct nonpresent_entry {
@ -1304,11 +1305,15 @@ struct sync_cookie {
struct berval *ctxcsn;
long sid;
struct berval *octet_str;
long rid;
LDAP_STAILQ_ENTRY(sync_cookie) sc_next;
};
LDAP_STAILQ_HEAD( slap_sync_cookie_s, sync_cookie );
typedef struct syncinfo_s {
struct slap_backend_db *si_be;
unsigned int si_id;
long si_id;
char *si_provideruri;
BerVarray si_provideruri_bv;
#define SYNCINFO_TLS_OFF 0
@ -1341,6 +1346,7 @@ typedef struct syncinfo_s {
Avlnode *si_presentlist;
LDAP *si_ld;
LDAP_LIST_HEAD(np, nonpresent_entry) si_nonpresentlist;
LDAP_STAILQ_ENTRY( syncinfo_s ) si_next;
} syncinfo_t;
struct slap_backend_db {
@ -1495,7 +1501,7 @@ struct slap_backend_db {
ldap_pvt_thread_mutex_t be_pcl_mutex;
struct berval be_context_csn;
ldap_pvt_thread_mutex_t be_context_csn_mutex;
syncinfo_t *be_syncinfo; /* For syncrepl */
LDAP_STAILQ_HEAD( be_si, syncinfo_s ) be_syncinfo; /* For syncrepl */
};
struct slap_conn;

View file

@ -187,9 +187,11 @@ do_syncrep1(
syncinfo_t *si )
{
int rc;
int cmdline_cookie_found = 0;
char syncrepl_cbuf[sizeof(CN_STR SYNCREPL_STR)];
struct berval syncrepl_cn_bv;
struct sync_cookie *sc = NULL;
struct sync_cookie syncCookie = { NULL, -1, NULL };
/* Init connection to master */
@ -313,14 +315,20 @@ do_syncrep1(
op->o_tmpmemctx );
op->o_req_dn = op->o_req_ndn;
if ( slap_sync_cookie != NULL ) {
/* cookie is supplied in the command line */
LDAP_STAILQ_FOREACH( sc, &slap_sync_cookie, sc_next ) {
if ( si->si_id == sc->rid ) {
cmdline_cookie_found = 1;
break;
}
}
if ( cmdline_cookie_found ) {
/* cookie is supplied in the command line */
BerVarray cookie = NULL;
struct berval cookie_bv;
LDAP_STAILQ_REMOVE( &slap_sync_cookie, sc, sync_cookie, sc_next );
slap_sync_cookie_free( &si->si_syncCookie, 0 );
slap_parse_sync_cookie( slap_sync_cookie );
/* read stored cookie if it exists */
backend_attribute( op, NULL, &op->o_req_ndn,
@ -328,36 +336,40 @@ do_syncrep1(
if ( !cookie ) {
/* no stored cookie */
if ( slap_sync_cookie->ctxcsn == NULL ||
slap_sync_cookie->ctxcsn->bv_val == NULL ) {
/* if slap_sync_cookie does not have ctxcsn component */
/* set it to an initial value */
slap_init_sync_cookie_ctxcsn( slap_sync_cookie );
if ( sc->ctxcsn == NULL ||
sc->ctxcsn->bv_val == NULL ) {
/* if cmdline cookie does not have ctxcsn */
/* component, set it to an initial value */
slap_init_sync_cookie_ctxcsn( sc );
}
slap_dup_sync_cookie( &si->si_syncCookie, slap_sync_cookie );
slap_sync_cookie_free( slap_sync_cookie, 1 );
slap_sync_cookie = NULL;
slap_dup_sync_cookie( &si->si_syncCookie, sc );
slap_sync_cookie_free( sc, 1 );
sc = NULL;
} else {
/* stored cookie */
ber_dupbv( &cookie_bv, &cookie[0] );
ber_bvarray_add( &si->si_syncCookie.octet_str, &cookie_bv );
slap_parse_sync_cookie( &si->si_syncCookie );
ber_bvarray_free_x( cookie, op->o_tmpmemctx );
if ( slap_sync_cookie->sid != -1 ) {
if ( sc->sid != -1 ) {
/* command line cookie wins */
si->si_syncCookie.sid = slap_sync_cookie->sid;
si->si_syncCookie.sid = sc->sid;
}
if ( slap_sync_cookie->ctxcsn != NULL ) {
if ( sc->ctxcsn != NULL ) {
/* command line cookie wins */
if ( si->si_syncCookie.ctxcsn ) {
ber_bvarray_free( si->si_syncCookie.ctxcsn );
si->si_syncCookie.ctxcsn = NULL;
}
ber_dupbv( &cookie_bv, &slap_sync_cookie->ctxcsn[0] );
ber_dupbv( &cookie_bv, &sc->ctxcsn[0] );
ber_bvarray_add( &si->si_syncCookie.ctxcsn, &cookie_bv );
}
slap_sync_cookie_free( slap_sync_cookie, 1 );
slap_sync_cookie = NULL;
if ( sc->rid != -1 ) {
/* command line cookie wins */
si->si_syncCookie.rid = sc->rid;
}
slap_sync_cookie_free( sc, 1 );
sc = NULL;
}
} else {
/* no command line cookie is specified */