mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 01:41:57 -04:00
repository: add PackWriter and two-phase chunk index update, refs #8572
PackWriter buffers (chunk_id, cdata) pairs and flushes as pack files via borgstore. At N=1 pack_id == chunk_id; UNKNOWN_INT32 (0xFFFFFFFF) placeholders in the index are replaced by real pack location fields after flush() via update_pack_info(). Update test_chunkindex_add to expect UNKNOWN_INT32 sentinels from add().
This commit is contained in:
parent
9b8fc60430
commit
71bbde208f
7 changed files with 111 additions and 11 deletions
|
|
@ -2014,10 +2014,16 @@ class ArchiveChecker:
|
|||
assert cdata is not None
|
||||
pack_id = id_ # only correct for N=1
|
||||
self.chunks[id_] = ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=size, pack_id=pack_id, obj_offset=0, obj_size=0
|
||||
flags=ChunkIndex.F_USED,
|
||||
size=size,
|
||||
pack_id=pack_id,
|
||||
obj_offset=UNKNOWN_INT32,
|
||||
obj_size=UNKNOWN_INT32,
|
||||
)
|
||||
if self.repair:
|
||||
self.repository.put(id_, cdata)
|
||||
pack_results = self.repository.put(id_, cdata)
|
||||
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
|
||||
self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size)
|
||||
|
||||
def verify_file_chunks(archive_name, item):
|
||||
"""Verifies that all file chunks are present. Missing file chunks will be logged."""
|
||||
|
|
|
|||
|
|
@ -730,9 +730,13 @@ class ChunksMixin:
|
|||
cdata = self.repo_objs.format(
|
||||
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
|
||||
)
|
||||
self.repository.put(id, cdata, wait=wait)
|
||||
pack_results = self.repository.put(id, cdata, wait=wait)
|
||||
self.last_refresh_dt = now # .put also refreshed the lock
|
||||
self.chunks.add(id, size)
|
||||
# pack_results is non-empty when the PackWriter flushed (always at max_count=1).
|
||||
# Update the index with real pack location so obj_size is not UNKNOWN_INT32.
|
||||
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
|
||||
self.chunks.update_pack_info(chunk_id, pack_id, obj_offset, obj_size)
|
||||
stats.update(size, not exists)
|
||||
return ChunkListEntry(id, size)
|
||||
|
||||
|
|
|
|||
|
|
@ -51,6 +51,10 @@ ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type)
|
|||
# the header, and the total size was set to precisely 20 MiB for borg < 1.3).
|
||||
MAX_DATA_SIZE = 20971479
|
||||
|
||||
# Placeholder for pack location fields (obj_offset, obj_size) when the value is not yet known.
|
||||
# Grep for UNKNOWN_INT32 to find every site that still needs updating.
|
||||
UNKNOWN_INT32 = 0xFFFFFFFF
|
||||
|
||||
# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header)
|
||||
MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see assertion at end of repository module
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ class ChunkIndex:
|
|||
M_USER: int
|
||||
M_SYSTEM: int
|
||||
def add(self, key: bytes, size: int) -> None: ...
|
||||
def update_pack_info(self, key: bytes, pack_id: bytes, obj_offset: int, obj_size: int) -> None: ...
|
||||
def iteritems(self, *, only_new: bool = ...) -> Iterator: ...
|
||||
def clear_new(self) -> None: ...
|
||||
def __contains__(self, key: bytes) -> bool: ...
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ import struct
|
|||
|
||||
from borghash import HashTableNT
|
||||
|
||||
from .constants import UNKNOWN_INT32
|
||||
|
||||
|
||||
|
||||
cdef _NoDefault = object()
|
||||
|
|
@ -80,7 +82,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
|
|||
flags = v.flags | self.F_USED
|
||||
assert v.size == 0 or v.size == size
|
||||
pack_id = key # N=1: chunk_id == pack_id
|
||||
self[key] = ChunkIndexEntry(flags=flags, size=size, pack_id=pack_id, obj_offset=0, obj_size=0)
|
||||
self[key] = ChunkIndexEntry(
|
||||
flags=flags, size=size, pack_id=pack_id, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""Specialized __getitem__ that hides system flags."""
|
||||
|
|
@ -105,6 +109,11 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
|
|||
user_flags = value.flags & self.M_USER
|
||||
self.ht[key] = value._replace(flags=system_flags | user_flags)
|
||||
|
||||
def update_pack_info(self, key, pack_id, obj_offset, obj_size):
|
||||
"""Update the on-disk location fields for an existing entry without changing flags or size."""
|
||||
existing = self[key]
|
||||
self[key] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
|
||||
|
||||
def clear_new(self):
|
||||
"""Clears the F_NEW flag of all items."""
|
||||
for key, value in self.ht.items():
|
||||
|
|
|
|||
|
|
@ -99,6 +99,71 @@ def build_rest_backend(location):
|
|||
return REST(base_url="http://stdio-backend", command=rest_serve_command(location))
|
||||
|
||||
|
||||
class PackWriter:
|
||||
"""Buffers chunks into a pack file and writes to the store when full.
|
||||
|
||||
Collects (chunk_id, cdata) pairs in a list and flushes once max_count is
|
||||
reached. On flush it returns the location info for every chunk so the
|
||||
caller can update the ChunkIndex with real values.
|
||||
|
||||
At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack,
|
||||
so pack_id == chunk_id and the naming scheme is unchanged from before.
|
||||
Raising max_count later (N>1 phase) enables real packing without touching
|
||||
this class's interface.
|
||||
"""
|
||||
|
||||
def __init__(self, store, *, max_count=1):
|
||||
self.store = store
|
||||
self.max_count = max_count
|
||||
self._pieces = [] # list of (chunk_id, cdata)
|
||||
|
||||
def add(self, chunk_id, cdata):
|
||||
"""Buffer a chunk. Returns flush results if the pack is now full, else []."""
|
||||
self._pieces.append((chunk_id, cdata))
|
||||
if len(self._pieces) >= self.max_count:
|
||||
return self.flush()
|
||||
return []
|
||||
|
||||
def flush(self):
|
||||
"""Write the current pack to the store.
|
||||
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples —
|
||||
one entry per chunk that was written. Returns [] if there was nothing
|
||||
to flush.
|
||||
"""
|
||||
if not self._pieces:
|
||||
return []
|
||||
|
||||
# Build the pack bytes once by joining all pieces (avoids O(n^2) copies
|
||||
# that incremental string concatenation would cause in Python).
|
||||
pack_data = b"".join(cdata for _, cdata in self._pieces)
|
||||
|
||||
# Determine pack_id.
|
||||
# N=1: the pack contains exactly one chunk, so we keep pack_id == chunk_id
|
||||
# (backward-compatible file naming: packs/{chunk_id_hex}).
|
||||
# N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the
|
||||
# file is content-addressed and borgstore can verify/cache it.
|
||||
if len(self._pieces) == 1:
|
||||
pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id
|
||||
else:
|
||||
pack_id = sha256(pack_data).digest() # N>1: content-addressed
|
||||
|
||||
# Record (chunk_id, pack_id, obj_offset, obj_size) for every piece.
|
||||
results = []
|
||||
offset = 0
|
||||
for chunk_id, cdata in self._pieces:
|
||||
obj_size = len(cdata)
|
||||
results.append((chunk_id, pack_id, offset, obj_size))
|
||||
offset += obj_size
|
||||
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
try:
|
||||
self.store.store(key, pack_data)
|
||||
finally:
|
||||
self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk
|
||||
return results
|
||||
|
||||
|
||||
class Repository:
|
||||
"""borgstore-based key/value store."""
|
||||
|
||||
|
|
@ -219,6 +284,7 @@ class Repository:
|
|||
self.do_lock = lock
|
||||
self.lock_wait = lock_wait
|
||||
self.exclusive = exclusive
|
||||
self._pack_writer = None
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{self.__class__.__name__} {self._location}>"
|
||||
|
|
@ -340,6 +406,7 @@ class Repository:
|
|||
# important: lock *after* making sure that there actually is an existing, supported repository.
|
||||
if lock:
|
||||
self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
|
||||
self._pack_writer = PackWriter(self.store, max_count=1)
|
||||
self.opened = True
|
||||
|
||||
def close(self):
|
||||
|
|
@ -564,15 +631,17 @@ class Repository:
|
|||
|
||||
Note: when doing calls with wait=False this gets async and caller must
|
||||
deal with async results / exceptions later.
|
||||
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
|
||||
every chunk written to disk this call. At max_count=1 this is always
|
||||
one entry. Callers should pass these to ChunkIndex.update_pack_info()
|
||||
so the index holds real location values rather than UNKNOWN_INT32 placeholders.
|
||||
"""
|
||||
self._lock_refresh()
|
||||
data_size = len(data)
|
||||
if data_size > MAX_DATA_SIZE:
|
||||
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
|
||||
|
||||
pack_id = id # N=1: pack_id == chunk_id
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
self.store.store(key, data)
|
||||
return self._pack_writer.add(id, data)
|
||||
|
||||
def delete(self, id, wait=True):
|
||||
"""delete a repo object
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import struct
|
|||
|
||||
import pytest
|
||||
|
||||
from ..constants import UNKNOWN_INT32
|
||||
from ..hashindex import ChunkIndex, ChunkIndexEntry
|
||||
|
||||
|
||||
|
|
@ -20,11 +21,17 @@ def test_chunkindex_add():
|
|||
chunks = ChunkIndex()
|
||||
x = H2(1)
|
||||
chunks.add(x, 0)
|
||||
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=0, obj_size=0)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=0, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
chunks.add(x, 2) # updating size (we do not have a size yet)
|
||||
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
chunks.add(x, 2)
|
||||
assert chunks[x] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=0, obj_size=0)
|
||||
assert chunks[x] == ChunkIndexEntry(
|
||||
flags=ChunkIndex.F_USED, size=2, pack_id=x, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
|
||||
)
|
||||
with pytest.raises(AssertionError):
|
||||
chunks.add(x, 3) # inconsistent size (we already have a different size)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue