Merge pull request #1041 from enkore/feature/newcompact

Repository: compact v2
This commit is contained in:
TW 2016-05-27 22:06:48 +02:00
commit f51715223a
2 changed files with 218 additions and 65 deletions

View file

@ -9,6 +9,8 @@ logger = logging.getLogger(__name__)
import os
import shutil
import struct
from collections import defaultdict
from functools import partial
from zlib import crc32
import msgpack
@ -26,17 +28,55 @@ TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
FreeSpace = partial(defaultdict, int)
class Repository:
"""Filesystem based transactional key value store
"""
Filesystem based transactional key value store
Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
called segments. Each segment is a series of log entries. The segment number together with the offset of each
entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
time for the purposes of the log.
Log entries are either PUT, DELETE or COMMIT.
A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.
When reading from a repository it is first checked whether the last segment is committed. If it is not, then
all segments after the last committed segment are deleted; they contain log entries whose consistency is not
established by a COMMIT.
Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
the platform (including the hardware). See platform_base.SyncFile for details.
A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).
A DELETE marks a key as deleted.
For a given key only the last entry regarding the key, which is called current (all other entries are called
superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
Otherwise the last PUT defines the value of the key.
By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
such obsolete entries is called sparse, while a segment containing no such entries is called compact.
Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
superseded entries where current.
On disk layout:
dir/README
dir/config
dir/data/<X // SEGMENTS_PER_DIR>/<X>
dir/index.X
dir/hints.X
"""
class DoesNotExist(Error):
"""Repository {} does not exist."""
@ -210,21 +250,30 @@ class Repository:
self.index = self.open_index(transaction_id)
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = set() # XXX bad name: segments_needing_compaction = self.compact
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
else:
if do_cleanup:
self.io.cleanup(transaction_id)
with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
hints = msgpack.unpack(fd)
if hints[b'version'] != 1:
raise ValueError('Unknown hints file version: %d' % hints['version'])
self.segments = hints[b'segments']
self.compact = set(hints[b'compact'])
if hints[b'version'] == 1:
logger.debug('Upgrading from v1 hints.%d', transaction_id)
self.segments = hints[b'segments']
self.compact = FreeSpace()
for segment in sorted(hints[b'compact']):
logger.debug('Rebuilding sparse info for segment %d', segment)
self._rebuild_sparse(segment)
logger.debug('Upgrade to v2 hints complete')
elif hints[b'version'] != 2:
raise ValueError('Unknown hints file version: %d' % hints[b'version'])
else:
self.segments = hints[b'segments']
self.compact = FreeSpace(hints[b'compact'])
def write_index(self):
hints = {b'version': 1,
hints = {b'version': 2,
b'segments': self.segments,
b'compact': list(self.compact)}
b'compact': self.compact}
transaction_id = self.io.get_segments_transaction_id()
hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
with open(hints_file + '.tmp', 'wb') as fd:
@ -238,10 +287,10 @@ class Repository:
if self.append_only:
with open(os.path.join(self.path, 'transactions'), 'a') as log:
print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
# Remove old indices
# Remove old auxiliary files
current = '.%d' % transaction_id
for name in os.listdir(self.path):
if not name.startswith('index.') and not name.startswith('hints.'):
if not name.startswith(('index.', 'hints.')):
continue
if name.endswith(current):
continue
@ -267,32 +316,40 @@ class Repository:
for segment in unused:
assert self.segments.pop(segment) == 0
self.io.delete_segment(segment)
del self.compact[segment]
unused = []
for segment in sorted(self.compact):
if self.io.segment_exists(segment):
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
for segment, freeable_space in sorted(self.compact.items()):
if not self.io.segment_exists(segment):
del self.compact[segment]
continue
segment_size = self.io.segment_size(segment)
if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
segment, freeable_space)
continue
segments.setdefault(segment, 0)
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
try:
new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id:
try:
new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
self.io.write_delete(key, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id:
try:
self.io.write_delete(key, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
self.io.write_delete(key)
assert segments[segment] == 0
unused.append(segment)
self.io.write_delete(key)
assert segments[segment] == 0
unused.append(segment)
complete_xfer()
self.compact = set()
def replay_segments(self, index_transaction_id, segments_transaction_id):
self.prepare_txn(index_transaction_id, do_cleanup=False)
@ -315,11 +372,12 @@ class Repository:
def _update_index(self, segment, objects, report=None):
"""some code shared between replay_segments and check"""
self.segments[segment] = 0
for tag, key, offset in objects:
for tag, key, offset, size in objects:
if tag == TAG_PUT:
try:
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
s, _ = self.index[key]
self.compact.add(s)
self.compact[s] += size
self.segments[s] -= 1
except KeyError:
pass
@ -327,12 +385,17 @@ class Repository:
self.segments[segment] += 1
elif tag == TAG_DELETE:
try:
s, _ = self.index.pop(key)
self.segments[s] -= 1
self.compact.add(s)
# if the deleted PUT is not in the index, there is nothing to clean up
s, offset = self.index.pop(key)
except KeyError:
pass
self.compact.add(segment)
else:
if self.io.segment_exists(s):
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
# is already gone, then it was already compacted.
self.segments[s] -= 1
size = self.io.read(s, offset, key, read_data=False)
self.compact[s] += size
elif tag == TAG_COMMIT:
continue
else:
@ -342,7 +405,22 @@ class Repository:
else:
report(msg)
if self.segments[segment] == 0:
self.compact.add(segment)
self.compact[segment] += self.io.segment_size(segment)
def _rebuild_sparse(self, segment):
"""Rebuild sparse bytes count for a single segment relative to the current index."""
self.compact[segment] = 0
if self.segments[segment] == 0:
self.compact[segment] += self.io.segment_size(segment)
return
for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
if tag == TAG_PUT:
if self.index.get(key, (-1, -1)) != (segment, offset):
# This PUT is superseded later
self.compact[segment] += size
elif tag == TAG_DELETE:
# The outcome of the DELETE has been recorded in the PUT branch already
self.compact[segment] += size
def check(self, repair=False, save_space=False):
"""Check repository consistency
@ -457,14 +535,16 @@ class Repository:
if not self._active_txn:
self.prepare_txn(self.get_transaction_id())
try:
segment, _ = self.index[id]
self.segments[segment] -= 1
self.compact.add(segment)
segment = self.io.write_delete(id)
self.segments.setdefault(segment, 0)
self.compact.add(segment)
segment, offset = self.index[id]
except KeyError:
pass
else:
self.segments[segment] -= 1
size = self.io.read(segment, offset, id, read_data=False)
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
segment, offset = self.io.write_put(id, data)
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
@ -478,9 +558,10 @@ class Repository:
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
self.segments[segment] -= 1
self.compact.add(segment)
segment = self.io.write_delete(id)
self.compact.add(segment)
size = self.io.read(segment, offset, id, read_data=False)
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
def preload(self, ids):
@ -578,7 +659,7 @@ class LoggedIO:
seen_commit = False
while True:
try:
tag, key, offset = next(iterator)
tag, key, offset, _ = next(iterator)
except IntegrityError:
return False
except StopIteration:
@ -635,7 +716,18 @@ class LoggedIO:
def segment_exists(self, segment):
return os.path.exists(self.segment_filename(segment))
def iter_objects(self, segment, include_data=False):
def segment_size(self, segment):
return os.path.getsize(self.segment_filename(segment))
def iter_objects(self, segment, include_data=False, read_data=True):
"""
Return object iterator for *segment*.
If read_data is False then include_data must be False as well.
Integrity checks are skipped: all data obtained from the iterator must be considered informational.
The iterator returns four-tuples of (tag, key, offset, data|size).
"""
fd = self.get_fd(segment)
fd.seek(0)
if fd.read(MAGIC_LEN) != MAGIC:
@ -644,11 +736,12 @@ class LoggedIO:
header = fd.read(self.header_fmt.size)
while header:
size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
(TAG_PUT, TAG_DELETE, TAG_COMMIT))
(TAG_PUT, TAG_DELETE, TAG_COMMIT),
read_data=read_data)
if include_data:
yield tag, key, offset, data
else:
yield tag, key, offset
yield tag, key, offset, size
offset += size
header = fd.read(self.header_fmt.size)
@ -672,19 +765,25 @@ class LoggedIO:
fd.write(data[:size])
data = data[size:]
def read(self, segment, offset, id):
def read(self, segment, offset, id, read_data=True):
"""
Read entry from *segment* at *offset* with *id*.
If read_data is False the size of the entry is returned instead and integrity checks are skipped.
The return value should thus be considered informational.
"""
if segment == self.segment and self._write_fd:
self._write_fd.sync()
fd = self.get_fd(segment)
fd.seek(offset)
header = fd.read(self.put_header_fmt.size)
size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ))
size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
if id != key:
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
segment, offset))
return data
return data if read_data else size
def _read(self, fd, fmt, header, segment, offset, acceptable_tags):
def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
# some code shared by read() and iter_objects()
try:
hdr_tuple = fmt.unpack(header)
@ -702,18 +801,32 @@ class LoggedIO:
raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
segment, offset))
length = size - fmt.size
data = fd.read(length)
if len(data) != length:
raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, len(data)))
if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
segment, offset))
if read_data:
data = fd.read(length)
if len(data) != length:
raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, len(data)))
if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
segment, offset))
if key is None and tag in (TAG_PUT, TAG_DELETE):
key, data = data[:32], data[32:]
else:
if key is None and tag in (TAG_PUT, TAG_DELETE):
key = fd.read(32)
length -= 32
if len(key) != 32:
raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, 32, len(key)))
oldpos = fd.tell()
seeked = fd.seek(length, os.SEEK_CUR) - oldpos
data = None
if seeked != length:
raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, seeked))
if tag not in acceptable_tags:
raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
segment, offset))
if key is None and tag in (TAG_PUT, TAG_DELETE):
key, data = data[:32], data[32:]
return size, tag, key, data
def write_put(self, id, data, raise_full=False):
@ -732,7 +845,7 @@ class LoggedIO:
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
fd.write(b''.join((crc, header, id)))
self.offset += self.put_header_fmt.size
return self.segment
return self.segment, self.put_header_fmt.size
def write_commit(self):
self.close_segment()

View file

@ -10,7 +10,7 @@ from ..hashindex import NSIndex
from ..helpers import Location, IntegrityError
from ..locking import UpgradableLock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint
from ..repository import Repository, LoggedIO
from ..repository import Repository, LoggedIO, MAGIC
from . import BaseTestCase
@ -125,6 +125,46 @@ class RepositoryTestCase(RepositoryTestCaseBase):
self.assert_equal(len(self.repository.list(limit=50)), 50)
class LocalRepositoryTestCase(RepositoryTestCaseBase):
# test case that doesn't work with remote repositories
def _assert_sparse(self):
# The superseded 123456... PUT
assert self.repository.compact[0] == 41 + 9
# The DELETE issued by the superseding PUT (or issued directly)
assert self.repository.compact[2] == 41
self.repository._rebuild_sparse(0)
assert self.repository.compact[0] == 41 + 9
def test_sparse1(self):
self.repository.put(b'00000000000000000000000000000000', b'foo')
self.repository.put(b'00000000000000000000000000000001', b'123456789')
self.repository.commit()
self.repository.put(b'00000000000000000000000000000001', b'bar')
self._assert_sparse()
def test_sparse2(self):
self.repository.put(b'00000000000000000000000000000000', b'foo')
self.repository.put(b'00000000000000000000000000000001', b'123456789')
self.repository.commit()
self.repository.delete(b'00000000000000000000000000000001')
self._assert_sparse()
def test_sparse_delete(self):
self.repository.put(b'00000000000000000000000000000000', b'1245')
self.repository.delete(b'00000000000000000000000000000000')
self.repository.io._write_fd.sync()
# The on-line tracking works on a per-object basis...
assert self.repository.compact[0] == 41 + 41 + 4
self.repository._rebuild_sparse(0)
# ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC)
self.repository.commit()
assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
class RepositoryCommitTestCase(RepositoryTestCaseBase):
def add_keys(self):