Merge pull request #858 from enkore/issue/843

Auto-recover from corrupted index/hints file(s)
This commit is contained in:
TW 2016-05-30 17:15:23 +02:00
commit e69bc362e5
4 changed files with 118 additions and 9 deletions

View file

@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
from collections import namedtuple
import locale
import os
cimport cython
from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
from libc.errno cimport errno
from cpython.exc cimport PyErr_SetFromErrnoWithFilename
API_VERSION = 2
@ -52,6 +55,7 @@ MAX_VALUE = _MAX_VALUE
assert _MAX_VALUE % 2 == 1
@cython.internal
cdef class IndexBase:
cdef HashIndex *index
@ -63,7 +67,10 @@ cdef class IndexBase:
path = os.fsencode(path)
self.index = hashindex_read(path)
if not self.index:
raise Exception('hashindex_read failed')
if errno:
PyErr_SetFromErrnoWithFilename(OSError, path)
return
raise RuntimeError('hashindex_read failed')
else:
self.index = hashindex_init(capacity, self.key_size, self.value_size)
if not self.index:

View file

@ -65,6 +65,18 @@ class ErrorWithTraceback(Error):
traceback = True
class InternalOSError(Error):
    """Error while accessing repository: [Errno {}] {}: {}"""
    # The class docstring above doubles as the message template used by
    # get_message(), so its text must not be reworded.

    def __init__(self, os_error):
        # Copy the three fields the message template needs from the
        # exception that is being wrapped.
        for field in ('errno', 'strerror', 'filename'):
            setattr(self, field, getattr(os_error, field))

    def get_message(self):
        # Fill the placeholders in order: errno, strerror, filename.
        template = self.__doc__
        return template.format(self.errno, self.strerror, self.filename)
class IntegrityError(ErrorWithTraceback):
"""Data integrity error"""

View file

@ -15,7 +15,8 @@ from zlib import crc32
import msgpack
from .constants import * # NOQA
from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex
from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \
bin_to_hex
from .hashindex import NSIndex
from .locking import UpgradableLock, LockError, LockErrorT
from .lrucache import LRUCache
@ -178,7 +179,7 @@ class Repository:
else:
return None
def get_transaction_id(self):
def check_transaction(self):
index_transaction_id = self.get_index_transaction_id()
segments_transaction_id = self.io.get_segments_transaction_id()
if index_transaction_id is not None and segments_transaction_id is None:
@ -191,6 +192,9 @@ class Repository:
else:
replay_from = index_transaction_id
self.replay_segments(replay_from, segments_transaction_id)
def get_transaction_id(self):
    """Run check_transaction(), then return get_index_transaction_id().

    The result of check_transaction() is not inspected here; any exception
    it raises propagates to the caller.
    """
    self.check_transaction()
    current_id = self.get_index_transaction_id()
    return current_id
