mirror of
https://github.com/postgres/postgres.git
synced 2026-05-28 04:35:45 -04:00
Disable logical decoding after REPACK (CONCURRENTLY)
REPACK (CONCURRENTLY) uses a temporary logical replication slot, which is dropped once done, but it wasn't calling RequestDisableLogicalDecoding(), leaving effective_wal_level stuck at 'logical'. Fix by adding a Boolean flag to ReplicationSlotDropAcquired() to have it request to disable logical decoding, and passing it as true on REPACK. Other callers of that function preserve their existing behavior. Author: Imran Zaheer <imran.zhir@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Discussion: https://postgr.es/m/CA+UBfaktds57dw2M8BEv_kS-=ixph3w+3MxKixtaDQMi_k7Ybg@mail.gmail.com
This commit is contained in:
parent
0f24332aeb
commit
2af1dc8928
6 changed files with 47 additions and 32 deletions
|
|
@ -323,7 +323,7 @@ repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
|
|||
ExecDropSingleTupleTableSlot(dstate->slot);
|
||||
|
||||
FreeDecodingContext(ctx);
|
||||
ReplicationSlotDropAcquired();
|
||||
ReplicationSlotDropAcquired(true);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -1406,7 +1406,8 @@ ApplyLauncherMain(Datum main_arg)
|
|||
if (MyReplicationSlot)
|
||||
{
|
||||
if (!retain_dead_tuples)
|
||||
ReplicationSlotDropAcquired();
|
||||
/* XXX unclear why we don't request logical decoding disable */
|
||||
ReplicationSlotDropAcquired(false);
|
||||
else if (can_update_xmin)
|
||||
update_conflict_slot_xmin(xmin);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -557,7 +557,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
|
|||
* database drop by the startup process and the creation of a new
|
||||
* slot by the user. This new user-created slot may end up using
|
||||
* the same shared memory as that of 'local_slot'. Thus check if
|
||||
* local_slot is still the synced one before performing actual
|
||||
* local_slot is still the synced one before performing the actual
|
||||
* drop.
|
||||
*/
|
||||
SpinLockAcquire(&local_slot->mutex);
|
||||
|
|
@ -566,8 +566,14 @@ drop_local_obsolete_slots(List *remote_slot_list)
|
|||
|
||||
if (synced_slot)
|
||||
{
|
||||
/*
|
||||
* Now acquire and drop the slot. Note we purposely don't
|
||||
* request logical decoding to be disabled here: since this is
|
||||
* a standby, which derives its logical decoding state from
|
||||
* the primary, it would be wrong to do so.
|
||||
*/
|
||||
ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
|
||||
ReplicationSlotDropAcquired();
|
||||
ReplicationSlotDropAcquired(false);
|
||||
}
|
||||
|
||||
UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
|
||||
|
|
|
|||
|
|
@ -783,19 +783,10 @@ ReplicationSlotRelease(void)
|
|||
if (slot->data.persistency == RS_EPHEMERAL)
|
||||
{
|
||||
/*
|
||||
* Delete the slot. There is no !PANIC case where this is allowed to
|
||||
* fail, all that may happen is an incomplete cleanup of the on-disk
|
||||
* data.
|
||||
* If slot is ephemeral, we drop it upon release, and request logical
|
||||
* decoding be disabled.
|
||||
*/
|
||||
ReplicationSlotDropAcquired();
|
||||
|
||||
/*
|
||||
* Request to disable logical decoding, even though this slot may not
|
||||
* have been the last logical slot. The checkpointer will verify if
|
||||
* logical decoding should actually be disabled.
|
||||
*/
|
||||
if (is_logical)
|
||||
RequestDisableLogicalDecoding();
|
||||
ReplicationSlotDropAcquired(is_logical);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -914,15 +905,13 @@ restart:
|
|||
}
|
||||
|
||||
/*
|
||||
* Permanently drop replication slot identified by the passed in name.
|
||||
* Permanently drop the replication slot identified by the passed-in name.
|
||||
*
|
||||
* If this is a logical slot, request that logical decoding be disabled.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotDrop(const char *name, bool nowait)
|
||||
{
|
||||
bool is_logical;
|
||||
|
||||
Assert(MyReplicationSlot == NULL);
|
||||
|
||||
ReplicationSlotAcquire(name, nowait, false);
|
||||
|
||||
/*
|
||||
|
|
@ -935,12 +924,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
|
|||
errmsg("cannot drop replication slot \"%s\"", name),
|
||||
errdetail("This replication slot is being synchronized from the primary server."));
|
||||
|
||||
is_logical = SlotIsLogical(MyReplicationSlot);
|
||||
|
||||
ReplicationSlotDropAcquired();
|
||||
|
||||
if (is_logical)
|
||||
RequestDisableLogicalDecoding();
|
||||
ReplicationSlotDropAcquired(SlotIsLogical(MyReplicationSlot));
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -1037,18 +1021,28 @@ ReplicationSlotAlter(const char *name, const bool *failover,
|
|||
|
||||
/*
|
||||
* Permanently drop the currently acquired replication slot.
|
||||
*
|
||||
* If caller requests it, have checkpointer attempt to disable logical
|
||||
* decoding. Obviously, this should only be done if the slot is logical.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotDropAcquired(void)
|
||||
ReplicationSlotDropAcquired(bool try_disable)
|
||||
{
|
||||
ReplicationSlot *slot = MyReplicationSlot;
|
||||
ReplicationSlot *slot;
|
||||
|
||||
Assert(MyReplicationSlot != NULL);
|
||||
slot = MyReplicationSlot;
|
||||
|
||||
/* Can only disable logical decoding if slot is logical */
|
||||
Assert(!try_disable || SlotIsLogical(slot));
|
||||
|
||||
/* slot isn't acquired anymore */
|
||||
MyReplicationSlot = NULL;
|
||||
|
||||
ReplicationSlotDropPtr(slot);
|
||||
|
||||
if (try_disable)
|
||||
RequestDisableLogicalDecoding();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -1511,7 +1505,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
|||
* This routine isn't as efficient as it could be - but we don't drop
|
||||
* databases often, especially databases with lots of slots.
|
||||
*
|
||||
* If it drops the last logical slot in the cluster, it requests to disable
|
||||
* If the last logical slot in the cluster is dropped, request to disable
|
||||
* logical decoding.
|
||||
*/
|
||||
void
|
||||
|
|
@ -1606,7 +1600,7 @@ restart:
|
|||
* beginning each time we release the lock.
|
||||
*/
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
ReplicationSlotDropAcquired();
|
||||
ReplicationSlotDropAcquired(false);
|
||||
dropped = true;
|
||||
goto restart;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -335,7 +335,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
|||
bool synced);
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||
extern void ReplicationSlotDropAcquired(void);
|
||||
extern void ReplicationSlotDropAcquired(bool try_disable);
|
||||
extern void ReplicationSlotAlter(const char *name, const bool *failover,
|
||||
const bool *two_phase);
|
||||
|
||||
|
|
|
|||
|
|
@ -400,6 +400,20 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled
|
|||
"the activation process aborted");
|
||||
}
|
||||
|
||||
# Test that logical decoding is disabled after repack
|
||||
$primary->safe_psql('postgres', qq[create table foo(a int primary key)]);
|
||||
$primary->safe_psql('postgres', qq[repack (concurrently) foo;]);
|
||||
ok( $primary->log_contains(
|
||||
"logical decoding is enabled upon creating a new logical replication slot"
|
||||
),
|
||||
"logical decoding enabled by repack");
|
||||
|
||||
# Wait for the checkpointer to disable logical decoding.
|
||||
wait_for_logical_decoding_disabled($primary);
|
||||
test_wal_level($primary, "replica|replica",
|
||||
"logical decoding disabled after repack"
|
||||
);
|
||||
|
||||
$primary->stop;
|
||||
|
||||
done_testing();
|
||||
|
|
|
|||
Loading…
Reference in a new issue