mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-15 04:21:38 -04:00
Merge pull request #9744 from mr-raj12/pack-files-step6-range-load
Some checks are pending
Lint / lint (push) Waiting to run
CI / lint (push) Waiting to run
CI / security (push) Waiting to run
CI / asan_ubsan (push) Blocked by required conditions
CI / native_tests (push) Blocked by required conditions
CI / vm_tests (NetBSD, false, netbsd, 10.1) (push) Blocked by required conditions
CI / vm_tests (OmniOS, false, omnios, r151056) (push) Blocked by required conditions
CI / vm_tests (OpenBSD, false, openbsd, 7.8) (push) Blocked by required conditions
CI / vm_tests (borg-freebsd-14-x86_64-gh, FreeBSD, true, freebsd, 14.3) (push) Blocked by required conditions
CI / windows_tests (push) Blocked by required conditions
CI / sha256 pack-id (informational) (push) Blocked by required conditions
CodeQL / Analyze (push) Waiting to run
Some checks are pending
Lint / lint (push) Waiting to run
CI / lint (push) Waiting to run
CI / security (push) Waiting to run
CI / asan_ubsan (push) Blocked by required conditions
CI / native_tests (push) Blocked by required conditions
CI / vm_tests (NetBSD, false, netbsd, 10.1) (push) Blocked by required conditions
CI / vm_tests (OmniOS, false, omnios, r151056) (push) Blocked by required conditions
CI / vm_tests (OpenBSD, false, openbsd, 7.8) (push) Blocked by required conditions
CI / vm_tests (borg-freebsd-14-x86_64-gh, FreeBSD, true, freebsd, 14.3) (push) Blocked by required conditions
CI / windows_tests (push) Blocked by required conditions
CI / sha256 pack-id (informational) (push) Blocked by required conditions
CodeQL / Analyze (push) Waiting to run
repository: ChunkIndex-based pack routing with range-load support
This commit is contained in:
commit
350ec8bc36
8 changed files with 438 additions and 35 deletions
54
.github/workflows/ci.yml
vendored
54
.github/workflows/ci.yml
vendored
|
|
@ -703,3 +703,57 @@ jobs:
|
|||
report_type: coverage
|
||||
env_vars: OS,python
|
||||
files: coverage.xml
|
||||
|
||||
sha256_pack_id_tests:
|
||||
name: sha256 pack-id (informational)
|
||||
needs: [lint]
|
||||
runs-on: ubuntu-24.04
|
||||
timeout-minutes: 90
|
||||
continue-on-error: true
|
||||
concurrency:
|
||||
group: sha256-pack-id-${{ github.head_ref || github.ref }}
|
||||
cancel-in-progress: false
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
fetch-tags: true
|
||||
|
||||
- name: Set up Python 3.12
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Cache pip
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: ~/.cache/pip
|
||||
key: ${{ runner.os }}-${{ runner.arch }}-pip-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-${{ runner.arch }}-pip-
|
||||
|
||||
- name: Cache tox environments
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: .tox
|
||||
key: ${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-${{ hashFiles('requirements.d/development.lock.txt', 'pyproject.toml') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-${{ runner.arch }}-tox-sha256-pack-id-
|
||||
|
||||
- name: Install Linux packages
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y pkg-config build-essential
|
||||
sudo apt-get install -y libssl-dev libacl1-dev liblz4-dev
|
||||
|
||||
- name: Install Python requirements
|
||||
run: |
|
||||
python -m pip install --upgrade pip setuptools wheel
|
||||
pip install -r requirements.d/development.lock.txt
|
||||
|
||||
- name: Install borgbackup
|
||||
run: pip install -ve ".[cockpit,s3,sftp,rclone]"
|
||||
|
||||
- name: Run tests with sha256 pack-ids
|
||||
run: tox -e sha256-pack-id
|
||||
|
|
|
|||
|
|
@ -257,6 +257,11 @@ extras = ["pyfuse3", "sftp", "s3", "rclone"]
|
|||
set_env = {BORG_FUSE_IMPL = "mfusepy"}
|
||||
extras = ["mfusepy", "sftp", "s3", "rclone"]
|
||||
|
||||
# Informational env: forces sha256 pack_ids even with max_count=1 to expose
|
||||
# code that still assumes pack_id == chunk_id. Run: tox -e sha256-pack-id
|
||||
[tool.tox.env.sha256-pack-id]
|
||||
set_env = {BORG_TESTONLY_SHA256_PACK_ID = "1"}
|
||||
|
||||
[tool.tox.env.ruff]
|
||||
skip_install = true
|
||||
deps = ["ruff"]
|
||||
|
|
|
|||
|
|
@ -2164,7 +2164,8 @@ class ArchiveChecker:
|
|||
|
||||
def finish(self):
|
||||
if self.repair:
|
||||
# we may have deleted chunks, remove the chunks index cache!
|
||||
# we may have deleted chunks. delete_chunkindex_cache() removes the on-disk cache and
|
||||
# drops the stale in-memory index, so the next repository access rebuilds it from the repo.
|
||||
logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
|
||||
delete_chunkindex_cache(self.repository)
|
||||
logger.info("Writing Manifest.")
|
||||
|
|
|
|||
|
|
@ -535,6 +535,9 @@ def delete_chunkindex_cache(repository):
|
|||
except StoreObjectNotFound:
|
||||
pass
|
||||
logger.debug(f"cached chunk indexes deleted: {hashes}")
|
||||
# the in-memory index is now stale; drop it so close() does not write it back into the
|
||||
# cache we just deleted. the next .chunks access rebuilds it from actual repo contents.
|
||||
repository.invalidate_chunk_index()
|
||||
|
||||
|
||||
CHUNKINDEX_HASH_SEED = b"0001" # increment seed to invalidate old chunk indexes
|
||||
|
|
@ -553,15 +556,22 @@ def write_chunkindex_to_repo_cache(
|
|||
# but for simplicity, we do it anyway.
|
||||
for key, existing in chunks.iteritems(only_new=incremental):
|
||||
chunks_to_write[key] = existing._replace(flags=ChunkIndex.F_NONE, size=0)
|
||||
num_to_write = len(chunks_to_write)
|
||||
with io.BytesIO() as f:
|
||||
chunks_to_write.write(f)
|
||||
data = f.getvalue()
|
||||
logger.debug(f"caching {len(chunks_to_write)} chunks (incremental={incremental}).")
|
||||
logger.debug(f"caching {num_to_write} chunks (incremental={incremental}).")
|
||||
chunks_to_write.clear() # free memory of the temporary table
|
||||
if clear:
|
||||
# if we don't need the in-memory chunks index anymore:
|
||||
chunks.clear() # free memory, immediately
|
||||
new_hash = hashlib.sha256(data + CHUNKINDEX_HASH_SEED).hexdigest()
|
||||
if num_to_write == 0 and not force_write:
|
||||
# don't persist an empty incremental index: if it became the only cache/chunks.* (e.g. right
|
||||
# after delete_chunkindex_cache()), build_chunkindex_from_repo() would return it as-is instead
|
||||
# of rebuilding from the repo. with nothing new, the existing cache is already up to date.
|
||||
logger.debug("no new chunks to cache; not writing an empty incremental chunk index.")
|
||||
return new_hash
|
||||
cached_hashes = list_chunkindex_hashes(repository)
|
||||
if force_write or new_hash not in cached_hashes:
|
||||
# when an updated chunks index is stored into the cache, we also store its hash as part of the name.
|
||||
|
|
@ -642,6 +652,13 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
|
|||
num_chunks = 0
|
||||
# The repo says it has these chunks, so we assume they are referenced/used chunks.
|
||||
# We do not know the plaintext size (!= stored_size), thus we set size = 0.
|
||||
#
|
||||
# IMPORTANT (N=1 only): listing yields pack_ids, not per-chunk locations. We can only
|
||||
# reconstruct the index here under the N=1 assumption -- pack_id == chunk_id, one chunk per
|
||||
# pack at offset 0 spanning the whole pack. At N>1 this is wrong: a cold rebuild would have to
|
||||
# open each pack and read its header to recover the per-chunk offsets and sizes. Until that
|
||||
# exists, Repository.get()'s range-load is only correct while a persisted/cached chunk index
|
||||
# is available; a cold rebuild from a bare repo listing silently falls back to N=1 semantics.
|
||||
for pack_id, pack_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
|
||||
num_chunks += 1
|
||||
chunk_id = pack_id # N=1: chunk_id == pack_id
|
||||
|
|
@ -678,7 +695,16 @@ class ChunksMixin:
|
|||
@property
|
||||
def chunks(self):
|
||||
if self._chunks is None:
|
||||
self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
|
||||
# the repository owns the one and only chunk index; use it rather than
|
||||
# building a second one and pushing it back into the repository.
|
||||
self._chunks = self.repository.chunks
|
||||
# note: we deliberately do NOT consolidate the cached chunk index fragments here.
|
||||
# each backup writes a small incremental cache/chunks.* fragment (only its new chunks),
|
||||
# which is cheap. collapsing them all into one big fragment on every run would re-upload
|
||||
# the whole index and, with delete_other, invalidate every other client's fragments --
|
||||
# a multi-GB churn per run on a shared repo. fragment count is reclaimed by `borg compact`
|
||||
# (build_chunkindex_from_repo with cache_immediately). a size/threshold-based policy that
|
||||
# bounds the fragment count without re-uploading large fragments can be added later.
|
||||
return self._chunks
|
||||
|
||||
def seen_chunk(self, id, size=None):
|
||||
|
|
@ -831,11 +857,8 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
|
|||
def close(self):
|
||||
self.security_manager.save(self.manifest, self.key)
|
||||
pi = ProgressIndicatorMessage(msgid="cache.close")
|
||||
# Flush any chunks still buffered in the pack writer and update the index
|
||||
# so the last batch gets real pack location values instead of UNKNOWN_*.
|
||||
if self._chunks is not None:
|
||||
pack_results = self.repository.flush()
|
||||
self._chunks.update_pack_info(pack_results)
|
||||
self.repository.flush()
|
||||
if self._files is not None:
|
||||
pi.output("Saving files cache")
|
||||
integrity_data = self._write_files_cache(self._files)
|
||||
|
|
@ -869,6 +892,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
|
|||
def wipe_cache(self):
|
||||
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
|
||||
self._chunks = ChunkIndex()
|
||||
self.repository.chunks = self._chunks
|
||||
self.cache_config.manifest_id = ""
|
||||
self.cache_config._config.set("cache", "manifest", "")
|
||||
|
||||
|
|
|
|||
|
|
@ -80,6 +80,9 @@ def pytest_sessionfinish(session, exitstatus):
|
|||
@pytest.fixture(autouse=True)
|
||||
def clean_env(tmpdir_factory, monkeypatch):
|
||||
# also avoid using anything from the outside environment:
|
||||
# note: BORG_TESTONLY_SHA256_PACK_ID is intentionally NOT exempted here. The repository
|
||||
# module captures it at import time (repository.FORCE_SHA256_PACK_ID), before this fixture
|
||||
# runs, so wiping it per test is harmless and the env stays fully isolated.
|
||||
keys = [key for key in os.environ if key.startswith("BORG_") and key not in ("BORG_FUSE_IMPL",)]
|
||||
for key in keys:
|
||||
monkeypatch.delenv(key, raising=False)
|
||||
|
|
|
|||
|
|
@ -100,12 +100,26 @@ def build_rest_backend(location):
|
|||
return REST(base_url="http://stdio-backend", command=rest_serve_command(location))
|
||||
|
||||
|
||||
# Test-only switch: force sha256 pack_ids even at N=1, to expose code that still assumes
|
||||
# pack_id == chunk_id. Read once here, at import time, on purpose:
|
||||
# - it happens before the per-test clean_env fixture wipes BORG_* vars, so the tox env that
|
||||
# sets this var does not need a clean_env exemption (the captured value survives the wipe);
|
||||
# - flush() then checks a plain bool instead of touching os.environ on every write.
|
||||
FORCE_SHA256_PACK_ID = os.environ.get("BORG_TESTONLY_SHA256_PACK_ID") == "1"
|
||||
|
||||
|
||||
class PackWriter:
|
||||
"""Buffers chunks into a pack file and writes to the store when full.
|
||||
|
||||
Collects (chunk_id, cdata) pairs in a list and flushes once max_count is
|
||||
reached. On flush it returns the location info for every chunk so the
|
||||
caller can update the ChunkIndex with real values.
|
||||
reached. PackWriter maintains the ChunkIndex directly: each add() marks the
|
||||
chunk as pending (pack_id=UNKNOWN_BYTES32); flush() then assigns the real
|
||||
pack_id, offset and size to every pending entry once the pack is on disk.
|
||||
|
||||
The index is not owned here. Construction requires either a repository or an
|
||||
explicit chunks index; there is no silent default. With a repository, the writer
|
||||
uses that repository's single, authoritative index (see the chunks property), so
|
||||
there is never a second copy to keep in sync. Unit tests pass an explicit index.
|
||||
|
||||
At max_count=1 (N=1 phase) each put() maps exactly one chunk to one pack,
|
||||
so pack_id == chunk_id and the naming scheme is unchanged from before.
|
||||
|
|
@ -113,13 +127,37 @@ class PackWriter:
|
|||
this class's interface.
|
||||
"""
|
||||
|
||||
def __init__(self, store, *, max_count=1):
|
||||
def __init__(self, store, *, max_count=1, chunks=None, repository=None):
|
||||
if repository is None and chunks is None:
|
||||
raise ValueError("PackWriter requires either a repository or an explicit chunks index")
|
||||
self.store = store
|
||||
self.max_count = max_count
|
||||
self.repository = repository # when set, the one and only index lives there
|
||||
self._chunks = chunks # explicit index for repository-less use (tests)
|
||||
self._pieces = [] # list of (chunk_id, cdata)
|
||||
|
||||
@property
def chunks(self):
    """Return the ChunkIndex that this writer keeps up to date.

    If the writer was given a repository, the repository's own index is handed out
    (the same shared object, not a copy). Otherwise the explicit index supplied at
    construction time is used.
    """
    repo = self.repository
    return self._chunks if repo is None else repo.chunks
|
||||
|
||||
def add(self, chunk_id, cdata):
|
||||
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
|
||||
# Mark the chunk as pending (pack_id=UNKNOWN_BYTES32). flush() assigns the real
|
||||
# pack_id and offset for every piece, so the placeholder offset 0 here is never read:
|
||||
# get() refuses a pending entry (PackLocationUnknown) before any offset would matter.
|
||||
# Precondition: callers add only chunks not already stored (the cache dedups via
|
||||
# seen_chunk() first), so add(chunk_id, 0) never resets a real size on an existing entry.
|
||||
# This is also what keeps ChunkIndex.add's "v.size == 0 or v.size == size" assertion happy:
|
||||
# a fresh id has no entry, so the size=0 we pass here is never compared against a real size.
|
||||
self.chunks.add(chunk_id, 0) # size filled in by cache layer
|
||||
self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))])
|
||||
self._pieces.append((chunk_id, cdata))
|
||||
if len(self._pieces) >= self.max_count:
|
||||
return self.flush()
|
||||
|
|
@ -128,9 +166,9 @@ class PackWriter:
|
|||
def flush(self):
|
||||
"""Write the current pack to the store.
|
||||
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples —
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples --
|
||||
one entry per chunk that was written. Returns None if there was nothing
|
||||
to flush.
|
||||
to flush. Always updates the ChunkIndex with the real pack_id.
|
||||
"""
|
||||
if not self._pieces:
|
||||
return None
|
||||
|
|
@ -144,10 +182,12 @@ class PackWriter:
|
|||
# (backward-compatible file naming: packs/{chunk_id_hex}).
|
||||
# N>1: the pack contains multiple chunks; use SHA256(pack_bytes) so the
|
||||
# file is content-addressed and borgstore can verify/cache it.
|
||||
if self.max_count == 1:
|
||||
# BORG_TESTONLY_SHA256_PACK_ID (see FORCE_SHA256_PACK_ID): always use sha256 even at
|
||||
# N=1, exposing code that still assumes pack_id == chunk_id.
|
||||
if self.max_count == 1 and not FORCE_SHA256_PACK_ID:
|
||||
pack_id = self._pieces[0][0] # N=1: pack_id == chunk_id
|
||||
else:
|
||||
pack_id = sha256(pack_data).digest() # N>1: content-addressed
|
||||
pack_id = sha256(pack_data).digest()
|
||||
|
||||
# Record (chunk_id, pack_id, obj_offset, obj_size) for every piece.
|
||||
results = []
|
||||
|
|
@ -158,10 +198,25 @@ class PackWriter:
|
|||
offset += obj_size
|
||||
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
# ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32).
|
||||
pending_ids = [chunk_id for chunk_id, _ in self._pieces]
|
||||
try:
|
||||
self.store.store(key, pack_data)
|
||||
except Exception:
|
||||
# The pack was not durably stored, so every entry add() pre-marked for it now
|
||||
# points at data that does not exist. Leaving them would make seen_chunk() report
|
||||
# these ids as present, letting a later identical chunk dedup against bytes that were
|
||||
# never written -- silent data loss. These entries were created this session and never
|
||||
# received a real pack_id, so dropping them restores the index to its pre-add() state
|
||||
# (matching master, where the index only ever reflected successfully stored chunks).
|
||||
for chunk_id in pending_ids:
|
||||
entry = self.chunks.get(chunk_id)
|
||||
if entry is not None and entry.pack_id == UNKNOWN_BYTES32:
|
||||
del self.chunks[chunk_id]
|
||||
raise
|
||||
finally:
|
||||
self._pieces = [] # reset even on failure to prevent re-bundling a failed chunk
|
||||
self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id
|
||||
return results
|
||||
|
||||
|
||||
|
|
@ -208,6 +263,19 @@ class Repository:
|
|||
id = bin_to_hex(id)
|
||||
super().__init__(id, repo)
|
||||
|
||||
class PackLocationUnknown(ErrorWithTraceback):
    """Object with key {} is indexed but its pack location is unresolved in repository {}."""

    # NOTE(review): the docstring above is reproduced verbatim; its {} placeholders suggest it
    # doubles as the message template for this error -- confirm before rewording it.
    exit_mcode = 22

    # Signals a code bug rather than a genuine miss: the chunk is present in the index but is
    # still buffered (not flushed). It is intentionally not derived from ObjectNotFound, so
    # handlers written as "except ObjectNotFound" cannot swallow it and it surfaces loudly
    # with a traceback.
    def __init__(self, id, repo):
        # accept the id as raw bytes or as an already hex-encoded value
        key = bin_to_hex(id) if isinstance(id, bytes) else id
        super().__init__(key, repo)
|
||||
|
||||
class ParentPathDoesNotExist(Error):
|
||||
"""The parent path of the repository directory [{}] does not exist."""
|
||||
|
||||
|
|
@ -286,6 +354,7 @@ class Repository:
|
|||
self.lock_wait = lock_wait
|
||||
self.exclusive = exclusive
|
||||
self._pack_writer = None
|
||||
self._chunks = None # ChunkIndex; loaded lazily on first access to .chunks
|
||||
|
||||
def __repr__(self):
|
||||
return f"<{self.__class__.__name__} {self._location}>"
|
||||
|
|
@ -415,23 +484,68 @@ class Repository:
|
|||
# important: lock *after* making sure that there actually is an existing, supported repository.
|
||||
if lock:
|
||||
self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
|
||||
self._pack_writer = PackWriter(self.store, max_count=1)
|
||||
self._chunks = None
|
||||
self._pack_writer = PackWriter(self.store, max_count=1, repository=self)
|
||||
self.opened = True
|
||||
|
||||
def flush(self):
|
||||
"""Flush any buffered pack writer chunks. Returns pack_results (or None).
|
||||
@property
|
||||
def chunks(self):
|
||||
"""ChunkIndex mapping every known chunk id to its pack location.
|
||||
|
||||
Callers that maintain a ChunkIndex must call this and pass the result to
|
||||
chunks.update_pack_info() before closing, so index entries for the last
|
||||
batch of chunks get real pack location values instead of UNKNOWN_*.
|
||||
This property is the single owner of the in-memory index: get() resolves
|
||||
pack locations through it, PackWriter updates it, and the Cache reads it
|
||||
from here rather than building its own. Built lazily on first access and
|
||||
persisted back to the repo cache at close().
|
||||
"""
|
||||
if self._chunks is None:
|
||||
from .cache import build_chunkindex_from_repo
|
||||
|
||||
self._chunks = build_chunkindex_from_repo(self)
|
||||
return self._chunks
|
||||
|
||||
@chunks.setter
def chunks(self, value):
    """Install *value* as this repository's in-memory chunk index.

    The index is normally built lazily on first access. This setter serves the few
    callers that have to put a specific index in place (e.g. wiping the cache, or
    restoring an index captured before close()). To drop a stale index so that it gets
    rebuilt, do not assign None here -- call invalidate_chunk_index() instead.
    """
    self._chunks = value
|
||||
|
||||
def invalidate_chunk_index(self):
    """Forget the in-memory chunk index so close() does not persist a stale copy.

    Used when the on-disk chunk index cache has been deleted. The following access to
    .chunks then rebuilds the index from the actual repository contents. PackWriter
    reaches the index through this Repository, so it picks up the rebuilt index
    without further action.
    """
    self._chunks = None
|
||||
|
||||
@property
def is_chunk_index_loaded(self):
    """Tell whether the in-memory chunk index has been built/loaded this session.

    Flag-style checks can ask "is it loaded?" here without touching the .chunks
    property, which would build the index on demand. Other code should not read
    self._chunks directly: use .chunks for the index itself and this property for
    the loaded flag.
    """
    index = self._chunks
    return index is not None
|
||||
|
||||
def flush(self):
|
||||
"""Flush any buffered pack writer chunks."""
|
||||
if self._pack_writer is not None:
|
||||
return self._pack_writer.flush()
|
||||
return None
|
||||
self._pack_writer.flush() # PackWriter updates _chunks internally
|
||||
|
||||
def close(self):
|
||||
if self._pack_writer is not None:
|
||||
assert not self._pack_writer._pieces, "PackWriter has unflushed chunks; call flush() before close()"
|
||||
# close() may run again after the store was already closed (idempotent close), so we can
|
||||
# only persist while the store is open. Persisting is also a no-op unless chunks were added
|
||||
# this session (only F_NEW entries are serialized, and an empty incremental write is skipped).
|
||||
# guard on is_chunk_index_loaded so we never trigger a lazy rebuild just to persist on close.
|
||||
if self.store_opened and self.is_chunk_index_loaded:
|
||||
from .cache import write_chunkindex_to_repo_cache
|
||||
|
||||
write_chunkindex_to_repo_cache(self, self.chunks, incremental=True)
|
||||
if self.lock:
|
||||
self.lock.release()
|
||||
self.lock = None
|
||||
|
|
@ -614,18 +728,35 @@ class Repository:
|
|||
|
||||
def get(self, id, read_data=True, raise_missing=True):
|
||||
self._lock_refresh()
|
||||
pack_id = id # N=1: pack_id == chunk_id
|
||||
entry = self.chunks.get(id)
|
||||
if entry is None:
|
||||
if raise_missing:
|
||||
raise self.ObjectNotFound(id, str(self._location))
|
||||
return None
|
||||
if entry.pack_id == UNKNOWN_BYTES32:
|
||||
# chunk is buffered in PackWriter, not yet flushed to a pack. at N=1 put() flushes
|
||||
# immediately, so reaching here points at a flush / index-update ordering bug, not a
|
||||
# genuinely missing object. this is a code bug, so we crash loudly regardless of
|
||||
# raise_missing instead of pretending the object is absent.
|
||||
raise self.PackLocationUnknown(id, str(self._location))
|
||||
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
|
||||
id_hex = bin_to_hex(id)
|
||||
key = "packs/" + bin_to_hex(pack_id)
|
||||
try:
|
||||
if read_data:
|
||||
return self.store.load(key)
|
||||
return self.store.load(key, offset=obj_offset, size=obj_size)
|
||||
else:
|
||||
# RepoObj layout supports separately encrypted metadata and data.
|
||||
# We return enough bytes so the client can decrypt the metadata.
|
||||
hdr_size = RepoObj.obj_header.size
|
||||
extra_size = 1024 - hdr_size # load a bit more, 1024b, reduces round trips
|
||||
obj = self.store.load(key, size=hdr_size + extra_size)
|
||||
load_size = hdr_size + extra_size
|
||||
# keep the read inside this object: at N>1 a pack holds neighbouring objects, so
|
||||
# don't pull bytes past obj_size into the next one. (an overshoot would be harmless
|
||||
# -- parse_meta uses the header's length and ignores trailing bytes -- this is just
|
||||
# tidy.) obj_size comes from the same index we already route with.
|
||||
load_size = min(load_size, obj_size)
|
||||
obj = self.store.load(key, offset=obj_offset, size=load_size)
|
||||
hdr = obj[0:hdr_size]
|
||||
if len(hdr) != hdr_size:
|
||||
raise IntegrityError(f"Object too small [id {id_hex}]: expected {hdr_size}, got {len(hdr)} bytes")
|
||||
|
|
@ -633,7 +764,10 @@ class Repository:
|
|||
if meta_size > extra_size:
|
||||
# we did not get enough, need to load more, but not all.
|
||||
# this should be rare, as chunk metadata is rather small usually.
|
||||
obj = self.store.load(key, size=hdr_size + meta_size)
|
||||
retry_size = hdr_size + meta_size
|
||||
# same boundary as above: normally a no-op, just keeps the retry within this object.
|
||||
retry_size = min(retry_size, obj_size)
|
||||
obj = self.store.load(key, offset=obj_offset, size=retry_size)
|
||||
meta = obj[hdr_size : hdr_size + meta_size]
|
||||
if len(meta) != meta_size:
|
||||
raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes")
|
||||
|
|
@ -656,13 +790,13 @@ class Repository:
|
|||
|
||||
Returns a list of (chunk_id, pack_id, obj_offset, obj_size) tuples for
|
||||
every chunk written to disk this call. At max_count=1 this is always
|
||||
one entry. Callers should pass these to ChunkIndex.update_pack_info()
|
||||
so the index holds real location values rather than UNKNOWN_INT32 placeholders.
|
||||
one entry.
|
||||
"""
|
||||
self._lock_refresh()
|
||||
data_size = len(data)
|
||||
if data_size > MAX_DATA_SIZE:
|
||||
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
|
||||
# PackWriter shares this repository's index, so add() triggers the lazy build itself.
|
||||
return self._pack_writer.add(id, data)
|
||||
|
||||
def delete(self, id, wait=True):
|
||||
|
|
|
|||
|
|
@ -110,3 +110,33 @@ def test_read_chunkindex_from_repo_cache_missing(tmp_path):
|
|||
# Try to load a non-existent cache entry — should return None, not raise.
|
||||
result = read_chunkindex_from_repo_cache(repository, "f" * 64)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_chunkindex_cache_not_consolidated_on_access(tmp_path):
    """Accessing ChunksMixin.chunks binds the repository index and leaves cached fragments alone.

    Every backup leaves a small incremental cache/chunks.* fragment behind. Collapsing all of
    them into one on each access would re-upload the whole index and, with delete_other,
    invalidate every other client's fragments. Reclaiming the fragment count is the job of
    `borg compact`, not of a plain read through this property.
    """
    from ..cache import ChunksMixin, write_chunkindex_to_repo_cache, list_chunkindex_hashes
    from ..hashindex import ChunkIndex, ChunkIndexEntry

    seeded_ids = (H(1), H(2))
    repo_path = os.fspath(tmp_path / "repository")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        # add extra fragments on top of the empty one written at repo creation
        for chunk_id in seeded_ids:
            fragment = ChunkIndex()
            fragment[chunk_id] = ChunkIndexEntry(ChunkIndex.F_NEW, 0, chunk_id, 0, 4)
            write_chunkindex_to_repo_cache(repository, fragment, incremental=False, force_write=True)
        fragment_count = len(list_chunkindex_hashes(repository))
        assert fragment_count > 1

        mixin = ChunksMixin()
        mixin.repository = repository
        bound_index = mixin.chunks  # binds the repository index; must NOT collapse the fragments

        # no consolidation side effect: the fragment count is unchanged ...
        assert len(list_chunkindex_hashes(repository)) == fragment_count
        # ... and every seeded chunk still resolves through the in-memory index
        for chunk_id in seeded_ids:
            assert chunk_id in bound_index
|
||||
|
|
|
|||
|
|
@ -3,7 +3,8 @@ import sys
|
|||
from hashlib import sha256
|
||||
|
||||
import pytest
|
||||
from ..helpers import IntegrityError, Location
|
||||
from ..helpers import IntegrityError, Location, bin_to_hex
|
||||
from ..hashindex import ChunkIndex
|
||||
from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter
|
||||
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
|
||||
from .hashindex_test import H
|
||||
|
|
@ -78,12 +79,14 @@ def pdchunk(chunk):
|
|||
def test_basic_operations(repo_fixtures, request):
|
||||
with get_repository_from_fixture(repo_fixtures, request) as repository:
|
||||
for x in range(100):
|
||||
repository.put(H(x), fchunk(b"SOMEDATA"))
|
||||
repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter
|
||||
key50 = H(50)
|
||||
assert pdchunk(repository.get(key50)) == b"SOMEDATA"
|
||||
repository.delete(key50)
|
||||
with pytest.raises(Repository.ObjectNotFound):
|
||||
repository.get(key50)
|
||||
# no manual hand-off of the index across reopen: close() persisted it to the repo cache,
|
||||
# and the freshly opened repo rebuilds .chunks from there (or by listing the repo) on its own.
|
||||
with reopen(repository) as repository:
|
||||
with pytest.raises(Repository.ObjectNotFound):
|
||||
repository.get(key50)
|
||||
|
|
@ -93,6 +96,32 @@ def test_basic_operations(repo_fixtures, request):
|
|||
assert pdchunk(repository.get(H(x))) == b"SOMEDATA"
|
||||
|
||||
|
||||
def test_chunk_index_persisted_on_close(tmp_path):
    """Check that close() writes this session's chunk index into the repo cache.

    close() must serialize the live chunk index into the repo cache, so a freshly opened
    repo can resolve pack locations without any manual hand-off. The round-trip is proven by
    reading the persisted index back directly (not via a repo rescan, which at N=1 would
    reconstruct the same entries and so could mask a broken persist step).
    """
    from ..cache import list_chunkindex_hashes, read_chunkindex_from_repo_cache

    location = os.fspath(tmp_path / "repo")
    with Repository(location, exclusive=True, create=True) as repository:
        for x in range(10):
            repository.put(H(x), fchunk(b"DATA"))  # N=1: each put() flushes immediately
    # reopen and read the cached fragments straight from disk
    with Repository(location, exclusive=True) as repository:
        persisted = ChunkIndex()
        # named fragment_hash (not "hash") so the builtin hash() is not shadowed
        for fragment_hash in list_chunkindex_hashes(repository):
            fragment = read_chunkindex_from_repo_cache(repository, fragment_hash)
            if fragment is not None:
                for k, v in fragment.items():
                    persisted[k] = v
        for x in range(10):
            assert H(x) in persisted  # close() actually wrote this session's chunks
        # and the reopened repo resolves them end to end
        for x in range(10):
            assert pdchunk(repository.get(H(x))) == b"DATA"
|
||||
|
||||
|
||||
def test_read_data(repo_fixtures, request):
|
||||
with get_repository_from_fixture(repo_fixtures, request) as repository:
|
||||
meta, data = b"meta", b"data"
|
||||
|
|
@ -158,13 +187,33 @@ class MockStore:
|
|||
self.stored[key] = data
|
||||
|
||||
|
||||
class FailingPackStore:
    """Wraps a store but fails packs/* writes; every other call passes through to the inner store.

    Models the realistic failure where only a pack write broke while the rest of the repo (e.g. the
    cache/chunks.* index) stays writable. In production PackWriter and the chunk index cache share
    one store, so a single object has to fail the pack write yet still let the index persist.
    """

    def __init__(self, inner):
        # inner: the wrapped store that receives every call this wrapper does not fail
        self._inner = inner

    def store(self, key, data):
        """Raise OSError for keys under "packs/"; otherwise delegate to the inner store."""
        if key.startswith("packs/"):
            raise OSError("simulated pack store failure")
        return self._inner.store(key, data)

    def __getattr__(self, name):
        """Forward any attribute not found on the wrapper itself to the inner store."""
        # __getattr__ only runs after normal lookup failed. If "_inner" itself is what is
        # missing (instance created via __new__, copy or unpickling), delegating would look up
        # self._inner again and recurse until RecursionError; raise AttributeError instead.
        if name == "_inner":
            raise AttributeError(name)
        return getattr(self._inner, name)
|
||||
|
||||
|
||||
def test_pack_writer_returns_none_when_not_full():
|
||||
pw = PackWriter(MockStore(), max_count=2)
|
||||
pw = PackWriter(MockStore(), max_count=2, chunks=ChunkIndex())
|
||||
assert pw.add(b"a" * 32, b"data") is None
|
||||
|
||||
|
||||
def test_pack_writer_flush_returns_none_when_empty():
    # flush() on a writer that never received a chunk returns None.
    # The earlier PackWriter(MockStore(), max_count=1) assignment was the superseded pre-change
    # line: it was overwritten immediately and never used, so only the chunks= form is kept.
    pw = PackWriter(MockStore(), max_count=1, chunks=ChunkIndex())
    assert pw.flush() is None
|
||||
|
||||
|
||||
|
|
@ -172,7 +221,7 @@ def test_pack_writer_n1_flush():
|
|||
store = MockStore()
|
||||
chunk_id = b"c" * 32
|
||||
cdata = b"payload"
|
||||
pw = PackWriter(store, max_count=1)
|
||||
pw = PackWriter(store, max_count=1, chunks=ChunkIndex())
|
||||
results = pw.add(chunk_id, cdata)
|
||||
assert results is not None
|
||||
assert len(results) == 1
|
||||
|
|
@ -187,7 +236,7 @@ def test_pack_writer_n2_flush():
|
|||
store = MockStore()
|
||||
id1, id2 = b"a" * 32, b"b" * 32
|
||||
data1, data2 = b"first", b"second"
|
||||
pw = PackWriter(store, max_count=2)
|
||||
pw = PackWriter(store, max_count=2, chunks=ChunkIndex())
|
||||
assert pw.add(id1, data1) is None
|
||||
results = pw.add(id2, data2)
|
||||
assert results is not None
|
||||
|
|
@ -198,13 +247,116 @@ def test_pack_writer_n2_flush():
|
|||
assert results[1] == (id2, expected_pack_id, len(data1), len(data2))
|
||||
|
||||
|
||||
def test_pack_writer_rolls_back_index_on_failed_store():
    # add() pre-marks the chunk id in the index before the pack is written. When store.store()
    # then fails, flush() has to remove those pre-marked entries again; otherwise the index keeps
    # a phantom (indexed but never stored) chunk that seen_chunk() reports as present and that a
    # later identical chunk would dedup against -- silent data loss (#9744 review).
    index = ChunkIndex()
    phantom_id = b"e" * 32
    failing_store = FailingPackStore(MockStore())
    writer = PackWriter(failing_store, max_count=1, chunks=index)
    # max_count=1 -> add() flushes immediately, so the simulated pack failure surfaces right here
    with pytest.raises(OSError):
        writer.add(phantom_id, b"payload")
    # rolled back: no phantom entry left behind
    assert index.get(phantom_id) is None
|
||||
|
||||
|
||||
def test_failed_store_phantom_not_persisted(tmp_path):
    # A phantom entry must not reach the persisted repo cache either: close() can write the
    # in-memory index on context exit, so the rollback has to happen before anything is serialized.
    from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo

    phantom_id = H(60)
    repo_path = str(tmp_path / "repo")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        # Only the pack write fails on the repository's own store; cache/chunks.* writes keep
        # working. One store thus models "just the pack write broke" (PackWriter and the index
        # cache share a store in production), which makes the failing store load-bearing for
        # every assertion below.
        repository.store = FailingPackStore(repository.store)
        writer = PackWriter(repository.store, max_count=1, repository=repository)
        with pytest.raises(OSError):
            writer.add(phantom_id, fchunk(b"DATA"))
        # rolled back from the in-memory index ...
        assert repository.chunks.get(phantom_id) is None
        # ... and persisting + reloading the cache (through that same store) does not bring it back:
        write_chunkindex_to_repo_cache(repository, repository.chunks, incremental=True)
        index_from_cache = build_chunkindex_from_repo(repository)
        assert index_from_cache.get(phantom_id) is None
|
||||
|
||||
|
||||
def test_get_read_data_false_with_range(tmp_path):
    # With ChunkIndex entries present, read_data=False limits the load to each object's boundary:
    # two objects sit back to back in one pack and each get() must return only the first
    # hdr_size bytes of the object that was asked for.
    hdr_size = RepoObj.obj_header.size
    first_obj = fchunk(b"FIRST")
    second_obj = fchunk(b"SECOND")
    pack_bytes = first_obj + second_obj
    pack_id = H(43)
    first_id, second_id = H(47), H(48)
    # (chunk id, offset inside the pack, serialized object), in pack order
    layout = [(first_id, 0, first_obj), (second_id, len(first_obj), second_obj)]
    with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
        repository.store_store("packs/" + bin_to_hex(pack_id), pack_bytes)
        index = ChunkIndex()
        for obj_id, offset, obj in layout:
            index.add(obj_id, len(obj))
            index.update_pack_info([(obj_id, pack_id, offset, len(obj))])
        repository.chunks = index
        for obj_id, _offset, obj in layout:
            assert repository.get(obj_id, read_data=False) == obj[:hdr_size]
|
||||
|
||||
|
||||
def test_get_read_data_false_large_meta(tmp_path):
    # When meta_size > extra_size (975 bytes), get() retries with a larger load.
    hdr_size = RepoObj.obj_header.size
    # The first try loads ~1KB, so the meta is sized clearly past that boundary to force the
    # retry path.
    oversized_meta = b"M" * 5000
    obj = fchunk(b"DATA", meta=oversized_meta)
    pack_id = H(44)
    obj_id = H(49)
    with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
        repository.store_store("packs/" + bin_to_hex(pack_id), obj)
        index = ChunkIndex()
        index.add(obj_id, len(obj))
        index.update_pack_info([(obj_id, pack_id, 0, len(obj))])
        repository.chunks = index
        header_and_meta = repository.get(obj_id, read_data=False)
        expected_len = hdr_size + len(oversized_meta)
        assert header_and_meta == obj[:expected_len]
|
||||
|
||||
|
||||
def test_get_uses_chunk_index_location(tmp_path):
    # get() routes to the correct pack and offset when a ChunkIndex is assigned via the chunks
    # property.
    first_obj = fchunk(b"FIRST")
    second_obj = fchunk(b"SECOND")
    pack_bytes = first_obj + second_obj
    pack_id = H(55)
    first_id, second_id = H(56), H(57)
    # (chunk id, offset inside the pack, serialized object), in pack order
    layout = [(first_id, 0, first_obj), (second_id, len(first_obj), second_obj)]
    with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
        # Inject the pack directly; bypasses PackWriter to test routing independently.
        repository.store_store("packs/" + bin_to_hex(pack_id), pack_bytes)
        index = ChunkIndex()
        for obj_id, offset, obj in layout:
            index.add(obj_id, len(obj))
            index.update_pack_info([(obj_id, pack_id, offset, len(obj))])
        repository.chunks = index
        for obj_id, _offset, obj in layout:
            assert repository.get(obj_id) == obj
|
||||
|
||||
|
||||
def test_put_marks_id_in_chunk_index(tmp_path):
    # put() immediately updates _chunks: add() marks the id as seen, then update_pack_info
    # fills in the real pack location for the current session.
    repo_path = str(tmp_path / "repo")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        chunk_id = H(1)
        repository.put(chunk_id, fchunk(b"ZEROS"))
        indexed = repository._chunks.get(chunk_id)
        assert indexed is not None
        # N=1: pack_id == chunk_id, set by update_pack_info in put()
        assert indexed.pack_id == chunk_id
        # uncompressed size filled in by cache layer
        assert indexed.size == 0
|
||||
|
||||
|
||||
def test_pack_writer_final_partial_pack_uses_sha256():
|
||||
# When max_count > 1, a final flush with only 1 piece must still use SHA256,
|
||||
# not the N=1 pack_id == chunk_id hack.
|
||||
store = MockStore()
|
||||
chunk_id = b"d" * 32
|
||||
cdata = b"solo"
|
||||
pw = PackWriter(store, max_count=3)
|
||||
pw = PackWriter(store, max_count=3, chunks=ChunkIndex())
|
||||
assert pw.add(chunk_id, cdata) is None
|
||||
results = pw.flush()
|
||||
assert results is not None
|
||||
|
|
|
|||
Loading…
Reference in a new issue