remove ssh:// and socket:// remote repository for current repos

The modern client/server transport (RemoteRepository served by `borg serve`
over an msgpack RPC protocol) is now redundant for current (borg 2) repos:
its functionality is replaced by rest:// (which can tunnel over ssh to a
remote borgstore REST server).

Remove the modern RemoteRepository (both ssh:// and socket://) entirely.
Legacy v1 (borg 1.x) repos remain reachable over ssh:// via the separate
LegacyRemoteRepository client, and `borg serve` / RepositoryServer is kept,
trimmed to the legacy-only path, so a remote borg2 can still serve a v1 repo
for `borg transfer --from-borg1`.

Details:
- remote.py: delete RemoteRepository, SleepingBandwidthLimiter and the `api`
  decorator; trim RepositoryServer to legacy-only (drop modern _rpc_methods,
  socket serving, non-legacy open() branch); keep cache_if_remote /
  RepositoryCache / RepositoryNoCache (used by all repos).
- get_repository(): non-legacy ssh:// now raises a clear "use rest://" error;
  socket:// route and the global --socket option removed.
- parseformat: drop the socket:// scheme (now an invalid location).
- borg serve: keep the command (serves legacy v1 ssh only); update epilog.
- borg version: drop modern remote query; keep legacy ssh path.
- update isinstance/import sites (cache, archive, fuse/hlfuse, analyze/compact,
  archiver __init__ -> LegacyRemoteRepository.RPCError).
- tests/docs updated; obsolete socket serve test removed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Thomas Waldmann 2026-06-08 08:04:59 +02:00
parent 83c3bc8a67
commit cac7237d3f
23 changed files with 101 additions and 1194 deletions

View file

@ -171,6 +171,10 @@ New features:
- WIP packs project, major repo format changes, you must create new repos! #8572
- rest:// repository URLs - connect via ssh to remote borgstore REST server,
talking http via stdio, #9593
- removed ssh:// and socket:// support for current repositories; use a rest://
repository instead (it can tunnel over ssh). ssh:// and ``borg serve`` remain
available only for legacy (borg 1.x / v1) repositories, e.g. for
``borg transfer --from-borg1 --other-repo ssh://...``.
- prune: show total vs matching archives in output, #9262
- prune: add --json option, #9222
- archive: preserve cwd archive metadata, #9495

View file

@ -8,6 +8,14 @@ Central repository server with Ansible or Salt
This section gives an example of how to set up a Borg repository server for multiple
clients.
.. note::
This example predates Borg 2 and uses the legacy ``ssh://`` transport (served
by ``borg serve``) and ``borg init``. With Borg 2, the ``ssh://`` transport is
only used for legacy borg 1.x (v1) repositories; for current repositories use a
``rest://`` repository instead (Borg connects via ssh and runs a borgstore REST
server on the remote host), and use ``borg repo-create`` instead of ``borg init``.
Machines
--------

View file

@ -64,7 +64,8 @@ If you only back up your own files, run it as your normal user (i.e. not root).
For a local repository always use the same user to invoke borg.
For a remote repository: always use e.g., ssh://borg@remote_host. You can use this
For a remote repository: always use e.g., rest://borg@remote_host (Borg connects
via ssh and runs a borgstore REST server on the remote). You can use this
from different local users; the remote user running borg and accessing the
repo will always be `borg`.
@ -142,7 +143,7 @@ backed up and that the ``prune`` command keeps and deletes the correct backups.
#!/bin/sh
# Setting this, so the repo does not need to be given on the commandline:
export BORG_REPO=ssh://username@example.com:2022/~/backup/main
export BORG_REPO=rest://username@example.com:2022/path/to/backup/main
# See the section "Passphrase notes" for more infos.
export BORG_PASSPHRASE='XYZl0ngandsecurepa_55_phrasea&&123'
@ -361,19 +362,21 @@ Remote repositories
Borg can initialize and access repositories on remote hosts if the
host is accessible using SSH. This is fastest and easiest when Borg
is installed on the remote host, in which case the following syntax is used::
is installed on the remote host, in which case a ``rest://`` repository URL is
used. Borg connects via SSH and runs a borgstore REST server on the remote host
(talking HTTP over stdio)::
$ borg -r ssh://user@hostname:port/path/to/repo repo-create ...
$ borg -r rest://user@hostname:port/path/to/repo repo-create ...
Note: Please see the usage chapter for a full documentation of repo URLs. Also
see :ref:`ssh_configuration` for recommended settings to avoid disconnects and hangs.
Remote operations over SSH can be automated with SSH keys. You can restrict the
use of the SSH keypair by prepending a forced command to the SSH public key in
the remote server's `authorized_keys` file. This example will start Borg
in server mode and limit it to a specific filesystem path::
.. note::
command="borg serve --restrict-to-path /path/to/repo",restrict ssh-rsa AAAAB3[...]
The legacy ``ssh://`` transport, served by ``borg serve`` on the remote host,
is now only used to access legacy borg 1.x (v1) repositories (e.g. via
``borg transfer --from-borg1 --other-repo ssh://...``). For current
repositories, use a ``rest://`` repository as shown above.
If it is not possible to install Borg on the remote host,
it is still possible to use the remote host to store a repository by
@ -530,7 +533,8 @@ Example with **borg extract**:
Difference when using a **remote borg backup server**:
It is basically all the same as with the local repository, but you need to
refer to the repo using a ``ssh://`` URL.
refer to the repo using a ``rest://`` URL (Borg connects via ssh and runs a
borgstore REST server on the remote host).
In the given example, ``borg`` is the user name used to log into the machine
``backup.example.org`` which runs ssh on port ``2222`` and has the borg repo
@ -544,6 +548,6 @@ case if unattended, automated backups were done).
::
borg -r ssh://borg@backup.example.org:2222/path/to/repo mount /mnt/borg
borg -r rest://borg@backup.example.org:2222/path/to/repo mount /mnt/borg
# or
borg -r ssh://borg@backup.example.org:2222/path/to/repo extract archive
borg -r rest://borg@backup.example.org:2222/path/to/repo extract archive

View file

