diff --git a/src/borg/archive.py b/src/borg/archive.py index 6a3414cfd..ad07fec7c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from . import platform from .platform import acl_get, acl_set, set_flags, get_flags, swidth -from .repository import Repository, NoManifestError, cache_if_remote +from .repository import Repository, NoManifestError from .repoobj import RepoObj has_link = hasattr(os, "link") @@ -2042,7 +2042,7 @@ class ArchiveChecker: return True, "" i = 0 - archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=repository) + archive_items = archive_get_items(archive, repo_objs=self.repo_objs, repository=self.repository) for state, items in groupby(archive_items, missing_chunk_detector): items = list(items) if state % 2: @@ -2052,7 +2052,7 @@ class ArchiveChecker: continue if state > 0: unpacker.resync() - for chunk_id, cdata in zip(items, repository.get_many(items)): + for chunk_id, cdata in zip(items, self.repository.get_many(items)): try: _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM) unpacker.feed(data) @@ -2104,59 +2104,58 @@ class ArchiveChecker: pi = ProgressIndicatorPercent( total=num_archives, msg="Checking archives %3.1f%%", step=0.1, msgid="check.rebuild_archives" ) - with cache_if_remote(self.repository) as repository: - for i, info in enumerate(archive_infos): - pi.show(i) - archive_id, archive_id_hex = info.id, bin_to_hex(info.id) - logger.info( - f"Analyzing archive {info.name} {info.ts.astimezone()} {archive_id_hex} ({i + 1}/{num_archives})" - ) - if archive_id not in self.chunks: - logger.error(f"Archive metadata block {archive_id_hex} is missing!") - self.error_found = True - if self.repair: - logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.") - self.manifest.archives.delete_by_id(archive_id) - else: - logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.") - continue - cdata = self.repository.get(archive_id) - try: - _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META) - except IntegrityErrorBase as integrity_error: - logger.error(f"Archive metadata block {archive_id_hex} is corrupted: {integrity_error}") - self.error_found = True - if self.repair: - logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.") - self.manifest.archives.delete_by_id(archive_id) - else: - logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.") - continue - archive = self.key.unpack_archive(data) - archive = ArchiveItem(internal_dict=archive) - if archive.version != 2: - raise Exception("Unknown archive metadata version") - items_buffer = ChunkBuffer(self.key) - items_buffer.write_chunk = add_callback - for item in robust_iterator(archive): - if "chunks" in item: - verify_file_chunks(info.name, item) - items_buffer.add(item) - items_buffer.flush(flush=True) + for i, info in enumerate(archive_infos): + pi.show(i) + archive_id, archive_id_hex = info.id, bin_to_hex(info.id) + logger.info( + f"Analyzing archive {info.name} {info.ts.astimezone()} {archive_id_hex} ({i + 1}/{num_archives})" + ) + if archive_id not in self.chunks: + logger.error(f"Archive metadata block {archive_id_hex} is missing!") + self.error_found = True if self.repair: - archive.item_ptrs = archive_put_items( - items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference - ) - data = self.key.pack_metadata(archive.as_dict()) - new_archive_id = self.key.id_hash(data) - logger.debug(f"archive id old: {bin_to_hex(archive_id)}") - logger.debug(f"archive id new: {bin_to_hex(new_archive_id)}") - cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META) - add_reference(new_archive_id, len(data), cdata) - self.manifest.archives.create(info.name, new_archive_id, info.ts) - if archive_id != new_archive_id: - self.manifest.archives.delete_by_id(archive_id) - pi.finish() + logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.") + self.manifest.archives.delete_by_id(archive_id) + else: + logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.") + continue + cdata = self.repository.get(archive_id) + try: + _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META) + except IntegrityErrorBase as integrity_error: + logger.error(f"Archive metadata block {archive_id_hex} is corrupted: {integrity_error}") + self.error_found = True + if self.repair: + logger.error(f"Deleting broken archive {info.name} {archive_id_hex}.") + self.manifest.archives.delete_by_id(archive_id) + else: + logger.error(f"Would delete broken archive {info.name} {archive_id_hex}.") + continue + archive = self.key.unpack_archive(data) + archive = ArchiveItem(internal_dict=archive) + if archive.version != 2: + raise Exception("Unknown archive metadata version") + items_buffer = ChunkBuffer(self.key) + items_buffer.write_chunk = add_callback + for item in robust_iterator(archive): + if "chunks" in item: + verify_file_chunks(info.name, item) + items_buffer.add(item) + items_buffer.flush(flush=True) + if self.repair: + archive.item_ptrs = archive_put_items( + items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference + ) + data = self.key.pack_metadata(archive.as_dict()) + new_archive_id = self.key.id_hash(data) + logger.debug(f"archive id old: {bin_to_hex(archive_id)}") + logger.debug(f"archive id new: {bin_to_hex(new_archive_id)}") + cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META) + add_reference(new_archive_id, len(data), cdata) + self.manifest.archives.create(info.name, new_archive_id, info.ts) + if archive_id != new_archive_id: + self.manifest.archives.delete_by_id(archive_id) + pi.finish() def finish(self): if self.repair: diff --git a/src/borg/archiver/mount_cmds.py b/src/borg/archiver/mount_cmds.py index 78cf64121..c748a0770 100644 --- a/src/borg/archiver/mount_cmds.py +++ b/src/borg/archiver/mount_cmds.py @@ -7,7 +7,6 @@ from ..helpers import PathSpec from ..helpers import umount from ..helpers.argparsing import ArgumentParser from ..manifest import Manifest -from ..repository import cache_if_remote from ..logger import create_logger @@ -51,14 +50,13 @@ class MountMixIn: # Use llfuse/pyfuse3 implementation from ..fuse import FuseOperations - with cache_if_remote(repository, decrypted_cache=manifest.repo_objs) as cached_repo: - operations = FuseOperations(manifest, args, cached_repo) - logger.info("Mounting filesystem") - try: - operations.mount(args.mountpoint, args.options, args.foreground, args.show_rc) - except RuntimeError: - # Relevant error message already printed to stderr by FUSE - raise RTError("FUSE mount failed") + operations = FuseOperations(manifest, args, repository) + logger.info("Mounting filesystem") + try: + operations.mount(args.mountpoint, args.options, args.foreground, args.show_rc) + except RuntimeError: + # Relevant error message already printed to stderr by FUSE + raise RTError("FUSE mount failed") def do_umount(self, args): """Unmounts the FUSE filesystem.""" diff --git a/src/borg/fuse.py b/src/borg/fuse.py index c8ec1880c..e92da2ffb 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -27,7 +27,7 @@ from collections import defaultdict, Counter from signal import SIGINT from typing import TYPE_CHECKING -from .constants import ROBJ_FILE_STREAM, zeros +from .constants import ROBJ_FILE_STREAM, ROBJ_DONTCARE, zeros if TYPE_CHECKING: # For type checking, assume llfuse is available @@ -112,8 +112,9 @@ class ItemCache: indirect_entry_struct = struct.Struct("=cII") assert indirect_entry_struct.size == 9 - def __init__(self, decrypted_repository): - self.decrypted_repository = decrypted_repository + def __init__(self, repository, repo_objs): + self.repository = repository + self.repo_objs = repo_objs # self.meta, the "meta-array" is a densely packed array of metadata about where items can be found. # It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first # unices did this). @@ -133,7 +134,7 @@ class ItemCache: # A temporary file that contains direct items, i.e. items directly cached in this layer. # These are items that span more than one chunk and thus cannot be efficiently cached - # by the object cache (self.decrypted_repository), which would require variable-length structures; + # by the object cache (self.chunks), which would require variable-length structures; # possible but not worth the effort, see iter_archive_items. self.fd = tempfile.TemporaryFile(prefix="borg-tmp") @@ -161,7 +162,8 @@ class ItemCache: chunk_id = bytes(self.meta[chunk_id_offset : chunk_id_offset + 32]) chunk = self.chunks.get(chunk_id) if not chunk: - csize, chunk = next(self.decrypted_repository.get_many([chunk_id])) + cdata = self.repository.get(chunk_id) + _, chunk = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE) self.chunks[chunk_id] = chunk data = memoryview(chunk)[chunk_offset:] unpacker = msgpack.Unpacker() @@ -189,7 +191,8 @@ class ItemCache: meta = self.meta pack_indirect_into = self.indirect_entry_struct.pack_into - for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)): + for key, cdata in zip(archive_item_ids, self.repository.get_many(archive_item_ids)): + _, data = self.repo_objs.parse(key, cdata, ro_type=ROBJ_DONTCARE) # Store the chunk ID in the meta-array if write_offset + 32 >= len(meta): meta.extend(bytes(self.GROW_META_BY)) @@ -268,7 +271,7 @@ class ItemCache: class FuseBackend: """Virtual filesystem based on archive(s) to provide information to fuse""" - def __init__(self, manifest, args, decrypted_repository): + def __init__(self, manifest, args, repository): self._args = args self.numeric_ids = args.numeric_ids self._manifest = manifest @@ -292,7 +295,7 @@ class FuseBackend: self.default_dir = None # Archives to be loaded when first accessed, mapped by their placeholder inode self.pending_archives = {} - self.cache = ItemCache(decrypted_repository) + self.cache = ItemCache(repository, self.repo_objs) self.allow_damaged_files = False self.versions = False self.uid_forced = None @@ -477,10 +480,9 @@ class FuseBackend: class FuseOperations(llfuse.Operations, FuseBackend): """Export archive as a FUSE filesystem""" - def __init__(self, manifest, args, decrypted_repository): + def __init__(self, manifest, args, repository): llfuse.Operations.__init__(self) - FuseBackend.__init__(self, manifest, args, decrypted_repository) - self.decrypted_repository = decrypted_repository + FuseBackend.__init__(self, manifest, args, repository) data_cache_capacity = int(os.environ.get("BORG_MOUNT_DATA_CACHE_ENTRIES", os.cpu_count() or 1)) logger.debug("mount data cache capacity: %d chunks", data_cache_capacity) self.data_cache = LRUCache(capacity=data_cache_capacity) @@ -510,7 +512,6 @@ class FuseOperations(llfuse.Operations, FuseBackend): self.data_cache._capacity, format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())), ) - self.decrypted_repository.log_instrumentation() def mount(self, mountpoint, mount_options, foreground=False, show_rc=False): """Mount filesystem on *mountpoint* with *mount_options*.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index 5f45d29c4..c8ae6c1a4 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -722,58 +722,3 @@ class Repository: def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False): self._lock_refresh() return self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted) - - -class RepositoryNoCache: - """A Repository wrapper that passes through to the repository. - - It applies an optional *transform* and provides a uniform context-manager API. - - *transform* is a callable taking two arguments, key and raw repository data. - The return value is returned from get()/get_many(). By default, the raw - repository data is returned. - """ - - def __init__(self, repository, transform=None): - self.repository = repository - self.transform = transform or (lambda key, data: data) - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def get(self, key, read_data=True, raise_missing=True): - return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False)) - - def get_many(self, keys, read_data=True, raise_missing=True, cache=True): - for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)): - yield self.transform(key, data) - - def log_instrumentation(self): - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, transform=None): - """ - Return a RepositoryNoCache wrapping *repository*. - - If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository (the objects are - parsed/decrypted via the *transform* derived from it). - """ - if decrypted_cache and transform: - raise ValueError("decrypted_cache and transform are incompatible") - elif decrypted_cache: - repo_objs = decrypted_cache - - def transform(id_, data): - meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) - csize = meta.get("csize", len(data)) - return csize, decrypted - - return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index d4ea660c8..86fe55a23 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -3,13 +3,10 @@ import sys from hashlib import sha256 import pytest -from ..constants import ROBJ_FILE_STREAM from ..helpers import IntegrityError, Location -from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote, rest_serve_command, PackWriter +from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION -from ..crypto.key import PlaintextKey from .hashindex_test import H -from .crypto.key_test import TestKey def test_rest_serve_command_local(): @@ -153,56 +150,6 @@ def check(repository, repo_path, repair=False, status=True): assert tmp_files == [], "Found tmp files" -class TestCacheIfRemote: - @pytest.fixture - def cache_repository(self, tmpdir): - repository_location = os.path.join(str(tmpdir), "repository") - with Repository(repository_location, exclusive=True, create=True) as repository: - repository.put(H(1), fchunk(b"1234")) - repository.put(H(2), fchunk(b"5678")) - repository.put(H(3), fchunk(bytes(100))) - yield repository - - def test_passthrough(self, cache_repository): - # Without decrypted_cache, raw repository data is passed through unchanged. - with cache_if_remote(cache_repository) as cached: - assert pdchunk(cached.get(H(1))) == b"1234" - assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"] - - @pytest.fixture - def key(self, cache_repository, monkeypatch): - monkeypatch.setenv("BORG_PASSPHRASE", "test") - return PlaintextKey.create(cache_repository, TestKey.MockArgs()) - - @pytest.fixture - def repo_objs(self, key): - return RepoObj(key) - - def _put_encrypted_object(self, repo_objs, repository, data): - id_ = repo_objs.id_hash(data) - repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM)) - return id_ - - @pytest.fixture - def H1(self, repo_objs, cache_repository): - return self._put_encrypted_object(repo_objs, cache_repository, b"1234") - - @pytest.fixture - def H2(self, repo_objs, cache_repository): - return self._put_encrypted_object(repo_objs, cache_repository, b"5678") - - def test_decrypted_cache(self, repo_objs, cache_repository, H1, H2): - # With decrypted_cache, get/get_many return (csize, plaintext) tuples. - with cache_if_remote(cache_repository, decrypted_cache=repo_objs) as cached: - csize, plaintext = cached.get(H1) - assert plaintext == b"1234" - assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"] - - def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs): - with pytest.raises(ValueError): - cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data) - - class MockStore: def __init__(self): self.stored = {}