diff --git a/src/borg/archive.py b/src/borg/archive.py index ee156c412..7f628adb9 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2012,12 +2012,16 @@ class ArchiveChecker: # either we already have this chunk in repo and chunks index or we add it now if id_ not in self.chunks: assert cdata is not None - pack_id = id_ # only correct for N=1 self.chunks[id_] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=size, pack_id=pack_id, obj_offset=0, obj_size=0 + flags=ChunkIndex.F_USED, + size=size, + pack_id=UNKNOWN_BYTES32, + obj_offset=UNKNOWN_INT32, + obj_size=UNKNOWN_INT32, ) if self.repair: - self.repository.put(id_, cdata) + pack_results = self.repository.put(id_, cdata) + self.chunks.update_pack_info(pack_results) def verify_file_chunks(archive_name, item): """Verifies that all file chunks are present. Missing file chunks will be logged.""" diff --git a/src/borg/cache.py b/src/borg/cache.py index 7ef37c89f..b52fe31cb 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -730,9 +730,10 @@ class ChunksMixin: cdata = self.repo_objs.format( id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type ) - self.repository.put(id, cdata, wait=wait) + pack_results = self.repository.put(id, cdata, wait=wait) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) + self.chunks.update_pack_info(pack_results) stats.update(size, not exists) return ChunkListEntry(id, size) @@ -830,6 +831,11 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") + # Flush any chunks still buffered in the pack writer and update the index + # so the last batch gets real pack location values instead of UNKNOWN_*. + if self._chunks is not None: + pack_results = self.repository.flush() + self._chunks.update_pack_info(pack_results) if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) diff --git a/src/borg/constants.py b/src/borg/constants.py index b62af7bf9..262ac4d27 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -51,6 +51,13 @@ ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type) # the header, and the total size was set to precisely 20 MiB for borg < 1.3). MAX_DATA_SIZE = 20971479 +# Placeholder for pack location fields (obj_offset, obj_size) when the value is not yet known. +# Grep for UNKNOWN_INT32 to find every site that still needs updating. +UNKNOWN_INT32 = 0xFFFFFFFF + +# Placeholder for pack_id (32-byte field) when the value is not yet known. +UNKNOWN_BYTES32 = b"\xff" * 32 + # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index a978c587a..a947a3d54 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -21,6 +21,7 @@ class ChunkIndex: M_USER: int M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... + def update_pack_info(self, pack_results: list | None) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index e4eb43274..cc280528e 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -5,6 +5,8 @@ import struct from borghash import HashTableNT +from .constants import UNKNOWN_INT32, UNKNOWN_BYTES32 + cdef _NoDefault = object() @@ -79,8 +81,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping): else: flags = v.flags | self.F_USED assert v.size == 0 or v.size == size - pack_id = key # N=1: chunk_id == pack_id - self[key] = ChunkIndexEntry(flags=flags, size=size, pack_id=pack_id, obj_offset=0, obj_size=0) + self[key] = ChunkIndexEntry( + flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) def __getitem__(self, key): """Specialized __getitem__ that hides system flags.""" @@ -105,6 +108,14 @@ class ChunkIndex(HTProxyMixin, MutableMapping): user_flags = value.flags & self.M_USER self.ht[key] = value._replace(flags=system_flags | user_flags) + def update_pack_info(self, pack_results): + """Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples.""" + if not pack_results: + return + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + existing = self[chunk_id] + self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + def clear_new(self): """Clears the F_NEW flag of all items.""" for key, value in self.ht.items(): diff --git a/src/borg/repository.py b/src/borg/repository.py index 879e5afde..63001124d 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -99,6 +99,71 @@ def build_rest_backend(location): return REST(base_url="http://stdio-backend", command=rest_serve_command(location)) +class PackWriter: + """Buffers chunks into a pack file and writes to the store when full. + + Collects (chunk_id, cdata) pairs in a list and flushes once max_count is + reached. On flush it returns the location info for every chunk so the + caller can update the ChunkIndex with real values. + + At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, + so pack_id == chunk_id and the naming scheme is unchanged from before. + Raising max_count later (N>1 phase) enables real packing without touching + this class's interface. + """ + + def __init__(self, store, *, max_count=1): + self.store = store + self.max_count = max_count + self._pieces = [] # list of (chunk_id, cdata) + + def add(self, chunk_id, cdata): + """Buffer a chunk. Returns flush results if the pack is now full, else None.""" + self._pieces.append((chunk_id, cdata)) + if len(self._pieces) >= self.max_count: + return self.flush() + return None + + def flush(self): + """Write the current pack to the store. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + one entry per chunk that was written. Returns None if there was nothing + to flush. + """ + if not self._pieces: + return None + + # Build the pack bytes once by joining all pieces (avoids O(n^2) copies + # that incremental string concatenation would cause in Python). + pack_data = b"".join(cdata for _, cdata in self._pieces) + + # Determine pack_id. + # N=1: the pack contains exactly one chunk, so we keep pack_id == chunk_id + # (backward-compatible file naming: packs/{chunk_id_hex}). + # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the + # file is content-addressed and borgstore can verify/cache it. + if self.max_count == 1: + pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id + else: + pack_id = sha256(pack_data).digest() # N>1: content-addressed + + # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. + results = [] + offset = 0 + for chunk_id, cdata in self._pieces: + obj_size = len(cdata) + results.append((chunk_id, pack_id, offset, obj_size)) + offset += obj_size + + key = "packs/" + bin_to_hex(pack_id) + try: + self.store.store(key, pack_data) + finally: + self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + return results + + class Repository: """borgstore-based key/value store.""" @@ -219,6 +284,7 @@ class Repository: self.do_lock = lock self.lock_wait = lock_wait self.exclusive = exclusive + self._pack_writer = None def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -340,9 +406,23 @@ class Repository: # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() + self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None). + + Callers that maintain a ChunkIndex must call this and pass the result to + chunks.update_pack_info() before closing, so index entries for the last + batch of chunks get real pack location values instead of UNKNOWN_*. + """ + if self._pack_writer is not None: + return self._pack_writer.flush() + return None + def close(self): + if self._pack_writer is not None: + assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" if self.lock: self.lock.release() self.lock = None @@ -564,15 +644,17 @@ class Repository: Note: when doing calls with wait=False this gets async and caller must deal with async results / exceptions later. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for + every chunk written to disk this call. At max_count=1 this is always + one entry. Callers should pass these to ChunkIndex.update_pack_info() + so the index holds real location values rather than UNKNOWN_INT32 placeholders. """ self._lock_refresh() data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - - pack_id = id # N=1: pack_id == chunk_id - key = "packs/" + bin_to_hex(pack_id) - self.store.store(key, data) + return self._pack_writer.add(id, data) def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index 8c9ec5be9..b503a7c61 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -3,6 +3,7 @@ import struct import pytest +from ..constants import UNKNOWN_INT32, UNKNOWN_BYTES32 from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -20,15 +21,42 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) # updating size (we do not have a size yet) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) +def test_chunkindex_update_pack_info(): + chunks = ChunkIndex() + x1, x2 = H2(1), H2(2) + chunks.add(x1, 10) + chunks.add(x2, 20) + assert chunks[x1].obj_offset == UNKNOWN_INT32 + assert chunks[x2].obj_offset == UNKNOWN_INT32 + + pack_id = H2(3) + # Both chunks land in the same pack (N>1 scenario): batch update in one call. + chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)]) + # Location fields updated; flags and size must be unchanged. + assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50) + assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60) + + # None and empty list are both no-ops. + chunks.update_pack_info(None) + chunks.update_pack_info([]) + assert chunks[x1].obj_offset == 0 + + def test_keyerror(): chunks = ChunkIndex() x = H2(1) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 2da91e5c5..d4ea660c8 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,10 +1,11 @@ import os import sys +from hashlib import sha256 import pytest from ..constants import ROBJ_FILE_STREAM from ..helpers import IntegrityError, Location -from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command +from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from ..crypto.key import PlaintextKey from .hashindex_test import H @@ -200,3 +201,67 @@ class TestCacheIfRemote: def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs): with pytest.raises(ValueError): cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data) + + +class MockStore: + def __init__(self): + self.stored = {} + + def store(self, key, data): + self.stored[key] = data + + +def test_pack_writer_returns_none_when_not_full(): + pw = PackWriter(MockStore(), max_count=2) + assert pw.add(b"a" * 32, b"data") is None + + +def test_pack_writer_flush_returns_none_when_empty(): + pw = PackWriter(MockStore(), max_count=1) + assert pw.flush() is None + + +def test_pack_writer_n1_flush(): + store = MockStore() + chunk_id = b"c" * 32 + cdata = b"payload" + pw = PackWriter(store, max_count=1) + results = pw.add(chunk_id, cdata) + assert results is not None + assert len(results) == 1 + stored_id, pack_id, obj_offset, obj_size = results[0] + assert stored_id == chunk_id + assert pack_id == chunk_id # N=1: pack_id == chunk_id + assert obj_offset == 0 + assert obj_size == len(cdata) + + +def test_pack_writer_n2_flush(): + store = MockStore() + id1, id2 = b"a" * 32, b"b" * 32 + data1, data2 = b"first", b"second" + pw = PackWriter(store, max_count=2) + assert pw.add(id1, data1) is None + results = pw.add(id2, data2) + assert results is not None + assert len(results) == 2 + pack_data = data1 + data2 + expected_pack_id = sha256(pack_data).digest() + assert results[0] == (id1, expected_pack_id, 0, len(data1)) + assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) + + +def test_pack_writer_final_partial_pack_uses_sha256(): + # When max_count > 1, a final flush with only 1 piece must still use SHA256, + # not the N=1 pack_id == chunk_id hack. + store = MockStore() + chunk_id = b"d" * 32 + cdata = b"solo" + pw = PackWriter(store, max_count=3) + assert pw.add(chunk_id, cdata) is None + results = pw.flush() + assert results is not None + assert len(results) == 1 + _, pack_id, _, _ = results[0] + assert pack_id == sha256(cdata).digest() + assert pack_id != chunk_id