aio: Adjust I/O worker pool automatically.

The size of the I/O worker pool used to implement io_method=worker was
previously controlled by the io_workers setting, defaulting to 3.  It
was hard to know how to tune it effectively.  That is replaced with:

  io_min_workers=2
  io_max_workers=8 (up to 32)
  io_worker_idle_timeout=60s
  io_worker_launch_interval=100ms

The pool is automatically sized within the configured range according to
recent variation in demand.  It grows when existing workers detect that
latency might be introduced by queuing, and shrinks when the
highest-numbered worker is idle for too long.  Work was already
concentrated into low-numbered workers in anticipation of this logic.

The logic for waking extra workers now also tries to measure and reduce
the number of spurious wakeups, though they are not entirely eliminated.

Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Dmitry Dolgov <9erthalion6@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
This commit is contained in:
Thomas Munro 2026-04-08 19:06:14 +12:00
parent 948ef7cdc4
commit d1c01b79d4
11 changed files with 803 additions and 149 deletions

View file

@ -2942,16 +2942,75 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
<varlistentry id="guc-io-workers" xreflabel="io_workers">
<term><varname>io_workers</varname> (<type>integer</type>)
<varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
<term><varname>io_min_workers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>io_workers</varname> configuration parameter</primary>
<primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Selects the number of I/O worker processes to use. The default is
3. This parameter can only be set in the
Sets the minimum number of I/O worker processes. The default is
2. This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
<para>
Only has an effect if <xref linkend="guc-io-method"/> is set to
<literal>worker</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
<term><varname>io_max_workers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>io_max_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Sets the maximum number of I/O worker processes. The default is
8. This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
<para>
Only has an effect if <xref linkend="guc-io-method"/> is set to
<literal>worker</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
<term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
<indexterm>
<primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Sets the time after which entirely idle I/O worker processes exit, reducing the
size of the pool to match demand. The default is 1 minute. This
parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
<para>
Only has an effect if <xref linkend="guc-io-method"/> is set to
<literal>worker</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
<term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
<indexterm>
<primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Sets the minimum time before another I/O worker can be launched. This avoids
creating too many for an unsustained burst of activity. The default is 100ms.
This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>

View file

@ -409,6 +409,7 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
static TimestampTz io_worker_launch_next_time = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@ -447,7 +448,8 @@ static int CountChildren(BackendTypeMask targetMask);
static void LaunchMissingBackgroundProcesses(void);
static void maybe_start_bgworkers(void);
static bool maybe_reap_io_worker(int pid);
static void maybe_adjust_io_workers(void);
static void maybe_start_io_workers(void);
static TimestampTz maybe_start_io_workers_scheduled_at(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static PMChild *StartChildProcess(BackendType type);
static void StartSysLogger(void);
@ -1391,7 +1393,7 @@ PostmasterMain(int argc, char *argv[])
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
maybe_adjust_io_workers();
maybe_start_io_workers();
/* Start bgwriter and checkpointer so they can help with recovery */
if (CheckpointerPMChild == NULL)
@ -1555,14 +1557,15 @@ checkControlFile(void)
static int
DetermineSleepTime(void)
{
TimestampTz next_wakeup = 0;
TimestampTz next_wakeup;
/*
* Normal case: either there are no background workers at all, or we're in
* a shutdown sequence (during which we ignore bgworkers altogether).
* If in ImmediateShutdown with a SIGKILL timeout, ignore everything else
* and wait for that.
*
* XXX Shouldn't this also test FatalError?
*/
if (Shutdown > NoShutdown ||
(!StartWorkerNeeded && !HaveCrashedWorker))
if (Shutdown >= ImmediateShutdown)
{
if (AbortStartTime != 0)
{
@ -1582,14 +1585,16 @@ DetermineSleepTime(void)
return seconds * 1000;
}
else
return 60 * 1000;
}
if (StartWorkerNeeded)
/* Time of next maybe_start_io_workers() call, or 0 for none. */
next_wakeup = maybe_start_io_workers_scheduled_at();
/* Ignore bgworkers during shutdown. */
if (StartWorkerNeeded && Shutdown == NoShutdown)
return 0;
if (HaveCrashedWorker)
if (HaveCrashedWorker && Shutdown == NoShutdown)
{
dlist_mutable_iter iter;
@ -2545,7 +2550,17 @@ process_pm_child_exit(void)
if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
HandleChildCrash(pid, exitstatus, _("io worker"));
maybe_adjust_io_workers();
/*
* A worker that exited with an error might have brought the pool
* size below io_min_workers, or allowed the queue to grow to the
* point where another worker called for growth.
*
* In the common case that a worker timed out due to idleness, no
* replacement needs to be started. maybe_start_io_workers() will
* figure that out.
*/
maybe_start_io_workers();
continue;
}
@ -3265,7 +3280,7 @@ PostmasterStateMachine(void)
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
maybe_adjust_io_workers();
maybe_start_io_workers();
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
@ -3339,7 +3354,7 @@ LaunchMissingBackgroundProcesses(void)
* A config file change will always lead to this function being called, so
* we always will process the config change in a timely manner.
*/
maybe_adjust_io_workers();
maybe_start_io_workers();
/*
* The checkpointer and the background writer are active from the start,
@ -3800,6 +3815,16 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
/* Process IO worker start requests. */
if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_GROW))
{
/*
* No local flag, as the state is exposed through pgaio_worker_*()
* functions. This signal is received on potentially actionable level
* changes, so that maybe_start_io_workers() will run.
*/
}
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@ -4402,44 +4427,113 @@ maybe_reap_io_worker(int pid)
}
/*
* Start or stop IO workers, to close the gap between the number of running
* workers and the number of configured workers. Used to respond to change of
* the io_workers GUC (by increasing and decreasing the number of workers), as
* well as workers terminating in response to errors (by starting
* "replacement" workers).
* Returns the next time at which maybe_start_io_workers() would start one or
* more I/O workers. Any time in the past means ASAP, and 0 means no worker
* is currently scheduled.
*
* This is called by DetermineSleepTime() and also maybe_start_io_workers()
* itself, to make sure that they agree.
*/
static void
maybe_adjust_io_workers(void)
static TimestampTz
maybe_start_io_workers_scheduled_at(void)
{
if (!pgaio_workers_enabled())
return;
return 0;
/*
* If we're in final shutting down state, then we're just waiting for all
* processes to exit.
*/
if (pmState >= PM_WAIT_IO_WORKERS)
return;
return 0;
/* Don't start new workers during an immediate shutdown either. */
if (Shutdown >= ImmediateShutdown)
return;
return 0;
/*
* Don't start new workers if we're in the shutdown phase of a crash
* restart. But we *do* need to start if we're already starting up again.
*/
if (FatalError && pmState >= PM_STOP_BACKENDS)
return;
return 0;
Assert(pmState < PM_WAIT_IO_WORKERS);
/*
* Don't start a worker if we're at or above the maximum. (Excess workers
* exit when the GUC is lowered, but the count can be temporarily too high
* until they are reaped.)
*/
if (io_worker_count >= io_max_workers)
return 0;
/* Not enough running? */
while (io_worker_count < io_workers)
/* If we're under the minimum, start a worker as soon as possible. */
if (io_worker_count < io_min_workers)
return TIMESTAMP_MINUS_INFINITY; /* start worker ASAP */
/* Only proceed if a "grow" signal has been received from a worker. */
if (!pgaio_worker_pm_test_grow_signal_sent())
return 0;
/*
* maybe_start_io_workers() should start a new I/O worker after this time,
* or as soon as possible if it is already in the past.
*/
return io_worker_launch_next_time;
}
/*
* Start I/O workers if required. Used at startup, to respond to change of
* the io_min_workers GUC, when asked to start a new one due to submission
* queue backlog, and after workers terminate in response to errors (by
* starting "replacement" workers).
*/
static void
maybe_start_io_workers(void)
{
TimestampTz scheduled_at;
while ((scheduled_at = maybe_start_io_workers_scheduled_at()) != 0)
{
TimestampTz now = GetCurrentTimestamp();
PMChild *child;
int i;
Assert(pmState < PM_WAIT_IO_WORKERS);
/* Still waiting for the scheduled time? */
if (scheduled_at > now)
break;
/*
* Compute next launch time relative to the previous value, so that
* time spent on the postmaster's other duties doesn't result in an
* inaccurate launch interval.
*/
io_worker_launch_next_time =
TimestampTzPlusMilliseconds(io_worker_launch_next_time,
io_worker_launch_interval);
/*
* If that's already in the past, the interval is either impossibly
* short or we received no requests for new workers for a period.
* Compute a new future time relative to now instead.
*/
if (io_worker_launch_next_time <= now)
io_worker_launch_next_time =
TimestampTzPlusMilliseconds(now, io_worker_launch_interval);
/*
* Check if a grow signal has been received, but the grow request has
* been canceled since then because work ran out. We've still
* advanced the next launch time, to suppress repeat signals from
* workers until then.
*/
if (io_worker_count >= io_min_workers && !pgaio_worker_pm_test_grow())
{
pgaio_worker_pm_clear_grow_signal_sent();
break;
}
/* find unused entry in io_worker_children array */
for (i = 0; i < MAX_IO_WORKERS; ++i)
{
@ -4457,22 +4551,21 @@ maybe_adjust_io_workers(void)
++io_worker_count;
}
else
break; /* try again next time */
}
/* Too many running? */
if (io_worker_count > io_workers)
{
/* ask the IO worker in the highest slot to exit */
for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
{
if (io_worker_children[i] != NULL)
{
kill(io_worker_children[i]->pid, SIGUSR2);
break;
}
/*
* Fork failure: we'll try again after the launch interval
* expires, or be called again without delay if we don't yet have
* io_min_workers. Don't loop here though, the postmaster has
* other duties.
*/
break;
}
}
/*
* Workers decide when to shut down by themselves, according to the
* io_max_workers and io_worker_idle_timeout GUCs.
*/
}

View file

@ -11,9 +11,8 @@
* infrastructure for reopening the file, and must be processed synchronously by
* the client code when submitted.
*
* So that the submitter can make just one system call when submitting a batch
* of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
* could be improved by using futexes instead of latches to wake N waiters.
* The pool of workers tries to stabilize at a size that can handle recently
* seen variation in demand, within the configured limits.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@ -29,6 +28,8 @@
#include "postgres.h"
#include <limits.h>
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
@ -40,6 +41,8 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "tcop/tcopprot.h"
@ -48,10 +51,22 @@
#include "utils/ps_status.h"
#include "utils/wait_event.h"
/*
* Saturation for counters used to estimate wakeup:IO ratio.
*
* We maintain hist_wakeups for wakeups received and hist_ios for IOs
* processed by each worker. When either counter reaches this saturation
* value, we divide both by two. The result is an exponentially decaying
* ratio of wakeups to IOs, with a very short memory.
*
* If a worker is itself experiencing useless wakeups, it assumes that
* higher-numbered workers would experience even more, so it should end the
* chain.
*/
#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
/* How many workers should each worker wake up if needed? */
#define IO_WORKER_WAKEUP_FANOUT 2
/* Debugging support: show current IO and wakeups:ios statistics in ps. */
/* #define PGAIO_WORKER_SHOW_PS_INFO */
typedef struct PgAioWorkerSubmissionQueue
{
@ -63,13 +78,35 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
Latch *latch;
bool in_use;
ProcNumber proc_number;
} PgAioWorkerSlot;
/*
* Sets of worker IDs are held in a simple bitmap, accessed through functions
* that provide a more readable abstraction. If we wanted to support more
* workers than that, the contention on the single queue would surely get too
* high, so we might want to consider multiple pools instead of widening this.
*/
typedef uint64 PgAioWorkerSet;
#define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
typedef struct PgAioWorkerControl
{
uint64 idle_worker_mask;
/* Seen by postmaster */
bool grow;
bool grow_signal_sent;
/* Protected by AioWorkerSubmissionQueueLock. */
PgAioWorkerSet idle_workerset;
/* Protected by AioWorkerControlLock. */
PgAioWorkerSet workerset;
int nworkers;
/* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
@ -91,15 +128,108 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
int io_workers = 3;
int io_min_workers = 2;
int io_max_workers = 8;
int io_worker_idle_timeout = 60000;
int io_worker_launch_interval = 100;
static int io_worker_queue_size = 64;
static int MyIoWorkerId;
static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
/*
 * Reset *set to the empty set of I/O worker IDs.
 */
static void
pgaio_workerset_initialize(PgAioWorkerSet *set)
{
*set = 0;
}
/*
 * Return true if the set contains no worker IDs.
 */
static bool
pgaio_workerset_is_empty(PgAioWorkerSet *set)
{
return *set == 0;
}
/*
 * Return a set containing only the given worker ID.
 */
static PgAioWorkerSet
pgaio_workerset_singleton(int worker)
{
Assert(worker >= 0 && worker < MAX_IO_WORKERS);
return UINT64_C(1) << worker;
}
/*
 * Fill *set with every valid worker ID, i.e. 0 .. MAX_IO_WORKERS - 1.
 */
static void
pgaio_workerset_all(PgAioWorkerSet *set)
{
/* Shift away the high bits that don't correspond to valid worker IDs. */
*set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
}
/*
 * Remove every member of *set2 from *set1 (set difference, in place).
 */
static void
pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
{
*set1 &= ~*set2;
}
/*
 * Add the given worker ID to *set.  No-op if it is already a member.
 */
static void
pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
{
Assert(worker >= 0 && worker < MAX_IO_WORKERS);
*set |= pgaio_workerset_singleton(worker);
}
/*
 * Remove the given worker ID from *set.  No-op if it is not a member.
 */
static void
pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
{
Assert(worker >= 0 && worker < MAX_IO_WORKERS);
*set &= ~pgaio_workerset_singleton(worker);
}
/*
 * Remove every worker ID less than or equal to 'worker' from *set, by
 * keeping only the bits above position 'worker'.
 *
 * NOTE(review): the shift count is worker + 1, so this relies on
 * MAX_IO_WORKERS being strictly less than PGAIO_WORKERSET_BITS (the
 * static_assert above only guarantees >=); otherwise worker ==
 * MAX_IO_WORKERS - 1 would shift by the full word width.
 */
static void
pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
{
Assert(worker >= 0 && worker < MAX_IO_WORKERS);
*set &= (~(PgAioWorkerSet) 0) << (worker + 1);
}
/*
 * Return the highest-numbered member.  The set must not be empty.
 */
static int
pgaio_workerset_get_highest(PgAioWorkerSet *set)
{
Assert(!pgaio_workerset_is_empty(set));
return pg_leftmost_one_pos64(*set);
}
/*
 * Return the lowest-numbered member.  The set must not be empty.
 */
static int
pgaio_workerset_get_lowest(PgAioWorkerSet *set)
{
Assert(!pgaio_workerset_is_empty(set));
return pg_rightmost_one_pos64(*set);
}
/*
 * Remove and return the lowest-numbered member.  The set must not be empty.
 */
static int
pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
{
int worker = pgaio_workerset_get_lowest(set);
pgaio_workerset_remove(set, worker);
return worker;
}
#ifdef USE_ASSERT_CHECKING
/*
 * Return true if the given worker ID is a member of *set.  Only used in
 * assertions, hence compiled only in assert-enabled builds.
 */
static bool
pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
{
Assert(worker >= 0 && worker < MAX_IO_WORKERS);
return (*set & pgaio_workerset_singleton(worker)) != 0;
}
/*
 * Return the number of members in *set.  Assertion-only, like above.
 */
static int
pgaio_workerset_count(PgAioWorkerSet *set)
{
return pg_popcount64(*set);
}
#endif
static void
pgaio_worker_shmem_request(void *arg)
{
@ -133,37 +263,160 @@ pgaio_worker_shmem_init(void *arg)
io_worker_submission_queue->size = queue_size;
io_worker_submission_queue->head = 0;
io_worker_submission_queue->tail = 0;
io_worker_control->grow = false;
pgaio_workerset_initialize(&io_worker_control->workerset);
pgaio_workerset_initialize(&io_worker_control->idle_workerset);
io_worker_control->idle_worker_mask = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
{
io_worker_control->workers[i].latch = NULL;
io_worker_control->workers[i].in_use = false;
}
io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
}
/*
 * Tell postmaster that we think a new worker is needed.
 *
 * Called by a worker that found queued IOs but no higher-numbered idle
 * worker to wake.  Sets the shared "grow" flag and, if the postmaster has
 * not already been signaled since it last cleared the flags, sends
 * PMSIGNAL_IO_WORKER_GROW.  Each flag write is followed by a memory
 * barrier; the postmaster-side readers issue a barrier before reading, so
 * they observe the writes in order.
 */
static void
pgaio_worker_request_grow(void)
{
/*
 * Suppress useless signaling if we already know that we're at the
 * maximum.  This uses an unlocked read of nworkers, but that's OK for
 * this heuristic purpose.
 */
if (io_worker_control->nworkers >= io_max_workers)
return;
/* Already requested? */
if (io_worker_control->grow)
return;
io_worker_control->grow = true;
pg_memory_barrier();
/*
 * If the postmaster has already been signaled, don't do it again until
 * the postmaster clears this flag.  There is no point in repeated signals
 * if grow is being set and cleared repeatedly while the postmaster is
 * waiting for io_worker_launch_interval, which it applies even to
 * canceled requests.
 */
if (io_worker_control->grow_signal_sent)
return;
io_worker_control->grow_signal_sent = true;
pg_memory_barrier();
/* First request since the postmaster last cleared the flags: wake it. */
SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
}
/*
 * Cancel any request for a new worker, after observing an empty queue.
 *
 * Only the "grow" flag is cleared here.  grow_signal_sent is deliberately
 * left set: the postmaster clears it, and pgaio_worker_request_grow()
 * suppresses repeat signals until it does.
 */
static void
pgaio_worker_cancel_grow(void)
{
if (!io_worker_control->grow)
return;
io_worker_control->grow = false;
pg_memory_barrier();
}
/*
 * Called by the postmaster to check if a new worker has been requested (but
 * possibly canceled since).
 *
 * Returns false if AIO worker shared state is not (yet) attached.  The
 * barrier pairs with the ones after the flag writes in
 * pgaio_worker_request_grow().
 */
bool
pgaio_worker_pm_test_grow_signal_sent(void)
{
pg_memory_barrier();
return io_worker_control && io_worker_control->grow_signal_sent;
}
/*
 * Called by the postmaster to check if a new worker has been requested and
 * not canceled since.
 *
 * Returns false if AIO worker shared state is not (yet) attached.  The
 * barrier pairs with the ones in pgaio_worker_request_grow() and
 * pgaio_worker_cancel_grow().
 */
bool
pgaio_worker_pm_test_grow(void)
{
pg_memory_barrier();
return io_worker_control && io_worker_control->grow;
}
/*
 * Called by the postmaster to clear the request for a new worker.
 *
 * Clears both flags, so that workers may request growth and signal the
 * postmaster again.  No-op if AIO worker shared state is not attached.
 */
void
pgaio_worker_pm_clear_grow_signal_sent(void)
{
if (!io_worker_control)
return;
io_worker_control->grow = false;
io_worker_control->grow_signal_sent = false;
pg_memory_barrier();
}
static int
pgaio_worker_choose_idle(void)
pgaio_worker_choose_idle(int only_workers_above)
{
PgAioWorkerSet workerset;
int worker;
if (io_worker_control->idle_worker_mask == 0)
Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
workerset = io_worker_control->idle_workerset;
if (only_workers_above >= 0)
pgaio_workerset_remove_lte(&workerset, only_workers_above);
if (pgaio_workerset_is_empty(&workerset))
return -1;
/* Find the lowest bit position, and clear it. */
worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
Assert(io_worker_control->workers[worker].in_use);
/* Find the lowest numbered idle worker and mark it not idle. */
worker = pgaio_workerset_get_lowest(&workerset);
pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
return worker;
}
/*
 * Try to set the given worker's latch, to tell it that there are IOs
 * waiting in the submission queue.
 */
static void
pgaio_worker_wake(int worker)
{
	ProcNumber	pgprocno;

	/*
	 * The chosen worker may be concurrently exiting: pgaio_worker_die() had
	 * simply not removed it yet when we saw it in idle_workerset.  That is
	 * harmless, because a dying worker wakes all remaining workers to close
	 * wakeup-vs-exit races, so *someone* will see the queued IO; and if no
	 * workers remain, the postmaster will start a new one.
	 */
	pgprocno = io_worker_control->workers[worker].proc_number;
	if (pgprocno == INVALID_PROC_NUMBER)
		return;

	SetLatch(&GetPGProcByNumber(pgprocno)->procLatch);
}
/*
 * Try to wake every worker in the given set.  Used on pool membership
 * changes, to close races described at the call sites.
 */
static void
pgaio_workerset_wake(PgAioWorkerSet workerset)
{
	for (;;)
	{
		int			victim;

		if (pgaio_workerset_is_empty(&workerset))
			break;
		victim = pgaio_workerset_pop_lowest(&workerset);
		pgaio_worker_wake(victim);
	}
}
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@ -185,6 +438,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
int result;
Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return -1; /* empty */
@ -201,6 +456,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@ -226,8 +483,7 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
Latch *wakeup = NULL;
int worker;
int worker = -1;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@ -251,20 +507,14 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
break;
}
if (wakeup == NULL)
{
/* Choose an idle worker to wake up if we haven't already. */
worker = pgaio_worker_choose_idle();
if (worker >= 0)
wakeup = io_worker_control->workers[worker].latch;
pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
}
/* Choose one worker to wake for this batch. */
worker = pgaio_worker_choose_idle(-1);
LWLockRelease(AioWorkerSubmissionQueueLock);
/* Wake up chosen worker. It will wake peers if necessary. */
if (worker != -1)
pgaio_worker_wake(worker);
}
else
{
@ -273,9 +523,6 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
nsync = num_staged_ios;
}
if (wakeup)
SetLatch(wakeup);
/* Run whatever is left synchronously. */
if (nsync > 0)
{
@ -295,14 +542,30 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
Assert(io_worker_control->workers[MyIoWorkerId].in_use);
Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
PgAioWorkerSet notify_set;
io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
io_worker_control->workers[MyIoWorkerId].in_use = false;
io_worker_control->workers[MyIoWorkerId].latch = NULL;
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
notify_set = io_worker_control->workerset;
Assert(io_worker_control->nworkers > 0);
io_worker_control->nworkers--;
Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
io_worker_control->nworkers);
LWLockRelease(AioWorkerControlLock);
/*
* Notify other workers on pool change. This allows the new highest
* worker to know that it is now the one that can time out, and closes a
* wakeup-loss race described in pgaio_worker_wake().
*/
pgaio_workerset_wake(notify_set);
}
/*
@ -312,33 +575,38 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
PgAioWorkerSet free_workerset;
PgAioWorkerSet old_workerset;
MyIoWorkerId = -1;
/*
* XXX: This could do with more fine-grained locking. But it's also not
* very common for the number of workers to change at the moment...
*/
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
for (int i = 0; i < MAX_IO_WORKERS; ++i)
{
if (!io_worker_control->workers[i].in_use)
{
Assert(io_worker_control->workers[i].latch == NULL);
io_worker_control->workers[i].in_use = true;
MyIoWorkerId = i;
break;
}
else
Assert(io_worker_control->workers[i].latch != NULL);
}
LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
/* Find lowest unused worker ID. */
pgaio_workerset_all(&free_workerset);
pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
if (!pgaio_workerset_is_empty(&free_workerset))
MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
if (MyIoWorkerId == -1)
elog(ERROR, "couldn't find a free worker slot");
elog(ERROR, "couldn't find a free worker ID");
io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
LWLockRelease(AioWorkerSubmissionQueueLock);
Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
INVALID_PROC_NUMBER);
io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
old_workerset = io_worker_control->workerset;
Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
io_worker_control->nworkers++;
Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
io_worker_control->nworkers);
LWLockRelease(AioWorkerControlLock);
/*
* Notify other workers on pool change. If we were the highest worker,
* this allows the new highest worker to know that it can time out.
*/
pgaio_workerset_wake(old_workerset);
on_shmem_exit(pgaio_worker_die, 0);
}
@ -364,14 +632,48 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
/*
 * May this worker use a finite sleep and eventually exit due to idleness?
 *
 * Only the highest-numbered worker is allowed to time out, and only while
 * the pool is above io_min_workers.  Serializing timeouts this way keeps
 * worker IDs packed into a gap-free range 0..N and avoids undershooting
 * io_min_workers.
 *
 * The answer is only instantaneously true and may look inconsistent from
 * different workers during transitions, but all workers are woken on pool
 * size or GUC changes, making the result eventually consistent.
 */
static bool
pgaio_worker_can_timeout(void)
{
	PgAioWorkerSet members_snapshot;

	if (MyIoWorkerId < io_min_workers)
		return false;

	/* Snapshot pool membership, serializing against pool size changes. */
	LWLockAcquire(AioWorkerControlLock, LW_SHARED);
	members_snapshot = io_worker_control->workerset;
	LWLockRelease(AioWorkerControlLock);

	return MyIoWorkerId == pgaio_workerset_get_highest(&members_snapshot);
}
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
TimestampTz idle_timeout_abs = 0;
int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
int hist_ios = 0;
int hist_wakeups = 0;
AuxiliaryProcessMainCommon();
@ -439,10 +741,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
Latch *latches[IO_WORKER_WAKEUP_FANOUT];
int nlatches = 0;
int nwakeups = 0;
int worker;
int worker = -1;
int queue_depth = 0;
bool maybe_grow = false;
/*
* Try to get a job to do.
@ -453,38 +754,108 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
{
/*
* Nothing to do. Mark self idle.
*
* XXX: Invent some kind of back pressure to reduce useless
* wakeups?
*/
io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
/* Nothing to do. Mark self idle. */
pgaio_workerset_insert(&io_worker_control->idle_workerset,
MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
pgaio_workerset_remove(&io_worker_control->idle_workerset,
MyIoWorkerId);
/* See if we can wake up some peers. */
nwakeups = Min(pgaio_worker_submission_queue_depth(),
IO_WORKER_WAKEUP_FANOUT);
for (int i = 0; i < nwakeups; ++i)
/*
* See if we should wake up a higher numbered peer. Only do that
* if this worker is not receiving spurious wakeups itself. The
* intention is to create a frontier beyond which idle workers stay
* asleep.
*
* This heuristic tries to discover the useful wakeup propagation
* chain length when IOs are very fast and workers wake up to find
* that all IOs have already been taken.
*
* If we chose not to wake a worker when we ideally should have,
* then the ratio will soon change to correct that.
*/
if (hist_wakeups <= hist_ios)
{
if ((worker = pgaio_worker_choose_idle()) < 0)
break;
latches[nlatches++] = io_worker_control->workers[worker].latch;
queue_depth = pgaio_worker_submission_queue_depth();
if (queue_depth > 0)
{
/* Choose a worker higher than me to wake. */
worker = pgaio_worker_choose_idle(MyIoWorkerId);
if (worker == -1)
maybe_grow = true;
}
}
}
LWLockRelease(AioWorkerSubmissionQueueLock);
for (int i = 0; i < nlatches; ++i)
SetLatch(latches[i]);
/* Propagate wakeups. */
if (worker != -1)
{
pgaio_worker_wake(worker);
}
else if (maybe_grow)
{
/*
* We know there was at least one more item in the queue, and we
* failed to find a higher-numbered idle worker to wake. Now we
* decide if we should try to start one more worker.
*
* We do this with a simple heuristic: is the queue depth greater
* than the current number of workers?
*
* Consider the following situations:
*
* 1. The queue depth is constantly increasing, because IOs are
* arriving faster than they can possibly be serviced. It doesn't
* matter much which threshold we choose, as we will surely hit
* it. Crossing the current worker count is a useful signal
* because it's clearly too deep to avoid queuing latency already,
* but still leaves a small window of opportunity to improve the
* situation before the queue overflows.
*
* 2. The worker pool is keeping up, no latency is being
* introduced and an extra worker would be a waste of resources.
* Queue depth distributions tend to be heavily skewed, with long
* tails of low probability spikes (due to submission clustering,
* scheduling, jitter, stalls, noisy neighbors, etc). We want a
* number that is very unlikely to be triggered by an outlier, and
* we bet that an exponential or similar distribution whose
* outliers never reach this threshold must be almost entirely
* concentrated at the low end. If we do see a spike as big as
* the worker count, we take it as a signal that the distribution
* is surely too wide.
*
* On its own, this is an extremely crude signal. When combined
* with the wakeup propagation test that precedes it (but on its
* own tends to overshoot) and io_worker_launch_interval, the result
* is that we gradually test each pool size until we find one that
* doesn't trigger further expansion, and then hold it for at
* least io_worker_idle_timeout.
*
* XXX Perhaps ideas from queueing theory or control theory could
* do a better job of this.
*/
/* Read nworkers without lock for this heuristic purpose. */
if (queue_depth > io_worker_control->nworkers)
pgaio_worker_request_grow();
}
if (io_index != -1)
{
PgAioHandle *ioh = NULL;
/* Cancel timeout and update wakeup:work ratio. */
idle_timeout_abs = 0;
if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
{
hist_wakeups /= 2;
hist_ios /= 2;
}
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@ -537,6 +908,19 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
#endif
#ifdef PGAIO_WORKER_SHOW_PS_INFO
{
char *description = pgaio_io_get_target_description(ioh);
sprintf(cmd, "%d: [%s] %s",
MyIoWorkerId,
pgaio_io_get_op_name(ioh),
description);
pfree(description);
set_ps_display(cmd);
}
#endif
/*
* We don't expect this to ever fail with ERROR or FATAL, no need
* to keep error_ioh set to the IO.
@ -550,8 +934,76 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
WAIT_EVENT_IO_WORKER_MAIN);
int timeout_ms;
/* Cancel new worker request if pending. */
pgaio_worker_cancel_grow();
/* Compute the remaining allowed idle time. */
if (io_worker_idle_timeout == -1)
{
/* Never time out. */
timeout_ms = -1;
}
else
{
TimestampTz now = GetCurrentTimestamp();
/* If the GUC changes, reset timer. */
if (idle_timeout_abs != 0 &&
io_worker_idle_timeout != timeout_guc_used)
idle_timeout_abs = 0;
/* Only the highest-numbered worker can time out. */
if (pgaio_worker_can_timeout())
{
if (idle_timeout_abs == 0)
{
/*
* I have just been promoted to the timeout worker, or
* the GUC changed. Compute new absolute time from
* now.
*/
idle_timeout_abs =
TimestampTzPlusMilliseconds(now,
io_worker_idle_timeout);
timeout_guc_used = io_worker_idle_timeout;
}
timeout_ms =
TimestampDifferenceMilliseconds(now, idle_timeout_abs);
}
else
{
/* No timeout for me. */
idle_timeout_abs = 0;
timeout_ms = -1;
}
}
#ifdef PGAIO_WORKER_SHOW_PS_INFO
sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
MyIoWorkerId, hist_wakeups, hist_ios);
set_ps_display(cmd);
#endif
if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
timeout_ms,
WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
{
/* WL_TIMEOUT */
if (pgaio_worker_can_timeout())
if (GetCurrentTimestamp() >= idle_timeout_abs)
break;
}
else
{
/* WL_LATCH_SET */
if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
{
hist_wakeups /= 2;
hist_ios /= 2;
}
}
ResetLatch(MyLatch);
}
@ -561,6 +1013,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
/* If io_max_workers has been decreased, exit highest first. */
if (MyIoWorkerId >= io_max_workers)
break;
}
}

