Merge pull request #9723 from mr-raj12/pack-files-step5-packwriter
Some checks are pending
Lint / lint (push) Waiting to run
CI / lint (push) Waiting to run
CI / security (push) Waiting to run
CI / asan_ubsan (push) Blocked by required conditions
CI / native_tests (push) Blocked by required conditions
CI / vm_tests (NetBSD, false, netbsd, 10.1) (push) Blocked by required conditions
CI / vm_tests (OmniOS, false, omnios, r151056) (push) Blocked by required conditions
CI / vm_tests (OpenBSD, false, openbsd, 7.8) (push) Blocked by required conditions
CI / vm_tests (borg-freebsd-14-x86_64-gh, FreeBSD, true, freebsd, 14.3) (push) Blocked by required conditions
CI / windows_tests (push) Blocked by required conditions
CodeQL / Analyze (push) Waiting to run

repository: add PackWriter and two-phase chunk index update
This commit is contained in:
TW 2026-06-09 12:55:28 +02:00 committed by GitHub
commit 7cdaebf4f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 218 additions and 14 deletions

View file

@ -2012,12 +2012,16 @@ class ArchiveChecker:
# either we already have this chunk in repo and chunks index or we add it now
if id_ not in self.chunks:
assert cdata is not None
pack_id = id_ # only correct for N=1
self.chunks[id_] = ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=size, pack_id=pack_id, obj_offset=0, obj_size=0
flags=ChunkIndex.F_USED,
size=size,
pack_id=UNKNOWN_BYTES32,
obj_offset=UNKNOWN_INT32,
obj_size=UNKNOWN_INT32,
)
if self.repair:
self.repository.put(id_, cdata)
pack_results = self.repository.put(id_, cdata)
self.chunks.update_pack_info(pack_results)
def verify_file_chunks(archive_name, item):
"""Verifies that all file chunks are present. Missing file chunks will be logged."""

View file

@ -730,9 +730,10 @@ class ChunksMixin:
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
pack_results = self.repository.put(id, cdata, wait=wait)
self.last_refresh_dt = now # .put also refreshed the lock
self.chunks.add(id, size)
self.chunks.update_pack_info(pack_results)
stats.update(size, not exists)
return ChunkListEntry(id, size)
@ -830,6 +831,11 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
def close(self):
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.close")
# Flush any chunks still buffered in the pack writer and update the index
# so the last batch gets real pack location values instead of UNKNOWN_*.
if self._chunks is not None:
pack_results = self.repository.flush()
self._chunks.update_pack_info(pack_results)
if self._files is not None:
pi.output("Saving files cache")
integrity_data = self._write_files_cache(self._files)

View file

@ -51,6 +51,13 @@ ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type)
# the header, and the total size was set to precisely 20 MiB for borg < 1.3).
MAX_DATA_SIZE = 20971479
# Placeholder for pack location fields (obj_offset, obj_size) when the value is not yet known.
# Grep for UNKNOWN_INT32 to find every site that still needs updating.
UNKNOWN_INT32 = 0xFFFFFFFF
# Placeholder for pack_id (32-byte field) when the value is not yet known.
UNKNOWN_BYTES32 = b"\xff" * 32
# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header)
MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module

View file

@ -21,6 +21,7 @@ class ChunkIndex:
M_USER: int
M_SYSTEM: int
def add(self, key: bytes, size: int) -> None: ...
def update_pack_info(self, pack_results: list | None) -> None: ...
def iteritems(self, *, only_new: bool = ...) -> Iterator: ...
def clear_new(self) -> None: ...
def __contains__(self, key: bytes) -> bool: ...

View file

@ -5,6 +5,8 @@ import struct
from borghash import HashTableNT
from .constants import UNKNOWN_INT32, UNKNOWN_BYTES32
cdef _NoDefault = object()
@ -79,8 +81,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
else:
flags = v.flags | self.F_USED
assert v.size == 0 or v.size == size
pack_id = key # N=1: chunk_id == pack_id
self[key] = ChunkIndexEntry(flags=flags, size=size, pack_id=pack_id, obj_offset=0, obj_size=0)
self[key] = ChunkIndexEntry(
flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
def __getitem__(self, key):
"""Specialized __getitem__ that hides system flags."""
@ -105,6 +108,14 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
user_flags = value.flags & self.M_USER
self.ht[key] = value._replace(flags=system_flags | user_flags)
def update_pack_info(self, pack_results):
"""Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples."""
if not pack_results:
return
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
existing = self[chunk_id]
self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
def clear_new(self):
"""Clears the F_NEW flag of all items."""
for key, value in self.ht.items():

View file

@ -99,6 +99,71 @@ def build_rest_backend(location):
return REST(base_url="http://stdio-backend", command=rest_serve_command(location))
class PackWriter:
"""Buffers chunks into a pack file and writes to the store when full.
Collects (chunk_id, cdata) pairs in a list and flushes once max_count is
reached. On flush it returns the location info for every chunk so the
caller can update the ChunkIndex with real values.
At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack,
so pack_id == chunk_id and the naming scheme is unchanged from before.
Raising max_count later (N>1 phase) enables real packing without touching
this class's interface.
"""
def __init__(self, store, *, max_count=1):
self.store = store
self.max_count = max_count
self._pieces = [] # list of (chunk_id, cdata)
def add(self, chunk_id, cdata):
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
self._pieces.append((chunk_id, cdata))
if len(self._pieces) >= self.max_count:
return self.flush()
return None
def flush(self):
"""Write the current pack to the store.
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples
one entry per chunk that was written. Returns None if there was nothing
to flush.
"""
if not self._pieces:
return None
# Build the pack bytes once by joining all pieces (avoids O(n^2) copies
# that incremental string concatenation would cause in Python).
pack_data = b"".join(cdata for _, cdata in self._pieces)
# Determine pack_id.
# N=1: the pack contains exactly one chunk, so we keep pack_id == chunk_id
# (backward-compatible file naming: packs/{chunk_id_hex}).
# N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the
# file is content-addressed and borgstore can verify/cache it.
if self.max_count == 1:
pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id
else:
pack_id = sha256(pack_data).digest() # N>1: content-addressed
# Record (chunk_id, pack_id, obj_offset, obj_size) for every piece.
results = []
offset = 0
for chunk_id, cdata in self._pieces:
obj_size = len(cdata)
results.append((chunk_id, pack_id, offset, obj_size))
offset += obj_size
key = "packs/" + bin_to_hex(pack_id)
try:
self.store.store(key, pack_data)
finally:
self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk
return results
class Repository:
"""borgstore-based key/value store."""
@ -219,6 +284,7 @@ class Repository:
self.do_lock = lock
self.lock_wait = lock_wait
self.exclusive = exclusive
self._pack_writer = None
def __repr__(self):
return f"<{self.__class__.__name__} {self._location}>"
@ -340,9 +406,23 @@ class Repository:
# important: lock *after* making sure that there actually is an existing, supported repository.
if lock:
self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
self._pack_writer = PackWriter(self.store, max_count=1)
self.opened = True
def flush(self):
"""Flush any buffered pack writer chunks. Returns pack_results (or None).
Callers that maintain a ChunkIndex must call this and pass the result to
chunks.update_pack_info() before closing, so index entries for the last
batch of chunks get real pack location values instead of UNKNOWN_*.
"""
if self._pack_writer is not None:
return self._pack_writer.flush()
return None
def close(self):
if self._pack_writer is not None:
assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()"
if self.lock:
self.lock.release()
self.lock = None
@ -564,15 +644,17 @@ class Repository:
Note: when doing calls with wait=False this gets async and caller must
deal with async results / exceptions later.
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
every chunk written to disk this call. At max_count=1 this is always
one entry. Callers should pass these to ChunkIndex.update_pack_info()
so the index holds real location values rather than UNKNOWN_INT32 placeholders.
"""
self._lock_refresh()
data_size = len(data)
if data_size > MAX_DATA_SIZE:
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
pack_id = id # N=1: pack_id == chunk_id
key = "packs/" + bin_to_hex(pack_id)
self.store.store(key, data)
return self._pack_writer.add(id, data)
def delete(self, id, wait=True):
"""delete a repo object

View file

@ -3,6 +3,7 @@ import struct
import pytest
from ..constants import UNKNOWN_INT32, UNKNOWN_BYTES32
from ..hashindex import ChunkIndex, ChunkIndexEntry
@ -20,15 +21,42 @@ def test_chunkindex_add():
chunks = ChunkIndex()
x = H2(1)
chunks.add(x, 0)
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=0, obj_size=0)
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
chunks.add(x, 2) # updating size (we do not have a size yet)
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0)
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
chunks.add(x, 2)
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0)
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
with pytest.raises(AssertionError):
chunks.add(x, 3) # inconsistent size (we already have a different size)
def test_chunkindex_update_pack_info():
chunks = ChunkIndex()
x1, x2 = H2(1), H2(2)
chunks.add(x1, 10)
chunks.add(x2, 20)
assert chunks[x1].obj_offset == UNKNOWN_INT32
assert chunks[x2].obj_offset == UNKNOWN_INT32
pack_id = H2(3)
# Both chunks land in the same pack (N>1 scenario): batch update in one call.
chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)])
# Location fields updated; flags and size must be unchanged.
assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50)
assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60)
# None and empty list are both no-ops.
chunks.update_pack_info(None)
chunks.update_pack_info([])
assert chunks[x1].obj_offset == 0
def test_keyerror():
chunks = ChunkIndex()
x = H2(1)

