Set notice receiver before libpq connection startup completes

Commit 112faf1378 added custom notice receivers for replication,
postgres_fdw, and dblink so that remote NOTICE, WARNING, and similar
messages are reported via ereport(). However, those notice receivers were
installed only after libpqsrv_connect() and libpqsrv_connect_params()
returned, by which point libpq connection startup had already completed.
As a result, messages emitted during connection establishment could be
missed.

This commit fixes the issue by splitting libpqsrv_connect() and
libpqsrv_connect_params() into separate start and complete phases:
libpqsrv_connect_start(), libpqsrv_connect_params_start(), and
libpqsrv_connect_complete(). This allows callers to perform
per-connection setup, such as installing a notice receiver, after the
connection has been started but before startup completes.

Note that callers of libpqsrv_connect_start() and
libpqsrv_connect_params_start() must still call
libpqsrv_connect_complete(), even if the start function returns NULL, so
that any external FDs reserved during startup are released properly.

Author: Chao Li <lic@highgo.com>
Reviewed-by: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Vignesh C <vignesh21@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Discussion: https://postgr.es/m/A2B8B7DE-C119-492F-A9FA-14CF86849777@gmail.com
This commit is contained in:
Fujii Masao 2026-05-23 00:25:48 +09:00
parent d8b5d87e54
commit 06a5c3cdef
4 changed files with 70 additions and 37 deletions

View file

@ -222,7 +222,10 @@ dblink_get_conn(char *conname_or_str,
dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
/* OK to make connection */
conn = libpqsrv_connect(connstr, dblink_we_get_conn);
conn = libpqsrv_connect_start(connstr);
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
"received message via remote connection");
libpqsrv_connect_complete(conn, dblink_we_get_conn);
if (PQstatus(conn) == CONNECTION_BAD)
{
@ -235,9 +238,6 @@ dblink_get_conn(char *conname_or_str,
errdetail_internal("%s", msg)));
}
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
"received message via remote connection");
dblink_security_check(conn, NULL, connstr);
if (PQclientEncoding(conn) != GetDatabaseEncoding())
PQsetClientEncoding(conn, GetDatabaseEncodingName());
@ -321,7 +321,11 @@ dblink_connect(PG_FUNCTION_ARGS)
}
/* OK to make connection */
conn = libpqsrv_connect(connstr, dblink_we_connect);
conn = libpqsrv_connect_start(connstr);
if (conn != NULL)
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
"received message via remote connection");
libpqsrv_connect_complete(conn, dblink_we_connect);
if (PQstatus(conn) == CONNECTION_BAD)
{
@ -336,9 +340,6 @@ dblink_connect(PG_FUNCTION_ARGS)
errdetail_internal("%s", msg)));
}
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
"received message via remote connection");
/* check password actually used if not superuser */
dblink_security_check(conn, connname, connstr);

View file

@ -638,6 +638,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
const char **keywords;
const char **values;
char *appname;
PGconn *start_conn;
construct_connection_params(server, user, &keywords, &values, &appname);
@ -646,9 +647,12 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
/* OK to make connection */
conn = libpqsrv_connect_params(keywords, values,
false, /* expand_dbname */
pgfdw_we_connect);
start_conn = libpqsrv_connect_params_start(keywords, values,
/* expand_dbname = */ false);
PQsetNoticeReceiver(start_conn, libpqsrv_notice_receiver,
"received message via remote connection");
libpqsrv_connect_complete(start_conn, pgfdw_we_connect);
conn = start_conn;
if (!conn || PQstatus(conn) != CONNECTION_OK)
ereport(ERROR,
@ -657,9 +661,6 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
server->servername),
errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
"received message via remote connection");
/* Perform post-connection security checks. */
pgfdw_security_check(keywords, values, user, conn);

View file