@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from . import platform
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .remote import RemoteRepository, cache_if_remote
from .remote import cache_if_remote
from .repository import Repository, NoManifestError
from .repoobj import RepoObj
@ -1773,7 +1773,7 @@ class ArchiveChecker:
:param oldest/newest: only check archives older/newer than timedelta from oldest/newest archive timestamp
:param verify_data: integrity verification of data referenced by archives
"""
if not isinstance(repository, (Repository, RemoteRepository)):
if not isinstance(repository, Repository):
logger.error("Checking legacy repositories is not supported.")
return False
logger.info("Starting archive consistency check...")

View file

@ -47,7 +47,7 @@ try:
from ..helpers import sig_int
from ..helpers import get_config_dir
from ..platformflags import is_msystem
from ..remote import RemoteRepository
from ..legacy.remote import LegacyRemoteRepository
from ..selftest import selftest
except BaseException:
# an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's
@ -538,7 +538,7 @@ def sig_trace_handler(sig_no, stack): # pragma: no cover
def format_tb(exc):
qualname = type(exc).__qualname__
remote = isinstance(exc, RemoteRepository.RPCError)
remote = isinstance(exc, LegacyRemoteRepository.RPCError)
if remote:
prefix = "Borg server: "
trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines())
@ -632,7 +632,7 @@ def main(): # pragma: no cover
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
tb = format_tb(e)
exit_code = e.exit_code
except RemoteRepository.RPCError as e:
except LegacyRemoteRepository.RPCError as e:
important = e.traceback
msg = e.exception_full if important else e.get_message()
msgid = e.exception_class

View file

@ -13,7 +13,6 @@ from ..helpers.argparsing import SUPPRESS, PositiveInt
from ..helpers.nanorst import rst_to_terminal
from ..manifest import Manifest, AI_HUMAN_SORT_KEYS
from ..patterns import PatternMatcher
from ..remote import RemoteRepository
from ..repository import Repository
from ..repoobj import RepoObj
from ..patterns import (
@ -30,16 +29,19 @@ logger = create_logger(__name__)
def get_repository(location, *, create, exclusive, lock_wait, lock, args, v1_legacy):
if location.proto in ("ssh", "socket"):
if location.proto == "ssh":
if v1_legacy:
from ..legacy.remote import LegacyRemoteRepository
RemoteRepoCls = LegacyRemoteRepository
repository = LegacyRemoteRepository(
location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args
)
else:
RemoteRepoCls = RemoteRepository
repository = RemoteRepoCls(
location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args
)
raise Error(
"ssh:// is no longer supported for current repositories; use rest:// instead "
"(it can tunnel over ssh). ssh:// remains available only for legacy v1 repositories "
"via --from-borg1."
)
elif (
location.proto in ("rest", "sftp", "file", "http", "https", "rclone", "s3", "b2") and not v1_legacy
@ -584,16 +586,6 @@ def define_common_options(add_common_option):
action=Highlander,
help="Use this command to connect to the 'borg serve' process (default: 'ssh')",
)
add_common_option(
"--socket",
metavar="PATH",
dest="use_socket",
default=False,
const=True,
nargs="?",
action=Highlander,
help="Use UNIX DOMAIN (IPC) socket at PATH for client/server communication with socket: protocol.",
)
add_common_option(
"-r",
"--repo",

View file

@ -8,7 +8,6 @@ from ..helpers import bin_to_hex, Error
from ..helpers import ProgressIndicatorPercent
from ..helpers.argparsing import ArgumentParser
from ..manifest import Manifest
from ..remote import RemoteRepository
from ..repository import Repository
from ..logger import create_logger
@ -20,7 +19,7 @@ class ArchiveAnalyzer:
def __init__(self, args, repository, manifest):
self.args = args
self.repository = repository
assert isinstance(repository, (Repository, RemoteRepository))
assert isinstance(repository, Repository)
self.manifest = manifest
self.difference_by_path = defaultdict(int) # directory path -> count of chunks changed

View file

@ -11,7 +11,6 @@ from ..hashindex import ChunkIndex, ChunkIndexEntry
from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex
from ..helpers import ProgressIndicatorPercent
from ..manifest import Manifest
from ..remote import RemoteRepository
from ..repository import Repository, repo_lister
from ..logger import create_logger
@ -22,7 +21,7 @@ logger = create_logger()
class ArchiveGarbageCollector:
def __init__(self, repository, manifest, *, stats, iec):
self.repository = repository
assert isinstance(repository, (Repository, RemoteRepository))
assert isinstance(repository, Repository)
self.manifest = manifest
self.chunks = None # a ChunkIndex, here used for: id -> (is_used, stored_size)
self.total_files = None # overall number of source files written to all archives in this repo

View file

@ -13,7 +13,6 @@ class ServeMixIn:
RepositoryServer(
restrict_to_paths=args.restrict_to_paths,
restrict_to_repositories=args.restrict_to_repositories,
use_socket=args.use_socket,
permissions=args.permissions,
).serve()
@ -24,15 +23,13 @@ class ServeMixIn:
"""
This command starts a repository server process.
`borg serve` currently supports:
`borg serve` is only used to serve legacy (borg 1.x / v1) repositories over SSH, so that
such repositories can still be accessed remotely (e.g. for `borg transfer --from-borg1`).
Current repositories are accessed via a rest:// repository instead (which can itself tunnel
over SSH), so they do not use `borg serve`.
- Being automatically started via SSH when the borg client uses an ssh://...
remote repository. In this mode, `borg serve` will run until that SSH connection
is terminated.
- Being started by some other means (not by the borg client) as a long-running socket
server to be used for borg clients using a socket://... repository (see the `--socket`
option if you do not want to use the default path for the socket and PID file).
It is automatically started via SSH when a borg client uses an ssh://... repository.
In this mode, `borg serve` will run until that SSH connection is terminated.
Please note that `borg serve` does not support providing a specific repository via the
`--repo` option or the `BORG_REPO` environment variable. It is always the borg client that

View file

