From d30d5f4aecaa1b492d7decf85258deb1eff57016 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 4 Aug 2024 15:57:37 +0200 Subject: [PATCH] Repository3 / RemoteRepository3: implement a borgstore based repository Simplify the repository a lot: No repository transactions, no log-like appending, no append-only, no segments, just using a key/value store for the individual chunks. No locking yet. Also: mypy: ignore missing import there are no library stubs for borgstore yet, so mypy errors without that option. pyproject.toml: install borgstore directly from github There is no pypi release yet. use pip install -e . rather than python setup.py develop The latter is deprecated and had issues installing the "borgstore from github" dependency. --- .github/workflows/ci.yml | 3 +- pyproject.toml | 2 + src/borg/archive.py | 10 +- src/borg/archiver/__init__.py | 25 +- src/borg/archiver/_common.py | 29 +- src/borg/archiver/config_cmd.py | 177 --- src/borg/archiver/debug_cmd.py | 39 +- src/borg/archiver/rcompress_cmd.py | 11 +- src/borg/archiver/serve_cmd.py | 2 +- src/borg/archiver/version_cmd.py | 4 +- src/borg/cache.py | 16 +- src/borg/crypto/keymanager.py | 4 +- src/borg/fuse.py | 2 +- src/borg/helpers/misc.py | 2 +- src/borg/helpers/parseformat.py | 4 +- src/borg/manifest.py | 4 +- src/borg/remote.py | 4 +- src/borg/remote3.py | 1269 +++++++++++++++++ src/borg/repository3.py | 314 ++++ src/borg/testsuite/archiver/__init__.py | 20 +- .../testsuite/archiver/bypass_lock_option.py | 130 -- src/borg/testsuite/archiver/check_cmd.py | 8 +- src/borg/testsuite/archiver/checks.py | 24 +- src/borg/testsuite/archiver/config_cmd.py | 64 - src/borg/testsuite/archiver/corruption.py | 18 - src/borg/testsuite/archiver/create_cmd.py | 4 +- src/borg/testsuite/archiver/delete_cmd.py | 6 +- src/borg/testsuite/archiver/key_cmds.py | 14 +- src/borg/testsuite/archiver/rcompress_cmd.py | 4 +- src/borg/testsuite/archiver/rcreate_cmd.py | 29 - src/borg/testsuite/archiver/rename_cmd.py | 4 +- 
src/borg/testsuite/archiver/return_codes.py | 2 +- src/borg/testsuite/archiver/rinfo_cmd.py | 18 - src/borg/testsuite/cache.py | 6 +- src/borg/testsuite/repoobj.py | 4 +- src/borg/testsuite/repository3.py | 290 ++++ tox.ini | 2 +- 37 files changed, 1967 insertions(+), 601 deletions(-) delete mode 100644 src/borg/archiver/config_cmd.py create mode 100644 src/borg/remote3.py create mode 100644 src/borg/repository3.py delete mode 100644 src/borg/testsuite/archiver/bypass_lock_option.py delete mode 100644 src/borg/testsuite/archiver/config_cmd.py create mode 100644 src/borg/testsuite/repository3.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4f91c3ed3..68fb3a506 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,8 +104,7 @@ jobs: pip install -r requirements.d/development.txt - name: Install borgbackup run: | - # pip install -e . - python setup.py -v develop + pip install -e . - name: run tox env env: XDISTN: "4" diff --git a/pyproject.toml b/pyproject.toml index 3003d9bbb..40c374cba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,8 @@ dependencies = [ "platformdirs >=3.0.0, <5.0.0; sys_platform == 'darwin'", # for macOS: breaking changes in 3.0.0, "platformdirs >=2.6.0, <5.0.0; sys_platform != 'darwin'", # for others: 2.6+ works consistently. 
"argon2-cffi", + "borgstore", + ] [project.optional-dependencies] diff --git a/src/borg/archive.py b/src/borg/archive.py index fe19b4b6b..d45f4426e 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -51,7 +51,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname from .remote import cache_if_remote -from .repository import Repository, LIST_SCAN_LIMIT +from .repository3 import Repository3, LIST_SCAN_LIMIT from .repoobj import RepoObj has_link = hasattr(os, "link") @@ -1046,7 +1046,7 @@ Duration: {0.duration} def fetch_async_response(wait=True): try: return self.repository.async_response(wait=wait) - except Repository.ObjectNotFound: + except Repository3.ObjectNotFound: nonlocal error # object not in repo - strange, but we wanted to delete it anyway. if forced == 0: @@ -1093,7 +1093,7 @@ Duration: {0.duration} error = True if progress: pi.finish() - except (msgpack.UnpackException, Repository.ObjectNotFound): + except (msgpack.UnpackException, Repository3.ObjectNotFound): # items metadata corrupted if forced == 0: raise @@ -1887,7 +1887,7 @@ class ArchiveChecker: # Explicitly set the initial usable hash table capacity to avoid performance issues # due to hash table "resonance". # Since reconstruction of archive items can add some new chunks, add 10 % headroom. 
- self.chunks = ChunkIndex(usable=len(self.repository) * 1.1) + self.chunks = ChunkIndex() marker = None while True: result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) @@ -1939,7 +1939,7 @@ class ArchiveChecker: chunk_id = chunk_ids_revd.pop(-1) # better efficiency try: encrypted_data = next(chunk_data_iter) - except (Repository.ObjectNotFound, IntegrityErrorBase) as err: + except (Repository3.ObjectNotFound, IntegrityErrorBase) as err: self.error_found = True errors += 1 logger.error("chunk %s: %s", bin_to_hex(chunk_id), err) diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index f0de10529..1e1e11eed 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -36,7 +36,7 @@ try: from ..helpers import ErrorIgnoringTextIOWrapper from ..helpers import msgpack from ..helpers import sig_int - from ..remote import RemoteRepository + from ..remote3 import RemoteRepository3 from ..selftest import selftest except BaseException: # an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's @@ -68,7 +68,6 @@ def get_func(args): from .benchmark_cmd import BenchmarkMixIn from .check_cmd import CheckMixIn from .compact_cmd import CompactMixIn -from .config_cmd import ConfigMixIn from .create_cmd import CreateMixIn from .debug_cmd import DebugMixIn from .delete_cmd import DeleteMixIn @@ -98,7 +97,6 @@ class Archiver( BenchmarkMixIn, CheckMixIn, CompactMixIn, - ConfigMixIn, CreateMixIn, DebugMixIn, DeleteMixIn, @@ -336,7 +334,6 @@ class Archiver( self.build_parser_benchmarks(subparsers, common_parser, mid_common_parser) self.build_parser_check(subparsers, common_parser, mid_common_parser) self.build_parser_compact(subparsers, common_parser, mid_common_parser) - self.build_parser_config(subparsers, common_parser, mid_common_parser) self.build_parser_create(subparsers, common_parser, mid_common_parser) self.build_parser_debug(subparsers, common_parser, mid_common_parser) 
self.build_parser_delete(subparsers, common_parser, mid_common_parser) @@ -412,22 +409,6 @@ class Archiver( elif not args.paths_from_stdin: # need at least 1 path but args.paths may also be populated from patterns parser.error("Need at least one PATH argument.") - if not getattr(args, "lock", True): # Option --bypass-lock sets args.lock = False - bypass_allowed = { - self.do_check, - self.do_config, - self.do_diff, - self.do_export_tar, - self.do_extract, - self.do_info, - self.do_rinfo, - self.do_list, - self.do_rlist, - self.do_mount, - self.do_umount, - } - if func not in bypass_allowed: - raise Error("Not allowed to bypass locking mechanism for chosen command") # we can only have a complete knowledge of placeholder replacements we should do **after** arg parsing, # e.g. due to options like --timestamp that override the current time. # thus we have to initialize replace_placeholders here and process all args that need placeholder replacement. @@ -581,7 +562,7 @@ def sig_trace_handler(sig_no, stack): # pragma: no cover def format_tb(exc): qualname = type(exc).__qualname__ - remote = isinstance(exc, RemoteRepository.RPCError) + remote = isinstance(exc, RemoteRepository3.RPCError) if remote: prefix = "Borg server: " trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines()) @@ -659,7 +640,7 @@ def main(): # pragma: no cover tb_log_level = logging.ERROR if e.traceback else logging.DEBUG tb = format_tb(e) exit_code = e.exit_code - except RemoteRepository.RPCError as e: + except RemoteRepository3.RPCError as e: important = e.traceback msg = e.exception_full if important else e.get_message() msgid = e.exception_class diff --git a/src/borg/archiver/_common.py b/src/borg/archiver/_common.py index 4a49de9b8..f205cf8ed 100644 --- a/src/borg/archiver/_common.py +++ b/src/borg/archiver/_common.py @@ -14,7 +14,9 @@ from ..helpers.nanorst import rst_to_terminal from ..manifest import Manifest, AI_HUMAN_SORT_KEYS from ..patterns import PatternMatcher from 
..remote import RemoteRepository +from ..remote3 import RemoteRepository3 from ..repository import Repository +from ..repository3 import Repository3 from ..repoobj import RepoObj, RepoObj1 from ..patterns import ( ArgparsePatternAction, @@ -29,9 +31,10 @@ from ..logger import create_logger logger = create_logger(__name__) -def get_repository(location, *, create, exclusive, lock_wait, lock, append_only, make_parent_dirs, storage_quota, args): +def get_repository(location, *, create, exclusive, lock_wait, lock, append_only, make_parent_dirs, storage_quota, args, v1_or_v2): if location.proto in ("ssh", "socket"): - repository = RemoteRepository( + RemoteRepoCls = RemoteRepository if v1_or_v2 else RemoteRepository3 + repository = RemoteRepoCls( location, create=create, exclusive=exclusive, @@ -43,7 +46,8 @@ def get_repository(location, *, create, exclusive, lock_wait, lock, append_only, ) else: - repository = Repository( + RepoCls = Repository if v1_or_v2 else Repository3 + repository = RepoCls( location.path, create=create, exclusive=exclusive, @@ -98,8 +102,7 @@ def with_repository( decorator_name="with_repository", ) - # To process the `--bypass-lock` option if specified, we need to - # modify `lock` inside `wrapper`. Therefore we cannot use the + # We may need to modify `lock` inside `wrapper`. Therefore we cannot use the # `nonlocal` statement to access `lock` as modifications would also # affect the scope outside of `wrapper`. Subsequent calls would # only see the overwritten value of `lock`, not the original one. @@ -129,13 +132,15 @@ def with_repository( make_parent_dirs=make_parent_dirs, storage_quota=storage_quota, args=args, + v1_or_v2=False, ) with repository: - if repository.version not in (2,): + if repository.version not in (3,): raise Error( - "This borg version only accepts version 2 repos for -r/--repo. " - "You can use 'borg transfer' to copy archives from old to new repos." 
+ f"This borg version only accepts version 3 repos for -r/--repo, " + f"but not version {repository.version}. " + f"You can use 'borg transfer' to copy archives from old to new repos." ) if manifest or cache: manifest_ = Manifest.load(repository, compatibility) @@ -195,6 +200,7 @@ def with_other_repository(manifest=False, cache=False, compatibility=None): make_parent_dirs=False, storage_quota=None, args=args, + v1_or_v2=True ) with repository: @@ -504,13 +510,6 @@ def define_common_options(add_common_option): action=Highlander, help="wait at most SECONDS for acquiring a repository/cache lock (default: %(default)d).", ) - add_common_option( - "--bypass-lock", - dest="lock", - action="store_false", - default=argparse.SUPPRESS, # only create args attribute if option is specified - help="Bypass locking mechanism", - ) add_common_option("--show-version", dest="show_version", action="store_true", help="show/log the borg version") add_common_option("--show-rc", dest="show_rc", action="store_true", help="show/log the return code (rc)") add_common_option( diff --git a/src/borg/archiver/config_cmd.py b/src/borg/archiver/config_cmd.py deleted file mode 100644 index f92baf4f3..000000000 --- a/src/borg/archiver/config_cmd.py +++ /dev/null @@ -1,177 +0,0 @@ -import argparse -import configparser - -from ._common import with_repository -from ..cache import Cache, assert_secure -from ..constants import * # NOQA -from ..helpers import Error, CommandError -from ..helpers import parse_file_size, hex_to_bin -from ..manifest import Manifest - -from ..logger import create_logger - -logger = create_logger() - - -class ConfigMixIn: - @with_repository(exclusive=True, manifest=False) - def do_config(self, args, repository): - """get, set, and delete values in a repository or cache config file""" - - def repo_validate(section, name, value=None, check_value=True): - if section not in ["repository"]: - raise ValueError("Invalid section") - if name in ["segments_per_dir", 
"last_segment_checked"]: - if check_value: - try: - int(value) - except ValueError: - raise ValueError("Invalid value") from None - elif name in ["max_segment_size", "additional_free_space", "storage_quota"]: - if check_value: - try: - parse_file_size(value) - except ValueError: - raise ValueError("Invalid value") from None - if name == "storage_quota": - if parse_file_size(value) < parse_file_size("10M"): - raise ValueError("Invalid value: storage_quota < 10M") - elif name == "max_segment_size": - if parse_file_size(value) >= MAX_SEGMENT_SIZE_LIMIT: - raise ValueError("Invalid value: max_segment_size >= %d" % MAX_SEGMENT_SIZE_LIMIT) - elif name in ["append_only"]: - if check_value and value not in ["0", "1"]: - raise ValueError("Invalid value") - elif name in ["id"]: - if check_value: - hex_to_bin(value, length=32) - else: - raise ValueError("Invalid name") - - def cache_validate(section, name, value=None, check_value=True): - if section not in ["cache"]: - raise ValueError("Invalid section") - # currently, we do not support setting anything in the cache via borg config. 
- raise ValueError("Invalid name") - - def list_config(config): - default_values = { - "version": "1", - "segments_per_dir": str(DEFAULT_SEGMENTS_PER_DIR), - "max_segment_size": str(MAX_SEGMENT_SIZE_LIMIT), - "additional_free_space": "0", - "storage_quota": repository.storage_quota, - "append_only": repository.append_only, - } - print("[repository]") - for key in [ - "version", - "segments_per_dir", - "max_segment_size", - "storage_quota", - "additional_free_space", - "append_only", - "id", - ]: - value = config.get("repository", key, fallback=False) - if value is None: - value = default_values.get(key) - if value is None: - raise Error("The repository config is missing the %s key which has no default value" % key) - print(f"{key} = {value}") - for key in ["last_segment_checked"]: - value = config.get("repository", key, fallback=None) - if value is None: - continue - print(f"{key} = {value}") - - if not args.list: - if args.name is None: - raise CommandError("No config key name was provided.") - try: - section, name = args.name.split(".") - except ValueError: - section = args.cache and "cache" or "repository" - name = args.name - - if args.cache: - manifest = Manifest.load(repository, (Manifest.Operation.WRITE,)) - assert_secure(repository, manifest, self.lock_wait) - cache = Cache(repository, manifest, lock_wait=self.lock_wait) - - try: - if args.cache: - cache.cache_config.load() - config = cache.cache_config._config - save = cache.cache_config.save - validate = cache_validate - else: - config = repository.config - save = lambda: repository.save_config(repository.path, repository.config) # noqa - validate = repo_validate - - if args.delete: - validate(section, name, check_value=False) - config.remove_option(section, name) - if len(config.options(section)) == 0: - config.remove_section(section) - save() - elif args.list: - list_config(config) - elif args.value: - validate(section, name, args.value) - if section not in config.sections(): - 
config.add_section(section) - config.set(section, name, args.value) - save() - else: - try: - print(config.get(section, name)) - except (configparser.NoOptionError, configparser.NoSectionError) as e: - raise Error(e) - finally: - if args.cache: - cache.close() - - def build_parser_config(self, subparsers, common_parser, mid_common_parser): - from ._common import process_epilog - - config_epilog = process_epilog( - """ - This command gets and sets options in a local repository or cache config file. - For security reasons, this command only works on local repositories. - - To delete a config value entirely, use ``--delete``. To list the values - of the configuration file or the default values, use ``--list``. To get an existing - key, pass only the key name. To set a key, pass both the key name and - the new value. Keys can be specified in the format "section.name" or - simply "name"; the section will default to "repository" and "cache" for - the repo and cache configs, respectively. - - - By default, borg config manipulates the repository config file. Using ``--cache`` - edits the repository cache's config file instead. 
- """ - ) - subparser = subparsers.add_parser( - "config", - parents=[common_parser], - add_help=False, - description=self.do_config.__doc__, - epilog=config_epilog, - formatter_class=argparse.RawDescriptionHelpFormatter, - help="get and set configuration values", - ) - subparser.set_defaults(func=self.do_config) - subparser.add_argument( - "-c", "--cache", dest="cache", action="store_true", help="get and set values from the repo cache" - ) - - group = subparser.add_mutually_exclusive_group() - group.add_argument( - "-d", "--delete", dest="delete", action="store_true", help="delete the key from the config file" - ) - group.add_argument("-l", "--list", dest="list", action="store_true", help="list the configuration of the repo") - - subparser.add_argument("name", metavar="NAME", nargs="?", help="name of config key") - subparser.add_argument("value", metavar="VALUE", nargs="?", help="new value for key") diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py index fe9df81f4..89f121521 100644 --- a/src/borg/archiver/debug_cmd.py +++ b/src/borg/archiver/debug_cmd.py @@ -15,7 +15,8 @@ from ..helpers import positive_int_validator, archivename_validator from ..helpers import CommandError, RTError from ..manifest import Manifest from ..platform import get_process_id -from ..repository import Repository, LIST_SCAN_LIMIT, TAG_PUT, TAG_DELETE, TAG_COMMIT +from ..repository import Repository, TAG_PUT, TAG_DELETE, TAG_COMMIT +from ..repository3 import Repository3, LIST_SCAN_LIMIT from ..repoobj import RepoObj from ._common import with_repository, Highlander @@ -330,7 +331,7 @@ class DebugMixIn: repository.delete(id) modified = True print("object %s deleted." % hex_id) - except Repository.ObjectNotFound: + except Repository3.ObjectNotFound: print("object %s not found." % hex_id) if modified: repository.commit(compact=False) @@ -351,23 +352,6 @@ class DebugMixIn: except KeyError: print("object %s not found [info from chunks cache]." 
% hex_id) - @with_repository(manifest=False, exclusive=True) - def do_debug_dump_hints(self, args, repository): - """dump repository hints""" - if not repository._active_txn: - repository.prepare_txn(repository.get_transaction_id()) - try: - hints = dict( - segments=repository.segments, - compact=repository.compact, - storage_quota_use=repository.storage_quota_use, - shadow_index={bin_to_hex(k): v for k, v in repository.shadow_index.items()}, - ) - with dash_open(args.path, "w") as fd: - json.dump(hints, fd, indent=4) - finally: - repository.rollback() - def do_debug_convert_profile(self, args): """convert Borg profile to Python profile""" import marshal @@ -689,23 +673,6 @@ class DebugMixIn: subparser.set_defaults(func=self.do_debug_refcount_obj) subparser.add_argument("ids", metavar="IDs", nargs="+", type=str, help="hex object ID(s) to show refcounts for") - debug_dump_hints_epilog = process_epilog( - """ - This command dumps the repository hints data. - """ - ) - subparser = debug_parsers.add_parser( - "dump-hints", - parents=[common_parser], - add_help=False, - description=self.do_debug_dump_hints.__doc__, - epilog=debug_dump_hints_epilog, - formatter_class=argparse.RawDescriptionHelpFormatter, - help="dump repo hints (debug)", - ) - subparser.set_defaults(func=self.do_debug_dump_hints) - subparser.add_argument("path", metavar="PATH", type=str, help="file to dump data into") - debug_convert_profile_epilog = process_epilog( """ Convert a Borg profile to a Python cProfile compatible profile. 
diff --git a/src/borg/archiver/rcompress_cmd.py b/src/borg/archiver/rcompress_cmd.py index 30706dcd6..bc096b3c3 100644 --- a/src/borg/archiver/rcompress_cmd.py +++ b/src/borg/archiver/rcompress_cmd.py @@ -24,14 +24,7 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel): compr_keys = stats["compr_keys"] = set() compr_wanted = ctype, clevel, olevel state = None - chunks_count = len(repository) - chunks_limit = min(1000, max(100, chunks_count // 1000)) - pi = ProgressIndicatorPercent( - total=chunks_count, - msg="Searching for recompression candidates %3.1f%%", - step=0.1, - msgid="rcompress.find_chunks", - ) + chunks_limit = 1000 while True: chunk_ids, state = repository.scan(limit=chunks_limit, state=state) if not chunk_ids: @@ -44,8 +37,6 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel): compr_keys.add(compr_found) stats[compr_found] += 1 stats["checked_count"] += 1 - pi.show(increase=1) - pi.finish() return recompress_ids diff --git a/src/borg/archiver/serve_cmd.py b/src/borg/archiver/serve_cmd.py index 8cc613c58..3c5165831 100644 --- a/src/borg/archiver/serve_cmd.py +++ b/src/borg/archiver/serve_cmd.py @@ -3,7 +3,7 @@ import argparse from ._common import Highlander from ..constants import * # NOQA from ..helpers import parse_storage_quota -from ..remote import RepositoryServer +from ..remote3 import RepositoryServer from ..logger import create_logger diff --git a/src/borg/archiver/version_cmd.py b/src/borg/archiver/version_cmd.py index 75593cbfb..03981b4d3 100644 --- a/src/borg/archiver/version_cmd.py +++ b/src/borg/archiver/version_cmd.py @@ -2,7 +2,7 @@ import argparse from .. 
import __version__ from ..constants import * # NOQA -from ..remote import RemoteRepository +from ..remote3 import RemoteRepository3 from ..logger import create_logger @@ -16,7 +16,7 @@ class VersionMixIn: client_version = parse_version(__version__) if args.location.proto in ("ssh", "socket"): - with RemoteRepository(args.location, lock=False, args=args) as repository: + with RemoteRepository3(args.location, lock=False, args=args) as repository: server_version = repository.server_version else: server_version = client_version diff --git a/src/borg/cache.py b/src/borg/cache.py index 88fe32902..ee88793f3 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -32,7 +32,7 @@ from .locking import Lock from .manifest import Manifest from .platform import SaveFile from .remote import cache_if_remote -from .repository import LIST_SCAN_LIMIT +from .repository3 import LIST_SCAN_LIMIT # note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks") @@ -718,35 +718,27 @@ class ChunksMixin: return ChunkListEntry(id, size) def _load_chunks_from_repo(self): - # Explicitly set the initial usable hash table capacity to avoid performance issues - # due to hash table "resonance". - # Since we're creating an archive, add 10 % from the start. - num_chunks = len(self.repository) - chunks = ChunkIndex(usable=num_chunks * 1.1) - pi = ProgressIndicatorPercent( - total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks" - ) + chunks = ChunkIndex() t0 = perf_counter() num_requests = 0 + num_chunks = 0 marker = None while True: result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) num_requests += 1 if not result: break - pi.show(increase=len(result)) marker = result[-1] # All chunks from the repository have a refcount of MAX_VALUE, which is sticky, # therefore we can't/won't delete them. Chunks we added ourselves in this transaction # (e.g. 
checkpoint archives) are tracked correctly. init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0) for id_ in result: + num_chunks += 1 chunks[id_] = init_entry - assert len(chunks) == num_chunks # LocalCache does not contain the manifest, either. del chunks[self.manifest.MANIFEST_ID] duration = perf_counter() - t0 or 0.01 - pi.finish() logger.debug( "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s", num_chunks, diff --git a/src/borg/crypto/keymanager.py b/src/borg/crypto/keymanager.py index d8d25893d..c2105ec5b 100644 --- a/src/borg/crypto/keymanager.py +++ b/src/borg/crypto/keymanager.py @@ -5,7 +5,7 @@ from hashlib import sha256 from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open from ..manifest import Manifest, NoManifestError -from ..repository import Repository +from ..repository3 import Repository3 from ..repoobj import RepoObj @@ -50,7 +50,7 @@ class KeyManager: try: manifest_chunk = self.repository.get(Manifest.MANIFEST_ID) - except Repository.ObjectNotFound: + except Repository3.ObjectNotFound: raise NoManifestError manifest_data = RepoObj.extract_crypted_data(manifest_chunk) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 92f145874..dd77f4016 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -46,7 +46,7 @@ from .helpers.lrucache import LRUCache from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin -from .remote import RemoteRepository +from .remote import RemoteRepository # TODO 3 def fuse_main(): diff --git a/src/borg/helpers/misc.py b/src/borg/helpers/misc.py index 1f687a0bb..6028b93a3 100644 --- a/src/borg/helpers/misc.py +++ b/src/borg/helpers/misc.py @@ -2,7 +2,7 @@ import logging import io import os import os.path -import platform +import platform # python stdlib import - if this fails, check that cwd != src/borg/ import sys from collections import deque from itertools import islice diff --git a/src/borg/helpers/parseformat.py 
b/src/borg/helpers/parseformat.py index c69889b18..f1d95cad0 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -1182,11 +1182,13 @@ def ellipsis_truncate(msg, space): class BorgJsonEncoder(json.JSONEncoder): def default(self, o): from ..repository import Repository + from ..repository3 import Repository3 from ..remote import RemoteRepository + from ..remote3 import RemoteRepository3 from ..archive import Archive from ..cache import LocalCache, AdHocCache, AdHocWithFilesCache - if isinstance(o, Repository) or isinstance(o, RemoteRepository): + if isinstance(o, (Repository, Repository3)) or isinstance(o, (RemoteRepository, RemoteRepository3)): return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} if isinstance(o, Archive): return o.info() diff --git a/src/borg/manifest.py b/src/borg/manifest.py index 9b23cb63c..cdc1b99fb 100644 --- a/src/borg/manifest.py +++ b/src/borg/manifest.py @@ -246,11 +246,11 @@ class Manifest: def load(cls, repository, operations, key=None, *, ro_cls=RepoObj): from .item import ManifestItem from .crypto.key import key_factory - from .repository import Repository + from .repository3 import Repository3 try: cdata = repository.get(cls.MANIFEST_ID) - except Repository.ObjectNotFound: + except Repository3.ObjectNotFound: raise NoManifestError if not key: key = key_factory(repository, cdata, ro_cls=ro_cls) diff --git a/src/borg/remote.py b/src/borg/remote.py index 924b36ad7..e035224d7 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -640,6 +640,7 @@ class RemoteRepository: exclusive=exclusive, append_only=append_only, make_parent_dirs=make_parent_dirs, + v1_or_v2=True, # make remote use Repository, not Repository3 ) info = self.info() self.version = info["version"] @@ -939,9 +940,10 @@ class RemoteRepository: since=parse_version("1.0.0"), append_only={"since": parse_version("1.0.7"), "previously": False}, make_parent_dirs={"since": parse_version("1.1.9"), "previously": False}, + 
v1_or_v2={"since": parse_version("2.0.0b8"), "previously": True}, # TODO fix version ) def open( - self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False + self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False, v1_or_v2=False ): """actual remoting is done via self.call in the @api decorator""" diff --git a/src/borg/remote3.py b/src/borg/remote3.py new file mode 100644 index 000000000..85687035d --- /dev/null +++ b/src/borg/remote3.py @@ -0,0 +1,1269 @@ +import atexit +import errno +import functools +import inspect +import logging +import os +import queue +import select +import shlex +import shutil +import socket +import struct +import sys +import tempfile +import textwrap +import time +import traceback +from subprocess import Popen, PIPE + +import borg.logger +from . import __version__ +from .compress import Compressor +from .constants import * # NOQA +from .helpers import Error, ErrorWithTraceback, IntegrityError +from .helpers import bin_to_hex +from .helpers import get_limited_unpacker +from .helpers import replace_placeholders +from .helpers import sysinfo +from .helpers import format_file_size +from .helpers import safe_unlink +from .helpers import prepare_subprocess_env, ignore_sigint +from .helpers import get_socket_filename +from .locking import LockTimeout, NotLocked, NotMyLock, LockFailed +from .logger import create_logger, borg_serve_log_queue +from .helpers import msgpack +from .repository import Repository +from .repository3 import Repository3 +from .version import parse_version, format_version +from .checksums import xxh64 +from .helpers.datastruct import EfficientCollectionQueue + +logger = create_logger(__name__) + +BORG_VERSION = parse_version(__version__) +MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" + +MAX_INFLIGHT = 100 + +RATELIMIT_PERIOD = 0.1 + + +def os_write(fd, data): + """os.write wrapper so we do not lose data for 
partial writes.""" + # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this + # wrapper / workaround when this version is considered ancient. + # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB + # and also due to its different blocking pipe behaviour compared to Linux/*BSD. + # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a + # signal, in which case serve() would terminate. + amount = remaining = len(data) + while remaining: + count = os.write(fd, data) + remaining -= count + if not remaining: + break + data = data[count:] + time.sleep(count * 1e-09) + return amount + + +class ConnectionClosed(Error): + """Connection closed by remote host""" + + exit_mcode = 80 + + +class ConnectionClosedWithHint(ConnectionClosed): + """Connection closed by remote host. {}""" + + exit_mcode = 81 + + +class PathNotAllowed(Error): + """Repository path not allowed: {}""" + + exit_mcode = 83 + + +class InvalidRPCMethod(Error): + """RPC method {} is not valid""" + + exit_mcode = 82 + + +class UnexpectedRPCDataFormatFromClient(Error): + """Borg {}: Got unexpected RPC data format from client.""" + + exit_mcode = 85 + + +class UnexpectedRPCDataFormatFromServer(Error): + """Got unexpected RPC data format from server:\n{}""" + + exit_mcode = 86 + + def __init__(self, data): + try: + data = data.decode()[:128] + except UnicodeDecodeError: + data = data[:128] + data = ["%02X" % byte for byte in data] + data = textwrap.fill(" ".join(data), 16 * 3) + super().__init__(data) + + +class ConnectionBrokenWithHint(Error): + """Connection to remote host is broken. {}""" + + exit_mcode = 87 + + +# Protocol compatibility: +# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting +# too old servers. This ensures that the knowledge what is compatible is always held by the newer component. 
+# +# For the client the return of the negotiate method is a dict which includes the server version. +# +# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api +# stubs in RemoteRepository*. The @api decorator on these stubs is used to set server version requirements. +# +# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server. +# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs +# to be added. +# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older +# servers still get compatible input. + + +class RepositoryServer: # pragma: no cover + _rpc_methods = ( + "__len__", + "check", + "commit", + "delete", + "destroy", + "flags", + "flags_many", + "get", + "list", + "scan", + "negotiate", + "open", + "close", + "info", + "put", + "rollback", + "save_key", + "load_key", + "break_lock", + "inject_exception", + ) + + _rpc_methods3 = ( + "__len__", + "check", + "commit", + "delete", + "destroy", + "get", + "list", + "scan", + "negotiate", + "open", + "close", + "info", + "put", + "save_key", + "load_key", + "break_lock", + "inject_exception", + ) + + def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, use_socket): + self.repository = None + self.RepoCls = None + self.rpc_methods = ("open", "close", "negotiate") + self.restrict_to_paths = restrict_to_paths + self.restrict_to_repositories = restrict_to_repositories + # This flag is parsed from the serve command line via Archiver.do_serve, + # i.e. it reflects local system policy and generally ranks higher than + # whatever the client wants, except when initializing a new repository + # (see RepositoryServer.open below). 
+ self.append_only = append_only + self.storage_quota = storage_quota + self.client_version = None # we update this after client sends version information + if use_socket is False: + self.socket_path = None + elif use_socket is True: # --socket + self.socket_path = get_socket_filename() + else: # --socket=/some/path + self.socket_path = use_socket + + def filter_args(self, f, kwargs): + """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" + known = set(inspect.signature(f).parameters) + return {name: kwargs[name] for name in kwargs if name in known} + + def send_queued_log(self): + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(self.stdout_fd, msg) + + def serve(self): + def inner_serve(): + os.set_blocking(self.stdin_fd, False) + assert not os.get_blocking(self.stdin_fd) + os.set_blocking(self.stdout_fd, True) + assert os.get_blocking(self.stdout_fd) + + unpacker = get_limited_unpacker("server") + shutdown_serve = False + while True: + # before processing any new RPCs, send out all pending log output + self.send_queued_log() + + if shutdown_serve: + # shutdown wanted! get out of here after sending all log output. 
+ assert self.repository is None + return + + # process new RPCs + r, w, es = select.select([self.stdin_fd], [], [], 10) + if r: + data = os.read(self.stdin_fd, BUFSIZE) + if not data: + shutdown_serve = True + continue + unpacker.feed(data) + for unpacked in unpacker: + if isinstance(unpacked, dict): + msgid = unpacked[MSGID] + method = unpacked[MSG] + args = unpacked[ARGS] + else: + if self.repository is not None: + self.repository.close() + raise UnexpectedRPCDataFormatFromClient(__version__) + try: + # logger.debug(f"{type(self)} method: {type(self.repository)}.{method}") + if method not in self.rpc_methods: + raise InvalidRPCMethod(method) + try: + f = getattr(self, method) + except AttributeError: + f = getattr(self.repository, method) + args = self.filter_args(f, args) + res = f(**args) + except BaseException as e: + # logger.exception(e) + ex_short = traceback.format_exception_only(e.__class__, e) + ex_full = traceback.format_exception(*sys.exc_info()) + ex_trace = True + if isinstance(e, Error): + ex_short = [e.get_message()] + ex_trace = e.traceback + if isinstance(e, (self.RepoCls.DoesNotExist, self.RepoCls.AlreadyExists, PathNotAllowed)): + # These exceptions are reconstructed on the client end in RemoteRepository*.call_many(), + # and will be handled just like locally raised exceptions. Suppress the remote traceback + # for these, except ErrorWithTraceback, which should always display a traceback. 
+ pass + else: + logging.debug("\n".join(ex_full)) + + sys_info = sysinfo() + try: + msg = msgpack.packb( + { + MSGID: msgid, + "exception_class": e.__class__.__name__, + "exception_args": e.args, + "exception_full": ex_full, + "exception_short": ex_short, + "exception_trace": ex_trace, + "sysinfo": sys_info, + } + ) + except TypeError: + msg = msgpack.packb( + { + MSGID: msgid, + "exception_class": e.__class__.__name__, + "exception_args": [ + x if isinstance(x, (str, bytes, int)) else None for x in e.args + ], + "exception_full": ex_full, + "exception_short": ex_short, + "exception_trace": ex_trace, + "sysinfo": sys_info, + } + ) + os_write(self.stdout_fd, msg) + else: + os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + if es: + shutdown_serve = True + continue + + if self.socket_path: # server for socket:// connections + try: + # remove any left-over socket file + os.unlink(self.socket_path) + except OSError: + if os.path.exists(self.socket_path): + raise + sock_dir = os.path.dirname(self.socket_path) + os.makedirs(sock_dir, exist_ok=True) + if self.socket_path.endswith(".sock"): + pid_file = self.socket_path.replace(".sock", ".pid") + else: + pid_file = self.socket_path + ".pid" + pid = os.getpid() + with open(pid_file, "w") as f: + f.write(str(pid)) + atexit.register(functools.partial(os.remove, pid_file)) + atexit.register(functools.partial(os.remove, self.socket_path)) + sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + sock.bind(self.socket_path) # this creates the socket file in the fs + sock.listen(0) # no backlog + os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too. 
+ print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr) + + while True: + connection, client_address = sock.accept() + print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr) + self.stdin_fd = connection.makefile("rb").fileno() + self.stdout_fd = connection.makefile("wb").fileno() + inner_serve() + print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr) + else: # server for one ssh:// connection + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + inner_serve() + + def negotiate(self, client_data): + if isinstance(client_data, dict): + self.client_version = client_data["client_version"] + else: + self.client_version = BORG_VERSION # seems to be newer than current version (no known old format) + + # not a known old format, send newest negotiate this version knows + return {"server_version": BORG_VERSION} + + def _resolve_path(self, path): + if isinstance(path, bytes): + path = os.fsdecode(path) + if path.startswith("/~/"): # /~/x = path x relative to own home dir + home_dir = os.environ.get("HOME") or os.path.expanduser("~%s" % os.environ.get("USER", "")) + path = os.path.join(home_dir, path[3:]) + elif path.startswith("/./"): # /./x = path x relative to cwd + path = path[3:] + return os.path.realpath(path) + + def open( + self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False, make_parent_dirs=False, v1_or_v2=False + ): + self.RepoCls = Repository if v1_or_v2 else Repository3 + self.rpc_methods = self._rpc_methods if v1_or_v2 else self._rpc_methods3 + logging.debug("Resolving repository path %r", path) + path = self._resolve_path(path) + logging.debug("Resolved repository path to %r", path) + path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep) + if self.restrict_to_paths: + # if --restrict-to-path P is given, we make sure that we only operate in/below path P. 
+ # for the prefix check, it is important that the compared paths both have trailing slashes, + # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option. + for restrict_to_path in self.restrict_to_paths: + restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash + if path_with_sep.startswith(restrict_to_path_with_sep): + break + else: + raise PathNotAllowed(path) + if self.restrict_to_repositories: + for restrict_to_repository in self.restrict_to_repositories: + restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "") + if restrict_to_repository_with_sep == path_with_sep: + break + else: + raise PathNotAllowed(path) + # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo, + # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only) + # flag for serve. + append_only = (not create and self.append_only) or append_only + self.repository = self.RepoCls( + path, + create, + lock_wait=lock_wait, + lock=lock, + append_only=append_only, + storage_quota=self.storage_quota, + exclusive=exclusive, + make_parent_dirs=make_parent_dirs, + send_log_cb=self.send_queued_log, + ) + self.repository.__enter__() # clean exit handled by serve() method + return self.repository.id + + def close(self): + if self.repository is not None: + self.repository.__exit__(None, None, None) + self.repository = None + borg.logger.flush_logging() + self.send_queued_log() + + def inject_exception(self, kind): + s1 = "test string" + s2 = "test string2" + if kind == "DoesNotExist": + raise self.RepoCls.DoesNotExist(s1) + elif kind == "AlreadyExists": + raise self.RepoCls.AlreadyExists(s1) + elif kind == "CheckNeeded": + raise self.RepoCls.CheckNeeded(s1) + elif kind == "IntegrityError": + raise IntegrityError(s1) + elif kind == "PathNotAllowed": + raise PathNotAllowed("foo") + elif kind == "ObjectNotFound": + raise 
self.RepoCls.ObjectNotFound(s1, s2) + elif kind == "InvalidRPCMethod": + raise InvalidRPCMethod(s1) + elif kind == "divide": + 0 // 0 + + +class SleepingBandwidthLimiter: + def __init__(self, limit): + if limit: + self.ratelimit = int(limit * RATELIMIT_PERIOD) + self.ratelimit_last = time.monotonic() + self.ratelimit_quota = self.ratelimit + else: + self.ratelimit = None + + def write(self, fd, to_send): + if self.ratelimit: + now = time.monotonic() + if self.ratelimit_last + RATELIMIT_PERIOD <= now: + self.ratelimit_quota += self.ratelimit + if self.ratelimit_quota > 2 * self.ratelimit: + self.ratelimit_quota = 2 * self.ratelimit + self.ratelimit_last = now + if self.ratelimit_quota == 0: + tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now + time.sleep(tosleep) + self.ratelimit_quota += self.ratelimit + self.ratelimit_last = time.monotonic() + if len(to_send) > self.ratelimit_quota: + to_send = to_send[: self.ratelimit_quota] + try: + written = os.write(fd, to_send) + except BrokenPipeError: + raise ConnectionBrokenWithHint("Broken Pipe") from None + if self.ratelimit: + self.ratelimit_quota -= written + return written + + +def api(*, since, **kwargs_decorator): + """Check version requirements and use self.call to do the remote method call. + + *since* specifies the version in which borg introduced this method. + Calling this method when connected to an older version will fail without transmitting anything to the server. + + Further kwargs can be used to encode version specific restrictions: + + *previously* is the value resulting in the behaviour before introducing the new parameter. + If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously + hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client. + E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False. 
+ Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls + with append_only=False for all versions but rejects calls using append_only=True on versions older than 1.0.7. + + *dontcare* is a flag to set the behaviour if an old version is called the new way. + If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the + more desirable behaviour). If False, an exception is generated. + E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit(). + """ + + def decorator(f): + @functools.wraps(f) + def do_rpc(self, *args, **kwargs): + sig = inspect.signature(f) + bound_args = sig.bind(self, *args, **kwargs) + named = {} # Arguments for the remote process + extra = {} # Arguments for the local process + for name, param in sig.parameters.items(): + if name == "self": + continue + if name in bound_args.arguments: + if name == "wait": + extra[name] = bound_args.arguments[name] + else: + named[name] = bound_args.arguments[name] + else: + if param.default is not param.empty: + named[name] = param.default + + if self.server_version < since: + raise self.RPCServerOutdated(f.__name__, format_version(since)) + + for name, restriction in kwargs_decorator.items(): + if restriction["since"] <= self.server_version: + continue + if "previously" in restriction and named[name] == restriction["previously"]: + continue + if restriction.get("dontcare", False): + continue + + raise self.RPCServerOutdated( + f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"]) + ) + + return self.call(f.__name__, named, **extra) + + return do_rpc + + return decorator + + +class RemoteRepository3: + extra_test_args = [] # type: ignore + + class RPCError(Exception): + def __init__(self, unpacked): + # unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo' + self.unpacked = unpacked + + def get_message(self): + return 
"\n".join(self.unpacked["exception_short"]) + + @property + def traceback(self): + return self.unpacked.get("exception_trace", True) + + @property + def exception_class(self): + return self.unpacked["exception_class"] + + @property + def exception_full(self): + return "\n".join(self.unpacked["exception_full"]) + + @property + def sysinfo(self): + return self.unpacked["sysinfo"] + + class RPCServerOutdated(Error): + """Borg server is too old for {}. Required version {}""" + + exit_mcode = 84 + + @property + def method(self): + return self.args[0] + + @property + def required_version(self): + return self.args[1] + + def __init__( + self, + location, + create=False, + exclusive=False, + lock_wait=None, + lock=True, + append_only=False, + make_parent_dirs=False, + args=None, + ): + self.location = self._location = location + self.preload_ids = [] + self.msgid = 0 + self.rx_bytes = 0 + self.tx_bytes = 0 + self.to_send = EfficientCollectionQueue(1024 * 1024, bytes) + self.stdin_fd = self.stdout_fd = self.stderr_fd = None + self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet) + self.chunkid_to_msgids = {} + self.ignore_responses = set() + self.responses = {} + self.async_responses = {} + self.shutdown_time = None + self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0) + self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0 + self.unpacker = get_limited_unpacker("client") + self.server_version = None # we update this after server sends its version + self.p = self.sock = None + self._args = args + if self.location.proto == "ssh": + testing = location.host == "__testsuite__" + # when testing, we invoke and talk to a borg process directly (no ssh). + # when not testing, we invoke the system-installed ssh binary to talk to a remote borg. 
+ env = prepare_subprocess_env(system=not testing) + borg_cmd = self.borg_cmd(args, testing) + if not testing: + borg_cmd = self.ssh_cmd(location) + borg_cmd + logger.debug("SSH command line: %s", borg_cmd) + # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg. + # borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection. + self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint) + self.stdin_fd = self.p.stdin.fileno() + self.stdout_fd = self.p.stdout.fileno() + self.stderr_fd = self.p.stderr.fileno() + self.r_fds = [self.stdout_fd, self.stderr_fd] + self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd] + elif self.location.proto == "socket": + if args.use_socket is False or args.use_socket is True: # nothing or --socket + socket_path = get_socket_filename() + else: # --socket=/some/path + socket_path = args.use_socket + self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + try: + self.sock.connect(socket_path) # note: socket_path length is rather limited. 
+ except FileNotFoundError: + self.sock = None + raise Error(f"The socket file {socket_path} does not exist.") + except ConnectionRefusedError: + self.sock = None + raise Error(f"There is no borg serve running for the socket file {socket_path}.") + self.stdin_fd = self.sock.makefile("wb").fileno() + self.stdout_fd = self.sock.makefile("rb").fileno() + self.stderr_fd = None + self.r_fds = [self.stdout_fd] + self.x_fds = [self.stdin_fd, self.stdout_fd] + else: + raise Error(f"Unsupported protocol {location.proto}") + + os.set_blocking(self.stdin_fd, False) + assert not os.get_blocking(self.stdin_fd) + os.set_blocking(self.stdout_fd, False) + assert not os.get_blocking(self.stdout_fd) + if self.stderr_fd is not None: + os.set_blocking(self.stderr_fd, False) + assert not os.get_blocking(self.stderr_fd) + + try: + try: + version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}}) + except ConnectionClosed: + raise ConnectionClosedWithHint("Is borg working on the server?") from None + if isinstance(version, dict): + self.server_version = version["server_version"] + else: + raise Exception("Server insisted on using unsupported protocol version %s" % version) + + self.id = self.open( + path=self.location.path, + create=create, + lock_wait=lock_wait, + lock=lock, + exclusive=exclusive, + append_only=append_only, + make_parent_dirs=make_parent_dirs, + ) + info = self.info() + self.version = info["version"] + self.append_only = info["append_only"] + + except Exception: + self.close() + raise + + def __del__(self): + if len(self.responses): + logging.debug("still %d cached responses left in RemoteRepository3" % (len(self.responses),)) + if self.p or self.sock: + self.close() + assert False, "cleanup happened in Repository3.__del__" + + def __repr__(self): + return f"<{self.__class__.__name__} {self.location.canonical_path()}>" + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + if exc_type is not None: + 
self.shutdown_time = time.monotonic() + 30 + finally: + # in any case, we want to close the repo cleanly. + logger.debug( + "RemoteRepository3: %s bytes sent, %s bytes received, %d messages sent", + format_file_size(self.tx_bytes), + format_file_size(self.rx_bytes), + self.msgid, + ) + self.close() + + @property + def id_str(self): + return bin_to_hex(self.id) + + def borg_cmd(self, args, testing): + """return a borg serve command line""" + # give some args/options to 'borg serve' process as they were given to us + opts = [] + if args is not None: + root_logger = logging.getLogger() + if root_logger.isEnabledFor(logging.DEBUG): + opts.append("--debug") + elif root_logger.isEnabledFor(logging.INFO): + opts.append("--info") + elif root_logger.isEnabledFor(logging.WARNING): + pass # warning is default + elif root_logger.isEnabledFor(logging.ERROR): + opts.append("--error") + elif root_logger.isEnabledFor(logging.CRITICAL): + opts.append("--critical") + else: + raise ValueError("log level missing, fix this code") + + # Tell the remote server about debug topics it may need to consider. + # Note that debug topics are usable for "spew" or "trace" logs which would + # be too plentiful to transfer for normal use, so the server doesn't send + # them unless explicitly enabled. + # + # Needless to say, if you do --debug-topic=repository.compaction, for example, + # with a 1.0.x server it won't work, because the server does not recognize the + # option. + # + # This is not considered a problem, since this is a debugging feature that + # should not be used for regular use. + for topic in args.debug_topics: + if "." not in topic: + topic = "borg.debug." 
+ topic + if "repository" in topic: + opts.append("--debug-topic=%s" % topic) + + if "storage_quota" in args and args.storage_quota: + opts.append("--storage-quota=%s" % args.storage_quota) + env_vars = [] + if testing: + return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args + else: # pragma: no cover + remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg") + remote_path = replace_placeholders(remote_path) + return env_vars + [remote_path, "serve"] + opts + + def ssh_cmd(self, location): + """return a ssh command line that can be prefixed to a borg command line""" + rsh = self._args.rsh or os.environ.get("BORG_RSH", "ssh") + args = shlex.split(rsh) + if location.port: + args += ["-p", str(location.port)] + if location.user: + args.append(f"{location.user}@{location.host}") + else: + args.append("%s" % location.host) + return args + + def call(self, cmd, args, **kw): + for resp in self.call_many(cmd, [args], **kw): + return resp + + def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): + if not calls and cmd != "async_responses": + return + + def send_buffer(): + if self.to_send: + try: + written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front()) + self.tx_bytes += written + self.to_send.pop_front(written) + except OSError as e: + # io.write might raise EAGAIN even though select indicates + # that the fd should be writable. + # EWOULDBLOCK is added for defensive programming sake. 
+ if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]: + raise + + def pop_preload_msgid(chunkid): + msgid = self.chunkid_to_msgids[chunkid].pop(0) + if not self.chunkid_to_msgids[chunkid]: + del self.chunkid_to_msgids[chunkid] + return msgid + + def handle_error(unpacked): + if "exception_class" not in unpacked: + return + + error = unpacked["exception_class"] + args = unpacked["exception_args"] + + if error == "Error": + raise Error(args[0]) + elif error == "ErrorWithTraceback": + raise ErrorWithTraceback(args[0]) + elif error == "DoesNotExist": + raise Repository3.DoesNotExist(self.location.processed) + elif error == "AlreadyExists": + raise Repository3.AlreadyExists(self.location.processed) + elif error == "CheckNeeded": + raise Repository3.CheckNeeded(self.location.processed) + elif error == "IntegrityError": + raise IntegrityError(args[0]) + elif error == "PathNotAllowed": + raise PathNotAllowed(args[0]) + elif error == "PathPermissionDenied": + raise Repository3.PathPermissionDenied(args[0]) + elif error == "ParentPathDoesNotExist": + raise Repository3.ParentPathDoesNotExist(args[0]) + elif error == "ObjectNotFound": + raise Repository3.ObjectNotFound(args[0], self.location.processed) + elif error == "InvalidRPCMethod": + raise InvalidRPCMethod(args[0]) + elif error == "LockTimeout": + raise LockTimeout(args[0]) + elif error == "LockFailed": + raise LockFailed(args[0], args[1]) + elif error == "NotLocked": + raise NotLocked(args[0]) + elif error == "NotMyLock": + raise NotMyLock(args[0]) + else: + raise self.RPCError(unpacked) + + calls = list(calls) + waiting_for = [] + maximum_to_send = 0 if wait else self.upload_buffer_size_limit + send_buffer() # Try to send data, as some cases (async_response) will never try to send data otherwise. 
+ while wait or calls: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this RemoteRepository3 down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug( + "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.", + len(waiting_for), + len(self.async_responses), + ) + return + while waiting_for: + try: + unpacked = self.responses.pop(waiting_for[0]) + waiting_for.pop(0) + handle_error(unpacked) + yield unpacked[RESULT] + if not waiting_for and not calls: + return + except KeyError: + break + if cmd == "async_responses": + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break + else: + return + else: + handle_error(unpacked) + yield unpacked[RESULT] + if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): + w_fds = [self.stdin_fd] + else: + w_fds = [] + r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) + if x: + raise Exception("FD exception occurred") + for fd in r: + if fd is self.stdout_fd: + data = os.read(fd, BUFSIZE) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + self.unpacker.feed(data) + for unpacked in self.unpacker: + if not isinstance(unpacked, dict): + raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] + if msgid in self.ignore_responses: + self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. 
+ if "exception_class" in unpacked: + self.async_responses[msgid] = unpacked + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked + else: + self.responses[msgid] = unpacked + elif fd is self.stderr_fd: + data = os.read(fd, 32768) + if not data: + raise ConnectionClosed() + self.rx_bytes += len(data) + # deal with incomplete lines (may appear due to block buffering) + if self.stderr_received: + data = self.stderr_received + data + self.stderr_received = b"" + lines = data.splitlines(keepends=True) + if lines and not lines[-1].endswith((b"\r", b"\n")): + self.stderr_received = lines.pop() + # now we have complete lines in `lines` and any partial line in self.stderr_received. + _logger = logging.getLogger() + for line in lines: + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. 
+ _logger.warning("stderr: " + line.decode().strip()) + if w: + while ( + (len(self.to_send) <= maximum_to_send) + and (calls or self.preload_ids) + and len(waiting_for) < MAX_INFLIGHT + ): + if calls: + if is_preloaded: + assert cmd == "get", "is_preload is only supported for 'get'" + if calls[0]["id"] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(calls.pop(0)["id"])) + else: + args = calls.pop(0) + if cmd == "get" and args["id"] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(args["id"])) + else: + self.msgid += 1 + waiting_for.append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) + if not self.to_send and self.preload_ids: + chunk_id = self.preload_ids.pop(0) + args = {"id": chunk_id} + self.msgid += 1 + self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) + self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) + + send_buffer() + self.ignore_responses |= set(waiting_for) # we lose order here + + @api( + since=parse_version("1.0.0"), + append_only={"since": parse_version("1.0.7"), "previously": False}, + make_parent_dirs={"since": parse_version("1.1.9"), "previously": False}, + v1_or_v2={"since": parse_version("2.0.0b8"), "previously": True}, # TODO fix version + ) + def open( + self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False, v1_or_v2=False + ): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("2.0.0a3")) + def info(self): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0}) + def check(self, repair=False, max_duration=0): + """actual remoting is done via self.call in the @api decorator""" + + @api( + since=parse_version("1.0.0"), + compact={"since": parse_version("1.2.0a0"), "previously": True, 
"dontcare": True}, + threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True}, + ) + def commit(self, compact=True, threshold=0.1): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def rollback(self): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def destroy(self): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def __len__(self): + """actual remoting is done via self.call in the @api decorator""" + + @api( + since=parse_version("1.0.0"), + mask={"since": parse_version("2.0.0b2"), "previously": 0}, + value={"since": parse_version("2.0.0b2"), "previously": 0}, + ) + def list(self, limit=None, marker=None, mask=0, value=0): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("2.0.0b3")) + def scan(self, limit=None, state=None): + """actual remoting is done via self.call in the @api decorator""" + + def get(self, id, read_data=True): + for resp in self.get_many([id], read_data=read_data): + return resp + + def get_many(self, ids, read_data=True, is_preloaded=False): + yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded) + + @api(since=parse_version("1.0.0")) + def put(self, id, data, wait=True): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def delete(self, id, wait=True): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def save_key(self, keydata): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def load_key(self): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version("1.0.0")) + def break_lock(self): + """actual remoting is done via 
self.call in the @api decorator""" + + def close(self): + if self.p or self.sock: + self.call("close", {}, wait=True) + if self.p: + self.p.stdin.close() + self.p.stdout.close() + self.p.wait() + self.p = None + if self.sock: + try: + self.sock.shutdown(socket.SHUT_RDWR) + except OSError as e: + if e.errno != errno.ENOTCONN: + raise + self.sock.close() + self.sock = None + + def async_response(self, wait=True): + for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait): + return resp + + def preload(self, ids): + self.preload_ids += ids + + +class RepositoryNoCache: + """A not caching Repository wrapper, passes through to repository. + + Just to have same API (including the context manager) as RepositoryCache. + + *transform* is a callable taking two arguments, key and raw repository data. + The return value is returned from get()/get_many(). By default, the raw + repository data is returned. + """ + + def __init__(self, repository, transform=None): + self.repository = repository + self.transform = transform or (lambda key, data: data) + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def get(self, key, read_data=True): + return next(self.get_many([key], read_data=read_data, cache=False)) + + def get_many(self, keys, read_data=True, cache=True): + for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data)): + yield self.transform(key, data) + + def log_instrumentation(self): + pass + + +class RepositoryCache(RepositoryNoCache): + """ + A caching Repository wrapper. + + Caches Repository GET operations locally. + + *pack* and *unpack* complement *transform* of the base class. + *pack* receives the output of *transform* and should return bytes, + which are stored in the cache. *unpack* receives these bytes and + should return the initial data (as returned by *transform*). 
class RepositoryCache(RepositoryNoCache):
    """
    Repository wrapper that caches GET results in a local temporary directory.

    *pack* and *unpack* complement *transform* of the base class:
    *pack* gets the output of *transform* and should return bytes, which are
    what is written to the cache. *unpack* gets these bytes back and should
    return the initial data (as returned by *transform*).
    """

    def __init__(self, repository, pack=None, unpack=None, transform=None):
        super().__init__(repository, transform)

        def identity(data):
            return data

        self.pack = pack if pack else identity
        self.unpack = unpack if unpack else identity
        # set of prefixed keys that currently have a file below basedir
        self.cache = set()
        self.basedir = tempfile.mkdtemp(prefix="borg-cache-")
        self.query_size_limit()
        self.size = 0
        # Instrumentation
        self.hits = self.misses = self.slow_misses = 0
        self.slow_lat = 0.0
        self.evictions = self.enospc = 0

    def query_size_limit(self):
        """Set size_limit to a quarter of the free space at basedir, capped at 2**31."""
        free_bytes = shutil.disk_usage(self.basedir).free
        self.size_limit = int(min(free_bytes * 0.25, 2**31))

    def prefixed_key(self, key, complete):
        """Return *key* with a one-byte prefix in front.

        The prefix tells whether the key refers to a complete chunk or to a
        without-data-metadata-only chunk (see also the read_data param).
        """
        if complete:
            return b"\x01" + key
        return b"\x00" + key

    def key_filename(self, key):
        """Return the path of the cache file for the (prefixed) *key*."""
        hex_name = bin_to_hex(key)
        return os.path.join(self.basedir, hex_name)

    def backoff(self):
        """Evict entries until the cache is at or below 90% of the refreshed size limit."""
        self.query_size_limit()
        target_size = int(0.9 * self.size_limit)
        while self.cache and self.size > target_size:
            victim = self.cache.pop()
            path = self.key_filename(victim)
            self.size -= os.stat(path).st_size
            os.unlink(path)
            self.evictions += 1

    def add_entry(self, key, data, cache, complete):
        """Transform *data* and, if *cache* is true, also write it to the cache.

        Returns the transformed data in any case. Running out of disk space
        while writing is counted and answered with backoff(); any other
        OSError is re-raised.
        """
        transformed = self.transform(key, data)
        if not cache:
            return transformed
        packed = self.pack(transformed)
        pkey = self.prefixed_key(key, complete=complete)
        path = self.key_filename(pkey)
        try:
            with open(path, "wb") as fd:
                fd.write(packed)
        except OSError as os_error:
            try:
                safe_unlink(path)
            except FileNotFoundError:
                pass  # open() could have failed as well
            if os_error.errno != errno.ENOSPC:
                raise
            self.enospc += 1
            self.backoff()
            return transformed
        self.size += len(packed)
        self.cache.add(pkey)
        if self.size > self.size_limit:
            self.backoff()
        return transformed

    def log_instrumentation(self):
        """Emit the cache statistics as one debug log record."""
        stats = (
            len(self.cache),
            format_file_size(self.size),
            format_file_size(self.size_limit),
            self.hits,
            self.misses,
            self.slow_misses,
            self.slow_lat,
            self.evictions,
            self.enospc,
        )
        logger.debug(
            "RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), "
            "%d evictions, %d ENOSPC hit",
            *stats,
        )

    def close(self):
        """Log the statistics, forget all entries and remove the cache directory."""
        self.log_instrumentation()
        self.cache.clear()
        shutil.rmtree(self.basedir)

    def get_many(self, keys, read_data=True, cache=True):
        """Yield the object for each of *keys*, served from the cache where possible."""
        # It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
        to_fetch = [k for k in keys if self.prefixed_key(k, complete=read_data) not in self.cache]
        pending = zip(to_fetch, self.repository.get_many(to_fetch, read_data=read_data))
        not_found = object()
        for key in keys:
            pkey = self.prefixed_key(key, complete=read_data)
            if pkey in self.cache:
                with open(self.key_filename(pkey), "rb") as fd:
                    self.hits += 1
                    yield self.unpack(fd.read())
                continue
            # advance the pending repository results up to the entry for this key
            data = next((fetched for fetched_key, fetched in pending if fetched_key == key), not_found)
            if data is not_found:
                # slow path: eviction during this get_many removed this key from the cache
                t0 = time.perf_counter()
                data = self.repository.get(key, read_data=read_data)
                self.slow_lat += time.perf_counter() - t0
                transformed = self.add_entry(key, data, cache, complete=read_data)
                self.slow_misses += 1
            else:
                transformed = self.add_entry(key, data, cache, complete=read_data)
                self.misses += 1
            yield transformed
        # Consume any pending requests
        for _ in pending:
            pass
class Repository3:
    """borgstore based key value store"""

    class AlreadyExists(Error):
        """A repository already exists at {}."""

        exit_mcode = 10

    class CheckNeeded(ErrorWithTraceback):
        """Inconsistency detected. Please run "borg check {}"."""

        exit_mcode = 12

    class DoesNotExist(Error):
        """Repository {} does not exist."""

        exit_mcode = 13

    class InsufficientFreeSpaceError(Error):
        """Insufficient free space to complete transaction (required: {}, available: {})."""

        exit_mcode = 14

    class InvalidRepository(Error):
        """{} is not a valid repository. Check repo config."""

        exit_mcode = 15

    class InvalidRepositoryConfig(Error):
        """{} does not have a valid configuration. Check repo config [{}]."""

        exit_mcode = 16

    class ObjectNotFound(ErrorWithTraceback):
        """Object with key {} not found in repository {}."""

        exit_mcode = 17

        def __init__(self, id, repo):
            # accept binary ids and show them hex-encoded in the message
            if isinstance(id, bytes):
                id = bin_to_hex(id)
            super().__init__(id, repo)

    class ParentPathDoesNotExist(Error):
        """The parent path of the repo directory [{}] does not exist."""

        exit_mcode = 18

    class PathAlreadyExists(Error):
        """There is already something at {}."""

        exit_mcode = 19

    class StorageQuotaExceeded(Error):
        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""

        exit_mcode = 20

    class PathPermissionDenied(Error):
        """Permission denied to {}."""

        exit_mcode = 21

    def __init__(
        self,
        path,
        create=False,
        exclusive=False,
        lock_wait=None,
        lock=True,
        append_only=False,
        storage_quota=None,
        make_parent_dirs=False,
        send_log_cb=None,
    ):
        """Set up (but do not open) the repository at *path*.

        *create*: create the repository when the context manager is entered.
        *send_log_cb*: callable invoked via ._send_log (defaults to a no-op).
        *append_only* and *storage_quota* are only stored (see the XXX notes below).
        *exclusive*, *lock_wait*, *lock* and *make_parent_dirs* are accepted, but
        not used by this class.
        """
        self.path = os.path.abspath(path)
        url = "file://%s" % self.path
        # use a Store with flat config storage and 2-levels-nested data storage
        self.store = Store(url, levels={"config/": [0], "data/": [2]})
        self._location = Location(url)
        self.version = None
        # long-running repository methods which emit log or progress output are responsible for calling
        # the ._send_log method periodically to get log and progress output transferred to the borg client
        # in a timely manner, in case we have a RemoteRepository.
        # for local repositories ._send_log can be called also (it will just do nothing in that case).
        self._send_log = send_log_cb or (lambda: None)
        self.do_create = create
        self.created = False
        self.acceptable_repo_versions = (3,)
        self.opened = False
        self.append_only = append_only  # XXX not implemented / not implementable
        self.storage_quota = storage_quota  # XXX not implemented
        self.storage_quota_use = 0  # XXX not implemented

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.path}>"

    def __enter__(self):
        """Create the repository first (if requested at construction time), then open it."""
        if self.do_create:
            self.do_create = False
            self.create()
            self.created = True
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @property
    def id_str(self):
        """The repository id, hex-encoded (the id is only set by open() / _set_id())."""
        return bin_to_hex(self.id)

    def create(self):
        """Create a new empty repository"""
        self.store.create()
        self.store.open()
        try:
            self.store.store("config/readme", REPOSITORY_README.encode())
            self.version = 3
            self.store.store("config/version", str(self.version).encode())
            self.store.store("config/id", bin_to_hex(os.urandom(32)).encode())
        finally:
            # never leave the store open, not even if writing the config failed
            self.store.close()

    def _set_id(self, id):
        # for testing: change the id of an existing repository
        assert self.opened
        assert isinstance(id, bytes) and len(id) == 32
        self.id = id
        self.store.store("config/id", bin_to_hex(id).encode())

    def save_key(self, keydata):
        """Store *keydata* as the repokey."""
        # note: saving an empty key means that there is no repokey anymore
        self.store.store("keys/repokey", keydata)

    def load_key(self):
        """Return the stored repokey data."""
        keydata = self.store.load("keys/repokey")
        # note: if we return an empty string, it means there is no repo key
        return keydata

    def destroy(self):
        """Destroy the repository"""
        self.close()
        self.store.destroy()

    def open(self):
        """Open the store and validate readme, version and id of the repository.

        Raises InvalidRepository if the readme is not the expected one and
        InvalidRepositoryConfig if the repository version is not acceptable.
        If anything goes wrong, the store is closed again before the exception
        propagates.
        """
        self.store.open()
        try:
            readme = self.store.load("config/readme").decode()
            if readme != REPOSITORY_README:
                raise self.InvalidRepository(self.path)
            self.version = int(self.store.load("config/version").decode())
            if self.version not in self.acceptable_repo_versions:
                raise self.InvalidRepositoryConfig(
                    self.path, "repository version %d is not supported by this borg version" % self.version
                )
            self.id = hex_to_bin(self.store.load("config/id").decode(), length=32)
            self.opened = True
        finally:
            if not self.opened:
                # self.close() would be a no-op here (it checks self.opened), so
                # close the store directly to not leak it on a failed open.
                self.store.close()

    def close(self):
        """Close the store, if the repository is open (otherwise do nothing)."""
        if self.opened:
            self.store.close()
            self.opened = False

    def info(self):
        """return some infos about the repo (must be opened first)"""
        info = dict(
            id=self.id,
            version=self.version,
            storage_quota_use=self.storage_quota_use,
            storage_quota=self.storage_quota,
            append_only=self.append_only,
        )
        return info

    def commit(self, compact=True, threshold=0.1):
        """Do nothing, the parameters are ignored (kept so callers can still call commit())."""

    def check(self, repair=False, max_duration=0):
        """Check repository consistency

        Not implemented yet (see the TODO below): nothing is verified, *repair*
        and *max_duration* are ignored, the start / finish messages are logged
        and True is returned.
        """
        mode = "full"
        logger.info("Starting repository check")
        # XXX TODO
        logger.info("Finished %s repository check, no problems found.", mode)
        return True

    def scan_low_level(self, segment=None, offset=None):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

    def __contains__(self, id):
        raise NotImplementedError

    def list(self, limit=None, marker=None, mask=0, value=0):
        """
        list IDs starting from after id *marker* - in the order the store lists them.

        If *marker* is given, it must be one of the listed IDs (ValueError otherwise).
        If *limit* is given, at most that many IDs are returned.
        *mask* and *value* are accepted, but not used: no flags based filtering is done.
        """
        infos = self.store.list("data")  # XXX we can only get the full list from the store
        ids = [hex_to_bin(info.name) for info in infos]
        if marker is not None:
            idx = ids.index(marker)
            ids = ids[idx + 1 :]
        if limit is not None:
            return ids[:limit]
        return ids

    def scan(self, limit=None, state=None):
        """
        list (the next) chunk IDs from the repository.

        state can either be None (initially, when starting to scan) or the object
        returned from a previous scan call (meaning "continue scanning").

        returns: list of chunk ids, state
        """
        # we only have store.list() anyway, so just call .list() from here.
        ids = self.list(limit=limit, marker=state)
        # the last returned id is the marker for the next call
        state = ids[-1] if ids else None
        return ids, state

    def get(self, id, read_data=True):
        """Return the repo object stored for *id*.

        With read_data=False, only the metadata length header plus the metadata
        are returned, not the data part.

        Raises ObjectNotFound if the store has no such object and IntegrityError
        if the object is too short to contain the header / the metadata.
        """
        id_hex = bin_to_hex(id)
        key = "data/" + id_hex
        try:
            if read_data:
                # read everything
                return self.store.load(key)
            else:
                # RepoObj layout supports separately encrypted metadata and data.
                # We return enough bytes so the client can decrypt the metadata.
                meta_len_size = RepoObj.meta_len_hdr.size
                extra_len = 1024 - meta_len_size  # load a bit more, 1024b, reduces round trips
                obj = self.store.load(key, size=meta_len_size + extra_len)
                meta_len = obj[0:meta_len_size]
                if len(meta_len) != meta_len_size:
                    raise IntegrityError(
                        f"Object too small [id {id_hex}]: expected {meta_len_size}, got {len(meta_len)} bytes"
                    )
                ml = RepoObj.meta_len_hdr.unpack(meta_len)[0]
                if ml > extra_len:
                    # we did not get enough, need to load more, but not all.
                    # this should be rare, as chunk metadata is rather small usually.
                    obj = self.store.load(key, size=meta_len_size + ml)
                meta = obj[meta_len_size : meta_len_size + ml]
                if len(meta) != ml:
                    raise IntegrityError(f"Object too small [id {id_hex}]: expected {ml}, got {len(meta)} bytes")
                return meta_len + meta
        except StoreObjectNotFound:
            raise self.ObjectNotFound(id, self.path) from None

    def get_many(self, ids, read_data=True, is_preloaded=False):
        """Yield get(id, read_data) for each of *ids*, in order (*is_preloaded* is not used)."""
        for id_ in ids:
            yield self.get(id_, read_data=read_data)

    def put(self, id, data, wait=True):
        """put a repo object

        Note: when doing calls with wait=False this gets async and caller must
        deal with async results / exceptions later.

        Raises IntegrityError if *data* is bigger than MAX_DATA_SIZE.
        """
        data_size = len(data)
        if data_size > MAX_DATA_SIZE:
            raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")

        key = "data/" + bin_to_hex(id)
        self.store.store(key, data)

    def delete(self, id, wait=True):
        """delete a repo object

        Note: when doing calls with wait=False this gets async and caller must
        deal with async results / exceptions later.

        Raises ObjectNotFound if the store has no such object.
        """
        key = "data/" + bin_to_hex(id)
        try:
            self.store.delete(key)
        except StoreObjectNotFound:
            raise self.ObjectNotFound(id, self.path) from None

    def async_response(self, wait=True):
        """Get one async result (only applies to remote repositories).

        async commands (== calls with wait=False, e.g. delete and put) have no results,
        but may raise exceptions. These async exceptions must get collected later via
        async_response() calls. Repeat the call until it returns None.
        The previous calls might either return one (non-None) result or raise an exception.
        If wait=True is given and there are outstanding responses, it will wait for them
        to arrive. With wait=False, it will only return already received responses.
        """

    def preload(self, ids):
        """Preload objects (only applies to remote repositories)"""

    def break_lock(self):
        """Do nothing (this class does no locking)."""
import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported @@ -169,7 +169,7 @@ def create_src_archive(archiver, name, ts=None): def open_archive(repo_path, name): - repository = Repository(repo_path, exclusive=True) + repository = Repository3(repo_path, exclusive=True) with repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) archive = Archive(manifest, name) @@ -178,9 +178,9 @@ def open_archive(repo_path, name): def open_repository(archiver): if archiver.get_kind() == "remote": - return RemoteRepository(Location(archiver.repository_location)) + return RemoteRepository3(Location(archiver.repository_location)) else: - return Repository(archiver.repository_path, exclusive=True) + return Repository3(archiver.repository_path, exclusive=True) def create_regular_file(input_path, name, size=0, contents=None): @@ -256,17 +256,13 @@ def create_test_files(input_path, create_hardlinks=True): def _extract_repository_id(repo_path): - with Repository(repo_path) as repository: + with Repository3(repo_path) as repository: return repository.id def _set_repository_id(repo_path, id): - config = ConfigParser(interpolation=None) - config.read(os.path.join(repo_path, "config")) - config.set("repository", "id", bin_to_hex(id)) - with open(os.path.join(repo_path, "config"), "w") as fd: - config.write(fd) - with Repository(repo_path) as repository: + with Repository3(repo_path) as repository: + repository._set_id(id) return repository.id diff --git a/src/borg/testsuite/archiver/bypass_lock_option.py b/src/borg/testsuite/archiver/bypass_lock_option.py deleted file mode 100644 index 8ddeb6762..000000000 --- a/src/borg/testsuite/archiver/bypass_lock_option.py +++ /dev/null @@ -1,130 +0,0 @@ -import pytest - -from ...constants import * # NOQA -from ...helpers import EXIT_ERROR -from ...locking import LockFailed -from ...remote import RemoteRepository -from .. import llfuse -from . 
import cmd, create_src_archive, RK_ENCRYPTION, read_only, fuse_mount - - -def test_readonly_check(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "check", "--verify-data", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "check", "--verify-data") - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "check", "--verify-data", "--bypass-lock") - - -def test_readonly_diff(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "a") - create_src_archive(archiver, "b") - - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "diff", "a", "b", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "diff", "a", "b") - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "diff", "a", "b", "--bypass-lock") - - -def test_readonly_export_tar(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "export-tar", "test", "test.tar", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "export-tar", "test", "test.tar") - if isinstance(excinfo.value, RemoteRepository.RPCError): 
- assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "export-tar", "test", "test.tar", "--bypass-lock") - - -def test_readonly_extract(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "extract", "test", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "extract", "test") - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "extract", "test", "--bypass-lock") - - -def test_readonly_info(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "rinfo", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "rinfo") - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "rinfo", "--bypass-lock") - - -def test_readonly_list(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - cmd(archiver, "rlist", exit_code=EXIT_ERROR) - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - cmd(archiver, "rlist") - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert 
excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - cmd(archiver, "rlist", "--bypass-lock") - - -@pytest.mark.skipif(not llfuse, reason="llfuse not installed") -def test_readonly_mount(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - with read_only(archiver.repository_path): - # verify that command normally doesn't work with read-only repo - if archiver.FORK_DEFAULT: - with fuse_mount(archiver, exit_code=EXIT_ERROR): - pass - else: - with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo: - # self.fuse_mount always assumes fork=True, so for this test we have to set fork=False manually - with fuse_mount(archiver, fork=False): - pass - if isinstance(excinfo.value, RemoteRepository.RPCError): - assert excinfo.value.exception_class == "LockFailed" - # verify that command works with read-only repo when using --bypass-lock - with fuse_mount(archiver, None, "--bypass-lock"): - pass diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py index 87fd10ab3..2727a5781 100644 --- a/src/borg/testsuite/archiver/check_cmd.py +++ b/src/borg/testsuite/archiver/check_cmd.py @@ -8,7 +8,7 @@ from ...archive import ChunkBuffer from ...constants import * # NOQA from ...helpers import bin_to_hex, msgpack from ...manifest import Manifest -from ...repository import Repository +from ...repository3 import Repository3 from . 
import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -28,12 +28,10 @@ def test_check_usage(archivers, request): output = cmd(archiver, "check", "-v", "--progress", exit_code=0) assert "Starting repository check" in output assert "Starting archive consistency check" in output - assert "Checking segments" in output output = cmd(archiver, "check", "-v", "--repository-only", exit_code=0) assert "Starting repository check" in output assert "Starting archive consistency check" not in output - assert "Checking segments" not in output output = cmd(archiver, "check", "-v", "--archives-only", exit_code=0) assert "Starting repository check" not in output @@ -348,7 +346,7 @@ def test_extra_chunks(archivers, request): pytest.skip("only works locally") check_cmd_setup(archiver) cmd(archiver, "check", exit_code=0) - with Repository(archiver.repository_location, exclusive=True) as repository: + with Repository3(archiver.repository_location, exclusive=True) as repository: repository.put(b"01234567890123456789012345678901", b"xxxx") repository.commit(compact=False) output = cmd(archiver, "check", "-v", exit_code=0) # orphans are not considered warnings anymore @@ -391,7 +389,7 @@ def test_empty_repository(archivers, request): if archiver.get_kind() == "remote": pytest.skip("only works locally") check_cmd_setup(archiver) - with Repository(archiver.repository_location, exclusive=True) as repository: + with Repository3(archiver.repository_location, exclusive=True) as repository: for id_ in repository.list(): repository.delete(id_) repository.commit(compact=False) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index a9324fbdf..8f5dd144a 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -9,8 +9,8 @@ from ...constants import * # NOQA from 
...helpers import Location, get_security_dir, bin_to_hex from ...helpers import EXIT_ERROR from ...manifest import Manifest, MandatoryFeatureUnsupported -from ...remote import RemoteRepository, PathNotAllowed -from ...repository import Repository +from ...remote3 import RemoteRepository3, PathNotAllowed +from ...repository3 import Repository3 from .. import llfuse from .. import changedir from . import cmd, _extract_repository_id, open_repository, check_cache, create_test_files @@ -25,7 +25,7 @@ def get_security_directory(repo_path): def add_unknown_feature(repo_path, operation): - with Repository(repo_path, exclusive=True) as repository: + with Repository3(repo_path, exclusive=True) as repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) manifest.config["feature_flags"] = {operation.value: {"mandatory": ["unknown-feature"]}} manifest.write() @@ -272,7 +272,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): remote_repo = archiver.get_kind() == "remote" print(cmd(archiver, "rcreate", RK_ENCRYPTION)) - with Repository(archiver.repository_path, exclusive=True) as repository: + with Repository3(archiver.repository_path, exclusive=True) as repository: if remote_repo: repository._location = Location(archiver.repository_location) manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) @@ -299,7 +299,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request): if is_localcache: assert called - with Repository(archiver.repository_path, exclusive=True) as repository: + with Repository3(archiver.repository_path, exclusive=True) as repository: if remote_repo: repository._location = Location(archiver.repository_location) manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) @@ -346,26 +346,26 @@ def test_env_use_chunks_archive(archivers, request, monkeypatch): def test_remote_repo_restrict_to_path(remote_archiver): original_location, repo_path = remote_archiver.repository_location, 
remote_archiver.repository_path # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", repo_path]): cmd(remote_archiver, "rcreate", RK_ENCRYPTION) # restricted to repo directory itself, fail for other directories with same prefix: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", repo_path]): with pytest.raises(PathNotAllowed): remote_archiver.repository_location = original_location + "_0" cmd(remote_archiver, "rcreate", RK_ENCRYPTION) # restricted to a completely different path: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", "/foo"]): with pytest.raises(PathNotAllowed): remote_archiver.repository_location = original_location + "_1" cmd(remote_archiver, "rcreate", RK_ENCRYPTION) path_prefix = os.path.dirname(repo_path) # restrict to repo directory's parent directory: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", path_prefix]): remote_archiver.repository_location = original_location + "_2" cmd(remote_archiver, "rcreate", RK_ENCRYPTION) # restrict to repo directory's parent directory and another directory: with patch.object( - RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix] + RemoteRepository3, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix] ): remote_archiver.repository_location = original_location + "_3" cmd(remote_archiver, "rcreate", RK_ENCRYPTION) @@ -374,10 +374,10 @@ def test_remote_repo_restrict_to_path(remote_archiver): def 
test_remote_repo_restrict_to_repository(remote_archiver): repo_path = remote_archiver.repository_path # restricted to repo directory itself: - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-repository", repo_path]): cmd(remote_archiver, "rcreate", RK_ENCRYPTION) parent_path = os.path.join(repo_path, "..") - with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]): + with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-repository", parent_path]): with pytest.raises(PathNotAllowed): cmd(remote_archiver, "rcreate", RK_ENCRYPTION) diff --git a/src/borg/testsuite/archiver/config_cmd.py b/src/borg/testsuite/archiver/config_cmd.py deleted file mode 100644 index fa89df241..000000000 --- a/src/borg/testsuite/archiver/config_cmd.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import pytest - -from ...constants import * # NOQA -from . 
import RK_ENCRYPTION, create_test_files, cmd, generate_archiver_tests -from ...helpers import CommandError, Error - -pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,binary") # NOQA - - -def test_config(archivers, request): - archiver = request.getfixturevalue(archivers) - create_test_files(archiver.input_path) - os.unlink("input/flagfile") - cmd(archiver, "rcreate", RK_ENCRYPTION) - output = cmd(archiver, "config", "--list") - assert "[repository]" in output - assert "version" in output - assert "segments_per_dir" in output - assert "storage_quota" in output - assert "append_only" in output - assert "additional_free_space" in output - assert "id" in output - assert "last_segment_checked" not in output - - if archiver.FORK_DEFAULT: - output = cmd(archiver, "config", "last_segment_checked", exit_code=2) - assert "No option " in output - else: - with pytest.raises(Error): - cmd(archiver, "config", "last_segment_checked") - - cmd(archiver, "config", "last_segment_checked", "123") - output = cmd(archiver, "config", "last_segment_checked") - assert output == "123" + os.linesep - output = cmd(archiver, "config", "--list") - assert "last_segment_checked" in output - cmd(archiver, "config", "--delete", "last_segment_checked") - - for cfg_key, cfg_value in [("additional_free_space", "2G"), ("repository.append_only", "1")]: - output = cmd(archiver, "config", cfg_key) - assert output == "0" + os.linesep - cmd(archiver, "config", cfg_key, cfg_value) - output = cmd(archiver, "config", cfg_key) - assert output == cfg_value + os.linesep - cmd(archiver, "config", "--delete", cfg_key) - if archiver.FORK_DEFAULT: - cmd(archiver, "config", cfg_key, exit_code=2) - else: - with pytest.raises(Error): - cmd(archiver, "config", cfg_key) - - cmd(archiver, "config", "--list", "--delete", exit_code=2) - if archiver.FORK_DEFAULT: - expected_ec = CommandError().exit_code - cmd(archiver, "config", exit_code=expected_ec) - else: - with 
pytest.raises(CommandError): - cmd(archiver, "config") - if archiver.FORK_DEFAULT: - cmd(archiver, "config", "invalid-option", exit_code=2) - else: - with pytest.raises(Error): - cmd(archiver, "config", "invalid-option") diff --git a/src/borg/testsuite/archiver/corruption.py b/src/borg/testsuite/archiver/corruption.py index 65804eaca..3df1789d1 100644 --- a/src/borg/testsuite/archiver/corruption.py +++ b/src/borg/testsuite/archiver/corruption.py @@ -13,24 +13,6 @@ from ...hashindex import ChunkIndex from ...cache import LocalCache -def test_check_corrupted_repository(archiver): - cmd(archiver, "rcreate", RK_ENCRYPTION) - create_src_archive(archiver, "test") - cmd(archiver, "extract", "test", "--dry-run") - cmd(archiver, "check") - - name = sorted(os.listdir(os.path.join(archiver.tmpdir, "repository", "data", "0")), reverse=True)[1] - with open(os.path.join(archiver.tmpdir, "repository", "data", "0", name), "r+b") as fd: - fd.seek(100) - fd.write(b"XXXX") - - if archiver.FORK_DEFAULT: - cmd(archiver, "check", exit_code=1) - else: - with pytest.raises(Error): - cmd(archiver, "check") - - def corrupt_archiver(archiver): create_test_files(archiver.input_path) cmd(archiver, "rcreate", RK_ENCRYPTION) diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 72e8bc97d..4f740abfa 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -16,7 +16,7 @@ from ...cache import get_cache_impl from ...constants import * # NOQA from ...manifest import Manifest from ...platform import is_cygwin, is_win32, is_darwin -from ...repository import Repository +from ...repository3 import Repository3 from ...helpers import CommandError, BackupPermissionError from .. import has_lchflags from .. 
import changedir @@ -668,7 +668,7 @@ def test_create_dry_run(archivers, request): cmd(archiver, "rcreate", RK_ENCRYPTION) cmd(archiver, "create", "--dry-run", "test", "input") # Make sure no archive has been created - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) assert len(manifest.archives) == 0 diff --git a/src/borg/testsuite/archiver/delete_cmd.py b/src/borg/testsuite/archiver/delete_cmd.py index 25c35e931..e931cc588 100644 --- a/src/borg/testsuite/archiver/delete_cmd.py +++ b/src/borg/testsuite/archiver/delete_cmd.py @@ -1,7 +1,7 @@ from ...archive import Archive from ...constants import * # NOQA from ...manifest import Manifest -from ...repository import Repository +from ...repository3 import Repository3 from . import cmd, create_regular_file, src_file, create_src_archive, generate_archiver_tests, RK_ENCRYPTION pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -47,7 +47,7 @@ def test_delete_force(archivers, request): archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", "--encryption=none") create_src_archive(archiver, "test") - with Repository(archiver.repository_path, exclusive=True) as repository: + with Repository3(archiver.repository_path, exclusive=True) as repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) archive = Archive(manifest, "test") for item in archive.iter_items(): @@ -69,7 +69,7 @@ def test_delete_double_force(archivers, request): archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", "--encryption=none") create_src_archive(archiver, "test") - with Repository(archiver.repository_path, exclusive=True) as repository: + with Repository3(archiver.repository_path, exclusive=True) as repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) archive = Archive(manifest, 
"test") id = archive.metadata.items[0] diff --git a/src/borg/testsuite/archiver/key_cmds.py b/src/borg/testsuite/archiver/key_cmds.py index ef00e007e..32a56fdd6 100644 --- a/src/borg/testsuite/archiver/key_cmds.py +++ b/src/borg/testsuite/archiver/key_cmds.py @@ -9,7 +9,7 @@ from ...crypto.keymanager import RepoIdMismatch, NotABorgKeyFile from ...helpers import CommandError from ...helpers import bin_to_hex, hex_to_bin from ...helpers import msgpack -from ...repository import Repository +from ...repository3 import Repository3 from .. import key from . import RK_ENCRYPTION, KF_ENCRYPTION, cmd, _extract_repository_id, _set_repository_id, generate_archiver_tests @@ -129,7 +129,7 @@ def test_key_export_repokey(archivers, request): assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n") - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: repo_key = AESOCBRepoKey(repository) repo_key.load(None, Passphrase.env_passphrase()) @@ -138,12 +138,12 @@ def test_key_export_repokey(archivers, request): assert repo_key.crypt_key == backup_key.crypt_key - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: repository.save_key(b"") cmd(archiver, "key", "import", export_file) - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: repo_key2 = AESOCBRepoKey(repository) repo_key2.load(None, Passphrase.env_passphrase()) @@ -302,7 +302,7 @@ def test_init_defaults_to_argon2(archivers, request): """https://github.com/borgbackup/borg/issues/747#issuecomment-1076160401""" archiver = request.getfixturevalue(archivers) cmd(archiver, "rcreate", RK_ENCRYPTION) - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: key = msgpack.unpackb(binascii.a2b_base64(repository.load_key())) assert key["algorithm"] == "argon2 
chacha20-poly1305" @@ -313,7 +313,7 @@ def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request): os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase" cmd(archiver, "key", "change-passphrase") - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: key = msgpack.unpackb(binascii.a2b_base64(repository.load_key())) assert key["algorithm"] == "argon2 chacha20-poly1305" @@ -323,6 +323,6 @@ def test_change_location_does_not_change_algorithm_argon2(archivers, request): cmd(archiver, "rcreate", KF_ENCRYPTION) cmd(archiver, "key", "change-location", "repokey") - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: key = msgpack.unpackb(binascii.a2b_base64(repository.load_key())) assert key["algorithm"] == "argon2 chacha20-poly1305" diff --git a/src/borg/testsuite/archiver/rcompress_cmd.py b/src/borg/testsuite/archiver/rcompress_cmd.py index 4635f05ac..680f1a427 100644 --- a/src/borg/testsuite/archiver/rcompress_cmd.py +++ b/src/borg/testsuite/archiver/rcompress_cmd.py @@ -1,7 +1,7 @@ import os from ...constants import * # NOQA -from ...repository import Repository +from ...repository3 import Repository3 from ...manifest import Manifest from ...compress import ZSTD, ZLIB, LZ4, CNONE from ...helpers import bin_to_hex @@ -12,7 +12,7 @@ from . 
import create_regular_file, cmd, RK_ENCRYPTION def test_rcompress(archiver): def check_compression(ctype, clevel, olevel): """check if all the chunks in the repo are compressed/obfuscated like expected""" - repository = Repository(archiver.repository_path, exclusive=True) + repository = Repository3(archiver.repository_path, exclusive=True) with repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) state = None diff --git a/src/borg/testsuite/archiver/rcreate_cmd.py b/src/borg/testsuite/archiver/rcreate_cmd.py index b027ca1a8..0569d747d 100644 --- a/src/borg/testsuite/archiver/rcreate_cmd.py +++ b/src/borg/testsuite/archiver/rcreate_cmd.py @@ -6,28 +6,11 @@ import pytest from ...helpers.errors import Error, CancelledByUser from ...constants import * # NOQA from ...crypto.key import FlexiKey -from ...repository import Repository from . import cmd, generate_archiver_tests, RK_ENCRYPTION, KF_ENCRYPTION pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA -def test_rcreate_parent_dirs(archivers, request): - archiver = request.getfixturevalue(archivers) - if archiver.EXE: - pytest.skip("does not raise Exception, but sets rc==2") - remote_repo = archiver.get_kind() == "remote" - parent_path = os.path.join(archiver.tmpdir, "parent1", "parent2") - repository_path = os.path.join(parent_path, "repository") - archiver.repository_location = ("ssh://__testsuite__" + repository_path) if remote_repo else repository_path - with pytest.raises(Repository.ParentPathDoesNotExist): - # normal borg rcreate does NOT create missing parent dirs - cmd(archiver, "rcreate", "--encryption=none") - # but if told so, it does: - cmd(archiver, "rcreate", "--encryption=none", "--make-parent-dirs") - assert os.path.exists(parent_path) - - def test_rcreate_interrupt(archivers, request): archiver = request.getfixturevalue(archivers) if archiver.EXE: @@ -51,18 +34,6 @@ def test_rcreate_requires_encryption_option(archivers, 
request): cmd(archiver, "rcreate", exit_code=2) -def test_rcreate_nested_repositories(archivers, request): - archiver = request.getfixturevalue(archivers) - cmd(archiver, "rcreate", RK_ENCRYPTION) - archiver.repository_location += "/nested" - if archiver.FORK_DEFAULT: - expected_ec = Repository.AlreadyExists().exit_code - cmd(archiver, "rcreate", RK_ENCRYPTION, exit_code=expected_ec) - else: - with pytest.raises(Repository.AlreadyExists): - cmd(archiver, "rcreate", RK_ENCRYPTION) - - def test_rcreate_refuse_to_overwrite_keyfile(archivers, request, monkeypatch): # BORG_KEY_FILE=something borg rcreate should quit if "something" already exists. # See: https://github.com/borgbackup/borg/pull/6046 diff --git a/src/borg/testsuite/archiver/rename_cmd.py b/src/borg/testsuite/archiver/rename_cmd.py index 5a1b65c0a..7a1637733 100644 --- a/src/borg/testsuite/archiver/rename_cmd.py +++ b/src/borg/testsuite/archiver/rename_cmd.py @@ -1,6 +1,6 @@ from ...constants import * # NOQA from ...manifest import Manifest -from ...repository import Repository +from ...repository3 import Repository3 from . import cmd, create_regular_file, generate_archiver_tests, RK_ENCRYPTION pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -21,7 +21,7 @@ def test_rename(archivers, request): cmd(archiver, "extract", "test.3", "--dry-run") cmd(archiver, "extract", "test.4", "--dry-run") # Make sure both archives have been renamed - with Repository(archiver.repository_path) as repository: + with Repository3(archiver.repository_path) as repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) assert len(manifest.archives) == 2 assert "test.3" in manifest.archives diff --git a/src/borg/testsuite/archiver/return_codes.py b/src/borg/testsuite/archiver/return_codes.py index 9c23f7995..3825904a4 100644 --- a/src/borg/testsuite/archiver/return_codes.py +++ b/src/borg/testsuite/archiver/return_codes.py @@ -5,7 +5,7 @@ from . 
import cmd_fixture, changedir # NOQA def test_return_codes(cmd_fixture, tmpdir): - repo = tmpdir.mkdir("repo") + repo = tmpdir / "repo" # borg creates the directory input = tmpdir.mkdir("input") output = tmpdir.mkdir("output") input.join("test_file").write("content") diff --git a/src/borg/testsuite/archiver/rinfo_cmd.py b/src/borg/testsuite/archiver/rinfo_cmd.py index bf2b14c52..269c08326 100644 --- a/src/borg/testsuite/archiver/rinfo_cmd.py +++ b/src/borg/testsuite/archiver/rinfo_cmd.py @@ -35,21 +35,3 @@ def test_info_json(archivers, request): stats = cache["stats"] assert all(isinstance(o, int) for o in stats.values()) assert all(key in stats for key in ("total_chunks", "total_size", "total_unique_chunks", "unique_size")) - - -def test_info_on_repository_with_storage_quota(archivers, request): - archiver = request.getfixturevalue(archivers) - create_regular_file(archiver.input_path, "file1", contents=randbytes(1000 * 1000)) - cmd(archiver, "rcreate", RK_ENCRYPTION, "--storage-quota=1G") - cmd(archiver, "create", "test", "input") - info_repo = cmd(archiver, "rinfo") - assert "Storage quota: 1.00 MB used out of 1.00 GB" in info_repo - - -def test_info_on_repository_without_storage_quota(archivers, request): - archiver = request.getfixturevalue(archivers) - create_regular_file(archiver.input_path, "file1", contents=randbytes(1000 * 1000)) - cmd(archiver, "rcreate", RK_ENCRYPTION) - cmd(archiver, "create", "test", "input") - info_repo = cmd(archiver, "rinfo") - assert "Storage quota: 1.00 MB used" in info_repo diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py index 60cb870e3..c232c84b2 100644 --- a/src/borg/testsuite/cache.py +++ b/src/borg/testsuite/cache.py @@ -12,7 +12,7 @@ from ..cache import AdHocCache from ..crypto.key import AESOCBRepoKey from ..hashindex import ChunkIndex, CacheSynchronizer from ..manifest import Manifest -from ..repository import Repository +from ..repository3 import Repository3 class TestCacheSynchronizer: @@ -164,7 
+164,7 @@ class TestAdHocCache: @pytest.fixture def repository(self, tmpdir): self.repository_location = os.path.join(str(tmpdir), "repository") - with Repository(self.repository_location, exclusive=True, create=True) as repository: + with Repository3(self.repository_location, exclusive=True, create=True) as repository: repository.put(H(1), b"1234") repository.put(Manifest.MANIFEST_ID, b"5678") yield repository @@ -201,7 +201,7 @@ class TestAdHocCache: assert cache.seen_chunk(H(5)) == 1 cache.chunk_decref(H(5), 1, Statistics()) assert not cache.seen_chunk(H(5)) - with pytest.raises(Repository.ObjectNotFound): + with pytest.raises(Repository3.ObjectNotFound): repository.get(H(5)) def test_files_cache(self, cache): diff --git a/src/borg/testsuite/repoobj.py b/src/borg/testsuite/repoobj.py index 44c364d81..f34fa07d0 100644 --- a/src/borg/testsuite/repoobj.py +++ b/src/borg/testsuite/repoobj.py @@ -3,14 +3,14 @@ import pytest from ..constants import ROBJ_FILE_STREAM, ROBJ_MANIFEST, ROBJ_ARCHIVE_META from ..crypto.key import PlaintextKey from ..helpers.errors import IntegrityError -from ..repository import Repository +from ..repository3 import Repository3 from ..repoobj import RepoObj, RepoObj1 from ..compress import LZ4 @pytest.fixture def repository(tmpdir): - return Repository(tmpdir, create=True) + return Repository3(tmpdir, create=True) @pytest.fixture diff --git a/src/borg/testsuite/repository3.py b/src/borg/testsuite/repository3.py new file mode 100644 index 000000000..533d18723 --- /dev/null +++ b/src/borg/testsuite/repository3.py @@ -0,0 +1,290 @@ +import logging +import os +import sys +from typing import Optional + +import pytest + +from ..helpers import Location +from ..helpers import IntegrityError +from ..platformflags import is_win32 +from ..remote3 import RemoteRepository3, InvalidRPCMethod, PathNotAllowed +from ..repository3 import Repository3, MAX_DATA_SIZE +from ..repoobj import RepoObj +from .hashindex import H + + +@pytest.fixture() +def 
repository(tmp_path): + repository_location = os.fspath(tmp_path / "repository") + yield Repository3(repository_location, exclusive=True, create=True) + + +@pytest.fixture() +def remote_repository(tmp_path): + if is_win32: + pytest.skip("Remote repository does not yet work on Windows.") + repository_location = Location("ssh://__testsuite__" + os.fspath(tmp_path / "repository")) + yield RemoteRepository3(repository_location, exclusive=True, create=True) + + +def pytest_generate_tests(metafunc): + # Generates tests that run on both local and remote repos + if "repo_fixtures" in metafunc.fixturenames: + metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"]) + + +def get_repository_from_fixture(repo_fixtures, request): + # returns the repo object from the fixture for tests that run on both local and remote repos + return request.getfixturevalue(repo_fixtures) + + +def reopen(repository, exclusive: Optional[bool] = True, create=False): + if isinstance(repository, Repository3): + if repository.opened: + raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.") + return Repository3(repository.path, exclusive=exclusive, create=create) + + if isinstance(repository, RemoteRepository3): + if repository.p is not None or repository.sock is not None: + raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.") + return RemoteRepository3(repository.location, exclusive=exclusive, create=create) + + raise TypeError( + f"Invalid argument type. Expected 'Repository3' or 'RemoteRepository3', received '{type(repository).__name__}'." + ) + + +def fchunk(data, meta=b""): + # format chunk: create a raw chunk that has valid RepoObj layout, but does not use encryption or compression. 
+ meta_len = RepoObj.meta_len_hdr.pack(len(meta)) + assert isinstance(data, bytes) + chunk = meta_len + meta + data + return chunk + + +def pchunk(chunk): + # parse chunk: parse data and meta from a raw chunk made by fchunk + meta_len_size = RepoObj.meta_len_hdr.size + meta_len = chunk[:meta_len_size] + meta_len = RepoObj.meta_len_hdr.unpack(meta_len)[0] + meta = chunk[meta_len_size : meta_len_size + meta_len] + data = chunk[meta_len_size + meta_len :] + return data, meta + + +def pdchunk(chunk): + # parse only data from a raw chunk made by fchunk + return pchunk(chunk)[0] + + +def test_basic_operations(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + for x in range(100): + repository.put(H(x), fchunk(b"SOMEDATA")) + key50 = H(50) + assert pdchunk(repository.get(key50)) == b"SOMEDATA" + repository.delete(key50) + with pytest.raises(Repository3.ObjectNotFound): + repository.get(key50) + with reopen(repository) as repository: + with pytest.raises(Repository3.ObjectNotFound): + repository.get(key50) + for x in range(100): + if x == 50: + continue + assert pdchunk(repository.get(H(x))) == b"SOMEDATA" + + +def test_read_data(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + meta, data = b"meta", b"data" + meta_len = RepoObj.meta_len_hdr.pack(len(meta)) + chunk_complete = meta_len + meta + data + chunk_short = meta_len + meta + repository.put(H(0), chunk_complete) + assert repository.get(H(0)) == chunk_complete + assert repository.get(H(0), read_data=True) == chunk_complete + assert repository.get(H(0), read_data=False) == chunk_short + + +def test_consistency(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + repository.put(H(0), fchunk(b"foo")) + assert pdchunk(repository.get(H(0))) == b"foo" + repository.put(H(0), fchunk(b"foo2")) + assert pdchunk(repository.get(H(0))) == b"foo2" + repository.put(H(0), fchunk(b"bar")) 
+ assert pdchunk(repository.get(H(0))) == b"bar" + repository.delete(H(0)) + with pytest.raises(Repository3.ObjectNotFound): + repository.get(H(0)) + + +def test_list(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + for x in range(100): + repository.put(H(x), fchunk(b"SOMEDATA")) + repo_list = repository.list() + assert len(repo_list) == 100 + first_half = repository.list(limit=50) + assert len(first_half) == 50 + assert first_half == repo_list[:50] + second_half = repository.list(marker=first_half[-1]) + assert len(second_half) == 50 + assert second_half == repo_list[50:] + assert len(repository.list(limit=50)) == 50 + + +def test_scan(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + for x in range(100): + repository.put(H(x), fchunk(b"SOMEDATA")) + ids, _ = repository.scan() + assert len(ids) == 100 + first_half, state = repository.scan(limit=50) + assert len(first_half) == 50 + assert first_half == ids[:50] + second_half, _ = repository.scan(state=state) + assert len(second_half) == 50 + assert second_half == ids[50:] + + +def test_max_data_size(repo_fixtures, request): + with get_repository_from_fixture(repo_fixtures, request) as repository: + max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size) + repository.put(H(0), fchunk(max_data)) + assert pdchunk(repository.get(H(0))) == max_data + with pytest.raises(IntegrityError): + repository.put(H(1), fchunk(max_data + b"x")) + + +def check(repository, repo_path, repair=False, status=True): + assert repository.check(repair=repair) == status + # Make sure no tmp files are left behind + tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name] + assert tmp_files == [], "Found tmp files" + + +def _get_mock_args(): + class MockArgs: + remote_path = "borg" + umask = 0o077 + debug_topics = [] + rsh = None + + def __contains__(self, item): + # to behave like argparse.Namespace + return hasattr(self, 
item) + + return MockArgs() + + +def test_remote_invalid_rpc(remote_repository): + with remote_repository: + with pytest.raises(InvalidRPCMethod): + remote_repository.call("__init__", {}) + + +def test_remote_rpc_exception_transport(remote_repository): + with remote_repository: + s1 = "test string" + + try: + remote_repository.call("inject_exception", {"kind": "DoesNotExist"}) + except Repository3.DoesNotExist as e: + assert len(e.args) == 1 + assert e.args[0] == remote_repository.location.processed + + try: + remote_repository.call("inject_exception", {"kind": "AlreadyExists"}) + except Repository3.AlreadyExists as e: + assert len(e.args) == 1 + assert e.args[0] == remote_repository.location.processed + + try: + remote_repository.call("inject_exception", {"kind": "CheckNeeded"}) + except Repository3.CheckNeeded as e: + assert len(e.args) == 1 + assert e.args[0] == remote_repository.location.processed + + try: + remote_repository.call("inject_exception", {"kind": "IntegrityError"}) + except IntegrityError as e: + assert len(e.args) == 1 + assert e.args[0] == s1 + + try: + remote_repository.call("inject_exception", {"kind": "PathNotAllowed"}) + except PathNotAllowed as e: + assert len(e.args) == 1 + assert e.args[0] == "foo" + + try: + remote_repository.call("inject_exception", {"kind": "ObjectNotFound"}) + except Repository3.ObjectNotFound as e: + assert len(e.args) == 2 + assert e.args[0] == s1 + assert e.args[1] == remote_repository.location.processed + + try: + remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"}) + except InvalidRPCMethod as e: + assert len(e.args) == 1 + assert e.args[0] == s1 + + try: + remote_repository.call("inject_exception", {"kind": "divide"}) + except RemoteRepository3.RPCError as e: + assert e.unpacked + assert e.get_message() == "ZeroDivisionError: integer division or modulo by zero\n" + assert e.exception_class == "ZeroDivisionError" + assert len(e.exception_full) > 0 + + +def 
test_remote_ssh_cmd(remote_repository): + with remote_repository: + args = _get_mock_args() + remote_repository._args = args + assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"] + assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"] + assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [ + "ssh", + "-p", + "1234", + "user@example.com", + ] + os.environ["BORG_RSH"] = "ssh --foo" + assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"] + + +def test_remote_borg_cmd(remote_repository): + with remote_repository: + assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"] + args = _get_mock_args() + # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown: + logging.getLogger().setLevel(logging.INFO) + # note: test logger is on info log level, so --info gets added automagically + assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] + args.remote_path = "borg-0.28.2" + assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"] + args.debug_topics = ["something_client_side", "repository_compaction"] + assert remote_repository.borg_cmd(args, testing=False) == [ + "borg-0.28.2", + "serve", + "--info", + "--debug-topic=borg.debug.repository_compaction", + ] + args = _get_mock_args() + args.storage_quota = 0 + assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"] + args.storage_quota = 314159265 + assert remote_repository.borg_cmd(args, testing=False) == [ + "borg", + "serve", + "--info", + "--storage-quota=314159265", + ] + args.rsh = "ssh -i foo" + remote_repository._args = args + assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"] diff --git a/tox.ini b/tox.ini index 
10f3e7f30..402186dcd 100644 --- a/tox.ini +++ b/tox.ini @@ -42,7 +42,7 @@ deps = pytest mypy pkgconfig -commands = mypy +commands = mypy --ignore-missing-imports [testenv:docs] changedir = docs