mirror of
https://github.com/borgbackup/borg.git
synced 2026-03-22 18:34:03 -04:00
Avoid deadlock problems
Explicitly use write locks, instead of read locks (that are later upgraded) unless we know we will never modify the repository
This commit is contained in:
parent
af3e78e6b2
commit
64cd6632a1
5 changed files with 30 additions and 20 deletions
|
|
@ -27,11 +27,11 @@ class Archiver:
|
|||
def __init__(self):
|
||||
self.exit_code = 0
|
||||
|
||||
def open_repository(self, location, create=False):
|
||||
def open_repository(self, location, create=False, exclusive=False):
|
||||
if location.proto == 'ssh':
|
||||
repository = RemoteRepository(location, create=create)
|
||||
else:
|
||||
repository = Repository(location.path, create=create)
|
||||
repository = Repository(location.path, create=create, exclusive=exclusive)
|
||||
repository._location = location
|
||||
return repository
|
||||
|
||||
|
|
@ -56,7 +56,7 @@ class Archiver:
|
|||
def do_init(self, args):
|
||||
"""Initialize an empty repository"""
|
||||
print('Initializing repository at "%s"' % args.repository.orig)
|
||||
repository = self.open_repository(args.repository, create=True)
|
||||
repository = self.open_repository(args.repository, create=True, exclusive=True)
|
||||
key = key_creator(repository, args)
|
||||
manifest = Manifest(key, repository)
|
||||
manifest.key = key
|
||||
|
|
@ -66,7 +66,7 @@ class Archiver:
|
|||
|
||||
def do_check(self, args):
|
||||
"""Check repository consistency"""
|
||||
repository = self.open_repository(args.repository)
|
||||
repository = self.open_repository(args.repository, exclusive=args.repair)
|
||||
if args.repair:
|
||||
while not os.environ.get('ATTIC_CHECK_I_KNOW_WHAT_I_AM_DOING'):
|
||||
self.print_error("""Warning: 'check --repair' is an experimental feature that might result
|
||||
|
|
@ -95,7 +95,7 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
|
|||
def do_create(self, args):
|
||||
"""Create new archive"""
|
||||
t0 = datetime.now()
|
||||
repository = self.open_repository(args.archive)
|
||||
repository = self.open_repository(args.archive, exclusive=True)
|
||||
manifest, key = Manifest.load(repository)
|
||||
cache = Cache(repository, key, manifest)
|
||||
archive = Archive(repository, key, manifest, args.archive.archive, cache=cache,
|
||||
|
|
@ -216,7 +216,7 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
|
|||
|
||||
def do_delete(self, args):
|
||||
"""Delete an existing archive"""
|
||||
repository = self.open_repository(args.archive)
|
||||
repository = self.open_repository(args.archive, exclusive=True)
|
||||
manifest, key = Manifest.load(repository)
|
||||
cache = Cache(repository, key, manifest)
|
||||
archive = Archive(repository, key, manifest, args.archive.archive, cache=cache)
|
||||
|
|
@ -308,7 +308,7 @@ Type "Yes I am sure" if you understand this and want to continue.\n""")
|
|||
|
||||
def do_prune(self, args):
|
||||
"""Prune repository archives according to specified rules"""
|
||||
repository = self.open_repository(args.repository)
|
||||
repository = self.open_repository(args.repository, exclusive=True)
|
||||
manifest, key = Manifest.load(repository)
|
||||
cache = Cache(repository, key, manifest)
|
||||
archives = list(sorted(Archive.list_archives(repository, key, manifest, cache),
|
||||
|
|
|
|||
|
|
@ -33,7 +33,10 @@ class ExtensionModuleError(Error):
|
|||
|
||||
class UpgradableLock:
|
||||
|
||||
class LockUpgradeFailed(Error):
|
||||
class ReadLockFailed(Error):
|
||||
"""Failed to acquire read lock on {}"""
|
||||
|
||||
class WriteLockFailed(Error):
|
||||
"""Failed to acquire write lock on {}"""
|
||||
|
||||
def __init__(self, path, exclusive=False):
|
||||
|
|
@ -42,10 +45,17 @@ class UpgradableLock:
|
|||
self.fd = open(path, 'r+')
|
||||
except IOError:
|
||||
self.fd = open(path, 'r')
|
||||
if exclusive:
|
||||
fcntl.lockf(self.fd, fcntl.LOCK_EX)
|
||||
else:
|
||||
fcntl.lockf(self.fd, fcntl.LOCK_SH)
|
||||
try:
|
||||
if exclusive:
|
||||
fcntl.lockf(self.fd, fcntl.LOCK_EX)
|
||||
else:
|
||||
fcntl.lockf(self.fd, fcntl.LOCK_SH)
|
||||
# Python 3.2 raises IOError, Python3.3+ raises OSError
|
||||
except (IOError, OSError):
|
||||
if exclusive:
|
||||
raise self.WriteLockFailed(self.path)
|
||||
else:
|
||||
raise self.ReadLockFailed(self.path)
|
||||
self.is_exclusive = exclusive
|
||||
|
||||
def upgrade(self):
|
||||
|
|
@ -53,7 +63,7 @@ class UpgradableLock:
|
|||
fcntl.lockf(self.fd, fcntl.LOCK_EX)
|
||||
# Python 3.2 raises IOError, Python3.3+ raises OSError
|
||||
except (IOError, OSError):
|
||||
raise self.LockUpgradeFailed(self.path)
|
||||
raise self.WriteLockFailed(self.path)
|
||||
self.is_exclusive = True
|
||||
|
||||
def release(self):
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class Repository(object):
|
|||
class CheckNeeded(Error):
|
||||
'''Inconsistency detected. Please run "attic check {}"'''
|
||||
|
||||
def __init__(self, path, create=False):
|
||||
def __init__(self, path, create=False, exclusive=False):
|
||||
self.path = path
|
||||
self.io = None
|
||||
self.lock = None
|
||||
|
|
@ -52,7 +52,7 @@ class Repository(object):
|
|||
self._active_txn = False
|
||||
if create:
|
||||
self.create(path)
|
||||
self.open(path)
|
||||
self.open(path, exclusive)
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
|
@ -98,7 +98,7 @@ class Repository(object):
|
|||
self.replay_segments(replay_from, segments_transaction_id)
|
||||
return self.get_index_transaction_id()
|
||||
|
||||
def open(self, path):
|
||||
def open(self, path, exclusive):
|
||||
self.path = path
|
||||
if not os.path.isdir(path):
|
||||
raise self.DoesNotExist(path)
|
||||
|
|
@ -106,7 +106,7 @@ class Repository(object):
|
|||
self.config.read(os.path.join(self.path, 'config'))
|
||||
if not 'repository' in self.config.sections() or self.config.getint('repository', 'version') != 1:
|
||||
raise self.InvalidRepository(path)
|
||||
self.lock = UpgradableLock(os.path.join(path, 'config'))
|
||||
self.lock = UpgradableLock(os.path.join(path, 'config'), exclusive)
|
||||
self.max_segment_size = self.config.getint('repository', 'max_segment_size')
|
||||
self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
|
||||
self.id = unhexlify(self.config.get('repository', 'id').strip())
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class UpgradableLockTestCase(AtticTestCase):
|
|||
file = tempfile.NamedTemporaryFile()
|
||||
os.chmod(file.name, 0o444)
|
||||
lock = UpgradableLock(file.name)
|
||||
self.assert_raises(UpgradableLock.LockUpgradeFailed, lock.upgrade)
|
||||
self.assert_raises(UpgradableLock.WriteLockFailed, lock.upgrade)
|
||||
lock.release()
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -154,9 +154,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
|
|||
for name in os.listdir(self.repository.path):
|
||||
if name.startswith('index.'):
|
||||
os.unlink(os.path.join(self.repository.path, name))
|
||||
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.LockUpgradeFailed) as upgrade:
|
||||
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.WriteLockFailed) as upgrade:
|
||||
self.reopen()
|
||||
self.assert_raises(UpgradableLock.LockUpgradeFailed, lambda: len(self.repository))
|
||||
self.assert_raises(UpgradableLock.WriteLockFailed, lambda: len(self.repository))
|
||||
upgrade.assert_called_once()
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue