Improve handling of concurrent checksum requests

When pg_{enable|disable}_data_checksums is called while checksums are
being enabled or disabled, the already running launcher is detected
and the new desired state is recorded.  Processing will then pick up
the new state and change its operation to fulfill the new request.
If the same state is requested but with different cost values, the
new cost values will take effect on the next relation processed.

The previous coding had a complex logic of starting a new launcher
for this, which is now avoided with the shared mem structure instead
used to signal current processing.

This makes the logic more robust, and fixes a bug where the launcher
would erroneously revert back to the "off" state.

Access to the shared memory is also protected with LWLocks in all
cases.  Since the shmem structure is used for signalling between
the worker and the launcher, and there can be only one of each,
there were no concurrency issues detected but it's better to stick
to proper locking protocol should this ever be updated to handle
multiple workers.

Author: Daniel Gustafsson <daniel@yesql.se>
Reviewed-by: Tomas Vondra <tomas@vondra.me>
Reviewed-by: SATYANARAYANA NARLAPURAM <satyanarlapuram@gmail.com>
Reviewed-by: Ayush Tiwari <ayushtiwari.slg01@gmail.com>
Discussion: https://postgr.es/m/9197F930-DDEB-4CAC-82A2-16FEC715CCE8@yesql.se
This commit is contained in:
Daniel Gustafsson 2026-04-30 13:41:53 +02:00
parent 381d19da15
commit bf25e5571b
3 changed files with 114 additions and 25 deletions

View file

