Merge remote-tracking branch 'origin/mdb.RE/0.9'

This commit is contained in:
Quanah Gibson-Mount 2024-01-20 22:31:58 +00:00
commit dc7a286207
6 changed files with 707 additions and 39 deletions

View file

@ -1,5 +1,11 @@
LMDB 0.9 Change Log LMDB 0.9 Change Log
LMDB 0.9.32 Engineering
ITS#9378 - Add ability to replay log and replay log tool
ITS#10095 - partial revert of ITS#9278. The patch was incorrect and introduced numerous race conditions.
ITS#10125 - mdb_load: fix cursor reinit in Append mode
ITS#10137 - Allow users to define MDB_IDL_LOGN
LMDB 0.9.31 Release (2023/07/10) LMDB 0.9.31 Release (2023/07/10)
ITS#8447 - Fix cursor_put(MDB_CURRENT) on DUPSORT DB with different sized data ITS#8447 - Fix cursor_put(MDB_CURRENT) on DUPSORT DB with different sized data

View file

@ -78,6 +78,7 @@ mtest3: mtest3.o liblmdb.a
mtest4: mtest4.o liblmdb.a mtest4: mtest4.o liblmdb.a
mtest5: mtest5.o liblmdb.a mtest5: mtest5.o liblmdb.a
mtest6: mtest6.o liblmdb.a mtest6: mtest6.o liblmdb.a
mplay: mplay.o liblmdb.a
mdb.o: mdb.c lmdb.h midl.h mdb.o: mdb.c lmdb.h midl.h
$(CC) $(CFLAGS) $(CPPFLAGS) -c mdb.c $(CC) $(CFLAGS) $(CPPFLAGS) -c mdb.c

View file

@ -481,18 +481,26 @@ typedef MDB_ID txnid_t;
#define MDB_DEBUG 0 #define MDB_DEBUG 0
#endif #endif
#define MDB_DBG_INFO 1
#define MDB_DBG_TRACE 2
#if MDB_DEBUG #if MDB_DEBUG
static int mdb_debug; static int mdb_debug = MDB_DBG_TRACE;
static txnid_t mdb_debug_start; static txnid_t mdb_debug_start;
/** Print a debug message with printf formatting. /** Print a debug message with printf formatting.
* Requires double parenthesis around 2 or more args. * Requires double parenthesis around 2 or more args.
*/ */
# define DPRINTF(args) ((void) ((mdb_debug) && DPRINTF0 args)) # define DPRINTF(args) ((void) ((mdb_debug & MDB_DBG_INFO) && DPRINTF0 args))
# define DPRINTF0(fmt, ...) \ # define DPRINTF0(fmt, ...) \
fprintf(stderr, "%s:%d " fmt "\n", mdb_func_, __LINE__, __VA_ARGS__) fprintf(stderr, "%s:%d " fmt "\n", mdb_func_, __LINE__, __VA_ARGS__)
/** Trace info for replaying */
# define MDB_TRACE(args) ((void) ((mdb_debug & MDB_DBG_TRACE) && DPRINTF1 args))
# define DPRINTF1(fmt, ...) \
fprintf(stderr, ">%d:%s: " fmt "\n", getpid(), mdb_func_, __VA_ARGS__)
#else #else
# define DPRINTF(args) ((void) 0) # define DPRINTF(args) ((void) 0)
# define MDB_TRACE(args) ((void) 0)
#endif #endif
/** Print a debug string. /** Print a debug string.
* The string is printed literally, with no format processing. * The string is printed literally, with no format processing.
@ -589,6 +597,11 @@ static txnid_t mdb_debug_start;
* This is used for printing a hex dump of a key's contents. * This is used for printing a hex dump of a key's contents.
*/ */
#define DKBUF char kbuf[DKBUF_MAXKEYSIZE*2+1] #define DKBUF char kbuf[DKBUF_MAXKEYSIZE*2+1]
/** A data value buffer.
* @ingroup debug
* This is used for printing a hex dump of a #MDB_DUPSORT value's contents.
*/
#define DDBUF char dbuf[DKBUF_MAXKEYSIZE*2+1+2]
/** Display a key in hex. /** Display a key in hex.
* @ingroup debug * @ingroup debug
* Invoke a function to display a key in hex. * Invoke a function to display a key in hex.
@ -596,6 +609,7 @@ static txnid_t mdb_debug_start;
#define DKEY(x) mdb_dkey(x, kbuf) #define DKEY(x) mdb_dkey(x, kbuf)
#else #else
#define DKBUF #define DKBUF
#define DDBUF
#define DKEY(x) 0 #define DKEY(x) 0
#endif #endif
@ -1423,6 +1437,9 @@ static int mdb_update_key(MDB_cursor *mc, MDB_val *key);
static void mdb_cursor_pop(MDB_cursor *mc); static void mdb_cursor_pop(MDB_cursor *mc);
static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp); static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp);
static int _mdb_cursor_del(MDB_cursor *mc, unsigned int flags);
static int _mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, unsigned int flags);
static int mdb_cursor_del0(MDB_cursor *mc); static int mdb_cursor_del0(MDB_cursor *mc);
static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned flags); static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, unsigned flags);
static int mdb_cursor_sibling(MDB_cursor *mc, int move_right); static int mdb_cursor_sibling(MDB_cursor *mc, int move_right);
@ -1614,6 +1631,18 @@ mdb_dkey(MDB_val *key, char *buf)
return buf; return buf;
} }
static char *
mdb_dval(MDB_txn *txn, MDB_dbi dbi, MDB_val *data, char *buf)
{
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
mdb_dkey(data, buf+1);
*buf = '[';
strcpy(buf + data->mv_size * 2 + 1, "]");
} else
*buf = '\0';
return buf;
}
static const char * static const char *
mdb_leafnode_type(MDB_node *n) mdb_leafnode_type(MDB_node *n)
{ {
@ -2786,7 +2815,7 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_txnid++; txn->mt_txnid++;
#if MDB_DEBUG #if MDB_DEBUG
if (txn->mt_txnid == mdb_debug_start) if (txn->mt_txnid == mdb_debug_start)
mdb_debug = 1; mdb_debug = MDB_DBG_INFO;
#endif #endif
txn->mt_child = NULL; txn->mt_child = NULL;
txn->mt_loose_pgs = NULL; txn->mt_loose_pgs = NULL;
@ -2945,6 +2974,7 @@ renew:
txn->mt_txnid, (flags & MDB_RDONLY) ? 'r' : 'w', txn->mt_txnid, (flags & MDB_RDONLY) ? 'r' : 'w',
(void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root)); (void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root));
} }
MDB_TRACE(("%p, %p, %u = %p", env, parent, flags, txn));
return rc; return rc;
} }
@ -3080,18 +3110,25 @@ mdb_txn_reset(MDB_txn *txn)
mdb_txn_end(txn, MDB_END_RESET); mdb_txn_end(txn, MDB_END_RESET);
} }
void static void
mdb_txn_abort(MDB_txn *txn) _mdb_txn_abort(MDB_txn *txn)
{ {
if (txn == NULL) if (txn == NULL)
return; return;
if (txn->mt_child) if (txn->mt_child)
mdb_txn_abort(txn->mt_child); _mdb_txn_abort(txn->mt_child);
mdb_txn_end(txn, MDB_END_ABORT|MDB_END_SLOT|MDB_END_FREE); mdb_txn_end(txn, MDB_END_ABORT|MDB_END_SLOT|MDB_END_FREE);
} }
void
mdb_txn_abort(MDB_txn *txn)
{
MDB_TRACE(("%p", txn));
_mdb_txn_abort(txn);
}
/** Save the freelist as of this transaction to the freeDB. /** Save the freelist as of this transaction to the freeDB.
* This changes the freelist. Keep trying until it stabilizes. * This changes the freelist. Keep trying until it stabilizes.
*/ */
@ -3182,7 +3219,7 @@ mdb_freelist_save(MDB_txn *txn)
pglast = head_id = *(txnid_t *)key.mv_data; pglast = head_id = *(txnid_t *)key.mv_data;
total_room = head_room = 0; total_room = head_room = 0;
mdb_tassert(txn, pglast <= env->me_pglast); mdb_tassert(txn, pglast <= env->me_pglast);
rc = mdb_cursor_del(&mc, 0); rc = _mdb_cursor_del(&mc, 0);
if (rc) if (rc)
return rc; return rc;
} }
@ -3202,7 +3239,7 @@ mdb_freelist_save(MDB_txn *txn)
do { do {
freecnt = free_pgs[0]; freecnt = free_pgs[0];
data.mv_size = MDB_IDL_SIZEOF(free_pgs); data.mv_size = MDB_IDL_SIZEOF(free_pgs);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE); rc = _mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc) if (rc)
return rc; return rc;
/* Retry if mt_free_pgs[] grew during the Put() */ /* Retry if mt_free_pgs[] grew during the Put() */
@ -3251,7 +3288,7 @@ mdb_freelist_save(MDB_txn *txn)
key.mv_size = sizeof(head_id); key.mv_size = sizeof(head_id);
key.mv_data = &head_id; key.mv_data = &head_id;
data.mv_size = (head_room + 1) * sizeof(pgno_t); data.mv_size = (head_room + 1) * sizeof(pgno_t);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE); rc = _mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc) if (rc)
return rc; return rc;
/* IDL is initially empty, zero out at least the length */ /* IDL is initially empty, zero out at least the length */
@ -3306,7 +3343,7 @@ mdb_freelist_save(MDB_txn *txn)
data.mv_data = mop -= len; data.mv_data = mop -= len;
save = mop[0]; save = mop[0];
mop[0] = len; mop[0] = len;
rc = mdb_cursor_put(&mc, &key, &data, MDB_CURRENT); rc = _mdb_cursor_put(&mc, &key, &data, MDB_CURRENT);
mop[0] = save; mop[0] = save;
if (rc || !(mop_len -= len)) if (rc || !(mop_len -= len))
break; break;
@ -3467,8 +3504,8 @@ done:
return MDB_SUCCESS; return MDB_SUCCESS;
} }
int static int
mdb_txn_commit(MDB_txn *txn) _mdb_txn_commit(MDB_txn *txn)
{ {
int rc; int rc;
unsigned int i, end_mode; unsigned int i, end_mode;
@ -3481,7 +3518,7 @@ mdb_txn_commit(MDB_txn *txn)
end_mode = MDB_END_EMPTY_COMMIT|MDB_END_UPDATE|MDB_END_SLOT|MDB_END_FREE; end_mode = MDB_END_EMPTY_COMMIT|MDB_END_UPDATE|MDB_END_SLOT|MDB_END_FREE;
if (txn->mt_child) { if (txn->mt_child) {
rc = mdb_txn_commit(txn->mt_child); rc = _mdb_txn_commit(txn->mt_child);
if (rc) if (rc)
goto fail; goto fail;
} }
@ -3661,7 +3698,7 @@ mdb_txn_commit(MDB_txn *txn)
goto fail; goto fail;
} }
data.mv_data = &txn->mt_dbs[i]; data.mv_data = &txn->mt_dbs[i];
rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, rc = _mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data,
F_SUBDATA); F_SUBDATA);
if (rc) if (rc)
goto fail; goto fail;
@ -3692,10 +3729,17 @@ done:
return MDB_SUCCESS; return MDB_SUCCESS;
fail: fail:
mdb_txn_abort(txn); _mdb_txn_abort(txn);
return rc; return rc;
} }
int
mdb_txn_commit(MDB_txn *txn)
{
MDB_TRACE(("%p", txn));
return _mdb_txn_commit(txn);
}
/** Read the environment parameters of a DB environment before /** Read the environment parameters of a DB environment before
* mapping it into memory. * mapping it into memory.
* @param[in] env the environment handle * @param[in] env the environment handle
@ -3992,6 +4036,7 @@ mdb_env_create(MDB_env **env)
GET_PAGESIZE(e->me_os_psize); GET_PAGESIZE(e->me_os_psize);
VGMEMP_CREATE(e,0,0); VGMEMP_CREATE(e,0,0);
*env = e; *env = e;
MDB_TRACE(("%p", e));
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -4115,6 +4160,7 @@ mdb_env_set_mapsize(MDB_env *env, size_t size)
env->me_mapsize = size; env->me_mapsize = size;
if (env->me_psize) if (env->me_psize)
env->me_maxpg = env->me_mapsize / env->me_psize; env->me_maxpg = env->me_mapsize / env->me_psize;
MDB_TRACE(("%p, %"Yu"", env, size));
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -4124,6 +4170,7 @@ mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs)
if (env->me_map) if (env->me_map)
return EINVAL; return EINVAL;
env->me_maxdbs = dbs + CORE_DBS; env->me_maxdbs = dbs + CORE_DBS;
MDB_TRACE(("%p, %u", env, dbs));
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -4133,6 +4180,7 @@ mdb_env_set_maxreaders(MDB_env *env, unsigned int readers)
if (env->me_map || readers < 1) if (env->me_map || readers < 1)
return EINVAL; return EINVAL;
env->me_maxreaders = readers; env->me_maxreaders = readers;
MDB_TRACE(("%p, %u", env, readers));
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -5076,6 +5124,7 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
} }
leave: leave:
MDB_TRACE(("%p, %s, %u, %04o", env, path, flags & (CHANGEABLE|CHANGELESS), mode));
if (rc) { if (rc) {
mdb_env_close0(env, excl); mdb_env_close0(env, excl);
} }
@ -5162,17 +5211,6 @@ mdb_env_close0(MDB_env *env, int excl)
sem_unlink(env->me_txns->mti_wmname); sem_unlink(env->me_txns->mti_wmname);
} }
} }
#elif defined(MDB_ROBUST_SUPPORTED)
/* If we have the filelock: If we are the
* only remaining user, clean up robust
* mutexes.
*/
if (excl == 0)
mdb_env_excl_lock(env, &excl);
if (excl > 0) {
pthread_mutex_destroy(env->me_txns->mti_rmutex);
pthread_mutex_destroy(env->me_txns->mti_wmutex);
}
#endif #endif
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo)); munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
} }
@ -5199,6 +5237,7 @@ mdb_env_close(MDB_env *env)
if (env == NULL) if (env == NULL)
return; return;
MDB_TRACE(("%p", env));
VGMEMP_DESTROY(env); VGMEMP_DESTROY(env);
while ((dp = env->me_dpages) != NULL) { while ((dp = env->me_dpages) != NULL) {
VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next)); VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next));
@ -6570,8 +6609,8 @@ mdb_cursor_touch(MDB_cursor *mc)
/** Do not spill pages to disk if txn is getting full, may fail instead */ /** Do not spill pages to disk if txn is getting full, may fail instead */
#define MDB_NOSPILL 0x8000 #define MDB_NOSPILL 0x8000
int static int
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, _mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags) unsigned int flags)
{ {
MDB_env *env; MDB_env *env;
@ -7034,7 +7073,7 @@ put_sub:
new_dupdata = (int)dkey.mv_size; new_dupdata = (int)dkey.mv_size;
/* converted, write the original data first */ /* converted, write the original data first */
if (dkey.mv_size) { if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); rc = _mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc) if (rc)
goto bad_sub; goto bad_sub;
/* we've done our job */ /* we've done our job */
@ -7062,7 +7101,7 @@ put_sub:
ecount = mc->mc_xcursor->mx_db.md_entries; ecount = mc->mc_xcursor->mx_db.md_entries;
if (flags & MDB_APPENDDUP) if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND; xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); rc = _mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) { if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf); void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
@ -7103,7 +7142,20 @@ bad_sub:
} }
int int
mdb_cursor_del(MDB_cursor *mc, unsigned int flags) mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags)
{
DKBUF;
DDBUF;
int rc = _mdb_cursor_put(mc, key, data, flags);
MDB_TRACE(("%p, %"Z"u[%s], %"Z"u%s, %u",
mc, key ? key->mv_size:0, DKEY(key), data ? data->mv_size:0,
data ? mdb_dval(mc->mc_txn, mc->mc_dbi, data, dbuf):"", flags));
return rc;
}
static int
_mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
{ {
MDB_node *leaf; MDB_node *leaf;
MDB_page *mp; MDB_page *mp;
@ -7141,7 +7193,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
} }
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL); rc = _mdb_cursor_del(&mc->mc_xcursor->mx_cursor, MDB_NOSPILL);
if (rc) if (rc)
return rc; return rc;
/* If sub-DB still has entries, we're done */ /* If sub-DB still has entries, we're done */
@ -7205,6 +7257,14 @@ fail:
return rc; return rc;
} }
int
mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
{
MDB_TRACE(("%p, %u",
mc, flags));
return _mdb_cursor_del(mc, flags);
}
/** Allocate and initialize new pages for a database. /** Allocate and initialize new pages for a database.
* Set #MDB_TXN_ERROR on failure. * Set #MDB_TXN_ERROR on failure.
* @param[in] mc a cursor on the database being added to. * @param[in] mc a cursor on the database being added to.
@ -7698,6 +7758,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
return ENOMEM; return ENOMEM;
} }
MDB_TRACE(("%p, %u = %p", txn, dbi, mc));
*ret = mc; *ret = mc;
return MDB_SUCCESS; return MDB_SUCCESS;
@ -7761,6 +7822,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
void void
mdb_cursor_close(MDB_cursor *mc) mdb_cursor_close(MDB_cursor *mc)
{ {
MDB_TRACE(("%p", mc));
if (mc && !mc->mc_backup) { if (mc && !mc->mc_backup) {
/* remove from txn, if tracked */ /* remove from txn, if tracked */
if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) { if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
@ -8570,6 +8632,8 @@ int
mdb_del(MDB_txn *txn, MDB_dbi dbi, mdb_del(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data) MDB_val *key, MDB_val *data)
{ {
DKBUF;
DDBUF;
if (!key || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID)) if (!key || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL; return EINVAL;
@ -8581,6 +8645,9 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
data = NULL; data = NULL;
} }
MDB_TRACE(("%p, %u, %"Z"u[%s], %"Z"u%s",
txn, dbi, key ? key->mv_size:0, DKEY(key), data ? data->mv_size:0,
data ? mdb_dval(txn, dbi, data, dbuf):""));
return mdb_del0(txn, dbi, key, data, 0); return mdb_del0(txn, dbi, key, data, 0);
} }
@ -8621,7 +8688,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
mc.mc_flags |= C_UNTRACK; mc.mc_flags |= C_UNTRACK;
mc.mc_next = txn->mt_cursors[dbi]; mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc; txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, flags); rc = _mdb_cursor_del(&mc, flags);
txn->mt_cursors[dbi] = mc.mc_next; txn->mt_cursors[dbi] = mc.mc_next;
} }
return rc; return rc;
@ -9063,6 +9130,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
MDB_cursor mc; MDB_cursor mc;
MDB_xcursor mx; MDB_xcursor mx;
int rc; int rc;
DKBUF;
DDBUF;
if (!key || !data || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID)) if (!key || !data || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL; return EINVAL;
@ -9073,10 +9142,12 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)) if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED))
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
MDB_TRACE(("%p, %u, %"Z"u[%s], %"Z"u%s, %u",
txn, dbi, key ? key->mv_size:0, DKEY(key), data->mv_size, mdb_dval(txn, dbi, data, dbuf), flags));
mdb_cursor_init(&mc, txn, dbi, &mx); mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi]; mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc; txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_put(&mc, key, data, flags); rc = _mdb_cursor_put(&mc, key, data, flags);
txn->mt_cursors[dbi] = mc.mc_next; txn->mt_cursors[dbi] = mc.mc_next;
return rc; return rc;
} }
@ -9479,7 +9550,7 @@ finish:
my.mc_error = rc; my.mc_error = rc;
mdb_env_cthr_toggle(&my, 1 | MDB_EOF); mdb_env_cthr_toggle(&my, 1 | MDB_EOF);
rc = THREAD_FINISH(thr); rc = THREAD_FINISH(thr);
mdb_txn_abort(txn); _mdb_txn_abort(txn);
done: done:
#ifdef _WIN32 #ifdef _WIN32
@ -9591,7 +9662,7 @@ mdb_env_copyfd0(MDB_env *env, HANDLE fd)
} }
leave: leave:
mdb_txn_abort(txn); _mdb_txn_abort(txn);
return rc; return rc;
} }
@ -9806,6 +9877,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
} }
} }
mdb_default_cmp(txn, MAIN_DBI); mdb_default_cmp(txn, MAIN_DBI);
MDB_TRACE(("%p, (null), %u = %u", txn, flags, MAIN_DBI));
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -9867,7 +9939,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
dummy.md_root = P_INVALID; dummy.md_root = P_INVALID;
dummy.md_flags = flags & PERSISTENT_FLAGS; dummy.md_flags = flags & PERSISTENT_FLAGS;
WITH_CURSOR_TRACKING(mc, WITH_CURSOR_TRACKING(mc,
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA)); rc = _mdb_cursor_put(&mc, &key, &data, F_SUBDATA));
dbflag |= DB_DIRTY; dbflag |= DB_DIRTY;
} }
@ -9892,6 +9964,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db
if (!unused) { if (!unused) {
txn->mt_numdbs++; txn->mt_numdbs++;
} }
MDB_TRACE(("%p, %s, %u = %u", txn, name, flags, slot));
} }
return rc; return rc;
@ -9923,6 +9996,7 @@ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi)
ptr = env->me_dbxs[dbi].md_name.mv_data; ptr = env->me_dbxs[dbi].md_name.mv_data;
/* If there was no name, this was already closed */ /* If there was no name, this was already closed */
if (ptr) { if (ptr) {
MDB_TRACE(("%p, %u", env, dbi));
env->me_dbxs[dbi].md_name.mv_data = NULL; env->me_dbxs[dbi].md_name.mv_data = NULL;
env->me_dbxs[dbi].md_name.mv_size = 0; env->me_dbxs[dbi].md_name.mv_size = 0;
env->me_dbflags[dbi] = 0; env->me_dbflags[dbi] = 0;
@ -10057,6 +10131,7 @@ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del)
if (rc) if (rc)
return rc; return rc;
MDB_TRACE(("%u, %d", dbi, del));
rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT); rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT);
/* Invalidate the dropped DB's cursors */ /* Invalidate the dropped DB's cursors */
for (m2 = txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) for (m2 = txn->mt_cursors[dbi]; m2; m2 = m2->mc_next)

