Merge pull request #9732 from ThomasWaldmann/remove-ssh-remote-repo

Remove ssh:// and socket:// remote repository for current repos (use rest://)
TW 2026-06-08 12:52:17 +02:00 committed by GitHub
commit 5246d2b51e
GPG key ID: B5690EEEBB952194
27 changed files with 420 additions and 2068 deletions

View file

@ -171,6 +171,10 @@ New features:
- WIP packs project, major repo format changes, you must create new repos! #8572
- rest:// repository URLs - connect via ssh to remote borgstore REST server,
talking http via stdio, #9593
- removed ssh:// and socket:// support for current repositories; use a rest://
repository instead (it can tunnel over ssh). ssh:// and ``borg serve`` remain
available only for legacy (borg 1.x / v1) repositories, e.g. for
``borg transfer --from-borg1 --other-repo ssh://...``.
- prune: show total vs matching archives in output, #9262
- prune: add --json option, #9222
- archive: preserve cwd archive metadata, #9495

View file

@ -8,6 +8,14 @@ Central repository server with Ansible or Salt
This section gives an example of how to set up a Borg repository server for multiple
clients.
.. note::
This example predates Borg 2 and uses the legacy ``ssh://`` transport (served
by ``borg serve``) and ``borg init``. With Borg 2, the ``ssh://`` transport is
only used for legacy borg 1.x (v1) repositories; for current repositories use a
``rest://`` repository instead (Borg connects via ssh and runs a borgstore REST
server on the remote host), and use ``borg repo-create`` instead of ``borg init``.
Machines
--------

View file

@ -64,7 +64,8 @@ If you only back up your own files, run it as your normal user (i.e. not root).
For a local repository always use the same user to invoke borg.
For a remote repository: always use e.g., ssh://borg@remote_host. You can use this
For a remote repository: always use e.g., rest://borg@remote_host (Borg connects
via ssh and runs a borgstore REST server on the remote). You can use this
from different local users; the remote user running borg and accessing the
repo will always be `borg`.
@ -142,7 +143,7 @@ backed up and that the ``prune`` command keeps and deletes the correct backups.
#!/bin/sh
# Setting this, so the repo does not need to be given on the commandline:
export BORG_REPO=ssh://username@example.com:2022/~/backup/main
export BORG_REPO=rest://username@example.com:2022/path/to/backup/main
# See the section "Passphrase notes" for more information.
export BORG_PASSPHRASE='XYZl0ngandsecurepa_55_phrasea&&123'
@ -361,19 +362,21 @@ Remote repositories
Borg can initialize and access repositories on remote hosts if the
host is accessible using SSH. This is fastest and easiest when Borg
is installed on the remote host, in which case the following syntax is used::
is installed on the remote host, in which case a ``rest://`` repository URL is
used. Borg connects via SSH and runs a borgstore REST server on the remote host
(talking HTTP over stdio)::
$ borg -r ssh://user@hostname:port/path/to/repo repo-create ...
$ borg -r rest://user@hostname:port/path/to/repo repo-create ...
Note: Please see the usage chapter for full documentation of repo URLs. Also
see :ref:`ssh_configuration` for recommended settings to avoid disconnects and hangs.
Remote operations over SSH can be automated with SSH keys. You can restrict the
use of the SSH keypair by prepending a forced command to the SSH public key in
the remote server's `authorized_keys` file. This example will start Borg
in server mode and limit it to a specific filesystem path::
.. note::
command="borg serve --restrict-to-path /path/to/repo",restrict ssh-rsa AAAAB3[...]
The legacy ``ssh://`` transport, served by ``borg serve`` on the remote host,
is now only used to access legacy borg 1.x (v1) repositories (e.g. via
``borg transfer --from-borg1 --other-repo ssh://...``). For current
repositories, use a ``rest://`` repository as shown above.
If it is not possible to install Borg on the remote host,
it is still possible to use the remote host to store a repository by
@ -530,7 +533,8 @@ Example with **borg extract**:
Difference when using a **remote borg backup server**:
It is basically all the same as with the local repository, but you need to
refer to the repo using a ``ssh://`` URL.
refer to the repo using a ``rest://`` URL (Borg connects via ssh and runs a
borgstore REST server on the remote host).
In the given example, ``borg`` is the user name used to log into the machine
``backup.example.org`` which runs ssh on port ``2222`` and has the borg repo
@ -544,6 +548,6 @@ case if unattended, automated backups were done).
::
borg -r ssh://borg@backup.example.org:2222/path/to/repo mount /mnt/borg
borg -r rest://borg@backup.example.org:2222/path/to/repo mount /mnt/borg
# or
borg -r ssh://borg@backup.example.org:2222/path/to/repo extract archive
borg -r rest://borg@backup.example.org:2222/path/to/repo extract archive
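
Review note: every documentation change in this file replaces the ``ssh://`` scheme with ``rest://`` and keeps user, host, and port. For scripts that store many repository URLs, the rewrite could be sketched as follows. ``to_rest_url`` is a hypothetical helper, not part of borg, and it assumes the path can be carried over unchanged. The diff does not guarantee that: the script example above also changes ``~/backup/main`` to ``/path/to/backup/main``.

```python
from urllib.parse import urlsplit


def to_rest_url(url: str) -> str:
    """Hypothetical helper (not part of borg): swap the ssh:// scheme for rest://."""
    parts = urlsplit(url)
    if parts.scheme != "ssh":
        raise ValueError(f"expected an ssh:// URL, got: {url!r}")
    # Assumption: user, host, port and path stay the same; only the scheme changes.
    return parts._replace(scheme="rest").geturl()


print(to_rest_url("ssh://borg@backup.example.org:2222/path/to/repo"))
```

Paths that relied on ``~`` expansion probably need to be reviewed by hand rather than rewritten mechanically.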

View file

@ -50,8 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem, ItemDiff
from . import platform
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
from .remote import RemoteRepository, cache_if_remote
from .repository import Repository, NoManifestError
from .repository import Repository, NoManifestError, cache_if_remote
from .repoobj import RepoObj
has_link = hasattr(os, "link")
@ -1773,7 +1772,7 @@ class ArchiveChecker:
:param oldest/newest: only check archives older/newer than timedelta from oldest/newest archive timestamp
:param verify_data: integrity verification of data referenced by archives
"""
if not isinstance(repository, (Repository, RemoteRepository)):
if not isinstance(repository, Repository):
logger.error("Checking legacy repositories is not supported.")
return False
logger.info("Starting archive consistency check...")

View file

@ -47,7 +47,7 @@ try:
from ..helpers import sig_int
from ..helpers import get_config_dir
from ..platformflags import is_msystem
from ..remote import RemoteRepository
from ..legacy.remote import LegacyRemoteRepository
from ..selftest import selftest
except BaseException:
# an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's
@ -538,7 +538,7 @@ def sig_trace_handler(sig_no, stack): # pragma: no cover
def format_tb(exc):
qualname = type(exc).__qualname__
remote = isinstance(exc, RemoteRepository.RPCError)
remote = isinstance(exc, LegacyRemoteRepository.RPCError)
if remote:
prefix = "Borg server: "
trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines())
@ -632,7 +632,7 @@ def main(): # pragma: no cover
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
tb = format_tb(e)
exit_code = e.exit_code
except RemoteRepository.RPCError as e:
except LegacyRemoteRepository.RPCError as e:
important = e.traceback
msg = e.exception_full if important else e.get_message()
msgid = e.exception_class

View file

@ -13,7 +13,6 @@ from ..helpers.argparsing import SUPPRESS, PositiveInt
from ..helpers.nanorst import rst_to_terminal
from ..manifest import Manifest, AI_HUMAN_SORT_KEYS
from ..patterns import PatternMatcher
from ..remote import RemoteRepository
from ..repository import Repository
from ..repoobj import RepoObj
from ..patterns import (
@ -30,16 +29,19 @@ logger = create_logger(__name__)
def get_repository(location, *, create, exclusive, lock_wait, lock, args, v1_legacy):
if location.proto in ("ssh", "socket"):
if location.proto == "ssh":
if v1_legacy:
from ..legacy.remote import LegacyRemoteRepository
RemoteRepoCls = LegacyRemoteRepository
repository = LegacyRemoteRepository(
location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args
)
else:
RemoteRepoCls = RemoteRepository
repository = RemoteRepoCls(
location, create=create, exclusive=exclusive, lock_wait=lock_wait, lock=lock, args=args
)
raise Error(
"ssh:// is no longer supported for current repositories; use rest:// instead "
"(it can tunnel over ssh). ssh:// remains available only for legacy v1 repositories "
"via --from-borg1."
)
elif (
location.proto in ("rest", "sftp", "file", "http", "https", "rclone", "s3", "b2") and not v1_legacy
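
Review note: the new control flow for ``ssh`` locations in this hunk can be summarized with a small stand-alone sketch. ``classify_location`` and the local ``Error`` class are hypothetical stand-ins so the example runs on its own. The real function constructs repository objects, and its remaining branches are only partly visible here, so they are not modeled.

```python
class Error(Exception):
    """Stand-in for borg.helpers.Error so the sketch is self-contained."""


def classify_location(proto: str, v1_legacy: bool) -> str:
    """Hypothetical helper mirroring only the ssh branch of get_repository()."""
    if proto == "ssh":
        if v1_legacy:
            # The real code builds a LegacyRemoteRepository at this point.
            return "legacy-remote"
        raise Error("ssh:// is no longer supported for current repositories; use rest:// instead")
    # Other protocols are handled by branches that this hunk shows only in part.
    return "other"


print(classify_location("ssh", v1_legacy=True))
```

The point of the change is that ``ssh`` without ``v1_legacy`` now fails early with a clear message instead of opening a remote repository.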
@ -584,16 +586,6 @@ def define_common_options(add_common_option):
action=Highlander,
help="Use this command to connect to the 'borg serve' process (default: 'ssh')",
)
add_common_option(
"--socket",
metavar="PATH",
dest="use_socket",
default=False,
const=True,
nargs="?",
action=Highlander,
help="Use UNIX DOMAIN (IPC) socket at PATH for client/server communication with socket: protocol.",
)
add_common_option(
"-r",
"--repo",

View file

@ -8,7 +8,6 @@ from ..helpers import bin_to_hex, Error
from ..helpers import ProgressIndicatorPercent
from ..helpers.argparsing import ArgumentParser
from ..manifest import Manifest
from ..remote import RemoteRepository
from ..repository import Repository
from ..logger import create_logger
@ -20,7 +19,7 @@ class ArchiveAnalyzer:
def __init__(self, args, repository, manifest):
self.args = args
self.repository = repository
assert isinstance(repository, (Repository, RemoteRepository))
assert isinstance(repository, Repository)
self.manifest = manifest
self.difference_by_path = defaultdict(int) # directory path -> count of chunks changed

View file

@ -11,7 +11,6 @@ from ..hashindex import ChunkIndex, ChunkIndexEntry
from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex
from ..helpers import ProgressIndicatorPercent
from ..manifest import Manifest
from ..remote import RemoteRepository
from ..repository import Repository, repo_lister
from ..logger import create_logger
@ -22,7 +21,7 @@ logger = create_logger()
class ArchiveGarbageCollector:
def __init__(self, repository, manifest, *, stats, iec):
self.repository = repository
assert isinstance(repository, (Repository, RemoteRepository))
assert isinstance(repository, Repository)
self.manifest = manifest
self.chunks = None # a ChunkIndex, here used for: id -> (is_used, stored_size)
self.total_files = None # overall number of source files written to all archives in this repo

View file

@ -7,7 +7,7 @@ from ..helpers import PathSpec
from ..helpers import umount
from ..helpers.argparsing import ArgumentParser
from ..manifest import Manifest
from ..remote import cache_if_remote
from ..repository import cache_if_remote
from ..logger import create_logger

View file

@ -1,5 +1,5 @@
from ..constants import * # NOQA
from ..remote import RepositoryServer
from ..legacy.remote import RepositoryServer
from ..logger import create_logger
from ..helpers.argparsing import ArgumentParser
@ -13,7 +13,6 @@ class ServeMixIn:
RepositoryServer(
restrict_to_paths=args.restrict_to_paths,
restrict_to_repositories=args.restrict_to_repositories,
use_socket=args.use_socket,
permissions=args.permissions,
).serve()
@ -24,15 +23,13 @@ class ServeMixIn:
"""
This command starts a repository server process.
`borg serve` currently supports:
`borg serve` is only used to serve legacy (borg 1.x / v1) repositories over SSH, so that
such repositories can still be accessed remotely (e.g. for `borg transfer --from-borg1`).
Current repositories are accessed via a rest:// repository instead (which can itself tunnel
over SSH), so they do not use `borg serve`.
- Being automatically started via SSH when the borg client uses an ssh://...
remote repository. In this mode, `borg serve` will run until that SSH connection
is terminated.
- Being started by some other means (not by the borg client) as a long-running socket
server to be used for borg clients using a socket://... repository (see the `--socket`
option if you do not want to use the default path for the socket and PID file).
It is automatically started via SSH when a borg client uses an ssh://... repository.
In this mode, `borg serve` will run until that SSH connection is terminated.
Please note that `borg serve` does not support providing a specific repository via the
`--repo` option or the `BORG_REPO` environment variable. It is always the borg client that

View file

@ -1,7 +1,6 @@
from .. import __version__
from ..constants import * # NOQA
from ..helpers.argparsing import ArgumentParser
from ..remote import RemoteRepository
from ..logger import create_logger
@ -14,8 +13,10 @@ class VersionMixIn:
from borg.version import parse_version, format_version
client_version = parse_version(__version__)
if args.location.proto in ("ssh", "socket"):
with RemoteRepository(args.location, lock=False, args=args) as repository:
if args.location.proto == "ssh" and getattr(args, "v1_legacy", False):
from ..legacy.remote import LegacyRemoteRepository
with LegacyRemoteRepository(args.location, lock=False, args=args) as repository:
server_version = repository.server_version
else:
server_version = client_version
@ -28,11 +29,11 @@ class VersionMixIn:
"""
This command displays the Borg client and server versions.
If a local repository is given, the client code directly accesses the repository,
so the client version is also shown as the server version.
For current repositories the client code directly accesses the repository (also for
rest:// repositories), so the client version is shown as the server version, too.
If a remote repository is given (e.g., ssh:), the remote Borg is queried, and
its version is displayed as the server version.
If a legacy (borg 1.x / v1) repository is given via ssh: together with --from-borg1,
the remote Borg is queried, and its version is displayed as the server version.
Examples::
@ -40,8 +41,8 @@ class VersionMixIn:
$ borg version /mnt/backup
1.4.0a / 1.4.0a
# remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release)
$ borg version ssh://borg@borgbackup:repo
# legacy remote repository (client uses 1.4.0 alpha, server uses 1.2.7 release)
$ borg version --from-borg1 ssh://borg@borgbackup:repo
1.4.0a / 1.2.7
Due to the version tuple format used in Borg client/server negotiation, only

View file

@ -32,7 +32,6 @@ from .item import ChunkListEntry
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
from .security import SecurityManager, assert_secure # noqa: F401
@ -651,7 +650,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
flags=ChunkIndex.F_USED, size=0, pack_id=pack_id, obj_offset=0, obj_size=obj_size
)
# Cache does not contain the manifest.
if not isinstance(repository, (Repository, RemoteRepository)):
if not isinstance(repository, Repository):
del chunks[Manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.001
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.

View file

@ -96,7 +96,7 @@ class ArchiverSetup:
self.patterns_file_path: str | None = None
def get_kind(self) -> str:
if self.repository_location.startswith("ssh://__testsuite__") or self.repository_location.startswith("rest://"):
if self.repository_location.startswith("rest://"):
return "remote"
elif self.EXE == "borg.exe":
return "binary"

View file

@ -73,7 +73,7 @@ IDS_PER_CHUNK = MAX_DATA_SIZE // 40
# we use it in all places where we need to detect or create all-zero buffers
zeros = bytes(MAX_DATA_SIZE)
# borg.remote read() buffer size
# borg serve (borg.legacy.remote) read() buffer size
BUFSIZE = 10 * 1024 * 1024
# To use a safe, limited unpacker, we need to set an upper limit to the archive count in the manifest.

View file

@ -62,7 +62,7 @@ from .crypto.low_level import blake2b_128
from .archiver._common import build_matcher, build_filter
from .archive import Archive, get_item_uid_gid
from .hashindex import FuseVersionsIndex
from .helpers import daemonize, daemonizing, signal_handler, format_file_size, bin_to_hex, Error
from .helpers import daemonizing, signal_handler, format_file_size, bin_to_hex, Error
from .helpers import HardLinkManager
from .helpers import msgpack
from .helpers.lrucache import LRUCache
@ -70,7 +70,6 @@ from .item import Item
from .platform import uid2user, gid2group
from .platformflags import is_darwin
from .repository import Repository
from .remote import RemoteRepository
def fuse_main():
@ -596,13 +595,10 @@ class FuseOperations(llfuse.Operations, FuseBackend):
"that do not reflect the archive content."
)
if not foreground:
if isinstance(self.repository_uncached, RemoteRepository):
daemonize()
else:
with daemonizing(show_rc=show_rc) as (old_id, new_id):
# local repo: the locking process' PID is changing, migrate it:
logger.debug("fuse: mount local repo, going to background: migrating lock.")
self.repository_uncached.migrate_lock(old_id, new_id)
with daemonizing(show_rc=show_rc) as (old_id, new_id):
# the locking process' PID is changing, migrate it:
logger.debug("fuse: mount repo, going to background: migrating lock.")
self.repository_uncached.migrate_lock(old_id, new_id)
# If the file system crashes, we do not want to umount because in that
# case the mountpoint suddenly appears to become empty. This can have

View file

@ -600,7 +600,7 @@ class Location:
rclone_re = re.compile(r"(?P<proto>rclone):(?P<path>(.*))", re.VERBOSE)
sl = "/" if is_win32 else ""
file_or_socket_re = re.compile(r"(?P<proto>(file|socket))://" + sl + abs_path_re, re.VERBOSE)
file_re = re.compile(r"(?P<proto>file)://" + sl + abs_path_re, re.VERBOSE)
local_re = re.compile(local_path_re, re.VERBOSE)
@ -666,7 +666,7 @@ class Location:
self.proto = m.group("proto")
self.path = m.group("path")
return True
m = self.file_or_socket_re.match(text)
m = self.file_re.match(text)
if m:
self.proto = m.group("proto")
self.path = os.path.normpath(m.group("path"))
@ -709,7 +709,7 @@ class Location:
return self._host.lstrip("[").rstrip("]")
def canonical_path(self):
if self.proto in ("file", "socket"):
if self.proto == "file":
return self.path
if self.proto == "rclone":
return f"{self.proto}:{self.path}"
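
Review note: with ``socket`` dropped from the pattern, only ``file://`` URLs match this branch of the parser. A minimal sketch of the effect is shown below. ``ABS_PATH_RE`` is a simplified assumption standing in for borg's ``abs_path_re``, which is not shown in the diff, and the sketch covers POSIX paths only (it ignores the ``sl`` prefix used on Windows).

```python
import posixpath
import re

# Assumption: simplified stand-in for borg's abs_path_re (not shown in the diff).
ABS_PATH_RE = r"(?P<path>/.+)"
FILE_RE = re.compile(r"(?P<proto>file)://" + ABS_PATH_RE)


def parse_file_url(text):
    """Return (proto, normalized path) for file:// URLs, or None if there is no match."""
    m = FILE_RE.match(text)
    if not m:
        return None
    return m.group("proto"), posixpath.normpath(m.group("path"))


print(parse_file_url("file:///mnt/backup/../repo"))
print(parse_file_url("socket:///run/borg.sock"))
```

In this sketch a ``socket://`` URL simply fails to match. How the real parser reports such a URL depends on code outside this hunk.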
@ -1342,11 +1342,10 @@ class BorgJsonEncoder(json.JSONEncoder):
from ..legacy.repository import LegacyRepository
from ..repository import Repository
from ..legacy.remote import LegacyRemoteRepository
from ..remote import RemoteRepository
from ..archive import Archive
from ..cache import AdHocWithFilesCache
if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, (Repository, RemoteRepository)):
if isinstance(o, (LegacyRepository, LegacyRemoteRepository)) or isinstance(o, Repository):
return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
if isinstance(o, Archive):
return o.info()

View file

@ -24,7 +24,7 @@ logger = create_logger()
from .archiver._common import build_matcher, build_filter
from .archive import Archive, get_item_uid_gid
from .hashindex import FuseVersionsIndex
from .helpers import daemonize, daemonizing, signal_handler, bin_to_hex, Error
from .helpers import daemonizing, signal_handler, bin_to_hex, Error
from .helpers import HardLinkManager
from .helpers import msgpack
from .helpers.lrucache import LRUCache
@ -32,7 +32,6 @@ from .item import Item
from .platform import uid2user, gid2group
from .platformflags import is_darwin
from .repository import Repository
from .remote import RemoteRepository
BLOCK_SIZE = 512 # Standard filesystem block size for st_blocks and statfs
DEBUG_LOG: str | None = None # os.path.join(os.getcwd(), "fuse_debug.log")
@ -529,12 +528,9 @@ class borgfs(hlfuse.Operations, FuseBackend):
# hlfuse.FUSE will block if foreground=True, otherwise it returns immediately
if not foreground:
# Background mode: daemonize first, then start FUSE (blocking)
if isinstance(self.repository, RemoteRepository):
daemonize()
else:
with daemonizing(show_rc=show_rc) as (old_id, new_id):
logger.debug("fuse: mount local repo, going to background: migrating lock.")
self.repository.migrate_lock(old_id, new_id)
with daemonizing(show_rc=show_rc) as (old_id, new_id):
logger.debug("fuse: mount repo, going to background: migrating lock.")
self.repository.migrate_lock(old_id, new_id)
# Run the FUSE main loop in foreground (we might be daemonized already or not)
with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler):

View file

@ -3,34 +3,32 @@ import functools
import inspect
import logging
import os
import queue
import select
import shlex
import shutil
import socket
import struct
import sys
import tempfile
import textwrap
import time
import traceback
from subprocess import Popen, PIPE
from xxhash import xxh64
import borg.logger
from .. import __version__
from ..compress import Compressor
from ..constants import * # NOQA
from ..helpers import Error, ErrorWithTraceback, IntegrityError
from ..helpers import bin_to_hex
from ..helpers import get_limited_unpacker
from ..helpers import replace_placeholders
from ..helpers import sysinfo
from ..helpers import format_file_size
from ..helpers import safe_unlink
from ..helpers import prepare_subprocess_env, ignore_sigint
from ..helpers import get_socket_filename
from ..fslocking import LockTimeout, NotLocked, NotMyLock, LockFailed
from ..logger import create_logger
from ..logger import create_logger, borg_serve_log_queue
from ..helpers import msgpack
from .repository import LegacyRepository
from ..repository import Repository, StoreObjectNotFound
from ..version import parse_version, format_version
from ..helpers.datastruct import EfficientCollectionQueue
from ..platform import is_win32
@ -733,210 +731,242 @@ class LegacyRemoteRepository:
"""actual remoting is done via self.call in the @api decorator"""
class RepositoryNoCache:
"""A not caching Repository wrapper, passes through to repository.
Just to have same API (including the context manager) as RepositoryCache.
*transform* is a callable taking two arguments, key and raw repository data.
The return value is returned from get()/get_many(). By default, the raw
repository data is returned.
"""
def __init__(self, repository, transform=None):
self.repository = repository
self.transform = transform or (lambda key, data: data)
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def get(self, key, read_data=True, raise_missing=True):
return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False))
def get_many(self, keys, read_data=True, cache=True, raise_missing=True):
for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)):
yield self.transform(key, data)
def log_instrumentation(self):
pass
# borg serve: borg only serves legacy (borg 1.x / v1) repositories over ssh:// now (current
# repositories use rest://). The legacy client above (LegacyRemoteRepository) spawns "borg serve"
# on the remote host; this server keeps the legacy RPC method allowlist and opens LegacyRepository.
class RepositoryCache(RepositoryNoCache):
"""
A caching Repository wrapper.
class RepositoryServer: # pragma: no cover
_legacy_rpc_methods = ( # LegacyRepository
"__len__",
"check",
"commit",
"delete",
"destroy",
"get",
"list",
"negotiate",
"open",
"close",
"info",
"put",
"rollback",
"save_key",
"load_key",
"break_lock",
"inject_exception",
"get_manifest", # borg2 LegacyRepository has this
)
Caches Repository GET operations locally.
def __init__(self, restrict_to_paths, restrict_to_repositories, permissions=None):
self.repository = None
self.RepoCls = None
self.rpc_methods = ("open", "close", "negotiate")
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
self.permissions = permissions
self.client_version = None # we update this after client sends version information
*pack* and *unpack* complement *transform* of the base class.
*pack* receives the output of *transform* and should return bytes,
which are stored in the cache. *unpack* receives these bytes and
should return the initial data (as returned by *transform*).
"""
def filter_args(self, f, kwargs):
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
known = set(inspect.signature(f).parameters)
return {name: kwargs[name] for name in kwargs if name in known}
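
Review note: ``filter_args`` relies on ``inspect.signature`` to drop keyword arguments that the target method does not accept. A stand-alone sketch of the same technique follows. ``open_repo`` and the ``future_option`` key are hypothetical and exist only to show the filtering.

```python
import inspect


def filter_args(f, kwargs):
    """Keep only the keyword arguments that f declares as parameters."""
    known = set(inspect.signature(f).parameters)
    return {name: kwargs[name] for name in kwargs if name in known}


def open_repo(path, create=False, lock=True):
    """Hypothetical RPC target, used only for this illustration."""
    return path, create, lock


# A client sends one parameter that this server version does not know.
call = {"path": "/srv/repo", "lock": False, "future_option": 1}
print(open_repo(**filter_args(open_repo, call)))
```

This lets a newer client talk to an older server, at the cost of silently ignoring options the server does not understand.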
def __init__(self, repository, pack=None, unpack=None, transform=None):
super().__init__(repository, transform)
self.pack = pack or (lambda data: data)
self.unpack = unpack or (lambda data: data)
self.cache = set()
self.basedir = tempfile.mkdtemp(prefix="borg-cache-")
self.query_size_limit()
self.size = 0
# Instrumentation
self.hits = 0
self.misses = 0
self.slow_misses = 0
self.slow_lat = 0.0
self.evictions = 0
self.enospc = 0
def query_size_limit(self):
available_space = shutil.disk_usage(self.basedir).free
self.size_limit = int(min(available_space * 0.25, 2**31))
def prefixed_key(self, key, complete):
# just prefix another byte telling whether this key refers to a complete chunk
# or a without-data-metadata-only chunk (see also read_data param).
prefix = b"\x01" if complete else b"\x00"
return prefix + key
def key_filename(self, key):
return os.path.join(self.basedir, bin_to_hex(key))
def backoff(self):
self.query_size_limit()
target_size = int(0.9 * self.size_limit)
while self.size > target_size and self.cache:
key = self.cache.pop()
file = self.key_filename(key)
self.size -= os.stat(file).st_size
os.unlink(file)
self.evictions += 1
def add_entry(self, key, data, cache, complete):
transformed = self.transform(key, data)
if not cache:
return transformed
packed = self.pack(transformed)
pkey = self.prefixed_key(key, complete=complete)
file = self.key_filename(pkey)
try:
with open(file, "wb") as fd:
fd.write(packed)
except OSError as os_error:
def send_queued_log(self):
while True:
try:
safe_unlink(file)
except FileNotFoundError:
pass # open() could have failed as well
if os_error.errno == errno.ENOSPC:
self.enospc += 1
self.backoff()
# lr_dict contents see BorgQueueHandler
lr_dict = borg_serve_log_queue.get_nowait()
except queue.Empty:
break
else:
raise
else:
self.size += len(packed)
self.cache.add(pkey)
if self.size > self.size_limit:
self.backoff()
return transformed
msg = msgpack.packb({LOG: lr_dict})
os.write(self.stdout_fd, msg)
def log_instrumentation(self):
logger.debug(
"RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), "
"%d evictions, %d ENOSPC hit",
len(self.cache),
format_file_size(self.size),
format_file_size(self.size_limit),
self.hits,
self.misses,
self.slow_misses,
self.slow_lat,
self.evictions,
self.enospc,
)
def serve(self):
def inner_serve():
os.set_blocking(self.stdin_fd, False)
assert not os.get_blocking(self.stdin_fd)
os.set_blocking(self.stdout_fd, True)
assert os.get_blocking(self.stdout_fd)
unpacker = get_limited_unpacker("server")
shutdown_serve = False
while True:
# before processing any new RPCs, send out all pending log output
self.send_queued_log()
if shutdown_serve:
# shutdown wanted! get out of here after sending all log output.
assert self.repository is None
return
# process new RPCs
r, w, es = select.select([self.stdin_fd], [], [], 10)
if r:
data = os.read(self.stdin_fd, BUFSIZE)
if not data:
shutdown_serve = True
continue
unpacker.feed(data)
for unpacked in unpacker:
if isinstance(unpacked, dict):
msgid = unpacked[MSGID]
method = unpacked[MSG]
args = unpacked[ARGS]
else:
if self.repository is not None:
self.repository.close()
raise UnexpectedRPCDataFormatFromClient(__version__)
try:
# logger.debug(f"{type(self)} method: {type(self.repository)}.{method}")
if method not in self.rpc_methods:
raise InvalidRPCMethod(method)
try:
f = getattr(self, method)
except AttributeError:
f = getattr(self.repository, method)
args = self.filter_args(f, args)
res = f(**args)
except BaseException as e:
# These exceptions are reconstructed on the client end in
# LegacyRemoteRepository.call_many(), and will be handled just like locally raised
# exceptions. Suppress the remote traceback for these, except ErrorWithTraceback,
# which should always display a traceback.
reconstructed_exceptions = (
Repository.InvalidRepository,
Repository.InvalidRepositoryConfig,
Repository.DoesNotExist,
Repository.AlreadyExists,
Repository.PathAlreadyExists,
PathNotAllowed,
Repository.InsufficientFreeSpaceError,
)
# logger.exception(e)
ex_short = traceback.format_exception_only(e.__class__, e)
ex_full = traceback.format_exception(*sys.exc_info())
ex_trace = True
if isinstance(e, Error):
ex_short = [e.get_message()]
ex_trace = e.traceback
if not isinstance(e, reconstructed_exceptions):
logging.debug("\n".join(ex_full))
sys_info = sysinfo()
# StoreObjectNotFound and Repository.ObjectNotFound both have
# __name__ == "ObjectNotFound", so we need to distinguish them
# explicitly for correct client-side reconstruction.
exc_cls_name = (
"StoreObjectNotFound" if isinstance(e, StoreObjectNotFound) else e.__class__.__name__
)
try:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": exc_cls_name,
"exception_args": e.args,
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sys_info,
}
)
except TypeError:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": exc_cls_name,
"exception_args": [
x if isinstance(x, (str, bytes, int)) else None for x in e.args
],
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sys_info,
}
)
os.write(self.stdout_fd, msg)
else:
os.write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
if es:
shutdown_serve = True
continue
# server for one ssh:// connection
self.stdin_fd = sys.stdin.fileno()
self.stdout_fd = sys.stdout.fileno()
inner_serve()
def negotiate(self, client_data):
if isinstance(client_data, dict):
self.client_version = client_data["client_version"]
else:
self.client_version = BORG_VERSION # seems to be newer than current version (no known old format)
# not a known old format, send newest negotiate this version knows
return {"server_version": BORG_VERSION}
def _resolve_path(self, path):
if isinstance(path, bytes):
path = os.fsdecode(path)
path = os.path.realpath(path)
return path
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, v1_legacy=False):
# borg only serves legacy (v1) repositories now; current repositories are accessed via rest://.
self.RepoCls = LegacyRepository
self.rpc_methods = self._legacy_rpc_methods
logging.debug("Resolving repository path %r", path)
path = self._resolve_path(path)
logging.debug("Resolved repository path to %r", path)
path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep)
if self.restrict_to_paths:
# if --restrict-to-path P is given, we make sure that we only operate in/below path P.
# for the prefix check, it is important that the compared paths both have trailing slashes,
# so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
for restrict_to_path in self.restrict_to_paths:
restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash
if path_with_sep.startswith(restrict_to_path_with_sep):
break
else:
raise PathNotAllowed(path)
if self.restrict_to_repositories:
for restrict_to_repository in self.restrict_to_repositories:
restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "")
if restrict_to_repository_with_sep == path_with_sep:
break
else:
raise PathNotAllowed(path)
kwargs = dict(lock_wait=lock_wait, lock=lock, exclusive=exclusive, send_log_cb=self.send_queued_log)
self.repository = self.RepoCls(path, create, **kwargs)
self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id
def close(self):
self.log_instrumentation()
self.cache.clear()
shutil.rmtree(self.basedir)
if self.repository is not None:
self.repository.__exit__(None, None, None)
self.repository = None
borg.logger.flush_logging()
self.send_queued_log()
def get_many(self, keys, read_data=True, cache=True, raise_missing=True):
# It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache]
repository_iterator = zip(
unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing)
)
for key in keys:
pkey = self.prefixed_key(key, complete=read_data)
if pkey in self.cache:
file = self.key_filename(pkey)
with open(file, "rb") as fd:
self.hits += 1
yield self.unpack(fd.read())
else:
for key_, data in repository_iterator:
if key_ == key:
transformed = self.add_entry(key, data, cache, complete=read_data)
self.misses += 1
yield transformed
break
else:
# slow path: eviction during this get_many removed this key from the cache
t0 = time.perf_counter()
data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing)
self.slow_lat += time.perf_counter() - t0
transformed = self.add_entry(key, data, cache, complete=read_data)
self.slow_misses += 1
yield transformed
# Consume any pending requests
for _ in repository_iterator:
pass
def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
"""
Return a Repository(No)Cache for *repository*.
If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple
(csize, plaintext) instead of the actual data in the repository. The cache will
store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
and more importantly MAC and ID checking cached objects).
Internally, objects are compressed with LZ4.
"""
if decrypted_cache and (pack or unpack or transform):
raise ValueError("decrypted_cache and pack/unpack/transform are incompatible")
elif decrypted_cache:
repo_objs = decrypted_cache
# 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel
cache_struct = struct.Struct("=I8sBB")
compressor = Compressor("lz4")
def pack(data):
csize, decrypted = data
meta, compressed = compressor.compress({}, decrypted)
return cache_struct.pack(csize, xxh64(compressed).digest(), meta["ctype"], meta["clevel"]) + compressed
def unpack(data):
data = memoryview(data)
csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size])
compressed = data[cache_struct.size :]
if checksum != xxh64(compressed).digest():
raise IntegrityError("detected corrupted data in metadata cache")
meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed))
_, decrypted = compressor.decompress(meta, compressed)
return csize, decrypted
def transform(id_, data):
meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
csize = meta.get("csize", len(data))
return csize, decrypted
if isinstance(repository, LegacyRemoteRepository) or force_cache:
return RepositoryCache(repository, pack, unpack, transform)
else:
return RepositoryNoCache(repository, transform)
def inject_exception(self, kind):
s1 = "test string"
s2 = "test string2"
if kind == "DoesNotExist":
raise self.RepoCls.DoesNotExist(s1)
elif kind == "AlreadyExists":
raise self.RepoCls.AlreadyExists(s1)
elif kind == "CheckNeeded":
raise self.RepoCls.CheckNeeded(s1)
elif kind == "IntegrityError":
raise IntegrityError(s1)
elif kind == "PathNotAllowed":
raise PathNotAllowed("foo")
elif kind == "ObjectNotFound":
raise self.RepoCls.ObjectNotFound(s1, s2)
elif kind == "StoreObjectNotFound":
raise StoreObjectNotFound(s1)
elif kind == "InvalidRPCMethod":
raise InvalidRPCMethod(s1)
elif kind == "divide":
0 // 0
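
The `--restrict-to-path` check in `open()` above compares paths only after giving both of them a trailing separator. A minimal standalone sketch of that idea follows; `is_path_allowed` is a hypothetical helper name used for illustration, not a function in borg.

```python
import os


def is_path_allowed(path, restrict_to_paths):
    """Return True if *path* equals or lies below one of *restrict_to_paths*.

    Hypothetical helper, not part of borg: it only mirrors the prefix check.
    """
    # os.path.join(p, "") appends a trailing separator when one is missing.
    path_with_sep = os.path.join(os.path.realpath(path), "")
    for allowed in restrict_to_paths:
        allowed_with_sep = os.path.join(os.path.realpath(allowed), "")
        # With trailing separators on both sides, "/srv/foobar/" does not
        # start with "/srv/foo/", so sibling directories are rejected.
        if path_with_sep.startswith(allowed_with_sep):
            return True
    return False
```

Without the trailing separators, a plain `startswith` comparison would accept `/srv/foobar` for a restriction to `/srv/foo`, which is the case the comment in `open()` warns about.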

File diff suppressed because it is too large


@ -599,3 +599,58 @@ class Repository:
    def store_move(self, name, new_name=None, *, delete=False, undelete=False, deleted=False):
        self._lock_refresh()
        return self.store.move(name, new_name, delete=delete, undelete=undelete, deleted=deleted)


class RepositoryNoCache:
    """A Repository wrapper that passes through to the repository.

    It applies an optional *transform* and provides a uniform context-manager API.

    *transform* is a callable taking two arguments, key and raw repository data.
    The return value is returned from get()/get_many(). By default, the raw
    repository data is returned.
    """

    def __init__(self, repository, transform=None):
        self.repository = repository
        self.transform = transform or (lambda key, data: data)

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get(self, key, read_data=True, raise_missing=True):
        return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False))

    def get_many(self, keys, read_data=True, raise_missing=True, cache=True):
        for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)):
            yield self.transform(key, data)

    def log_instrumentation(self):
        pass


def cache_if_remote(repository, *, decrypted_cache=False, transform=None):
    """
    Return a RepositoryNoCache wrapping *repository*.

    If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple
    (csize, plaintext) instead of the actual data in the repository (the objects are
    parsed/decrypted via the *transform* derived from it).
    """
    if decrypted_cache and transform:
        raise ValueError("decrypted_cache and transform are incompatible")
    elif decrypted_cache:
        repo_objs = decrypted_cache

        def transform(id_, data):
            meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
            csize = meta.get("csize", len(data))
            return csize, decrypted

    return RepositoryNoCache(repository, transform)
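
To illustrate the pass-through wrapper pattern used by `RepositoryNoCache` above, here is a reduced sketch that runs without borg. `FakeRepository` and `NoCacheWrapper` are hypothetical stand-ins introduced for this example; only the `transform(key, data)` calling convention is taken from the code above.

```python
class FakeRepository:
    """Hypothetical in-memory stand-in for a repository (not a borg class)."""

    def __init__(self, objects):
        self.objects = objects

    def get_many(self, keys):
        for key in keys:
            yield self.objects[key]


class NoCacheWrapper:
    """Reduced illustration of a pass-through wrapper with an optional transform."""

    def __init__(self, repository, transform=None):
        self.repository = repository
        # Default transform returns the raw data unchanged.
        self.transform = transform or (lambda key, data: data)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass  # nothing to clean up, there is no cache

    def get(self, key):
        return next(self.get_many([key]))

    def get_many(self, keys):
        for key, data in zip(keys, self.repository.get_many(keys)):
            yield self.transform(key, data)


repo = FakeRepository({b"k1": b"1234", b"k2": b"5678"})

# No transform: raw data is passed through.
with NoCacheWrapper(repo) as plain:
    raw = list(plain.get_many([b"k1", b"k2"]))

# With a transform: callers receive (size, text) tuples instead of raw bytes.
with NoCacheWrapper(repo, transform=lambda key, data: (len(data), data.decode())) as wrapped:
    pairs = list(wrapped.get_many([b"k1", b"k2"]))
```

Keeping the context-manager methods even though nothing is cached lets callers use the same `with` block regardless of which wrapper they receive.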


@ -24,7 +24,6 @@ from ...helpers import init_ec_warnings
from ...logger import flush_logging
from ...manifest import Manifest
from ...platform import get_flags
from ...remote import RemoteRepository
from ...repository import Repository
from .. import has_lchflags, has_mknod, is_utime_fully_supported, have_fuse_mtime_ns, st_mtime_ns_round, filter_xattrs
from .. import changedir, ENOATTR # NOQA
@ -178,10 +177,7 @@ def open_archive(repo_path, name):
def open_repository(archiver):
if archiver.get_kind() == "remote":
location = Location(archiver.repository_location)
if location.proto == "rest":
return Repository(location, exclusive=True)
return RemoteRepository(location)
return Repository(Location(archiver.repository_location), exclusive=True)
else:
return Repository(archiver.repository_path, exclusive=True)


@ -9,7 +9,6 @@ from ...archive import ChunkBuffer
from ...constants import * # NOQA
from ...helpers import bin_to_hex, msgpack
from ...manifest import Manifest
from ...remote import RemoteRepository
from ...repository import Repository
from ..repository_test import fchunk
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
@ -208,7 +207,7 @@ def test_missing_manifest(archivers, request):
check_cmd_setup(archiver)
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
if isinstance(repository, (Repository, RemoteRepository)):
if isinstance(repository, Repository):
repository.store_delete("config/manifest")
else:
repository.delete(Manifest.MANIFEST_ID)


@ -1,6 +1,5 @@
import os
import shutil
from unittest.mock import patch
import pytest
@ -9,7 +8,6 @@ from ...constants import * # NOQA
from ...helpers import Location, get_security_dir, bin_to_hex
from ...helpers import EXIT_ERROR
from ...manifest import Manifest, MandatoryFeatureUnsupported
from ...remote import RemoteRepository, PathNotAllowed
from ...repository import Repository
from .. import llfuse
from .. import changedir
@ -277,49 +275,6 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
# Begin Remote Tests
def test_remote_repo_restrict_to_path(remote_archiver):
if remote_archiver.repository_location.startswith("rest://"):
pytest.skip("Not applicable for rest:// protocol")
original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
# restricted to repo directory itself:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restricted to repo directory itself, fail for other directories with same prefix:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
with pytest.raises(PathNotAllowed):
remote_archiver.repository_location = original_location + "_0"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restricted to a completely different path:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]):
with pytest.raises(PathNotAllowed):
remote_archiver.repository_location = original_location + "_1"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
path_prefix = os.path.dirname(repo_path)
# restrict to repo directory's parent directory:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]):
remote_archiver.repository_location = original_location + "_2"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
# restrict to repo directory's parent directory and another directory:
with patch.object(
RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix]
):
remote_archiver.repository_location = original_location + "_3"
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
def test_remote_repo_restrict_to_repository(remote_archiver):
if remote_archiver.repository_location.startswith("rest://"):
pytest.skip("Not applicable for rest:// protocol")
repo_path = remote_archiver.repository_path
# restricted to repo directory itself:
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
parent_path = os.path.join(repo_path, "..")
with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]):
with pytest.raises(PathNotAllowed):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
def test_remote_repo_strip_components_doesnt_leak(remote_archiver):
cmd(remote_archiver, "repo-create", RK_ENCRYPTION)
create_regular_file(remote_archiver.input_path, "dir/file", contents=b"test file contents 1")


@ -1,55 +0,0 @@
import os
import subprocess
import tempfile
import time
import pytest
import platformdirs
from . import exec_cmd
from ...platformflags import is_win32
from ...helpers import get_runtime_dir
def have_a_short_runtime_dir(mp):
# Under pytest, we use BORG_BASE_DIR to keep stuff away from the user's normal Borg directories.
# This leads to a very long get_runtime_dir() path — too long for a socket file!
# Thus, we override that again via BORG_RUNTIME_DIR to get a shorter path.
mp.setenv("BORG_RUNTIME_DIR", os.path.join(platformdirs.user_runtime_dir(), "pytest"))
@pytest.fixture
def serve_socket(monkeypatch):
have_a_short_runtime_dir(monkeypatch)
# Use a random unique socket filename, so tests can run in parallel.
socket_file = tempfile.mktemp(suffix=".sock", prefix="borg-", dir=get_runtime_dir())
with subprocess.Popen(["borg", "serve", f"--socket={socket_file}"]) as p:
while not os.path.exists(socket_file):
time.sleep(0.01) # wait until the socket server has started
yield socket_file
p.terminate()
@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_with_socket(serve_socket, tmpdir, monkeypatch):
have_a_short_runtime_dir(monkeypatch)
repo_path = str(tmpdir.join("repo"))
ret, output = exec_cmd(
f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-create", "--encryption=none"
)
assert ret == 0
ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-info")
assert ret == 0
assert "Repository ID: " in output
monkeypatch.setenv("BORG_DELETE_I_KNOW_WHAT_I_AM_DOING", "YES")
ret, output = exec_cmd(f"--socket={serve_socket}", f"--repo=socket://{repo_path}", "repo-delete")
assert ret == 0
@pytest.mark.skipif(is_win32, reason="hangs on win32")
def test_socket_permissions(serve_socket):
st = os.stat(serve_socket)
assert st.st_mode & 0o0777 == 0o0770 # user and group are permitted to use the socket


@ -246,12 +246,10 @@ class TestLocationWithoutEnv:
def test_socket(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
# socket:// is no longer supported and must be rejected as an invalid location.
url = "socket:///c:/repo/path" if is_win32 else "socket:///repo/path"
path = "c:/repo/path" if is_win32 else "/repo/path"
assert (
repr(Location(url))
== f"Location(proto='socket', user=None, pass=None, host=None, port=None, path='{path}')"
)
with pytest.raises(ValueError):
Location(url)
def test_file(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
@ -334,7 +332,6 @@ class TestLocationWithoutEnv:
]
locations.insert(1, "c:/absolute/path" if is_win32 else "/absolute/path")
locations.insert(2, "file:///c:/absolute/path" if is_win32 else "file:///absolute/path")
locations.insert(3, "socket:///c:/absolute/path" if is_win32 else "socket:///absolute/path")
for location in locations:
assert (
Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path()


@ -1,243 +0,0 @@
import errno
import os
import io
import time
from unittest.mock import patch
import pytest
from ..constants import ROBJ_FILE_STREAM
from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote
from ..repository import Repository
from ..crypto.key import PlaintextKey
from ..helpers import IntegrityError
from ..repoobj import RepoObj
from .hashindex_test import H
from .repository_test import fchunk, pdchunk
from .crypto.key_test import TestKey
class TestSleepingBandwidthLimiter:
def expect_write(self, fd, data):
self.expected_fd = fd
self.expected_data = data
def check_write(self, fd, data):
assert fd == self.expected_fd
assert data == self.expected_data
return len(data)
def test_write_unlimited(self, monkeypatch):
monkeypatch.setattr(os, "write", self.check_write)
it = SleepingBandwidthLimiter(0)
self.expect_write(5, b"test")
it.write(5, b"test")
def test_write(self, monkeypatch):
monkeypatch.setattr(os, "write", self.check_write)
monkeypatch.setattr(time, "monotonic", lambda: now)
monkeypatch.setattr(time, "sleep", lambda x: None)
now = 100
it = SleepingBandwidthLimiter(100) # Bandwidth quota.
# All fits
self.expect_write(5, b"test")
it.write(5, b"test")
# Only partial write
self.expect_write(5, b"123456")
it.write(5, b"1234567890")
# Sleeps
self.expect_write(5, b"123456")
it.write(5, b"123456")
# Long time interval between writes
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
# Long time interval between writes, filling up the quota
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
# Long time interval between writes, filling up the quota to clip to the maximum
now += 10
self.expect_write(5, b"1")
it.write(5, b"1")
class TestRepositoryCache:
@pytest.fixture
def repository(self, tmpdir):
self.repository_location = os.path.join(str(tmpdir), "repository")
with Repository(self.repository_location, exclusive=True, create=True) as repository:
repository.put(H(1), fchunk(b"1234"))
repository.put(H(2), fchunk(b"5678"))
repository.put(H(3), fchunk(bytes(100)))
yield repository
@pytest.fixture
def cache(self, repository):
return RepositoryCache(repository)
def test_simple(self, cache: RepositoryCache):
# Single get()s are not cached, since they are used for unique objects like archives.
assert pdchunk(cache.get(H(1))) == b"1234"
assert cache.misses == 1
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
assert cache.misses == 2
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
assert cache.misses == 2
assert cache.hits == 1
assert pdchunk(cache.get(H(1))) == b"1234"
assert cache.misses == 2
assert cache.hits == 2
def test_meta(self, cache: RepositoryCache):
# Same as test_simple, but not reading the chunk data (metadata only).
# Single get()s are not cached, since they are used for unique objects like archives.
assert pdchunk(cache.get(H(1), read_data=False)) == b""
assert cache.misses == 1
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
assert cache.misses == 2
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
assert cache.misses == 2
assert cache.hits == 1
assert pdchunk(cache.get(H(1), read_data=False)) == b""
assert cache.misses == 2
assert cache.hits == 2
def test_mixed(self, cache: RepositoryCache):
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
assert cache.misses == 1
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
assert cache.misses == 2
assert cache.hits == 0
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
assert cache.misses == 2
assert cache.hits == 1
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
assert cache.misses == 2
assert cache.hits == 2
def test_backoff(self, cache: RepositoryCache):
def query_size_limit():
cache.size_limit = 0
assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"]
assert cache.misses == 2
assert cache.evictions == 0
iterator = cache.get_many([H(1), H(3), H(2)])
assert pdchunk(next(iterator)) == b"1234"
# Force cache to back off
qsl = cache.query_size_limit
cache.query_size_limit = query_size_limit # type: ignore[assignment]
cache.backoff()
cache.query_size_limit = qsl # type: ignore[assignment]
# Evicted H(1) and H(2)
assert cache.evictions == 2
assert H(1) not in cache.cache
assert H(2) not in cache.cache
assert pdchunk(next(iterator)) == bytes(100)
assert cache.slow_misses == 0
# Since H(2) was in the cache when we called get_many(), but has
# been evicted during iterating the generator, it will be a slow miss.
assert pdchunk(next(iterator)) == b"5678"
assert cache.slow_misses == 1
def test_enospc(self, cache: RepositoryCache):
class enospc_open:
def __init__(self, *args):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def write(self, data):
raise OSError(errno.ENOSPC, "foo")
def truncate(self, n=None):
pass
iterator = cache.get_many([H(1), H(2), H(3)])
assert pdchunk(next(iterator)) == b"1234"
with patch("builtins.open", enospc_open):
assert pdchunk(next(iterator)) == b"5678"
assert cache.enospc == 1
# We didn't patch query_size_limit, which would set size_limit to a low
# value, so nothing was actually evicted.
assert cache.evictions == 0
assert pdchunk(next(iterator)) == bytes(100)
@pytest.fixture
def key(self, repository, monkeypatch):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
key = PlaintextKey.create(repository, TestKey.MockArgs())
return key
@pytest.fixture
def repo_objs(self, key):
return RepoObj(key)
def _put_encrypted_object(self, repo_objs, repository, data):
id_ = repo_objs.id_hash(data)
repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM))
return id_
@pytest.fixture
def H1(self, repo_objs, repository):
return self._put_encrypted_object(repo_objs, repository, b"1234")
@pytest.fixture
def H2(self, repo_objs, repository):
return self._put_encrypted_object(repo_objs, repository, b"5678")
@pytest.fixture
def H3(self, repo_objs, repository):
return self._put_encrypted_object(repo_objs, repository, bytes(100))
@pytest.fixture
def decrypted_cache(self, repo_objs, repository):
return cache_if_remote(repository, decrypted_cache=repo_objs, force_cache=True)
def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3):
list(decrypted_cache.get_many([H1, H2, H3]))
iterator = decrypted_cache.get_many([H1, H2, H3])
assert next(iterator) == (4, b"1234")
pkey = decrypted_cache.prefixed_key(H2, complete=True)
with open(decrypted_cache.key_filename(pkey), "a+b") as fd:
fd.seek(-1, io.SEEK_END)
corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little")
fd.seek(-1, io.SEEK_END)
fd.write(corrupted)
fd.truncate()
with pytest.raises(IntegrityError):
assert next(iterator) == (4, b"5678")
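
The removed `test_cache_corruption` above flipped a bit in a cached file and expected an `IntegrityError` on the next read. The sketch below shows the same detect-on-read idea in a self-contained form. It is a simplification under stated assumptions: `hashlib.blake2b` with an 8-byte digest and `zlib` stand in for xxh64 and LZ4, the ctype/clevel fields are dropped, and `ValueError` is raised in place of borg's `IntegrityError`.

```python
import hashlib
import struct
import zlib

# 32-bit csize followed by an 8-byte checksum of the compressed payload.
# (Assumption: a reduced layout, the removed code used "=I8sBB".)
RECORD = struct.Struct("=I8s")


def checksum(payload):
    # Stand-in for xxh64: any 8-byte digest demonstrates the check.
    return hashlib.blake2b(payload, digest_size=8).digest()


def pack(csize, plaintext):
    compressed = zlib.compress(plaintext)
    return RECORD.pack(csize, checksum(compressed)) + compressed


def unpack(record):
    view = memoryview(record)
    csize, stored = RECORD.unpack(view[: RECORD.size])
    compressed = view[RECORD.size :]
    # Verify before decompressing, so corrupted bytes are never interpreted.
    if stored != checksum(compressed):
        raise ValueError("detected corrupted data in cache record")
    return csize, zlib.decompress(compressed)


record = pack(4, b"1234")
# Flip one bit in the last byte, as the removed test did on the cache file.
corrupted = record[:-1] + bytes([record[-1] ^ 2])
```

Calling `unpack(record)` returns the original pair, while `unpack(corrupted)` raises because the stored checksum no longer matches the payload.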


@ -1,15 +1,13 @@
import logging
import os
import sys
import pytest
from ..helpers import Location
from ..constants import ROBJ_FILE_STREAM
from ..helpers import IntegrityError
from ..platformflags import is_win32
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed
from ..repository import Repository, StoreObjectNotFound, MAX_DATA_SIZE
from ..repository import Repository, MAX_DATA_SIZE, cache_if_remote
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from ..crypto.key import PlaintextKey
from .hashindex_test import H
from .crypto.key_test import TestKey
@pytest.fixture()
@ -18,22 +16,14 @@ def repository(tmp_path):
yield Repository(repository_location, exclusive=True, create=True)
@pytest.fixture()
def remote_repository(tmp_path):
if is_win32:
pytest.skip("Remote repository does not yet work on Windows.")
repository_location = Location("ssh://__testsuite__/" + os.fspath(tmp_path / "repository"))
yield RemoteRepository(repository_location, exclusive=True, create=True)
def pytest_generate_tests(metafunc):
# Generate tests that run on both local and remote repositories.
# Generate tests that run on repositories.
if "repo_fixtures" in metafunc.fixturenames:
metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"])
metafunc.parametrize("repo_fixtures", ["repository"])
def get_repository_from_fixture(repo_fixtures, request):
# Return the repository object from the fixture for tests that run on both local and remote repositories.
# Return the repository object from the fixture.
return request.getfixturevalue(repo_fixtures)
@ -43,14 +33,7 @@ def reopen(repository, exclusive: bool | None = True, create=False):
raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.")
return Repository(repository._location, exclusive=exclusive, create=create)
if isinstance(repository, RemoteRepository):
if repository.p is not None or repository.sock is not None:
raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.")
return RemoteRepository(repository.location, exclusive=exclusive, create=create)
raise TypeError(
f"Invalid argument type. Expected 'Repository' or 'RemoteRepository', received '{type(repository).__name__}'."
)
raise TypeError(f"Invalid argument type. Expected 'Repository', received '{type(repository).__name__}'.")
def fchunk(data, meta=b"", chunk_id=b"\x00" * 32):
@ -151,123 +134,51 @@ def check(repository, repo_path, repair=False, status=True):
assert tmp_files == [], "Found tmp files"
def _get_mock_args():
class MockArgs:
remote_path = "borg"
umask = 0o077
debug_topics = []
rsh = None
class TestCacheIfRemote:
@pytest.fixture
def cache_repository(self, tmpdir):
repository_location = os.path.join(str(tmpdir), "repository")
with Repository(repository_location, exclusive=True, create=True) as repository:
repository.put(H(1), fchunk(b"1234"))
repository.put(H(2), fchunk(b"5678"))
repository.put(H(3), fchunk(bytes(100)))
yield repository
def __contains__(self, item):
# to behave like argparse.Namespace
return hasattr(self, item)
def test_passthrough(self, cache_repository):
# Without decrypted_cache, raw repository data is passed through unchanged.
with cache_if_remote(cache_repository) as cached:
assert pdchunk(cached.get(H(1))) == b"1234"
assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"]
return MockArgs()
@pytest.fixture
def key(self, cache_repository, monkeypatch):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
return PlaintextKey.create(cache_repository, TestKey.MockArgs())
@pytest.fixture
def repo_objs(self, key):
return RepoObj(key)
def test_remote_invalid_rpc(remote_repository):
with remote_repository:
with pytest.raises(InvalidRPCMethod):
remote_repository.call("__init__", {})
def _put_encrypted_object(self, repo_objs, repository, data):
id_ = repo_objs.id_hash(data)
repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM))
return id_
@pytest.fixture
def H1(self, repo_objs, cache_repository):
return self._put_encrypted_object(repo_objs, cache_repository, b"1234")
def test_remote_rpc_exception_transport(remote_repository):
with remote_repository:
s1 = "test string"
@pytest.fixture
def H2(self, repo_objs, cache_repository):
return self._put_encrypted_object(repo_objs, cache_repository, b"5678")
try:
remote_repository.call("inject_exception", {"kind": "DoesNotExist"})
except Repository.DoesNotExist as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
def test_decrypted_cache(self, repo_objs, cache_repository, H1, H2):
# With decrypted_cache, get/get_many return (csize, plaintext) tuples.
with cache_if_remote(cache_repository, decrypted_cache=repo_objs) as cached:
csize, plaintext = cached.get(H1)
assert plaintext == b"1234"
assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"]
try:
remote_repository.call("inject_exception", {"kind": "AlreadyExists"})
except Repository.AlreadyExists as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "CheckNeeded"})
except Repository.CheckNeeded as e:
assert len(e.args) == 1
assert e.args[0] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "IntegrityError"})
except IntegrityError as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "PathNotAllowed"})
except PathNotAllowed as e:
assert len(e.args) == 1
assert e.args[0] == "foo"
try:
remote_repository.call("inject_exception", {"kind": "ObjectNotFound"})
except Repository.ObjectNotFound as e:
assert len(e.args) == 2
assert e.args[0] == s1
assert e.args[1] == remote_repository.location.processed
try:
remote_repository.call("inject_exception", {"kind": "StoreObjectNotFound"})
except StoreObjectNotFound as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"})
except InvalidRPCMethod as e:
assert len(e.args) == 1
assert e.args[0] == s1
try:
remote_repository.call("inject_exception", {"kind": "divide"})
except RemoteRepository.RPCError as e:
assert e.unpacked
assert e.get_message().startswith("ZeroDivisionError:")
assert e.exception_class == "ZeroDivisionError"
assert len(e.exception_full) > 0
def test_remote_ssh_cmd(remote_repository):
with remote_repository:
args = _get_mock_args()
remote_repository._args = args
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"]
assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"]
assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [
"ssh",
"-p",
"1234",
"user@example.com",
]
os.environ["BORG_RSH"] = "ssh --foo"
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"]
def test_remote_borg_cmd(remote_repository):
with remote_repository:
assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"]
args = _get_mock_args()
# XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
logging.getLogger().setLevel(logging.INFO)
# note: test logger is on info log level, so --info gets added automagically
assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
args.remote_path = "borg-0.28.2"
assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"]
args.debug_topics = ["something_client_side", "repository_compaction"]
assert remote_repository.borg_cmd(args, testing=False) == [
"borg-0.28.2",
"serve",
"--info",
"--debug-topic=borg.debug.repository_compaction",
]
args = _get_mock_args()
assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
args.rsh = "ssh -i foo"
remote_repository._args = args
assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
def test_decrypted_cache_and_transform_incompatible(self, cache_repository, repo_objs):
with pytest.raises(ValueError):
cache_if_remote(cache_repository, decrypted_cache=repo_objs, transform=lambda key, data: data)