diff --git a/libraries/liblmdb/lmdb.h b/libraries/liblmdb/lmdb.h
index 2076eb35fa..9f00a04202 100644
--- a/libraries/liblmdb/lmdb.h
+++ b/libraries/liblmdb/lmdb.h
@@ -309,7 +309,7 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
#define MDB_APPEND 0x20000
/** Duplicate data is being appended, don't split full pages. */
#define MDB_APPENDDUP 0x40000
-/** Store multiple data items in one call. */
+/** Store multiple data items in one call. Only for #MDB_DUPFIXED. */
#define MDB_MULTIPLE 0x80000
/* @} */
@@ -1210,6 +1210,16 @@ int mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
* correct order. Loading unsorted keys with this flag will cause
* data corruption.
* <li>#MDB_APPENDDUP - as above, but for sorted dup data.
+ * <li>#MDB_MULTIPLE - store multiple contiguous data elements in a
+ * single request. This flag may only be specified if the database
+ * was opened with #MDB_DUPFIXED. The \b data argument must be an
+ * array of two MDB_vals. The mv_size of the first MDB_val must be
+ * the size of a single data element. The mv_data of the first MDB_val
+ * must point to the beginning of the array of contiguous data elements.
+ * The mv_size of the second MDB_val must be the count of the number
+ * of data elements to store. On return this field will be set to
+ * the count of the number of elements actually written. The mv_data
+ * of the second MDB_val is unused.
* </ul>
* @return A non-zero error value on failure and 0 on success. Some possible
* errors are:
diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c
index 0badbbbf69..df88adcd27 100644
--- a/libraries/liblmdb/mdb.c
+++ b/libraries/liblmdb/mdb.c
@@ -585,6 +585,7 @@ typedef struct MDB_page {
#define P_DIRTY 0x10 /**< dirty page */
#define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */
#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */
+#define P_KEEP 0x8000 /**< leave this page alone during spill */
/** @} */
uint16_t mp_flags; /**< @ref mdb_page */
#define mp_lower mp_pb.pb.pb_lower
@@ -824,6 +825,10 @@ struct MDB_txn {
/** The list of pages that became unused during this transaction.
*/
MDB_IDL mt_free_pgs;
+ /** The list of dirty pages we temporarily wrote to disk
+ * because the dirty list was full.
+ */
+ MDB_IDL mt_spill_pgs;
union {
MDB_ID2L dirty_list; /**< for write txns: modified pages */
MDB_reader *reader; /**< this thread's reader table slot or NULL */
@@ -857,6 +862,7 @@ struct MDB_txn {
#define MDB_TXN_RDONLY 0x01 /**< read-only transaction */
#define MDB_TXN_ERROR 0x02 /**< an error has occurred */
#define MDB_TXN_DIRTY 0x04 /**< must write, even if dirty list is empty */
+#define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */
/** @} */
unsigned int mt_flags; /**< @ref mdb_txn */
/** dirty_list maxsize - # of allocated pages allowed, including in parent txns */
@@ -879,8 +885,8 @@ struct MDB_xcursor;
struct MDB_cursor {
/** Next cursor on this DB in this txn */
MDB_cursor *mc_next;
- /** Original cursor if this is a shadow */
- MDB_cursor *mc_orig;
+ /** Backup of the original cursor if this cursor is a shadow */
+ MDB_cursor *mc_backup;
/** Context used for databases with #MDB_DUPSORT, otherwise NULL */
struct MDB_xcursor *mc_xcursor;
/** The transaction that owns this cursor */
@@ -903,8 +909,6 @@ struct MDB_cursor {
#define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */
#define C_EOF 0x02 /**< No more data */
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
-#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
-#define C_ALLOCD 0x10 /**< Cursor was malloc'd */
#define C_SPLITTING 0x20 /**< Cursor is in page_split */
#define C_UNTRACK 0x40 /**< Un-track cursor when closing */
/** @} */
@@ -1308,7 +1312,7 @@ mdb_dpage_free(MDB_env *env, MDB_page *dp)
}
}
-/* Return all dirty pages to dpage list */
+/** Return all dirty pages to dpage list */
static void
mdb_dlist_free(MDB_txn *txn)
{
@@ -1322,6 +1326,164 @@ mdb_dlist_free(MDB_txn *txn)
dl[0].mid = 0;
}
+/** Set or clear P_KEEP in non-overflow, non-sub pages in known cursors.
+ * When clearing, only consider backup cursors (from parent txns) since
+ * other P_KEEP flags have already been cleared.
+ * @param[in] mc A cursor handle for the current operation.
+ * @param[in] pflags Flags of the pages to update:
+ * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it.
+ */
+static void
+mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags)
+{
+ MDB_txn *txn = mc->mc_txn;
+ MDB_cursor *m2, *m3;
+ MDB_xcursor *mx;
+ unsigned i, j;
+
+ if (mc->mc_flags & C_UNTRACK)
+ mc = NULL; /* will find mc in mt_cursors */
+ for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { /* mc first (if untracked), then each DB's cursor list */
+ for (; mc; mc=mc->mc_next) {
+ m2 = pflags == P_DIRTY ? mc : mc->mc_backup; /* clearing pass starts at the backup */
+ for (; m2; m2 = m2->mc_backup) {
+ for (m3=m2; m3->mc_flags & C_INITIALIZED; m3=&mx->mx_cursor) {
+ for (j=0; j<m3->mc_snum; j++) /* every page on the cursor's stack */
+ if ((m3->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY|P_KEEP))
+ == pflags)
+ m3->mc_pg[j]->mp_flags ^= P_KEEP; /* toggle: set when marking, clear when unmarking */
+ if (!(m3->mc_db->md_flags & MDB_DUPSORT))
+ break;
+ /* Cursor backups have mx malloced at the end of m2 */
+ mx = (m3 == mc ? m3->mc_xcursor : (MDB_xcursor *)(m3+1));
+ }
+ }
+ }
+ if (i == 0)
+ break;
+ }
+}
+
+static int mdb_page_flush(MDB_txn *txn);
+
+/** Spill pages from the dirty list back to disk.
+ * This is intended to prevent running into #MDB_TXN_FULL situations,
+ * but note that they may still occur in a few cases:
+ * 1) pages in #MDB_DUPSORT sub-DBs are never spilled, so if there
+ * are too many of these dirtied in one txn, the txn may still get
+ * too full.
+ * 2) child txns may run out of space if their parents dirtied a
+ * lot of pages and never spilled them. TODO: we probably should do
+ * a preemptive spill during #mdb_txn_begin() of a child txn, if
+ * the parent's dirty_room is below a given threshold.
+ * 3) our estimate of the txn size could be too small. At the
+ * moment this seems unlikely.
+ *
+ * Otherwise, if not using nested txns, it is expected that apps will
+ * not run into #MDB_TXN_FULL any more. The pages are flushed to disk
+ * the same way as for a txn commit, i.e. their P_DIRTY flag is cleared.
+ * If the txn never references them again, they can be left alone.
+ * If the txn only reads them, they can be used without any fuss.
+ * If the txn writes them again, they can be dirtied immediately without
+ * going thru all of the work of #mdb_page_touch(). Such references are
+ * handled by #mdb_page_unspill().
+ *
+ * Also note, we never spill DB root pages, nor pages of active cursors,
+ * because we'll need these back again soon anyway. And in nested txns,
+ * we can't spill a page in a child txn if it was already spilled in a
+ * parent txn. That would alter the parent txns' data even though
+ * the child hasn't committed yet, and we'd have no way to undo it if
+ * the child aborted.
+ *
+ * @param[in] m0 A cursor handle identifying the transaction and
+ * database for which we are checking space.
+ * @param[in] key For a put operation, the key being stored.
+ * @param[in] data For a put operation, the data being stored.
+ * @return 0 on success, non-zero on failure.
+ */
+static int
+mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
+{
+ MDB_txn *txn = m0->mc_txn;
+ MDB_page *dp;
+ MDB_ID2L dl = txn->mt_u.dirty_list;
+ unsigned int i, j;
+ int rc;
+
+ if (m0->mc_flags & C_SUB) /* sub-cursors never trigger a spill */
+ return MDB_SUCCESS;
+
+ /* Estimate how much space this op will take */
+ i = m0->mc_db->md_depth;
+ /* Named DBs also dirty the main DB */
+ if (m0->mc_dbi > MAIN_DBI)
+ i += txn->mt_dbs[MAIN_DBI].md_depth;
+ /* For puts, roughly factor in the key+data size */
+ if (key)
+ i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize;
+ i += i; /* double it for good measure */
+
+ if (txn->mt_dirty_room > i) /* enough room left, nothing to spill */
+ return MDB_SUCCESS;
+
+ if (!txn->mt_spill_pgs) {
+ txn->mt_spill_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX);
+ if (!txn->mt_spill_pgs)
+ return ENOMEM;
+ }
+
+ /* Mark all the dirty root pages we want to preserve */
+ for (i=0; i<txn->mt_numdbs; i++) {
+ if (txn->mt_dbflags[i] & DB_DIRTY) {
+ j = mdb_mid2l_search(dl, txn->mt_dbs[i].md_root);
+ if (j <= dl[0].mid) {
+ dp = dl[j].mptr;
+ dp->mp_flags |= P_KEEP;
+ }
+ }
+ }
+
+ /* Preserve pages used by cursors */
+ mdb_cursorpages_mark(m0, P_DIRTY);
+
+ /* Save the page IDs of all the pages we're flushing */
+ for (i=1; i<=dl[0].mid; i++) {
+ dp = dl[i].mptr;
+ if (dp->mp_flags & P_KEEP)
+ continue;
+ /* Can't spill twice, make sure it's not already in a parent's
+ * spill list.
+ */
+ if (txn->mt_parent) {
+ MDB_txn *tx2;
+ for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) {
+ if (tx2->mt_spill_pgs) {
+ j = mdb_midl_search(tx2->mt_spill_pgs, dl[i].mid);
+ if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == dl[i].mid) {
+ dp->mp_flags |= P_KEEP;
+ break;
+ }
+ }
+ }
+ if (tx2) /* non-NULL only when the search loop hit the break above */
+ continue;
+ }
+ if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid)))
+ return rc; /* NOTE(review): P_KEEP marks set above are not cleared on this path */
+ }
+ mdb_midl_sort(txn->mt_spill_pgs);
+
+ rc = mdb_page_flush(txn);
+
+ mdb_cursorpages_mark(m0, P_DIRTY|P_KEEP);
+
+ if (rc == 0) {
+ txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid;
+ txn->mt_flags |= MDB_TXN_SPILLS;
+ }
+ return rc;
+}
+
/** Find oldest txnid still referenced. Expects txn->mt_txnid > 0. */
static txnid_t
mdb_find_oldest(MDB_txn *txn)
@@ -1339,6 +1501,24 @@ mdb_find_oldest(MDB_txn *txn)
return oldest;
}
+/** Add a page to the txn's dirty list and use up one unit of mt_dirty_room. */
+static void
+mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
+{
+ MDB_ID2 mid;
+ int (*insert)(MDB_ID2L, MDB_ID2 *);
+
+ if (txn->mt_env->me_flags & MDB_WRITEMAP) {
+ insert = mdb_mid2l_append; /* WRITEMAP env: append the entry */
+ } else {
+ insert = mdb_mid2l_insert; /* otherwise: insert the entry */
+ }
+ mid.mid = mp->mp_pgno; /* entries are keyed by page number */
+ mid.mptr = mp;
+ insert(txn->mt_u.dirty_list, &mid); /* NOTE(review): the int result is ignored */
+ txn->mt_dirty_room--;
+}
+
/** Allocate pages for writing.
* If there are free pages available from older transactions, they
* will be re-used first. Otherwise a new page will be allocated.
@@ -1369,11 +1549,9 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
pgno_t pgno, *mop = env->me_pghead;
unsigned i, j, k, mop_len = mop ? mop[0] : 0;
MDB_page *np;
- MDB_ID2 mid;
txnid_t oldest = 0, last;
MDB_cursor_op op;
MDB_cursor m2;
- int (*insert)(MDB_ID2L, MDB_ID2 *);
*mp = NULL;
@@ -1476,11 +1654,9 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
search_done:
if (env->me_flags & MDB_WRITEMAP) {
np = (MDB_page *)(env->me_map + env->me_psize * pgno);
- insert = mdb_mid2l_append;
} else {
if (!(np = mdb_page_malloc(txn, num)))
return ENOMEM;
- insert = mdb_mid2l_insert;
}
if (i) {
mop[0] = mop_len -= num;
@@ -1490,10 +1666,8 @@ search_done:
} else {
txn->mt_next_pgno = pgno + num;
}
- mid.mid = np->mp_pgno = pgno;
- mid.mptr = np;
- insert(txn->mt_u.dirty_list, &mid);
- txn->mt_dirty_room--;
+ np->mp_pgno = pgno;
+ mdb_page_dirty(txn, np);
*mp = np;
return MDB_SUCCESS;
@@ -1523,6 +1697,61 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
}
}
+/** Pull a page off the txn's spill list, if present.
+ * If a page being referenced was spilled to disk in this txn or one of
+ * its parents, bring it back and make it dirty/writable again in tx0.
+ * @param[in] tx0 the transaction handle.
+ * @param[in] mp the page being referenced.
+ * @param[out] ret the writable page, if any; unchanged if mp wasn't spilled.
+ * @return 0 on success (spilled or not), ENOMEM if no page copy could be allocated.
+ */
+static int
+mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret)
+{
+ MDB_env *env = tx0->mt_env;
+ MDB_txn *txn;
+ unsigned x;
+ pgno_t pgno = mp->mp_pgno;
+
+ for (txn = tx0; txn; txn=txn->mt_parent) { /* search this txn, then each ancestor */
+ if (!txn->mt_spill_pgs)
+ continue;
+ x = mdb_midl_search(txn->mt_spill_pgs, pgno);
+ if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pgno) {
+ MDB_page *np;
+ int num;
+ if (IS_OVERFLOW(mp))
+ num = mp->mp_pages; /* overflow pages span mp_pages pages */
+ else
+ num = 1;
+ if (env->me_flags & MDB_WRITEMAP) {
+ np = mp; /* reuse the page in place, no copy */
+ } else {
+ np = mdb_page_malloc(txn, num);
+ if (!np)
+ return ENOMEM;
+ if (num > 1)
+ memcpy(np, mp, num * env->me_psize);
+ else
+ mdb_page_copy(np, mp, env->me_psize);
+ }
+ if (txn == tx0) {
+ /* If in current txn, this page is no longer spilled */
+ for (; x < txn->mt_spill_pgs[0]; x++)
+ txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1];
+ txn->mt_spill_pgs[0]--;
+ } /* otherwise, if belonging to a parent txn, the
+ * page remains spilled until child commits
+ */
+ mdb_page_dirty(tx0, np); /* always dirtied in tx0, even if found in a parent's list */
+ np->mp_flags |= P_DIRTY;
+ *ret = np;
+ break;
+ }
+ }
+ return MDB_SUCCESS;
+}
+
/** Touch a page: make it dirty and re-insert into tree with updated pgno.
* @param[in] mc cursor pointing to the page to be touched
* @return 0 on success, non-zero on failure.
@@ -1538,6 +1767,14 @@ mdb_page_touch(MDB_cursor *mc)
int rc;
if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
+ if (txn->mt_flags & MDB_TXN_SPILLS) {
+ np = NULL;
+ rc = mdb_page_unspill(txn, mp, &np);
+ if (rc)
+ return rc;
+ if (np)
+ goto done;
+ }
if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) ||
(rc = mdb_page_alloc(mc, 1, &np)))
return rc;
@@ -1562,9 +1799,10 @@ mdb_page_touch(MDB_cursor *mc)
if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) {
- np = dl[x].mptr;
- if (mp != np)
- mc->mc_pg[mc->mc_top] = np;
+ if (mp != dl[x].mptr) { /* bad cursor? */
+ mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
+ return MDB_CORRUPTED;
+ }
return 0;
}
}
@@ -1584,6 +1822,7 @@ mdb_page_touch(MDB_cursor *mc)
np->mp_pgno = pgno;
np->mp_flags |= P_DIRTY;
+done:
/* Adjust cursors pointing to mp */
mc->mc_pg[mc->mc_top] = np;
dbi = mc->mc_dbi;
@@ -1635,57 +1874,35 @@ mdb_env_sync(MDB_env *env, int force)
return rc;
}
-/** Make shadow copies of all of parent txn's cursors */
+/** Back up parent txn's cursors, then grab the originals for tracking */
static int
mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
{
- MDB_cursor *mc, *m2;
- unsigned int i, j, size;
+ MDB_cursor *mc, *bk;
+ MDB_xcursor *mx;
+ size_t size;
+ int i;
- for (i=0;i<src->mt_numdbs; i++) {
- if (src->mt_cursors[i]) {
+ for (i = src->mt_numdbs; --i >= 0; ) {
+ if ((mc = src->mt_cursors[i]) != NULL) {
size = sizeof(MDB_cursor);
- if (src->mt_cursors[i]->mc_xcursor)
+ if (mc->mc_xcursor)
size += sizeof(MDB_xcursor);
- for (m2 = src->mt_cursors[i]; m2; m2=m2->mc_next) {
- mc = malloc(size);
- if (!mc)
+ for (; mc; mc = bk->mc_next) {
+ bk = malloc(size);
+ if (!bk)
return ENOMEM;
- mc->mc_orig = m2;
- mc->mc_txn = dst;
- mc->mc_dbi = i;
+ *bk = *mc;
+ mc->mc_backup = bk;
mc->mc_db = &dst->mt_dbs[i];
- mc->mc_dbx = m2->mc_dbx;
- mc->mc_dbflag = &dst->mt_dbflags[i];
- mc->mc_snum = m2->mc_snum;
- mc->mc_top = m2->mc_top;
- mc->mc_flags = m2->mc_flags | (C_SHADOW|C_ALLOCD);
- for (j=0; j<mc->mc_snum; j++) {
- mc->mc_pg[j] = m2->mc_pg[j];
- mc->mc_ki[j] = m2->mc_ki[j];
- }
- if (m2->mc_xcursor) {
- MDB_xcursor *mx, *mx2;
- mx = (MDB_xcursor *)(mc+1);
- mc->mc_xcursor = mx;
- mx2 = m2->mc_xcursor;
- mx->mx_db = mx2->mx_db;
- mx->mx_dbx = mx2->mx_dbx;
- mx->mx_dbflag = mx2->mx_dbflag;
- mx->mx_cursor.mc_txn = dst;
- mx->mx_cursor.mc_dbi = mx2->mx_cursor.mc_dbi;
- mx->mx_cursor.mc_db = &mx->mx_db;
- mx->mx_cursor.mc_dbx = &mx->mx_dbx;
- mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
- mx->mx_cursor.mc_snum = mx2->mx_cursor.mc_snum;
- mx->mx_cursor.mc_top = mx2->mx_cursor.mc_top;
- mx->mx_cursor.mc_flags = mx2->mx_cursor.mc_flags | C_SHADOW;
- for (j=0; j<mx2->mx_cursor.mc_snum; j++) {
- mx->mx_cursor.mc_pg[j] = mx2->mx_cursor.mc_pg[j];
- mx->mx_cursor.mc_ki[j] = mx2->mx_cursor.mc_ki[j];
- }
- } else {
- mc->mc_xcursor = NULL;
+ /* Kill pointers into src - and dst to reduce abuse: The
+ * user may not use mc until dst ends. Otherwise we'd...
+ */
+ mc->mc_txn = NULL; /* ...set this to dst */
+ mc->mc_dbflag = NULL; /* ...and &dst->mt_dbflags[i] */
+ if ((mx = mc->mc_xcursor) != NULL) {
+ *(MDB_xcursor *)(bk+1) = *mx;
+ mx->mx_cursor.mc_txn = NULL; /* ...and dst. */
}
mc->mc_next = dst->mt_cursors[i];
dst->mt_cursors[i] = mc;
@@ -1695,32 +1912,40 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
return MDB_SUCCESS;
}
-/** Close this write txn's cursors, after optionally merging its shadow
- * cursors back into parent's.
+/** Close this write txn's cursors, give parent txn's cursors back to parent.
* @param[in] txn the transaction handle.
- * @param[in] merge 0 to not merge cursors, C_SHADOW to merge.
+ * @param[in] merge true to keep changes to parent cursors, false to revert.
* @return 0 on success, non-zero on failure.
*/
static void
mdb_cursors_close(MDB_txn *txn, unsigned merge)
{
- MDB_cursor **cursors = txn->mt_cursors, *mc, *next;
- int i, j;
+ MDB_cursor **cursors = txn->mt_cursors, *mc, *next, *bk;
+ MDB_xcursor *mx;
+ int i;
for (i = txn->mt_numdbs; --i >= 0; ) {
for (mc = cursors[i]; mc; mc = next) {
- next = mc->mc_next;
- if (mc->mc_flags & merge) {
- MDB_cursor *m2 = mc->mc_orig;
- m2->mc_snum = mc->mc_snum;
- m2->mc_top = mc->mc_top;
- for (j = mc->mc_snum; --j >= 0; ) {
- m2->mc_pg[j] = mc->mc_pg[j];
- m2->mc_ki[j] = mc->mc_ki[j];
- }
+ next = mc->mc_next;
+ if ((bk = mc->mc_backup) != NULL) {
+ if (merge) {
+ /* Commit changes to parent txn */
+ mc->mc_next = bk->mc_next;
+ mc->mc_backup = bk->mc_backup;
+ mc->mc_txn = bk->mc_txn;
+ mc->mc_db = bk->mc_db;
+ mc->mc_dbflag = bk->mc_dbflag;
+ if ((mx = mc->mc_xcursor) != NULL)
+ mx->mx_cursor.mc_txn = bk->mc_txn;
+ } else {
+ /* Abort nested txn */
+ *mc = *bk;
+ if ((mx = mc->mc_xcursor) != NULL)
+ *mx = *(MDB_xcursor *)(bk+1);
}
- if (mc->mc_flags & C_ALLOCD)
- free(mc);
+ mc = bk;
+ }
+ free(mc);
}
cursors[i] = NULL;
}
@@ -1804,6 +2029,7 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_u.dirty_list[0].mid = 0;
txn->mt_free_pgs = env->me_free_pgs;
txn->mt_free_pgs[0] = 0;
+ txn->mt_spill_pgs = NULL;
env->me_txn = txn;
}
@@ -1909,6 +2135,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_toggle = parent->mt_toggle;
txn->mt_dirty_room = parent->mt_dirty_room;
txn->mt_u.dirty_list[0].mid = 0;
+ txn->mt_spill_pgs = NULL;
txn->mt_next_pgno = parent->mt_next_pgno;
parent->mt_child = txn;
txn->mt_parent = parent;
@@ -2011,6 +2238,7 @@ mdb_txn_reset0(MDB_txn *txn, const char *act)
txn->mt_parent->mt_child = NULL;
env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
mdb_midl_free(txn->mt_free_pgs);
+ mdb_midl_free(txn->mt_spill_pgs);
free(txn->mt_u.dirty_list);
return;
}
@@ -2174,25 +2402,32 @@ mdb_freelist_save(MDB_txn *txn)
total_room += head_room;
}
- /* Fill in the reserved, touched me_pghead records. Avoid write ops
- * so they cannot rearrange anything, just read the destinations.
- */
+ /* Fill in the reserved, touched me_pghead records */
rc = MDB_SUCCESS;
if (mop_len) {
MDB_val key, data;
- mop += mop_len + 1;
+ mop += mop_len;
rc = mdb_cursor_first(&mc, &key, &data);
for (; !rc; rc = mdb_cursor_next(&mc, &key, &data, MDB_NEXT)) {
- MDB_IDL dest = data.mv_data;
+ unsigned flags = MDB_CURRENT;
+ txnid_t id = *(txnid_t *)key.mv_data;
ssize_t len = (ssize_t)(data.mv_size / sizeof(MDB_ID)) - 1;
+ MDB_ID save;
- assert(len >= 0 && *(txnid_t*)key.mv_data <= env->me_pglast);
- if (len > mop_len)
+ assert(len >= 0 && id <= env->me_pglast);
+ key.mv_data = &id;
+ if (len > mop_len) {
len = mop_len;
- *dest++ = len;
- memcpy(dest, mop -= len, len * sizeof(MDB_ID));
- if (! (mop_len -= len))
+ data.mv_size = (len + 1) * sizeof(MDB_ID);
+ flags = 0;
+ }
+ data.mv_data = mop -= len;
+ save = mop[0];
+ mop[0] = len;
+ rc = mdb_cursor_put(&mc, &key, &data, flags);
+ mop[0] = save;
+ if (rc || !(mop_len -= len))
break;
}
}
@@ -2206,27 +2441,34 @@ mdb_page_flush(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
MDB_ID2L dl = txn->mt_u.dirty_list;
- unsigned psize = env->me_psize;
+ unsigned psize = env->me_psize, j;
int i, pagecount = dl[0].mid, rc;
size_t size = 0, pos = 0;
- pgno_t pgno;
+ pgno_t pgno = 0;
MDB_page *dp = NULL;
#ifdef _WIN32
OVERLAPPED ov;
#else
struct iovec iov[MDB_COMMIT_PAGES];
- ssize_t wpos, wsize = 0, wres;
+ ssize_t wpos = 0, wsize = 0, wres;
size_t next_pos = 1; /* impossible pos, so pos != next_pos */
int n = 0;
#endif
+ j = 0;
if (env->me_flags & MDB_WRITEMAP) {
/* Clear dirty flags */
for (i = pagecount; i; i--) {
dp = dl[i].mptr;
+ /* Don't flush this page yet */
+ if (dp->mp_flags & P_KEEP) {
+ dp->mp_flags ^= P_KEEP;
+ dl[++j] = dl[i];
+ continue;
+ }
dp->mp_flags &= ~P_DIRTY;
}
- dl[0].mid = 0;
+ dl[0].mid = j;
return MDB_SUCCESS;
}
@@ -2234,6 +2476,12 @@ mdb_page_flush(MDB_txn *txn)
for (i = 1;; i++) {
if (i <= pagecount) {
dp = dl[i].mptr;
+ /* Don't flush this page yet */
+ if (dp->mp_flags & P_KEEP) {
+ dp->mp_flags ^= P_KEEP;
+ dl[i].mid = 0;
+ continue;
+ }
pgno = dl[i].mid;
/* clear dirty flag */
dp->mp_flags &= ~P_DIRTY;
@@ -2305,7 +2553,18 @@ mdb_page_flush(MDB_txn *txn)
#endif /* _WIN32 */
}
- mdb_dlist_free(txn);
+ j = 0;
+ for (i=1; i<=pagecount; i++) {
+ dp = dl[i].mptr;
+ /* This is a page we skipped above */
+ if (!dl[i].mid) {
+ dl[++j] = dl[i];
+ dl[j].mid = dp->mp_pgno;
+ continue;
+ }
+ mdb_dpage_free(env, dp);
+ }
+ dl[0].mid = j;
return MDB_SUCCESS;
}
@@ -2359,21 +2618,52 @@ mdb_txn_commit(MDB_txn *txn)
parent->mt_flags = txn->mt_flags;
/* Merge our cursors into parent's and close them */
- mdb_cursors_close(txn, C_SHADOW);
+ mdb_cursors_close(txn, 1);
/* Update parent's DB table. */
memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
- txn->mt_parent->mt_numdbs = txn->mt_numdbs;
- txn->mt_parent->mt_dbflags[0] = txn->mt_dbflags[0];
- txn->mt_parent->mt_dbflags[1] = txn->mt_dbflags[1];
+ parent->mt_numdbs = txn->mt_numdbs;
+ parent->mt_dbflags[0] = txn->mt_dbflags[0];
+ parent->mt_dbflags[1] = txn->mt_dbflags[1];
for (i=2; i<txn->mt_numdbs; i++) {
/* preserve parent's DB_NEW status */
- x = txn->mt_parent->mt_dbflags[i] & DB_NEW;
- txn->mt_parent->mt_dbflags[i] = txn->mt_dbflags[i] | x;
+ x = parent->mt_dbflags[i] & DB_NEW;
+ parent->mt_dbflags[i] = txn->mt_dbflags[i] | x;
}
- dst = txn->mt_parent->mt_u.dirty_list;
+ dst = parent->mt_u.dirty_list;
src = txn->mt_u.dirty_list;
+ /* Remove anything in our dirty list from parent's spill list */
+ if (parent->mt_spill_pgs) {
+ x = parent->mt_spill_pgs[0];
+ len = x;
+ /* zero out our dirty pages in parent spill list */
+ for (i=1; i<=src[0].mid; i++) {
+ if (src[i].mid < parent->mt_spill_pgs[x])
+ continue;
+ if (src[i].mid > parent->mt_spill_pgs[x]) {
+ if (x <= 1)
+ break;
+ x--;
+ continue;
+ }
+ parent->mt_spill_pgs[x] = 0;
+ len--;
+ }
+ /* OK, we had a few hits, squash zeros from the spill list */
+ if (len < parent->mt_spill_pgs[0]) {
+ x=1;
+ for (y=1; y<=parent->mt_spill_pgs[0]; y++) {
+ if (parent->mt_spill_pgs[y]) {
+ if (y != x) {
+ parent->mt_spill_pgs[x] = parent->mt_spill_pgs[y];
+ }
+ x++;
+ }
+ }
+ parent->mt_spill_pgs[0] = len;
+ }
+ }
/* Find len = length of merging our dirty list with parent's */
x = dst[0].mid;
dst[0].mid = 0; /* simplify loops */
@@ -2405,8 +2695,17 @@ mdb_txn_commit(MDB_txn *txn)
dst[0].mid = len;
free(txn->mt_u.dirty_list);
parent->mt_dirty_room = txn->mt_dirty_room;
+ if (txn->mt_spill_pgs) {
+ if (parent->mt_spill_pgs) {
+ mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs);
+ mdb_midl_free(txn->mt_spill_pgs);
+ mdb_midl_sort(parent->mt_spill_pgs);
+ } else {
+ parent->mt_spill_pgs = txn->mt_spill_pgs;
+ }
+ }
- txn->mt_parent->mt_child = NULL;
+ parent->mt_child = NULL;
mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead);
free(txn);
return MDB_SUCCESS;
@@ -2502,6 +2801,8 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta)
memset(&ov, 0, sizeof(ov));
ov.Offset = off;
rc = ReadFile(env->me_fd,&pbuf,MDB_PAGESIZE,&len,&ov) ? (int)len : -1;
+ if (rc == -1 && ErrCode() == ERROR_HANDLE_EOF)
+ rc = 0;
#else
rc = pread(env->me_fd, &pbuf, MDB_PAGESIZE, off);
#endif
@@ -3985,6 +4286,19 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
level = 1;
do {
MDB_ID2L dl = tx2->mt_u.dirty_list;
+ unsigned x;
+ /* Spilled pages were dirtied in this txn and flushed
+ * because the dirty list got full. Bring this page
+ * back in from the map (but don't unspill it here,
+ * leave that unless page_touch happens again).
+ */
+ if (tx2->mt_spill_pgs) {
+ x = mdb_midl_search(tx2->mt_spill_pgs, pgno);
+ if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pgno) {
+ p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
+ goto done;
+ }
+ }
if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) {
@@ -4085,6 +4399,8 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify)
DPRINTF("found leaf page %zu for key [%s]", mp->mp_pgno,
key ? DKEY(key) : NULL);
+ mc->mc_flags |= C_INITIALIZED;
+ mc->mc_flags &= ~C_EOF;
return MDB_SUCCESS;
}
@@ -4212,11 +4528,21 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp)
int rc;
DPRINTF("free ov page %zu (%d)", pg, ovpages);
- /* If the page is dirty we just acquired it, so we should
- * give it back to our current free list, if any.
+ /* If the page is dirty or on the spill list we just acquired it,
+ * so we should give it back to our current free list, if any.
* Not currently supported in nested txns.
* Otherwise put it onto the list of pages we freed in this txn.
*/
+ if (!(mp->mp_flags & P_DIRTY) && txn->mt_spill_pgs) {
+ unsigned x = mdb_midl_search(txn->mt_spill_pgs, pg);
+ if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pg) {
+ /* This page is no longer spilled */
+ for (; x < txn->mt_spill_pgs[0]; x++)
+ txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1];
+ txn->mt_spill_pgs[0]--;
+ goto release;
+ }
+ }
if ((mp->mp_flags & P_DIRTY) && !txn->mt_parent && env->me_pghead) {
unsigned j, x;
pgno_t *mop;
@@ -4242,6 +4568,7 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp)
}
if (!(env->me_flags & MDB_WRITEMAP))
mdb_dpage_free(env, mp);
+release:
/* Insert in me_pghead */
mop = env->me_pghead;
j = mop[0] + ovpages;
@@ -4394,7 +4721,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
return rc;
}
} else {
- mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if (op == MDB_NEXT_DUP)
return MDB_NOTFOUND;
}
@@ -4406,7 +4733,6 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
DPUTS("=====> move to next sibling page");
if ((rc = mdb_cursor_sibling(mc, 1)) != MDB_SUCCESS) {
mc->mc_flags |= C_EOF;
- mc->mc_flags &= ~C_INITIALIZED;
return rc;
}
mp = mc->mc_pg[mc->mc_top];
@@ -4464,7 +4790,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
if (op != MDB_PREV || rc != MDB_NOTFOUND)
return rc;
} else {
- mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if (op == MDB_PREV_DUP)
return MDB_NOTFOUND;
}
@@ -4476,7 +4802,6 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
if (mc->mc_ki[mc->mc_top] == 0) {
DPUTS("=====> move to prev sibling page");
if ((rc = mdb_cursor_sibling(mc, 0)) != MDB_SUCCESS) {
- mc->mc_flags &= ~C_INITIALIZED;
return rc;
}
mp = mc->mc_pg[mc->mc_top];
@@ -4682,7 +5007,7 @@ set1:
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
@@ -4730,7 +5055,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
return rc;
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
@@ -4778,7 +5103,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
return rc;
} else {
if (mc->mc_xcursor)
- mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
+ mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if ((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
return rc;
}
@@ -4960,16 +5285,20 @@ mdb_cursor_touch(MDB_cursor *mc)
return MDB_SUCCESS;
}
+/** Do not spill pages to disk if txn is getting full, may fail instead */
+#define MDB_NOSPILL 0x8000
+
int
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags)
{
+ enum { MDB_NO_ROOT = MDB_LAST_ERRCODE+10 }; /* internal code */
MDB_node *leaf = NULL;
MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy;
int do_sub = 0, insert = 0;
- unsigned int mcount = 0;
+ unsigned int mcount = 0, dcount = 0, nospill;
size_t nsize;
int rc, rc2;
MDB_pagebuf pbuf;
@@ -4977,6 +5306,19 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int nflags;
DKBUF;
+ /* Check this first so counter will always be zero on any
+ * early failures.
+ */
+ if (flags & MDB_MULTIPLE) {
+ dcount = data[1].mv_size;
+ data[1].mv_size = 0;
+ if (!F_ISSET(mc->mc_db->md_flags, MDB_DUPFIXED))
+ return EINVAL;
+ }
+
+ nospill = flags & MDB_NOSPILL;
+ flags &= ~MDB_NOSPILL;
+
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
@@ -5001,23 +5343,10 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
return EINVAL;
rc = MDB_SUCCESS;
} else if (mc->mc_db->md_root == P_INVALID) {
- MDB_page *np;
- /* new database, write a root leaf page */
- DPUTS("allocating new root leaf page");
- if ((rc = mdb_page_new(mc, P_LEAF, 1, &np))) {
- return rc;
- }
+ /* new database, cursor has nothing to point to */
mc->mc_snum = 0;
- mdb_cursor_push(mc, np);
- mc->mc_db->md_root = np->mp_pgno;
- mc->mc_db->md_depth++;
- *mc->mc_dbflag |= DB_DIRTY;
- if ((mc->mc_db->md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
- == MDB_DUPFIXED)
- np->mp_flags |= P_LEAF2;
- mc->mc_flags |= C_INITIALIZED;
- rc = MDB_NOTFOUND;
- goto top;
+ mc->mc_flags &= ~C_INITIALIZED;
+ rc = MDB_NO_ROOT;
} else {
int exact = 0;
MDB_val d2;
@@ -5035,7 +5364,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
}
}
} else {
- rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
+ rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
}
if ((flags & MDB_NOOVERWRITE) && rc == 0) {
DPRINTF("duplicate key [%s]", DKEY(key));
@@ -5046,12 +5375,40 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
return rc;
}
- /* Cursor is positioned, now make sure all pages are writable */
- rc2 = mdb_cursor_touch(mc);
- if (rc2)
- return rc2;
+ /* Cursor is positioned, check for room in the dirty list */
+ if (!nospill) {
+ if (flags & MDB_MULTIPLE) {
+ rdata = &xdata;
+ xdata.mv_size = data->mv_size * dcount;
+ } else {
+ rdata = data;
+ }
+ if ((rc2 = mdb_page_spill(mc, key, rdata)))
+ return rc2;
+ }
+
+ if (rc == MDB_NO_ROOT) {
+ MDB_page *np;
+ /* new database, write a root leaf page */
+ DPUTS("allocating new root leaf page");
+ if ((rc2 = mdb_page_new(mc, P_LEAF, 1, &np))) {
+ return rc2;
+ }
+ mdb_cursor_push(mc, np);
+ mc->mc_db->md_root = np->mp_pgno;
+ mc->mc_db->md_depth++;
+ *mc->mc_dbflag |= DB_DIRTY;
+ if ((mc->mc_db->md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
+ == MDB_DUPFIXED)
+ np->mp_flags |= P_LEAF2;
+ mc->mc_flags |= C_INITIALIZED;
+ } else {
+ /* make sure all cursor pages are writable */
+ rc2 = mdb_cursor_touch(mc);
+ if (rc2)
+ return rc2;
+ }
-top:
/* The key already exists */
if (rc == MDB_SUCCESS) {
/* there's only a key anyway, so this is a no-op */
@@ -5207,8 +5564,18 @@ current:
return rc2;
ovpages = omp->mp_pages;
- /* Is the ov page writable and large enough? */
- if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
+ /* Is the ov page large enough? */
+ if (ovpages >= dpages) {
+ if (!(omp->mp_flags & P_DIRTY) &&
+ (level || (mc->mc_txn->mt_env->me_flags & MDB_WRITEMAP)))
+ {
+ rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
+ if (rc)
+ return rc;
+ level = 0; /* dirty in this txn or clean */
+ }
+ /* Is it dirty? */
+ if (omp->mp_flags & P_DIRTY) {
/* yes, overwrite it. Note in this case we don't
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold.
@@ -5241,10 +5608,10 @@ current:
else
memcpy(METADATA(omp), data->mv_data, data->mv_size);
goto done;
- } else {
- if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
- return rc2;
+ }
}
+ if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
+ return rc2;
} else if (NODEDSZ(leaf) == data->mv_size) {
/* same size, just replace it. Note that we could
* also reuse this node if the new data is smaller,
@@ -5317,10 +5684,11 @@ put_sub:
xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDB_CURRENT) {
- xflags = MDB_CURRENT;
+ xflags = MDB_CURRENT|MDB_NOSPILL;
} else {
mdb_xcursor_init1(mc, leaf);
- xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
+ xflags = (flags & MDB_NODUPDATA) ?
+ MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
}
/* converted, write the original data first */
if (dkey.mv_size) {
@@ -5335,13 +5703,14 @@ put_sub:
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
+ if (!(m2->mc_flags & C_INITIALIZED)) continue;
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
}
}
}
- /* we've done our job */
- dkey.mv_size = 0;
+ /* we've done our job */
+ dkey.mv_size = 0;
}
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
@@ -5357,12 +5726,16 @@ put_sub:
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
if (flags & MDB_MULTIPLE) {
- mcount++;
- if (mcount < data[1].mv_size) {
- data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
- leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
- goto more;
+ if (!rc) {
+ mcount++;
+ if (mcount < dcount) {
+ data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
+ leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
+ goto more;
+ }
}
+ /* let caller know how many succeeded, if any */
+ data[1].mv_size = mcount;
}
}
done:
@@ -5386,6 +5759,10 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!(mc->mc_flags & C_INITIALIZED))
return EINVAL;
+ if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL)))
+ return rc;
+ flags &= ~MDB_NOSPILL; /* TODO: alternatively, change the (flags != MDB_NODUPDATA) test to !(flags & MDB_NODUPDATA); the logic of that code has not been reviewed yet */
+
rc = mdb_cursor_touch(mc);
if (rc)
return rc;
@@ -5397,7 +5774,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
}
- rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
+ rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
/* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_entries) {
if (leaf->mn_flags & F_SUBDATA) {
@@ -5855,7 +6232,8 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
static void
mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{
- mc->mc_orig = NULL;
+ mc->mc_next = NULL;
+ mc->mc_backup = NULL;
mc->mc_dbi = dbi;
mc->mc_txn = txn;
mc->mc_db = &txn->mt_dbs[dbi];
@@ -5900,7 +6278,6 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
txn->mt_cursors[dbi] = mc;
mc->mc_flags |= C_UNTRACK;
}
- mc->mc_flags |= C_ALLOCD;
} else {
return ENOMEM;
}
@@ -5913,19 +6290,13 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
int
mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
{
- unsigned flags;
-
if (txn == NULL || mc == NULL || mc->mc_dbi >= txn->mt_numdbs)
return EINVAL;
if ((mc->mc_flags & C_UNTRACK) || txn->mt_cursors)
return EINVAL;
- flags = mc->mc_flags;
-
mdb_cursor_init(mc, txn, mc->mc_dbi, mc->mc_xcursor);
-
- mc->mc_flags |= (flags & C_ALLOCD);
return MDB_SUCCESS;
}
@@ -5956,7 +6327,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
void
mdb_cursor_close(MDB_cursor *mc)
{
- if (mc != NULL) {
+ if (mc && !mc->mc_backup) {
/* remove from txn, if tracked */
if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
@@ -5964,8 +6335,7 @@ mdb_cursor_close(MDB_cursor *mc)
if (*prev == mc)
*prev = mc->mc_next;
}
- if (mc->mc_flags & C_ALLOCD)
- free(mc);
+ free(mc);
}
}
@@ -6462,6 +6832,7 @@ mdb_rebalance(MDB_cursor *mc)
return rc;
mc->mc_db->md_depth--;
mc->mc_db->md_branch_pages--;
+ mc->mc_ki[0] = mc->mc_ki[1];
{
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
@@ -6480,6 +6851,7 @@ mdb_rebalance(MDB_cursor *mc)
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum = 1;
m3->mc_top = 0;
+ m3->mc_ki[0] = m3->mc_ki[1];
}
}
}
@@ -6543,7 +6915,7 @@ mdb_rebalance(MDB_cursor *mc)
rc = mdb_page_merge(&mn, mc);
else
rc = mdb_page_merge(mc, &mn);
- mc->mc_flags &= ~C_INITIALIZED;
+ mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
}
return rc;
}
@@ -6553,9 +6925,14 @@ static int
mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
{
int rc;
+ MDB_page *mp;
+ indx_t ki;
+
+ mp = mc->mc_pg[mc->mc_top];
+ ki = mc->mc_ki[mc->mc_top];
/* add overflow pages to free list */
- if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
+ if (!IS_LEAF2(mp) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
pgno_t pg;
@@ -6564,14 +6941,36 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf)
(rc = mdb_ovpage_free(mc, omp)))
return rc;
}
- mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_db->md_pad);
+ mdb_node_del(mp, ki, mc->mc_db->md_pad);
mc->mc_db->md_entries--;
rc = mdb_rebalance(mc);
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
/* if mc points past last node in page, invalidate */
else if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top]))
- mc->mc_flags &= ~C_INITIALIZED;
+ mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
+
+ {
+ /* Adjust other cursors pointing to mp */
+ MDB_cursor *m2;
+ unsigned int nkeys;
+ MDB_dbi dbi = mc->mc_dbi;
+
+ mp = mc->mc_pg[mc->mc_top];
+ nkeys = NUMKEYS(mp);
+ for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
+ if (m2 == mc)
+ continue;
+ if (!(m2->mc_flags & C_INITIALIZED))
+ continue;
+ if (m2->mc_pg[mc->mc_top] == mp) {
+ if (m2->mc_ki[mc->mc_top] > ki)
+ m2->mc_ki[mc->mc_top]--;
+ if (m2->mc_ki[mc->mc_top] >= nkeys)
+ m2->mc_flags &= ~(C_INITIALIZED|C_EOF);
+ }
+ }
+ }
return rc;
}
@@ -6623,6 +7022,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
* run out of space, triggering a split. We need this
* cursor to be consistent until the end of the rebalance.
*/
+ mc.mc_flags |= C_UNTRACK;
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
@@ -7007,7 +7407,7 @@ done:
m3 = m2;
if (m3 == mc)
continue;
- if (!(m3->mc_flags & C_INITIALIZED))
+ if (!(m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_flags & C_SPLITTING)
continue;
@@ -7403,7 +7803,7 @@ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del)
rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT);
/* Invalidate the dropped DB's cursors */
for (m2 = txn->mt_cursors[dbi]; m2; m2 = m2->mc_next)
- m2->mc_flags &= ~C_INITIALIZED;
+ m2->mc_flags &= ~(C_INITIALIZED|C_EOF);
if (rc)
goto leave;
diff --git a/libraries/liblmdb/midl.c b/libraries/liblmdb/midl.c
index e7bd680cb0..86e4592d2d 100644
--- a/libraries/liblmdb/midl.c
+++ b/libraries/liblmdb/midl.c
@@ -31,8 +31,7 @@
*/
#define CMP(x,y) ( (x) < (y) ? -1 : (x) > (y) )
-#if 0 /* superseded by append/sort */
-static unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
+unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
{
/*
* binary search of id in ids
@@ -67,6 +66,7 @@ static unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id )
return cursor;
}
+#if 0 /* superseded by append/sort */
int mdb_midl_insert( MDB_IDL ids, MDB_ID id )
{
unsigned x, i;
diff --git a/libraries/liblmdb/midl.h b/libraries/liblmdb/midl.h
index 9ce7133c6e..b0bdff3f49 100644
--- a/libraries/liblmdb/midl.h
+++ b/libraries/liblmdb/midl.h
@@ -74,14 +74,12 @@ typedef MDB_ID *MDB_IDL;
xidl[xlen] = (id); \
} while (0)
-#if 0 /* superseded by append/sort */
- /** Insert an ID into an IDL.
- * @param[in,out] ids The IDL to insert into.
- * @param[in] id The ID to insert.
- * @return 0 on success, -1 if ID was already present, -2 on error.
+ /** Search for an ID in an IDL.
+ * @param[in] ids The IDL to search.
+ * @param[in] id The ID to search for.
+ * @return The index of the first ID greater than or equal to \b id.
*/
-int mdb_midl_insert( MDB_IDL ids, MDB_ID id );
-#endif
+unsigned mdb_midl_search( MDB_IDL ids, MDB_ID id );
/** Allocate an IDL.
* Allocates memory for an IDL of the given size.
diff --git a/libraries/liblmdb/mtest.c b/libraries/liblmdb/mtest.c
index 55cdd43c38..dbc69b8d4c 100644
--- a/libraries/liblmdb/mtest.c
+++ b/libraries/liblmdb/mtest.c
@@ -129,6 +129,8 @@ int main(int argc,char * argv[])
rc = mdb_cursor_open(txn, dbi, &cur2);
for (i=0; i<50; i++) {
rc = mdb_cursor_get(cur2, &key, &data, MDB_NEXT);
+ if (rc)
+ break;
printf("key: %p %.*s, data: %p %.*s\n",
key.mv_data, (int) key.mv_size, (char *) key.mv_data,
data.mv_data, (int) data.mv_size, (char *) data.mv_data);
@@ -142,6 +144,7 @@ int main(int argc,char * argv[])
data.mv_data, (int) data.mv_size, (char *) data.mv_data);
for (i=0; i<32; i++) {
rc = mdb_cursor_get(cur2, &key, &data, MDB_NEXT);
+ if (rc) break;
printf("key: %p %.*s, data: %p %.*s\n",
key.mv_data, (int) key.mv_size, (char *) key.mv_data,
data.mv_data, (int) data.mv_size, (char *) data.mv_data);
@@ -158,6 +161,7 @@ int main(int argc,char * argv[])
data.mv_data, (int) data.mv_size, (char *) data.mv_data);
for (i=0; i<32; i++) {
rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT);
+ if (rc) break;
printf("key: %p %.*s, data: %p %.*s\n",
key.mv_data, (int) key.mv_size, (char *) key.mv_data,
data.mv_data, (int) data.mv_size, (char *) data.mv_data);