Enhance slot synchronization API to respect promotion signal.

Previously, during a promotion, only the slot synchronization worker was
signaled to shut down. The backend executing slot synchronization via the
pg_sync_replication_slots() SQL function was not signaled, allowing it to
complete its synchronization cycle before exiting.

An upcoming patch improves pg_sync_replication_slots() to wait until
replication slots are fully persisted before finishing. This behaviour
requires the backend to exit promptly if a promotion occurs.

This patch ensures that, during promotion, a signal is also sent to the
backend running pg_sync_replication_slots(), allowing it to be interrupted
and exit immediately.

This change was originally committed to master only. It is now backpatched
to v17, where slot synchronization was introduced, because it is required
for an upcoming bug fix that addresses slot synchronization (including
pg_sync_replication_slots()) blocking promotion when stuck in a wait.

Author: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com
Backpatch-through: 17
This commit is contained in:
Amit Kapila 2025-12-11 03:49:28 +00:00 committed by Fujii Masao
parent 681a91d29d
commit 4bed04d395

View file

@ -74,11 +74,14 @@
/*
* Struct for sharing information to control slot synchronization.
*
* The slot sync worker's pid is needed by the startup process to shut it
* down during promotion. The startup process shuts down the slot sync worker
* and also sets stopSignaled=true to handle the race condition when the
* The 'pid' is either the slot sync worker's pid or the backend's pid running
* the SQL function pg_sync_replication_slots(). When the startup process sets
* 'stopSignaled' during promotion, it uses this 'pid' to wake up the currently
* synchronizing process so that the process can immediately stop its
* synchronizing work on seeing 'stopSignaled' set.
* Setting 'stopSignaled' is also used to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
* the slot sync worker. If 'stopSignaled' is set, the worker will exit in such a
* case. The SQL function pg_sync_replication_slots() will also error out if
* this flag is set. Note that we don't need to reset this variable as after
* promotion the slot sync worker won't be restarted because the pmState
@ -1100,10 +1103,10 @@ ValidateSlotSyncParams(int elevel)
}
/*
* Re-read the config file.
* Re-read the config file for slot synchronization.
*
* Exit if any of the slot sync GUCs have changed. The postmaster will
* restart it.
* Exit or throw error if relevant GUCs have changed depending on whether
* called from slot sync worker or from the SQL function pg_sync_replication_slots()
*/
static void
slotsync_reread_config(void)
@ -1114,8 +1117,11 @@ slotsync_reread_config(void)
bool old_hot_standby_feedback = hot_standby_feedback;
bool conninfo_changed;
bool primary_slotname_changed;
bool is_slotsync_worker = AmLogicalSlotSyncWorkerProcess();
bool parameter_changed = false;
Assert(sync_replication_slots);
if (is_slotsync_worker)
Assert(sync_replication_slots);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
@ -1127,32 +1133,60 @@ slotsync_reread_config(void)
if (old_sync_replication_slots != sync_replication_slots)
{
ereport(LOG,
/* translator: %s is a GUC variable name */
errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots"));
proc_exit(0);
if (is_slotsync_worker)
{
ereport(LOG,
/* translator: %s is a GUC variable name */
errmsg("replication slot synchronization worker will stop because \"%s\" is disabled",
"sync_replication_slots"));
proc_exit(0);
}
parameter_changed = true;
}
else
{
if (conninfo_changed ||
primary_slotname_changed ||
(old_hot_standby_feedback != hot_standby_feedback))
{
if (is_slotsync_worker)
{
ereport(LOG,
errmsg("replication slot synchronization worker will restart because of a parameter change"));
/*
* Reset the last-start time for this worker so that the
* postmaster can restart it without waiting for
* SLOTSYNC_RESTART_INTERVAL_SEC.
*/
SlotSyncCtx->last_start_time = 0;
proc_exit(0);
}
parameter_changed = true;
}
}
if (conninfo_changed ||
primary_slotname_changed ||
(old_hot_standby_feedback != hot_standby_feedback))
/*
* If we have reached here with a parameter change, we must be running in
* SQL function, emit error in such a case.
*/
if (parameter_changed)
{
ereport(LOG,
errmsg("replication slot synchronization worker will restart because of a parameter change"));
/*
* Reset the last-start time for this worker so that the postmaster
* can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
*/
SlotSyncCtx->last_start_time = 0;
proc_exit(0);
Assert(!is_slotsync_worker);
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot synchronization will stop because of a parameter change"));
}
}
/*
* Interrupt handler for main loop of slot sync worker.
* Interrupt handler for process performing slot synchronization.
*/
static void
ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
@ -1161,10 +1195,23 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
if (SlotSyncCtx->stopSignaled)
{
ereport(LOG,
errmsg("replication slot synchronization worker is shutting down because promotion is triggered"));
if (AmLogicalSlotSyncWorkerProcess())
{
ereport(LOG,
errmsg("replication slot synchronization worker will stop because promotion is triggered"));
proc_exit(0);
proc_exit(0);
}
else
{
/*
* For the backend executing SQL function
* pg_sync_replication_slots().
*/
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slot synchronization will stop because promotion is triggered"));
}
}
if (ConfigReloadPending)
@ -1267,29 +1314,14 @@ wait_for_slot_activity(bool some_slot_updated)
}
/*
* Emit an error if a promotion or a concurrent sync call is in progress.
* Emit an error if a concurrent sync call is in progress.
* Otherwise, advertise that a sync is in progress.
*/
static void
check_and_set_sync_info(pid_t worker_pid)
check_and_set_sync_info(pid_t sync_process_pid)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
/* The worker pid must not be already assigned in SlotSyncCtx */
Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
/*
* Emit an error if startup process signaled the slot sync machinery to
* stop. See comments atop SlotSyncCtxStruct.
*/
if (SlotSyncCtx->stopSignaled)
{
SpinLockRelease(&SlotSyncCtx->mutex);
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
}
if (SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
@ -1298,13 +1330,16 @@ check_and_set_sync_info(pid_t worker_pid)
errmsg("cannot synchronize replication slots concurrently"));
}
/* The pid must not be already assigned in SlotSyncCtx */
Assert(SlotSyncCtx->pid == InvalidPid);
SlotSyncCtx->syncing = true;
/*
* Advertise the required PID so that the startup process can kill the
* slot sync worker on promotion.
* slot sync process on promotion.
*/
SlotSyncCtx->pid = worker_pid;
SlotSyncCtx->pid = sync_process_pid;
SpinLockRelease(&SlotSyncCtx->mutex);
@ -1319,6 +1354,7 @@ reset_syncing_flag()
{
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SlotSyncCtx->pid = InvalidPid;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
@ -1525,7 +1561,7 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
/* The slot sync worker or SQL function mustn't be running by now */
/* The slot sync worker or the SQL function mustn't be running by now */
Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@ -1556,16 +1592,18 @@ update_synced_slots_inactive_since(void)
}
/*
* Shut down the slot sync worker.
* Shut down slot synchronization.
*
* This function sends signal to shutdown slot sync worker, if required. It
* also waits till the slot sync worker has exited or
* This function sets stopSignaled=true and wakes up the slot sync process
* (either worker or backend running the SQL function pg_sync_replication_slots())
* so that worker can exit or the SQL function pg_sync_replication_slots() can
* finish. It also waits till the slot sync worker has exited or
* pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
pid_t worker_pid;
pid_t sync_process_pid;
SpinLockAcquire(&SlotSyncCtx->mutex);
@ -1582,16 +1620,16 @@ ShutDownSlotSync(void)
return;
}
worker_pid = SlotSyncCtx->pid;
sync_process_pid = SlotSyncCtx->pid;
SpinLockRelease(&SlotSyncCtx->mutex);
/*
* Signal slotsync worker if it was still running. The worker will stop
* upon detecting that the stopSignaled flag is set to true.
* Signal process doing slotsync, if any. The process will stop upon
* detecting that the stopSignaled flag is set to true.
*/
if (worker_pid != InvalidPid)
kill(worker_pid, SIGUSR1);
if (sync_process_pid != InvalidPid)
kill(sync_process_pid, SIGUSR1);
/* Wait for slot sync to end */
for (;;)
@ -1734,7 +1772,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
check_and_set_sync_info(InvalidPid);
check_and_set_sync_info(MyProcPid);
/* Check for interrupts and config changes */
ProcessSlotSyncInterrupts();
validate_remote_info(wrconn);