diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py
index 8b748da7e..5fccb9fb9 100644
--- a/src/borg/archiver/transfer_cmd.py
+++ b/src/borg/archiver/transfer_cmd.py
@@ -17,6 +17,60 @@ from ..logger import create_logger
 logger = create_logger()
 
 
+def transfer_chunks(upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run):
+    transfer = 0
+    present = 0
+    chunks = []
+    for chunk_id, size in other_chunks:
+        chunk_present = cache.seen_chunk(chunk_id, size)
+        if not chunk_present:  # target repo does not yet have this chunk
+            if not dry_run:
+                try:
+                    cdata = other_repository.get(chunk_id)
+                except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
+                    # missing correct chunk in other_repository (source) will result in
+                    # a missing chunk in repository (destination).
+                    # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
+                    pass
+                else:
+                    if recompress == "never":
+                        # keep compressed payload same, verify via assert_id (that will
+                        # decompress, but avoid needing to compress it again):
+                        meta, data = other_manifest.repo_objs.parse(
+                            chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
+                        )
+                        meta, data = upgrader.upgrade_compressed_chunk(meta, data)
+                        chunk_entry = cache.add_chunk(
+                            chunk_id,
+                            meta,
+                            data,
+                            stats=archive.stats,
+                            wait=False,
+                            compress=False,
+                            size=size,
+                            ctype=meta["ctype"],
+                            clevel=meta["clevel"],
+                            ro_type=ROBJ_FILE_STREAM,
+                        )
+                    elif recompress == "always":
+                        # always decompress and re-compress file data chunks
+                        meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
+                        chunk_entry = cache.add_chunk(
+                            chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                        )
+                    else:
+                        raise ValueError(f"unsupported recompress mode: {recompress}")
+                    cache.repository.async_response(wait=False)
+                    chunks.append(chunk_entry)
+            transfer += size
+        else:
+            if not dry_run:
+                chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                chunks.append(chunk_entry)
+            present += size
+    return chunks, transfer, present
+
+
 class TransferMixIn:
     @with_other_repository(manifest=True, compatibility=(Manifest.Operation.READ,))
     @with_repository(manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
@@ -120,68 +174,21 @@ class TransferMixIn:
                     else:
                         other_chunks = None
                     if other_chunks is not None:
-                        chunks = []
-                        for chunk_id, size in other_chunks:
-                            chunk_present = cache.seen_chunk(chunk_id, size)
-                            if not chunk_present:  # target repo does not yet have this chunk
-                                if not dry_run:
-                                    try:
-                                        cdata = other_repository.get(chunk_id)
-                                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
-                                        # missing correct chunk in other_repository (source) will result in
-                                        # a missing chunk in repository (destination).
-                                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
-                                        pass
-                                    else:
-                                        if args.recompress == "never":
-                                            # keep compressed payload same, verify via assert_id (that will
-                                            # decompress, but avoid needing to compress it again):
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id,
-                                                cdata,
-                                                decompress=True,
-                                                want_compressed=True,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                compress=False,
-                                                size=size,
-                                                ctype=meta["ctype"],
-                                                clevel=meta["clevel"],
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        elif args.recompress == "always":
-                                            # always decompress and re-compress file data chunks
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
-                                            )
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        else:
-                                            raise ValueError(f"unsupported recompress mode: {args.recompress}")
-                                        cache.repository.async_response(wait=False)
-                                        chunks.append(chunk_entry)
-                                transfer_size += size
-                            else:
-                                if not dry_run:
-                                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
-                                    chunks.append(chunk_entry)
-                                present_size += size
+                        chunks, transfer, present = transfer_chunks(
+                            upgrader,
+                            other_repository,
+                            other_manifest,
+                            other_chunks,
+                            archive,
+                            cache,
+                            args.recompress,
+                            dry_run,
+                        )
                         if not dry_run:
                             item.chunks = chunks
                             archive.stats.nfiles += 1
+                        transfer_size += transfer
+                        present_size += present
                     if not dry_run:
                         item = upgrader.upgrade_item(item=item)
                         archive.add_item(item, show_progress=args.progress)
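
A side benefit of the extraction is that the chunk-copy loop becomes testable without a full repository round-trip: transfer_chunks only calls cache.seen_chunk/add_chunk/reuse_chunk and other_repository.get on its collaborators. Below is a minimal sketch of such a test, assuming unittest.mock stand-ins for the cache, repositories, manifest, and archive; the test name and stub values are illustrative and not part of this patch.

# Illustrative sketch only: exercises the dry-run path of the extracted helper.
from unittest.mock import MagicMock

from borg.archiver.transfer_cmd import transfer_chunks


def test_transfer_chunks_dry_run_tallies_sizes():
    cache = MagicMock()
    # pretend the first chunk is already in the destination repo, the second is not
    cache.seen_chunk.side_effect = [True, False]

    chunks, transfer, present = transfer_chunks(
        upgrader=MagicMock(),
        other_repository=MagicMock(),
        other_manifest=MagicMock(),
        other_chunks=[(b"id-1", 100), (b"id-2", 200)],
        archive=MagicMock(),
        cache=cache,
        recompress="never",
        dry_run=True,  # dry run: nothing is fetched, stored, or reused
    )

    assert chunks == []     # no chunk list entries are built in a dry run
    assert present == 100   # size already present in the destination
    assert transfer == 200  # size that would have to be transferred
    cache.add_chunk.assert_not_called()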