From f99ad4ca6fc3db1987834f5b7e770bcf89d49fed Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 21 May 2016 18:45:47 +0200 Subject: [PATCH 1/4] Repository: compact v2 - Track free space information for each sparse segment - Don't compact large segments with little free space --- borg/repository.py | 144 ++++++++++++++++++++++++++++++--------------- 1 file changed, 96 insertions(+), 48 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index c94e5decf..7d4e972d1 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -9,6 +9,8 @@ logger = logging.getLogger(__name__) import os import shutil import struct +from collections import defaultdict +from functools import partial from zlib import crc32 import msgpack @@ -26,6 +28,8 @@ TAG_PUT = 0 TAG_DELETE = 1 TAG_COMMIT = 2 +FreeSpace = partial(defaultdict, int) + class Repository: """Filesystem based transactional key value store @@ -210,21 +214,30 @@ class Repository: self.index = self.open_index(transaction_id) if transaction_id is None: self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x] - self.compact = set() # XXX bad name: segments_needing_compaction = self.compact + self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x] else: if do_cleanup: self.io.cleanup(transaction_id) with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd: hints = msgpack.unpack(fd) - if hints[b'version'] != 1: - raise ValueError('Unknown hints file version: %d' % hints['version']) - self.segments = hints[b'segments'] - self.compact = set(hints[b'compact']) + if hints[b'version'] == 1: + logger.debug('Upgrading from v1 hints.%d', transaction_id) + self.segments = hints[b'segments'] + self.compact = FreeSpace() + for segment in sorted(hints[b'compact']): + logger.debug('Rebuilding sparse info for segment %d', segment) + self._rebuild_sparse(segment) + logger.debug('Upgrade to v2 hints complete') + elif hints[b'version'] != 2: + raise ValueError('Unknown hints file version: %d' % hints[b'version']) + else: + self.segments = hints[b'segments'] + self.compact = FreeSpace(hints[b'compact']) def write_index(self): - hints = {b'version': 1, + hints = {b'version': 2, b'segments': self.segments, - b'compact': list(self.compact)} + b'compact': self.compact} transaction_id = self.io.get_segments_transaction_id() hints_file = os.path.join(self.path, 'hints.%d' % transaction_id) with open(hints_file + '.tmp', 'wb') as fd: @@ -238,10 +251,10 @@ class Repository: if self.append_only: with open(os.path.join(self.path, 'transactions'), 'a') as log: print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log) - # Remove old indices + # Remove old auxiliary files current = '.%d' % transaction_id for name in os.listdir(self.path): - if not name.startswith('index.') and not name.startswith('hints.'): + if not name.startswith(('index.', 'hints.')): continue if name.endswith(current): continue @@ -267,32 +280,40 @@ class Repository: for segment in unused: assert self.segments.pop(segment) == 0 self.io.delete_segment(segment) + del self.compact[segment] unused = [] - for segment in sorted(self.compact): - if self.io.segment_exists(segment): - for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): - if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset): + for segment, freeable_space in sorted(self.compact.items()): + if not self.io.segment_exists(segment): + del self.compact[segment] + continue + segment_size = self.io.segment_size(segment) + if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size: + logger.debug('not compacting segment %d for later (only %d bytes are sparse)', + segment, freeable_space) + continue + segments.setdefault(segment, 0) + for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): + if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset): + try: + new_segment, offset = self.io.write_put(key, data, raise_full=save_space) + except LoggedIO.SegmentFull: + complete_xfer() + new_segment, offset = self.io.write_put(key, data) + self.index[key] = new_segment, offset + segments.setdefault(new_segment, 0) + segments[new_segment] += 1 + segments[segment] -= 1 + elif tag == TAG_DELETE: + if index_transaction_id is None or segment > index_transaction_id: try: - new_segment, offset = self.io.write_put(key, data, raise_full=save_space) + self.io.write_delete(key, raise_full=save_space) except LoggedIO.SegmentFull: complete_xfer() - new_segment, offset = self.io.write_put(key, data) - self.index[key] = new_segment, offset - segments.setdefault(new_segment, 0) - segments[new_segment] += 1 - segments[segment] -= 1 - elif tag == TAG_DELETE: - if index_transaction_id is None or segment > index_transaction_id: - try: - self.io.write_delete(key, raise_full=save_space) - except LoggedIO.SegmentFull: - complete_xfer() - self.io.write_delete(key) - assert segments[segment] == 0 - unused.append(segment) + self.io.write_delete(key) + assert segments[segment] == 0 + unused.append(segment) complete_xfer() - self.compact = set() def replay_segments(self, index_transaction_id, segments_transaction_id): self.prepare_txn(index_transaction_id, do_cleanup=False) @@ -315,11 +336,12 @@ class Repository: def _update_index(self, segment, objects, report=None): """some code shared between replay_segments and check""" self.segments[segment] = 0 - for tag, key, offset in objects: + for tag, key, offset, size in objects: if tag == TAG_PUT: try: + # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space s, _ = self.index[key] - self.compact.add(s) + self.compact[s] += size self.segments[s] -= 1 except KeyError: pass @@ -327,12 +349,17 @@ class Repository: self.segments[segment] += 1 elif tag == TAG_DELETE: try: - s, _ = self.index.pop(key) - self.segments[s] -= 1 - self.compact.add(s) + # if the deleted PUT is not in the index, there is nothing to clean up + s, offset = self.index.pop(key) except KeyError: pass - self.compact.add(segment) + else: + if self.io.segment_exists(s): + # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment + # is already gone, then it was already compacted. + self.segments[s] -= 1 + size = len(self.io.read(s, offset, key)) + self.compact[s] += size elif tag == TAG_COMMIT: continue else: @@ -342,7 +369,22 @@ class Repository: else: report(msg) if self.segments[segment] == 0: - self.compact.add(segment) + self.compact[segment] += self.io.segment_size(segment) + + def _rebuild_sparse(self, segment): + """Rebuild sparse bytes count for a single segment relative to the current index.""" + self.compact[segment] = 0 + if self.segments[segment] == 0: + self.compact[segment] += self.io.segment_size(segment) + return + for tag, key, offset, size in self.io.iter_objects(segment): + if tag == TAG_PUT: + if self.index.get(key, (-1, -1)) != (segment, offset): + # This PUT is superseded later + self.compact[segment] += size + elif tag == TAG_DELETE: + # The outcome of the DELETE has been recorded in the PUT branch already + self.compact[segment] += size def check(self, repair=False, save_space=False): """Check repository consistency @@ -457,14 +499,16 @@ class Repository: if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: - segment, _ = self.index[id] - self.segments[segment] -= 1 - self.compact.add(segment) - segment = self.io.write_delete(id) - self.segments.setdefault(segment, 0) - self.compact.add(segment) + segment, offset = self.index[id] except KeyError: pass + else: + self.segments[segment] -= 1 + size = len(self.io.read(segment, offset, id)) + self.compact[segment] += size + segment, size = self.io.write_delete(id) + self.compact[segment] += size + self.segments.setdefault(segment, 0) segment, offset = self.io.write_put(id, data) self.segments.setdefault(segment, 0) self.segments[segment] += 1 @@ -478,9 +522,10 @@ class Repository: except KeyError: raise self.ObjectNotFound(id, self.path) from None self.segments[segment] -= 1 - self.compact.add(segment) - segment = self.io.write_delete(id) - self.compact.add(segment) + size = len(self.io.read(segment, offset, id)) + self.compact[segment] += size + segment, size = self.io.write_delete(id) + self.compact[segment] += size self.segments.setdefault(segment, 0) def preload(self, ids): @@ -578,7 +623,7 @@ class LoggedIO: seen_commit = False while True: try: - tag, key, offset = next(iterator) + tag, key, offset, _ = next(iterator) except IntegrityError: return False except StopIteration: @@ -635,6 +680,9 @@ class LoggedIO: def segment_exists(self, segment): return os.path.exists(self.segment_filename(segment)) + def segment_size(self, segment): + return os.path.getsize(self.segment_filename(segment)) + def iter_objects(self, segment, include_data=False): fd = self.get_fd(segment) fd.seek(0) @@ -648,7 +696,7 @@ class LoggedIO: if include_data: yield tag, key, offset, data else: - yield tag, key, offset + yield tag, key, offset, size offset += size header = fd.read(self.header_fmt.size) @@ -732,7 +780,7 @@ class LoggedIO: crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff) fd.write(b''.join((crc, header, id))) self.offset += self.put_header_fmt.size - return self.segment + return self.segment, self.put_header_fmt.size def write_commit(self): self.close_segment() From b8d1bc1ca891a65a7276d33488791a4878731a94 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 21 May 2016 19:17:14 +0200 Subject: [PATCH 2/4] Repository: don't read+verify old entries for size retrieval --- borg/repository.py | 67 +++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index 7d4e972d1..439c06511 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -358,7 +358,7 @@ class Repository: # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment # is already gone, then it was already compacted. self.segments[s] -= 1 - size = len(self.io.read(s, offset, key)) + size = self.io.read(s, offset, key, read_data=False) self.compact[s] += size elif tag == TAG_COMMIT: continue @@ -377,7 +377,7 @@ class Repository: if self.segments[segment] == 0: self.compact[segment] += self.io.segment_size(segment) return - for tag, key, offset, size in self.io.iter_objects(segment): + for tag, key, offset, size in self.io.iter_objects(segment, read_data=False): if tag == TAG_PUT: if self.index.get(key, (-1, -1)) != (segment, offset): # This PUT is superseded later @@ -504,7 +504,7 @@ class Repository: pass else: self.segments[segment] -= 1 - size = len(self.io.read(segment, offset, id)) + size = self.io.read(segment, offset, id, read_data=False) self.compact[segment] += size segment, size = self.io.write_delete(id) self.compact[segment] += size @@ -522,7 +522,7 @@ class Repository: except KeyError: raise self.ObjectNotFound(id, self.path) from None self.segments[segment] -= 1 - size = len(self.io.read(segment, offset, id)) + size = self.io.read(segment, offset, id, read_data=False) self.compact[segment] += size segment, size = self.io.write_delete(id) self.compact[segment] += size @@ -683,7 +683,15 @@ class LoggedIO: def segment_size(self, segment): return os.path.getsize(self.segment_filename(segment)) - def iter_objects(self, segment, include_data=False): + def iter_objects(self, segment, include_data=False, read_data=True): + """ + Return object iterator for *segment*. + + If read_data is False then include_data must be False as well. + Integrity checks are skipped: all data obtained from the iterator must be considered informational. + + The iterator returns four-tuples of (tag, key, offset, data|size). + """ fd = self.get_fd(segment) fd.seek(0) if fd.read(MAGIC_LEN) != MAGIC: @@ -692,7 +700,8 @@ class LoggedIO: header = fd.read(self.header_fmt.size) while header: size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset, - (TAG_PUT, TAG_DELETE, TAG_COMMIT)) + (TAG_PUT, TAG_DELETE, TAG_COMMIT), + read_data=read_data) if include_data: yield tag, key, offset, data else: @@ -720,19 +729,25 @@ class LoggedIO: fd.write(data[:size]) data = data[size:] - def read(self, segment, offset, id): + def read(self, segment, offset, id, read_data=True): + """ + Read entry from *segment* at *offset* with *id*. + + If read_data is False the size of the entry is returned instead and integrity checks are skipped. + The return value should thus be considered informational. + """ if segment == self.segment and self._write_fd: self._write_fd.sync() fd = self.get_fd(segment) fd.seek(offset) header = fd.read(self.put_header_fmt.size) - size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, )) + size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data) if id != key: raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( segment, offset)) - return data + return data if read_data else size - def _read(self, fd, fmt, header, segment, offset, acceptable_tags): + def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True): # some code shared by read() and iter_objects() try: hdr_tuple = fmt.unpack(header) @@ -750,18 +765,32 @@ class LoggedIO: raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format( segment, offset)) length = size - fmt.size - data = fd.read(length) - if len(data) != length: - raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format( - segment, offset, length, len(data))) - if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc: - raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format( - segment, offset)) + if read_data: + data = fd.read(length) + if len(data) != length: + raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, length, len(data))) + if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc: + raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format( + segment, offset)) + if key is None and tag in (TAG_PUT, TAG_DELETE): + key, data = data[:32], data[32:] + else: + if key is None and tag in (TAG_PUT, TAG_DELETE): + key = fd.read(32) + length -= 32 + if len(key) != 32: + raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, 32, len(key))) + oldpos = fd.tell() + seeked = fd.seek(length, os.SEEK_CUR) - oldpos + data = None + if seeked != length: + raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, length, seeked)) if tag not in acceptable_tags: raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format( segment, offset)) - if key is None and tag in (TAG_PUT, TAG_DELETE): - key, data = data[:32], data[32:] return size, tag, key, data def write_put(self, id, data, raise_full=False): From 4619f781d8f0fba80d618ac28a3d1b8cc031fc05 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Mon, 16 May 2016 21:15:07 +0200 Subject: [PATCH 3/4] Terms, principles of Repository --- borg/repository.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/borg/repository.py b/borg/repository.py index 439c06511..1620c8278 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -32,15 +32,51 @@ FreeSpace = partial(defaultdict, int) class Repository: - """Filesystem based transactional key value store + """ + Filesystem based transactional key value store + + Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files + called segments. Each segment is a series of log entries. The segment number together with the offset of each + entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of + time for the purposes of the log. + + Log entries are either PUT, DELETE or COMMIT. + + A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the + segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT + is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed. + + When reading from a repository it is first checked whether the last segment is committed. If it is not, then + all segments after the last committed segment are deleted; they contain log entries whose consistency is not + established by a COMMIT. + + Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from + the platform (including the hardware). See platform_base.SyncFile for details. + + A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements + full data logging, meaning that all data is consistent, not just metadata (which is common in file systems). + + A DELETE marks a key as deleted. + + For a given key only the last entry regarding the key, which is called current (all other entries are called + superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist. + Otherwise the last PUT defines the value of the key. + + By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing + such obsolete entries is called sparse, while a segment containing no such entries is called compact. + + Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the + superseded entries where current. On disk layout: + dir/README dir/config dir/data// dir/index.X dir/hints.X """ + class DoesNotExist(Error): """Repository {} does not exist.""" From 2806133902476cbb678bb2cddcffdf32f87e1d5f Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Wed, 25 May 2016 12:15:46 +0200 Subject: [PATCH 4/4] testsuite/repository: test .compact, _build_sparse --- borg/testsuite/repository.py | 42 +++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index 71743bfb0..85f4af457 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -10,7 +10,7 @@ from ..hashindex import NSIndex from ..helpers import Location, IntegrityError from ..locking import UpgradableLock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint -from ..repository import Repository, LoggedIO +from ..repository import Repository, LoggedIO, MAGIC from . import BaseTestCase @@ -125,6 +125,46 @@ class RepositoryTestCase(RepositoryTestCaseBase): self.assert_equal(len(self.repository.list(limit=50)), 50) +class LocalRepositoryTestCase(RepositoryTestCaseBase): + # test case that doesn't work with remote repositories + + def _assert_sparse(self): + # The superseded 123456... PUT + assert self.repository.compact[0] == 41 + 9 + # The DELETE issued by the superseding PUT (or issued directly) + assert self.repository.compact[2] == 41 + self.repository._rebuild_sparse(0) + assert self.repository.compact[0] == 41 + 9 + + def test_sparse1(self): + self.repository.put(b'00000000000000000000000000000000', b'foo') + self.repository.put(b'00000000000000000000000000000001', b'123456789') + self.repository.commit() + self.repository.put(b'00000000000000000000000000000001', b'bar') + self._assert_sparse() + + def test_sparse2(self): + self.repository.put(b'00000000000000000000000000000000', b'foo') + self.repository.put(b'00000000000000000000000000000001', b'123456789') + self.repository.commit() + self.repository.delete(b'00000000000000000000000000000001') + self._assert_sparse() + + def test_sparse_delete(self): + self.repository.put(b'00000000000000000000000000000000', b'1245') + self.repository.delete(b'00000000000000000000000000000000') + self.repository.io._write_fd.sync() + + # The on-line tracking works on a per-object basis... + assert self.repository.compact[0] == 41 + 41 + 4 + self.repository._rebuild_sparse(0) + # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic) + assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC) + + self.repository.commit() + assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()] + + class RepositoryCommitTestCase(RepositoryTestCaseBase): def add_keys(self):