View file

@ -369,6 +369,7 @@ AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
WaitLSN "Waiting to read or update shared Wait-for-LSN state."
LogicalDecodingControl "Waiting to read or update logical decoding status information."
DataChecksumsWorker "Waiting for data checksums worker."
AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)

View file

@ -1390,6 +1390,14 @@
check_hook => 'check_io_max_concurrency',
},
{ name => 'io_max_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
short_desc => 'Maximum number of I/O worker processes, for io_method=worker.',
variable => 'io_max_workers',
boot_val => '8',
min => '1',
max => 'MAX_IO_WORKERS',
},
{ name => 'io_method', type => 'enum', context => 'PGC_POSTMASTER', group => 'RESOURCES_IO',
short_desc => 'Selects the method for executing asynchronous I/O.',
variable => 'io_method',
@ -1398,14 +1406,32 @@
assign_hook => 'assign_io_method',
},
{ name => 'io_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
short_desc => 'Number of IO worker processes, for io_method=worker.',
variable => 'io_workers',
boot_val => '3',
{ name => 'io_min_workers', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
short_desc => 'Minimum number of I/O worker processes, for io_method=worker.',
variable => 'io_min_workers',
boot_val => '2',
min => '1',
max => 'MAX_IO_WORKERS',
},
{ name => 'io_worker_idle_timeout', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
short_desc => 'Maximum time before idle I/O worker processes time out, for io_method=worker.',
variable => 'io_worker_idle_timeout',
flags => 'GUC_UNIT_MS',
boot_val => '60000',
min => '0',
max => 'INT_MAX',
},
{ name => 'io_worker_launch_interval', type => 'int', context => 'PGC_SIGHUP', group => 'RESOURCES_IO',
short_desc => 'Minimum time before launching a new I/O worker process, for io_method=worker.',
variable => 'io_worker_launch_interval',
flags => 'GUC_UNIT_MS',
boot_val => '100',
min => '0',
max => 'INT_MAX',
},
# Not for general use --- used by SET SESSION AUTHORIZATION and SET
# ROLE
{ name => 'is_superuser', type => 'bool', context => 'PGC_INTERNAL', group => 'UNGROUPED',

View file

@ -222,7 +222,11 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
#io_workers = 3 # 1-32;
#io_min_workers = 2 # 1-32
#io_max_workers = 8 # 1-32
#io_worker_idle_timeout = 60s
#io_worker_launch_interval = 100ms
# - Worker Processes -

View file

@ -17,6 +17,15 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
extern PGDLLIMPORT int io_workers;
/* Public GUCs. */
extern PGDLLIMPORT int io_min_workers;
extern PGDLLIMPORT int io_max_workers;
extern PGDLLIMPORT int io_worker_idle_timeout;
extern PGDLLIMPORT int io_worker_launch_interval;
/* Interfaces visible to the postmaster. */
extern bool pgaio_worker_pm_test_grow_signal_sent(void);
extern void pgaio_worker_pm_clear_grow_signal_sent(void);
extern bool pgaio_worker_pm_test_grow(void);
#endif /* IO_WORKER_H */

View file

@ -88,6 +88,7 @@ PG_LWLOCK(53, AioWorkerSubmissionQueue)
PG_LWLOCK(54, WaitLSN)
PG_LWLOCK(55, LogicalDecodingControl)
PG_LWLOCK(56, DataChecksumsWorker)
PG_LWLOCK(57, AioWorkerControl)
/*
* There also exist several built-in LWLock tranches. As with the predefined

View file

@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
PMSIGNAL_IO_WORKER_GROW, /* I/O worker pool wants to grow */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */

View file

@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
io_worker_idle_timeout=0ms
io_worker_launch_interval=0ms
io_max_workers=32
));
$node->start();
@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@ -62,24 +65,24 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
$node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
$node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
like(
$stderr,
qr/$worker_count is outside the valid range for parameter "io_workers"/,
"updating number of io_workers to $worker_count failed, as expected"
qr/$worker_count is outside the valid range for parameter "io_min_workers"/,
"updating io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
is( $node->safe_psql('postgres', 'SHOW io_workers'),
is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
"updating number of io_workers from $prev_worker_count to $worker_count"
"updating number of io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);

View file

@ -2271,6 +2271,7 @@ PgAioUringCaps
PgAioUringContext
PgAioWaitRef
PgAioWorkerControl
PgAioWorkerSet
PgAioWorkerSlot
PgAioWorkerSubmissionQueue
PgArchData