mirror of
https://github.com/postgres/postgres.git
synced 2026-06-08 16:26:30 -04:00
The internal queue of buffers could become corrupted in a rare edge case
that failed to invalidate an entry, causing a stale buffer to be
"forwarded" to StartReadBuffers(). This is a simple fix for the
immediate problem.
A small API change might be able to remove this and related fragility
entirely, but that will have to wait a bit.
Defect in commit ed0b87ca.
Bug: 19006
Backpatch-through: 18
Reported-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com>
Discussion: https://postgr.es/m/19006-80fcaaf69000377e%40postgresql.org
1093 lines
37 KiB
C
1093 lines
37 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* read_stream.c
|
|
* Mechanism for accessing buffered relation data with look-ahead
|
|
*
|
|
* Code that needs to access relation data typically pins blocks one at a
|
|
* time, often in a predictable order that might be sequential or data-driven.
|
|
* Calling the simple ReadBuffer() function for each block is inefficient,
|
|
* because blocks that are not yet in the buffer pool require I/O operations
|
|
* that are small and might stall waiting for storage. This mechanism looks
|
|
* into the future and calls StartReadBuffers() and WaitReadBuffers() to read
|
|
* neighboring blocks together and ahead of time, with an adaptive look-ahead
|
|
* distance.
|
|
*
|
|
* A user-provided callback generates a stream of block numbers that is used
|
|
* to form reads of up to io_combine_limit, by attempting to merge them with a
|
|
* pending read. When that isn't possible, the existing pending read is sent
|
|
* to StartReadBuffers() so that a new one can begin to form.
|
|
*
|
|
* The algorithm for controlling the look-ahead distance is based on recent
|
|
* cache hit and miss history. When no I/O is necessary, there is no benefit
|
|
* in looking ahead more than one block. This is the default initial
|
|
* assumption, but when blocks needing I/O are streamed, the distance is
|
|
* increased rapidly to try to benefit from I/O combining and concurrency. It
|
|
* is reduced gradually when cached blocks are streamed.
|
|
*
|
|
* The main data structure is a circular queue of buffers of size
|
|
* max_pinned_buffers plus some extra space for technical reasons, ready to be
|
|
* returned by read_stream_next_buffer(). Each buffer also has an optional
|
|
* variable sized object that is passed from the callback to the consumer of
|
|
* buffers.
|
|
*
|
|
* Parallel to the queue of buffers, there is a circular queue of in-progress
|
|
* I/Os that have been started with StartReadBuffers(), and for which
|
|
* WaitReadBuffers() must be called before returning the buffer.
|
|
*
|
|
* For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
|
|
* successive calls, then these data structures might appear as follows:
|
|
*
|
|
 *                          buffers buf/data       ios
 *
 *                          +----+  +-----+       +--------+
 *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
 *                          +----+  +-----+  |    +--------+
 *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
 *                          +----+  +-----+  | |  +--------+
 *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
 *                          +----+  +-----+    |  +--------+
 *                          | 43 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 44 |  |  ?  |    |  |        |
 *                          +----+  +-----+    |  +--------+
 *                          | 60 |  |  ?  |<---+
 *                          +----+  +-----+
 *     next_buffer_index -> |    |  |     |
 *                          +----+  +-----+
|
|
*
|
|
* In the example, 5 buffers are pinned, and the next buffer to be streamed to
|
|
* the client is block 10. Block 10 was a hit and has no associated I/O, but
|
|
* the range 42..44 requires an I/O wait before its buffers are returned, as
|
|
* does block 60.
|
|
*
|
|
*
|
|
* Portions Copyright (c) 2024-2025, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/storage/aio/read_stream.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "miscadmin.h"
|
|
#include "storage/aio.h"
|
|
#include "storage/fd.h"
|
|
#include "storage/smgr.h"
|
|
#include "storage/read_stream.h"
|
|
#include "utils/memdebug.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/spccache.h"
|
|
|
|
typedef struct InProgressIO
|
|
{
|
|
int16 buffer_index;
|
|
ReadBuffersOperation op;
|
|
} InProgressIO;
|
|
|
|
/*
|
|
* State for managing a stream of reads.
|
|
*/
|
|
struct ReadStream
|
|
{
|
|
int16 max_ios;
|
|
int16 io_combine_limit;
|
|
int16 ios_in_progress;
|
|
int16 queue_size;
|
|
int16 max_pinned_buffers;
|
|
int16 forwarded_buffers;
|
|
int16 pinned_buffers;
|
|
int16 distance;
|
|
int16 initialized_buffers;
|
|
int read_buffers_flags;
|
|
bool sync_mode; /* using io_method=sync */
|
|
bool batch_mode; /* READ_STREAM_USE_BATCHING */
|
|
bool advice_enabled;
|
|
bool temporary;
|
|
|
|
/*
|
|
* One-block buffer to support 'ungetting' a block number, to resolve flow
|
|
* control problems when I/Os are split.
|
|
*/
|
|
BlockNumber buffered_blocknum;
|
|
|
|
/*
|
|
* The callback that will tell us which block numbers to read, and an
|
|
* opaque pointer that will be pass to it for its own purposes.
|
|
*/
|
|
ReadStreamBlockNumberCB callback;
|
|
void *callback_private_data;
|
|
|
|
/* Next expected block, for detecting sequential access. */
|
|
BlockNumber seq_blocknum;
|
|
BlockNumber seq_until_processed;
|
|
|
|
/* The read operation we are currently preparing. */
|
|
BlockNumber pending_read_blocknum;
|
|
int16 pending_read_nblocks;
|
|
|
|
/* Space for buffers and optional per-buffer private data. */
|
|
size_t per_buffer_data_size;
|
|
void *per_buffer_data;
|
|
|
|
/* Read operations that have been started but not waited for yet. */
|
|
InProgressIO *ios;
|
|
int16 oldest_io_index;
|
|
int16 next_io_index;
|
|
|
|
bool fast_path;
|
|
|
|
/* Circular queue of buffers. */
|
|
int16 oldest_buffer_index; /* Next pinned buffer to return */
|
|
int16 next_buffer_index; /* Index of next buffer to pin */
|
|
Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
|
|
};
|
|
|
|
/*
|
|
* Return a pointer to the per-buffer data by index.
|
|
*/
|
|
static inline void *
|
|
get_per_buffer_data(ReadStream *stream, int16 buffer_index)
|
|
{
|
|
return (char *) stream->per_buffer_data +
|
|
stream->per_buffer_data_size * buffer_index;
|
|
}
|
|
|
|
/*
|
|
* General-use ReadStreamBlockNumberCB for block range scans. Loops over the
|
|
* blocks [current_blocknum, last_exclusive).
|
|
*/
|
|
BlockNumber
|
|
block_range_read_stream_cb(ReadStream *stream,
|
|
void *callback_private_data,
|
|
void *per_buffer_data)
|
|
{
|
|
BlockRangeReadStreamPrivate *p = callback_private_data;
|
|
|
|
if (p->current_blocknum < p->last_exclusive)
|
|
return p->current_blocknum++;
|
|
|
|
return InvalidBlockNumber;
|
|
}
|
|
|
|
/*
|
|
* Ask the callback which block it would like us to read next, with a one block
|
|
* buffer in front to allow read_stream_unget_block() to work.
|
|
*/
|
|
static inline BlockNumber
|
|
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
|
|
{
|
|
BlockNumber blocknum;
|
|
|
|
blocknum = stream->buffered_blocknum;
|
|
if (blocknum != InvalidBlockNumber)
|
|
stream->buffered_blocknum = InvalidBlockNumber;
|
|
else
|
|
{
|
|
/*
|
|
* Tell Valgrind that the per-buffer data is undefined. That replaces
|
|
* the "noaccess" state that was set when the consumer moved past this
|
|
* entry last time around the queue, and should also catch callbacks
|
|
* that fail to initialize data that the buffer consumer later
|
|
* accesses. On the first go around, it is undefined already.
|
|
*/
|
|
VALGRIND_MAKE_MEM_UNDEFINED(per_buffer_data,
|
|
stream->per_buffer_data_size);
|
|
blocknum = stream->callback(stream,
|
|
stream->callback_private_data,
|
|
per_buffer_data);
|
|
}
|
|
|
|
return blocknum;
|
|
}
|
|
|
|
/*
|
|
* In order to deal with buffer shortages and I/O limits after short reads, we
|
|
* sometimes need to defer handling of a block we've already consumed from the
|
|
* registered callback until later.
|
|
*/
|
|
static inline void
|
|
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
|
|
{
|
|
/* We shouldn't ever unget more than one block. */
|
|
Assert(stream->buffered_blocknum == InvalidBlockNumber);
|
|
Assert(blocknum != InvalidBlockNumber);
|
|
stream->buffered_blocknum = blocknum;
|
|
}
|
|
|
|
/*
|
|
* Start as much of the current pending read as we can. If we have to split it
|
|
* because of the per-backend buffer limit, or the buffer manager decides to
|
|
* split it, then the pending read is adjusted to hold the remaining portion.
|
|
*
|
|
* We can always start a read of at least size one if we have no progress yet.
|
|
* Otherwise it's possible that we can't start a read at all because of a lack
|
|
* of buffers, and then false is returned. Buffer shortages also reduce the
|
|
* distance to a level that prevents look-ahead until buffers are released.
|
|
*/
|
|
static bool
|
|
read_stream_start_pending_read(ReadStream *stream)
|
|
{
|
|
bool need_wait;
|
|
int requested_nblocks;
|
|
int nblocks;
|
|
int flags;
|
|
int forwarded;
|
|
int16 io_index;
|
|
int16 overflow;
|
|
int16 buffer_index;
|
|
int buffer_limit;
|
|
|
|
/* This should only be called with a pending read. */
|
|
Assert(stream->pending_read_nblocks > 0);
|
|
Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
|
|
|
|
/* We had better not exceed the per-stream buffer limit with this read. */
|
|
Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
|
|
stream->max_pinned_buffers);
|
|
|
|
#ifdef USE_ASSERT_CHECKING
|
|
/* We had better not be overwriting an existing pinned buffer. */
|
|
if (stream->pinned_buffers > 0)
|
|
Assert(stream->next_buffer_index != stream->oldest_buffer_index);
|
|
else
|
|
Assert(stream->next_buffer_index == stream->oldest_buffer_index);
|
|
|
|
/*
|
|
* Pinned buffers forwarded by a preceding StartReadBuffers() call that
|
|
* had to split the operation should match the leading blocks of this
|
|
* following StartReadBuffers() call.
|
|
*/
|
|
Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
|
|
for (int i = 0; i < stream->forwarded_buffers; ++i)
|
|
Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
|
|
stream->pending_read_blocknum + i);
|
|
|
|
/*
|
|
* Check that we've cleared the queue/overflow entries corresponding to
|
|
* the rest of the blocks covered by this read, unless it's the first go
|
|
* around and we haven't even initialized them yet.
|
|
*/
|
|
for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
|
|
Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
|
|
stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
|
|
#endif
|
|
|
|
/* Do we need to issue read-ahead advice? */
|
|
flags = stream->read_buffers_flags;
|
|
if (stream->advice_enabled)
|
|
{
|
|
if (stream->pending_read_blocknum == stream->seq_blocknum)
|
|
{
|
|
/*
|
|
* Sequential: Issue advice until the preadv() calls have caught
|
|
* up with the first advice issued for this sequential region, and
|
|
* then stay of the way of the kernel's own read-ahead.
|
|
*/
|
|
if (stream->seq_until_processed != InvalidBlockNumber)
|
|
flags |= READ_BUFFERS_ISSUE_ADVICE;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Random jump: Note the starting location of a new potential
|
|
* sequential region and start issuing advice. Skip it this time
|
|
* if the preadv() follows immediately, eg first block in stream.
|
|
*/
|
|
stream->seq_until_processed = stream->pending_read_blocknum;
|
|
if (stream->pinned_buffers > 0)
|
|
flags |= READ_BUFFERS_ISSUE_ADVICE;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* How many more buffers is this backend allowed?
|
|
*
|
|
* Forwarded buffers are already pinned and map to the leading blocks of
|
|
* the pending read (the remaining portion of an earlier short read that
|
|
* we're about to continue). They are not counted in pinned_buffers, but
|
|
* they are counted as pins already held by this backend according to the
|
|
* buffer manager, so they must be added to the limit it grants us.
|
|
*/
|
|
if (stream->temporary)
|
|
buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
|
|
else
|
|
buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
|
|
Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
|
|
|
|
buffer_limit += stream->forwarded_buffers;
|
|
buffer_limit = Min(buffer_limit, PG_INT16_MAX);
|
|
|
|
if (buffer_limit == 0 && stream->pinned_buffers == 0)
|
|
buffer_limit = 1; /* guarantee progress */
|
|
|
|
/* Does the per-backend limit affect this read? */
|
|
nblocks = stream->pending_read_nblocks;
|
|
if (buffer_limit < nblocks)
|
|
{
|
|
int16 new_distance;
|
|
|
|
/* Shrink distance: no more look-ahead until buffers are released. */
|
|
new_distance = stream->pinned_buffers + buffer_limit;
|
|
if (stream->distance > new_distance)
|
|
stream->distance = new_distance;
|
|
|
|
/* Unless we have nothing to give the consumer, stop here. */
|
|
if (stream->pinned_buffers > 0)
|
|
return false;
|
|
|
|
/* A short read is required to make progress. */
|
|
nblocks = buffer_limit;
|
|
}
|
|
|
|
/*
|
|
* We say how many blocks we want to read, but it may be smaller on return
|
|
* if the buffer manager decides to shorten the read. Initialize buffers
|
|
* to InvalidBuffer (= not a forwarded buffer) as input on first use only,
|
|
* and keep the original nblocks number so we can check for forwarded
|
|
* buffers as output, below.
|
|
*/
|
|
buffer_index = stream->next_buffer_index;
|
|
io_index = stream->next_io_index;
|
|
while (stream->initialized_buffers < buffer_index + nblocks)
|
|
stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
|
|
requested_nblocks = nblocks;
|
|
need_wait = StartReadBuffers(&stream->ios[io_index].op,
|
|
&stream->buffers[buffer_index],
|
|
stream->pending_read_blocknum,
|
|
&nblocks,
|
|
flags);
|
|
stream->pinned_buffers += nblocks;
|
|
|
|
/* Remember whether we need to wait before returning this buffer. */
|
|
if (!need_wait)
|
|
{
|
|
/* Look-ahead distance decays, no I/O necessary. */
|
|
if (stream->distance > 1)
|
|
stream->distance--;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Remember to call WaitReadBuffers() before returning head buffer.
|
|
* Look-ahead distance will be adjusted after waiting.
|
|
*/
|
|
stream->ios[io_index].buffer_index = buffer_index;
|
|
if (++stream->next_io_index == stream->max_ios)
|
|
stream->next_io_index = 0;
|
|
Assert(stream->ios_in_progress < stream->max_ios);
|
|
stream->ios_in_progress++;
|
|
stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
|
|
}
|
|
|
|
/*
|
|
* How many pins were acquired but forwarded to the next call? These need
|
|
* to be passed to the next StartReadBuffers() call by leaving them
|
|
* exactly where they are in the queue, or released if the stream ends
|
|
* early. We need the number for accounting purposes, since they are not
|
|
* counted in stream->pinned_buffers but we already hold them.
|
|
*/
|
|
forwarded = 0;
|
|
while (nblocks + forwarded < requested_nblocks &&
|
|
stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
|
|
forwarded++;
|
|
stream->forwarded_buffers = forwarded;
|
|
|
|
/*
|
|
* We gave a contiguous range of buffer space to StartReadBuffers(), but
|
|
* we want it to wrap around at queue_size. Copy overflowing buffers to
|
|
* the front of the array where they'll be consumed, but also leave a copy
|
|
* in the overflow zone which the I/O operation has a pointer to (it needs
|
|
* a contiguous array). Both copies will be cleared when the buffers are
|
|
* handed to the consumer.
|
|
*/
|
|
overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
|
|
if (overflow > 0)
|
|
{
|
|
Assert(overflow < stream->queue_size); /* can't overlap */
|
|
memcpy(&stream->buffers[0],
|
|
&stream->buffers[stream->queue_size],
|
|
sizeof(stream->buffers[0]) * overflow);
|
|
}
|
|
|
|
/* Compute location of start of next read, without using % operator. */
|
|
buffer_index += nblocks;
|
|
if (buffer_index >= stream->queue_size)
|
|
buffer_index -= stream->queue_size;
|
|
Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
|
|
stream->next_buffer_index = buffer_index;
|
|
|
|
/* Adjust the pending read to cover the remaining portion, if any. */
|
|
stream->pending_read_blocknum += nblocks;
|
|
stream->pending_read_nblocks -= nblocks;
|
|
|
|
return true;
|
|
}
|
|
|
|
static void
|
|
read_stream_look_ahead(ReadStream *stream)
|
|
{
|
|
/*
|
|
* Allow amortizing the cost of submitting IO over multiple IOs. This
|
|
* requires that we don't do any operations that could lead to a deadlock
|
|
* with staged-but-unsubmitted IO. The callback needs to opt-in to being
|
|
* careful.
|
|
*/
|
|
if (stream->batch_mode)
|
|
pgaio_enter_batchmode();
|
|
|
|
while (stream->ios_in_progress < stream->max_ios &&
|
|
stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
|
|
{
|
|
BlockNumber blocknum;
|
|
int16 buffer_index;
|
|
void *per_buffer_data;
|
|
|
|
if (stream->pending_read_nblocks == stream->io_combine_limit)
|
|
{
|
|
read_stream_start_pending_read(stream);
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
* See which block the callback wants next in the stream. We need to
|
|
* compute the index of the Nth block of the pending read including
|
|
* wrap-around, but we don't want to use the expensive % operator.
|
|
*/
|
|
buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
|
|
if (buffer_index >= stream->queue_size)
|
|
buffer_index -= stream->queue_size;
|
|
Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
|
|
per_buffer_data = get_per_buffer_data(stream, buffer_index);
|
|
blocknum = read_stream_get_block(stream, per_buffer_data);
|
|
if (blocknum == InvalidBlockNumber)
|
|
{
|
|
/* End of stream. */
|
|
stream->distance = 0;
|
|
break;
|
|
}
|
|
|
|
/* Can we merge it with the pending read? */
|
|
if (stream->pending_read_nblocks > 0 &&
|
|
stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
|
|
{
|
|
stream->pending_read_nblocks++;
|
|
continue;
|
|
}
|
|
|
|
/* We have to start the pending read before we can build another. */
|
|
while (stream->pending_read_nblocks > 0)
|
|
{
|
|
if (!read_stream_start_pending_read(stream) ||
|
|
stream->ios_in_progress == stream->max_ios)
|
|
{
|
|
/* We've hit the buffer or I/O limit. Rewind and stop here. */
|
|
read_stream_unget_block(stream, blocknum);
|
|
if (stream->batch_mode)
|
|
pgaio_exit_batchmode();
|
|
return;
|
|
}
|
|
}
|
|
|
|
/* This is the start of a new pending read. */
|
|
stream->pending_read_blocknum = blocknum;
|
|
stream->pending_read_nblocks = 1;
|
|
}
|
|
|
|
/*
|
|
* We don't start the pending read just because we've hit the distance
|
|
* limit, preferring to give it another chance to grow to full
|
|
* io_combine_limit size once more buffers have been consumed. However,
|
|
* if we've already reached io_combine_limit, or we've reached the
|
|
* distance limit and there isn't anything pinned yet, or the callback has
|
|
* signaled end-of-stream, we start the read immediately. Note that the
|
|
* pending read can exceed the distance goal, if the latter was reduced
|
|
* after hitting the per-backend buffer limit.
|
|
*/
|
|
if (stream->pending_read_nblocks > 0 &&
|
|
(stream->pending_read_nblocks == stream->io_combine_limit ||
|
|
(stream->pending_read_nblocks >= stream->distance &&
|
|
stream->pinned_buffers == 0) ||
|
|
stream->distance == 0) &&
|
|
stream->ios_in_progress < stream->max_ios)
|
|
read_stream_start_pending_read(stream);
|
|
|
|
/*
|
|
* There should always be something pinned when we leave this function,
|
|
* whether started by this call or not, unless we've hit the end of the
|
|
* stream. In the worst case we can always make progress one buffer at a
|
|
* time.
|
|
*/
|
|
Assert(stream->pinned_buffers > 0 || stream->distance == 0);
|
|
|
|
if (stream->batch_mode)
|
|
pgaio_exit_batchmode();
|
|
}
|
|
|
|
/*
|
|
* Create a new read stream object that can be used to perform the equivalent
|
|
* of a series of ReadBuffer() calls for one fork of one relation.
|
|
* Internally, it generates larger vectored reads where possible by looking
|
|
* ahead. The callback should return block numbers or InvalidBlockNumber to
|
|
* signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
|
|
* write extra data for each block into the space provided to it. It will
|
|
* also receive callback_private_data for its own purposes.
|
|
*/
|
|
static ReadStream *
|
|
read_stream_begin_impl(int flags,
|
|
BufferAccessStrategy strategy,
|
|
Relation rel,
|
|
SMgrRelation smgr,
|
|
char persistence,
|
|
ForkNumber forknum,
|
|
ReadStreamBlockNumberCB callback,
|
|
void *callback_private_data,
|
|
size_t per_buffer_data_size)
|
|
{
|
|
ReadStream *stream;
|
|
size_t size;
|
|
int16 queue_size;
|
|
int16 queue_overflow;
|
|
int max_ios;
|
|
int strategy_pin_limit;
|
|
uint32 max_pinned_buffers;
|
|
uint32 max_possible_buffer_limit;
|
|
Oid tablespace_id;
|
|
|
|
/*
|
|
* Decide how many I/Os we will allow to run at the same time. That
|
|
* currently means advice to the kernel to tell it that we will soon read.
|
|
* This number also affects how far we look ahead for opportunities to
|
|
* start more I/Os.
|
|
*/
|
|
tablespace_id = smgr->smgr_rlocator.locator.spcOid;
|
|
if (!OidIsValid(MyDatabaseId) ||
|
|
(rel && IsCatalogRelation(rel)) ||
|
|
IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
|
|
{
|
|
/*
|
|
* Avoid circularity while trying to look up tablespace settings or
|
|
* before spccache.c is ready.
|
|
*/
|
|
max_ios = effective_io_concurrency;
|
|
}
|
|
else if (flags & READ_STREAM_MAINTENANCE)
|
|
max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
|
|
else
|
|
max_ios = get_tablespace_io_concurrency(tablespace_id);
|
|
|
|
/* Cap to INT16_MAX to avoid overflowing below */
|
|
max_ios = Min(max_ios, PG_INT16_MAX);
|
|
|
|
/*
|
|
* If starting a multi-block I/O near the end of the queue, we might
|
|
* temporarily need extra space for overflowing buffers before they are
|
|
* moved to regular circular position. This is the maximum extra space we
|
|
* could need.
|
|
*/
|
|
queue_overflow = io_combine_limit - 1;
|
|
|
|
/*
|
|
* Choose the maximum number of buffers we're prepared to pin. We try to
|
|
* pin fewer if we can, though. We add one so that we can make progress
|
|
* even if max_ios is set to 0 (see also further down). For max_ios > 0,
|
|
* this also allows an extra full I/O's worth of buffers: after an I/O
|
|
* finishes we don't want to have to wait for its buffers to be consumed
|
|
* before starting a new one.
|
|
*
|
|
* Be careful not to allow int16 to overflow. That is possible with the
|
|
* current GUC range limits, so this is an artificial limit of ~32k
|
|
* buffers and we'd need to adjust the types to exceed that. We also have
|
|
* to allow for the spare entry and the overflow space.
|
|
*/
|
|
max_pinned_buffers = (max_ios + 1) * io_combine_limit;
|
|
max_pinned_buffers = Min(max_pinned_buffers,
|
|
PG_INT16_MAX - queue_overflow - 1);
|
|
|
|
/* Give the strategy a chance to limit the number of buffers we pin. */
|
|
strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
|
|
max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
|
|
|
|
/*
|
|
* Also limit our queue to the maximum number of pins we could ever be
|
|
* allowed to acquire according to the buffer manager. We may not really
|
|
* be able to use them all due to other pins held by this backend, but
|
|
* we'll check that later in read_stream_start_pending_read().
|
|
*/
|
|
if (SmgrIsTemp(smgr))
|
|
max_possible_buffer_limit = GetLocalPinLimit();
|
|
else
|
|
max_possible_buffer_limit = GetPinLimit();
|
|
max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
|
|
|
|
/*
|
|
* The limit might be zero on a system configured with too few buffers for
|
|
* the number of connections. We need at least one to make progress.
|
|
*/
|
|
max_pinned_buffers = Max(1, max_pinned_buffers);
|
|
|
|
/*
|
|
* We need one extra entry for buffers and per-buffer data, because users
|
|
* of per-buffer data have access to the object until the next call to
|
|
* read_stream_next_buffer(), so we need a gap between the head and tail
|
|
* of the queue so that we don't clobber it.
|
|
*/
|
|
queue_size = max_pinned_buffers + 1;
|
|
|
|
/*
|
|
* Allocate the object, the buffers, the ios and per_buffer_data space in
|
|
* one big chunk. Though we have queue_size buffers, we want to be able
|
|
* to assume that all the buffers for a single read are contiguous (i.e.
|
|
* don't wrap around halfway through), so we allow temporary overflows of
|
|
* up to the maximum possible overflow size.
|
|
*/
|
|
size = offsetof(ReadStream, buffers);
|
|
size += sizeof(Buffer) * (queue_size + queue_overflow);
|
|
size += sizeof(InProgressIO) * Max(1, max_ios);
|
|
size += per_buffer_data_size * queue_size;
|
|
size += MAXIMUM_ALIGNOF * 2;
|
|
stream = (ReadStream *) palloc(size);
|
|
memset(stream, 0, offsetof(ReadStream, buffers));
|
|
stream->ios = (InProgressIO *)
|
|
MAXALIGN(&stream->buffers[queue_size + queue_overflow]);
|
|
if (per_buffer_data_size > 0)
|
|
stream->per_buffer_data = (void *)
|
|
MAXALIGN(&stream->ios[Max(1, max_ios)]);
|
|
|
|
stream->sync_mode = io_method == IOMETHOD_SYNC;
|
|
stream->batch_mode = flags & READ_STREAM_USE_BATCHING;
|
|
|
|
#ifdef USE_PREFETCH
|
|
|
|
/*
|
|
* Read-ahead advice simulating asynchronous I/O with synchronous calls.
|
|
* Issue advice only if AIO is not used, direct I/O isn't enabled, the
|
|
* caller hasn't promised sequential access (overriding our detection
|
|
* heuristics), and max_ios hasn't been set to zero.
|
|
*/
|
|
if (stream->sync_mode &&
|
|
(io_direct_flags & IO_DIRECT_DATA) == 0 &&
|
|
(flags & READ_STREAM_SEQUENTIAL) == 0 &&
|
|
max_ios > 0)
|
|
stream->advice_enabled = true;
|
|
#endif
|
|
|
|
/*
|
|
* Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
|
|
* we still need to allocate space to combine and run one I/O. Bump it up
|
|
* to one, and remember to ask for synchronous I/O only.
|
|
*/
|
|
if (max_ios == 0)
|
|
{
|
|
max_ios = 1;
|
|
stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
|
|
}
|
|
|
|
/*
|
|
* Capture stable values for these two GUC-derived numbers for the
|
|
* lifetime of this stream, so we don't have to worry about the GUCs
|
|
* changing underneath us beyond this point.
|
|
*/
|
|
stream->max_ios = max_ios;
|
|
stream->io_combine_limit = io_combine_limit;
|
|
|
|
stream->per_buffer_data_size = per_buffer_data_size;
|
|
stream->max_pinned_buffers = max_pinned_buffers;
|
|
stream->queue_size = queue_size;
|
|
stream->callback = callback;
|
|
stream->callback_private_data = callback_private_data;
|
|
stream->buffered_blocknum = InvalidBlockNumber;
|
|
stream->seq_blocknum = InvalidBlockNumber;
|
|
stream->seq_until_processed = InvalidBlockNumber;
|
|
stream->temporary = SmgrIsTemp(smgr);
|
|
|
|
/*
|
|
* Skip the initial ramp-up phase if the caller says we're going to be
|
|
* reading the whole relation. This way we start out assuming we'll be
|
|
* doing full io_combine_limit sized reads.
|
|
*/
|
|
if (flags & READ_STREAM_FULL)
|
|
stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
|
|
else
|
|
stream->distance = 1;
|
|
|
|
/*
|
|
* Since we always access the same relation, we can initialize parts of
|
|
* the ReadBuffersOperation objects and leave them that way, to avoid
|
|
* wasting CPU cycles writing to them for each read.
|
|
*/
|
|
for (int i = 0; i < max_ios; ++i)
|
|
{
|
|
stream->ios[i].op.rel = rel;
|
|
stream->ios[i].op.smgr = smgr;
|
|
stream->ios[i].op.persistence = persistence;
|
|
stream->ios[i].op.forknum = forknum;
|
|
stream->ios[i].op.strategy = strategy;
|
|
}
|
|
|
|
return stream;
|
|
}
|
|
|
|
/*
|
|
* Create a new read stream for reading a relation.
|
|
* See read_stream_begin_impl() for the detailed explanation.
|
|
*/
|
|
ReadStream *
|
|
read_stream_begin_relation(int flags,
|
|
BufferAccessStrategy strategy,
|
|
Relation rel,
|
|
ForkNumber forknum,
|
|
ReadStreamBlockNumberCB callback,
|
|
void *callback_private_data,
|
|
size_t per_buffer_data_size)
|
|
{
|
|
return read_stream_begin_impl(flags,
|
|
strategy,
|
|
rel,
|
|
RelationGetSmgr(rel),
|
|
rel->rd_rel->relpersistence,
|
|
forknum,
|
|
callback,
|
|
callback_private_data,
|
|
per_buffer_data_size);
|
|
}
|
|
|
|
/*
|
|
* Create a new read stream for reading a SMgr relation.
|
|
* See read_stream_begin_impl() for the detailed explanation.
|
|
*/
|
|
ReadStream *
|
|
read_stream_begin_smgr_relation(int flags,
|
|
BufferAccessStrategy strategy,
|
|
SMgrRelation smgr,
|
|
char smgr_persistence,
|
|
ForkNumber forknum,
|
|
ReadStreamBlockNumberCB callback,
|
|
void *callback_private_data,
|
|
size_t per_buffer_data_size)
|
|
{
|
|
return read_stream_begin_impl(flags,
|
|
strategy,
|
|
NULL,
|
|
smgr,
|
|
smgr_persistence,
|
|
forknum,
|
|
callback,
|
|
callback_private_data,
|
|
per_buffer_data_size);
|
|
}
|
|
|
|
/*
|
|
* Pull one pinned buffer out of a stream. Each call returns successive
|
|
* blocks in the order specified by the callback. If per_buffer_data_size was
|
|
* set to a non-zero size, *per_buffer_data receives a pointer to the extra
|
|
* per-buffer data that the callback had a chance to populate, which remains
|
|
* valid until the next call to read_stream_next_buffer(). When the stream
|
|
* runs out of data, InvalidBuffer is returned. The caller may decide to end
|
|
* the stream early at any time by calling read_stream_end().
|
|
*/
|
|
Buffer
|
|
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|
{
|
|
Buffer buffer;
|
|
int16 oldest_buffer_index;
|
|
|
|
#ifndef READ_STREAM_DISABLE_FAST_PATH
|
|
|
|
/*
|
|
* A fast path for all-cached scans. This is the same as the usual
|
|
* algorithm, but it is specialized for no I/O and no per-buffer data, so
|
|
* we can skip the queue management code, stay in the same buffer slot and
|
|
* use singular StartReadBuffer().
|
|
*/
|
|
if (likely(stream->fast_path))
|
|
{
|
|
BlockNumber next_blocknum;
|
|
|
|
/* Fast path assumptions. */
|
|
Assert(stream->ios_in_progress == 0);
|
|
Assert(stream->forwarded_buffers == 0);
|
|
Assert(stream->pinned_buffers == 1);
|
|
Assert(stream->distance == 1);
|
|
Assert(stream->pending_read_nblocks == 0);
|
|
Assert(stream->per_buffer_data_size == 0);
|
|
Assert(stream->initialized_buffers > stream->oldest_buffer_index);
|
|
|
|
/* We're going to return the buffer we pinned last time. */
|
|
oldest_buffer_index = stream->oldest_buffer_index;
|
|
Assert((oldest_buffer_index + 1) % stream->queue_size ==
|
|
stream->next_buffer_index);
|
|
buffer = stream->buffers[oldest_buffer_index];
|
|
Assert(buffer != InvalidBuffer);
|
|
|
|
/* Choose the next block to pin. */
|
|
next_blocknum = read_stream_get_block(stream, NULL);
|
|
|
|
if (likely(next_blocknum != InvalidBlockNumber))
|
|
{
|
|
int flags = stream->read_buffers_flags;
|
|
|
|
if (stream->advice_enabled)
|
|
flags |= READ_BUFFERS_ISSUE_ADVICE;
|
|
|
|
/*
|
|
* Pin a buffer for the next call. Same buffer entry, and
|
|
* arbitrary I/O entry (they're all free). We don't have to
|
|
* adjust pinned_buffers because we're transferring one to caller
|
|
* but pinning one more.
|
|
*
|
|
* In the fast path we don't need to check the pin limit. We're
|
|
* always allowed at least one pin so that progress can be made,
|
|
* and that's all we need here. Although two pins are momentarily
|
|
* held at the same time, the model used here is that the stream
|
|
* holds only one, and the other now belongs to the caller.
|
|
*/
|
|
if (likely(!StartReadBuffer(&stream->ios[0].op,
|
|
&stream->buffers[oldest_buffer_index],
|
|
next_blocknum,
|
|
flags)))
|
|
{
|
|
/* Fast return. */
|
|
return buffer;
|
|
}
|
|
|
|
/* Next call must wait for I/O for the newly pinned buffer. */
|
|
stream->oldest_io_index = 0;
|
|
stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
|
|
stream->ios_in_progress = 1;
|
|
stream->ios[0].buffer_index = oldest_buffer_index;
|
|
stream->seq_blocknum = next_blocknum + 1;
|
|
}
|
|
else
|
|
{
|
|
/* No more blocks, end of stream. */
|
|
stream->distance = 0;
|
|
stream->oldest_buffer_index = stream->next_buffer_index;
|
|
stream->pinned_buffers = 0;
|
|
stream->buffers[oldest_buffer_index] = InvalidBuffer;
|
|
}
|
|
|
|
stream->fast_path = false;
|
|
return buffer;
|
|
}
|
|
#endif
|
|
|
|
if (unlikely(stream->pinned_buffers == 0))
|
|
{
|
|
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
|
|
|
|
/* End of stream reached? */
|
|
if (stream->distance == 0)
|
|
return InvalidBuffer;
|
|
|
|
/*
|
|
* The usual order of operations is that we look ahead at the bottom
|
|
* of this function after potentially finishing an I/O and making
|
|
* space for more, but if we're just starting up we'll need to crank
|
|
* the handle to get started.
|
|
*/
|
|
read_stream_look_ahead(stream);
|
|
|
|
/* End of stream reached? */
|
|
if (stream->pinned_buffers == 0)
|
|
{
|
|
Assert(stream->distance == 0);
|
|
return InvalidBuffer;
|
|
}
|
|
}
|
|
|
|
/* Grab the oldest pinned buffer and associated per-buffer data. */
|
|
Assert(stream->pinned_buffers > 0);
|
|
oldest_buffer_index = stream->oldest_buffer_index;
|
|
Assert(oldest_buffer_index >= 0 &&
|
|
oldest_buffer_index < stream->queue_size);
|
|
buffer = stream->buffers[oldest_buffer_index];
|
|
if (per_buffer_data)
|
|
*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
|
|
|
|
Assert(BufferIsValid(buffer));
|
|
|
|
/* Do we have to wait for an associated I/O first? */
|
|
if (stream->ios_in_progress > 0 &&
|
|
stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
|
|
{
|
|
int16 io_index = stream->oldest_io_index;
|
|
int32 distance; /* wider temporary value, clamped below */
|
|
|
|
/* Sanity check that we still agree on the buffers. */
|
|
Assert(stream->ios[io_index].op.buffers ==
|
|
&stream->buffers[oldest_buffer_index]);
|
|
|
|
WaitReadBuffers(&stream->ios[io_index].op);
|
|
|
|
Assert(stream->ios_in_progress > 0);
|
|
stream->ios_in_progress--;
|
|
if (++stream->oldest_io_index == stream->max_ios)
|
|
stream->oldest_io_index = 0;
|
|
|
|
/* Look-ahead distance ramps up rapidly after we do I/O. */
|
|
distance = stream->distance * 2;
|
|
distance = Min(distance, stream->max_pinned_buffers);
|
|
stream->distance = distance;
|
|
|
|
/*
|
|
* If we've reached the first block of a sequential region we're
|
|
* issuing advice for, cancel that until the next jump. The kernel
|
|
* will see the sequential preadv() pattern starting here.
|
|
*/
|
|
if (stream->advice_enabled &&
|
|
stream->ios[io_index].op.blocknum == stream->seq_until_processed)
|
|
stream->seq_until_processed = InvalidBlockNumber;
|
|
}
|
|
|
|
/*
|
|
* We must zap this queue entry, or else it would appear as a forwarded
|
|
* buffer. If it's potentially in the overflow zone (ie from a
|
|
* multi-block I/O that wrapped around the queue), also zap the copy.
|
|
*/
|
|
stream->buffers[oldest_buffer_index] = InvalidBuffer;
|
|
if (oldest_buffer_index < stream->io_combine_limit - 1)
|
|
stream->buffers[stream->queue_size + oldest_buffer_index] =
|
|
InvalidBuffer;
|
|
|
|
#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
|
|
|
|
/*
|
|
* The caller will get access to the per-buffer data, until the next call.
|
|
* We wipe the one before, which is never occupied because queue_size
|
|
* allowed one extra element. This will hopefully trip up client code
|
|
* that is holding a dangling pointer to it.
|
|
*/
|
|
if (stream->per_buffer_data)
|
|
{
|
|
void *per_buffer_data;
|
|
|
|
per_buffer_data = get_per_buffer_data(stream,
|
|
oldest_buffer_index == 0 ?
|
|
stream->queue_size - 1 :
|
|
oldest_buffer_index - 1);
|
|
|
|
#if defined(CLOBBER_FREED_MEMORY)
|
|
/* This also tells Valgrind the memory is "noaccess". */
|
|
wipe_mem(per_buffer_data, stream->per_buffer_data_size);
|
|
#elif defined(USE_VALGRIND)
|
|
/* Tell it ourselves. */
|
|
VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
|
|
stream->per_buffer_data_size);
|
|
#endif
|
|
}
|
|
#endif
|
|
|
|
/* Pin transferred to caller. */
|
|
Assert(stream->pinned_buffers > 0);
|
|
stream->pinned_buffers--;
|
|
|
|
/* Advance oldest buffer, with wrap-around. */
|
|
stream->oldest_buffer_index++;
|
|
if (stream->oldest_buffer_index == stream->queue_size)
|
|
stream->oldest_buffer_index = 0;
|
|
|
|
/* Prepare for the next call. */
|
|
read_stream_look_ahead(stream);
|
|
|
|
#ifndef READ_STREAM_DISABLE_FAST_PATH
|
|
/* See if we can take the fast path for all-cached scans next time. */
|
|
if (stream->ios_in_progress == 0 &&
|
|
stream->forwarded_buffers == 0 &&
|
|
stream->pinned_buffers == 1 &&
|
|
stream->distance == 1 &&
|
|
stream->pending_read_nblocks == 0 &&
|
|
stream->per_buffer_data_size == 0)
|
|
{
|
|
/*
|
|
* The fast path spins on one buffer entry repeatedly instead of
|
|
* rotating through the whole queue and clearing the entries behind
|
|
* it. If the buffer it starts with happened to be forwarded between
|
|
* StartReadBuffers() calls and also wrapped around the circular queue
|
|
* partway through, then a copy also exists in the overflow zone, and
|
|
* it won't clear it out as the regular path would. Do that now, so
|
|
* it doesn't need code for that.
|
|
*/
|
|
if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
|
|
stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
|
|
InvalidBuffer;
|
|
|
|
stream->fast_path = true;
|
|
}
|
|
#endif
|
|
|
|
return buffer;
|
|
}
|
|
|
|
/*
|
|
* Transitional support for code that would like to perform or skip reads
|
|
* itself, without using the stream. Returns, and consumes, the next block
|
|
* number that would be read by the stream's look-ahead algorithm, or
|
|
* InvalidBlockNumber if the end of the stream is reached. Also reports the
|
|
* strategy that would be used to read it.
|
|
*/
|
|
BlockNumber
|
|
read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
|
|
{
|
|
*strategy = stream->ios[0].op.strategy;
|
|
return read_stream_get_block(stream, NULL);
|
|
}
|
|
|
|
/*
|
|
* Reset a read stream by releasing any queued up buffers, allowing the stream
|
|
* to be used again for different blocks. This can be used to clear an
|
|
* end-of-stream condition and start again, or to throw away blocks that were
|
|
* speculatively read and read some different blocks instead.
|
|
*/
|
|
void
|
|
read_stream_reset(ReadStream *stream)
|
|
{
|
|
int16 index;
|
|
Buffer buffer;
|
|
|
|
/* Stop looking ahead. */
|
|
stream->distance = 0;
|
|
|
|
/* Forget buffered block number and fast path state. */
|
|
stream->buffered_blocknum = InvalidBlockNumber;
|
|
stream->fast_path = false;
|
|
|
|
/* Unpin anything that wasn't consumed. */
|
|
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
|
|
ReleaseBuffer(buffer);
|
|
|
|
/* Unpin any unused forwarded buffers. */
|
|
index = stream->next_buffer_index;
|
|
while (index < stream->initialized_buffers &&
|
|
(buffer = stream->buffers[index]) != InvalidBuffer)
|
|
{
|
|
Assert(stream->forwarded_buffers > 0);
|
|
stream->forwarded_buffers--;
|
|
ReleaseBuffer(buffer);
|
|
|
|
stream->buffers[index] = InvalidBuffer;
|
|
if (index < stream->io_combine_limit - 1)
|
|
stream->buffers[stream->queue_size + index] = InvalidBuffer;
|
|
|
|
if (++index == stream->queue_size)
|
|
index = 0;
|
|
}
|
|
|
|
Assert(stream->forwarded_buffers == 0);
|
|
Assert(stream->pinned_buffers == 0);
|
|
Assert(stream->ios_in_progress == 0);
|
|
|
|
/* Start off assuming data is cached. */
|
|
stream->distance = 1;
|
|
}
|
|
|
|
/*
|
|
* Release and free a read stream.
|
|
*/
|
|
void
|
|
read_stream_end(ReadStream *stream)
|
|
{
|
|
read_stream_reset(stream);
|
|
pfree(stream);
|
|
}
|