@ -223,9 +223,12 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
conn = palloc0_object(WalReceiverConn);
conn->streamConn =
libpqsrv_connect_params(keys, vals,
/* expand_dbname = */ true,
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
libpqsrv_connect_params_start(keys, vals,
/* expand_dbname = */ true);
PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
"received message via replication");
libpqsrv_connect_complete(conn->streamConn,
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
if (options_val != NULL)
pfree(options_val);
@ -245,9 +248,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
}
PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
"received message via replication");
/*
* Set always-secure search path for the cases where the connection is
* used to run SQL queries, so malicious users can't get control.

View file

@ -39,10 +39,28 @@
static inline void libpqsrv_connect_prepare(void);
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
static inline void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
/*
* Start a connection using PQconnectStart().
*
* The returned connection has not yet completed its startup sequence. Callers
* may perform per-connection setup, such as installing a notice receiver,
* before calling libpqsrv_connect_complete().
*
* Callers must call libpqsrv_connect_complete(), even if this function returns
* NULL, because libpqsrv_connect_prepare() may already have reserved an
* external FD that must be released.
*/
static inline PGconn *
libpqsrv_connect_start(const char *conninfo)
{
libpqsrv_connect_prepare();
return PQconnectStart(conninfo);
}
/*
* PQconnectdb() wrapper that reserves a file descriptor and processes
@ -55,17 +73,30 @@ static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info
static inline PGconn *
libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
{
PGconn *conn = NULL;
PGconn *conn;
libpqsrv_connect_prepare();
conn = libpqsrv_connect_start(conninfo);
conn = PQconnectStart(conninfo);
libpqsrv_connect_internal(conn, wait_event_info);
libpqsrv_connect_complete(conn, wait_event_info);
return conn;
}
/*
* Start a connection using PQconnectStartParams().
*
* See libpqsrv_connect_start() for the resource-lifetime rules.
*/
static inline PGconn *
libpqsrv_connect_params_start(const char *const *keywords,
const char *const *values,
int expand_dbname)
{
libpqsrv_connect_prepare();
return PQconnectStartParams(keywords, values, expand_dbname);
}
/*
* Like libpqsrv_connect(), except that this is a wrapper for
* PQconnectdbParams().
@ -76,13 +107,11 @@ libpqsrv_connect_params(const char *const *keywords,
int expand_dbname,
uint32 wait_event_info)
{
PGconn *conn = NULL;
PGconn *conn;
libpqsrv_connect_prepare();
conn = libpqsrv_connect_params_start(keywords, values, expand_dbname);
conn = PQconnectStartParams(keywords, values, expand_dbname);
libpqsrv_connect_internal(conn, wait_event_info);
libpqsrv_connect_complete(conn, wait_event_info);
return conn;
}
@ -90,8 +119,9 @@ libpqsrv_connect_params(const char *const *keywords,
/*
* PQfinish() wrapper that additionally releases the reserved file descriptor.
*
* It is allowed to call this with a NULL pgconn iff NULL was returned by
* libpqsrv_connect*.
* It is allowed to call this with NULL only when the external FD reservation
* has already been released, for example after calling
* libpqsrv_connect_complete() with a NULL connection.
*/
static inline void
libpqsrv_disconnect(PGconn *conn)
@ -101,7 +131,7 @@ libpqsrv_disconnect(PGconn *conn)
* already released it). This rule makes it easier to write PG_CATCH()
* handlers for this facility's users.
*
* See also libpqsrv_connect_internal().
* See also libpqsrv_connect_complete().
*/
if (conn == NULL)
return;
@ -111,7 +141,7 @@ libpqsrv_disconnect(PGconn *conn)
}
/* internal helper functions follow */
/* lower-level connection helper functions follow */
/*
@ -144,10 +174,11 @@ libpqsrv_connect_prepare(void)
}
/*
* Helper function for all connection establishment functions.
* Complete a connection started by libpqsrv_connect_start() or
* libpqsrv_connect_params_start().
*/
static inline void
libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info)
{
/*
* With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do