Repository: compact v2

- Track free space information for each sparse segment
- Don't compact large segments with little free space
This commit is contained in:
Marian Beermann 2016-05-21 18:45:47 +02:00
parent 4302ffbdbc
commit f99ad4ca6f

View file

@ -9,6 +9,8 @@ logger = logging.getLogger(__name__)
import os
import shutil
import struct
from collections import defaultdict
from functools import partial
from zlib import crc32
import msgpack
@ -26,6 +28,8 @@ TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
FreeSpace = partial(defaultdict, int)
class Repository:
"""Filesystem based transactional key value store
@ -210,21 +214,30 @@ class Repository:
self.index = self.open_index(transaction_id)
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = set() # XXX bad name: segments_needing_compaction = self.compact
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
else:
if do_cleanup:
self.io.cleanup(transaction_id)
with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
hints = msgpack.unpack(fd)
if hints[b'version'] != 1:
raise ValueError('Unknown hints file version: %d' % hints['version'])
self.segments = hints[b'segments']
self.compact = set(hints[b'compact'])
if hints[b'version'] == 1:
logger.debug('Upgrading from v1 hints.%d', transaction_id)
self.segments = hints[b'segments']
self.compact = FreeSpace()
for segment in sorted(hints[b'compact']):
logger.debug('Rebuilding sparse info for segment %d', segment)
self._rebuild_sparse(segment)
logger.debug('Upgrade to v2 hints complete')
elif hints[b'version'] != 2:
raise ValueError('Unknown hints file version: %d' % hints[b'version'])
else:
self.segments = hints[b'segments']
self.compact = FreeSpace(hints[b'compact'])
def write_index(self):
hints = {b'version': 1,
hints = {b'version': 2,
b'segments': self.segments,
b'compact': list(self.compact)}
b'compact': self.compact}
transaction_id = self.io.get_segments_transaction_id()
hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
with open(hints_file + '.tmp', 'wb') as fd:
@ -238,10 +251,10 @@ class Repository:
if self.append_only:
with open(os.path.join(self.path, 'transactions'), 'a') as log:
print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
# Remove old indices
# Remove old auxiliary files
current = '.%d' % transaction_id
for name in os.listdir(self.path):
if not name.startswith('index.') and not name.startswith('hints.'):
if not name.startswith(('index.', 'hints.')):
continue
if name.endswith(current):
continue
@ -267,32 +280,40 @@ class Repository:
for segment in unused:
assert self.segments.pop(segment) == 0
self.io.delete_segment(segment)
del self.compact[segment]
unused = []
for segment in sorted(self.compact):
if self.io.segment_exists(segment):
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
for segment, freeable_space in sorted(self.compact.items()):
if not self.io.segment_exists(segment):
del self.compact[segment]
continue
segment_size = self.io.segment_size(segment)
if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
segment, freeable_space)
continue
segments.setdefault(segment, 0)
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
try:
new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id:
try:
new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
self.io.write_delete(key, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id:
try:
self.io.write_delete(key, raise_full=save_space)
except LoggedIO.SegmentFull:
complete_xfer()
self.io.write_delete(key)
assert segments[segment] == 0
unused.append(segment)
self.io.write_delete(key)
assert segments[segment] == 0
unused.append(segment)
complete_xfer()
self.compact = set()
def replay_segments(self, index_transaction_id, segments_transaction_id):
self.prepare_txn(index_transaction_id, do_cleanup=False)
@ -315,11 +336,12 @@ class Repository:
def _update_index(self, segment, objects, report=None):
"""some code shared between replay_segments and check"""
self.segments[segment] = 0
for tag, key, offset in objects:
for tag, key, offset, size in objects:
if tag == TAG_PUT:
try:
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
s, _ = self.index[key]
self.compact.add(s)
self.compact[s] += size
self.segments[s] -= 1
except KeyError:
pass
@ -327,12 +349,17 @@ class Repository:
self.segments[segment] += 1
elif tag == TAG_DELETE:
try:
s, _ = self.index.pop(key)
self.segments[s] -= 1
self.compact.add(s)
# if the deleted PUT is not in the index, there is nothing to clean up
s, offset = self.index.pop(key)
except KeyError:
pass
self.compact.add(segment)
else:
if self.io.segment_exists(s):
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
# is already gone, then it was already compacted.
self.segments[s] -= 1
size = len(self.io.read(s, offset, key))
self.compact[s] += size
elif tag == TAG_COMMIT:
continue
else:
@ -342,7 +369,22 @@ class Repository:
else:
report(msg)
if self.segments[segment] == 0:
self.compact.add(segment)
self.compact[segment] += self.io.segment_size(segment)
def _rebuild_sparse(self, segment):
"""Rebuild sparse bytes count for a single segment relative to the current index."""
self.compact[segment] = 0
if self.segments[segment] == 0:
self.compact[segment] += self.io.segment_size(segment)
return
for tag, key, offset, size in self.io.iter_objects(segment):
if tag == TAG_PUT:
if self.index.get(key, (-1, -1)) != (segment, offset):
# This PUT is superseded later
self.compact[segment] += size
elif tag == TAG_DELETE:
# The outcome of the DELETE has been recorded in the PUT branch already
self.compact[segment] += size
def check(self, repair=False, save_space=False):
"""Check repository consistency
@ -457,14 +499,16 @@ class Repository:
if not self._active_txn:
self.prepare_txn(self.get_transaction_id())
try:
segment, _ = self.index[id]
self.segments[segment] -= 1
self.compact.add(segment)
segment = self.io.write_delete(id)
self.segments.setdefault(segment, 0)
self.compact.add(segment)
segment, offset = self.index[id]
except KeyError:
pass
else:
self.segments[segment] -= 1
size = len(self.io.read(segment, offset, id))
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
segment, offset = self.io.write_put(id, data)
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
@ -478,9 +522,10 @@ class Repository:
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
self.segments[segment] -= 1
self.compact.add(segment)
segment = self.io.write_delete(id)
self.compact.add(segment)
size = len(self.io.read(segment, offset, id))
self.compact[segment] += size
segment, size = self.io.write_delete(id)
self.compact[segment] += size
self.segments.setdefault(segment, 0)
def preload(self, ids):
@ -578,7 +623,7 @@ class LoggedIO:
seen_commit = False
while True:
try:
tag, key, offset = next(iterator)
tag, key, offset, _ = next(iterator)
except IntegrityError:
return False
except StopIteration:
@ -635,6 +680,9 @@ class LoggedIO:
def segment_exists(self, segment):
return os.path.exists(self.segment_filename(segment))
def segment_size(self, segment):
return os.path.getsize(self.segment_filename(segment))
def iter_objects(self, segment, include_data=False):
fd = self.get_fd(segment)
fd.seek(0)
@ -648,7 +696,7 @@ class LoggedIO:
if include_data:
yield tag, key, offset, data
else:
yield tag, key, offset
yield tag, key, offset, size
offset += size
header = fd.read(self.header_fmt.size)
@ -732,7 +780,7 @@ class LoggedIO:
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
fd.write(b''.join((crc, header, id)))
self.offset += self.put_header_fmt.size
return self.segment
return self.segment, self.put_header_fmt.size
def write_commit(self):
self.close_segment()