Fix memory accumulation in pg_sync_replication_slots() during retries.

Unlike the slotsync worker, whose retry cycles are separated by
transaction boundaries, pg_sync_replication_slots() retries within a
single SQL function call. Per-cycle allocations for slot names, plugin
names, database names, and auxiliary list containers get accumulated
across retries until the function returned. Memory growth is proportional
to the number of retries and remote slots, and the function may wait an
extended period between cycles when slots are slow to persist.

Fix by running each retry cycle in a short-lived memory context
(sync_retry_ctx) that is reset before the next attempt. Additionally,
release tuple slots created with MakeSingleTupleTableSlot() before
clearing the walreceiver result.

Author: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CABPTF7VVPxgfYyr8Kyi=+JACjckQ6NpniV9eRtHboj2hMn0REw@mail.gmail.com
This commit is contained in:
Amit Kapila 2026-05-26 15:16:12 -07:00
parent 8656ba7f71
commit 490259d072

View file

@ -1016,6 +1016,7 @@ fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
ExecClearTuple(tupslot);
}
ExecDropSingleTupleTableSlot(tupslot);
walrcv_clear_result(res);
return remote_slot_list;
@ -1135,7 +1136,7 @@ validate_remote_info(WalReceiverConn *wrconn)
errmsg("replication slot \"%s\" specified by \"%s\" does not exist on primary server",
PrimarySlotName, "primary_slot_name"));
ExecClearTuple(tupslot);
ExecDropSingleTupleTableSlot(tupslot);
walrcv_clear_result(res);
if (started_tx)
@ -2006,16 +2007,27 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
List *remote_slots = NIL;
List *slot_names = NIL; /* List of slot names to track */
MemoryContext sync_retry_ctx;
check_and_set_sync_info(MyProcPid);
validate_remote_info(wrconn);
/*
* Setup and use a per-sync-cycle memory context, which is reset every
* time we loop below. This avoids having to retail freeing the memory
* used in each sync cycle.
*/
sync_retry_ctx = AllocSetContextCreate(CurrentMemoryContext,
"slot sync retry context",
ALLOCSET_DEFAULT_SIZES);
/* Retry until all the slots are sync-ready */
for (;;)
{
bool slot_persistence_pending = false;
bool some_slot_updated = false;
MemoryContext oldctx;
/* Check for interrupts and config changes */
CHECK_FOR_INTERRUPTS();
@ -2026,6 +2038,9 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
/* We must be in a valid transaction state */
Assert(IsTransactionState());
MemoryContextReset(sync_retry_ctx);
oldctx = MemoryContextSwitchTo(sync_retry_ctx);
/*
* Fetch remote slot info for the given slot_names. If slot_names
* is NIL, fetch all failover-enabled slots. Note that we reuse
@ -2042,6 +2057,12 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
some_slot_updated = synchronize_slots(wrconn, remote_slots,
&slot_persistence_pending);
/*
* slot_names must survive later sync_retry_ctx resets, so copy it
* in the outer context.
*/
MemoryContextSwitchTo(oldctx);
/*
* If slot_persistence_pending is true, extract slot names for
* future iterations (only needed if we haven't done it yet)
@ -2049,9 +2070,6 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
if (slot_names == NIL && slot_persistence_pending)
slot_names = extract_slot_names(remote_slots);
/* Free the current remote_slots list */
list_free_deep(remote_slots);
/* Done if all slots are persisted i.e are sync-ready */
if (!slot_persistence_pending)
break;
@ -2060,6 +2078,8 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
wait_for_slot_activity(some_slot_updated);
}
MemoryContextDelete(sync_retry_ctx);
if (slot_names)
list_free_deep(slot_names);