mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-08 16:23:42 -04:00
Merge pull request #1419 from textshell/feature/rpc-named-arguments
RFC: Make rpc protocol more extensible
This commit is contained in:
commit
c14b962bad
5 changed files with 519 additions and 102 deletions
|
|
@ -2753,10 +2753,14 @@ def main(): # pragma: no cover
|
|||
tb = "%s\n%s" % (traceback.format_exc(), sysinfo())
|
||||
exit_code = e.exit_code
|
||||
except RemoteRepository.RPCError as e:
|
||||
msg = "%s %s" % (e.remote_type, e.name)
|
||||
important = e.remote_type not in ('LockTimeout', )
|
||||
important = e.exception_class not in ('LockTimeout', )
|
||||
tb_log_level = logging.ERROR if important else logging.DEBUG
|
||||
tb = sysinfo()
|
||||
if important:
|
||||
msg = e.exception_full
|
||||
else:
|
||||
msg = e.get_message()
|
||||
tb = '\n'.join('Borg server: ' + l for l in e.sysinfo.splitlines())
|
||||
tb += "\n" + sysinfo()
|
||||
exit_code = EXIT_ERROR
|
||||
except Exception:
|
||||
msg = 'Local Exception'
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
import errno
|
||||
import fcntl
|
||||
import functools
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
|
|
@ -20,8 +22,11 @@ from .helpers import bin_to_hex
|
|||
from .helpers import replace_placeholders
|
||||
from .helpers import yes
|
||||
from .repository import Repository
|
||||
from .version import parse_version, format_version
|
||||
|
||||
RPC_PROTOCOL_VERSION = 2
|
||||
BORG_VERSION = parse_version(__version__)
|
||||
MSGID, MSG, ARGS, RESULT = b'i', b'm', b'a', b'r'
|
||||
|
||||
BUFSIZE = 10 * 1024 * 1024
|
||||
|
||||
|
|
@ -54,6 +59,51 @@ class UnexpectedRPCDataFormatFromServer(Error):
|
|||
"""Got unexpected RPC data format from server."""
|
||||
|
||||
|
||||
# Protocol compatibility:
|
||||
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
|
||||
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
|
||||
#
|
||||
# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
|
||||
# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
|
||||
# client_version.
|
||||
#
|
||||
# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
|
||||
# inclusive, or it is a dict which includes the server version.
|
||||
#
|
||||
# All method calls on the remote repository object must be whitelisted in RepositoryServer.rpc_methods and have api
|
||||
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
|
||||
#
|
||||
# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server side.
|
||||
# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
|
||||
# to be added.
|
||||
# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
|
||||
# servers still get compatible input.
|
||||
|
||||
|
||||
compatMap = {
|
||||
'check': ('repair', 'save_space', ),
|
||||
'commit': ('save_space', ),
|
||||
'rollback': (),
|
||||
'destroy': (),
|
||||
'__len__': (),
|
||||
'list': ('limit', 'marker', ),
|
||||
'put': ('id', 'data', ),
|
||||
'get': ('id', ),
|
||||
'delete': ('id', ),
|
||||
'save_key': ('keydata', ),
|
||||
'load_key': (),
|
||||
'break_lock': (),
|
||||
'negotiate': ('client_data', ),
|
||||
'open': ('path', 'create', 'lock_wait', 'lock', 'exclusive', 'append_only', ),
|
||||
'get_free_nonce': (),
|
||||
'commit_nonce_reservation': ('next_unreserved', 'start_nonce', ),
|
||||
}
|
||||
|
||||
|
||||
def decode_keys(d):
|
||||
return {k.decode(): d[k] for k in d}
|
||||
|
||||
|
||||
class RepositoryServer: # pragma: no cover
|
||||
rpc_methods = (
|
||||
'__len__',
|
||||
|
|
@ -72,13 +122,24 @@ class RepositoryServer: # pragma: no cover
|
|||
'load_key',
|
||||
'break_lock',
|
||||
'get_free_nonce',
|
||||
'commit_nonce_reservation'
|
||||
'commit_nonce_reservation',
|
||||
'inject_exception',
|
||||
)
|
||||
|
||||
def __init__(self, restrict_to_paths, append_only):
|
||||
self.repository = None
|
||||
self.restrict_to_paths = restrict_to_paths
|
||||
self.append_only = append_only
|
||||
self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information
|
||||
|
||||
def positional_to_named(self, method, argv):
|
||||
"""Translate from positional protocol to named protocol."""
|
||||
return {name: argv[pos] for pos, name in enumerate(compatMap[method])}
|
||||
|
||||
def filter_args(self, f, kwargs):
|
||||
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
|
||||
known = set(inspect.signature(f).parameters)
|
||||
return {name: kwargs[name] for name in kwargs if name in known}
|
||||
|
||||
def serve(self):
|
||||
stdin_fd = sys.stdin.fileno()
|
||||
|
|
@ -102,17 +163,26 @@ class RepositoryServer: # pragma: no cover
|
|||
if self.repository is not None:
|
||||
self.repository.close()
|
||||
else:
|
||||
os.write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
|
||||
os.write(stderr_fd, 'Borg {}: Got connection close before repository was opened.\n'
|
||||
.format(__version__).encode())
|
||||
return
|
||||
unpacker.feed(data)
|
||||
for unpacked in unpacker:
|
||||
if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
|
||||
if isinstance(unpacked, dict):
|
||||
dictFormat = True
|
||||
msgid = unpacked[MSGID]
|
||||
method = unpacked[MSG].decode()
|
||||
args = decode_keys(unpacked[ARGS])
|
||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
||||
dictFormat = False
|
||||
# The first field 'type' was always 1 and has always been ignored
|
||||
_, msgid, method, args = unpacked
|
||||
method = method.decode()
|
||||
args = self.positional_to_named(method, args)
|
||||
else:
|
||||
if self.repository is not None:
|
||||
self.repository.close()
|
||||
raise UnexpectedRPCDataFormatFromClient(__version__)
|
||||
type, msgid, method, args = unpacked
|
||||
method = method.decode('ascii')
|
||||
try:
|
||||
if method not in self.rpc_methods:
|
||||
raise InvalidRPCMethod(method)
|
||||
|
|
@ -120,36 +190,82 @@ class RepositoryServer: # pragma: no cover
|
|||
f = getattr(self, method)
|
||||
except AttributeError:
|
||||
f = getattr(self.repository, method)
|
||||
res = f(*args)
|
||||
args = self.filter_args(f, args)
|
||||
res = f(**args)
|
||||
except BaseException as e:
|
||||
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
||||
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
||||
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
||||
# for these, except ErrorWithTraceback, which should always display a traceback.
|
||||
pass
|
||||
else:
|
||||
if dictFormat:
|
||||
ex_short = traceback.format_exception_only(e.__class__, e)
|
||||
ex_full = traceback.format_exception(*sys.exc_info())
|
||||
if isinstance(e, Error):
|
||||
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
|
||||
msg = e.get_message()
|
||||
ex_short = e.get_message()
|
||||
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
||||
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
||||
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
||||
# for these, except ErrorWithTraceback, which should always display a traceback.
|
||||
pass
|
||||
else:
|
||||
tb_log_level = logging.ERROR
|
||||
msg = '%s Exception in RPC call' % e.__class__.__name__
|
||||
tb = '%s\n%s' % (traceback.format_exc(), sysinfo())
|
||||
logging.error(msg)
|
||||
logging.log(tb_log_level, tb)
|
||||
exc = "Remote Exception (see remote log for the traceback)"
|
||||
os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
|
||||
logging.debug('\n'.join(ex_full))
|
||||
|
||||
try:
|
||||
msg = msgpack.packb({MSGID: msgid,
|
||||
b'exception_class': e.__class__.__name__,
|
||||
b'exception_args': e.args,
|
||||
b'exception_full': ex_full,
|
||||
b'exception_short': ex_short,
|
||||
b'sysinfo': sysinfo()})
|
||||
except TypeError:
|
||||
msg = msgpack.packb({MSGID: msgid,
|
||||
b'exception_class': e.__class__.__name__,
|
||||
b'exception_args': [x if isinstance(x, (str, bytes, int)) else None
|
||||
for x in e.args],
|
||||
b'exception_full': ex_full,
|
||||
b'exception_short': ex_short,
|
||||
b'sysinfo': sysinfo()})
|
||||
|
||||
os.write(stdout_fd, msg)
|
||||
else:
|
||||
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
||||
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
||||
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
||||
# for these, except ErrorWithTraceback, which should always display a traceback.
|
||||
pass
|
||||
else:
|
||||
if isinstance(e, Error):
|
||||
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
|
||||
msg = e.get_message()
|
||||
else:
|
||||
tb_log_level = logging.ERROR
|
||||
msg = '%s Exception in RPC call' % e.__class__.__name__
|
||||
tb = '%s\n%s' % (traceback.format_exc(), sysinfo())
|
||||
logging.error(msg)
|
||||
logging.log(tb_log_level, tb)
|
||||
exc = 'Remote Exception (see remote log for the traceback)'
|
||||
os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
|
||||
else:
|
||||
os.write(stdout_fd, msgpack.packb((1, msgid, None, res)))
|
||||
if dictFormat:
|
||||
os.write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
|
||||
else:
|
||||
os.write(stdout_fd, msgpack.packb((1, msgid, None, res)))
|
||||
if es:
|
||||
self.repository.close()
|
||||
return
|
||||
|
||||
def negotiate(self, versions):
|
||||
return RPC_PROTOCOL_VERSION
|
||||
def negotiate(self, client_data):
|
||||
# old format used in 1.0.x
|
||||
if client_data == RPC_PROTOCOL_VERSION:
|
||||
return RPC_PROTOCOL_VERSION
|
||||
# clients since 1.1.0b3 use a dict as client_data
|
||||
if isinstance(client_data, dict):
|
||||
self.client_version = client_data[b'client_version']
|
||||
else:
|
||||
self.client_version = BORG_VERSION # seems to be newer than current version (no known old format)
|
||||
|
||||
# not a known old format, send newest negotiate this version knows
|
||||
return {'server_version': BORG_VERSION}
|
||||
|
||||
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False):
|
||||
path = os.fsdecode(path)
|
||||
if isinstance(path, bytes):
|
||||
path = os.fsdecode(path)
|
||||
if path.startswith('/~'): # /~/x = path x relative to home dir, /~username/x = relative to "user" home dir
|
||||
path = os.path.join(get_home_dir(), path[2:]) # XXX check this (see also 1.0-maint), is it correct for ~u?
|
||||
elif path.startswith('/./'): # /./x = path x relative to cwd
|
||||
|
|
@ -172,6 +288,27 @@ class RepositoryServer: # pragma: no cover
|
|||
self.repository.__enter__() # clean exit handled by serve() method
|
||||
return self.repository.id
|
||||
|
||||
def inject_exception(self, kind):
|
||||
kind = kind.decode()
|
||||
s1 = 'test string'
|
||||
s2 = 'test string2'
|
||||
if kind == 'DoesNotExist':
|
||||
raise Repository.DoesNotExist(s1)
|
||||
elif kind == 'AlreadyExists':
|
||||
raise Repository.AlreadyExists(s1)
|
||||
elif kind == 'CheckNeeded':
|
||||
raise Repository.CheckNeeded(s1)
|
||||
elif kind == 'IntegrityError':
|
||||
raise IntegrityError(s1)
|
||||
elif kind == 'PathNotAllowed':
|
||||
raise PathNotAllowed()
|
||||
elif kind == 'ObjectNotFound':
|
||||
raise Repository.ObjectNotFound(s1, s2)
|
||||
elif kind == 'InvalidRPCMethod':
|
||||
raise InvalidRPCMethod(s1)
|
||||
elif kind == 'divide':
|
||||
0 // 0
|
||||
|
||||
|
||||
class SleepingBandwidthLimiter:
|
||||
def __init__(self, limit):
|
||||
|
|
@ -203,16 +340,100 @@ class SleepingBandwidthLimiter:
|
|||
return written
|
||||
|
||||
|
||||
def api(*, since, **kwargs_decorator):
|
||||
"""Check version requirements and use self.call to do the remote method call.
|
||||
|
||||
<since> specifies the version in which borg introduced this method,
|
||||
calling this method when connected to an older version will fail without transmiting
|
||||
anything to the server.
|
||||
|
||||
Further kwargs can be used to encode version specific restrictions.
|
||||
If a previous hardcoded behaviour is parameterized in a version, this allows calls that
|
||||
use the previously hardcoded behaviour to pass through and generates an error if another
|
||||
behaviour is requested by the client.
|
||||
|
||||
e.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
|
||||
Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
|
||||
with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7.
|
||||
"""
|
||||
def decorator(f):
|
||||
@functools.wraps(f)
|
||||
def do_rpc(self, *args, **kwargs):
|
||||
sig = inspect.signature(f)
|
||||
bound_args = sig.bind(self, *args, **kwargs)
|
||||
named = {}
|
||||
for name, param in sig.parameters.items():
|
||||
if name == 'self':
|
||||
continue
|
||||
if name in bound_args.arguments:
|
||||
named[name] = bound_args.arguments[name]
|
||||
else:
|
||||
if param.default is not param.empty:
|
||||
named[name] = param.default
|
||||
|
||||
if self.server_version < since:
|
||||
raise self.RPCServerOutdated(f.__name__, format_version(since))
|
||||
|
||||
for name, restriction in kwargs_decorator.items():
|
||||
if restriction['since'] <= self.server_version:
|
||||
continue
|
||||
if 'previously' in restriction and named[name] == restriction['previously']:
|
||||
continue
|
||||
|
||||
raise self.RPCServerOutdated("{0} {1}={2!s}".format(f.__name__, name, named[name]),
|
||||
format_version(restriction['since']))
|
||||
|
||||
return self.call(f.__name__, named)
|
||||
return do_rpc
|
||||
return decorator
|
||||
|
||||
|
||||
class RemoteRepository:
|
||||
extra_test_args = []
|
||||
|
||||
class RPCError(Exception):
|
||||
def __init__(self, name, remote_type):
|
||||
self.name = name
|
||||
self.remote_type = remote_type
|
||||
def __init__(self, unpacked):
|
||||
# for borg < 1.1: unpacked only has b'exception_class' as key
|
||||
# for borg 1.1+: unpacked has keys: b'exception_args', b'exception_full', b'exception_short', b'sysinfo'
|
||||
self.unpacked = unpacked
|
||||
|
||||
class NoAppendOnlyOnServer(Error):
|
||||
"""Server does not support --append-only."""
|
||||
def get_message(self):
|
||||
if b'exception_short' in self.unpacked:
|
||||
return b'\n'.join(self.unpacked[b'exception_short']).decode()
|
||||
else:
|
||||
return self.exception_class
|
||||
|
||||
@property
|
||||
def exception_class(self):
|
||||
return self.unpacked[b'exception_class'].decode()
|
||||
|
||||
@property
|
||||
def exception_full(self):
|
||||
if b'exception_full' in self.unpacked:
|
||||
return b'\n'.join(self.unpacked[b'exception_full']).decode()
|
||||
else:
|
||||
return self.get_message() + '\nRemote Exception (see remote log for the traceback)'
|
||||
|
||||
@property
|
||||
def sysinfo(self):
|
||||
if b'sysinfo' in self.unpacked:
|
||||
return self.unpacked[b'sysinfo'].decode()
|
||||
else:
|
||||
return ''
|
||||
|
||||
class RPCServerOutdated(Error):
|
||||
"""Borg server is too old for {}. Required version {}"""
|
||||
|
||||
@property
|
||||
def method(self):
|
||||
return self.args[0]
|
||||
|
||||
@property
|
||||
def required_version(self):
|
||||
return self.args[1]
|
||||
|
||||
# If compatibility with 1.0.x is not longer needed, replace all checks of this with True and simplify the code
|
||||
dictFormat = False # outside of __init__ for testing of legacy free protocol
|
||||
|
||||
def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None):
|
||||
self.location = self._location = location
|
||||
|
|
@ -225,6 +446,7 @@ class RemoteRepository:
|
|||
self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
|
||||
|
||||
self.unpacker = msgpack.Unpacker(use_list=False)
|
||||
self.server_version = parse_version('1.0.8') # fallback version if server is too old to send version information
|
||||
self.p = None
|
||||
testing = location.host == '__testsuite__'
|
||||
borg_cmd = self.borg_cmd(args, testing)
|
||||
|
|
@ -254,17 +476,31 @@ class RemoteRepository:
|
|||
|
||||
try:
|
||||
try:
|
||||
version = self.call('negotiate', RPC_PROTOCOL_VERSION)
|
||||
version = self.call('negotiate', {'client_data': {b'client_version': BORG_VERSION}})
|
||||
except ConnectionClosed:
|
||||
raise ConnectionClosedWithHint('Is borg working on the server?') from None
|
||||
if version != RPC_PROTOCOL_VERSION:
|
||||
raise Exception('Server insisted on using unsupported protocol version %d' % version)
|
||||
try:
|
||||
self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only)
|
||||
except self.RPCError as err:
|
||||
if err.remote_type != 'TypeError':
|
||||
raise
|
||||
msg = """\
|
||||
if version == RPC_PROTOCOL_VERSION:
|
||||
self.dictFormat = False
|
||||
elif isinstance(version, dict) and b'server_version' in version:
|
||||
self.dictFormat = True
|
||||
self.server_version = version[b'server_version']
|
||||
else:
|
||||
raise Exception('Server insisted on using unsupported protocol version %s' % version)
|
||||
|
||||
def do_open():
|
||||
self.id = self.open(path=self.location.path, create=create, lock_wait=lock_wait,
|
||||
lock=lock, exclusive=exclusive, append_only=append_only)
|
||||
|
||||
if self.dictFormat:
|
||||
do_open()
|
||||
else:
|
||||
# Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
|
||||
try:
|
||||
do_open()
|
||||
except self.RPCError as err:
|
||||
if err.exception_class != 'TypeError':
|
||||
raise
|
||||
msg = """\
|
||||
Please note:
|
||||
If you see a TypeError complaining about the number of positional arguments
|
||||
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
|
||||
|
|
@ -272,21 +508,22 @@ This TypeError is a cosmetic side effect of the compatibility code borg
|
|||
clients >= 1.0.7 have to support older borg servers.
|
||||
This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||
"""
|
||||
# emit this msg in the same way as the "Remote: ..." lines that show the remote TypeError
|
||||
sys.stderr.write(msg)
|
||||
if append_only:
|
||||
raise self.NoAppendOnlyOnServer()
|
||||
self.id = self.call('open', self.location.path, create, lock_wait, lock)
|
||||
# emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
|
||||
sys.stderr.write(msg)
|
||||
self.server_version = parse_version('1.0.6')
|
||||
compatMap['open'] = ('path', 'create', 'lock_wait', 'lock', ),
|
||||
# try again with corrected version and compatMap
|
||||
do_open()
|
||||
except Exception:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
def __del__(self):
|
||||
if len(self.responses):
|
||||
logging.debug("still %d cached responses left in RemoteRepository" % (len(self.responses),))
|
||||
logging.debug('still %d cached responses left in RemoteRepository' % (len(self.responses),))
|
||||
if self.p:
|
||||
self.close()
|
||||
assert False, "cleanup happened in Repository.__del__"
|
||||
assert False, 'cleanup happened in Repository.__del__'
|
||||
|
||||
def __repr__(self):
|
||||
return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path())
|
||||
|
|
@ -310,7 +547,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
|
||||
def borg_cmd(self, args, testing):
|
||||
"""return a borg serve command line"""
|
||||
# give some args/options to "borg serve" process as they were given to us
|
||||
# give some args/options to 'borg serve' process as they were given to us
|
||||
opts = []
|
||||
if args is not None:
|
||||
opts.append('--umask=%03o' % args.umask)
|
||||
|
|
@ -348,7 +585,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
args.append('%s' % location.host)
|
||||
return args
|
||||
|
||||
def call(self, cmd, *args, **kw):
|
||||
def named_to_positional(self, method, kwargs):
|
||||
return [kwargs[name] for name in compatMap[method]]
|
||||
|
||||
def call(self, cmd, args, **kw):
|
||||
for resp in self.call_many(cmd, [args], **kw):
|
||||
return resp
|
||||
|
||||
|
|
@ -362,8 +602,11 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
del self.chunkid_to_msgids[chunkid]
|
||||
return msgid
|
||||
|
||||
def handle_error(error, res):
|
||||
error = error.decode('utf-8')
|
||||
def handle_error(unpacked):
|
||||
error = unpacked[b'exception_class'].decode()
|
||||
old_server = b'exception_args' not in unpacked
|
||||
args = unpacked.get(b'exception_args')
|
||||
|
||||
if error == 'DoesNotExist':
|
||||
raise Repository.DoesNotExist(self.location.orig)
|
||||
elif error == 'AlreadyExists':
|
||||
|
|
@ -371,27 +614,36 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
elif error == 'CheckNeeded':
|
||||
raise Repository.CheckNeeded(self.location.orig)
|
||||
elif error == 'IntegrityError':
|
||||
raise IntegrityError(res)
|
||||
if old_server:
|
||||
raise IntegrityError('(not available)')
|
||||
else:
|
||||
raise IntegrityError(args[0].decode())
|
||||
elif error == 'PathNotAllowed':
|
||||
raise PathNotAllowed(*res)
|
||||
raise PathNotAllowed()
|
||||
elif error == 'ObjectNotFound':
|
||||
raise Repository.ObjectNotFound(res[0], self.location.orig)
|
||||
if old_server:
|
||||
raise Repository.ObjectNotFound('(not available)', self.location.orig)
|
||||
else:
|
||||
raise Repository.ObjectNotFound(args[0].decode(), self.location.orig)
|
||||
elif error == 'InvalidRPCMethod':
|
||||
raise InvalidRPCMethod(*res)
|
||||
if old_server:
|
||||
raise InvalidRPCMethod('(not available)')
|
||||
else:
|
||||
raise InvalidRPCMethod(args[0].decode())
|
||||
else:
|
||||
raise self.RPCError(res.decode('utf-8'), error)
|
||||
raise self.RPCError(unpacked)
|
||||
|
||||
calls = list(calls)
|
||||
waiting_for = []
|
||||
while wait or calls:
|
||||
while waiting_for:
|
||||
try:
|
||||
error, res = self.responses.pop(waiting_for[0])
|
||||
unpacked = self.responses.pop(waiting_for[0])
|
||||
waiting_for.pop(0)
|
||||
if error:
|
||||
handle_error(error, res)
|
||||
if b'exception_class' in unpacked:
|
||||
handle_error(unpacked)
|
||||
else:
|
||||
yield res
|
||||
yield unpacked[RESULT]
|
||||
if not waiting_for and not calls:
|
||||
return
|
||||
except KeyError:
|
||||
|
|
@ -410,15 +662,24 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
raise ConnectionClosed()
|
||||
self.unpacker.feed(data)
|
||||
for unpacked in self.unpacker:
|
||||
if not (isinstance(unpacked, tuple) and len(unpacked) == 4):
|
||||
if isinstance(unpacked, dict):
|
||||
msgid = unpacked[MSGID]
|
||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
||||
# The first field 'type' was always 1 and has always been ignored
|
||||
_, msgid, error, res = unpacked
|
||||
if error:
|
||||
# ignore res, because it is only a fixed string anyway.
|
||||
unpacked = {MSGID: msgid, b'exception_class': error}
|
||||
else:
|
||||
unpacked = {MSGID: msgid, RESULT: res}
|
||||
else:
|
||||
raise UnexpectedRPCDataFormatFromServer()
|
||||
type, msgid, error, res = unpacked
|
||||
if msgid in self.ignore_responses:
|
||||
self.ignore_responses.remove(msgid)
|
||||
if error:
|
||||
handle_error(error, res)
|
||||
if b'exception_class' in unpacked:
|
||||
handle_error(unpacked)
|
||||
else:
|
||||
self.responses[msgid] = error, res
|
||||
self.responses[msgid] = unpacked
|
||||
elif fd is self.stderr_fd:
|
||||
data = os.read(fd, 32768)
|
||||
if not data:
|
||||
|
|
@ -430,23 +691,29 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
|
||||
if calls:
|
||||
if is_preloaded:
|
||||
assert cmd == "get", "is_preload is only supported for 'get'"
|
||||
if calls[0][0] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(calls.pop(0)[0]))
|
||||
assert cmd == 'get', "is_preload is only supported for 'get'"
|
||||
if calls[0]['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(calls.pop(0)['id']))
|
||||
else:
|
||||
args = calls.pop(0)
|
||||
if cmd == 'get' and args[0] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(args[0]))
|
||||
if cmd == 'get' and args['id'] in self.chunkid_to_msgids:
|
||||
waiting_for.append(pop_preload_msgid(args['id']))
|
||||
else:
|
||||
self.msgid += 1
|
||||
waiting_for.append(self.msgid)
|
||||
self.to_send = msgpack.packb((1, self.msgid, cmd, args))
|
||||
if self.dictFormat:
|
||||
self.to_send = msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})
|
||||
else:
|
||||
self.to_send = msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
|
||||
if not self.to_send and self.preload_ids:
|
||||
chunk_id = self.preload_ids.pop(0)
|
||||
args = (chunk_id,)
|
||||
args = {'id': chunk_id}
|
||||
self.msgid += 1
|
||||
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
|
||||
self.to_send = msgpack.packb((1, self.msgid, 'get', args))
|
||||
if self.dictFormat:
|
||||
self.to_send = msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})
|
||||
else:
|
||||
self.to_send = msgpack.packb((1, self.msgid, 'get', self.named_to_positional(cmd, args)))
|
||||
|
||||
if self.to_send:
|
||||
try:
|
||||
|
|
@ -458,55 +725,74 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|||
raise
|
||||
self.ignore_responses |= set(waiting_for)
|
||||
|
||||
@api(since=parse_version('1.0.0'),
|
||||
append_only={'since': parse_version('1.0.7'), 'previously': False})
|
||||
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False):
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def check(self, repair=False, save_space=False):
|
||||
return self.call('check', repair, save_space)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def commit(self, save_space=False):
|
||||
return self.call('commit', save_space)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
def rollback(self, *args):
|
||||
return self.call('rollback')
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def rollback(self):
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def destroy(self):
|
||||
return self.call('destroy')
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def __len__(self):
|
||||
return self.call('__len__')
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def list(self, limit=None, marker=None):
|
||||
return self.call('list', limit, marker)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.1.0b3'))
|
||||
def scan(self, limit=None, marker=None):
|
||||
return self.call('scan', limit, marker)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
def get(self, id_):
|
||||
for resp in self.get_many([id_]):
|
||||
def get(self, id):
|
||||
for resp in self.get_many([id]):
|
||||
return resp
|
||||
|
||||
def get_many(self, ids, is_preloaded=False):
|
||||
for resp in self.call_many('get', [(id_,) for id_ in ids], is_preloaded=is_preloaded):
|
||||
for resp in self.call_many('get', [{'id': id} for id in ids], is_preloaded=is_preloaded):
|
||||
yield resp
|
||||
|
||||
def put(self, id_, data, wait=True):
|
||||
return self.call('put', id_, data, wait=wait)
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def put(self, id, data, wait=True):
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
def delete(self, id_, wait=True):
|
||||
return self.call('delete', id_, wait=wait)
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def delete(self, id, wait=True):
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def save_key(self, keydata):
|
||||
return self.call('save_key', keydata)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def load_key(self):
|
||||
return self.call('load_key')
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def get_free_nonce(self):
|
||||
return self.call('get_free_nonce')
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def commit_nonce_reservation(self, next_unreserved, start_nonce):
|
||||
return self.call('commit_nonce_reservation', next_unreserved, start_nonce)
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
@api(since=parse_version('1.0.0'))
|
||||
def break_lock(self):
|
||||
return self.call('break_lock')
|
||||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
def close(self):
|
||||
if self.p:
|
||||
|
|
@ -531,7 +817,7 @@ def handle_remote_line(line):
|
|||
logname, msg = msg.split(' ', 1)
|
||||
logging.getLogger(logname).log(level, msg.rstrip())
|
||||
else:
|
||||
sys.stderr.write("Remote: " + line)
|
||||
sys.stderr.write('Remote: ' + line)
|
||||
|
||||
|
||||
class RepositoryNoCache:
|
||||
|
|
|
|||
|
|
@ -825,14 +825,14 @@ class Repository:
|
|||
return result
|
||||
return result
|
||||
|
||||
def get(self, id_):
|
||||
def get(self, id):
|
||||
if not self.index:
|
||||
self.index = self.open_index(self.get_transaction_id())
|
||||
try:
|
||||
segment, offset = self.index[id_]
|
||||
return self.io.read(segment, offset, id_)
|
||||
segment, offset = self.index[id]
|
||||
return self.io.read(segment, offset, id)
|
||||
except KeyError:
|
||||
raise self.ObjectNotFound(id_, self.path) from None
|
||||
raise self.ObjectNotFound(id, self.path) from None
|
||||
|
||||
def get_many(self, ids, is_preloaded=False):
|
||||
for id_ in ids:
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ from ..hashindex import NSIndex
|
|||
from ..helpers import Location
|
||||
from ..helpers import IntegrityError
|
||||
from ..locking import Lock, LockFailed
|
||||
from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint, handle_remote_line
|
||||
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
|
||||
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE
|
||||
from . import BaseTestCase
|
||||
from .hashindex import H
|
||||
|
|
@ -647,7 +647,60 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
|
|||
exclusive=True, create=create)
|
||||
|
||||
def test_invalid_rpc(self):
|
||||
self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', None))
|
||||
self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', {}))
|
||||
|
||||
def test_rpc_exception_transport(self):
|
||||
s1 = 'test string'
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'DoesNotExist'})
|
||||
except Repository.DoesNotExist as e:
|
||||
assert len(e.args) == 1
|
||||
assert e.args[0] == self.repository.location.orig
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'AlreadyExists'})
|
||||
except Repository.AlreadyExists as e:
|
||||
assert len(e.args) == 1
|
||||
assert e.args[0] == self.repository.location.orig
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'CheckNeeded'})
|
||||
except Repository.CheckNeeded as e:
|
||||
assert len(e.args) == 1
|
||||
assert e.args[0] == self.repository.location.orig
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'IntegrityError'})
|
||||
except IntegrityError as e:
|
||||
assert len(e.args) == 1
|
||||
assert e.args[0] == s1
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'PathNotAllowed'})
|
||||
except PathNotAllowed as e:
|
||||
assert len(e.args) == 0
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'ObjectNotFound'})
|
||||
except Repository.ObjectNotFound as e:
|
||||
assert len(e.args) == 2
|
||||
assert e.args[0] == s1
|
||||
assert e.args[1] == self.repository.location.orig
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'InvalidRPCMethod'})
|
||||
except InvalidRPCMethod as e:
|
||||
assert len(e.args) == 1
|
||||
assert e.args[0] == s1
|
||||
|
||||
try:
|
||||
self.repository.call('inject_exception', {'kind': 'divide'})
|
||||
except RemoteRepository.RPCError as e:
|
||||
assert e.unpacked
|
||||
assert e.get_message() == 'ZeroDivisionError: integer division or modulo by zero\n'
|
||||
assert e.exception_class == 'ZeroDivisionError'
|
||||
assert len(e.exception_full) > 0
|
||||
|
||||
def test_ssh_cmd(self):
|
||||
assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com']
|
||||
|
|
@ -670,6 +723,31 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
|
|||
assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info']
|
||||
|
||||
|
||||
class RemoteLegacyFree(RepositoryTestCaseBase):
|
||||
# Keep testing this so we can someday safely remove the legacy tuple format.
|
||||
|
||||
def open(self, create=False):
|
||||
with patch.object(RemoteRepository, 'dictFormat', True):
|
||||
return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
|
||||
exclusive=True, create=create)
|
||||
|
||||
def test_legacy_free(self):
|
||||
# put
|
||||
self.repository.put(H(0), b'foo')
|
||||
self.repository.commit()
|
||||
self.repository.close()
|
||||
# replace
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
self.repository.put(H(0), b'bar')
|
||||
self.repository.commit()
|
||||
# delete
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
self.repository.delete(H(0))
|
||||
self.repository.commit()
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform == 'cygwin', reason='remote is broken on cygwin and hangs')
|
||||
class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
|
||||
|
||||
|
|
|
|||
49
src/borg/version.py
Normal file
49
src/borg/version.py
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
import re
|
||||
|
||||
|
||||
def parse_version(version):
|
||||
"""
|
||||
simplistic parser for setuptools_scm versions
|
||||
|
||||
supports final versions and alpha ('a'), beta ('b') and rc versions. It just discards commits since last tag
|
||||
and git revision hash.
|
||||
|
||||
Output is a version tuple containing integers. It ends with one or two elements that ensure that relational
|
||||
operators yield correct relations for alpha, beta and rc versions too. For final versions the last element
|
||||
is a -1, for prerelease versions the last two elements are a smaller negative number and the number of e.g.
|
||||
the beta.
|
||||
|
||||
Note, this sorts version 1.0 before 1.0.0.
|
||||
|
||||
This version format is part of the remote protocol, don‘t change in breaking ways.
|
||||
"""
|
||||
|
||||
parts = version.split('+')[0].split('.')
|
||||
if parts[-1].startswith('dev'):
|
||||
del parts[-1]
|
||||
version = [int(segment) for segment in parts[:-1]]
|
||||
|
||||
prerelease = re.fullmatch('([0-9]+)(a|b|rc)([0-9]+)', parts[-1])
|
||||
if prerelease:
|
||||
version_type = {'a': -4, 'b': -3, 'rc': -2}[prerelease.group(2)]
|
||||
version += [int(prerelease.group(1)), version_type, int(prerelease.group(3))]
|
||||
else:
|
||||
version += [int(parts[-1]), -1]
|
||||
|
||||
return tuple(version)
|
||||
|
||||
|
||||
def format_version(version):
|
||||
"""a reverse for parse_version (obviously without the dropped information)"""
|
||||
f = []
|
||||
it = iter(version)
|
||||
while True:
|
||||
part = next(it)
|
||||
if part >= 0:
|
||||
f += str(part)
|
||||
elif part == -1:
|
||||
break
|
||||
else:
|
||||
f[-1] = f[-1] + {-2: 'rc', -3: 'b', -4: 'a'}[part] + str(next(it))
|
||||
break
|
||||
return '.'.join(f)
|
||||
Loading…
Reference in a new issue