diff --git a/attic/repository.py b/attic/repository.py index 28ed09e64..0150f2dba 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -45,7 +45,6 @@ class Repository(object): class CheckNeeded(Error): '''Inconsistency detected. Please run "attic check {}"''' - def __init__(self, path, create=False): self.path = path self.io = None @@ -88,6 +87,12 @@ class Repository(object): def get_transaction_id(self): index_transaction_id = self.get_index_transaction_id() segments_transaction_id = self.io.get_segments_transaction_id() + # Attempt to automatically rebuild index if we crashed between commit + # tag write and index save + if (index_transaction_id if index_transaction_id is not None else -1) < (segments_transaction_id if segments_transaction_id is not None else -1): + self.replay_segments(index_transaction_id, segments_transaction_id) + index_transaction_id = self.get_index_transaction_id() + if index_transaction_id != segments_transaction_id: raise self.CheckNeeded(self.path) return index_transaction_id @@ -127,14 +132,16 @@ class Repository(object): return {} return NSIndex((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'), readonly=True) - def get_index(self, transaction_id): + def get_index(self, transaction_id, do_cleanup=True): + self._active_txn = True self.lock.upgrade() if transaction_id is None: self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8')) self.segments = {} self.compact = set() else: - self.io.cleanup(transaction_id) + if do_cleanup: + self.io.cleanup(transaction_id) shutil.copy(os.path.join(self.path, 'index.%d' % transaction_id), os.path.join(self.path, 'index.tmp')) self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8')) @@ -161,6 +168,7 @@ class Repository(object): if name.endswith(current): continue os.unlink(os.path.join(self.path, name)) + self.index = None def compact_segments(self): """Compact sparse segments by copying data into new segments @@ -186,6 +194,41 @@ class Repository(object): self.io.delete_segment(segment) self.compact = set() + def replay_segments(self, index_transaction_id, segments_transaction_id): + self.get_index(index_transaction_id, do_cleanup=False) + for segment, filename in self.io.segment_iterator(): + if index_transaction_id is not None and segment <= index_transaction_id: + continue + if segment > segments_transaction_id: + break + self.segments[segment] = 0 + for tag, key, offset in self.io.iter_objects(segment): + if tag == TAG_PUT: + try: + s, _ = self.index[key] + self.compact.add(s) + self.segments[s] -= 1 + except KeyError: + pass + self.index[key] = segment, offset + self.segments[segment] += 1 + elif tag == TAG_DELETE: + try: + s, _ = self.index.pop(key) + except KeyError: + raise self.CheckNeeded(self.path) + self.segments[s] -= 1 + self.compact.add(s) + self.compact.add(segment) + elif tag == TAG_COMMIT: + continue + else: + raise self.CheckNeeded(self.path) + if self.segments[segment] == 0: + self.compact.add(segment) + self.write_index() + self.rollback() + def check(self, progress=False, repair=False): """Check repository consistency @@ -220,11 +263,6 @@ class Repository(object): for segment, filename in self.io.segment_iterator(): if segment > transaction_id: - if repair: - report_progress('Deleting uncommitted segment {}'.format(segment), error=True) - self.io.delete_segment(segment) - else: - report_progress('Uncommitted segment {} found'.format(segment), error=True) continue try: objects = list(self.io.iter_objects(segment)) @@ -241,7 +279,6 @@ class Repository(object): s, _ = self.index[key] self.compact.add(s) self.segments[s] -= 1 - report_progress('Key found in more than one segment. Segment={}, key={}'.format(segment, hexlify(key)), error=True) except KeyError: pass self.index[key] = segment, offset @@ -264,15 +301,19 @@ class Repository(object): self.io.segment = transaction_id + 1 self.io.write_commit() self.io.close_segment() - if current_index and len(current_index) != len(self.index): - report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True) + if current_index and not repair: + if len(current_index) != len(self.index) and False: + report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True) + elif current_index: + for key, value in self.index.iteritems(): + if current_index.get(key, (-1, -1)) != value: + report_progress('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))), error=True) if not error_found: report_progress('Repository check complete, no problems found.') if repair: + self.compact_segments() self.write_index() else: - # Delete temporary index file - self.index = None os.unlink(os.path.join(self.path, 'index.tmp')) self.rollback() return not error_found or repair @@ -309,7 +350,6 @@ class Repository(object): def put(self, id, data, wait=True): if not self._active_txn: self.get_index(self.get_transaction_id()) - self._active_txn = True try: segment, _ = self.index[id] self.segments[segment] -= 1 @@ -327,7 +367,6 @@ class Repository(object): def delete(self, id, wait=True): if not self._active_txn: self.get_index(self.get_transaction_id()) - self._active_txn = True try: segment, offset = self.index.pop(id) self.segments[segment] -= 1 diff --git a/attic/testsuite/mock.py b/attic/testsuite/mock.py new file mode 100644 index 000000000..b7501ed6c --- /dev/null +++ b/attic/testsuite/mock.py @@ -0,0 +1,5 @@ +try: + # Only available in python 3.3+ + from unittest.mock import * +except ImportError: + from mock import * diff --git a/attic/testsuite/repository.py b/attic/testsuite/repository.py index c59761322..3d67e3c3c 100644 --- a/attic/testsuite/repository.py +++ b/attic/testsuite/repository.py @@ -1,14 +1,15 @@ import os import shutil import tempfile +from attic.testsuite.mock import patch from attic.hashindex import NSIndex -from attic.helpers import Location, IntegrityError +from attic.helpers import Location, IntegrityError, UpgradableLock from attic.remote import RemoteRepository from attic.repository import Repository from attic.testsuite import AtticTestCase -class RepositoryTestCase(AtticTestCase): +class RepositoryTestCaseBase(AtticTestCase): def open(self, create=False): return Repository(os.path.join(self.tmppath, 'repository'), create=create) @@ -21,6 +22,14 @@ class RepositoryTestCase(AtticTestCase): self.repository.close() shutil.rmtree(self.tmppath) + def reopen(self): + if self.repository: + self.repository.close() + self.repository = self.open() + + +class RepositoryTestCase(RepositoryTestCaseBase): + def test1(self): for x in range(100): self.repository.put(('%-32d' % x).encode('ascii'), b'SOMEDATA') @@ -101,23 +110,72 @@ class RepositoryTestCase(AtticTestCase): self.assert_equal(len(self.repository.list(limit=50)), 50) -class RepositoryCheckTestCase(AtticTestCase): +class RepositoryCommitTestCase(RepositoryTestCaseBase): - def open(self, create=False): - return Repository(os.path.join(self.tmppath, 'repository'), create=create) + def add_keys(self): + self.repository.put(b'00000000000000000000000000000000', b'foo') + self.repository.put(b'00000000000000000000000000000001', b'bar') + self.repository.commit() + self.repository.put(b'00000000000000000000000000000001', b'bar2') + self.repository.put(b'00000000000000000000000000000002', b'boo') - def reopen(self): - if self.repository: - self.repository.close() - self.repository = self.open() + def test_replay_of_missing_index(self): + self.add_keys() + for name in os.listdir(self.repository.path): + if name.startswith('index.'): + os.unlink(os.path.join(self.repository.path, name)) + self.reopen() + self.assert_equal(len(self.repository), 2) + self.assert_equal(self.repository.check(), True) - def setUp(self): - self.tmppath = tempfile.mkdtemp() - self.repository = self.open(create=True) + def test_crash_before_compact_segments(self): + self.add_keys() + self.repository.compact_segments = None + try: + self.repository.commit() + except TypeError: + pass + self.reopen() + self.assert_equal(len(self.repository), 3) + self.assert_equal(self.repository.check(), True) - def tearDown(self): - self.repository.close() - shutil.rmtree(self.tmppath) + def test_replay_of_readonly_repository(self): + self.add_keys() + for name in os.listdir(self.repository.path): + if name.startswith('index.'): + os.unlink(os.path.join(self.repository.path, name)) + with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.LockUpgradeFailed) as upgrade: + self.reopen() + self.assert_raises(UpgradableLock.LockUpgradeFailed, lambda: len(self.repository)) + upgrade.assert_called_once() + + + def test_crash_before_write_index(self): + self.add_keys() + self.repository.write_index = None + try: + self.repository.commit() + except TypeError: + pass + self.reopen() + self.assert_equal(len(self.repository), 3) + self.assert_equal(self.repository.check(), True) + + def test_crash_before_deleting_compacted_segments(self): + self.add_keys() + self.repository.io.delete_segment = None + try: + self.repository.commit() + except TypeError: + pass + self.reopen() + self.assert_equal(len(self.repository), 3) + self.assert_equal(self.repository.check(), True) + self.assert_equal(len(self.repository), 3) + + + +class RepositoryCheckTestCase(RepositoryTestCaseBase): def list_indices(self): return [name for name in os.listdir(os.path.join(self.tmppath, 'repository')) if name.startswith('index.')] @@ -161,7 +219,7 @@ class RepositoryCheckTestCase(AtticTestCase): os.path.join(self.tmppath, 'repository', new_name)) def list_objects(self): - return set((int(key) for key, _ in list(self.open_index().iteritems()))) + return set(int(key) for key in self.repository.list()) def test_repair_corrupted_segment(self): self.add_objects([[1, 2, 3], [4, 5, 6]]) @@ -228,26 +286,12 @@ class RepositoryCheckTestCase(AtticTestCase): def test_repair_missing_index(self): self.add_objects([[1, 2, 3], [4, 5, 6]]) self.delete_index() - self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4)) self.check(status=False) self.check(repair=True, status=True) self.check(status=True) self.get_objects(4) self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects()) - def test_repair_index_too_old(self): - self.add_objects([[1, 2, 3], [4, 5, 6]]) - self.assert_equal(self.list_indices(), ['index.1']) - self.rename_index('index.0') - self.assert_equal(self.list_indices(), ['index.0']) - self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4)) - self.check(status=False) - self.check(repair=True, status=True) - self.assert_equal(self.list_indices(), ['index.1']) - self.check(status=True) - self.get_objects(4) - self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects()) - def test_repair_index_too_new(self): self.add_objects([[1, 2, 3], [4, 5, 6]]) self.assert_equal(self.list_indices(), ['index.1'])