diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 6df3284c3..59f4e17bb 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -2753,10 +2753,14 @@ def main(): # pragma: no cover tb = "%s\n%s" % (traceback.format_exc(), sysinfo()) exit_code = e.exit_code except RemoteRepository.RPCError as e: - msg = "%s %s" % (e.remote_type, e.name) - important = e.remote_type not in ('LockTimeout', ) + important = e.exception_class not in ('LockTimeout', ) tb_log_level = logging.ERROR if important else logging.DEBUG - tb = sysinfo() + if important: + msg = e.exception_full + else: + msg = e.get_message() + tb = '\n'.join('Borg server: ' + l for l in e.sysinfo.splitlines()) + tb += "\n" + sysinfo() exit_code = EXIT_ERROR except Exception: msg = 'Local Exception' diff --git a/src/borg/remote.py b/src/borg/remote.py index 6264241c2..9a5221bfb 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,5 +1,7 @@ import errno import fcntl +import functools +import inspect import logging import os import select @@ -20,8 +22,11 @@ from .helpers import bin_to_hex from .helpers import replace_placeholders from .helpers import yes from .repository import Repository +from .version import parse_version, format_version RPC_PROTOCOL_VERSION = 2 +BORG_VERSION = parse_version(__version__) +MSGID, MSG, ARGS, RESULT = b'i', b'm', b'a', b'r' BUFSIZE = 10 * 1024 * 1024 @@ -54,6 +59,51 @@ class UnexpectedRPCDataFormatFromServer(Error): """Got unexpected RPC data format from server.""" +# Protocol compatibility: +# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting +# too old servers. This ensures that the knowledge what is compatible is always held by the newer component. +# +# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then +# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains +# client_version. +# +# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x] +# inclusive, or it is a dict which includes the server version. +# +# All method calls on the remote repository object must be whitelisted in RepositoryServer.rpc_methods and have api +# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements. +# +# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server side. +# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs +# to be added. +# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older +# servers still get compatible input. + + +compatMap = { + 'check': ('repair', 'save_space', ), + 'commit': ('save_space', ), + 'rollback': (), + 'destroy': (), + '__len__': (), + 'list': ('limit', 'marker', ), + 'put': ('id', 'data', ), + 'get': ('id', ), + 'delete': ('id', ), + 'save_key': ('keydata', ), + 'load_key': (), + 'break_lock': (), + 'negotiate': ('client_data', ), + 'open': ('path', 'create', 'lock_wait', 'lock', 'exclusive', 'append_only', ), + 'get_free_nonce': (), + 'commit_nonce_reservation': ('next_unreserved', 'start_nonce', ), +} + + +def decode_keys(d): + return {k.decode(): d[k] for k in d} + + class RepositoryServer: # pragma: no cover rpc_methods = ( '__len__', @@ -72,13 +122,24 @@ class RepositoryServer: # pragma: no cover 'load_key', 'break_lock', 'get_free_nonce', - 'commit_nonce_reservation' + 'commit_nonce_reservation', + 'inject_exception', ) def __init__(self, restrict_to_paths, append_only): self.repository = None self.restrict_to_paths = restrict_to_paths self.append_only = append_only + self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information + + def positional_to_named(self, method, argv): + """Translate from positional protocol to named protocol.""" + return {name: argv[pos] for pos, name in enumerate(compatMap[method])} + + def filter_args(self, f, kwargs): + """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" + known = set(inspect.signature(f).parameters) + return {name: kwargs[name] for name in kwargs if name in known} def serve(self): stdin_fd = sys.stdin.fileno() @@ -102,17 +163,26 @@ class RepositoryServer: # pragma: no cover if self.repository is not None: self.repository.close() else: - os.write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n" + os.write(stderr_fd, 'Borg {}: Got connection close before repository was opened.\n' .format(__version__).encode()) return unpacker.feed(data) for unpacked in unpacker: - if not (isinstance(unpacked, tuple) and len(unpacked) == 4): + if isinstance(unpacked, dict): + dictFormat = True + msgid = unpacked[MSGID] + method = unpacked[MSG].decode() + args = decode_keys(unpacked[ARGS]) + elif isinstance(unpacked, tuple) and len(unpacked) == 4: + dictFormat = False + # The first field 'type' was always 1 and has always been ignored + _, msgid, method, args = unpacked + method = method.decode() + args = self.positional_to_named(method, args) + else: if self.repository is not None: self.repository.close() raise UnexpectedRPCDataFormatFromClient(__version__) - type, msgid, method, args = unpacked - method = method.decode('ascii') try: if method not in self.rpc_methods: raise InvalidRPCMethod(method) @@ -120,36 +190,82 @@ class RepositoryServer: # pragma: no cover f = getattr(self, method) except AttributeError: f = getattr(self.repository, method) - res = f(*args) + args = self.filter_args(f, args) + res = f(**args) except BaseException as e: - if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)): - # These exceptions are reconstructed on the client end in RemoteRepository.call_many(), - # and will be handled just like locally raised exceptions. Suppress the remote traceback - # for these, except ErrorWithTraceback, which should always display a traceback. - pass - else: + if dictFormat: + ex_short = traceback.format_exception_only(e.__class__, e) + ex_full = traceback.format_exception(*sys.exc_info()) if isinstance(e, Error): - tb_log_level = logging.ERROR if e.traceback else logging.DEBUG - msg = e.get_message() + ex_short = e.get_message() + if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)): + # These exceptions are reconstructed on the client end in RemoteRepository.call_many(), + # and will be handled just like locally raised exceptions. Suppress the remote traceback + # for these, except ErrorWithTraceback, which should always display a traceback. + pass else: - tb_log_level = logging.ERROR - msg = '%s Exception in RPC call' % e.__class__.__name__ - tb = '%s\n%s' % (traceback.format_exc(), sysinfo()) - logging.error(msg) - logging.log(tb_log_level, tb) - exc = "Remote Exception (see remote log for the traceback)" - os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc))) + logging.debug('\n'.join(ex_full)) + + try: + msg = msgpack.packb({MSGID: msgid, + b'exception_class': e.__class__.__name__, + b'exception_args': e.args, + b'exception_full': ex_full, + b'exception_short': ex_short, + b'sysinfo': sysinfo()}) + except TypeError: + msg = msgpack.packb({MSGID: msgid, + b'exception_class': e.__class__.__name__, + b'exception_args': [x if isinstance(x, (str, bytes, int)) else None + for x in e.args], + b'exception_full': ex_full, + b'exception_short': ex_short, + b'sysinfo': sysinfo()}) + + os.write(stdout_fd, msg) + else: + if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)): + # These exceptions are reconstructed on the client end in RemoteRepository.call_many(), + # and will be handled just like locally raised exceptions. Suppress the remote traceback + # for these, except ErrorWithTraceback, which should always display a traceback. + pass + else: + if isinstance(e, Error): + tb_log_level = logging.ERROR if e.traceback else logging.DEBUG + msg = e.get_message() + else: + tb_log_level = logging.ERROR + msg = '%s Exception in RPC call' % e.__class__.__name__ + tb = '%s\n%s' % (traceback.format_exc(), sysinfo()) + logging.error(msg) + logging.log(tb_log_level, tb) + exc = 'Remote Exception (see remote log for the traceback)' + os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc))) else: - os.write(stdout_fd, msgpack.packb((1, msgid, None, res))) + if dictFormat: + os.write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + else: + os.write(stdout_fd, msgpack.packb((1, msgid, None, res))) if es: self.repository.close() return - def negotiate(self, versions): - return RPC_PROTOCOL_VERSION + def negotiate(self, client_data): + # old format used in 1.0.x + if client_data == RPC_PROTOCOL_VERSION: + return RPC_PROTOCOL_VERSION + # clients since 1.1.0b3 use a dict as client_data + if isinstance(client_data, dict): + self.client_version = client_data[b'client_version'] + else: + self.client_version = BORG_VERSION # seems to be newer than current version (no known old format) + + # not a known old format, send newest negotiate this version knows + return {'server_version': BORG_VERSION} def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False): - path = os.fsdecode(path) + if isinstance(path, bytes): + path = os.fsdecode(path) if path.startswith('/~'): # /~/x = path x relative to home dir, /~username/x = relative to "user" home dir path = os.path.join(get_home_dir(), path[2:]) # XXX check this (see also 1.0-maint), is it correct for ~u? elif path.startswith('/./'): # /./x = path x relative to cwd @@ -172,6 +288,27 @@ class RepositoryServer: # pragma: no cover self.repository.__enter__() # clean exit handled by serve() method return self.repository.id + def inject_exception(self, kind): + kind = kind.decode() + s1 = 'test string' + s2 = 'test string2' + if kind == 'DoesNotExist': + raise Repository.DoesNotExist(s1) + elif kind == 'AlreadyExists': + raise Repository.AlreadyExists(s1) + elif kind == 'CheckNeeded': + raise Repository.CheckNeeded(s1) + elif kind == 'IntegrityError': + raise IntegrityError(s1) + elif kind == 'PathNotAllowed': + raise PathNotAllowed() + elif kind == 'ObjectNotFound': + raise Repository.ObjectNotFound(s1, s2) + elif kind == 'InvalidRPCMethod': + raise InvalidRPCMethod(s1) + elif kind == 'divide': + 0 // 0 + class SleepingBandwidthLimiter: def __init__(self, limit): @@ -203,16 +340,100 @@ class SleepingBandwidthLimiter: return written +def api(*, since, **kwargs_decorator): + """Check version requirements and use self.call to do the remote method call. + + specifies the version in which borg introduced this method, + calling this method when connected to an older version will fail without transmiting + anything to the server. + + Further kwargs can be used to encode version specific restrictions. + If a previous hardcoded behaviour is parameterized in a version, this allows calls that + use the previously hardcoded behaviour to pass through and generates an error if another + behaviour is requested by the client. + + e.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False. + Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls + with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7. + """ + def decorator(f): + @functools.wraps(f) + def do_rpc(self, *args, **kwargs): + sig = inspect.signature(f) + bound_args = sig.bind(self, *args, **kwargs) + named = {} + for name, param in sig.parameters.items(): + if name == 'self': + continue + if name in bound_args.arguments: + named[name] = bound_args.arguments[name] + else: + if param.default is not param.empty: + named[name] = param.default + + if self.server_version < since: + raise self.RPCServerOutdated(f.__name__, format_version(since)) + + for name, restriction in kwargs_decorator.items(): + if restriction['since'] <= self.server_version: + continue + if 'previously' in restriction and named[name] == restriction['previously']: + continue + + raise self.RPCServerOutdated("{0} {1}={2!s}".format(f.__name__, name, named[name]), + format_version(restriction['since'])) + + return self.call(f.__name__, named) + return do_rpc + return decorator + + class RemoteRepository: extra_test_args = [] class RPCError(Exception): - def __init__(self, name, remote_type): - self.name = name - self.remote_type = remote_type + def __init__(self, unpacked): + # for borg < 1.1: unpacked only has b'exception_class' as key + # for borg 1.1+: unpacked has keys: b'exception_args', b'exception_full', b'exception_short', b'sysinfo' + self.unpacked = unpacked - class NoAppendOnlyOnServer(Error): - """Server does not support --append-only.""" + def get_message(self): + if b'exception_short' in self.unpacked: + return b'\n'.join(self.unpacked[b'exception_short']).decode() + else: + return self.exception_class + + @property + def exception_class(self): + return self.unpacked[b'exception_class'].decode() + + @property + def exception_full(self): + if b'exception_full' in self.unpacked: + return b'\n'.join(self.unpacked[b'exception_full']).decode() + else: + return self.get_message() + '\nRemote Exception (see remote log for the traceback)' + + @property + def sysinfo(self): + if b'sysinfo' in self.unpacked: + return self.unpacked[b'sysinfo'].decode() + else: + return '' + + class RPCServerOutdated(Error): + """Borg server is too old for {}. Required version {}""" + + @property + def method(self): + return self.args[0] + + @property + def required_version(self): + return self.args[1] + + # If compatibility with 1.0.x is not longer needed, replace all checks of this with True and simplify the code + dictFormat = False # outside of __init__ for testing of legacy free protocol def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None): self.location = self._location = location @@ -225,6 +446,7 @@ class RemoteRepository: self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0) self.unpacker = msgpack.Unpacker(use_list=False) + self.server_version = parse_version('1.0.8') # fallback version if server is too old to send version information self.p = None testing = location.host == '__testsuite__' borg_cmd = self.borg_cmd(args, testing) @@ -254,17 +476,31 @@ class RemoteRepository: try: try: - version = self.call('negotiate', RPC_PROTOCOL_VERSION) + version = self.call('negotiate', {'client_data': {b'client_version': BORG_VERSION}}) except ConnectionClosed: raise ConnectionClosedWithHint('Is borg working on the server?') from None - if version != RPC_PROTOCOL_VERSION: - raise Exception('Server insisted on using unsupported protocol version %d' % version) - try: - self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only) - except self.RPCError as err: - if err.remote_type != 'TypeError': - raise - msg = """\ + if version == RPC_PROTOCOL_VERSION: + self.dictFormat = False + elif isinstance(version, dict) and b'server_version' in version: + self.dictFormat = True + self.server_version = version[b'server_version'] + else: + raise Exception('Server insisted on using unsupported protocol version %s' % version) + + def do_open(): + self.id = self.open(path=self.location.path, create=create, lock_wait=lock_wait, + lock=lock, exclusive=exclusive, append_only=append_only) + + if self.dictFormat: + do_open() + else: + # Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower + try: + do_open() + except self.RPCError as err: + if err.exception_class != 'TypeError': + raise + msg = """\ Please note: If you see a TypeError complaining about the number of positional arguments given to open(), you can ignore it if it comes from a borg version < 1.0.7. @@ -272,21 +508,22 @@ This TypeError is a cosmetic side effect of the compatibility code borg clients >= 1.0.7 have to support older borg servers. This problem will go away as soon as the server has been upgraded to 1.0.7+. """ - # emit this msg in the same way as the "Remote: ..." lines that show the remote TypeError - sys.stderr.write(msg) - if append_only: - raise self.NoAppendOnlyOnServer() - self.id = self.call('open', self.location.path, create, lock_wait, lock) + # emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError + sys.stderr.write(msg) + self.server_version = parse_version('1.0.6') + compatMap['open'] = ('path', 'create', 'lock_wait', 'lock', ), + # try again with corrected version and compatMap + do_open() except Exception: self.close() raise def __del__(self): if len(self.responses): - logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),)) + logging.debug('still %d cached responses left in RemoteRepository' % (len(self.responses),)) if self.p: self.close() - assert False, "cleanup happened in Repository.__del__" + assert False, 'cleanup happened in Repository.__del__' def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path()) @@ -310,7 +547,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. def borg_cmd(self, args, testing): """return a borg serve command line""" - # give some args/options to "borg serve" process as they were given to us + # give some args/options to 'borg serve' process as they were given to us opts = [] if args is not None: opts.append('--umask=%03o' % args.umask) @@ -348,7 +585,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. args.append('%s' % location.host) return args - def call(self, cmd, *args, **kw): + def named_to_positional(self, method, kwargs): + return [kwargs[name] for name in compatMap[method]] + + def call(self, cmd, args, **kw): for resp in self.call_many(cmd, [args], **kw): return resp @@ -362,8 +602,11 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. del self.chunkid_to_msgids[chunkid] return msgid - def handle_error(error, res): - error = error.decode('utf-8') + def handle_error(unpacked): + error = unpacked[b'exception_class'].decode() + old_server = b'exception_args' not in unpacked + args = unpacked.get(b'exception_args') + if error == 'DoesNotExist': raise Repository.DoesNotExist(self.location.orig) elif error == 'AlreadyExists': @@ -371,27 +614,36 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. elif error == 'CheckNeeded': raise Repository.CheckNeeded(self.location.orig) elif error == 'IntegrityError': - raise IntegrityError(res) + if old_server: + raise IntegrityError('(not available)') + else: + raise IntegrityError(args[0].decode()) elif error == 'PathNotAllowed': - raise PathNotAllowed(*res) + raise PathNotAllowed() elif error == 'ObjectNotFound': - raise Repository.ObjectNotFound(res[0], self.location.orig) + if old_server: + raise Repository.ObjectNotFound('(not available)', self.location.orig) + else: + raise Repository.ObjectNotFound(args[0].decode(), self.location.orig) elif error == 'InvalidRPCMethod': - raise InvalidRPCMethod(*res) + if old_server: + raise InvalidRPCMethod('(not available)') + else: + raise InvalidRPCMethod(args[0].decode()) else: - raise self.RPCError(res.decode('utf-8'), error) + raise self.RPCError(unpacked) calls = list(calls) waiting_for = [] while wait or calls: while waiting_for: try: - error, res = self.responses.pop(waiting_for[0]) + unpacked = self.responses.pop(waiting_for[0]) waiting_for.pop(0) - if error: - handle_error(error, res) + if b'exception_class' in unpacked: + handle_error(unpacked) else: - yield res + yield unpacked[RESULT] if not waiting_for and not calls: return except KeyError: @@ -410,15 +662,24 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. raise ConnectionClosed() self.unpacker.feed(data) for unpacked in self.unpacker: - if not (isinstance(unpacked, tuple) and len(unpacked) == 4): + if isinstance(unpacked, dict): + msgid = unpacked[MSGID] + elif isinstance(unpacked, tuple) and len(unpacked) == 4: + # The first field 'type' was always 1 and has always been ignored + _, msgid, error, res = unpacked + if error: + # ignore res, because it is only a fixed string anyway. + unpacked = {MSGID: msgid, b'exception_class': error} + else: + unpacked = {MSGID: msgid, RESULT: res} + else: raise UnexpectedRPCDataFormatFromServer() - type, msgid, error, res = unpacked if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) - if error: - handle_error(error, res) + if b'exception_class' in unpacked: + handle_error(unpacked) else: - self.responses[msgid] = error, res + self.responses[msgid] = unpacked elif fd is self.stderr_fd: data = os.read(fd, 32768) if not data: @@ -430,23 +691,29 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT: if calls: if is_preloaded: - assert cmd == "get", "is_preload is only supported for 'get'" - if calls[0][0] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(calls.pop(0)[0])) + assert cmd == 'get', "is_preload is only supported for 'get'" + if calls[0]['id'] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(calls.pop(0)['id'])) else: args = calls.pop(0) - if cmd == 'get' and args[0] in self.chunkid_to_msgids: - waiting_for.append(pop_preload_msgid(args[0])) + if cmd == 'get' and args['id'] in self.chunkid_to_msgids: + waiting_for.append(pop_preload_msgid(args['id'])) else: self.msgid += 1 waiting_for.append(self.msgid) - self.to_send = msgpack.packb((1, self.msgid, cmd, args)) + if self.dictFormat: + self.to_send = msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}) + else: + self.to_send = msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))) if not self.to_send and self.preload_ids: chunk_id = self.preload_ids.pop(0) - args = (chunk_id,) + args = {'id': chunk_id} self.msgid += 1 self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid) - self.to_send = msgpack.packb((1, self.msgid, 'get', args)) + if self.dictFormat: + self.to_send = msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}) + else: + self.to_send = msgpack.packb((1, self.msgid, 'get', self.named_to_positional(cmd, args))) if self.to_send: try: @@ -458,55 +725,74 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. raise self.ignore_responses |= set(waiting_for) + @api(since=parse_version('1.0.0'), + append_only={'since': parse_version('1.0.7'), 'previously': False}) + def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False): + """actual remoting is done via self.call in the @api decorator""" + + @api(since=parse_version('1.0.0')) def check(self, repair=False, save_space=False): - return self.call('check', repair, save_space) + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def commit(self, save_space=False): - return self.call('commit', save_space) + """actual remoting is done via self.call in the @api decorator""" - def rollback(self, *args): - return self.call('rollback') + @api(since=parse_version('1.0.0')) + def rollback(self): + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def destroy(self): - return self.call('destroy') + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def __len__(self): - return self.call('__len__') + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def list(self, limit=None, marker=None): - return self.call('list', limit, marker) + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.1.0b3')) def scan(self, limit=None, marker=None): - return self.call('scan', limit, marker) + """actual remoting is done via self.call in the @api decorator""" - def get(self, id_): - for resp in self.get_many([id_]): + def get(self, id): + for resp in self.get_many([id]): return resp def get_many(self, ids, is_preloaded=False): - for resp in self.call_many('get', [(id_,) for id_ in ids], is_preloaded=is_preloaded): + for resp in self.call_many('get', [{'id': id} for id in ids], is_preloaded=is_preloaded): yield resp - def put(self, id_, data, wait=True): - return self.call('put', id_, data, wait=wait) + @api(since=parse_version('1.0.0')) + def put(self, id, data, wait=True): + """actual remoting is done via self.call in the @api decorator""" - def delete(self, id_, wait=True): - return self.call('delete', id_, wait=wait) + @api(since=parse_version('1.0.0')) + def delete(self, id, wait=True): + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def save_key(self, keydata): - return self.call('save_key', keydata) + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def load_key(self): - return self.call('load_key') + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def get_free_nonce(self): - return self.call('get_free_nonce') + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def commit_nonce_reservation(self, next_unreserved, start_nonce): - return self.call('commit_nonce_reservation', next_unreserved, start_nonce) + """actual remoting is done via self.call in the @api decorator""" + @api(since=parse_version('1.0.0')) def break_lock(self): - return self.call('break_lock') + """actual remoting is done via self.call in the @api decorator""" def close(self): if self.p: @@ -531,7 +817,7 @@ def handle_remote_line(line): logname, msg = msg.split(' ', 1) logging.getLogger(logname).log(level, msg.rstrip()) else: - sys.stderr.write("Remote: " + line) + sys.stderr.write('Remote: ' + line) class RepositoryNoCache: diff --git a/src/borg/repository.py b/src/borg/repository.py index 7d1bf8294..fda085a63 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -825,14 +825,14 @@ class Repository: return result return result - def get(self, id_): + def get(self, id): if not self.index: self.index = self.open_index(self.get_transaction_id()) try: - segment, offset = self.index[id_] - return self.io.read(segment, offset, id_) + segment, offset = self.index[id] + return self.io.read(segment, offset, id) except KeyError: - raise self.ObjectNotFound(id_, self.path) from None + raise self.ObjectNotFound(id, self.path) from None def get_many(self, ids, is_preloaded=False): for id_ in ids: diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 6d6f00a71..8d77e1205 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -12,7 +12,7 @@ from ..hashindex import NSIndex from ..helpers import Location from ..helpers import IntegrityError from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint, handle_remote_line +from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE from . import BaseTestCase from .hashindex import H @@ -647,7 +647,60 @@ class RemoteRepositoryTestCase(RepositoryTestCase): exclusive=True, create=create) def test_invalid_rpc(self): - self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', None)) + self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', {})) + + def test_rpc_exception_transport(self): + s1 = 'test string' + + try: + self.repository.call('inject_exception', {'kind': 'DoesNotExist'}) + except Repository.DoesNotExist as e: + assert len(e.args) == 1 + assert e.args[0] == self.repository.location.orig + + try: + self.repository.call('inject_exception', {'kind': 'AlreadyExists'}) + except Repository.AlreadyExists as e: + assert len(e.args) == 1 + assert e.args[0] == self.repository.location.orig + + try: + self.repository.call('inject_exception', {'kind': 'CheckNeeded'}) + except Repository.CheckNeeded as e: + assert len(e.args) == 1 + assert e.args[0] == self.repository.location.orig + + try: + self.repository.call('inject_exception', {'kind': 'IntegrityError'}) + except IntegrityError as e: + assert len(e.args) == 1 + assert e.args[0] == s1 + + try: + self.repository.call('inject_exception', {'kind': 'PathNotAllowed'}) + except PathNotAllowed as e: + assert len(e.args) == 0 + + try: + self.repository.call('inject_exception', {'kind': 'ObjectNotFound'}) + except Repository.ObjectNotFound as e: + assert len(e.args) == 2 + assert e.args[0] == s1 + assert e.args[1] == self.repository.location.orig + + try: + self.repository.call('inject_exception', {'kind': 'InvalidRPCMethod'}) + except InvalidRPCMethod as e: + assert len(e.args) == 1 + assert e.args[0] == s1 + + try: + self.repository.call('inject_exception', {'kind': 'divide'}) + except RemoteRepository.RPCError as e: + assert e.unpacked + assert e.get_message() == 'ZeroDivisionError: integer division or modulo by zero\n' + assert e.exception_class == 'ZeroDivisionError' + assert len(e.exception_full) > 0 def test_ssh_cmd(self): assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com'] @@ -670,6 +723,31 @@ class RemoteRepositoryTestCase(RepositoryTestCase): assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info'] +class RemoteLegacyFree(RepositoryTestCaseBase): + # Keep testing this so we can someday safely remove the legacy tuple format. + + def open(self, create=False): + with patch.object(RemoteRepository, 'dictFormat', True): + return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), + exclusive=True, create=create) + + def test_legacy_free(self): + # put + self.repository.put(H(0), b'foo') + self.repository.commit() + self.repository.close() + # replace + self.repository = self.open() + with self.repository: + self.repository.put(H(0), b'bar') + self.repository.commit() + # delete + self.repository = self.open() + with self.repository: + self.repository.delete(H(0)) + self.repository.commit() + + @pytest.mark.skipif(sys.platform == 'cygwin', reason='remote is broken on cygwin and hangs') class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): diff --git a/src/borg/version.py b/src/borg/version.py new file mode 100644 index 000000000..4eb0c77d5 --- /dev/null +++ b/src/borg/version.py @@ -0,0 +1,49 @@ +import re + + +def parse_version(version): + """ + simplistic parser for setuptools_scm versions + + supports final versions and alpha ('a'), beta ('b') and rc versions. It just discards commits since last tag + and git revision hash. + + Output is a version tuple containing integers. It ends with one or two elements that ensure that relational + operators yield correct relations for alpha, beta and rc versions too. For final versions the last element + is a -1, for prerelease versions the last two elements are a smaller negative number and the number of e.g. + the beta. + + Note, this sorts version 1.0 before 1.0.0. + + This version format is part of the remote protocol, don‘t change in breaking ways. + """ + + parts = version.split('+')[0].split('.') + if parts[-1].startswith('dev'): + del parts[-1] + version = [int(segment) for segment in parts[:-1]] + + prerelease = re.fullmatch('([0-9]+)(a|b|rc)([0-9]+)', parts[-1]) + if prerelease: + version_type = {'a': -4, 'b': -3, 'rc': -2}[prerelease.group(2)] + version += [int(prerelease.group(1)), version_type, int(prerelease.group(3))] + else: + version += [int(parts[-1]), -1] + + return tuple(version) + + +def format_version(version): + """a reverse for parse_version (obviously without the dropped information)""" + f = [] + it = iter(version) + while True: + part = next(it) + if part >= 0: + f += str(part) + elif part == -1: + break + else: + f[-1] = f[-1] + {-2: 'rc', -3: 'b', -4: 'a'}[part] + str(next(it)) + break + return '.'.join(f)