@ -1,7 +1,6 @@
from .. import __version__
from ..constants import * # NOQA
from ..helpers.argparsing import ArgumentParser
from ..remote import RemoteRepository
from ..logger import create_logger
@ -14,8 +13,10 @@ class VersionMixIn:
from borg.version import parse_version, format_version
client_version = parse_version(__version__)
if args.location.proto in ("ssh", "socket"):
with RemoteRepository(args.location, lock=False, args=args) as repository:
if args.location.proto == "ssh" and getattr(args, "v1_legacy", False):
from ..legacy.remote import LegacyRemoteRepository
with LegacyRemoteRepository(args.location, lock=False, args=args) as repository:
server_version = repository.server_version
else:
server_version = client_version
@ -28,11 +29,11 @@ class VersionMixIn:
"""
This command displays the Borg client and server versions.
If a local repository is given, the client code directly accesses the repository,
so the client version is also shown as the server version.
For current repositories the client code directly accesses the repository (also for
rest:// repositories), so the client version is shown as the server version, too.
If a remote repository is given (e.g., ssh:), the remote Borg is queried, and
its version is displayed as the server version.
If a legacy (borg 1.x / v1) repository is given via ssh: together with --from-borg1,
the remote Borg is queried, and its version is displayed as the server version.
Examples::
@ -40,8 +41,8 @@ class VersionMixIn:
$ borg version /mnt/backup
1.4.0a / 1.4.0a
# remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release)
$ borg version ssh://borg@borgbackup:repo
# legacy remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release)
$ borg version --from-borg1 ssh://borg@borgbackup:repo
1.4.0a / 1.2.7
Due to the version tuple format used in Borg client/server negotiation, only

View file

@ -32,7 +32,6 @@ from .item import ChunkListEntry
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
from .security import SecurityManager, assert_secure # noqa: F401
@ -651,7 +650,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size
)
# Cache does not contain the manifest.
if not isinstance(repository, (Repository, RemoteRepository)):
if not isinstance(repository, Repository):
del chunks[Manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.001
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.

View file

@ -96,7 +96,7 @@ class ArchiverSetup:
self.patterns_file_path: str | None = None
def get_kind(self) -> str:
if self.repository_location.startswith("ssh://__testsuite__") or self.repository_location.startswith("rest://"):
if self.repository_location.startswith("rest://"):
return "remote"
elif self.EXE == "borg.exe":
return "binary"

View file

@ -62,7 +62,7 @@ from .crypto.low_level import blake2b_128
from .archiver._common import build_matcher, build_filter
from .archive import Archive, get_item_uid_gid
from .hashindex import FuseVersionsIndex
from .helpers import daemonize, daemonizing, signal_handler, format_file_size, bin_to_hex, Error
from .helpers import daemonizing, signal_handler, format_file_size, bin_to_hex, Error
from .helpers import HardLinkManager
from .helpers import msgpack
from .helpers.lrucache import LRUCache
@ -70,7 +70,6 @@ from .item import Item
from .platform import uid2user, gid2group
from .platformflags import is_darwin
from .repository import Repository
from .remote import RemoteRepository
def fuse_main():
@ -596,13 +595,10 @@ class FuseOperations(llfuse.Operations, FuseBackend):
"that do not reflect the archive content."
)
if not foreground:
if isinstance(self.repository_uncached, RemoteRepository):
daemonize()
else:
with daemonizing(show_rc=show_rc) as (old_id, new_id):
# local repo: the locking process' PID is changing, migrate it:
logger.debug("fuse: mount local repo, going to background: migrating lock.")
self.repository_uncached.migrate_lock(old_id, new_id)
with daemonizing(show_rc=show_rc) as (old_id, new_id):
# the locking process' PID is changing, migrate it:
logger.debug("fuse: mount repo, going to background: migrating lock.")
self.repository_uncached.migrate_lock(old_id, new_id)
# If the file system crashes, we do not want to umount because in that
# case the mountpoint suddenly appears to become empty. This can have

View file

@ -600,7 +600,7 @@ class Location:
rclone_re = re.compile(r"(?P<proto>rclone):(?P<path>(.*))", re.VERBOSE)
sl = "/" if is_win32 else ""
file_or_socket_re = re.compile(r"(?P<proto>(file|socket))://" + sl + abs_path_re, re.VERBOSE)
file_re = re.compile(r"(?P<proto>file)://" + sl + abs_path_re, re.VERBOSE)
local_re = re.compile(local_path_re, re.VERBOSE)
@ -666,7 +666,7 @@ class Location:
self.proto = m.group("proto")
self.path = m.group("path")
return True
m = self.file_or_socket_re.match(text)
m = self.file_re.match(text)
if m:
self.proto = m.group("proto")
self.path = os.path.normpath(m.group("path"))
@ -709,7 +709,7 @@ class Location:
return self._host.lstrip("[").rstrip("]")
def canonical_path(self):
if self.proto in ("file", "socket"):
if self.proto == "file":
return self.path
if self.proto == "rclone":
return f"{self.proto}:{self.path}"
@ -1342,11 +1342,10 @@ class BorgJsonEncoder(json.JSONEncoder):
from ..legacy.repository import LegacyRepository
from ..repository import Repository
from ..legacy.remote import LegacyRemoteRepository
from ..remote import RemoteRepository
from ..archive import Archive
from ..cache import AdHocWithFilesCache
if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, (Repository, RemoteRepository)):
if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, Repository):
return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
if isinstance(o, Archive):
return o.info()

View file

@ -24,7 +24,7 @@ logger = create_logger()
from .archiver._common import build_matcher, build_filter
from .archive import Archive, get_item_uid_gid
from .hashindex import FuseVersionsIndex
from .helpers import daemonize, daemonizing, signal_handler, bin_to_hex, Error
from .helpers import daemonizing, signal_handler, bin_to_hex, Error
from .helpers import HardLinkManager
from .helpers import msgpack
from .helpers.lrucache import LRUCache
@ -32,7 +32,6 @@ from .item import Item
from .platform import uid2user, gid2group
from .platformflags import is_darwin
from .repository import Repository
from .remote import RemoteRepository
BLOCK_SIZE = 512 # Standard filesystem block size for st_blocks and statfs
DEBUG_LOG: str | None = None # os.path.join(os.getcwd(), "fuse_debug.log")
@ -529,12 +528,9 @@ class borgfs(hlfuse.Operations, FuseBackend):
# hlfuse.FUSE will block if foreground=True, otherwise it returns immediately
if not foreground:
# Background mode: daemonize first, then start FUSE (blocking)
if isinstance(self.repository, RemoteRepository):
daemonize()
else:
with daemonizing(show_rc=show_rc) as (old_id, new_id):
logger.debug("fuse: mount local repo, going to background: migrating lock.")
self.repository.migrate_lock(old_id, new_id)
with daemonizing(show_rc=show_rc) as (old_id, new_id):
logger.debug("fuse: mount repo, going to background: migrating lock.")
self.repository.migrate_lock(old_id, new_id)
# Run the FUSE main loop in foreground (we might be daemonized already or not)
with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler):

View file

@ -1,21 +1,15 @@
import atexit
import errno
import functools
import inspect
import logging
import os
import queue
import select
import shlex
import shutil
import socket
import struct
import sys
import tempfile
import textwrap
import time
import traceback
from subprocess import Popen, PIPE
from xxhash import xxh64
@ -23,45 +17,22 @@ import borg.logger
from . import __version__
from .compress import Compressor
from .constants import * # NOQA
from .helpers import Error, ErrorWithTraceback, IntegrityError
from .helpers import Error, IntegrityError
from .helpers import bin_to_hex
from .helpers import get_limited_unpacker
from .helpers import replace_placeholders
from .helpers import sysinfo
from .helpers import format_file_size
from .helpers import safe_unlink
from .helpers import prepare_subprocess_env, ignore_sigint
from .helpers import get_socket_filename
from .fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed
from .logger import create_logger, borg_serve_log_queue
from .manifest import NoManifestError
from .helpers import msgpack
from .repository import Repository, StoreObjectNotFound
from .version import parse_version, format_version
from .helpers.datastruct import EfficientCollectionQueue
from .platform import is_win32
from .version import parse_version
logger = create_logger(__name__)
BORG_VERSION = parse_version(__version__)
MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l"
MAX_INFLIGHT = 100
RATELIMIT_PERIOD = 0.1
class ConnectionClosed(Error):
"""Connection closed by remote host"""
exit_mcode = 80
class ConnectionClosedWithHint(ConnectionClosed):
"""Connection closed by remote host. {}"""
exit_mcode = 81
class PathNotAllowed(Error):
"""Repository path not allowed: {}"""
@ -81,41 +52,10 @@ class UnexpectedRPCDataFormatFromClient(Error):
exit_mcode = 85
class UnexpectedRPCDataFormatFromServer(Error):
"""Got unexpected RPC data format from server:\n{}"""
exit_mcode = 86
def __init__(self, data):
try:
data = data.decode()[:128]
except UnicodeDecodeError:
data = data[:128]
data = ["%02X" % byte for byte in data]
data = textwrap.fill(" ".join(data), 16 * 3)
super().__init__(data)
class ConnectionBrokenWithHint(Error):
"""Connection to remote host is broken. {}"""
exit_mcode = 87
# Protocol compatibility:
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
#
# For the client the return of the negotiate method is a dict which includes the server version.
#
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
# stubs in RemoteRepository*. The @api decorator on these stubs is used to set server version requirements.
#
# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server.
# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
# to be added.
# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
# servers still get compatible input.
# borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current repositories use rest://).
# The legacy client lives in borg.legacy.remote (LegacyRemoteRepository); this server keeps the legacy
# RPC method allowlist and opens repositories using LegacyRepository.
class RepositoryServer: # pragma: no cover
@ -140,49 +80,14 @@ class RepositoryServer: # pragma: no cover
"get_manifest", # borg2 LegacyRepository has this
)
_rpc_methods = ( # Repository
"__len__",
"check",
"delete",
"destroy",
"get",
"list",
"negotiate",
"open",
"close",
"info",
"put",
"save_key",
"load_key",
"break_lock",
"inject_exception",
"get_manifest",
"put_manifest",
"store_list",
"store_load",
"store_store",
"store_delete",
"store_move",
)
def __init__(self, restrict_to_paths, restrict_to_repositories, use_socket, permissions=None):
def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None):
self.repository = None
self.RepoCls = None
self.rpc_methods = ("open", "close", "negotiate")
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
self.permissions = permissions
# This flag is parsed from the serve command line via Archiver.do_serve,
# i.e. it reflects local system policy and generally ranks higher than
# whatever the client wants, except when initializing a new repository
# (see RepositoryServer.open below).
self.client_version = None # we update this after client sends version information
if use_socket is False:
self.socket_path = None
elif use_socket is True: # --socket
self.socket_path = get_socket_filename()
else: # --socket=/some/path
self.socket_path = use_socket
def filter_args(self, f, kwargs):
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
@ -308,38 +213,10 @@ class RepositoryServer: # pragma: no cover
shutdown_serve = True
continue
if self.socket_path: # server for socket:// connections
try:
# remove any left-over socket file
os.unlink(self.socket_path)
except OSError:
if os.path.exists(self.socket_path):
raise
sock_dir = os.path.dirname(self.socket_path)
os.makedirs(sock_dir, exist_ok=True)
pid_file = self.socket_path.removesuffix(".sock") + ".pid"
pid = os.getpid()
with open(pid_file, "w") as f:
f.write(str(pid))
atexit.register(functools.partial(os.remove, pid_file))
atexit.register(functools.partial(os.remove, self.socket_path))
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
sock.bind(self.socket_path) # this creates the socket file in the fs
sock.listen(0) # no backlog
os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too.
print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr)
while True:
connection, client_address = sock.accept()
print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr)
self.stdin_fd = connection.makefile("rb").fileno()
self.stdout_fd = connection.makefile("wb").fileno()
inner_serve()
print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr)
else: # server for one ssh:// connection
self.stdin_fd = sys.stdin.fileno()
self.stdout_fd = sys.stdout.fileno()
inner_serve()
# server for one ssh:// connection
self.stdin_fd = sys.stdin.fileno()
self.stdout_fd = sys.stdout.fileno()
inner_serve()
def negotiate(self, client_data):
if isinstance(client_data, dict):
@ -357,13 +234,11 @@ class RepositoryServer: # pragma: no cover
return path
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False):
if v1_legacy:
from .legacy.repository import LegacyRepository
# borg only serves legacy (v1) repositories now; current repositories are accessed via rest://.
from .legacy.repository import LegacyRepository
self.RepoCls = LegacyRepository
else:
self.RepoCls = Repository
self.rpc_methods = self._legacy_rpc_methods if v1_legacy else self._rpc_methods
self.RepoCls = LegacyRepository
self.rpc_methods = self._legacy_rpc_methods
logging.debug("Resolving repository path %r", path)
path = self._resolve_path(path)
logging.debug("Resolved repository path to %r", path)
@ -386,8 +261,6 @@ class RepositoryServer: # pragma: no cover
else:
raise PathNotAllowed(path)
kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log)
if not v1_legacy:
kwargs["permissions"] = self.permissions
self.repository = self.RepoCls(path, create, **kwargs)
self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id
@ -422,661 +295,6 @@ class RepositoryServer: # pragma: no cover
0 // 0
class SleepingBandwidthLimiter:
def __init__(self, limit):
if limit:
self.ratelimit = int(limit * RATELIMIT_PERIOD)
self.ratelimit_last = time.monotonic()
self.ratelimit_quota = self.ratelimit
else:
self.ratelimit = None
def write(self, fd, to_send):
if self.ratelimit:
now = time.monotonic()
if self.ratelimit_last + RATELIMIT_PERIOD <= now:
self.ratelimit_quota += self.ratelimit
if self.ratelimit_quota > 2 * self.ratelimit:
self.ratelimit_quota = 2 * self.ratelimit
self.ratelimit_last = now
if self.ratelimit_quota == 0:
tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
time.sleep(tosleep)
self.ratelimit_quota += self.ratelimit
self.ratelimit_last = time.monotonic()
if len(to_send) > self.ratelimit_quota:
to_send = to_send[: self.ratelimit_quota]
try:
written = os.write(fd, to_send)
except BrokenPipeError:
raise ConnectionBrokenWithHint("Broken Pipe") from None
if self.ratelimit:
self.ratelimit_quota -= written
return written
def api(*, since, **kwargs_decorator):
"""Check version requirements and use self.call to do the remote method call.
<since> specifies the version in which borg introduced this method.
Calling this method when connected to an older version will fail without transmitting anything to the server.
Further kwargs can be used to encode version specific restrictions:
<previously> is the value resulting in the behaviour before introducing the new parameter.
If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously
hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client.
E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7.
<dontcare> is a flag to set the behaviour if an old version is called the new way.
If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the
more desirable behaviour). If False, an exception is generated.
E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit().
"""
def decorator(f):
@functools.wraps(f)
def do_rpc(self, *args, **kwargs):
sig = inspect.signature(f)
bound_args = sig.bind(self, *args, **kwargs)
named = {} # Arguments for the remote process
extra = {} # Arguments for the local process
for name, param in sig.parameters.items():
if name == "self":
continue
if name in bound_args.arguments:
if name == "wait":
extra[name] = bound_args.arguments[name]
else:
named[name] = bound_args.arguments[name]
else:
if param.default is not param.empty:
named[name] = param.default
if self.server_version < since:
raise self.RPCServerOutdated(f.__name__, format_version(since))
for name, restriction in kwargs_decorator.items():
if restriction["since"] <= self.server_version:
continue
if "previously" in restriction and named[name] == restriction["previously"]:
continue
if restriction.get("dontcare", False):
continue
raise self.RPCServerOutdated(
f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"])
)
return self.call(f.__name__, named, **extra)
return do_rpc
return decorator
class RemoteRepository:
extra_test_args = [] # type: ignore
class RPCError(Exception):
def __init__(self, unpacked):
# unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
self.unpacked = unpacked
def get_message(self):
return "\n".join(self.unpacked["exception_short"])
@property
def traceback(self):
return self.unpacked.get("exception_trace", True)
@property
def exception_class(self):
return self.unpacked["exception_class"]
@property
def exception_full(self):
return "\n".join(self.unpacked["exception_full"])
@property
def sysinfo(self):
return self.unpacked["sysinfo"]
class RPCServerOutdated(Error):
"""Borg server is too old for {}. Required version {}"""
exit_mcode = 84
@property
def method(self):
return self.args[0]
@property
def required_version(self):
return self.args[1]
def __init__(self, location, create=False, exclusive=False, lock_wait=1.0, lock=True, args=None):
self.location = self._location = location
self.preload_ids = []
self.msgid = 0
self.rx_bytes = 0
self.tx_bytes = 0
self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
self.stdin_fd = self.stdout_fd = self.stderr_fd = None
self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet)
self.chunkid_to_msgids = {}
self.ignore_responses = set()
self.responses = {}
self.async_responses = {}
self.shutdown_time = None
self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
self.unpacker = get_limited_unpacker("client")
self.server_version = None # we update this after server sends its version
self.p = self.sock = None
self._args = args
if self.location.proto == "ssh":
testing = location.host == "__testsuite__"
# when testing, we invoke and talk to a borg process directly (no ssh).
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug("SSH command line: %s", borg_cmd)
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
self.p = Popen(
borg_cmd,
bufsize=0,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
env=env,
preexec_fn=None if is_win32 else ignore_sigint,
) # nosec B603
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
self.r_fds = [self.stdout_fd, self.stderr_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
elif self.location.proto == "socket":
if args.use_socket is False or args.use_socket is True: # nothing or --socket
socket_path = get_socket_filename()
else: # --socket=/some/path
socket_path = args.use_socket
self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
try:
self.sock.connect(socket_path) # note: socket_path length is rather limited.
except FileNotFoundError:
self.sock = None
raise Error(f"The socket file {socket_path} does not exist.")
except ConnectionRefusedError:
self.sock = None
raise Error(f"There is no borg serve running for the socket file {socket_path}.")
self.stdin_fd = self.sock.makefile("wb").fileno()
self.stdout_fd = self.sock.makefile("rb").fileno()
self.stderr_fd = None
self.r_fds = [self.stdout_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd]
else:
raise Error(f"Unsupported protocol {location.proto}")
os.set_blocking(self.stdin_fd, False)
assert not os.get_blocking(self.stdin_fd)
os.set_blocking(self.stdout_fd, False)
assert not os.get_blocking(self.stdout_fd)
if self.stderr_fd is not None:
os.set_blocking(self.stderr_fd, False)
assert not os.get_blocking(self.stderr_fd)
try:
try:
version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
except ConnectionClosed:
raise ConnectionClosedWithHint("Is borg working on the server?") from None
if isinstance(version, dict):
self.server_version = version["server_version"]
else:
raise Exception("Server insisted on using unsupported protocol version %s" % version)
self.id = self.open(
path=self.location.path, create=create, lock_wait=lock_wait, lock=lock, exclusive=exclusive
)
info = self.info()
self.version = info["version"]
except Exception:
self.close()
raise
def __del__(self):
if len(self.responses):
logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),))
if self.p or self.sock:
self.close()
assert False, "cleanup happened in RemoteRepository.__del__"
def __repr__(self):
return f"<{self.__class__.__name__} {self.location.canonical_path()}>"
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
try:
if exc_type is not None:
self.shutdown_time = time.monotonic() + 30
finally:
# in any case, we want to close the repo cleanly.
logger.debug(
"RemoteRepository: %s bytes sent, %s bytes received, %d messages sent",
format_file_size(self.tx_bytes),
format_file_size(self.rx_bytes),
self.msgid,
)
self.close()
@property
def id_str(self):
return bin_to_hex(self.id)
def borg_cmd(self, args, testing):
"""return a borg serve command line"""
# give some args/options to 'borg serve' process as they were given to us
opts = []
if args is not None:
root_logger = logging.getLogger()
if root_logger.isEnabledFor(logging.DEBUG):
opts.append("--debug")
elif root_logger.isEnabledFor(logging.INFO):
opts.append("--info")
elif root_logger.isEnabledFor(logging.WARNING):
pass # warning is default
elif root_logger.isEnabledFor(logging.ERROR):
opts.append("--error")
elif root_logger.isEnabledFor(logging.CRITICAL):
opts.append("--critical")
else:
raise ValueError("log level missing, fix this code")
# Tell the remote server about debug topics it may need to consider.
# Note that debug topics are usable for "spew" or "trace" logs which would
# be too plentiful to transfer for normal use, so the server doesn't send
# them unless explicitly enabled.
#
# Needless to say, if you do --debug-topic=repository.compaction, for example,
# with a 1.0.x server it won't work, because the server does not recognize the
# option.
#
# This is not considered a problem, since this is a debugging feature that
# should not be used for regular use.
for topic in args.debug_topics:
if "." not in topic:
topic = "borg.debug." + topic
if "repository" in topic:
opts.append("--debug-topic=%s" % topic)
env_vars = []
if testing:
return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args
else: # pragma: no cover
remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg")
remote_path = replace_placeholders(remote_path)
return env_vars + [remote_path, "serve"] + opts
def ssh_cmd(self, location):
"""return a ssh command line that can be prefixed to a borg command line"""
rsh = self._args.rsh or os.environ.get("BORG_RSH", "ssh")
args = shlex.split(rsh)
if location.port:
args += ["-p", str(location.port)]
if location.user:
args.append(f"{location.user}@{location.host}")
else:
args.append("%s" % location.host)
return args
def call(self, cmd, args, **kw):
for resp in self.call_many(cmd, [args], **kw):
return resp
def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
if not calls and cmd != "async_responses":
return
assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'"
def send_buffer():
if self.to_send:
try:
written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
self.tx_bytes += written
self.to_send.pop_front(written)
except OSError as e:
# io.write might raise EAGAIN even though select indicates
# that the fd should be writable.
# EWOULDBLOCK is added for defensive programming sake.
if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
raise
def pop_preload_msgid(chunkid):
msgid = self.chunkid_to_msgids[chunkid].pop(0)
if not self.chunkid_to_msgids[chunkid]:
del self.chunkid_to_msgids[chunkid]
return msgid
def handle_error(unpacked):
if "exception_class" not in unpacked:
return
error = unpacked["exception_class"]
args = unpacked["exception_args"]
if error == "Error":
raise Error(args[0])
elif error == "ErrorWithTraceback":
raise ErrorWithTraceback(args[0])
elif error == "InvalidRepository":
raise Repository.InvalidRepository(self.location.processed)
elif error == "DoesNotExist":
raise Repository.DoesNotExist(self.location.processed)
elif error == "AlreadyExists":
raise Repository.AlreadyExists(self.location.processed)
elif error == "CheckNeeded":
raise Repository.CheckNeeded(self.location.processed)
elif error == "IntegrityError":
raise IntegrityError(args[0])
elif error == "PathNotAllowed":
raise PathNotAllowed(args[0])
elif error == "PathPermissionDenied":
raise Repository.PathPermissionDenied(args[0])
elif error == "PathAlreadyExists":
raise Repository.PathAlreadyExists(args[0])
elif error == "ParentPathDoesNotExist":
raise Repository.ParentPathDoesNotExist(args[0])
elif error == "ObjectNotFound":
raise Repository.ObjectNotFound(args[0], self.location.processed)
elif error == "StoreObjectNotFound":
raise StoreObjectNotFound(args[0])
elif error == "InvalidRPCMethod":
raise InvalidRPCMethod(args[0])
elif error == "LockTimeout":
raise LockTimeout(args[0])
elif error == "LockFailed":
raise LockFailed(args[0], args[1])
elif error == "NotLocked":
raise NotLocked(args[0])
elif error == "NotMyLock":
raise NotMyLock(args[0])
elif error == "NoManifestError":
raise NoManifestError
elif error == "InsufficientFreeSpaceError":
raise Repository.InsufficientFreeSpaceError(args[0], args[1])
elif error == "InvalidRepositoryConfig":
raise Repository.InvalidRepositoryConfig(self.location.processed, args[1])
else:
raise self.RPCError(unpacked)
calls = list(calls)
waiting_for = []
maximum_to_send = 0 if wait else self.upload_buffer_size_limit
send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise.
try:
while wait or calls:
logger.debug(
f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}"
)
if self.shutdown_time and time.monotonic() > self.shutdown_time:
# we are shutting this RemoteRepository down already, make sure we do not waste
# a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
logger.debug(
"shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
len(waiting_for),
len(self.async_responses),
)
return
while waiting_for:
try:
unpacked = self.responses.pop(waiting_for[0])
waiting_for.pop(0)
handle_error(unpacked)
yield unpacked[RESULT]
if not waiting_for and not calls:
return
except KeyError:
break
if cmd == "async_responses":
while True:
try:
msgid, unpacked = self.async_responses.popitem()
except KeyError:
# there is nothing left what we already have received
if async_wait and self.ignore_responses:
# but do not return if we shall wait and there is something left to wait for:
break
else:
return
else:
handle_error(unpacked)
yield unpacked[RESULT]
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
w_fds = [self.stdin_fd]
else:
w_fds = []
r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
if x:
raise Exception("FD exception occurred")
for fd in r:
if fd is self.stdout_fd:
data = os.read(fd, BUFSIZE)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
if "exception_class" in unpacked:
self.async_responses[msgid] = unpacked
else:
# we currently do not have async result values except "None",
# so we do not add them into async_responses.
if unpacked[RESULT] is not None:
self.async_responses[msgid] = unpacked
else:
self.responses[msgid] = unpacked
elif fd is self.stderr_fd:
data = os.read(fd, 32768)
if not data:
raise ConnectionClosed()
self.rx_bytes += len(data)
# deal with incomplete lines (may appear due to block buffering)
if self.stderr_received:
data = self.stderr_received + data
self.stderr_received = b""
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
and (calls or self.preload_ids)
and len(waiting_for) < MAX_INFLIGHT
):
if calls:
args = calls[0]
if cmd == "get" and args["id"] in self.chunkid_to_msgids:
# we have a get command and have already sent a request for this chunkid when
# doing preloading, so we know the msgid of the response we are waiting for:
waiting_for.append(pop_preload_msgid(args["id"]))
del calls[0]
elif not is_preloaded:
# make and send a request (already done if we are using preloading)
self.msgid += 1
waiting_for.append(self.msgid)
del calls[0]
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
# for preloading chunks, the raise_missing behaviour is defined HERE,
# not in the get_many / fetch_many call that later fetches the preloaded chunks.
args = {"id": chunk_id, "raise_missing": False}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
send_buffer()
finally:
self.ignore_responses |= set(waiting_for) # we lose order here
if is_preloaded:
for call in calls:
chunkid = call["id"]
if chunkid in self.chunkid_to_msgids:
self.ignore_responses.add(pop_preload_msgid(chunkid))
@api(since=parse_version("1.0.0"), v1_legacy={"since": parse_version("2.0.0b21"), "previously": True})
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, v1_legacy=False):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0a3"))
def info(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0})
def check(self, repair=False, max_duration=0):
"""actual remoting is done via self.call in the @api decorator"""
@api(
since=parse_version("1.0.0"),
compact={"since": parse_version("1.2.0a0"), "previously": True, "dontcare": True},
threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True},
)
def commit(self, compact=True, threshold=0.1):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def rollback(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def destroy(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def __len__(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def list(self, limit=None, marker=None):
"""actual remoting is done via self.call in the @api decorator"""
def get(self, id, read_data=True, raise_missing=True):
for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing):
return resp
def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True):
yield from self.call_many(
"get",
[{"id": id, "read_data": read_data, "raise_missing": raise_missing} for id in ids],
is_preloaded=is_preloaded,
)
@api(since=parse_version("1.0.0"))
def put(self, id, data, wait=True):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def delete(self, id, wait=True):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def save_key(self, keydata):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def load_key(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("1.0.0"))
def break_lock(self):
"""actual remoting is done via self.call in the @api decorator"""
def close(self):
if self.p or self.sock:
self.call("close", {}, wait=True)
if self.p:
self.p.stdin.close()
self.p.stdout.close()
self.p.wait()
self.p = None
if self.sock:
try:
self.sock.shutdown(socket.SHUT_RDWR)
except OSError as e:
if e.errno != errno.ENOTCONN:
raise
self.sock.close()
self.sock = None
def async_response(self, wait=True):
for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait):
return resp
def preload(self, ids):
self.preload_ids += ids
@api(since=parse_version("2.0.0b8"))
def get_manifest(self):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b8"))
def put_manifest(self, data):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False})
def store_list(self, name, *, deleted=False):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b8"))
def store_load(self, name):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b8"))
def store_store(self, name, value):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b8"), deleted={"since": parse_version("2.0.0b14"), "previously": False})
def store_delete(self, name, *, deleted=False):
"""actual remoting is done via self.call in the @api decorator"""
@api(since=parse_version("2.0.0b14"))
def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False):
"""actual remoting is done via self.call in the @api decorator"""
class RepositoryNoCache:
"""A not caching Repository wrapper, passes through to repository.
@ -1280,7 +498,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None
csize = meta.get("csize", len(data))
return csize, decrypted
if isinstance(repository, RemoteRepository) or force_cache:
if force_cache:
return RepositoryCache(repository, pack, unpack, transform)
else:
return RepositoryNoCache(repository, transform)

