diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 46d491c4d..bc8b59be3 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -18,7 +18,7 @@ from .crypto.low_level import blake2b_128 from .archiver import Archiver from .archive import Archive from .hashindex import FuseVersionsIndex -from .helpers import daemonize, hardlinkable, signal_handler, format_file_size +from .helpers import daemonize, daemonizing, hardlinkable, signal_handler, format_file_size from .helpers import msgpack from .item import Item from .lrucache import LRUCache @@ -510,10 +510,13 @@ class FuseOperations(llfuse.Operations, FuseBackend): self._create_filesystem() llfuse.init(self, mountpoint, options) if not foreground: - old_id, new_id = daemonize() - if not isinstance(self.repository_uncached, RemoteRepository): - # local repo and the locking process' PID just changed, migrate it: - self.repository_uncached.migrate_lock(old_id, new_id) + if isinstance(self.repository_uncached, RemoteRepository): + daemonize() + else: + with daemonizing() as (old_id, new_id): + # local repo: the locking process' PID is changing, migrate it: + logger.debug('fuse: mount local repo, going to background: migrating lock.') + self.repository_uncached.migrate_lock(old_id, new_id) # If the file system crashes, we do not want to umount because in that # case the mountpoint suddenly appears to become empty. This can have diff --git a/src/borg/helpers/process.py b/src/borg/helpers/process.py index 763a8c68b..2e61ad0a7 100644 --- a/src/borg/helpers/process.py +++ b/src/borg/helpers/process.py @@ -6,6 +6,8 @@ import shlex import signal import subprocess import sys +import time +import traceback from .. 
import __version__ @@ -13,17 +15,23 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin from ..logger import create_logger logger = create_logger() +from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE -def daemonize(): - """Detach process from controlling terminal and run in background - Returns: old and new get_process_id tuples - """ +@contextlib.contextmanager +def _daemonize(): from ..platform import get_process_id old_id = get_process_id() pid = os.fork() if pid: - os._exit(0) + exit_code = EXIT_SUCCESS + try: + yield old_id, None + except _ExitCodeException as e: + exit_code = e.exit_code + finally: + logger.debug('Daemonizing: Foreground process (%s, %s, %s) is now dying.' % old_id) + os._exit(exit_code) os.setsid() pid = os.fork() if pid: @@ -31,13 +39,101 @@ def daemonize(): os.chdir('/') os.close(0) os.close(1) - os.close(2) fd = os.open(os.devnull, os.O_RDWR) os.dup2(fd, 0) os.dup2(fd, 1) - os.dup2(fd, 2) new_id = get_process_id() - return old_id, new_id + try: + yield old_id, new_id + finally: + # Close / redirect stderr to /dev/null only now + # for the case that we want to log something before yield returns. + os.close(2) + os.dup2(fd, 2) + + +def daemonize(): + """Detach process from controlling terminal and run in background + + Returns: old and new get_process_id tuples + """ + with _daemonize() as (old_id, new_id): + return old_id, new_id + + +@contextlib.contextmanager +def daemonizing(*, timeout=5): + """Like daemonize(), but as context manager. + + The with-body is executed in the background process, + while the foreground process survives until the body is left + or the given timeout is exceeded. In the latter case a warning is + reported by the foreground. + Context variable is (old_id, new_id) get_process_id tuples. + An exception raised in the body is reported by the foreground + as a warning as well as propagated outside the body in the background. 
+ In case of a warning, the foreground exits with exit code EXIT_WARNING + instead of EXIT_SUCCESS. + """ + with _daemonize() as (old_id, new_id): + if new_id is None: + # The original / parent process, waiting for a signal to die. + logger.debug('Daemonizing: Foreground process (%s, %s, %s) is waiting for background process...' % old_id) + exit_code = EXIT_SUCCESS + # Indeed, SIGHUP and SIGTERM handlers should have been set on archiver.run(). Just in case... + with signal_handler('SIGINT', raising_signal_handler(KeyboardInterrupt)), \ + signal_handler('SIGHUP', raising_signal_handler(SigHup)), \ + signal_handler('SIGTERM', raising_signal_handler(SigTerm)): + try: + if timeout > 0: + time.sleep(timeout) + except SigTerm: + # Normal termination; expected from grandchild, see 'os.kill()' below + pass + except SigHup: + # Background wants to indicate a problem; see 'os.kill()' below, + # log message will come from grandchild. + exit_code = EXIT_WARNING + except KeyboardInterrupt: + # Manual termination. + logger.debug('Daemonizing: Foreground process (%s, %s, %s) received SIGINT.' % old_id) + exit_code = EXIT_SIGNAL_BASE + 2 + except BaseException as e: + # Just in case... + logger.warning('Daemonizing: Foreground process received an exception while waiting:\n' + + ''.join(traceback.format_exception(e.__class__, e, e.__traceback__))) + exit_code = EXIT_WARNING + else: + logger.warning('Daemonizing: Background process did not respond (timeout). Is it alive?') + exit_code = EXIT_WARNING + finally: + # Don't call with-body, but die immediately! + # return would be sufficient, but we want to pass the exit code. + raise _ExitCodeException(exit_code) + + # The background / grandchild process. + sig_to_foreground = signal.SIGTERM + logger.debug('Daemonizing: Background process (%s, %s, %s) is starting...' 
% new_id) + try: + yield old_id, new_id + except BaseException as e: + sig_to_foreground = signal.SIGHUP + logger.warning('Daemonizing: Background process raised an exception while starting:\n' + + ''.join(traceback.format_exception(e.__class__, e, e.__traceback__))) + raise e + else: + logger.debug('Daemonizing: Background process (%s, %s, %s) has started.' % new_id) + finally: + try: + os.kill(old_id[1], sig_to_foreground) + except BaseException as e: + logger.error('Daemonizing: Trying to kill the foreground process raised an exception:\n' + + ''.join(traceback.format_exception(e.__class__, e, e.__traceback__))) + + +class _ExitCodeException(BaseException): + def __init__(self, exit_code): + self.exit_code = exit_code class SignalException(BaseException): diff --git a/src/borg/testsuite/__init__.py b/src/borg/testsuite/__init__.py index 5e2047282..844db8a8a 100644 --- a/src/borg/testsuite/__init__.py +++ b/src/borg/testsuite/__init__.py @@ -238,30 +238,55 @@ class BaseTestCase(unittest.TestCase): self._assert_dirs_equal_cmp(sub_diff, ignore_flags=ignore_flags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns) @contextmanager - def fuse_mount(self, location, mountpoint=None, *options, **kwargs): + def fuse_mount(self, location, mountpoint=None, *options, fork=True, os_fork=False, **kwargs): + # For a successful mount, `fork = True` is required for + # the borg mount daemon to work properly or the tests + # will just freeze. Therefore, if argument `fork` is not + # specified, the default value is `True`, regardless of + # `FORK_DEFAULT`. However, leaving the possibility to run + # the command with `fork = False` is still necessary for + # testing for mount failures, for example attempting to + # mount a read-only repo. + # `os_fork = True` is needed for testing (the absence of) + # a race condition of the Lock during lock migration when + # borg mount (local repo) is daemonizing (#4953). 
This is another + # example where we need `fork = False`, because the test case + # needs an OS fork, not a spawning of the fuse mount. + # `fork = False` is implied if `os_fork = True`. if mountpoint is None: mountpoint = tempfile.mkdtemp() else: os.mkdir(mountpoint) - if 'fork' not in kwargs: - # For a successful mount, `fork = True` is required for - # the borg mount daemon to work properly or the tests - # will just freeze. Therefore, if argument `fork` is not - # specified, the default value is `True`, regardless of - # `FORK_DEFAULT`. However, leaving the possibilty to run - # the command with `fork = False` is still necessary for - # testing for mount failures, for example attempting to - # mount a read-only repo. - kwargs['fork'] = True - self.cmd('mount', location, mountpoint, *options, **kwargs) - if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR: - # If argument `exit_code = EXIT_ERROR`, then this call - # is testing the behavior of an unsuccessful mount and - # we must not continue, as there is no mount to work - # with. The test itself has already failed or succeeded - # with the call to `self.cmd`, above. - yield - return + args = ['mount', location, mountpoint] + list(options) + if os_fork: + # Do not spawn, but actually (OS) fork. + if os.fork() == 0: + # The child process. + # Decouple from parent and fork again. + # Otherwise, it becomes a zombie and pretends to be alive. + os.setsid() + if os.fork() > 0: + os._exit(0) + # The grandchild process. + try: + self.cmd(*args, fork=False, **kwargs) # borg mount not spawning. + finally: + # This should never be reached, since it daemonizes, + # and the grandchild process exits before cmd() returns. + # However, just in case... + print('Fatal: borg mount did not daemonize properly. 
Force exiting.', + file=sys.stderr, flush=True) + os._exit(0) + else: + self.cmd(*args, fork=fork, **kwargs) + if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR: + # If argument `exit_code = EXIT_ERROR`, then this call + # is testing the behavior of an unsuccessful mount and + # we must not continue, as there is no mount to work + # with. The test itself has already failed or succeeded + # with the call to `self.cmd`, above. + yield + return self.wait_for_mountstate(mountpoint, mounted=True) yield umount(mountpoint) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 3b9700718..6a4cce03b 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -2503,6 +2503,94 @@ class ArchiverTestCase(ArchiverTestCaseBase): with self.fuse_mount(self.repository_location, mountpoint, '--prefix=nope'): assert sorted(os.listdir(os.path.join(mountpoint))) == [] + @unittest.skipUnless(has_llfuse, 'llfuse not installed') + def test_migrate_lock_alive(self): + """Both old_id and new_id must not be stale during lock migration / daemonization.""" + from functools import wraps + import pickle + import traceback + + # Check results are communicated from the borg mount background process + # to the pytest process by means of a serialized dict object stored in this file. + assert_data_file = os.path.join(self.tmpdir, 'migrate_lock_assert_data.pickle') + + # Decorates Lock.migrate_lock() with process_alive() checks before and after. + # (We don't want to mix testing code into runtime.) 
+ def write_assert_data(migrate_lock): + @wraps(migrate_lock) + def wrapper(self, old_id, new_id): + wrapper.num_calls += 1 + assert_data = { + 'num_calls': wrapper.num_calls, + 'old_id': old_id, + 'new_id': new_id, + 'before': { + 'old_id_alive': platform.process_alive(*old_id), + 'new_id_alive': platform.process_alive(*new_id)}, + 'exception': None, + 'exception.extr_tb': None, + 'after': { + 'old_id_alive': None, + 'new_id_alive': None}} + try: + with open(assert_data_file, 'wb') as _out: + pickle.dump(assert_data, _out) + except: + pass + try: + return migrate_lock(self, old_id, new_id) + except BaseException as e: + assert_data['exception'] = e + assert_data['exception.extr_tb'] = traceback.extract_tb(e.__traceback__) + finally: + assert_data['after'].update({ + 'old_id_alive': platform.process_alive(*old_id), + 'new_id_alive': platform.process_alive(*new_id)}) + try: + with open(assert_data_file, 'wb') as _out: + pickle.dump(assert_data, _out) + except: + pass + wrapper.num_calls = 0 + return wrapper + + # Decorate + borg.locking.Lock.migrate_lock = write_assert_data(borg.locking.Lock.migrate_lock) + try: + self.cmd('init', '--encryption=none', self.repository_location) + self.create_src_archive('arch') + mountpoint = os.path.join(self.tmpdir, 'mountpoint') + # In order that the decoration is kept for the borg mount process, we must not spawn, but actually fork; + # not to be confused with the forking in borg.helpers.daemonize() which is done as well. + with self.fuse_mount(self.repository_location, mountpoint, os_fork=True): + pass + with open(assert_data_file, 'rb') as _in: + assert_data = pickle.load(_in) + print('\nLock.migrate_lock(): assert_data = %r.' 
% (assert_data, ), file=sys.stderr, flush=True) + exception = assert_data['exception'] + if exception is not None: + extracted_tb = assert_data['exception.extr_tb'] + print( + 'Lock.migrate_lock() raised an exception:\n', + 'Traceback (most recent call last):\n', + *traceback.format_list(extracted_tb), + *traceback.format_exception(exception.__class__, exception, None), + sep='', end='', file=sys.stderr, flush=True) + + assert assert_data['num_calls'] == 1, "Lock.migrate_lock() must be called exactly once." + assert exception is None, "Lock.migrate_lock() may not raise an exception." + + assert_data_before = assert_data['before'] + assert assert_data_before['old_id_alive'], "old_id must be alive (=must not be stale) when calling Lock.migrate_lock()." + assert assert_data_before['new_id_alive'], "new_id must be alive (=must not be stale) when calling Lock.migrate_lock()." + + assert_data_after = assert_data['after'] + assert assert_data_after['old_id_alive'], "old_id must be alive (=must not be stale) when Lock.migrate_lock() has returned." + assert assert_data_after['new_id_alive'], "new_id must be alive (=must not be stale) when Lock.migrate_lock() has returned." + finally: + # Undecorate + borg.locking.Lock.migrate_lock = borg.locking.Lock.migrate_lock.__wrapped__ + def verify_aes_counter_uniqueness(self, method): seen = set() # Chunks already seen used = set() # counter values already used @@ -3570,6 +3658,10 @@ class RemoteArchiverTestCase(ArchiverTestCase): def test_config(self): pass + @unittest.skip('only works locally') + def test_migrate_lock_alive(self): + pass + def test_strip_components_doesnt_leak(self): self.cmd('init', '--encryption=repokey', self.repository_location) self.create_regular_file('dir/file', contents=b"test file contents 1")