mirror of
https://github.com/postgres/postgres.git
synced 2026-05-25 02:40:42 -04:00
Also "make reformat-dat-files". The only change worthy of note is that pgindent messed up the formatting of launcher.c's struct LogicalRepWorkerId, which led me to notice that that struct wasn't used at all anymore, so I just took it out.
564 lines
14 KiB
C
564 lines
14 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* pg_subscription.c
|
|
* replication subscriptions
|
|
*
|
|
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/catalog/pg_subscription.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "access/genam.h"
|
|
#include "access/heapam.h"
|
|
#include "access/htup_details.h"
|
|
#include "access/tableam.h"
|
|
#include "access/xact.h"
|
|
#include "catalog/indexing.h"
|
|
#include "catalog/pg_subscription.h"
|
|
#include "catalog/pg_subscription_rel.h"
|
|
#include "catalog/pg_type.h"
|
|
#include "miscadmin.h"
|
|
#include "nodes/makefuncs.h"
|
|
#include "storage/lmgr.h"
|
|
#include "utils/array.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/fmgroids.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/pg_lsn.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/syscache.h"
|
|
|
|
static List *textarray_to_stringlist(ArrayType *textarray);
|
|
|
|
/*
|
|
* Fetch the subscription from the syscache.
|
|
*/
|
|
Subscription *
|
|
GetSubscription(Oid subid, bool missing_ok)
|
|
{
|
|
HeapTuple tup;
|
|
Subscription *sub;
|
|
Form_pg_subscription subform;
|
|
Datum datum;
|
|
bool isnull;
|
|
|
|
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
{
|
|
if (missing_ok)
|
|
return NULL;
|
|
|
|
elog(ERROR, "cache lookup failed for subscription %u", subid);
|
|
}
|
|
|
|
subform = (Form_pg_subscription) GETSTRUCT(tup);
|
|
|
|
sub = (Subscription *) palloc(sizeof(Subscription));
|
|
sub->oid = subid;
|
|
sub->dbid = subform->subdbid;
|
|
sub->name = pstrdup(NameStr(subform->subname));
|
|
sub->owner = subform->subowner;
|
|
sub->enabled = subform->subenabled;
|
|
sub->binary = subform->subbinary;
|
|
sub->stream = subform->substream;
|
|
|
|
/* Get conninfo */
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
|
|
tup,
|
|
Anum_pg_subscription_subconninfo,
|
|
&isnull);
|
|
Assert(!isnull);
|
|
sub->conninfo = TextDatumGetCString(datum);
|
|
|
|
/* Get slotname */
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
|
|
tup,
|
|
Anum_pg_subscription_subslotname,
|
|
&isnull);
|
|
if (!isnull)
|
|
sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
|
|
else
|
|
sub->slotname = NULL;
|
|
|
|
/* Get synccommit */
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
|
|
tup,
|
|
Anum_pg_subscription_subsynccommit,
|
|
&isnull);
|
|
Assert(!isnull);
|
|
sub->synccommit = TextDatumGetCString(datum);
|
|
|
|
/* Get publications */
|
|
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
|
|
tup,
|
|
Anum_pg_subscription_subpublications,
|
|
&isnull);
|
|
Assert(!isnull);
|
|
sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
return sub;
|
|
}
|
|
|
|
/*
|
|
* Return number of subscriptions defined in given database.
|
|
* Used by dropdb() to check if database can indeed be dropped.
|
|
*/
|
|
int
|
|
CountDBSubscriptions(Oid dbid)
|
|
{
|
|
int nsubs = 0;
|
|
Relation rel;
|
|
ScanKeyData scankey;
|
|
SysScanDesc scan;
|
|
HeapTuple tup;
|
|
|
|
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
|
|
|
|
ScanKeyInit(&scankey,
|
|
Anum_pg_subscription_subdbid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(dbid));
|
|
|
|
scan = systable_beginscan(rel, InvalidOid, false,
|
|
NULL, 1, &scankey);
|
|
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
nsubs++;
|
|
|
|
systable_endscan(scan);
|
|
|
|
table_close(rel, NoLock);
|
|
|
|
return nsubs;
|
|
}
|
|
|
|
/*
|
|
* Free memory allocated by subscription struct.
|
|
*/
|
|
void
|
|
FreeSubscription(Subscription *sub)
|
|
{
|
|
pfree(sub->name);
|
|
pfree(sub->conninfo);
|
|
if (sub->slotname)
|
|
pfree(sub->slotname);
|
|
list_free_deep(sub->publications);
|
|
pfree(sub);
|
|
}
|
|
|
|
/*
|
|
* get_subscription_oid - given a subscription name, look up the OID
|
|
*
|
|
* If missing_ok is false, throw an error if name not found. If true, just
|
|
* return InvalidOid.
|
|
*/
|
|
Oid
|
|
get_subscription_oid(const char *subname, bool missing_ok)
|
|
{
|
|
Oid oid;
|
|
|
|
oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
|
|
MyDatabaseId, CStringGetDatum(subname));
|
|
if (!OidIsValid(oid) && !missing_ok)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
|
errmsg("subscription \"%s\" does not exist", subname)));
|
|
return oid;
|
|
}
|
|
|
|
/*
|
|
* get_subscription_name - given a subscription OID, look up the name
|
|
*
|
|
* If missing_ok is false, throw an error if name not found. If true, just
|
|
* return NULL.
|
|
*/
|
|
char *
|
|
get_subscription_name(Oid subid, bool missing_ok)
|
|
{
|
|
HeapTuple tup;
|
|
char *subname;
|
|
Form_pg_subscription subform;
|
|
|
|
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
{
|
|
if (!missing_ok)
|
|
elog(ERROR, "cache lookup failed for subscription %u", subid);
|
|
return NULL;
|
|
}
|
|
|
|
subform = (Form_pg_subscription) GETSTRUCT(tup);
|
|
subname = pstrdup(NameStr(subform->subname));
|
|
|
|
ReleaseSysCache(tup);
|
|
|
|
return subname;
|
|
}
|
|
|
|
/*
|
|
* Convert text array to list of strings.
|
|
*
|
|
* Note: the resulting list of strings is pallocated here.
|
|
*/
|
|
static List *
|
|
textarray_to_stringlist(ArrayType *textarray)
|
|
{
|
|
Datum *elems;
|
|
int nelems,
|
|
i;
|
|
List *res = NIL;
|
|
|
|
deconstruct_array(textarray,
|
|
TEXTOID, -1, false, TYPALIGN_INT,
|
|
&elems, NULL, &nelems);
|
|
|
|
if (nelems == 0)
|
|
return NIL;
|
|
|
|
for (i = 0; i < nelems; i++)
|
|
res = lappend(res, makeString(TextDatumGetCString(elems[i])));
|
|
|
|
return res;
|
|
}
|
|
|
|
/*
|
|
* Add new state record for a subscription table.
|
|
*/
|
|
void
|
|
AddSubscriptionRelState(Oid subid, Oid relid, char state,
|
|
XLogRecPtr sublsn)
|
|
{
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
bool nulls[Natts_pg_subscription_rel];
|
|
Datum values[Natts_pg_subscription_rel];
|
|
|
|
LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
|
|
|
|
rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
|
|
|
|
/* Try finding existing mapping. */
|
|
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
|
|
ObjectIdGetDatum(relid),
|
|
ObjectIdGetDatum(subid));
|
|
if (HeapTupleIsValid(tup))
|
|
elog(ERROR, "subscription table %u in subscription %u already exists",
|
|
relid, subid);
|
|
|
|
/* Form the tuple. */
|
|
memset(values, 0, sizeof(values));
|
|
memset(nulls, false, sizeof(nulls));
|
|
values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
|
|
values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
|
|
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
|
|
if (sublsn != InvalidXLogRecPtr)
|
|
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
|
|
else
|
|
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
|
|
|
|
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
|
|
|
|
/* Insert tuple into catalog. */
|
|
CatalogTupleInsert(rel, tup);
|
|
|
|
heap_freetuple(tup);
|
|
|
|
/* Cleanup. */
|
|
table_close(rel, NoLock);
|
|
}
|
|
|
|
/*
|
|
* Update the state of a subscription table.
|
|
*/
|
|
void
|
|
UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
|
|
XLogRecPtr sublsn)
|
|
{
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
bool nulls[Natts_pg_subscription_rel];
|
|
Datum values[Natts_pg_subscription_rel];
|
|
bool replaces[Natts_pg_subscription_rel];
|
|
|
|
LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
|
|
|
|
rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
|
|
|
|
/* Try finding existing mapping. */
|
|
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
|
|
ObjectIdGetDatum(relid),
|
|
ObjectIdGetDatum(subid));
|
|
if (!HeapTupleIsValid(tup))
|
|
elog(ERROR, "subscription table %u in subscription %u does not exist",
|
|
relid, subid);
|
|
|
|
/* Update the tuple. */
|
|
memset(values, 0, sizeof(values));
|
|
memset(nulls, false, sizeof(nulls));
|
|
memset(replaces, false, sizeof(replaces));
|
|
|
|
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
|
|
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
|
|
|
|
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
|
|
if (sublsn != InvalidXLogRecPtr)
|
|
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
|
|
else
|
|
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
|
|
|
|
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
|
|
replaces);
|
|
|
|
/* Update the catalog. */
|
|
CatalogTupleUpdate(rel, &tup->t_self, tup);
|
|
|
|
/* Cleanup. */
|
|
table_close(rel, NoLock);
|
|
}
|
|
|
|
/*
|
|
* Get state of subscription table.
|
|
*
|
|
* Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
|
|
*/
|
|
char
|
|
GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
|
|
{
|
|
HeapTuple tup;
|
|
char substate;
|
|
bool isnull;
|
|
Datum d;
|
|
Relation rel;
|
|
|
|
/*
|
|
* This is to avoid the race condition with AlterSubscription which tries
|
|
* to remove this relstate.
|
|
*/
|
|
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
|
|
|
|
/* Try finding the mapping. */
|
|
tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
|
|
ObjectIdGetDatum(relid),
|
|
ObjectIdGetDatum(subid));
|
|
|
|
if (!HeapTupleIsValid(tup))
|
|
{
|
|
table_close(rel, AccessShareLock);
|
|
*sublsn = InvalidXLogRecPtr;
|
|
return SUBREL_STATE_UNKNOWN;
|
|
}
|
|
|
|
/* Get the state. */
|
|
substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
|
|
|
|
/* Get the LSN */
|
|
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
|
|
Anum_pg_subscription_rel_srsublsn, &isnull);
|
|
if (isnull)
|
|
*sublsn = InvalidXLogRecPtr;
|
|
else
|
|
*sublsn = DatumGetLSN(d);
|
|
|
|
/* Cleanup */
|
|
ReleaseSysCache(tup);
|
|
|
|
table_close(rel, AccessShareLock);
|
|
|
|
return substate;
|
|
}
|
|
|
|
/*
|
|
* Drop subscription relation mapping. These can be for a particular
|
|
* subscription, or for a particular relation, or both.
|
|
*/
|
|
void
|
|
RemoveSubscriptionRel(Oid subid, Oid relid)
|
|
{
|
|
Relation rel;
|
|
TableScanDesc scan;
|
|
ScanKeyData skey[2];
|
|
HeapTuple tup;
|
|
int nkeys = 0;
|
|
|
|
rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
|
|
|
|
if (OidIsValid(subid))
|
|
{
|
|
ScanKeyInit(&skey[nkeys++],
|
|
Anum_pg_subscription_rel_srsubid,
|
|
BTEqualStrategyNumber,
|
|
F_OIDEQ,
|
|
ObjectIdGetDatum(subid));
|
|
}
|
|
|
|
if (OidIsValid(relid))
|
|
{
|
|
ScanKeyInit(&skey[nkeys++],
|
|
Anum_pg_subscription_rel_srrelid,
|
|
BTEqualStrategyNumber,
|
|
F_OIDEQ,
|
|
ObjectIdGetDatum(relid));
|
|
}
|
|
|
|
/* Do the search and delete what we found. */
|
|
scan = table_beginscan_catalog(rel, nkeys, skey);
|
|
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
|
|
{
|
|
Form_pg_subscription_rel subrel;
|
|
|
|
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
|
|
|
|
/*
|
|
* We don't allow to drop the relation mapping when the table
|
|
* synchronization is in progress unless the caller updates the
|
|
* corresponding subscription as well. This is to ensure that we don't
|
|
* leave tablesync slots or origins in the system when the
|
|
* corresponding table is dropped.
|
|
*/
|
|
if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
|
|
{
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not drop relation mapping for subscription \"%s\"",
|
|
get_subscription_name(subrel->srsubid, false)),
|
|
errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
|
|
get_rel_name(relid), subrel->srsubstate),
|
|
|
|
/*
|
|
* translator: first %s is a SQL ALTER command and second %s is a
|
|
* SQL DROP command
|
|
*/
|
|
errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
|
|
"ALTER SUBSCRIPTION ... ENABLE",
|
|
"DROP SUBSCRIPTION ...")));
|
|
}
|
|
|
|
CatalogTupleDelete(rel, &tup->t_self);
|
|
}
|
|
table_endscan(scan);
|
|
|
|
table_close(rel, RowExclusiveLock);
|
|
}
|
|
|
|
|
|
/*
|
|
* Get all relations for subscription.
|
|
*
|
|
* Returned list is palloc'ed in current memory context.
|
|
*/
|
|
List *
|
|
GetSubscriptionRelations(Oid subid)
|
|
{
|
|
List *res = NIL;
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
ScanKeyData skey[1];
|
|
SysScanDesc scan;
|
|
|
|
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&skey[0],
|
|
Anum_pg_subscription_rel_srsubid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(subid));
|
|
|
|
scan = systable_beginscan(rel, InvalidOid, false,
|
|
NULL, 1, skey);
|
|
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
{
|
|
Form_pg_subscription_rel subrel;
|
|
SubscriptionRelState *relstate;
|
|
Datum d;
|
|
bool isnull;
|
|
|
|
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
|
|
|
|
relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
|
|
relstate->relid = subrel->srrelid;
|
|
relstate->state = subrel->srsubstate;
|
|
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
|
|
Anum_pg_subscription_rel_srsublsn, &isnull);
|
|
if (isnull)
|
|
relstate->lsn = InvalidXLogRecPtr;
|
|
else
|
|
relstate->lsn = DatumGetLSN(d);
|
|
|
|
res = lappend(res, relstate);
|
|
}
|
|
|
|
/* Cleanup */
|
|
systable_endscan(scan);
|
|
table_close(rel, AccessShareLock);
|
|
|
|
return res;
|
|
}
|
|
|
|
/*
|
|
* Get all relations for subscription that are not in a ready state.
|
|
*
|
|
* Returned list is palloc'ed in current memory context.
|
|
*/
|
|
List *
|
|
GetSubscriptionNotReadyRelations(Oid subid)
|
|
{
|
|
List *res = NIL;
|
|
Relation rel;
|
|
HeapTuple tup;
|
|
int nkeys = 0;
|
|
ScanKeyData skey[2];
|
|
SysScanDesc scan;
|
|
|
|
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
|
|
|
|
ScanKeyInit(&skey[nkeys++],
|
|
Anum_pg_subscription_rel_srsubid,
|
|
BTEqualStrategyNumber, F_OIDEQ,
|
|
ObjectIdGetDatum(subid));
|
|
|
|
ScanKeyInit(&skey[nkeys++],
|
|
Anum_pg_subscription_rel_srsubstate,
|
|
BTEqualStrategyNumber, F_CHARNE,
|
|
CharGetDatum(SUBREL_STATE_READY));
|
|
|
|
scan = systable_beginscan(rel, InvalidOid, false,
|
|
NULL, nkeys, skey);
|
|
|
|
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
|
{
|
|
Form_pg_subscription_rel subrel;
|
|
SubscriptionRelState *relstate;
|
|
Datum d;
|
|
bool isnull;
|
|
|
|
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
|
|
|
|
relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
|
|
relstate->relid = subrel->srrelid;
|
|
relstate->state = subrel->srsubstate;
|
|
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
|
|
Anum_pg_subscription_rel_srsublsn, &isnull);
|
|
if (isnull)
|
|
relstate->lsn = InvalidXLogRecPtr;
|
|
else
|
|
relstate->lsn = DatumGetLSN(d);
|
|
|
|
res = lappend(res, relstate);
|
|
}
|
|
|
|
/* Cleanup */
|
|
systable_endscan(scan);
|
|
table_close(rel, AccessShareLock);
|
|
|
|
return res;
|
|
}
|