ITS#6152 preliminary refresh support, untested

This commit is contained in:
Howard Chu 2009-08-17 12:02:48 +00:00
parent 0132ea425d
commit a5cfab44d7

View file

@ -2664,6 +2664,206 @@ get_attr_set(
return i;
}
/* Refresh a cached query:
* 1: Replay the query on the remote DB and merge each entry into
* the local DB. Remember the DNs of each remote entry.
* 2: Search the local DB for all entries matching this queryID.
* Delete any entry whose DN is not in the list from (1).
*/
typedef struct dnlist {
struct dnlist *next;
struct berval dn;
} dnlist;
typedef struct refresh_info {
dnlist *ri_dns;
dnlist *ri_tail;
dnlist *ri_dels;
BackendDB *ri_be;
CachedQuery *ri_q;
} refresh_info;
static dnlist *dnl_alloc( Operation *op, struct berval *bvdn )
{
dnlist *dn = op->o_tmpalloc( sizeof(dnlist) + bvdn->bv_len + 1,
op->o_tmpmemctx );
dn->dn.bv_len = bvdn->bv_len;
dn->dn.bv_val = (char *)(dn+1);
AC_MEMCPY( dn->dn.bv_val, bvdn->bv_val, dn->dn.bv_len );
dn->dn.bv_val[dn->dn.bv_len] = '\0';
return dn;
}
static int
refresh_merge( Operation *op, SlapReply *rs )
{
if ( rs->sr_type == REP_SEARCH ) {
refresh_info *ri = op->o_callback->sc_private;
BackendDB *be = op->o_bd;
Entry *e;
dnlist *dn;
slap_callback *ocb;
int rc;
ocb = op->o_callback;
/* Find local entry, merge */
op->o_bd = ri->ri_be;
rc = be_entry_get_rw( op, &rs->sr_entry->e_nname, NULL, NULL, 0, &e );
if ( rc != LDAP_SUCCESS || e == NULL ) {
/* No local entry, just add it. FIXME: we are not checking
* the cache entry limit here
*/
merge_entry( op, rs->sr_entry, &ri->ri_q->q_uuid );
} else {
/* Entry exists, update it */
Entry ne;
Attribute *a, **b;
Modifications *modlist, *mods;
const char* text = NULL;
char textbuf[SLAP_TEXT_BUFLEN];
size_t textlen = sizeof(textbuf);
slap_callback cb = { NULL, slap_null_cb, NULL, NULL };
ne = *e;
b = &ne.e_attrs;
/* Get a copy of only the attrs we requested */
for ( a=e->e_attrs; a; a=a->a_next ) {
if ( ad_inlist( a->a_desc, rs->sr_attrs )) {
*b = attr_alloc( a->a_desc );
*(*b) = *a;
b = &((*b)->a_next);
}
}
*b = NULL;
slap_entry2mods( &ne, &modlist, &text, textbuf, textlen );
syncrepl_diff_entry( op, ne.e_attrs, rs->sr_entry->e_attrs,
&mods, &modlist, 0 );
be_entry_release_r( op, e );
op->o_tag = LDAP_REQ_MODIFY;
op->orm_modlist = mods;
op->o_callback = &cb;
op->o_bd->be_modify( op, rs );
slap_mods_free( mods, 1 );
}
/* Add DN to list */
dn = dnl_alloc( op, &rs->sr_entry->e_nname );
dn->next = NULL;
if ( ri->ri_tail ) {
ri->ri_tail->next = dn;
} else {
ri->ri_dns = dn;
}
ri->ri_tail = dn;
op->o_callback = ocb;
}
return 0;
}
static int
refresh_purge( Operation *op, SlapReply *rs )
{
if ( rs->sr_type == REP_SEARCH ) {
refresh_info *ri = op->o_callback->sc_private;
dnlist **dn;
int del = 1;
/* Did the entry exist on the remote? */
for ( dn=&ri->ri_dns; *dn; dn = &(*dn)->next ) {
if ( dnmatch( &(*dn)->dn, &rs->sr_entry->e_nname )) {
dnlist *dnext = (*dn)->next;
op->o_tmpfree( *dn, op->o_tmpmemctx );
*dn = dnext;
del = 0;
break;
}
}
/* No, so put it on the list to delete */
if ( del ) {
dnlist *dnl = dnl_alloc( op, &rs->sr_entry->e_nname );
dnl->next = ri->ri_dels;
ri->ri_dels = dnl;
}
}
return 0;
}
static int
refresh_query( Operation *op, SlapReply *rs, CachedQuery *query,
slap_overinst *on )
{
slap_callback cb = { 0 };
refresh_info ri = { 0 };
char filter_str[ LDAP_LUTIL_UUIDSTR_BUFSIZE + STRLENOF( "(queryId=)" ) ];
AttributeAssertion ava = ATTRIBUTEASSERTION_INIT;
Filter filter = {LDAP_FILTER_EQUALITY};
dnlist *dn;
int i, rc;
cb.sc_response = refresh_merge;
cb.sc_private = &ri;
/* cache DB */
ri.ri_be = op->o_bd;
ri.ri_q = query;
op->o_tag = LDAP_REQ_SEARCH;
op->o_protocol = LDAP_VERSION3;
op->o_callback = &cb;
op->o_do_not_cache = 1;
op->o_req_dn = query->qbase->base;
op->o_req_ndn = query->qbase->base;
op->ors_scope = query->scope;
op->ors_slimit = SLAP_NO_LIMIT;
op->ors_tlimit = SLAP_NO_LIMIT;
op->ors_filter = query->filter;
filter2bv_x( op, query->filter, &op->ors_filterstr );
op->ors_attrs = query->qtemp->t_attrs.attrs;
op->ors_attrsonly = 0;
op->o_bd = on->on_info->oi_origdb;
rc = op->o_bd->be_search( op, rs );
if ( rc ) {
op->o_bd = ri.ri_be;
goto leave;
}
/* Get the DNs of all entries matching this query */
cb.sc_response = refresh_purge;
op->o_bd = ri.ri_be;
op->o_req_dn = op->o_bd->be_suffix[0];
op->o_req_ndn = op->o_bd->be_nsuffix[0];
op->ors_scope = LDAP_SCOPE_SUBTREE;
op->ors_deref = LDAP_DEREF_NEVER;
op->ors_filterstr.bv_len = snprintf(filter_str, sizeof(filter_str),
"(%s=%s)", ad_queryId->ad_cname.bv_val, query->q_uuid.bv_val);
filter.f_ava = &ava;
filter.f_av_desc = ad_queryId;
filter.f_av_value = query->q_uuid;
op->ors_attrs = slap_anlist_no_attrs;
op->ors_attrsonly = 1;
rs->sr_entry = NULL;
rs->sr_nentries = 0;
rc = op->o_bd->be_search( op, rs );
if ( rc ) goto leave;
op->o_tag = LDAP_REQ_DELETE;
while (( dn = ri.ri_dels )) {
op->o_req_dn = dn->dn;
op->o_req_ndn = dn->dn;
op->o_bd->be_delete( op, rs );
ri.ri_dels = dn->next;
op->o_tmpfree( dn, op->o_tmpmemctx );
}
leave:
/* reset our local heap, we're done with it */
slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, op->o_threadctx, 1 );
return rc;
}
static void*
consistency_check(
void *ctx,
@ -2720,27 +2920,14 @@ consistency_check(
* expiration has been hit, then skip the refresh since
* we're just going to discard the result anyway.
*/
if ( query->refcnt ) {
if ( query->refcnt )
query->expiry_time = op->o_time + templ->ttl;
} else if ( query->expiry_time < op->o_time ) {
goto expire;
if ( query->expiry_time > op->o_time ) {
refresh_query( op, &rs, query, on );
continue;
}
/* Do the refresh */
/* FIXME: and then a miracle occurs...
* the next step requires firing off a search request
* for a single entry and merging the result into any
* already existing entry in the DB. This is similar
* to what syncrepl already does, but using the wrong
* data structures. Need to see what refactoring can be
* done to share the code...
*/
query->refcnt = 0;
continue;
}
expire:
if (query->expiry_time < op->o_time) {
int rem = 0;
Debug( pcache_debug, "Lock CR index = %p\n",