mirror of
https://github.com/postgres/postgres.git
synced 2026-04-22 14:47:29 -04:00
bufmgr: Improve StartBufferIO interface
Until now StartBufferIO() had a few weaknesses:
- As it did not submit staged IOs, it was not safe to call StartBufferIO() where there was a potential for unsubmitted IO, which required AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around StartBufferIO().
- With nowait = true, the boolean return value did not make it possible to distinguish between no IO being necessary and having to wait, which would lead ReadBuffersCanStartIO() to unnecessarily submit staged IO.
- Several callers needed to handle both local and shared buffers, requiring the caller to differentiate between StartBufferIO() and StartLocalBufferIO().
- In a future commit some callers of StartBufferIO() will want the BufferDesc's io_wref to be returned, to asynchronously wait for in-progress IO.
- Indicating whether to wait with the nowait parameter was somewhat confusing compared to a wait parameter.
Address these issues as follows:
- StartBufferIO() is renamed to StartSharedBufferIO().
- A new StartBufferIO() is introduced that supports both shared and local buffers.
- The boolean return value has been replaced with an enum, indicating whether the IO is already done, already in progress, or that the buffer has been readied for IO.
- A new PgAioWaitRef * argument allows the caller to get the wait reference if desired. All current callers pass NULL; a user of this will be introduced subsequently.
- Instead of the nowait argument there is now a wait argument. This probably would not have been worthwhile on its own, but since all these lines needed to be touched anyway...
Author: Andres Freund <andres@anarazel.de>
Author: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
This commit is contained in:
parent
2407c8db15
commit
74eafeab1a
7 changed files with 260 additions and 151 deletions
|
|
@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
|
|||
BufferDesc *bufHdr;
|
||||
bool need_to_zero;
|
||||
bool isLocalBuf = BufferIsLocal(buffer);
|
||||
StartBufferIOResult sbres;
|
||||
|
||||
Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
|
||||
|
||||
|
|
@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
|
|||
*/
|
||||
need_to_zero = false;
|
||||
}
|
||||
else if (isLocalBuf)
|
||||
{
|
||||
/* Simple case for non-shared buffers. */
|
||||
bufHdr = GetLocalBufferDescriptor(-buffer - 1);
|
||||
need_to_zero = StartLocalBufferIO(bufHdr, true, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
|
||||
* concurrently. Even though we aren't doing I/O, that ensures that
|
||||
* we don't zero a page that someone else has pinned. An exclusive
|
||||
* content lock wouldn't be enough, because readers are allowed to
|
||||
* drop the content lock after determining that a tuple is visible
|
||||
* (see buffer access rules in README).
|
||||
*/
|
||||
bufHdr = GetBufferDescriptor(buffer - 1);
|
||||
need_to_zero = StartBufferIO(bufHdr, true, false);
|
||||
if (isLocalBuf)
|
||||
{
|
||||
/* Simple case for non-shared buffers. */
|
||||
bufHdr = GetLocalBufferDescriptor(-buffer - 1);
|
||||
sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
|
||||
* concurrently. Even though we aren't doing I/O, that ensures
|
||||
* that we don't zero a page that someone else has pinned. An
|
||||
* exclusive content lock wouldn't be enough, because readers are
|
||||
* allowed to drop the content lock after determining that a tuple
|
||||
* is visible (see buffer access rules in README).
|
||||
*/
|
||||
bufHdr = GetBufferDescriptor(buffer - 1);
|
||||
sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
|
||||
}
|
||||
|
||||
Assert(sbres != BUFFER_IO_IN_PROGRESS);
|
||||
need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
|
||||
}
|
||||
|
||||
if (need_to_zero)
|
||||
|
|
@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
|
|||
#endif
|
||||
}
|
||||
|
||||
/* helper for ReadBuffersCanStartIO(), to avoid repetition */
|
||||
static inline bool
|
||||
ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
|
||||
{
|
||||
if (BufferIsLocal(buffer))
|
||||
return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
|
||||
true, nowait);
|
||||
else
|
||||
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
|
||||
}
|
||||
|
||||
/*
|
||||
* Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
|
||||
*/
|
||||
static inline bool
|
||||
ReadBuffersCanStartIO(Buffer buffer, bool nowait)
|
||||
{
|
||||
/*
|
||||
* If this backend currently has staged IO, we need to submit the pending
|
||||
* IO before waiting for the right to issue IO, to avoid the potential for
|
||||
* deadlocks (and, more commonly, unnecessary delays for other backends).
|
||||
*/
|
||||
if (!nowait && pgaio_have_staged())
|
||||
{
|
||||
if (ReadBuffersCanStartIOOnce(buffer, true))
|
||||
return true;
|
||||
|
||||
/*
|
||||
* Unfortunately StartBufferIO() returning false doesn't allow to
|
||||
* distinguish between the buffer already being valid and IO already
|
||||
* being in progress. Since IO already being in progress is quite
|
||||
* rare, this approach seems fine.
|
||||
*/
|
||||
pgaio_submit_staged();
|
||||
}
|
||||
|
||||
return ReadBuffersCanStartIOOnce(buffer, nowait);
|
||||
}
|
||||
|
||||
/*
|
||||
* We track various stats related to buffer hits. Because this is done in a
|
||||
* few separate places, this helper exists for convenience.
|
||||
|
|
@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
|
|||
IOContext io_context;
|
||||
IOObject io_object;
|
||||
instr_time io_start;
|
||||
StartBufferIOResult status;
|
||||
|
||||
if (persistence == RELPERSISTENCE_TEMP)
|
||||
{
|
||||
|
|
@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
|
|||
pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
|
||||
|
||||
/*
|
||||
* Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
|
||||
* might block, which we don't want after setting IO_IN_PROGRESS.
|
||||
* Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
|
||||
* block, which we don't want after setting IO_IN_PROGRESS.
|
||||
*
|
||||
* If we need to wait for IO before we can get a handle, submit
|
||||
* already-staged IO first, so that other backends don't need to wait.
|
||||
|
|
@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
|
|||
* for the outcome: either done, or something went wrong and we will
|
||||
* retry.
|
||||
*/
|
||||
if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
|
||||
status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
|
||||
if (status != BUFFER_IO_READY_FOR_IO)
|
||||
{
|
||||
/*
|
||||
* Someone else has already completed this block, we're done.
|
||||
*
|
||||
* When IO is necessary, ->nblocks_done is updated in
|
||||
* ProcessReadBuffersResult(), but that is not called if no IO is
|
||||
* necessary. Thus update here.
|
||||
*/
|
||||
operation->nblocks_done += 1;
|
||||
*nblocks_progress = 1;
|
||||
|
||||
pgaio_io_release(ioh);
|
||||
pgaio_wref_clear(&operation->io_wref);
|
||||
*nblocks_progress = 1;
|
||||
if (status == BUFFER_IO_ALREADY_DONE)
|
||||
{
|
||||
/*
|
||||
* Someone has already completed this block, we're done.
|
||||
*
|
||||
* When IO is necessary, ->nblocks_done is updated in
|
||||
* ProcessReadBuffersResult(), but that is not called if no IO is
|
||||
* necessary. Thus update here.
|
||||
*/
|
||||
operation->nblocks_done += 1;
|
||||
Assert(operation->nblocks_done <= operation->nblocks);
|
||||
|
||||
/*
|
||||
* Report and track this as a 'hit' for this backend, even though it
|
||||
* must have started out as a miss in PinBufferForBlock(). The other
|
||||
* backend will track this as a 'read'.
|
||||
*/
|
||||
TrackBufferHit(io_object, io_context,
|
||||
operation->rel, operation->persistence,
|
||||
operation->smgr, operation->forknum,
|
||||
blocknum);
|
||||
return false;
|
||||
Assert(!pgaio_wref_valid(&operation->io_wref));
|
||||
|
||||
/*
|
||||
* Report and track this as a 'hit' for this backend, even though
|
||||
* it must have started out as a miss in PinBufferForBlock(). The
|
||||
* other backend will track this as a 'read'.
|
||||
*/
|
||||
TrackBufferHit(io_object, io_context,
|
||||
operation->rel, operation->persistence,
|
||||
operation->smgr, operation->forknum,
|
||||
blocknum);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* The IO is already in-progress */
|
||||
Assert(status == BUFFER_IO_IN_PROGRESS);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Assert(io_buffers[0] == buffers[nblocks_done]);
|
||||
|
|
@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
|
|||
|
||||
/*
|
||||
* NB: As little code as possible should be added between the
|
||||
* ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
|
||||
* below and the smgrstartreadv(), as some of the buffers are now marked
|
||||
* as IO_IN_PROGRESS and will thus cause other backends to wait.
|
||||
* StartBufferIO() above, the further StartBufferIO()s below and the
|
||||
* smgrstartreadv(), as some of the buffers are now marked as
|
||||
* IO_IN_PROGRESS and will thus cause other backends to wait.
|
||||
*/
|
||||
|
||||
/*
|
||||
|
|
@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
|
|||
Assert(BufferGetBlockNumber(buffers[i - 1]) ==
|
||||
BufferGetBlockNumber(buffers[i]) - 1);
|
||||
|
||||
if (!ReadBuffersCanStartIO(buffers[i], true))
|
||||
status = StartBufferIO(buffers[i], true, false, NULL);
|
||||
if (status != BUFFER_IO_READY_FOR_IO)
|
||||
break;
|
||||
|
||||
Assert(io_buffers[io_buffers_len] == buffers[i]);
|
||||
|
|
@ -2892,16 +2872,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
|
|||
* We *must* do smgr[zero]extend before succeeding, else the page
|
||||
* will not be reserved by the kernel, and the next P_NEW call
|
||||
* will decide to return the same page. Clear the BM_VALID bit,
|
||||
* do StartBufferIO() and proceed.
|
||||
* do StartSharedBufferIO() and proceed.
|
||||
*
|
||||
* Loop to handle the very small possibility that someone re-sets
|
||||
* BM_VALID between our clearing it and StartBufferIO inspecting
|
||||
* it.
|
||||
* BM_VALID between our clearing it and StartSharedBufferIO
|
||||
* inspecting it.
|
||||
*/
|
||||
do
|
||||
while (true)
|
||||
{
|
||||
StartBufferIOResult sbres;
|
||||
|
||||
pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
|
||||
} while (!StartBufferIO(existing_hdr, true, false));
|
||||
|
||||
sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
|
||||
|
||||
if (sbres != BUFFER_IO_ALREADY_DONE)
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -2927,7 +2914,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
|
|||
LWLockRelease(partition_lock);
|
||||
|
||||
/* XXX: could combine the locked operations in it with the above */
|
||||
StartBufferIO(victim_buf_hdr, true, false);
|
||||
StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -4448,7 +4435,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
|
|||
* someone else flushed the buffer before we could, so we need not do
|
||||
* anything.
|
||||
*/
|
||||
if (!StartBufferIO(buf, false, false))
|
||||
if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
|
||||
return;
|
||||
|
||||
/* Setup error traceback support for ereport() */
|
||||
|
|
@ -7072,6 +7059,13 @@ WaitIO(BufferDesc *buf)
|
|||
{
|
||||
ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
|
||||
|
||||
/*
|
||||
* Should never end up here with unsubmitted IO, as no AIO unaware code
|
||||
* may be used while in batch mode and AIO aware code needs to have
|
||||
* submitted all staged IO to avoid deadlocks & slowness.
|
||||
*/
|
||||
Assert(!pgaio_have_staged());
|
||||
|
||||
ConditionVariablePrepareToSleep(cv);
|
||||
for (;;)
|
||||
{
|
||||
|
|
@ -7124,30 +7118,46 @@ WaitIO(BufferDesc *buf)
|
|||
}
|
||||
|
||||
/*
|
||||
* StartBufferIO: begin I/O on this buffer
|
||||
* StartSharedBufferIO: begin I/O on this buffer
|
||||
* (Assumptions)
|
||||
* My process is executing no IO on this buffer
|
||||
* The buffer is Pinned
|
||||
*
|
||||
* In some scenarios multiple backends could attempt the same I/O operation
|
||||
* concurrently. If someone else has already started I/O on this buffer then
|
||||
* we will wait for completion of the IO using WaitIO().
|
||||
* In several scenarios the buffer may already be undergoing I/O in this or
|
||||
* another backend. How to best handle that depends on the caller's
|
||||
* situation. It might be appropriate to wait synchronously (e.g., because the
|
||||
* buffer is about to be invalidated); wait asynchronously, using the buffer's
|
||||
* IO wait reference (e.g., because the caller is doing readahead and doesn't
|
||||
* need the buffer to be ready immediately); or to not wait at all (e.g.,
|
||||
* because the caller is trying to combine IO for this buffer with another
|
||||
* buffer).
|
||||
*
|
||||
* Input operations are only attempted on buffers that are not BM_VALID,
|
||||
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
|
||||
* so we can always tell if the work is already done.
|
||||
* How and whether to wait is controlled by the wait and io_wref
|
||||
* parameters. In detail:
|
||||
*
|
||||
* Returns true if we successfully marked the buffer as I/O busy,
|
||||
* false if someone else already did the work.
|
||||
* - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
|
||||
* reference, the *io_wref is set to the buffer's io_wref and
|
||||
* BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait
|
||||
* parameter.
|
||||
*
|
||||
* If nowait is true, then we don't wait for an I/O to be finished by another
|
||||
* backend. In that case, false indicates either that the I/O was already
|
||||
* finished, or is still in progress. This is useful for callers that want to
|
||||
* find out if they can perform the I/O as part of a larger operation, without
|
||||
* waiting for the answer or distinguishing the reasons why not.
|
||||
* - If the caller passes a NULL io_wref (i.e. the caller does not want to
|
||||
* asynchronously wait for the completion of the IO), wait = false and the
|
||||
* buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
|
||||
*
|
||||
* - If wait = true and either the buffer does not have a wait reference,
|
||||
* or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
|
||||
* to complete. To avoid the potential of deadlocks and unnecessary delays,
|
||||
* all staged I/O is submitted before waiting.
|
||||
*
|
||||
* Input operations are only attempted on buffers that are not BM_VALID, and
|
||||
* output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
|
||||
* always tell if the work is already done. If no I/O is necessary,
|
||||
* BUFFER_IO_ALREADY_DONE is returned.
|
||||
*
|
||||
* If we successfully marked the buffer as BM_IO_IN_PROGRESS,
|
||||
* BUFFER_IO_READY_FOR_IO is returned.
|
||||
*/
|
||||
bool
|
||||
StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
|
||||
StartBufferIOResult
|
||||
StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
|
||||
{
|
||||
uint64 buf_state;
|
||||
|
||||
|
|
@ -7159,10 +7169,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
|
|||
|
||||
if (!(buf_state & BM_IO_IN_PROGRESS))
|
||||
break;
|
||||
UnlockBufHdr(buf);
|
||||
if (nowait)
|
||||
return false;
|
||||
WaitIO(buf);
|
||||
|
||||
/* Join the existing IO */
|
||||
if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
|
||||
{
|
||||
*io_wref = buf->io_wref;
|
||||
UnlockBufHdr(buf);
|
||||
|
||||
return BUFFER_IO_IN_PROGRESS;
|
||||
}
|
||||
else if (!wait)
|
||||
{
|
||||
UnlockBufHdr(buf);
|
||||
return BUFFER_IO_IN_PROGRESS;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* With wait = true, we always have to wait if the caller has
|
||||
* passed io_wref = NULL.
|
||||
*
|
||||
* Even with io_wref != NULL, we have to wait if the buffer's wait
|
||||
* ref is not valid but the IO is in progress, i.e. someone else
|
||||
* started IO but hasn't set the wait ref yet. We have no choice
|
||||
* but to wait until the IO completes.
|
||||
*/
|
||||
UnlockBufHdr(buf);
|
||||
|
||||
/*
|
||||
* If this backend currently has staged IO, submit it before
|
||||
* waiting for in-progress IO, to avoid potential deadlocks and
|
||||
* unnecessary delays.
|
||||
*/
|
||||
pgaio_submit_staged();
|
||||
|
||||
WaitIO(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/* Once we get here, there is definitely no I/O active on this buffer */
|
||||
|
|
@ -7171,9 +7213,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
|
|||
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
|
||||
{
|
||||
UnlockBufHdr(buf);
|
||||
return false;
|
||||
return BUFFER_IO_ALREADY_DONE;
|
||||
}
|
||||
|
||||
/*
|
||||
* No IO in progress and not already done; we will start IO. It's possible
|
||||
* that the IO was in progress but we're not done, because the IO errored
|
||||
* out. We'll do the IO ourselves.
|
||||
*/
|
||||
UnlockBufHdrExt(buf, buf_state,
|
||||
BM_IO_IN_PROGRESS, 0,
|
||||
0);
|
||||
|
|
@ -7181,7 +7228,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
|
|||
ResourceOwnerRememberBufferIO(CurrentResourceOwner,
|
||||
BufferDescriptorGetBuffer(buf));
|
||||
|
||||
return true;
|
||||
return BUFFER_IO_READY_FOR_IO;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
|
||||
* when the caller doesn't otherwise need to care about local vs shared. See
|
||||
* StartSharedBufferIO() for details.
|
||||
*/
|
||||
StartBufferIOResult
|
||||
StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
|
||||
{
|
||||
BufferDesc *buf_hdr;
|
||||
|
||||
if (BufferIsLocal(buffer))
|
||||
{
|
||||
buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
|
||||
|
||||
return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
|
||||
}
|
||||
else
|
||||
{
|
||||
buf_hdr = GetBufferDescriptor(buffer - 1);
|
||||
|
||||
return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -189,9 +189,10 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
|
|||
|
||||
/*
|
||||
* Try to start an I/O operation. There currently are no reasons for
|
||||
* StartLocalBufferIO to return false, so we raise an error in that case.
|
||||
* StartLocalBufferIO to return anything other than
|
||||
* BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
|
||||
*/
|
||||
if (!StartLocalBufferIO(bufHdr, false, false))
|
||||
if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
|
||||
elog(ERROR, "failed to start write IO on local buffer");
|
||||
|
||||
/* Find smgr relation for buffer */
|
||||
|
|
@ -435,7 +436,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
|
|||
pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
|
||||
|
||||
/* no need to loop for local buffers */
|
||||
StartLocalBufferIO(existing_hdr, true, false);
|
||||
StartLocalBufferIO(existing_hdr, true, true, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
@ -451,7 +452,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
|
|||
|
||||
hresult->id = victim_buf_id;
|
||||
|
||||
StartLocalBufferIO(victim_buf_hdr, true, false);
|
||||
StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -517,26 +518,41 @@ MarkLocalBufferDirty(Buffer buffer)
|
|||
}
|
||||
|
||||
/*
|
||||
* Like StartBufferIO, but for local buffers
|
||||
* Like StartSharedBufferIO, but for local buffers
|
||||
*/
|
||||
bool
|
||||
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
|
||||
StartBufferIOResult
|
||||
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
|
||||
{
|
||||
uint64 buf_state;
|
||||
|
||||
/*
|
||||
* With AIO the buffer could have IO in progress, e.g. when there are two
|
||||
* scans of the same relation. Either wait for the other IO or return
|
||||
* false.
|
||||
* scans of the same relation. Either wait for the other IO (if wait =
|
||||
* true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
|
||||
*/
|
||||
if (pgaio_wref_valid(&bufHdr->io_wref))
|
||||
{
|
||||
PgAioWaitRef iow = bufHdr->io_wref;
|
||||
PgAioWaitRef buf_wref = bufHdr->io_wref;
|
||||
|
||||
if (nowait)
|
||||
return false;
|
||||
if (io_wref != NULL)
|
||||
{
|
||||
/* We've already asynchronously started this IO, so join it */
|
||||
*io_wref = buf_wref;
|
||||
return BUFFER_IO_IN_PROGRESS;
|
||||
}
|
||||
|
||||
pgaio_wref_wait(&iow);
|
||||
/*
|
||||
* For temp buffers we should never need to wait in
|
||||
* StartLocalBufferIO() when called with io_wref == NULL while there
|
||||
* are staged IOs, as it's not allowed to call code that is not aware
|
||||
* of AIO while in batch mode.
|
||||
*/
|
||||
Assert(!pgaio_have_staged());
|
||||
|
||||
if (!wait)
|
||||
return BUFFER_IO_IN_PROGRESS;
|
||||
|
||||
pgaio_wref_wait(&buf_wref);
|
||||
}
|
||||
|
||||
/* Once we get here, there is definitely no I/O active on this buffer */
|
||||
|
|
@ -545,14 +561,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
|
|||
buf_state = pg_atomic_read_u64(&bufHdr->state);
|
||||
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
|
||||
{
|
||||
return false;
|
||||
return BUFFER_IO_ALREADY_DONE;
|
||||
}
|
||||
|
||||
/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
|
||||
|
||||
/* local buffers don't track IO using resowners */
|
||||
|
||||
return true;
|
||||
return BUFFER_IO_READY_FOR_IO;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
|
|||
|
||||
extern void TrackNewBufferPin(Buffer buf);
|
||||
|
||||
/* solely to make it easier to write tests */
|
||||
extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
|
||||
/*
|
||||
* Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO.
|
||||
*
|
||||
* When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer
|
||||
* may already have I/O in progress or the I/O may have been done by another
|
||||
* backend. See the documentation of StartSharedBufferIO for more details.
|
||||
*/
|
||||
typedef enum StartBufferIOResult
|
||||
{
|
||||
BUFFER_IO_ALREADY_DONE,
|
||||
BUFFER_IO_IN_PROGRESS,
|
||||
BUFFER_IO_READY_FOR_IO,
|
||||
} StartBufferIOResult;
|
||||
|
||||
/* the following are exposed to make it easier to write tests */
|
||||
extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait,
|
||||
PgAioWaitRef *io_wref);
|
||||
extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait,
|
||||
PgAioWaitRef *io_wref);
|
||||
extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
|
||||
bool forget_owner, bool release_aio);
|
||||
|
||||
|
|
@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
|
|||
extern void MarkLocalBufferDirty(Buffer buffer);
|
||||
extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
|
||||
uint64 set_flag_bits, bool release_aio);
|
||||
extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
|
||||
extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput,
|
||||
bool wait, PgAioWaitRef *io_wref);
|
||||
extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
|
||||
extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
|
||||
extern void DropRelationLocalBuffers(RelFileLocator rlocator,
|
||||
|
|
|
|||
|
|
@ -383,7 +383,7 @@ sub test_startwait_io
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"first StartBufferIO",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
|
||||
qr/^t$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -392,14 +392,14 @@ sub test_startwait_io
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"second StartBufferIO fails, same session",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
|
||||
qr/^f$/,
|
||||
qr/^$/);
|
||||
psql_like(
|
||||
$io_method,
|
||||
$psql_b,
|
||||
"second StartBufferIO fails, other session",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
|
||||
qr/^f$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -409,7 +409,7 @@ sub test_startwait_io
|
|||
$node,
|
||||
$psql_b,
|
||||
"blocking start buffer io",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
|
||||
"BufferIo");
|
||||
|
||||
# Terminate the IO, without marking it as success, this should trigger the
|
||||
|
|
@ -438,7 +438,7 @@ sub test_startwait_io
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"blocking buffer io w/ success: first start buffer io",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
|
||||
qr/^t$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -448,7 +448,7 @@ sub test_startwait_io
|
|||
$node,
|
||||
$psql_b,
|
||||
"blocking start buffer io",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
|
||||
"BufferIo");
|
||||
|
||||
# Terminate the IO, marking it as success
|
||||
|
|
@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"first StartLocalBufferIO",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
|
||||
qr/^t$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"second StartLocalBufferIO succeeds, same session",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
|
||||
qr/^t$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"StartLocalBufferIO after not marking valid succeeds, same session",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
|
||||
qr/^t$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
|
|||
$io_method,
|
||||
$psql_a,
|
||||
"StartLocalBufferIO after marking valid fails",
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
|
||||
qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
|
||||
qr/^f$/,
|
||||
qr/^$/);
|
||||
|
||||
|
|
@ -1612,7 +1612,7 @@ read_buffers('$table', 1, 3)|,
|
|||
my $buf_id =
|
||||
$psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
|
||||
$psql_b->query_safe(
|
||||
qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
|
||||
qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
|
||||
);
|
||||
|
||||
query_wait_block(
|
||||
|
|
@ -1637,7 +1637,7 @@ read_buffers('$table', 1, 3)|,
|
|||
$buf_id =
|
||||
$psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
|
||||
$psql_b->query_safe(
|
||||
qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
|
||||
qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
|
||||
);
|
||||
|
||||
query_wait_block(
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
|
|||
RETURNS pg_catalog.int4 STRICT
|
||||
AS 'MODULE_PATHNAME' LANGUAGE C;
|
||||
|
||||
CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
|
||||
CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool)
|
||||
RETURNS pg_catalog.bool STRICT
|
||||
AS 'MODULE_PATHNAME' LANGUAGE C;
|
||||
|
||||
|
|
|
|||
|
|
@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
|
|||
if (RelationUsesLocalBuffers(rel))
|
||||
{
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
StartLocalBufferIO(buf_hdrs[i], true, false);
|
||||
StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
|
||||
pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
StartBufferIO(buf_hdrs[i], true, false);
|
||||
StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
|
||||
}
|
||||
|
||||
pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
|
||||
|
|
@ -622,15 +622,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS)
|
|||
{
|
||||
Buffer buf = PG_GETARG_INT32(0);
|
||||
bool for_input = PG_GETARG_BOOL(1);
|
||||
bool nowait = PG_GETARG_BOOL(2);
|
||||
bool wait = PG_GETARG_BOOL(2);
|
||||
StartBufferIOResult result;
|
||||
bool can_start;
|
||||
|
||||
if (BufferIsLocal(buf))
|
||||
can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
|
||||
for_input, nowait);
|
||||
result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
|
||||
for_input, wait, NULL);
|
||||
else
|
||||
can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
|
||||
for_input, nowait);
|
||||
result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
|
||||
for_input, wait, NULL);
|
||||
|
||||
can_start = result == BUFFER_IO_READY_FOR_IO;
|
||||
|
||||
/*
|
||||
* For tests we don't want the resowner release preventing us from
|
||||
|
|
|
|||
|
|
@ -2946,6 +2946,7 @@ SplitTextOutputData
|
|||
SplitVar
|
||||
StackElem
|
||||
StakindFlags
|
||||
StartBufferIOResult
|
||||
StartDataPtrType
|
||||
StartLOPtrType
|
||||
StartLOsPtrType
|
||||
|
|
|
|||
Loading…
Reference in a new issue