diff --git a/src/borg/cache.py b/src/borg/cache.py index 99d5bf233..8a7ad4aa0 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -75,6 +75,7 @@ class Cache: self.key = key self.manifest = manifest self.path = path or os.path.join(get_cache_dir(), repository.id_str) + self.unique_hostname = bool(os.environ.get('BORG_UNIQUE_HOSTNAME')) self.do_files = do_files # Warn user before sending data to a never seen before unencrypted repository if not os.path.exists(self.path): @@ -202,7 +203,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" def open(self, lock_wait=None): if not os.path.isdir(self.path): raise Exception('%s Does not look like a Borg cache' % self.path) - self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire() + self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait, kill_stale_locks=self.unique_hostname).acquire() self.rollback() def close(self): diff --git a/src/borg/locking.py b/src/borg/locking.py index 5405c2310..56f280929 100644 --- a/src/borg/locking.py +++ b/src/borg/locking.py @@ -1,6 +1,8 @@ +import errno import json import os import socket +import sys import time from .helpers import Error, ErrorWithTraceback @@ -17,10 +19,36 @@ _hostname = socket.gethostname() def get_id(): """Get identification tuple for 'us'""" + + # If changing the thread_id to ever be non-zero, also revisit the check_lock_stale() below. thread_id = 0 return _hostname, _pid, thread_id +def check_lock_stale(host, pid, thread): + """Check if the host, pid, thread combination corresponds to a dead process on our local node or not.""" + if host != _hostname: + return False + + if thread != 0: + # Currently thread is always 0, if we ever decide to set this to a non-zero value, this code needs to be revisited too to do a sensible thing + return False + + try: + # This may not work in Windows. + # This does not kill anything, 0 means "see if we can send a signal to this process or not". + # Possible errors: No such process (== stale lock) or permission denied (not a stale lock) + # If the exception is not raised that means such a pid is valid and we can send a signal to it (== not a stale lock too). + os.kill(pid, 0) + return False + except OSError as err: + if err.errno != errno.ESRCH: + return False + pass + + return True + + class TimeoutTimer: """ A timer for timeout checks (can also deal with no timeout, give timeout=None [default]). @@ -109,12 +137,14 @@ class ExclusiveLock: This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ - def __init__(self, path, timeout=None, sleep=None, id=None): + def __init__(self, path, timeout=None, sleep=None, id=None, kill_stale_locks=False): self.timeout = timeout self.sleep = sleep self.path = os.path.abspath(path) self.id = id or get_id() self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id) + self.ok_to_kill_stale_locks = kill_stale_locks + self.stale_warning_printed = False def __enter__(self): return self.acquire() @@ -137,6 +167,8 @@ class ExclusiveLock: except FileExistsError: # already locked if self.by_me(): return self + if self.kill_stale_lock(): + pass if timer.timed_out_or_sleep(): raise LockTimeout(self.path) except OSError as err: @@ -160,6 +192,47 @@ class ExclusiveLock: def by_me(self): return os.path.exists(self.unique_name) + def kill_stale_lock(self): + for name in os.listdir(self.path): + + try: + host_pid, thread_str = name.rsplit('-', 1) + host, pid_str = host_pid.rsplit('.', 1) + pid = int(pid_str) + thread = int(thread_str) + except ValueError: + # Malformed lock name? Or just some new format we don't understand? + # It's safer to just exit + return False + + if not check_lock_stale(host, pid, thread): + return False + + if not self.ok_to_kill_stale_locks: + if not self.stale_warning_printed: + print(("Found stale lock %s, but not deleting because BORG_UNIQUE_HOSTNAME is not set." % name), file=sys.stderr) + self.stale_warning_printed = True + return False + + try: + os.unlink(os.path.join(self.path, name)) + print(("Killed stale lock %s." % name), file=sys.stderr) + except OSError as err: + if not self.stale_warning_printed: + print(("Found stale lock %s, but cannot delete due to %s" % (name, str(err))), file=sys.stderr) + self.stale_warning_printed = True + return False + + try: + os.rmdir(self.path) + except OSError: + # Directory is not empty = we lost the race to somebody else + # Permission denied = we cannot operate anyway + # other error like EIO = we cannot operate and it's unsafe too. + return False + + return True + def break_lock(self): if self.is_locked(): for name in os.listdir(self.path): @@ -174,17 +247,34 @@ class LockRoster: Note: you usually should call the methods with an exclusive lock held, to avoid conflicting access by multiple threads/processes/machines. """ - def __init__(self, path, id=None): + def __init__(self, path, id=None, kill_stale_locks=False): self.path = path self.id = id or get_id() + self.ok_to_kill_zombie_locks = kill_stale_locks def load(self): try: with open(self.path) as f: data = json.load(f) + + # Just nuke the stale locks early on load + if self.ok_to_kill_zombie_locks: + for key in (SHARED, EXCLUSIVE): + elements = set() + try: + for e in data[key]: + (host, pid, thread) = e + if not check_lock_stale(host, pid, thread): + elements.add(tuple(e)) + else: + print(("Removed stale %s roster lock for pid %d." % (key, pid)), file=sys.stderr) + data[key] = list(list(e) for e in elements) + except KeyError: + pass except (FileNotFoundError, ValueError): # no or corrupt/empty roster file? data = {} + return data def save(self, data): @@ -235,18 +325,18 @@ class Lock: This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ - def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None): + def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None, kill_stale_locks=False): self.path = path self.is_exclusive = exclusive self.sleep = sleep self.timeout = timeout self.id = id or get_id() # globally keeping track of shared and exclusive lockers: - self._roster = LockRoster(path + '.roster', id=id) + self._roster = LockRoster(path + '.roster', id=id, kill_stale_locks=kill_stale_locks) # an exclusive lock, used for: # - holding while doing roster queries / updates - # - holding while the Lock instance itself is exclusive - self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout) + # - holding while the Lock itself is exclusive + self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout, kill_stale_locks=kill_stale_locks) def __enter__(self): return self.acquire() diff --git a/src/borg/repository.py b/src/borg/repository.py index f1c97e134..106642d85 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -121,6 +121,7 @@ class Repository: self.do_create = create self.exclusive = exclusive self.append_only = append_only + self.unique_hostname = bool(os.environ.get('BORG_UNIQUE_HOSTNAME')) def __del__(self): if self.lock: @@ -254,7 +255,7 @@ class Repository: if not os.path.isdir(path): raise self.DoesNotExist(path) if lock: - self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire() + self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait, kill_stale_locks=self.unique_hostname).acquire() else: self.lock = None self.config = ConfigParser(interpolation=None)