mirror of
https://github.com/postgres/postgres.git
synced 2026-05-28 04:35:45 -04:00
Disable logical decoding after REPACK (CONCURRENTLY)
REPACK (CONCURRENTLY) uses a temporary logical replication slot, which is dropped once done, but it wasn't calling RequestDisableLogicalDecoding(), leaving effective_wal_level stuck at 'logical'. Fix by adding a Boolean flag to ReplicationSlotDropAcquired() to have it request to disable logical decoding, and passing it as true on REPACK. Other callers of that function preserve their existing behavior. Author: Imran Zaheer <imran.zhir@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Discussion: https://postgr.es/m/CA+UBfaktds57dw2M8BEv_kS-=ixph3w+3MxKixtaDQMi_k7Ybg@mail.gmail.com
This commit is contained in:
parent
0f24332aeb
commit
2af1dc8928
6 changed files with 47 additions and 32 deletions
|
|
@ -323,7 +323,7 @@ repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
|
||||||
ExecDropSingleTupleTableSlot(dstate->slot);
|
ExecDropSingleTupleTableSlot(dstate->slot);
|
||||||
|
|
||||||
FreeDecodingContext(ctx);
|
FreeDecodingContext(ctx);
|
||||||
ReplicationSlotDropAcquired();
|
ReplicationSlotDropAcquired(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -1406,7 +1406,8 @@ ApplyLauncherMain(Datum main_arg)
|
||||||
if (MyReplicationSlot)
|
if (MyReplicationSlot)
|
||||||
{
|
{
|
||||||
if (!retain_dead_tuples)
|
if (!retain_dead_tuples)
|
||||||
ReplicationSlotDropAcquired();
|
/* XXX unclear why we don't request logical decoding disable */
|
||||||
|
ReplicationSlotDropAcquired(false);
|
||||||
else if (can_update_xmin)
|
else if (can_update_xmin)
|
||||||
update_conflict_slot_xmin(xmin);
|
update_conflict_slot_xmin(xmin);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -557,7 +557,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
|
||||||
* database drop by the startup process and the creation of a new
|
* database drop by the startup process and the creation of a new
|
||||||
* slot by the user. This new user-created slot may end up using
|
* slot by the user. This new user-created slot may end up using
|
||||||
* the same shared memory as that of 'local_slot'. Thus check if
|
* the same shared memory as that of 'local_slot'. Thus check if
|
||||||
* local_slot is still the synced one before performing actual
|
* local_slot is still the synced one before performing the actual
|
||||||
* drop.
|
* drop.
|
||||||
*/
|
*/
|
||||||
SpinLockAcquire(&local_slot->mutex);
|
SpinLockAcquire(&local_slot->mutex);
|
||||||
|
|
@ -566,8 +566,14 @@ drop_local_obsolete_slots(List *remote_slot_list)
|
||||||
|
|
||||||
if (synced_slot)
|
if (synced_slot)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Now acquire and drop the slot. Note we purposely don't
|
||||||
|
* request logical decoding to be disabled here: since this is
|
||||||
|
* a standby, which derives its logical decoding state from
|
||||||
|
* the primary, it would be wrong to do so.
|
||||||
|
*/
|
||||||
ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
|
ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
|
||||||
ReplicationSlotDropAcquired();
|
ReplicationSlotDropAcquired(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
|
UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
|
||||||
|
|
|
||||||
|
|
@ -783,19 +783,10 @@ ReplicationSlotRelease(void)
|
||||||
if (slot->data.persistency == RS_EPHEMERAL)
|
if (slot->data.persistency == RS_EPHEMERAL)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Delete the slot. There is no !PANIC case where this is allowed to
|
* If slot is ephemeral, we drop it upon release, and request logical
|
||||||
* fail, all that may happen is an incomplete cleanup of the on-disk
|
* decoding be disabled.
|
||||||
* data.
|
|
||||||
*/
|
*/
|
||||||
ReplicationSlotDropAcquired();
|
ReplicationSlotDropAcquired(is_logical);
|
||||||
|
|
||||||
/*
|
|
||||||
* Request to disable logical decoding, even though this slot may not
|
|
||||||
* have been the last logical slot. The checkpointer will verify if
|
|
||||||
* logical decoding should actually be disabled.
|
|
||||||
*/
|
|
||||||
if (is_logical)
|
|
||||||
RequestDisableLogicalDecoding();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -914,15 +905,13 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Permanently drop replication slot identified by the passed in name.
|
* Permanently drop the replication slot identified by the passed-in name.
|
||||||
|
*
|
||||||
|
* If this is a logical slot, request that logical decoding be disabled.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ReplicationSlotDrop(const char *name, bool nowait)
|
ReplicationSlotDrop(const char *name, bool nowait)
|
||||||
{
|
{
|
||||||
bool is_logical;
|
|
||||||
|
|
||||||
Assert(MyReplicationSlot == NULL);
|
|
||||||
|
|
||||||
ReplicationSlotAcquire(name, nowait, false);
|
ReplicationSlotAcquire(name, nowait, false);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -935,12 +924,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
|
||||||
errmsg("cannot drop replication slot \"%s\"", name),
|
errmsg("cannot drop replication slot \"%s\"", name),
|
||||||
errdetail("This replication slot is being synchronized from the primary server."));
|
errdetail("This replication slot is being synchronized from the primary server."));
|
||||||
|
|
||||||
is_logical = SlotIsLogical(MyReplicationSlot);
|
ReplicationSlotDropAcquired(SlotIsLogical(MyReplicationSlot));
|
||||||
|
|
||||||
ReplicationSlotDropAcquired();
|
|
||||||
|
|
||||||
if (is_logical)
|
|
||||||
RequestDisableLogicalDecoding();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -1037,18 +1021,28 @@ ReplicationSlotAlter(const char *name, const bool *failover,
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Permanently drop the currently acquired replication slot.
|
* Permanently drop the currently acquired replication slot.
|
||||||
|
*
|
||||||
|
* If caller requests it, have checkpointer attempt to disable logical
|
||||||
|
* decoding. Obviously, this should only be done if the slot is logical.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ReplicationSlotDropAcquired(void)
|
ReplicationSlotDropAcquired(bool try_disable)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot = MyReplicationSlot;
|
ReplicationSlot *slot;
|
||||||
|
|
||||||
Assert(MyReplicationSlot != NULL);
|
Assert(MyReplicationSlot != NULL);
|
||||||
|
slot = MyReplicationSlot;
|
||||||
|
|
||||||
|
/* Can only disable logical decoding if slot is logical */
|
||||||
|
Assert(!try_disable || SlotIsLogical(slot));
|
||||||
|
|
||||||
/* slot isn't acquired anymore */
|
/* slot isn't acquired anymore */
|
||||||
MyReplicationSlot = NULL;
|
MyReplicationSlot = NULL;
|
||||||
|
|
||||||
ReplicationSlotDropPtr(slot);
|
ReplicationSlotDropPtr(slot);
|
||||||
|
|
||||||
|
if (try_disable)
|
||||||
|
RequestDisableLogicalDecoding();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -1511,7 +1505,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
||||||
* This routine isn't as efficient as it could be - but we don't drop
|
* This routine isn't as efficient as it could be - but we don't drop
|
||||||
* databases often, especially databases with lots of slots.
|
* databases often, especially databases with lots of slots.
|
||||||
*
|
*
|
||||||
* If it drops the last logical slot in the cluster, it requests to disable
|
* If the last logical slot in the cluster is dropped, request to disable
|
||||||
* logical decoding.
|
* logical decoding.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
|
|
@ -1606,7 +1600,7 @@ restart:
|
||||||
* beginning each time we release the lock.
|
* beginning each time we release the lock.
|
||||||
*/
|
*/
|
||||||
LWLockRelease(ReplicationSlotControlLock);
|
LWLockRelease(ReplicationSlotControlLock);
|
||||||
ReplicationSlotDropAcquired();
|
ReplicationSlotDropAcquired(false);
|
||||||
dropped = true;
|
dropped = true;
|
||||||
goto restart;
|
goto restart;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -335,7 +335,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
||||||
bool synced);
|
bool synced);
|
||||||
extern void ReplicationSlotPersist(void);
|
extern void ReplicationSlotPersist(void);
|
||||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||||
extern void ReplicationSlotDropAcquired(void);
|
extern void ReplicationSlotDropAcquired(bool try_disable);
|
||||||
extern void ReplicationSlotAlter(const char *name, const bool *failover,
|
extern void ReplicationSlotAlter(const char *name, const bool *failover,
|
||||||
const bool *two_phase);
|
const bool *two_phase);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -400,6 +400,20 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled
|
||||||
"the activation process aborted");
|
"the activation process aborted");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Test that logical decoding is disabled after repack
|
||||||
|
$primary->safe_psql('postgres', qq[create table foo(a int primary key)]);
|
||||||
|
$primary->safe_psql('postgres', qq[repack (concurrently) foo;]);
|
||||||
|
ok( $primary->log_contains(
|
||||||
|
"logical decoding is enabled upon creating a new logical replication slot"
|
||||||
|
),
|
||||||
|
"logical decoding enabled by repack");
|
||||||
|
|
||||||
|
# Wait for the checkpointer to disable logical decoding.
|
||||||
|
wait_for_logical_decoding_disabled($primary);
|
||||||
|
test_wal_level($primary, "replica|replica",
|
||||||
|
"logical decoding disabled after repack"
|
||||||
|
);
|
||||||
|
|
||||||
$primary->stop;
|
$primary->stop;
|
||||||
|
|
||||||
done_testing();
|
done_testing();
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue