From cac7237d3f509932f96b4ba8ff63c3753a2bab94 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 8 Jun 2026 08:04:59 +0200 Subject: [PATCH 1/4] remove ssh:// and socket:// remote repository for current repos The modern client/server transport (RemoteRepository served by `borg serve` over an msgpack RPC protocol) is now redundant for current (borg 2) repos: its functionality is replaced by rest:// (which can tunnel over ssh to a remote borgstore REST server). Remove the modern RemoteRepository (both ssh:// and socket://) entirely. Legacy v1 (borg 1.x) repos remain reachable over ssh:// via the separate LegacyRemoteRepository client, and `borg serve` / RepositoryServer is kept, trimmed to the legacy-only path, so a remote borg2 can still serve a v1 repo for `borg transfer --from-borg1`. Details: - remote.py: delete RemoteRepository, SleepingBandwidthLimiter and the `api` decorator; trim RepositoryServer to legacy-only (drop modern _rpc_methods, socket serving, non-legacy open() branch); keep cache_if_remote / RepositoryCache / RepositoryNoCache (used by all repos). - get_repository(): non-legacy ssh:// now raises a clear "use rest://" error; socket:// route and the global --socket option removed. - parseformat: drop the socket:// scheme (now an invalid location). - borg serve: keep the command (serves legacy v1 ssh only); update epilog. - borg version: drop modern remote query; keep legacy ssh path. - update isinstance/import sites (cache, archive, fuse/hlfuse, analyze/compact, archiver __init__ -> LegacyRemoteRepository.RPCError). - tests/docs updated; obsolete socket serve test removed. Co-Authored-By: Claude Opus 4.8 --- docs/changes.rst | 4 + docs/deployment/central-backup-server.rst | 8 + docs/quickstart.rst | 28 +- src/borg/archive.py | 4 +- src/borg/archiver/__init__.py | 6 +- src/borg/archiver/_common.py | 26 +- src/borg/archiver/analyze_cmd.py | 3 +- src/borg/archiver/compact_cmd.py | 3 +- src/borg/archiver/serve_cmd.py | 15 +- src/borg/archiver/version_cmd.py | 19 +- src/borg/cache.py | 3 +- src/borg/conftest.py | 2 +- src/borg/fuse.py | 14 +- src/borg/helpers/parseformat.py | 9 +- src/borg/hlfuse.py | 12 +- src/borg/remote.py | 812 +----------------- src/borg/testsuite/archiver/__init__.py | 6 +- src/borg/testsuite/archiver/check_cmd_test.py | 3 +- src/borg/testsuite/archiver/checks_test.py | 45 - src/borg/testsuite/archiver/serve_cmd_test.py | 55 -- .../testsuite/helpers/parseformat_test.py | 9 +- src/borg/testsuite/remote_test.py | 57 +- src/borg/testsuite/repository_test.py | 152 +--- 23 files changed, 101 insertions(+), 1194 deletions(-) delete mode 100644 src/borg/testsuite/archiver/serve_cmd_test.py diff --git a/docs/changes.rst b/docs/changes.rst index ece97339f..04100fde5 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -171,6 +171,10 @@ New features: - WIP packs project, major repo format changes, you must create new repos! #8572 - rest:// repository URLs - connect via ssh to remote borgstore REST server, talking http via stdio, #9593 +- removed ssh:// and socket:// support for current repositories; use a rest:// + repository instead (it can tunnel over ssh). ssh:// and ``borg serve`` remain + available only for legacy (borg 1.x / v1) repositories, e.g. for + ``borg transfer --from-borg1 --other-repo ssh://...``. - prune: show total vs matching archives in output, #9262 - prune: add --json option, #9222 - archive: preserve cwd archive metadata, #9495 diff --git a/docs/deployment/central-backup-server.rst b/docs/deployment/central-backup-server.rst index d697cce92..b1b0860cf 100644 --- a/docs/deployment/central-backup-server.rst +++ b/docs/deployment/central-backup-server.rst @@ -8,6 +8,14 @@ Central repository server with Ansible or Salt This section gives an example of how to set up a Borg repository server for multiple clients. +.. note:: + + This example predates Borg 2 and uses the legacy ``ssh://`` transport (served + by ``borg serve``) and ``borg init``. With Borg 2, the ``ssh://`` transport is + only used for legacy borg 1.x (v1) repositories; for current repositories use a + ``rest://`` repository instead (Borg connects via ssh and runs a borgstore REST + server on the remote host), and use ``borg repo-create`` instead of ``borg init``. + Machines -------- diff --git a/docs/quickstart.rst b/docs/quickstart.rst index b5313d9cf..222e3eb4c 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -64,7 +64,8 @@ If you only back up your own files, run it as your normal user (i.e. not root). For a local repository always use the same user to invoke borg. -For a remote repository: always use e.g., ssh://borg@remote_host. You can use this +For a remote repository: always use e.g., rest://borg@remote_host (Borg connects +via ssh and runs a borgstore REST server on the remote). You can use this from different local users; the remote user running borg and accessing the repo will always be `borg`. @@ -142,7 +143,7 @@ backed up and that the ``prune`` command keeps and deletes the correct backups. #!/bin/sh # Setting this, so the repo does not need to be given on the commandline: - export BORG_REPO=ssh://username@example.com:2022/~/backup/main + export BORG_REPO=rest://username@example.com:2022/path/to/backup/main # See the section "Passphrase notes" for more infos. export BORG_PASSPHRASE='XYZl0ngandsecurepa_55_phrasea&&123' @@ -361,19 +362,21 @@ Remote repositories Borg can initialize and access repositories on remote hosts if the host is accessible using SSH. This is fastest and easiest when Borg -is installed on the remote host, in which case the following syntax is used:: +is installed on the remote host, in which case a ``rest://`` repository URL is +used. Borg connects via SSH and runs a borgstore REST server on the remote host +(talking HTTP over stdio):: - $ borg -r ssh://user@hostname:port/path/to/repo repo-create ... + $ borg -r rest://user@hostname:port/path/to/repo repo-create ... Note: Please see the usage chapter for a full documentation of repo URLs. Also see :ref:`ssh_configuration` for recommended settings to avoid disconnects and hangs. -Remote operations over SSH can be automated with SSH keys. You can restrict the -use of the SSH keypair by prepending a forced command to the SSH public key in -the remote server's `authorized_keys` file. This example will start Borg -in server mode and limit it to a specific filesystem path:: +.. note:: - command="borg serve --restrict-to-path /path/to/repo",restrict ssh-rsa AAAAB3[...] + The legacy ``ssh://`` transport, served by ``borg serve`` on the remote host, + is now only used to access legacy borg 1.x (v1) repositories (e.g. via + ``borg transfer --from-borg1 --other-repo ssh://...``). For current + repositories, use a ``rest://`` repository as shown above. If it is not possible to install Borg on the remote host, it is still possible to use the remote host to store a repository by @@ -530,7 +533,8 @@ Example with **borg extract**: Difference when using a **remote borg backup server**: It is basically all the same as with the local repository, but you need to -refer to the repo using a ``ssh://`` URL. +refer to the repo using a ``rest://`` URL (Borg connects via ssh and runs a +borgstore REST server on the remote host). In the given example, ``borg`` is the user name used to log into the machine ``backup.example.org`` which runs ssh on port ``2222`` and has the borg repo @@ -544,6 +548,6 @@ case if unattended, automated backups were done). :: - borg -r ssh://borg@backup.example.org:2222/path/to/repo mount /mnt/borg + borg -r rest://borg@backup.example.org:2222/path/to/repo mount /mnt/borg # or - borg -r ssh://borg@backup.example.org:2222/path/to/repo extract archive + borg -r rest://borg@backup.example.org:2222/path/to/repo extract archive diff --git a/src/borg/archive.py b/src/borg/archive.py index 43f8434d3..82bcaa98c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from . import platform from .platform import acl_get, acl_set, set_flags, get_flags, swidth -from .remote import RemoteRepository, cache_if_remote +from .remote import cache_if_remote from .repository import Repository, NoManifestError from .repoobj import RepoObj @@ -1773,7 +1773,7 @@ class ArchiveChecker: :param oldest/newest: only check archives older/newer than timedelta from oldest/newest archive timestamp :param verify_data: integrity verification of data referenced by archives """ - if not isinstance(repository, (Repository, RemoteRepository)): + if not isinstance(repository, Repository): logger.error("Checking legacy repositories is not supported.") return False logger.info("Starting archive consistency check...") diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index b90a7c7eb..db3a9bec0 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -47,7 +47,7 @@ try: from ..helpers import sig_int from ..helpers import get_config_dir from ..platformflags import is_msystem - from ..remote import RemoteRepository + from ..legacy.remote import LegacyRemoteRepository from ..selftest import selftest except BaseException: # an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's @@ -538,7 +538,7 @@ def sig_trace_handler(sig_no, stack): # pragma: no cover def format_tb(exc): qualname = type(exc).__qualname__ - remote = isinstance(exc, RemoteRepository.RPCError) + remote = isinstance(exc, LegacyRemoteRepository.RPCError) if remote: prefix = "Borg server: " trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines()) @@ -632,7 +632,7 @@ def main(): # pragma: no cover tb_log_level = logging.ERROR if e.traceback else logging.DEBUG tb = format_tb(e) exit_code = e.exit_code - except RemoteRepository.RPCError as e: + except LegacyRemoteRepository.RPCError as e: important = e.traceback msg = e.exception_full if important else e.get_message() msgid = e.exception_class diff --git a/src/borg/archiver/_common.py b/src/borg/archiver/_common.py index 921b7c263..4a566c236 100644 --- a/src/borg/archiver/_common.py +++ b/src/borg/archiver/_common.py @@ -13,7 +13,6 @@ from ..helpers.argparsing import SUPPRESS, PositiveInt from ..helpers.nanorst import rst_to_terminal from ..manifest import Manifest, AI_HUMAN_SORT_KEYS from ..patterns import PatternMatcher -from ..remote import RemoteRepository from ..repository import Repository from ..repoobj import RepoObj from ..patterns import ( @@ -30,16 +29,19 @@ logger = create_logger(__name__) def get_repository(location, *, create, exclusive, lock_wait, lock, args, v1_legacy): - if location.proto in ("ssh", "socket"): + if location.proto == "ssh": if v1_legacy: from ..legacy.remote import LegacyRemoteRepository - RemoteRepoCls = LegacyRemoteRepository + repository = LegacyRemoteRepository( + location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args + ) else: - RemoteRepoCls = RemoteRepository - repository = RemoteRepoCls( - location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args - ) + raise Error( + "ssh:// is no longer supported for current repositories; use rest:// instead " + "(it can tunnel over ssh). ssh:// remains available only for legacy v1 repositories " + "via --from-borg1." + ) elif ( location.proto in ("rest", "sftp", "file", "http", "https", "rclone", "s3", "b2") and not v1_legacy @@ -584,16 +586,6 @@ def define_common_options(add_common_option): action=Highlander, help="Use this command to connect to the 'borg serve' process (default: 'ssh')", ) - add_common_option( - "--socket", - metavar="PATH", - dest="use_socket", - default=False, - const=True, - nargs="?", - action=Highlander, - help="Use UNIX DOMAIN (IPC) socket at PATH for client/server communication with socket: protocol.", - ) add_common_option( "-r", "--repo", diff --git a/src/borg/archiver/analyze_cmd.py b/src/borg/archiver/analyze_cmd.py index 3db076aaa..7608c93e9 100644 --- a/src/borg/archiver/analyze_cmd.py +++ b/src/borg/archiver/analyze_cmd.py @@ -8,7 +8,6 @@ from ..helpers import bin_to_hex, Error from ..helpers import ProgressIndicatorPercent from ..helpers.argparsing import ArgumentParser from ..manifest import Manifest -from ..remote import RemoteRepository from ..repository import Repository from ..logger import create_logger @@ -20,7 +19,7 @@ class ArchiveAnalyzer: def __init__(self, args, repository, manifest): self.args = args self.repository = repository - assert isinstance(repository, (Repository, RemoteRepository)) + assert isinstance(repository, Repository) self.manifest = manifest self.difference_by_path = defaultdict(int) # directory path -> count of chunks changed diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 66ac4bfbc..1bdd4ebe9 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -11,7 +11,6 @@ from ..hashindex import ChunkIndex, ChunkIndexEntry from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest -from ..remote import RemoteRepository from ..repository import Repository, repo_lister from ..logger import create_logger @@ -22,7 +21,7 @@ logger = create_logger() class ArchiveGarbageCollector: def __init__(self, repository, manifest, *, stats, iec): self.repository = repository - assert isinstance(repository, (Repository, RemoteRepository)) + assert isinstance(repository, Repository) self.manifest = manifest self.chunks = None # a ChunkIndex, here used for: id -> (is_used, stored_size) self.total_files = None # overall number of source files written to all archives in this repo diff --git a/src/borg/archiver/serve_cmd.py b/src/borg/archiver/serve_cmd.py index 36661758e..6f9ebba97 100644 --- a/src/borg/archiver/serve_cmd.py +++ b/src/borg/archiver/serve_cmd.py @@ -13,7 +13,6 @@ class ServeMixIn: RepositoryServer( restrict_to_paths=args.restrict_to_paths, restrict_to_repositories=args.restrict_to_repositories, - use_socket=args.use_socket, permissions=args.permissions, ).serve() @@ -24,15 +23,13 @@ class ServeMixIn: """ This command starts a repository server process. - `borg serve` currently supports: + `borg serve` is only used to serve legacy (borg 1.x / v1) repositories over SSH, so that + such repositories can still be accessed remotely (e.g. for `borg transfer --from-borg1`). + Current repositories are accessed via a rest:// repository instead (which can itself tunnel + over SSH), so they do not use `borg serve`. - - Being automatically started via SSH when the borg client uses an ssh://... - remote repository. In this mode, `borg serve` will run until that SSH connection - is terminated. - - - Being started by some other means (not by the borg client) as a long-running socket - server to be used for borg clients using a socket://... repository (see the `--socket` - option if you do not want to use the default path for the socket and PID file). + It is automatically started via SSH when a borg client uses an ssh://... repository. + In this mode, `borg serve` will run until that SSH connection is terminated. Please note that `borg serve` does not support providing a specific repository via the `--repo` option or the `BORG_REPO` environment variable. It is always the borg client that diff --git a/src/borg/archiver/version_cmd.py b/src/borg/archiver/version_cmd.py index 409baaeb7..df5a6f589 100644 --- a/src/borg/archiver/version_cmd.py +++ b/src/borg/archiver/version_cmd.py @@ -1,7 +1,6 @@ from .. import __version__ from ..constants import * # NOQA from ..helpers.argparsing import ArgumentParser -from ..remote import RemoteRepository from ..logger import create_logger @@ -14,8 +13,10 @@ class VersionMixIn: from borg.version import parse_version, format_version client_version = parse_version(__version__) - if args.location.proto in ("ssh", "socket"): - with RemoteRepository(args.location, lock=False, args=args) as repository: + if args.location.proto == "ssh" and getattr(args, "v1_legacy", False): + from ..legacy.remote import LegacyRemoteRepository + + with LegacyRemoteRepository(args.location, lock=False, args=args) as repository: server_version = repository.server_version else: server_version = client_version @@ -28,11 +29,11 @@ class VersionMixIn: """ This command displays the Borg client and server versions. - If a local repository is given, the client code directly accesses the repository, - so the client version is also shown as the server version. + For current repositories the client code directly accesses the repository (also for + rest:// repositories), so the client version is shown as the server version, too. - If a remote repository is given (e.g., ssh:), the remote Borg is queried, and - its version is displayed as the server version. + If a legacy (borg 1.x / v1) repository is given via ssh: together with --from-borg1, + the remote Borg is queried, and its version is displayed as the server version. Examples:: @@ -40,8 +41,8 @@ class VersionMixIn: $ borg version /mnt/backup 1.4.0a / 1.4.0a - # remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release) - $ borg version ssh://borg@borgbackup:repo + # legacy remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release) + $ borg version --from-borg1 ssh://borg@borgbackup:repo 1.4.0a / 1.2.7 Due to the version tuple format used in Borg client/server negotiation, only diff --git a/src/borg/cache.py b/src/borg/cache.py index d6138db86..7ef37c89f 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -32,7 +32,6 @@ from .item import ChunkListEntry from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError from .manifest import Manifest from .platform import SaveFile -from .remote import RemoteRepository from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister from .security import SecurityManager, assert_secure # noqa: F401 @@ -651,7 +650,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size ) # Cache does not contain the manifest. - if not isinstance(repository, (Repository, RemoteRepository)): + if not isinstance(repository, Repository): del chunks[Manifest.MANIFEST_ID] duration = perf_counter() - t0 or 0.001 # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. diff --git a/src/borg/conftest.py b/src/borg/conftest.py index e9ea95f61..557cb66f7 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -96,7 +96,7 @@ class ArchiverSetup: self.patterns_file_path: str | None = None def get_kind(self) -> str: - if self.repository_location.startswith("ssh://__testsuite__") or self.repository_location.startswith("rest://"): + if self.repository_location.startswith("rest://"): return "remote" elif self.EXE == "borg.exe": return "binary" diff --git a/src/borg/fuse.py b/src/borg/fuse.py index fca4b7aee..c8ec1880c 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -62,7 +62,7 @@ from .crypto.low_level import blake2b_128 from .archiver._common import build_matcher, build_filter from .archive import Archive, get_item_uid_gid from .hashindex import FuseVersionsIndex -from .helpers import daemonize, daemonizing, signal_handler, format_file_size, bin_to_hex, Error +from .helpers import daemonizing, signal_handler, format_file_size, bin_to_hex, Error from .helpers import HardLinkManager from .helpers import msgpack from .helpers.lrucache import LRUCache @@ -70,7 +70,6 @@ from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin from .repository import Repository -from .remote import RemoteRepository def fuse_main(): @@ -596,13 +595,10 @@ class FuseOperations(llfuse.Operations, FuseBackend): "that do not reflect the archive content." ) if not foreground: - if isinstance(self.repository_uncached, RemoteRepository): - daemonize() - else: - with daemonizing(show_rc=show_rc) as (old_id, new_id): - # local repo: the locking process' PID is changing, migrate it: - logger.debug("fuse: mount local repo, going to background: migrating lock.") - self.repository_uncached.migrate_lock(old_id, new_id) + with daemonizing(show_rc=show_rc) as (old_id, new_id): + # the locking process' PID is changing, migrate it: + logger.debug("fuse: mount repo, going to background: migrating lock.") + self.repository_uncached.migrate_lock(old_id, new_id) # If the file system crashes, we do not want to umount because in that # case the mountpoint suddenly appears to become empty. This can have diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index c8b6c417b..23b0b13be 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -600,7 +600,7 @@ class Location: rclone_re = re.compile(r"(?Prclone):(?P(.*))", re.VERBOSE) sl = "/" if is_win32 else "" - file_or_socket_re = re.compile(r"(?P(file|socket))://" + sl + abs_path_re, re.VERBOSE) + file_re = re.compile(r"(?Pfile)://" + sl + abs_path_re, re.VERBOSE) local_re = re.compile(local_path_re, re.VERBOSE) @@ -666,7 +666,7 @@ class Location: self.proto = m.group("proto") self.path = m.group("path") return True - m = self.file_or_socket_re.match(text) + m = self.file_re.match(text) if m: self.proto = m.group("proto") self.path = os.path.normpath(m.group("path")) @@ -709,7 +709,7 @@ class Location: return self._host.lstrip("[").rstrip("]") def canonical_path(self): - if self.proto in ("file", "socket"): + if self.proto == "file": return self.path if self.proto == "rclone": return f"{self.proto}:{self.path}" @@ -1342,11 +1342,10 @@ class BorgJsonEncoder(json.JSONEncoder): from ..legacy.repository import LegacyRepository from ..repository import Repository from ..legacy.remote import LegacyRemoteRepository - from ..remote import RemoteRepository from ..archive import Archive from ..cache import AdHocWithFilesCache - if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, (Repository, RemoteRepository)): + if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, Repository): return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} if isinstance(o, Archive): return o.info() diff --git a/src/borg/hlfuse.py b/src/borg/hlfuse.py index 08c069208..b5f106d0d 100644 --- a/src/borg/hlfuse.py +++ b/src/borg/hlfuse.py @@ -24,7 +24,7 @@ logger = create_logger() from .archiver._common import build_matcher, build_filter from .archive import Archive, get_item_uid_gid from .hashindex import FuseVersionsIndex -from .helpers import daemonize, daemonizing, signal_handler, bin_to_hex, Error +from .helpers import daemonizing, signal_handler, bin_to_hex, Error from .helpers import HardLinkManager from .helpers import msgpack from .helpers.lrucache import LRUCache @@ -32,7 +32,6 @@ from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin from .repository import Repository -from .remote import RemoteRepository BLOCK_SIZE = 512 # Standard filesystem block size for st_blocks and statfs DEBUG_LOG: str | None = None # os.path.join(os.getcwd(), "fuse_debug.log") @@ -529,12 +528,9 @@ class borgfs(hlfuse.Operations, FuseBackend): # hlfuse.FUSE will block if foreground=True, otherwise it returns immediately if not foreground: # Background mode: daemonize first, then start FUSE (blocking) - if isinstance(self.repository, RemoteRepository): - daemonize() - else: - with daemonizing(show_rc=show_rc) as (old_id, new_id): - logger.debug("fuse: mount local repo, going to background: migrating lock.") - self.repository.migrate_lock(old_id, new_id) + with daemonizing(show_rc=show_rc) as (old_id, new_id): + logger.debug("fuse: mount repo, going to background: migrating lock.") + self.repository.migrate_lock(old_id, new_id) # Run the FUSE main loop in foreground (we might be daemonized already or not) with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler): diff --git a/src/borg/remote.py b/src/borg/remote.py index 7f4589406..786fc390e 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,21 +1,15 @@ -import atexit import errno -import functools import inspect import logging import os import queue import select -import shlex import shutil -import socket import struct import sys import tempfile -import textwrap import time import traceback -from subprocess import Popen, PIPE from xxhash import xxh64 @@ -23,45 +17,22 @@ import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA -from .helpers import Error, ErrorWithTraceback, IntegrityError +from .helpers import Error, IntegrityError from .helpers import bin_to_hex from .helpers import get_limited_unpacker -from .helpers import replace_placeholders from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink -from .helpers import prepare_subprocess_env, ignore_sigint -from .helpers import get_socket_filename -from .fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed from .logger import create_logger, borg_serve_log_queue -from .manifest import NoManifestError from .helpers import msgpack from .repository import Repository, StoreObjectNotFound -from .version import parse_version, format_version -from .helpers.datastruct import EfficientCollectionQueue -from .platform import is_win32 +from .version import parse_version logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" -MAX_INFLIGHT = 100 - -RATELIMIT_PERIOD = 0.1 - - -class ConnectionClosed(Error): - """Connection closed by remote host""" - - exit_mcode = 80 - - -class ConnectionClosedWithHint(ConnectionClosed): - """Connection closed by remote host. {}""" - - exit_mcode = 81 - class PathNotAllowed(Error): """Repository path not allowed: {}""" @@ -81,41 +52,10 @@ class UnexpectedRPCDataFormatFromClient(Error): exit_mcode = 85 -class UnexpectedRPCDataFormatFromServer(Error): - """Got unexpected RPC data format from server:\n{}""" - - exit_mcode = 86 - - def __init__(self, data): - try: - data = data.decode()[:128] - except UnicodeDecodeError: - data = data[:128] - data = ["%02X" % byte for byte in data] - data = textwrap.fill(" ".join(data), 16 * 3) - super().__init__(data) - - -class ConnectionBrokenWithHint(Error): - """Connection to remote host is broken. {}""" - - exit_mcode = 87 - - # Protocol compatibility: -# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting -# too old servers. This ensures that the knowledge what is compatible is always held by the newer component. -# -# For the client the return of the negotiate method is a dict which includes the server version. -# -# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api -# stubs in RemoteRepository*. The @api decorator on these stubs is used to set server version requirements. -# -# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server. -# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs -# to be added. -# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older -# servers still get compatible input. +# borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current repositories use rest://). +# The legacy client lives in borg.legacy.remote (LegacyRemoteRepository); this server keeps the legacy +# RPC method allowlist and opens repositories using LegacyRepository. class RepositoryServer: # pragma: no cover @@ -140,49 +80,14 @@ class RepositoryServer: # pragma: no cover "get_manifest", # borg2 LegacyRepository has this ) - _rpc_methods = ( # Repository - "__len__", - "check", - "delete", - "destroy", - "get", - "list", - "negotiate", - "open", - "close", - "info", - "put", - "save_key", - "load_key", - "break_lock", - "inject_exception", - "get_manifest", - "put_manifest", - "store_list", - "store_load", - "store_store", - "store_delete", - "store_move", - ) - - def __init__(self, restrict_to_paths, restrict_to_repositories, use_socket, permissions=None): + def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None): self.repository = None self.RepoCls = None self.rpc_methods = ("open", "close", "negotiate") self.restrict_to_paths = restrict_to_paths self.restrict_to_repositories = restrict_to_repositories self.permissions = permissions - # This flag is parsed from the serve command line via Archiver.do_serve, - # i.e. it reflects local system policy and generally ranks higher than - # whatever the client wants, except when initializing a new repository - # (see RepositoryServer.open below). self.client_version = None # we update this after client sends version information - if use_socket is False: - self.socket_path = None - elif use_socket is True: # --socket - self.socket_path = get_socket_filename() - else: # --socket=/some/path - self.socket_path = use_socket def filter_args(self, f, kwargs): """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" @@ -308,38 +213,10 @@ class RepositoryServer: # pragma: no cover shutdown_serve = True continue - if self.socket_path: # server for socket:// connections - try: - # remove any left-over socket file - os.unlink(self.socket_path) - except OSError: - if os.path.exists(self.socket_path): - raise - sock_dir = os.path.dirname(self.socket_path) - os.makedirs(sock_dir, exist_ok=True) - pid_file = self.socket_path.removesuffix(".sock") + ".pid" - pid = os.getpid() - with open(pid_file, "w") as f: - f.write(str(pid)) - atexit.register(functools.partial(os.remove, pid_file)) - atexit.register(functools.partial(os.remove, self.socket_path)) - sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - sock.bind(self.socket_path) # this creates the socket file in the fs - sock.listen(0) # no backlog - os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too. - print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr) - - while True: - connection, client_address = sock.accept() - print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr) - self.stdin_fd = connection.makefile("rb").fileno() - self.stdout_fd = connection.makefile("wb").fileno() - inner_serve() - print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr) - else: # server for one ssh:// connection - self.stdin_fd = sys.stdin.fileno() - self.stdout_fd = sys.stdout.fileno() - inner_serve() + # server for one ssh:// connection + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + inner_serve() def negotiate(self, client_data): if isinstance(client_data, dict): @@ -357,13 +234,11 @@ class RepositoryServer: # pragma: no cover return path def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False): - if v1_legacy: - from .legacy.repository import LegacyRepository + # borg only serves legacy (v1) repositories now; current repositories are accessed via rest://. + from .legacy.repository import LegacyRepository - self.RepoCls = LegacyRepository - else: - self.RepoCls = Repository - self.rpc_methods = self._legacy_rpc_methods if v1_legacy else self._rpc_methods + self.RepoCls = LegacyRepository + self.rpc_methods = self._legacy_rpc_methods logging.debug("Resolving repository path %r", path) path = self._resolve_path(path) logging.debug("Resolved repository path to %r", path) @@ -386,8 +261,6 @@ class RepositoryServer: # pragma: no cover else: raise PathNotAllowed(path) kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log) - if not v1_legacy: - kwargs["permissions"] = self.permissions self.repository = self.RepoCls(path, create, **kwargs) self.repository.__enter__() # clean exit handled by serve() method return self.repository.id @@ -422,661 +295,6 @@ class RepositoryServer: # pragma: no cover 0 // 0 -class SleepingBandwidthLimiter: - def __init__(self, limit): - if limit: - self.ratelimit = int(limit * RATELIMIT_PERIOD) - self.ratelimit_last = time.monotonic() - self.ratelimit_quota = self.ratelimit - else: - self.ratelimit = None - - def write(self, fd, to_send): - if self.ratelimit: - now = time.monotonic() - if self.ratelimit_last + RATELIMIT_PERIOD <= now: - self.ratelimit_quota += self.ratelimit - if self.ratelimit_quota > 2 * self.ratelimit: - self.ratelimit_quota = 2 * self.ratelimit - self.ratelimit_last = now - if self.ratelimit_quota == 0: - tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now - time.sleep(tosleep) - self.ratelimit_quota += self.ratelimit - self.ratelimit_last = time.monotonic() - if len(to_send) > self.ratelimit_quota: - to_send = to_send[: self.ratelimit_quota] - try: - written = os.write(fd, to_send) - except BrokenPipeError: - raise ConnectionBrokenWithHint("Broken Pipe") from None - if self.ratelimit: - self.ratelimit_quota -= written - return written - - -def api(*, since, **kwargs_decorator): - """Check version requirements and use self.call to do the remote method call. - - specifies the version in which borg introduced this method. - Calling this method when connected to an older version will fail without transmitting anything to the server. - - Further kwargs can be used to encode version specific restrictions: - - is the value resulting in the behaviour before introducing the new parameter. - If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously - hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client. - E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False. - Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls - with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7. - - is a flag to set the behaviour if an old version is called the new way. - If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the - more desirable behaviour). If False, an exception is generated. - E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit(). - """ - - def decorator(f): - @functools.wraps(f) - def do_rpc(self, *args, **kwargs): - sig = inspect.signature(f) - bound_args = sig.bind(self, *args, **kwargs) - named = {} # Arguments for the remote process - extra = {} # Arguments for the local process - for name, param in sig.parameters.items(): - if name == "self": - continue - if name in bound_args.arguments: - if name == "wait": - extra[name] = bound_args.arguments[name] - else: - named[name] = bound_args.arguments[name] - else: - if param.default is not param.empty: - named[name] = param.default - - if self.server_version < since: - raise self.RPCServerOutdated(f.__name__, format_version(since)) - - for name, restriction in kwargs_decorator.items(): - if restriction["since"] <= self.server_version: - continue - if "previously" in restriction and named[name] == restriction["previously"]: - continue - if restriction.get("dontcare", False): - continue - - raise self.RPCServerOutdated( - f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"]) - ) - - return self.call(f.__name__, named, **extra) - - return do_rpc - - return decorator - - -class RemoteRepository: - extra_test_args = [] # type: ignore - - class RPCError(Exception): - def __init__(self, unpacked): - # unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo' - self.unpacked = unpacked - - def get_message(self): - return "\n".join(self.unpacked["exception_short"]) - - @property - def traceback(self): - return self.unpacked.get("exception_trace", True) - - @property - def exception_class(self): - return self.unpacked["exception_class"] - - @property - def exception_full(self): - return "\n".join(self.unpacked["exception_full"]) - - @property - def sysinfo(self): - return self.unpacked["sysinfo"] - - class RPCServerOutdated(Error): - """Borg server is too old for {}. Required version {}""" - - exit_mcode = 84 - - @property - def method(self): - return self.args[0] - - @property - def required_version(self): - return self.args[1] - - def __init__(self, location, create=False, exclusive=False, lock_wait=1.0, lock=True, args=None): - self.location = self._location = location - self.preload_ids = [] - self.msgid = 0 - self.rx_bytes = 0 - self.tx_bytes = 0 - self.to_send = EfficientCollectionQueue(1024 * 1024, bytes) - self.stdin_fd = self.stdout_fd = self.stderr_fd = None - self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet) - self.chunkid_to_msgids = {} - self.ignore_responses = set() - self.responses = {} - self.async_responses = {} - self.shutdown_time = None - self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0) - self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0 - self.unpacker = get_limited_unpacker("client") - self.server_version = None # we update this after server sends its version - self.p = self.sock = None - self._args = args - if self.location.proto == "ssh": - testing = location.host == "__testsuite__" - # when testing, we invoke and talk to a borg process directly (no ssh). - # when not testing, we invoke the system-installed ssh binary to talk to a remote borg. - env = prepare_subprocess_env(system=not testing) - borg_cmd = self.borg_cmd(args, testing) - if not testing: - borg_cmd = self.ssh_cmd(location) + borg_cmd - logger.debug("SSH command line: %s", borg_cmd) - # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg. - self.p = Popen( - borg_cmd, - bufsize=0, - stdin=PIPE, - stdout=PIPE, - stderr=PIPE, - env=env, - preexec_fn=None if is_win32 else ignore_sigint, - ) # nosec B603 - self.stdin_fd = self.p.stdin.fileno() - self.stdout_fd = self.p.stdout.fileno() - self.stderr_fd = self.p.stderr.fileno() - self.r_fds = [self.stdout_fd, self.stderr_fd] - self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd] - elif self.location.proto == "socket": - if args.use_socket is False or args.use_socket is True: # nothing or --socket - socket_path = get_socket_filename() - else: # --socket=/some/path - socket_path = args.use_socket - self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - try: - self.sock.connect(socket_path) # note: socket_path length is rather limited. - except FileNotFoundError: - self.sock = None - raise Error(f"The socket file {socket_path} does not exist.") - except ConnectionRefusedError: - self.sock = None - raise Error(f"There is no borg serve running for the socket file {socket_path}.") - self.stdin_fd = self.sock.makefile("wb").fileno() - self.stdout_fd = self.sock.makefile("rb").fileno() - self.stderr_fd = None - self.r_fds = [self.stdout_fd] - self.x_fds = [self.stdin_fd, self.stdout_fd] - else: - raise Error(f"Unsupported protocol {location.proto}") - - os.set_blocking(self.stdin_fd, False) - assert not os.get_blocking(self.stdin_fd) - os.set_blocking(self.stdout_fd, False) - assert not os.get_blocking(self.stdout_fd) - if self.stderr_fd is not None: - os.set_blocking(self.stderr_fd, False) - assert not os.get_blocking(self.stderr_fd) - - try: - try: - version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}}) - except ConnectionClosed: - raise ConnectionClosedWithHint("Is borg working on the server?") from None - if isinstance(version, dict): - self.server_version = version["server_version"] - else: - raise Exception("Server insisted on using unsupported protocol version %s" % version) - - self.id = self.open( - path=self.location.path, create=create, lock_wait=lock_wait, lock=lock, exclusive=exclusive - ) - info = self.info() - self.version = info["version"] - - except Exception: - self.close() - raise - - def __del__(self): - if len(self.responses): - logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),)) - if self.p or self.sock: - self.close() - assert False, "cleanup happened in RemoteRepository.__del__" - - def __repr__(self): - return f"<{self.__class__.__name__} {self.location.canonical_path()}>" - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - if exc_type is not None: - self.shutdown_time = time.monotonic() + 30 - finally: - # in any case, we want to close the repo cleanly. - logger.debug( - "RemoteRepository: %s bytes sent, %s bytes received, %d messages sent", - format_file_size(self.tx_bytes), - format_file_size(self.rx_bytes), - self.msgid, - ) - self.close() - - @property - def id_str(self): - return bin_to_hex(self.id) - - def borg_cmd(self, args, testing): - """return a borg serve command line""" - # give some args/options to 'borg serve' process as they were given to us - opts = [] - if args is not None: - root_logger = logging.getLogger() - if root_logger.isEnabledFor(logging.DEBUG): - opts.append("--debug") - elif root_logger.isEnabledFor(logging.INFO): - opts.append("--info") - elif root_logger.isEnabledFor(logging.WARNING): - pass # warning is default - elif root_logger.isEnabledFor(logging.ERROR): - opts.append("--error") - elif root_logger.isEnabledFor(logging.CRITICAL): - opts.append("--critical") - else: - raise ValueError("log level missing, fix this code") - - # Tell the remote server about debug topics it may need to consider. - # Note that debug topics are usable for "spew" or "trace" logs which would - # be too plentiful to transfer for normal use, so the server doesn't send - # them unless explicitly enabled. - # - # Needless to say, if you do --debug-topic=repository.compaction, for example, - # with a 1.0.x server it won't work, because the server does not recognize the - # option. - # - # This is not considered a problem, since this is a debugging feature that - # should not be used for regular use. - for topic in args.debug_topics: - if "." not in topic: - topic = "borg.debug." + topic - if "repository" in topic: - opts.append("--debug-topic=%s" % topic) - env_vars = [] - if testing: - return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args - else: # pragma: no cover - remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg") - remote_path = replace_placeholders(remote_path) - return env_vars + [remote_path, "serve"] + opts - - def ssh_cmd(self, location): - """return a ssh command line that can be prefixed to a borg command line""" - rsh = self._args.rsh or os.environ.get("BORG_RSH", "ssh") - args = shlex.split(rsh) - if location.port: - args += ["-p", str(location.port)] - if location.user: - args.append(f"{location.user}@{location.host}") - else: - args.append("%s" % location.host) - return args - - def call(self, cmd, args, **kw): - for resp in self.call_many(cmd, [args], **kw): - return resp - - def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): - if not calls and cmd != "async_responses": - return - - assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'" - - def send_buffer(): - if self.to_send: - try: - written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front()) - self.tx_bytes += written - self.to_send.pop_front(written) - except OSError as e: - # io.write might raise EAGAIN even though select indicates - # that the fd should be writable. - # EWOULDBLOCK is added for defensive programming sake. - if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]: - raise - - def pop_preload_msgid(chunkid): - msgid = self.chunkid_to_msgids[chunkid].pop(0) - if not self.chunkid_to_msgids[chunkid]: - del self.chunkid_to_msgids[chunkid] - return msgid - - def handle_error(unpacked): - if "exception_class" not in unpacked: - return - - error = unpacked["exception_class"] - args = unpacked["exception_args"] - - if error == "Error": - raise Error(args[0]) - elif error == "ErrorWithTraceback": - raise ErrorWithTraceback(args[0]) - elif error == "InvalidRepository": - raise Repository.InvalidRepository(self.location.processed) - elif error == "DoesNotExist": - raise Repository.DoesNotExist(self.location.processed) - elif error == "AlreadyExists": - raise Repository.AlreadyExists(self.location.processed) - elif error == "CheckNeeded": - raise Repository.CheckNeeded(self.location.processed) - elif error == "IntegrityError": - raise IntegrityError(args[0]) - elif error == "PathNotAllowed": - raise PathNotAllowed(args[0]) - elif error == "PathPermissionDenied": - raise Repository.PathPermissionDenied(args[0]) - elif error == "PathAlreadyExists": - raise Repository.PathAlreadyExists(args[0]) - elif error == "ParentPathDoesNotExist": - raise Repository.ParentPathDoesNotExist(args[0]) - elif error == "ObjectNotFound": - raise Repository.ObjectNotFound(args[0], self.location.processed) - elif error == "StoreObjectNotFound": - raise StoreObjectNotFound(args[0]) - elif error == "InvalidRPCMethod": - raise InvalidRPCMethod(args[0]) - elif error == "LockTimeout": - raise LockTimeout(args[0]) - elif error == "LockFailed": - raise LockFailed(args[0], args[1]) - elif error == "NotLocked": - raise NotLocked(args[0]) - elif error == "NotMyLock": - raise NotMyLock(args[0]) - elif error == "NoManifestError": - raise NoManifestError - elif error == "InsufficientFreeSpaceError": - raise Repository.InsufficientFreeSpaceError(args[0], args[1]) - elif error == "InvalidRepositoryConfig": - raise Repository.InvalidRepositoryConfig(self.location.processed, args[1]) - else: - raise self.RPCError(unpacked) - - calls = list(calls) - waiting_for = [] - maximum_to_send = 0 if wait else self.upload_buffer_size_limit - send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. - try: - while wait or calls: - logger.debug( - f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}" - ) - if self.shutdown_time and time.monotonic() > self.shutdown_time: - # we are shutting this RemoteRepository down already, make sure we do not waste - # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. - logger.debug( - "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", - len(waiting_for), - len(self.async_responses), - ) - return - while waiting_for: - try: - unpacked = self.responses.pop(waiting_for[0]) - waiting_for.pop(0) - handle_error(unpacked) - yield unpacked[RESULT] - if not waiting_for and not calls: - return - except KeyError: - break - if cmd == "async_responses": - while True: - try: - msgid, unpacked = self.async_responses.popitem() - except KeyError: - # there is nothing left what we already have received - if async_wait and self.ignore_responses: - # but do not return if we shall wait and there is something left to wait for: - break - else: - return - else: - handle_error(unpacked) - yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): - w_fds = [self.stdin_fd] - else: - w_fds = [] - r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) - if x: - raise Exception("FD exception occurred") - for fd in r: - if fd is self.stdout_fd: - data = os.read(fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - self.unpacker.feed(data) - for unpacked in self.unpacker: - if not isinstance(unpacked, dict): - raise UnexpectedRPCDataFormatFromServer(data) - - lr_dict = unpacked.get(LOG) - if lr_dict is not None: - # Re-emit remote log messages locally. - _logger = logging.getLogger(lr_dict["name"]) - if _logger.isEnabledFor(lr_dict["level"]): - _logger.handle(logging.LogRecord(**lr_dict)) - continue - - msgid = unpacked[MSGID] - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - # async methods never return values, but may raise exceptions. - if "exception_class" in unpacked: - self.async_responses[msgid] = unpacked - else: - # we currently do not have async result values except "None", - # so we do not add them into async_responses. - if unpacked[RESULT] is not None: - self.async_responses[msgid] = unpacked - else: - self.responses[msgid] = unpacked - elif fd is self.stderr_fd: - data = os.read(fd, 32768) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - # deal with incomplete lines (may appear due to block buffering) - if self.stderr_received: - data = self.stderr_received + data - self.stderr_received = b"" - lines = data.splitlines(keepends=True) - if lines and not lines[-1].endswith((b"\r", b"\n")): - self.stderr_received = lines.pop() - # now we have complete lines in and any partial line in self.stderr_received. - _logger = logging.getLogger() - for line in lines: - # borg serve (remote/server side) should not emit stuff on stderr, - # but e.g. the ssh process (local/client side) might output errors there. - assert line.endswith((b"\r", b"\n")) - # something came in on stderr, log it to not lose it. - # decode late, avoid partial utf-8 sequences. - _logger.warning("stderr: " + line.decode().strip()) - if w: - while ( - (len(self.to_send) <= maximum_to_send) - and (calls or self.preload_ids) - and len(waiting_for) < MAX_INFLIGHT - ): - if calls: - args = calls[0] - if cmd == "get" and args["id"] in self.chunkid_to_msgids: - # we have a get command and have already sent a request for this chunkid when - # doing preloading, so we know the msgid of the response we are waiting for: - waiting_for.append(pop_preload_msgid(args["id"])) - del calls[0] - elif not is_preloaded: - # make and send a request (already done if we are using preloading) - self.msgid += 1 - waiting_for.append(self.msgid) - del calls[0] - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - # for preloading chunks, the raise_missing behaviour is defined HERE, - # not in the get_many / fetch_many call that later fetches the preloaded chunks. - args = {"id": chunk_id, "raise_missing": False} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) - - send_buffer() - finally: - self.ignore_responses |= set(waiting_for) # we lose order here - if is_preloaded: - for call in calls: - chunkid = call["id"] - if chunkid in self.chunkid_to_msgids: - self.ignore_responses.add(pop_preload_msgid(chunkid)) - - @api(since=parse_version("1.0.0"), v1_legacy={"since": parse_version("2.0.0b21"), "previously": True}) - def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_legacy=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0a3")) - def info(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0}) - def check(self, repair=False, max_duration=0): - """actual remoting is done via self.call in the @api decorator""" - - @api( - since=parse_version("1.0.0"), - compact={"since": parse_version("1.2.0a0"), "previously": True, "dontcare": True}, - threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True}, - ) - def commit(self, compact=True, threshold=0.1): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def rollback(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def destroy(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def __len__(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def list(self, limit=None, marker=None): - """actual remoting is done via self.call in the @api decorator""" - - def get(self, id, read_data=True, raise_missing=True): - for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing): - return resp - - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): - yield from self.call_many( - "get", - [{"id": id, "read_data": read_data, "raise_missing": raise_missing} for id in ids], - is_preloaded=is_preloaded, - ) - - @api(since=parse_version("1.0.0")) - def put(self, id, data, wait=True): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def delete(self, id, wait=True): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def save_key(self, keydata): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def load_key(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def break_lock(self): - """actual remoting is done via self.call in the @api decorator""" - - def close(self): - if self.p or self.sock: - self.call("close", {}, wait=True) - if self.p: - self.p.stdin.close() - self.p.stdout.close() - self.p.wait() - self.p = None - if self.sock: - try: - self.sock.shutdown(socket.SHUT_RDWR) - except OSError as e: - if e.errno != errno.ENOTCONN: - raise - self.sock.close() - self.sock = None - - def async_response(self, wait=True): - for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait): - return resp - - def preload(self, ids): - self.preload_ids += ids - - @api(since=parse_version("2.0.0b8")) - def get_manifest(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def put_manifest(self, data): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False}) - def store_list(self, name, *, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def store_load(self, name): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def store_store(self, name, value): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False}) - def store_delete(self, name, *, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b14")) - def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. @@ -1280,7 +498,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None csize = meta.get("csize", len(data)) return csize, decrypted - if isinstance(repository, RemoteRepository) or force_cache: + if force_cache: return RepositoryCache(repository, pack, unpack, transform) else: return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index 3169844e4..8b4738c2b 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -24,7 +24,6 @@ from ...helpers import init_ec_warnings from ...logger import flush_logging from ...manifest import Manifest from ...platform import get_flags -from ...remote import RemoteRepository from ...repository import Repository from .. import has_lchflags, has_mknod, is_utime_fully_supported, have_fuse_mtime_ns, st_mtime_ns_round, filter_xattrs from .. import changedir, ENOATTR # NOQA @@ -178,10 +177,7 @@ def open_archive(repo_path, name): def open_repository(archiver): if archiver.get_kind() == "remote": - location = Location(archiver.repository_location) - if location.proto == "rest": - return Repository(location, exclusive=True) - return RemoteRepository(location) + return Repository(Location(archiver.repository_location), exclusive=True) else: return Repository(archiver.repository_path, exclusive=True) diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index 0c23b6ef7..c9292687c 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -9,7 +9,6 @@ from ...archive import ChunkBuffer from ...constants import * # NOQA from ...helpers import bin_to_hex, msgpack from ...manifest import Manifest -from ...remote import RemoteRepository from ...repository import Repository from ..repository_test import fchunk from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION @@ -208,7 +207,7 @@ def test_missing_manifest(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - if isinstance(repository, (Repository, RemoteRepository)): + if isinstance(repository, Repository): repository.store_delete("config/manifest") else: repository.delete(Manifest.MANIFEST_ID) diff --git a/src/borg/testsuite/archiver/checks_test.py b/src/borg/testsuite/archiver/checks_test.py index 88efa31c5..9327feb9d 100644 --- a/src/borg/testsuite/archiver/checks_test.py +++ b/src/borg/testsuite/archiver/checks_test.py @@ -1,6 +1,5 @@ import os import shutil -from unittest.mock import patch import pytest @@ -9,7 +8,6 @@ from ...constants import * # NOQA from ...helpers import Location, get_security_dir, bin_to_hex from ...helpers import EXIT_ERROR from ...manifest import Manifest, MandatoryFeatureUnsupported -from ...remote import RemoteRepository, PathNotAllowed from ...repository import Repository from .. import llfuse from .. import changedir @@ -277,49 +275,6 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): # Begin Remote Tests -def test_remote_repo_restrict_to_path(remote_archiver): - if remote_archiver.repository_location.startswith("rest://"): - pytest.skip("Not applicable for rest:// protocol") - original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path - # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restricted to repo directory itself, fail for other directories with same prefix: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): - with pytest.raises(PathNotAllowed): - remote_archiver.repository_location = original_location + "_0" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restricted to a completely different path: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]): - with pytest.raises(PathNotAllowed): - remote_archiver.repository_location = original_location + "_1" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - path_prefix = os.path.dirname(repo_path) - # restrict to repo directory's parent directory: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]): - remote_archiver.repository_location = original_location + "_2" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restrict to repo directory's parent directory and another directory: - with patch.object( - RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix] - ): - remote_archiver.repository_location = original_location + "_3" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - - -def test_remote_repo_restrict_to_repository(remote_archiver): - if remote_archiver.repository_location.startswith("rest://"): - pytest.skip("Not applicable for rest:// protocol") - repo_path = remote_archiver.repository_path - # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - parent_path = os.path.join(repo_path, "..") - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]): - with pytest.raises(PathNotAllowed): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - - def test_remote_repo_strip_components_doesnt_leak(remote_archiver): cmd(remote_archiver, "repo-create", RK_ENCRYPTION) create_regular_file(remote_archiver.input_path, "dir/file", contents=b"test file contents 1") diff --git a/src/borg/testsuite/archiver/serve_cmd_test.py b/src/borg/testsuite/archiver/serve_cmd_test.py deleted file mode 100644 index 420399af5..000000000 --- a/src/borg/testsuite/archiver/serve_cmd_test.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import subprocess -import tempfile -import time - -import pytest -import platformdirs - -from . import exec_cmd -from ...platformflags import is_win32 -from ...helpers import get_runtime_dir - - -def have_a_short_runtime_dir(mp): - # Under pytest, we use BORG_BASE_DIR to keep stuff away from the user's normal Borg directories. - # This leads to a very long get_runtime_dir() path — too long for a socket file! - # Thus, we override that again via BORG_RUNTIME_DIR to get a shorter path. - mp.setenv("BORG_RUNTIME_DIR", os.path.join(platformdirs.user_runtime_dir(), "pytest")) - - -@pytest.fixture -def serve_socket(monkeypatch): - have_a_short_runtime_dir(monkeypatch) - # Use a random unique socket filename, so tests can run in parallel. - socket_file = tempfile.mktemp(suffix=".sock", prefix="borg-", dir=get_runtime_dir()) - with subprocess.Popen(["borg", "serve", f"--socket={socket_file}"]) as p: - while not os.path.exists(socket_file): - time.sleep(0.01) # wait until the socket server has started - yield socket_file - p.terminate() - - -@pytest.mark.skipif(is_win32, reason="hangs on win32") -def test_with_socket(serve_socket, tmpdir, monkeypatch): - have_a_short_runtime_dir(monkeypatch) - repo_path = str(tmpdir.join("repo")) - - ret, output = exec_cmd( - f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-create", "--encryption=none" - ) - assert ret == 0 - - ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-info") - assert ret == 0 - assert "Repository ID: " in output - - monkeypatch.setenv("BORG_DELETE_I_KNOW_WHAT_I_AM_DOING", "YES") - ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-delete") - assert ret == 0 - - -@pytest.mark.skipif(is_win32, reason="hangs on win32") -def test_socket_permissions(serve_socket): - st = os.stat(serve_socket) - assert st.st_mode & 0o0777 == 0o0770 # user and group are permitted to use the socket diff --git a/src/borg/testsuite/helpers/parseformat_test.py b/src/borg/testsuite/helpers/parseformat_test.py index 0d6ab162c..93bf0b028 100644 --- a/src/borg/testsuite/helpers/parseformat_test.py +++ b/src/borg/testsuite/helpers/parseformat_test.py @@ -246,12 +246,10 @@ class TestLocationWithoutEnv: def test_socket(self, monkeypatch): monkeypatch.delenv("BORG_REPO", raising=False) + # socket:// is no longer supported and must be rejected as an invalid location. url = "socket:///c:/repo/path" if is_win32 else "socket:///repo/path" - path = "c:/repo/path" if is_win32 else "/repo/path" - assert ( - repr(Location(url)) - == f"Location(proto='socket', user=None, pass=None, host=None, port=None, path='{path}')" - ) + with pytest.raises(ValueError): + Location(url) def test_file(self, monkeypatch): monkeypatch.delenv("BORG_REPO", raising=False) @@ -334,7 +332,6 @@ class TestLocationWithoutEnv: ] locations.insert(1, "c:/absolute/path" if is_win32 else "/absolute/path") locations.insert(2, "file:///c:/absolute/path" if is_win32 else "file:///absolute/path") - locations.insert(3, "socket:///c:/absolute/path" if is_win32 else "socket:///absolute/path") for location in locations: assert ( Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path() diff --git a/src/borg/testsuite/remote_test.py b/src/borg/testsuite/remote_test.py index 6f01fc334..0eb0b79c3 100644 --- a/src/borg/testsuite/remote_test.py +++ b/src/borg/testsuite/remote_test.py @@ -1,13 +1,12 @@ import errno import os import io -import time from unittest.mock import patch import pytest from ..constants import ROBJ_FILE_STREAM -from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote +from ..remote import RepositoryCache, cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey from ..helpers import IntegrityError @@ -17,60 +16,6 @@ from .repository_test import fchunk, pdchunk from .crypto.key_test import TestKey -class TestSleepingBandwidthLimiter: - def expect_write(self, fd, data): - self.expected_fd = fd - self.expected_data = data - - def check_write(self, fd, data): - assert fd == self.expected_fd - assert data == self.expected_data - return len(data) - - def test_write_unlimited(self, monkeypatch): - monkeypatch.setattr(os, "write", self.check_write) - - it = SleepingBandwidthLimiter(0) - self.expect_write(5, b"test") - it.write(5, b"test") - - def test_write(self, monkeypatch): - monkeypatch.setattr(os, "write", self.check_write) - monkeypatch.setattr(time, "monotonic", lambda: now) - monkeypatch.setattr(time, "sleep", lambda x: None) - - now = 100 - - it = SleepingBandwidthLimiter(100) # Bandwidth quota. - - # All fits - self.expect_write(5, b"test") - it.write(5, b"test") - - # Only partial write - self.expect_write(5, b"123456") - it.write(5, b"1234567890") - - # Sleeps - self.expect_write(5, b"123456") - it.write(5, b"123456") - - # Long time interval between writes - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - # Long time interval between writes, filling up the quota - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - # Long time interval between writes, filling up the quota to clip to the maximum - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - class TestRepositoryCache: @pytest.fixture def repository(self, tmpdir): diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index becdb3635..91a4b7e2a 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,13 +1,8 @@ -import logging import os -import sys import pytest -from ..helpers import Location from ..helpers import IntegrityError -from ..platformflags import is_win32 -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed -from ..repository import Repository, StoreObjectNotFound, MAX_DATA_SIZE +from ..repository import Repository, MAX_DATA_SIZE from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -18,22 +13,14 @@ def repository(tmp_path): yield Repository(repository_location, exclusive=True, create=True) -@pytest.fixture() -def remote_repository(tmp_path): - if is_win32: - pytest.skip("Remote repository does not yet work on Windows.") - repository_location = Location("ssh://__testsuite__/" + os.fspath(tmp_path / "repository")) - yield RemoteRepository(repository_location, exclusive=True, create=True) - - def pytest_generate_tests(metafunc): - # Generate tests that run on both local and remote repositories. + # Generate tests that run on repositories. if "repo_fixtures" in metafunc.fixturenames: - metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"]) + metafunc.parametrize("repo_fixtures", ["repository"]) def get_repository_from_fixture(repo_fixtures, request): - # Return the repository object from the fixture for tests that run on both local and remote repositories. + # Return the repository object from the fixture. return request.getfixturevalue(repo_fixtures) @@ -43,14 +30,7 @@ def reopen(repository, exclusive: bool | None = True, create=False): raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.") return Repository(repository._location, exclusive=exclusive, create=create) - if isinstance(repository, RemoteRepository): - if repository.p is not None or repository.sock is not None: - raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.") - return RemoteRepository(repository.location, exclusive=exclusive, create=create) - - raise TypeError( - f"Invalid argument type. Expected 'Repository' or 'RemoteRepository', received '{type(repository).__name__}'." - ) + raise TypeError(f"Invalid argument type. Expected 'Repository', received '{type(repository).__name__}'.") def fchunk(data, meta=b"", chunk_id=b"\x00" * 32): @@ -149,125 +129,3 @@ def check(repository, repo_path, repair=False, status=True): # Make sure no tmp files are left behind tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name] assert tmp_files == [], "Found tmp files" - - -def _get_mock_args(): - class MockArgs: - remote_path = "borg" - umask = 0o077 - debug_topics = [] - rsh = None - - def __contains__(self, item): - # to behave like argparse.Namespace - return hasattr(self, item) - - return MockArgs() - - -def test_remote_invalid_rpc(remote_repository): - with remote_repository: - with pytest.raises(InvalidRPCMethod): - remote_repository.call("__init__", {}) - - -def test_remote_rpc_exception_transport(remote_repository): - with remote_repository: - s1 = "test string" - - try: - remote_repository.call("inject_exception", {"kind": "DoesNotExist"}) - except Repository.DoesNotExist as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "AlreadyExists"}) - except Repository.AlreadyExists as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "CheckNeeded"}) - except Repository.CheckNeeded as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "IntegrityError"}) - except IntegrityError as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "PathNotAllowed"}) - except PathNotAllowed as e: - assert len(e.args) == 1 - assert e.args[0] == "foo" - - try: - remote_repository.call("inject_exception", {"kind": "ObjectNotFound"}) - except Repository.ObjectNotFound as e: - assert len(e.args) == 2 - assert e.args[0] == s1 - assert e.args[1] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "StoreObjectNotFound"}) - except StoreObjectNotFound as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"}) - except InvalidRPCMethod as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "divide"}) - except RemoteRepository.RPCError as e: - assert e.unpacked - assert e.get_message().startswith("ZeroDivisionError:") - assert e.exception_class == "ZeroDivisionError" - assert len(e.exception_full) > 0 - - -def test_remote_ssh_cmd(remote_repository): - with remote_repository: - args = _get_mock_args() - remote_repository._args = args - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"] - assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"] - assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [ - "ssh", - "-p", - "1234", - "user@example.com", - ] - os.environ["BORG_RSH"] = "ssh --foo" - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"] - - -def test_remote_borg_cmd(remote_repository): - with remote_repository: - assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"] - args = _get_mock_args() - # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown: - logging.getLogger().setLevel(logging.INFO) - # note: test logger is on info log level, so --info gets added automagically - assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] - args.remote_path = "borg-0.28.2" - assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"] - args.debug_topics = ["something_client_side", "repository_compaction"] - assert remote_repository.borg_cmd(args, testing=False) == [ - "borg-0.28.2", - "serve", - "--info", - "--debug-topic=borg.debug.repository_compaction", - ] - args = _get_mock_args() - assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] - args.rsh = "ssh -i foo" - remote_repository._args = args - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"] From 4a4a8e4e7206faea6b319db2fa2172626b3e0eb4 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 8 Jun 2026 08:44:53 +0200 Subject: [PATCH 2/4] remove now-dead RepositoryCache / force_cache from remote.py After removing the modern RemoteRepository, cache_if_remote always returned RepositoryNoCache in production (the only RepositoryCache path was the removed isinstance(RemoteRepository) check; force_cache=True was used only by a test). Delete the vestigial RepositoryCache class and simplify cache_if_remote: drop the pack/unpack/force_cache parameters and the LZ4/xxh64 cache-file machinery, keep building the decrypted_cache -> transform closure, and always return RepositoryNoCache. Remove the imports that only RepositoryCache used. Replace the RepositoryCache tests with a focused test of the surviving cache_if_remote path (plain passthrough and decrypted (csize, plaintext) tuples). The legacy copy in borg/legacy/remote.py is intentionally left untouched (its RepositoryCache is still used for LegacyRemoteRepository). Co-Authored-By: Claude Opus 4.8 --- src/borg/remote.py | 183 ++---------------------------- src/borg/testsuite/remote_test.py | 160 +++----------------------- 2 files changed, 26 insertions(+), 317 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index 786fc390e..20918f9f4 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,28 +1,17 @@ -import errno import inspect import logging import os import queue import select -import shutil -import struct import sys -import tempfile -import time import traceback -from xxhash import xxh64 - import borg.logger from . import __version__ -from .compress import Compressor from .constants import * # NOQA from .helpers import Error, IntegrityError -from .helpers import bin_to_hex from .helpers import get_limited_unpacker from .helpers import sysinfo -from .helpers import format_file_size -from .helpers import safe_unlink from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository, StoreObjectNotFound @@ -296,9 +285,9 @@ class RepositoryServer: # pragma: no cover class RepositoryNoCache: - """A not caching Repository wrapper, passes through to repository. + """A Repository wrapper that passes through to the repository. - Just to have same API (including the context manager) as RepositoryCache. + It applies an optional *transform* and provides a uniform context-manager API. *transform* is a callable taking two arguments, key and raw repository data. The return value is returned from get()/get_many(). By default, the raw @@ -329,176 +318,22 @@ class RepositoryNoCache: pass -class RepositoryCache(RepositoryNoCache): +def cache_if_remote(repository, *, decrypted_cache=False, transform=None): """ - A caching Repository wrapper. - - Caches Repository GET operations locally. - - *pack* and *unpack* complement *transform* of the base class. - *pack* receives the output of *transform* and should return bytes, - which are stored in the cache. *unpack* receives these bytes and - should return the initial data (as returned by *transform*). - """ - - def __init__(self, repository, pack=None, unpack=None, transform=None): - super().__init__(repository, transform) - self.pack = pack or (lambda data: data) - self.unpack = unpack or (lambda data: data) - self.cache = set() - self.basedir = tempfile.mkdtemp(prefix="borg-cache-") - self.query_size_limit() - self.size = 0 - # Instrumentation - self.hits = 0 - self.misses = 0 - self.slow_misses = 0 - self.slow_lat = 0.0 - self.evictions = 0 - self.enospc = 0 - - def query_size_limit(self): - available_space = shutil.disk_usage(self.basedir).free - self.size_limit = int(min(available_space * 0.25, 2**31)) - - def prefixed_key(self, key, complete): - # just prefix another byte telling whether this key refers to a complete chunk - # or a without-data-metadata-only chunk (see also read_data param). - prefix = b"\x01" if complete else b"\x00" - return prefix + key - - def key_filename(self, key): - return os.path.join(self.basedir, bin_to_hex(key)) - - def backoff(self): - self.query_size_limit() - target_size = int(0.9 * self.size_limit) - while self.size > target_size and self.cache: - key = self.cache.pop() - file = self.key_filename(key) - self.size -= os.stat(file).st_size - os.unlink(file) - self.evictions += 1 - - def add_entry(self, key, data, cache, complete): - transformed = self.transform(key, data) - if not cache: - return transformed - packed = self.pack(transformed) - pkey = self.prefixed_key(key, complete=complete) - file = self.key_filename(pkey) - try: - with open(file, "wb") as fd: - fd.write(packed) - except OSError as os_error: - try: - safe_unlink(file) - except FileNotFoundError: - pass # open() could have failed as well - if os_error.errno == errno.ENOSPC: - self.enospc += 1 - self.backoff() - else: - raise - else: - self.size += len(packed) - self.cache.add(pkey) - if self.size > self.size_limit: - self.backoff() - return transformed - - def log_instrumentation(self): - logger.debug( - "RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), " - "%d evictions, %d ENOSPC hit", - len(self.cache), - format_file_size(self.size), - format_file_size(self.size_limit), - self.hits, - self.misses, - self.slow_misses, - self.slow_lat, - self.evictions, - self.enospc, - ) - - def close(self): - self.log_instrumentation() - self.cache.clear() - shutil.rmtree(self.basedir) - - def get_many(self, keys, read_data=True, raise_missing=True, cache=True): - # It could use different cache keys depending on read_data and cache full vs. meta-only chunks. - unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache] - repository_iterator = zip( - unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing) - ) - for key in keys: - pkey = self.prefixed_key(key, complete=read_data) - if pkey in self.cache: - file = self.key_filename(pkey) - with open(file, "rb") as fd: - self.hits += 1 - yield self.unpack(fd.read()) - else: - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache, complete=read_data) - self.misses += 1 - yield transformed - break - else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing) - self.slow_lat += time.perf_counter() - t0 - transformed = self.add_entry(key, data, cache, complete=read_data) - self.slow_misses += 1 - yield transformed - # Consume any pending requests - for _ in repository_iterator: - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): - """ - Return a Repository(No)Cache for *repository*. + Return a RepositoryNoCache wrapping *repository*. If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository. The cache will - store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting - and more importantly MAC and ID checking cached objects). - Internally, objects are compressed with LZ4. + (csize, plaintext) instead of the actual data in the repository (the objects are + parsed/decrypted via the *transform* derived from it). """ - if decrypted_cache and (pack or unpack or transform): - raise ValueError("decrypted_cache and pack/unpack/transform are incompatible") + if decrypted_cache and transform: + raise ValueError("decrypted_cache and transform are incompatible") elif decrypted_cache: repo_objs = decrypted_cache - # 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel - cache_struct = struct.Struct("=I8sBB") - compressor = Compressor("lz4") - - def pack(data): - csize, decrypted = data - meta, compressed = compressor.compress({}, decrypted) - return cache_struct.pack(csize, xxh64(compressed).digest(), meta["ctype"], meta["clevel"]) + compressed - - def unpack(data): - data = memoryview(data) - csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size]) - compressed = data[cache_struct.size :] - if checksum != xxh64(compressed).digest(): - raise IntegrityError("detected corrupted data in metadata cache") - meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed)) - _, decrypted = compressor.decompress(meta, compressed) - return csize, decrypted def transform(id_, data): meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) csize = meta.get("csize", len(data)) return csize, decrypted - if force_cache: - return RepositoryCache(repository, pack, unpack, transform) - else: - return RepositoryNoCache(repository, transform) + return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/remote_test.py b/src/borg/testsuite/remote_test.py index 0eb0b79c3..4a5ca5e14 100644 --- a/src/borg/testsuite/remote_test.py +++ b/src/borg/testsuite/remote_test.py @@ -1,22 +1,18 @@ -import errno import os -import io -from unittest.mock import patch import pytest from ..constants import ROBJ_FILE_STREAM -from ..remote import RepositoryCache, cache_if_remote +from ..remote import cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey -from ..helpers import IntegrityError from ..repoobj import RepoObj from .hashindex_test import H from .repository_test import fchunk, pdchunk from .crypto.key_test import TestKey -class TestRepositoryCache: +class TestCacheIfRemote: @pytest.fixture def repository(self, tmpdir): self.repository_location = os.path.join(str(tmpdir), "repository") @@ -26,124 +22,16 @@ class TestRepositoryCache: repository.put(H(3), fchunk(bytes(100))) yield repository - @pytest.fixture - def cache(self, repository): - return RepositoryCache(repository) - - def test_simple(self, cache: RepositoryCache): - # Single get()s are not cached, since they are used for unique objects like archives. - assert pdchunk(cache.get(H(1))) == b"1234" - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 1 - - assert pdchunk(cache.get(H(1))) == b"1234" - assert cache.misses == 2 - assert cache.hits == 2 - - def test_meta(self, cache: RepositoryCache): - # Same as test_simple, but not reading the chunk data (metadata only). - # Single get()s are not cached, since they are used for unique objects like archives. - assert pdchunk(cache.get(H(1), read_data=False)) == b"" - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 1 - - assert pdchunk(cache.get(H(1), read_data=False)) == b"" - assert cache.misses == 2 - assert cache.hits == 2 - - def test_mixed(self, cache: RepositoryCache): - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 1 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 2 - - def test_backoff(self, cache: RepositoryCache): - def query_size_limit(): - cache.size_limit = 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"] - assert cache.misses == 2 - assert cache.evictions == 0 - iterator = cache.get_many([H(1), H(3), H(2)]) - assert pdchunk(next(iterator)) == b"1234" - - # Force cache to back off - qsl = cache.query_size_limit - cache.query_size_limit = query_size_limit # type: ignore[assignment] - cache.backoff() - cache.query_size_limit = qsl # type: ignore[assignment] - # Evicted H(1) and H(2) - assert cache.evictions == 2 - assert H(1) not in cache.cache - assert H(2) not in cache.cache - assert pdchunk(next(iterator)) == bytes(100) - assert cache.slow_misses == 0 - # Since H(2) was in the cache when we called get_many(), but has - # been evicted during iterating the generator, it will be a slow miss. - assert pdchunk(next(iterator)) == b"5678" - assert cache.slow_misses == 1 - - def test_enospc(self, cache: RepositoryCache): - class enospc_open: - def __init__(self, *args): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def write(self, data): - raise OSError(errno.ENOSPC, "foo") - - def truncate(self, n=None): - pass - - iterator = cache.get_many([H(1), H(2), H(3)]) - assert pdchunk(next(iterator)) == b"1234" - - with patch("builtins.open", enospc_open): - assert pdchunk(next(iterator)) == b"5678" - assert cache.enospc == 1 - # We didn't patch query_size_limit, which would set size_limit to a low - # value, so nothing was actually evicted. - assert cache.evictions == 0 - - assert pdchunk(next(iterator)) == bytes(100) + def test_passthrough(self, repository): + # Without decrypted_cache, raw repository data is passed through unchanged. + with cache_if_remote(repository) as cached: + assert pdchunk(cached.get(H(1))) == b"1234" + assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"] @pytest.fixture def key(self, repository, monkeypatch): monkeypatch.setenv("BORG_PASSPHRASE", "test") - key = PlaintextKey.create(repository, TestKey.MockArgs()) - return key + return PlaintextKey.create(repository, TestKey.MockArgs()) @pytest.fixture def repo_objs(self, key): @@ -162,27 +50,13 @@ class TestRepositoryCache: def H2(self, repo_objs, repository): return self._put_encrypted_object(repo_objs, repository, b"5678") - @pytest.fixture - def H3(self, repo_objs, repository): - return self._put_encrypted_object(repo_objs, repository, bytes(100)) + def test_decrypted_cache(self, repo_objs, repository, H1, H2): + # With decrypted_cache, get/get_many return (csize, plaintext) tuples. + with cache_if_remote(repository, decrypted_cache=repo_objs) as cached: + csize, plaintext = cached.get(H1) + assert plaintext == b"1234" + assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"] - @pytest.fixture - def decrypted_cache(self, repo_objs, repository): - return cache_if_remote(repository, decrypted_cache=repo_objs, force_cache=True) - - def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3): - list(decrypted_cache.get_many([H1, H2, H3])) - - iterator = decrypted_cache.get_many([H1, H2, H3]) - assert next(iterator) == (4, b"1234") - - pkey = decrypted_cache.prefixed_key(H2, complete=True) - with open(decrypted_cache.key_filename(pkey), "a+b") as fd: - fd.seek(-1, io.SEEK_END) - corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little") - fd.seek(-1, io.SEEK_END) - fd.write(corrupted) - fd.truncate() - - with pytest.raises(IntegrityError): - assert next(iterator) == (4, b"5678") + def test_decrypted_cache_and_transform_incompatible(self, repository, repo_objs): + with pytest.raises(ValueError): + cache_if_remote(repository, decrypted_cache=repo_objs, transform=lambda key, data: data) From 2cdb9cebc30b09913d1db26dbc8a93f712fffdd3 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 8 Jun 2026 09:04:19 +0200 Subject: [PATCH 3/4] remove unused caching trio from legacy/remote.py borg.legacy.remote.cache_if_remote (and the RepositoryCache / RepositoryNoCache classes it returns) are dead code: nothing imports or calls them. Every cache_if_remote consumer (archive check, mount, tests) uses the non-legacy borg.remote version, and legacy repos never reach it (Archive.check rejects legacy repos). The trio was copied wholesale during the borg.legacy split (#9556). Delete RepositoryNoCache, RepositoryCache and cache_if_remote, plus the imports that only they used (shutil, struct, tempfile, xxhash.xxh64, compress.Compressor, helpers.safe_unlink). LegacyRemoteRepository and the rest of the module are unchanged. Co-Authored-By: Claude Opus 4.8 --- src/borg/legacy/remote.py | 216 -------------------------------------- 1 file changed, 216 deletions(-) diff --git a/src/borg/legacy/remote.py b/src/borg/legacy/remote.py index 048b2fcff..ffb4cda55 100644 --- a/src/borg/legacy/remote.py +++ b/src/borg/legacy/remote.py @@ -5,26 +5,19 @@ import logging import os import select import shlex -import shutil import socket -import struct import sys -import tempfile import textwrap import time from subprocess import Popen, PIPE -from xxhash import xxh64 - from .. import __version__ -from ..compress import Compressor from ..constants import * # NOQA from ..helpers import Error, ErrorWithTraceback, IntegrityError from ..helpers import bin_to_hex from ..helpers import get_limited_unpacker from ..helpers import replace_placeholders from ..helpers import format_file_size -from ..helpers import safe_unlink from ..helpers import prepare_subprocess_env, ignore_sigint from ..helpers import get_socket_filename from ..fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed @@ -731,212 +724,3 @@ class LegacyRemoteRepository: @api(since=parse_version("2.0.0b8")) def put_manifest(self, data): """actual remoting is done via self.call in the @api decorator""" - - -class RepositoryNoCache: - """A not caching Repository wrapper, passes through to repository. - - Just to have same API (including the context manager) as RepositoryCache. - - *transform* is a callable taking two arguments, key and raw repository data. - The return value is returned from get()/get_many(). By default, the raw - repository data is returned. - """ - - def __init__(self, repository, transform=None): - self.repository = repository - self.transform = transform or (lambda key, data: data) - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def get(self, key, read_data=True, raise_missing=True): - return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False)) - - def get_many(self, keys, read_data=True, cache=True, raise_missing=True): - for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)): - yield self.transform(key, data) - - def log_instrumentation(self): - pass - - -class RepositoryCache(RepositoryNoCache): - """ - A caching Repository wrapper. - - Caches Repository GET operations locally. - - *pack* and *unpack* complement *transform* of the base class. - *pack* receives the output of *transform* and should return bytes, - which are stored in the cache. *unpack* receives these bytes and - should return the initial data (as returned by *transform*). - """ - - def __init__(self, repository, pack=None, unpack=None, transform=None): - super().__init__(repository, transform) - self.pack = pack or (lambda data: data) - self.unpack = unpack or (lambda data: data) - self.cache = set() - self.basedir = tempfile.mkdtemp(prefix="borg-cache-") - self.query_size_limit() - self.size = 0 - # Instrumentation - self.hits = 0 - self.misses = 0 - self.slow_misses = 0 - self.slow_lat = 0.0 - self.evictions = 0 - self.enospc = 0 - - def query_size_limit(self): - available_space = shutil.disk_usage(self.basedir).free - self.size_limit = int(min(available_space * 0.25, 2**31)) - - def prefixed_key(self, key, complete): - # just prefix another byte telling whether this key refers to a complete chunk - # or a without-data-metadata-only chunk (see also read_data param). - prefix = b"\x01" if complete else b"\x00" - return prefix + key - - def key_filename(self, key): - return os.path.join(self.basedir, bin_to_hex(key)) - - def backoff(self): - self.query_size_limit() - target_size = int(0.9 * self.size_limit) - while self.size > target_size and self.cache: - key = self.cache.pop() - file = self.key_filename(key) - self.size -= os.stat(file).st_size - os.unlink(file) - self.evictions += 1 - - def add_entry(self, key, data, cache, complete): - transformed = self.transform(key, data) - if not cache: - return transformed - packed = self.pack(transformed) - pkey = self.prefixed_key(key, complete=complete) - file = self.key_filename(pkey) - try: - with open(file, "wb") as fd: - fd.write(packed) - except OSError as os_error: - try: - safe_unlink(file) - except FileNotFoundError: - pass # open() could have failed as well - if os_error.errno == errno.ENOSPC: - self.enospc += 1 - self.backoff() - else: - raise - else: - self.size += len(packed) - self.cache.add(pkey) - if self.size > self.size_limit: - self.backoff() - return transformed - - def log_instrumentation(self): - logger.debug( - "RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), " - "%d evictions, %d ENOSPC hit", - len(self.cache), - format_file_size(self.size), - format_file_size(self.size_limit), - self.hits, - self.misses, - self.slow_misses, - self.slow_lat, - self.evictions, - self.enospc, - ) - - def close(self): - self.log_instrumentation() - self.cache.clear() - shutil.rmtree(self.basedir) - - def get_many(self, keys, read_data=True, cache=True, raise_missing=True): - # It could use different cache keys depending on read_data and cache full vs. meta-only chunks. - unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache] - repository_iterator = zip( - unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing) - ) - for key in keys: - pkey = self.prefixed_key(key, complete=read_data) - if pkey in self.cache: - file = self.key_filename(pkey) - with open(file, "rb") as fd: - self.hits += 1 - yield self.unpack(fd.read()) - else: - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache, complete=read_data) - self.misses += 1 - yield transformed - break - else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing) - self.slow_lat += time.perf_counter() - t0 - transformed = self.add_entry(key, data, cache, complete=read_data) - self.slow_misses += 1 - yield transformed - # Consume any pending requests - for _ in repository_iterator: - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): - """ - Return a Repository(No)Cache for *repository*. - - If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository. The cache will - store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting - and more importantly MAC and ID checking cached objects). - Internally, objects are compressed with LZ4. - """ - if decrypted_cache and (pack or unpack or transform): - raise ValueError("decrypted_cache and pack/unpack/transform are incompatible") - elif decrypted_cache: - repo_objs = decrypted_cache - # 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel - cache_struct = struct.Struct("=I8sBB") - compressor = Compressor("lz4") - - def pack(data): - csize, decrypted = data - meta, compressed = compressor.compress({}, decrypted) - return cache_struct.pack(csize, xxh64(compressed).digest(), meta["ctype"], meta["clevel"]) + compressed - - def unpack(data): - data = memoryview(data) - csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size]) - compressed = data[cache_struct.size :] - if checksum != xxh64(compressed).digest(): - raise IntegrityError("detected corrupted data in metadata cache") - meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed)) - _, decrypted = compressor.decompress(meta, compressed) - return csize, decrypted - - def transform(id_, data): - meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) - csize = meta.get("csize", len(data)) - return csize, decrypted - - if isinstance(repository, LegacyRemoteRepository) or force_cache: - return RepositoryCache(repository, pack, unpack, transform) - else: - return RepositoryNoCache(repository, transform) From 6565b44f551275f147776730c23991256e67bdd0 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 8 Jun 2026 09:26:51 +0200 Subject: [PATCH 4/4] dissolve borg.remote: server -> legacy.remote, cache helpers -> repository borg.remote no longer fit its name: it held the legacy-only borg serve server plus generic repository cache wrappers used by current repos. Split by purpose and remove the module: - Move RepositoryServer into borg.legacy.remote (it only serves legacy v1 ssh repositories). It reuses the exception classes (PathNotAllowed, InvalidRPCMethod, UnexpectedRPCDataFormatFromClient) and BORG_VERSION / MSGID constants already defined there; open() uses the module-level LegacyRepository. serve_cmd.py now imports RepositoryServer from ..legacy.remote. - Move RepositoryNoCache and cache_if_remote into borg.repository (they wrap a Repository and are used by Archive.check and mount of current repos). archive.py and mount_cmds.py import them from ..repository now. - Move the cache_if_remote tests into repository_test.py; delete remote_test.py. - Delete src/borg/remote.py; fix the stale BUFSIZE comment in constants.py. Pure relocation, no behavior change. Co-Authored-By: Claude Opus 4.8 --- src/borg/archive.py | 3 +- src/borg/archiver/mount_cmds.py | 2 +- src/borg/archiver/serve_cmd.py | 2 +- src/borg/constants.py | 2 +- src/borg/legacy/remote.py | 248 ++++++++++++++++++- src/borg/remote.py | 339 -------------------------- src/borg/repository.py | 55 +++++ src/borg/testsuite/remote_test.py | 62 ----- src/borg/testsuite/repository_test.py | 55 ++++- 9 files changed, 360 insertions(+), 408 deletions(-) delete mode 100644 src/borg/remote.py delete mode 100644 src/borg/testsuite/remote_test.py diff --git a/src/borg/archive.py b/src/borg/archive.py index 82bcaa98c..ee156c412 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -50,8 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from . import platform from .platform import acl_get, acl_set, set_flags, get_flags, swidth -from .remote import cache_if_remote -from .repository import Repository, NoManifestError +from .repository import Repository, NoManifestError, cache_if_remote from .repoobj import RepoObj has_link = hasattr(os, "link") diff --git a/src/borg/archiver/mount_cmds.py b/src/borg/archiver/mount_cmds.py index 6111df006..78cf64121 100644 --- a/src/borg/archiver/mount_cmds.py +++ b/src/borg/archiver/mount_cmds.py @@ -7,7 +7,7 @@ from ..helpers import PathSpec from ..helpers import umount from ..helpers.argparsing import ArgumentParser from ..manifest import Manifest -from ..remote import cache_if_remote +from ..repository import cache_if_remote from ..logger import create_logger diff --git a/src/borg/archiver/serve_cmd.py b/src/borg/archiver/serve_cmd.py index 6f9ebba97..af2418912 100644 --- a/src/borg/archiver/serve_cmd.py +++ b/src/borg/archiver/serve_cmd.py @@ -1,5 +1,5 @@ from ..constants import * # NOQA -from ..remote import RepositoryServer +from ..legacy.remote import RepositoryServer from ..logger import create_logger from ..helpers.argparsing import ArgumentParser diff --git a/src/borg/constants.py b/src/borg/constants.py index 6ea8e5b4c..b62af7bf9 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -73,7 +73,7 @@ IDS_PER_CHUNK = MAX_DATA_SIZE // 40 # we use it in all places where we need to detect or create all-zero buffers zeros = bytes(MAX_DATA_SIZE) -# borg.remote read() buffer size +# borg serve (borg.legacy.remote) read() buffer size BUFSIZE = 10 * 1024 * 1024 # To use a safe, limited unpacker, we need to set an upper limit to the archive count in the manifest. diff --git a/src/borg/legacy/remote.py b/src/borg/legacy/remote.py index ffb4cda55..abcc7fc85 100644 --- a/src/borg/legacy/remote.py +++ b/src/borg/legacy/remote.py @@ -3,27 +3,32 @@ import functools import inspect import logging import os +import queue import select import shlex import socket import sys import textwrap import time +import traceback from subprocess import Popen, PIPE +import borg.logger from .. import __version__ from ..constants import * # NOQA from ..helpers import Error, ErrorWithTraceback, IntegrityError from ..helpers import bin_to_hex from ..helpers import get_limited_unpacker from ..helpers import replace_placeholders +from ..helpers import sysinfo from ..helpers import format_file_size from ..helpers import prepare_subprocess_env, ignore_sigint from ..helpers import get_socket_filename from ..fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed -from ..logger import create_logger +from ..logger import create_logger, borg_serve_log_queue from ..helpers import msgpack from .repository import LegacyRepository +from ..repository import Repository, StoreObjectNotFound from ..version import parse_version, format_version from ..helpers.datastruct import EfficientCollectionQueue from ..platform import is_win32 @@ -724,3 +729,244 @@ class LegacyRemoteRepository: @api(since=parse_version("2.0.0b8")) def put_manifest(self, data): """actual remoting is done via self.call in the @api decorator""" + + +# borg serve: borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current +# repositories use rest://). The legacy client above (LegacyRemoteRepository) spawns "borg serve" +# on the remote host; this server keeps the legacy RPC method allowlist and opens LegacyRepository. + + +class RepositoryServer: # pragma: no cover + _legacy_rpc_methods = ( # LegacyRepository + "__len__", + "check", + "commit", + "delete", + "destroy", + "get", + "list", + "negotiate", + "open", + "close", + "info", + "put", + "rollback", + "save_key", + "load_key", + "break_lock", + "inject_exception", + "get_manifest", # borg2 LegacyRepository has this + ) + + def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None): + self.repository = None + self.RepoCls = None + self.rpc_methods = ("open", "close", "negotiate") + self.restrict_to_paths = restrict_to_paths + self.restrict_to_repositories = restrict_to_repositories + self.permissions = permissions + self.client_version = None # we update this after client sends version information + + def filter_args(self, f, kwargs): + """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" + known = set(inspect.signature(f).parameters) + return {name: kwargs[name] for name in kwargs if name in known} + + def send_queued_log(self): + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os.write(self.stdout_fd, msg) + + def serve(self): + def inner_serve(): + os.set_blocking(self.stdin_fd, False) + assert not os.get_blocking(self.stdin_fd) + os.set_blocking(self.stdout_fd, True) + assert os.get_blocking(self.stdout_fd) + + unpacker = get_limited_unpacker("server") + shutdown_serve = False + while True: + # before processing any new RPCs, send out all pending log output + self.send_queued_log() + + if shutdown_serve: + # shutdown wanted! get out of here after sending all log output. + assert self.repository is None + return + + # process new RPCs + r, w, es = select.select([self.stdin_fd], [], [], 10) + if r: + data = os.read(self.stdin_fd, BUFSIZE) + if not data: + shutdown_serve = True + continue + unpacker.feed(data) + for unpacked in unpacker: + if isinstance(unpacked, dict): + msgid = unpacked[MSGID] + method = unpacked[MSG] + args = unpacked[ARGS] + else: + if self.repository is not None: + self.repository.close() + raise UnexpectedRPCDataFormatFromClient(__version__) + try: + # logger.debug(f"{type(self)} method: {type(self.repository)}.{method}") + if method not in self.rpc_methods: + raise InvalidRPCMethod(method) + try: + f = getattr(self, method) + except AttributeError: + f = getattr(self.repository, method) + args = self.filter_args(f, args) + res = f(**args) + except BaseException as e: + # These exceptions are reconstructed on the client end in + # LegacyRemoteRepository.call_many(), and will be handled just like locally raised + # exceptions. Suppress the remote traceback for these, except ErrorWithTraceback, + # which should always display a traceback. + reconstructed_exceptions = ( + Repository.InvalidRepository, + Repository.InvalidRepositoryConfig, + Repository.DoesNotExist, + Repository.AlreadyExists, + Repository.PathAlreadyExists, + PathNotAllowed, + Repository.InsufficientFreeSpaceError, + ) + # logger.exception(e) + ex_short = traceback.format_exception_only(e.__class__, e) + ex_full = traceback.format_exception(*sys.exc_info()) + ex_trace = True + if isinstance(e, Error): + ex_short = [e.get_message()] + ex_trace = e.traceback + if not isinstance(e, reconstructed_exceptions): + logging.debug("\n".join(ex_full)) + + sys_info = sysinfo() + # StoreObjectNotFound and Repository.ObjectNotFound both have + # __name__ == "ObjectNotFound", so we need to distinguish them + # explicitly for correct client-side reconstruction. + exc_cls_name = ( + "StoreObjectNotFound" if isinstance(e, StoreObjectNotFound) else e.__class__.__name__ + ) + try: + msg = msgpack.packb( + { + MSGID: msgid, + "exception_class": exc_cls_name, + "exception_args": e.args, + "exception_full": ex_full, + "exception_short": ex_short, + "exception_trace": ex_trace, + "sysinfo": sys_info, + } + ) + except TypeError: + msg = msgpack.packb( + { + MSGID: msgid, + "exception_class": exc_cls_name, + "exception_args": [ + x if isinstance(x, (str, bytes, int)) else None for x in e.args + ], + "exception_full": ex_full, + "exception_short": ex_short, + "exception_trace": ex_trace, + "sysinfo": sys_info, + } + ) + os.write(self.stdout_fd, msg) + else: + os.write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + if es: + shutdown_serve = True + continue + + # server for one ssh:// connection + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + inner_serve() + + def negotiate(self, client_data): + if isinstance(client_data, dict): + self.client_version = client_data["client_version"] + else: + self.client_version = BORG_VERSION # seems to be newer than current version (no known old format) + + # not a known old format, send newest negotiate this version knows + return {"server_version": BORG_VERSION} + + def _resolve_path(self, path): + if isinstance(path, bytes): + path = os.fsdecode(path) + path = os.path.realpath(path) + return path + + def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False): + # borg only serves legacy (v1) repositories now; current repositories are accessed via rest://. + self.RepoCls = LegacyRepository + self.rpc_methods = self._legacy_rpc_methods + logging.debug("Resolving repository path %r", path) + path = self._resolve_path(path) + logging.debug("Resolved repository path to %r", path) + path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep) + if self.restrict_to_paths: + # if --restrict-to-path P is given, we make sure that we only operate in/below path P. + # for the prefix check, it is important that the compared paths both have trailing slashes, + # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option. + for restrict_to_path in self.restrict_to_paths: + restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash + if path_with_sep.startswith(restrict_to_path_with_sep): + break + else: + raise PathNotAllowed(path) + if self.restrict_to_repositories: + for restrict_to_repository in self.restrict_to_repositories: + restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "") + if restrict_to_repository_with_sep == path_with_sep: + break + else: + raise PathNotAllowed(path) + kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log) + self.repository = self.RepoCls(path, create, **kwargs) + self.repository.__enter__() # clean exit handled by serve() method + return self.repository.id + + def close(self): + if self.repository is not None: + self.repository.__exit__(None, None, None) + self.repository = None + borg.logger.flush_logging() + self.send_queued_log() + + def inject_exception(self, kind): + s1 = "test string" + s2 = "test string2" + if kind == "DoesNotExist": + raise self.RepoCls.DoesNotExist(s1) + elif kind == "AlreadyExists": + raise self.RepoCls.AlreadyExists(s1) + elif kind == "CheckNeeded": + raise self.RepoCls.CheckNeeded(s1) + elif kind == "IntegrityError": + raise IntegrityError(s1) + elif kind == "PathNotAllowed": + raise PathNotAllowed("foo") + elif kind == "ObjectNotFound": + raise self.RepoCls.ObjectNotFound(s1, s2) + elif kind == "StoreObjectNotFound": + raise StoreObjectNotFound(s1) + elif kind == "InvalidRPCMethod": + raise InvalidRPCMethod(s1) + elif kind == "divide": + 0 // 0 diff --git a/src/borg/remote.py b/src/borg/remote.py deleted file mode 100644 index 20918f9f4..000000000 --- a/src/borg/remote.py +++ /dev/null @@ -1,339 +0,0 @@ -import inspect -import logging -import os -import queue -import select -import sys -import traceback - -import borg.logger -from . import __version__ -from .constants import * # NOQA -from .helpers import Error, IntegrityError -from .helpers import get_limited_unpacker -from .helpers import sysinfo -from .logger import create_logger, borg_serve_log_queue -from .helpers import msgpack -from .repository import Repository, StoreObjectNotFound -from .version import parse_version - -logger = create_logger(__name__) - -BORG_VERSION = parse_version(__version__) -MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" - - -class PathNotAllowed(Error): - """Repository path not allowed: {}""" - - exit_mcode = 83 - - -class InvalidRPCMethod(Error): - """RPC method {} is not valid""" - - exit_mcode = 82 - - -class UnexpectedRPCDataFormatFromClient(Error): - """Borg {}: Got unexpected RPC data format from client.""" - - exit_mcode = 85 - - -# Protocol compatibility: -# borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current repositories use rest://). -# The legacy client lives in borg.legacy.remote (LegacyRemoteRepository); this server keeps the legacy -# RPC method allowlist and opens repositories using LegacyRepository. - - -class RepositoryServer: # pragma: no cover - _legacy_rpc_methods = ( # LegacyRepository - "__len__", - "check", - "commit", - "delete", - "destroy", - "get", - "list", - "negotiate", - "open", - "close", - "info", - "put", - "rollback", - "save_key", - "load_key", - "break_lock", - "inject_exception", - "get_manifest", # borg2 LegacyRepository has this - ) - - def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None): - self.repository = None - self.RepoCls = None - self.rpc_methods = ("open", "close", "negotiate") - self.restrict_to_paths = restrict_to_paths - self.restrict_to_repositories = restrict_to_repositories - self.permissions = permissions - self.client_version = None # we update this after client sends version information - - def filter_args(self, f, kwargs): - """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" - known = set(inspect.signature(f).parameters) - return {name: kwargs[name] for name in kwargs if name in known} - - def send_queued_log(self): - while True: - try: - # lr_dict contents see BorgQueueHandler - lr_dict = borg_serve_log_queue.get_nowait() - except queue.Empty: - break - else: - msg = msgpack.packb({LOG: lr_dict}) - os.write(self.stdout_fd, msg) - - def serve(self): - def inner_serve(): - os.set_blocking(self.stdin_fd, False) - assert not os.get_blocking(self.stdin_fd) - os.set_blocking(self.stdout_fd, True) - assert os.get_blocking(self.stdout_fd) - - unpacker = get_limited_unpacker("server") - shutdown_serve = False - while True: - # before processing any new RPCs, send out all pending log output - self.send_queued_log() - - if shutdown_serve: - # shutdown wanted! get out of here after sending all log output. - assert self.repository is None - return - - # process new RPCs - r, w, es = select.select([self.stdin_fd], [], [], 10) - if r: - data = os.read(self.stdin_fd, BUFSIZE) - if not data: - shutdown_serve = True - continue - unpacker.feed(data) - for unpacked in unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - method = unpacked[MSG] - args = unpacked[ARGS] - else: - if self.repository is not None: - self.repository.close() - raise UnexpectedRPCDataFormatFromClient(__version__) - try: - # logger.debug(f"{type(self)} method: {type(self.repository)}.{method}") - if method not in self.rpc_methods: - raise InvalidRPCMethod(method) - try: - f = getattr(self, method) - except AttributeError: - f = getattr(self.repository, method) - args = self.filter_args(f, args) - res = f(**args) - except BaseException as e: - # These exceptions are reconstructed on the client end in RemoteRepository.call_many(), - # and will be handled just like locally raised exceptions. Suppress the remote traceback - # for these, except ErrorWithTraceback, which should always display a traceback. - reconstructed_exceptions = ( - Repository.InvalidRepository, - Repository.InvalidRepositoryConfig, - Repository.DoesNotExist, - Repository.AlreadyExists, - Repository.PathAlreadyExists, - PathNotAllowed, - Repository.InsufficientFreeSpaceError, - ) - # logger.exception(e) - ex_short = traceback.format_exception_only(e.__class__, e) - ex_full = traceback.format_exception(*sys.exc_info()) - ex_trace = True - if isinstance(e, Error): - ex_short = [e.get_message()] - ex_trace = e.traceback - if not isinstance(e, reconstructed_exceptions): - logging.debug("\n".join(ex_full)) - - sys_info = sysinfo() - # StoreObjectNotFound and Repository.ObjectNotFound both have - # __name__ == "ObjectNotFound", so we need to distinguish them - # explicitly for correct client-side reconstruction. - exc_cls_name = ( - "StoreObjectNotFound" if isinstance(e, StoreObjectNotFound) else e.__class__.__name__ - ) - try: - msg = msgpack.packb( - { - MSGID: msgid, - "exception_class": exc_cls_name, - "exception_args": e.args, - "exception_full": ex_full, - "exception_short": ex_short, - "exception_trace": ex_trace, - "sysinfo": sys_info, - } - ) - except TypeError: - msg = msgpack.packb( - { - MSGID: msgid, - "exception_class": exc_cls_name, - "exception_args": [ - x if isinstance(x, (str, bytes, int)) else None for x in e.args - ], - "exception_full": ex_full, - "exception_short": ex_short, - "exception_trace": ex_trace, - "sysinfo": sys_info, - } - ) - os.write(self.stdout_fd, msg) - else: - os.write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) - if es: - shutdown_serve = True - continue - - # server for one ssh:// connection - self.stdin_fd = sys.stdin.fileno() - self.stdout_fd = sys.stdout.fileno() - inner_serve() - - def negotiate(self, client_data): - if isinstance(client_data, dict): - self.client_version = client_data["client_version"] - else: - self.client_version = BORG_VERSION # seems to be newer than current version (no known old format) - - # not a known old format, send newest negotiate this version knows - return {"server_version": BORG_VERSION} - - def _resolve_path(self, path): - if isinstance(path, bytes): - path = os.fsdecode(path) - path = os.path.realpath(path) - return path - - def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False): - # borg only serves legacy (v1) repositories now; current repositories are accessed via rest://. - from .legacy.repository import LegacyRepository - - self.RepoCls = LegacyRepository - self.rpc_methods = self._legacy_rpc_methods - logging.debug("Resolving repository path %r", path) - path = self._resolve_path(path) - logging.debug("Resolved repository path to %r", path) - path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep) - if self.restrict_to_paths: - # if --restrict-to-path P is given, we make sure that we only operate in/below path P. - # for the prefix check, it is important that the compared paths both have trailing slashes, - # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option. - for restrict_to_path in self.restrict_to_paths: - restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash - if path_with_sep.startswith(restrict_to_path_with_sep): - break - else: - raise PathNotAllowed(path) - if self.restrict_to_repositories: - for restrict_to_repository in self.restrict_to_repositories: - restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "") - if restrict_to_repository_with_sep == path_with_sep: - break - else: - raise PathNotAllowed(path) - kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log) - self.repository = self.RepoCls(path, create, **kwargs) - self.repository.__enter__() # clean exit handled by serve() method - return self.repository.id - - def close(self): - if self.repository is not None: - self.repository.__exit__(None, None, None) - self.repository = None - borg.logger.flush_logging() - self.send_queued_log() - - def inject_exception(self, kind): - s1 = "test string" - s2 = "test string2" - if kind == "DoesNotExist": - raise self.RepoCls.DoesNotExist(s1) - elif kind == "AlreadyExists": - raise self.RepoCls.AlreadyExists(s1) - elif kind == "CheckNeeded": - raise self.RepoCls.CheckNeeded(s1) - elif kind == "IntegrityError": - raise IntegrityError(s1) - elif kind == "PathNotAllowed": - raise PathNotAllowed("foo") - elif kind == "ObjectNotFound": - raise self.RepoCls.ObjectNotFound(s1, s2) - elif kind == "StoreObjectNotFound": - raise StoreObjectNotFound(s1) - elif kind == "InvalidRPCMethod": - raise InvalidRPCMethod(s1) - elif kind == "divide": - 0 // 0 - - -class RepositoryNoCache: - """A Repository wrapper that passes through to the repository. - - It applies an optional *transform* and provides a uniform context-manager API. - - *transform* is a callable taking two arguments, key and raw repository data. - The return value is returned from get()/get_many(). By default, the raw - repository data is returned. - """ - - def __init__(self, repository, transform=None): - self.repository = repository - self.transform = transform or (lambda key, data: data) - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def get(self, key, read_data=True, raise_missing=True): - return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False)) - - def get_many(self, keys, read_data=True, raise_missing=True, cache=True): - for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)): - yield self.transform(key, data) - - def log_instrumentation(self): - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, transform=None): - """ - Return a RepositoryNoCache wrapping *repository*. - - If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository (the objects are - parsed/decrypted via the *transform* derived from it). - """ - if decrypted_cache and transform: - raise ValueError("decrypted_cache and transform are incompatible") - elif decrypted_cache: - repo_objs = decrypted_cache - - def transform(id_, data): - meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) - csize = meta.get("csize", len(data)) - return csize, decrypted - - return RepositoryNoCache(repository, transform) diff --git a/src/borg/repository.py b/src/borg/repository.py index 51e1af70d..04a6d9d51 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -599,3 +599,58 @@ class Repository: def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False): self._lock_refresh() return self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted) + + +class RepositoryNoCache: + """A Repository wrapper that passes through to the repository. + + It applies an optional *transform* and provides a uniform context-manager API. + + *transform* is a callable taking two arguments, key and raw repository data. + The return value is returned from get()/get_many(). By default, the raw + repository data is returned. + """ + + def __init__(self, repository, transform=None): + self.repository = repository + self.transform = transform or (lambda key, data: data) + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def get(self, key, read_data=True, raise_missing=True): + return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False)) + + def get_many(self, keys, read_data=True, raise_missing=True, cache=True): + for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)): + yield self.transform(key, data) + + def log_instrumentation(self): + pass + + +def cache_if_remote(repository, *, decrypted_cache=False, transform=None): + """ + Return a RepositoryNoCache wrapping *repository*. + + If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple + (csize, plaintext) instead of the actual data in the repository (the objects are + parsed/decrypted via the *transform* derived from it). + """ + if decrypted_cache and transform: + raise ValueError("decrypted_cache and transform are incompatible") + elif decrypted_cache: + repo_objs = decrypted_cache + + def transform(id_, data): + meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) + csize = meta.get("csize", len(data)) + return csize, decrypted + + return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/remote_test.py b/src/borg/testsuite/remote_test.py deleted file mode 100644 index 4a5ca5e14..000000000 --- a/src/borg/testsuite/remote_test.py +++ /dev/null @@ -1,62 +0,0 @@ -import os - -import pytest - -from ..constants import ROBJ_FILE_STREAM -from ..remote import cache_if_remote -from ..repository import Repository -from ..crypto.key import PlaintextKey -from ..repoobj import RepoObj -from .hashindex_test import H -from .repository_test import fchunk, pdchunk -from .crypto.key_test import TestKey - - -class TestCacheIfRemote: - @pytest.fixture - def repository(self, tmpdir): - self.repository_location = os.path.join(str(tmpdir), "repository") - with Repository(self.repository_location, exclusive=True, create=True) as repository: - repository.put(H(1), fchunk(b"1234")) - repository.put(H(2), fchunk(b"5678")) - repository.put(H(3), fchunk(bytes(100))) - yield repository - - def test_passthrough(self, repository): - # Without decrypted_cache, raw repository data is passed through unchanged. - with cache_if_remote(repository) as cached: - assert pdchunk(cached.get(H(1))) == b"1234" - assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"] - - @pytest.fixture - def key(self, repository, monkeypatch): - monkeypatch.setenv("BORG_PASSPHRASE", "test") - return PlaintextKey.create(repository, TestKey.MockArgs()) - - @pytest.fixture - def repo_objs(self, key): - return RepoObj(key) - - def _put_encrypted_object(self, repo_objs, repository, data): - id_ = repo_objs.id_hash(data) - repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM)) - return id_ - - @pytest.fixture - def H1(self, repo_objs, repository): - return self._put_encrypted_object(repo_objs, repository, b"1234") - - @pytest.fixture - def H2(self, repo_objs, repository): - return self._put_encrypted_object(repo_objs, repository, b"5678") - - def test_decrypted_cache(self, repo_objs, repository, H1, H2): - # With decrypted_cache, get/get_many return (csize, plaintext) tuples. - with cache_if_remote(repository, decrypted_cache=repo_objs) as cached: - csize, plaintext = cached.get(H1) - assert plaintext == b"1234" - assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"] - - def test_decrypted_cache_and_transform_incompatible(self, repository, repo_objs): - with pytest.raises(ValueError): - cache_if_remote(repository, decrypted_cache=repo_objs, transform=lambda key, data: data) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 91a4b7e2a..601c1ecc0 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,10 +1,13 @@ import os import pytest +from ..constants import ROBJ_FILE_STREAM from ..helpers import IntegrityError -from ..repository import Repository, MAX_DATA_SIZE +from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION +from ..crypto.key import PlaintextKey from .hashindex_test import H +from .crypto.key_test import TestKey @pytest.fixture() @@ -129,3 +132,53 @@ def check(repository, repo_path, repair=False, status=True): # Make sure no tmp files are left behind tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name] assert tmp_files == [], "Found tmp files" + + +class TestCacheIfRemote: + @pytest.fixture + def cache_repository(self, tmpdir): + repository_location = os.path.join(str(tmpdir), "repository") + with Repository(repository_location, exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"1234")) + repository.put(H(2), fchunk(b"5678")) + repository.put(H(3), fchunk(bytes(100))) + yield repository + + def test_passthrough(self, cache_repository): + # Without decrypted_cache, raw repository data is passed through unchanged. + with cache_if_remote(cache_repository) as cached: + assert pdchunk(cached.get(H(1))) == b"1234" + assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"] + + @pytest.fixture + def key(self, cache_repository, monkeypatch): + monkeypatch.setenv("BORG_PASSPHRASE", "test") + return PlaintextKey.create(cache_repository, TestKey.MockArgs()) + + @pytest.fixture + def repo_objs(self, key): + return RepoObj(key) + + def _put_encrypted_object(self, repo_objs, repository, data): + id_ = repo_objs.id_hash(data) + repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM)) + return id_ + + @pytest.fixture + def H1(self, repo_objs, cache_repository): + return self._put_encrypted_object(repo_objs, cache_repository, b"1234") + + @pytest.fixture + def H2(self, repo_objs, cache_repository): + return self._put_encrypted_object(repo_objs, cache_repository, b"5678") + + def test_decrypted_cache(self, repo_objs, cache_repository, H1, H2): + # With decrypted_cache, get/get_many return (csize, plaintext) tuples. + with cache_if_remote(cache_repository, decrypted_cache=repo_objs) as cached: + csize, plaintext = cached.get(H1) + assert plaintext == b"1234" + assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"] + + def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs): + with pytest.raises(ValueError): + cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data)