CREATE SUBSCRIPTION ... SERVER.

Allow CREATE SUBSCRIPTION to accept a foreign server using the SERVER
clause instead of a raw connection string using the CONNECTION clause.

  * Enables a user with sufficient privileges to create a subscription
    using a foreign server by name without specifying the connection
    details.

  * Integrates with user mappings (and other FDW infrastructure) using
    the subscription owner.

  * Provides a layer of indirection to manage multiple subscriptions
    to the same remote server more easily.

Also add CREATE FOREIGN DATA WRAPPER ... CONNECTION clause to specify
a connection_function. To be eligible for a subscription, the foreign
server's foreign data wrapper must specify a connection_function.

Add connection_function support to postgres_fdw, and bump postgres_fdw
version to 1.3.

Bump catversion.

Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.camel@j-davis.com
This commit is contained in:
Jeff Davis 2026-03-06 08:27:56 -08:00
parent 868825aaeb
commit 8185bb5347
36 changed files with 1075 additions and 255 deletions

View file

@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq) SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = postgres_fdw EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql postgres_fdw--1.2--1.3.sql
REGRESS = postgres_fdw query_cancel REGRESS = postgres_fdw query_cancel
ISOLATION = eval_plan_qual ISOLATION = eval_plan_qual

View file

@ -132,6 +132,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2); PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
PG_FUNCTION_INFO_V1(postgres_fdw_connection);
/* prototypes of private functions */ /* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@ -476,6 +477,142 @@ pgfdw_security_check(const char **keywords, const char **values, UserMapping *us
errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes."))); errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
} }
/*
* Construct connection params from generic options of ForeignServer and
* UserMapping. (Some of them might not be libpq options, in which case we'll
* just waste a few array slots.)
*/
static void
construct_connection_params(ForeignServer *server, UserMapping *user,
const char ***p_keywords, const char ***p_values,
char **p_appname)
{
const char **keywords;
const char **values;
char *appname = NULL;
int n;
/*
* Add 4 extra slots for application_name, fallback_application_name,
* client_encoding, end marker, and 3 extra slots for scram keys and
* required scram pass-through options.
*/
n = list_length(server->options) + list_length(user->options) + 4 + 3;
keywords = (const char **) palloc(n * sizeof(char *));
values = (const char **) palloc(n * sizeof(char *));
n = 0;
n += ExtractConnectionOptions(server->options,
keywords + n, values + n);
n += ExtractConnectionOptions(user->options,
keywords + n, values + n);
/*
* Use pgfdw_application_name as application_name if set.
*
* PQconnectdbParams() processes the parameter arrays from start to end.
* If any key word is repeated, the last value is used. Therefore note
* that pgfdw_application_name must be added to the arrays after options
* of ForeignServer are, so that it can override application_name set in
* ForeignServer.
*/
if (pgfdw_application_name && *pgfdw_application_name != '\0')
{
keywords[n] = "application_name";
values[n] = pgfdw_application_name;
n++;
}
/*
* Search the parameter arrays to find application_name setting, and
* replace escape sequences in it with status information if found. The
* arrays are searched backwards because the last value is used if
* application_name is repeatedly set.
*/
for (int i = n - 1; i >= 0; i--)
{
if (strcmp(keywords[i], "application_name") == 0 &&
*(values[i]) != '\0')
{
/*
* Use this application_name setting if it's not empty string even
* after any escape sequences in it are replaced.
*/
appname = process_pgfdw_appname(values[i]);
if (appname[0] != '\0')
{
values[i] = appname;
break;
}
/*
* This empty application_name is not used, so we set values[i] to
* NULL and keep searching the array to find the next one.
*/
values[i] = NULL;
pfree(appname);
appname = NULL;
}
}
*p_appname = appname;
/* Use "postgres_fdw" as fallback_application_name */
keywords[n] = "fallback_application_name";
values[n] = "postgres_fdw";
n++;
/* Set client_encoding so that libpq can convert encoding properly. */
keywords[n] = "client_encoding";
values[n] = GetDatabaseEncodingName();
n++;
/* Add required SCRAM pass-through connection options if it's enabled. */
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
{
int len;
int encoded_len;
keywords[n] = "scram_client_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
sizeof(MyProcPort->scram_ClientKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM client key");
n++;
keywords[n] = "scram_server_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
sizeof(MyProcPort->scram_ServerKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM server key");
n++;
/*
* Require scram-sha-256 to ensure that no other auth method is used
* when connecting with foreign server.
*/
keywords[n] = "require_auth";
values[n] = "scram-sha-256";
n++;
}
keywords[n] = values[n] = NULL;
/* Verify the set of connection parameters. */
check_conn_params(keywords, values, user);
*p_keywords = keywords;
*p_values = values;
}
/* /*
* Connect to remote server using specified server and user mapping properties. * Connect to remote server using specified server and user mapping properties.
*/ */
@ -491,127 +628,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
{ {
const char **keywords; const char **keywords;
const char **values; const char **values;
char *appname = NULL; char *appname;
int n;
/* construct_connection_params(server, user, &keywords, &values, &appname);
* Construct connection params from generic options of ForeignServer
* and UserMapping. (Some of them might not be libpq options, in
* which case we'll just waste a few array slots.) Add 4 extra slots
* for application_name, fallback_application_name, client_encoding,
* end marker, and 3 extra slots for scram keys and required scram
* pass-through options.
*/
n = list_length(server->options) + list_length(user->options) + 4 + 3;
keywords = (const char **) palloc(n * sizeof(char *));
values = (const char **) palloc(n * sizeof(char *));
n = 0;
n += ExtractConnectionOptions(server->options,
keywords + n, values + n);
n += ExtractConnectionOptions(user->options,
keywords + n, values + n);
/*
* Use pgfdw_application_name as application_name if set.
*
* PQconnectdbParams() processes the parameter arrays from start to
* end. If any key word is repeated, the last value is used. Therefore
* note that pgfdw_application_name must be added to the arrays after
* options of ForeignServer are, so that it can override
* application_name set in ForeignServer.
*/
if (pgfdw_application_name && *pgfdw_application_name != '\0')
{
keywords[n] = "application_name";
values[n] = pgfdw_application_name;
n++;
}
/*
* Search the parameter arrays to find application_name setting, and
* replace escape sequences in it with status information if found.
* The arrays are searched backwards because the last value is used if
* application_name is repeatedly set.
*/
for (int i = n - 1; i >= 0; i--)
{
if (strcmp(keywords[i], "application_name") == 0 &&
*(values[i]) != '\0')
{
/*
* Use this application_name setting if it's not empty string
* even after any escape sequences in it are replaced.
*/
appname = process_pgfdw_appname(values[i]);
if (appname[0] != '\0')
{
values[i] = appname;
break;
}
/*
* This empty application_name is not used, so we set
* values[i] to NULL and keep searching the array to find the
* next one.
*/
values[i] = NULL;
pfree(appname);
appname = NULL;
}
}
/* Use "postgres_fdw" as fallback_application_name */
keywords[n] = "fallback_application_name";
values[n] = "postgres_fdw";
n++;
/* Set client_encoding so that libpq can convert encoding properly. */
keywords[n] = "client_encoding";
values[n] = GetDatabaseEncodingName();
n++;
/* Add required SCRAM pass-through connection options if it's enabled. */
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
{
int len;
int encoded_len;
keywords[n] = "scram_client_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
sizeof(MyProcPort->scram_ClientKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM client key");
n++;
keywords[n] = "scram_server_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
sizeof(MyProcPort->scram_ServerKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM server key");
n++;
/*
* Require scram-sha-256 to ensure that no other auth method is
* used when connecting with foreign server.
*/
keywords[n] = "require_auth";
values[n] = "scram-sha-256";
n++;
}
keywords[n] = values[n] = NULL;
/* Verify the set of connection parameters. */
check_conn_params(keywords, values, user);
/* first time, allocate or get the custom wait event */ /* first time, allocate or get the custom wait event */
if (pgfdw_we_connect == 0) if (pgfdw_we_connect == 0)
@ -2310,6 +2329,56 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
} }
} }
/*
* Values in connection strings must be enclosed in single quotes. Single
* quotes and backslashes must be escaped with backslash. NB: these rules are
* different from the rules for escaping a SQL literal.
*/
static void
appendEscapedValue(StringInfo str, const char *val)
{
appendStringInfoChar(str, '\'');
for (int i = 0; val[i] != '\0'; i++)
{
if (val[i] == '\\' || val[i] == '\'')
appendStringInfoChar(str, '\\');
appendStringInfoChar(str, val[i]);
}
appendStringInfoChar(str, '\'');
}
Datum
postgres_fdw_connection(PG_FUNCTION_ARGS)
{
Oid userid = PG_GETARG_OID(0);
Oid serverid = PG_GETARG_OID(1);
ForeignServer *server = GetForeignServer(serverid);
UserMapping *user = GetUserMapping(userid, serverid);
StringInfoData str;
const char **keywords;
const char **values;
char *appname;
char *sep = "";
construct_connection_params(server, user, &keywords, &values, &appname);
initStringInfo(&str);
for (int i = 0; keywords[i] != NULL; i++)
{
if (values[i] == NULL)
continue;
appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
appendEscapedValue(&str, values[i]);
sep = " ";
}
if (appname != NULL)
pfree(appname);
pfree(keywords);
pfree(values);
PG_RETURN_TEXT_P(cstring_to_text(str.data));
}
/* /*
* List active foreign server connections. * List active foreign server connections.
* *

View file

@ -255,6 +255,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1; ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- =================================================================== -- ===================================================================
-- test subscription
-- ===================================================================
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
DROP SUBSCRIPTION regress_pgfdw_subscription;
-- ===================================================================
-- test error case for create publication on foreign table -- test error case for create publication on foreign table
-- =================================================================== -- ===================================================================
CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail

View file

@ -27,6 +27,7 @@ install_data(
'postgres_fdw--1.0.sql', 'postgres_fdw--1.0.sql',
'postgres_fdw--1.0--1.1.sql', 'postgres_fdw--1.0--1.1.sql',
'postgres_fdw--1.1--1.2.sql', 'postgres_fdw--1.1--1.2.sql',
'postgres_fdw--1.2--1.3.sql',
kwargs: contrib_data_args, kwargs: contrib_data_args,
) )
@ -50,6 +51,7 @@ tests += {
'tap': { 'tap': {
'tests': [ 'tests': [
't/001_auth_scram.pl', 't/001_auth_scram.pl',
't/010_subscription.pl',
], ],
}, },
} }

View file

@ -0,0 +1,12 @@
/* contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql */
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.3'" to load this file. \quit
-- takes internal parameter to prevent calling from SQL
CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
RETURNS text
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT PARALLEL RESTRICTED;
ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;

View file

@ -1,5 +1,5 @@
# postgres_fdw extension # postgres_fdw extension
comment = 'foreign-data wrapper for remote PostgreSQL servers' comment = 'foreign-data wrapper for remote PostgreSQL servers'
default_version = '1.2' default_version = '1.3'
module_pathname = '$libdir/postgres_fdw' module_pathname = '$libdir/postgres_fdw'
relocatable = true relocatable = true

View file

@ -244,6 +244,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1; ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- ===================================================================
-- test subscription
-- ===================================================================
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
DROP SUBSCRIPTION regress_pgfdw_subscription;
-- =================================================================== -- ===================================================================
-- test error case for create publication on foreign table -- test error case for create publication on foreign table
-- =================================================================== -- ===================================================================

View file

@ -0,0 +1,71 @@
# Copyright (c) 2021-2026, PostgreSQL Global Development Group
# Basic logical replication test
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
# Replicate the changes without columns
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_no_col default VALUES");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
my $publisher_host = $node_publisher->host;
my $publisher_port = $node_publisher->port;
$node_subscriber->safe_psql('postgres',
"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
);
$node_subscriber->safe_psql('postgres',
"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
);
$node_subscriber->safe_psql('postgres',
"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
);
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
);
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
is($result, qq(1002), 'check that initial data was copied to subscriber');
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
$node_publisher->wait_for_catchup('tap_sub');
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
is($result, qq(1050), 'check that inserted data was copied to subscriber');
done_testing();

View file

@ -2577,7 +2577,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
<para> <para>
To create a subscription, the user must have the privileges of To create a subscription, the user must have the privileges of
the <literal>pg_create_subscription</literal> role, as well as the <literal>pg_create_subscription</literal> role, as well as
<literal>CREATE</literal> privileges on the database. <literal>CREATE</literal> privileges on the database. If
<literal>SERVER</literal> is specified, the user also must have
<literal>USAGE</literal> privileges on the server.
</para> </para>
<para> <para>

View file

@ -1049,6 +1049,32 @@ postgres=# SELECT postgres_fdw_disconnect_all();
</para> </para>
</sect2> </sect2>
<sect2 id="postgres-fdw-server-subscription">
<title>Subscription Management</title>
<para>
<filename>postgres_fdw</filename> supports subscription connections using
the same options described in <xref
linkend="postgres-fdw-options-connection"/>.
</para>
<para>
For example, assuming the remote server <literal>foreign-host</literal> has
a publication <literal>testpub</literal>:
<programlisting>
CREATE SERVER subscription_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foreign-host', dbname 'foreign_db');
CREATE USER MAPPING FOR local_user SERVER subscription_server OPTIONS (user 'foreign_user', password 'password');
CREATE SUBSCRIPTION my_subscription SERVER subscription_server PUBLICATION testpub;
</programlisting>
</para>
<para>
To create a subscription, the user must be a member of the <xref
linkend="predefined-role-pg-create-subscription"/> role and have
<literal>USAGE</literal> privileges on the server.
</para>
</sect2>
<sect2 id="postgres-fdw-transaction-management"> <sect2 id="postgres-fdw-transaction-management">
<title>Transaction Management</title> <title>Transaction Management</title>

View file

@ -24,6 +24,7 @@ PostgreSQL documentation
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ] [ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ] [ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
[ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ]) ] [ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ]) ]
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@ -112,6 +113,25 @@ ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> REN
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
<listitem>
<para>
Specifies a new connection function for the foreign-data wrapper.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>NO CONNECTION</literal></term>
<listitem>
<para>
This is used to specify that the foreign-data wrapper should no
longer have a connection function.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term> <term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term>
<listitem> <listitem>

View file

@ -21,6 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>' ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@ -102,13 +103,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="sql-altersubscription-params-server">
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
<listitem>
<para>
This clause replaces the foreign server or connection string originally
set by <xref linkend="sql-createsubscription"/> with the foreign server
<replaceable>servername</replaceable>.
</para>
</listitem>
</varlistentry>
<varlistentry id="sql-altersubscription-params-connection"> <varlistentry id="sql-altersubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem> <listitem>
<para> <para>
This clause replaces the connection string originally set by This clause replaces the foreign server or connection string originally
<xref linkend="sql-createsubscription"/>. See there for more set by <xref linkend="sql-createsubscription"/> with the connection
information. string <replaceable>conninfo</replaceable>.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>

View file

@ -24,6 +24,7 @@ PostgreSQL documentation
CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ] [ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ] [ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
[ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ] [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
</synopsis> </synopsis>
</refsynopsisdiv> </refsynopsisdiv>
@ -99,6 +100,25 @@ CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
<listitem>
<para>
<replaceable class="parameter">connection_function</replaceable> is the
name of a previously registered function that will be called to generate
the postgres connection string when a foreign server is used as part of
<xref linkend="sql-createsubscription"/>. If no connection function or
<literal>NO CONNECTION</literal> is specified, then servers using this
foreign data wrapper cannot be used for <literal>CREATE
SUBSCRIPTION</literal>. The connection function must take three
arguments: one of type <type>oid</type> for the user, one of type
<type>oid</type> for the server, and an unused third argument of type
<type>internal</type> (which prevents calling the function in other
contexts).
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term> <term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term>
<listitem> <listitem>

View file

@ -42,6 +42,13 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
means of user mappings. means of user mappings.
</para> </para>
<para>
If the foreign data wrapper <replaceable>fdw_name</replaceable> is
specified with a <literal>CONNECTION</literal> clause, then <xref
linkend="sql-createsubscription"/> may use this foreign server for
connection information.
</para>
<para> <para>
The server name must be unique within the database. The server name must be unique within the database.
</para> </para>

View file

@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv> <refsynopsisdiv>
<synopsis> <synopsis>
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable> CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
CONNECTION '<replaceable class="parameter">conninfo</replaceable>' { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
</synopsis> </synopsis>
@ -77,6 +77,20 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="sql-createsubscription-params-server">
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
<listitem>
<para>
A foreign server to use for the connection. The server's foreign data
wrapper must have a <replaceable>connection_function</replaceable>
registered, and a user mapping for the subscription owner on the server
must exist. Additionally, the subscription owner must have
<literal>USAGE</literal> privileges on
<replaceable>servername</replaceable>.
</para>
</listitem>
</varlistentry>
<varlistentry id="sql-createsubscription-params-connection"> <varlistentry id="sql-createsubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem> <listitem>

View file

@ -895,6 +895,17 @@ findDependentObjects(const ObjectAddress *object,
object->objectSubId == 0) object->objectSubId == 0)
continue; continue;
/*
* Check that the dependent object is not in a shared catalog, which
* is not supported by doDeletion().
*/
if (IsSharedRelation(otherObject.classId))
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop %s because %s depends on it",
getObjectDescription(object, false),
getObjectDescription(&otherObject, false))));
/* /*
* Must lock the dependent object before recursing to it. * Must lock the dependent object before recursing to it.
*/ */

View file

@ -19,11 +19,14 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/tableam.h" #include "access/tableam.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h" #include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h" #include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "foreign/foreign.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h" #include "utils/array.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
* Fetch the subscription from the syscache. * Fetch the subscription from the syscache.
*/ */
Subscription * Subscription *
GetSubscription(Oid subid, bool missing_ok) GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
{ {
HeapTuple tup; HeapTuple tup;
Subscription *sub; Subscription *sub;
@ -108,10 +111,35 @@ GetSubscription(Oid subid, bool missing_ok)
sub->retentionactive = subform->subretentionactive; sub->retentionactive = subform->subretentionactive;
/* Get conninfo */ /* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, if (OidIsValid(subform->subserver))
tup, {
Anum_pg_subscription_subconninfo); AclResult aclresult;
sub->conninfo = TextDatumGetCString(datum);
/* recheck ACL if requested */
if (aclcheck)
{
aclresult = object_aclcheck(ForeignServerRelationId,
subform->subserver,
subform->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(subform->subowner, false),
ForeignServerName(subform->subserver))));
}
sub->conninfo = ForeignServerConnectionString(subform->subowner,
subform->subserver);
}
else
{
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
tup,
Anum_pg_subscription_subconninfo);
sub->conninfo = TextDatumGetCString(datum);
}
/* Get slotname */ /* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, datum = SysCacheGetAttr(SUBSCRIPTIONOID,

View file

@ -1449,7 +1449,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr, subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover, subpasswordrequired, subrunasowner, subfailover,
subretaindeadtuples, submaxretention, subretentionactive, subretaindeadtuples, submaxretention, subretentionactive,
subslotname, subsynccommit, subpublications, suborigin) subserver, subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public; ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS CREATE VIEW pg_stat_subscription_stats AS

View file

@ -522,21 +522,53 @@ lookup_fdw_validator_func(DefElem *validator)
/* validator's return value is ignored, so we don't check the type */ /* validator's return value is ignored, so we don't check the type */
} }
/*
* Convert a connection string function name passed from the parser to an Oid.
*/
static Oid
lookup_fdw_connection_func(DefElem *connection)
{
Oid connectionOid;
Oid funcargtypes[3];
if (connection == NULL || connection->arg == NULL)
return InvalidOid;
/* connection string functions take user oid, server oid */
funcargtypes[0] = OIDOID;
funcargtypes[1] = OIDOID;
funcargtypes[2] = INTERNALOID;
connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false);
/* check that connection string function has correct return type */
if (get_func_rettype(connectionOid) != TEXTOID)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("function %s must return type %s",
NameListToString((List *) connection->arg), "text")));
return connectionOid;
}
/* /*
* Process function options of CREATE/ALTER FDW * Process function options of CREATE/ALTER FDW
*/ */
static void static void
parse_func_options(ParseState *pstate, List *func_options, parse_func_options(ParseState *pstate, List *func_options,
bool *handler_given, Oid *fdwhandler, bool *handler_given, Oid *fdwhandler,
bool *validator_given, Oid *fdwvalidator) bool *validator_given, Oid *fdwvalidator,
bool *connection_given, Oid *fdwconnection)
{ {
ListCell *cell; ListCell *cell;
*handler_given = false; *handler_given = false;
*validator_given = false; *validator_given = false;
*connection_given = false;
/* return InvalidOid if not given */ /* return InvalidOid if not given */
*fdwhandler = InvalidOid; *fdwhandler = InvalidOid;
*fdwvalidator = InvalidOid; *fdwvalidator = InvalidOid;
*fdwconnection = InvalidOid;
foreach(cell, func_options) foreach(cell, func_options)
{ {
@ -556,6 +588,13 @@ parse_func_options(ParseState *pstate, List *func_options,
*validator_given = true; *validator_given = true;
*fdwvalidator = lookup_fdw_validator_func(def); *fdwvalidator = lookup_fdw_validator_func(def);
} }
else if (strcmp(def->defname, "connection") == 0)
{
if (*connection_given)
errorConflictingDefElem(def, pstate);
*connection_given = true;
*fdwconnection = lookup_fdw_connection_func(def);
}
else else
elog(ERROR, "option \"%s\" not recognized", elog(ERROR, "option \"%s\" not recognized",
def->defname); def->defname);
@ -575,8 +614,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
Oid fdwId; Oid fdwId;
bool handler_given; bool handler_given;
bool validator_given; bool validator_given;
bool connection_given;
Oid fdwhandler; Oid fdwhandler;
Oid fdwvalidator; Oid fdwvalidator;
Oid fdwconnection;
Datum fdwoptions; Datum fdwoptions;
Oid ownerId; Oid ownerId;
ObjectAddress myself; ObjectAddress myself;
@ -620,10 +661,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
/* Lookup handler and validator functions, if given */ /* Lookup handler and validator functions, if given */
parse_func_options(pstate, stmt->func_options, parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler, &handler_given, &fdwhandler,
&validator_given, &fdwvalidator); &validator_given, &fdwvalidator,
&connection_given, &fdwconnection);
values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler); values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator); values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true; nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
@ -695,8 +738,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
Datum datum; Datum datum;
bool handler_given; bool handler_given;
bool validator_given; bool validator_given;
bool connection_given;
Oid fdwhandler; Oid fdwhandler;
Oid fdwvalidator; Oid fdwvalidator;
Oid fdwconnection;
ObjectAddress myself; ObjectAddress myself;
rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock); rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@ -726,7 +771,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
parse_func_options(pstate, stmt->func_options, parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler, &handler_given, &fdwhandler,
&validator_given, &fdwvalidator); &validator_given, &fdwvalidator,
&connection_given, &fdwconnection);
if (handler_given) if (handler_given)
{ {
@ -764,6 +810,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
fdwvalidator = fdwForm->fdwvalidator; fdwvalidator = fdwForm->fdwvalidator;
} }
if (connection_given)
{
repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
}
/* /*
* If options specified, validate and update. * If options specified, validate and update.
*/ */

