Large multivalued attr support

Store attrs with a large number of values separately from the
main entry blob. Note - we need support for large DUPSORT values
for this to be generally usable.
This commit is contained in:
Howard Chu 2016-01-31 15:35:11 +00:00
parent 6bb6d5e3c6
commit a4c7943d39
10 changed files with 497 additions and 137 deletions

View file

@ -167,6 +167,24 @@ Specify the file protection mode that newly created database
files should have. files should have.
The default is 0600. The default is 0600.
.TP .TP
.BI multival_hi \ <integer>
Specify the number of values above which a multivalued attribute is
stored in a separate table. Normally entries are stored as a single
blob inside the database. When an entry gets very large or contains
attributes with a very large number of values, modifications on that
entry may get very slow. Splitting the large attributes out to a separate
table can improve the performance of modification operations.
The default is UINT_MAX, which keeps all attributes in the main blob.
.TP
.BI multival_lo \ <integer>
Specify the number of values below which a multivalued attribute
that was stored in a separate table is moved back into the main
entry blob. If a modification deletes enough values to bring an
attribute below this threshold, its values will be removed from the
separate table and merged back into the main entry blob.
The default is UINT_MAX, which keeps all attributes in
the main blob.
.TP
.BI rtxnsize \ <entries> .BI rtxnsize \ <entries>
Specify the maximum number of entries to process in a single read Specify the maximum number of entries to process in a single read
transaction when executing a large search. Long-lived read transactions transaction when executing a large search. Long-lived read transactions

View file

