Chunker: split logic into FileFMAPReader and FileReader

`FileFMAPReader` deals with sparse files (data vs. holes) or a given fmap and, using a generator, yields blocks of at most read_size bytes.

`FileReader` uses the `FileFMAPReader` to fill an internal buffer and offers a `read` method that lets users read arbitrarily sized chunks from that buffer.

For both classes, instances now only deal with a single file.
This commit is contained in:
Thomas Waldmann 2025-05-27 14:58:22 +02:00
parent 2818a0c26e
commit f036152789
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 228 additions and 54 deletions

View file

@ -21,10 +21,33 @@ class ChunkerFailing:
def __init__(self, block_size: int, map: str) -> None: ...
def chunkify(self, fd: BinaryIO = None, fh: int = -1) -> Iterator: ...
class FileFMAPReader:
def __init__(
self,
*,
fd: BinaryIO = None,
fh: int = -1,
read_size: int = 0,
header_size: int = 0,
sparse: bool = False,
fmap: List[fmap_entry] = None,
) -> None: ...
def _build_fmap(self) -> List[fmap_entry]: ...
def blockify(self) -> Iterator: ...
class FileReader:
def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ...
def _build_fmap(self, fd: BinaryIO = None, fh: int = -1) -> List[fmap_entry]: ...
def blockify(self, fd: BinaryIO = None, fh: int = -1, fmap: List[fmap_entry] = None) -> Iterator: ...
def __init__(
self,
*,
fd: BinaryIO = None,
fh: int = -1,
read_size: int = 0,
header_size: int = 0,
sparse: bool = False,
fmap: List[fmap_entry] = None,
) -> None: ...
def _fill_buffer(self) -> bool: ...
def read(self, size: int, return_chunk_info: bool = False) -> Any: ...
class ChunkerFixed:
def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ...

View file

