diff --git a/src/borg/repository.py b/src/borg/repository.py index 794f6fdc0..a095ca7c8 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -22,6 +22,7 @@ from .helpers import Location from .helpers import ProgressIndicatorPercent from .helpers import bin_to_hex from .locking import Lock, LockError, LockErrorT +from .logger import create_logger from .lrucache import LRUCache from .platform import SaveFile, SyncFile, sync_dir @@ -110,6 +111,10 @@ class Repository: self.io = None self.lock = None self.index = None + # This is an index of shadowed log entries during this transaction. Consider the following sequence: + # segment_n PUT A, segment_x DELETE A + # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]". + self.shadow_index = {} self._active_txn = False self.lock_wait = lock_wait self.do_lock = lock @@ -308,6 +313,7 @@ class Repository: if transaction_id is None: self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x] self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x] + self.shadow_index.clear() else: if do_cleanup: self.io.cleanup(transaction_id) @@ -338,6 +344,11 @@ class Repository: else: self.segments = hints[b'segments'] self.compact = FreeSpace(hints[b'compact']) + # Drop uncommitted segments in the shadow index + for key, shadowed_segments in self.shadow_index.items(): + for segment in list(shadowed_segments): + if segment > transaction_id: + shadowed_segments.remove(segment) def write_index(self): hints = {b'version': 2, @@ -413,31 +424,40 @@ class Repository: index_transaction_id = self.get_index_transaction_id() segments = self.segments unused = [] # list of segments, that are not used anymore + logger = create_logger('borg.debug.compact_segments') def complete_xfer(intermediate=True): # complete the current transfer (when some target segment is full) nonlocal unused # commit the new, compact, used segments - self.io.write_commit(intermediate=intermediate) + segment = self.io.write_commit(intermediate=intermediate) + logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment) # get rid of the old, sparse, unused segments. free space. for segment in unused: + logger.debug('complete_xfer: deleting unused segment %d', segment) assert self.segments.pop(segment) == 0 self.io.delete_segment(segment) del self.compact[segment] unused = [] + logger.debug('compaction started.') for segment, freeable_space in sorted(self.compact.items()): if not self.io.segment_exists(segment): + logger.warning('segment %d not found, but listed in compaction data', segment) del self.compact[segment] continue segment_size = self.io.segment_size(segment) if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size: - logger.debug('not compacting segment %d for later (only %d bytes are sparse)', - segment, freeable_space) + logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space) continue segments.setdefault(segment, 0) + logger.debug('compacting segment %d with usage count %d and %d freeable bytes', + segment, segments[segment], freeable_space) for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): - if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset): + if tag == TAG_COMMIT: + continue + in_index = self.index.get(key) == (segment, offset) + if tag == TAG_PUT and in_index: try: new_segment, offset = self.io.write_put(key, data, raise_full=True) except LoggedIO.SegmentFull: @@ -447,8 +467,22 @@ class Repository: segments.setdefault(new_segment, 0) segments[new_segment] += 1 segments[segment] -= 1 + elif tag == TAG_PUT and not in_index: + # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after + # this loop. Therefore it is removed from the shadow index. + try: + self.shadow_index[key].remove(segment) + except (KeyError, ValueError): + pass elif tag == TAG_DELETE: - if index_transaction_id is None or segment > index_transaction_id: + # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag, + # therefore we do not drop the delete, but write it to a current segment. + shadowed_put_exists = key not in self.shadow_index or any( + # If the key is in the shadow index and there is any segment with an older PUT of this + # key, we have a shadowed put. + shadowed < segment for shadowed in self.shadow_index[key]) + + if shadowed_put_exists or index_transaction_id is None or segment > index_transaction_id: # (introduced in 6425d16aa84be1eaaf88) # This is needed to avoid object un-deletion if we crash between the commit and the deletion # of old segments in complete_xfer(). @@ -492,6 +526,7 @@ class Repository: assert segments[segment] == 0 unused.append(segment) complete_xfer(intermediate=False) + logger.debug('compaction completed.') def replay_segments(self, index_transaction_id, segments_transaction_id): # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock: @@ -705,6 +740,7 @@ class Repository: segment, offset = self.index.pop(id) except KeyError: raise self.ObjectNotFound(id, self.path) from None + self.shadow_index.setdefault(id, []).append(segment) self.segments[segment] -= 1 size = self.io.read(segment, offset, id, read_data=False) self.compact[segment] += size @@ -1017,6 +1053,7 @@ class LoggedIO: crc = self.crc_fmt.pack(crc32(header) & 0xffffffff) fd.write(b''.join((crc, header))) self.close_segment() + return self.segment - 1 # close_segment() increments it MAX_DATA_SIZE = MAX_OBJECT_SIZE - LoggedIO.put_header_fmt.size diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 25e93101a..67c250e8d 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -295,6 +295,56 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): for tag, key, offset, size in self.repository.io.iter_objects(segment): assert tag != TAG_DELETE + def test_shadowed_entries_are_preserved(self): + get_latest_segment = self.repository.io.get_latest_segment + self.repository.put(H(1), b'1') + # This is the segment with our original PUT of interest + put_segment = get_latest_segment() + self.repository.commit() + + # We now delete H(1), and force this segment to not be compacted, which can happen + # if it's not sparse enough (symbolized by H(2) here). + self.repository.delete(H(1)) + self.repository.put(H(2), b'1') + delete_segment = get_latest_segment() + + # We pretend these are mostly dense (not sparse) and won't be compacted + del self.repository.compact[put_segment] + del self.repository.compact[delete_segment] + + self.repository.commit() + + # Now we perform an unrelated operation on the segment containing the DELETE, + # causing it to be compacted. + self.repository.delete(H(2)) + self.repository.commit() + + assert self.repository.io.segment_exists(put_segment) + assert not self.repository.io.segment_exists(delete_segment) + + # Basic case, since the index survived this must be ok + assert H(1) not in self.repository + # Nuke index, force replay + os.unlink(os.path.join(self.repository.path, 'index.%d' % get_latest_segment())) + # Must not reappear + assert H(1) not in self.repository + + def test_shadow_index_rollback(self): + self.repository.put(H(1), b'1') + self.repository.delete(H(1)) + assert self.repository.shadow_index[H(1)] == [0] + self.repository.commit() + # note how an empty list means that nothing is shadowed for sure + assert self.repository.shadow_index[H(1)] == [] + self.repository.put(H(1), b'1') + self.repository.delete(H(1)) + # 0 put/delete; 1 commit; 2 compacted; 3 commit; 4 put/delete + assert self.repository.shadow_index[H(1)] == [4] + self.repository.rollback() + self.repository.put(H(2), b'1') + # After the rollback segment 4 shouldn't be considered anymore + assert self.repository.shadow_index[H(1)] == [] + class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase): def open(self, create=False):