mirror of
https://github.com/postgres/postgres.git
synced 2026-04-15 22:10:45 -04:00
Standardize replication origin naming to use "ReplOrigin".
The replication origin code was using inconsistent naming conventions. Functions were typically prefixed with 'replorigin', while typedefs and constants used "RepOrigin". This commit unifies the naming convention by renaming RepOriginId to ReplOriginId. Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/CAD21AoBDgm3hDqUZ+nqu=ViHmkCnJBuJyaxG_yvv27BAi2zBmQ@mail.gmail.com
This commit is contained in:
parent
4020b370f2
commit
1fdbca159e
33 changed files with 146 additions and 146 deletions
|
|
@ -70,7 +70,7 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
|
|||
int nrelations, Relation relations[],
|
||||
ReorderBufferChange *change);
|
||||
static bool pg_decode_filter(LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
static void pg_decode_message(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn, XLogRecPtr lsn,
|
||||
bool transactional, const char *prefix,
|
||||
|
|
@ -461,11 +461,11 @@ pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
|
|||
|
||||
static bool
|
||||
pg_decode_filter(LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id)
|
||||
ReplOriginId origin_id)
|
||||
{
|
||||
TestDecodingData *data = ctx->output_plugin_private;
|
||||
|
||||
if (data->only_local && origin_id != InvalidRepOriginId)
|
||||
if (data->only_local && origin_id != InvalidReplOriginId)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1100,7 +1100,7 @@ typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
|
|||
output plugin.
|
||||
<programlisting>
|
||||
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
</programlisting>
|
||||
The <parameter>ctx</parameter> parameter has the same contents
|
||||
as for the other callbacks. No information but the origin is
|
||||
|
|
|
|||
|
|
@ -331,7 +331,7 @@ xact_desc_stats(StringInfo buf, const char *label,
|
|||
}
|
||||
|
||||
static void
|
||||
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
|
||||
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, ReplOriginId origin_id)
|
||||
{
|
||||
xl_xact_parsed_commit parsed;
|
||||
|
||||
|
|
@ -367,7 +367,7 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
|
|||
}
|
||||
|
||||
static void
|
||||
xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId origin_id)
|
||||
xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, ReplOriginId origin_id)
|
||||
{
|
||||
xl_xact_parsed_abort parsed;
|
||||
|
||||
|
|
@ -394,7 +394,7 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId or
|
|||
}
|
||||
|
||||
static void
|
||||
xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginId origin_id)
|
||||
xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, ReplOriginId origin_id)
|
||||
{
|
||||
xl_xact_parsed_prepare parsed;
|
||||
|
||||
|
|
@ -417,7 +417,7 @@ xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginI
|
|||
* Check if the replication origin has been set in this record in the same
|
||||
* way as PrepareRedoAdd().
|
||||
*/
|
||||
if (origin_id != InvalidRepOriginId)
|
||||
if (origin_id != InvalidReplOriginId)
|
||||
appendStringInfo(buf, "; origin: node %u, lsn %X/%08X, at %s",
|
||||
origin_id,
|
||||
LSN_FORMAT_ARGS(parsed.origin_lsn),
|
||||
|
|
|
|||
|
|
@ -54,11 +54,11 @@
|
|||
typedef struct CommitTimestampEntry
|
||||
{
|
||||
TimestampTz time;
|
||||
RepOriginId nodeid;
|
||||
ReplOriginId nodeid;
|
||||
} CommitTimestampEntry;
|
||||
|
||||
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
|
||||
sizeof(RepOriginId))
|
||||
sizeof(ReplOriginId))
|
||||
|
||||
#define COMMIT_TS_XACTS_PER_PAGE \
|
||||
(BLCKSZ / SizeOfCommitTimestampEntry)
|
||||
|
|
@ -110,9 +110,9 @@ bool track_commit_timestamp;
|
|||
|
||||
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
|
||||
TransactionId *subxids, TimestampTz ts,
|
||||
RepOriginId nodeid, int64 pageno);
|
||||
ReplOriginId nodeid, int64 pageno);
|
||||
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
|
||||
RepOriginId nodeid, int slotno);
|
||||
ReplOriginId nodeid, int slotno);
|
||||
static void error_commit_ts_disabled(void);
|
||||
static bool CommitTsPagePrecedes(int64 page1, int64 page2);
|
||||
static void ActivateCommitTs(void);
|
||||
|
|
@ -138,7 +138,7 @@ static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
|
|||
void
|
||||
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
|
||||
TransactionId *subxids, TimestampTz timestamp,
|
||||
RepOriginId nodeid)
|
||||
ReplOriginId nodeid)
|
||||
{
|
||||
int i;
|
||||
TransactionId headxid;
|
||||
|
|
@ -219,7 +219,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
|
|||
static void
|
||||
SetXidCommitTsInPage(TransactionId xid, int nsubxids,
|
||||
TransactionId *subxids, TimestampTz ts,
|
||||
RepOriginId nodeid, int64 pageno)
|
||||
ReplOriginId nodeid, int64 pageno)
|
||||
{
|
||||
LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
|
||||
int slotno;
|
||||
|
|
@ -245,7 +245,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
|
|||
*/
|
||||
static void
|
||||
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
|
||||
RepOriginId nodeid, int slotno)
|
||||
ReplOriginId nodeid, int slotno)
|
||||
{
|
||||
int entryno = TransactionIdToCTsEntry(xid);
|
||||
CommitTimestampEntry entry;
|
||||
|
|
@ -270,7 +270,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
|
|||
*/
|
||||
bool
|
||||
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
|
||||
RepOriginId *nodeid)
|
||||
ReplOriginId *nodeid)
|
||||
{
|
||||
int64 pageno = TransactionIdToCTsPage(xid);
|
||||
int entryno = TransactionIdToCTsEntry(xid);
|
||||
|
|
@ -327,7 +327,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
|
|||
{
|
||||
*ts = 0;
|
||||
if (nodeid)
|
||||
*nodeid = InvalidRepOriginId;
|
||||
*nodeid = InvalidReplOriginId;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -355,7 +355,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
|
|||
* as NULL if not wanted.
|
||||
*/
|
||||
TransactionId
|
||||
GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
|
||||
GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
|
||||
{
|
||||
TransactionId xid;
|
||||
|
||||
|
|
@ -418,7 +418,7 @@ Datum
|
|||
pg_last_committed_xact(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TransactionId xid;
|
||||
RepOriginId nodeid;
|
||||
ReplOriginId nodeid;
|
||||
TimestampTz ts;
|
||||
Datum values[3];
|
||||
bool nulls[3];
|
||||
|
|
@ -462,7 +462,7 @@ Datum
|
|||
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TransactionId xid = PG_GETARG_TRANSACTIONID(0);
|
||||
RepOriginId nodeid;
|
||||
ReplOriginId nodeid;
|
||||
TimestampTz ts;
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
|
|
@ -568,7 +568,7 @@ CommitTsShmemInit(void)
|
|||
|
||||
commitTsShared->xidLastCommit = InvalidTransactionId;
|
||||
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
|
||||
commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
|
||||
commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
|
||||
commitTsShared->commitTsActive = false;
|
||||
}
|
||||
else
|
||||
|
|
@ -763,7 +763,7 @@ DeactivateCommitTs(void)
|
|||
commitTsShared->commitTsActive = false;
|
||||
commitTsShared->xidLastCommit = InvalidTransactionId;
|
||||
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
|
||||
commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
|
||||
commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
|
||||
|
||||
TransamVariables->oldestCommitTsXid = InvalidTransactionId;
|
||||
TransamVariables->newestCommitTsXid = InvalidTransactionId;
|
||||
|
|
|
|||
|
|
@ -1157,7 +1157,7 @@ EndPrepare(GlobalTransaction gxact)
|
|||
Assert(hdr->magic == TWOPHASE_MAGIC);
|
||||
hdr->total_len = records.total_len + sizeof(pg_crc32c);
|
||||
|
||||
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
|
||||
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
|
||||
replorigin_session_origin != DoNotReplicateId);
|
||||
|
||||
if (replorigin)
|
||||
|
|
@ -1924,7 +1924,7 @@ restoreTwoPhaseData(void)
|
|||
continue;
|
||||
|
||||
PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
|
||||
InvalidXLogRecPtr, InvalidRepOriginId);
|
||||
InvalidXLogRecPtr, InvalidReplOriginId);
|
||||
}
|
||||
}
|
||||
LWLockRelease(TwoPhaseStateLock);
|
||||
|
|
@ -2330,7 +2330,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
|||
* Are we using the replication origins feature? Or, in other words, are
|
||||
* we replaying remote actions?
|
||||
*/
|
||||
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
|
||||
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
|
||||
replorigin_session_origin != DoNotReplicateId);
|
||||
|
||||
/* Load the injection point before entering the critical section */
|
||||
|
|
@ -2445,7 +2445,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
|||
* Are we using the replication origins feature? Or, in other words, are
|
||||
* we replaying remote actions?
|
||||
*/
|
||||
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
|
||||
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
|
||||
replorigin_session_origin != DoNotReplicateId);
|
||||
|
||||
/*
|
||||
|
|
@ -2506,7 +2506,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
|||
void
|
||||
PrepareRedoAdd(FullTransactionId fxid, char *buf,
|
||||
XLogRecPtr start_lsn, XLogRecPtr end_lsn,
|
||||
RepOriginId origin_id)
|
||||
ReplOriginId origin_id)
|
||||
{
|
||||
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
|
||||
char *bufptr;
|
||||
|
|
@ -2595,7 +2595,7 @@ PrepareRedoAdd(FullTransactionId fxid, char *buf,
|
|||
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
|
||||
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
|
||||
|
||||
if (origin_id != InvalidRepOriginId)
|
||||
if (origin_id != InvalidReplOriginId)
|
||||
{
|
||||
/* recover apply progress */
|
||||
replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
|
||||
|
|
|
|||
|
|
@ -1413,7 +1413,7 @@ RecordTransactionCommit(void)
|
|||
* Are we using the replication origins feature? Or, in other words,
|
||||
* are we replaying remote actions?
|
||||
*/
|
||||
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
|
||||
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
|
||||
replorigin_session_origin != DoNotReplicateId);
|
||||
|
||||
/*
|
||||
|
|
@ -1810,7 +1810,7 @@ RecordTransactionAbort(bool isSubXact)
|
|||
* Are we using the replication origins feature? Or, in other words, are
|
||||
* we replaying remote actions?
|
||||
*/
|
||||
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
|
||||
replorigin = (replorigin_session_origin != InvalidReplOriginId &&
|
||||
replorigin_session_origin != DoNotReplicateId);
|
||||
|
||||
/* Fetch the data we need for the abort record */
|
||||
|
|
@ -5928,7 +5928,7 @@ XactLogCommitRecord(TimestampTz commit_time,
|
|||
}
|
||||
|
||||
/* dump transaction origin information */
|
||||
if (replorigin_session_origin != InvalidRepOriginId)
|
||||
if (replorigin_session_origin != InvalidReplOriginId)
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
|
||||
|
||||
|
|
@ -6081,7 +6081,7 @@ XactLogAbortRecord(TimestampTz abort_time,
|
|||
* Dump transaction origin information. We need this during recovery to
|
||||
* update the replication origin progress.
|
||||
*/
|
||||
if (replorigin_session_origin != InvalidRepOriginId)
|
||||
if (replorigin_session_origin != InvalidReplOriginId)
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
|
||||
|
||||
|
|
@ -6152,7 +6152,7 @@ static void
|
|||
xact_redo_commit(xl_xact_parsed_commit *parsed,
|
||||
TransactionId xid,
|
||||
XLogRecPtr lsn,
|
||||
RepOriginId origin_id)
|
||||
ReplOriginId origin_id)
|
||||
{
|
||||
TransactionId max_xid;
|
||||
TimestampTz commit_time;
|
||||
|
|
@ -6165,7 +6165,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
|
|||
AdvanceNextFullTransactionIdPastXid(max_xid);
|
||||
|
||||
Assert(((parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == 0) ==
|
||||
(origin_id == InvalidRepOriginId));
|
||||
(origin_id == InvalidReplOriginId));
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
|
||||
commit_time = parsed->origin_timestamp;
|
||||
|
|
@ -6304,7 +6304,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
|
|||
*/
|
||||
static void
|
||||
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
|
||||
XLogRecPtr lsn, RepOriginId origin_id)
|
||||
XLogRecPtr lsn, ReplOriginId origin_id)
|
||||
{
|
||||
TransactionId max_xid;
|
||||
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ static uint8 curinsert_flags = 0;
|
|||
static XLogRecData hdr_rdt;
|
||||
static char *hdr_scratch = NULL;
|
||||
|
||||
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
|
||||
#define SizeOfXlogOrigin (sizeof(ReplOriginId) + sizeof(char))
|
||||
#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
|
||||
|
||||
#define HEADER_SCRATCH_SIZE \
|
||||
|
|
@ -861,7 +861,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
|
|||
|
||||
/* followed by the record's origin, if any */
|
||||
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
|
||||
replorigin_session_origin != InvalidRepOriginId)
|
||||
replorigin_session_origin != InvalidReplOriginId)
|
||||
{
|
||||
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
|
||||
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
|
||||
|
|
|
|||
|
|
@ -1707,7 +1707,7 @@ DecodeXLogRecord(XLogReaderState *state,
|
|||
decoded->header = *record;
|
||||
decoded->lsn = lsn;
|
||||
decoded->next = NULL;
|
||||
decoded->record_origin = InvalidRepOriginId;
|
||||
decoded->record_origin = InvalidReplOriginId;
|
||||
decoded->toplevel_xid = InvalidTransactionId;
|
||||
decoded->main_data = NULL;
|
||||
decoded->main_data_len = 0;
|
||||
|
|
@ -1747,7 +1747,7 @@ DecodeXLogRecord(XLogReaderState *state,
|
|||
}
|
||||
else if (block_id == XLR_BLOCK_ID_ORIGIN)
|
||||
{
|
||||
COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
|
||||
COPY_HEADER_FIELD(&decoded->record_origin, sizeof(ReplOriginId));
|
||||
}
|
||||
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1897,7 +1897,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||
*/
|
||||
if (XLogRecPtrIsValid(opts.lsn))
|
||||
{
|
||||
RepOriginId originid;
|
||||
ReplOriginId originid;
|
||||
char originname[NAMEDATALEN];
|
||||
XLogRecPtr remote_lsn;
|
||||
|
||||
|
|
|
|||
|
|
@ -480,7 +480,7 @@ update_most_recent_deletion_info(TupleTableSlot *scanslot,
|
|||
TransactionId oldestxmin,
|
||||
TransactionId *delete_xid,
|
||||
TimestampTz *delete_time,
|
||||
RepOriginId *delete_origin)
|
||||
ReplOriginId *delete_origin)
|
||||
{
|
||||
BufferHeapTupleTableSlot *hslot;
|
||||
HeapTuple tuple;
|
||||
|
|
@ -488,7 +488,7 @@ update_most_recent_deletion_info(TupleTableSlot *scanslot,
|
|||
bool recently_dead = false;
|
||||
TransactionId xmax;
|
||||
TimestampTz localts;
|
||||
RepOriginId localorigin;
|
||||
ReplOriginId localorigin;
|
||||
|
||||
hslot = (BufferHeapTupleTableSlot *) scanslot;
|
||||
|
||||
|
|
@ -562,7 +562,7 @@ bool
|
|||
RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
|
||||
TransactionId oldestxmin,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time)
|
||||
{
|
||||
TupleTableSlot *scanslot;
|
||||
|
|
@ -574,7 +574,7 @@ RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
|
|||
Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
|
||||
|
||||
*delete_xid = InvalidTransactionId;
|
||||
*delete_origin = InvalidRepOriginId;
|
||||
*delete_origin = InvalidReplOriginId;
|
||||
*delete_time = 0;
|
||||
|
||||
/*
|
||||
|
|
@ -632,7 +632,7 @@ RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
|
|||
TupleTableSlot *searchslot,
|
||||
TransactionId oldestxmin,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time)
|
||||
{
|
||||
Relation idxrel;
|
||||
|
|
@ -649,7 +649,7 @@ RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
|
|||
|
||||
*delete_xid = InvalidTransactionId;
|
||||
*delete_time = 0;
|
||||
*delete_origin = InvalidRepOriginId;
|
||||
*delete_origin = InvalidReplOriginId;
|
||||
|
||||
isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
|
||||
|
||||
|
|
|
|||
|
|
@ -864,7 +864,7 @@ ParallelApplyWorkerMain(Datum main_arg)
|
|||
shm_mq *mq;
|
||||
shm_mq_handle *mqh;
|
||||
shm_mq_handle *error_mqh;
|
||||
RepOriginId originid;
|
||||
ReplOriginId originid;
|
||||
int worker_slot = DatumGetInt32(main_arg);
|
||||
char originname[NAMEDATALEN];
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ static void errdetail_apply_conflict(EState *estate,
|
|||
TupleTableSlot *localslot,
|
||||
TupleTableSlot *remoteslot,
|
||||
Oid indexoid, TransactionId localxmin,
|
||||
RepOriginId localorigin,
|
||||
ReplOriginId localorigin,
|
||||
TimestampTz localts, StringInfo err_msg);
|
||||
static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
|
||||
ConflictType type, char **key_desc,
|
||||
|
|
@ -61,7 +61,7 @@ static char *build_index_value_desc(EState *estate, Relation localrel,
|
|||
*/
|
||||
bool
|
||||
GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
|
||||
RepOriginId *localorigin, TimestampTz *localts)
|
||||
ReplOriginId *localorigin, TimestampTz *localts)
|
||||
{
|
||||
Datum xminDatum;
|
||||
bool isnull;
|
||||
|
|
@ -77,7 +77,7 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
|
|||
*/
|
||||
if (!track_commit_timestamp)
|
||||
{
|
||||
*localorigin = InvalidRepOriginId;
|
||||
*localorigin = InvalidReplOriginId;
|
||||
*localts = 0;
|
||||
return false;
|
||||
}
|
||||
|
|
@ -253,7 +253,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||
ConflictType type, TupleTableSlot *searchslot,
|
||||
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
|
||||
Oid indexoid, TransactionId localxmin,
|
||||
RepOriginId localorigin, TimestampTz localts,
|
||||
ReplOriginId localorigin, TimestampTz localts,
|
||||
StringInfo err_msg)
|
||||
{
|
||||
StringInfoData err_detail;
|
||||
|
|
@ -292,7 +292,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||
|
||||
if (localts)
|
||||
{
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
if (localorigin == InvalidReplOriginId)
|
||||
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
|
||||
get_rel_name(indexoid),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
|
|
@ -323,7 +323,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||
break;
|
||||
|
||||
case CT_UPDATE_ORIGIN_DIFFERS:
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
if (localorigin == InvalidReplOriginId)
|
||||
appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
|
|
@ -350,7 +350,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||
|
||||
if (localts)
|
||||
{
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
if (localorigin == InvalidReplOriginId)
|
||||
appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
|
|
@ -377,7 +377,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
|
|||
break;
|
||||
|
||||
case CT_DELETE_ORIGIN_DIFFERS:
|
||||
if (localorigin == InvalidRepOriginId)
|
||||
if (localorigin == InvalidReplOriginId)
|
||||
appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
|
||||
localxmin, timestamptz_to_str(localts));
|
||||
else if (replorigin_by_oid(localorigin, true, &origin_name))
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ static inline bool FilterPrepare(LogicalDecodingContext *ctx,
|
|||
TransactionId xid, const char *gid);
|
||||
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
|
||||
XLogRecordBuffer *buf, Oid txn_dbid,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
|
||||
/*
|
||||
* Take every XLogReadRecord()ed record and perform the actions required to
|
||||
|
|
@ -566,7 +566,7 @@ FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
|
|||
}
|
||||
|
||||
static inline bool
|
||||
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
|
||||
{
|
||||
if (ctx->callbacks.filter_by_origin_cb == NULL)
|
||||
return false;
|
||||
|
|
@ -584,7 +584,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
|||
XLogReaderState *r = buf->record;
|
||||
TransactionId xid = XLogRecGetXid(r);
|
||||
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
|
||||
RepOriginId origin_id = XLogRecGetOrigin(r);
|
||||
ReplOriginId origin_id = XLogRecGetOrigin(r);
|
||||
Snapshot snapshot = NULL;
|
||||
xl_logical_message *message;
|
||||
|
||||
|
|
@ -665,7 +665,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
|||
{
|
||||
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
|
||||
TimestampTz commit_time = parsed->xact_time;
|
||||
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
int i;
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
|
||||
|
|
@ -761,7 +761,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
|||
SnapBuild *builder = ctx->snapshot_builder;
|
||||
XLogRecPtr origin_lsn = parsed->origin_lsn;
|
||||
TimestampTz prepare_time = parsed->xact_time;
|
||||
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
int i;
|
||||
TransactionId xid = parsed->twophase_xid;
|
||||
|
||||
|
|
@ -837,7 +837,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
|||
int i;
|
||||
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
|
||||
TimestampTz abort_time = parsed->xact_time;
|
||||
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
|
||||
bool skip_xact;
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
|
||||
|
|
@ -1289,7 +1289,7 @@ DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
|
|||
*/
|
||||
static bool
|
||||
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
Oid txn_dbid, RepOriginId origin_id)
|
||||
Oid txn_dbid, ReplOriginId origin_id)
|
||||
{
|
||||
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
|
||||
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
|
||||
|
|
|
|||
|
|
@ -1187,7 +1187,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
|
|||
}
|
||||
|
||||
bool
|
||||
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id)
|
||||
{
|
||||
LogicalErrorCallbackState state;
|
||||
ErrorContextCallback errcallback;
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ typedef struct ReplicationState
|
|||
/*
|
||||
* Local identifier for the remote node.
|
||||
*/
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
|
||||
/*
|
||||
* Location of the latest commit from the remote side.
|
||||
|
|
@ -149,7 +149,7 @@ typedef struct ReplicationState
|
|||
*/
|
||||
typedef struct ReplicationStateOnDisk
|
||||
{
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
XLogRecPtr remote_lsn;
|
||||
} ReplicationStateOnDisk;
|
||||
|
||||
|
|
@ -163,7 +163,7 @@ typedef struct ReplicationStateCtl
|
|||
} ReplicationStateCtl;
|
||||
|
||||
/* external variables */
|
||||
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
|
||||
ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */
|
||||
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
|
||||
TimestampTz replorigin_session_origin_timestamp = 0;
|
||||
|
||||
|
|
@ -225,7 +225,7 @@ IsReservedOriginName(const char *name)
|
|||
*
|
||||
* Returns InvalidOid if the node isn't known yet and missing_ok is true.
|
||||
*/
|
||||
RepOriginId
|
||||
ReplOriginId
|
||||
replorigin_by_name(const char *roname, bool missing_ok)
|
||||
{
|
||||
Form_pg_replication_origin ident;
|
||||
|
|
@ -256,7 +256,7 @@ replorigin_by_name(const char *roname, bool missing_ok)
|
|||
*
|
||||
* Needs to be called in a transaction.
|
||||
*/
|
||||
RepOriginId
|
||||
ReplOriginId
|
||||
replorigin_create(const char *roname)
|
||||
{
|
||||
Oid roident;
|
||||
|
|
@ -369,7 +369,7 @@ replorigin_create(const char *roname)
|
|||
* Helper function to drop a replication origin.
|
||||
*/
|
||||
static void
|
||||
replorigin_state_clear(RepOriginId roident, bool nowait)
|
||||
replorigin_state_clear(ReplOriginId roident, bool nowait)
|
||||
{
|
||||
int i;
|
||||
|
||||
|
|
@ -426,7 +426,7 @@ restart:
|
|||
}
|
||||
|
||||
/* then clear the in-memory slot */
|
||||
state->roident = InvalidRepOriginId;
|
||||
state->roident = InvalidReplOriginId;
|
||||
state->remote_lsn = InvalidXLogRecPtr;
|
||||
state->local_lsn = InvalidXLogRecPtr;
|
||||
break;
|
||||
|
|
@ -444,7 +444,7 @@ restart:
|
|||
void
|
||||
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
|
||||
{
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
Relation rel;
|
||||
HeapTuple tuple;
|
||||
|
||||
|
|
@ -496,13 +496,13 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
|
|||
* Returns true if the origin is known, false otherwise.
|
||||
*/
|
||||
bool
|
||||
replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
|
||||
replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
Form_pg_replication_origin ric;
|
||||
|
||||
Assert(OidIsValid((Oid) roident));
|
||||
Assert(roident != InvalidRepOriginId);
|
||||
Assert(roident != InvalidReplOriginId);
|
||||
Assert(roident != DoNotReplicateId);
|
||||
|
||||
tuple = SearchSysCache1(REPLORIGIDENT,
|
||||
|
|
@ -656,7 +656,7 @@ CheckPointReplicationOrigin(void)
|
|||
ReplicationState *curstate = &replication_states[i];
|
||||
XLogRecPtr local_lsn;
|
||||
|
||||
if (curstate->roident == InvalidRepOriginId)
|
||||
if (curstate->roident == InvalidReplOriginId)
|
||||
continue;
|
||||
|
||||
/* zero, to avoid uninitialized padding bytes */
|
||||
|
|
@ -884,7 +884,7 @@ replorigin_redo(XLogReaderState *record)
|
|||
if (state->roident == xlrec->node_id)
|
||||
{
|
||||
/* reset entry */
|
||||
state->roident = InvalidRepOriginId;
|
||||
state->roident = InvalidReplOriginId;
|
||||
state->remote_lsn = InvalidXLogRecPtr;
|
||||
state->local_lsn = InvalidXLogRecPtr;
|
||||
break;
|
||||
|
|
@ -913,7 +913,7 @@ replorigin_redo(XLogReaderState *record)
|
|||
* unless running in recovery.
|
||||
*/
|
||||
void
|
||||
replorigin_advance(RepOriginId node,
|
||||
replorigin_advance(ReplOriginId node,
|
||||
XLogRecPtr remote_commit, XLogRecPtr local_commit,
|
||||
bool go_backward, bool wal_log)
|
||||
{
|
||||
|
|
@ -921,7 +921,7 @@ replorigin_advance(RepOriginId node,
|
|||
ReplicationState *replication_state = NULL;
|
||||
ReplicationState *free_state = NULL;
|
||||
|
||||
Assert(node != InvalidRepOriginId);
|
||||
Assert(node != InvalidReplOriginId);
|
||||
|
||||
/* we don't track DoNotReplicateId */
|
||||
if (node == DoNotReplicateId)
|
||||
|
|
@ -946,7 +946,7 @@ replorigin_advance(RepOriginId node,
|
|||
ReplicationState *curstate = &replication_states[i];
|
||||
|
||||
/* remember where to insert if necessary */
|
||||
if (curstate->roident == InvalidRepOriginId &&
|
||||
if (curstate->roident == InvalidReplOriginId &&
|
||||
free_state == NULL)
|
||||
{
|
||||
free_state = curstate;
|
||||
|
|
@ -997,7 +997,7 @@ replorigin_advance(RepOriginId node,
|
|||
replication_state->roident = node;
|
||||
}
|
||||
|
||||
Assert(replication_state->roident != InvalidRepOriginId);
|
||||
Assert(replication_state->roident != InvalidReplOriginId);
|
||||
|
||||
/*
|
||||
* If somebody "forcefully" sets this slot, WAL log it, so it's durable
|
||||
|
|
@ -1042,7 +1042,7 @@ replorigin_advance(RepOriginId node,
|
|||
|
||||
|
||||
XLogRecPtr
|
||||
replorigin_get_progress(RepOriginId node, bool flush)
|
||||
replorigin_get_progress(ReplOriginId node, bool flush)
|
||||
{
|
||||
int i;
|
||||
XLogRecPtr local_lsn = InvalidXLogRecPtr;
|
||||
|
|
@ -1141,7 +1141,7 @@ ReplicationOriginExitCleanup(int code, Datum arg)
|
|||
* acquired_by = PID of the first process.
|
||||
*/
|
||||
void
|
||||
replorigin_session_setup(RepOriginId node, int acquired_by)
|
||||
replorigin_session_setup(ReplOriginId node, int acquired_by)
|
||||
{
|
||||
static bool registered_cleanup;
|
||||
int i;
|
||||
|
|
@ -1172,7 +1172,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
|
|||
ReplicationState *curstate = &replication_states[i];
|
||||
|
||||
/* remember where to insert if necessary */
|
||||
if (curstate->roident == InvalidRepOriginId &&
|
||||
if (curstate->roident == InvalidReplOriginId &&
|
||||
free_slot == -1)
|
||||
{
|
||||
free_slot = i;
|
||||
|
|
@ -1239,7 +1239,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
|
|||
}
|
||||
|
||||
|
||||
Assert(session_replication_state->roident != InvalidRepOriginId);
|
||||
Assert(session_replication_state->roident != InvalidReplOriginId);
|
||||
|
||||
if (acquired_by == 0)
|
||||
{
|
||||
|
|
@ -1308,7 +1308,7 @@ void
|
|||
replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
|
||||
{
|
||||
Assert(session_replication_state != NULL);
|
||||
Assert(session_replication_state->roident != InvalidRepOriginId);
|
||||
Assert(session_replication_state->roident != InvalidReplOriginId);
|
||||
|
||||
LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
|
||||
if (session_replication_state->local_lsn < local_commit)
|
||||
|
|
@ -1358,7 +1358,7 @@ Datum
|
|||
pg_replication_origin_create(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
|
||||
replorigin_check_prerequisites(false, false);
|
||||
|
||||
|
|
@ -1418,7 +1418,7 @@ Datum
|
|||
pg_replication_origin_oid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
|
||||
replorigin_check_prerequisites(false, false);
|
||||
|
||||
|
|
@ -1439,7 +1439,7 @@ Datum
|
|||
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name;
|
||||
RepOriginId origin;
|
||||
ReplOriginId origin;
|
||||
int pid;
|
||||
|
||||
replorigin_check_prerequisites(true, false);
|
||||
|
|
@ -1466,7 +1466,7 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
|
|||
|
||||
replorigin_session_reset();
|
||||
|
||||
replorigin_session_origin = InvalidRepOriginId;
|
||||
replorigin_session_origin = InvalidReplOriginId;
|
||||
replorigin_session_origin_lsn = InvalidXLogRecPtr;
|
||||
replorigin_session_origin_timestamp = 0;
|
||||
|
||||
|
|
@ -1481,7 +1481,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
|
|||
{
|
||||
replorigin_check_prerequisites(false, false);
|
||||
|
||||
PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
|
||||
PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -1548,7 +1548,7 @@ pg_replication_origin_advance(PG_FUNCTION_ARGS)
|
|||
{
|
||||
text *name = PG_GETARG_TEXT_PP(0);
|
||||
XLogRecPtr remote_commit = PG_GETARG_LSN(1);
|
||||
RepOriginId node;
|
||||
ReplOriginId node;
|
||||
|
||||
replorigin_check_prerequisites(true, false);
|
||||
|
||||
|
|
@ -1583,7 +1583,7 @@ pg_replication_origin_progress(PG_FUNCTION_ARGS)
|
|||
{
|
||||
char *name;
|
||||
bool flush;
|
||||
RepOriginId roident;
|
||||
ReplOriginId roident;
|
||||
XLogRecPtr remote_lsn = InvalidXLogRecPtr;
|
||||
|
||||
replorigin_check_prerequisites(true, true);
|
||||
|
|
@ -1633,7 +1633,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
|
|||
state = &replication_states[i];
|
||||
|
||||
/* unused slot, nothing to display */
|
||||
if (state->roident == InvalidRepOriginId)
|
||||
if (state->roident == InvalidReplOriginId)
|
||||
continue;
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
|
|
|
|||
|
|
@ -2824,7 +2824,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
|
|||
ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz commit_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
{
|
||||
Snapshot snapshot_now;
|
||||
CommandId command_id = FirstCommandId;
|
||||
|
|
@ -2884,7 +2884,7 @@ void
|
|||
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz commit_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
|
||||
|
|
@ -2907,7 +2907,7 @@ bool
|
|||
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz prepare_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
|
||||
|
|
@ -3001,7 +3001,7 @@ void
|
|||
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
XLogRecPtr two_phase_at,
|
||||
TimestampTz commit_time, RepOriginId origin_id,
|
||||
TimestampTz commit_time, ReplOriginId origin_id,
|
||||
XLogRecPtr origin_lsn, char *gid, bool is_commit)
|
||||
{
|
||||
ReorderBufferTXN *txn;
|
||||
|
|
|
|||
|
|
@ -323,7 +323,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
|
|||
* This is needed to allow the origin to be dropped.
|
||||
*/
|
||||
replorigin_session_reset();
|
||||
replorigin_session_origin = InvalidRepOriginId;
|
||||
replorigin_session_origin = InvalidReplOriginId;
|
||||
replorigin_session_origin_lsn = InvalidXLogRecPtr;
|
||||
replorigin_session_origin_timestamp = 0;
|
||||
|
||||
|
|
@ -1226,7 +1226,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
|
|||
AclResult aclresult;
|
||||
WalRcvExecResult *res;
|
||||
char originname[NAMEDATALEN];
|
||||
RepOriginId originid;
|
||||
ReplOriginId originid;
|
||||
UserContext ucxt;
|
||||
bool must_use_password;
|
||||
bool run_as_owner;
|
||||
|
|
|
|||
|
|
@ -608,7 +608,7 @@ static bool FindDeletedTupleInLocalRel(Relation localrel,
|
|||
Oid localidxoid,
|
||||
TupleTableSlot *remoteslot,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time);
|
||||
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
|
||||
TupleTableSlot *remoteslot,
|
||||
|
|
@ -3268,7 +3268,7 @@ IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
|
|||
static bool
|
||||
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
|
||||
TupleTableSlot *remoteslot,
|
||||
TransactionId *delete_xid, RepOriginId *delete_origin,
|
||||
TransactionId *delete_xid, ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time)
|
||||
{
|
||||
TransactionId oldestxmin;
|
||||
|
|
@ -5627,7 +5627,7 @@ run_apply_worker(void)
|
|||
XLogRecPtr origin_startpos = InvalidXLogRecPtr;
|
||||
char *slotname = NULL;
|
||||
WalRcvStreamOptions options;
|
||||
RepOriginId originid;
|
||||
ReplOriginId originid;
|
||||
TimeLineID startpointTLI;
|
||||
char *err;
|
||||
bool must_use_password;
|
||||
|
|
@ -5874,7 +5874,7 @@ InitializeLogRepWorker(void)
|
|||
static void
|
||||
replorigin_reset(int code, Datum arg)
|
||||
{
|
||||
replorigin_session_origin = InvalidRepOriginId;
|
||||
replorigin_session_origin = InvalidReplOriginId;
|
||||
replorigin_session_origin_lsn = InvalidXLogRecPtr;
|
||||
replorigin_session_origin_timestamp = 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
|
|||
bool transactional, const char *prefix,
|
||||
Size sz, const char *message);
|
||||
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
|
||||
ReorderBufferTXN *txn);
|
||||
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
|
||||
|
|
@ -89,7 +89,7 @@ static List *LoadPublications(List *pubnames);
|
|||
static void publication_invalidation_cb(Datum arg, int cacheid,
|
||||
uint32 hashvalue);
|
||||
static void send_repl_origin(LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn,
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn,
|
||||
bool send_origin);
|
||||
|
||||
/*
|
||||
|
|
@ -609,7 +609,7 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|||
static void
|
||||
pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
||||
{
|
||||
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
|
||||
bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
|
||||
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
|
||||
|
||||
Assert(txndata);
|
||||
|
|
@ -663,7 +663,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
static void
|
||||
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
||||
{
|
||||
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
|
||||
bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
|
||||
|
||||
OutputPluginPrepareWrite(ctx, !send_replication_origin);
|
||||
logicalrep_write_begin_prepare(ctx->out, txn);
|
||||
|
|
@ -1767,11 +1767,11 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
*/
|
||||
static bool
|
||||
pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id)
|
||||
ReplOriginId origin_id)
|
||||
{
|
||||
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
|
||||
|
||||
if (data->publish_no_origin && origin_id != InvalidRepOriginId)
|
||||
if (data->publish_no_origin && origin_id != InvalidReplOriginId)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
|
|
@ -1841,7 +1841,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
|
|||
ReorderBufferTXN *txn)
|
||||
{
|
||||
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
|
||||
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
|
||||
bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
|
||||
|
||||
/* we can't nest streaming of transactions */
|
||||
Assert(!data->in_streaming);
|
||||
|
|
@ -2457,7 +2457,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
|
|||
|
||||
/* Send Replication origin */
|
||||
static void
|
||||
send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
|
||||
send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id,
|
||||
XLogRecPtr origin_lsn, bool send_origin)
|
||||
{
|
||||
if (send_origin)
|
||||
|
|
|
|||
|
|
@ -372,7 +372,7 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
|
|||
Oid subid;
|
||||
char *subname;
|
||||
char originname[NAMEDATALEN];
|
||||
RepOriginId node;
|
||||
ReplOriginId node;
|
||||
XLogRecPtr remote_commit;
|
||||
|
||||
CHECK_IS_BINARY_UPGRADE;
|
||||
|
|
|
|||
|
|
@ -1103,7 +1103,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
|
|||
bool failed = false;
|
||||
|
||||
int max_lrworkers;
|
||||
int max_reporigins;
|
||||
int max_replorigins;
|
||||
int max_wprocs;
|
||||
|
||||
pg_log_info("checking settings on subscriber");
|
||||
|
|
@ -1142,7 +1142,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
|
|||
disconnect_database(conn, true);
|
||||
}
|
||||
|
||||
max_reporigins = atoi(PQgetvalue(res, 0, 0));
|
||||
max_replorigins = atoi(PQgetvalue(res, 0, 0));
|
||||
max_lrworkers = atoi(PQgetvalue(res, 1, 0));
|
||||
max_wprocs = atoi(PQgetvalue(res, 2, 0));
|
||||
if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
|
||||
|
|
@ -1150,7 +1150,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
|
|||
|
||||
pg_log_debug("subscriber: max_logical_replication_workers: %d",
|
||||
max_lrworkers);
|
||||
pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
|
||||
pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
|
||||
pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
|
||||
if (primary_slot_name)
|
||||
pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
|
||||
|
|
@ -1159,10 +1159,10 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
|
|||
|
||||
disconnect_database(conn, false);
|
||||
|
||||
if (max_reporigins < num_dbs)
|
||||
if (max_replorigins < num_dbs)
|
||||
{
|
||||
pg_log_error("subscriber requires %d active replication origins, but only %d remain",
|
||||
num_dbs, max_reporigins);
|
||||
num_dbs, max_replorigins);
|
||||
pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
|
||||
"max_active_replication_origins", num_dbs);
|
||||
failed = true;
|
||||
|
|
|
|||
|
|
@ -175,9 +175,9 @@ $old_sub->safe_psql('postgres',
|
|||
);
|
||||
my $sub_oid = $old_sub->safe_psql('postgres',
|
||||
"SELECT oid FROM pg_subscription WHERE subname = 'regress_sub3'");
|
||||
my $reporigin = 'pg_' . qq($sub_oid);
|
||||
my $replorigin = 'pg_' . qq($sub_oid);
|
||||
$old_sub->safe_psql('postgres',
|
||||
"SELECT pg_replication_origin_drop('$reporigin')");
|
||||
"SELECT pg_replication_origin_drop('$replorigin')");
|
||||
|
||||
$old_sub->stop;
|
||||
|
||||
|
|
|
|||
|
|
@ -21,11 +21,11 @@ extern PGDLLIMPORT bool track_commit_timestamp;
|
|||
|
||||
extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
|
||||
TransactionId *subxids, TimestampTz timestamp,
|
||||
RepOriginId nodeid);
|
||||
ReplOriginId nodeid);
|
||||
extern bool TransactionIdGetCommitTsData(TransactionId xid,
|
||||
TimestampTz *ts, RepOriginId *nodeid);
|
||||
TimestampTz *ts, ReplOriginId *nodeid);
|
||||
extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
|
||||
RepOriginId *nodeid);
|
||||
ReplOriginId *nodeid);
|
||||
|
||||
extern Size CommitTsShmemSize(void);
|
||||
extern void CommitTsShmemInit(void);
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ extern void FinishPreparedTransaction(const char *gid, bool isCommit);
|
|||
|
||||
extern void PrepareRedoAdd(FullTransactionId fxid, char *buf,
|
||||
XLogRecPtr start_lsn, XLogRecPtr end_lsn,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
|
||||
extern void restoreTwoPhaseData(void);
|
||||
extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ typedef uint32 TimeLineID;
|
|||
* Replication origin id - this is located in this file to avoid having to
|
||||
* include origin.h in a bunch of xlog related places.
|
||||
*/
|
||||
typedef uint16 RepOriginId;
|
||||
typedef uint16 ReplOriginId;
|
||||
|
||||
/*
|
||||
* This chunk of hackery attempts to determine which file sync methods
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ typedef struct DecodedXLogRecord
|
|||
XLogRecPtr lsn; /* location */
|
||||
XLogRecPtr next_lsn; /* location of next record */
|
||||
XLogRecord header; /* header */
|
||||
RepOriginId record_origin;
|
||||
ReplOriginId record_origin;
|
||||
TransactionId toplevel_xid; /* XID of top-level transaction */
|
||||
char *main_data; /* record's main data portion */
|
||||
uint32 main_data_len; /* main data portion's length */
|
||||
|
|
|
|||
|
|
@ -769,13 +769,13 @@ extern bool RelationFindDeletedTupleInfoSeq(Relation rel,
|
|||
TupleTableSlot *searchslot,
|
||||
TransactionId oldestxmin,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time);
|
||||
extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
|
||||
TupleTableSlot *searchslot,
|
||||
TransactionId oldestxmin,
|
||||
TransactionId *delete_xid,
|
||||
RepOriginId *delete_origin,
|
||||
ReplOriginId *delete_origin,
|
||||
TimestampTz *delete_time);
|
||||
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
|
||||
EState *estate, TupleTableSlot *slot);
|
||||
|
|
|
|||
|
|
@ -74,14 +74,14 @@ typedef struct ConflictTupleInfo
|
|||
* occurred */
|
||||
TransactionId xmin; /* transaction ID of the modification causing
|
||||
* the conflict */
|
||||
RepOriginId origin; /* origin identifier of the modification */
|
||||
ReplOriginId origin; /* origin identifier of the modification */
|
||||
TimestampTz ts; /* timestamp of when the modification on the
|
||||
* conflicting local row occurred */
|
||||
} ConflictTupleInfo;
|
||||
|
||||
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
|
||||
TransactionId *xmin,
|
||||
RepOriginId *localorigin,
|
||||
ReplOriginId *localorigin,
|
||||
TimestampTz *localts);
|
||||
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
|
||||
int elevel, ConflictType type,
|
||||
|
|
|
|||
|
|
@ -144,7 +144,7 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
|||
|
||||
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
|
||||
TransactionId xid, const char *gid);
|
||||
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
|
||||
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id);
|
||||
extern void ResetLogicalStreamingState(void);
|
||||
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
||||
|
||||
|
|
|
|||
|
|
@ -18,19 +18,19 @@
|
|||
typedef struct xl_replorigin_set
|
||||
{
|
||||
XLogRecPtr remote_lsn;
|
||||
RepOriginId node_id;
|
||||
ReplOriginId node_id;
|
||||
bool force;
|
||||
} xl_replorigin_set;
|
||||
|
||||
typedef struct xl_replorigin_drop
|
||||
{
|
||||
RepOriginId node_id;
|
||||
ReplOriginId node_id;
|
||||
} xl_replorigin_drop;
|
||||
|
||||
#define XLOG_REPLORIGIN_SET 0x00
|
||||
#define XLOG_REPLORIGIN_DROP 0x10
|
||||
|
||||
#define InvalidRepOriginId 0
|
||||
#define InvalidReplOriginId 0
|
||||
#define DoNotReplicateId PG_UINT16_MAX
|
||||
|
||||
/*
|
||||
|
|
@ -40,7 +40,7 @@ typedef struct xl_replorigin_drop
|
|||
*/
|
||||
#define MAX_RONAME_LEN 512
|
||||
|
||||
extern PGDLLIMPORT RepOriginId replorigin_session_origin;
|
||||
extern PGDLLIMPORT ReplOriginId replorigin_session_origin;
|
||||
extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
|
||||
extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
|
||||
|
||||
|
|
@ -48,22 +48,22 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
|
|||
extern PGDLLIMPORT int max_active_replication_origins;
|
||||
|
||||
/* API for querying & manipulating replication origins */
|
||||
extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok);
|
||||
extern RepOriginId replorigin_create(const char *roname);
|
||||
extern ReplOriginId replorigin_by_name(const char *roname, bool missing_ok);
|
||||
extern ReplOriginId replorigin_create(const char *roname);
|
||||
extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait);
|
||||
extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
|
||||
extern bool replorigin_by_oid(ReplOriginId roident, bool missing_ok,
|
||||
char **roname);
|
||||
|
||||
/* API for querying & manipulating replication progress tracking */
|
||||
extern void replorigin_advance(RepOriginId node,
|
||||
extern void replorigin_advance(ReplOriginId node,
|
||||
XLogRecPtr remote_commit,
|
||||
XLogRecPtr local_commit,
|
||||
bool go_backward, bool wal_log);
|
||||
extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
|
||||
extern XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush);
|
||||
|
||||
extern void replorigin_session_advance(XLogRecPtr remote_commit,
|
||||
XLogRecPtr local_commit);
|
||||
extern void replorigin_session_setup(RepOriginId node, int acquired_by);
|
||||
extern void replorigin_session_setup(ReplOriginId node, int acquired_by);
|
||||
extern void replorigin_session_reset(void);
|
||||
extern XLogRecPtr replorigin_session_get_progress(bool flush);
|
||||
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
|
|||
* Filter changes by origin.
|
||||
*/
|
||||
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
|
||||
RepOriginId origin_id);
|
||||
ReplOriginId origin_id);
|
||||
|
||||
/*
|
||||
* Called to shutdown an output plugin.
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ typedef struct ReorderBufferChange
|
|||
/* Transaction this change belongs to. */
|
||||
struct ReorderBufferTXN *txn;
|
||||
|
||||
RepOriginId origin_id;
|
||||
ReplOriginId origin_id;
|
||||
|
||||
/*
|
||||
* Context data for the change. Which part of the union is valid depends
|
||||
|
|
@ -347,7 +347,7 @@ typedef struct ReorderBufferTXN
|
|||
XLogRecPtr restart_decoding_lsn;
|
||||
|
||||
/* origin of the change that caused this transaction */
|
||||
RepOriginId origin_id;
|
||||
ReplOriginId origin_id;
|
||||
XLogRecPtr origin_lsn;
|
||||
|
||||
/*
|
||||
|
|
@ -724,12 +724,12 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
|
|||
Size message_size, const char *message);
|
||||
extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
|
||||
XLogRecPtr two_phase_at,
|
||||
TimestampTz commit_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn,
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn,
|
||||
char *gid, bool is_commit);
|
||||
extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
|
||||
TransactionId subxid, XLogRecPtr lsn);
|
||||
|
|
@ -768,7 +768,7 @@ extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid
|
|||
extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
|
||||
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
|
||||
TimestampTz prepare_time,
|
||||
RepOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
ReplOriginId origin_id, XLogRecPtr origin_lsn);
|
||||
extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
|
||||
extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
|
||||
extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *rb);
|
||||
|
|
|
|||
|
|
@ -2575,8 +2575,8 @@ ReorderBufferTupleCidEnt
|
|||
ReorderBufferTupleCidKey
|
||||
ReorderBufferUpdateProgressTxnCB
|
||||
ReorderTuple
|
||||
RepOriginId
|
||||
ReparameterizeForeignPathByChild_function
|
||||
ReplOriginId
|
||||
ReplaceVarsFromTargetList_context
|
||||
ReplaceVarsNoMatchOption
|
||||
ReplaceWrapOption
|
||||
|
|
|
|||
Loading…
Reference in a new issue