diff --git a/src/borg/chunker.pyi b/src/borg/chunker.pyi index 1a4536d03..1ba388046 100644 --- a/src/borg/chunker.pyi +++ b/src/borg/chunker.pyi @@ -21,10 +21,33 @@ class ChunkerFailing: def __init__(self, block_size: int, map: str) -> None: ... def chunkify(self, fd: BinaryIO = None, fh: int = -1) -> Iterator: ... +class FileFMAPReader: + def __init__( + self, + *, + fd: BinaryIO = None, + fh: int = -1, + read_size: int = 0, + header_size: int = 0, + sparse: bool = False, + fmap: List[fmap_entry] = None, + ) -> None: ... + def _build_fmap(self) -> List[fmap_entry]: ... + def blockify(self) -> Iterator: ... + class FileReader: - def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ... - def _build_fmap(self, fd: BinaryIO = None, fh: int = -1) -> List[fmap_entry]: ... - def blockify(self, fd: BinaryIO = None, fh: int = -1, fmap: List[fmap_entry] = None) -> Iterator: ... + def __init__( + self, + *, + fd: BinaryIO = None, + fh: int = -1, + read_size: int = 0, + header_size: int = 0, + sparse: bool = False, + fmap: List[fmap_entry] = None, + ) -> None: ... + def _fill_buffer(self) -> bool: ... + def read(self, size: int, return_chunk_info: bool = False) -> Any: ... class ChunkerFixed: def __init__(self, block_size: int, header_size: int = 0, sparse: bool = False) -> None: ... diff --git a/src/borg/chunker.pyx b/src/borg/chunker.pyx index 0c37ab7e1..ed3ae22db 100644 --- a/src/borg/chunker.pyx +++ b/src/borg/chunker.pyx @@ -165,7 +165,7 @@ class ChunkerFailing: return -class FileReader: +class FileFMAPReader: """ This is for reading blocks from a file. @@ -180,29 +180,34 @@ class FileReader: Note: the last block of a data or hole range may be less than the block size, this is supported and not considered to be an error. """ - def __init__(self, read_size, header_size=0, sparse=False): - self.read_size = read_size # how much data we want to read at once + def __init__(self, *, fd=None, fh=-1, read_size=0, header_size=0, sparse=False, fmap=None): + assert fd is not None or fh >= 0 + self.fd = fd + self.fh = fh + assert read_size > 0 assert read_size <= len(zeros) + self.read_size = read_size # how much data we want to read at once + assert header_size <= read_size self.header_size = header_size # size of the first block - assert read_size >= header_size self.reading_time = 0.0 # time spent in reading/seeking # should borg try to do sparse input processing? # whether it actually can be done depends on the input file being seekable. self.try_sparse = sparse and has_seek_hole + self.fmap = fmap - def _build_fmap(self, fd=None, fh=-1): + def _build_fmap(self): started_fmap = time.monotonic() fmap = None if self.try_sparse: try: if self.header_size > 0: header_map = [(0, self.header_size, True), ] - dseek(self.header_size, os.SEEK_SET, fd, fh) - body_map = list(sparsemap(fd, fh)) - dseek(0, os.SEEK_SET, fd, fh) + dseek(self.header_size, os.SEEK_SET, self.fd, self.fh) + body_map = list(sparsemap(self.fd, self.fh)) + dseek(0, os.SEEK_SET, self.fd, self.fh) else: header_map = [] - body_map = list(sparsemap(fd, fh)) + body_map = list(sparsemap(self.fd, self.fh)) except OSError as err: # seeking did not work pass @@ -225,30 +230,27 @@ class FileReader: self.reading_time += time.monotonic() - started_fmap return fmap - def blockify(self, fd=None, fh=-1, fmap=None): + def blockify(self): """ Read sized blocks from a file, optionally supporting a differently sized header block. - - :param fd: Python file object - :param fh: OS-level file handle (if available), - defaults to -1 which means not to use OS-level fd. - :param fmap: a file map, same format as generated by sparsemap """ - fmap =self._build_fmap(fd, fh) if fmap is None else fmap + if self.fmap is None: + self.fmap = self._build_fmap() + offset = 0 # note: the optional header block is implemented via the first fmap entry - for range_start, range_size, is_data in fmap: + for range_start, range_size, is_data in self.fmap: if range_start != offset: # this is for the case when the fmap does not cover the file completely, # e.g. it could be without the ranges of holes or of unchanged data. offset = range_start - dseek(offset, os.SEEK_SET, fd, fh) + dseek(offset, os.SEEK_SET, self.fd, self.fh) while range_size: started_reading = time.monotonic() wanted = min(range_size, self.read_size) if is_data: # read block from the range - data = dread(offset, wanted, fd, fh) + data = dread(offset, wanted, self.fd, self.fh) got = len(data) if zeros.startswith(data): data = None @@ -257,20 +259,164 @@ class FileReader: allocation = CH_DATA else: # hole # seek over block from the range - pos = dseek(wanted, os.SEEK_CUR, fd, fh) + pos = dseek(wanted, os.SEEK_CUR, self.fd, self.fh) got = pos - offset data = None allocation = CH_HOLE + self.reading_time += time.monotonic() - started_reading if got > 0: offset += got range_size -= got - self.reading_time += time.monotonic() - started_reading yield Chunk(data, size=got, allocation=allocation) if got < wanted: # we did not get enough data, looks like EOF. return +class FileReader: + """ + This is a buffered reader for file data. + + It maintains a buffer that is filled by using FileFMAPReader.blockify generator when needed. + The data in that buffer is consumed by clients calling FileReader.read. + """ + def __init__(self, *, fd=None, fh=-1, read_size=0, header_size=0, sparse=False, fmap=None): + self.reader = FileFMAPReader(fd=fd, fh=fh, read_size=read_size, header_size=header_size, sparse=sparse, fmap=fmap) + self.buffer = [] # list of (data, meta) tuples + self.offset = 0 # offset into the first buffer object's data + self.remaining_bytes = 0 # total bytes available in buffer + self.blockify_gen = None # generator from FileFMAPReader.blockify + self.fd = fd + self.fh = fh + self.fmap = fmap + + def _fill_buffer(self): + """ + Fill the buffer with more data from the blockify generator. + Returns True if more data was added, False if EOF. + """ + if self.blockify_gen is None: + return False + + try: + chunk = next(self.blockify_gen) + # Store both data and metadata in the buffer + self.buffer.append((chunk.data, chunk.meta)) + self.remaining_bytes += chunk.meta["size"] + return True + except StopIteration: + self.blockify_gen = None + return False + + def read(self, size, return_chunk_info=False): + """ + Read up to 'size' bytes from the file. + + :param size: Number of bytes to read + :param return_chunk_info: if True, return a tuple (data, allocation, size) instead of just data + :return: Bytes object containing the read data, or None if no data is available. + If return_chunk_info is True, returns a tuple (data, allocation, size). + """ + # Initialize if not already done + if self.blockify_gen is None: + self.buffer = [] + self.offset = 0 + self.remaining_bytes = 0 + self.blockify_gen = self.reader.blockify() + + # If we don't have enough data in the buffer, try to fill it + while self.remaining_bytes < size: + if not self._fill_buffer(): + # No more data available, return what we have + break + + # If we have no data at all, return None + if not self.buffer: + return None if not return_chunk_info else (None, None, 0) + + # Get the first chunk from the buffer + data, meta = self.buffer[0] + chunk_size = meta["size"] + allocation = meta["allocation"] + + # If we're returning chunk info and this is a non-data chunk, handle it specially + if return_chunk_info and (allocation != CH_DATA or data is None): + # For non-data chunks, we return the allocation type and size + size_to_return = min(size, chunk_size - self.offset) + + # Update buffer state + if size_to_return == chunk_size - self.offset: + self.buffer.pop(0) + self.offset = 0 + else: + self.offset += size_to_return + + self.remaining_bytes -= size_to_return + + return (None, allocation, size_to_return) + + # For data chunks or when not returning chunk info, proceed as before + # Prepare to collect the requested data + result = bytearray() + bytes_to_read = min(size, self.remaining_bytes) + bytes_read = 0 + + # Read data from the buffer + while bytes_read < bytes_to_read and self.buffer: + data, meta = self.buffer[0] + chunk_size = meta["size"] + allocation = meta["allocation"] + + # Skip non-data chunks if not returning chunk info + if (allocation != CH_DATA or data is None) and not return_chunk_info: + self.buffer.pop(0) + self.remaining_bytes -= chunk_size + continue + + # If this is a non-data chunk and we're returning chunk info, break to handle it + if (allocation != CH_DATA or data is None) and return_chunk_info: + if bytes_read > 0: + # We've already read some data, so return that first + break + else: + # No data read yet, return info about this non-data chunk + size_to_return = min(size, chunk_size - self.offset) + + # Update buffer state + if size_to_return == chunk_size - self.offset: + self.buffer.pop(0) + self.offset = 0 + else: + self.offset += size_to_return + + self.remaining_bytes -= size_to_return + + return (None, allocation, size_to_return) + + # Calculate how much we can read from this chunk + available = chunk_size - self.offset + to_read = min(available, bytes_to_read - bytes_read) + + # Read the data + if to_read > 0: + result.extend(data[self.offset:self.offset + to_read]) + bytes_read += to_read + + # Update offset or remove chunk if fully consumed + if to_read < available: + self.offset += to_read + else: + self.offset = 0 + self.buffer.pop(0) + + self.remaining_bytes -= to_read + + if return_chunk_info: + return (bytes(result) if result else None, CH_DATA, bytes_read) + else: + return bytes(result) if result else None + + class ChunkerFixed: """ This is a simple chunker for input data with data usually staying at same @@ -297,7 +443,8 @@ class ChunkerFixed: self.chunking_time = 0.0 # likely will stay close to zero - not much to do here. self.reader_block_size = self.block_size # start simple assert self.reader_block_size % self.block_size == 0, "reader_block_size must be N * block_size" - self.reader = FileReader(self.reader_block_size, header_size=self.header_size, sparse=sparse) + self.reader = None + self.sparse = sparse def chunkify(self, fd=None, fh=-1, fmap=None): """ @@ -308,35 +455,39 @@ class ChunkerFixed: defaults to -1 which means not to use OS-level fd. :param fmap: a file map, same format as generated by sparsemap """ - in_header = self.header_size > 0 # first block is header, if header size is given - for block in self.reader.blockify(fd, fh, fmap): - if in_header: - assert self.header_size == block.meta["size"] - yield block # just pass through the header block we get from the reader - in_header = False - continue - # not much to do in here - if self.reader_block_size == self.block_size: - # trivial, the reader already did all the work - yield block # just pass through, avoid creating new objects - else: - # reader block size is a multiple of our block size - read_size = block.meta["size"] - allocation = block.meta["allocation"] - start = 0 - while read_size: - started_chunking = time.monotonic() - size = min(read_size, self.block_size) - if allocation == CH_DATA: - data = block.data[start:start+size] # TODO memoryview? - elif allocation in (CH_ALLOC, CH_HOLE): - data = None - else: - raise ValueError("unsupported allocation") - self.chunking_time += time.monotonic() - started_chunking - yield Chunk(data, size=size, allocation=allocation) - start += size - read_size -= size + # Initialize the reader with the file descriptors + self.reader = FileReader(fd=fd, fh=fh, read_size=self.reader_block_size, + header_size=self.header_size, sparse=self.sparse, fmap=fmap) + + # Handle header if present + if self.header_size > 0: + # Read the header block using read + started_chunking = time.monotonic() + header_info = self.reader.read(self.header_size, return_chunk_info=True) + self.chunking_time += time.monotonic() - started_chunking + + if header_info is not None and header_info[2] > 0: + # Unpack the header info + data, allocation, size = header_info + assert self.header_size == size + # Yield the header chunk + yield Chunk(data, size=size, allocation=allocation) + + # Process the rest of the file using read + while True: + started_chunking = time.monotonic() + chunk_info = self.reader.read(self.block_size, return_chunk_info=True) + self.chunking_time += time.monotonic() - started_chunking + + if chunk_info is None or chunk_info[2] == 0: + # End of file + break + + # Unpack the chunk info + data, allocation, size = chunk_info + + # Yield the chunk with the appropriate allocation type + yield Chunk(data, size=size, allocation=allocation) # Cyclic polynomial / buzhash