View file

@ -27,13 +27,16 @@
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h" #include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h" #include "catalog/pg_database_d.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h" #include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h" #include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/event_trigger.h" #include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h" #include "commands/subscriptioncmds.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "foreign/foreign.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "pgstat.h" #include "pgstat.h"
@ -619,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Datum values[Natts_pg_subscription]; Datum values[Natts_pg_subscription];
Oid owner = GetUserId(); Oid owner = GetUserId();
HeapTuple tup; HeapTuple tup;
Oid serverid;
char *conninfo; char *conninfo;
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
List *publications; List *publications;
@ -730,15 +734,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.wal_receiver_timeout == NULL) if (opts.wal_receiver_timeout == NULL)
opts.wal_receiver_timeout = "-1"; opts.wal_receiver_timeout = "-1";
conninfo = stmt->conninfo;
publications = stmt->publication;
/* Load the library providing us libpq calls. */ /* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false); load_file("libpqwalreceiver", false);
if (stmt->servername)
{
ForeignServer *server;
Assert(!stmt->conninfo);
conninfo = NULL;
server = GetForeignServerByName(stmt->servername, false);
aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
/* make sure a user mapping exists */
GetUserMapping(owner, server->serverid);
serverid = server->serverid;
conninfo = ForeignServerConnectionString(owner, serverid);
}
else
{
Assert(stmt->conninfo);
serverid = InvalidOid;
conninfo = stmt->conninfo;
}
/* Check the connection info string. */ /* Check the connection info string. */
walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
publications = stmt->publication;
/* Everything ok, form a new tuple. */ /* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls)); memset(nulls, false, sizeof(nulls));
@ -768,8 +797,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Int32GetDatum(opts.maxretention); Int32GetDatum(opts.maxretention);
values[Anum_pg_subscription_subretentionactive - 1] = values[Anum_pg_subscription_subretentionactive - 1] =
Int32GetDatum(opts.retaindeadtuples); Int32GetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] = values[Anum_pg_subscription_subserver - 1] = serverid;
CStringGetTextDatum(conninfo); if (!OidIsValid(serverid))
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
else
nulls[Anum_pg_subscription_subconninfo - 1] = true;
if (opts.slot_name) if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] = values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@ -792,6 +825,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner); recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
ObjectAddressSet(myself, SubscriptionRelationId, subid);
if (stmt->servername)
{
ObjectAddress referenced;
Assert(OidIsValid(serverid));
ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* /*
* A replication origin is currently created for all subscriptions, * A replication origin is currently created for all subscriptions,
* including those that only contain sequences or are otherwise empty. * including those that only contain sequences or are otherwise empty.
@ -945,8 +990,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.enabled || opts.retaindeadtuples) if (opts.enabled || opts.retaindeadtuples)
ApplyLauncherWakeupAtCommit(); ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
return myself; return myself;
@ -1410,7 +1453,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION, aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
stmt->subname); stmt->subname);
sub = GetSubscription(subid, false); /*
* Skip ACL checks on the subscription's foreign server, if any. If
* changing the server (or replacing it with a raw connection), then the
* old one will be removed anyway. If changing something unrelated,
* there's no need to do an additional ACL check here; that will be done
* by the subscription worker anyway.
*/
sub = GetSubscription(subid, false, false);
retain_dead_tuples = sub->retaindeadtuples; retain_dead_tuples = sub->retaindeadtuples;
origin = sub->origin; origin = sub->origin;
@ -1435,6 +1485,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
memset(nulls, false, sizeof(nulls)); memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces)); memset(replaces, false, sizeof(replaces));
ObjectAddressSet(myself, SubscriptionRelationId, subid);
switch (stmt->kind) switch (stmt->kind)
{ {
case ALTER_SUBSCRIPTION_OPTIONS: case ALTER_SUBSCRIPTION_OPTIONS:
@ -1753,7 +1805,78 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break; break;
} }
case ALTER_SUBSCRIPTION_SERVER:
{
ForeignServer *new_server;
ObjectAddress referenced;
AclResult aclresult;
char *conninfo;
/*
* Remove what was there before, either another foreign server
* or a connection string.
*/
if (form->subserver)
{
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
DEPENDENCY_NORMAL,
ForeignServerRelationId, form->subserver);
}
else
{
nulls[Anum_pg_subscription_subconninfo - 1] = true;
replaces[Anum_pg_subscription_subconninfo - 1] = true;
}
/*
* Check that the subscription owner has USAGE privileges on
* the server.
*/
new_server = GetForeignServerByName(stmt->servername, false);
aclresult = object_aclcheck(ForeignServerRelationId,
new_server->serverid,
form->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(form->subowner, false),
ForeignServerName(new_server->serverid)));
/* make sure a user mapping exists */
GetUserMapping(form->subowner, new_server->serverid);
conninfo = ForeignServerConnectionString(form->subowner,
new_server->serverid);
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
walrcv_check_conninfo(conninfo,
sub->passwordrequired && !sub->ownersuperuser);
values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
replaces[Anum_pg_subscription_subserver - 1] = true;
ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
update_tuple = true;
}
break;
case ALTER_SUBSCRIPTION_CONNECTION: case ALTER_SUBSCRIPTION_CONNECTION:
/* remove reference to foreign server and dependencies, if present */
if (form->subserver)
{
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
DEPENDENCY_NORMAL,
ForeignServerRelationId, form->subserver);
values[Anum_pg_subscription_subserver - 1] = InvalidOid;
replaces[Anum_pg_subscription_subserver - 1] = true;
}
/* Load the library providing us libpq calls. */ /* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false); load_file("libpqwalreceiver", false);
/* Check the connection info string. */ /* Check the connection info string. */
@ -2038,8 +2161,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
table_close(rel, RowExclusiveLock); table_close(rel, RowExclusiveLock);
ObjectAddressSet(myself, SubscriptionRelationId, subid);
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/* Wake up related replication workers to handle this change quickly. */ /* Wake up related replication workers to handle this change quickly. */
@ -2068,7 +2189,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ListCell *lc; ListCell *lc;
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
char *err = NULL; char *err = NULL;
WalReceiverConn *wrconn; WalReceiverConn *wrconn = NULL;
Form_pg_subscription form; Form_pg_subscription form;
List *rstates; List *rstates;
bool must_use_password; bool must_use_password;
@ -2126,9 +2247,35 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
subname = pstrdup(NameStr(*DatumGetName(datum))); subname = pstrdup(NameStr(*DatumGetName(datum)));
/* Get conninfo */ /* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, if (OidIsValid(form->subserver))
Anum_pg_subscription_subconninfo); {
conninfo = TextDatumGetCString(datum); AclResult aclresult;
aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
form->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
{
/*
* Unable to generate connection string because permissions on the
* foreign server have been removed. Follow the same logic as an
* unusable subconninfo (which will result in an ERROR later
* unless slot_name = NONE).
*/
err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
GetUserNameFromId(form->subowner, false),
ForeignServerName(form->subserver));
conninfo = NULL;
}
else
conninfo = ForeignServerConnectionString(form->subowner,
form->subserver);
}
else
{
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
Anum_pg_subscription_subconninfo);
conninfo = TextDatumGetCString(datum);
}
/* Get slotname */ /* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@ -2227,6 +2374,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
} }
/* Clean up dependencies */ /* Clean up dependencies */
deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */ /* Remove any associated relation synchronization states. */
@ -2265,8 +2413,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
*/ */
load_file("libpqwalreceiver", false); load_file("libpqwalreceiver", false);
wrconn = walrcv_connect(conninfo, true, true, must_use_password, if (conninfo)
subname, &err); wrconn = walrcv_connect(conninfo, true, true, must_use_password,
subname, &err);
if (wrconn == NULL) if (wrconn == NULL)
{ {
if (!slotname) if (!slotname)
@ -2436,6 +2586,27 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
aclcheck_error(aclresult, OBJECT_DATABASE, aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId)); get_database_name(MyDatabaseId));
/*
* If the subscription uses a server, check that the new owner has USAGE
* privileges on the server and that a user mapping exists. Note: does not
* re-check the resulting connection string.
*/
if (OidIsValid(form->subserver))
{
Oid serverid = form->subserver;
aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(newOwnerId, false),
ForeignServerName(serverid)));
/* make sure a user mapping exists */
GetUserMapping(newOwnerId, serverid);
}
form->subowner = newOwnerId; form->subowner = newOwnerId;
CatalogTupleUpdate(rel, &tup->t_self, tup); CatalogTupleUpdate(rel, &tup->t_self, tup);

