diff --git a/libraries/liblmdb/lmdb.h b/libraries/liblmdb/lmdb.h index 8a46c1d0d1..f6dfa60c38 100644 --- a/libraries/liblmdb/lmdb.h +++ b/libraries/liblmdb/lmdb.h @@ -66,6 +66,10 @@ * BSD systems or when otherwise configured with MDB_USE_POSIX_SEM. * Multiple users can cause startup to fail later, as noted above. * + * - There is normally no pure read-only mode, since readers need write + * access to locks and lock file. Exceptions: On read-only filesystems + * or with the #MDB_NOLOCK flag described under #mdb_env_open(). + * * - A thread can only use one transaction at a time, plus any child * transactions. Each transaction belongs to one thread. See below. * The #MDB_NOTLS flag changes this for read-only transactions. @@ -216,13 +220,13 @@ typedef struct MDB_cursor MDB_cursor; /** @brief Generic structure used for passing keys and data in and out * of the database. * - * Key sizes must be between 1 and the liblmdb build-time constant - * #MDB_MAXKEYSIZE inclusive. This currently defaults to 511. The - * same applies to data sizes in databases with the #MDB_DUPSORT flag. - * Other data items can in theory be from 0 to 0xffffffff bytes long. - * * Values returned from the database are valid only until a subsequent - * update operation, or the end of the transaction. + * update operation, or the end of the transaction. Do not modify or + * free them, they commonly point into the database itself. + * + * Key sizes must be between 1 and #mdb_env_get_maxkeysize() inclusive. + * The same applies to data sizes in databases with the #MDB_DUPSORT flag. + * Other data items can in theory be from 0 to 0xffffffff bytes long. */ typedef struct MDB_val { size_t mv_size; /**< size of the data item */ @@ -265,10 +269,12 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel #define MDB_NOMETASYNC 0x40000 /** use writable mmap */ #define MDB_WRITEMAP 0x80000 - /** use asynchronous msync when MDB_WRITEMAP is used */ + /** use asynchronous msync when #MDB_WRITEMAP is used */ #define MDB_MAPASYNC 0x100000 /** tie reader locktable slots to #MDB_txn objects instead of to threads */ #define MDB_NOTLS 0x200000 + /** don't do any locking, caller must manage their own locks */ +#define MDB_NOLOCK 0x400000 /** @} */ /** @defgroup mdb_dbi_open Database Flags @@ -486,6 +492,8 @@ int mdb_env_create(MDB_env **env); * and uses fewer mallocs, but loses protection from application bugs * like wild pointer writes and other bad updates into the database. * Incompatible with nested transactions. + * Processes with and without MDB_WRITEMAP on the same environment do + * not cooperate well. *
  • #MDB_NOMETASYNC * Flush system buffers to disk only once per transaction, omit the * metadata flush. Defer that until the system flushes files to disk, @@ -523,6 +531,13 @@ int mdb_env_create(MDB_env **env); * user threads over individual OS threads need this option. Such an * application must also serialize the write transactions in an OS * thread, since MDB's write locking is unaware of the user threads. + *
  • #MDB_NOLOCK + * Don't do any locking. If concurrent access is anticipated, the + * caller must manage all concurrency itself. For proper operation + * the caller must enforce single-writer semantics, and must ensure + * that no readers are using old transactions while a writer is + * active. The simplest approach is to use an exclusive lock so that + * no readers may be active at all when a writer begins. * * @param[in] mode The UNIX permissions to set on created files. This parameter * is ignored on Windows. @@ -733,8 +748,10 @@ int mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs); /** @brief Get the maximum size of a key for the environment. * + * This is the compile-time constant #MDB_MAXKEYSIZE, default 511. + * See @ref MDB_val. * @param[in] env An environment handle returned by #mdb_env_create() - * @return The maximum size of a key. (#MDB_MAXKEYSIZE) + * @return The maximum size of a key */ int mdb_env_get_maxkeysize(MDB_env *env); diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 9c5e7389de..0aae740614 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -171,7 +171,7 @@ #define Z "I" #else -#define Z "z" +#define Z "z" /**< printf format modifier for size_t */ /** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */ #define MDB_PIDLOCK 1 @@ -317,6 +317,9 @@ static txnid_t mdb_debug_start; * The string is printed literally, with no format processing. */ #define DPUTS(arg) DPRINTF(("%s", arg)) + /** Debuging output value of a cursor DBI: Negative in a sub-cursor. */ +#define DDBI(mc) \ + (((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi) /** @} */ /** A default memory page size. @@ -425,7 +428,8 @@ typedef uint16_t indx_t; * * If #MDB_NOTLS is set, the slot address is not saved in thread-specific data. * - * No reader table is used if the database is on a read-only filesystem. + * No reader table is used if the database is on a read-only filesystem, or + * if #MDB_NOLOCK is set. * * Since the database uses multi-version concurrency control, readers don't * actually need any locking. This table is used to keep track of which @@ -600,7 +604,7 @@ typedef struct MDB_page { #define P_LEAF 0x02 /**< leaf page */ #define P_OVERFLOW 0x04 /**< overflow page */ #define P_META 0x08 /**< meta page */ -#define P_DIRTY 0x10 /**< dirty page */ +#define P_DIRTY 0x10 /**< dirty page, also set for #P_SUBP pages */ #define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */ #define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */ #define P_KEEP 0x8000 /**< leave this page alone during spill */ @@ -786,7 +790,10 @@ typedef struct MDB_db { /** Handle for the default DB. */ #define MAIN_DBI 1 - /** Meta page content. */ + /** Meta page content. + * A meta page is the start point for accessing a database snapshot. + * Pages 0-1 are meta pages. Transaction N writes meta page #(N % 2). + */ typedef struct MDB_meta { /** Stamp identifying this as an MDB file. It must be set * to #MDB_MAGIC. */ @@ -865,9 +872,9 @@ struct MDB_txn { * @ingroup internal * @{ */ -#define DB_DIRTY 0x01 /**< DB was written in this txn */ -#define DB_STALE 0x02 /**< DB record is older than txnID */ -#define DB_NEW 0x04 /**< DB handle opened in this txn */ +#define DB_DIRTY 0x01 /**< DB was modified or is DUPSORT data */ +#define DB_STALE 0x02 /**< Named-DB record is older than txnID */ +#define DB_NEW 0x04 /**< Named-DB handle opened in this txn */ #define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */ /** @} */ /** In write txns, array of cursors for each DB */ @@ -889,12 +896,12 @@ struct MDB_txn { #define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */ /** @} */ unsigned int mt_flags; /**< @ref mdb_txn */ - /** dirty_list maxsize - # of allocated pages allowed, including in parent txns */ - unsigned int mt_dirty_room; - /** Tracks which of the two meta pages was used at the start - * of this transaction. + /** dirty_list room: Array size - #dirty pages visible to this txn. + * Includes ancestor txns' dirty pages not hidden by other txns' + * dirty/spilled pages. Thus commit(nested txn) has room to merge + * dirty_list into mt_parent after freeing hidden mt_parent pages. */ - unsigned int mt_toggle; + unsigned int mt_dirty_room; }; /** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty. @@ -905,7 +912,14 @@ struct MDB_txn { struct MDB_xcursor; - /** Cursors are used for all DB operations */ + /** Cursors are used for all DB operations. + * A cursor holds a path of (page pointer, key index) from the DB + * root to a position in the DB, plus other state. #MDB_DUPSORT + * cursors include an xcursor to the current data item. Write txns + * track their cursors and keep them up to date when data moves. + * Exception: An xcursor's pointer to a #P_SUBP page can be stale. + * (A node with #F_DUPDATA but no #F_SUBDATA contains a subpage). + */ struct MDB_cursor { /** Next cursor on this DB in this txn */ MDB_cursor *mc_next; @@ -1019,8 +1033,8 @@ struct MDB_env { /** Nested transaction */ typedef struct MDB_ntxn { - MDB_txn mnt_txn; /* the transaction */ - MDB_pgstate mnt_pgstate; /* parent transaction's saved freestate */ + MDB_txn mnt_txn; /**< the transaction */ + MDB_pgstate mnt_pgstate; /**< parent transaction's saved freestate */ } MDB_ntxn; /** max number of pages to commit in one writev() call */ @@ -1042,6 +1056,8 @@ static int mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify); #define MDB_PS_MODIFY 1 #define MDB_PS_ROOTONLY 2 +#define MDB_PS_FIRST 4 +#define MDB_PS_LAST 8 static int mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags); static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); @@ -1255,7 +1271,7 @@ static void mdb_audit(MDB_txn *txn) txn->mt_dbs[i].md_leaf_pages + txn->mt_dbs[i].md_overflow_pages; if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) { - mdb_page_search(&mc, NULL, 0); + mdb_page_search(&mc, NULL, MDB_PS_FIRST); do { unsigned j; MDB_page *mp; @@ -1329,7 +1345,7 @@ mdb_page_free(MDB_env *env, MDB_page *mp) env->me_dpages = mp; } -/* Free a dirty page */ +/** Free a dirty page */ static void mdb_dpage_free(MDB_env *env, MDB_page *dp) { @@ -1356,7 +1372,7 @@ mdb_dlist_free(MDB_txn *txn) dl[0].mid = 0; } -/* Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. +/** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. * @param[in] mc A cursor handle for the current operation. * @param[in] pflags Flags of the pages to update: * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it. @@ -1366,10 +1382,12 @@ mdb_dlist_free(MDB_txn *txn) static int mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) { + enum { Mask = P_SUBP|P_DIRTY|P_KEEP }; MDB_txn *txn = mc->mc_txn; MDB_cursor *m3; MDB_xcursor *mx; - MDB_page *dp; + MDB_page *dp, *mp; + MDB_node *leaf; unsigned i, j; int rc = MDB_SUCCESS, level; @@ -1378,14 +1396,24 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) mc = NULL; /* will find mc in mt_cursors */ for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { for (; mc; mc=mc->mc_next) { - for (m3 = mc; m3->mc_flags & C_INITIALIZED; m3 = &mx->mx_cursor) { - for (j=0; jmc_snum; j++) - if ((m3->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY|P_KEEP)) - == pflags) - m3->mc_pg[j]->mp_flags ^= P_KEEP; - mx = m3->mc_xcursor; - if (mx == NULL) - break; + if (!(mc->mc_flags & C_INITIALIZED)) + continue; + for (m3 = mc;; m3 = &mx->mx_cursor) { + mp = NULL; + for (j=0; jmc_snum; j++) { + mp = m3->mc_pg[j]; + if ((mp->mp_flags & Mask) == pflags) + mp->mp_flags ^= P_KEEP; + } + mx = m3->mc_xcursor; + /* Proceed to mx if it is at a sub-database */ + if (! (mx && (mx->mx_cursor.mc_flags & C_INITIALIZED))) + break; + if (! (mp && (mp->mp_flags & P_LEAF))) + break; + leaf = NODEPTR(mp, m3->mc_ki[j-1]); + if (!(leaf->mn_flags & F_SUBDATA)) + break; } } if (i == 0) @@ -1401,7 +1429,7 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) continue; if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) break; - if ((dp->mp_flags & (P_DIRTY|P_KEEP)) == pflags && level <= 1) + if ((dp->mp_flags & Mask) == pflags && level <= 1) dp->mp_flags ^= P_KEEP; } } @@ -1415,15 +1443,12 @@ static int mdb_page_flush(MDB_txn *txn, int keep); /** Spill pages from the dirty list back to disk. * This is intended to prevent running into #MDB_TXN_FULL situations, * but note that they may still occur in a few cases: - * 1) pages in #MDB_DUPSORT sub-DBs are never spilled, so if there - * are too many of these dirtied in one txn, the txn may still get - * too full. + * 1) our estimate of the txn size could be too small. Currently this + * seems unlikely, except with a large number of #MDB_MULTIPLE items. * 2) child txns may run out of space if their parents dirtied a * lot of pages and never spilled them. TODO: we probably should do * a preemptive spill during #mdb_txn_begin() of a child txn, if * the parent's dirty_room is below a given threshold. - * 3) our estimate of the txn size could be too small. At the - * moment this seems unlikely. * * Otherwise, if not using nested txns, it is expected that apps will * not run into #MDB_TXN_FULL any more. The pages are flushed to disk @@ -1541,31 +1566,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) rc = mdb_pages_xkeep(m0, P_DIRTY|P_KEEP, i); done: - if (rc == 0) { - if (txn->mt_parent) { - txn->mt_dirty_room = txn->mt_parent->mt_dirty_room - dl[0].mid; - /* dirty pages that are dirty in an ancestor don't - * count against this txn's dirty_room. - */ - for (i=1; i<=dl[0].mid; i++) { - pgno_t pgno = dl[i].mid; - MDB_txn *tx2; - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - j = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (j <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[j].mid == pgno) { - txn->mt_dirty_room++; - break; - } - } - } - } else { - txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid; - } - txn->mt_flags |= MDB_TXN_SPILLS; - } else { - txn->mt_flags |= MDB_TXN_ERROR; - } + txn->mt_flags |= rc ? MDB_TXN_ERROR : MDB_TXN_SPILLS; return rc; } @@ -1575,12 +1576,14 @@ mdb_find_oldest(MDB_txn *txn) { int i; txnid_t mr, oldest = txn->mt_txnid - 1; - MDB_reader *r = txn->mt_env->me_txns->mti_readers; - for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { - if (r[i].mr_pid) { - mr = r[i].mr_txnid; - if (oldest > mr) - oldest = mr; + if (txn->mt_env->me_txns) { + MDB_reader *r = txn->mt_env->me_txns->mti_readers; + for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { + if (r[i].mr_pid) { + mr = r[i].mr_txnid; + if (oldest > mr) + oldest = mr; + } } } return oldest; @@ -1790,26 +1793,28 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize) /** Pull a page off the txn's spill list, if present. * If a page being referenced was spilled to disk in this txn, bring * it back and make it dirty/writable again. - * @param[in] tx0 the transaction handle. + * @param[in] txn the transaction handle. * @param[in] mp the page being referenced. * @param[out] ret the writable page, if any. ret is unchanged if * mp wasn't spilled. */ static int -mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) +mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret) { - MDB_env *env = tx0->mt_env; - MDB_txn *txn; + MDB_env *env = txn->mt_env; + const MDB_txn *tx2; unsigned x; pgno_t pgno = mp->mp_pgno, pn = pgno << 1; - for (txn = tx0; txn; txn=txn->mt_parent) { - if (!txn->mt_spill_pgs) + for (tx2 = txn; tx2; tx2=tx2->mt_parent) { + if (!tx2->mt_spill_pgs) continue; - x = mdb_midl_search(txn->mt_spill_pgs, pn); - if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pn) { + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { MDB_page *np; int num; + if (txn->mt_dirty_room == 0) + return MDB_TXN_FULL; if (IS_OVERFLOW(mp)) num = mp->mp_pages; else @@ -1825,7 +1830,7 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) else mdb_page_copy(np, mp, env->me_psize); } - if (txn == tx0) { + if (tx2 == txn) { /* If in current txn, this page is no longer spilled. * If it happens to be the last page, truncate the spill list. * Otherwise mark it as deleted by setting the LSB. @@ -1838,22 +1843,7 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) * page remains spilled until child commits */ - if (txn->mt_parent) { - MDB_txn *tx2; - /* If this page is also in a parent's dirty list, then - * it's already accounted in dirty_room, and we need to - * cancel out the decrement that mdb_page_dirty does. - */ - for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { - x = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); - if (x <= tx2->mt_u.dirty_list[0].mid && - tx2->mt_u.dirty_list[x].mid == pgno) { - tx0->mt_dirty_room++; - break; - } - } - } - mdb_page_dirty(tx0, np); + mdb_page_dirty(txn, np); np->mp_flags |= P_DIRTY; *ret = np; break; @@ -1872,7 +1862,6 @@ mdb_page_touch(MDB_cursor *mc) MDB_page *mp = mc->mc_pg[mc->mc_top], *np; MDB_txn *txn = mc->mc_txn; MDB_cursor *m2, *m3; - MDB_dbi dbi; pgno_t pgno; int rc; @@ -1889,7 +1878,8 @@ mdb_page_touch(MDB_cursor *mc) (rc = mdb_page_alloc(mc, 1, &np))) return rc; pgno = np->mp_pgno; - DPRINTF(("touched db %u page %"Z"u -> %"Z"u", mc->mc_dbi,mp->mp_pgno,pgno)); + DPRINTF(("touched db %d page %"Z"u -> %"Z"u", DDBI(mc), + mp->mp_pgno, pgno)); assert(mp->mp_pgno != pgno); mdb_midl_xappend(txn->mt_free_pgs, mp->mp_pgno); /* Update the parent page, if any, to point to the new page */ @@ -1935,17 +1925,16 @@ mdb_page_touch(MDB_cursor *mc) done: /* Adjust cursors pointing to mp */ mc->mc_pg[mc->mc_top] = np; - dbi = mc->mc_dbi; + m2 = txn->mt_cursors[mc->mc_dbi]; if (mc->mc_flags & C_SUB) { - dbi--; - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { m3 = &m2->mc_xcursor->mx_cursor; if (m3->mc_snum < mc->mc_snum) continue; if (m3->mc_pg[mc->mc_top] == mp) m3->mc_pg[mc->mc_top] = np; } } else { - for (m2 = txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + for (; m2; m2=m2->mc_next) { if (m2->mc_snum < mc->mc_snum) continue; if (m2->mc_pg[mc->mc_top] == mp) { m2->mc_pg[mc->mc_top] = np; @@ -2130,7 +2119,9 @@ static int mdb_txn_renew0(MDB_txn *txn) { MDB_env *env = txn->mt_env; - unsigned int i; + MDB_txninfo *ti = env->me_txns; + MDB_meta *meta; + unsigned int i, nr; uint16_t x; int rc, new_notls = 0; @@ -2139,9 +2130,9 @@ mdb_txn_renew0(MDB_txn *txn) txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */ if (txn->mt_flags & MDB_TXN_RDONLY) { - if (!env->me_txns) { - i = mdb_env_pick_meta(env); - txn->mt_txnid = env->me_metas[i]->mm_txnid; + if (!ti) { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; txn->mt_u.reader = NULL; } else { MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader : @@ -2163,36 +2154,43 @@ mdb_txn_renew0(MDB_txn *txn) } LOCK_MUTEX_R(env); - for (i=0; ime_txns->mti_numreaders; i++) - if (env->me_txns->mti_readers[i].mr_pid == 0) + nr = ti->mti_numreaders; + for (i=0; imti_readers[i].mr_pid == 0) break; if (i == env->me_maxreaders) { UNLOCK_MUTEX_R(env); return MDB_READERS_FULL; } - env->me_txns->mti_readers[i].mr_pid = pid; - env->me_txns->mti_readers[i].mr_tid = tid; - if (i >= env->me_txns->mti_numreaders) - env->me_txns->mti_numreaders = i+1; + ti->mti_readers[i].mr_pid = pid; + ti->mti_readers[i].mr_tid = tid; + if (i == nr) + ti->mti_numreaders = ++nr; /* Save numreaders for un-mutexed mdb_env_close() */ - env->me_numreaders = env->me_txns->mti_numreaders; + env->me_numreaders = nr; UNLOCK_MUTEX_R(env); - r = &env->me_txns->mti_readers[i]; + + r = &ti->mti_readers[i]; new_notls = (env->me_flags & MDB_NOTLS); if (!new_notls && (rc=pthread_setspecific(env->me_txkey, r))) { r->mr_pid = 0; return rc; } } - txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid; + txn->mt_txnid = r->mr_txnid = ti->mti_txnid; txn->mt_u.reader = r; + meta = env->me_metas[txn->mt_txnid & 1]; } - txn->mt_toggle = txn->mt_txnid & 1; } else { - LOCK_MUTEX_W(env); + if (ti) { + LOCK_MUTEX_W(env); - txn->mt_txnid = env->me_txns->mti_txnid; - txn->mt_toggle = txn->mt_txnid & 1; + txn->mt_txnid = ti->mti_txnid; + meta = env->me_metas[txn->mt_txnid & 1]; + } else { + meta = env->me_metas[ mdb_env_pick_meta(env) ]; + txn->mt_txnid = meta->mm_txnid; + } txn->mt_txnid++; #if MDB_DEBUG if (txn->mt_txnid == mdb_debug_start) @@ -2208,10 +2206,10 @@ mdb_txn_renew0(MDB_txn *txn) } /* Copy the DB info and flags */ - memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db)); + memcpy(txn->mt_dbs, meta->mm_dbs, 2 * sizeof(MDB_db)); /* Moved to here to avoid a data race in read TXNs */ - txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1; + txn->mt_next_pgno = meta->mm_last_pg+1; for (i=2; imt_numdbs; i++) { x = env->me_dbflags[i]; @@ -2307,7 +2305,6 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) return ENOMEM; } txn->mt_txnid = parent->mt_txnid; - txn->mt_toggle = parent->mt_toggle; txn->mt_dirty_room = parent->mt_dirty_room; txn->mt_u.dirty_list[0].mid = 0; txn->mt_spill_pgs = NULL; @@ -2433,7 +2430,8 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) env->me_txn = NULL; /* The writer mutex was locked in mdb_txn_begin. */ - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); } } @@ -2488,7 +2486,7 @@ mdb_freelist_save(MDB_txn *txn) if (env->me_pghead) { /* Make sure first page of freeDB is touched and on freelist */ - rc = mdb_page_search(&mc, NULL, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_FIRST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } @@ -2516,9 +2514,7 @@ mdb_freelist_save(MDB_txn *txn) if (freecnt < txn->mt_free_pgs[0]) { if (!freecnt) { /* Make sure last page of freeDB is touched and on freelist */ - key.mv_size = MDB_MAXKEYSIZE+1; - key.mv_data = NULL; - rc = mdb_page_search(&mc, &key, MDB_PS_MODIFY); + rc = mdb_page_search(&mc, NULL, MDB_PS_LAST|MDB_PS_MODIFY); if (rc && rc != MDB_NOTFOUND) return rc; } @@ -2585,7 +2581,7 @@ mdb_freelist_save(MDB_txn *txn) total_room += head_room; } - /* Fill in the reserved, touched me_pghead records */ + /* Fill in the reserved me_pghead records */ rc = MDB_SUCCESS; if (mop_len) { MDB_val key, data; @@ -2655,8 +2651,7 @@ mdb_page_flush(MDB_txn *txn, int keep) } dp->mp_flags &= ~P_DIRTY; } - dl[0].mid = j; - return MDB_SUCCESS; + goto done; } /* Write the pages */ @@ -2750,8 +2745,11 @@ mdb_page_flush(MDB_txn *txn, int keep) } mdb_dpage_free(env, dp); } - dl[0].mid = j; +done: + i--; + txn->mt_dirty_room += i - j; + dl[0].mid = j; return MDB_SUCCESS; } @@ -2791,14 +2789,18 @@ mdb_txn_commit(MDB_txn *txn) if (txn->mt_parent) { MDB_txn *parent = txn->mt_parent; - unsigned x, y, len; MDB_ID2L dst, src; + MDB_IDL pspill; + unsigned x, y, len, ps_len; /* Append our free list to parent's */ rc = mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs); if (rc) goto fail; mdb_midl_free(txn->mt_free_pgs); + /* Failures after this must either undo the changes + * to the parent or set MDB_TXN_ERROR in the parent. + */ parent->mt_next_pgno = txn->mt_next_pgno; parent->mt_flags = txn->mt_flags; @@ -2820,37 +2822,26 @@ mdb_txn_commit(MDB_txn *txn) dst = parent->mt_u.dirty_list; src = txn->mt_u.dirty_list; /* Remove anything in our dirty list from parent's spill list */ - if (parent->mt_spill_pgs) { - x = parent->mt_spill_pgs[0]; - len = x; - /* zero out our dirty pages in parent spill list */ - for (i=1; i<=src[0].mid; i++) { + if ((pspill = parent->mt_spill_pgs) && (ps_len = pspill[0])) { + x = y = ps_len; + pspill[0] = (pgno_t)-1; + /* Mark our dirty pages as deleted in parent spill list */ + for (i=0, len=src[0].mid; ++i <= len; ) { MDB_ID pn = src[i].mid << 1; - if (pn < parent->mt_spill_pgs[x]) - continue; - if (pn > parent->mt_spill_pgs[x]) { - if (x <= 1) - break; + while (pn > pspill[x]) x--; - continue; + if (pn == pspill[x]) { + pspill[x] = 1; + y = --x; } - parent->mt_spill_pgs[x] = 0; - len--; - } - /* OK, we had a few hits, squash zeros from the spill list */ - if (len < parent->mt_spill_pgs[0]) { - x=1; - for (y=1; y<=parent->mt_spill_pgs[0]; y++) { - if (parent->mt_spill_pgs[y]) { - if (y != x) { - parent->mt_spill_pgs[x] = parent->mt_spill_pgs[y]; - } - x++; - } - } - parent->mt_spill_pgs[0] = len; } + /* Squash deleted pagenums if we deleted any */ + for (x=y; ++x <= ps_len; ) + if (!(pspill[x] & 1)) + pspill[++y] = pspill[x]; + pspill[0] = y; } + /* Find len = length of merging our dirty list with parent's */ x = dst[0].mid; dst[0].mid = 0; /* simplify loops */ @@ -2884,7 +2875,10 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_dirty_room = txn->mt_dirty_room; if (txn->mt_spill_pgs) { if (parent->mt_spill_pgs) { - mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + /* TODO: Prevent failure here, so parent does not fail */ + rc = mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); + if (rc) + parent->mt_flags |= MDB_TXN_ERROR; mdb_midl_free(txn->mt_spill_pgs); mdb_midl_sort(parent->mt_spill_pgs); } else { @@ -2895,7 +2889,7 @@ mdb_txn_commit(MDB_txn *txn) parent->mt_child = NULL; mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead); free(txn); - return MDB_SUCCESS; + return rc; } if (txn != env->me_txn) { @@ -2954,7 +2948,8 @@ done: env->me_txn = NULL; mdb_dbis_update(txn, 1); - UNLOCK_MUTEX_W(env); + if (env->me_txns) + UNLOCK_MUTEX_W(env); free(txn); return MDB_SUCCESS; @@ -3109,7 +3104,7 @@ mdb_env_write_meta(MDB_txn *txn) assert(txn != NULL); assert(txn->mt_env != NULL); - toggle = !txn->mt_toggle; + toggle = txn->mt_txnid & 1; DPRINTF(("writing meta page %d for root page %"Z"u", toggle, txn->mt_dbs[MAIN_DBI].md_root)); @@ -3200,7 +3195,8 @@ done: * readers will get consistent data regardless of how fresh or * how stale their view of these values is. */ - env->me_txns->mti_txnid = txn->mt_txnid; + if (env->me_txns) + env->me_txns->mti_txnid = txn->mt_txnid; return MDB_SUCCESS; } @@ -3276,7 +3272,7 @@ mdb_env_map(MDB_env *env, void *addr, int newsize) int prot = PROT_READ; if (flags & MDB_WRITEMAP) { prot |= PROT_WRITE; - if (newsize && ftruncate(env->me_fd, env->me_mapsize) < 0) + if (ftruncate(env->me_fd, env->me_mapsize) < 0) return ErrCode(); } env->me_map = mmap(addr, env->me_mapsize, prot, MAP_SHARED, @@ -3920,7 +3916,7 @@ fail: * environment and re-opening it with the new flags. */ #define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC) -#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS) +#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY|MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK) int mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode) @@ -3973,7 +3969,7 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode } /* For RDONLY, get lockfile after we know datafile exists */ - if (!F_ISSET(flags, MDB_RDONLY)) { + if (!(flags & (MDB_RDONLY|MDB_NOLOCK))) { rc = mdb_env_setup_locks(env, lpath, mode, &excl); if (rc) goto leave; @@ -4003,7 +3999,7 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } - if (F_ISSET(flags, MDB_RDONLY)) { + if ((flags & (MDB_RDONLY|MDB_NOLOCK)) == MDB_RDONLY) { rc = mdb_env_setup_locks(env, lpath, mode, &excl); if (rc) goto leave; @@ -4246,14 +4242,6 @@ mdb_env_copy(MDB_env *env, const char *path) newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW, FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL); #else -#ifdef O_DIRECT - /* The OS supports O_DIRECT, try with it */ - newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL|O_DIRECT, 0666); - /* But open can fail if O_DIRECT isn't supported by the file system - * so retry without the flag - */ - if (newfd == INVALID_HANDLE_VALUE && ErrCode() == EINVAL) -#endif newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666); #endif if (newfd == INVALID_HANDLE_VALUE) { @@ -4261,6 +4249,11 @@ mdb_env_copy(MDB_env *env, const char *path) goto leave; } +#ifdef O_DIRECT + /* Set O_DIRECT if the file system supports it */ + if ((rc = fcntl(newfd, F_GETFL)) != -1) + (void) fcntl(newfd, F_SETFL, rc | O_DIRECT); +#endif #ifdef F_NOCACHE /* __APPLE__ */ rc = fcntl(newfd, F_NOCACHE, 1); if (rc) { @@ -4308,7 +4301,7 @@ mdb_cmp_long(const MDB_val *a, const MDB_val *b) *(size_t *)a->mv_data > *(size_t *)b->mv_data; } -/** Compare two items pointing at aligned int's */ +/** Compare two items pointing at aligned unsigned int's */ static int mdb_cmp_int(const MDB_val *a, const MDB_val *b) { @@ -4316,7 +4309,7 @@ mdb_cmp_int(const MDB_val *a, const MDB_val *b) *(unsigned int *)a->mv_data > *(unsigned int *)b->mv_data; } -/** Compare two items pointing at ints of unknown alignment. +/** Compare two items pointing at unsigned ints of unknown alignment. * Nodes and keys are guaranteed to be 2-byte aligned. */ static int @@ -4514,8 +4507,8 @@ mdb_cursor_pop(MDB_cursor *mc) if (mc->mc_snum) mc->mc_top--; - DPRINTF(("popped page %"Z"u off db %u cursor %p", top->mp_pgno, - mc->mc_dbi, (void *) mc)); + DPRINTF(("popped page %"Z"u off db %d cursor %p", top->mp_pgno, + DDBI(mc), (void *) mc)); } } @@ -4523,8 +4516,8 @@ mdb_cursor_pop(MDB_cursor *mc) static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) { - DPRINTF(("pushing page %"Z"u on db %u cursor %p", mp->mp_pgno, - mc->mc_dbi, (void *) mc)); + DPRINTF(("pushing page %"Z"u on db %d cursor %p", mp->mp_pgno, + DDBI(mc), (void *) mc)); if (mc->mc_snum >= CURSOR_STACK) { assert(mc->mc_snum < CURSOR_STACK); @@ -4598,18 +4591,11 @@ done: return MDB_SUCCESS; } -/** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function continues a - * search on a cursor that has already been initialized. (Usually by - * #mdb_page_search() but also by #mdb_node_move().) - * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] modify If true, visited pages are updated with new page numbers. - * @return 0 on success, non-zero on failure. +/** Finish #mdb_page_search() / #mdb_page_search_lowest(). + * The cursor is at the root page, set up the rest of it. */ static int -mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) +mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags) { MDB_page *mp = mc->mc_pg[mc->mc_top]; int rc; @@ -4623,11 +4609,10 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) assert(NUMKEYS(mp) > 1); DPRINTF(("found index 0 to page %"Z"u", NODEPGNO(NODEPTR(mp, 0)))); - if (key == NULL) /* Initialize cursor to first page. */ + if (flags & (MDB_PS_FIRST|MDB_PS_LAST)) { i = 0; - else if (key->mv_size > MDB_MAXKEYSIZE && key->mv_data == NULL) { - /* cursor to last page */ - i = NUMKEYS(mp)-1; + if (flags & MDB_PS_LAST) + i = NUMKEYS(mp) - 1; } else { int exact; node = mdb_node_search(mc, key, &exact); @@ -4640,10 +4625,9 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) i--; } } + DPRINTF(("following index %u for key [%s]", i, DKEY(key))); } - if (key) - DPRINTF(("following index %u for key [%s]", i, DKEY(key))); assert(i < NUMKEYS(mp)); node = NODEPTR(mp, i); @@ -4654,7 +4638,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) if ((rc = mdb_cursor_push(mc, mp))) return rc; - if (modify) { + if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc)) != 0) return rc; mp = mc->mc_pg[mc->mc_top]; @@ -4668,7 +4652,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) } DPRINTF(("found leaf page %"Z"u for key [%s]", mp->mp_pgno, - key ? DKEY(key) : NULL)); + key ? DKEY(key) : "null")); mc->mc_flags |= C_INITIALIZED; mc->mc_flags &= ~C_EOF; @@ -4694,18 +4678,17 @@ mdb_page_search_lowest(MDB_cursor *mc) mc->mc_ki[mc->mc_top] = 0; if ((rc = mdb_cursor_push(mc, mp))) return rc; - return mdb_page_search_root(mc, NULL, 0); + return mdb_page_search_root(mc, NULL, MDB_PS_FIRST); } /** Search for the page a given key should be in. - * Pushes parent pages on the cursor stack. This function just sets up - * the search; it finds the root page for \b mc's database and sets this - * as the root of the cursor's stack. Then #mdb_page_search_root() is - * called to complete the search. + * Push it and its parent pages on the cursor stack. * @param[in,out] mc the cursor for this operation. - * @param[in] key the key to search for. If NULL, search for the lowest - * page. (This is used by #mdb_cursor_first().) - * @param[in] flags If MDB_PS_MODIFY set, visited pages are updated with new page numbers. + * @param[in] key the key to search for, or NULL for first/last page. + * @param[in] flags If MDB_PS_MODIFY is set, visited pages in the DB + * are touched (updated with new page numbers). + * If MDB_PS_FIRST or MDB_PS_LAST is set, find first or last leaf. + * This is used by #mdb_cursor_first() and #mdb_cursor_last(). * If MDB_PS_ROOTONLY set, just fetch root node, no further lookups. * @return 0 on success, non-zero on failure. */ @@ -4716,23 +4699,20 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) pgno_t root; /* Make sure the txn is still viable, then find the root from - * the txn's db table. + * the txn's db table and set it as the root of the cursor's stack. */ if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_ERROR)) { DPUTS("transaction has failed, must abort"); return MDB_BAD_TXN; } else { /* Make sure we're using an up-to-date root */ - if (mc->mc_dbi > MAIN_DBI) { - if ((*mc->mc_dbflag & DB_STALE) || - ((flags & MDB_PS_MODIFY) && !(*mc->mc_dbflag & DB_DIRTY))) { + if (*mc->mc_dbflag & DB_STALE) { MDB_cursor mc2; - unsigned char dbflag = 0; mdb_cursor_init(&mc2, mc->mc_txn, MAIN_DBI, NULL); - rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, flags & MDB_PS_MODIFY); + rc = mdb_page_search(&mc2, &mc->mc_dbx->md_name, 0); if (rc) return rc; - if (*mc->mc_dbflag & DB_STALE) { + { MDB_val data; int exact = 0; uint16_t flags; @@ -4752,11 +4732,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) return MDB_INCOMPATIBLE; memcpy(mc->mc_db, data.mv_data, sizeof(MDB_db)); } - if (flags & MDB_PS_MODIFY) - dbflag = DB_DIRTY; *mc->mc_dbflag &= ~DB_STALE; - *mc->mc_dbflag |= dbflag; - } } root = mc->mc_db->md_root; @@ -4774,8 +4750,8 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) mc->mc_snum = 1; mc->mc_top = 0; - DPRINTF(("db %u root page %"Z"u has flags 0x%X", - mc->mc_dbi, root, mc->mc_pg[0]->mp_flags)); + DPRINTF(("db %d root page %"Z"u has flags 0x%X", + DDBI(mc), root, mc->mc_pg[0]->mp_flags)); if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc))) @@ -4914,7 +4890,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, if (txn->mt_flags & MDB_TXN_ERROR) return MDB_BAD_TXN; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -4966,8 +4942,11 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) assert(IS_BRANCH(mc->mc_pg[mc->mc_top])); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL) != 0)) + if ((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0) { + /* mc will be inconsistent if caller does mc_snum++ as above */ + mc->mc_flags &= ~(C_INITIALIZED|C_EOF); return rc; + } mdb_cursor_push(mc, mp); if (!move_right) @@ -5143,7 +5122,8 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, assert(mc); assert(key); - assert(key->mv_size > 0); + if (key->mv_size == 0) + return MDB_BAD_VALSIZE; if (mc->mc_xcursor) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -5231,9 +5211,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (!mc->mc_top) { /* There are no other pages */ mc->mc_ki[mc->mc_top] = 0; - if (op == MDB_SET_RANGE) + if (op == MDB_SET_RANGE) { + rc = 0; goto set1; - else + } else return MDB_NOTFOUND; } } @@ -5298,6 +5279,7 @@ set1: if (rc) { if (op == MDB_GET_BOTH || rc > 0) return MDB_NOTFOUND; + rc = 0; } } else { @@ -5327,7 +5309,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc != MDB_SUCCESS) return rc; } @@ -5373,11 +5355,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data) if (!(mc->mc_flags & C_EOF)) { if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) { - MDB_val lkey; - - lkey.mv_size = MDB_MAXKEYSIZE+1; - lkey.mv_data = NULL; - rc = mdb_page_search(mc, &lkey, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_LAST); if (rc != MDB_SUCCESS) return rc; } @@ -5469,7 +5447,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, case MDB_SET_RANGE: if (key == NULL) { rc = EINVAL; - } else if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + } else if (key->mv_size > MDB_MAXKEYSIZE) { rc = MDB_BAD_VALSIZE; } else if (op == MDB_SET_RANGE) rc = mdb_cursor_set(mc, key, data, op, NULL); @@ -5575,14 +5553,14 @@ fetchm: return rc; } -/** Touch all the pages in the cursor stack. +/** Touch all the pages in the cursor stack. Set mc_top. * Makes sure all the pages are writable, before attempting a write operation. * @param[in] mc The cursor to operate on. */ static int mdb_cursor_touch(MDB_cursor *mc) { - int rc; + int rc = MDB_SUCCESS; if (mc->mc_dbi > MAIN_DBI && !(*mc->mc_dbflag & DB_DIRTY)) { MDB_cursor mc2; @@ -5593,13 +5571,14 @@ mdb_cursor_touch(MDB_cursor *mc) return rc; *mc->mc_dbflag |= DB_DIRTY; } - for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) { - rc = mdb_page_touch(mc); - if (rc) - return rc; + mc->mc_top = 0; + if (mc->mc_snum) { + do { + rc = mdb_page_touch(mc); + } while (!rc && ++(mc->mc_top) < mc->mc_snum); + mc->mc_top = mc->mc_snum-1; } - mc->mc_top = mc->mc_snum-1; - return MDB_SUCCESS; + return rc; } /** Do not spill pages to disk if txn is getting full, may fail instead */ @@ -5650,8 +5629,8 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return MDB_BAD_VALSIZE; #endif - DPRINTF(("==> put db %u key [%s], size %"Z"u, data size %"Z"u", - mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size)); + DPRINTF(("==> put db %d key [%s], size %"Z"u, data size %"Z"u", + DDBI(mc), DKEY(key), key ? key->mv_size : 0, data->mv_size)); dkey.mv_size = 0; @@ -5662,6 +5641,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, } else if (mc->mc_db->md_root == P_INVALID) { /* new database, cursor has nothing to point to */ mc->mc_snum = 0; + mc->mc_top = 0; mc->mc_flags &= ~C_INITIALIZED; rc = MDB_NO_ROOT; } else { @@ -5980,9 +5960,6 @@ new_sub: unsigned i = mc->mc_top; MDB_page *mp = mc->mc_pg[i]; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -6089,7 +6066,6 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) if (!(flags & MDB_NOSPILL) && (rc = mdb_page_spill(mc, NULL, NULL))) return rc; - flags &= ~MDB_NOSPILL; /* TODO: Or change (flags != MDB_NODUPDATA) to ~(flags & MDB_NODUPDATA), not looking at the logic of that code just now */ rc = mdb_cursor_touch(mc); if (rc) @@ -6250,6 +6226,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, { unsigned int i; size_t node_size = NODESIZE; + ssize_t room; indx_t ofs; MDB_node *node; MDB_page *mp = mc->mc_pg[mc->mc_top]; @@ -6262,7 +6239,7 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->mv_size : 0, - key ? key->mv_size : 0, key ? DKEY(key) : NULL)); + key ? key->mv_size : 0, key ? DKEY(key) : "null")); if (IS_LEAF2(mp)) { /* Move higher keys up one slot. */ @@ -6280,9 +6257,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, return MDB_SUCCESS; } + room = (ssize_t)SIZELEFT(mp) - (ssize_t)sizeof(indx_t); if (key != NULL) node_size += key->mv_size; - if (IS_LEAF(mp)) { assert(data); if (F_ISSET(flags, F_BIGDATA)) { @@ -6294,26 +6271,23 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, /* Put data on overflow page. */ DPRINTF(("data size is %"Z"u, node would be %"Z"u, put data on overflow page", data->mv_size, node_size+data->mv_size)); - node_size += sizeof(pgno_t); + node_size += sizeof(pgno_t) + (node_size & 1); + if ((ssize_t)node_size > room) + goto full; if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp))) return rc; DPRINTF(("allocated overflow page %"Z"u", ofp->mp_pgno)); flags |= F_BIGDATA; + goto update; } else { node_size += data->mv_size; } } node_size += node_size & 1; + if ((ssize_t)node_size > room) + goto full; - if (node_size + sizeof(indx_t) > SIZELEFT(mp)) { - DPRINTF(("not enough room in page %"Z"u, got %u ptrs", - mp->mp_pgno, NUMKEYS(mp))); - DPRINTF(("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower, - mp->mp_upper - mp->mp_lower)); - DPRINTF(("node size = %"Z"u", node_size)); - return MDB_PAGE_FULL; - } - +update: /* Move higher pointers up one slot. */ for (i = NUMKEYS(mp); i > indx; i--) mp->mp_ptrs[i] = mp->mp_ptrs[i - 1]; @@ -6359,6 +6333,13 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, } return MDB_SUCCESS; + +full: + DPRINTF(("not enough room in page %"Z"u, got %u ptrs", + mp->mp_pgno, NUMKEYS(mp))); + DPRINTF(("upper-lower = %u - %u = %"Z"d", mp->mp_upper,mp->mp_lower,room)); + DPRINTF(("node size = %"Z"u", node_size)); + return MDB_PAGE_FULL; } /** Delete the specified node from a page. @@ -6493,11 +6474,13 @@ mdb_xcursor_init0(MDB_cursor *mc) mx->mx_cursor.mc_txn = mc->mc_txn; mx->mx_cursor.mc_db = &mx->mx_db; mx->mx_cursor.mc_dbx = &mx->mx_dbx; - mx->mx_cursor.mc_dbi = mc->mc_dbi+1; + mx->mx_cursor.mc_dbi = mc->mc_dbi; mx->mx_cursor.mc_dbflag = &mx->mx_dbflag; mx->mx_cursor.mc_snum = 0; mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; + mx->mx_dbx.md_name.mv_size = 0; + mx->mx_dbx.md_name.mv_data = NULL; mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp; mx->mx_dbx.md_dcmp = NULL; mx->mx_dbx.md_rel = mc->mc_dbx->md_rel; @@ -6518,6 +6501,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db)); mx->mx_cursor.mc_pg[0] = 0; mx->mx_cursor.mc_snum = 0; + mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags = C_SUB; } else { MDB_page *fp = NODEDATA(node); @@ -6530,8 +6514,8 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_entries = NUMKEYS(fp); COPY_PGNO(mx->mx_db.md_root, fp->mp_pgno); mx->mx_cursor.mc_snum = 1; - mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_top = 0; + mx->mx_cursor.mc_flags = C_INITIALIZED|C_SUB; mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_ki[0] = 0; if (mc->mc_db->md_flags & MDB_DUPFIXED) { @@ -6541,12 +6525,9 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_flags |= MDB_INTEGERKEY; } } - DPRINTF(("Sub-db %u for db %u root page %"Z"u", mx->mx_cursor.mc_dbi, mc->mc_dbi, + DPRINTF(("Sub-db -%u root page %"Z"u", mx->mx_cursor.mc_dbi, mx->mx_db.md_root)); - mx->mx_dbflag = DB_VALID | (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY) ? - DB_DIRTY : 0); - mx->mx_dbx.md_name.mv_data = NODEKEY(node); - mx->mx_dbx.md_name.mv_size = node->mn_ksize; + mx->mx_dbflag = DB_VALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ #if UINT_MAX < SIZE_MAX if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) #ifdef MISALIGNED_OK @@ -6862,9 +6843,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = csrc->mc_pg[csrc->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7039,9 +7017,6 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) MDB_dbi dbi = csrc->mc_dbi; MDB_page *mp = cdst->mc_pg[cdst->mc_top]; - if (csrc->mc_flags & C_SUB) - dbi--; - for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (csrc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7140,9 +7115,6 @@ mdb_rebalance(MDB_cursor *mc) MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7172,9 +7144,6 @@ mdb_rebalance(MDB_cursor *mc) MDB_cursor *m2, *m3; MDB_dbi dbi = mc->mc_dbi; - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7248,8 +7217,11 @@ mdb_rebalance(MDB_cursor *mc) else { if (mc->mc_ki[ptop] == 0) rc = mdb_page_merge(&mn, mc); - else + else { + mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; rc = mdb_page_merge(mc, &mn); + mdb_cursor_copy(&mn, mc); + } mc->mc_flags &= ~(C_INITIALIZED|C_EOF); } return rc; @@ -7336,7 +7308,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key->mv_size > MDB_MAXKEYSIZE) { return MDB_BAD_VALSIZE; } @@ -7740,9 +7712,6 @@ done: MDB_dbi dbi = mc->mc_dbi; int fixup = NUMKEYS(mp); - if (mc->mc_flags & C_SUB) - dbi--; - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (mc->mc_flags & C_SUB) m3 = &m2->mc_xcursor->mx_cursor; @@ -7800,13 +7769,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) - return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - - if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return MDB_BAD_VALSIZE; - } - if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags) return EINVAL; @@ -8070,7 +8032,7 @@ mdb_drop0(MDB_cursor *mc, int subs) { int rc; - rc = mdb_page_search(mc, NULL, 0); + rc = mdb_page_search(mc, NULL, MDB_PS_FIRST); if (rc == MDB_SUCCESS) { MDB_txn *txn = mc->mc_txn; MDB_node *ni; @@ -8268,7 +8230,7 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) return 0; } -/* insert pid into list if not already present. +/** Insert pid into list if not already present. * return -1 if already present. */ static int mdb_pid_insert(pid_t *ids, pid_t pid) @@ -8296,7 +8258,7 @@ static int mdb_pid_insert(pid_t *ids, pid_t pid) return -1; } } - + if( val > 0 ) { ++cursor; } @@ -8337,6 +8299,8 @@ int mdb_reader_check(MDB_env *env, int *dead) if (!mdb_reader_pid(env, Pidcheck, pid)) { for (j=i; j 0 ) { ++cursor; } @@ -89,7 +89,7 @@ int mdb_midl_insert( MDB_IDL ids, MDB_ID id ) /* no room */ --ids[0]; return -2; - + } else { /* insert id */ for (i=ids[0]; i>x; i--) @@ -203,7 +203,7 @@ int mdb_midl_append_range( MDB_IDL *idp, MDB_ID id, unsigned n ) /* Quicksort + Insertion sort for small arrays */ #define SMALL 8 -#define SWAP(a,b) { itmp=(a); (a)=(b); (b)=itmp; } +#define MIDL_SWAP(a,b) { itmp=(a); (a)=(b); (b)=itmp; } void mdb_midl_sort( MDB_IDL ids ) @@ -231,15 +231,15 @@ mdb_midl_sort( MDB_IDL ids ) l = istack[jstack--]; } else { k = (l + ir) >> 1; /* Choose median of left, center, right */ - SWAP(ids[k], ids[l+1]); + MIDL_SWAP(ids[k], ids[l+1]); if (ids[l] < ids[ir]) { - SWAP(ids[l], ids[ir]); + MIDL_SWAP(ids[l], ids[ir]); } if (ids[l+1] < ids[ir]) { - SWAP(ids[l+1], ids[ir]); + MIDL_SWAP(ids[l+1], ids[ir]); } if (ids[l] < ids[l+1]) { - SWAP(ids[l], ids[l+1]); + MIDL_SWAP(ids[l], ids[l+1]); } i = l+1; j = ir; @@ -248,7 +248,7 @@ mdb_midl_sort( MDB_IDL ids ) do i++; while(ids[i] > a); do j--; while(ids[j] < a); if (j < i) break; - SWAP(ids[i],ids[j]); + MIDL_SWAP(ids[i],ids[j]); } ids[l+1] = ids[j]; ids[j] = a; diff --git a/libraries/liblmdb/midl.h b/libraries/liblmdb/midl.h index b0bdff3f49..635cd29e0e 100644 --- a/libraries/liblmdb/midl.h +++ b/libraries/liblmdb/midl.h @@ -39,7 +39,7 @@ extern "C" { /** @defgroup idls ID List Management * @{ */ - /** A generic ID number. These were entryIDs in back-bdb. + /** A generic unsigned ID number. These were entryIDs in back-bdb. * Preferably it should have the same size as a pointer. */ typedef size_t MDB_ID;