From 26007c016202fdfc564ea52f3bb71e36c4486b15 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Jul 2016 13:58:19 +0200 Subject: [PATCH 1/6] add Lock.got_exclusive_lock --- borg/locking.py | 5 +++++ borg/testsuite/locking.py | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/borg/locking.py b/borg/locking.py index 3a88d1504..1607d21fb 100644 --- a/borg/locking.py +++ b/borg/locking.py @@ -299,6 +299,8 @@ class UpgradableLock: self._roster.modify(SHARED, REMOVE) def upgrade(self): + # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they + # all will wait until the other read locks go away - and that won't happen. if not self.is_exclusive: self.acquire(exclusive=True, remove=SHARED) @@ -306,6 +308,9 @@ class UpgradableLock: if self.is_exclusive: self.acquire(exclusive=False, remove=EXCLUSIVE) + def got_exclusive_lock(self): + return self.is_exclusive and self._lock.is_locked() and self._lock.by_me() + def break_lock(self): self._roster.remove() self._lock.break_lock() diff --git a/borg/testsuite/locking.py b/borg/testsuite/locking.py index bc62650db..b219e98b1 100644 --- a/borg/testsuite/locking.py +++ b/borg/testsuite/locking.py @@ -86,6 +86,14 @@ class TestUpgradableLock: assert len(lock._roster.get(SHARED)) == 1 assert len(lock._roster.get(EXCLUSIVE)) == 0 + def test_got_exclusive_lock(self, lockpath): + lock = UpgradableLock(lockpath, exclusive=True, id=ID1) + assert not lock.got_exclusive_lock() + lock.acquire() + assert lock.got_exclusive_lock() + lock.release() + assert not lock.got_exclusive_lock() + def test_break(self, lockpath): lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire() lock.break_lock() From 2a355e547e56cd24b2f7b6ca8575ef001b190c3c Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Jul 2016 14:13:32 +0200 Subject: [PATCH 2/6] make sure we have a excl. lock when starting a transaction if we don't, we try to upgrade the lock. this is to support old clients talking to a new server and also to avoid bad consequences from coding mistakes for new clients. --- borg/repository.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index 262467dc6..5cf0a4f71 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -207,14 +207,23 @@ class Repository: def prepare_txn(self, transaction_id, do_cleanup=True): self._active_txn = True - try: - self.lock.upgrade() - except (LockError, LockErrorT): - # if upgrading the lock to exclusive fails, we do not have an - # active transaction. this is important for "serve" mode, where - # the repository instance lives on - even if exceptions happened. - self._active_txn = False - raise + if not self.lock.got_exclusive_lock(): + if self.exclusive is not None: + # self.exclusive is either True or False, thus a new client is active here. + # if it is False and we get here, the caller did not use exclusive=True although + # it is needed for a write operation. if it is True and we get here, something else + # went very wrong, because we should have a exclusive lock, but we don't. + raise AssertionError("bug in code, exclusive lock should exist here") + # if we are here, this is an old client talking to a new server (expecting lock upgrade). + # or we are replaying segments and might need a lock upgrade for that. + try: + self.lock.upgrade() + except (LockError, LockErrorT): + # if upgrading the lock to exclusive fails, we do not have an + # active transaction. this is important for "serve" mode, where + # the repository instance lives on - even if exceptions happened. + self._active_txn = False + raise if not self.index or transaction_id is None: self.index = self.open_index(transaction_id) if transaction_id is None: From d3d51e12eae654687e5c29e8734b2770fc2aa616 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Jul 2016 13:56:06 +0200 Subject: [PATCH 3/6] rename UpgradableLock to Lock lock upgrading is troublesome / may deadlock, do not advertise it. --- borg/cache.py | 6 +++--- borg/locking.py | 6 +++--- borg/repository.py | 6 +++--- borg/testsuite/locking.py | 32 ++++++++++++++++---------------- borg/testsuite/repository.py | 2 +- borg/upgrader.py | 7 +++---- 6 files changed, 29 insertions(+), 30 deletions(-) diff --git a/borg/cache.py b/borg/cache.py index 7badac505..0cacb2a8b 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -11,7 +11,7 @@ from .logger import create_logger logger = create_logger() from .helpers import Error, get_cache_dir, decode_dict, int_to_bigint, \ bigint_to_int, format_file_size, yes -from .locking import UpgradableLock +from .locking import Lock from .hashindex import ChunkIndex import msgpack @@ -35,7 +35,7 @@ class Cache: @staticmethod def break_lock(repository, path=None): path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii')) - UpgradableLock(os.path.join(path, 'lock'), exclusive=True).break_lock() + Lock(os.path.join(path, 'lock'), exclusive=True).break_lock() @staticmethod def destroy(repository, path=None): @@ -152,7 +152,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" def open(self, lock_wait=None): if not os.path.isdir(self.path): raise Exception('%s Does not look like a Borg cache' % self.path) - self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire() + self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire() self.rollback() def close(self): diff --git a/borg/locking.py b/borg/locking.py index 1607d21fb..2dbb27cbc 100644 --- a/borg/locking.py +++ b/borg/locking.py @@ -217,7 +217,7 @@ class LockRoster: self.save(roster) -class UpgradableLock: +class Lock: """ A Lock for a resource that can be accessed in a shared or exclusive way. Typically, write access to a resource needs an exclusive lock (1 writer, @@ -226,7 +226,7 @@ class UpgradableLock: If possible, try to use the contextmanager here like:: - with UpgradableLock(...) as lock: + with Lock(...) as lock: ... This makes sure the lock is released again if the block is left, no @@ -242,7 +242,7 @@ class UpgradableLock: self._roster = LockRoster(path + '.roster', id=id) # an exclusive lock, used for: # - holding while doing roster queries / updates - # - holding while the UpgradableLock itself is exclusive + # - holding while the Lock instance itself is exclusive self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout) def __enter__(self): diff --git a/borg/repository.py b/borg/repository.py index 5cf0a4f71..525f3abeb 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -14,7 +14,7 @@ from zlib import crc32 import msgpack from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent from .hashindex import NSIndex -from .locking import UpgradableLock, LockError, LockErrorT +from .locking import Lock, LockError, LockErrorT from .lrucache import LRUCache from .platform import sync_dir @@ -161,14 +161,14 @@ class Repository: return self.get_index_transaction_id() def break_lock(self): - UpgradableLock(os.path.join(self.path, 'lock')).break_lock() + Lock(os.path.join(self.path, 'lock')).break_lock() def open(self, path, exclusive, lock_wait=None, lock=True): self.path = path if not os.path.isdir(path): raise self.DoesNotExist(path) if lock: - self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire() + self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire() else: self.lock = None self.config = ConfigParser(interpolation=None) diff --git a/borg/testsuite/locking.py b/borg/testsuite/locking.py index b219e98b1..fcb21f1df 100644 --- a/borg/testsuite/locking.py +++ b/borg/testsuite/locking.py @@ -2,7 +2,7 @@ import time import pytest -from ..locking import get_id, TimeoutTimer, ExclusiveLock, UpgradableLock, LockRoster, \ +from ..locking import get_id, TimeoutTimer, ExclusiveLock, Lock, LockRoster, \ ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout @@ -58,36 +58,36 @@ class TestExclusiveLock: ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire() -class TestUpgradableLock: +class TestLock: def test_shared(self, lockpath): - lock1 = UpgradableLock(lockpath, exclusive=False, id=ID1).acquire() - lock2 = UpgradableLock(lockpath, exclusive=False, id=ID2).acquire() + lock1 = Lock(lockpath, exclusive=False, id=ID1).acquire() + lock2 = Lock(lockpath, exclusive=False, id=ID2).acquire() assert len(lock1._roster.get(SHARED)) == 2 assert len(lock1._roster.get(EXCLUSIVE)) == 0 lock1.release() lock2.release() def test_exclusive(self, lockpath): - with UpgradableLock(lockpath, exclusive=True, id=ID1) as lock: + with Lock(lockpath, exclusive=True, id=ID1) as lock: assert len(lock._roster.get(SHARED)) == 0 assert len(lock._roster.get(EXCLUSIVE)) == 1 def test_upgrade(self, lockpath): - with UpgradableLock(lockpath, exclusive=False) as lock: + with Lock(lockpath, exclusive=False) as lock: lock.upgrade() lock.upgrade() # NOP assert len(lock._roster.get(SHARED)) == 0 assert len(lock._roster.get(EXCLUSIVE)) == 1 def test_downgrade(self, lockpath): - with UpgradableLock(lockpath, exclusive=True) as lock: + with Lock(lockpath, exclusive=True) as lock: lock.downgrade() lock.downgrade() # NOP assert len(lock._roster.get(SHARED)) == 1 assert len(lock._roster.get(EXCLUSIVE)) == 0 def test_got_exclusive_lock(self, lockpath): - lock = UpgradableLock(lockpath, exclusive=True, id=ID1) + lock = Lock(lockpath, exclusive=True, id=ID1) assert not lock.got_exclusive_lock() lock.acquire() assert lock.got_exclusive_lock() @@ -95,23 +95,23 @@ class TestUpgradableLock: assert not lock.got_exclusive_lock() def test_break(self, lockpath): - lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire() + lock = Lock(lockpath, exclusive=True, id=ID1).acquire() lock.break_lock() assert len(lock._roster.get(SHARED)) == 0 assert len(lock._roster.get(EXCLUSIVE)) == 0 - with UpgradableLock(lockpath, exclusive=True, id=ID2): + with Lock(lockpath, exclusive=True, id=ID2): pass def test_timeout(self, lockpath): - with UpgradableLock(lockpath, exclusive=False, id=ID1): + with Lock(lockpath, exclusive=False, id=ID1): with pytest.raises(LockTimeout): - UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() - with UpgradableLock(lockpath, exclusive=True, id=ID1): + Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() + with Lock(lockpath, exclusive=True, id=ID1): with pytest.raises(LockTimeout): - UpgradableLock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire() - with UpgradableLock(lockpath, exclusive=True, id=ID1): + Lock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire() + with Lock(lockpath, exclusive=True, id=ID1): with pytest.raises(LockTimeout): - UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() + Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire() @pytest.fixture() diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index b72e80414..e034cf9e1 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -6,7 +6,7 @@ from unittest.mock import patch from ..hashindex import NSIndex from ..helpers import Location, IntegrityError -from ..locking import UpgradableLock, LockFailed +from ..locking import Lock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod from ..repository import Repository, LoggedIO, TAG_COMMIT from . import BaseTestCase diff --git a/borg/upgrader.py b/borg/upgrader.py index 75d9fbb46..f4327e340 100644 --- a/borg/upgrader.py +++ b/borg/upgrader.py @@ -7,7 +7,7 @@ import shutil import time from .helpers import get_keys_dir, get_cache_dir, ProgressIndicatorPercent -from .locking import UpgradableLock +from .locking import Lock from .repository import Repository, MAGIC from .key import KeyfileKey, KeyfileNotFoundError @@ -39,7 +39,7 @@ class AtticRepositoryUpgrader(Repository): shutil.copytree(self.path, backup, copy_function=os.link) logger.info("opening attic repository with borg and converting") # now lock the repo, after we have made the copy - self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=1.0).acquire() + self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=1.0).acquire() segments = [filename for i, filename in self.io.segment_iterator()] try: keyfile = self.find_attic_keyfile() @@ -48,8 +48,7 @@ class AtticRepositoryUpgrader(Repository): else: self.convert_keyfiles(keyfile, dryrun) # partial open: just hold on to the lock - self.lock = UpgradableLock(os.path.join(self.path, 'lock'), - exclusive=True).acquire() + self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True).acquire() try: self.convert_cache(dryrun) self.convert_repo_index(dryrun=dryrun, inplace=inplace) From 1e739fd52dbe417db225027baa41c8d60679d8d8 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Jul 2016 16:16:56 +0200 Subject: [PATCH 4/6] fix local repo / upgrader tests --- borg/archiver.py | 10 +++---- borg/testsuite/archiver.py | 4 +-- borg/testsuite/repository.py | 53 +++++++++++++++++++++++++----------- borg/testsuite/upgrader.py | 2 +- 4 files changed, 45 insertions(+), 24 deletions(-) diff --git a/borg/archiver.py b/borg/archiver.py index 4c3bfca0a..3c921e962 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -134,7 +134,7 @@ class Archiver: pass return self.exit_code - @with_repository(exclusive='repair', manifest=False) + @with_repository(exclusive=True, manifest=False) def do_check(self, args, repository): """Check repository consistency""" if args.repair: @@ -174,7 +174,7 @@ class Archiver: key_new.change_passphrase() # option to change key protection passphrase, save return EXIT_SUCCESS - @with_repository(fake='dry_run') + @with_repository(fake='dry_run', exclusive=True) def do_create(self, args, repository, manifest=None, key=None): """Create new archive""" matcher = PatternMatcher(fallback=True) @@ -595,7 +595,7 @@ class Archiver: print(str(cache)) return self.exit_code - @with_repository() + @with_repository(exclusive=True) def do_prune(self, args, repository, manifest, key): """Prune repository archives according to specified rules""" if not any((args.hourly, args.daily, @@ -722,7 +722,7 @@ class Archiver: print("object %s fetched." % hex_id) return EXIT_SUCCESS - @with_repository(manifest=False) + @with_repository(manifest=False, exclusive=True) def do_debug_put_obj(self, args, repository): """put file(s) contents into the repository""" for path in args.paths: @@ -734,7 +734,7 @@ class Archiver: repository.commit() return EXIT_SUCCESS - @with_repository(manifest=False) + @with_repository(manifest=False, exclusive=True) def do_debug_delete_obj(self, args, repository): """delete the objects with the given IDs from the repo""" modified = False diff --git a/borg/testsuite/archiver.py b/borg/testsuite/archiver.py index ff83a225a..f1f70fcdc 100644 --- a/borg/testsuite/archiver.py +++ b/borg/testsuite/archiver.py @@ -236,7 +236,7 @@ class ArchiverTestCaseBase(BaseTestCase): self.cmd('create', self.repository_location + '::' + name, src_dir) def open_archive(self, name): - repository = Repository(self.repository_path) + repository = Repository(self.repository_path, exclusive=True) with repository: manifest, key = Manifest.load(repository) archive = Archive(repository, key, manifest, name) @@ -1288,7 +1288,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): def test_extra_chunks(self): self.cmd('check', self.repository_location, exit_code=0) - with Repository(self.repository_location) as repository: + with Repository(self.repository_location, exclusive=True) as repository: repository.put(b'01234567890123456789012345678901', b'xxxx') repository.commit() self.cmd('check', self.repository_location, exit_code=1) diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index e034cf9e1..a012f514a 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -12,11 +12,17 @@ from ..repository import Repository, LoggedIO, TAG_COMMIT from . import BaseTestCase +UNSPECIFIED = object() # for default values where we can't use None + + class RepositoryTestCaseBase(BaseTestCase): key_size = 32 + exclusive = True - def open(self, create=False): - return Repository(os.path.join(self.tmppath, 'repository'), create=create) + def open(self, create=False, exclusive=UNSPECIFIED): + if exclusive is UNSPECIFIED: + exclusive = self.exclusive + return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create) def setUp(self): self.tmppath = tempfile.mkdtemp() @@ -27,10 +33,10 @@ class RepositoryTestCaseBase(BaseTestCase): self.repository.close() shutil.rmtree(self.tmppath) - def reopen(self): + def reopen(self, exclusive=UNSPECIFIED): if self.repository: self.repository.close() - self.repository = self.open() + self.repository = self.open(exclusive=exclusive) class RepositoryTestCase(RepositoryTestCaseBase): @@ -156,17 +162,6 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): self.assert_equal(len(self.repository), 3) self.assert_equal(self.repository.check(), True) - def test_replay_of_readonly_repository(self): - self.add_keys() - for name in os.listdir(self.repository.path): - if name.startswith('index.'): - os.unlink(os.path.join(self.repository.path, name)) - with patch.object(UpgradableLock, 'upgrade', side_effect=LockFailed) as upgrade: - self.reopen() - with self.repository: - self.assert_raises(LockFailed, lambda: len(self.repository)) - upgrade.assert_called_once_with() - def test_crash_before_write_index(self): self.add_keys() self.repository.write_index = None @@ -179,6 +174,32 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): self.assert_equal(len(self.repository), 3) self.assert_equal(self.repository.check(), True) + def test_replay_lock_upgrade_old(self): + self.add_keys() + for name in os.listdir(self.repository.path): + if name.startswith('index.'): + os.unlink(os.path.join(self.repository.path, name)) + with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade: + self.reopen(exclusive=None) # simulate old client that always does lock upgrades + with self.repository: + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() + + def test_replay_lock_upgrade(self): + self.add_keys() + for name in os.listdir(self.repository.path): + if name.startswith('index.'): + os.unlink(os.path.join(self.repository.path, name)) + with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade: + self.reopen(exclusive=False) # current client usually does not do lock upgrade, except for replay + with self.repository: + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() + def test_crash_before_deleting_compacted_segments(self): self.add_keys() self.repository.io.delete_segment = None @@ -202,7 +223,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase): def open(self, create=False): - return Repository(os.path.join(self.tmppath, 'repository'), create=create, append_only=True) + return Repository(os.path.join(self.tmppath, 'repository'), exclusive=True, create=create, append_only=True) def test_destroy_append_only(self): # Can't destroy append only repo (via the API) diff --git a/borg/testsuite/upgrader.py b/borg/testsuite/upgrader.py index 26c34a3c6..013a8d002 100644 --- a/borg/testsuite/upgrader.py +++ b/borg/testsuite/upgrader.py @@ -23,7 +23,7 @@ def repo_valid(path): :param path: the path to the repository :returns: if borg can check the repository """ - with Repository(str(path), create=False) as repository: + with Repository(str(path), exclusive=True, create=False) as repository: # can't check raises() because check() handles the error return repository.check() From 64dcbbfdd06fc2187ef2d3d57bd61bbd3af8f371 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 23 Jul 2016 18:22:07 +0200 Subject: [PATCH 5/6] change RPC API, fix remote repo tests --- borg/archiver.py | 4 ++-- borg/remote.py | 25 ++++++++++++------------- borg/testsuite/repository.py | 6 ++++-- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/borg/archiver.py b/borg/archiver.py index 3c921e962..bfd56bf0b 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -68,8 +68,8 @@ def with_repository(fake=False, create=False, lock=True, exclusive=False, manife if argument(args, fake): return method(self, args, repository=None, **kwargs) elif location.proto == 'ssh': - repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait, lock=lock, - append_only=append_only, args=args) + repository = RemoteRepository(location, create=create, exclusive=argument(args, exclusive), + lock_wait=self.lock_wait, lock=lock, append_only=append_only, args=args) else: repository = Repository(location.path, create=create, exclusive=argument(args, exclusive), lock_wait=self.lock_wait, lock=lock, diff --git a/borg/remote.py b/borg/remote.py index daaa021cf..8d1bf95ea 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -114,7 +114,7 @@ class RepositoryServer: # pragma: no cover def negotiate(self, versions): return RPC_PROTOCOL_VERSION - def open(self, path, create=False, lock_wait=None, lock=True, append_only=False): + def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False): path = os.fsdecode(path) if path.startswith('/~'): path = path[1:] @@ -125,7 +125,9 @@ class RepositoryServer: # pragma: no cover break else: raise PathNotAllowed(path) - self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock, append_only=self.append_only or append_only) + self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock, + append_only=self.append_only or append_only, + exclusive=exclusive) self.repository.__enter__() # clean exit handled by serve() method return self.repository.id @@ -141,7 +143,7 @@ class RemoteRepository: class NoAppendOnlyOnServer(Error): """Server does not support --append-only.""" - def __init__(self, location, create=False, lock_wait=None, lock=True, append_only=False, args=None): + def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None): self.location = self._location = location self.preload_ids = [] self.msgid = 0 @@ -178,16 +180,13 @@ class RemoteRepository: raise ConnectionClosedWithHint('Is borg working on the server?') from None if version != RPC_PROTOCOL_VERSION: raise Exception('Server insisted on using unsupported protocol version %d' % version) - # Because of protocol versions, only send append_only if necessary - if append_only: - try: - self.id = self.call('open', self.location.path, create, lock_wait, lock, append_only) - except self.RPCError as err: - if err.remote_type == 'TypeError': - raise self.NoAppendOnlyOnServer() from err - else: - raise - else: + try: + self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only) + except self.RPCError as err: + if err.remote_type != 'TypeError': + raise + if append_only: + raise self.NoAppendOnlyOnServer() self.id = self.call('open', self.location.path, create, lock_wait, lock) except Exception: self.close() diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index a012f514a..bc08e097f 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -386,7 +386,8 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase): class RemoteRepositoryTestCase(RepositoryTestCase): def open(self, create=False): - return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), create=create) + return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), + exclusive=True, create=create) def test_invalid_rpc(self): self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', None)) @@ -415,7 +416,8 @@ class RemoteRepositoryTestCase(RepositoryTestCase): class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): def open(self, create=False): - return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), create=create) + return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), + exclusive=True, create=create) def test_crash_before_compact(self): # skip this test, we can't mock-patch a Repository class in another process! From 33e334820824c735d5e19486467abf17a2059d17 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Wed, 3 Aug 2016 00:34:22 +0200 Subject: [PATCH 6/6] locking: better differentiate new vs. old clients, lock upgrade for replay old clients use self.exclusive = None and do a read->write lock upgrade when needed. new clients use self.exclusive = True/False and never upgrade. replay fakes an old client by setting self.exclusive = None to get a lock upgrade if needed. --- borg/remote.py | 2 +- borg/repository.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/borg/remote.py b/borg/remote.py index 8d1bf95ea..47a20412b 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -114,7 +114,7 @@ class RepositoryServer: # pragma: no cover def negotiate(self, versions): return RPC_PROTOCOL_VERSION - def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False): + def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False): path = os.fsdecode(path) if path.startswith('/~'): path = path[1:] diff --git a/borg/repository.py b/borg/repository.py index 525f3abeb..66c0f6381 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -79,7 +79,7 @@ class Repository: if self.do_create: self.do_create = False self.create(self.path) - self.open(self.path, self.exclusive, lock_wait=self.lock_wait, lock=self.do_lock) + self.open(self.path, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock) return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -317,6 +317,9 @@ class Repository: self.compact = set() def replay_segments(self, index_transaction_id, segments_transaction_id): + # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock: + remember_exclusive = self.exclusive + self.exclusive = None self.prepare_txn(index_transaction_id, do_cleanup=False) try: segment_count = sum(1 for _ in self.io.segment_iterator()) @@ -332,6 +335,7 @@ class Repository: pi.finish() self.write_index() finally: + self.exclusive = remember_exclusive self.rollback() def _update_index(self, segment, objects, report=None):