From cac7237d3f509932f96b4ba8ff63c3753a2bab94 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 8 Jun 2026 08:04:59 +0200 Subject: [PATCH] remove ssh:// and socket:// remote repository for current repos The modern client/server transport (RemoteRepository served by `borg serve` over an msgpack RPC protocol) is now redundant for current (borg 2) repos: its functionality is replaced by rest:// (which can tunnel over ssh to a remote borgstore REST server). Remove the modern RemoteRepository (both ssh:// and socket://) entirely. Legacy v1 (borg 1.x) repos remain reachable over ssh:// via the separate LegacyRemoteRepository client, and `borg serve` / RepositoryServer is kept, trimmed to the legacy-only path, so a remote borg2 can still serve a v1 repo for `borg transfer --from-borg1`. Details: - remote.py: delete RemoteRepository, SleepingBandwidthLimiter and the `api` decorator; trim RepositoryServer to legacy-only (drop modern _rpc_methods, socket serving, non-legacy open() branch); keep cache_if_remote / RepositoryCache / RepositoryNoCache (used by all repos). - get_repository(): non-legacy ssh:// now raises a clear "use rest://" error; socket:// route and the global --socket option removed. - parseformat: drop the socket:// scheme (now an invalid location). - borg serve: keep the command (serves legacy v1 ssh only); update epilog. - borg version: drop modern remote query; keep legacy ssh path. - update isinstance/import sites (cache, archive, fuse/hlfuse, analyze/compact, archiver __init__ -> LegacyRemoteRepository.RPCError). - tests/docs updated; obsolete socket serve test removed. Co-Authored-By: Claude Opus 4.8 --- docs/changes.rst | 4 + docs/deployment/central-backup-server.rst | 8 + docs/quickstart.rst | 28 +- src/borg/archive.py | 4 +- src/borg/archiver/__init__.py | 6 +- src/borg/archiver/_common.py | 26 +- src/borg/archiver/analyze_cmd.py | 3 +- src/borg/archiver/compact_cmd.py | 3 +- src/borg/archiver/serve_cmd.py | 15 +- src/borg/archiver/version_cmd.py | 19 +- src/borg/cache.py | 3 +- src/borg/conftest.py | 2 +- src/borg/fuse.py | 14 +- src/borg/helpers/parseformat.py | 9 +- src/borg/hlfuse.py | 12 +- src/borg/remote.py | 812 +----------------- src/borg/testsuite/archiver/__init__.py | 6 +- src/borg/testsuite/archiver/check_cmd_test.py | 3 +- src/borg/testsuite/archiver/checks_test.py | 45 - src/borg/testsuite/archiver/serve_cmd_test.py | 55 -- .../testsuite/helpers/parseformat_test.py | 9 +- src/borg/testsuite/remote_test.py | 57 +- src/borg/testsuite/repository_test.py | 152 +--- 23 files changed, 101 insertions(+), 1194 deletions(-) delete mode 100644 src/borg/testsuite/archiver/serve_cmd_test.py diff --git a/docs/changes.rst b/docs/changes.rst index ece97339f..04100fde5 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -171,6 +171,10 @@ New features: - WIP packs project, major repo format changes, you must create new repos! #8572 - rest:// repository URLs - connect via ssh to remote borgstore REST server, talking http via stdio, #9593 +- removed ssh:// and socket:// support for current repositories; use a rest:// + repository instead (it can tunnel over ssh). ssh:// and ``borg serve`` remain + available only for legacy (borg 1.x / v1) repositories, e.g. for + ``borg transfer --from-borg1 --other-repo ssh://...``. - prune: show total vs matching archives in output, #9262 - prune: add --json option, #9222 - archive: preserve cwd archive metadata, #9495 diff --git a/docs/deployment/central-backup-server.rst b/docs/deployment/central-backup-server.rst index d697cce92..b1b0860cf 100644 --- a/docs/deployment/central-backup-server.rst +++ b/docs/deployment/central-backup-server.rst @@ -8,6 +8,14 @@ Central repository server with Ansible or Salt This section gives an example of how to set up a Borg repository server for multiple clients. +.. note:: + + This example predates Borg 2 and uses the legacy ``ssh://`` transport (served + by ``borg serve``) and ``borg init``. With Borg 2, the ``ssh://`` transport is + only used for legacy borg 1.x (v1) repositories; for current repositories use a + ``rest://`` repository instead (Borg connects via ssh and runs a borgstore REST + server on the remote host), and use ``borg repo-create`` instead of ``borg init``. + Machines -------- diff --git a/docs/quickstart.rst b/docs/quickstart.rst index b5313d9cf..222e3eb4c 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -64,7 +64,8 @@ If you only back up your own files, run it as your normal user (i.e. not root). For a local repository always use the same user to invoke borg. -For a remote repository: always use e.g., ssh://borg@remote_host. You can use this +For a remote repository: always use e.g., rest://borg@remote_host (Borg connects +via ssh and runs a borgstore REST server on the remote). You can use this from different local users; the remote user running borg and accessing the repo will always be `borg`. @@ -142,7 +143,7 @@ backed up and that the ``prune`` command keeps and deletes the correct backups. #!/bin/sh # Setting this, so the repo does not need to be given on the commandline: - export BORG_REPO=ssh://username@example.com:2022/~/backup/main + export BORG_REPO=rest://username@example.com:2022/path/to/backup/main # See the section "Passphrase notes" for more infos. export BORG_PASSPHRASE='XYZl0ngandsecurepa_55_phrasea&&123' @@ -361,19 +362,21 @@ Remote repositories Borg can initialize and access repositories on remote hosts if the host is accessible using SSH. This is fastest and easiest when Borg -is installed on the remote host, in which case the following syntax is used:: +is installed on the remote host, in which case a ``rest://`` repository URL is +used. Borg connects via SSH and runs a borgstore REST server on the remote host +(talking HTTP over stdio):: - $ borg -r ssh://user@hostname:port/path/to/repo repo-create ... + $ borg -r rest://user@hostname:port/path/to/repo repo-create ... Note: Please see the usage chapter for a full documentation of repo URLs. Also see :ref:`ssh_configuration` for recommended settings to avoid disconnects and hangs. -Remote operations over SSH can be automated with SSH keys. You can restrict the -use of the SSH keypair by prepending a forced command to the SSH public key in -the remote server's `authorized_keys` file. This example will start Borg -in server mode and limit it to a specific filesystem path:: +.. note:: - command="borg serve --restrict-to-path /path/to/repo",restrict ssh-rsa AAAAB3[...] + The legacy ``ssh://`` transport, served by ``borg serve`` on the remote host, + is now only used to access legacy borg 1.x (v1) repositories (e.g. via + ``borg transfer --from-borg1 --other-repo ssh://...``). For current + repositories, use a ``rest://`` repository as shown above. If it is not possible to install Borg on the remote host, it is still possible to use the remote host to store a repository by @@ -530,7 +533,8 @@ Example with **borg extract**: Difference when using a **remote borg backup server**: It is basically all the same as with the local repository, but you need to -refer to the repo using a ``ssh://`` URL. +refer to the repo using a ``rest://`` URL (Borg connects via ssh and runs a +borgstore REST server on the remote host). In the given example, ``borg`` is the user name used to log into the machine ``backup.example.org`` which runs ssh on port ``2222`` and has the borg repo @@ -544,6 +548,6 @@ case if unattended, automated backups were done). :: - borg -r ssh://borg@backup.example.org:2222/path/to/repo mount /mnt/borg + borg -r rest://borg@backup.example.org:2222/path/to/repo mount /mnt/borg # or - borg -r ssh://borg@backup.example.org:2222/path/to/repo extract archive + borg -r rest://borg@backup.example.org:2222/path/to/repo extract archive diff --git a/src/borg/archive.py b/src/borg/archive.py index 43f8434d3..82bcaa98c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from . import platform from .platform import acl_get, acl_set, set_flags, get_flags, swidth -from .remote import RemoteRepository, cache_if_remote +from .remote import cache_if_remote from .repository import Repository, NoManifestError from .repoobj import RepoObj @@ -1773,7 +1773,7 @@ class ArchiveChecker: :param oldest/newest: only check archives older/newer than timedelta from oldest/newest archive timestamp :param verify_data: integrity verification of data referenced by archives """ - if not isinstance(repository, (Repository, RemoteRepository)): + if not isinstance(repository, Repository): logger.error("Checking legacy repositories is not supported.") return False logger.info("Starting archive consistency check...") diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index b90a7c7eb..db3a9bec0 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -47,7 +47,7 @@ try: from ..helpers import sig_int from ..helpers import get_config_dir from ..platformflags import is_msystem - from ..remote import RemoteRepository + from ..legacy.remote import LegacyRemoteRepository from ..selftest import selftest except BaseException: # an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's @@ -538,7 +538,7 @@ def sig_trace_handler(sig_no, stack): # pragma: no cover def format_tb(exc): qualname = type(exc).__qualname__ - remote = isinstance(exc, RemoteRepository.RPCError) + remote = isinstance(exc, LegacyRemoteRepository.RPCError) if remote: prefix = "Borg server: " trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines()) @@ -632,7 +632,7 @@ def main(): # pragma: no cover tb_log_level = logging.ERROR if e.traceback else logging.DEBUG tb = format_tb(e) exit_code = e.exit_code - except RemoteRepository.RPCError as e: + except LegacyRemoteRepository.RPCError as e: important = e.traceback msg = e.exception_full if important else e.get_message() msgid = e.exception_class diff --git a/src/borg/archiver/_common.py b/src/borg/archiver/_common.py index 921b7c263..4a566c236 100644 --- a/src/borg/archiver/_common.py +++ b/src/borg/archiver/_common.py @@ -13,7 +13,6 @@ from ..helpers.argparsing import SUPPRESS, PositiveInt from ..helpers.nanorst import rst_to_terminal from ..manifest import Manifest, AI_HUMAN_SORT_KEYS from ..patterns import PatternMatcher -from ..remote import RemoteRepository from ..repository import Repository from ..repoobj import RepoObj from ..patterns import ( @@ -30,16 +29,19 @@ logger = create_logger(__name__) def get_repository(location, *, create, exclusive, lock_wait, lock, args, v1_legacy): - if location.proto in ("ssh", "socket"): + if location.proto == "ssh": if v1_legacy: from ..legacy.remote import LegacyRemoteRepository - RemoteRepoCls = LegacyRemoteRepository + repository = LegacyRemoteRepository( + location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args + ) else: - RemoteRepoCls = RemoteRepository - repository = RemoteRepoCls( - location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args - ) + raise Error( + "ssh:// is no longer supported for current repositories; use rest:// instead " + "(it can tunnel over ssh). ssh:// remains available only for legacy v1 repositories " + "via --from-borg1." + ) elif ( location.proto in ("rest", "sftp", "file", "http", "https", "rclone", "s3", "b2") and not v1_legacy @@ -584,16 +586,6 @@ def define_common_options(add_common_option): action=Highlander, help="Use this command to connect to the 'borg serve' process (default: 'ssh')", ) - add_common_option( - "--socket", - metavar="PATH", - dest="use_socket", - default=False, - const=True, - nargs="?", - action=Highlander, - help="Use UNIX DOMAIN (IPC) socket at PATH for client/server communication with socket: protocol.", - ) add_common_option( "-r", "--repo", diff --git a/src/borg/archiver/analyze_cmd.py b/src/borg/archiver/analyze_cmd.py index 3db076aaa..7608c93e9 100644 --- a/src/borg/archiver/analyze_cmd.py +++ b/src/borg/archiver/analyze_cmd.py @@ -8,7 +8,6 @@ from ..helpers import bin_to_hex, Error from ..helpers import ProgressIndicatorPercent from ..helpers.argparsing import ArgumentParser from ..manifest import Manifest -from ..remote import RemoteRepository from ..repository import Repository from ..logger import create_logger @@ -20,7 +19,7 @@ class ArchiveAnalyzer: def __init__(self, args, repository, manifest): self.args = args self.repository = repository - assert isinstance(repository, (Repository, RemoteRepository)) + assert isinstance(repository, Repository) self.manifest = manifest self.difference_by_path = defaultdict(int) # directory path -> count of chunks changed diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 66ac4bfbc..1bdd4ebe9 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -11,7 +11,6 @@ from ..hashindex import ChunkIndex, ChunkIndexEntry from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest -from ..remote import RemoteRepository from ..repository import Repository, repo_lister from ..logger import create_logger @@ -22,7 +21,7 @@ logger = create_logger() class ArchiveGarbageCollector: def __init__(self, repository, manifest, *, stats, iec): self.repository = repository - assert isinstance(repository, (Repository, RemoteRepository)) + assert isinstance(repository, Repository) self.manifest = manifest self.chunks = None # a ChunkIndex, here used for: id -> (is_used, stored_size) self.total_files = None # overall number of source files written to all archives in this repo diff --git a/src/borg/archiver/serve_cmd.py b/src/borg/archiver/serve_cmd.py index 36661758e..6f9ebba97 100644 --- a/src/borg/archiver/serve_cmd.py +++ b/src/borg/archiver/serve_cmd.py @@ -13,7 +13,6 @@ class ServeMixIn: RepositoryServer( restrict_to_paths=args.restrict_to_paths, restrict_to_repositories=args.restrict_to_repositories, - use_socket=args.use_socket, permissions=args.permissions, ).serve() @@ -24,15 +23,13 @@ class ServeMixIn: """ This command starts a repository server process. - `borg serve` currently supports: + `borg serve` is only used to serve legacy (borg 1.x / v1) repositories over SSH, so that + such repositories can still be accessed remotely (e.g. for `borg transfer --from-borg1`). + Current repositories are accessed via a rest:// repository instead (which can itself tunnel + over SSH), so they do not use `borg serve`. - - Being automatically started via SSH when the borg client uses an ssh://... - remote repository. In this mode, `borg serve` will run until that SSH connection - is terminated. - - - Being started by some other means (not by the borg client) as a long-running socket - server to be used for borg clients using a socket://... repository (see the `--socket` - option if you do not want to use the default path for the socket and PID file). + It is automatically started via SSH when a borg client uses an ssh://... repository. + In this mode, `borg serve` will run until that SSH connection is terminated. Please note that `borg serve` does not support providing a specific repository via the `--repo` option or the `BORG_REPO` environment variable. It is always the borg client that diff --git a/src/borg/archiver/version_cmd.py b/src/borg/archiver/version_cmd.py index 409baaeb7..df5a6f589 100644 --- a/src/borg/archiver/version_cmd.py +++ b/src/borg/archiver/version_cmd.py @@ -1,7 +1,6 @@ from .. import __version__ from ..constants import * # NOQA from ..helpers.argparsing import ArgumentParser -from ..remote import RemoteRepository from ..logger import create_logger @@ -14,8 +13,10 @@ class VersionMixIn: from borg.version import parse_version, format_version client_version = parse_version(__version__) - if args.location.proto in ("ssh", "socket"): - with RemoteRepository(args.location, lock=False, args=args) as repository: + if args.location.proto == "ssh" and getattr(args, "v1_legacy", False): + from ..legacy.remote import LegacyRemoteRepository + + with LegacyRemoteRepository(args.location, lock=False, args=args) as repository: server_version = repository.server_version else: server_version = client_version @@ -28,11 +29,11 @@ class VersionMixIn: """ This command displays the Borg client and server versions. - If a local repository is given, the client code directly accesses the repository, - so the client version is also shown as the server version. + For current repositories the client code directly accesses the repository (also for + rest:// repositories), so the client version is shown as the server version, too. - If a remote repository is given (e.g., ssh:), the remote Borg is queried, and - its version is displayed as the server version. + If a legacy (borg 1.x / v1) repository is given via ssh: together with --from-borg1, + the remote Borg is queried, and its version is displayed as the server version. Examples:: @@ -40,8 +41,8 @@ class VersionMixIn: $ borg version /mnt/backup 1.4.0a / 1.4.0a - # remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release) - $ borg version ssh://borg@borgbackup:repo + # legacy remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release) + $ borg version --from-borg1 ssh://borg@borgbackup:repo 1.4.0a / 1.2.7 Due to the version tuple format used in Borg client/server negotiation, only diff --git a/src/borg/cache.py b/src/borg/cache.py index d6138db86..7ef37c89f 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -32,7 +32,6 @@ from .item import ChunkListEntry from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError from .manifest import Manifest from .platform import SaveFile -from .remote import RemoteRepository from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister from .security import SecurityManager, assert_secure # noqa: F401 @@ -651,7 +650,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size ) # Cache does not contain the manifest. - if not isinstance(repository, (Repository, RemoteRepository)): + if not isinstance(repository, Repository): del chunks[Manifest.MANIFEST_ID] duration = perf_counter() - t0 or 0.001 # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. diff --git a/src/borg/conftest.py b/src/borg/conftest.py index e9ea95f61..557cb66f7 100644 --- a/src/borg/conftest.py +++ b/src/borg/conftest.py @@ -96,7 +96,7 @@ class ArchiverSetup: self.patterns_file_path: str | None = None def get_kind(self) -> str: - if self.repository_location.startswith("ssh://__testsuite__") or self.repository_location.startswith("rest://"): + if self.repository_location.startswith("rest://"): return "remote" elif self.EXE == "borg.exe": return "binary" diff --git a/src/borg/fuse.py b/src/borg/fuse.py index fca4b7aee..c8ec1880c 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -62,7 +62,7 @@ from .crypto.low_level import blake2b_128 from .archiver._common import build_matcher, build_filter from .archive import Archive, get_item_uid_gid from .hashindex import FuseVersionsIndex -from .helpers import daemonize, daemonizing, signal_handler, format_file_size, bin_to_hex, Error +from .helpers import daemonizing, signal_handler, format_file_size, bin_to_hex, Error from .helpers import HardLinkManager from .helpers import msgpack from .helpers.lrucache import LRUCache @@ -70,7 +70,6 @@ from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin from .repository import Repository -from .remote import RemoteRepository def fuse_main(): @@ -596,13 +595,10 @@ class FuseOperations(llfuse.Operations, FuseBackend): "that do not reflect the archive content." ) if not foreground: - if isinstance(self.repository_uncached, RemoteRepository): - daemonize() - else: - with daemonizing(show_rc=show_rc) as (old_id, new_id): - # local repo: the locking process' PID is changing, migrate it: - logger.debug("fuse: mount local repo, going to background: migrating lock.") - self.repository_uncached.migrate_lock(old_id, new_id) + with daemonizing(show_rc=show_rc) as (old_id, new_id): + # the locking process' PID is changing, migrate it: + logger.debug("fuse: mount repo, going to background: migrating lock.") + self.repository_uncached.migrate_lock(old_id, new_id) # If the file system crashes, we do not want to umount because in that # case the mountpoint suddenly appears to become empty. This can have diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index c8b6c417b..23b0b13be 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -600,7 +600,7 @@ class Location: rclone_re = re.compile(r"(?Prclone):(?P(.*))", re.VERBOSE) sl = "/" if is_win32 else "" - file_or_socket_re = re.compile(r"(?P(file|socket))://" + sl + abs_path_re, re.VERBOSE) + file_re = re.compile(r"(?Pfile)://" + sl + abs_path_re, re.VERBOSE) local_re = re.compile(local_path_re, re.VERBOSE) @@ -666,7 +666,7 @@ class Location: self.proto = m.group("proto") self.path = m.group("path") return True - m = self.file_or_socket_re.match(text) + m = self.file_re.match(text) if m: self.proto = m.group("proto") self.path = os.path.normpath(m.group("path")) @@ -709,7 +709,7 @@ class Location: return self._host.lstrip("[").rstrip("]") def canonical_path(self): - if self.proto in ("file", "socket"): + if self.proto == "file": return self.path if self.proto == "rclone": return f"{self.proto}:{self.path}" @@ -1342,11 +1342,10 @@ class BorgJsonEncoder(json.JSONEncoder): from ..legacy.repository import LegacyRepository from ..repository import Repository from ..legacy.remote import LegacyRemoteRepository - from ..remote import RemoteRepository from ..archive import Archive from ..cache import AdHocWithFilesCache - if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, (Repository, RemoteRepository)): + if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, Repository): return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} if isinstance(o, Archive): return o.info() diff --git a/src/borg/hlfuse.py b/src/borg/hlfuse.py index 08c069208..b5f106d0d 100644 --- a/src/borg/hlfuse.py +++ b/src/borg/hlfuse.py @@ -24,7 +24,7 @@ logger = create_logger() from .archiver._common import build_matcher, build_filter from .archive import Archive, get_item_uid_gid from .hashindex import FuseVersionsIndex -from .helpers import daemonize, daemonizing, signal_handler, bin_to_hex, Error +from .helpers import daemonizing, signal_handler, bin_to_hex, Error from .helpers import HardLinkManager from .helpers import msgpack from .helpers.lrucache import LRUCache @@ -32,7 +32,6 @@ from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin from .repository import Repository -from .remote import RemoteRepository BLOCK_SIZE = 512 # Standard filesystem block size for st_blocks and statfs DEBUG_LOG: str | None = None # os.path.join(os.getcwd(), "fuse_debug.log") @@ -529,12 +528,9 @@ class borgfs(hlfuse.Operations, FuseBackend): # hlfuse.FUSE will block if foreground=True, otherwise it returns immediately if not foreground: # Background mode: daemonize first, then start FUSE (blocking) - if isinstance(self.repository, RemoteRepository): - daemonize() - else: - with daemonizing(show_rc=show_rc) as (old_id, new_id): - logger.debug("fuse: mount local repo, going to background: migrating lock.") - self.repository.migrate_lock(old_id, new_id) + with daemonizing(show_rc=show_rc) as (old_id, new_id): + logger.debug("fuse: mount repo, going to background: migrating lock.") + self.repository.migrate_lock(old_id, new_id) # Run the FUSE main loop in foreground (we might be daemonized already or not) with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler): diff --git a/src/borg/remote.py b/src/borg/remote.py index 7f4589406..786fc390e 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,21 +1,15 @@ -import atexit import errno -import functools import inspect import logging import os import queue import select -import shlex import shutil -import socket import struct import sys import tempfile -import textwrap import time import traceback -from subprocess import Popen, PIPE from xxhash import xxh64 @@ -23,45 +17,22 @@ import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA -from .helpers import Error, ErrorWithTraceback, IntegrityError +from .helpers import Error, IntegrityError from .helpers import bin_to_hex from .helpers import get_limited_unpacker -from .helpers import replace_placeholders from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink -from .helpers import prepare_subprocess_env, ignore_sigint -from .helpers import get_socket_filename -from .fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed from .logger import create_logger, borg_serve_log_queue -from .manifest import NoManifestError from .helpers import msgpack from .repository import Repository, StoreObjectNotFound -from .version import parse_version, format_version -from .helpers.datastruct import EfficientCollectionQueue -from .platform import is_win32 +from .version import parse_version logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" -MAX_INFLIGHT = 100 - -RATELIMIT_PERIOD = 0.1 - - -class ConnectionClosed(Error): - """Connection closed by remote host""" - - exit_mcode = 80 - - -class ConnectionClosedWithHint(ConnectionClosed): - """Connection closed by remote host. {}""" - - exit_mcode = 81 - class PathNotAllowed(Error): """Repository path not allowed: {}""" @@ -81,41 +52,10 @@ class UnexpectedRPCDataFormatFromClient(Error): exit_mcode = 85 -class UnexpectedRPCDataFormatFromServer(Error): - """Got unexpected RPC data format from server:\n{}""" - - exit_mcode = 86 - - def __init__(self, data): - try: - data = data.decode()[:128] - except UnicodeDecodeError: - data = data[:128] - data = ["%02X" % byte for byte in data] - data = textwrap.fill(" ".join(data), 16 * 3) - super().__init__(data) - - -class ConnectionBrokenWithHint(Error): - """Connection to remote host is broken. {}""" - - exit_mcode = 87 - - # Protocol compatibility: -# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting -# too old servers. This ensures that the knowledge what is compatible is always held by the newer component. -# -# For the client the return of the negotiate method is a dict which includes the server version. -# -# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api -# stubs in RemoteRepository*. The @api decorator on these stubs is used to set server version requirements. -# -# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server. -# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs -# to be added. -# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older -# servers still get compatible input. +# borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current repositories use rest://). +# The legacy client lives in borg.legacy.remote (LegacyRemoteRepository); this server keeps the legacy +# RPC method allowlist and opens repositories using LegacyRepository. class RepositoryServer: # pragma: no cover @@ -140,49 +80,14 @@ class RepositoryServer: # pragma: no cover "get_manifest", # borg2 LegacyRepository has this ) - _rpc_methods = ( # Repository - "__len__", - "check", - "delete", - "destroy", - "get", - "list", - "negotiate", - "open", - "close", - "info", - "put", - "save_key", - "load_key", - "break_lock", - "inject_exception", - "get_manifest", - "put_manifest", - "store_list", - "store_load", - "store_store", - "store_delete", - "store_move", - ) - - def __init__(self, restrict_to_paths, restrict_to_repositories, use_socket, permissions=None): + def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None): self.repository = None self.RepoCls = None self.rpc_methods = ("open", "close", "negotiate") self.restrict_to_paths = restrict_to_paths self.restrict_to_repositories = restrict_to_repositories self.permissions = permissions - # This flag is parsed from the serve command line via Archiver.do_serve, - # i.e. it reflects local system policy and generally ranks higher than - # whatever the client wants, except when initializing a new repository - # (see RepositoryServer.open below). self.client_version = None # we update this after client sends version information - if use_socket is False: - self.socket_path = None - elif use_socket is True: # --socket - self.socket_path = get_socket_filename() - else: # --socket=/some/path - self.socket_path = use_socket def filter_args(self, f, kwargs): """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" @@ -308,38 +213,10 @@ class RepositoryServer: # pragma: no cover shutdown_serve = True continue - if self.socket_path: # server for socket:// connections - try: - # remove any left-over socket file - os.unlink(self.socket_path) - except OSError: - if os.path.exists(self.socket_path): - raise - sock_dir = os.path.dirname(self.socket_path) - os.makedirs(sock_dir, exist_ok=True) - pid_file = self.socket_path.removesuffix(".sock") + ".pid" - pid = os.getpid() - with open(pid_file, "w") as f: - f.write(str(pid)) - atexit.register(functools.partial(os.remove, pid_file)) - atexit.register(functools.partial(os.remove, self.socket_path)) - sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - sock.bind(self.socket_path) # this creates the socket file in the fs - sock.listen(0) # no backlog - os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too. - print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr) - - while True: - connection, client_address = sock.accept() - print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr) - self.stdin_fd = connection.makefile("rb").fileno() - self.stdout_fd = connection.makefile("wb").fileno() - inner_serve() - print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr) - else: # server for one ssh:// connection - self.stdin_fd = sys.stdin.fileno() - self.stdout_fd = sys.stdout.fileno() - inner_serve() + # server for one ssh:// connection + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + inner_serve() def negotiate(self, client_data): if isinstance(client_data, dict): @@ -357,13 +234,11 @@ class RepositoryServer: # pragma: no cover return path def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False): - if v1_legacy: - from .legacy.repository import LegacyRepository + # borg only serves legacy (v1) repositories now; current repositories are accessed via rest://. + from .legacy.repository import LegacyRepository - self.RepoCls = LegacyRepository - else: - self.RepoCls = Repository - self.rpc_methods = self._legacy_rpc_methods if v1_legacy else self._rpc_methods + self.RepoCls = LegacyRepository + self.rpc_methods = self._legacy_rpc_methods logging.debug("Resolving repository path %r", path) path = self._resolve_path(path) logging.debug("Resolved repository path to %r", path) @@ -386,8 +261,6 @@ class RepositoryServer: # pragma: no cover else: raise PathNotAllowed(path) kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log) - if not v1_legacy: - kwargs["permissions"] = self.permissions self.repository = self.RepoCls(path, create, **kwargs) self.repository.__enter__() # clean exit handled by serve() method return self.repository.id @@ -422,661 +295,6 @@ class RepositoryServer: # pragma: no cover 0 // 0 -class SleepingBandwidthLimiter: - def __init__(self, limit): - if limit: - self.ratelimit = int(limit * RATELIMIT_PERIOD) - self.ratelimit_last = time.monotonic() - self.ratelimit_quota = self.ratelimit - else: - self.ratelimit = None - - def write(self, fd, to_send): - if self.ratelimit: - now = time.monotonic() - if self.ratelimit_last + RATELIMIT_PERIOD <= now: - self.ratelimit_quota += self.ratelimit - if self.ratelimit_quota > 2 * self.ratelimit: - self.ratelimit_quota = 2 * self.ratelimit - self.ratelimit_last = now - if self.ratelimit_quota == 0: - tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now - time.sleep(tosleep) - self.ratelimit_quota += self.ratelimit - self.ratelimit_last = time.monotonic() - if len(to_send) > self.ratelimit_quota: - to_send = to_send[: self.ratelimit_quota] - try: - written = os.write(fd, to_send) - except BrokenPipeError: - raise ConnectionBrokenWithHint("Broken Pipe") from None - if self.ratelimit: - self.ratelimit_quota -= written - return written - - -def api(*, since, **kwargs_decorator): - """Check version requirements and use self.call to do the remote method call. - - specifies the version in which borg introduced this method. - Calling this method when connected to an older version will fail without transmitting anything to the server. - - Further kwargs can be used to encode version specific restrictions: - - is the value resulting in the behaviour before introducing the new parameter. - If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously - hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client. - E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False. - Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls - with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7. - - is a flag to set the behaviour if an old version is called the new way. - If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the - more desirable behaviour). If False, an exception is generated. - E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit(). - """ - - def decorator(f): - @functools.wraps(f) - def do_rpc(self, *args, **kwargs): - sig = inspect.signature(f) - bound_args = sig.bind(self, *args, **kwargs) - named = {} # Arguments for the remote process - extra = {} # Arguments for the local process - for name, param in sig.parameters.items(): - if name == "self": - continue - if name in bound_args.arguments: - if name == "wait": - extra[name] = bound_args.arguments[name] - else: - named[name] = bound_args.arguments[name] - else: - if param.default is not param.empty: - named[name] = param.default - - if self.server_version < since: - raise self.RPCServerOutdated(f.__name__, format_version(since)) - - for name, restriction in kwargs_decorator.items(): - if restriction["since"] <= self.server_version: - continue - if "previously" in restriction and named[name] == restriction["previously"]: - continue - if restriction.get("dontcare", False): - continue - - raise self.RPCServerOutdated( - f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"]) - ) - - return self.call(f.__name__, named, **extra) - - return do_rpc - - return decorator - - -class RemoteRepository: - extra_test_args = [] # type: ignore - - class RPCError(Exception): - def __init__(self, unpacked): - # unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo' - self.unpacked = unpacked - - def get_message(self): - return "\n".join(self.unpacked["exception_short"]) - - @property - def traceback(self): - return self.unpacked.get("exception_trace", True) - - @property - def exception_class(self): - return self.unpacked["exception_class"] - - @property - def exception_full(self): - return "\n".join(self.unpacked["exception_full"]) - - @property - def sysinfo(self): - return self.unpacked["sysinfo"] - - class RPCServerOutdated(Error): - """Borg server is too old for {}. Required version {}""" - - exit_mcode = 84 - - @property - def method(self): - return self.args[0] - - @property - def required_version(self): - return self.args[1] - - def __init__(self, location, create=False, exclusive=False, lock_wait=1.0, lock=True, args=None): - self.location = self._location = location - self.preload_ids = [] - self.msgid = 0 - self.rx_bytes = 0 - self.tx_bytes = 0 - self.to_send = EfficientCollectionQueue(1024 * 1024, bytes) - self.stdin_fd = self.stdout_fd = self.stderr_fd = None - self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet) - self.chunkid_to_msgids = {} - self.ignore_responses = set() - self.responses = {} - self.async_responses = {} - self.shutdown_time = None - self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0) - self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0 - self.unpacker = get_limited_unpacker("client") - self.server_version = None # we update this after server sends its version - self.p = self.sock = None - self._args = args - if self.location.proto == "ssh": - testing = location.host == "__testsuite__" - # when testing, we invoke and talk to a borg process directly (no ssh). - # when not testing, we invoke the system-installed ssh binary to talk to a remote borg. - env = prepare_subprocess_env(system=not testing) - borg_cmd = self.borg_cmd(args, testing) - if not testing: - borg_cmd = self.ssh_cmd(location) + borg_cmd - logger.debug("SSH command line: %s", borg_cmd) - # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg. - self.p = Popen( - borg_cmd, - bufsize=0, - stdin=PIPE, - stdout=PIPE, - stderr=PIPE, - env=env, - preexec_fn=None if is_win32 else ignore_sigint, - ) # nosec B603 - self.stdin_fd = self.p.stdin.fileno() - self.stdout_fd = self.p.stdout.fileno() - self.stderr_fd = self.p.stderr.fileno() - self.r_fds = [self.stdout_fd, self.stderr_fd] - self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd] - elif self.location.proto == "socket": - if args.use_socket is False or args.use_socket is True: # nothing or --socket - socket_path = get_socket_filename() - else: # --socket=/some/path - socket_path = args.use_socket - self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - try: - self.sock.connect(socket_path) # note: socket_path length is rather limited. - except FileNotFoundError: - self.sock = None - raise Error(f"The socket file {socket_path} does not exist.") - except ConnectionRefusedError: - self.sock = None - raise Error(f"There is no borg serve running for the socket file {socket_path}.") - self.stdin_fd = self.sock.makefile("wb").fileno() - self.stdout_fd = self.sock.makefile("rb").fileno() - self.stderr_fd = None - self.r_fds = [self.stdout_fd] - self.x_fds = [self.stdin_fd, self.stdout_fd] - else: - raise Error(f"Unsupported protocol {location.proto}") - - os.set_blocking(self.stdin_fd, False) - assert not os.get_blocking(self.stdin_fd) - os.set_blocking(self.stdout_fd, False) - assert not os.get_blocking(self.stdout_fd) - if self.stderr_fd is not None: - os.set_blocking(self.stderr_fd, False) - assert not os.get_blocking(self.stderr_fd) - - try: - try: - version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}}) - except ConnectionClosed: - raise ConnectionClosedWithHint("Is borg working on the server?") from None - if isinstance(version, dict): - self.server_version = version["server_version"] - else: - raise Exception("Server insisted on using unsupported protocol version %s" % version) - - self.id = self.open( - path=self.location.path, create=create, lock_wait=lock_wait, lock=lock, exclusive=exclusive - ) - info = self.info() - self.version = info["version"] - - except Exception: - self.close() - raise - - def __del__(self): - if len(self.responses): - logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),)) - if self.p or self.sock: - self.close() - assert False, "cleanup happened in RemoteRepository.__del__" - - def __repr__(self): - return f"<{self.__class__.__name__} {self.location.canonical_path()}>" - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - if exc_type is not None: - self.shutdown_time = time.monotonic() + 30 - finally: - # in any case, we want to close the repo cleanly. - logger.debug( - "RemoteRepository: %s bytes sent, %s bytes received, %d messages sent", - format_file_size(self.tx_bytes), - format_file_size(self.rx_bytes), - self.msgid, - ) - self.close() - - @property - def id_str(self): - return bin_to_hex(self.id) - - def borg_cmd(self, args, testing): - """return a borg serve command line""" - # give some args/options to 'borg serve' process as they were given to us - opts = [] - if args is not None: - root_logger = logging.getLogger() - if root_logger.isEnabledFor(logging.DEBUG): - opts.append("--debug") - elif root_logger.isEnabledFor(logging.INFO): - opts.append("--info") - elif root_logger.isEnabledFor(logging.WARNING): - pass # warning is default - elif root_logger.isEnabledFor(logging.ERROR): - opts.append("--error") - elif root_logger.isEnabledFor(logging.CRITICAL): - opts.append("--critical") - else: - raise ValueError("log level missing, fix this code") - - # Tell the remote server about debug topics it may need to consider. - # Note that debug topics are usable for "spew" or "trace" logs which would - # be too plentiful to transfer for normal use, so the server doesn't send - # them unless explicitly enabled. - # - # Needless to say, if you do --debug-topic=repository.compaction, for example, - # with a 1.0.x server it won't work, because the server does not recognize the - # option. - # - # This is not considered a problem, since this is a debugging feature that - # should not be used for regular use. - for topic in args.debug_topics: - if "." not in topic: - topic = "borg.debug." + topic - if "repository" in topic: - opts.append("--debug-topic=%s" % topic) - env_vars = [] - if testing: - return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args - else: # pragma: no cover - remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg") - remote_path = replace_placeholders(remote_path) - return env_vars + [remote_path, "serve"] + opts - - def ssh_cmd(self, location): - """return a ssh command line that can be prefixed to a borg command line""" - rsh = self._args.rsh or os.environ.get("BORG_RSH", "ssh") - args = shlex.split(rsh) - if location.port: - args += ["-p", str(location.port)] - if location.user: - args.append(f"{location.user}@{location.host}") - else: - args.append("%s" % location.host) - return args - - def call(self, cmd, args, **kw): - for resp in self.call_many(cmd, [args], **kw): - return resp - - def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): - if not calls and cmd != "async_responses": - return - - assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'" - - def send_buffer(): - if self.to_send: - try: - written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front()) - self.tx_bytes += written - self.to_send.pop_front(written) - except OSError as e: - # io.write might raise EAGAIN even though select indicates - # that the fd should be writable. - # EWOULDBLOCK is added for defensive programming sake. - if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]: - raise - - def pop_preload_msgid(chunkid): - msgid = self.chunkid_to_msgids[chunkid].pop(0) - if not self.chunkid_to_msgids[chunkid]: - del self.chunkid_to_msgids[chunkid] - return msgid - - def handle_error(unpacked): - if "exception_class" not in unpacked: - return - - error = unpacked["exception_class"] - args = unpacked["exception_args"] - - if error == "Error": - raise Error(args[0]) - elif error == "ErrorWithTraceback": - raise ErrorWithTraceback(args[0]) - elif error == "InvalidRepository": - raise Repository.InvalidRepository(self.location.processed) - elif error == "DoesNotExist": - raise Repository.DoesNotExist(self.location.processed) - elif error == "AlreadyExists": - raise Repository.AlreadyExists(self.location.processed) - elif error == "CheckNeeded": - raise Repository.CheckNeeded(self.location.processed) - elif error == "IntegrityError": - raise IntegrityError(args[0]) - elif error == "PathNotAllowed": - raise PathNotAllowed(args[0]) - elif error == "PathPermissionDenied": - raise Repository.PathPermissionDenied(args[0]) - elif error == "PathAlreadyExists": - raise Repository.PathAlreadyExists(args[0]) - elif error == "ParentPathDoesNotExist": - raise Repository.ParentPathDoesNotExist(args[0]) - elif error == "ObjectNotFound": - raise Repository.ObjectNotFound(args[0], self.location.processed) - elif error == "StoreObjectNotFound": - raise StoreObjectNotFound(args[0]) - elif error == "InvalidRPCMethod": - raise InvalidRPCMethod(args[0]) - elif error == "LockTimeout": - raise LockTimeout(args[0]) - elif error == "LockFailed": - raise LockFailed(args[0], args[1]) - elif error == "NotLocked": - raise NotLocked(args[0]) - elif error == "NotMyLock": - raise NotMyLock(args[0]) - elif error == "NoManifestError": - raise NoManifestError - elif error == "InsufficientFreeSpaceError": - raise Repository.InsufficientFreeSpaceError(args[0], args[1]) - elif error == "InvalidRepositoryConfig": - raise Repository.InvalidRepositoryConfig(self.location.processed, args[1]) - else: - raise self.RPCError(unpacked) - - calls = list(calls) - waiting_for = [] - maximum_to_send = 0 if wait else self.upload_buffer_size_limit - send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. - try: - while wait or calls: - logger.debug( - f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}" - ) - if self.shutdown_time and time.monotonic() > self.shutdown_time: - # we are shutting this RemoteRepository down already, make sure we do not waste - # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. - logger.debug( - "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", - len(waiting_for), - len(self.async_responses), - ) - return - while waiting_for: - try: - unpacked = self.responses.pop(waiting_for[0]) - waiting_for.pop(0) - handle_error(unpacked) - yield unpacked[RESULT] - if not waiting_for and not calls: - return - except KeyError: - break - if cmd == "async_responses": - while True: - try: - msgid, unpacked = self.async_responses.popitem() - except KeyError: - # there is nothing left what we already have received - if async_wait and self.ignore_responses: - # but do not return if we shall wait and there is something left to wait for: - break - else: - return - else: - handle_error(unpacked) - yield unpacked[RESULT] - if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): - w_fds = [self.stdin_fd] - else: - w_fds = [] - r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) - if x: - raise Exception("FD exception occurred") - for fd in r: - if fd is self.stdout_fd: - data = os.read(fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - self.unpacker.feed(data) - for unpacked in self.unpacker: - if not isinstance(unpacked, dict): - raise UnexpectedRPCDataFormatFromServer(data) - - lr_dict = unpacked.get(LOG) - if lr_dict is not None: - # Re-emit remote log messages locally. - _logger = logging.getLogger(lr_dict["name"]) - if _logger.isEnabledFor(lr_dict["level"]): - _logger.handle(logging.LogRecord(**lr_dict)) - continue - - msgid = unpacked[MSGID] - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - # async methods never return values, but may raise exceptions. - if "exception_class" in unpacked: - self.async_responses[msgid] = unpacked - else: - # we currently do not have async result values except "None", - # so we do not add them into async_responses. - if unpacked[RESULT] is not None: - self.async_responses[msgid] = unpacked - else: - self.responses[msgid] = unpacked - elif fd is self.stderr_fd: - data = os.read(fd, 32768) - if not data: - raise ConnectionClosed() - self.rx_bytes += len(data) - # deal with incomplete lines (may appear due to block buffering) - if self.stderr_received: - data = self.stderr_received + data - self.stderr_received = b"" - lines = data.splitlines(keepends=True) - if lines and not lines[-1].endswith((b"\r", b"\n")): - self.stderr_received = lines.pop() - # now we have complete lines in and any partial line in self.stderr_received. - _logger = logging.getLogger() - for line in lines: - # borg serve (remote/server side) should not emit stuff on stderr, - # but e.g. the ssh process (local/client side) might output errors there. - assert line.endswith((b"\r", b"\n")) - # something came in on stderr, log it to not lose it. - # decode late, avoid partial utf-8 sequences. - _logger.warning("stderr: " + line.decode().strip()) - if w: - while ( - (len(self.to_send) <= maximum_to_send) - and (calls or self.preload_ids) - and len(waiting_for) < MAX_INFLIGHT - ): - if calls: - args = calls[0] - if cmd == "get" and args["id"] in self.chunkid_to_msgids: - # we have a get command and have already sent a request for this chunkid when - # doing preloading, so we know the msgid of the response we are waiting for: - waiting_for.append(pop_preload_msgid(args["id"])) - del calls[0] - elif not is_preloaded: - # make and send a request (already done if we are using preloading) - self.msgid += 1 - waiting_for.append(self.msgid) - del calls[0] - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) - if not self.to_send and self.preload_ids: - chunk_id = self.preload_ids.pop(0) - # for preloading chunks, the raise_missing behaviour is defined HERE, - # not in the get_many / fetch_many call that later fetches the preloaded chunks. - args = {"id": chunk_id, "raise_missing": False} - self.msgid += 1 - self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) - - send_buffer() - finally: - self.ignore_responses |= set(waiting_for) # we lose order here - if is_preloaded: - for call in calls: - chunkid = call["id"] - if chunkid in self.chunkid_to_msgids: - self.ignore_responses.add(pop_preload_msgid(chunkid)) - - @api(since=parse_version("1.0.0"), v1_legacy={"since": parse_version("2.0.0b21"), "previously": True}) - def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_legacy=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0a3")) - def info(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0}) - def check(self, repair=False, max_duration=0): - """actual remoting is done via self.call in the @api decorator""" - - @api( - since=parse_version("1.0.0"), - compact={"since": parse_version("1.2.0a0"), "previously": True, "dontcare": True}, - threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True}, - ) - def commit(self, compact=True, threshold=0.1): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def rollback(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def destroy(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def __len__(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def list(self, limit=None, marker=None): - """actual remoting is done via self.call in the @api decorator""" - - def get(self, id, read_data=True, raise_missing=True): - for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing): - return resp - - def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): - yield from self.call_many( - "get", - [{"id": id, "read_data": read_data, "raise_missing": raise_missing} for id in ids], - is_preloaded=is_preloaded, - ) - - @api(since=parse_version("1.0.0")) - def put(self, id, data, wait=True): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def delete(self, id, wait=True): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def save_key(self, keydata): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def load_key(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("1.0.0")) - def break_lock(self): - """actual remoting is done via self.call in the @api decorator""" - - def close(self): - if self.p or self.sock: - self.call("close", {}, wait=True) - if self.p: - self.p.stdin.close() - self.p.stdout.close() - self.p.wait() - self.p = None - if self.sock: - try: - self.sock.shutdown(socket.SHUT_RDWR) - except OSError as e: - if e.errno != errno.ENOTCONN: - raise - self.sock.close() - self.sock = None - - def async_response(self, wait=True): - for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait): - return resp - - def preload(self, ids): - self.preload_ids += ids - - @api(since=parse_version("2.0.0b8")) - def get_manifest(self): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def put_manifest(self, data): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False}) - def store_list(self, name, *, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def store_load(self, name): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8")) - def store_store(self, name, value): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False}) - def store_delete(self, name, *, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - @api(since=parse_version("2.0.0b14")) - def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False): - """actual remoting is done via self.call in the @api decorator""" - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. @@ -1280,7 +498,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None csize = meta.get("csize", len(data)) return csize, decrypted - if isinstance(repository, RemoteRepository) or force_cache: + if force_cache: return RepositoryCache(repository, pack, unpack, transform) else: return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index 3169844e4..8b4738c2b 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -24,7 +24,6 @@ from ...helpers import init_ec_warnings from ...logger import flush_logging from ...manifest import Manifest from ...platform import get_flags -from ...remote import RemoteRepository from ...repository import Repository from .. import has_lchflags, has_mknod, is_utime_fully_supported, have_fuse_mtime_ns, st_mtime_ns_round, filter_xattrs from .. import changedir, ENOATTR # NOQA @@ -178,10 +177,7 @@ def open_archive(repo_path, name): def open_repository(archiver): if archiver.get_kind() == "remote": - location = Location(archiver.repository_location) - if location.proto == "rest": - return Repository(location, exclusive=True) - return RemoteRepository(location) + return Repository(Location(archiver.repository_location), exclusive=True) else: return Repository(archiver.repository_path, exclusive=True) diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index 0c23b6ef7..c9292687c 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -9,7 +9,6 @@ from ...archive import ChunkBuffer from ...constants import * # NOQA from ...helpers import bin_to_hex, msgpack from ...manifest import Manifest -from ...remote import RemoteRepository from ...repository import Repository from ..repository_test import fchunk from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION @@ -208,7 +207,7 @@ def test_missing_manifest(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - if isinstance(repository, (Repository, RemoteRepository)): + if isinstance(repository, Repository): repository.store_delete("config/manifest") else: repository.delete(Manifest.MANIFEST_ID) diff --git a/src/borg/testsuite/archiver/checks_test.py b/src/borg/testsuite/archiver/checks_test.py index 88efa31c5..9327feb9d 100644 --- a/src/borg/testsuite/archiver/checks_test.py +++ b/src/borg/testsuite/archiver/checks_test.py @@ -1,6 +1,5 @@ import os import shutil -from unittest.mock import patch import pytest @@ -9,7 +8,6 @@ from ...constants import * # NOQA from ...helpers import Location, get_security_dir, bin_to_hex from ...helpers import EXIT_ERROR from ...manifest import Manifest, MandatoryFeatureUnsupported -from ...remote import RemoteRepository, PathNotAllowed from ...repository import Repository from .. import llfuse from .. import changedir @@ -277,49 +275,6 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): # Begin Remote Tests -def test_remote_repo_restrict_to_path(remote_archiver): - if remote_archiver.repository_location.startswith("rest://"): - pytest.skip("Not applicable for rest:// protocol") - original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path - # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restricted to repo directory itself, fail for other directories with same prefix: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): - with pytest.raises(PathNotAllowed): - remote_archiver.repository_location = original_location + "_0" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restricted to a completely different path: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]): - with pytest.raises(PathNotAllowed): - remote_archiver.repository_location = original_location + "_1" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - path_prefix = os.path.dirname(repo_path) - # restrict to repo directory's parent directory: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]): - remote_archiver.repository_location = original_location + "_2" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - # restrict to repo directory's parent directory and another directory: - with patch.object( - RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix] - ): - remote_archiver.repository_location = original_location + "_3" - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - - -def test_remote_repo_restrict_to_repository(remote_archiver): - if remote_archiver.repository_location.startswith("rest://"): - pytest.skip("Not applicable for rest:// protocol") - repo_path = remote_archiver.repository_path - # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - parent_path = os.path.join(repo_path, "..") - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]): - with pytest.raises(PathNotAllowed): - cmd(remote_archiver, "repo-create", RK_ENCRYPTION) - - def test_remote_repo_strip_components_doesnt_leak(remote_archiver): cmd(remote_archiver, "repo-create", RK_ENCRYPTION) create_regular_file(remote_archiver.input_path, "dir/file", contents=b"test file contents 1") diff --git a/src/borg/testsuite/archiver/serve_cmd_test.py b/src/borg/testsuite/archiver/serve_cmd_test.py deleted file mode 100644 index 420399af5..000000000 --- a/src/borg/testsuite/archiver/serve_cmd_test.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import subprocess -import tempfile -import time - -import pytest -import platformdirs - -from . import exec_cmd -from ...platformflags import is_win32 -from ...helpers import get_runtime_dir - - -def have_a_short_runtime_dir(mp): - # Under pytest, we use BORG_BASE_DIR to keep stuff away from the user's normal Borg directories. - # This leads to a very long get_runtime_dir() path — too long for a socket file! - # Thus, we override that again via BORG_RUNTIME_DIR to get a shorter path. - mp.setenv("BORG_RUNTIME_DIR", os.path.join(platformdirs.user_runtime_dir(), "pytest")) - - -@pytest.fixture -def serve_socket(monkeypatch): - have_a_short_runtime_dir(monkeypatch) - # Use a random unique socket filename, so tests can run in parallel. - socket_file = tempfile.mktemp(suffix=".sock", prefix="borg-", dir=get_runtime_dir()) - with subprocess.Popen(["borg", "serve", f"--socket={socket_file}"]) as p: - while not os.path.exists(socket_file): - time.sleep(0.01) # wait until the socket server has started - yield socket_file - p.terminate() - - -@pytest.mark.skipif(is_win32, reason="hangs on win32") -def test_with_socket(serve_socket, tmpdir, monkeypatch): - have_a_short_runtime_dir(monkeypatch) - repo_path = str(tmpdir.join("repo")) - - ret, output = exec_cmd( - f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-create", "--encryption=none" - ) - assert ret == 0 - - ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-info") - assert ret == 0 - assert "Repository ID: " in output - - monkeypatch.setenv("BORG_DELETE_I_KNOW_WHAT_I_AM_DOING", "YES") - ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-delete") - assert ret == 0 - - -@pytest.mark.skipif(is_win32, reason="hangs on win32") -def test_socket_permissions(serve_socket): - st = os.stat(serve_socket) - assert st.st_mode & 0o0777 == 0o0770 # user and group are permitted to use the socket diff --git a/src/borg/testsuite/helpers/parseformat_test.py b/src/borg/testsuite/helpers/parseformat_test.py index 0d6ab162c..93bf0b028 100644 --- a/src/borg/testsuite/helpers/parseformat_test.py +++ b/src/borg/testsuite/helpers/parseformat_test.py @@ -246,12 +246,10 @@ class TestLocationWithoutEnv: def test_socket(self, monkeypatch): monkeypatch.delenv("BORG_REPO", raising=False) + # socket:// is no longer supported and must be rejected as an invalid location. url = "socket:///c:/repo/path" if is_win32 else "socket:///repo/path" - path = "c:/repo/path" if is_win32 else "/repo/path" - assert ( - repr(Location(url)) - == f"Location(proto='socket', user=None, pass=None, host=None, port=None, path='{path}')" - ) + with pytest.raises(ValueError): + Location(url) def test_file(self, monkeypatch): monkeypatch.delenv("BORG_REPO", raising=False) @@ -334,7 +332,6 @@ class TestLocationWithoutEnv: ] locations.insert(1, "c:/absolute/path" if is_win32 else "/absolute/path") locations.insert(2, "file:///c:/absolute/path" if is_win32 else "file:///absolute/path") - locations.insert(3, "socket:///c:/absolute/path" if is_win32 else "socket:///absolute/path") for location in locations: assert ( Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path() diff --git a/src/borg/testsuite/remote_test.py b/src/borg/testsuite/remote_test.py index 6f01fc334..0eb0b79c3 100644 --- a/src/borg/testsuite/remote_test.py +++ b/src/borg/testsuite/remote_test.py @@ -1,13 +1,12 @@ import errno import os import io -import time from unittest.mock import patch import pytest from ..constants import ROBJ_FILE_STREAM -from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote +from ..remote import RepositoryCache, cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey from ..helpers import IntegrityError @@ -17,60 +16,6 @@ from .repository_test import fchunk, pdchunk from .crypto.key_test import TestKey -class TestSleepingBandwidthLimiter: - def expect_write(self, fd, data): - self.expected_fd = fd - self.expected_data = data - - def check_write(self, fd, data): - assert fd == self.expected_fd - assert data == self.expected_data - return len(data) - - def test_write_unlimited(self, monkeypatch): - monkeypatch.setattr(os, "write", self.check_write) - - it = SleepingBandwidthLimiter(0) - self.expect_write(5, b"test") - it.write(5, b"test") - - def test_write(self, monkeypatch): - monkeypatch.setattr(os, "write", self.check_write) - monkeypatch.setattr(time, "monotonic", lambda: now) - monkeypatch.setattr(time, "sleep", lambda x: None) - - now = 100 - - it = SleepingBandwidthLimiter(100) # Bandwidth quota. - - # All fits - self.expect_write(5, b"test") - it.write(5, b"test") - - # Only partial write - self.expect_write(5, b"123456") - it.write(5, b"1234567890") - - # Sleeps - self.expect_write(5, b"123456") - it.write(5, b"123456") - - # Long time interval between writes - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - # Long time interval between writes, filling up the quota - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - # Long time interval between writes, filling up the quota to clip to the maximum - now += 10 - self.expect_write(5, b"1") - it.write(5, b"1") - - class TestRepositoryCache: @pytest.fixture def repository(self, tmpdir): diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index becdb3635..91a4b7e2a 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -1,13 +1,8 @@ -import logging import os -import sys import pytest -from ..helpers import Location from ..helpers import IntegrityError -from ..platformflags import is_win32 -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed -from ..repository import Repository, StoreObjectNotFound, MAX_DATA_SIZE +from ..repository import Repository, MAX_DATA_SIZE from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -18,22 +13,14 @@ def repository(tmp_path): yield Repository(repository_location, exclusive=True, create=True) -@pytest.fixture() -def remote_repository(tmp_path): - if is_win32: - pytest.skip("Remote repository does not yet work on Windows.") - repository_location = Location("ssh://__testsuite__/" + os.fspath(tmp_path / "repository")) - yield RemoteRepository(repository_location, exclusive=True, create=True) - - def pytest_generate_tests(metafunc): - # Generate tests that run on both local and remote repositories. + # Generate tests that run on repositories. if "repo_fixtures" in metafunc.fixturenames: - metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"]) + metafunc.parametrize("repo_fixtures", ["repository"]) def get_repository_from_fixture(repo_fixtures, request): - # Return the repository object from the fixture for tests that run on both local and remote repositories. + # Return the repository object from the fixture. return request.getfixturevalue(repo_fixtures) @@ -43,14 +30,7 @@ def reopen(repository, exclusive: bool | None = True, create=False): raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.") return Repository(repository._location, exclusive=exclusive, create=create) - if isinstance(repository, RemoteRepository): - if repository.p is not None or repository.sock is not None: - raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.") - return RemoteRepository(repository.location, exclusive=exclusive, create=create) - - raise TypeError( - f"Invalid argument type. Expected 'Repository' or 'RemoteRepository', received '{type(repository).__name__}'." - ) + raise TypeError(f"Invalid argument type. Expected 'Repository', received '{type(repository).__name__}'.") def fchunk(data, meta=b"", chunk_id=b"\x00" * 32): @@ -149,125 +129,3 @@ def check(repository, repo_path, repair=False, status=True): # Make sure no tmp files are left behind tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name] assert tmp_files == [], "Found tmp files" - - -def _get_mock_args(): - class MockArgs: - remote_path = "borg" - umask = 0o077 - debug_topics = [] - rsh = None - - def __contains__(self, item): - # to behave like argparse.Namespace - return hasattr(self, item) - - return MockArgs() - - -def test_remote_invalid_rpc(remote_repository): - with remote_repository: - with pytest.raises(InvalidRPCMethod): - remote_repository.call("__init__", {}) - - -def test_remote_rpc_exception_transport(remote_repository): - with remote_repository: - s1 = "test string" - - try: - remote_repository.call("inject_exception", {"kind": "DoesNotExist"}) - except Repository.DoesNotExist as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "AlreadyExists"}) - except Repository.AlreadyExists as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "CheckNeeded"}) - except Repository.CheckNeeded as e: - assert len(e.args) == 1 - assert e.args[0] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "IntegrityError"}) - except IntegrityError as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "PathNotAllowed"}) - except PathNotAllowed as e: - assert len(e.args) == 1 - assert e.args[0] == "foo" - - try: - remote_repository.call("inject_exception", {"kind": "ObjectNotFound"}) - except Repository.ObjectNotFound as e: - assert len(e.args) == 2 - assert e.args[0] == s1 - assert e.args[1] == remote_repository.location.processed - - try: - remote_repository.call("inject_exception", {"kind": "StoreObjectNotFound"}) - except StoreObjectNotFound as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"}) - except InvalidRPCMethod as e: - assert len(e.args) == 1 - assert e.args[0] == s1 - - try: - remote_repository.call("inject_exception", {"kind": "divide"}) - except RemoteRepository.RPCError as e: - assert e.unpacked - assert e.get_message().startswith("ZeroDivisionError:") - assert e.exception_class == "ZeroDivisionError" - assert len(e.exception_full) > 0 - - -def test_remote_ssh_cmd(remote_repository): - with remote_repository: - args = _get_mock_args() - remote_repository._args = args - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"] - assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"] - assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [ - "ssh", - "-p", - "1234", - "user@example.com", - ] - os.environ["BORG_RSH"] = "ssh --foo" - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"] - - -def test_remote_borg_cmd(remote_repository): - with remote_repository: - assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"] - args = _get_mock_args() - # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown: - logging.getLogger().setLevel(logging.INFO) - # note: test logger is on info log level, so --info gets added automagically - assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] - args.remote_path = "borg-0.28.2" - assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"] - args.debug_topics = ["something_client_side", "repository_compaction"] - assert remote_repository.borg_cmd(args, testing=False) == [ - "borg-0.28.2", - "serve", - "--info", - "--debug-topic=borg.debug.repository_compaction", - ] - args = _get_mock_args() - assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] - args.rsh = "ssh -i foo" - remote_repository._args = args - assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]