diff --git a/src/borg/archive.py b/src/borg/archive.py index c2c929443..7f628adb9 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2012,18 +2012,16 @@ class ArchiveChecker: # either we already have this chunk in repo and chunks index or we add it now if id_ not in self.chunks: assert cdata is not None - pack_id = id_ # only correct for N=1 self.chunks[id_] = ChunkIndexEntry( flags=ChunkIndex.F_USED, size=size, - pack_id=pack_id, + pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32, ) if self.repair: pack_results = self.repository.put(id_, cdata) - for chunk_id, pack_id, obj_offset, obj_size in pack_results: - self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) + self.chunks.update_pack_info(pack_results) def verify_file_chunks(archive_name, item): """Verifies that all file chunks are present. Missing file chunks will be logged.""" diff --git a/src/borg/cache.py b/src/borg/cache.py index 014177bca..b52fe31cb 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -733,10 +733,7 @@ class ChunksMixin: pack_results = self.repository.put(id, cdata, wait=wait) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) - # pack_results is non-empty when the PackWriter flushed (always at max_count=1). - # Update the index with real pack location so obj_size is not UNKNOWN_INT32. - for chunk_id, pack_id, obj_offset, obj_size in pack_results: - self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) + self.chunks.update_pack_info(pack_results) stats.update(size, not exists) return ChunkListEntry(id, size) @@ -834,6 +831,11 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") + # Flush any chunks still buffered in the pack writer and update the index + # so the last batch gets real pack location values instead of UNKNOWN_*. + if self._chunks is not None: + pack_results = self.repository.flush() + self._chunks.update_pack_info(pack_results) if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) diff --git a/src/borg/constants.py b/src/borg/constants.py index 28e9010c8..262ac4d27 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -55,6 +55,9 @@ MAX_DATA_SIZE = 20971479 # Grep for UNKNOWN_INT32 to find every site that still needs updating. UNKNOWN_INT32 = 0xFFFFFFFF +# Placeholder for pack_id (32-byte field) when the value is not yet known. +UNKNOWN_BYTES32 = b"\xff" * 32 + # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index 359db0c0e..a947a3d54 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -21,7 +21,7 @@ class ChunkIndex: M_USER: int M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... - def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ... + def update_pack_info(self, pack_results: list | None) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 261ddc262..cc280528e 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -5,7 +5,7 @@ import struct from borghash import HashTableNT -from .constants import UNKNOWN_INT32 +from .constants import UNKNOWN_INT32, UNKNOWN_BYTES32 @@ -81,9 +81,8 @@ class ChunkIndex(HTProxyMixin, MutableMapping): else: flags = v.flags | self.F_USED assert v.size == 0 or v.size == size - pack_id = key # N=1: chunk_id == pack_id self[key] = ChunkIndexEntry( - flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) def __getitem__(self, key): @@ -109,10 +108,13 @@ class ChunkIndex(HTProxyMixin, MutableMapping): user_flags = value.flags & self.M_USER self.ht[key] = value._replace(flags=system_flags | user_flags) - def update_pack_info(self, key, pack_id, obj_offset, obj_size): - """Update the on-disk location fields for an existing entry without changing flags or size.""" - existing = self[key] - self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + def update_pack_info(self, pack_results): + """Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples.""" + if not pack_results: + return + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + existing = self[chunk_id] + self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) def clear_new(self): """Clears the F_NEW flag of all items.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index 2fe83e80f..63001124d 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -118,21 +118,21 @@ class PackWriter: self._pieces = [] # list of (chunk_id, cdata) def add(self, chunk_id, cdata): - """Buffer a chunk. Returns flush results if the pack is now full, else [].""" + """Buffer a chunk. Returns flush results if the pack is now full, else None.""" self._pieces.append((chunk_id, cdata)) if len(self._pieces) >= self.max_count: return self.flush() - return [] + return None def flush(self): """Write the current pack to the store. Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — - one entry per chunk that was written. Returns [] if there was nothing + one entry per chunk that was written. Returns None if there was nothing to flush. """ if not self._pieces: - return [] + return None # Build the pack bytes once by joining all pieces (avoids O(n^2) copies # that incremental string concatenation would cause in Python). @@ -143,7 +143,7 @@ class PackWriter: # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - if len(self._pieces) == 1: + if self.max_count == 1: pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: pack_id = sha256(pack_data).digest() # N>1: content-addressed @@ -409,7 +409,20 @@ class Repository: self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None). + + Callers that maintain a ChunkIndex must call this and pass the result to + chunks.update_pack_info() before closing, so index entries for the last + batch of chunks get real pack location values instead of UNKNOWN_*. + """ + if self._pack_writer is not None: + return self._pack_writer.flush() + return None + def close(self): + if self._pack_writer is not None: + assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" if self.lock: self.lock.release() self.lock = None diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index 6ba88d53a..b503a7c61 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -3,7 +3,7 @@ import struct import pytest -from ..constants import UNKNOWN_INT32 +from ..constants import UNKNOWN_INT32, UNKNOWN_BYTES32 from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -22,20 +22,41 @@ def test_chunkindex_add(): x = H2(1) chunks.add(x, 0) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) # updating size (we do not have a size yet) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) +def test_chunkindex_update_pack_info(): + chunks = ChunkIndex() + x1, x2 = H2(1), H2(2) + chunks.add(x1, 10) + chunks.add(x2, 20) + assert chunks[x1].obj_offset == UNKNOWN_INT32 + assert chunks[x2].obj_offset == UNKNOWN_INT32 + + pack_id = H2(3) + # Both chunks land in the same pack (N>1 scenario): batch update in one call. + chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)]) + # Location fields updated; flags and size must be unchanged. + assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50) + assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60) + + # None and empty list are both no-ops. + chunks.update_pack_info(None) + chunks.update_pack_info([]) + assert chunks[x1].obj_offset == 0 + + def test_keyerror(): chunks = ChunkIndex() x = H2(1) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 2da91e5c5..d4ea660c8 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,10 +1,11 @@ import os import sys +from hashlib import sha256 import pytest from ..constants import ROBJ_FILE_STREAM from ..helpers import IntegrityError, Location -from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command +from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from ..crypto.key import PlaintextKey from .hashindex_test import H @@ -200,3 +201,67 @@ class TestCacheIfRemote: def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs): with pytest.raises(ValueError): cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data) + + +class MockStore: + def __init__(self): + self.stored = {} + + def store(self, key, data): + self.stored[key] = data + + +def test_pack_writer_returns_none_when_not_full(): + pw = PackWriter(MockStore(), max_count=2) + assert pw.add(b"a" * 32, b"data") is None + + +def test_pack_writer_flush_returns_none_when_empty(): + pw = PackWriter(MockStore(), max_count=1) + assert pw.flush() is None + + +def test_pack_writer_n1_flush(): + store = MockStore() + chunk_id = b"c" * 32 + cdata = b"payload" + pw = PackWriter(store, max_count=1) + results = pw.add(chunk_id, cdata) + assert results is not None + assert len(results) == 1 + stored_id, pack_id, obj_offset, obj_size = results[0] + assert stored_id == chunk_id + assert pack_id == chunk_id # N=1: pack_id == chunk_id + assert obj_offset == 0 + assert obj_size == len(cdata) + + +def test_pack_writer_n2_flush(): + store = MockStore() + id1, id2 = b"a" * 32, b"b" * 32 + data1, data2 = b"first", b"second" + pw = PackWriter(store, max_count=2) + assert pw.add(id1, data1) is None + results = pw.add(id2, data2) + assert results is not None + assert len(results) == 2 + pack_data = data1 + data2 + expected_pack_id = sha256(pack_data).digest() + assert results[0] == (id1, expected_pack_id, 0, len(data1)) + assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) + + +def test_pack_writer_final_partial_pack_uses_sha256(): + # When max_count > 1, a final flush with only 1 piece must still use SHA256, + # not the N=1 pack_id == chunk_id hack. + store = MockStore() + chunk_id = b"d" * 32 + cdata = b"solo" + pw = PackWriter(store, max_count=3) + assert pw.add(chunk_id, cdata) is None + results = pw.flush() + assert results is not None + assert len(results) == 1 + _, pack_id, _, _ = results[0] + assert pack_id == sha256(cdata).digest() + assert pack_id != chunk_id