From 2553a705ccc62ed00fc74196afe32239fddf858a Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 10:43:45 +0530 Subject: [PATCH 01/16] repository: add obj_offset/obj_size range-load params to Store.load calls retry_size min() guards against corrupted meta_size, no-op for healthy objects. --- src/borg/repository.py | 16 ++++++++++++---- src/borg/testsuite/repository_test.py | 14 +++++++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index c8ae6c1a4..f8b0ffee5 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -603,20 +603,23 @@ class Repository: # note: do not collect the marker id return result - def get(self, id, read_data=True, raise_missing=True): + def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: if read_data: - return self.store.load(key) + return self.store.load(key, offset=obj_offset, size=obj_size) else: # RepoObj layout supports separately encrypted metadata and data. # We return enough bytes so the client can decrypt the metadata. hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips - obj = self.store.load(key, size=hdr_size + extra_size) + load_size = hdr_size + extra_size + if obj_size is not None: + load_size = min(load_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] if len(hdr) != hdr_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {hdr_size}, got {len(hdr)} bytes") @@ -624,7 +627,12 @@ class Repository: if meta_size > extra_size: # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. - obj = self.store.load(key, size=hdr_size + meta_size) + retry_size = hdr_size + meta_size + if obj_size is not None: + # normally a no-op (meta_size <= obj_size - hdr_size for a healthy object); + # guards against a corrupted meta_size producing an oversize read. + retry_size = min(retry_size, obj_size) + obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] if len(meta) != meta_size: raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes") diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 86fe55a23..04de1f37d 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -3,7 +3,7 @@ import sys from hashlib import sha256 import pytest -from ..helpers import IntegrityError, Location +from ..helpers import IntegrityError, Location, bin_to_hex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -198,6 +198,18 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) +def test_get_with_range(tmp_path): + # get() passes obj_offset/obj_size through to store.load() for range reads. + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(42) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + assert repository.get(pack_id, obj_offset=0, obj_size=len(chunk1)) == chunk1 + assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 68491c440989869ef75bfc3e4df2855f2cc91d64 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 10:44:51 +0530 Subject: [PATCH 02/16] repository: add set_chunk_index() for ChunkIndex-based pack routing, refs #8572 Replace _pack_info (session-scoped dict) with a borrowed ChunkIndex reference. Cache passes its index via set_chunk_index(); get() routes correctly for all sessions. --- src/borg/cache.py | 2 ++ src/borg/repository.py | 22 +++++++++--- src/borg/testsuite/repository_test.py | 48 +++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 992fda667..8399ab287 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -679,6 +679,7 @@ class ChunksMixin: def chunks(self): if self._chunks is None: self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) + self.repository.set_chunk_index(self._chunks) return self._chunks def seen_chunk(self, id, size=None): @@ -869,6 +870,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def wipe_cache(self): logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() + self.repository.set_chunk_index(self._chunks) self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") diff --git a/src/borg/repository.py b/src/borg/repository.py index f8b0ffee5..403234efa 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -285,6 +285,7 @@ class Repository: self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None + self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index() def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -407,15 +408,19 @@ class Repository: if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._pack_writer = PackWriter(self.store, max_count=1) + self._chunks = None self.opened = True - def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None). + def set_chunk_index(self, chunks): + """Set the ChunkIndex get() uses to resolve pack locations. - Callers that maintain a ChunkIndex must call this and pass the result to - chunks.update_pack_info() before closing, so index entries for the last - batch of chunks get real pack location values instead of UNKNOWN_*. + The caller retains ownership; Repository holds a borrowed reference. + Pass None to clear. """ + self._chunks = chunks + + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None).""" if self._pack_writer is not None: return self._pack_writer.flush() return None @@ -429,6 +434,7 @@ class Repository: if self.store_opened: self.store.close() self.store_opened = False + self._chunks = None self.opened = False def info(self): @@ -606,6 +612,10 @@ class Repository: def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id + if self._chunks is not None: + entry = self._chunks.get(id) + if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -644,6 +654,8 @@ class Repository: return None def get_many(self, ids, read_data=True, raise_missing=True): + # N>1: set_chunk_index() must be called before any get() so locations from prior sessions + # are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id. for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 04de1f37d..5852f3fd8 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -4,6 +4,7 @@ from hashlib import sha256 import pytest from ..helpers import IntegrityError, Location, bin_to_hex +from ..hashindex import ChunkIndex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -210,6 +211,53 @@ def test_get_with_range(tmp_path): assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 +def test_get_read_data_false_with_range(tmp_path): + # read_data=False with obj_size limits the load to the object boundary. + hdr_size = RepoObj.obj_header.size + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(43) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1)) + assert result == chunk1[:hdr_size] # empty meta, so header only + result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2)) + assert result2 == chunk2[:hdr_size] + + +def test_get_read_data_false_large_meta(tmp_path): + # When meta_size > extra_size (975 bytes), get() retries with a larger load. + hdr_size = RepoObj.obj_header.size + big_meta = b"M" * 1000 # 1000 > 975, forces the retry load + chunk = fchunk(b"DATA", meta=big_meta) + pack_id = H(44) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), chunk) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk)) + assert result == chunk[: hdr_size + len(big_meta)] + + +def test_get_uses_chunk_index_location(tmp_path): + # get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index(). + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(55) + id1, id2 = H(56), H(57) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + # Inject the pack directly; bypasses PackWriter to test routing independently. + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1) == chunk1 + assert repository.get(id2) == chunk2 + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 54adb9b7f66aa96e0021dea67fa12a1cc0f7ee37 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 22:12:01 +0530 Subject: [PATCH 03/16] repository: resolve pack location from ChunkIndex in get(), flush handles update_pack_info Remove obj_offset/obj_size params from get(); always initialize _chunks to an empty ChunkIndex so callers never need to guard for None. --- src/borg/cache.py | 5 +--- src/borg/repository.py | 30 +++++++++++------------ src/borg/testsuite/repository_test.py | 34 +++++++++++++-------------- 3 files changed, 31 insertions(+), 38 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 8399ab287..1a95d0408 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -832,11 +832,8 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def close(self): self.security_manager.save(self.manifest, self.key) pi = ProgressIndicatorMessage(msgid="cache.close") - # Flush any chunks still buffered in the pack writer and update the index - # so the last batch gets real pack location values instead of UNKNOWN_*. if self._chunks is not None: - pack_results = self.repository.flush() - self._chunks.update_pack_info(pack_results) + self.repository.flush() if self._files is not None: pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) diff --git a/src/borg/repository.py b/src/borg/repository.py index 403234efa..1a7690ad6 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -408,22 +408,23 @@ class Repository: if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._pack_writer = PackWriter(self.store, max_count=1) - self._chunks = None + self._chunks = ChunkIndex() self.opened = True def set_chunk_index(self, chunks): """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. - Pass None to clear. + Pass None to reset to an empty index. """ - self._chunks = chunks + self._chunks = chunks if chunks is not None else ChunkIndex() def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None).""" + """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: - return self._pack_writer.flush() - return None + pack_results = self._pack_writer.flush() + if pack_results: + self._chunks.update_pack_info(pack_results) def close(self): if self._pack_writer is not None: @@ -609,13 +610,13 @@ class Repository: # note: do not collect the marker id return result - def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): + def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - pack_id = id # N=1: pack_id == chunk_id - if self._chunks is not None: - entry = self._chunks.get(id) - if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + pack_id = id # N=1 fallback: pack_id == chunk_id + obj_offset, obj_size = 0, None + entry = self._chunks.get(id) + if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -654,8 +655,6 @@ class Repository: return None def get_many(self, ids, read_data=True, raise_missing=True): - # N>1: set_chunk_index() must be called before any get() so locations from prior sessions - # are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id. for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) @@ -667,8 +666,7 @@ class Repository: Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for every chunk written to disk this call. At max_count=1 this is always - one entry. Callers should pass these to ChunkIndex.update_pack_info() - so the index holds real location values rather than UNKNOWN_INT32 placeholders. + one entry. """ self._lock_refresh() data_size = len(data) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 5852f3fd8..04ba25858 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -199,31 +199,24 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) -def test_get_with_range(tmp_path): - # get() passes obj_offset/obj_size through to store.load() for range reads. - chunk1 = fchunk(b"FIRST") - chunk2 = fchunk(b"SECOND") - pack = chunk1 + chunk2 - pack_id = H(42) - with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: - repository.store_store("packs/" + bin_to_hex(pack_id), pack) - assert repository.get(pack_id, obj_offset=0, obj_size=len(chunk1)) == chunk1 - assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 - - def test_get_read_data_false_with_range(tmp_path): - # read_data=False with obj_size limits the load to the object boundary. + # read_data=False with ChunkIndex entries limits the load to each object's boundary. hdr_size = RepoObj.obj_header.size chunk1 = fchunk(b"FIRST") chunk2 = fchunk(b"SECOND") pack = chunk1 + chunk2 pack_id = H(43) + id1, id2 = H(47), H(48) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.store_store("packs/" + bin_to_hex(pack_id), pack) - result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1)) - assert result == chunk1[:hdr_size] # empty meta, so header only - result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2)) - assert result2 == chunk2[:hdr_size] + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1, read_data=False) == chunk1[:hdr_size] + assert repository.get(id2, read_data=False) == chunk2[:hdr_size] def test_get_read_data_false_large_meta(tmp_path): @@ -232,9 +225,14 @@ def test_get_read_data_false_large_meta(tmp_path): big_meta = b"M" * 1000 # 1000 > 975, forces the retry load chunk = fchunk(b"DATA", meta=big_meta) pack_id = H(44) + chunk_id = H(49) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.store_store("packs/" + bin_to_hex(pack_id), chunk) - result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk)) + chunks = ChunkIndex() + chunks.add(chunk_id, len(chunk)) + chunks.update_pack_info([(chunk_id, pack_id, 0, len(chunk))]) + repository.set_chunk_index(chunks) + result = repository.get(chunk_id, read_data=False) assert result == chunk[: hdr_size + len(big_meta)] From 1df2065f856377eea2fcf34f1be4cc063d3fb0d6 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Wed, 10 Jun 2026 23:01:29 +0530 Subject: [PATCH 04/16] repository: remove N=1 fallback from get(), update _chunks eagerly in put() get() raises ObjectNotFound when entry is missing or UNKNOWN_BYTES32; put() marks the id in _chunks immediately so the index is live after each write. --- src/borg/repository.py | 19 +++++++++++-------- src/borg/testsuite/repository_test.py | 20 +++++++++++++++++++- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index 1a7690ad6..2fb2a5aad 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -415,9 +415,8 @@ class Repository: """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. - Pass None to reset to an empty index. """ - self._chunks = chunks if chunks is not None else ChunkIndex() + self._chunks = chunks def flush(self): """Flush any buffered pack writer chunks.""" @@ -435,7 +434,6 @@ class Repository: if self.store_opened: self.store.close() self.store_opened = False - self._chunks = None self.opened = False def info(self): @@ -612,11 +610,12 @@ class Repository: def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - pack_id = id # N=1 fallback: pack_id == chunk_id - obj_offset, obj_size = 0, None entry = self._chunks.get(id) - if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + if entry is None or entry.pack_id == UNKNOWN_BYTES32: + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -672,7 +671,11 @@ class Repository: data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - return self._pack_writer.add(id, data) + pack_results = self._pack_writer.add(id, data) + self._chunks.add(id, 0) # mark seen; uncompressed size filled in by cache layer + if pack_results: + self._chunks.update_pack_info(pack_results) + return pack_results def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 04ba25858..bba7faea4 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -77,15 +77,21 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): + chunks = ChunkIndex() with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - repository.put(H(x), fchunk(b"SOMEDATA")) + pack_results = repository.put(H(x), fchunk(b"SOMEDATA")) + if pack_results: + for chunk_id, *_ in pack_results: + chunks.add(chunk_id, 0) + chunks.update_pack_info(pack_results) key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) with reopen(repository) as repository: + repository.set_chunk_index(chunks) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) for x in range(100): @@ -256,6 +262,18 @@ def test_get_uses_chunk_index_location(tmp_path): assert repository.get(id2) == chunk2 +def test_put_marks_id_in_chunk_index(tmp_path): + # put() immediately updates _chunks: add() marks the id as seen, then update_pack_info + # fills in the real pack location for the current session. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + id1 = H(1) + repository.put(id1, fchunk(b"ZEROS")) + entry = repository._chunks.get(id1) + assert entry is not None + assert entry.pack_id == id1 # N=1: pack_id == chunk_id, set by update_pack_info in put() + assert entry.size == 0 # uncompressed size filled in by cache layer + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack. From 67b11f78264a36c315067c1973395a79edccd427 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 10:10:11 +0530 Subject: [PATCH 05/16] repository: PackWriter now manages ChunkIndex updates internally On add(), marks chunk with UNKNOWN_BYTES32; on flush(), replaces with real pack_id. put(), flush(), and set_chunk_index() simplified accordingly. --- src/borg/repository.py | 51 ++++++++++++++++++--------- src/borg/testsuite/repository_test.py | 8 ++--- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index 2fb2a5aad..39017847d 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -103,8 +103,10 @@ class PackWriter: """Buffers chunks into a pack file and writes to the store when full. Collects (chunk_id, cdata) pairs in a list and flushes once max_count is - reached. On flush it returns the location info for every chunk so the - caller can update the ChunkIndex with real values. + reached. If a ChunkIndex is provided via the chunks parameter, PackWriter + maintains it directly: each add() marks the chunk as pending + (pack_id=UNKNOWN_BYTES32) with the correct offset and size; flush() then + updates all pending entries with the real pack_id once the pack is on disk. At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, so pack_id == chunk_id and the naming scheme is unchanged from before. @@ -112,14 +114,21 @@ class PackWriter: this class's interface. """ - def __init__(self, store, *, max_count=1): + def __init__(self, store, *, max_count=1, chunks=None): self.store = store self.max_count = max_count + self.chunks = chunks # ChunkIndex reference; may be None (e.g. in unit tests) self._pieces = [] # list of (chunk_id, cdata) + self._current_offset = 0 # byte offset of the next chunk within the current pack def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" + if self.chunks is not None: + # Mark chunk as pending: real offset/size are known now; pack_id not yet. + self.chunks.add(chunk_id, 0) # size filled in by cache layer + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) self._pieces.append((chunk_id, cdata)) + self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: return self.flush() return None @@ -127,9 +136,9 @@ class PackWriter: def flush(self): """Write the current pack to the store. - Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples -- one entry per chunk that was written. Returns None if there was nothing - to flush. + to flush. Also updates the ChunkIndex (if set) with the real pack_id. """ if not self._pieces: return None @@ -161,6 +170,9 @@ class PackWriter: self.store.store(key, pack_data) finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + self._current_offset = 0 + if self.chunks is not None: + self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -407,8 +419,8 @@ class Repository: # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() - self._pack_writer = PackWriter(self.store, max_count=1) self._chunks = ChunkIndex() + self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) self.opened = True def set_chunk_index(self, chunks): @@ -417,13 +429,12 @@ class Repository: The caller retains ownership; Repository holds a borrowed reference. """ self._chunks = chunks + self._pack_writer.chunks = chunks # keep PackWriter in sync def flush(self): """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: - pack_results = self._pack_writer.flush() - if pack_results: - self._chunks.update_pack_info(pack_results) + self._pack_writer.flush() # PackWriter updates _chunks internally def close(self): if self._pack_writer is not None: @@ -611,11 +622,21 @@ class Repository: def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() entry = self._chunks.get(id) - if entry is None or entry.pack_id == UNKNOWN_BYTES32: + if entry is not None and entry.pack_id == UNKNOWN_BYTES32: + # chunk is buffered in PackWriter, not yet written to a pack file if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + if entry is not None: + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size + else: + # N=1 fallback for cross-session reads without a populated index. + # obj_size=None tells store.load() to fetch the full file; safe for N=1 + # because each pack holds exactly one chunk. Once a Repository.chunks + # lazy-build property lands, this branch will be unreachable. + pack_id = id # N=1: pack_id == chunk_id + obj_offset = 0 + obj_size = None id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -627,6 +648,8 @@ class Repository: hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips load_size = hdr_size + extra_size + # obj_size is None only via the N=1 fallback above; clamp when available + # so a corrupted or malicious obj_size cannot cause an oversized read. if obj_size is not None: load_size = min(load_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=load_size) @@ -671,11 +694,7 @@ class Repository: data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - pack_results = self._pack_writer.add(id, data) - self._chunks.add(id, 0) # mark seen; uncompressed size filled in by cache layer - if pack_results: - self._chunks.update_pack_info(pack_results) - return pack_results + return self._pack_writer.add(id, data) # PackWriter updates _chunks internally def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index bba7faea4..a0be42ece 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -77,19 +77,15 @@ def pdchunk(chunk): def test_basic_operations(repo_fixtures, request): - chunks = ChunkIndex() with get_repository_from_fixture(repo_fixtures, request) as repository: for x in range(100): - pack_results = repository.put(H(x), fchunk(b"SOMEDATA")) - if pack_results: - for chunk_id, *_ in pack_results: - chunks.add(chunk_id, 0) - chunks.update_pack_info(pack_results) + repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) + chunks = repository._chunks # capture index before close with reopen(repository) as repository: repository.set_chunk_index(chunks) with pytest.raises(Repository.ObjectNotFound): From 05ce183f71f479a67a24e8de9964cc95f39977d4 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 11:17:11 +0530 Subject: [PATCH 06/16] repository: add BORG_TESTONLY_SHA256_PACK_ID to force sha256 pack_ids at N=1 Adds tox env and informational CI job (continue-on-error) to track progress toward full sha256 pack_id adoption, refs #8572 --- .github/workflows/ci.yml | 51 ++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 5 ++++ src/borg/repository.py | 6 +++-- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 547955118..95288304b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -703,3 +703,54 @@ jobs: report_type: coverage env_vars: OS,python files: coverage.xml + + sha256_pack_id_tests: + name: sha256 pack-id (informational) + needs: [lint] + runs-on: ubuntu-24.04 + timeout-minutes: 90 + continue-on-error: true + + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + fetch-tags: true + + - name: Set up Python 3.12 + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Cache pip + uses: actions/cache@v5 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-${{ runner.arch }}-pip-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-pip- + + - name: Cache tox environments + uses: actions/cache@v5 + with: + path: .tox + key: ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt', 'pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id- + + - name: Install Linux packages + run: | + sudo apt-get update + sudo apt-get install -y pkg-config build-essential + sudo apt-get install -y libssl-dev libacl1-dev liblz4-dev + + - name: Install Python requirements + run: | + python -m pip install --upgrade pip setuptools wheel + pip install -r requirements.d/development.lock.txt + + - name: Install borgbackup + run: pip install -ve ".[cockpit,s3,sftp,rclone]" + + - name: Run tests with sha256 pack-ids + run: tox -e sha256-pack-id diff --git a/pyproject.toml b/pyproject.toml index 683a46372..c9e0ec8cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -257,6 +257,11 @@ extras = ["pyfuse3", "sftp", "s3", "rclone"] set_env = {BORG_FUSE_IMPL = "mfusepy"} extras = ["mfusepy", "sftp", "s3", "rclone"] +# Informational env: forces sha256 pack_ids even with max_count=1 to expose +# code that still assumes pack_id == chunk_id. Run: tox -e sha256-pack-id +[tool.tox.env.sha256-pack-id] +set_env = {BORG_TESTONLY_SHA256_PACK_ID = "1"} + [tool.tox.env.ruff] skip_install = true deps = ["ruff"] diff --git a/src/borg/repository.py b/src/borg/repository.py index 39017847d..da855387d 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -152,10 +152,12 @@ class PackWriter: # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - if self.max_count == 1: + # BORG_TESTONLY_SHA256_PACK_ID: always use sha256 even at N=1, exposing code + # that still assumes pack_id == chunk_id. + if self.max_count == 1 and os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") != "1": pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: - pack_id = sha256(pack_data).digest() # N>1: content-addressed + pack_id = sha256(pack_data).digest() # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. results = [] From 844daf9324ae41595bcb7e4db3352e70fe6d3751 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 12:51:57 +0530 Subject: [PATCH 07/16] repository: add set_chunk_index() and lazy chunks property; route get() through ChunkIndex PackWriter now always owns a ChunkIndex; the N=1 fallback in get() is removed. --- src/borg/repository.py | 72 ++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index da855387d..e31333cf4 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -117,16 +117,15 @@ class PackWriter: def __init__(self, store, *, max_count=1, chunks=None): self.store = store self.max_count = max_count - self.chunks = chunks # ChunkIndex reference; may be None (e.g. in unit tests) + self.chunks = chunks if chunks is not None else ChunkIndex() self._pieces = [] # list of (chunk_id, cdata) self._current_offset = 0 # byte offset of the next chunk within the current pack def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" - if self.chunks is not None: - # Mark chunk as pending: real offset/size are known now; pack_id not yet. - self.chunks.add(chunk_id, 0) # size filled in by cache layer - self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) + # Mark chunk as pending: real offset/size are known now; pack_id not yet. + self.chunks.add(chunk_id, 0) # size filled in by cache layer + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) self._pieces.append((chunk_id, cdata)) self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: @@ -138,7 +137,7 @@ class PackWriter: Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples -- one entry per chunk that was written. Returns None if there was nothing - to flush. Also updates the ChunkIndex (if set) with the real pack_id. + to flush. Always updates the ChunkIndex with the real pack_id. """ if not self._pieces: return None @@ -173,8 +172,7 @@ class PackWriter: finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk self._current_offset = 0 - if self.chunks is not None: - self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id + self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -299,7 +297,8 @@ class Repository: self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None - self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index() + self._chunks = None # ChunkIndex; set by open(), replaced by set_chunk_index() + self._chunks_initialized = False def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -422,6 +421,7 @@ class Repository: if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._chunks = ChunkIndex() + self._chunks_initialized = False self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) self.opened = True @@ -432,6 +432,27 @@ class Repository: """ self._chunks = chunks self._pack_writer.chunks = chunks # keep PackWriter in sync + self._chunks_initialized = True + + @property + def chunks(self): + """ChunkIndex mapping every known chunk id to its pack location. + + Built lazily on first access if set_chunk_index() has not been called. + Current-session put() entries (which carry the precise pack_id, offset, + and size set by PackWriter) are overlaid on top of the store-built + entries so they always win. + """ + if not self._chunks_initialized: + from .cache import build_chunkindex_from_repo + + built = build_chunkindex_from_repo(self) + for k, v in self._chunks.iteritems(): + built[k] = v + self._chunks = built + self._pack_writer.chunks = built + self._chunks_initialized = True + return self._chunks def flush(self): """Flush any buffered pack writer chunks.""" @@ -623,22 +644,17 @@ class Repository: def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() - entry = self._chunks.get(id) - if entry is not None and entry.pack_id == UNKNOWN_BYTES32: - # chunk is buffered in PackWriter, not yet written to a pack file + entry = self.chunks.get(id) + if entry is None: if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - if entry is not None: - pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size - else: - # N=1 fallback for cross-session reads without a populated index. - # obj_size=None tells store.load() to fetch the full file; safe for N=1 - # because each pack holds exactly one chunk. Once a Repository.chunks - # lazy-build property lands, this branch will be unreachable. - pack_id = id # N=1: pack_id == chunk_id - obj_offset = 0 - obj_size = None + if entry.pack_id == UNKNOWN_BYTES32: + # chunk is buffered in PackWriter, not yet flushed to a pack file + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) + return None + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -650,10 +666,8 @@ class Repository: hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips load_size = hdr_size + extra_size - # obj_size is None only via the N=1 fallback above; clamp when available - # so a corrupted or malicious obj_size cannot cause an oversized read. - if obj_size is not None: - load_size = min(load_size, obj_size) + # clamp so a corrupted or malicious obj_size cannot cause an oversized read. + load_size = min(load_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] if len(hdr) != hdr_size: @@ -663,10 +677,8 @@ class Repository: # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. retry_size = hdr_size + meta_size - if obj_size is not None: - # normally a no-op (meta_size <= obj_size - hdr_size for a healthy object); - # guards against a corrupted meta_size producing an oversize read. - retry_size = min(retry_size, obj_size) + # normally a no-op; guards against a corrupted meta_size producing an oversize read. + retry_size = min(retry_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] if len(meta) != meta_size: From 066004800c4bfb3003986fc2ddfa99e00f800549 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 13:13:33 +0530 Subject: [PATCH 08/16] repository: simplify chunks property; trigger lazy build before put() Remove overlay loop: put() accesses self.chunks first so PackWriter.chunks is updated before any write. --- src/borg/repository.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index e31333cf4..4bacfa123 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -439,18 +439,12 @@ class Repository: """ChunkIndex mapping every known chunk id to its pack location. Built lazily on first access if set_chunk_index() has not been called. - Current-session put() entries (which carry the precise pack_id, offset, - and size set by PackWriter) are overlaid on top of the store-built - entries so they always win. """ if not self._chunks_initialized: from .cache import build_chunkindex_from_repo - built = build_chunkindex_from_repo(self) - for k, v in self._chunks.iteritems(): - built[k] = v - self._chunks = built - self._pack_writer.chunks = built + self._chunks = build_chunkindex_from_repo(self) + self._pack_writer.chunks = self._chunks self._chunks_initialized = True return self._chunks @@ -708,7 +702,8 @@ class Repository: data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - return self._pack_writer.add(id, data) # PackWriter updates _chunks internally + _ = self.chunks # ensure lazy build ran and PackWriter.chunks is current + return self._pack_writer.add(id, data) def delete(self, id, wait=True): """delete a repo object From 0161a54a71f528b4731964865dba91eb4c6aef7c Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Thu, 11 Jun 2026 19:19:34 +0530 Subject: [PATCH 09/16] ci/tests: exempt BORG_TESTONLY_SHA256_PACK_ID from clean_env; add concurrency to sha256 job env var was wiped before every test; sha256 job now gets its own concurrency group so it is not cancelled mid-run --- .github/workflows/ci.yml | 3 +++ src/borg/conftest.py | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 95288304b..9a8d5a08f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -710,6 +710,9 @@ jobs: runs-on: ubuntu-24.04 timeout-minutes: 90 continue-on-error: true + concurrency: + group: sha256-pack-id-${{ github.head_ref || github.ref }} + cancel-in-progress: false steps: - uses: actions/checkout@v6 diff --git a/src/borg/conftest.py b/src/borg/conftest.py index 557cb66f7..e8542e08e 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -21,7 +21,11 @@ from borg.testsuite.platform.platform_test import fakeroot_detected # noqa: E40 @pytest.fixture(autouse=True) def clean_env(tmpdir_factory, monkeypatch): # also avoid to use anything from the outside environment: - keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)] + keys = [ + key + for key in os.environ + if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL", "BORG_TESTONLY_SHA256_PACK_ID") + ] for key in keys: monkeypatch.delenv(key, raising=False) # avoid that we access / modify the user's normal .config / .cache directory: From abb32abf2dcfebb00625732cf37775b812241021 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Fri, 12 Jun 2026 12:14:07 +0530 Subject: [PATCH 10/16] repository: persist chunk index to repo cache on close() close() now writes the in-memory index back so the next session can resolve chunk locations instead of failing with ObjectNotFound. Empty incremental writes are skipped so a deleted index is not recreated as a stale, empty one. --- src/borg/cache.py | 9 ++++++++- src/borg/repository.py | 22 ++++++++++++---------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 1a95d0408..2bba47b93 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -553,15 +553,22 @@ def write_chunkindex_to_repo_cache( # but for simplicity, we do it anyway. for key, existing in chunks.iteritems(only_new=incremental): chunks_to_write[key] = existing._replace(flags=ChunkIndex.F_NONE, size=0) + num_to_write = len(chunks_to_write) with io.BytesIO() as f: chunks_to_write.write(f) data = f.getvalue() - logger.debug(f"caching {len(chunks_to_write)} chunks (incremental={incremental}).") + logger.debug(f"caching {num_to_write} chunks (incremental={incremental}).") chunks_to_write.clear() # free memory of the temporary table if clear: # if we don't need the in-memory chunks index anymore: chunks.clear() # free memory, immediately new_hash = hashlib.sha256(data + CHUNKINDEX_HASH_SEED).hexdigest() + if num_to_write == 0 and not force_write: + # don't persist an empty incremental index: if it became the only cache/chunks.* (e.g. right + # after delete_chunkindex_cache()), build_chunkindex_from_repo() would return it as-is instead + # of rebuilding from the repo. with nothing new, the existing cache is already up to date. + logger.debug("no new chunks to cache; not writing an empty incremental chunk index.") + return new_hash cached_hashes = list_chunkindex_hashes(repository) if force_write or new_hash not in cached_hashes: # when an updated chunks index is stored into the cache, we also store its hash as part of the name. diff --git a/src/borg/repository.py b/src/borg/repository.py index 4bacfa123..6c1bbf6d3 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -297,8 +297,7 @@ class Repository: self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None - self._chunks = None # ChunkIndex; set by open(), replaced by set_chunk_index() - self._chunks_initialized = False + self._chunks = None # ChunkIndex; loaded lazily on first access to .chunks def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -420,32 +419,31 @@ class Repository: # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() - self._chunks = ChunkIndex() - self._chunks_initialized = False - self._pack_writer = PackWriter(self.store, max_count=1, chunks=self._chunks) + self._chunks = None + self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True def set_chunk_index(self, chunks): """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. + Pass None to drop the current index: the next access to .chunks then + rebuilds it lazily, and close() will not persist a stale index. """ self._chunks = chunks - self._pack_writer.chunks = chunks # keep PackWriter in sync - self._chunks_initialized = True + self._pack_writer.chunks = chunks @property def chunks(self): """ChunkIndex mapping every known chunk id to its pack location. - Built lazily on first access if set_chunk_index() has not been called. + Built lazily on first access; persisted back to the repo cache at close(). """ - if not self._chunks_initialized: + if self._chunks is None: from .cache import build_chunkindex_from_repo self._chunks = build_chunkindex_from_repo(self) self._pack_writer.chunks = self._chunks - self._chunks_initialized = True return self._chunks def flush(self): @@ -456,6 +454,10 @@ class Repository: def close(self): if self._pack_writer is not None: assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" + if self._chunks is not None and self.store_opened: + from .cache import write_chunkindex_to_repo_cache + + write_chunkindex_to_repo_cache(self, self._chunks, incremental=True) if self.lock: self.lock.release() self.lock = None From cae28f2dd12f9f8124e1457b19ee78dcb8fceaa3 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Fri, 12 Jun 2026 12:14:07 +0530 Subject: [PATCH 11/16] check: drop stale in-memory chunk index after --repair clears the cache finish() deletes cache/chunks.* and now also invalidates the live index, so close() does not write it back. The next repository access rebuilds it from actual repository contents. --- src/borg/archive.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/borg/archive.py b/src/borg/archive.py index 063bfa513..02d44ffed 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2167,6 +2167,9 @@ class ArchiveChecker: # we may have deleted chunks, remove the chunks index cache! logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.") delete_chunkindex_cache(self.repository) + # the in-memory index is stale too; drop it so close() does not re-cache it into the + # cache we just deleted. the next repository access rebuilds it from the repo. + self.repository.set_chunk_index(None) logger.info("Writing Manifest.") self.manifest.write() From 8b5c2d857e398463fa0b2228e730195d837cba3c Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Fri, 12 Jun 2026 12:53:20 +0530 Subject: [PATCH 12/16] repository: invalidate in-memory chunk index when its cache is deleted delete_chunkindex_cache() now also drops the repository's in-memory index, so close() cannot persist a stale copy and callers no longer clear it by hand. get() now raises a distinct PackLocationUnknown when a chunk is indexed but not yet flushed, rather than a plain ObjectNotFound. --- src/borg/archive.py | 6 ++---- src/borg/cache.py | 5 +++++ src/borg/repository.py | 28 ++++++++++++++++++++++++---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 02d44ffed..9c1f9015f 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2164,12 +2164,10 @@ class ArchiveChecker: def finish(self): if self.repair: - # we may have deleted chunks, remove the chunks index cache! + # we may have deleted chunks. delete_chunkindex_cache() removes the on-disk cache and + # drops the stale in-memory index, so the next repository access rebuilds it from the repo. logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.") delete_chunkindex_cache(self.repository) - # the in-memory index is stale too; drop it so close() does not re-cache it into the - # cache we just deleted. the next repository access rebuilds it from the repo. - self.repository.set_chunk_index(None) logger.info("Writing Manifest.") self.manifest.write() diff --git a/src/borg/cache.py b/src/borg/cache.py index 2bba47b93..bbd99d409 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -535,6 +535,11 @@ def delete_chunkindex_cache(repository): except StoreObjectNotFound: pass logger.debug(f"cached chunk indexes deleted: {hashes}") + # the in-memory index is now stale; drop it so close() does not write it back into the + # cache we just deleted. the next .chunks access rebuilds it from actual repo contents. + invalidate = getattr(repository, "invalidate_chunk_index", None) + if invalidate is not None: + invalidate() CHUNKINDEX_HASH_SEED = b"0001" # increment seed to invalidate old chunk indexes diff --git a/src/borg/repository.py b/src/borg/repository.py index 6c1bbf6d3..6c0a5df18 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -219,6 +219,12 @@ class Repository: id = bin_to_hex(id) super().__init__(id, repo) + class PackLocationUnknown(ObjectNotFound): + """Object with key {} is indexed but its pack location is unresolved in repository {}.""" + + # distinct from a genuine miss: the chunk is in the index but still buffered (not flushed). + # subclasses ObjectNotFound so existing "except ObjectNotFound" handlers still catch it. + class ParentPathDoesNotExist(Error): """The parent path of the repository directory [{}] does not exist.""" @@ -427,12 +433,21 @@ class Repository: """Set the ChunkIndex get() uses to resolve pack locations. The caller retains ownership; Repository holds a borrowed reference. - Pass None to drop the current index: the next access to .chunks then - rebuilds it lazily, and close() will not persist a stale index. + To drop a stale index, use invalidate_chunk_index() instead. """ self._chunks = chunks self._pack_writer.chunks = chunks + def invalidate_chunk_index(self): + """Drop the in-memory chunk index so close() will not persist a stale copy. + + Called when the on-disk chunk index cache is deleted; the next access to + .chunks rebuilds the index from actual repository contents. + """ + self._chunks = None + if self._pack_writer is not None: + self._pack_writer.chunks = None + @property def chunks(self): """ChunkIndex mapping every known chunk id to its pack location. @@ -455,6 +470,8 @@ class Repository: if self._pack_writer is not None: assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" if self._chunks is not None and self.store_opened: + # incremental write: a no-op unless chunks were added this session (only F_NEW + # entries are serialized, and an empty incremental write is skipped downstream). from .cache import write_chunkindex_to_repo_cache write_chunkindex_to_repo_cache(self, self._chunks, incremental=True) @@ -646,9 +663,12 @@ class Repository: raise self.ObjectNotFound(id, str(self._location)) return None if entry.pack_id == UNKNOWN_BYTES32: - # chunk is buffered in PackWriter, not yet flushed to a pack file + # chunk is buffered in PackWriter, not yet flushed to a pack. at N=1 put() flushes + # immediately, so reaching here points at a flush / index-update ordering bug, not a + # genuinely missing object. surface it distinctly instead of a plain ObjectNotFound. if raise_missing: - raise self.ObjectNotFound(id, str(self._location)) + logger.error(f"pack location for object {bin_to_hex(id)} is unresolved (chunk not flushed).") + raise self.PackLocationUnknown(id, str(self._location)) return None pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) From 445f2605ef30ea1e1485e0e042489ed9990d716b Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Fri, 12 Jun 2026 16:22:07 +0530 Subject: [PATCH 13/16] repository: make the chunk index single-owned by the repository PackWriter and the cache now share repository.chunks instead of keeping their own copies; set_chunk_index is gone (use the writable .chunks property). The cache consolidates fragmented cache/chunks.* files so they no longer grow by one per backup until the next check or compact. --- src/borg/cache.py | 21 ++++++- src/borg/repository.py | 84 +++++++++++++++++---------- src/borg/testsuite/cache_test.py | 26 +++++++++ src/borg/testsuite/repository_test.py | 10 ++-- 4 files changed, 101 insertions(+), 40 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index bbd99d409..ee03f0495 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -690,10 +690,25 @@ class ChunksMixin: @property def chunks(self): if self._chunks is None: - self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) - self.repository.set_chunk_index(self._chunks) + # the repository owns the one and only chunk index; use it rather than + # building a second one and pushing it back into the repository. + self._chunks = self.repository.chunks + # repository.chunks must stay safe for read-only repositories (get() builds it too), + # so it never writes. consolidating the cached chunk index is a write-context job and + # belongs here in the cache: without it, each backup's incremental save would leave + # another cache/chunks.* fragment for the next run to merge, growing without bound. + self._consolidate_chunkindex_cache() return self._chunks + def _consolidate_chunkindex_cache(self): + # if the repo holds more than one cached chunk index fragment, replace them with a single + # consolidated one (the in-memory index already merged them all). incremental=False writes + # every entry regardless of its F_NEW flag, which the repository may have already cleared. + if len(list_chunkindex_hashes(self.repository)) > 1: + write_chunkindex_to_repo_cache( + self.repository, self._chunks, incremental=False, clear=False, force_write=True, delete_other=True + ) + def seen_chunk(self, id, size=None): entry = self.chunks.get(id) entry_exists = entry is not None @@ -879,7 +894,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def wipe_cache(self): logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() - self.repository.set_chunk_index(self._chunks) + self.repository.chunks = self._chunks self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") diff --git a/src/borg/repository.py b/src/borg/repository.py index 6c0a5df18..0d969ffab 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -103,10 +103,15 @@ class PackWriter: """Buffers chunks into a pack file and writes to the store when full. Collects (chunk_id, cdata) pairs in a list and flushes once max_count is - reached. If a ChunkIndex is provided via the chunks parameter, PackWriter - maintains it directly: each add() marks the chunk as pending - (pack_id=UNKNOWN_BYTES32) with the correct offset and size; flush() then - updates all pending entries with the real pack_id once the pack is on disk. + reached. PackWriter maintains the ChunkIndex directly: each add() marks the + chunk as pending (pack_id=UNKNOWN_BYTES32) with the correct offset and size; + flush() then updates all pending entries with the real pack_id once the pack + is on disk. + + The index is not owned here. When constructed with a repository, the writer + uses that repository's single, authoritative index (see the chunks property), + so there is never a second copy to keep in sync. Constructing without a + repository (e.g. unit tests) gives the writer a private index. At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, so pack_id == chunk_id and the naming scheme is unchanged from before. @@ -114,13 +119,27 @@ class PackWriter: this class's interface. """ - def __init__(self, store, *, max_count=1, chunks=None): + def __init__(self, store, *, max_count=1, chunks=None, repository=None): self.store = store self.max_count = max_count - self.chunks = chunks if chunks is not None else ChunkIndex() + self.repository = repository # when set, the one and only index lives there + self._chunks = chunks # private index for repository-less use (tests); built lazily self._pieces = [] # list of (chunk_id, cdata) self._current_offset = 0 # byte offset of the next chunk within the current pack + @property + def chunks(self): + """The ChunkIndex this writer updates. + + With a repository, this is the repository's single index (shared, not copied). + Without one, the writer owns a private ChunkIndex built on first use. + """ + if self.repository is not None: + return self.repository.chunks + if self._chunks is None: + self._chunks = ChunkIndex() + return self._chunks + def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" # Mark chunk as pending: real offset/size are known now; pack_id not yet. @@ -426,41 +445,41 @@ class Repository: if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._chunks = None - self._pack_writer = PackWriter(self.store, max_count=1) + self._pack_writer = PackWriter(self.store, max_count=1, repository=self) self.opened = True - def set_chunk_index(self, chunks): - """Set the ChunkIndex get() uses to resolve pack locations. - - The caller retains ownership; Repository holds a borrowed reference. - To drop a stale index, use invalidate_chunk_index() instead. - """ - self._chunks = chunks - self._pack_writer.chunks = chunks - - def invalidate_chunk_index(self): - """Drop the in-memory chunk index so close() will not persist a stale copy. - - Called when the on-disk chunk index cache is deleted; the next access to - .chunks rebuilds the index from actual repository contents. - """ - self._chunks = None - if self._pack_writer is not None: - self._pack_writer.chunks = None - @property def chunks(self): """ChunkIndex mapping every known chunk id to its pack location. - Built lazily on first access; persisted back to the repo cache at close(). + This property is the single owner of the in-memory index: get() resolves + pack locations through it, PackWriter updates it, and the Cache reads it + from here rather than building its own. Built lazily on first access and + persisted back to the repo cache at close(). """ if self._chunks is None: from .cache import build_chunkindex_from_repo self._chunks = build_chunkindex_from_repo(self) - self._pack_writer.chunks = self._chunks return self._chunks + @chunks.setter + def chunks(self, value): + # The index is normally built lazily; this setter exists for the few callers + # that must install a specific index (e.g. wiping the cache, or restoring an + # index captured before close()). To drop a stale index so it rebuilds, do not + # assign None here -- call invalidate_chunk_index() instead. + self._chunks = value + + def invalidate_chunk_index(self): + """Drop the in-memory chunk index so close() will not persist a stale copy. + + Called when the on-disk chunk index cache is deleted; the next access to + .chunks rebuilds the index from actual repository contents. PackWriter + reads the index through this Repository, so it follows automatically. + """ + self._chunks = None + def flush(self): """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: @@ -469,9 +488,10 @@ class Repository: def close(self): if self._pack_writer is not None: assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()" - if self._chunks is not None and self.store_opened: - # incremental write: a no-op unless chunks were added this session (only F_NEW - # entries are serialized, and an empty incremental write is skipped downstream). + # close() may run again after the store was already closed (idempotent close), so we can + # only persist while the store is open. Persisting is also a no-op unless chunks were added + # this session (only F_NEW entries are serialized, and an empty incremental write is skipped). + if self.store_opened and self._chunks is not None: from .cache import write_chunkindex_to_repo_cache write_chunkindex_to_repo_cache(self, self._chunks, incremental=True) @@ -724,7 +744,7 @@ class Repository: data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - _ = self.chunks # ensure lazy build ran and PackWriter.chunks is current + # PackWriter shares this repository's index, so add() triggers the lazy build itself. return self._pack_writer.add(id, data) def delete(self, id, wait=True): diff --git a/src/borg/testsuite/cache_test.py b/src/borg/testsuite/cache_test.py index b50516f73..c65472844 100644 --- a/src/borg/testsuite/cache_test.py +++ b/src/borg/testsuite/cache_test.py @@ -110,3 +110,29 @@ def test_read_chunkindex_from_repo_cache_missing(tmp_path): # Try to load a non-existent cache entry — should return None, not raise. result = read_chunkindex_from_repo_cache(repository, "f" * 64) assert result is None + + +def test_chunkindex_cache_consolidated_on_access(tmp_path): + """ChunksMixin.chunks collapses multiple cached chunk-index fragments into a single one. + + Without consolidation every backup's incremental save would leave another cache/chunks.* + behind for the next run to merge, so the fragments would grow without bound. + """ + from ..cache import ChunksMixin, write_chunkindex_to_repo_cache, list_chunkindex_hashes + from ..hashindex import ChunkIndex, ChunkIndexEntry + + repository_location = os.fspath(tmp_path / "repository") + with Repository(repository_location, exclusive=True, create=True) as repository: + # seed extra fragments on top of the empty one written at repo creation + for h in (H(1), H(2)): + ci = ChunkIndex() + ci[h] = ChunkIndexEntry(ChunkIndex.F_NEW, 0, h, 0, 4) + write_chunkindex_to_repo_cache(repository, ci, incremental=False, force_write=True) + assert len(list_chunkindex_hashes(repository)) > 1 + + cache = ChunksMixin() + cache.repository = repository + index = cache.chunks # binds the repository index and consolidates the fragments + + assert len(list_chunkindex_hashes(repository)) == 1 + assert H(1) in index and H(2) in index diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index a0be42ece..73fa77a77 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -87,7 +87,7 @@ def test_basic_operations(repo_fixtures, request): repository.get(key50) chunks = repository._chunks # capture index before close with reopen(repository) as repository: - repository.set_chunk_index(chunks) + repository.chunks = chunks with pytest.raises(Repository.ObjectNotFound): repository.get(key50) for x in range(100): @@ -216,7 +216,7 @@ def test_get_read_data_false_with_range(tmp_path): chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) chunks.add(id2, len(chunk2)) chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) - repository.set_chunk_index(chunks) + repository.chunks = chunks assert repository.get(id1, read_data=False) == chunk1[:hdr_size] assert repository.get(id2, read_data=False) == chunk2[:hdr_size] @@ -233,13 +233,13 @@ def test_get_read_data_false_large_meta(tmp_path): chunks = ChunkIndex() chunks.add(chunk_id, len(chunk)) chunks.update_pack_info([(chunk_id, pack_id, 0, len(chunk))]) - repository.set_chunk_index(chunks) + repository.chunks = chunks result = repository.get(chunk_id, read_data=False) assert result == chunk[: hdr_size + len(big_meta)] def test_get_uses_chunk_index_location(tmp_path): - # get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index(). + # get() routes to the correct pack and offset when a ChunkIndex is assigned via the chunks property. chunk1 = fchunk(b"FIRST") chunk2 = fchunk(b"SECOND") pack = chunk1 + chunk2 @@ -253,7 +253,7 @@ def test_get_uses_chunk_index_location(tmp_path): chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) chunks.add(id2, len(chunk2)) chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) - repository.set_chunk_index(chunks) + repository.chunks = chunks assert repository.get(id1) == chunk1 assert repository.get(id2) == chunk2 From e6010b57931b7a4f2d92658ca80844a2dc9724cf Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sat, 13 Jun 2026 14:47:07 +0530 Subject: [PATCH 14/16] repository: address chunk-index review feedback PackWriter requires a real/explicit index (no silent private one); PackLocationUnknown is now a loud ErrorWithTraceback and get() raises it unconditionally; close() persists via is_chunk_index_loaded/self.chunks; delete_chunkindex_cache calls invalidate_chunk_index() directly; drop per-access fragment consolidation (compact reclaims instead); capture BORG_TESTONLY_SHA256_PACK_ID at import; remove dead _current_offset; update tests. --- src/borg/cache.py | 32 +++++------ src/borg/conftest.py | 9 ++- src/borg/repository.py | 82 +++++++++++++++++---------- src/borg/testsuite/cache_test.py | 18 +++--- src/borg/testsuite/repository_test.py | 40 ++++++++++--- 5 files changed, 116 insertions(+), 65 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index ee03f0495..7698f7ea7 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -537,9 +537,7 @@ def delete_chunkindex_cache(repository): logger.debug(f"cached chunk indexes deleted: {hashes}") # the in-memory index is now stale; drop it so close() does not write it back into the # cache we just deleted. the next .chunks access rebuilds it from actual repo contents. - invalidate = getattr(repository, "invalidate_chunk_index", None) - if invalidate is not None: - invalidate() + repository.invalidate_chunk_index() CHUNKINDEX_HASH_SEED = b"0001" # increment seed to invalidate old chunk indexes @@ -654,6 +652,13 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi num_chunks = 0 # The repo says it has these chunks, so we assume they are referenced/used chunks. # We do not know the plaintext size (!= stored_size), thus we set size = 0. + # + # IMPORTANT (N=1 only): listing yields pack_ids, not per-chunk locations. We can only + # reconstruct the index here under the N=1 assumption -- pack_id == chunk_id, one chunk per + # pack at offset 0 spanning the whole pack. At N>1 this is wrong: a cold rebuild would have to + # open each pack and read its header to recover the per-chunk offsets and sizes. Until that + # exists, Repository.get()'s range-load is only correct while a persisted/cached chunk index + # is available; a cold rebuild from a bare repo listing silently falls back to N=1 semantics. for pack_id, pack_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): num_chunks += 1 chunk_id = pack_id # N=1: chunk_id == pack_id @@ -693,22 +698,15 @@ class ChunksMixin: # the repository owns the one and only chunk index; use it rather than # building a second one and pushing it back into the repository. self._chunks = self.repository.chunks - # repository.chunks must stay safe for read-only repositories (get() builds it too), - # so it never writes. consolidating the cached chunk index is a write-context job and - # belongs here in the cache: without it, each backup's incremental save would leave - # another cache/chunks.* fragment for the next run to merge, growing without bound. - self._consolidate_chunkindex_cache() + # note: we deliberately do NOT consolidate the cached chunk index fragments here. + # each backup writes a small incremental cache/chunks.* fragment (only its new chunks), + # which is cheap. collapsing them all into one big fragment on every run would re-upload + # the whole index and, with delete_other, invalidate every other client's fragments -- + # a multi-GB churn per run on a shared repo. fragment count is reclaimed by `borg compact` + # (build_chunkindex_from_repo with cache_immediately). a size/threshold-based policy that + # bounds the fragment count without re-uploading large fragments can be added later. return self._chunks - def _consolidate_chunkindex_cache(self): - # if the repo holds more than one cached chunk index fragment, replace them with a single - # consolidated one (the in-memory index already merged them all). incremental=False writes - # every entry regardless of its F_NEW flag, which the repository may have already cleared. - if len(list_chunkindex_hashes(self.repository)) > 1: - write_chunkindex_to_repo_cache( - self.repository, self._chunks, incremental=False, clear=False, force_write=True, delete_other=True - ) - def seen_chunk(self, id, size=None): entry = self.chunks.get(id) entry_exists = entry is not None diff --git a/src/borg/conftest.py b/src/borg/conftest.py index e8542e08e..90729ff39 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -21,11 +21,10 @@ from borg.testsuite.platform.platform_test import fakeroot_detected # noqa: E40 @pytest.fixture(autouse=True) def clean_env(tmpdir_factory, monkeypatch): # also avoid to use anything from the outside environment: - keys = [ - key - for key in os.environ - if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL", "BORG_TESTONLY_SHA256_PACK_ID") - ] + # note: BORG_TESTONLY_SHA256_PACK_ID is intentionally NOT exempted here. The repository + # module captures it at import time (repository.FORCE_SHA256_PACK_ID), before this fixture + # runs, so wiping it per test is harmless and the env stays fully isolated. + keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)] for key in keys: monkeypatch.delenv(key, raising=False) # avoid that we access / modify the user's normal .config / .cache directory: diff --git a/src/borg/repository.py b/src/borg/repository.py index 0d969ffab..8a2f10e32 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -99,19 +99,26 @@ def build_rest_backend(location): return REST(base_url="http://stdio-backend", command=rest_serve_command(location)) +# Test-only switch: force sha256 pack_ids even at N=1, to expose code that still assumes +# pack_id == chunk_id. Read once here, at import time, on purpose: +# - it happens before the per-test clean_env fixture wipes BORG_* vars, so the tox env that +# sets this var does not need a clean_env exemption (the captured value survives the wipe); +# - flush() then checks a plain bool instead of touching os.environ on every write. +FORCE_SHA256_PACK_ID = os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") == "1" + + class PackWriter: """Buffers chunks into a pack file and writes to the store when full. Collects (chunk_id, cdata) pairs in a list and flushes once max_count is reached. PackWriter maintains the ChunkIndex directly: each add() marks the - chunk as pending (pack_id=UNKNOWN_BYTES32) with the correct offset and size; - flush() then updates all pending entries with the real pack_id once the pack - is on disk. + chunk as pending (pack_id=UNKNOWN_BYTES32); flush() then assigns the real + pack_id, offset and size to every pending entry once the pack is on disk. - The index is not owned here. When constructed with a repository, the writer - uses that repository's single, authoritative index (see the chunks property), - so there is never a second copy to keep in sync. Constructing without a - repository (e.g. unit tests) gives the writer a private index. + The index is not owned here. Construction requires either a repository or an + explicit chunks index; there is no silent default. With a repository, the writer + uses that repository's single, authoritative index (see the chunks property), so + there is never a second copy to keep in sync. Unit tests pass an explicit index. At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, so pack_id == chunk_id and the naming scheme is unchanged from before. @@ -120,33 +127,35 @@ class PackWriter: """ def __init__(self, store, *, max_count=1, chunks=None, repository=None): + if repository is None and chunks is None: + raise ValueError("PackWriter requires either a repository or an explicit chunks index") self.store = store self.max_count = max_count self.repository = repository # when set, the one and only index lives there - self._chunks = chunks # private index for repository-less use (tests); built lazily + self._chunks = chunks # explicit index for repository-less use (tests) self._pieces = [] # list of (chunk_id, cdata) - self._current_offset = 0 # byte offset of the next chunk within the current pack @property def chunks(self): """The ChunkIndex this writer updates. With a repository, this is the repository's single index (shared, not copied). - Without one, the writer owns a private ChunkIndex built on first use. + Without one, it is the explicit index passed at construction. """ if self.repository is not None: return self.repository.chunks - if self._chunks is None: - self._chunks = ChunkIndex() return self._chunks def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" - # Mark chunk as pending: real offset/size are known now; pack_id not yet. + # Mark the chunk as pending (pack_id=UNKNOWN_BYTES32). flush() assigns the real + # pack_id and offset for every piece, so the placeholder offset 0 here is never read: + # get() refuses a pending entry (PackLocationUnknown) before any offset would matter. + # Precondition: callers add only chunks not already stored (the cache dedups via + # seen_chunk() first), so add(chunk_id, 0) never resets a real size on an existing entry. self.chunks.add(chunk_id, 0) # size filled in by cache layer - self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, self._current_offset, len(cdata))]) + self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))]) self._pieces.append((chunk_id, cdata)) - self._current_offset += len(cdata) if len(self._pieces) >= self.max_count: return self.flush() return None @@ -170,9 +179,9 @@ class PackWriter: # (backward-compatible file naming: packs/{chunk_id_hex}). # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the # file is content-addressed and borgstore can verify/cache it. - # BORG_TESTONLY_SHA256_PACK_ID: always use sha256 even at N=1, exposing code - # that still assumes pack_id == chunk_id. - if self.max_count == 1 and os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") != "1": + # BORG_TESTONLY_SHA256_PACK_ID (see FORCE_SHA256_PACK_ID): always use sha256 even at + # N=1, exposing code that still assumes pack_id == chunk_id. + if self.max_count == 1 and not FORCE_SHA256_PACK_ID: pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id else: pack_id = sha256(pack_data).digest() @@ -190,7 +199,6 @@ class PackWriter: self.store.store(key, pack_data) finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk - self._current_offset = 0 self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id return results @@ -238,11 +246,18 @@ class Repository: id = bin_to_hex(id) super().__init__(id, repo) - class PackLocationUnknown(ObjectNotFound): + class PackLocationUnknown(ErrorWithTraceback): """Object with key {} is indexed but its pack location is unresolved in repository {}.""" - # distinct from a genuine miss: the chunk is in the index but still buffered (not flushed). - # subclasses ObjectNotFound so existing "except ObjectNotFound" handlers still catch it. + exit_mcode = 22 + + # this is a code bug, not a genuine miss: the chunk is in the index but still buffered + # (not flushed). deliberately NOT a subclass of ObjectNotFound, so the usual + # "except ObjectNotFound" handlers do not swallow it -- it surfaces loudly with a traceback. + def __init__(self, id, repo): + if isinstance(id, bytes): + id = bin_to_hex(id) + super().__init__(id, repo) class ParentPathDoesNotExist(Error): """The parent path of the repository directory [{}] does not exist.""" @@ -480,6 +495,16 @@ class Repository: """ self._chunks = None + @property + def is_chunk_index_loaded(self): + """Whether the in-memory chunk index has been built/loaded this session. + + Lets the few flag-style checks ask "is it loaded?" without going through the + .chunks property (which would build it on demand). self._chunks should not be + read directly elsewhere; use .chunks for the index or this for the loaded flag. + """ + return self._chunks is not None + def flush(self): """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: @@ -491,10 +516,11 @@ class Repository: # close() may run again after the store was already closed (idempotent close), so we can # only persist while the store is open. Persisting is also a no-op unless chunks were added # this session (only F_NEW entries are serialized, and an empty incremental write is skipped). - if self.store_opened and self._chunks is not None: + # guard on is_chunk_index_loaded so we never trigger a lazy rebuild just to persist on close. + if self.store_opened and self.is_chunk_index_loaded: from .cache import write_chunkindex_to_repo_cache - write_chunkindex_to_repo_cache(self, self._chunks, incremental=True) + write_chunkindex_to_repo_cache(self, self.chunks, incremental=True) if self.lock: self.lock.release() self.lock = None @@ -685,11 +711,9 @@ class Repository: if entry.pack_id == UNKNOWN_BYTES32: # chunk is buffered in PackWriter, not yet flushed to a pack. at N=1 put() flushes # immediately, so reaching here points at a flush / index-update ordering bug, not a - # genuinely missing object. surface it distinctly instead of a plain ObjectNotFound. - if raise_missing: - logger.error(f"pack location for object {bin_to_hex(id)} is unresolved (chunk not flushed).") - raise self.PackLocationUnknown(id, str(self._location)) - return None + # genuinely missing object. this is a code bug, so we crash loudly regardless of + # raise_missing instead of pretending the object is absent. + raise self.PackLocationUnknown(id, str(self._location)) pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) diff --git a/src/borg/testsuite/cache_test.py b/src/borg/testsuite/cache_test.py index c65472844..18c3f480e 100644 --- a/src/borg/testsuite/cache_test.py +++ b/src/borg/testsuite/cache_test.py @@ -112,11 +112,12 @@ def test_read_chunkindex_from_repo_cache_missing(tmp_path): assert result is None -def test_chunkindex_cache_consolidated_on_access(tmp_path): - """ChunksMixin.chunks collapses multiple cached chunk-index fragments into a single one. +def test_chunkindex_cache_not_consolidated_on_access(tmp_path): + """ChunksMixin.chunks binds the repository index without collapsing the cached fragments. - Without consolidation every backup's incremental save would leave another cache/chunks.* - behind for the next run to merge, so the fragments would grow without bound. + Each backup leaves a small incremental cache/chunks.* fragment; collapsing them all into one + on every access would re-upload the whole index and, with delete_other, invalidate every other + client's fragments. Fragment count is reclaimed by `borg compact`, not on every read here. """ from ..cache import ChunksMixin, write_chunkindex_to_repo_cache, list_chunkindex_hashes from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -128,11 +129,14 @@ def test_chunkindex_cache_consolidated_on_access(tmp_path): ci = ChunkIndex() ci[h] = ChunkIndexEntry(ChunkIndex.F_NEW, 0, h, 0, 4) write_chunkindex_to_repo_cache(repository, ci, incremental=False, force_write=True) - assert len(list_chunkindex_hashes(repository)) > 1 + before = len(list_chunkindex_hashes(repository)) + assert before > 1 cache = ChunksMixin() cache.repository = repository - index = cache.chunks # binds the repository index and consolidates the fragments + index = cache.chunks # binds the repository index; must NOT collapse the fragments - assert len(list_chunkindex_hashes(repository)) == 1 + # fragments are left intact (no consolidation side effect) ... + assert len(list_chunkindex_hashes(repository)) == before + # ... and the in-memory index still resolves every seeded chunk assert H(1) in index and H(2) in index diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 73fa77a77..f87692711 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -85,9 +85,9 @@ def test_basic_operations(repo_fixtures, request): repository.delete(key50) with pytest.raises(Repository.ObjectNotFound): repository.get(key50) - chunks = repository._chunks # capture index before close + # no manual hand-off of the index across reopen: close() persisted it to the repo cache, + # and the freshly opened repo rebuilds .chunks from there (or by listing the repo) on its own. with reopen(repository) as repository: - repository.chunks = chunks with pytest.raises(Repository.ObjectNotFound): repository.get(key50) for x in range(100): @@ -96,6 +96,32 @@ def test_basic_operations(repo_fixtures, request): assert pdchunk(repository.get(H(x))) == b"SOMEDATA" +def test_chunk_index_persisted_on_close(tmp_path): + # close() must serialize the live chunk index into the repo cache, so a freshly opened + # repo can resolve pack locations without any manual hand-off. This proves the round-trip + # by reading the persisted index back directly (not via a repo rescan, which at N=1 would + # reconstruct the same entries and so could mask a broken persist step). + from ..cache import list_chunkindex_hashes, read_chunkindex_from_repo_cache + + location = os.fspath(tmp_path / "repo") + with Repository(location, exclusive=True, create=True) as repository: + for x in range(10): + repository.put(H(x), fchunk(b"DATA")) # N=1: each put() flushes immediately + # reopen and read the cached fragments straight from disk + with Repository(location, exclusive=True) as repository: + persisted = ChunkIndex() + for hash in list_chunkindex_hashes(repository): + fragment = read_chunkindex_from_repo_cache(repository, hash) + if fragment is not None: + for k, v in fragment.items(): + persisted[k] = v + for x in range(10): + assert H(x) in persisted # close() actually wrote this session's chunks + # and the reopened repo resolves them end to end + for x in range(10): + assert pdchunk(repository.get(H(x))) == b"DATA" + + def test_read_data(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: meta, data = b"meta", b"data" @@ -162,12 +188,12 @@ class MockStore: def test_pack_writer_returns_none_when_not_full(): - pw = PackWriter(MockStore(), max_count=2) + pw = PackWriter(MockStore(), max_count=2, chunks=ChunkIndex()) assert pw.add(b"a" * 32, b"data") is None def test_pack_writer_flush_returns_none_when_empty(): - pw = PackWriter(MockStore(), max_count=1) + pw = PackWriter(MockStore(), max_count=1, chunks=ChunkIndex()) assert pw.flush() is None @@ -175,7 +201,7 @@ def test_pack_writer_n1_flush(): store = MockStore() chunk_id = b"c" * 32 cdata = b"payload" - pw = PackWriter(store, max_count=1) + pw = PackWriter(store, max_count=1, chunks=ChunkIndex()) results = pw.add(chunk_id, cdata) assert results is not None assert len(results) == 1 @@ -190,7 +216,7 @@ def test_pack_writer_n2_flush(): store = MockStore() id1, id2 = b"a" * 32, b"b" * 32 data1, data2 = b"first", b"second" - pw = PackWriter(store, max_count=2) + pw = PackWriter(store, max_count=2, chunks=ChunkIndex()) assert pw.add(id1, data1) is None results = pw.add(id2, data2) assert results is not None @@ -276,7 +302,7 @@ def test_pack_writer_final_partial_pack_uses_sha256(): store = MockStore() chunk_id = b"d" * 32 cdata = b"solo" - pw = PackWriter(store, max_count=3) + pw = PackWriter(store, max_count=3, chunks=ChunkIndex()) assert pw.add(chunk_id, cdata) is None results = pw.flush() assert results is not None From f917c902566e1bc720ac8488f62ebf954a10bb22 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sun, 14 Jun 2026 15:40:43 +0530 Subject: [PATCH 15/16] repository: roll back chunk index on failed pack store flush() drops the entries add() pre-marked when store.store() raises, so a never-stored chunk is not left indexed for dedup. adds rollback tests, bumps large-meta test to 5KB, drops security wording from the read_data=False clamp comments. --- src/borg/repository.py | 23 +++++++++++++-- src/borg/testsuite/repository_test.py | 40 ++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/src/borg/repository.py b/src/borg/repository.py index 8a2f10e32..d8198b6d8 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -153,6 +153,8 @@ class PackWriter: # get() refuses a pending entry (PackLocationUnknown) before any offset would matter. # Precondition: callers add only chunks not already stored (the cache dedups via # seen_chunk() first), so add(chunk_id, 0) never resets a real size on an existing entry. + # This is also what keeps ChunkIndex.add's "v.size == 0 or v.size == size" assertion happy: + # a fresh id has no entry, so the size=0 we pass here is never compared against a real size. self.chunks.add(chunk_id, 0) # size filled in by cache layer self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))]) self._pieces.append((chunk_id, cdata)) @@ -195,8 +197,22 @@ class PackWriter: offset += obj_size key = "packs/" + bin_to_hex(pack_id) + # ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32). + pending_ids = [chunk_id for chunk_id, _ in self._pieces] try: self.store.store(key, pack_data) + except Exception: + # The pack was not durably stored, so every entry add() pre-marked for it now + # points at data that does not exist. Leaving them would make seen_chunk() report + # these ids as present, letting a later identical chunk dedup against bytes that were + # never written -- silent data loss. These entries were created this session and never + # received a real pack_id, so dropping them restores the index to its pre-add() state + # (matching master, where the index only ever reflected successfully stored chunks). + for chunk_id in pending_ids: + entry = self.chunks.get(chunk_id) + if entry is not None and entry.pack_id == UNKNOWN_BYTES32: + del self.chunks[chunk_id] + raise finally: self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id @@ -726,7 +742,10 @@ class Repository: hdr_size = RepoObj.obj_header.size extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips load_size = hdr_size + extra_size - # clamp so a corrupted or malicious obj_size cannot cause an oversized read. + # keep the read inside this object: at N>1 a pack holds neighbouring objects, so + # don't pull bytes past obj_size into the next one. (an overshoot would be harmless + # -- parse_meta uses the header's length and ignores trailing bytes -- this is just + # tidy.) obj_size comes from the same index we already route with. load_size = min(load_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=load_size) hdr = obj[0:hdr_size] @@ -737,7 +756,7 @@ class Repository: # we did not get enough, need to load more, but not all. # this should be rare, as chunk metadata is rather small usually. retry_size = hdr_size + meta_size - # normally a no-op; guards against a corrupted meta_size producing an oversize read. + # same boundary as above: normally a no-op, just keeps the retry within this object. retry_size = min(retry_size, obj_size) obj = self.store.load(key, offset=obj_offset, size=retry_size) meta = obj[hdr_size : hdr_size + meta_size] diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index f87692711..8aca64eae 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -187,6 +187,13 @@ class MockStore: self.stored[key] = data +class FailingStore: + """A store whose pack write always fails -- used to exercise flush()'s rollback path.""" + + def store(self, key, data): + raise OSError("simulated store failure") + + def test_pack_writer_returns_none_when_not_full(): pw = PackWriter(MockStore(), max_count=2, chunks=ChunkIndex()) assert pw.add(b"a" * 32, b"data") is None @@ -227,6 +234,36 @@ def test_pack_writer_n2_flush(): assert results[1] == (id2, expected_pack_id, len(data1), len(data2)) +def test_pack_writer_rolls_back_index_on_failed_store(): + # If store.store() fails, flush() must drop the entries add() pre-marked, otherwise the index + # keeps a phantom (indexed but never stored) chunk that seen_chunk() reports as present and a + # later identical chunk would dedup against -- silent data loss (#9744 review). + chunks = ChunkIndex() + chunk_id = b"e" * 32 + pw = PackWriter(FailingStore(), max_count=1, chunks=chunks) + with pytest.raises(OSError): + pw.add(chunk_id, b"payload") # max_count=1 -> add() flushes immediately and fails + assert chunks.get(chunk_id) is None # rolled back: no phantom entry left behind + + +def test_failed_store_phantom_not_persisted(tmp_path): + # The phantom must not survive into the persisted repo cache either: close() can write the + # in-memory index on context exit, so the rollback has to happen before anything is serialized. + from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo + + chunk_id = H(60) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + # a PackWriter that shares the repository's index but writes to a store that always fails: + pw = PackWriter(FailingStore(), max_count=1, repository=repository) + with pytest.raises(OSError): + pw.add(chunk_id, fchunk(b"DATA")) + assert repository.chunks.get(chunk_id) is None # gone from the in-memory index ... + # ... and persisting + reloading the cache does not bring it back: + write_chunkindex_to_repo_cache(repository, repository.chunks, incremental=True) + reloaded = build_chunkindex_from_repo(repository) + assert reloaded.get(chunk_id) is None + + def test_get_read_data_false_with_range(tmp_path): # read_data=False with ChunkIndex entries limits the load to each object's boundary. hdr_size = RepoObj.obj_header.size @@ -250,7 +287,8 @@ def test_get_read_data_false_with_range(tmp_path): def test_get_read_data_false_large_meta(tmp_path): # When meta_size > extra_size (975 bytes), get() retries with a larger load. hdr_size = RepoObj.obj_header.size - big_meta = b"M" * 1000 # 1000 > 975, forces the retry load + # the first try loads ~1KB, so use a meta clearly past that boundary to force the retry path. + big_meta = b"M" * 5000 chunk = fchunk(b"DATA", meta=big_meta) pack_id = H(44) chunk_id = H(49) From ea4be4d2c473083e4573ea57825572ab0703d1e8 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sun, 14 Jun 2026 18:36:13 +0530 Subject: [PATCH 16/16] repository: make failing-store test helper fail only pack writes Lets one store both fail the pack write and persist the index, matching production. --- src/borg/testsuite/repository_test.py | 32 ++++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 8aca64eae..1192e9f12 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -187,11 +187,24 @@ class MockStore: self.stored[key] = data -class FailingStore: - """A store whose pack write always fails -- used to exercise flush()'s rollback path.""" +class FailingPackStore: + """Wraps a store but fails packs/* writes; every other call passes through to the inner store. + + Models the realistic failure where only a pack write broke while the rest of the repo (e.g. the + cache/chunks.* index) stays writable. In production PackWriter and the chunk index cache share + one store, so a single object has to fail the pack write yet still let the index persist. + """ + + def __init__(self, inner): + self._inner = inner def store(self, key, data): - raise OSError("simulated store failure") + if key.startswith("packs/"): + raise OSError("simulated pack store failure") + return self._inner.store(key, data) + + def __getattr__(self, name): + return getattr(self._inner, name) def test_pack_writer_returns_none_when_not_full(): @@ -240,7 +253,7 @@ def test_pack_writer_rolls_back_index_on_failed_store(): # later identical chunk would dedup against -- silent data loss (#9744 review). chunks = ChunkIndex() chunk_id = b"e" * 32 - pw = PackWriter(FailingStore(), max_count=1, chunks=chunks) + pw = PackWriter(FailingPackStore(MockStore()), max_count=1, chunks=chunks) with pytest.raises(OSError): pw.add(chunk_id, b"payload") # max_count=1 -> add() flushes immediately and fails assert chunks.get(chunk_id) is None # rolled back: no phantom entry left behind @@ -253,12 +266,15 @@ def test_failed_store_phantom_not_persisted(tmp_path): chunk_id = H(60) with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: - # a PackWriter that shares the repository's index but writes to a store that always fails: - pw = PackWriter(FailingStore(), max_count=1, repository=repository) + # fail only the pack write on the repository's own store; cache/chunks.* writes still work, + # so one store models "just the pack write broke" (PackWriter and the index cache share a + # store in production). the failing store is thus load-bearing for every assertion below. + repository.store = FailingPackStore(repository.store) + pw = PackWriter(repository.store, max_count=1, repository=repository) with pytest.raises(OSError): pw.add(chunk_id, fchunk(b"DATA")) - assert repository.chunks.get(chunk_id) is None # gone from the in-memory index ... - # ... and persisting + reloading the cache does not bring it back: + assert repository.chunks.get(chunk_id) is None # rolled back from the in-memory index ... + # ... and persisting + reloading the cache (through that same store) does not bring it back: write_chunkindex_to_repo_cache(repository, repository.chunks, incremental=True) reloaded = build_chunkindex_from_repo(repository) assert reloaded.get(chunk_id) is None