implement --lock-wait, support timeout for UpgradableLock, fixes #210

also: simplify exceptions / exception handling
This commit is contained in:
Thomas Waldmann 2015-11-21 15:34:51 +01:00
parent b3b4db427c
commit f19e95fcf7
7 changed files with 88 additions and 75 deletions

View file

@ -37,15 +37,16 @@ has_lchflags = hasattr(os, 'lchflags')
class Archiver:
def __init__(self, verbose=False):
def __init__(self, verbose=False, lock_wait=None):
    """Archiver command dispatcher state.

    :param verbose: enable verbose output.
    :param lock_wait: max. seconds to wait for a repository/cache lock
        (``None`` = locking default); forwarded as ``lock_wait`` to
        Repository / RemoteRepository / Cache by the command methods.
    """
    self.exit_code = EXIT_SUCCESS  # overall process exit status, starts as success
    self.verbose = verbose
    self.lock_wait = lock_wait  # later overwritten from args.lock_wait in run()
def open_repository(self, location, create=False, exclusive=False):
if location.proto == 'ssh':
repository = RemoteRepository(location, create=create)
repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait)
else:
repository = Repository(location.path, create=create, exclusive=exclusive)
repository = Repository(location.path, create=create, exclusive=exclusive, lock_wait=self.lock_wait)
repository._location = location
return repository
@ -119,7 +120,7 @@ class Archiver:
compr_args = dict(buffer=COMPR_BUFFER)
compr_args.update(args.compression)
key.compressor = Compressor(**compr_args)
cache = Cache(repository, key, manifest, do_files=args.cache_files)
cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait)
archive = Archive(repository, key, manifest, args.archive.archive, cache=cache,
create=True, checkpoint_interval=args.checkpoint_interval,
numeric_owner=args.numeric_owner, progress=args.progress,
@ -305,7 +306,7 @@ class Archiver:
"""Rename an existing archive"""
repository = self.open_repository(args.archive, exclusive=True)
manifest, key = Manifest.load(repository)
cache = Cache(repository, key, manifest)
cache = Cache(repository, key, manifest, lock_wait=self.lock_wait)
archive = Archive(repository, key, manifest, args.archive.archive, cache=cache)
archive.rename(args.name)
manifest.write()
@ -317,7 +318,7 @@ class Archiver:
"""Delete an existing repository or archive"""
repository = self.open_repository(args.target, exclusive=True)
manifest, key = Manifest.load(repository)
cache = Cache(repository, key, manifest, do_files=args.cache_files)
cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait)
if args.target.archive:
archive = Archive(repository, key, manifest, args.target.archive, cache=cache)
stats = Statistics()
@ -424,7 +425,7 @@ class Archiver:
"""Show archive details such as disk space used"""
repository = self.open_repository(args.archive)
manifest, key = Manifest.load(repository)
cache = Cache(repository, key, manifest, do_files=args.cache_files)
cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait)
archive = Archive(repository, key, manifest, args.archive.archive, cache=cache)
stats = archive.calc_stats(cache)
print('Name:', archive.name)
@ -443,7 +444,7 @@ class Archiver:
"""Prune repository archives according to specified rules"""
repository = self.open_repository(args.repository, exclusive=True)
manifest, key = Manifest.load(repository)
cache = Cache(repository, key, manifest, do_files=args.cache_files)
cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait)
archives = manifest.list_archive_infos(sort_by='ts', reverse=True) # just a ArchiveInfo list
if args.hourly + args.daily + args.weekly + args.monthly + args.yearly == 0 and args.within is None:
self.print_error('At least one of the "within", "keep-hourly", "keep-daily", "keep-weekly", '
@ -646,6 +647,8 @@ class Archiver:
common_parser.add_argument('--log-level', dest='log_level', default='info', metavar='LEVEL',
choices=('debug', 'info', 'warning', 'error', 'critical'),
help='set the log level to LEVEL, default: %(default)s)')
common_parser.add_argument('--lock-wait', dest='lock_wait', type=int, metavar='N', default=1,
help='wait for the lock, but max. N seconds (default: %(default)d).')
common_parser.add_argument('--show-rc', dest='show_rc', action='store_true', default=False,
help='show/log the return code (rc)')
common_parser.add_argument('--no-files-cache', dest='cache_files', action='store_false',
@ -1153,6 +1156,7 @@ class Archiver:
def run(self, args):
os.umask(args.umask) # early, before opening files
self.verbose = args.verbose
self.lock_wait = args.lock_wait
RemoteRepository.remote_path = args.remote_path
RemoteRepository.umask = args.umask
setup_logging(level=args.log_level) # do not use loggers before this!

View file

@ -32,7 +32,8 @@ class Cache:
class EncryptionMethodMismatch(Error):
"""Repository encryption method changed since last access, refusing to continue"""
def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True):
def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
lock_wait=None):
self.lock = None
self.timestamp = None
self.lock = None
@ -52,7 +53,7 @@ class Cache:
env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
raise self.CacheInitAbortedError()
self.create()
self.open()
self.open(lock_wait=lock_wait)
# Warn user before sending data to a relocated repository
if self.previous_location and self.previous_location != repository._location.canonical_path():
msg = ("Warning: The repository at location {} was previously located at {}".format(repository._location.canonical_path(), self.previous_location) +
@ -136,10 +137,10 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
self.files = None
def open(self):
def open(self, lock_wait=None):
if not os.path.isdir(self.path):
raise Exception('%s Does not look like a Borg cache' % self.path)
self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
self.rollback()
def close(self):

View file

@ -74,26 +74,32 @@ class TimeoutTimer:
return False
class LockError(Error):
"""Failed to acquire the lock {}."""
class LockErrorT(ErrorWithTraceback):
"""Failed to acquire the lock {}."""
class LockTimeout(LockError):
"""Failed to create/acquire the lock {} (timeout)."""
class LockFailed(LockErrorT):
"""Failed to create/acquire the lock {} ({})."""
class NotLocked(LockErrorT):
"""Failed to release the lock {} (was not locked)."""
class NotMyLock(LockErrorT):
"""Failed to release the lock {} (was/is locked, but not by me)."""
class ExclusiveLock:
"""An exclusive Lock based on mkdir fs operation being atomic"""
class LockError(ErrorWithTraceback):
"""Failed to acquire the lock {}."""
class LockTimeout(LockError):
"""Failed to create/acquire the lock {} (timeout)."""
class LockFailed(LockError):
"""Failed to create/acquire the lock {} ({})."""
class UnlockError(ErrorWithTraceback):
"""Failed to release the lock {}."""
class NotLocked(UnlockError):
"""Failed to release the lock {} (was not locked)."""
class NotMyLock(UnlockError):
"""Failed to release the lock {} (was/is locked, but not by me)."""
def __init__(self, path, timeout=None, sleep=None, id=None):
self.timeout = timeout
self.sleep = sleep
@ -124,9 +130,9 @@ class ExclusiveLock:
if self.by_me():
return self
if timer.timed_out_or_sleep():
raise self.LockTimeout(self.path)
raise LockTimeout(self.path)
else:
raise self.LockFailed(self.path, str(err))
raise LockFailed(self.path, str(err))
else:
with open(self.unique_name, "wb"):
pass
@ -134,9 +140,9 @@ class ExclusiveLock:
def release(self):
if not self.is_locked():
raise self.NotLocked(self.path)
raise NotLocked(self.path)
if not self.by_me():
raise self.NotMyLock(self.path)
raise NotMyLock(self.path)
os.unlink(self.unique_name)
os.rmdir(self.path)
@ -215,23 +221,18 @@ class UpgradableLock:
noone is allowed reading) and read access to a resource needs a shared
lock (multiple readers are allowed).
"""
class SharedLockFailed(ErrorWithTraceback):
"""Failed to acquire shared lock [{}]"""
class ExclusiveLockFailed(ErrorWithTraceback):
"""Failed to acquire write lock [{}]"""
def __init__(self, path, exclusive=False, sleep=None, id=None):
def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
self.path = path
self.is_exclusive = exclusive
self.sleep = sleep
self.timeout = timeout
self.id = id or get_id()
# globally keeping track of shared and exclusive lockers:
self._roster = LockRoster(path + '.roster', id=id)
# an exclusive lock, used for:
# - holding while doing roster queries / updates
# - holding while the UpgradableLock itself is exclusive
self._lock = ExclusiveLock(path + '.exclusive', id=id)
self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
def __enter__(self):
    # Context-manager entry: acquire using the exclusivity/timeout
    # configured in __init__; returns self so `with lock as l:` works.
    return self.acquire()
@ -246,25 +247,19 @@ class UpgradableLock:
if exclusive is None:
exclusive = self.is_exclusive
sleep = sleep or self.sleep or 0.2
try:
if exclusive:
self._wait_for_readers_finishing(remove, sleep)
self._roster.modify(EXCLUSIVE, ADD)
else:
with self._lock:
if remove is not None:
self._roster.modify(remove, REMOVE)
self._roster.modify(SHARED, ADD)
self.is_exclusive = exclusive
return self
except ExclusiveLock.LockError as err:
msg = str(err)
if exclusive:
raise self.ExclusiveLockFailed(msg)
else:
raise self.SharedLockFailed(msg)
if exclusive:
self._wait_for_readers_finishing(remove, sleep)
self._roster.modify(EXCLUSIVE, ADD)
else:
with self._lock:
if remove is not None:
self._roster.modify(remove, REMOVE)
self._roster.modify(SHARED, ADD)
self.is_exclusive = exclusive
return self
def _wait_for_readers_finishing(self, remove, sleep):
timer = TimeoutTimer(self.timeout, sleep).start()
while True:
self._lock.acquire()
if remove is not None:
@ -273,7 +268,8 @@ class UpgradableLock:
if len(self._roster.get(SHARED)) == 0:
return # we are the only one and we keep the lock!
self._lock.release()
time.sleep(sleep)
if timer.timed_out_or_sleep():
raise LockTimeout(self.path)
def release(self):
if self.is_exclusive:

View file

@ -97,7 +97,7 @@ class RepositoryServer: # pragma: no cover
def negotiate(self, versions):
    # Protocol negotiation: this server only speaks protocol version 1,
    # whatever versions the client offered.
    return 1
def open(self, path, create=False):
def open(self, path, create=False, lock_wait=None):
path = os.fsdecode(path)
if path.startswith('/~'):
path = path[1:]
@ -108,7 +108,7 @@ class RepositoryServer: # pragma: no cover
break
else:
raise PathNotAllowed(path)
self.repository = Repository(path, create)
self.repository = Repository(path, create, lock_wait=lock_wait)
return self.repository.id
@ -122,7 +122,7 @@ class RemoteRepository:
def __init__(self, name):
    # NOTE(review): the enclosing class is outside this diff hunk — this
    # appears to be a small exception/record type that only stores a name.
    self.name = name
def __init__(self, location, create=False):
def __init__(self, location, create=False, lock_wait=None):
self.location = location
self.preload_ids = []
self.msgid = 0
@ -154,7 +154,7 @@ class RemoteRepository:
raise ConnectionClosedWithHint('Is borg working on the server?')
if version != 1:
raise Exception('Server insisted on using unsupported protocol version %d' % version)
self.id = self.call('open', location.path, create)
self.id = self.call('open', location.path, create, lock_wait)
def __del__(self):
self.close()

View file

@ -12,7 +12,7 @@ from zlib import crc32
from .helpers import Error, ErrorWithTraceback, IntegrityError, read_msgpack, write_msgpack, unhexlify
from .hashindex import NSIndex
from .locking import UpgradableLock
from .locking import UpgradableLock, LockError, LockErrorT
from .lrucache import LRUCache
MAX_OBJECT_SIZE = 20 * 1024 * 1024
@ -51,7 +51,7 @@ class Repository:
class ObjectNotFound(ErrorWithTraceback):
"""Object with key {} not found in repository {}."""
def __init__(self, path, create=False, exclusive=False):
def __init__(self, path, create=False, exclusive=False, lock_wait=None):
self.path = os.path.abspath(path)
self.io = None
self.lock = None
@ -59,7 +59,7 @@ class Repository:
self._active_txn = False
if create:
self.create(self.path)
self.open(self.path, exclusive)
self.open(self.path, exclusive, lock_wait=lock_wait)
def __del__(self):
self.close()
@ -129,11 +129,11 @@ class Repository:
self.replay_segments(replay_from, segments_transaction_id)
return self.get_index_transaction_id()
def open(self, path, exclusive):
def open(self, path, exclusive, lock_wait=None):
self.path = path
if not os.path.isdir(path):
raise self.DoesNotExist(path)
self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive).acquire()
self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
self.config = ConfigParser(interpolation=None)
self.config.read(os.path.join(self.path, 'config'))
if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
@ -168,7 +168,7 @@ class Repository:
self._active_txn = True
try:
self.lock.upgrade()
except UpgradableLock.ExclusiveLockFailed:
except (LockError, LockErrorT):
# if upgrading the lock to exclusive fails, we do not have an
# active transaction. this is important for "serve" mode, where
# the repository instance lives on - even if exceptions happened.

View file

@ -2,7 +2,8 @@ import time
import pytest
from ..locking import get_id, TimeoutTimer, ExclusiveLock , UpgradableLock, LockRoster, ADD, REMOVE, SHARED, EXCLUSIVE
from ..locking import get_id, TimeoutTimer, ExclusiveLock, UpgradableLock, LockRoster, \
ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout
ID1 = "foo", 1, 1
@ -52,7 +53,7 @@ class TestExclusiveLock:
def test_timeout(self, lockpath):
with ExclusiveLock(lockpath, id=ID1):
with pytest.raises(ExclusiveLock.LockTimeout):
with pytest.raises(LockTimeout):
ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire()
@ -92,6 +93,17 @@ class TestUpgradableLock:
with UpgradableLock(lockpath, exclusive=True, id=ID2):
pass
def test_timeout(self, lockpath):
    """A waiter with a short timeout must raise LockTimeout while another id holds the lock.

    Exercises the three blocking combinations: shared holder vs.
    exclusive waiter, exclusive holder vs. shared waiter, and
    exclusive holder vs. exclusive waiter.
    """
    with UpgradableLock(lockpath, exclusive=False, id=ID1):
        # shared holder blocks an exclusive waiter
        with pytest.raises(LockTimeout):
            UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
    with UpgradableLock(lockpath, exclusive=True, id=ID1):
        # exclusive holder blocks a shared waiter
        with pytest.raises(LockTimeout):
            UpgradableLock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire()
    with UpgradableLock(lockpath, exclusive=True, id=ID1):
        # exclusive holder blocks another exclusive waiter
        with pytest.raises(LockTimeout):
            UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
@pytest.fixture()
def rosterpath(tmpdir):

View file

@ -6,7 +6,7 @@ from mock import patch
from ..hashindex import NSIndex
from ..helpers import Location, IntegrityError
from ..locking import UpgradableLock
from ..locking import UpgradableLock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod
from ..repository import Repository
from . import BaseTestCase
@ -158,9 +158,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
for name in os.listdir(self.repository.path):
if name.startswith('index.'):
os.unlink(os.path.join(self.repository.path, name))
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.ExclusiveLockFailed) as upgrade:
with patch.object(UpgradableLock, 'upgrade', side_effect=LockFailed) as upgrade:
self.reopen()
self.assert_raises(UpgradableLock.ExclusiveLockFailed, lambda: len(self.repository))
self.assert_raises(LockFailed, lambda: len(self.repository))
upgrade.assert_called_once_with()
def test_crash_before_write_index(self):