View file

@ -449,7 +449,7 @@ int main(int argc, char *argv[])
if (rc == MDB_KEYEXIST && putflags) if (rc == MDB_KEYEXIST && putflags)
continue; continue;
if (rc) { if (rc) {
fprintf(stderr, "mdb_cursor_put failed, error %d %s\n", rc, mdb_strerror(rc)); fprintf(stderr, "%s: line %" Z "d: mdb_cursor_put failed, error %d %s\n", prog, lineno, rc, mdb_strerror(rc));
goto txn_abort; goto txn_abort;
} }
batch++; batch++;
@ -470,9 +470,11 @@ int main(int argc, char *argv[])
fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc)); fprintf(stderr, "mdb_cursor_open failed, error %d %s\n", rc, mdb_strerror(rc));
goto txn_abort; goto txn_abort;
} }
if (appflag & MDB_APPENDDUP) { if (append) {
MDB_val k, d; MDB_val k, d;
mdb_cursor_get(mc, &k, &d, MDB_LAST); mdb_cursor_get(mc, &k, &d, MDB_LAST);
memcpy(prevk.mv_data, k.mv_data, k.mv_size);
prevk.mv_size = k.mv_size;
} }
batch = 0; batch = 0;
} }

View file

@ -56,7 +56,9 @@ typedef MDB_ID *MDB_IDL;
/* IDL sizes - likely should be even bigger /* IDL sizes - likely should be even bigger
* limiting factors: sizeof(ID), thread stack size * limiting factors: sizeof(ID), thread stack size
*/ */
#ifndef MDB_IDL_LOGN
#define MDB_IDL_LOGN 16 /* DB_SIZE is 2^16, UM_SIZE is 2^17 */ #define MDB_IDL_LOGN 16 /* DB_SIZE is 2^16, UM_SIZE is 2^17 */
#endif
#define MDB_IDL_DB_SIZE (1<<MDB_IDL_LOGN) #define MDB_IDL_DB_SIZE (1<<MDB_IDL_LOGN)
#define MDB_IDL_UM_SIZE (1<<(MDB_IDL_LOGN+1)) #define MDB_IDL_UM_SIZE (1<<(MDB_IDL_LOGN+1))

582
libraries/liblmdb/mplay.c Normal file
View file

@ -0,0 +1,582 @@
/* mplay.c - memory-mapped database log replay */
/*
* Copyright 2011-2023 Howard Chu, Symas Corp.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in the file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "lmdb.h"
#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
"%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))
#define MDB_SCNy(t) "z" #t
#define SCMP(s) s, (sizeof(s)-1)
char inbuf[8192];
char *dbuf, *kbuf;
size_t dbufsize;
int maxkey;
#define SOFF(s) (sizeof(s)+1)
#define MAXENVS 16
#define MAXTXNS 16
#define MAXCRSS 16
#define MAXPIDS 16
typedef struct crspair {
void *tcrs; /* scanned text pointer */
MDB_cursor *rcrs;
} crspair;
typedef struct txnpair {
void *ttxn; /* scanned text pointer */
MDB_txn *rtxn;
crspair cursors[MAXCRSS];
int ncursors;
} txnpair;
typedef struct envpair {
void *tenv;
MDB_env *renv;
txnpair txns[MAXTXNS];
int ntxns;
} envpair;
envpair envs[MAXENVS];
int nenvs;
envpair *lastenv;
txnpair *lasttxn;
crspair *lastcrs;
typedef struct pidpair {
int tpid;
pid_t rpid;
int fdout, fdin;
} pidpair;
pidpair *lastpid;
pidpair pids[MAXPIDS];
int npids;
unsigned long lcount;
static int unhex(unsigned char *c2)
{
int x, c;
x = *c2++ & 0x4f;
if (x & 0x40)
x -= 55;
c = x << 4;
x = *c2 & 0x4f;
if (x & 0x40)
x -= 55;
c |= x;
return c;
}
int inhex(char *in, char *out)
{
char *c2 = out;
while (isxdigit(*in)) {
*c2++ = unhex((unsigned char *)in);
in += 2;
}
return c2 - out;
}
static void addenv(void *tenv, MDB_env *renv)
{
assert(nenvs < MAXENVS);
envs[nenvs].tenv = tenv;
envs[nenvs].renv = renv;
envs[nenvs].ntxns = 0;
lastenv = envs+nenvs;
nenvs++;
}
static envpair *findenv(void *tenv)
{
int i;
if (!lastenv || lastenv->tenv != tenv) {
for (i=0; i<nenvs; i++)
if (envs[i].tenv == tenv)
break;
assert(i < nenvs);
lastenv = &envs[i];
}
return lastenv;
}
static void delenv(envpair *ep)
{
int i = ep - envs;
for (; i<nenvs-1; i++)
envs[i] = envs[i+1];
nenvs--;
lastenv = NULL;
}
static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn)
{
envpair *ep;
txnpair *tp;
ep = findenv(tenv);
assert(ep->ntxns < MAXTXNS);
tp = ep->txns+ep->ntxns;
tp->ttxn = ttxn;
tp->rtxn = rtxn;
tp->ncursors = 0;
ep->ntxns++;
lasttxn = tp;
}
static txnpair *findtxn(void *ttxn)
{
int i, j;
if (lasttxn && lasttxn->ttxn == ttxn)
return lasttxn;
if (lastenv) {
for (i=0; i<lastenv->ntxns; i++) {
if (lastenv->txns[i].ttxn == ttxn) {
lasttxn = lastenv->txns+i;
return lasttxn;
}
}
}
for (i=0; i<nenvs; i++) {
if (envs+i == lastenv) continue;
for (j=0; j<envs[i].ntxns; j++) {
if (envs[i].txns[j].ttxn == ttxn) {
lastenv = envs+i;
lasttxn = envs[i].txns+j;
return lasttxn;
}
}
}
assert(0); /* should have found it */
}
static void deltxn(txnpair *tp)
{
int i = tp - lastenv->txns;
for (; i<lastenv->ntxns-1; i++)
lastenv->txns[i] = lastenv->txns[i+1];
lastenv->ntxns--;
lasttxn = NULL;
}
static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs)
{
int j = tp->ncursors;
assert(tp->ncursors < MAXCRSS);
tp->cursors[j].tcrs = tcrs;
tp->cursors[j].rcrs = rcrs;
tp->ncursors++;
lastcrs = tp->cursors+j;
}
static crspair *findcrs(void *tcrs)
{
int i, j, k;
envpair *ep;
txnpair *tp;
crspair *cp;
if (lastcrs && lastcrs->tcrs == tcrs)
return lastcrs;
if (lasttxn) {
for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
return lastcrs;
}
}
}
if (lastenv) {
for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){
if (tp == lasttxn)
continue;
for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
lasttxn = tp;
return lastcrs;
}
}
}
}
for (i=0, ep=envs; i<nenvs; i++, ep++) {
for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) {
if (tp == lasttxn)
continue;
for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
if (cp->tcrs == tcrs) {
lastcrs = cp;
lasttxn = tp;
lastenv = ep;
return lastcrs;
}
}
}
}
assert(0); /* should have found it already */
}
static void delcrs(void *tcrs)
{
int i;
crspair *cp = findcrs(tcrs);
mdb_cursor_close(cp->rcrs);
for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++)
lasttxn->cursors[i] = lasttxn->cursors[i+1];
lasttxn->ncursors--;
lastcrs = NULL;
}
void child()
{
int rc;
MDB_val key, data;
char *ptr;
while (fgets(inbuf, sizeof(inbuf), stdin)) {
ptr = inbuf;
if (!strncmp(ptr, SCMP("exit")))
break;
if (!strncmp(ptr, SCMP("mdb_env_create"))) {
void *tenv;
MDB_env *renv;
sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv);
E(mdb_env_create(&renv));
addenv(tenv, renv);
} else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) {
void *tenv;
envpair *ep;
unsigned int maxdbs;
sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs);
ep = findenv(tenv);
E(mdb_env_set_maxdbs(ep->renv, maxdbs));
} else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) {
void *tenv;
envpair *ep;
size_t mapsize;
sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize);
ep = findenv(tenv);
E(mdb_env_set_mapsize(ep->renv, mapsize));
} else if (!strncmp(ptr, SCMP("mdb_env_open"))) {
void *tenv;
envpair *ep;
char *path;
int len;
unsigned int flags, mode;
sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len);
path = ptr+SOFF("mdb_env_open")+len;
ptr = strchr(path, ',');
*ptr++ = '\0';
sscanf(ptr, "%u, %o", &flags, &mode);
ep = findenv(tenv);
E(mdb_env_open(ep->renv, path, flags, mode));
if (!maxkey) {
maxkey = mdb_env_get_maxkeysize(ep->renv);
kbuf = malloc(maxkey+2);
dbuf = malloc(maxkey+2);
dbufsize = maxkey;
}
} else if (!strncmp(ptr, SCMP("mdb_env_close"))) {
void *tenv;
envpair *ep;
sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv);
ep = findenv(tenv);
mdb_env_close(ep->renv);
delenv(ep);
if (!nenvs) /* if no other envs left, this process is done */
break;
} else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) {
unsigned int flags;
void *tenv, *ttxn;
envpair *ep;
MDB_txn *rtxn;
sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn);
ep = findenv(tenv);
E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn));
addtxn(tenv, ttxn, rtxn);
} else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) {
void *ttxn;
txnpair *tp;
sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn);
tp = findtxn(ttxn);
E(mdb_txn_commit(tp->rtxn));
deltxn(tp);
} else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) {
void *ttxn;
txnpair *tp;
sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn);
tp = findtxn(ttxn);
mdb_txn_abort(tp->rtxn);
deltxn(tp);
} else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) {
void *ttxn;
txnpair *tp;
char *dbname;
unsigned int flags;
unsigned int tdbi;
MDB_dbi dbi;
sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn);
dbname = strchr(ptr+SOFF("mdb_dbi_open"), ',');
dbname += 2;
ptr = strchr(dbname, ',');
*ptr++ = '\0';
if (!strcmp(dbname, "(null)"))
dbname = NULL;
sscanf(ptr, "%u = %u", &flags, &tdbi);
tp = findtxn(ttxn);
E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi));
} else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) {
void *tenv;
envpair *ep;
unsigned int tdbi;
sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi);
ep = findenv(tenv);
mdb_dbi_close(ep->renv, tdbi);
} else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) {
void *ttxn, *tcrs;
txnpair *tp;
MDB_cursor *rcrs;
unsigned int tdbi;
sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs);
tp = findtxn(ttxn);
E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs));
addcrs(tp, tcrs, rcrs);
} else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) {
void *tcrs;
sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs);
delcrs(tcrs);
} else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) {
void *tcrs;
crspair *cp;
unsigned int flags;
int len;
sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs);
cp = findcrs(tcrs);
ptr = strchr(ptr+SOFF("mdb_cursor_put"), ',');
sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size);
if (key.mv_size) {
ptr = strchr(ptr, '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
}
ptr = strchr(ptr+1, ',');
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
data.mv_data = dbuf;
ptr += data.mv_size * 2 + 2;
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn));
}
sscanf(ptr+1, "%u", &flags);
E(mdb_cursor_put(cp->rcrs, &key, &data, flags));
} else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) {
void *tcrs;
crspair *cp;
unsigned int flags;
sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags);
cp = findcrs(tcrs);
E(mdb_cursor_del(cp->rcrs, flags));
} else if (!strncmp(ptr, SCMP("mdb_put"))) {
void *ttxn;
txnpair *tp;
unsigned int tdbi, flags;
int len;
sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
tp = findtxn(ttxn);
ptr = strchr(ptr+SOFF("mdb_put"), '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
ptr += data.mv_size * 2 + 2;
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
}
data.mv_data = dbuf;
sscanf(ptr+1, "%u", &flags);
RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags));
} else if (!strncmp(ptr, SCMP("mdb_del"))) {
void *ttxn;
txnpair *tp;
unsigned int tdbi;
int len;
sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
tp = findtxn(ttxn);
ptr = strchr(ptr+SOFF("mdb_del"), '[');
inhex(ptr+1, kbuf);
key.mv_data = kbuf;
ptr += key.mv_size * 2 + 2;
sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
if (data.mv_size > dbufsize) {
dbuf = realloc(dbuf, data.mv_size+2);
assert(dbuf != NULL);
dbufsize = data.mv_size;
}
ptr += len+1;
if (*ptr == '[') {
inhex(ptr+1, dbuf);
} else {
sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
}
data.mv_data = dbuf;
RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data));
}
write(1, "\n", 1);
}
exit(0);
}
static pidpair *addpid(int tpid)
{
int fdout[2], fdin[2];
pid_t pid;
assert(npids < MAXPIDS);
pids[npids].tpid = tpid;
pipe(fdout);
pipe(fdin);
if ((pid = fork()) == 0) {
/* child */
fclose(stdin);
fclose(stdout);
dup2(fdout[0], 0);
dup2(fdin[1], 1);
stdin = fdopen(0, "r");
stdout = fdopen(1, "w");
child();
return 0; /* NOTREACHED */
} else {
pids[npids].rpid = pid;
pids[npids].fdout = fdout[1];
pids[npids].fdin = fdin[0];
lastpid = pids+npids;
npids++;
return lastpid;
}
}
static pidpair *findpid(int tpid)
{
int i;
if (!lastpid || lastpid->tpid != tpid) {
for (i=0; i<npids; i++)
if (pids[i].tpid == tpid)
break;
if (i == npids)
return NULL;
lastpid = &pids[i];
}
return lastpid;
}
volatile pid_t killpid;
static void delpid(int tpid)
{
pidpair *pp = findpid(tpid);
if (pp) {
pid_t kpid = pp->rpid;
killpid = kpid;
write(pp->fdout, "exit\n", sizeof("exit"));
while (killpid == kpid)
usleep(10000);
}
}
static void reaper(int sig)
{
int status, i;
pid_t pid = waitpid(-1, &status, 0);
if (pid > 0) {
fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid);
for (i=0; i<npids; i++)
if (pids[i].rpid == pid)
break;
assert(i < npids);
close(pids[i].fdout);
close(pids[i].fdin);
for (;i<npids-1; i++)
pids[i] = pids[i+1];
npids--;
killpid = 0;
}
}
int main(int argc,char * argv[])
{
signal(SIGCHLD, reaper);
while (fgets(inbuf, sizeof(inbuf), stdin)) {
pidpair *pp;
int tpid, len;
char c, *ptr;
lcount++;
if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) {
sscanf(inbuf+SOFF("killed"),"%d", &tpid);
delpid(tpid);
continue;
}
if (inbuf[0] != '>')
continue;
ptr = inbuf+1;
sscanf(ptr, "%d:%n", &tpid, &len);
pp = findpid(tpid);
if (!pp)
pp = addpid(tpid); /* new process */
ptr = inbuf+len+1;
len = strlen(ptr);
write(pp->fdout, ptr, len); /* send command and wait for ack */
read(pp->fdin, &c, 1);
}
while (npids)
delpid(pids[0].tpid);
}