From 0fed44110a5e92ff42497be85bb21f3765049ed2 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 31 Jan 2023 20:09:59 +0100 Subject: [PATCH] remove part files from final archive checkpoint archives might have a single, incomplete part file as last item. part files are always a prefix of the full file, growing in size from checkpoint to checkpoint. we now manage the archive items metadata stream in a special way: - checkpoint archive A(n) might end with a partial item PI(n) - checkpoint archive A(n+1) does not contain PI(n) - checkpoint archive A(n+1) contains a new partial item PI(n+1) - the final archive does not contain any partial items --- src/borg/archive.py | 82 +++++++++++++++-------- src/borg/archiver/create_cmd.py | 1 + src/borg/archiver/tar_cmds.py | 1 + src/borg/testsuite/archiver/create_cmd.py | 24 +------ 4 files changed, 58 insertions(+), 50 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 94644c99c..e25c79f2f 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -353,6 +353,7 @@ class ChunkBuffer: self.chunks = [] self.key = key self.chunker = get_chunker(*chunker_params, seed=self.key.chunk_seed, sparse=False) + self.saved_chunks_len = None def add(self, item): self.buffer.write(self.packer.pack(item.as_dict())) @@ -392,6 +393,18 @@ class ChunkBuffer: def is_full(self): return self.buffer.tell() > self.BUFFER_SIZE + def save_chunks_state(self): + # as we only append to self.chunks, remembering the current length is good enough + self.saved_chunks_len = len(self.chunks) + + def restore_chunks_state(self): + scl = self.saved_chunks_len + assert scl is not None, "forgot to call save_chunks_state?" + tail_chunks = self.chunks[scl:] + del self.chunks[scl:] + self.saved_chunks_len = None + return tail_chunks + class CacheChunkBuffer(ChunkBuffer): def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS): @@ -649,6 +662,15 @@ Duration: {0.duration} stats.show_progress(item=item, dt=0.2) self.items_buffer.add(item) + def prepare_checkpoint(self): + # we need to flush the archive metadata stream to repo chunks, so that + # we have the metadata stream chunks WITHOUT the part file item we add later. + # The part file item will then get into its own metadata stream chunk, which we + # can easily NOT include into the next checkpoint or the final archive. + self.items_buffer.flush(flush=True) + # remember the current state of self.chunks, which corresponds to the flushed chunks + self.items_buffer.save_chunks_state() + def write_checkpoint(self): metadata = self.save(self.checkpoint_name) # that .save() has committed the repo. @@ -660,6 +682,11 @@ Duration: {0.duration} self.cache.chunk_decref(self.id, self.stats) for id in metadata.item_ptrs: self.cache.chunk_decref(id, self.stats) + # also get rid of that part item, we do not want to have it in next checkpoint or final archive + tail_chunks = self.items_buffer.restore_chunks_state() + # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit. + for id in tail_chunks: + self.cache.chunk_decref(id, self.stats) def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None): name = name or self.name @@ -1234,10 +1261,22 @@ def cached_hash(chunk, id_hash): class ChunksProcessor: # Processes an iterator of chunks for an Item - def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify): + def __init__( + self, + *, + key, + cache, + add_item, + prepare_checkpoint, + write_checkpoint, + checkpoint_interval, + checkpoint_volume, + rechunkify, + ): self.key = key self.cache = cache self.add_item = add_item + self.prepare_checkpoint = prepare_checkpoint self.write_checkpoint = write_checkpoint self.rechunkify = rechunkify # time interval based checkpointing @@ -1248,38 +1287,35 @@ class ChunksProcessor: self.current_volume = 0 self.last_volume_checkpoint = 0 - def write_part_file(self, item, from_chunk, number): + def write_part_file(self, item): + self.prepare_checkpoint() item = Item(internal_dict=item.as_dict()) - length = len(item.chunks) - # the item should only have the *additional* chunks we processed after the last partial item: - item.chunks = item.chunks[from_chunk:] # for borg recreate, we already have a size member in the source item (giving the total file size), # but we consider only a part of the file here, thus we must recompute the size from the chunks: item.get_size(memorize=True, from_chunks=True) - item.path += ".borg_part_%d" % number - item.part = number - number += 1 + item.path += ".borg_part" + item.part = 1 # used to be an increasing number, but now just always 1 IF this is a partial file self.add_item(item, show_progress=False) self.write_checkpoint() - return length, number - def maybe_checkpoint(self, item, from_chunk, part_number, forced=False): + def maybe_checkpoint(self, item): + checkpoint_done = False sig_int_triggered = sig_int and sig_int.action_triggered() if ( - forced - or sig_int_triggered + sig_int_triggered or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval) or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume) ): if sig_int_triggered: logger.info("checkpoint requested: starting checkpoint creation...") - from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) + self.write_part_file(item) + checkpoint_done = True self.last_checkpoint = time.monotonic() self.last_volume_checkpoint = self.current_volume if sig_int_triggered: sig_int.action_completed() logger.info("checkpoint requested: finished checkpoint creation!") - return from_chunk, part_number + return checkpoint_done # whether a checkpoint archive was created def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None): if not chunk_processor: @@ -1297,28 +1333,15 @@ class ChunksProcessor: # to get rid of .chunks_healthy, as it might not correspond to .chunks any more. if self.rechunkify and "chunks_healthy" in item: del item.chunks_healthy - from_chunk = 0 - part_number = 1 for chunk in chunk_iter: cle = chunk_processor(chunk) item.chunks.append(cle) self.current_volume += cle[1] if show_progress: stats.show_progress(item=item, dt=0.2) - from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False) + self.maybe_checkpoint(item) else: - if part_number > 1: - if item.chunks[from_chunk:]: - # if we already have created a part item inside this file, we want to put the final - # chunks (if any) into a part item also (so all parts can be concatenated to get - # the complete file): - from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True) - - # if we created part files, we have referenced all chunks from the part files, - # but we also will reference the same chunks also from the final, complete file: - for chunk in item.chunks: - cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True) - stats.nfiles_parts += part_number - 1 + stats.nfiles_parts += 0 # TODO: remove tracking of this class FilesystemObjectProcessors: @@ -2474,6 +2497,7 @@ class ArchiveRecreater: cache=self.cache, key=self.key, add_item=target.add_item, + prepare_checkpoint=target.prepare_checkpoint, write_checkpoint=target.write_checkpoint, checkpoint_interval=self.checkpoint_interval, checkpoint_volume=self.checkpoint_volume, diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index ccb15c462..7c77d14ee 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -255,6 +255,7 @@ class CreateMixIn: cache=cache, key=key, add_item=archive.add_item, + prepare_checkpoint=archive.prepare_checkpoint, write_checkpoint=archive.write_checkpoint, checkpoint_interval=args.checkpoint_interval, checkpoint_volume=args.checkpoint_volume, diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index fd0093743..8c46645a5 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -271,6 +271,7 @@ class TarMixIn: cache=cache, key=key, add_item=archive.add_item, + prepare_checkpoint=archive.prepare_checkpoint, write_checkpoint=archive.write_checkpoint, checkpoint_interval=args.checkpoint_interval, checkpoint_volume=args.checkpoint_volume, diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 24e2f4461..c55c2a473 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -182,27 +182,9 @@ class ArchiverTestCase(ArchiverTestCaseBase): ) # repo looking good overall? checks for rc == 0. self.cmd(f"--repo={self.repository_location}", "check", "--debug") - # verify part files - out = self.cmd( - f"--repo={self.repository_location}", - "extract", - "test", - "stdin.borg_part_1", - "--consider-part-files", - "--stdout", - binary_output=True, - ) - assert out == input_data[:chunk_size] - out = self.cmd( - f"--repo={self.repository_location}", - "extract", - "test", - "stdin.borg_part_2", - "--consider-part-files", - "--stdout", - binary_output=True, - ) - assert out == input_data[: chunk_size - 1] + # verify that there are no part files in final archive + out = self.cmd(f"--repo={self.repository_location}", "list", "test", "--consider-part-files") + assert "stdin.borg_part" not in out # verify full file out = self.cmd(f"--repo={self.repository_location}", "extract", "test", "stdin", "--stdout", binary_output=True) assert out == input_data