mirror of
https://github.com/postgres/postgres.git
synced 2026-06-11 01:30:11 -04:00
Reserve replication slots specifically for REPACK
Add a new GUC max_repack_replication_slots, which lets the user reserve some additional replication slots for concurrent repack (and only concurrent repack). With this, the user doesn't have to worry about changing the max_replication_slots in order to cater for use of concurrent repack. (We still use the same pool of bgworkers though, but that's less commonly a problem than slots.) Author: Álvaro Herrera <alvherre@kurilemu.de> Reviewed-by: Srinath Reddy Sadipiralla <srinath2133@gmail.com> Discussion: https://postgr.es/m/202604012148.nnnmyxxrr6nh@alvherre.pgsql
This commit is contained in:
parent
979387f188
commit
e76d8c749c
14 changed files with 121 additions and 62 deletions
|
|
@ -4639,6 +4639,21 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry id="guc-max-repack-replication-slots" xreflabel="max_repack_replication_slots">
|
||||||
|
<term><varname>max_repack_replication_slots</varname> (<type>integer</type>)
|
||||||
|
<indexterm>
|
||||||
|
<primary><varname>max_repack_replication_slots</varname> configuration parameter</primary>
|
||||||
|
</indexterm>
|
||||||
|
</term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Specifies the maximum number of replication slots for use of
|
||||||
|
the <command>REPACK</command> command. The default is 5.
|
||||||
|
This parameter can only be set at server start.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
<varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
|
<varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
|
||||||
<term><varname>max_replication_slots</varname> (<type>integer</type>)
|
<term><varname>max_replication_slots</varname> (<type>integer</type>)
|
||||||
<indexterm>
|
<indexterm>
|
||||||
|
|
|
||||||
|
|
@ -293,9 +293,9 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
|
||||||
|
|
||||||
<listitem>
|
<listitem>
|
||||||
<para>
|
<para>
|
||||||
The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
|
The <link linkend="guc-max-repack-replication-slots"><varname>max_repack_replication_slots</varname></link>
|
||||||
configuration parameter does not allow for creation of an additional
|
configuration parameter does not allow for the creation of an
|
||||||
replication slot.
|
additional replication slot.
|
||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</itemizedlist>
|
</itemizedlist>
|
||||||
|
|
|
||||||
|
|
@ -212,7 +212,7 @@ repack_setup_logical_decoding(Oid relid)
|
||||||
* Make sure we can use logical decoding.
|
* Make sure we can use logical decoding.
|
||||||
*/
|
*/
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A single backend should not execute multiple REPACK commands at a time,
|
* A single backend should not execute multiple REPACK commands at a time,
|
||||||
|
|
@ -221,8 +221,8 @@ repack_setup_logical_decoding(Oid relid)
|
||||||
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
|
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
|
||||||
*/
|
*/
|
||||||
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
|
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
|
||||||
ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
|
ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
|
||||||
false);
|
false, false);
|
||||||
|
|
||||||
EnsureLogicalDecodingEnabled();
|
EnsureLogicalDecodingEnabled();
|
||||||
|
|
||||||
|
|
@ -233,6 +233,7 @@ repack_setup_logical_decoding(Oid relid)
|
||||||
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
|
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
|
||||||
NIL,
|
NIL,
|
||||||
true,
|
true,
|
||||||
|
true,
|
||||||
InvalidXLogRecPtr,
|
InvalidXLogRecPtr,
|
||||||
XL_ROUTINE(.page_read = read_local_xlog_page,
|
XL_ROUTINE(.page_read = read_local_xlog_page,
|
||||||
.segment_open = wal_segment_open,
|
.segment_open = wal_segment_open,
|
||||||
|
|
|
||||||
|
|
@ -1575,7 +1575,7 @@ CreateConflictDetectionSlot(void)
|
||||||
errmsg("creating replication conflict detection slot"));
|
errmsg("creating replication conflict detection slot"));
|
||||||
|
|
||||||
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
|
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
|
||||||
false, false);
|
false, false, false);
|
||||||
|
|
||||||
init_conflict_slot_xmin();
|
init_conflict_slot_xmin();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -108,9 +108,9 @@ static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugi
|
||||||
* decoding.
|
* decoding.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CheckLogicalDecodingRequirements(void)
|
CheckLogicalDecodingRequirements(bool repack)
|
||||||
{
|
{
|
||||||
CheckSlotRequirements();
|
CheckSlotRequirements(repack);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
|
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
|
||||||
|
|
@ -304,6 +304,7 @@ StartupDecodingContext(List *output_plugin_options,
|
||||||
* output_plugin_options -- contains options passed to the output plugin
|
* output_plugin_options -- contains options passed to the output plugin
|
||||||
* need_full_snapshot -- if true, must obtain a snapshot able to read all
|
* need_full_snapshot -- if true, must obtain a snapshot able to read all
|
||||||
* tables; if false, one that can read only catalogs is acceptable.
|
* tables; if false, one that can read only catalogs is acceptable.
|
||||||
|
* for_repack -- if true, we're going to be decoding for REPACK.
|
||||||
* restart_lsn -- if given as invalid, it's this routine's responsibility to
|
* restart_lsn -- if given as invalid, it's this routine's responsibility to
|
||||||
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
|
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
|
||||||
* Otherwise, we set for decoding to start from the given LSN without
|
* Otherwise, we set for decoding to start from the given LSN without
|
||||||
|
|
@ -324,6 +325,7 @@ LogicalDecodingContext *
|
||||||
CreateInitDecodingContext(const char *plugin,
|
CreateInitDecodingContext(const char *plugin,
|
||||||
List *output_plugin_options,
|
List *output_plugin_options,
|
||||||
bool need_full_snapshot,
|
bool need_full_snapshot,
|
||||||
|
bool for_repack,
|
||||||
XLogRecPtr restart_lsn,
|
XLogRecPtr restart_lsn,
|
||||||
XLogReaderRoutine *xl_routine,
|
XLogReaderRoutine *xl_routine,
|
||||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||||
|
|
@ -340,7 +342,7 @@ CreateInitDecodingContext(const char *plugin,
|
||||||
* On a standby, this check is also required while creating the slot.
|
* On a standby, this check is also required while creating the slot.
|
||||||
* Check the comments in the function.
|
* Check the comments in the function.
|
||||||
*/
|
*/
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(for_repack);
|
||||||
|
|
||||||
/* shorter lines... */
|
/* shorter lines... */
|
||||||
slot = MyReplicationSlot;
|
slot = MyReplicationSlot;
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
|
||||||
|
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
|
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(false);
|
||||||
|
|
||||||
if (PG_ARGISNULL(0))
|
if (PG_ARGISNULL(0))
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
|
|
|
||||||
|
|
@ -434,7 +434,7 @@ get_local_synced_slots(void)
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
@ -823,6 +823,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
|
||||||
*/
|
*/
|
||||||
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
|
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
|
||||||
remote_slot->two_phase,
|
remote_slot->two_phase,
|
||||||
|
false,
|
||||||
remote_slot->failover,
|
remote_slot->failover,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
|
|
@ -1707,7 +1708,7 @@ update_synced_slots_inactive_since(void)
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -160,6 +160,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
|
||||||
/* GUC variables */
|
/* GUC variables */
|
||||||
int max_replication_slots = 10; /* the maximum number of replication
|
int max_replication_slots = 10; /* the maximum number of replication
|
||||||
* slots */
|
* slots */
|
||||||
|
int max_repack_replication_slots = 5; /* the maximum number of slots
|
||||||
|
* for REPACK */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Invalidate replication slots that have remained idle longer than this
|
* Invalidate replication slots that have remained idle longer than this
|
||||||
|
|
@ -199,12 +201,13 @@ ReplicationSlotsShmemRequest(void *arg)
|
||||||
{
|
{
|
||||||
Size size;
|
Size size;
|
||||||
|
|
||||||
if (max_replication_slots == 0)
|
if (max_replication_slots + max_repack_replication_slots == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
size = offsetof(ReplicationSlotCtlData, replication_slots);
|
size = offsetof(ReplicationSlotCtlData, replication_slots);
|
||||||
size = add_size(size,
|
size = add_size(size,
|
||||||
mul_size(max_replication_slots, sizeof(ReplicationSlot)));
|
mul_size(max_replication_slots + max_repack_replication_slots,
|
||||||
|
sizeof(ReplicationSlot)));
|
||||||
ShmemRequestStruct(.name = "ReplicationSlot Ctl",
|
ShmemRequestStruct(.name = "ReplicationSlot Ctl",
|
||||||
.size = size,
|
.size = size,
|
||||||
.ptr = (void **) &ReplicationSlotCtl,
|
.ptr = (void **) &ReplicationSlotCtl,
|
||||||
|
|
@ -217,7 +220,7 @@ ReplicationSlotsShmemRequest(void *arg)
|
||||||
static void
|
static void
|
||||||
ReplicationSlotsShmemInit(void *arg)
|
ReplicationSlotsShmemInit(void *arg)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
@ -366,6 +369,7 @@ IsSlotForConflictCheck(const char *name)
|
||||||
* db_specific: logical decoding is db specific; if the slot is going to
|
* db_specific: logical decoding is db specific; if the slot is going to
|
||||||
* be used for that pass true, otherwise false.
|
* be used for that pass true, otherwise false.
|
||||||
* two_phase: If enabled, allows decoding of prepared transactions.
|
* two_phase: If enabled, allows decoding of prepared transactions.
|
||||||
|
* repack: If true, use a slot from the pool for REPACK.
|
||||||
* failover: If enabled, allows the slot to be synced to standbys so
|
* failover: If enabled, allows the slot to be synced to standbys so
|
||||||
* that logical replication can be resumed after failover.
|
* that logical replication can be resumed after failover.
|
||||||
* synced: True if the slot is synchronized from the primary server.
|
* synced: True if the slot is synchronized from the primary server.
|
||||||
|
|
@ -373,10 +377,11 @@ IsSlotForConflictCheck(const char *name)
|
||||||
void
|
void
|
||||||
ReplicationSlotCreate(const char *name, bool db_specific,
|
ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
ReplicationSlotPersistency persistency,
|
ReplicationSlotPersistency persistency,
|
||||||
bool two_phase, bool failover, bool synced)
|
bool two_phase, bool repack, bool failover, bool synced)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = NULL;
|
ReplicationSlot *slot = NULL;
|
||||||
int i;
|
int startpoint,
|
||||||
|
endpoint;
|
||||||
|
|
||||||
Assert(MyReplicationSlot == NULL);
|
Assert(MyReplicationSlot == NULL);
|
||||||
|
|
||||||
|
|
@ -425,12 +430,16 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
|
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check for name collision, and identify an allocatable slot. We need to
|
* Check for name collision (across the whole array), and identify an
|
||||||
* hold ReplicationSlotControlLock in shared mode for this, so that nobody
|
* allocatable slot (in the array slice specific to our current use case:
|
||||||
* else can change the in_use flags while we're looking at them.
|
* either general, or REPACK only). We need to hold
|
||||||
|
* ReplicationSlotControlLock in shared mode for this, so that nobody else
|
||||||
|
* can change the in_use flags while we're looking at them.
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
startpoint = !repack ? 0 : max_replication_slots;
|
||||||
|
endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
|
||||||
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
@ -438,7 +447,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
(errcode(ERRCODE_DUPLICATE_OBJECT),
|
||||||
errmsg("replication slot \"%s\" already exists", name)));
|
errmsg("replication slot \"%s\" already exists", name)));
|
||||||
if (!s->in_use && slot == NULL)
|
|
||||||
|
if (i >= startpoint && i < endpoint &&
|
||||||
|
!s->in_use && slot == NULL)
|
||||||
slot = s;
|
slot = s;
|
||||||
}
|
}
|
||||||
LWLockRelease(ReplicationSlotControlLock);
|
LWLockRelease(ReplicationSlotControlLock);
|
||||||
|
|
@ -448,7 +459,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||||
errmsg("all replication slots are in use"),
|
errmsg("all replication slots are in use"),
|
||||||
errhint("Free one or increase \"max_replication_slots\".")));
|
errhint("Free one or increase \"%s\".",
|
||||||
|
repack ? "max_repack_replication_slots" : "max_replication_slots")));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since this slot is not in use, nobody should be looking at any part of
|
* Since this slot is not in use, nobody should be looking at any part of
|
||||||
|
|
@ -541,7 +553,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
|
||||||
if (need_lock)
|
if (need_lock)
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
@ -569,7 +581,8 @@ int
|
||||||
ReplicationSlotIndex(ReplicationSlot *slot)
|
ReplicationSlotIndex(ReplicationSlot *slot)
|
||||||
{
|
{
|
||||||
Assert(slot >= ReplicationSlotCtl->replication_slots &&
|
Assert(slot >= ReplicationSlotCtl->replication_slots &&
|
||||||
slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
|
slot < ReplicationSlotCtl->replication_slots +
|
||||||
|
(max_replication_slots + max_repack_replication_slots));
|
||||||
|
|
||||||
return slot - ReplicationSlotCtl->replication_slots;
|
return slot - ReplicationSlotCtl->replication_slots;
|
||||||
}
|
}
|
||||||
|
|
@ -863,7 +876,7 @@ ReplicationSlotCleanup(bool synced_only)
|
||||||
restart:
|
restart:
|
||||||
found_valid_logicalslot = false;
|
found_valid_logicalslot = false;
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
@ -1245,7 +1258,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
|
||||||
if (!already_locked)
|
if (!already_locked)
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
TransactionId effective_xmin;
|
TransactionId effective_xmin;
|
||||||
|
|
@ -1300,7 +1313,7 @@ ReplicationSlotsComputeRequiredLSN(void)
|
||||||
Assert(ReplicationSlotCtl != NULL);
|
Assert(ReplicationSlotCtl != NULL);
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
XLogRecPtr restart_lsn;
|
XLogRecPtr restart_lsn;
|
||||||
|
|
@ -1367,12 +1380,12 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
|
||||||
XLogRecPtr result = InvalidXLogRecPtr;
|
XLogRecPtr result = InvalidXLogRecPtr;
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
if (max_replication_slots <= 0)
|
if (max_replication_slots + max_repack_replication_slots <= 0)
|
||||||
return InvalidXLogRecPtr;
|
return InvalidXLogRecPtr;
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s;
|
ReplicationSlot *s;
|
||||||
XLogRecPtr restart_lsn;
|
XLogRecPtr restart_lsn;
|
||||||
|
|
@ -1447,11 +1460,11 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
||||||
|
|
||||||
*nslots = *nactive = 0;
|
*nslots = *nactive = 0;
|
||||||
|
|
||||||
if (max_replication_slots <= 0)
|
if (max_replication_slots + max_repack_replication_slots <= 0)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s;
|
ReplicationSlot *s;
|
||||||
|
|
||||||
|
|
@ -1508,13 +1521,13 @@ ReplicationSlotsDropDBSlots(Oid dboid)
|
||||||
bool found_valid_logicalslot;
|
bool found_valid_logicalslot;
|
||||||
bool dropped = false;
|
bool dropped = false;
|
||||||
|
|
||||||
if (max_replication_slots <= 0)
|
if (max_replication_slots + max_repack_replication_slots <= 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
restart:
|
restart:
|
||||||
found_valid_logicalslot = false;
|
found_valid_logicalslot = false;
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s;
|
ReplicationSlot *s;
|
||||||
char *slotname;
|
char *slotname;
|
||||||
|
|
@ -1611,11 +1624,11 @@ CheckLogicalSlotExists(void)
|
||||||
{
|
{
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
|
||||||
if (max_replication_slots <= 0)
|
if (max_replication_slots + max_repack_replication_slots <= 0)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s;
|
ReplicationSlot *s;
|
||||||
bool invalidated;
|
bool invalidated;
|
||||||
|
|
@ -1649,17 +1662,24 @@ CheckLogicalSlotExists(void)
|
||||||
* slots.
|
* slots.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CheckSlotRequirements(void)
|
CheckSlotRequirements(bool repack)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
|
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
|
||||||
* needs the same check.
|
* needs the same check.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (max_replication_slots == 0)
|
if (!repack && max_replication_slots == 0)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
|
errmsg("replication slots can only be used if \"%s\" > 0",
|
||||||
|
"max_replication_slots"));
|
||||||
|
|
||||||
|
if (repack && max_repack_replication_slots == 0)
|
||||||
|
ereport(ERROR,
|
||||||
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("REPACK can only be used if \"%s\" > 0",
|
||||||
|
"max_repack_replication_slots"));
|
||||||
|
|
||||||
if (wal_level < WAL_LEVEL_REPLICA)
|
if (wal_level < WAL_LEVEL_REPLICA)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
|
|
@ -2210,7 +2230,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
|
||||||
Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
|
Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
|
||||||
Assert(possible_causes != RS_INVAL_NONE);
|
Assert(possible_causes != RS_INVAL_NONE);
|
||||||
|
|
||||||
if (max_replication_slots == 0)
|
if (max_replication_slots == 0 && max_repack_replication_slots == 0)
|
||||||
return invalidated;
|
return invalidated;
|
||||||
|
|
||||||
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
|
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
|
||||||
|
|
@ -2218,7 +2238,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
|
||||||
restart:
|
restart:
|
||||||
found_valid_logicalslot = false;
|
found_valid_logicalslot = false;
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
bool released_lock = false;
|
bool released_lock = false;
|
||||||
|
|
@ -2323,7 +2343,7 @@ CheckPointReplicationSlots(bool is_shutdown)
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
|
||||||
|
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
char path[MAXPGPATH];
|
char path[MAXPGPATH];
|
||||||
|
|
@ -2424,7 +2444,7 @@ StartupReplicationSlots(void)
|
||||||
FreeDir(replication_dir);
|
FreeDir(replication_dir);
|
||||||
|
|
||||||
/* currently no slots exist, we're done. */
|
/* currently no slots exist, we're done. */
|
||||||
if (max_replication_slots <= 0)
|
if (max_replication_slots + max_repack_replication_slots <= 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* Now that we have recovered all the data, compute replication xmin */
|
/* Now that we have recovered all the data, compute replication xmin */
|
||||||
|
|
@ -2853,7 +2873,13 @@ RestoreSlotFromDisk(const char *name)
|
||||||
NameStr(cp.slotdata.name)),
|
NameStr(cp.slotdata.name)),
|
||||||
errhint("Change \"wal_level\" to be \"replica\" or higher.")));
|
errhint("Change \"wal_level\" to be \"replica\" or higher.")));
|
||||||
|
|
||||||
/* nothing can be active yet, don't lock anything */
|
/*
|
||||||
|
* Nothing can be active yet, don't lock anything. Note we iterate up to
|
||||||
|
* max_replication_slots instead of adding max_repack_replication_slots as
|
||||||
|
* in all other places, because we must enforce the GUC value in case
|
||||||
|
* there were more slots before the shutdown than what it is set up to
|
||||||
|
* now.
|
||||||
|
*/
|
||||||
for (i = 0; i < max_replication_slots; i++)
|
for (i = 0; i < max_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot;
|
ReplicationSlot *slot;
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
|
||||||
/* acquire replication slot, this will check for conflicting names */
|
/* acquire replication slot, this will check for conflicting names */
|
||||||
ReplicationSlotCreate(name, false,
|
ReplicationSlotCreate(name, false,
|
||||||
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
|
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
|
||||||
false, false);
|
false, false, false);
|
||||||
|
|
||||||
if (immediately_reserve)
|
if (immediately_reserve)
|
||||||
{
|
{
|
||||||
|
|
@ -90,7 +90,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
|
|
||||||
CheckSlotRequirements();
|
CheckSlotRequirements(false);
|
||||||
|
|
||||||
create_physical_replication_slot(NameStr(*name),
|
create_physical_replication_slot(NameStr(*name),
|
||||||
immediately_reserve,
|
immediately_reserve,
|
||||||
|
|
@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
|
||||||
*/
|
*/
|
||||||
ReplicationSlotCreate(name, true,
|
ReplicationSlotCreate(name, true,
|
||||||
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
|
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
|
||||||
failover, false);
|
false, failover, false);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Ensure the logical decoding is enabled before initializing the logical
|
* Ensure the logical decoding is enabled before initializing the logical
|
||||||
|
|
@ -164,6 +164,7 @@ create_logical_replication_slot(char *name, char *plugin,
|
||||||
*/
|
*/
|
||||||
ctx = CreateInitDecodingContext(plugin, NIL,
|
ctx = CreateInitDecodingContext(plugin, NIL,
|
||||||
false, /* just catalogs is OK */
|
false, /* just catalogs is OK */
|
||||||
|
false, /* not repack */
|
||||||
restart_lsn,
|
restart_lsn,
|
||||||
XL_ROUTINE(.page_read = read_local_xlog_page,
|
XL_ROUTINE(.page_read = read_local_xlog_page,
|
||||||
.segment_open = wal_segment_open,
|
.segment_open = wal_segment_open,
|
||||||
|
|
@ -203,7 +204,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
|
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(false);
|
||||||
|
|
||||||
create_logical_replication_slot(NameStr(*name),
|
create_logical_replication_slot(NameStr(*name),
|
||||||
NameStr(*plugin),
|
NameStr(*plugin),
|
||||||
|
|
@ -240,7 +241,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
|
|
||||||
CheckSlotRequirements();
|
CheckSlotRequirements(false);
|
||||||
|
|
||||||
ReplicationSlotDrop(NameStr(*name), true);
|
ReplicationSlotDrop(NameStr(*name), true);
|
||||||
|
|
||||||
|
|
@ -270,7 +271,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||||
currlsn = GetXLogWriteRecPtr();
|
currlsn = GetXLogWriteRecPtr();
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
for (slotno = 0; slotno < max_replication_slots; slotno++)
|
for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
|
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
|
||||||
ReplicationSlot slot_contents;
|
ReplicationSlot slot_contents;
|
||||||
|
|
@ -648,9 +649,9 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||||
CheckSlotPermissions();
|
CheckSlotPermissions();
|
||||||
|
|
||||||
if (logical_slot)
|
if (logical_slot)
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(false);
|
||||||
else
|
else
|
||||||
CheckSlotRequirements();
|
CheckSlotRequirements(false);
|
||||||
|
|
||||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||||
|
|
||||||
|
|
@ -665,7 +666,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||||
* managed to create the new slot, we advance the new slot's restart_lsn
|
* managed to create the new slot, we advance the new slot's restart_lsn
|
||||||
* to the source slot's updated restart_lsn the second time we lock it.
|
* to the source slot's updated restart_lsn the second time we lock it.
|
||||||
*/
|
*/
|
||||||
for (int i = 0; i < max_replication_slots; i++)
|
for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
|
||||||
{
|
{
|
||||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1241,7 +1241,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||||
{
|
{
|
||||||
ReplicationSlotCreate(cmd->slotname, false,
|
ReplicationSlotCreate(cmd->slotname, false,
|
||||||
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
|
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
|
||||||
false, false, false);
|
false, false, false, false);
|
||||||
|
|
||||||
if (reserve_wal)
|
if (reserve_wal)
|
||||||
{
|
{
|
||||||
|
|
@ -1261,7 +1261,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||||
|
|
||||||
Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
|
Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
|
||||||
|
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(false);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initially create persistent slot as ephemeral - that allows us to
|
* Initially create persistent slot as ephemeral - that allows us to
|
||||||
|
|
@ -1272,7 +1272,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||||
*/
|
*/
|
||||||
ReplicationSlotCreate(cmd->slotname, true,
|
ReplicationSlotCreate(cmd->slotname, true,
|
||||||
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
||||||
two_phase, failover, false);
|
two_phase, false, failover, false);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Do options check early so that we can bail before calling the
|
* Do options check early so that we can bail before calling the
|
||||||
|
|
@ -1330,6 +1330,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
||||||
Assert(IsLogicalDecodingEnabled());
|
Assert(IsLogicalDecodingEnabled());
|
||||||
|
|
||||||
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
|
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
|
||||||
|
false,
|
||||||
InvalidXLogRecPtr,
|
InvalidXLogRecPtr,
|
||||||
XL_ROUTINE(.page_read = logical_read_xlog_page,
|
XL_ROUTINE(.page_read = logical_read_xlog_page,
|
||||||
.segment_open = WalSndSegmentOpen,
|
.segment_open = WalSndSegmentOpen,
|
||||||
|
|
@ -1487,7 +1488,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
|
||||||
QueryCompletion qc;
|
QueryCompletion qc;
|
||||||
|
|
||||||
/* make sure that our requirements are still fulfilled */
|
/* make sure that our requirements are still fulfilled */
|
||||||
CheckLogicalDecodingRequirements();
|
CheckLogicalDecodingRequirements(false);
|
||||||
|
|
||||||
Assert(!MyReplicationSlot);
|
Assert(!MyReplicationSlot);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2079,6 +2079,14 @@
|
||||||
max => 'MAX_BACKENDS',
|
max => 'MAX_BACKENDS',
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
|
||||||
|
short_desc => 'Sets the maximum number of replication slots for use by REPACK.',
|
||||||
|
variable => 'max_repack_replication_slots',
|
||||||
|
boot_val => '5',
|
||||||
|
min => '0',
|
||||||
|
max => 'MAX_BACKENDS',
|
||||||
|
},
|
||||||
|
|
||||||
/* see max_wal_senders */
|
/* see max_wal_senders */
|
||||||
{ name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
|
{ name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
|
||||||
short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
|
short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
|
||||||
|
|
|
||||||
|
|
@ -348,6 +348,8 @@
|
||||||
# (change requires restart)
|
# (change requires restart)
|
||||||
#max_replication_slots = 10 # max number of replication slots
|
#max_replication_slots = 10 # max number of replication slots
|
||||||
# (change requires restart)
|
# (change requires restart)
|
||||||
|
#max_repack_replication_slots = 5 # max number of replication slots for REPACK
|
||||||
|
# (change requires restart)
|
||||||
#wal_keep_size = 0 # in megabytes; 0 disables
|
#wal_keep_size = 0 # in megabytes; 0 disables
|
||||||
#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
|
#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
|
||||||
#idle_replication_slot_timeout = 0 # in seconds; 0 disables
|
#idle_replication_slot_timeout = 0 # in seconds; 0 disables
|
||||||
|
|
|
||||||
|
|
@ -115,11 +115,12 @@ typedef struct LogicalDecodingContext
|
||||||
} LogicalDecodingContext;
|
} LogicalDecodingContext;
|
||||||
|
|
||||||
|
|
||||||
extern void CheckLogicalDecodingRequirements(void);
|
extern void CheckLogicalDecodingRequirements(bool repack);
|
||||||
|
|
||||||
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
|
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
|
||||||
List *output_plugin_options,
|
List *output_plugin_options,
|
||||||
bool need_full_snapshot,
|
bool need_full_snapshot,
|
||||||
|
bool for_repack,
|
||||||
XLogRecPtr restart_lsn,
|
XLogRecPtr restart_lsn,
|
||||||
XLogReaderRoutine *xl_routine,
|
XLogReaderRoutine *xl_routine,
|
||||||
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
||||||
|
|
|
||||||
|
|
@ -324,13 +324,14 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
|
||||||
|
|
||||||
/* GUCs */
|
/* GUCs */
|
||||||
extern PGDLLIMPORT int max_replication_slots;
|
extern PGDLLIMPORT int max_replication_slots;
|
||||||
|
extern PGDLLIMPORT int max_repack_replication_slots;
|
||||||
extern PGDLLIMPORT char *synchronized_standby_slots;
|
extern PGDLLIMPORT char *synchronized_standby_slots;
|
||||||
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
|
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
|
||||||
|
|
||||||
/* management of individual slots */
|
/* management of individual slots */
|
||||||
extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
ReplicationSlotPersistency persistency,
|
ReplicationSlotPersistency persistency,
|
||||||
bool two_phase, bool failover,
|
bool two_phase, bool repack, bool failover,
|
||||||
bool synced);
|
bool synced);
|
||||||
extern void ReplicationSlotPersist(void);
|
extern void ReplicationSlotPersist(void);
|
||||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||||
|
|
@ -373,7 +374,7 @@ extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname
|
||||||
extern void StartupReplicationSlots(void);
|
extern void StartupReplicationSlots(void);
|
||||||
extern void CheckPointReplicationSlots(bool is_shutdown);
|
extern void CheckPointReplicationSlots(bool is_shutdown);
|
||||||
|
|
||||||
extern void CheckSlotRequirements(void);
|
extern void CheckSlotRequirements(bool repack);
|
||||||
extern void CheckSlotPermissions(void);
|
extern void CheckSlotPermissions(void);
|
||||||
extern ReplicationSlotInvalidationCause
|
extern ReplicationSlotInvalidationCause
|
||||||
GetSlotInvalidationCause(const char *cause_name);
|
GetSlotInvalidationCause(const char *cause_name);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue