From 44935aa8eacf2742c3ed7edd7b7de409ddad89ad Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 19 Nov 2016 16:49:20 +0100 Subject: [PATCH] recreate: remove interruption blah, autocommit blah, resuming blah --- src/borg/archive.py | 136 +++------------------------------ src/borg/archiver.py | 50 +++++------- src/borg/testsuite/archiver.py | 100 ------------------------ 3 files changed, 29 insertions(+), 257 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 263536f20..269263f9d 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1371,9 +1371,6 @@ class ArchiveChecker: class ArchiveRecreater: - AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024 - """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes.""" - class FakeTargetArchive: def __init__(self): self.stats = Statistics() @@ -1409,9 +1406,6 @@ class ArchiveRecreater: compression_files or []) key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none')) - self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100) - logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold)) - self.dry_run = dry_run self.stats = stats self.progress = progress @@ -1423,20 +1417,17 @@ class ArchiveRecreater: def recreate(self, archive_name, comment=None, target_name=None): assert not self.is_temporary_archive(archive_name) archive = self.open_archive(archive_name) - target, resume_from = self.create_target_or_resume(archive, target_name) + target = self.create_target(archive, target_name) if self.exclude_if_present or self.exclude_caches: self.matcher_add_tagged_dirs(archive) if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None: logger.info("Skipping archive %s, nothing to do", archive_name) return True - try: - self.process_items(archive, target, resume_from) - except self.Interrupted as e: - return self.save(archive, target, completed=False, metadata=e.metadata) + self.process_items(archive, target) replace_original = target_name is None return self.save(archive, target, comment, replace_original=replace_original) - def process_items(self, archive, target, resume_from=None): + def process_items(self, archive, target): matcher = self.matcher target_is_subset = not matcher.empty() hardlink_masters = {} if target_is_subset else None @@ -1450,15 +1441,8 @@ class ArchiveRecreater: for item in archive.iter_items(): if item_is_hardlink_master(item): - # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters hardlink_masters[item.path] = (item.get('chunks'), None) continue - if resume_from: - # Fast forward to after the last processed file - if item.path == resume_from: - logger.info('Fast-forwarded to %s', remove_surrogates(item.path)) - resume_from = None - continue if not matcher.match(item.path): self.print_file_status('x', item.path) continue @@ -1476,12 +1460,7 @@ class ArchiveRecreater: if self.dry_run: self.print_file_status('-', item.path) else: - try: - self.process_item(archive, target, item) - except self.Interrupted: - if self.progress: - target.stats.show_progress(final=True) - raise + self.process_item(archive, target, item) if self.progress: target.stats.show_progress(final=True) @@ -1491,8 +1470,6 @@ class ArchiveRecreater: target.stats.nfiles += 1 target.add_item(item) self.print_file_status(file_status(item.mode), item.path) - if self.interrupt: - raise self.Interrupted def process_chunks(self, archive, target, item): """Return new chunk ID list for 'item'.""" @@ -1500,9 +1477,8 @@ class ArchiveRecreater: for chunk_id, size, csize in item.chunks: self.cache.chunk_incref(chunk_id, target.stats) return item.chunks - new_chunks = self.process_partial_chunks(target) + new_chunks = [] chunk_iterator = self.create_chunk_iterator(archive, target, item) - consume(chunk_iterator, len(new_chunks)) compress = self.compression_decider1.decide(item.path) for chunk in chunk_iterator: chunk.meta['compress'] = compress @@ -1521,20 +1497,8 @@ class ArchiveRecreater: chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) new_chunks.append((chunk_id, size, csize)) self.seen_chunks.add(chunk_id) - if self.recompress and self.cache.seen_chunk(chunk_id) == 1: - # This tracks how many bytes are uncommitted but compactable, since we are recompressing - # existing chunks. - target.recreate_uncomitted_bytes += csize - if target.recreate_uncomitted_bytes >= self.autocommit_threshold: - # Issue commits to limit additional space usage when recompressing chunks - target.recreate_uncomitted_bytes = 0 - self.repository.commit() if self.progress: target.stats.show_progress(item=item, dt=0.2) - if self.interrupt: - raise self.Interrupted({ - 'recreate_partial_chunks': new_chunks, - }) return new_chunks def create_chunk_iterator(self, archive, target, item): @@ -1552,19 +1516,6 @@ class ArchiveRecreater: chunk_iterator = _chunk_iterator() return chunk_iterator - def process_partial_chunks(self, target): - """Return chunks from a previous run for archive 'target' (if any) or an empty list.""" - if not target.recreate_partial_chunks: - return [] - # No incref, create_target_or_resume already did that before to deleting the old target archive - # So just copy these over - partial_chunks = target.recreate_partial_chunks - target.recreate_partial_chunks = None - for chunk_id, size, csize in partial_chunks: - self.seen_chunks.add(chunk_id) - logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks)) - return partial_chunks - def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True): """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict.""" if self.dry_run: @@ -1631,84 +1582,15 @@ class ArchiveRecreater: matcher.add(tag_files, True) matcher.add(tagged_dirs, False) - def create_target_or_resume(self, archive, target_name=None): - """Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path""" + def create_target(self, archive, target_name=None): + """Create target archive.""" if self.dry_run: return self.FakeTargetArchive(), None target_name = target_name or archive.name + '.recreate' - resume = target_name in self.manifest.archives - target, resume_from = None, None - if resume: - target, resume_from = self.try_resume(archive, target_name) - if not target: - target = self.create_target_archive(target_name) + target = self.create_target_archive(target_name) # If the archives use the same chunker params, then don't rechunkify target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params', [])) != self.chunker_params - return target, resume_from - - def try_resume(self, archive, target_name): - """Try to resume from temporary archive. Return (target archive, resume from path) if successful.""" - logger.info('Found %s, will resume interrupted operation', target_name) - old_target = self.open_archive(target_name) - if not self.can_resume(archive, old_target, target_name): - return None, None - target = self.create_target_archive(target_name + '.temp') - logger.info('Replaying items from interrupted operation...') - last_old_item = self.copy_items(old_target, target) - resume_from = getattr(last_old_item, 'path', None) - self.incref_partial_chunks(old_target, target) - old_target.delete(Statistics(), progress=self.progress) - logger.info('Done replaying items') - return target, resume_from - - def incref_partial_chunks(self, source_archive, target_archive): - target_archive.recreate_partial_chunks = source_archive.metadata.get('recreate_partial_chunks', []) - for chunk_id, size, csize in target_archive.recreate_partial_chunks: - if not self.cache.seen_chunk(chunk_id): - try: - # Repository has __contains__, RemoteRepository doesn't - # `chunk_id in repo` doesn't read the data though, so we try to use that if possible. - get_or_in = getattr(self.repository, '__contains__', self.repository.get) - if get_or_in(chunk_id) is False: - raise Repository.ObjectNotFound(chunk_id, self.repository) - except Repository.ObjectNotFound: - # delete/prune/check between invocations: these chunks are gone. - target_archive.recreate_partial_chunks = None - break - # fast-lane insert into chunks cache - self.cache.chunks[chunk_id] = (1, size, csize) - target_archive.stats.update(size, csize, True) - continue - # incref now, otherwise a source_archive.delete() might delete these chunks - self.cache.chunk_incref(chunk_id, target_archive.stats) - - def copy_items(self, source_archive, target_archive): - item = None - for item in source_archive.iter_items(): - if 'chunks' in item: - for chunk in item.chunks: - self.cache.chunk_incref(chunk.id, target_archive.stats) - target_archive.stats.nfiles += 1 - target_archive.add_item(item) - if self.progress: - target_archive.stats.show_progress(final=True) - return item - - def can_resume(self, archive, old_target, target_name): - resume_id = old_target.metadata.recreate_source_id - resume_args = [safe_decode(arg) for arg in old_target.metadata.recreate_args] - if resume_id != archive.id: - logger.warning('Source archive changed, will discard %s and start over', target_name) - logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id)) - logger.warning('Current fingerprint: %s', archive.fpr) - old_target.delete(Statistics(), progress=self.progress) - return False - if resume_args != sys.argv[1:]: - logger.warning('Command line changed, this might lead to inconsistencies') - logger.warning('Saved: %s', repr(resume_args)) - logger.warning('Current: %s', repr(sys.argv[1:])) - # Just warn in this case, don't start over - return True + return target def create_target_archive(self, name): target = Archive(self.repository, self.key, self.manifest, name, create=True, diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 59f4e17bb..3c63e37e5 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -1058,13 +1058,6 @@ class Archiver: @with_repository(cache=True, exclusive=True) def do_recreate(self, args, repository, manifest, key, cache): """Re-create archives""" - def interrupt(signal_num, stack_frame): - if recreater.interrupt: - print("\nReceived signal, again. I'm not deaf.", file=sys.stderr) - else: - print("\nReceived signal, will exit cleanly.", file=sys.stderr) - recreater.interrupt = True - msg = ("recreate is an experimental feature.\n" "Type 'YES' if you understand this and want to continue: ") if not yes(msg, false_msg="Aborting.", truish=('YES',), @@ -1084,30 +1077,27 @@ class Archiver: file_status_printer=self.print_file_status, dry_run=args.dry_run) - with signal_handler(signal.SIGTERM, interrupt), \ - signal_handler(signal.SIGINT, interrupt), \ - signal_handler(signal.SIGHUP, interrupt): - if args.location.archive: - name = args.location.archive + if args.location.archive: + name = args.location.archive + if recreater.is_temporary_archive(name): + self.print_error('Refusing to work on temporary archive of prior recreate: %s', name) + return self.exit_code + recreater.recreate(name, args.comment, args.target) + else: + if args.target is not None: + self.print_error('--target: Need to specify single archive') + return self.exit_code + for archive in manifest.archives.list(sort_by=['ts']): + name = archive.name if recreater.is_temporary_archive(name): - self.print_error('Refusing to work on temporary archive of prior recreate: %s', name) - return self.exit_code - recreater.recreate(name, args.comment, args.target) - else: - if args.target is not None: - self.print_error('--target: Need to specify single archive') - return self.exit_code - for archive in manifest.archives.list(sort_by=['ts']): - name = archive.name - if recreater.is_temporary_archive(name): - continue - print('Processing', name) - if not recreater.recreate(name, args.comment): - break - manifest.write() - repository.commit() - cache.commit() - return self.exit_code + continue + print('Processing', name) + if not recreater.recreate(name, args.comment): + break + manifest.write() + repository.commit() + cache.commit() + return self.exit_code @with_repository(manifest=False, exclusive=True) def do_with_lock(self, args, repository): diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index ffa7cccd6..da3720563 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1717,106 +1717,6 @@ class ArchiverTestCase(ArchiverTestCaseBase): archives_after = self.cmd('list', self.repository_location + '::test') assert archives_after == archives_before - def _recreate_interrupt_patch(self, interrupt_after_n_1_files): - def interrupt(self, *args): - if interrupt_after_n_1_files: - self.interrupt = True - pi_save(self, *args) - else: - raise ArchiveRecreater.Interrupted - - def process_item_patch(*args): - return pi_call.pop(0)(*args) - - pi_save = ArchiveRecreater.process_item - pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt] - return process_item_patch - - def _test_recreate_interrupt(self, change_args, interrupt_early): - self.create_test_files() - self.create_regular_file('dir2/abcdef', size=1024 * 80) - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - process_files = 1 - if interrupt_early: - process_files = 0 - with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)): - self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'test.recreate' in self.cmd('list', self.repository_location) - if change_args: - with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']): - output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 'input/dir2') - else: - output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2') - assert 'Found test.recreate, will resume' in output - assert change_args == ('Command line changed' in output) - if not interrupt_early: - assert 'Fast-forwarded to input/dir2/abcdef' in output - assert 'A input/dir2/abcdef' not in output - assert 'A input/dir2/file2' in output - archives = self.cmd('list', self.repository_location) - assert 'test.recreate' not in archives - assert 'test' in archives - files = self.cmd('list', self.repository_location + '::test') - assert 'dir2/file2' in files - assert 'dir2/abcdef' in files - assert 'file1' not in files - - # The _test_create_interrupt requires a deterministic (alphabetic) order of the files to easily check if - # resumption works correctly. Patch scandir_inorder to work in alphabetic order. - - def test_recreate_interrupt(self): - with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic): - self._test_recreate_interrupt(False, True) - - def test_recreate_interrupt2(self): - with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic): - self._test_recreate_interrupt(True, False) - - def _test_recreate_chunker_interrupt_patch(self): - real_add_chunk = Cache.add_chunk - - def add_chunk(*args, **kwargs): - frame = inspect.stack()[2] - try: - caller_self = frame[0].f_locals['self'] - if isinstance(caller_self, ArchiveRecreater): - caller_self.interrupt = True - finally: - del frame - return real_add_chunk(*args, **kwargs) - return add_chunk - - def test_recreate_rechunkify_interrupt(self): - self.create_regular_file('file1', size=1024 * 80) - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}') - with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()): - self.cmd('recreate', '-pv', '--chunker-params', '10,13,11,4095', self.repository_location) - assert 'test.recreate' in self.cmd('list', self.repository_location) - output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '10,13,11,4095', self.repository_location) - assert 'Found test.recreate, will resume' in output - assert 'Copied 1 chunks from a partially processed item' in output - archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}') - assert archive_after == archive_before - - def test_recreate_changed_source(self): - self.create_test_files() - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)): - self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'test.recreate' in self.cmd('list', self.repository_location) - self.cmd('delete', self.repository_location + '::test') - self.cmd('create', self.repository_location + '::test', 'input') - output = self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'Source archive changed, will discard test.recreate and start over' in output - - def test_recreate_refuses_temporary(self): - self.cmd('init', self.repository_location) - self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2) - def test_recreate_skips_nothing_to_do(self): self.create_regular_file('file1', size=1024 * 80) self.cmd('init', self.repository_location)