@ -165,7 +165,7 @@ class ChunkerFailing:
return
class FileReader:
class FileFMAPReader:
"""
This is for reading blocks from a file.
@ -180,29 +180,34 @@ class FileReader:
Note: the last block of a data or hole range may be less than the block size,
this is supported and not considered to be an error.
"""
def __init__(self, read_size, header_size=0, sparse=False):
self.read_size = read_size # how much data we want to read at once
def __init__(self, *, fd=None, fh=-1, read_size=0, header_size=0, sparse=False, fmap=None):
assert fd is not None or fh >= 0
self.fd = fd
self.fh = fh
assert read_size > 0
assert read_size <= len(zeros)
self.read_size = read_size # how much data we want to read at once
assert header_size <= read_size
self.header_size = header_size # size of the first block
assert read_size >= header_size
self.reading_time = 0.0 # time spent in reading/seeking
# should borg try to do sparse input processing?
# whether it actually can be done depends on the input file being seekable.
self.try_sparse = sparse and has_seek_hole
self.fmap = fmap
def _build_fmap(self, fd=None, fh=-1):
def _build_fmap(self):
started_fmap = time.monotonic()
fmap = None
if self.try_sparse:
try:
if self.header_size > 0:
header_map = [(0, self.header_size, True), ]
dseek(self.header_size, os.SEEK_SET, fd, fh)
body_map = list(sparsemap(fd, fh))
dseek(0, os.SEEK_SET, fd, fh)
dseek(self.header_size, os.SEEK_SET, self.fd, self.fh)
body_map = list(sparsemap(self.fd, self.fh))
dseek(0, os.SEEK_SET, self.fd, self.fh)
else:
header_map = []
body_map = list(sparsemap(fd, fh))
body_map = list(sparsemap(self.fd, self.fh))
except OSError as err:
# seeking did not work
pass
@ -225,30 +230,27 @@ class FileReader:
self.reading_time += time.monotonic() - started_fmap
return fmap
def blockify(self, fd=None, fh=-1, fmap=None):
def blockify(self):
"""
Read <read_size> sized blocks from a file, optionally supporting a differently sized header block.
:param fd: Python file object
:param fh: OS-level file handle (if available),
defaults to -1 which means not to use OS-level fd.
:param fmap: a file map, same format as generated by sparsemap
"""
fmap =self._build_fmap(fd, fh) if fmap is None else fmap
if self.fmap is None:
self.fmap = self._build_fmap()
offset = 0
# note: the optional header block is implemented via the first fmap entry
for range_start, range_size, is_data in fmap:
for range_start, range_size, is_data in self.fmap:
if range_start != offset:
# this is for the case when the fmap does not cover the file completely,
# e.g. it could be without the ranges of holes or of unchanged data.
offset = range_start
dseek(offset, os.SEEK_SET, fd, fh)
dseek(offset, os.SEEK_SET, self.fd, self.fh)
while range_size:
started_reading = time.monotonic()
wanted = min(range_size, self.read_size)
if is_data:
# read block from the range
data = dread(offset, wanted, fd, fh)
data = dread(offset, wanted, self.fd, self.fh)
got = len(data)
if zeros.startswith(data):
data = None
@ -257,20 +259,164 @@ class FileReader:
allocation = CH_DATA
else: # hole
# seek over block from the range
pos = dseek(wanted, os.SEEK_CUR, fd, fh)
pos = dseek(wanted, os.SEEK_CUR, self.fd, self.fh)
got = pos - offset
data = None
allocation = CH_HOLE
self.reading_time += time.monotonic() - started_reading
if got > 0:
offset += got
range_size -= got
self.reading_time += time.monotonic() - started_reading
yield Chunk(data, size=got, allocation=allocation)
if got < wanted:
# we did not get enough data, looks like EOF.
return
class FileReader:
    """
    This is a buffered reader for file data.

    It maintains a buffer that is filled by using FileFMAPReader.blockify generator when needed.
    The data in that buffer is consumed by clients calling FileReader.read.

    An instance deals with a single file and is consumed only once: after the blockify
    generator is exhausted, further reads only drain what is still buffered and then
    report EOF. The generator is never restarted.
    """

    def __init__(self, *, fd=None, fh=-1, read_size=0, header_size=0, sparse=False, fmap=None):
        """
        :param fd: Python file object
        :param fh: OS-level file handle (if available), -1 means not to use it
        :param read_size: size of the blocks requested from the underlying FileFMAPReader
        :param header_size: size of the first block (0 means no separate header block)
        :param sparse: whether sparse input processing shall be tried
        :param fmap: a file map, same format as generated by sparsemap (None: build it)
        """
        self.reader = FileFMAPReader(
            fd=fd, fh=fh, read_size=read_size, header_size=header_size, sparse=sparse, fmap=fmap
        )
        self.buffer = []  # list of (data, meta) tuples
        self.offset = 0  # offset into the first buffer object's data
        self.remaining_bytes = 0  # total bytes available in buffer
        self.blockify_gen = None  # generator from FileFMAPReader.blockify
        # True once blockify_gen has raised StopIteration. Kept separate from
        # "blockify_gen is None", which also describes the not-yet-started state.
        self._exhausted = False
        self.fd = fd
        self.fh = fh
        self.fmap = fmap

    def _fill_buffer(self):
        """
        Fill the buffer with more data from the blockify generator.

        Returns True if more data was added, False if EOF.
        """
        if self.blockify_gen is None:
            return False
        try:
            chunk = next(self.blockify_gen)
        except StopIteration:
            self.blockify_gen = None
            self._exhausted = True
            return False
        # Store both data and metadata in the buffer
        self.buffer.append((chunk.data, chunk.meta))
        self.remaining_bytes += chunk.meta["size"]
        return True

    def _consume_non_data(self, size):
        """
        Consume up to 'size' bytes of the non-data chunk at the head of the buffer.

        Returns a (None, allocation, consumed_size) tuple.
        """
        _, meta = self.buffer[0]
        left = meta["size"] - self.offset
        consumed = min(size, left)
        if consumed == left:
            # chunk fully consumed, the next chunk starts at its beginning
            self.buffer.pop(0)
            self.offset = 0
        else:
            self.offset += consumed
        self.remaining_bytes -= consumed
        return (None, meta["allocation"], consumed)

    def read(self, size, return_chunk_info=False):
        """
        Read up to 'size' bytes from the file.

        If return_chunk_info is False, non-data chunks in the buffer are skipped
        and only the bytes of data chunks are returned.

        :param size: Number of bytes to read
        :param return_chunk_info: if True, return a tuple (data, allocation, size) instead of just data
        :return: Bytes object containing the read data, or None if no data is available.
                 If return_chunk_info is True, returns a tuple (data, allocation, size).
        """
        # Start the generator on first use only. It must not be restarted after
        # EOF, because that would throw away chunks that are still buffered.
        if self.blockify_gen is None and not self._exhausted:
            self.blockify_gen = self.reader.blockify()

        # If we don't have enough data in the buffer, try to fill it
        while self.remaining_bytes < size:
            if not self._fill_buffer():
                # No more data available, return what we have
                break

        # If we have no data at all, signal EOF
        if not self.buffer:
            return (None, None, 0) if return_chunk_info else None

        data, meta = self.buffer[0]
        if return_chunk_info and (meta["allocation"] != CH_DATA or data is None):
            # For non-data chunks, we return the allocation type and size
            return self._consume_non_data(size)

        # Collect the requested data from consecutive data chunks
        result = bytearray()
        bytes_to_read = min(size, self.remaining_bytes)
        bytes_read = 0
        while bytes_read < bytes_to_read and self.buffer:
            data, meta = self.buffer[0]
            chunk_size = meta["size"]
            if meta["allocation"] != CH_DATA or data is None:
                if return_chunk_info:
                    if bytes_read > 0:
                        # Return the data read so far, the non-data chunk is
                        # reported by the next call.
                        break
                    return self._consume_non_data(size)
                # Plain mode: drop the unconsumed rest of the non-data chunk
                self.buffer.pop(0)
                self.remaining_bytes -= chunk_size - self.offset
                self.offset = 0
                continue

            # Calculate how much we can read from this chunk
            available = chunk_size - self.offset
            to_read = min(available, bytes_to_read - bytes_read)
            if to_read > 0:
                result.extend(data[self.offset:self.offset + to_read])
                bytes_read += to_read
            # Update offset or remove chunk if fully consumed
            if to_read < available:
                self.offset += to_read
            else:
                self.offset = 0
                self.buffer.pop(0)
            self.remaining_bytes -= to_read

        payload = bytes(result) if result else None
        if return_chunk_info:
            return (payload, CH_DATA, bytes_read)
        return payload
