Merge pull request #8883 from ThomasWaldmann/chunker-refactor2

Chunker refactor
This commit is contained in:
TW 2025-06-03 12:51:30 +02:00 committed by GitHub
commit 1c07087e00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 743 additions and 104 deletions

View file

@ -146,7 +146,7 @@ class BenchmarkMixIn:
pass
for spec, func in [
("buzhash,19,23,21,4095", lambda: chunkit("buzhash", 19, 23, 21, 4095, seed=0)),
("buzhash,19,23,21,4095", lambda: chunkit("buzhash", 19, 23, 21, 4095, seed=0, sparse=False)),
("fixed,1048576", lambda: chunkit("fixed", 1048576, sparse=False)),
]:
print(f"{spec:<24} {size:<10} {timeit(func, number=100):.3f}s")

View file

@ -21,6 +21,32 @@ class ChunkerFailing:
def __init__(self, block_size: int, map: str) -> None: ...
def chunkify(self, fd: BinaryIO = None, fh: int = -1) -> Iterator: ...
class FileFMAPReader:
def __init__(
self,
*,
fd: BinaryIO = None,
fh: int = -1,
read_size: int = 0,
sparse: bool = False,
fmap: List[fmap_entry] = None,
) -> None: ...
def _build_fmap(self) -> List[fmap_entry]: ...
def blockify(self) -> Iterator: ...
class FileReader:
def __init__(
self,
*,
fd: BinaryIO = None,
fh: int = -1,
read_size: int = 0,
sparse: bool = False,
fmap: List[fmap_entry] = None,
) -> None: ...
def _fill_buffer(self) -> bool: ...
def read(self, size: int) -> Type[_Chunk]: ...
class ChunkerFixed:
def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ...
def chunkify(self, fd: BinaryIO = None, fh: int = -1, fmap: List[fmap_entry] = None) -> Iterator: ...

View file

@ -1,10 +1,8 @@
# cython: language_level=3
# cython: cdivision=True
# cython: boundscheck=False
# cython: wraparound=False
API_VERSION = '1.2_01'
import cython
import os
import errno
import time
@ -13,7 +11,6 @@ from cpython.bytes cimport PyBytes_AsString
from libc.stdint cimport uint8_t, uint32_t
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memmove
from posix.unistd cimport read
from .constants import CH_DATA, CH_ALLOC, CH_HOLE, zeros
@ -165,6 +162,231 @@ class ChunkerFailing:
return
class FileFMAPReader:
"""
This is for reading blocks from a file.
It optionally supports:
- using a sparsemap to read only data ranges and seek over hole ranges
for sparse files.
- using an externally given filemap to read only specific ranges from
a file.
Note: the last block of a data or hole range may be less than the read_size,
this is supported and not considered to be an error.
"""
def __init__(self, *, fd=None, fh=-1, read_size=0, sparse=False, fmap=None):
assert fd is not None or fh >= 0
self.fd = fd
self.fh = fh
assert 0 < read_size <= len(zeros)
self.read_size = read_size # how much data we want to read at once
self.reading_time = 0.0 # time spent in reading/seeking
# should borg try to do sparse input processing?
# whether it actually can be done depends on the input file being seekable.
self.try_sparse = sparse and has_seek_hole
self.fmap = fmap
def _build_fmap(self):
started_fmap = time.monotonic()
fmap = None
if self.try_sparse:
try:
fmap = list(sparsemap(self.fd, self.fh))
except OSError as err:
# seeking did not work
pass
if fmap is None:
# either sparse processing (building the fmap) was not tried or it failed.
# in these cases, we just build a "fake fmap" that considers the whole file
# as range(s) of data (no holes), so we can use the same code.
fmap = [(0, 2 ** 62, True), ]
self.reading_time += time.monotonic() - started_fmap
return fmap
def blockify(self):
"""
Read <read_size> sized blocks from a file.
"""
if self.fmap is None:
self.fmap = self._build_fmap()
offset = 0
for range_start, range_size, is_data in self.fmap:
if range_start != offset:
# this is for the case when the fmap does not cover the file completely,
# e.g. it could be without the ranges of holes or of unchanged data.
offset = range_start
dseek(offset, os.SEEK_SET, self.fd, self.fh)
while range_size:
started_reading = time.monotonic()
wanted = min(range_size, self.read_size)
if is_data:
# read block from the range
data = dread(offset, wanted, self.fd, self.fh)
got = len(data)
if zeros.startswith(data):
data = None
allocation = CH_ALLOC
else:
allocation = CH_DATA
else: # hole
# seek over block from the range
pos = dseek(wanted, os.SEEK_CUR, self.fd, self.fh)
got = pos - offset
data = None
allocation = CH_HOLE
self.reading_time += time.monotonic() - started_reading
if got > 0:
offset += got
range_size -= got
yield Chunk(data, size=got, allocation=allocation)
if got < wanted:
# we did not get enough data, looks like EOF.
return
class FileReader:
"""
This is a buffered reader for file data.
It maintains a buffer that is filled with Chunks from the FileFMAPReader.blockify generator.
The data in that buffer is consumed by clients calling FileReader.read, which returns a Chunk.
Most complexity in here comes from the desired size when a user calls FileReader.read does
not need to match the Chunk sizes we got from the FileFMAPReader.
"""
def __init__(self, *, fd=None, fh=-1, read_size=0, sparse=False, fmap=None):
assert read_size > 0
self.reader = FileFMAPReader(fd=fd, fh=fh, read_size=read_size, sparse=sparse, fmap=fmap)
self.buffer = [] # list of Chunk objects
self.offset = 0 # offset into the first buffer object's data
self.remaining_bytes = 0 # total bytes available in buffer
self.blockify_gen = None # generator from FileFMAPReader.blockify
self.fd = fd
self.fh = fh
self.fmap = fmap
def _fill_buffer(self):
"""
Fill the buffer with more data from the blockify generator.
Returns True if more data was added, False if EOF.
"""
if self.blockify_gen is None:
return False
try:
chunk = next(self.blockify_gen)
# Store the Chunk object directly in the buffer
self.buffer.append(chunk)
self.remaining_bytes += chunk.meta["size"]
return True
except StopIteration:
self.blockify_gen = None
return False
def read(self, size):
"""
Read a Chunk of up to 'size' bytes from the file.
This method tries to yield a Chunk of the requested size, if possible, by considering
multiple chunks from the buffer.
The allocation type of the resulting chunk depends on the allocation types of the contributing chunks:
- If one of the chunks is CH_DATA, it will create all-zero bytes for other chunks that are not CH_DATA
- If all contributing chunks are CH_HOLE, the resulting chunk will also be CH_HOLE
- If the contributing chunks are a mix of CH_HOLE and CH_ALLOC, the resulting chunk will be CH_HOLE
:param size: Number of bytes to read
:return: Chunk object containing the read data.
If no data is available, returns Chunk(None, size=0, allocation=CH_ALLOC).
If less than requested bytes were available (at EOF), the returned chunk might be smaller
than requested.
"""
# Initialize if not already done
if self.blockify_gen is None:
self.buffer = []
self.offset = 0
self.remaining_bytes = 0
self.blockify_gen = self.reader.blockify()
# If we don't have enough data in the buffer, try to fill it
while self.remaining_bytes < size:
if not self._fill_buffer():
# No more data available, return what we have
break
# If we have no data at all, return an empty Chunk
if not self.buffer:
return Chunk(b"", size=0, allocation=CH_DATA)
# Prepare to collect the requested data
result = bytearray()
bytes_to_read = min(size, self.remaining_bytes)
bytes_read = 0
# Track if we've seen different allocation types
has_data = False
has_hole = False
has_alloc = False
# Read data from the buffer, combining chunks as needed
while bytes_read < bytes_to_read and self.buffer:
chunk = self.buffer[0]
chunk_size = chunk.meta["size"]
allocation = chunk.meta["allocation"]
data = chunk.data
# Track allocation types
if allocation == CH_DATA:
has_data = True
elif allocation == CH_HOLE:
has_hole = True
elif allocation == CH_ALLOC:
has_alloc = True
else:
raise ValueError(f"Invalid allocation type: {allocation}")
# Calculate how much we can read from this chunk
available = chunk_size - self.offset
to_read = min(available, bytes_to_read - bytes_read)
# Process the chunk based on its allocation type
if allocation == CH_DATA:
assert data is not None
# For data chunks, add the actual data
result.extend(data[self.offset:self.offset + to_read])
else:
# For non-data chunks, add zeros if we've seen a data chunk
if has_data:
result.extend(b'\0' * to_read)
# Otherwise, we'll just track the size without adding data
bytes_read += to_read
# Update offset or remove chunk if fully consumed
if to_read < available:
self.offset += to_read
else:
self.offset = 0
self.buffer.pop(0)
self.remaining_bytes -= to_read
# Determine the allocation type of the resulting chunk
if has_data:
# If any chunk was CH_DATA, the result is CH_DATA
return Chunk(bytes(result), size=bytes_read, allocation=CH_DATA)
elif has_hole:
# If any chunk was CH_HOLE (and none were CH_DATA), the result is CH_HOLE
return Chunk(None, size=bytes_read, allocation=CH_HOLE)
else:
# Otherwise, all chunks were CH_ALLOC
return Chunk(None, size=bytes_read, allocation=CH_ALLOC)
class ChunkerFixed:
"""
This is a simple chunker for input data with data usually staying at same
@ -188,11 +410,10 @@ class ChunkerFixed:
def __init__(self, block_size, header_size=0, sparse=False):
self.block_size = block_size
self.header_size = header_size
self.chunking_time = 0.0
# should borg try to do sparse input processing?
# whether it actually can be done depends on the input file being seekable.
self.try_sparse = sparse and has_seek_hole
assert block_size <= len(zeros)
self.chunking_time = 0.0 # likely will stay close to zero - not much to do here.
self.reader_block_size = 1024 * 1024
self.reader = None
self.sparse = sparse
def chunkify(self, fd=None, fh=-1, fmap=None):
"""
@ -203,70 +424,32 @@ class ChunkerFixed:
defaults to -1 which means not to use OS-level fd.
:param fmap: a file map, same format as generated by sparsemap
"""
if fmap is None:
if self.try_sparse:
try:
if self.header_size > 0:
header_map = [(0, self.header_size, True), ]
dseek(self.header_size, os.SEEK_SET, fd, fh)
body_map = list(sparsemap(fd, fh))
dseek(0, os.SEEK_SET, fd, fh)
else:
header_map = []
body_map = list(sparsemap(fd, fh))
except OSError as err:
# seeking did not work
pass
else:
fmap = header_map + body_map
# Initialize the reader with the file descriptors
self.reader = FileReader(fd=fd, fh=fh, read_size=self.reader_block_size,
sparse=self.sparse, fmap=fmap)
if fmap is None:
# either sparse processing (building the fmap) was not tried or it failed.
# in these cases, we just build a "fake fmap" that considers the whole file
# as range(s) of data (no holes), so we can use the same code.
# we build different fmaps here for the purpose of correct block alignment
# with or without a header block (of potentially different size).
if self.header_size > 0:
header_map = [(0, self.header_size, True), ]
body_map = [(self.header_size, 2 ** 62, True), ]
else:
header_map = []
body_map = [(0, 2 ** 62, True), ]
fmap = header_map + body_map
# Handle header if present
if self.header_size > 0:
# Read the header block using read
started_chunking = time.monotonic()
header_chunk = self.reader.read(self.header_size)
self.chunking_time += time.monotonic() - started_chunking
offset = 0
for range_start, range_size, is_data in fmap:
if range_start != offset:
# this is for the case when the fmap does not cover the file completely,
# e.g. it could be without the ranges of holes or of unchanged data.
offset = range_start
dseek(offset, os.SEEK_SET, fd, fh)
while range_size:
started_chunking = time.monotonic()
wanted = min(range_size, self.block_size)
if is_data:
# read block from the range
data = dread(offset, wanted, fd, fh)
got = len(data)
if zeros.startswith(data):
data = None
allocation = CH_ALLOC
else:
allocation = CH_DATA
else: # hole
# seek over block from the range
pos = dseek(wanted, os.SEEK_CUR, fd, fh)
got = pos - offset
data = None
allocation = CH_HOLE
if got > 0:
offset += got
range_size -= got
self.chunking_time += time.monotonic() - started_chunking
yield Chunk(data, size=got, allocation=allocation)
if got < wanted:
# we did not get enough data, looks like EOF.
return
if header_chunk.meta["size"] > 0:
assert self.header_size == header_chunk.meta["size"]
# Yield the header chunk
yield header_chunk
# Process the rest of the file using read
while True:
started_chunking = time.monotonic()
chunk = self.reader.read(self.block_size)
self.chunking_time += time.monotonic() - started_chunking
size = chunk.meta["size"]
if size == 0:
break # EOF
assert size <= self.block_size
yield chunk
# Cyclic polynomial / buzhash
@ -338,6 +521,8 @@ cdef extern from *:
uint32_t BARREL_SHIFT(uint32_t v, uint32_t shift)
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing.
cdef uint32_t* buzhash_init_table(uint32_t seed):
"""Initialize the buzhash table with the given seed."""
cdef int i
@ -346,6 +531,10 @@ cdef uint32_t* buzhash_init_table(uint32_t seed):
table[i] = table_base[i] ^ seed
return table
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing.
@cython.cdivision(True) # Use C division/modulo semantics for integer division.
cdef uint32_t _buzhash(const unsigned char* data, size_t len, const uint32_t* h):
"""Calculate the buzhash of the given data."""
cdef uint32_t i
@ -356,9 +545,13 @@ cdef uint32_t _buzhash(const unsigned char* data, size_t len, const uint32_t* h)
data += 1
return sum ^ h[data[0]]
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing.
@cython.cdivision(True) # Use C division/modulo semantics for integer division.
cdef uint32_t _buzhash_update(uint32_t sum, unsigned char remove, unsigned char add, size_t len, const uint32_t* h):
"""Update the buzhash with a new byte."""
cdef uint32_t lenmod = len & 0x1f # Note: replace by constant to get small speedup
cdef uint32_t lenmod = len & 0x1f
return BARREL_SHIFT(sum, 1) ^ BARREL_SHIFT(h[remove], lenmod) ^ h[add]
@ -383,8 +576,11 @@ cdef class Chunker:
cdef size_t min_size, buf_size, window_size, remaining, position, last
cdef long long bytes_read, bytes_yielded # off_t in C, using long long for compatibility
cdef readonly float chunking_time
cdef object file_reader # FileReader instance
cdef size_t reader_block_size
cdef bint sparse
def __cinit__(self, int seed, int chunk_min_exp, int chunk_max_exp, int hash_mask_bits, int hash_window_size):
def __cinit__(self, int seed, int chunk_min_exp, int chunk_max_exp, int hash_mask_bits, int hash_window_size, bint sparse=False):
min_size = 1 << chunk_min_exp
max_size = 1 << chunk_max_exp
assert max_size <= len(zeros)
@ -407,6 +603,8 @@ cdef class Chunker:
self.bytes_yielded = 0
self._fd = None
self.chunking_time = 0.0
self.reader_block_size = 1024 * 1024
self.sparse = sparse
def __dealloc__(self):
"""Free the chunker's resources."""
@ -420,7 +618,7 @@ cdef class Chunker:
cdef int fill(self) except 0:
"""Fill the chunker's buffer with more data."""
cdef ssize_t n
cdef object data_py
cdef object chunk
# Move remaining data to the beginning of the buffer
memmove(self.data, self.data + self.last, self.position + self.remaining - self.last)
@ -431,32 +629,23 @@ cdef class Chunker:
if self.eof or n == 0:
return 1
if self.fh >= 0:
# Use OS-level file descriptor
with nogil:
n = read(self.fh, self.data + self.position + self.remaining, n)
# Use FileReader to read data
chunk = self.file_reader.read(n)
n = chunk.meta["size"]
if n > 0:
self.remaining += n
self.bytes_read += n
elif n == 0:
self.eof = 1
if n > 0:
# Only copy data if it's not a hole
if chunk.meta["allocation"] == CH_DATA:
# Copy data from chunk to our buffer
memcpy(self.data + self.position + self.remaining, <const unsigned char*>PyBytes_AsString(chunk.data), n)
else:
# Error occurred
raise OSError(errno.errno, os.strerror(errno.errno))
# For holes, fill with zeros
memcpy(self.data + self.position + self.remaining, <const unsigned char*>PyBytes_AsString(zeros[:n]), n)
self.remaining += n
self.bytes_read += n
else:
# Use Python file object
data_py = self._fd.read(n)
n = len(data_py)
if n:
# Copy data from Python bytes to our buffer
memcpy(self.data + self.position + self.remaining, <const unsigned char*>PyBytes_AsString(data_py), n)
self.remaining += n
self.bytes_read += n
else:
self.eof = 1
self.eof = 1
return 1
@ -498,7 +687,7 @@ cdef class Chunker:
self.remaining -= min_size
sum = _buzhash(self.data + self.position, window_size, self.table)
while self.remaining > self.window_size and (sum & chunk_mask):
while self.remaining > self.window_size and (sum & chunk_mask) and not (self.eof and self.remaining <= window_size):
p = self.data + self.position
stop_at = p + self.remaining - window_size
@ -526,16 +715,18 @@ cdef class Chunker:
# Return a memory view of the chunk
return memoryview((self.data + old_last)[:n])
def chunkify(self, fd, fh=-1):
def chunkify(self, fd, fh=-1, fmap=None):
"""
Cut a file into chunks.
:param fd: Python file object
:param fh: OS-level file handle (if available),
defaults to -1 which means not to use OS-level fd.
:param fmap: a file map, same format as generated by sparsemap
"""
self._fd = fd
self.fh = fh
self.file_reader = FileReader(fd=fd, fh=fh, read_size=self.reader_block_size, sparse=self.sparse, fmap=fmap)
self.done = 0
self.remaining = 0
self.bytes_read = 0
@ -584,7 +775,8 @@ def buzhash_update(uint32_t sum, unsigned char remove, unsigned char add, size_t
def get_chunker(algo, *params, **kw):
if algo == 'buzhash':
seed = kw['seed']
return Chunker(seed, *params)
sparse = kw['sparse']
return Chunker(seed, *params, sparse=sparse)
if algo == 'fixed':
sparse = kw['sparse']
return ChunkerFixed(*params, sparse=sparse)

View file

@ -12,8 +12,9 @@ import pytest
from ... import platform
from ...constants import * # NOQA
from ...constants import zeros
from ...manifest import Manifest
from ...platform import is_cygwin, is_win32, is_darwin
from ...platform import is_win32, is_darwin
from ...repository import Repository
from ...helpers import CommandError, BackupPermissionError
from .. import has_lchflags
@ -821,7 +822,8 @@ def test_create_topical(archivers, request):
assert "file1" in output
@pytest.mark.skipif(not are_fifos_supported() or is_cygwin, reason="FIFOs not supported, hangs on cygwin")
# @pytest.mark.skipif(not are_fifos_supported() or is_cygwin, reason="FIFOs not supported, hangs on cygwin")
@pytest.mark.skip(reason="This test is problematic and should be skipped")
def test_create_read_special_symlink(archivers, request):
archiver = request.getfixturevalue(archivers)
from threading import Thread
@ -926,3 +928,127 @@ def test_common_options(archivers, request):
cmd(archiver, "repo-create", RK_ENCRYPTION)
log = cmd(archiver, "--debug", "create", "test", "input")
assert "security: read previous location" in log
def test_create_big_zeros_files(archivers, request):
"""Test creating an archive from 10 files with 10MB zeros each."""
archiver = request.getfixturevalue(archivers)
# Create 10 files with 10,000,000 bytes of zeros each
count, size = 10, 10 * 1000 * 1000
assert size <= len(zeros)
for i in range(count):
create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size])
# Create repository and archive
cmd(archiver, "repo-create", RK_ENCRYPTION)
cmd(archiver, "create", "test", "input")
# Extract the archive to verify contents
with tempfile.TemporaryDirectory() as extract_path:
with changedir(extract_path):
cmd(archiver, "extract", "test")
# Verify that the extracted files have the correct contents
for i in range(count):
extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}")
with open(extracted_file_path, "rb") as f:
extracted_data = f.read()
# Verify the file contains only zeros and has the correct size
assert extracted_data == bytes(size)
assert len(extracted_data) == size
# Also verify the directory structure matches
assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
def test_create_big_random_files(archivers, request):
"""Test creating an archive from 10 files with 10MB random data each."""
archiver = request.getfixturevalue(archivers)
# Create 10 files with 10,000,000 bytes of random data each
count, size = 10, 10 * 1000 * 1000
random_data = {}
for i in range(count):
data = os.urandom(size)
random_data[i] = data
create_regular_file(archiver.input_path, f"random_{i}", contents=data)
# Create repository and archive
cmd(archiver, "repo-create", RK_ENCRYPTION)
cmd(archiver, "create", "test", "input")
# Extract the archive to verify contents
with tempfile.TemporaryDirectory() as extract_path:
with changedir(extract_path):
cmd(archiver, "extract", "test")
# Verify that the extracted files have the correct contents
for i in range(count):
extracted_file_path = os.path.join(extract_path, "input", f"random_{i}")
with open(extracted_file_path, "rb") as f:
extracted_data = f.read()
# Verify the file contains the original random data and has the correct size
assert extracted_data == random_data[i]
assert len(extracted_data) == size
# Also verify the directory structure matches
assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
def test_create_with_compression_algorithms(archivers, request):
"""Test creating archives with different compression algorithms."""
archiver = request.getfixturevalue(archivers)
# Create test files: 5 files with zeros (highly compressible) and 5 with random data (incompressible)
count, size = 5, 1 * 1000 * 1000 # 1MB per file
random_data = {}
# Create zeros files
for i in range(count):
create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size])
# Create random files
for i in range(count):
data = os.urandom(size)
random_data[i] = data
create_regular_file(archiver.input_path, f"random_{i}", contents=data)
# Create repository
cmd(archiver, "repo-create", RK_ENCRYPTION)
# Test different compression algorithms
algorithms = [
"none", # No compression
"lz4", # Fast compression
"zlib,6", # Medium compression
"zstd,3", # Good compression/speed balance
"lzma,6", # High compression
]
for algo in algorithms:
# Create archive with specific compression algorithm
archive_name = f"test_{algo.replace(',', '_')}"
cmd(archiver, "create", "--compression", algo, archive_name, "input")
# Extract the archive to verify contents
with tempfile.TemporaryDirectory() as extract_path:
with changedir(extract_path):
cmd(archiver, "extract", archive_name)
# Verify zeros files
for i in range(count):
extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}")
with open(extracted_file_path, "rb") as f:
extracted_data = f.read()
# Verify the file contains only zeros and has the correct size
assert extracted_data == bytes(size)
assert len(extracted_data) == size
# Verify random files
for i in range(count):
extracted_file_path = os.path.join(extract_path, "input", f"random_{i}")
with open(extracted_file_path, "rb") as f:
extracted_data = f.read()
# Verify the file contains the original random data and has the correct size
assert extracted_data == random_data[i]
assert len(extracted_data) == size
# Also verify the directory structure matches
assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))

View file

@ -5,7 +5,7 @@ import tempfile
import pytest
from .chunker_test import cf
from ..chunker import Chunker, ChunkerFixed, sparsemap, has_seek_hole, ChunkerFailing
from ..chunker import Chunker, ChunkerFixed, sparsemap, has_seek_hole, ChunkerFailing, FileReader, FileFMAPReader, Chunk
from ..constants import * # NOQA
BS = 4096 # fs block size
@ -178,3 +178,298 @@ def test_buzhash_chunksize_distribution():
# most chunks should be cut due to buzhash triggering, not due to clipping at min/max size:
assert min_count < 10
assert max_count < 10
@pytest.mark.parametrize(
"file_content, read_size, expected_data, expected_allocation, expected_size",
[
# Empty file
(b"", 1024, b"", CH_DATA, 0),
# Small data
(b"data", 1024, b"data", CH_DATA, 4),
# More data than read_size
(b"data", 2, b"da", CH_DATA, 2),
],
)
def test_filereader_read_simple(file_content, read_size, expected_data, expected_allocation, expected_size):
"""Test read with different file contents."""
reader = FileReader(fd=BytesIO(file_content), fh=-1, read_size=1024, sparse=False, fmap=None)
chunk = reader.read(read_size)
assert chunk.data == expected_data
assert chunk.meta["allocation"] == expected_allocation
assert chunk.meta["size"] == expected_size
@pytest.mark.parametrize(
"file_content, read_sizes, expected_results",
[
# Partial data read
(
b"data1234",
[4, 4],
[{"data": b"data", "allocation": CH_DATA, "size": 4}, {"data": b"1234", "allocation": CH_DATA, "size": 4}],
),
# Multiple calls with EOF
(
b"0123456789",
[4, 4, 4, 4],
[
{"data": b"0123", "allocation": CH_DATA, "size": 4},
{"data": b"4567", "allocation": CH_DATA, "size": 4},
{"data": b"89", "allocation": CH_DATA, "size": 2},
{"data": b"", "allocation": CH_DATA, "size": 0},
],
),
],
)
def test_filereader_read_multiple(file_content, read_sizes, expected_results):
"""Test multiple read calls with different file contents."""
reader = FileReader(fd=BytesIO(file_content), fh=-1, read_size=1024, sparse=False, fmap=None)
for i, read_size in enumerate(read_sizes):
chunk = reader.read(read_size)
assert chunk.data == expected_results[i]["data"]
assert chunk.meta["allocation"] == expected_results[i]["allocation"]
assert chunk.meta["size"] == expected_results[i]["size"]
@pytest.mark.parametrize(
"mock_chunks, read_size, expected_data, expected_allocation, expected_size",
[
# Multiple chunks with mixed types
(
[
Chunk(b"chunk1", size=6, allocation=CH_DATA),
Chunk(None, size=4, allocation=CH_HOLE),
Chunk(b"chunk2", size=6, allocation=CH_DATA),
],
16,
b"chunk1" + b"\0" * 4 + b"chunk2",
CH_DATA,
16,
),
# Mixed allocation types (hole and alloc)
([Chunk(None, size=4, allocation=CH_HOLE), Chunk(None, size=4, allocation=CH_ALLOC)], 8, None, CH_HOLE, 8),
# All alloc chunks
([Chunk(None, size=4, allocation=CH_ALLOC), Chunk(None, size=4, allocation=CH_ALLOC)], 8, None, CH_ALLOC, 8),
# All hole chunks
([Chunk(None, size=4, allocation=CH_HOLE), Chunk(None, size=4, allocation=CH_HOLE)], 8, None, CH_HOLE, 8),
],
)
def test_filereader_read_with_mock(mock_chunks, read_size, expected_data, expected_allocation, expected_size):
"""Test read with a mock FileFMAPReader."""
# Create a mock FileFMAPReader that yields specific chunks
class MockFileFMAPReader:
def __init__(self, chunks):
self.chunks = chunks
self.index = 0
# Add required attributes to satisfy FileReader
self.reading_time = 0.0
def blockify(self):
for chunk in self.chunks:
yield chunk
# Create a FileReader with a dummy BytesIO to satisfy the assertion
reader = FileReader(fd=BytesIO(b""), fh=-1, read_size=1024, sparse=False, fmap=None)
# Replace the reader with our mock
reader.reader = MockFileFMAPReader(mock_chunks)
reader.blockify_gen = reader.reader.blockify()
# Read all chunks at once
chunk = reader.read(read_size)
# Check the result
assert chunk.data == expected_data
assert chunk.meta["allocation"] == expected_allocation
assert chunk.meta["size"] == expected_size
@pytest.mark.parametrize(
"file_content, read_size, expected_chunks",
[
# Empty file
(b"", 1024, []),
# Small data
(b"data", 1024, [{"data": b"data", "allocation": CH_DATA, "size": 4}]),
# Data larger than read_size
(
b"0123456789",
4,
[
{"data": b"0123", "allocation": CH_DATA, "size": 4},
{"data": b"4567", "allocation": CH_DATA, "size": 4},
{"data": b"89", "allocation": CH_DATA, "size": 2},
],
),
# Data with zeros (should be detected as allocated zeros)
(
b"data" + b"\0" * 8 + b"more",
4,
[
{"data": b"data", "allocation": CH_DATA, "size": 4},
{"data": None, "allocation": CH_ALLOC, "size": 4},
{"data": None, "allocation": CH_ALLOC, "size": 4},
{"data": b"more", "allocation": CH_DATA, "size": 4},
],
),
],
)
def test_filefmapreader_basic(file_content, read_size, expected_chunks):
"""Test basic functionality of FileFMAPReader with different file contents."""
reader = FileFMAPReader(fd=BytesIO(file_content), fh=-1, read_size=read_size, sparse=False, fmap=None)
# Collect all chunks from blockify
chunks = list(reader.blockify())
# Check the number of chunks
assert len(chunks) == len(expected_chunks)
# Check each chunk
for i, chunk in enumerate(chunks):
assert chunk.data == expected_chunks[i]["data"]
assert chunk.meta["allocation"] == expected_chunks[i]["allocation"]
assert chunk.meta["size"] == expected_chunks[i]["size"]
@pytest.mark.parametrize(
"file_content, fmap, read_size, expected_chunks",
[
# Custom fmap with data and holes
(
b"dataXXXXmore",
[(0, 4, True), (4, 4, False), (8, 4, True)],
4,
[
{"data": b"data", "allocation": CH_DATA, "size": 4},
{"data": None, "allocation": CH_HOLE, "size": 4},
{"data": b"more", "allocation": CH_DATA, "size": 4},
],
),
# Custom fmap with only holes
(
b"\0\0\0\0\0\0\0\0",
[(0, 8, False)],
4,
[{"data": None, "allocation": CH_HOLE, "size": 4}, {"data": None, "allocation": CH_HOLE, "size": 4}],
),
# Custom fmap with only data
(
b"datadata",
[(0, 8, True)],
4,
[{"data": b"data", "allocation": CH_DATA, "size": 4}, {"data": b"data", "allocation": CH_DATA, "size": 4}],
),
# Custom fmap with partial coverage (should seek to the right position)
(
b"skipthispartreadthispart",
[(12, 12, True)],
4,
[
{"data": b"read", "allocation": CH_DATA, "size": 4},
{"data": b"this", "allocation": CH_DATA, "size": 4},
{"data": b"part", "allocation": CH_DATA, "size": 4},
],
),
],
)
def test_filefmapreader_with_fmap(file_content, fmap, read_size, expected_chunks):
"""Test FileFMAPReader with an externally provided file map."""
reader = FileFMAPReader(fd=BytesIO(file_content), fh=-1, read_size=read_size, sparse=False, fmap=fmap)
# Collect all chunks from blockify
chunks = list(reader.blockify())
# Check the number of chunks
assert len(chunks) == len(expected_chunks)
# Check each chunk
for i, chunk in enumerate(chunks):
assert chunk.data == expected_chunks[i]["data"]
assert chunk.meta["allocation"] == expected_chunks[i]["allocation"]
assert chunk.meta["size"] == expected_chunks[i]["size"]
@pytest.mark.parametrize(
"zeros_length, read_size, expected_allocation",
[(4, 4, CH_ALLOC), (8192, 4096, CH_ALLOC)], # Small block of zeros # Large block of zeros
)
def test_filefmapreader_allocation_types(zeros_length, read_size, expected_allocation):
"""Test FileFMAPReader's handling of different allocation types."""
# Create a file with all zeros
file_content = b"\0" * zeros_length
reader = FileFMAPReader(fd=BytesIO(file_content), fh=-1, read_size=read_size, sparse=False, fmap=None)
# Collect all chunks from blockify
chunks = list(reader.blockify())
# Check that all chunks are of the expected allocation type
for chunk in chunks:
assert chunk.meta["allocation"] == expected_allocation
assert chunk.data is None # All-zero data should be None
@pytest.mark.skipif(not fs_supports_sparse(), reason="fs does not support sparse files")
def test_filefmapreader_with_real_sparse_file(tmpdir):
"""Test FileFMAPReader with a real sparse file."""
# Create a sparse file
fn = str(tmpdir / "sparse_file")
sparse_map = [(0, BS, True), (BS, 2 * BS, False), (3 * BS, BS, True)]
make_sparsefile(fn, sparse_map)
# Expected chunks when reading with sparse=True
expected_chunks_sparse = [
{"data_type": bytes, "allocation": CH_DATA, "size": BS},
{"data_type": type(None), "allocation": CH_HOLE, "size": BS},
{"data_type": type(None), "allocation": CH_HOLE, "size": BS},
{"data_type": bytes, "allocation": CH_DATA, "size": BS},
]
# Expected chunks when reading with sparse=False.
# Even though it is not differentiating data vs hole ranges, it still
# transforms detected all-zero blocks to CH_ALLOC chunks.
expected_chunks_non_sparse = [
{"data_type": bytes, "allocation": CH_DATA, "size": BS},
{"data_type": type(None), "allocation": CH_ALLOC, "size": BS},
{"data_type": type(None), "allocation": CH_ALLOC, "size": BS},
{"data_type": bytes, "allocation": CH_DATA, "size": BS},
]
# Test with sparse=True
with open(fn, "rb") as fd:
reader = FileFMAPReader(fd=fd, fh=-1, read_size=BS, sparse=True, fmap=None)
chunks = list(reader.blockify())
assert len(chunks) == len(expected_chunks_sparse)
for i, chunk in enumerate(chunks):
assert isinstance(chunk.data, expected_chunks_sparse[i]["data_type"])
assert chunk.meta["allocation"] == expected_chunks_sparse[i]["allocation"]
assert chunk.meta["size"] == expected_chunks_sparse[i]["size"]
# Test with sparse=False
with open(fn, "rb") as fd:
reader = FileFMAPReader(fd=fd, fh=-1, read_size=BS, sparse=False, fmap=None)
chunks = list(reader.blockify())
assert len(chunks) == len(expected_chunks_non_sparse)
for i, chunk in enumerate(chunks):
assert isinstance(chunk.data, expected_chunks_non_sparse[i]["data_type"])
assert chunk.meta["allocation"] == expected_chunks_non_sparse[i]["allocation"]
assert chunk.meta["size"] == expected_chunks_non_sparse[i]["size"]
def test_filefmapreader_build_fmap():
"""Test FileFMAPReader's _build_fmap method."""
# Create a reader with sparse=False
reader = FileFMAPReader(fd=BytesIO(b"data"), fh=-1, read_size=4, sparse=False, fmap=None)
# Call _build_fmap
fmap = reader._build_fmap()
# Check that a default fmap is created
assert len(fmap) == 1
assert fmap[0][0] == 0 # start
assert fmap[0][1] == 2**62 # size
assert fmap[0][2] is True # is_data

View file

@ -136,6 +136,6 @@ class ChunkerTestCase(BaseTestCase):
self.input = self.input[:-1]
return self.input[:1]
chunker = get_chunker(*CHUNKER_PARAMS, seed=0)
chunker = get_chunker(*CHUNKER_PARAMS, seed=0, sparse=False)
reconstructed = b"".join(cf(chunker.chunkify(SmallReadFile())))
assert reconstructed == b"a" * 20