diff --git a/borg/cache.py b/borg/cache.py index 937e2a757..37e066685 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -124,7 +124,7 @@ class Cache: def open(self): if not os.path.isdir(self.path): raise Exception('%s Does not look like a Borg cache' % self.path) - self.lock = UpgradableLock(os.path.join(self.path, 'config'), exclusive=True) + self.lock = UpgradableLock(os.path.join(self.path, 'repo'), exclusive=True).acquire() self.rollback() def close(self): diff --git a/borg/locking.py b/borg/locking.py index 0c6df2fee..9d59c654b 100644 --- a/borg/locking.py +++ b/borg/locking.py @@ -1,43 +1,246 @@ -import fcntl +import errno +import json +import os +import socket +import threading +import time from borg.helpers import Error +ADD, REMOVE = 'add', 'remove' +SHARED, EXCLUSIVE = 'shared', 'exclusive' -class UpgradableLock: - class ReadLockFailed(Error): - """Failed to acquire read lock on {}""" +def get_id(): + """Get identification tuple for 'us'""" + hostname = socket.gethostname() + pid = os.getpid() + tid = threading.current_thread().ident & 0xffffffff + return hostname, pid, tid - class WriteLockFailed(Error): - """Failed to acquire write lock on {}""" - def __init__(self, path, exclusive=False): - self.path = path - try: - self.fd = open(path, 'r+') - except IOError: - self.fd = open(path, 'r') - try: - if exclusive: - fcntl.lockf(self.fd, fcntl.LOCK_EX) +class ExclusiveLock: + """An exclusive Lock based on mkdir fs operation being atomic""" + class LockError(Error): + """Failed to acquire the lock {}.""" + + class LockTimeout(LockError): + """Failed to create/acquire the lock {} (timeout).""" + + class LockFailed(LockError): + """Failed to create/acquire the lock {} ({}).""" + + class UnlockError(Error): + """Failed to release the lock {}.""" + + class NotLocked(UnlockError): + """Failed to release the lock {} (was not locked).""" + + class NotMyLock(UnlockError): + """Failed to release the lock {} (was/is locked, but not by me).""" + + def __init__(self, path, timeout=None, sleep=None, id=None): + self.timeout = timeout + self.sleep = sleep + self.path = os.path.abspath(path) + self.id = id or get_id() + self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id) + + def __enter__(self): + return self.acquire() + + def __exit__(self, *exc): + self.release() + + def __repr__(self): + return "<%s: %r>" % (self.__class__.__name__, self.unique_name) + + def _get_timing(self, timeout, sleep): + if timeout is None: + timeout = self.timeout + start = end = time.time() + if timeout is not None and timeout > 0: + end += timeout + if sleep is None: + sleep = self.sleep + if sleep is None: + if timeout is None: + sleep = 1.0 else: - fcntl.lockf(self.fd, fcntl.LOCK_SH) - # Python 3.2 raises IOError, Python3.3+ raises OSError - except (IOError, OSError): - if exclusive: - raise self.WriteLockFailed(self.path) - else: - raise self.ReadLockFailed(self.path) - self.is_exclusive = exclusive + sleep = max(0, timeout / 10.0) + return start, sleep, end, timeout - def upgrade(self): - try: - fcntl.lockf(self.fd, fcntl.LOCK_EX) - # Python 3.2 raises IOError, Python3.3+ raises OSError - except (IOError, OSError): - raise self.WriteLockFailed(self.path) - self.is_exclusive = True + def acquire(self, timeout=None, sleep=None): + start, sleep, end, timeout = self._get_timing(timeout, sleep) + while True: + try: + os.mkdir(self.path) + except OSError as err: + if err.errno == errno.EEXIST: # already locked + if self.by_me(): + return self + if timeout is not None and time.time() > end: + raise self.LockTimeout(self.path) + time.sleep(sleep) + else: + raise self.LockFailed(self.path, str(err)) + else: + with open(self.unique_name, "wb"): + pass + return self def release(self): - fcntl.lockf(self.fd, fcntl.LOCK_UN) - self.fd.close() + if not self.is_locked(): + raise self.NotLocked(self.path) + if not self.by_me(): + raise self.NotMyLock(self.path) + os.unlink(self.unique_name) + os.rmdir(self.path) + + def is_locked(self): + return os.path.exists(self.path) + + def by_me(self): + return os.path.exists(self.unique_name) + + def break_lock(self): + if self.is_locked(): + for name in os.listdir(self.path): + os.unlink(os.path.join(self.path, name)) + os.rmdir(self.path) + + +class LockRoster: + """ + A Lock Roster to track shared/exclusive lockers. + + Note: you usually should call the methods with an exclusive lock held, + to avoid conflicting access by multiple threads/processes/machines. + """ + def __init__(self, path, id=None): + self.path = path + self.id = id or get_id() + + def load(self): + try: + with open(self.path) as f: + data = json.load(f) + except IOError as err: + if err.errno != errno.ENOENT: + raise + data = {} + return data + + def save(self, data): + with open(self.path, "w") as f: + json.dump(data, f) + + def remove(self): + os.unlink(self.path) + + def get(self, key): + roster = self.load() + return set(tuple(e) for e in roster.get(key, [])) + + def modify(self, key, op): + roster = self.load() + try: + elements = set(tuple(e) for e in roster[key]) + except KeyError: + elements = set() + if op == ADD: + elements.add(self.id) + elif op == REMOVE: + elements.remove(self.id) + else: + raise ValueError('Unknown LockRoster op %r' % op) + roster[key] = list(list(e) for e in elements) + self.save(roster) + + +class UpgradableLock: + """ + A Lock for a resource that can be accessed in a shared or exclusive way. + Typically, write access to a resource needs an exclusive lock (1 writer, + noone is allowed reading) and read access to a resource needs a shared + lock (multiple readers are allowed). + """ + class SharedLockFailed(Error): + """Failed to acquire shared lock [{}]""" + + class ExclusiveLockFailed(Error): + """Failed to acquire write lock [{}]""" + + def __init__(self, path, exclusive=False, sleep=None, id=None): + self.path = path + self.is_exclusive = exclusive + self.sleep = sleep + self.id = id or get_id() + # globally keeping track of shared and exclusive lockers: + self._roster = LockRoster(path + '.roster', id=id) + # an exclusive lock, used for: + # - holding while doing roster queries / updates + # - holding while the UpgradableLock itself is exclusive + self._lock = ExclusiveLock(path + '.lock', id=id) + + def __enter__(self): + return self.acquire() + + def __exit__(self, *exc): + self.release() + + def __repr__(self): + return "<%s: %r>" % (self.__class__.__name__, self.id) + + def acquire(self, exclusive=None, remove=None, sleep=None): + if exclusive is None: + exclusive = self.is_exclusive + sleep = sleep or self.sleep or 0.2 + try: + if exclusive: + self._wait_for_readers_finishing(remove, sleep) + self._roster.modify(EXCLUSIVE, ADD) + else: + with self._lock: + if remove is not None: + self._roster.modify(remove, REMOVE) + self._roster.modify(SHARED, ADD) + self.is_exclusive = exclusive + return self + except ExclusiveLock.LockError as err: + msg = str(err) + if exclusive: + raise self.ExclusiveLockFailed(msg) + else: + raise self.SharedLockFailed(msg) + + def _wait_for_readers_finishing(self, remove, sleep): + while True: + self._lock.acquire() + if remove is not None: + self._roster.modify(remove, REMOVE) + remove = None + if len(self._roster.get(SHARED)) == 0: + return # we are the only one and we keep the lock! + self._lock.release() + time.sleep(sleep) + + def release(self): + if self.is_exclusive: + self._roster.modify(EXCLUSIVE, REMOVE) + self._lock.release() + else: + with self._lock: + self._roster.modify(SHARED, REMOVE) + + def upgrade(self): + if not self.is_exclusive: + self.acquire(exclusive=True, remove=SHARED) + + def downgrade(self): + if self.is_exclusive: + self.acquire(exclusive=False, remove=EXCLUSIVE) + + def break_lock(self): + self._roster.remove() + self._lock.break_lock() diff --git a/borg/repository.py b/borg/repository.py index 0ad99970b..d7db689a0 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -114,11 +114,11 @@ class Repository: self.path = path if not os.path.isdir(path): raise self.DoesNotExist(path) + self.lock = UpgradableLock(os.path.join(path, 'repo'), exclusive).acquire() self.config = RawConfigParser() self.config.read(os.path.join(self.path, 'config')) if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1: raise self.InvalidRepository(path) - self.lock = UpgradableLock(os.path.join(path, 'config'), exclusive) self.max_segment_size = self.config.getint('repository', 'max_segment_size') self.segments_per_dir = self.config.getint('repository', 'segments_per_dir') self.id = unhexlify(self.config.get('repository', 'id').strip()) @@ -149,7 +149,7 @@ class Repository: self._active_txn = True try: self.lock.upgrade() - except UpgradableLock.WriteLockFailed: + except UpgradableLock.ExclusiveLockFailed: # if upgrading the lock to exclusive fails, we do not have an # active transaction. this is important for "serve" mode, where # the repository instance lives on - even if exceptions happened. diff --git a/borg/testsuite/locking.py b/borg/testsuite/locking.py index 6a910edb6..aae5925a6 100644 --- a/borg/testsuite/locking.py +++ b/borg/testsuite/locking.py @@ -1,24 +1,102 @@ -import os -import tempfile -import unittest +import pytest -from ..locking import UpgradableLock -from . import BaseTestCase +from ..locking import get_id, ExclusiveLock, UpgradableLock, LockRoster, ADD, REMOVE, SHARED, EXCLUSIVE -class UpgradableLockTestCase(BaseTestCase): +ID1 = "foo", 1, 1 +ID2 = "bar", 2, 2 - def test(self): - file = tempfile.NamedTemporaryFile() - lock = UpgradableLock(file.name) - lock.upgrade() - lock.upgrade() - lock.release() +def test_id(): + hostname, pid, tid = get_id() + assert isinstance(hostname, str) + assert isinstance(pid, int) + assert isinstance(tid, int) + assert len(hostname) > 0 + assert pid > 0 - @unittest.skipIf(os.getuid() == 0, 'Root can always open files for writing') - def test_read_only_lock_file(self): - file = tempfile.NamedTemporaryFile() - os.chmod(file.name, 0o444) - lock = UpgradableLock(file.name) - self.assert_raises(UpgradableLock.WriteLockFailed, lock.upgrade) - lock.release() + +@pytest.fixture() +def lockpath(tmpdir): + return str(tmpdir.join('lock')) + + +class TestExclusiveLock: + def test_checks(self, lockpath): + with ExclusiveLock(lockpath, timeout=1) as lock: + assert lock.is_locked() and lock.by_me() + + def test_acquire_break_reacquire(self, lockpath): + lock = ExclusiveLock(lockpath, id=ID1).acquire() + lock.break_lock() + with ExclusiveLock(lockpath, id=ID2): + pass + + def test_timeout(self, lockpath): + with ExclusiveLock(lockpath, id=ID1): + with pytest.raises(ExclusiveLock.LockTimeout): + ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire() + + +class TestUpgradableLock: + def test_shared(self, lockpath): + lock1 = UpgradableLock(lockpath, exclusive=False, id=ID1).acquire() + lock2 = UpgradableLock(lockpath, exclusive=False, id=ID2).acquire() + assert len(lock1._roster.get(SHARED)) == 2 + assert len(lock1._roster.get(EXCLUSIVE)) == 0 + lock1.release() + lock2.release() + + def test_exclusive(self, lockpath): + with UpgradableLock(lockpath, exclusive=True, id=ID1) as lock: + assert len(lock._roster.get(SHARED)) == 0 + assert len(lock._roster.get(EXCLUSIVE)) == 1 + + def test_upgrade(self, lockpath): + with UpgradableLock(lockpath, exclusive=False) as lock: + lock.upgrade() + lock.upgrade() # NOP + assert len(lock._roster.get(SHARED)) == 0 + assert len(lock._roster.get(EXCLUSIVE)) == 1 + + def test_downgrade(self, lockpath): + with UpgradableLock(lockpath, exclusive=True) as lock: + lock.downgrade() + lock.downgrade() # NOP + assert len(lock._roster.get(SHARED)) == 1 + assert len(lock._roster.get(EXCLUSIVE)) == 0 + + def test_break(self, lockpath): + lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire() + lock.break_lock() + assert len(lock._roster.get(SHARED)) == 0 + assert len(lock._roster.get(EXCLUSIVE)) == 0 + with UpgradableLock(lockpath, exclusive=True, id=ID2): + pass + + +@pytest.fixture() +def rosterpath(tmpdir): + return str(tmpdir.join('roster')) + + +class TestLockRoster: + def test_empty(self, rosterpath): + roster = LockRoster(rosterpath) + empty = roster.load() + roster.save(empty) + assert empty == {} + + def test_modify_get(self, rosterpath): + roster1 = LockRoster(rosterpath, id=ID1) + assert roster1.get(SHARED) == set() + roster1.modify(SHARED, ADD) + assert roster1.get(SHARED) == {ID1, } + roster2 = LockRoster(rosterpath, id=ID2) + roster2.modify(SHARED, ADD) + assert roster2.get(SHARED) == {ID1, ID2, } + roster1 = LockRoster(rosterpath, id=ID1) + roster1.modify(SHARED, REMOVE) + assert roster1.get(SHARED) == {ID2, } + roster2 = LockRoster(rosterpath, id=ID2) + roster2.modify(SHARED, REMOVE) + assert roster2.get(SHARED) == set() diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index b32315938..1c9fd072d 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -157,9 +157,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): for name in os.listdir(self.repository.path): if name.startswith('index.'): os.unlink(os.path.join(self.repository.path, name)) - with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.WriteLockFailed) as upgrade: + with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.ExclusiveLockFailed) as upgrade: self.reopen() - self.assert_raises(UpgradableLock.WriteLockFailed, lambda: len(self.repository)) + self.assert_raises(UpgradableLock.ExclusiveLockFailed, lambda: len(self.repository)) upgrade.assert_called_once_with() def test_crash_before_write_index(self):