View file

@ -72,6 +72,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
fdw->fdwname = pstrdup(NameStr(fdwform->fdwname)); fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
fdw->fdwhandler = fdwform->fdwhandler; fdw->fdwhandler = fdwform->fdwhandler;
fdw->fdwvalidator = fdwform->fdwvalidator; fdw->fdwvalidator = fdwform->fdwvalidator;
fdw->fdwconnection = fdwform->fdwconnection;
/* Extract the fdwoptions */ /* Extract the fdwoptions */
datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID, datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@ -176,6 +177,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
} }
/*
* ForeignServerName - get name of foreign server.
*/
char *
ForeignServerName(Oid serverid)
{
Form_pg_foreign_server serverform;
char *servername;
HeapTuple tp;
tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for foreign server %u", serverid);
serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
servername = pstrdup(NameStr(serverform->srvname));
ReleaseSysCache(tp);
return servername;
}
/* /*
* GetForeignServerByName - look up the foreign server definition by name. * GetForeignServerByName - look up the foreign server definition by name.
*/ */
@ -191,6 +217,66 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
} }
/*
* Retrieve connection string from server's FDW.
*/
char *
ForeignServerConnectionString(Oid userid, Oid serverid)
{
MemoryContext tempContext;
MemoryContext oldcxt;
volatile text *connection_text = NULL;
char *result = NULL;
/*
* GetForeignServer, GetForeignDataWrapper, and the connection function
* itself all leak memory into CurrentMemoryContext. Switch to a temporary
* context for easy cleanup.
*/
tempContext = AllocSetContextCreate(CurrentMemoryContext,
"FDWConnectionContext",
ALLOCSET_SMALL_SIZES);
oldcxt = MemoryContextSwitchTo(tempContext);
PG_TRY();
{
ForeignServer *server;
ForeignDataWrapper *fdw;
Datum connection_datum;
server = GetForeignServer(serverid);
fdw = GetForeignDataWrapper(server->fdwid);
if (!OidIsValid(fdw->fdwconnection))
ereport(ERROR,
(errmsg("foreign data wrapper \"%s\" does not support subscription connections",
fdw->fdwname),
errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
connection_datum = OidFunctionCall3(fdw->fdwconnection,
ObjectIdGetDatum(userid),
ObjectIdGetDatum(serverid),
PointerGetDatum(NULL));
connection_text = DatumGetTextPP(connection_datum);
}
PG_FINALLY();
{
MemoryContextSwitchTo(oldcxt);
if (connection_text)
result = text_to_cstring((text *) connection_text);
MemoryContextDelete(tempContext);
}
PG_END_TRY();
return result;
}
/* /*
* GetUserMapping - look up the user mapping. * GetUserMapping - look up the user mapping.
* *

View file

@ -5583,6 +5583,8 @@ fdw_option:
| NO HANDLER { $$ = makeDefElem("handler", NULL, @1); } | NO HANDLER { $$ = makeDefElem("handler", NULL, @1); }
| VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); } | VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); }
| NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); } | NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); }
| CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); }
| NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); }
; ;
fdw_options: fdw_options:
@ -11057,6 +11059,16 @@ CreateSubscriptionStmt:
n->options = $8; n->options = $8;
$$ = (Node *) n; $$ = (Node *) n;
} }
| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
{
CreateSubscriptionStmt *n =
makeNode(CreateSubscriptionStmt);
n->subname = $3;
n->servername = $5;
n->publication = $7;
n->options = $8;
$$ = (Node *) n;
}
; ;
/***************************************************************************** /*****************************************************************************
@ -11086,6 +11098,16 @@ AlterSubscriptionStmt:
n->conninfo = $5; n->conninfo = $5;
$$ = (Node *) n; $$ = (Node *) n;
} }
| ALTER SUBSCRIPTION name SERVER name
{
AlterSubscriptionStmt *n =
makeNode(AlterSubscriptionStmt);
n->kind = ALTER_SUBSCRIPTION_SERVER;
n->subname = $3;
n->servername = $5;
$$ = (Node *) n;
}
| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
{ {
AlterSubscriptionStmt *n = AlterSubscriptionStmt *n =

View file

@ -5059,7 +5059,7 @@ maybe_reread_subscription(void)
/* Ensure allocations in permanent context. */ /* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyContext); oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true); newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
/* /*
* Exit if the subscription was removed. This normally should not happen * Exit if the subscription was removed. This normally should not happen
@ -5201,7 +5201,9 @@ set_wal_receiver_timeout(void)
} }
/* /*
* Callback from subscription syscache invalidation. * Callback from subscription syscache invalidation. Also needed for server or
* user mapping invalidation, which can change the connection information for
* subscriptions that connect using a server object.
*/ */
static void static void
subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue) subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
@ -5806,7 +5808,7 @@ InitializeLogRepWorker(void)
*/ */
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0, LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock); AccessShareLock);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
if (!MySubscription) if (!MySubscription)
{ {
ereport(LOG, ereport(LOG,
@ -5871,6 +5873,22 @@ InitializeLogRepWorker(void)
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb, subscription_change_cb,
(Datum) 0); (Datum) 0);
/* Changes to foreign servers may affect subscriptions using SERVER. */
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
subscription_change_cb,
(Datum) 0);
/* Changes to user mappings may affect subscriptions using SERVER. */
CacheRegisterSyscacheCallback(USERMAPPINGOID,
subscription_change_cb,
(Datum) 0);
/*
* Changes to FDW connection_function may affect subscriptions using
* SERVER.
*/
CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
subscription_change_cb,
(Datum) 0);
CacheRegisterSyscacheCallback(AUTHOID, CacheRegisterSyscacheCallback(AUTHOID,
subscription_change_cb, subscription_change_cb,

View file

@ -5182,6 +5182,7 @@ getSubscriptions(Archive *fout)
int i_subdisableonerr; int i_subdisableonerr;
int i_subpasswordrequired; int i_subpasswordrequired;
int i_subrunasowner; int i_subrunasowner;
int i_subservername;
int i_subconninfo; int i_subconninfo;
int i_subslotname; int i_subslotname;
int i_subsynccommit; int i_subsynccommit;
@ -5286,14 +5287,24 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000) if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query, appendPQExpBufferStr(query,
" s.subwalrcvtimeout\n"); " s.subwalrcvtimeout,\n");
else else
appendPQExpBufferStr(query, appendPQExpBufferStr(query,
" '-1' AS subwalrcvtimeout\n"); " '-1' AS subwalrcvtimeout,\n");
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
else
appendPQExpBufferStr(query, " NULL AS subservername\n");
appendPQExpBufferStr(query, appendPQExpBufferStr(query,
"FROM pg_subscription s\n"); "FROM pg_subscription s\n");
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
"LEFT JOIN pg_catalog.pg_foreign_server fs \n"
" ON fs.oid = s.subserver \n");
if (dopt->binary_upgrade && fout->remoteVersion >= 170000) if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
appendPQExpBufferStr(query, appendPQExpBufferStr(query,
"LEFT JOIN pg_catalog.pg_replication_origin_status o \n" "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
@ -5325,6 +5336,7 @@ getSubscriptions(Archive *fout)
i_subfailover = PQfnumber(res, "subfailover"); i_subfailover = PQfnumber(res, "subfailover");
i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
i_submaxretention = PQfnumber(res, "submaxretention"); i_submaxretention = PQfnumber(res, "submaxretention");
i_subservername = PQfnumber(res, "subservername");
i_subconninfo = PQfnumber(res, "subconninfo"); i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname"); i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subsynccommit = PQfnumber(res, "subsynccommit");
@ -5347,6 +5359,10 @@ getSubscriptions(Archive *fout)
subinfo[i].subenabled = subinfo[i].subenabled =
(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
if (PQgetisnull(res, i, i_subservername))
subinfo[i].subservername = NULL;
else
subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
subinfo[i].subbinary = subinfo[i].subbinary =
(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0); (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
subinfo[i].substream = *(PQgetvalue(res, i, i_substream)); subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
@ -5363,8 +5379,11 @@ getSubscriptions(Archive *fout)
(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
subinfo[i].submaxretention = subinfo[i].submaxretention =
atoi(PQgetvalue(res, i, i_submaxretention)); atoi(PQgetvalue(res, i, i_submaxretention));
subinfo[i].subconninfo = if (PQgetisnull(res, i, i_subconninfo))
pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subconninfo = NULL;
else
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname)) if (PQgetisnull(res, i, i_subslotname))
subinfo[i].subslotname = NULL; subinfo[i].subslotname = NULL;
else else
@ -5575,9 +5594,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
qsubname); qsubname);
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
qsubname); qsubname);
appendStringLiteralAH(query, subinfo->subconninfo, fout); if (subinfo->subservername)
{
appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
}
else
{
appendPQExpBuffer(query, "CONNECTION ");
appendStringLiteralAH(query, subinfo->subconninfo, fout);
}
/* Build list of quoted publications and append them to query. */ /* Build list of quoted publications and append them to query. */
if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))