@ -32,7 +32,8 @@ LDAP_BEGIN_DECL
#define MDB_AD2ID 0 #define MDB_AD2ID 0
#define MDB_DN2ID 1 #define MDB_DN2ID 1
#define MDB_ID2ENTRY 2 #define MDB_ID2ENTRY 2
#define MDB_NDB 3 #define MDB_ID2VAL 3
#define MDB_NDB 4
/* The default search IDL stack cache depth */ /* The default search IDL stack cache depth */
#define DEFAULT_SEARCH_STACK_DEPTH 16 #define DEFAULT_SEARCH_STACK_DEPTH 16
@ -83,6 +84,7 @@ struct mdb_info {
int mi_txn_cp; int mi_txn_cp;
uint32_t mi_txn_cp_min; uint32_t mi_txn_cp_min;
uint32_t mi_txn_cp_kbyte; uint32_t mi_txn_cp_kbyte;
struct re_s *mi_txn_cp_task; struct re_s *mi_txn_cp_task;
struct re_s *mi_index_task; struct re_s *mi_index_task;
@ -102,6 +104,13 @@ struct mdb_info {
int mi_numads; int mi_numads;
unsigned mi_multi_hi;
/* more than this many values in an attr goes
* into a separate DB */
unsigned mi_multi_lo;
/* less than this many values in an attr goes
* back into main blob */
MDB_dbi mi_dbis[MDB_NDB]; MDB_dbi mi_dbis[MDB_NDB];
AttributeDescription *mi_ads[MDB_MAXADS]; AttributeDescription *mi_ads[MDB_MAXADS];
int mi_adxs[MDB_MAXADS]; int mi_adxs[MDB_MAXADS];
@ -110,6 +119,7 @@ struct mdb_info {
#define mi_id2entry mi_dbis[MDB_ID2ENTRY] #define mi_id2entry mi_dbis[MDB_ID2ENTRY]
#define mi_dn2id mi_dbis[MDB_DN2ID] #define mi_dn2id mi_dbis[MDB_DN2ID]
#define mi_ad2id mi_dbis[MDB_AD2ID] #define mi_ad2id mi_dbis[MDB_AD2ID]
#define mi_id2val mi_dbis[MDB_ID2VAL]
typedef struct mdb_op_info { typedef struct mdb_op_info {
OpExtra moi_oe; OpExtra moi_oe;
@ -121,24 +131,6 @@ typedef struct mdb_op_info {
#define MOI_FREEIT 0x02 #define MOI_FREEIT 0x02
#define MOI_KEEPER 0x04 #define MOI_KEEPER 0x04
/* Copy an ID "src" to pointer "dst" in big-endian byte order */
#define MDB_ID2DISK( src, dst ) \
do { int i0; ID tmp; unsigned char *_p; \
tmp = (src); _p = (unsigned char *)(dst); \
for ( i0=sizeof(ID)-1; i0>=0; i0-- ) { \
_p[i0] = tmp & 0xff; tmp >>= 8; \
} \
} while(0)
/* Copy a pointer "src" to a pointer "dst" from big-endian to native order */
#define MDB_DISK2ID( src, dst ) \
do { unsigned i0; ID tmp = 0; unsigned char *_p; \
_p = (unsigned char *)(src); \
for ( i0=0; i0<sizeof(ID); i0++ ) { \
tmp <<= 8; tmp |= *_p++; \
} *(dst) = tmp; \
} while (0)
LDAP_END_DECL LDAP_END_DECL
/* for the cache of attribute information (which are indexed, etc.) */ /* for the cache of attribute information (which are indexed, etc.) */

View file

@ -83,6 +83,16 @@ static ConfigTable mdbcfg[] = {
mdb_cf_gen, "( OLcfgDbAt:0.3 NAME 'olcDbMode' " mdb_cf_gen, "( OLcfgDbAt:0.3 NAME 'olcDbMode' "
"DESC 'Unix permissions of database files' " "DESC 'Unix permissions of database files' "
"SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL }, "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL },
{ "multival_hi", "num", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_multi_hi),
"( OLcfgDbAt:12.6 NAME 'olcDbMultivalHi' "
"DESC 'Threshold for splitting multivalued attr out of main blob' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "multival_lo", "num", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_multi_lo),
"( OLcfgDbAt:12.7 NAME 'olcDbMultivalLo' "
"DESC 'Threshold for consolidating multivalued attr back into main blob' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "rtxnsize", "entries", 2, 2, 0, ARG_UINT|ARG_OFFSET, { "rtxnsize", "entries", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_rtxn_size), (void *)offsetof(struct mdb_info, mi_rtxn_size),
"( OLcfgDbAt:12.5 NAME 'olcDbRtxnSize' " "( OLcfgDbAt:12.5 NAME 'olcDbRtxnSize' "
@ -105,7 +115,8 @@ static ConfigOCs mdbocs[] = {
"MUST olcDbDirectory " "MUST olcDbDirectory "
"MAY ( olcDbCheckpoint $ olcDbEnvFlags $ " "MAY ( olcDbCheckpoint $ olcDbEnvFlags $ "
"olcDbNoSync $ olcDbIndex $ olcDbMaxReaders $ olcDbMaxSize $ " "olcDbNoSync $ olcDbIndex $ olcDbMaxReaders $ olcDbMaxSize $ "
"olcDbMode $ olcDbSearchStack $ olcDbMaxEntrySize $ olcDbRtxnSize ) )", "olcDbMode $ olcDbSearchStack $ olcDbMaxEntrySize $ olcDbRtxnSize $ "
"olcDbMultivalHi $ olcDbMultivalLo ) )",
Cft_Database, mdbcfg }, Cft_Database, mdbcfg },
{ NULL, 0, NULL } { NULL, 0, NULL }
}; };

View file

@ -23,10 +23,12 @@
#include "back-mdb.h" #include "back-mdb.h"
typedef struct Ecount { typedef struct Ecount {
ber_len_t len; ber_len_t len; /* total entry size */
ber_len_t dlen; /* contiguous data size */
int nattrs; int nattrs;
int nvals; int nvals;
int offset; int offset;
Attribute *multi;
} Ecount; } Ecount;
static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e, static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
@ -35,6 +37,161 @@ static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data,
Ecount *ec); Ecount *ec);
static Entry *mdb_entry_alloc( Operation *op, int nattrs, int nvals ); static Entry *mdb_entry_alloc( Operation *op, int nattrs, int nvals );
#define ID2VKSZ (sizeof(ID)+2)
int
mdb_id2v_compare(
const MDB_val *usrkey,
const MDB_val *curkey
)
{
unsigned short *uv, *cv;
ID ui, ci;
int rc;
memcpy(&ui, usrkey->mv_data, sizeof(ID));
memcpy(&ci, curkey->mv_data, sizeof(ID));
if (ui < ci)
return -1;
if (ui > ci)
return 1;
uv = usrkey->mv_data;
cv = curkey->mv_data;
return uv[sizeof(ID)/2] - cv[sizeof(ID)/2];
}
/* Values are stored as
* [normalized-value NUL ] original-value NUL 2-byte-len
* The trailing 2-byte-len is zero if there is no normalized value.
* Otherwise, it is the length of the original-value.
*/
int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
char *buf;
char ivk[ID2VKSZ];
unsigned i;
unsigned short s;
int rc, len;
memcpy(ivk, &id, sizeof(id));
s = mdb->mi_adxs[a->a_desc->ad_index];
memcpy(ivk+sizeof(ID), &s, 2);
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
for (i=0; i<a->a_numvals; i++) {
len = a->a_nvals[i].bv_len + 1 + 2;
if (a->a_nvals != a->a_vals)
len += a->a_vals[i].bv_len + 1;
data.mv_size = len;
buf = op->o_tmpalloc( len, op->o_tmpmemctx );
data.mv_data = buf;
memcpy(buf, a->a_nvals[i].bv_val, a->a_nvals[i].bv_len);
buf += a->a_nvals[i].bv_len;
*buf++ = 0;
if (a->a_nvals != a->a_vals) {
s = a->a_vals[i].bv_len;
memcpy(buf, a->a_vals[i].bv_val, a->a_vals[i].bv_len);
buf += a->a_vals[i].bv_len;
*buf++ = 0;
memcpy(buf, &s, 2);
} else {
*buf++ = 0;
*buf++ = 0;
}
rc = mdb_cursor_put(mc, &key, &data, 0);
op->o_tmpfree( data.mv_data, op->o_tmpmemctx );
if (rc)
return rc;
}
return 0;
}
int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
char *ptr;
char ivk[ID2VKSZ];
unsigned i;
int rc;
unsigned short s;
memcpy(ivk, &id, sizeof(id));
s = mdb->mi_adxs[a->a_desc->ad_index];
memcpy(ivk+sizeof(ID), &s, 2);
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
if (a->a_numvals) {
for (i=0; i<a->a_numvals; i++) {
data.mv_data = a->a_nvals[i].bv_val;
data.mv_size = a->a_nvals[i].bv_len+1;
rc = mdb_cursor_get(mc, &key, &data, MDB_GET_BOTH_RANGE);
if (rc)
return rc;
rc = mdb_cursor_del(mc, 0);
if (rc)
return rc;
}
} else {
rc = mdb_cursor_get(mc, &key, &data, MDB_SET);
if (rc)
return rc;
rc = mdb_cursor_del(mc, MDB_NODUPDATA);
}
return rc;
}
static int mdb_mval_get(Operation *op, MDB_cursor *mc, ID id, Attribute *a, int have_nvals)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
char *ptr;
char ivk[ID2VKSZ];
unsigned i;
int rc;
unsigned short s;
memcpy(ivk, &id, sizeof(id));
s = mdb->mi_adxs[a->a_desc->ad_index];
memcpy(ivk+sizeof(ID), &s, 2);
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
if (have_nvals)
a->a_nvals = a->a_vals + a->a_numvals + 1;
else
a->a_nvals = a->a_vals;
for (i=0; i<a->a_numvals; i++) {
if (!i)
rc = mdb_cursor_get(mc, &key, &data, MDB_SET);
else
rc = mdb_cursor_get(mc, &key, &data, MDB_NEXT_DUP);
if (rc)
return rc;
ptr = (char*)data.mv_data + data.mv_size - 2;
memcpy(&s, ptr, 2);
if (have_nvals) {
a->a_nvals[i].bv_val = data.mv_data;
a->a_vals[i].bv_len = s;
a->a_vals[i].bv_val = ptr - a->a_vals[i].bv_len - 1;
a->a_nvals[i].bv_len = a->a_vals[i].bv_val - a->a_nvals[i].bv_val - 1;
} else {
assert(!s);
a->a_vals[i].bv_val = data.mv_data;
a->a_vals[i].bv_len = data.mv_size - 3;
}
}
BER_BVZERO(&a->a_vals[i]);
if (have_nvals) {
BER_BVZERO(&a->a_nvals[i]);
}
return 0;
}
#define ADD_FLAGS (MDB_NOOVERWRITE|MDB_APPEND) #define ADD_FLAGS (MDB_NOOVERWRITE|MDB_APPEND)
static int mdb_id2entry_put( static int mdb_id2entry_put(
@ -47,7 +204,7 @@ static int mdb_id2entry_put(
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private; struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
Ecount ec; Ecount ec;
MDB_val key, data; MDB_val key, data;
int rc; int rc, adding = flag;
/* We only store rdns, and they go in the dn2id database. */ /* We only store rdns, and they go in the dn2id database. */
@ -67,7 +224,7 @@ static int mdb_id2entry_put(
return LDAP_ADMINLIMIT_EXCEEDED; return LDAP_ADMINLIMIT_EXCEEDED;
again: again:
data.mv_size = ec.len; data.mv_size = ec.dlen;
if ( mc ) if ( mc )
rc = mdb_cursor_put( mc, &key, &data, flag ); rc = mdb_cursor_put( mc, &key, &data, flag );
else else
@ -76,6 +233,26 @@ again:
rc = mdb_entry_encode( op, e, &data, &ec ); rc = mdb_entry_encode( op, e, &data, &ec );
if( rc != LDAP_SUCCESS ) if( rc != LDAP_SUCCESS )
return rc; return rc;
/* Handle adds of large multi-valued attrs here.
* Modifies handle them directly.
*/
if (adding && ec.multi) {
MDB_cursor *mvc;
Attribute *a;
rc = mdb_cursor_open( txn, mdb->mi_dbis[MDB_ID2VAL], &mvc );
if( rc )
return rc;
for ( a = ec.multi; a; a=a->a_next ) {
if (!(a->a_flags & SLAP_ATTR_BIG_MULTI))
continue;
rc = mdb_mval_put( op, mvc, e->e_id, a );
if( rc != LDAP_SUCCESS )
break;
}
mdb_cursor_close( mvc );
if ( rc )
return rc;
}
} }
if (rc) { if (rc) {
/* Was there a hole from slapadd? */ /* Was there a hole from slapadd? */
@ -95,10 +272,7 @@ again:
/* /*
* This routine adds (or updates) an entry on disk. * This routine adds (or updates) an entry on disk.
* The cache should be already be updated.
*/ */
int mdb_id2entry_add( int mdb_id2entry_add(
Operation *op, Operation *op,
MDB_txn *txn, MDB_txn *txn,
@ -191,7 +365,7 @@ int mdb_id2entry(
rc = MDB_NOTFOUND; rc = MDB_NOTFOUND;
if ( rc ) return rc; if ( rc ) return rc;
rc = mdb_entry_decode( op, mdb_cursor_txn( mc ), &data, e ); rc = mdb_entry_decode( op, mdb_cursor_txn( mc ), &data, id, e );
if ( rc ) return rc; if ( rc ) return rc;
(*e)->e_id = id; (*e)->e_id = id;
@ -209,6 +383,7 @@ int mdb_id2entry_delete(
struct mdb_info *mdb = (struct mdb_info *) be->be_private; struct mdb_info *mdb = (struct mdb_info *) be->be_private;
MDB_dbi dbi = mdb->mi_id2entry; MDB_dbi dbi = mdb->mi_id2entry;
MDB_val key; MDB_val key;
MDB_cursor *mvc;
int rc; int rc;
key.mv_data = &e->e_id; key.mv_data = &e->e_id;
@ -216,7 +391,21 @@ int mdb_id2entry_delete(
/* delete from database */ /* delete from database */
rc = mdb_del( tid, dbi, &key, NULL ); rc = mdb_del( tid, dbi, &key, NULL );
if (rc)
return rc;
rc = mdb_cursor_open( tid, mdb->mi_dbis[MDB_ID2VAL], &mvc );
if (rc)
return rc;
rc = mdb_cursor_get( mvc, &key, NULL, MDB_SET_RANGE );
if (rc && rc != MDB_NOTFOUND)
return rc;
while (*(ID *)key.mv_data == e->e_id ) {
rc = mdb_cursor_del( mvc, MDB_NODUPDATA );
if (rc)
return rc;
mdb_cursor_get( mvc, &key, NULL, MDB_GET_CURRENT );
}
return rc; return rc;
} }
@ -567,11 +756,13 @@ int mdb_txn( Operation *op, int txnop, OpExtra **ptr )
static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e, static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
Ecount *eh) Ecount *eh)
{ {
ber_len_t len; ber_len_t len, dlen;
int i, nat = 0, nval = 0, nnval = 0; int i, nat = 0, nval = 0, nnval = 0, doff = 0;
Attribute *a; Attribute *a;
eh->multi = NULL;
len = 4*sizeof(int); /* nattrs, nvals, ocflags, offset */ len = 4*sizeof(int); /* nattrs, nvals, ocflags, offset */
dlen = len;
for (a=e->e_attrs; a; a=a->a_next) { for (a=e->e_attrs; a; a=a->a_next) {
/* For AttributeDesc, we only store the attr index */ /* For AttributeDesc, we only store the attr index */
nat++; nat++;
@ -586,46 +777,74 @@ static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
return rc; return rc;
} }
len += 2*sizeof(int); /* AD index, numvals */ len += 2*sizeof(int); /* AD index, numvals */
dlen += 2*sizeof(int);
nval += a->a_numvals + 1; /* empty berval at end */ nval += a->a_numvals + 1; /* empty berval at end */
if (a->a_numvals > mdb->mi_multi_hi)
a->a_flags |= SLAP_ATTR_BIG_MULTI;
if (a->a_flags & SLAP_ATTR_BIG_MULTI)
doff += a->a_numvals;
for (i=0; i<a->a_numvals; i++) { for (i=0; i<a->a_numvals; i++) {
len += a->a_vals[i].bv_len + 1 + sizeof(int); /* len */ int alen = a->a_vals[i].bv_len + 1 + sizeof(int); /* len */
len += alen;
if (a->a_flags & SLAP_ATTR_BIG_MULTI) {
if (!eh->multi)
eh->multi = a;
} else {
dlen += alen;
}
} }
if (a->a_nvals != a->a_vals) { if (a->a_nvals != a->a_vals) {
nval += a->a_numvals + 1; nval += a->a_numvals + 1;
nnval++; nnval++;
if (a->a_flags & SLAP_ATTR_BIG_MULTI)
doff += a->a_numvals;
for (i=0; i<a->a_numvals; i++) { for (i=0; i<a->a_numvals; i++) {
len += a->a_nvals[i].bv_len + 1 + sizeof(int);; int alen = a->a_nvals[i].bv_len + 1 + sizeof(int);
len += alen;
if (!(a->a_flags & SLAP_ATTR_BIG_MULTI))
dlen += alen;
} }
} }
} }
/* padding */ /* padding */
len = (len + sizeof(ID)-1) & ~(sizeof(ID)-1); dlen = (dlen + sizeof(ID)-1) & ~(sizeof(ID)-1);
eh->len = len; eh->len = len;
eh->dlen = dlen;
eh->nattrs = nat; eh->nattrs = nat;
eh->nvals = nval; eh->nvals = nval;
eh->offset = nat + nval - nnval; eh->offset = nat + nval - nnval - doff;
return 0; return 0;
} }
#define HIGH_BIT (1<<(sizeof(unsigned int)*CHAR_BIT-1)) /* Flag bits for an encoded attribute */
#define MDB_AT_SORTED (1<<(sizeof(unsigned int)*CHAR_BIT-1))
/* the values are in sorted order */
#define MDB_AT_MULTI (1<<(sizeof(unsigned int)*CHAR_BIT-2))
/* the values of this multi-valued attr are stored separately */
#define MDB_AT_NVALS (1<<(sizeof(unsigned int)*CHAR_BIT-1))
/* this attribute has normalized values */
/* Flatten an Entry into a buffer. The buffer starts with the count of the /* Flatten an Entry into a buffer. The buffer starts with the count of the
* number of attributes in the entry, the total number of values in the * number of attributes in the entry, the total number of values in the
* entry, and the e_ocflags. It then contains a list of integers for each * entry, and the e_ocflags. It then contains a list of integers for each
* attribute. For each attribute the first integer gives the index of the * attribute. For each attribute the first integer gives the index of the
* matching AttributeDescription, followed by the number of values in the * matching AttributeDescription, followed by the number of values in the
* attribute. If the high bit of the attr index is set, the attribute's * attribute. If the MDB_AT_SORTED bit of the attr index is set, the
* values are already sorted. * attribute's values are already sorted. If the MDB_AT_MULTI bit of the
* If the high bit of numvals is set, the attribute also has normalized * attr index is set, the values are stored separately.
* values present. (Note - a_numvals is an unsigned int, so this means *
* it's possible to receive an attribute that we can't encode due to size * If the MDB_AT_NVALS bit of numvals is set, the attribute also has
* overflow. In practice, this should not be an issue.) Then the length * normalized values present. (Note - a_numvals is an unsigned int, so this
* of each value is listed. If there are normalized values, their lengths * means it's possible to receive an attribute that we can't encode due
* come next. This continues for each attribute. After all of the lengths * to size overflow. In practice, this should not be an issue.)
* for the last attribute, the actual values are copied, with a NUL *
* terminator after each value. The buffer is padded to the sizeof(ID). * Then the length of each value is listed. If there are normalized values,
* The entire buffer size is precomputed so that a single malloc can be * their lengths come next. This continues for each attribute. After all
* performed. * of the lengths for the last attribute, the actual values are copied,
* with a NUL terminator after each value.
* The buffer is padded to the sizeof(ID). The entire buffer size is
* precomputed so that a single malloc can be performed.
*/ */
static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh) static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh)
{ {
@ -655,12 +874,17 @@ static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh)
return LDAP_UNDEFINED_TYPE; return LDAP_UNDEFINED_TYPE;
l = mdb->mi_adxs[a->a_desc->ad_index]; l = mdb->mi_adxs[a->a_desc->ad_index];
if (a->a_flags & SLAP_ATTR_SORTED_VALS) if (a->a_flags & SLAP_ATTR_SORTED_VALS)
l |= HIGH_BIT; l |= MDB_AT_SORTED;
if (a->a_flags & SLAP_ATTR_BIG_MULTI)
l |= MDB_AT_MULTI;
*lp++ = l; *lp++ = l;
l = a->a_numvals; l = a->a_numvals;
if (a->a_nvals != a->a_vals) if (a->a_nvals != a->a_vals)
l |= HIGH_BIT; l |= MDB_AT_NVALS;
*lp++ = l; *lp++ = l;
if (a->a_flags & SLAP_ATTR_BIG_MULTI) {
continue;
} else {
if (a->a_vals) { if (a->a_vals) {
for (i=0; a->a_vals[i].bv_val; i++); for (i=0; a->a_vals[i].bv_val; i++);
assert( i == a->a_numvals ); assert( i == a->a_numvals );
@ -682,6 +906,7 @@ static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh)
} }
} }
} }
}
Debug( LDAP_DEBUG_TRACE, "<= mdb_entry_encode(0x%08lx): %s\n", Debug( LDAP_DEBUG_TRACE, "<= mdb_entry_encode(0x%08lx): %s\n",
(long) e->e_id, e->e_dn, 0 ); (long) e->e_id, e->e_dn, 0 );
@ -696,7 +921,7 @@ static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh)
* structure. Attempting to do so will likely corrupt memory. * structure. Attempting to do so will likely corrupt memory.
*/ */
int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e) int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, ID id, Entry **e)
{ {
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private; struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
int i, j, nattrs, nvals; int i, j, nattrs, nvals;
@ -708,6 +933,7 @@ int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e)
unsigned int *lp = (unsigned int *)data->mv_data; unsigned int *lp = (unsigned int *)data->mv_data;
unsigned char *ptr; unsigned char *ptr;
BerVarray bptr; BerVarray bptr;
MDB_cursor *mvc = NULL;
Debug( LDAP_DEBUG_TRACE, Debug( LDAP_DEBUG_TRACE,
"=> mdb_entry_decode:\n", "=> mdb_entry_decode:\n",
@ -726,33 +952,50 @@ int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e)
ptr = (unsigned char *)(lp + i); ptr = (unsigned char *)(lp + i);
for (;nattrs>0; nattrs--) { for (;nattrs>0; nattrs--) {
int have_nval = 0; int have_nval = 0, multi = 0;
a->a_flags = SLAP_ATTR_DONT_FREE_DATA | SLAP_ATTR_DONT_FREE_VALS; a->a_flags = SLAP_ATTR_DONT_FREE_DATA | SLAP_ATTR_DONT_FREE_VALS;
i = *lp++; i = *lp++;
if (i & HIGH_BIT) { if (i & MDB_AT_SORTED) {
i ^= HIGH_BIT; i ^= MDB_AT_SORTED;
a->a_flags |= SLAP_ATTR_SORTED_VALS; a->a_flags |= SLAP_ATTR_SORTED_VALS;
} }
if (i & MDB_AT_MULTI) {
i ^= MDB_AT_MULTI;
a->a_flags |= SLAP_ATTR_BIG_MULTI;
multi = 1;
}
if (i > mdb->mi_numads) { if (i > mdb->mi_numads) {
rc = mdb_ad_read(mdb, txn); rc = mdb_ad_read(mdb, txn);
if (rc) if (rc)
return rc; goto leave;
if (i > mdb->mi_numads) { if (i > mdb->mi_numads) {
Debug( LDAP_DEBUG_ANY, Debug( LDAP_DEBUG_ANY,
"mdb_entry_decode: attribute index %d not recognized\n", "mdb_entry_decode: attribute index %d not recognized\n",
i, 0, 0 ); i, 0, 0 );
return LDAP_OTHER; rc = LDAP_OTHER;
goto leave;
} }
} }
a->a_desc = mdb->mi_ads[i]; a->a_desc = mdb->mi_ads[i];
a->a_numvals = *lp++; a->a_numvals = *lp++;
if (a->a_numvals & HIGH_BIT) { if (a->a_numvals & MDB_AT_NVALS) {
a->a_numvals ^= HIGH_BIT; a->a_numvals ^= MDB_AT_NVALS;
have_nval = 1; have_nval = 1;
} }
a->a_vals = bptr; a->a_vals = bptr;
if (multi) {
if (!mvc) {
rc = mdb_cursor_open(txn, mdb->mi_dbis[MDB_ID2VAL], &mvc);
if (rc)
goto leave;
}
mdb_mval_get(op, mvc, id, a, have_nval);
bptr += a->a_numvals + 1;
if (have_nval)
bptr += a->a_numvals + 1;
} else {
for (i=0; i<a->a_numvals; i++) { for (i=0; i<a->a_numvals; i++) {
bptr->bv_len = *lp++;; bptr->bv_len = *lp++;
bptr->bv_val = (char *)ptr; bptr->bv_val = (char *)ptr;
ptr += bptr->bv_len+1; ptr += bptr->bv_len+1;
bptr++; bptr++;
@ -775,6 +1018,8 @@ int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e)
} else { } else {
a->a_nvals = a->a_vals; a->a_nvals = a->a_vals;
} }
}
/* FIXME: This is redundant once a sorted entry is saved into the DB */ /* FIXME: This is redundant once a sorted entry is saved into the DB */
if (( a->a_desc->ad_type->sat_flags & SLAP_AT_SORTED_VAL ) if (( a->a_desc->ad_type->sat_flags & SLAP_AT_SORTED_VAL )
&& !(a->a_flags & SLAP_ATTR_SORTED_VALS)) { && !(a->a_flags & SLAP_ATTR_SORTED_VALS)) {
@ -786,7 +1031,7 @@ int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e)
Debug( LDAP_DEBUG_ANY, Debug( LDAP_DEBUG_ANY,
"mdb_entry_decode: attributeType %s value #%d provided more than once\n", "mdb_entry_decode: attributeType %s value #%d provided more than once\n",
a->a_desc->ad_cname.bv_val, j, 0 ); a->a_desc->ad_cname.bv_val, j, 0 );
return rc; goto leave;
} }
} }
a->a_next = a+1; a->a_next = a+1;
@ -794,9 +1039,13 @@ int mdb_entry_decode(Operation *op, MDB_txn *txn, MDB_val *data, Entry **e)
} }
a[-1].a_next = NULL; a[-1].a_next = NULL;
done: done:
Debug(LDAP_DEBUG_TRACE, "<= mdb_entry_decode\n", Debug(LDAP_DEBUG_TRACE, "<= mdb_entry_decode\n",
0, 0, 0 ); 0, 0, 0 );
*e = x; *e = x;
return 0; rc = 0;
leave:
if (mvc)
mdb_cursor_close(mvc);
return rc;
} }