View file

@ -24,7 +24,6 @@ from ...helpers import init_ec_warnings
from ...logger import flush_logging
from ...manifest import Manifest
from ...platform import get_flags
from ...remote import RemoteRepository
from ...repository import Repository
from .. import has_lchflags, has_mknod, is_utime_fully_supported, have_fuse_mtime_ns, st_mtime_ns_round, filter_xattrs
from .. import changedir, ENOATTR # NOQA
@ -178,10 +177,7 @@ def open_archive(repo_path, name):
def open_repository(archiver):
if archiver.get_kind() == "remote":
location = Location(archiver.repository_location)
if location.proto == "rest":
return Repository(location, exclusive=True)
return RemoteRepository(location)
return Repository(Location(archiver.repository_location), exclusive=True)
else:
return Repository(archiver.repository_path, exclusive=True)

View file

@ -9,7 +9,6 @@ from ...archive import ChunkBuffer
from ...constants import * # NOQA
from ...helpers import bin_to_hex, msgpack
from ...manifest import Manifest
from ...remote import RemoteRepository
from ...repository import Repository
from ..repository_test import fchunk
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
@ -208,7 +207,7 @@ def test_missing_manifest(archivers, request):
check_cmd_setup(archiver)
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
if isinstance(repository, (Repository, RemoteRepository)):
if isinstance(repository, Repository):
repository.store_delete("config/manifest")
else:
repository.delete(Manifest.MANIFEST_ID)