class ChunkerFixed:
"""
This is a simple chunker for input data with data usually staying at same
@ -297,7 +443,8 @@ class ChunkerFixed:
self.chunking_time = 0.0 # likely will stay close to zero - not much to do here.
self.reader_block_size = self.block_size # start simple
assert self.reader_block_size % self.block_size == 0, "reader_block_size must be N * block_size"
self.reader = FileReader(self.reader_block_size, header_size=self.header_size, sparse=sparse)
self.reader = None
self.sparse = sparse
def chunkify(self, fd=None, fh=-1, fmap=None):
"""
@ -308,35 +455,39 @@ class ChunkerFixed:
defaults to -1 which means not to use OS-level fd.
:param fmap: a file map, same format as generated by sparsemap
"""
in_header = self.header_size > 0 # first block is header, if header size is given
for block in self.reader.blockify(fd, fh, fmap):
if in_header:
assert self.header_size == block.meta["size"]
yield block # just pass through the header block we get from the reader
in_header = False
continue
# not much to do in here
if self.reader_block_size == self.block_size:
# trivial, the reader already did all the work
yield block # just pass through, avoid creating new objects
else:
# reader block size is a multiple of our block size
read_size = block.meta["size"]
allocation = block.meta["allocation"]
start = 0
while read_size:
started_chunking = time.monotonic()
size = min(read_size, self.block_size)
if allocation == CH_DATA:
data = block.data[start:start+size] # TODO memoryview?
elif allocation in (CH_ALLOC, CH_HOLE):
data = None
else:
raise ValueError("unsupported allocation")
self.chunking_time += time.monotonic() - started_chunking
yield Chunk(data, size=size, allocation=allocation)
start += size
read_size -= size
# Initialize the reader with the file descriptors
self.reader = FileReader(fd=fd, fh=fh, read_size=self.reader_block_size,
header_size=self.header_size, sparse=self.sparse, fmap=fmap)
# Handle header if present
if self.header_size > 0:
# Read the header block using read
started_chunking = time.monotonic()
header_info = self.reader.read(self.header_size, return_chunk_info=True)
self.chunking_time += time.monotonic() - started_chunking
if header_info is not None and header_info[2] > 0:
# Unpack the header info
data, allocation, size = header_info
assert self.header_size == size
# Yield the header chunk
yield Chunk(data, size=size, allocation=allocation)
# Process the rest of the file using read
while True:
started_chunking = time.monotonic()
chunk_info = self.reader.read(self.block_size, return_chunk_info=True)
self.chunking_time += time.monotonic() - started_chunking
if chunk_info is None or chunk_info[2] == 0:
# End of file
break
# Unpack the chunk info
data, allocation, size = chunk_info
# Yield the chunk with the appropriate allocation type
yield Chunk(data, size=size, allocation=allocation)
# Cyclic polynomial / buzhash