diff --git a/borg/archiver.py b/borg/archiver.py index b65a76300..55563d3a9 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -37,15 +37,16 @@ has_lchflags = hasattr(os, 'lchflags') class Archiver: - def __init__(self, verbose=False): + def __init__(self, verbose=False, lock_wait=None): self.exit_code = EXIT_SUCCESS self.verbose = verbose + self.lock_wait = lock_wait def open_repository(self, location, create=False, exclusive=False): if location.proto == 'ssh': - repository = RemoteRepository(location, create=create) + repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait) else: - repository = Repository(location.path, create=create, exclusive=exclusive) + repository = Repository(location.path, create=create, exclusive=exclusive, lock_wait=self.lock_wait) repository._location = location return repository @@ -119,7 +120,7 @@ class Archiver: compr_args = dict(buffer=COMPR_BUFFER) compr_args.update(args.compression) key.compressor = Compressor(**compr_args) - cache = Cache(repository, key, manifest, do_files=args.cache_files) + cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) archive = Archive(repository, key, manifest, args.archive.archive, cache=cache, create=True, checkpoint_interval=args.checkpoint_interval, numeric_owner=args.numeric_owner, progress=args.progress, @@ -305,7 +306,7 @@ class Archiver: """Rename an existing archive""" repository = self.open_repository(args.archive, exclusive=True) manifest, key = Manifest.load(repository) - cache = Cache(repository, key, manifest) + cache = Cache(repository, key, manifest, lock_wait=self.lock_wait) archive = Archive(repository, key, manifest, args.archive.archive, cache=cache) archive.rename(args.name) manifest.write() @@ -317,7 +318,7 @@ class Archiver: """Delete an existing repository or archive""" repository = self.open_repository(args.target, exclusive=True) manifest, key = Manifest.load(repository) - cache = Cache(repository, key, manifest, do_files=args.cache_files) + cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) if args.target.archive: archive = Archive(repository, key, manifest, args.target.archive, cache=cache) stats = Statistics() @@ -424,7 +425,7 @@ class Archiver: """Show archive details such as disk space used""" repository = self.open_repository(args.archive) manifest, key = Manifest.load(repository) - cache = Cache(repository, key, manifest, do_files=args.cache_files) + cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) archive = Archive(repository, key, manifest, args.archive.archive, cache=cache) stats = archive.calc_stats(cache) print('Name:', archive.name) @@ -443,7 +444,7 @@ class Archiver: """Prune repository archives according to specified rules""" repository = self.open_repository(args.repository, exclusive=True) manifest, key = Manifest.load(repository) - cache = Cache(repository, key, manifest, do_files=args.cache_files) + cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) archives = manifest.list_archive_infos(sort_by='ts', reverse=True) # just a ArchiveInfo list if args.hourly + args.daily + args.weekly + args.monthly + args.yearly == 0 and args.within is None: self.print_error('At least one of the "within", "keep-hourly", "keep-daily", "keep-weekly", ' @@ -646,6 +647,8 @@ class Archiver: common_parser.add_argument('--log-level', dest='log_level', default='info', metavar='LEVEL', choices=('debug', 'info', 'warning', 'error', 'critical'), help='set the log level to LEVEL, default: %(default)s)') + common_parser.add_argument('--lock-wait', dest='lock_wait', type=int, metavar='N', default=1, + help='wait for the lock, but max. N seconds (default: %(default)d).') common_parser.add_argument('--show-rc', dest='show_rc', action='store_true', default=False, help='show/log the return code (rc)') common_parser.add_argument('--no-files-cache', dest='cache_files', action='store_false', @@ -1153,6 +1156,7 @@ class Archiver: def run(self, args): os.umask(args.umask) # early, before opening files self.verbose = args.verbose + self.lock_wait = args.lock_wait RemoteRepository.remote_path = args.remote_path RemoteRepository.umask = args.umask setup_logging(level=args.log_level) # do not use loggers before this! diff --git a/borg/cache.py b/borg/cache.py index 89aedd4f9..6116df497 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -32,7 +32,8 @@ class Cache: class EncryptionMethodMismatch(Error): """Repository encryption method changed since last access, refusing to continue""" - def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True): + def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True, + lock_wait=None): self.lock = None self.timestamp = None self.lock = None @@ -52,7 +53,7 @@ class Cache: env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'): raise self.CacheInitAbortedError() self.create() - self.open() + self.open(lock_wait=lock_wait) # Warn user before sending data to a relocated repository if self.previous_location and self.previous_location != repository._location.canonical_path(): msg = ("Warning: The repository at location {} was previously located at {}".format(repository._location.canonical_path(), self.previous_location) + @@ -136,10 +137,10 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8')) self.files = None - def open(self): + def open(self, lock_wait=None): if not os.path.isdir(self.path): raise Exception('%s Does not look like a Borg cache' % self.path) - self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire() + self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire() self.rollback() def close(self): diff --git a/borg/locking.py b/borg/locking.py index 159e3c57e..d3f309cbd 100644 --- a/borg/locking.py +++ b/borg/locking.py @@ -74,26 +74,32 @@ class TimeoutTimer: return False +class LockError(Error): + """Failed to acquire the lock {}.""" + + +class LockErrorT(ErrorWithTraceback): + """Failed to acquire the lock {}.""" + + +class LockTimeout(LockError): + """Failed to create/acquire the lock {} (timeout).""" + + +class LockFailed(LockErrorT): + """Failed to create/acquire the lock {} ({}).""" + + +class NotLocked(LockErrorT): + """Failed to release the lock {} (was not locked).""" + + +class NotMyLock(LockErrorT): + """Failed to release the lock {} (was/is locked, but not by me).""" + + class ExclusiveLock: """An exclusive Lock based on mkdir fs operation being atomic""" - class LockError(ErrorWithTraceback): - """Failed to acquire the lock {}.""" - - class LockTimeout(LockError): - """Failed to create/acquire the lock {} (timeout).""" - - class LockFailed(LockError): - """Failed to create/acquire the lock {} ({}).""" - - class UnlockError(ErrorWithTraceback): - """Failed to release the lock {}.""" - - class NotLocked(UnlockError): - """Failed to release the lock {} (was not locked).""" - - class NotMyLock(UnlockError): - """Failed to release the lock {} (was/is locked, but not by me).""" - def __init__(self, path, timeout=None, sleep=None, id=None): self.timeout = timeout self.sleep = sleep @@ -124,9 +130,9 @@ class ExclusiveLock: if self.by_me(): return self if timer.timed_out_or_sleep(): - raise self.LockTimeout(self.path) + raise LockTimeout(self.path) else: - raise self.LockFailed(self.path, str(err)) + raise LockFailed(self.path, str(err)) else: with open(self.unique_name, "wb"): pass @@ -134,9 +140,9 @@ class ExclusiveLock: def release(self): if not self.is_locked(): - raise self.NotLocked(self.path) + raise NotLocked(self.path) if not self.by_me(): - raise self.NotMyLock(self.path) + raise NotMyLock(self.path) os.unlink(self.unique_name) os.rmdir(self.path) @@ -215,23 +221,18 @@ class UpgradableLock: noone is allowed reading) and read access to a resource needs a shared lock (multiple readers are allowed). """ - class SharedLockFailed(ErrorWithTraceback): - """Failed to acquire shared lock [{}]""" - - class ExclusiveLockFailed(ErrorWithTraceback): - """Failed to acquire write lock [{}]""" - - def __init__(self, path, exclusive=False, sleep=None, id=None): + def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None): self.path = path self.is_exclusive = exclusive self.sleep = sleep + self.timeout = timeout self.id = id or get_id() # globally keeping track of shared and exclusive lockers: self._roster = LockRoster(path + '.roster', id=id) # an exclusive lock, used for: # - holding while doing roster queries / updates # - holding while the UpgradableLock itself is exclusive - self._lock = ExclusiveLock(path + '.exclusive', id=id) + self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout) def __enter__(self): return self.acquire() @@ -246,25 +247,19 @@ class UpgradableLock: if exclusive is None: exclusive = self.is_exclusive sleep = sleep or self.sleep or 0.2 - try: - if exclusive: - self._wait_for_readers_finishing(remove, sleep) - self._roster.modify(EXCLUSIVE, ADD) - else: - with self._lock: - if remove is not None: - self._roster.modify(remove, REMOVE) - self._roster.modify(SHARED, ADD) - self.is_exclusive = exclusive - return self - except ExclusiveLock.LockError as err: - msg = str(err) - if exclusive: - raise self.ExclusiveLockFailed(msg) - else: - raise self.SharedLockFailed(msg) + if exclusive: + self._wait_for_readers_finishing(remove, sleep) + self._roster.modify(EXCLUSIVE, ADD) + else: + with self._lock: + if remove is not None: + self._roster.modify(remove, REMOVE) + self._roster.modify(SHARED, ADD) + self.is_exclusive = exclusive + return self def _wait_for_readers_finishing(self, remove, sleep): + timer = TimeoutTimer(self.timeout, sleep).start() while True: self._lock.acquire() if remove is not None: @@ -273,7 +268,8 @@ class UpgradableLock: if len(self._roster.get(SHARED)) == 0: return # we are the only one and we keep the lock! self._lock.release() - time.sleep(sleep) + if timer.timed_out_or_sleep(): + raise LockTimeout(self.path) def release(self): if self.is_exclusive: diff --git a/borg/remote.py b/borg/remote.py index df435ebce..1d5e9cab6 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -97,7 +97,7 @@ class RepositoryServer: # pragma: no cover def negotiate(self, versions): return 1 - def open(self, path, create=False): + def open(self, path, create=False, lock_wait=None): path = os.fsdecode(path) if path.startswith('/~'): path = path[1:] @@ -108,7 +108,7 @@ class RepositoryServer: # pragma: no cover break else: raise PathNotAllowed(path) - self.repository = Repository(path, create) + self.repository = Repository(path, create, lock_wait=lock_wait) return self.repository.id @@ -122,7 +122,7 @@ class RemoteRepository: def __init__(self, name): self.name = name - def __init__(self, location, create=False): + def __init__(self, location, create=False, lock_wait=None): self.location = location self.preload_ids = [] self.msgid = 0 @@ -154,7 +154,7 @@ class RemoteRepository: raise ConnectionClosedWithHint('Is borg working on the server?') if version != 1: raise Exception('Server insisted on using unsupported protocol version %d' % version) - self.id = self.call('open', location.path, create) + self.id = self.call('open', location.path, create, lock_wait) def __del__(self): self.close() diff --git a/borg/repository.py b/borg/repository.py index 0127bca3b..77d1d234b 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -12,7 +12,7 @@ from zlib import crc32 from .helpers import Error, ErrorWithTraceback, IntegrityError, read_msgpack, write_msgpack, unhexlify from .hashindex import NSIndex -from .locking import UpgradableLock +from .locking import UpgradableLock, LockError, LockErrorT from .lrucache import LRUCache MAX_OBJECT_SIZE = 20 * 1024 * 1024 @@ -51,7 +51,7 @@ class Repository: class ObjectNotFound(ErrorWithTraceback): """Object with key {} not found in repository {}.""" - def __init__(self, path, create=False, exclusive=False): + def __init__(self, path, create=False, exclusive=False, lock_wait=None): self.path = os.path.abspath(path) self.io = None self.lock = None @@ -59,7 +59,7 @@ class Repository: self._active_txn = False if create: self.create(self.path) - self.open(self.path, exclusive) + self.open(self.path, exclusive, lock_wait=lock_wait) def __del__(self): self.close() @@ -129,11 +129,11 @@ class Repository: self.replay_segments(replay_from, segments_transaction_id) return self.get_index_transaction_id() - def open(self, path, exclusive): + def open(self, path, exclusive, lock_wait=None): self.path = path if not os.path.isdir(path): raise self.DoesNotExist(path) - self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive).acquire() + self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire() self.config = ConfigParser(interpolation=None) self.config.read(os.path.join(self.path, 'config')) if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1: @@ -168,7 +168,7 @@ class Repository: self._active_txn = True try: self.lock.upgrade() - except UpgradableLock.ExclusiveLockFailed: + except (LockError, LockErrorT): # if upgrading the lock to exclusive fails, we do not have an # active transaction. this is important for "serve" mode, where # the repository instance lives on - even if exceptions happened. diff --git a/borg/testsuite/locking.py b/borg/testsuite/locking.py index 4b36e0caa..dc9d969cb 100644 --- a/borg/testsuite/locking.py +++ b/borg/testsuite/locking.py @@ -2,7 +2,8 @@ import time import pytest -from ..locking import get_id, TimeoutTimer, ExclusiveLock , UpgradableLock, LockRoster, ADD, REMOVE, SHARED, EXCLUSIVE +from ..locking import get_id, TimeoutTimer, ExclusiveLock, UpgradableLock, LockRoster, \ + ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout ID1 = "foo", 1, 1 @@ -52,7 +53,7 @@ class TestExclusiveLock: def test_timeout(self, lockpath): with ExclusiveLock(lockpath, id=ID1): - with pytest.raises(ExclusiveLock.LockTimeout): + with pytest.raises(LockTimeout): ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire() @@ -92,6 +93,17 @@ class TestUpgradableLock: with UpgradableLock(lockpath, exclusive=True, id=ID2): pass + def test_timeout(self, lockpath): + with UpgradableLock(lockpath, exclusive=False, id=ID1): + with pytest.raises(LockTimeout): + UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() + with UpgradableLock(lockpath, exclusive=True, id=ID1): + with pytest.raises(LockTimeout): + UpgradableLock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire() + with UpgradableLock(lockpath, exclusive=True, id=ID1): + with pytest.raises(LockTimeout): + UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() + @pytest.fixture() def rosterpath(tmpdir): diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index 2b99b83d6..713f44029 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -6,7 +6,7 @@ from mock import patch from ..hashindex import NSIndex from ..helpers import Location, IntegrityError -from ..locking import UpgradableLock +from ..locking import UpgradableLock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod from ..repository import Repository from . import BaseTestCase @@ -158,9 +158,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): for name in os.listdir(self.repository.path): if name.startswith('index.'): os.unlink(os.path.join(self.repository.path, name)) - with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.ExclusiveLockFailed) as upgrade: + with patch.object(UpgradableLock, 'upgrade', side_effect=LockFailed) as upgrade: self.reopen() - self.assert_raises(UpgradableLock.ExclusiveLockFailed, lambda: len(self.repository)) + self.assert_raises(LockFailed, lambda: len(self.repository)) upgrade.assert_called_once_with() def test_crash_before_write_index(self):