mirror of
https://github.com/postgres/postgres.git
synced 2026-04-06 09:45:29 -04:00
Convert AIO to use the new shmem allocation functions
This replaces the "shmem_size" and "shmem_init" callbacks in the IO methods table with the same ShmemCallback struct that we now use in other subsystems Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> Reviewed-by: Matthias van de Meent <boekewurm+postgres@gmail.com> Reviewed-by: Daniel Gustafsson <daniel@yesql.se> Discussion: https://www.postgresql.org/message-id/CAExHW5vM1bneLYfg0wGeAa=52UiJ3z4vKd3AJ72X8Fw6k3KKrg@mail.gmail.com
This commit is contained in:
parent
2e0943a859
commit
58a1573385
7 changed files with 126 additions and 130 deletions
|
|
@ -23,16 +23,24 @@
|
|||
#include "storage/ipc.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/subsystems.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
|
||||
static void AioShmemRequest(void *arg);
|
||||
static void AioShmemInit(void *arg);
|
||||
static void AioShmemAttach(void *arg);
|
||||
|
||||
static Size
|
||||
AioCtlShmemSize(void)
|
||||
{
|
||||
/* pgaio_ctl itself */
|
||||
return sizeof(PgAioCtl);
|
||||
}
|
||||
const ShmemCallbacks AioShmemCallbacks = {
|
||||
.request_fn = AioShmemRequest,
|
||||
.init_fn = AioShmemInit,
|
||||
.attach_fn = AioShmemAttach,
|
||||
};
|
||||
|
||||
static PgAioBackend *AioBackendShmemPtr;
|
||||
static PgAioHandle *AioHandleShmemPtr;
|
||||
static struct iovec *AioHandleIOVShmemPtr;
|
||||
static uint64 *AioHandleDataShmemPtr;
|
||||
|
||||
static uint32
|
||||
AioProcs(void)
|
||||
|
|
@ -109,12 +117,15 @@ AioChooseMaxConcurrency(void)
|
|||
return Min(max_proportional_pins, 64);
|
||||
}
|
||||
|
||||
Size
|
||||
AioShmemSize(void)
|
||||
/*
|
||||
* Register AIO subsystem's shared memory needs.
|
||||
*/
|
||||
static void
|
||||
AioShmemRequest(void *arg)
|
||||
{
|
||||
Size sz = 0;
|
||||
|
||||
/*
|
||||
* Resolve io_max_concurrency if not already done
|
||||
*
|
||||
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
|
||||
* However, if the DBA explicitly set io_max_concurrency = -1 in the
|
||||
* config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
|
||||
|
|
@ -132,48 +143,52 @@ AioShmemSize(void)
|
|||
PGC_S_OVERRIDE);
|
||||
}
|
||||
|
||||
sz = add_size(sz, AioCtlShmemSize());
|
||||
sz = add_size(sz, AioBackendShmemSize());
|
||||
sz = add_size(sz, AioHandleShmemSize());
|
||||
sz = add_size(sz, AioHandleIOVShmemSize());
|
||||
sz = add_size(sz, AioHandleDataShmemSize());
|
||||
ShmemRequestStruct(.name = "AioCtl",
|
||||
.size = sizeof(PgAioCtl),
|
||||
.ptr = (void **) &pgaio_ctl,
|
||||
);
|
||||
|
||||
/* Reserve space for method specific resources. */
|
||||
if (pgaio_method_ops->shmem_size)
|
||||
sz = add_size(sz, pgaio_method_ops->shmem_size());
|
||||
ShmemRequestStruct(.name = "AioBackend",
|
||||
.size = AioBackendShmemSize(),
|
||||
.ptr = (void **) &AioBackendShmemPtr,
|
||||
);
|
||||
|
||||
return sz;
|
||||
ShmemRequestStruct(.name = "AioHandle",
|
||||
.size = AioHandleShmemSize(),
|
||||
.ptr = (void **) &AioHandleShmemPtr,
|
||||
);
|
||||
|
||||
ShmemRequestStruct(.name = "AioHandleIOV",
|
||||
.size = AioHandleIOVShmemSize(),
|
||||
.ptr = (void **) &AioHandleIOVShmemPtr,
|
||||
);
|
||||
|
||||
ShmemRequestStruct(.name = "AioHandleData",
|
||||
.size = AioHandleDataShmemSize(),
|
||||
.ptr = (void **) &AioHandleDataShmemPtr,
|
||||
);
|
||||
|
||||
if (pgaio_method_ops->shmem_callbacks.request_fn)
|
||||
pgaio_method_ops->shmem_callbacks.request_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
|
||||
}
|
||||
|
||||
void
|
||||
AioShmemInit(void)
|
||||
/*
|
||||
* Initialize AIO shared memory during postmaster startup.
|
||||
*/
|
||||
static void
|
||||
AioShmemInit(void *arg)
|
||||
{
|
||||
bool found;
|
||||
uint32 io_handle_off = 0;
|
||||
uint32 iovec_off = 0;
|
||||
uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit;
|
||||
|
||||
pgaio_ctl = (PgAioCtl *)
|
||||
ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
|
||||
|
||||
if (found)
|
||||
goto out;
|
||||
|
||||
memset(pgaio_ctl, 0, AioCtlShmemSize());
|
||||
|
||||
pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
|
||||
pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
|
||||
|
||||
pgaio_ctl->backend_state = (PgAioBackend *)
|
||||
ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
|
||||
|
||||
pgaio_ctl->io_handles = (PgAioHandle *)
|
||||
ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
|
||||
|
||||
pgaio_ctl->iovecs = (struct iovec *)
|
||||
ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
|
||||
pgaio_ctl->handle_data = (uint64 *)
|
||||
ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
|
||||
pgaio_ctl->backend_state = AioBackendShmemPtr;
|
||||
pgaio_ctl->io_handles = AioHandleShmemPtr;
|
||||
pgaio_ctl->iovecs = AioHandleIOVShmemPtr;
|
||||
pgaio_ctl->handle_data = AioHandleDataShmemPtr;
|
||||
|
||||
for (int procno = 0; procno < AioProcs(); procno++)
|
||||
{
|
||||
|
|
@ -208,10 +223,15 @@ AioShmemInit(void)
|
|||
}
|
||||
}
|
||||
|
||||
out:
|
||||
/* Initialize IO method specific resources. */
|
||||
if (pgaio_method_ops->shmem_init)
|
||||
pgaio_method_ops->shmem_init(!found);
|
||||
if (pgaio_method_ops->shmem_callbacks.init_fn)
|
||||
pgaio_method_ops->shmem_callbacks.init_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
|
||||
}
|
||||
|
||||
static void
|
||||
AioShmemAttach(void *arg)
|
||||
{
|
||||
if (pgaio_method_ops->shmem_callbacks.attach_fn)
|
||||
pgaio_method_ops->shmem_callbacks.attach_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
|||
|
|
@ -49,8 +49,8 @@
|
|||
|
||||
|
||||
/* Entry points for IoMethodOps. */
|
||||
static size_t pgaio_uring_shmem_size(void);
|
||||
static void pgaio_uring_shmem_init(bool first_time);
|
||||
static void pgaio_uring_shmem_request(void *arg);
|
||||
static void pgaio_uring_shmem_init(void *arg);
|
||||
static void pgaio_uring_init_backend(void);
|
||||
static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
|
||||
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);
|
||||
|
|
@ -70,8 +70,8 @@ const IoMethodOps pgaio_uring_ops = {
|
|||
*/
|
||||
.wait_on_fd_before_close = true,
|
||||
|
||||
.shmem_size = pgaio_uring_shmem_size,
|
||||
.shmem_init = pgaio_uring_shmem_init,
|
||||
.shmem_callbacks.request_fn = pgaio_uring_shmem_request,
|
||||
.shmem_callbacks.init_fn = pgaio_uring_shmem_init,
|
||||
.init_backend = pgaio_uring_init_backend,
|
||||
|
||||
.submit = pgaio_uring_submit,
|
||||
|
|
@ -267,12 +267,6 @@ pgaio_uring_shmem_size(void)
|
|||
{
|
||||
size_t sz;
|
||||
|
||||
/*
|
||||
* Kernel and liburing support for various features influences how much
|
||||
* shmem we need, perform the necessary checks.
|
||||
*/
|
||||
pgaio_uring_check_capabilities();
|
||||
|
||||
sz = pgaio_uring_context_shmem_size();
|
||||
sz = add_size(sz, pgaio_uring_ring_shmem_size());
|
||||
|
||||
|
|
@ -280,10 +274,24 @@ pgaio_uring_shmem_size(void)
|
|||
}
|
||||
|
||||
static void
|
||||
pgaio_uring_shmem_init(bool first_time)
|
||||
pgaio_uring_shmem_request(void *arg)
|
||||
{
|
||||
/*
|
||||
* Kernel and liburing support for various features influences how much
|
||||
* shmem we need, perform the necessary checks.
|
||||
*/
|
||||
pgaio_uring_check_capabilities();
|
||||
|
||||
ShmemRequestStruct(.name = "AioUringContext",
|
||||
.size = pgaio_uring_shmem_size(),
|
||||
.ptr = (void **) &pgaio_uring_contexts,
|
||||
);
|
||||
}
|
||||
|
||||
static void
|
||||
pgaio_uring_shmem_init(void *arg)
|
||||
{
|
||||
int TotalProcs = pgaio_uring_procs();
|
||||
bool found;
|
||||
char *shmem;
|
||||
size_t ring_mem_remain = 0;
|
||||
char *ring_mem_next = 0;
|
||||
|
|
@ -291,13 +299,11 @@ pgaio_uring_shmem_init(bool first_time)
|
|||
/*
|
||||
* We allocate memory for all PgAioUringContext instances and, if
|
||||
* supported, the memory required for each of the io_uring instances, in
|
||||
* one ShmemInitStruct().
|
||||
* one combined allocation.
|
||||
*
|
||||
* pgaio_uring_contexts is already set to the base of the allocation.
|
||||
*/
|
||||
shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found);
|
||||
if (found)
|
||||
return;
|
||||
|
||||
pgaio_uring_contexts = (PgAioUringContext *) shmem;
|
||||
shmem = (char *) pgaio_uring_contexts;
|
||||
shmem += pgaio_uring_context_shmem_size();
|
||||
|
||||
/* if supported, handle memory alignment / sizing for io_uring memory */
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@
|
|||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/injection_point.h"
|
||||
#include "utils/memdebug.h"
|
||||
|
|
@ -73,16 +74,16 @@ typedef struct PgAioWorkerControl
|
|||
} PgAioWorkerControl;
|
||||
|
||||
|
||||
static size_t pgaio_worker_shmem_size(void);
|
||||
static void pgaio_worker_shmem_init(bool first_time);
|
||||
static void pgaio_worker_shmem_request(void *arg);
|
||||
static void pgaio_worker_shmem_init(void *arg);
|
||||
|
||||
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
|
||||
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
|
||||
|
||||
|
||||
const IoMethodOps pgaio_worker_ops = {
|
||||
.shmem_size = pgaio_worker_shmem_size,
|
||||
.shmem_init = pgaio_worker_shmem_init,
|
||||
.shmem_callbacks.request_fn = pgaio_worker_shmem_request,
|
||||
.shmem_callbacks.init_fn = pgaio_worker_shmem_init,
|
||||
|
||||
.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
|
||||
.submit = pgaio_worker_submit,
|
||||
|
|
@ -99,64 +100,45 @@ static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
|
|||
static PgAioWorkerControl *io_worker_control;
|
||||
|
||||
|
||||
static size_t
|
||||
pgaio_worker_queue_shmem_size(int *queue_size)
|
||||
static void
|
||||
pgaio_worker_shmem_request(void *arg)
|
||||
{
|
||||
/* Round size up to next power of two so we can make a mask. */
|
||||
*queue_size = pg_nextpower2_32(io_worker_queue_size);
|
||||
|
||||
return offsetof(PgAioWorkerSubmissionQueue, sqes) +
|
||||
sizeof(int) * *queue_size;
|
||||
}
|
||||
|
||||
static size_t
|
||||
pgaio_worker_control_shmem_size(void)
|
||||
{
|
||||
return offsetof(PgAioWorkerControl, workers) +
|
||||
sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
|
||||
}
|
||||
|
||||
static size_t
|
||||
pgaio_worker_shmem_size(void)
|
||||
{
|
||||
size_t sz;
|
||||
size_t size;
|
||||
int queue_size;
|
||||
|
||||
sz = pgaio_worker_queue_shmem_size(&queue_size);
|
||||
sz = add_size(sz, pgaio_worker_control_shmem_size());
|
||||
/* Round size up to next power of two so we can make a mask. */
|
||||
queue_size = pg_nextpower2_32(io_worker_queue_size);
|
||||
|
||||
return sz;
|
||||
size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
|
||||
ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
|
||||
.size = size,
|
||||
.ptr = (void **) &io_worker_submission_queue,
|
||||
);
|
||||
|
||||
size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
|
||||
ShmemRequestStruct(.name = "AioWorkerControl",
|
||||
.size = size,
|
||||
.ptr = (void **) &io_worker_control,
|
||||
);
|
||||
}
|
||||
|
||||
static void
|
||||
pgaio_worker_shmem_init(bool first_time)
|
||||
pgaio_worker_shmem_init(void *arg)
|
||||
{
|
||||
bool found;
|
||||
int queue_size;
|
||||
|
||||
io_worker_submission_queue =
|
||||
ShmemInitStruct("AioWorkerSubmissionQueue",
|
||||
pgaio_worker_queue_shmem_size(&queue_size),
|
||||
&found);
|
||||
if (!found)
|
||||
{
|
||||
io_worker_submission_queue->size = queue_size;
|
||||
io_worker_submission_queue->head = 0;
|
||||
io_worker_submission_queue->tail = 0;
|
||||
}
|
||||
/* Round size up like in pgaio_worker_shmem_request() */
|
||||
queue_size = pg_nextpower2_32(io_worker_queue_size);
|
||||
|
||||
io_worker_control =
|
||||
ShmemInitStruct("AioWorkerControl",
|
||||
pgaio_worker_control_shmem_size(),
|
||||
&found);
|
||||
if (!found)
|
||||
io_worker_submission_queue->size = queue_size;
|
||||
io_worker_submission_queue->head = 0;
|
||||
io_worker_submission_queue->tail = 0;
|
||||
|
||||
io_worker_control->idle_worker_mask = 0;
|
||||
for (int i = 0; i < MAX_IO_WORKERS; ++i)
|
||||
{
|
||||
io_worker_control->idle_worker_mask = 0;
|
||||
for (int i = 0; i < MAX_IO_WORKERS; ++i)
|
||||
{
|
||||
io_worker_control->workers[i].latch = NULL;
|
||||
io_worker_control->workers[i].in_use = false;
|
||||
}
|
||||
io_worker_control->workers[i].latch = NULL;
|
||||
io_worker_control->workers[i].in_use = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -122,7 +122,6 @@ CalculateShmemSize(void)
|
|||
size = add_size(size, WaitEventCustomShmemSize());
|
||||
size = add_size(size, InjectionPointShmemSize());
|
||||
size = add_size(size, SlotSyncShmemSize());
|
||||
size = add_size(size, AioShmemSize());
|
||||
size = add_size(size, WaitLSNShmemSize());
|
||||
size = add_size(size, LogicalDecodingCtlShmemSize());
|
||||
size = add_size(size, DataChecksumsShmemSize());
|
||||
|
|
@ -301,7 +300,6 @@ CreateOrAttachShmemStructs(void)
|
|||
StatsShmemInit();
|
||||
WaitEventCustomShmemInit();
|
||||
InjectionPointShmemInit();
|
||||
AioShmemInit();
|
||||
WaitLSNShmemInit();
|
||||
LogicalDecodingCtlShmemInit();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@
|
|||
#include "port/pg_iovec.h"
|
||||
#include "storage/aio.h"
|
||||
#include "storage/condition_variable.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/shmem.h"
|
||||
|
||||
|
||||
/*
|
||||
|
|
@ -267,20 +269,8 @@ typedef struct IoMethodOps
|
|||
*/
|
||||
bool wait_on_fd_before_close;
|
||||
|
||||
|
||||
/* global initialization */
|
||||
|
||||
/*
|
||||
* Amount of additional shared memory to reserve for the io_method. Called
|
||||
* just like a normal ipci.c style *Size() function. Optional.
|
||||
*/
|
||||
size_t (*shmem_size) (void);
|
||||
|
||||
/*
|
||||
* Initialize shared memory. First time is true if AIO's shared memory was
|
||||
* just initialized, false otherwise. Optional.
|
||||
*/
|
||||
void (*shmem_init) (bool first_time);
|
||||
ShmemCallbacks shmem_callbacks;
|
||||
|
||||
/*
|
||||
* Per-backend initialization. Optional.
|
||||
|
|
|
|||
|
|
@ -20,9 +20,6 @@
|
|||
|
||||
|
||||
/* aio_init.c */
|
||||
extern Size AioShmemSize(void);
|
||||
extern void AioShmemInit(void);
|
||||
|
||||
extern void pgaio_init_backend(void);
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -53,3 +53,6 @@ PG_SHMEM_SUBSYSTEM(ProcSignalShmemCallbacks)
|
|||
|
||||
/* other modules that need some shared memory space */
|
||||
PG_SHMEM_SUBSYSTEM(AsyncShmemCallbacks)
|
||||
|
||||
/* AIO subsystem. This delegates to the method-specific callbacks */
|
||||
PG_SHMEM_SUBSYSTEM(AioShmemCallbacks)
|
||||
|
|
|
|||
Loading…
Reference in a new issue