mirror of
https://github.com/postgres/postgres.git
synced 2026-06-11 01:30:11 -04:00
Restructure repack worker teardown
The original code would leave a shared memory segment unreleased if we fail partway through initialization. Change the shutdown order so that we always free it. Author: Álvaro Herrera <alvherre@kurilemu.de> Reviewed-by: Antonin Houska <ah@cybertec.at> Discussion: https://postgr.es/m/agtNn6ZCmdI2KJFn@alvherre.pgsql
This commit is contained in:
parent
cfedd45133
commit
1588d89af2
1 changed files with 37 additions and 34 deletions
|
|
@ -3408,21 +3408,22 @@ static void
|
|||
start_repack_decoding_worker(Oid relid)
|
||||
{
|
||||
Size size;
|
||||
dsm_segment *seg;
|
||||
DecodingWorkerShared *shared;
|
||||
shm_mq *mq;
|
||||
shm_mq_handle *mqh;
|
||||
BackgroundWorker bgw;
|
||||
|
||||
decoding_worker = palloc0_object(DecodingWorker);
|
||||
|
||||
/* Setup shared memory. */
|
||||
size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
|
||||
BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
|
||||
seg = dsm_create(size, 0);
|
||||
shared = (DecodingWorkerShared *) dsm_segment_address(seg);
|
||||
decoding_worker->seg = dsm_create(size, 0);
|
||||
|
||||
shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
|
||||
shared->initialized = false;
|
||||
shared->lsn_upto = InvalidXLogRecPtr;
|
||||
shared->done = false;
|
||||
SharedFileSetInit(&shared->sfs, seg);
|
||||
SharedFileSetInit(&shared->sfs, decoding_worker->seg);
|
||||
shared->last_exported = -1;
|
||||
SpinLockInit(&shared->mutex);
|
||||
shared->dbid = MyDatabaseId;
|
||||
|
|
@ -3441,7 +3442,8 @@ start_repack_decoding_worker(Oid relid)
|
|||
mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
|
||||
REPACK_ERROR_QUEUE_SIZE);
|
||||
shm_mq_set_receiver(mq, MyProc);
|
||||
mqh = shm_mq_attach(mq, seg, NULL);
|
||||
|
||||
decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN,
|
||||
|
|
@ -3454,19 +3456,15 @@ start_repack_decoding_worker(Oid relid)
|
|||
bgw.bgw_restart_time = BGW_NEVER_RESTART;
|
||||
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
|
||||
bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
|
||||
bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg));
|
||||
bgw.bgw_notify_pid = MyProcPid;
|
||||
|
||||
decoding_worker = palloc0_object(DecodingWorker);
|
||||
if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("out of background worker slots"),
|
||||
errhint("You might need to increase \"%s\".", "max_worker_processes"));
|
||||
|
||||
decoding_worker->seg = seg;
|
||||
decoding_worker->error_mqh = mqh;
|
||||
|
||||
/*
|
||||
* The decoding setup must be done before the caller can have XID assigned
|
||||
* for any reason, otherwise the worker might end up in a deadlock,
|
||||
|
|
@ -3499,38 +3497,43 @@ start_repack_decoding_worker(Oid relid)
|
|||
static void
|
||||
stop_repack_decoding_worker(void)
|
||||
{
|
||||
BgwHandleStatus status;
|
||||
|
||||
/* Haven't reached the worker startup? */
|
||||
/* Nothing to do if no worker was set up. */
|
||||
if (decoding_worker == NULL)
|
||||
return;
|
||||
|
||||
/* Could not register the worker? */
|
||||
if (decoding_worker->handle == NULL)
|
||||
return;
|
||||
/* Terminate the worker process, if one is running. */
|
||||
if (decoding_worker->handle != NULL)
|
||||
{
|
||||
BgwHandleStatus status;
|
||||
|
||||
TerminateBackgroundWorker(decoding_worker->handle);
|
||||
/* The worker should really exit before the REPACK command does. */
|
||||
HOLD_INTERRUPTS();
|
||||
status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
|
||||
RESUME_INTERRUPTS();
|
||||
TerminateBackgroundWorker(decoding_worker->handle);
|
||||
/* The worker should really exit before the REPACK command does. */
|
||||
HOLD_INTERRUPTS();
|
||||
status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
|
||||
RESUME_INTERRUPTS();
|
||||
|
||||
if (status == BGWH_POSTMASTER_DIED)
|
||||
ereport(FATAL,
|
||||
errcode(ERRCODE_ADMIN_SHUTDOWN),
|
||||
errmsg("postmaster exited during REPACK command"));
|
||||
|
||||
shm_mq_detach(decoding_worker->error_mqh);
|
||||
if (status == BGWH_POSTMASTER_DIED)
|
||||
ereport(FATAL,
|
||||
errcode(ERRCODE_ADMIN_SHUTDOWN),
|
||||
errmsg("postmaster exited during REPACK command"));
|
||||
}
|
||||
|
||||
/*
|
||||
* If we could not cancel the current sleep due to ERROR, do that before
|
||||
* we detach from the shared memory the condition variable is located in.
|
||||
* If we did not, the bgworker ERROR handling code would try and fail
|
||||
* badly.
|
||||
* Now detach from our shared memory segment. In error cases there might
|
||||
* still be messages from the worker in the queue, which ProcessInterrupts
|
||||
* would try to read; this is pointless (and causes an assertion failure),
|
||||
* so set the global pointer to NULL to have ProcessRepackMessages ignore
|
||||
* them.
|
||||
*
|
||||
* We must also cancel the current sleep, if one is still set up. This is
|
||||
* critical because the CV lives in the DSM that we're about to detach, so
|
||||
* if we omit it, later automatic cleanup tries to clear freed memory.
|
||||
*/
|
||||
if (decoding_worker->error_mqh != NULL)
|
||||
shm_mq_detach(decoding_worker->error_mqh);
|
||||
ConditionVariableCancelSleep();
|
||||
|
||||
dsm_detach(decoding_worker->seg);
|
||||
if (decoding_worker->seg != NULL)
|
||||
dsm_detach(decoding_worker->seg);
|
||||
pfree(decoding_worker);
|
||||
decoding_worker = NULL;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue