mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 01:41:57 -04:00
Remove cache_if_remote and RepositoryNoCache
In modern borg these were just a pass-through repository wrapper (there is no RepositoryCache), with one variant doing inline decryption and returning (csize, plaintext) tuples. Drop both and make all consumers use the raw repository directly: - fuse.py: ItemCache / FuseOperations / FuseBackend now take the raw repository + repo_objs and decrypt via repo_objs.parse(ROBJ_DONTCARE), matching hlfuse.py. The csize value was discarded at both call sites. - mount_cmds.py: drop the cache_if_remote wrapper around FuseOperations. - archive.py (rebuild_archives / check): drop the pass-through wrapper; robust_iterator now uses self.repository directly. - repository.py: delete the RepositoryNoCache class and cache_if_remote. - repository_test.py: remove TestCacheIfRemote and orphaned imports. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
ac0a643087
commit
c35bc660dd
5 changed files with 75 additions and 185 deletions
|
|
@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
|
|||
from .item import Item, ArchiveItem, ItemDiff
|
||||
from . import platform
|
||||
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
|
||||
from .repository import Repository, NoManifestError, cache_if_remote
|
||||
from .repository import Repository, NoManifestError
|
||||
from .repoobj import RepoObj
|
||||
|
||||
has_link = hasattr(os, "link")
|
||||
|
|
@ -2042,7 +2042,7 @@ class ArchiveChecker:
|
|||
return True, ""
|
||||
|
||||
i = 0
|
||||
archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository)
|
||||
archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=self.repository)
|
||||
for state, items in groupby(archive_items, missing_chunk_detector):
|
||||
items = list(items)
|
||||
if state % 2:
|
||||
|
|
@ -2052,7 +2052,7 @@ class ArchiveChecker:
|
|||
continue
|
||||
if state > 0:
|
||||
unpacker.resync()
|
||||
for chunk_id, cdata in zip(items, repository.get_many(items)):
|
||||
for chunk_id, cdata in zip(items, self.repository.get_many(items)):
|
||||
try:
|
||||
_, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM)
|
||||
unpacker.feed(data)
|
||||
|
|
@ -2104,59 +2104,58 @@ class ArchiveChecker:
|
|||
pi = ProgressIndicatorPercent(
|
||||
total=num_archives, msg="Checking archives %3.1f%%", step=0.1, msgid="check.rebuild_archives"
|
||||
)
|
||||
with cache_if_remote(self.repository) as repository:
|
||||
for i, info in enumerate(archive_infos):
|
||||
pi.show(i)
|
||||
archive_id, archive_id_hex = info.id, bin_to_hex(info.id)
|
||||
logger.info(
|
||||
f"Analyzing archive {info.name} {info.ts.astimezone()} {archive_id_hex} ({i + 1}/{num_archives})"
|
||||
)
|
||||
if archive_id not in self.chunks:
|
||||
logger.error(f"Archive metadata block {archive_id_hex} is missing!")
|
||||
self.error_found = True
|
||||
if self.repair:
|
||||
logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
else:
|
||||
logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
|
||||
continue
|
||||
cdata = self.repository.get(archive_id)
|
||||
try:
|
||||
_, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META)
|
||||
except IntegrityErrorBase as integrity_error:
|
||||
logger.error(f"Archive metadata block {archive_id_hex} is corrupted: {integrity_error}")
|
||||
self.error_found = True
|
||||
if self.repair:
|
||||
logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
else:
|
||||
logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
|
||||
continue
|
||||
archive = self.key.unpack_archive(data)
|
||||
archive = ArchiveItem(internal_dict=archive)
|
||||
if archive.version != 2:
|
||||
raise Exception("Unknown archive metadata version")
|
||||
items_buffer = ChunkBuffer(self.key)
|
||||
items_buffer.write_chunk = add_callback
|
||||
for item in robust_iterator(archive):
|
||||
if "chunks" in item:
|
||||
verify_file_chunks(info.name, item)
|
||||
items_buffer.add(item)
|
||||
items_buffer.flush(flush=True)
|
||||
for i, info in enumerate(archive_infos):
|
||||
pi.show(i)
|
||||
archive_id, archive_id_hex = info.id, bin_to_hex(info.id)
|
||||
logger.info(
|
||||
f"Analyzing archive {info.name} {info.ts.astimezone()} {archive_id_hex} ({i + 1}/{num_archives})"
|
||||
)
|
||||
if archive_id not in self.chunks:
|
||||
logger.error(f"Archive metadata block {archive_id_hex} is missing!")
|
||||
self.error_found = True
|
||||
if self.repair:
|
||||
archive.item_ptrs = archive_put_items(
|
||||
items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
|
||||
)
|
||||
data = self.key.pack_metadata(archive.as_dict())
|
||||
new_archive_id = self.key.id_hash(data)
|
||||
logger.debug(f"archive id old: {bin_to_hex(archive_id)}")
|
||||
logger.debug(f"archive id new: {bin_to_hex(new_archive_id)}")
|
||||
cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META)
|
||||
add_reference(new_archive_id, len(data), cdata)
|
||||
self.manifest.archives.create(info.name, new_archive_id, info.ts)
|
||||
if archive_id != new_archive_id:
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
pi.finish()
|
||||
logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
else:
|
||||
logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
|
||||
continue
|
||||
cdata = self.repository.get(archive_id)
|
||||
try:
|
||||
_, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META)
|
||||
except IntegrityErrorBase as integrity_error:
|
||||
logger.error(f"Archive metadata block {archive_id_hex} is corrupted: {integrity_error}")
|
||||
self.error_found = True
|
||||
if self.repair:
|
||||
logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.")
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
else:
|
||||
logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.")
|
||||
continue
|
||||
archive = self.key.unpack_archive(data)
|
||||
archive = ArchiveItem(internal_dict=archive)
|
||||
if archive.version != 2:
|
||||
raise Exception("Unknown archive metadata version")
|
||||
items_buffer = ChunkBuffer(self.key)
|
||||
items_buffer.write_chunk = add_callback
|
||||
for item in robust_iterator(archive):
|
||||
if "chunks" in item:
|
||||
verify_file_chunks(info.name, item)
|
||||
items_buffer.add(item)
|
||||
items_buffer.flush(flush=True)
|
||||
if self.repair:
|
||||
archive.item_ptrs = archive_put_items(
|
||||
items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
|
||||
)
|
||||
data = self.key.pack_metadata(archive.as_dict())
|
||||
new_archive_id = self.key.id_hash(data)
|
||||
logger.debug(f"archive id old: {bin_to_hex(archive_id)}")
|
||||
logger.debug(f"archive id new: {bin_to_hex(new_archive_id)}")
|
||||
cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META)
|
||||
add_reference(new_archive_id, len(data), cdata)
|
||||
self.manifest.archives.create(info.name, new_archive_id, info.ts)
|
||||
if archive_id != new_archive_id:
|
||||
self.manifest.archives.delete_by_id(archive_id)
|
||||
pi.finish()
|
||||
|
||||
def finish(self):
|
||||
if self.repair:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ from ..helpers import PathSpec
|
|||
from ..helpers import umount
|
||||
from ..helpers.argparsing import ArgumentParser
|
||||
from ..manifest import Manifest
|
||||
from ..repository import cache_if_remote
|
||||
|
||||
from ..logger import create_logger
|
||||
|
||||
|
|
@ -51,14 +50,13 @@ class MountMixIn:
|
|||
# Use llfuse/pyfuse3 implementation
|
||||
from ..fuse import FuseOperations
|
||||
|
||||
with cache_if_remote(repository, decrypted_cache=manifest.repo_objs) as cached_repo:
|
||||
operations = FuseOperations(manifest, args, cached_repo)
|
||||
logger.info("Mounting filesystem")
|
||||
try:
|
||||
operations.mount(args.mountpoint, args.options, args.foreground, args.show_rc)
|
||||
except RuntimeError:
|
||||
# Relevant error message already printed to stderr by FUSE
|
||||
raise RTError("FUSE mount failed")
|
||||
operations = FuseOperations(manifest, args, repository)
|
||||
logger.info("Mounting filesystem")
|
||||
try:
|
||||
operations.mount(args.mountpoint, args.options, args.foreground, args.show_rc)
|
||||
except RuntimeError:
|
||||
# Relevant error message already printed to stderr by FUSE
|
||||
raise RTError("FUSE mount failed")
|
||||
|
||||
def do_umount(self, args):
|
||||
"""Unmounts the FUSE filesystem."""
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from collections import defaultdict, Counter
|
|||
from signal import SIGINT
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .constants import ROBJ_FILE_STREAM, zeros
|
||||
from .constants import ROBJ_FILE_STREAM, ROBJ_DONTCARE, zeros
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# For type checking, assume llfuse is available
|
||||
|
|
@ -112,8 +112,9 @@ class ItemCache:
|
|||
indirect_entry_struct = struct.Struct("=cII")
|
||||
assert indirect_entry_struct.size == 9
|
||||
|
||||
def __init__(self, decrypted_repository):
|
||||
self.decrypted_repository = decrypted_repository
|
||||
def __init__(self, repository, repo_objs):
|
||||
self.repository = repository
|
||||
self.repo_objs = repo_objs
|
||||
# self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
|
||||
# It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
|
||||
# unices did this).
|
||||
|
|
@ -133,7 +134,7 @@ class ItemCache:
|
|||
|
||||
# A temporary file that contains direct items, i.e. items directly cached in this layer.
|
||||
# These are items that span more than one chunk and thus cannot be efficiently cached
|
||||
# by the object cache (self.decrypted_repository), which would require variable-length structures;
|
||||
# by the object cache (self.chunks), which would require variable-length structures;
|
||||
# possible but not worth the effort, see iter_archive_items.
|
||||
self.fd = tempfile.TemporaryFile(prefix="borg-tmp")
|
||||
|
||||
|
|
@ -161,7 +162,8 @@ class ItemCache:
|
|||
chunk_id = bytes(self.meta[chunk_id_offset : chunk_id_offset + 32])
|
||||
chunk = self.chunks.get(chunk_id)
|
||||
if not chunk:
|
||||
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
|
||||
cdata = self.repository.get(chunk_id)
|
||||
_, chunk = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE)
|
||||
self.chunks[chunk_id] = chunk
|
||||
data = memoryview(chunk)[chunk_offset:]
|
||||
unpacker = msgpack.Unpacker()
|
||||
|
|
@ -189,7 +191,8 @@ class ItemCache:
|
|||
meta = self.meta
|
||||
pack_indirect_into = self.indirect_entry_struct.pack_into
|
||||
|
||||
for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
|
||||
for key, cdata in zip(archive_item_ids, self.repository.get_many(archive_item_ids)):
|
||||
_, data = self.repo_objs.parse(key, cdata, ro_type=ROBJ_DONTCARE)
|
||||
# Store the chunk ID in the meta-array
|
||||
if write_offset + 32 >= len(meta):
|
||||
meta.extend(bytes(self.GROW_META_BY))
|
||||
|
|
@ -268,7 +271,7 @@ class ItemCache:
|
|||
class FuseBackend:
|
||||
"""Virtual filesystem based on archive(s) to provide information to fuse"""
|
||||
|
||||
def __init__(self, manifest, args, decrypted_repository):
|
||||
def __init__(self, manifest, args, repository):
|
||||
self._args = args
|
||||
self.numeric_ids = args.numeric_ids
|
||||
self._manifest = manifest
|
||||
|
|
@ -292,7 +295,7 @@ class FuseBackend:
|
|||
self.default_dir = None
|
||||
# Archives to be loaded when first accessed, mapped by their placeholder inode
|
||||
self.pending_archives = {}
|
||||
self.cache = ItemCache(decrypted_repository)
|
||||
self.cache = ItemCache(repository, self.repo_objs)
|
||||
self.allow_damaged_files = False
|
||||
self.versions = False
|
||||
self.uid_forced = None
|
||||
|
|
@ -477,10 +480,9 @@ class FuseBackend:
|
|||
class FuseOperations(llfuse.Operations, FuseBackend):
|
||||
"""Export archive as a FUSE filesystem"""
|
||||
|
||||
def __init__(self, manifest, args, decrypted_repository):
|
||||
def __init__(self, manifest, args, repository):
|
||||
llfuse.Operations.__init__(self)
|
||||
FuseBackend.__init__(self, manifest, args, decrypted_repository)
|
||||
self.decrypted_repository = decrypted_repository
|
||||
FuseBackend.__init__(self, manifest, args, repository)
|
||||
data_cache_capacity = int(os.environ.get("BORG_MOUNT_DATA_CACHE_ENTRIES", os.cpu_count() or 1))
|
||||
logger.debug("mount data cache capacity: %d chunks", data_cache_capacity)
|
||||
self.data_cache = LRUCache(capacity=data_cache_capacity)
|
||||
|
|
@ -510,7 +512,6 @@ class FuseOperations(llfuse.Operations, FuseBackend):
|
|||
self.data_cache._capacity,
|
||||
format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())),
|
||||
)
|
||||
self.decrypted_repository.log_instrumentation()
|
||||
|
||||
def mount(self, mountpoint, mount_options, foreground=False, show_rc=False):
|
||||
"""Mount filesystem on *mountpoint* with *mount_options*."""
|
||||
|
|
|
|||
|
|
@ -722,58 +722,3 @@ class Repository:
|
|||
def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False):
|
||||
self._lock_refresh()
|
||||
return self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted)
|
||||
|
||||
|
||||
class RepositoryNoCache:
|
||||
"""A Repository wrapper that passes through to the repository.
|
||||
|
||||
It applies an optional *transform* and provides a uniform context-manager API.
|
||||
|
||||
*transform* is a callable taking two arguments, key and raw repository data.
|
||||
The return value is returned from get()/get_many(). By default, the raw
|
||||
repository data is returned.
|
||||
"""
|
||||
|
||||
def __init__(self, repository, transform=None):
|
||||
self.repository = repository
|
||||
self.transform = transform or (lambda key, data: data)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
def get(self, key, read_data=True, raise_missing=True):
|
||||
return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False))
|
||||
|
||||
def get_many(self, keys, read_data=True, raise_missing=True, cache=True):
|
||||
for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)):
|
||||
yield self.transform(key, data)
|
||||
|
||||
def log_instrumentation(self):
|
||||
pass
|
||||
|
||||
|
||||
def cache_if_remote(repository, *, decrypted_cache=False, transform=None):
|
||||
"""
|
||||
Return a RepositoryNoCache wrapping *repository*.
|
||||
|
||||
If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple
|
||||
(csize, plaintext) instead of the actual data in the repository (the objects are
|
||||
parsed/decrypted via the *transform* derived from it).
|
||||
"""
|
||||
if decrypted_cache and transform:
|
||||
raise ValueError("decrypted_cache and transform are incompatible")
|
||||
elif decrypted_cache:
|
||||
repo_objs = decrypted_cache
|
||||
|
||||
def transform(id_, data):
|
||||
meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
|
||||
csize = meta.get("csize", len(data))
|
||||
return csize, decrypted
|
||||
|
||||
return RepositoryNoCache(repository, transform)
|
||||
|
|
|
|||
|
|
@ -3,13 +3,10 @@ import sys
|
|||
from hashlib import sha256
|
||||
|
||||
import pytest
|
||||
from ..constants import ROBJ_FILE_STREAM
|
||||
from ..helpers import IntegrityError, Location
|
||||
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter
|
||||
from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter
|
||||
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
|
||||
from ..crypto.key import PlaintextKey
|
||||
from .hashindex_test import H
|
||||
from .crypto.key_test import TestKey
|
||||
|
||||
|
||||
def test_rest_serve_command_local():
|
||||
|
|
@ -153,56 +150,6 @@ def check(repository, repo_path, repair=False, status=True):
|
|||
assert tmp_files == [], "Found tmp files"
|
||||
|
||||
|
||||
class TestCacheIfRemote:
|
||||
@pytest.fixture
|
||||
def cache_repository(self, tmpdir):
|
||||
repository_location = os.path.join(str(tmpdir), "repository")
|
||||
with Repository(repository_location, exclusive=True, create=True) as repository:
|
||||
repository.put(H(1), fchunk(b"1234"))
|
||||
repository.put(H(2), fchunk(b"5678"))
|
||||
repository.put(H(3), fchunk(bytes(100)))
|
||||
yield repository
|
||||
|
||||
def test_passthrough(self, cache_repository):
|
||||
# Without decrypted_cache, raw repository data is passed through unchanged.
|
||||
with cache_if_remote(cache_repository) as cached:
|
||||
assert pdchunk(cached.get(H(1))) == b"1234"
|
||||
assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"]
|
||||
|
||||
@pytest.fixture
|
||||
def key(self, cache_repository, monkeypatch):
|
||||
monkeypatch.setenv("BORG_PASSPHRASE", "test")
|
||||
return PlaintextKey.create(cache_repository, TestKey.MockArgs())
|
||||
|
||||
@pytest.fixture
|
||||
def repo_objs(self, key):
|
||||
return RepoObj(key)
|
||||
|
||||
def _put_encrypted_object(self, repo_objs, repository, data):
|
||||
id_ = repo_objs.id_hash(data)
|
||||
repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM))
|
||||
return id_
|
||||
|
||||
@pytest.fixture
|
||||
def H1(self, repo_objs, cache_repository):
|
||||
return self._put_encrypted_object(repo_objs, cache_repository, b"1234")
|
||||
|
||||
@pytest.fixture
|
||||
def H2(self, repo_objs, cache_repository):
|
||||
return self._put_encrypted_object(repo_objs, cache_repository, b"5678")
|
||||
|
||||
def test_decrypted_cache(self, repo_objs, cache_repository, H1, H2):
|
||||
# With decrypted_cache, get/get_many return (csize, plaintext) tuples.
|
||||
with cache_if_remote(cache_repository, decrypted_cache=repo_objs) as cached:
|
||||
csize, plaintext = cached.get(H1)
|
||||
assert plaintext == b"1234"
|
||||
assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"]
|
||||
|
||||
def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs):
|
||||
with pytest.raises(ValueError):
|
||||
cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data)
|
||||
|
||||
|
||||
class MockStore:
|
||||
def __init__(self):
|
||||
self.stored = {}
|
||||
|
|
|
|||
Loading…
Reference in a new issue