Add read_stream_{pause,resume}()

Read stream users can now pause lookahead when no blocks are currently
available. After resuming, subsequent read_stream_next_buffer() calls
continue lookahead with the previous lookahead distance.

This is especially useful for read stream users with self-referential
access patterns (where consuming already-read buffers can produce
additional block numbers).

Author: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKGJLT2JvWLEiBXMbkSSc5so_Y7%3DN%2BS2ce7npjLw8QL3d5w%40mail.gmail.com
This commit is contained in:
Melanie Plageman 2026-03-03 15:55:52 -05:00
parent b30656ce00
commit 38229cb905
2 changed files with 29 additions and 0 deletions

View file

@ -100,6 +100,7 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
int16 initialized_buffers;
int16 resume_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
@ -711,6 +712,7 @@ read_stream_begin_impl(int flags,
stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
else
stream->distance = 1;
stream->resume_distance = stream->distance;
/*
* Since we always access the same relation, we can initialize parts of
@ -1034,6 +1036,30 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
return read_stream_get_block(stream, NULL);
}
/*
* Temporarily stop consuming block numbers from the block number callback.
* If called inside the block number callback, its return value should be
* returned by the callback.
*/
BlockNumber
read_stream_pause(ReadStream *stream)
{
stream->resume_distance = stream->distance;
stream->distance = 0;
return InvalidBlockNumber;
}
/*
* Resume looking ahead after the block number callback reported
* end-of-stream. This is useful for streams of self-referential blocks, after
* a buffer needed to be consumed and examined to find more block numbers.
*/
void
read_stream_resume(ReadStream *stream)
{
stream->distance = stream->resume_distance;
}
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
@ -1080,6 +1106,7 @@ read_stream_reset(ReadStream *stream)
/* Start off assuming data is cached. */
stream->distance = 1;
stream->resume_distance = stream->distance;
}
/*

View file

@ -99,6 +99,8 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
extern BlockNumber read_stream_pause(ReadStream *stream);
extern void read_stream_resume(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);