@ -4682,10 +4682,41 @@ DataChecksumsNeedWrite(void)
LocalDataChecksumState == PG_DATA_CHECKSUM_INPROGRESS_OFF);
}
bool
DataChecksumsOff(void)
{
bool ret;
SpinLockAcquire(&XLogCtl->info_lck);
ret = (XLogCtl->data_checksum_version == PG_DATA_CHECKSUM_OFF);
SpinLockRelease(&XLogCtl->info_lck);
return ret;
}
bool
DataChecksumsOn(void)
{
bool ret;
SpinLockAcquire(&XLogCtl->info_lck);
ret = (XLogCtl->data_checksum_version == PG_DATA_CHECKSUM_VERSION);
SpinLockRelease(&XLogCtl->info_lck);
return ret;
}
bool
DataChecksumsInProgressOn(void)
{
return LocalDataChecksumState == PG_DATA_CHECKSUM_INPROGRESS_ON;
bool ret;
SpinLockAcquire(&XLogCtl->info_lck);
ret = (XLogCtl->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_ON);
SpinLockRelease(&XLogCtl->info_lck);
return ret;
}
/*

View file

@ -235,7 +235,7 @@ typedef struct ChecksumBarrierCondition
int to;
} ChecksumBarrierCondition;
static const ChecksumBarrierCondition checksum_barriers[7] =
static const ChecksumBarrierCondition checksum_barriers[9] =
{
/*
* Disabling checksums: If checksums are currently enabled, disabling must
@ -267,6 +267,13 @@ static const ChecksumBarrierCondition checksum_barriers[7] =
* set to off since we cannot reach on at that point.
*/
{PG_DATA_CHECKSUM_INPROGRESS_ON, PG_DATA_CHECKSUM_INPROGRESS_OFF},
/*
* Transitions that can happen when a new request is made while another is
* currently being processed.
*/
{PG_DATA_CHECKSUM_INPROGRESS_OFF, PG_DATA_CHECKSUM_INPROGRESS_ON},
{PG_DATA_CHECKSUM_OFF, PG_DATA_CHECKSUM_INPROGRESS_OFF},
};
/*
@ -368,6 +375,15 @@ const ShmemCallbacks DataChecksumsShmemCallbacks = {
.request_fn = DataChecksumsShmemRequest,
};
#define CHECK_FOR_ABORT_REQUEST() \
do { \
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED); \
if (DataChecksumState->launch_operation != operation) \
abort_requested = true; \
LWLockRelease(DataChecksumsWorkerLock); \
} while (0)
/*****************************************************************************
* Functionality for manipulating the data checksum state in the cluster
*/
@ -557,7 +573,6 @@ StartDataChecksumsWorkerLauncher(DataChecksumsWorkerOperation op,
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
bool running;
DataChecksumsWorkerOperation launcher_running_op;
#ifdef USE_ASSERT_CHECKING
/* The cost delay settings have no effect when disabling */
@ -576,8 +591,6 @@ StartDataChecksumsWorkerLauncher(DataChecksumsWorkerOperation op,
/* Is the launcher already running? If so, what is it doing? */
running = DataChecksumState->launcher_running;
if (running)
launcher_running_op = DataChecksumState->operation;
LWLockRelease(DataChecksumsWorkerLock);
@ -594,13 +607,17 @@ StartDataChecksumsWorkerLauncher(DataChecksumsWorkerOperation op,
* the launcher has had a chance to start up, we still end up launching it
* twice. That's OK, the second invocation will see that a launcher is
* already running and exit quickly.
*
* TODO: We could optimize here and skip launching the launcher, if we are
* already in the desired state, i.e. if the checksums are already enabled
* and you call pg_enable_data_checksums().
*/
if (!running)
{
if ((op == ENABLE_DATACHECKSUMS && DataChecksumsOn()) ||
(op == DISABLE_DATACHECKSUMS && DataChecksumsOff()))
{
ereport(LOG,
errmsg("data checksums already in desired state, exiting"));
return;
}
/*
* Prepare the BackgroundWorker and launch it.
*/
@ -622,9 +639,8 @@ StartDataChecksumsWorkerLauncher(DataChecksumsWorkerOperation op,
}
else
{
if (launcher_running_op == op)
ereport(ERROR,
errmsg("data checksum processing already running"));
ereport(LOG,
errmsg("data checksum processing already running"));
}
}
@ -813,14 +829,17 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
* If the worker managed to start, and stop, before we got to waiting
* for it we can see a STOPPED status here without it being a failure.
*/
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
if (DataChecksumState->success == DATACHECKSUMSWORKER_SUCCESSFUL)
{
LWLockRelease(DataChecksumsWorkerLock);
pgstat_report_activity(STATE_IDLE, NULL);
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->worker_pid = InvalidPid;
LWLockRelease(DataChecksumsWorkerLock);
return DataChecksumState->success;
}
LWLockRelease(DataChecksumsWorkerLock);
ereport(WARNING,
errmsg("could not start background worker for enabling data checksums in database \"%s\"",
@ -873,10 +892,12 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
db->dbname),
errhint("Restart the database and restart data checksum processing by calling pg_enable_data_checksums()."));
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
if (DataChecksumState->success == DATACHECKSUMSWORKER_ABORTED)
ereport(LOG,
errmsg("data checksums processing was aborted in database \"%s\"",
db->dbname));
LWLockRelease(DataChecksumsWorkerLock);
pgstat_report_activity(STATE_IDLE, NULL);
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
@ -1002,11 +1023,8 @@ WaitForAllTransactionsToFinish(void)
errhint("Data checksums processing must be restarted manually after cluster restart."));
CHECK_FOR_INTERRUPTS();
CHECK_FOR_ABORT_REQUEST();
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
if (DataChecksumState->launch_operation != operation)
abort_requested = true;
LWLockRelease(DataChecksumsWorkerLock);
if (abort_requested)
break;
}
@ -1189,7 +1207,9 @@ ProcessAllDatabases(void)
int cumulative_total = 0;
/* Set up so first run processes shared catalogs, not once in every db */
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->process_shared_catalogs = true;
LWLockRelease(DataChecksumsWorkerLock);
/* Get a list of all databases to process */
WaitForAllTransactionsToFinish();
@ -1265,7 +1285,9 @@ ProcessAllDatabases(void)
* When one database has completed, it will have done shared catalogs
* so we don't have to process them again.
*/
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->process_shared_catalogs = false;
LWLockRelease(DataChecksumsWorkerLock);
}
FreeDatabaseList(DatabaseList);
@ -1507,7 +1529,6 @@ DataChecksumsWorkerMain(Datum arg)
* implementation detail and care should be taken to avoid it bleeding
* through to the user to avoid confusion.
*/
Assert(DataChecksumState->operation == ENABLE_DATACHECKSUMS);
VacuumCostDelay = DataChecksumState->cost_delay;
VacuumCostLimit = DataChecksumState->cost_limit;
VacuumCostActive = (VacuumCostDelay > 0);
@ -1543,7 +1564,7 @@ DataChecksumsWorkerMain(Datum arg)
rels_done = 0;
foreach_oid(reloid, RelationList)
{
CHECK_FOR_INTERRUPTS();
bool costs_updated = false;
if (!ProcessSingleRelationByOid(reloid, strategy))
{
@ -1553,12 +1574,48 @@ DataChecksumsWorkerMain(Datum arg)
pgstat_progress_update_param(PROGRESS_DATACHECKSUMS_RELS_DONE,
++rels_done);
}
list_free(RelationList);
CHECK_FOR_INTERRUPTS();
CHECK_FOR_ABORT_REQUEST();
if (aborted)
if (abort_requested)
break;
/*
* Check if the cost settings changed during runtime and if so, update
* to reflect the new values and signal that the access strategy needs
* to be refreshed.
*/
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
if ((DataChecksumState->launch_cost_delay != DataChecksumState->cost_delay)
|| (DataChecksumState->launch_cost_limit != DataChecksumState->cost_limit))
{
costs_updated = true;
VacuumCostDelay = DataChecksumState->launch_cost_delay;
VacuumCostLimit = DataChecksumState->launch_cost_limit;
VacuumCostActive = (VacuumCostDelay > 0);
DataChecksumState->cost_delay = DataChecksumState->launch_cost_delay;
DataChecksumState->cost_limit = DataChecksumState->launch_cost_limit;
}
else
costs_updated = false;
LWLockRelease(DataChecksumsWorkerLock);
if (costs_updated)
{
FreeAccessStrategy(strategy);
strategy = GetAccessStrategy(BAS_VACUUM);
}
}
list_free(RelationList);
FreeAccessStrategy(strategy);
if (aborted || abort_requested)
{
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->success = DATACHECKSUMSWORKER_ABORTED;
LWLockRelease(DataChecksumsWorkerLock);
ereport(DEBUG1,
errmsg("data checksum processing aborted in database OID %u",
dboid));
@ -1623,15 +1680,14 @@ DataChecksumsWorkerMain(Datum arg)
3000,
WAIT_EVENT_CHECKSUM_ENABLE_TEMPTABLE_WAIT);
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
aborted = DataChecksumState->launch_operation != operation;
LWLockRelease(DataChecksumsWorkerLock);
CHECK_FOR_INTERRUPTS();
CHECK_FOR_ABORT_REQUEST();
if (aborted || abort_requested)
{
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->success = DATACHECKSUMSWORKER_ABORTED;
LWLockRelease(DataChecksumsWorkerLock);
ereport(LOG,
errmsg("data checksum processing aborted in database OID %u",
dboid));

View file

@ -249,6 +249,8 @@ extern uint64 GetSystemIdentifier(void);
extern char *GetMockAuthenticationNonce(void);
extern bool DataChecksumsNeedWrite(void);
extern bool DataChecksumsNeedVerify(void);
extern bool DataChecksumsOn(void);
extern bool DataChecksumsOff(void);
extern bool DataChecksumsInProgressOn(void);
extern void SetDataChecksumsOnInProgress(void);
extern void SetDataChecksumsOn(void);