diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f40b84592a6..39bcf844694 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -388,11 +388,11 @@ CreateInitDecodingContext(const char *plugin, * without further interlock its return value might immediately be out of * date. * - * So we have to acquire the ProcArrayLock to prevent computation of new - * xmin horizons by other backends, get the safe decoding xid, and inform - * the slot machinery about the new limit. Once that's done the - * ProcArrayLock can be released as the slot machinery now is - * protecting against vacuum. + * So we have to acquire both the ReplicationSlotControlLock and the + * ProcArrayLock to prevent concurrent computation and update of new xmin + * horizons by other backends, get the safe decoding xid, and inform the + * slot machinery about the new limit. Once that's done both locks can be + * released as the slot machinery now is protecting against vacuum. * * Note that, temporarily, the data, not just the catalog, xmin has to be * reserved if a data snapshot is to be exported. Otherwise the initial @@ -405,6 +405,7 @@ CreateInitDecodingContext(const char *plugin, * * ---- */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); @@ -419,6 +420,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + LWLockRelease(ReplicationSlotControlLock); ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c9eb1836a64..345e1bef4c8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -788,8 +788,11 @@ ReplicationSlotPersist(void) /* * Compute the oldest xmin across all slots and store it in the ProcArray. * - * If already_locked is true, ProcArrayLock has already been acquired - * exclusively. + * If already_locked is true, both the ReplicationSlotControlLock and the + * ProcArrayLock have already been acquired exclusively. It is crucial that the + * caller first acquires the ReplicationSlotControlLock, followed by the + * ProcArrayLock, to prevent any undetectable deadlocks since this function + * acquires them in that order. */ void ReplicationSlotsComputeRequiredXmin(bool already_locked) @@ -799,8 +802,33 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) TransactionId agg_catalog_xmin = InvalidTransactionId; Assert(ReplicationSlotCtl != NULL); + Assert(!already_locked || + (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) && + LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE))); - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + /* + * Hold the ReplicationSlotControlLock until after updating the slot xmin + * values, so no backend updates the initial xmin for newly created slot + * concurrently. A shared lock is used here to minimize lock contention, + * especially when many slots exist and advancements occur frequently. + * This is safe since an exclusive lock is taken during initial slot xmin + * update in slot creation. + * + * One might think that we can hold the ProcArrayLock exclusively and + * update the slot xmin values, but it could increase lock contention on + * the ProcArrayLock, which is not great since this function can be called + * at non-negligible frequency. + * + * Concurrent invocation of this function may cause the computed slot xmin + * to regress. However, this is harmless because tuples prior to the most + * recent xmin are no longer useful once advancement occurs (see + * LogicalConfirmReceivedLocation where the slot's xmin value is flushed + * before updating the effective_xmin). Thus, such regression merely + * prevents VACUUM from prematurely removing tuples without causing the + * early deletion of required data. + */ + if (!already_locked) + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -836,9 +864,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) agg_catalog_xmin = effective_catalog_xmin; } - LWLockRelease(ReplicationSlotControlLock); - ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked); + + if (!already_locked) + LWLockRelease(ReplicationSlotControlLock); } /*