def break_lock(self):
@ -231,10 +235,27 @@ class Repository:
self.write_index()
self.rollback()
# open_index: load the repository index that belongs to transaction_id.
#
# NOTE(review): this span looks like a rendered diff with the +/- markers and
# the indentation stripped. The first signature (without auto_recover) and the
# one-line `return NSIndex.read(...)` further down appear to be the removed
# version; the rest appears to be the added version -- confirm against the
# real file before relying on this.
#
# Behaviour of the longer version, as written here:
#   * transaction_id is None       -> return a fresh, empty NSIndex().
#   * NSIndex.read() succeeds      -> return the index it read.
#   * NSIndex.read() raises RuntimeError
#                                  -> log a warning and unlink the index file;
#                                     then, if auto_recover is false, re-raise
#                                     the RuntimeError, otherwise run
#                                     prepare_txn() + commit() and open the
#                                     index of the then-current transaction.
#   * NSIndex.read() or os.unlink() raises OSError
#                                  -> raise InternalOSError wrapping it.
def open_index(self, transaction_id):
def open_index(self, transaction_id, auto_recover=True):
if transaction_id is None:
return NSIndex()
# This variant applies `%` to the whole joined path, so a '%' inside self.path
# would be treated as a format directive; the assignment below formats only the
# file name.
return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
try:
return NSIndex.read(index_path)
except RuntimeError as error:
# NOTE(review): `assert` is removed when Python runs with -O, so this message
# check does not run in optimized mode.
assert str(error) == 'hashindex_read failed' # everything else means we're in *deep* trouble
logger.warning('Repository index missing or corrupted, trying to recover')
# The index file is removed before auto_recover is consulted, i.e. it is
# deleted even when the caller asked for no automatic recovery.
try:
os.unlink(index_path)
except OSError as e:
# `from None` suppresses the implicit exception chaining.
raise InternalOSError(e) from None
if not auto_recover:
# Bare raise: re-raises the RuntimeError caught above.
raise
self.prepare_txn(self.get_transaction_id())
# don't leave an open transaction around
self.commit()
# The retry passes no auto_recover argument, so it runs with the default (True).
return self.open_index(self.get_transaction_id())
except OSError as e:
raise InternalOSError(e) from None
def prepare_txn(self, transaction_id, do_cleanup=True):
self._active_txn = True
@ -247,15 +268,33 @@ class Repository:
self._active_txn = False
raise
if not self.index or transaction_id is None:
self.index = self.open_index(transaction_id)
try:
self.index = self.open_index(transaction_id, False)
except RuntimeError:
self.check_transaction()
self.index = self.open_index(transaction_id, False)
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
else:
if do_cleanup:
self.io.cleanup(transaction_id)
with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
hints = msgpack.unpack(fd)
hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
try:
with open(hints_path, 'rb') as fd:
hints = msgpack.unpack(fd)
except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
logger.warning('Repository hints file missing or corrupted, trying to recover')
if not isinstance(e, FileNotFoundError):
os.unlink(hints_path)
# index must exist at this point
os.unlink(index_path)
self.check_transaction()
self.prepare_txn(transaction_id)
return
except OSError as os_error:
raise InternalOSError(os_error) from None
if hints[b'version'] == 1:
logger.debug('Upgrading from v1 hints.%d', transaction_id)
self.segments = hints[b'segments']

View file

@ -7,7 +7,7 @@ import tempfile
from unittest.mock import patch
from ..hashindex import NSIndex
from ..helpers import Location, IntegrityError
from ..helpers import Location, IntegrityError, InternalOSError
from ..locking import UpgradableLock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint
from ..repository import Repository, LoggedIO, MAGIC
@ -270,6 +270,57 @@ class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
assert segments_in_repository() == 6
class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
    """Tests that damage, delete or replace the hints.1 / index.1 files of a
    repository holding one committed object, then use the repository again."""

    def setUp(self):
        # Start every test from a closed repository with one committed object.
        super().setUp()
        self.repository.put(b'00000000000000000000000000000000', b'foo')
        self.repository.commit()
        self.repository.close()

    def _aux_path(self, name):
        """Return the path of the file *name* inside the test repository."""
        return os.path.join(self.repository.path, name)

    def _scribble(self, name, mode):
        """Write nine junk bytes to *name*, opened with *mode* ('ab' or 'wb')."""
        with open(self._aux_path(name), mode) as fd:
            fd.write(b'123456789')

    def _swap_for_directory(self, name):
        """Delete the file *name* and create a directory of the same name."""
        target = self._aux_path(name)
        os.unlink(target)
        os.mkdir(target)

    def do_commit(self):
        """Reopen the repository, overwrite the stored object and commit."""
        with self.repository:
            self.repository.put(b'00000000000000000000000000000000', b'fox')
            self.repository.commit()

    def test_corrupted_hints(self):
        # Trailing junk appended to an otherwise intact hints file.
        self._scribble('hints.1', 'ab')
        self.do_commit()

    def test_deleted_hints(self):
        os.unlink(self._aux_path('hints.1'))
        self.do_commit()

    def test_deleted_index(self):
        os.unlink(self._aux_path('index.1'))
        self.do_commit()

    def test_unreadable_hints(self):
        # A directory in place of the hints file must surface as InternalOSError.
        self._swap_for_directory('hints.1')
        with self.assert_raises(InternalOSError):
            self.do_commit()

    def test_index(self):
        # Index content replaced wholesale by junk.
        self._scribble('index.1', 'wb')
        self.do_commit()

    def test_index_outside_transaction(self):
        # Same damage as test_index, but the repository is only read afterwards.
        self._scribble('index.1', 'wb')
        with self.repository:
            assert len(self.repository) == 1

    def test_unreadable_index(self):
        # A directory in place of the index file must surface as InternalOSError.
        self._swap_for_directory('index.1')
        with self.assert_raises(InternalOSError):
            self.do_commit()
class RepositoryCheckTestCase(RepositoryTestCaseBase):
def list_indices(self):