From 71bbde208f8303e4feab3f4272b2ad35cb064fd3 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sat, 6 Jun 2026 00:09:08 +0530 Subject: [PATCH] repository: add PackWriter and two-phase chunk index update, refs #8572 PackWriter buffers (chunk_id, cdata) pairs and flushes as pack files via borgstore. At N=1 pack_id == chunk_id; UNKNOWN_INT32 (0xFFFFFFFF) placeholders in the index are replaced by real pack location fields after flush() via update_pack_info(). Update test_chunkindex_add to expect UNKNOWN_INT32 sentinels from add(). --- src/borg/archive.py | 10 +++- src/borg/cache.py | 6 ++- src/borg/constants.py | 4 ++ src/borg/hashindex.pyi | 1 + src/borg/hashindex.pyx | 11 +++- src/borg/repository.py | 77 ++++++++++++++++++++++++++-- src/borg/testsuite/hashindex_test.py | 13 +++-- 7 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index ee156c412..c2c929443 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -2014,10 +2014,16 @@ class ArchiveChecker: assert cdata is not None pack_id = id_ # only correct for N=1 self.chunks[id_] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=size, pack_id=pack_id, obj_offset=0, obj_size=0 + flags=ChunkIndex.F_USED, + size=size, + pack_id=pack_id, + obj_offset=UNKNOWN_INT32, + obj_size=UNKNOWN_INT32, ) if self.repair: - self.repository.put(id_, cdata) + pack_results = self.repository.put(id_, cdata) + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) def verify_file_chunks(archive_name, item): """Verifies that all file chunks are present. Missing file chunks will be logged.""" diff --git a/src/borg/cache.py b/src/borg/cache.py index 7ef37c89f..014177bca 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -730,9 +730,13 @@ class ChunksMixin: cdata = self.repo_objs.format( id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type ) - self.repository.put(id, cdata, wait=wait) + pack_results = self.repository.put(id, cdata, wait=wait) self.last_refresh_dt = now # .put also refreshed the lock self.chunks.add(id, size) + # pack_results is non-empty when the PackWriter flushed (always at max_count=1). + # Update the index with real pack location so obj_size is not UNKNOWN_INT32. + for chunk_id, pack_id, obj_offset, obj_size in pack_results: + self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size) stats.update(size, not exists) return ChunkListEntry(id, size) diff --git a/src/borg/constants.py b/src/borg/constants.py index b62af7bf9..28e9010c8 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -51,6 +51,10 @@ ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type) # the header, and the total size was set to precisely 20 MiB for borg < 1.3). MAX_DATA_SIZE = 20971479 +# Placeholder for pack location fields (obj_offset, obj_size) when the value is not yet known. +# Grep for UNKNOWN_INT32 to find every site that still needs updating. +UNKNOWN_INT32 = 0xFFFFFFFF + # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index a978c587a..359db0c0e 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -21,6 +21,7 @@ class ChunkIndex: M_USER: int M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... + def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ... def iteritems(self, *, only_new: bool = ...) -> Iterator: ... def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index e4eb43274..261ddc262 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -5,6 +5,8 @@ import struct from borghash import HashTableNT +from .constants import UNKNOWN_INT32 + cdef _NoDefault = object() @@ -80,7 +82,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping): flags = v.flags | self.F_USED assert v.size == 0 or v.size == size pack_id = key # N=1: chunk_id == pack_id - self[key] = ChunkIndexEntry(flags=flags, size=size, pack_id=pack_id, obj_offset=0, obj_size=0) + self[key] = ChunkIndexEntry( + flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) def __getitem__(self, key): """Specialized __getitem__ that hides system flags.""" @@ -105,6 +109,11 @@ class ChunkIndex(HTProxyMixin, MutableMapping): user_flags = value.flags & self.M_USER self.ht[key] = value._replace(flags=system_flags | user_flags) + def update_pack_info(self, key, pack_id, obj_offset, obj_size): + """Update the on-disk location fields for an existing entry without changing flags or size.""" + existing = self[key] + self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + def clear_new(self): """Clears the F_NEW flag of all items.""" for key, value in self.ht.items(): diff --git a/src/borg/repository.py b/src/borg/repository.py index 879e5afde..2fe83e80f 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -99,6 +99,71 @@ def build_rest_backend(location): return REST(base_url="http://stdio-backend", command=rest_serve_command(location)) +class PackWriter: + """Buffers chunks into a pack file and writes to the store when full. + + Collects (chunk_id, cdata) pairs in a list and flushes once max_count is + reached. On flush it returns the location info for every chunk so the + caller can update the ChunkIndex with real values. + + At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack, + so pack_id == chunk_id and the naming scheme is unchanged from before. + Raising max_count later (N>1 phase) enables real packing without touching + this class's interface. + """ + + def __init__(self, store, *, max_count=1): + self.store = store + self.max_count = max_count + self._pieces = [] # list of (chunk_id, cdata) + + def add(self, chunk_id, cdata): + """Buffer a chunk. Returns flush results if the pack is now full, else [].""" + self._pieces.append((chunk_id, cdata)) + if len(self._pieces) >= self.max_count: + return self.flush() + return [] + + def flush(self): + """Write the current pack to the store. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples — + one entry per chunk that was written. Returns [] if there was nothing + to flush. + """ + if not self._pieces: + return [] + + # Build the pack bytes once by joining all pieces (avoids O(n^2) copies + # that incremental string concatenation would cause in Python). + pack_data = b"".join(cdata for _, cdata in self._pieces) + + # Determine pack_id. + # N=1: the pack contains exactly one chunk, so we keep pack_id == chunk_id + # (backward-compatible file naming: packs/{chunk_id_hex}). + # N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the + # file is content-addressed and borgstore can verify/cache it. + if len(self._pieces) == 1: + pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id + else: + pack_id = sha256(pack_data).digest() # N>1: content-addressed + + # Record (chunk_id, pack_id, obj_offset, obj_size) for every piece. + results = [] + offset = 0 + for chunk_id, cdata in self._pieces: + obj_size = len(cdata) + results.append((chunk_id, pack_id, offset, obj_size)) + offset += obj_size + + key = "packs/" + bin_to_hex(pack_id) + try: + self.store.store(key, pack_data) + finally: + self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk + return results + + class Repository: """borgstore-based key/value store.""" @@ -219,6 +284,7 @@ class Repository: self.do_lock = lock self.lock_wait = lock_wait self.exclusive = exclusive + self._pack_writer = None def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -340,6 +406,7 @@ class Repository: # important: lock *after* making sure that there actually is an existing, supported repository. if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() + self._pack_writer = PackWriter(self.store, max_count=1) self.opened = True def close(self): @@ -564,15 +631,17 @@ class Repository: Note: when doing calls with wait=False this gets async and caller must deal with async results / exceptions later. + + Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for + every chunk written to disk this call. At max_count=1 this is always + one entry. Callers should pass these to ChunkIndex.update_pack_info() + so the index holds real location values rather than UNKNOWN_INT32 placeholders. """ self._lock_refresh() data_size = len(data) if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - - pack_id = id # N=1: pack_id == chunk_id - key = "packs/" + bin_to_hex(pack_id) - self.store.store(key, data) + return self._pack_writer.add(id, data) def delete(self, id, wait=True): """delete a repo object diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index 8c9ec5be9..6ba88d53a 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -3,6 +3,7 @@ import struct import pytest +from ..constants import UNKNOWN_INT32 from ..hashindex import ChunkIndex, ChunkIndexEntry @@ -20,11 +21,17 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) # updating size (we do not have a size yet) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) chunks.add(x, 2) - assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0) + assert chunks[x] == ChunkIndexEntry( + flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size)