2017-01-19 12:00:00 -05:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
|
*
|
|
|
|
|
* worker_internal.h
|
|
|
|
|
* Internal headers shared by logical replication workers.
|
|
|
|
|
*
|
2022-01-07 19:04:57 -05:00
|
|
|
* Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
|
2017-01-19 12:00:00 -05:00
|
|
|
*
|
|
|
|
|
* src/include/replication/worker_internal.h
|
|
|
|
|
*
|
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
|
*/
|
|
|
|
|
#ifndef WORKER_INTERNAL_H
|
|
|
|
|
#define WORKER_INTERNAL_H
|
|
|
|
|
|
2017-04-13 21:47:24 -04:00
|
|
|
#include <signal.h>
|
|
|
|
|
|
2017-01-21 15:49:53 -05:00
|
|
|
#include "access/xlogdefs.h"
|
2017-01-19 12:00:00 -05:00
|
|
|
#include "catalog/pg_subscription.h"
|
2017-01-21 15:49:53 -05:00
|
|
|
#include "datatype/timestamp.h"
|
2021-11-16 11:30:37 -05:00
|
|
|
#include "storage/fileset.h"
|
2017-01-19 12:00:00 -05:00
|
|
|
#include "storage/lock.h"
|
2020-07-18 14:58:18 -04:00
|
|
|
#include "storage/spin.h"
|
|
|
|
|
|
2017-01-19 12:00:00 -05:00
|
|
|
|
|
|
|
|
typedef struct LogicalRepWorker
|
|
|
|
|
{
|
2017-04-26 10:43:04 -04:00
|
|
|
/* Time at which this worker was launched. */
|
|
|
|
|
TimestampTz launch_time;
|
|
|
|
|
|
|
|
|
|
/* Indicates if this slot is used or free. */
|
|
|
|
|
bool in_use;
|
|
|
|
|
|
2018-11-02 08:56:16 -04:00
|
|
|
/* Increased every time the slot is taken by new worker. */
|
2017-04-26 10:43:04 -04:00
|
|
|
uint16 generation;
|
|
|
|
|
|
2017-01-19 12:00:00 -05:00
|
|
|
/* Pointer to proc array. NULL if not running. */
|
|
|
|
|
PGPROC *proc;
|
|
|
|
|
|
|
|
|
|
/* Database id to connect to. */
|
|
|
|
|
Oid dbid;
|
|
|
|
|
|
|
|
|
|
/* User to use for connection (will be same as owner of subscription). */
|
|
|
|
|
Oid userid;
|
|
|
|
|
|
|
|
|
|
/* Subscription id for the worker. */
|
|
|
|
|
Oid subid;
|
|
|
|
|
|
|
|
|
|
/* Used for initial table synchronization. */
|
|
|
|
|
Oid relid;
|
2017-03-23 08:36:36 -04:00
|
|
|
char relstate;
|
|
|
|
|
XLogRecPtr relstate_lsn;
|
|
|
|
|
slock_t relmutex;
|
2017-01-19 12:00:00 -05:00
|
|
|
|
2021-09-01 22:43:46 -04:00
|
|
|
/*
|
|
|
|
|
* Used to create the changes and subxact files for the streaming
|
|
|
|
|
* transactions. Upon the arrival of the first streaming transaction, the
|
|
|
|
|
* fileset will be initialized, and it will be deleted when the worker
|
|
|
|
|
* exits. Under this, separate buffiles would be created for each
|
|
|
|
|
* transaction which will be deleted after the transaction is finished.
|
|
|
|
|
*/
|
|
|
|
|
FileSet *stream_fileset;
|
|
|
|
|
|
2017-01-19 12:00:00 -05:00
|
|
|
/* Stats. */
|
|
|
|
|
XLogRecPtr last_lsn;
|
|
|
|
|
TimestampTz last_send_time;
|
|
|
|
|
TimestampTz last_recv_time;
|
|
|
|
|
XLogRecPtr reply_lsn;
|
|
|
|
|
TimestampTz reply_time;
|
|
|
|
|
} LogicalRepWorker;
|
|
|
|
|
|
2017-05-09 14:40:42 -04:00
|
|
|
/* Main memory context for apply worker. Permanent during worker lifetime. */
|
|
|
|
|
extern MemoryContext ApplyContext;
|
2017-03-23 08:36:36 -04:00
|
|
|
|
2017-01-19 12:00:00 -05:00
|
|
|
/* libpqreceiver connection */
|
2021-05-12 19:13:54 -04:00
|
|
|
extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
|
2017-01-19 12:00:00 -05:00
|
|
|
|
|
|
|
|
/* Worker and subscription objects. */
|
|
|
|
|
extern Subscription *MySubscription;
|
|
|
|
|
extern LogicalRepWorker *MyLogicalRepWorker;
|
|
|
|
|
|
|
|
|
|
extern bool in_remote_transaction;
|
|
|
|
|
|
|
|
|
|
extern void logicalrep_worker_attach(int slot);
|
2017-03-23 08:36:36 -04:00
|
|
|
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
|
|
|
|
|
bool only_running);
|
2017-08-04 21:14:35 -04:00
|
|
|
extern List *logicalrep_workers_find(Oid subid, bool only_running);
|
2017-03-23 08:36:36 -04:00
|
|
|
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
|
|
|
|
|
Oid userid, Oid relid);
|
|
|
|
|
extern void logicalrep_worker_stop(Oid subid, Oid relid);
|
|
|
|
|
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
|
|
|
|
|
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
|
|
|
|
|
|
|
|
|
|
extern int logicalrep_sync_worker_count(Oid subid);
|
2017-01-19 12:00:00 -05:00
|
|
|
|
2021-02-14 20:58:02 -05:00
|
|
|
extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
|
|
|
|
|
char *originname, int szorgname);
|
2017-03-23 08:36:36 -04:00
|
|
|
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
|
Allow multiple xacts during table sync in logical replication.
For the initial table data synchronization in logical replication, we use
a single transaction to copy the entire table and then synchronize the
position in the stream with the main apply worker.
There are multiple downsides of this approach: (a) We have to perform the
entire copy operation again if there is any error (network breakdown,
error in the database operation, etc.) while we synchronize the WAL
position between tablesync worker and apply worker; this will be onerous
especially for large copies, (b) Using a single transaction in the
synchronization-phase (where we can receive WAL from multiple
transactions) will have the risk of exceeding the CID limit, (c) The slot
will hold the WAL till the entire sync is complete because we never commit
till the end.
This patch solves all the above downsides by allowing multiple
transactions during the tablesync phase. The initial copy is done in a
single transaction and after that, we commit each transaction as we
receive. To allow recovery after any error or crash, we use a permanent
slot and origin to track the progress. The slot and origin will be removed
once we finish the synchronization of the table. We also remove slot and
origin of tablesync workers if the user performs DROP SUBSCRIPTION .. or
ALTER SUBSCRIPTION .. REFERESH and some of the table syncs are still not
finished.
The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and
ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true
cannot be executed inside a transaction block because they can now drop
the slots for which we have no provision to rollback.
This will also open up the path for logical replication of 2PC
transactions on the subscriber side. Previously, we can't do that because
of the requirement of maintaining a single transaction in tablesync
workers.
Bump catalog version due to change of state in the catalog
(pg_subscription_rel).
Author: Peter Smith, Amit Kapila, and Takamichi Osumi
Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
2021-02-11 21:11:51 -05:00
|
|
|
|
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:
* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.
We however must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open so we don't know
where to send the data anyway.
The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.
We don't allow to toggle two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow to
refresh the publication once the two_phase is enabled for a subscription
unless copy_data option is false.
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
2021-07-13 22:03:50 -04:00
|
|
|
extern bool AllTablesyncsReady(void);
|
|
|
|
|
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
|
|
|
|
|
|
2017-03-23 08:36:36 -04:00
|
|
|
void process_syncing_tables(XLogRecPtr current_lsn);
|
|
|
|
|
void invalidate_syncing_table_states(Datum arg, int cacheid,
|
|
|
|
|
uint32 hashvalue);
|
|
|
|
|
|
|
|
|
|
static inline bool
|
|
|
|
|
am_tablesync_worker(void)
|
|
|
|
|
{
|
|
|
|
|
return OidIsValid(MyLogicalRepWorker->relid);
|
|
|
|
|
}
|
2017-01-19 12:00:00 -05:00
|
|
|
|
|
|
|
|
#endif /* WORKER_INTERNAL_H */
|