View file

@ -1,10 +1,11 @@
import os
import sys
from hashlib import sha256
import pytest
from ..constants import ROBJ_FILE_STREAM
from ..helpers import IntegrityError, Location
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from ..crypto.key import PlaintextKey
from .hashindex_test import H
@ -200,3 +201,67 @@ class TestCacheIfRemote:
def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs):
with pytest.raises(ValueError):
cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data)
class MockStore:
def __init__(self):
self.stored = {}
def store(self, key, data):
self.stored[key] = data
def test_pack_writer_returns_none_when_not_full():
pw = PackWriter(MockStore(), max_count=2)
assert pw.add(b"a" * 32, b"data") is None
def test_pack_writer_flush_returns_none_when_empty():
pw = PackWriter(MockStore(), max_count=1)
assert pw.flush() is None
def test_pack_writer_n1_flush():
store = MockStore()
chunk_id = b"c" * 32
cdata = b"payload"
pw = PackWriter(store, max_count=1)
results = pw.add(chunk_id, cdata)
assert results is not None
assert len(results) == 1
stored_id, pack_id, obj_offset, obj_size = results[0]
assert stored_id == chunk_id
assert pack_id == chunk_id # N=1: pack_id == chunk_id
assert obj_offset == 0
assert obj_size == len(cdata)
def test_pack_writer_n2_flush():
store = MockStore()
id1, id2 = b"a" * 32, b"b" * 32
data1, data2 = b"first", b"second"
pw = PackWriter(store, max_count=2)
assert pw.add(id1, data1) is None
results = pw.add(id2, data2)
assert results is not None
assert len(results) == 2
pack_data = data1 + data2
expected_pack_id = sha256(pack_data).digest()
assert results[0] == (id1, expected_pack_id, 0, len(data1))
assert results[1] == (id2, expected_pack_id, len(data1), len(data2))
def test_pack_writer_final_partial_pack_uses_sha256():
# When max_count > 1, a final flush with only 1 piece must still use SHA256,
# not the N=1 pack_id == chunk_id hack.
store = MockStore()
chunk_id = b"d" * 32
cdata = b"solo"
pw = PackWriter(store, max_count=3)
assert pw.add(chunk_id, cdata) is None
results = pw.flush()
assert results is not None
assert len(results) == 1
_, pack_id, _, _ = results[0]
assert pack_id == sha256(cdata).digest()
assert pack_id != chunk_id