diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c index 22a1307b38d..da9d941f4a1 100644 --- a/src/backend/commands/repack.c +++ b/src/backend/commands/repack.c @@ -3408,21 +3408,22 @@ static void start_repack_decoding_worker(Oid relid) { Size size; - dsm_segment *seg; DecodingWorkerShared *shared; shm_mq *mq; - shm_mq_handle *mqh; BackgroundWorker bgw; + decoding_worker = palloc0_object(DecodingWorker); + /* Setup shared memory. */ size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) + BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE); - seg = dsm_create(size, 0); - shared = (DecodingWorkerShared *) dsm_segment_address(seg); + decoding_worker->seg = dsm_create(size, 0); + + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); shared->initialized = false; shared->lsn_upto = InvalidXLogRecPtr; shared->done = false; - SharedFileSetInit(&shared->sfs, seg); + SharedFileSetInit(&shared->sfs, decoding_worker->seg); shared->last_exported = -1; SpinLockInit(&shared->mutex); shared->dbid = MyDatabaseId; @@ -3441,7 +3442,8 @@ start_repack_decoding_worker(Oid relid) mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue), REPACK_ERROR_QUEUE_SIZE); shm_mq_set_receiver(mq, MyProc); - mqh = shm_mq_attach(mq, seg, NULL); + + decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL); memset(&bgw, 0, sizeof(bgw)); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -3454,19 +3456,15 @@ start_repack_decoding_worker(Oid relid) bgw.bgw_restart_time = BGW_NEVER_RESTART; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain"); - bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg)); bgw.bgw_notify_pid = MyProcPid; - decoding_worker = palloc0_object(DecodingWorker); if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle)) ereport(ERROR, errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), errhint("You might need to increase \"%s\".", "max_worker_processes")); - decoding_worker->seg = seg; - decoding_worker->error_mqh = mqh; - /* * The decoding setup must be done before the caller can have XID assigned * for any reason, otherwise the worker might end up in a deadlock, @@ -3499,38 +3497,43 @@ start_repack_decoding_worker(Oid relid) static void stop_repack_decoding_worker(void) { - BgwHandleStatus status; - - /* Haven't reached the worker startup? */ + /* Nothing to do if no worker was set up. */ if (decoding_worker == NULL) return; - /* Could not register the worker? */ - if (decoding_worker->handle == NULL) - return; + /* Terminate the worker process, if one is running. */ + if (decoding_worker->handle != NULL) + { + BgwHandleStatus status; - TerminateBackgroundWorker(decoding_worker->handle); - /* The worker should really exit before the REPACK command does. */ - HOLD_INTERRUPTS(); - status = WaitForBackgroundWorkerShutdown(decoding_worker->handle); - RESUME_INTERRUPTS(); + TerminateBackgroundWorker(decoding_worker->handle); + /* The worker should really exit before the REPACK command does. */ + HOLD_INTERRUPTS(); + status = WaitForBackgroundWorkerShutdown(decoding_worker->handle); + RESUME_INTERRUPTS(); - if (status == BGWH_POSTMASTER_DIED) - ereport(FATAL, - errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("postmaster exited during REPACK command")); - - shm_mq_detach(decoding_worker->error_mqh); + if (status == BGWH_POSTMASTER_DIED) + ereport(FATAL, + errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("postmaster exited during REPACK command")); + } /* - * If we could not cancel the current sleep due to ERROR, do that before - * we detach from the shared memory the condition variable is located in. - * If we did not, the bgworker ERROR handling code would try and fail - * badly. + * Now detach from our shared memory segment. In error cases there might + * still be messages from the worker in the queue, which ProcessInterrupts + * would try to read; this is pointless (and causes an assertion failure), + * so set the global pointer to NULL to have ProcessRepackMessages ignore + * them. + * + * We must also cancel the current sleep, if one is still set up. This is + * critical because the CV lives in the DSM that we're about to detach, so + * if we omit it, later automatic cleanup tries to clear freed memory. */ + if (decoding_worker->error_mqh != NULL) + shm_mq_detach(decoding_worker->error_mqh); ConditionVariableCancelSleep(); - - dsm_detach(decoding_worker->seg); + if (decoding_worker->seg != NULL) + dsm_detach(decoding_worker->seg); pfree(decoding_worker); decoding_worker = NULL; }