View file

@ -1,6 +1,5 @@
import os
import shutil
from unittest.mock import patch
import pytest
@ -9,7 +8,6 @@ from ...constants import * # NOQA
from ...helpers import Location, get_security_dir, bin_to_hex
from ...helpers import EXIT_ERROR
from ...manifest import Manifest, MandatoryFeatureUnsupported
from ...remote import RemoteRepository, PathNotAllowed
from ...repository import Repository
from .. import llfuse
from .. import changedir
@ -277,49 +275,6 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
# Begin Remote Tests
def test_remote_repo_restrict_to_path(remote_archiver):
if remote_archiver.repository_location.startswith("rest://"):
pytest.skip("Not applicable for rest:// protocol")
original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
# restricted to repo directory itself:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restricted to repo directory itself, fail for other directories with same prefix:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
with pytest.raises(PathNotAllowed):
remote_archiver.repository_location = original_location + "_0"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restricted to a completely different path:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]):
with pytest.raises(PathNotAllowed):
remote_archiver.repository_location = original_location + "_1"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
path_prefix = os.path.dirname(repo_path)
# restrict to repo directory's parent directory:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]):
remote_archiver.repository_location = original_location + "_2"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restrict to repo directory's parent directory and another directory:
with patch.object(
RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix]
):
remote_archiver.repository_location = original_location + "_3"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
def test_remote_repo_restrict_to_repository(remote_archiver):
if remote_archiver.repository_location.startswith("rest://"):
pytest.skip("Not applicable for rest:// protocol")
repo_path = remote_archiver.repository_path
# restricted to repo directory itself:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
parent_path = os.path.join(repo_path, "..")
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]):
with pytest.raises(PathNotAllowed):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
def test_remote_repo_strip_components_doesnt_leak(remote_archiver):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
create_regular_file(remote_archiver.input_path, "dir/file", contents=b"test file contents 1")

View file

@ -1,55 +0,0 @@
import os
import subprocess
import tempfile
import time
import pytest
import platformdirs
from . import exec_cmd
from ...platformflags import is_win32
from ...helpers import get_runtime_dir
def have_a_short_runtime_dir(mp):
# Under pytest, we use BORG_BASE_DIR to keep stuff away from the user's normal Borg directories.
# This leads to a very long get_runtime_dir() path — too long for a socket file!
# Thus, we override that again via BORG_RUNTIME_DIR to get a shorter path.
mp.setenv("BORG_RUNTIME_DIR", os.path.join(platformdirs.user_runtime_dir(), "pytest"))
@pytest.fixture
def serve_socket(monkeypatch):
have_a_short_runtime_dir(monkeypatch)
# Use a random unique socket filename, so tests can run in parallel.
socket_file = tempfile.mktemp(suffix=".sock", prefix="borg-", dir=get_runtime_dir())
with subprocess.Popen(["borg", "serve", f"--socket={socket_file}"]) as p:
while not os.path.exists(socket_file):
time.sleep(0.01) # wait until the socket server has started
yield socket_file
p.terminate()
@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_with_socket(serve_socket, tmpdir, monkeypatch):
have_a_short_runtime_dir(monkeypatch)
repo_path = str(tmpdir.join("repo"))
ret, output = exec_cmd(
f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-create", "--encryption=none"
)
assert ret == 0
ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-info")
assert ret == 0
assert "Repository ID: " in output
monkeypatch.setenv("BORG_DELETE_I_KNOW_WHAT_I_AM_DOING", "YES")
ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-delete")
assert ret == 0
@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_socket_permissions(serve_socket):
st = os.stat(serve_socket)
assert st.st_mode & 0o0777 == 0o0770 # user and group are permitted to use the socket