View file

@ -720,6 +720,7 @@ typedef struct _SubscriptionInfo
bool subfailover; bool subfailover;
bool subretaindeadtuples; bool subretaindeadtuples;
int submaxretention; int submaxretention;
char *subservername;
char *subconninfo; char *subconninfo;
char *subslotname; char *subslotname;
char *subsynccommit; char *subsynccommit;

View file

@ -6895,7 +6895,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt; printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false, static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false}; false, false, false, false, false, false, false};
if (pset.sversion < 100000) if (pset.sversion < 100000)
{ {
@ -6965,6 +6965,10 @@ describeSubscriptions(const char *pattern, bool verbose)
gettext_noop("Failover")); gettext_noop("Failover"));
if (pset.sversion >= 190000) if (pset.sversion >= 190000)
{ {
appendPQExpBuffer(&buf,
", (select srvname from pg_foreign_server where oid=subserver) AS \"%s\"\n",
gettext_noop("Server"));
appendPQExpBuffer(&buf, appendPQExpBuffer(&buf,
", subretaindeadtuples AS \"%s\"\n", ", subretaindeadtuples AS \"%s\"\n",
gettext_noop("Retain dead tuples")); gettext_noop("Retain dead tuples"));

View file

@ -2332,7 +2332,7 @@ match_previous_words(int pattern_id,
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
"RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES", "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
"SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION"); "SERVER", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH */ /* ALTER SUBSCRIPTION <name> REFRESH */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH")) else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
COMPLETE_WITH("PUBLICATION", "SEQUENCES"); COMPLETE_WITH("PUBLICATION", "SEQUENCES");
@ -3870,9 +3870,16 @@ match_previous_words(int pattern_id,
/* CREATE SUBSCRIPTION */ /* CREATE SUBSCRIPTION */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny)) else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION"); COMPLETE_WITH("SERVER", "CONNECTION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", MatchAny))
COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny)) else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
COMPLETE_WITH("PUBLICATION"); COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER",
MatchAny, "PUBLICATION"))
{
/* complete with nothing here as this refers to remote publications */
}
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
MatchAny, "PUBLICATION")) MatchAny, "PUBLICATION"))
{ {

View file

@ -57,6 +57,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 202603061 #define CATALOG_VERSION_NO 202603062
#endif #endif

View file

@ -38,6 +38,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation
* function, or 0 if * function, or 0 if
* none */ * none */
Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string
* function, or 0 if
* none */
#ifdef CATALOG_VARLEN /* variable-length fields start here */ #ifdef CATALOG_VARLEN /* variable-length fields start here */
aclitem fdwacl[1]; /* access permissions */ aclitem fdwacl[1]; /* access permissions */

View file

@ -92,9 +92,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* exceeded max_retention_duration, when * exceeded max_retention_duration, when
* defined */ * defined */
Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses
* server */
#ifdef CATALOG_VARLEN /* variable-length fields start here */ #ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */ /* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL; text subconninfo; /* Set if connecting with connection string */
/* Slot name on publisher */ /* Slot name on publisher */
NameData subslotname BKI_FORCE_NULL; NameData subslotname BKI_FORCE_NULL;
@ -207,7 +210,8 @@ typedef struct Subscription
#endif /* EXPOSE_TO_CLIENT_CODE */ #endif /* EXPOSE_TO_CLIENT_CODE */
extern Subscription *GetSubscription(Oid subid, bool missing_ok); extern Subscription *GetSubscription(Oid subid, bool missing_ok,
bool aclcheck);
extern void FreeSubscription(Subscription *sub); extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid); extern void DisableSubscription(Oid subid);

View file

@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
char *fdwname; /* Name of the FDW */ char *fdwname; /* Name of the FDW */
Oid fdwhandler; /* Oid of handler function, or 0 */ Oid fdwhandler; /* Oid of handler function, or 0 */
Oid fdwvalidator; /* Oid of validator function, or 0 */ Oid fdwvalidator; /* Oid of validator function, or 0 */
Oid fdwconnection; /* Oid of connection string function, or 0 */
List *options; /* fdwoptions as DefElem list */ List *options; /* fdwoptions as DefElem list */
} ForeignDataWrapper; } ForeignDataWrapper;
@ -65,10 +66,12 @@ typedef struct ForeignTable
extern ForeignServer *GetForeignServer(Oid serverid); extern ForeignServer *GetForeignServer(Oid serverid);
extern char *ForeignServerName(Oid serverid);
extern ForeignServer *GetForeignServerExtended(Oid serverid, extern ForeignServer *GetForeignServerExtended(Oid serverid,
bits16 flags); bits16 flags);
extern ForeignServer *GetForeignServerByName(const char *srvname, extern ForeignServer *GetForeignServerByName(const char *srvname,
bool missing_ok); bool missing_ok);
extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
extern UserMapping *GetUserMapping(Oid userid, Oid serverid); extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,

View file

@ -4383,6 +4383,7 @@ typedef struct CreateSubscriptionStmt
{ {
NodeTag type; NodeTag type;
char *subname; /* Name of the subscription */ char *subname; /* Name of the subscription */
char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */ char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */ List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */ List *options; /* List of DefElem nodes */
@ -4391,6 +4392,7 @@ typedef struct CreateSubscriptionStmt
typedef enum AlterSubscriptionType typedef enum AlterSubscriptionType
{ {
ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_OPTIONS,
ALTER_SUBSCRIPTION_SERVER,
ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SET_PUBLICATION,
ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@ -4406,6 +4408,7 @@ typedef struct AlterSubscriptionStmt
NodeTag type; NodeTag type;
AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of the subscription */ char *subname; /* Name of the subscription */
char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */ char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */ List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */ List *options; /* List of DefElem nodes */

View file

@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid} NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid} NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid} NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid} NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid}
NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid} NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid} NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid}
@ -269,5 +270,6 @@ NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid}
NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid}
NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid}
NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid}
NOTICE: checking pg_subscription {subserver} => pg_foreign_server {oid}
NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid}
NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid} NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid}

View file

@ -1,6 +1,14 @@
-- --
-- SUBSCRIPTION -- SUBSCRIPTION
-- --
-- directory paths and dlsuffix are passed to us in environment variables
\getenv libdir PG_LIBDIR
\getenv dlsuffix PG_DLSUFFIX
\set regresslib :libdir '/regress' :dlsuffix
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
RETURNS text
AS :'regresslib', 'test_fdw_connection'
LANGUAGE C;
CREATE ROLE regress_subscription_user LOGIN SUPERUSER; CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
CREATE ROLE regress_subscription_user2; CREATE ROLE regress_subscription_user2;
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription; CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@ -116,18 +124,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ regress_testsub4 \dRs+ regress_testsub4
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4 \dRs+ regress_testsub4
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub3;
@ -140,15 +148,53 @@ ERROR: invalid connection string syntax: invalid connection option "i_dont_exis
-- connecting, so this is reliable and safe) -- connecting, so this is reliable and safe)
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub; CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
ERROR: subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1" ERROR: subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1"
CREATE FOREIGN DATA WRAPPER test_fdw;
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need USAGE privileges on server
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: permission denied for foreign server test_server
RESET SESSION AUTHORIZATION;
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need user mapping
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: user mapping not found for user "regress_subscription_user3", server "test_server"
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
-- fail, need CONNECTION clause
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: foreign data wrapper "test_fdw" does not support subscription connections
DETAIL: Foreign data wrapper must be defined with CONNECTION specified.
RESET SESSION AUTHORIZATION;
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, must connect but lacks USAGE on server, as well as user mapping
DROP SUBSCRIPTION regress_testsub6;
ERROR: could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server"
HINT: Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot.
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub6;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
DROP SERVER test_server;
DROP FOREIGN DATA WRAPPER test_fdw;
-- fail - invalid connection string during ALTER -- fail - invalid connection string during ALTER
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -157,10 +203,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true); ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@ -176,10 +222,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok -- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription
(1 row) (1 row)
-- ok - with lsn = NONE -- ok - with lsn = NONE
@ -188,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0 ERROR: invalid WAL location (LSN): 0/0
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
(1 row) (1 row)
BEGIN; BEGIN;
@ -227,10 +273,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar'); ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
ERROR: invalid value for parameter "wal_receiver_timeout": "foobar" ERROR: invalid value for parameter "wal_receiver_timeout": "foobar"
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- ---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription
(1 row) (1 row)
-- rename back to keep the rest simple -- rename back to keep the rest simple
@ -259,19 +305,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub; DROP SUBSCRIPTION regress_testsub;
@ -283,27 +329,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
-- fail - publication already exists -- fail - publication already exists
@ -318,10 +364,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub" ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
-- fail - publication used more than once -- fail - publication used more than once
@ -336,10 +382,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications -- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub; DROP SUBSCRIPTION regress_testsub;
@ -375,19 +421,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
-- we can alter streaming when two_phase enabled -- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -397,10 +443,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -413,18 +459,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -437,10 +483,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -454,19 +500,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
WARNING: subscription was created, but is not connected WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
-- ok -- ok
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
\dRs+ \dRs+
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);