View file

@ -31,6 +31,7 @@ static const struct berval mdmi_databases[] = {
BER_BVC("ad2i"), BER_BVC("ad2i"),
BER_BVC("dn2i"), BER_BVC("dn2i"),
BER_BVC("id2e"), BER_BVC("id2e"),
BER_BVC("id2v"),
BER_BVNULL BER_BVNULL
}; };
@ -63,6 +64,8 @@ mdb_db_init( BackendDB *be, ConfigReply *cr )
mdb->mi_mapsize = DEFAULT_MAPSIZE; mdb->mi_mapsize = DEFAULT_MAPSIZE;
mdb->mi_rtxn_size = DEFAULT_RTXN_SIZE; mdb->mi_rtxn_size = DEFAULT_RTXN_SIZE;
mdb->mi_multi_hi = UINT_MAX;
mdb->mi_multi_lo = UINT_MAX;
be->be_private = mdb; be->be_private = mdb;
be->be_cf_ocs = be->bd_info->bi_cf_ocs; be->be_cf_ocs = be->bd_info->bi_cf_ocs;
@ -201,6 +204,8 @@ mdb_db_open( BackendDB *be, ConfigReply *cr )
} else { } else {
if ( i == MDB_DN2ID ) if ( i == MDB_DN2ID )
flags |= MDB_DUPSORT; flags |= MDB_DUPSORT;
if ( i == MDB_ID2VAL )
flags ^= MDB_INTEGERKEY|MDB_DUPSORT;
if ( !(slapMode & SLAP_TOOL_READONLY) ) if ( !(slapMode & SLAP_TOOL_READONLY) )
flags |= MDB_CREATE; flags |= MDB_CREATE;
} }
@ -224,6 +229,8 @@ mdb_db_open( BackendDB *be, ConfigReply *cr )
if ( i == MDB_ID2ENTRY ) if ( i == MDB_ID2ENTRY )
mdb_set_compare( txn, mdb->mi_dbis[i], mdb_id_compare ); mdb_set_compare( txn, mdb->mi_dbis[i], mdb_id_compare );
else if ( i == MDB_ID2VAL )
mdb_set_compare( txn, mdb->mi_dbis[i], mdb_id2v_compare );
else if ( i == MDB_DN2ID ) { else if ( i == MDB_DN2ID ) {
MDB_cursor *mc; MDB_cursor *mc;
MDB_val key, data; MDB_val key, data;

View file

@ -74,13 +74,17 @@ int mdb_modify_internal(
char *textbuf, char *textbuf,
size_t textlen ) size_t textlen )
{ {
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
int rc, err; int rc, err;
Modification *mod; Modification *mod;
Modifications *ml; Modifications *ml;
Attribute *save_attrs; Attribute *save_attrs;
Attribute *ap; Attribute *ap, *aold, *anew;
int glue_attr_delete = 0; int glue_attr_delete = 0;
int softop, chkpresent;
int got_delete; int got_delete;
int a_flags;
MDB_cursor *mvc = NULL;
Debug( LDAP_DEBUG_TRACE, "mdb_modify_internal: 0x%08lx: %s\n", Debug( LDAP_DEBUG_TRACE, "mdb_modify_internal: 0x%08lx: %s\n",
e->e_id, e->e_dn, 0); e->e_id, e->e_dn, 0);
@ -129,16 +133,64 @@ int mdb_modify_internal(
mod = &ml->sml_mod; mod = &ml->sml_mod;
got_delete = 0; got_delete = 0;
aold = attr_find( e->e_attrs, mod->sm_desc );
if (aold)
a_flags = aold->a_flags;
else
a_flags = 0;
switch ( mod->sm_op ) { switch ( mod->sm_op ) {
case LDAP_MOD_ADD: case LDAP_MOD_ADD:
softop = 0;
chkpresent = 0;
Debug(LDAP_DEBUG_ARGS, Debug(LDAP_DEBUG_ARGS,
"mdb_modify_internal: add %s\n", "mdb_modify_internal: add %s\n",
mod->sm_desc->ad_cname.bv_val, 0, 0); mod->sm_desc->ad_cname.bv_val, 0, 0);
do_add:
err = modify_add_values( e, mod, get_permissiveModify(op), err = modify_add_values( e, mod, get_permissiveModify(op),
text, textbuf, textlen ); text, textbuf, textlen );
if( softop ) {
mod->sm_op = SLAP_MOD_SOFTADD;
if ( err == LDAP_TYPE_OR_VALUE_EXISTS )
err = LDAP_SUCCESS;
}
if( chkpresent ) {
mod->sm_op = SLAP_MOD_ADD_IF_NOT_PRESENT;
}
if( err != LDAP_SUCCESS ) { if( err != LDAP_SUCCESS ) {
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n", Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0); err, *text, 0);
} else {
if (!aold)
anew = attr_find( e->e_attrs, mod->sm_desc );
else
anew = aold;
/* check for big multivalued attrs */
if ( anew->a_numvals > mdb->mi_multi_hi )
anew->a_flags |= SLAP_ATTR_BIG_MULTI;
if ( anew->a_flags & SLAP_ATTR_BIG_MULTI ) {
if (!mvc) {
err = mdb_cursor_open( tid, mdb->mi_dbis[MDB_ID2VAL], &mvc );
if (err)
break;
}
/* if prev was set, just add new values */
if (a_flags & SLAP_ATTR_BIG_MULTI ) {
anew = (Attribute *)mod;
/* Tweak nvals */
if (!anew->a_nvals)
anew->a_nvals = anew->a_vals;
}
err = mdb_mval_put(op, mvc, e->e_id, anew);
if (a_flags & SLAP_ATTR_BIG_MULTI ) {
/* Undo nvals tweak */
if (anew->a_nvals == anew->a_vals)
anew->a_nvals = NULL;
}
}
} }
break; break;
@ -148,16 +200,55 @@ int mdb_modify_internal(
break; break;
} }
softop = 0;
Debug(LDAP_DEBUG_ARGS, Debug(LDAP_DEBUG_ARGS,
"mdb_modify_internal: delete %s\n", "mdb_modify_internal: delete %s\n",
mod->sm_desc->ad_cname.bv_val, 0, 0); mod->sm_desc->ad_cname.bv_val, 0, 0);
do_del:
err = modify_delete_values( e, mod, get_permissiveModify(op), err = modify_delete_values( e, mod, get_permissiveModify(op),
text, textbuf, textlen ); text, textbuf, textlen );
if (softop) {
mod->sm_op = SLAP_MOD_SOFTDEL;
if ( err == LDAP_NO_SUCH_ATTRIBUTE ) {
err = LDAP_SUCCESS;
softop = 2;
}
}
if( err != LDAP_SUCCESS ) { if( err != LDAP_SUCCESS ) {
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n", Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0); err, *text, 0);
} else { } else {
if (softop != 2)
got_delete = 1; got_delete = 1;
/* check for big multivalued attrs */
if (a_flags & SLAP_ATTR_BIG_MULTI) {
Attribute a_dummy;
if (!mvc) {
err = mdb_cursor_open( tid, mdb->mi_dbis[MDB_ID2VAL], &mvc );
if (err)
break;
}
if ( mod->sm_numvals ) {
anew = attr_find( e->e_attrs, mod->sm_desc );
if ( anew->a_numvals < mdb->mi_multi_lo ) {
anew->a_flags ^= SLAP_ATTR_BIG_MULTI;
anew = NULL;
} else {
anew = (Attribute *)mod;
}
} else {
anew = NULL;
}
if (!anew) {
/* delete all values */
anew = &a_dummy;
anew->a_desc = mod->sm_desc;
anew->a_numvals = 0;
}
err = mdb_mval_del( op, mvc, e->e_id, anew );
}
} }
break; break;
@ -172,6 +263,29 @@ int mdb_modify_internal(
err, *text, 0); err, *text, 0);
} else { } else {
got_delete = 1; got_delete = 1;
if (a_flags & SLAP_ATTR_BIG_MULTI) {
Attribute a_dummy;
if (!mvc) {
err = mdb_cursor_open( tid, mdb->mi_dbis[MDB_ID2VAL], &mvc );
if (err)
break;
}
/* delete all values */
anew = &a_dummy;
anew->a_desc = mod->sm_desc;
anew->a_numvals = 0;
err = mdb_mval_del( op, mvc, e->e_id, anew );
if (err)
break;
anew = attr_find( e->e_attrs, mod->sm_desc );
if (mod->sm_numvals >= mdb->mi_multi_lo) {
anew->a_flags |= SLAP_ATTR_BIG_MULTI;
err = mdb_mval_put(op, mvc, e->e_id, anew);
} else if (anew) {
/* revert back to normal attr */
anew->a_flags &= ~SLAP_ATTR_BIG_MULTI;
}
}
} }
break; break;
@ -198,21 +312,8 @@ int mdb_modify_internal(
* We need to add index if necessary. * We need to add index if necessary.
*/ */
mod->sm_op = LDAP_MOD_ADD; mod->sm_op = LDAP_MOD_ADD;
softop = 1;
err = modify_add_values( e, mod, get_permissiveModify(op), goto do_add;
text, textbuf, textlen );
mod->sm_op = SLAP_MOD_SOFTADD;
if ( err == LDAP_TYPE_OR_VALUE_EXISTS ) {
err = LDAP_SUCCESS;
}
if( err != LDAP_SUCCESS ) {
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0);
}
break;
case SLAP_MOD_SOFTDEL: case SLAP_MOD_SOFTDEL:
Debug(LDAP_DEBUG_ARGS, Debug(LDAP_DEBUG_ARGS,
@ -222,23 +323,8 @@ int mdb_modify_internal(
* We need to add index if necessary. * We need to add index if necessary.
*/ */
mod->sm_op = LDAP_MOD_DELETE; mod->sm_op = LDAP_MOD_DELETE;
softop = 1;
err = modify_delete_values( e, mod, get_permissiveModify(op), goto do_del;
text, textbuf, textlen );
mod->sm_op = SLAP_MOD_SOFTDEL;
if ( err == LDAP_SUCCESS ) {
got_delete = 1;
} else if ( err == LDAP_NO_SUCH_ATTRIBUTE ) {
err = LDAP_SUCCESS;
}
if( err != LDAP_SUCCESS ) {
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0);
}
break;
case SLAP_MOD_ADD_IF_NOT_PRESENT: case SLAP_MOD_ADD_IF_NOT_PRESENT:
if ( attr_find( e->e_attrs, mod->sm_desc ) != NULL ) { if ( attr_find( e->e_attrs, mod->sm_desc ) != NULL ) {
@ -254,17 +340,8 @@ int mdb_modify_internal(
* We need to add index if necessary. * We need to add index if necessary.
*/ */
mod->sm_op = LDAP_MOD_ADD; mod->sm_op = LDAP_MOD_ADD;
chkpresent = 1;
err = modify_add_values( e, mod, get_permissiveModify(op), goto do_add;
text, textbuf, textlen );
mod->sm_op = SLAP_MOD_ADD_IF_NOT_PRESENT;
if( err != LDAP_SUCCESS ) {
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0);
}
break;
default: default:
Debug(LDAP_DEBUG_ANY, "mdb_modify_internal: invalid op %d\n", Debug(LDAP_DEBUG_ANY, "mdb_modify_internal: invalid op %d\n",

View file

@ -168,6 +168,8 @@ int mdb_filter_candidates(
* id2entry.c * id2entry.c
*/ */
MDB_cmp_func mdb_id2v_compare;
int mdb_id2entry_add( int mdb_id2entry_add(
Operation *op, Operation *op,
MDB_txn *tid, MDB_txn *tid,
@ -202,11 +204,14 @@ BI_entry_release_rw mdb_entry_release;
BI_entry_get_rw mdb_entry_get; BI_entry_get_rw mdb_entry_get;
BI_op_txn mdb_txn; BI_op_txn mdb_txn;
int mdb_entry_decode( Operation *op, MDB_txn *txn, MDB_val *data, Entry **e ); int mdb_entry_decode( Operation *op, MDB_txn *txn, MDB_val *data, ID id, Entry **e );
void mdb_reader_flush( MDB_env *env ); void mdb_reader_flush( MDB_env *env );
int mdb_opinfo_get( Operation *op, struct mdb_info *mdb, int rdonly, mdb_op_info **moi ); int mdb_opinfo_get( Operation *op, struct mdb_info *mdb, int rdonly, mdb_op_info **moi );
int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a);
int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a);
/* /*
* idl.c * idl.c
*/ */

View file

@ -923,7 +923,7 @@ notfound:
goto done; goto done;
} }
rs->sr_err = mdb_entry_decode( op, ltid, &edata, &e ); rs->sr_err = mdb_entry_decode( op, ltid, &edata, id, &e );
if ( rs->sr_err ) { if ( rs->sr_err ) {
rs->sr_err = LDAP_OTHER; rs->sr_err = LDAP_OTHER;
rs->sr_text = "internal error in mdb_entry_decode"; rs->sr_text = "internal error in mdb_entry_decode";

View file

@ -394,7 +394,7 @@ mdb_tool_entry_get_int( BackendDB *be, ID id, Entry **ep )
} }
} }
} }
rc = mdb_entry_decode( &op, mdb_tool_txn, &data, &e ); rc = mdb_entry_decode( &op, mdb_tool_txn, &data, id, &e );
e->e_id = id; e->e_id = id;
if ( !BER_BVISNULL( &dn )) { if ( !BER_BVISNULL( &dn )) {
e->e_name = dn; e->e_name = dn;

View file

@ -1167,10 +1167,11 @@ struct Attribute {
#define SLAP_ATTR_DONT_FREE_DATA 0x4U #define SLAP_ATTR_DONT_FREE_DATA 0x4U
#define SLAP_ATTR_DONT_FREE_VALS 0x8U #define SLAP_ATTR_DONT_FREE_VALS 0x8U
#define SLAP_ATTR_SORTED_VALS 0x10U /* values are sorted */ #define SLAP_ATTR_SORTED_VALS 0x10U /* values are sorted */
#define SLAP_ATTR_BIG_MULTI 0x20U /* for backends */
/* These flags persist across an attr_dup() */ /* These flags persist across an attr_dup() */
#define SLAP_ATTR_PERSISTENT_FLAGS \ #define SLAP_ATTR_PERSISTENT_FLAGS \
SLAP_ATTR_SORTED_VALS (SLAP_ATTR_SORTED_VALS|SLAP_ATTR_BIG_MULTI)
Attribute *a_next; Attribute *a_next;
#ifdef LDAP_COMP_MATCH #ifdef LDAP_COMP_MATCH