View file

@ -246,12 +246,10 @@ class TestLocationWithoutEnv:
def test_socket(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
# socket:// is no longer supported and must be rejected as an invalid location.
url = "socket:///c:/repo/path" if is_win32 else "socket:///repo/path"
path = "c:/repo/path" if is_win32 else "/repo/path"
assert (
repr(Location(url))
== f"Location(proto='socket', user=None, pass=None, host=None, port=None, path='{path}')"
)
with pytest.raises(ValueError):
Location(url)
def test_file(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
@ -334,7 +332,6 @@ class TestLocationWithoutEnv:
]
locations.insert(1, "c:/absolute/path" if is_win32 else "/absolute/path")
locations.insert(2, "file:///c:/absolute/path" if is_win32 else "file:///absolute/path")
locations.insert(3, "socket:///c:/absolute/path" if is_win32 else "socket:///absolute/path")
for location in locations:
assert (
Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path()

View file

@ -1,13 +1,12 @@
import errno
import os
import io
import time
from unittest.mock import patch
import pytest
from ..constants import ROBJ_FILE_STREAM
from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote
from ..remote import RepositoryCache, cache_if_remote
from ..repository import Repository
from ..crypto.key import PlaintextKey
from ..helpers import IntegrityError
@ -17,60 +16,6 @@ from .repository_test import fchunk, pdchunk
from .crypto.key_test import TestKey
class TestSleepingBandwidthLimiter:
def expect_write(self, fd, data):
self.expected_fd = fd
self.expected_data = data
def check_write(self, fd, data):
assert fd == self.expected_fd
assert data == self.expected_data
return len(data)
def test_write_unlimited(self, monkeypatch):
monkeypatch.setattr(os, "write", self.check_write)
it = SleepingBandwidthLimiter(0)
self.expect_write(5, b"test")
it.write(5, b"test")
def test_write(self, monkeypatch):
monkeypatch.setattr(os, "write", self.check_write)
monkeypatch.setattr(time, "monotonic", lambda: now)
monkeypatch.setattr(time, "sleep", lambda x: None)
now = 100
it = SleepingBandwidthLimiter(100) # Bandwidth quota.
# All fits
self.expect_write(5, b"test")
it.write(5, b"test")
# Only partial write
self.expect_write(5, b"123456")
it.write(5, b"1234567890")
# Sleeps
self.expect_write(5, b"123456")
it.write(5, b"123456")
# Long time interval between writes
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
# Long time interval between writes, filling up the quota
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
# Long time interval between writes, filling up the quota to clip to the maximum
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
class TestRepositoryCache:
@pytest.fixture
def repository(self, tmpdir):

View file

@ -1,13 +1,8 @@
import logging
import os
import sys
import pytest
from ..helpers import Location
from ..helpers import IntegrityError
from ..platformflags import is_win32
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed
from ..repository import Repository, StoreObjectNotFound, MAX_DATA_SIZE
from ..repository import Repository, MAX_DATA_SIZE
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from .hashindex_test import H
@ -18,22 +13,14 @@ def repository(tmp_path):
yield Repository(repository_location, exclusive=True, create=True)
@pytest.fixture()
def remote_repository(tmp_path):
if is_win32:
pytest.skip("Remote repository does not yet work on Windows.")
repository_location = Location("ssh://__testsuite__/" + os.fspath(tmp_path / "repository"))
yield RemoteRepository(repository_location, exclusive=True, create=True)
def pytest_generate_tests(metafunc):
# Generate tests that run on both local and remote repositories.
# Generate tests that run on repositories.
if "repo_fixtures" in metafunc.fixturenames:
metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"])
metafunc.parametrize("repo_fixtures", ["repository"])
def get_repository_from_fixture(repo_fixtures, request):
# Return the repository object from the fixture for tests that run on both local and remote repositories.
# Return the repository object from the fixture.
return request.getfixturevalue(repo_fixtures)
@ -43,14 +30,7 @@ def reopen(repository, exclusive: bool | None = True, create=False):
raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.")
return Repository(repository._location, exclusive=exclusive, create=create)
if isinstance(repository, RemoteRepository):
if repository.p is not None or repository.sock is not None:
raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.")
return RemoteRepository(repository.location, exclusive=exclusive, create=create)
raise TypeError(
f"Invalid argument type. Expected 'Repository' or 'RemoteRepository', received '{type(repository).__name__}'."
)
raise TypeError(f"Invalid argument type. Expected 'Repository', received '{type(repository).__name__}'.")
def fchunk(data, meta=b"", chunk_id=b"\x00" * 32):
@ -149,125 +129,3 @@ def check(repository, repo_path, repair=False, status=True):
# Make sure no tmp files are left behind
tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name]
assert tmp_files == [], "Found tmp files"
def _get_mock_args():
class MockArgs:
remote_path = "borg"
umask = 0o077
debug_topics = []
rsh = None
def __contains__(self, item):
# to behave like argparse.Namespace
return hasattr(self, item)
return MockArgs()
def test_remote_invalid_rpc(remote_repository):
with remote_repository:
with pytest.raises(InvalidRPCMethod):
remote_repository.call("__init__", {})
def test_remote_rpc_exception_transport(remote_repository):
with remote_repository:
s1 = "test string"
try:
remote_repository.call("inject_exception", {"kind": "DoesNotExist"})
except Repository.DoesNotExist as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "AlreadyExists"})
except Repository.AlreadyExists as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "CheckNeeded"})
except Repository.CheckNeeded as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "IntegrityError"})
except IntegrityError as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "PathNotAllowed"})
except PathNotAllowed as e:
assert len(e.args) == 1
assert e.args[0] == "foo"
try:
remote_repository.call("inject_exception", {"kind": "ObjectNotFound"})
except Repository.ObjectNotFound as e:
assert len(e.args) == 2
assert e.args[0] == s1
assert e.args[1] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "StoreObjectNotFound"})
except StoreObjectNotFound as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"})
except InvalidRPCMethod as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "divide"})
except RemoteRepository.RPCError as e:
assert e.unpacked
assert e.get_message().startswith("ZeroDivisionError:")
assert e.exception_class == "ZeroDivisionError"
assert len(e.exception_full) > 0
def test_remote_ssh_cmd(remote_repository):
with remote_repository:
args = _get_mock_args()
remote_repository._args = args
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"]
assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"]
assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [
"ssh",
"-p",
"1234",
"user@example.com",
]
os.environ["BORG_RSH"] = "ssh --foo"
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"]
def test_remote_borg_cmd(remote_repository):
with remote_repository:
assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"]
args = _get_mock_args()
# XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
logging.getLogger().setLevel(logging.INFO)
# note: test logger is on info log level, so --info gets added automagically
assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
args.remote_path = "borg-0.28.2"
assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"]
args.debug_topics = ["something_client_side", "repository_compaction"]
assert remote_repository.borg_cmd(args, testing=False) == [
"borg-0.28.2",
"serve",
"--info",
"--debug-topic=borg.debug.repository_compaction",
]
args = _get_mock_args()
assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
args.rsh = "ssh -i foo"
remote_repository._args = args
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]