View file

@ -729,6 +729,13 @@ test_fdw_handler(PG_FUNCTION_ARGS)
PG_RETURN_NULL(); PG_RETURN_NULL();
} }
PG_FUNCTION_INFO_V1(test_fdw_connection);
Datum
test_fdw_connection(PG_FUNCTION_ARGS)
{
PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret"));
}
PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid); PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid);
Datum Datum
is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS) is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS)

View file

@ -2,6 +2,17 @@
-- SUBSCRIPTION -- SUBSCRIPTION
-- --
-- directory paths and dlsuffix are passed to us in environment variables
\getenv libdir PG_LIBDIR
\getenv dlsuffix PG_DLSUFFIX
\set regresslib :libdir '/regress' :dlsuffix
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
RETURNS text
AS :'regresslib', 'test_fdw_connection'
LANGUAGE C;
CREATE ROLE regress_subscription_user LOGIN SUPERUSER; CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
CREATE ROLE regress_subscription_user2; CREATE ROLE regress_subscription_user2;
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription; CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@ -85,6 +96,50 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'i_dont_exist=param' PUBLICATION
-- connecting, so this is reliable and safe) -- connecting, so this is reliable and safe)
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub; CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
CREATE FOREIGN DATA WRAPPER test_fdw;
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need USAGE privileges on server
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
RESET SESSION AUTHORIZATION;
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need user mapping
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
-- fail, need CONNECTION clause
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
RESET SESSION AUTHORIZATION;
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, must connect but lacks USAGE on server, as well as user mapping
DROP SUBSCRIPTION regress_testsub6;
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub6;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
DROP SERVER test_server;
DROP FOREIGN DATA WRAPPER test_fdw;
-- fail - invalid connection string during ALTER -- fail - invalid connection string during ALTER
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';