Add proof of concept for pull mechanism

Added a small proof of concept for a pull mechanism implementation.
Works fine for simple test cases, but probably needs many improvements
to be useful for more complex usage scenarios.
This commit is contained in:
TNeitzel 2022-02-10 22:45:17 +01:00 committed by Thomas Waldmann
parent 289364b7d7
commit 0a484ebc07
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
3 changed files with 63 additions and 24 deletions

View file

@ -174,10 +174,11 @@ def with_repository(fake=False, invert_fake=False, create=False, lock=True,
make_parent_dirs = getattr(args, 'make_parent_dirs', False)
if argument(args, fake) ^ invert_fake:
return method(self, args, repository=None, **kwargs)
elif location.proto == 'ssh':
elif location.proto == 'ssh' or location.proto == 'serve':
repository = RemoteRepository(location.omit_archive(), create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
make_parent_dirs=make_parent_dirs, args=args)
make_parent_dirs=make_parent_dirs, args=args, serve=(location.proto == 'serve'))
else:
repository = Repository(location.path, create=create, exclusive=argument(args, exclusive),
lock_wait=self.lock_wait, lock=lock, append_only=append_only,
@ -296,6 +297,7 @@ class Archiver:
restrict_to_repositories=args.restrict_to_repositories,
append_only=args.append_only,
storage_quota=args.storage_quota,
pull_command=args.pull_command
).serve()
def do_version(self, args):
@ -5252,6 +5254,9 @@ class Archiver:
'When a new repository is initialized, sets the storage quota on the new '
'repository as well. Default: no quota.')
subparser.add_argument('--pull-command', metavar='cmd', dest='pull_command',
help='command to use for pulling from a borg server started in serve:// mode')
# borg umount
umount_epilog = process_epilog("""
This command unmounts a FUSE filesystem that was mounted with ``borg mount``.

View file

@ -381,6 +381,10 @@ class Location:
(?::(?P<port>\d+))? # :port (optional)
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive
serve_re = re.compile(r"""
(?P<proto>serve):// # serve://
""" + abs_path_re + optional_archive_re, re.VERBOSE) # path or path::archive
file_re = re.compile(r"""
(?P<proto>file):// # file://
""" + file_path_re + optional_archive_re, re.VERBOSE) # servername/path, path or path::archive
@ -460,6 +464,12 @@ class Location:
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.serve_re.match(text)
if m:
self.proto = m.group('proto')
self.path = normpath_special(m.group('path'))
self.archive = m.group('archive')
return True
m = self.file_re.match(text)
if m:
self.proto = m.group('proto')

View file

@ -176,7 +176,7 @@ class RepositoryServer: # pragma: no cover
'inject_exception',
)
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, pull_command=None):
self.repository = None
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
@ -187,6 +187,7 @@ class RepositoryServer: # pragma: no cover
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = parse_version('1.0.8') # fallback version if client is too old to send version information
self.pull_command = pull_command
def positional_to_named(self, method, argv):
"""Translate from positional protocol to named protocol."""
@ -206,13 +207,27 @@ class RepositoryServer: # pragma: no cover
return {name: kwargs[name] for name in kwargs if name in known}
def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)
if self.pull_command:
self.p = Popen(shlex.split(self.pull_command), bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdin_fd = self.p.stdout.fileno()
stdout_fd = self.p.stdin.fileno()
stderr_fd = sys.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)
else:
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)
unpacker = get_limited_unpacker('server')
while True:
r, w, es = select.select([stdin_fd], [], [], 10)
if r:
@ -549,7 +564,7 @@ class RemoteRepository:
dictFormat = False # outside of __init__ for testing of legacy free protocol
def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False,
make_parent_dirs=False, args=None):
make_parent_dirs=False, args=None, serve=False):
self.location = self._location = location
self.preload_ids = []
self.msgid = 0
@ -571,20 +586,29 @@ class RemoteRepository:
testing = location.host == '__testsuite__'
# when testing, we invoke and talk to a borg process directly (no ssh).
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
# borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
if serve:
self.stdin_fd = sys.stdout.fileno()
self.stdout_fd = sys.stdin.fileno()
self.stderr_fd = sys.stderr.fileno()
os.set_blocking(self.stdin_fd, True)
os.set_blocking(self.stdout_fd, False)
else:
env = prepare_subprocess_env(system=not testing)
borg_cmd = self.borg_cmd(args, testing)
if not testing:
borg_cmd = self.ssh_cmd(location) + borg_cmd
logger.debug('SSH command line: %s', borg_cmd)
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
# borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
self.stdin_fd = self.p.stdin.fileno()
self.stdout_fd = self.p.stdout.fileno()
self.stderr_fd = self.p.stderr.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
self.r_fds = [self.stdout_fd, self.stderr_fd]
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]