mirror of
https://github.com/postgres/postgres.git
synced 2026-05-28 04:35:45 -04:00
Revert "Allow logical replication snapshots to be database-specific"
This reverts commit 0d3dba38c7, which was determined to have
fundamental flaws. This restricts REPACK (CONCURRENTLY) so that only
one process can run it concurrently on different tables and even on
different databases; we'll lift that restriction in another way during
the next development cycle.
Reported-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1Jg21ODQ7fS2fvN5W_S5kDRhAP5inj3XMRQaa=s-GbYhw@mail.gmail.com
This commit is contained in:
parent
2c4bd2bf57
commit
01a80f0621
21 changed files with 17 additions and 159 deletions
|
|
@ -621,7 +621,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
|
|||
else if (rel == NULL || rel->rd_rel->relisshared)
|
||||
{
|
||||
/* Shared relation: take into account all running xids */
|
||||
runningTransactions = GetRunningTransactionData(InvalidOid);
|
||||
runningTransactions = GetRunningTransactionData();
|
||||
LWLockRelease(ProcArrayLock);
|
||||
LWLockRelease(XidGenLock);
|
||||
return runningTransactions->oldestRunningXid;
|
||||
|
|
@ -632,7 +632,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
|
|||
* Normal relation: take into account xids running within the current
|
||||
* database
|
||||
*/
|
||||
runningTransactions = GetRunningTransactionData(InvalidOid);
|
||||
runningTransactions = GetRunningTransactionData();
|
||||
LWLockRelease(ProcArrayLock);
|
||||
LWLockRelease(XidGenLock);
|
||||
return runningTransactions->oldestDatabaseRunningXid;
|
||||
|
|
|
|||
|
|
@ -959,7 +959,6 @@ typedef struct OutputPluginOptions
|
|||
{
|
||||
OutputPluginOutputType output_type;
|
||||
bool receive_rewrites;
|
||||
bool need_shared_catalogs;
|
||||
} OutputPluginOptions;
|
||||
</programlisting>
|
||||
<literal>output_type</literal> has to either be set to
|
||||
|
|
@ -970,9 +969,6 @@ typedef struct OutputPluginOptions
|
|||
also be called for changes made by heap rewrites during certain DDL
|
||||
operations. These are of interest to plugins that handle DDL
|
||||
replication, but they require special handling.
|
||||
<literal>need_shared_catalogs</literal> can be set to false if you are
|
||||
certain the plugin functions do not access shared system catalogs.
|
||||
Doing so can speed up creation of replication slots that use this plugin.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
|
|
|
|||
|
|
@ -394,14 +394,6 @@ systable_beginscan(Relation heapRelation,
|
|||
SysScanDesc sysscan;
|
||||
Relation irel;
|
||||
|
||||
/*
|
||||
* If this backend promised that it won't access shared catalogs during
|
||||
* logical decoding, this it the right place to verify.
|
||||
*/
|
||||
Assert(!HistoricSnapshotActive() ||
|
||||
accessSharedCatalogsInDecoding ||
|
||||
!heapRelation->rd_rel->relisshared);
|
||||
|
||||
if (indexOK &&
|
||||
!IgnoreSystemIndexes &&
|
||||
!ReindexIsProcessingIndex(indexId))
|
||||
|
|
|
|||
|
|
@ -41,8 +41,6 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
|
|||
for (i = 0; i < xlrec->subxcnt; i++)
|
||||
appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
|
||||
}
|
||||
|
||||
appendStringInfo(buf, "; dbid: %u", xlrec->dbid);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
|||
|
|
@ -7735,7 +7735,7 @@ CreateCheckPoint(int flags)
|
|||
* recovery we don't need to write running xact data.
|
||||
*/
|
||||
if (!shutdown && XLogStandbyInfoActive())
|
||||
LogStandbySnapshot(InvalidOid);
|
||||
LogStandbySnapshot();
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
||||
|
|
|
|||
|
|
@ -245,7 +245,7 @@ pg_log_standby_snapshot(PG_FUNCTION_ARGS)
|
|||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_log_standby_snapshot() can only be used if \"wal_level\" >= \"replica\"")));
|
||||
|
||||
recptr = LogStandbySnapshot(InvalidOid);
|
||||
recptr = LogStandbySnapshot();
|
||||
|
||||
/*
|
||||
* As a convenience, return the WAL location of the last inserted record
|
||||
|
|
|
|||
|
|
@ -289,7 +289,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
|
|||
if (now >= timeout &&
|
||||
last_snapshot_lsn <= GetLastImportantRecPtr())
|
||||
{
|
||||
last_snapshot_lsn = LogStandbySnapshot(InvalidOid);
|
||||
last_snapshot_lsn = LogStandbySnapshot();
|
||||
last_snapshot_ts = now;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -386,12 +386,8 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
* Update this decoder's idea of transactions currently
|
||||
* running. In doing so we will determine whether we have
|
||||
* reached consistent status.
|
||||
*
|
||||
* If the output plugin doesn't need access to shared
|
||||
* catalogs, we can ignore transactions in other databases.
|
||||
*/
|
||||
SnapBuildProcessRunningXacts(builder, buf->origptr, running,
|
||||
!ctx->options.need_shared_catalogs);
|
||||
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
|
||||
|
||||
/*
|
||||
* Abort all transactions that we keep track of, that are
|
||||
|
|
@ -401,12 +397,8 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
* all running transactions which includes prepared ones,
|
||||
* while shutdown checkpoints just know that no non-prepared
|
||||
* transactions are in progress.
|
||||
*
|
||||
* The database-specific records might work here too, but it's
|
||||
* not their purpose.
|
||||
*/
|
||||
if (!OidIsValid(running->dbid))
|
||||
ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
|
||||
ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
|
||||
}
|
||||
break;
|
||||
case XLOG_STANDBY_LOCK:
|
||||
|
|
|
|||
|
|
@ -285,9 +285,6 @@ StartupDecodingContext(List *output_plugin_options,
|
|||
ctx->write = do_write;
|
||||
ctx->update_progress = update_progress;
|
||||
|
||||
/* Assume shared catalog access. The startup callback can change it. */
|
||||
ctx->options.need_shared_catalogs = true;
|
||||
|
||||
ctx->output_plugin_options = output_plugin_options;
|
||||
|
||||
ctx->fast_forward = fast_forward;
|
||||
|
|
|
|||
|
|
@ -154,14 +154,6 @@
|
|||
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
|
||||
static bool ExportInProgress = false;
|
||||
|
||||
/*
|
||||
* If a backend is going to do logical decoding and the output plugin does
|
||||
* not need to access shared catalogs, setting this variable to false can make
|
||||
* the decoding startup faster. In particular, the backend will not need to
|
||||
* wait for completion of already running transactions in other databases.
|
||||
*/
|
||||
bool accessSharedCatalogsInDecoding = true;
|
||||
|
||||
/* ->committed and ->catchange manipulation */
|
||||
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
|
||||
|
||||
|
|
@ -235,9 +227,6 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
|
|||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/* The default is that shared catalog are used. */
|
||||
accessSharedCatalogsInDecoding = true;
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
|
@ -256,9 +245,6 @@ FreeSnapshotBuilder(SnapBuild *builder)
|
|||
builder->snapshot = NULL;
|
||||
}
|
||||
|
||||
/* The default is that shared catalog are used. */
|
||||
accessSharedCatalogsInDecoding = true;
|
||||
|
||||
/* other resources are deallocated via memory context reset */
|
||||
MemoryContextDelete(context);
|
||||
}
|
||||
|
|
@ -1151,8 +1137,7 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
|
|||
* anymore.
|
||||
*/
|
||||
void
|
||||
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
|
||||
bool db_specific)
|
||||
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
TransactionId xmin;
|
||||
|
|
@ -1164,33 +1149,6 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
|
|||
*/
|
||||
if (builder->state < SNAPBUILD_CONSISTENT)
|
||||
{
|
||||
/*
|
||||
* To reduce the potential for unnecessarily waiting for completion of
|
||||
* unrelated transactions, the caller can declare that only
|
||||
* transactions of the current database are relevant at this stage.
|
||||
*/
|
||||
if (db_specific)
|
||||
{
|
||||
/*
|
||||
* If we must only keep track of transactions running in the
|
||||
* current database, we need transaction info from exactly that
|
||||
* database.
|
||||
*/
|
||||
if (running->dbid != MyDatabaseId)
|
||||
{
|
||||
LogStandbySnapshot(MyDatabaseId);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* We'd better be able to check during scan if the plugin does not
|
||||
* lie.
|
||||
*/
|
||||
if (accessSharedCatalogsInDecoding)
|
||||
accessSharedCatalogsInDecoding = false;
|
||||
}
|
||||
|
||||
/* returns false if there's no point in performing cleanup just yet */
|
||||
if (!SnapBuildFindSnapshot(builder, lsn, running))
|
||||
return;
|
||||
|
|
@ -1198,16 +1156,6 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
|
|||
else
|
||||
SnapBuildSerialize(builder, lsn);
|
||||
|
||||
/*
|
||||
* Database specific transaction info may exist to reach CONSISTENT state
|
||||
* faster, however the code below makes no use of it. Moreover, such
|
||||
* record might cause problems because the following normal (cluster-wide)
|
||||
* record can have lower value of oldestRunningXid. In that case, let's
|
||||
* wait with the cleanup for the next regular cluster-wide record.
|
||||
*/
|
||||
if (OidIsValid(running->dbid))
|
||||
return;
|
||||
|
||||
/*
|
||||
* Update range of interesting xids based on the running xacts
|
||||
* information. We don't increase ->xmax using it, because once we are in
|
||||
|
|
@ -1518,11 +1466,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
|
|||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* If the last transaction info was about specific database, so needs
|
||||
* to be the next one - at least until we're in the CONSISTENT state.
|
||||
*/
|
||||
LogStandbySnapshot(running->dbid);
|
||||
LogStandbySnapshot();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -52,13 +52,6 @@ repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
|||
/* Probably unnecessary, as we don't use the SQL interface ... */
|
||||
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
||||
|
||||
/*
|
||||
* REPACK doesn't need access to shared catalogs, so we can speed up the
|
||||
* historic snapshot creation by setting this flag. We'll only have to
|
||||
* wait for transactions in our database.
|
||||
*/
|
||||
opt->need_shared_catalogs = false;
|
||||
|
||||
if (ctx->output_plugin_options != NIL)
|
||||
{
|
||||
ereport(ERROR,
|
||||
|
|
|
|||
|
|
@ -1776,7 +1776,7 @@ ReplicationSlotReserveWal(void)
|
|||
XLogRecPtr flushptr;
|
||||
|
||||
/* make sure we have enough information to start */
|
||||
flushptr = LogStandbySnapshot(InvalidOid);
|
||||
flushptr = LogStandbySnapshot();
|
||||
|
||||
/* and make sure it's fsynced to disk */
|
||||
XLogFlush(flushptr);
|
||||
|
|
|
|||
|
|
@ -2623,11 +2623,9 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
|
|||
*
|
||||
* Note that if any transaction has overflowed its cached subtransactions
|
||||
* then there is no real need include any subtransactions.
|
||||
*
|
||||
* If 'dbid' is valid, only gather transactions running in that database.
|
||||
*/
|
||||
RunningTransactions
|
||||
GetRunningTransactionData(Oid dbid)
|
||||
GetRunningTransactionData(void)
|
||||
{
|
||||
/* result workspace */
|
||||
static RunningTransactionsData CurrentRunningXactsData;
|
||||
|
|
@ -2702,18 +2700,6 @@ GetRunningTransactionData(Oid dbid)
|
|||
if (!TransactionIdIsValid(xid))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Filter by database OID if requested.
|
||||
*/
|
||||
if (OidIsValid(dbid))
|
||||
{
|
||||
int pgprocno = arrayP->pgprocnos[index];
|
||||
PGPROC *proc = &allProcs[pgprocno];
|
||||
|
||||
if (proc->databaseId != dbid)
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Be careful not to exclude any xids before calculating the values of
|
||||
* oldestRunningXid and suboverflowed, since these are used to clean
|
||||
|
|
@ -2764,12 +2750,6 @@ GetRunningTransactionData(Oid dbid)
|
|||
PGPROC *proc = &allProcs[pgprocno];
|
||||
int nsubxids;
|
||||
|
||||
/*
|
||||
* Filter by database OID if requested.
|
||||
*/
|
||||
if (OidIsValid(dbid) && proc->databaseId != dbid)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Save subtransaction XIDs. Other backends can't add or remove
|
||||
* entries while we're holding XidGenLock.
|
||||
|
|
@ -2803,7 +2783,6 @@ GetRunningTransactionData(Oid dbid)
|
|||
* increases if slots do.
|
||||
*/
|
||||
|
||||
CurrentRunningXacts->dbid = dbid;
|
||||
CurrentRunningXacts->xcnt = count - subcount;
|
||||
CurrentRunningXacts->subxcnt = subcount;
|
||||
CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
|
||||
|
|
|
|||
|
|
@ -1188,14 +1188,6 @@ standby_redo(XLogReaderState *record)
|
|||
xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
|
||||
RunningTransactionsData running;
|
||||
|
||||
/*
|
||||
* Records issued for specific database are not suitable for physical
|
||||
* replication because that affects the whole cluster. In particular,
|
||||
* the list of XID is probably incomplete here.
|
||||
*/
|
||||
if (OidIsValid(xlrec->dbid))
|
||||
return;
|
||||
|
||||
running.xcnt = xlrec->xcnt;
|
||||
running.subxcnt = xlrec->subxcnt;
|
||||
running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
|
||||
|
|
@ -1285,22 +1277,11 @@ standby_redo(XLogReaderState *record)
|
|||
* as there's no independent knob to just enable logical decoding. For
|
||||
* details of how this is used, check snapbuild.c's introductory comment.
|
||||
*
|
||||
* If 'dbid' is valid, only gather transactions running in that
|
||||
* database. snapbuild.c can use such running xacts information for faster
|
||||
* startup, but it still needs normal (cluster-wide) during the actual
|
||||
* decoding - see standby_decode() and SnapBuildProcessRunningXacts() for
|
||||
* details. Other processes (e.g. checkpointer) issue the cluster-wide records
|
||||
* whether logical decoding is active or not.
|
||||
*
|
||||
* Please be careful about using this argument for other purposes. In
|
||||
* particular, physical replication *must* ignore the database-specific
|
||||
* records, exactly because they do not cover the whole cluster - see
|
||||
* standby_redo().
|
||||
*
|
||||
* Returns the RecPtr of the last inserted record.
|
||||
*/
|
||||
XLogRecPtr
|
||||
LogStandbySnapshot(Oid dbid)
|
||||
LogStandbySnapshot(void)
|
||||
{
|
||||
XLogRecPtr recptr;
|
||||
RunningTransactions running;
|
||||
|
|
@ -1333,7 +1314,7 @@ LogStandbySnapshot(Oid dbid)
|
|||
* Log details of all in-progress transactions. This should be the last
|
||||
* record we write, because standby will open up when it sees this.
|
||||
*/
|
||||
running = GetRunningTransactionData(dbid);
|
||||
running = GetRunningTransactionData();
|
||||
|
||||
/*
|
||||
* GetRunningTransactionData() acquired ProcArrayLock, we must release it.
|
||||
|
|
@ -1377,7 +1358,6 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
|
|||
xl_running_xacts xlrec;
|
||||
XLogRecPtr recptr;
|
||||
|
||||
xlrec.dbid = CurrRunningXacts->dbid;
|
||||
xlrec.xcnt = CurrRunningXacts->xcnt;
|
||||
xlrec.subxcnt = CurrRunningXacts->subxcnt;
|
||||
xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@
|
|||
/*
|
||||
* Each page of XLOG file has a header like this:
|
||||
*/
|
||||
#define XLOG_PAGE_MAGIC 0xD11F /* can be used as WAL version indicator */
|
||||
#define XLOG_PAGE_MAGIC 0xD120 /* can be used as WAL version indicator */
|
||||
|
||||
typedef struct XLogPageHeaderData
|
||||
{
|
||||
|
|
|
|||
|
|
@ -312,15 +312,6 @@ extern void PreventCommandIfReadOnly(const char *cmdname);
|
|||
extern void PreventCommandIfParallelMode(const char *cmdname);
|
||||
extern void PreventCommandDuringRecovery(const char *cmdname);
|
||||
|
||||
/* in replication/snapbuild.c */
|
||||
|
||||
/*
|
||||
* Keep track of whether logical decoding in this backend promised not to
|
||||
* access shared catalogs, as a safety check. This is checked by genam.c when
|
||||
* a catalog scan takes place to verify that no shared catalogs are accessed.
|
||||
*/
|
||||
extern PGDLLIMPORT bool accessSharedCatalogsInDecoding;
|
||||
|
||||
/*****************************************************************************
|
||||
* pdir.h -- *
|
||||
* POSTGRES directory path definitions. *
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ typedef struct OutputPluginOptions
|
|||
{
|
||||
OutputPluginOutputType output_type;
|
||||
bool receive_rewrites;
|
||||
bool need_shared_catalogs;
|
||||
} OutputPluginOptions;
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -92,8 +92,7 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
|
|||
XLogRecPtr lsn,
|
||||
xl_heap_new_cid *xlrec);
|
||||
extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
|
||||
xl_running_xacts *running,
|
||||
bool db_specific);
|
||||
xl_running_xacts *running);
|
||||
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
|
||||
|
||||
extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
|
|||
VirtualTransactionId *sourcevxid);
|
||||
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
|
||||
|
||||
extern RunningTransactions GetRunningTransactionData(Oid dbid);
|
||||
extern RunningTransactions GetRunningTransactionData(void);
|
||||
|
||||
extern bool TransactionIdIsInProgress(TransactionId xid);
|
||||
extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
|
||||
|
|
|
|||
|
|
@ -126,7 +126,6 @@ typedef enum
|
|||
|
||||
typedef struct RunningTransactionsData
|
||||
{
|
||||
Oid dbid; /* only track xacts in this database */
|
||||
int xcnt; /* # of xact ids in xids[] */
|
||||
int subxcnt; /* # of subxact ids in xids[] */
|
||||
subxids_array_status subxid_status;
|
||||
|
|
@ -144,7 +143,7 @@ typedef RunningTransactionsData *RunningTransactions;
|
|||
extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
|
||||
extern void LogAccessExclusiveLockPrepare(void);
|
||||
|
||||
extern XLogRecPtr LogStandbySnapshot(Oid dbid);
|
||||
extern XLogRecPtr LogStandbySnapshot(void);
|
||||
extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
|
||||
bool relcacheInitFileInval);
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ typedef struct xl_standby_locks
|
|||
*/
|
||||
typedef struct xl_running_xacts
|
||||
{
|
||||
Oid dbid; /* only track xacts in this database */
|
||||
int xcnt; /* # of xact ids in xids[] */
|
||||
int subxcnt; /* # of subxact ids in xids[] */
|
||||
bool subxid_overflow; /* snapshot overflowed, subxids missing */
|
||||
|
|
|
|||
Loading…
Reference in a new issue