def _rechunk_and_transfer(other_manifest, other_chunks, archive, cache, dry_run, chunker_params):
    """
    Re-chunk the content described by other_chunks and store the resulting chunks in the destination.

    Similar to ArchiveRecreater.iter_chunks: the source chunks are fetched through a DownloadPipeline,
    wrapped into a ChunkIteratorFileWrapper and fed to a new chunker built from chunker_params.

    Returns the same (chunks, transfer, present) tuple as transfer_chunks.
    """
    transfer = 0
    present = 0
    chunks = []

    pipeline = DownloadPipeline(other_manifest.repository, other_manifest.repo_objs)
    chunk_iterator = pipeline.fetch_many(other_chunks, ro_type=ROBJ_FILE_STREAM)
    file = ChunkIteratorFileWrapper(chunk_iterator)

    # Create a chunker with the specified parameters
    chunker = get_chunker(*chunker_params, seed=archive.key.chunk_seed, sparse=False)
    for chunk in chunker.chunkify(file):
        if dry_run:
            # In dry-run mode, just estimate the size; everything is accounted as "to transfer"
            # because the new chunk ids are not computed here.
            # NOTE(review): confirm that the chunk objects yielded by chunkify() really expose
            # a `size` attribute - this fallback is only reached if chunk.data is None.
            transfer += len(chunk.data) if chunk.data is not None else chunk.size
            continue

        chunk_id, data = cached_hash(chunk, archive.key.id_hash)
        size = len(data)
        if cache.seen_chunk(chunk_id, size):
            # the chunk is already in the repository
            chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
            present += size
        else:
            # add the new chunk to the repository
            chunk_entry = cache.add_chunk(
                chunk_id, {}, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
            )
            cache.repository.async_response(wait=False)
            transfer += size
        chunks.append(chunk_entry)

    return chunks, transfer, present


def _copy_chunks(upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run):
    """
    Transfer the chunks listed in other_chunks 1:1 (same chunk ids, no re-chunking).

    Returns the same (chunks, transfer, present) tuple as transfer_chunks.
    """
    transfer = 0
    present = 0
    chunks = []

    for chunk_id, size in other_chunks:
        if cache.seen_chunk(chunk_id, size):
            # target repo already has this chunk
            if not dry_run:
                chunks.append(cache.reuse_chunk(chunk_id, size, archive.stats))
            present += size
            continue

        # target repo does not yet have this chunk; it counts as "to transfer" even in dry-run
        # mode and even if it turns out to be missing in the source repository.
        transfer += size
        if dry_run:
            continue

        try:
            cdata = other_repository.get(chunk_id)
        except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
            # missing correct chunk in other_repository (source) will result in
            # a missing chunk in repository (destination).
            # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
            continue

        if recompress == "never":
            # keep compressed payload same, verify via assert_id (that will
            # decompress, but avoid needing to compress it again):
            meta, data = other_manifest.repo_objs.parse(
                chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
            )
            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
            chunk_entry = cache.add_chunk(
                chunk_id,
                meta,
                data,
                stats=archive.stats,
                wait=False,
                compress=False,
                size=size,
                ctype=meta["ctype"],
                clevel=meta["clevel"],
                ro_type=ROBJ_FILE_STREAM,
            )
        elif recompress == "always":
            # always decompress and re-compress file data chunks
            meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
            chunk_entry = cache.add_chunk(
                chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
            )
        else:
            raise ValueError(f"unsupported recompress mode: {recompress}")
        cache.repository.async_response(wait=False)
        chunks.append(chunk_entry)

    return chunks, transfer, present


def transfer_chunks(
    upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run, chunker_params=None
):
    """
    Transfer chunks from another repository to the current repository.

    If chunker_params is provided, the chunks will be re-chunked using the specified parameters.
    In that case upgrader, other_repository and recompress are not used.

    :param upgrader: provides upgrade_compressed_chunk(meta, data), used for recompress == "never"
    :param other_repository: source repository, chunks are read from it via get(chunk_id)
    :param other_manifest: source manifest (its repo_objs / repository are used to read chunks)
    :param other_chunks: iterable of (chunk_id, size) pairs describing the content to transfer
    :param archive: destination archive (its stats and key are used)
    :param cache: destination cache (seen_chunk / reuse_chunk / add_chunk)
    :param recompress: "never" or "always"
    :param dry_run: if true, only account sizes, do not write anything and return no chunk entries
    :param chunker_params: chunker parameters to re-chunk with, or None to keep the chunks as they are
    :return: (chunks, transfer, present) - the chunk entries for the destination item, the number of
             bytes that needed transfer and the number of bytes that were already present
    :raises ValueError: if a chunk has to be copied and recompress is neither "never" nor "always"
    """
    if chunker_params is not None:
        return _rechunk_and_transfer(other_manifest, other_chunks, archive, cache, dry_run, chunker_params)
    return _copy_chunks(
        upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run
    )
cache.seen_chunk(chunk_id, size) - if not chunk_present: # target repo does not yet have this chunk - if not dry_run: - try: - cdata = other_repository.get(chunk_id) - except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound): - # missing correct chunk in other_repository (source) will result in - # a missing chunk in repository (destination). - # we do NOT want to transfer all-zero replacement chunks from borg1 repos. - pass - else: - if args.recompress == "never": - # keep compressed payload same, verify via assert_id (that will - # decompress, but avoid needing to compress it again): - meta, data = other_manifest.repo_objs.parse( - chunk_id, - cdata, - decompress=True, - want_compressed=True, - ro_type=ROBJ_FILE_STREAM, - ) - meta, data = upgrader.upgrade_compressed_chunk(meta, data) - chunk_entry = cache.add_chunk( - chunk_id, - meta, - data, - stats=archive.stats, - wait=False, - compress=False, - size=size, - ctype=meta["ctype"], - clevel=meta["clevel"], - ro_type=ROBJ_FILE_STREAM, - ) - elif args.recompress == "always": - # always decompress and re-compress file data chunks - meta, data = other_manifest.repo_objs.parse( - chunk_id, cdata, ro_type=ROBJ_FILE_STREAM - ) - chunk_entry = cache.add_chunk( - chunk_id, - meta, - data, - stats=archive.stats, - wait=False, - ro_type=ROBJ_FILE_STREAM, - ) - else: - raise ValueError(f"unsupported recompress mode: {args.recompress}") - cache.repository.async_response(wait=False) - chunks.append(chunk_entry) - transfer_size += size - else: - if not dry_run: - chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats) - chunks.append(chunk_entry) - present_size += size + chunks, transfer, present = transfer_chunks( + upgrader, + other_repository, + other_manifest, + other_chunks, + archive, + cache, + args.recompress, + dry_run, + args.chunker_params, + ) if not dry_run: item.chunks = chunks archive.stats.nfiles += 1 + transfer_size += transfer + present_size += present if not dry_run: item = 
upgrader.upgrade_item(item=item) archive.add_item(item, show_progress=args.progress) @@ -213,6 +266,7 @@ class TransferMixIn: This command transfers archives from one repository to another repository. Optionally, it can also upgrade the transferred data. Optionally, it can also recompress the transferred data. + Optionally, it can also re-chunk the transferred data using different chunker parameters. It is easiest (and fastest) to give ``--compression=COMPRESSION --recompress=never`` using the same COMPRESSION mode as in the SRC_REPO - borg will use that COMPRESSION for metadata (in @@ -258,6 +312,10 @@ class TransferMixIn: borg --repo=DST_REPO transfer --other-repo=SRC_REPO --from-borg1 \\ --compress=zstd,3 --recompress=always + # to re-chunk using different chunker parameters: + borg --repo=DST_REPO transfer --other-repo=SRC_REPO \\ + --chunker-params=buzhash,19,23,21,4095 + """ ) @@ -321,5 +379,16 @@ class TransferMixIn: "If no MODE is given, `always` will be used. " 'Not passing --recompress is equivalent to "--recompress never".', ) + subparser.add_argument( + "--chunker-params", + metavar="PARAMS", + dest="chunker_params", + type=ChunkerParams, + default=None, + action=Highlander, + help="rechunk using given chunker parameters (ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, " + "HASH_MASK_BITS, HASH_WINDOW_SIZE) or `default` to use the chunker defaults. 
" + "default: do not rechunk", + ) define_archive_filters_group(subparser) diff --git a/src/borg/testsuite/archiver/transfer_cmd_test.py b/src/borg/testsuite/archiver/transfer_cmd_test.py index 9071428d8..da1055d55 100644 --- a/src/borg/testsuite/archiver/transfer_cmd_test.py +++ b/src/borg/testsuite/archiver/transfer_cmd_test.py @@ -1,5 +1,7 @@ +import hashlib import json import os +import random import re import stat import tarfile @@ -8,12 +10,13 @@ from contextlib import contextmanager import pytest from ...constants import * # NOQA +from ...helpers import open_item from ...helpers.time import parse_timestamp -from ...helpers.parseformat import parse_file_size +from ...helpers.parseformat import parse_file_size, ChunkerParams from ..platform_test import is_win32 -from . import cmd, create_test_files, RK_ENCRYPTION, open_archive, generate_archiver_tests +from . import cmd, create_regular_file, create_test_files, RK_ENCRYPTION, open_archive, generate_archiver_tests -pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA +pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote") # NOQA def test_transfer_upgrade(archivers, request, monkeypatch): @@ -285,9 +288,11 @@ def setup_repos(archiver, mp): when the context manager is exited, archiver will work with REPO2 (so the transfer can be run). 
def _regular_file_digests(archive):
    """
    Return {path: (sha256 hexdigest, chunk count)} for all items of archive that have chunks.

    At most 10 MB per file are read and hashed, which is enough for the files used in this module's
    rechunk test (the biggest one is 1280 * 4096 bytes).
    """
    digests = {}
    for item in archive.iter_items():
        if not hasattr(item, "chunks"):  # Only process regular files with chunks
            continue
        f = open_item(archive, item)
        content = f.read(10 * 1024 * 1024)  # Read up to 10 MB
        digests[item.path] = (hashlib.sha256(content).hexdigest(), len(item.chunks))
    return digests


def test_transfer_rechunk(archivers, request, monkeypatch):
    """Test transfer with re-chunking"""
    archiver = request.getfixturevalue(archivers)

    BLKSIZE = 4096
    source_chunker_params = "buzhash,19,23,21,4095"  # default buzhash chunks
    dest_chunker_params = f"fixed,{BLKSIZE}"  # fixed chunk size
    # file sizes are multiples of BLKSIZE, so the fixed chunker must produce exactly that many chunks
    expected_chunk_counts = {"input/file_1": 1, "input/file_256": 256, "input/file_1280": 1280}

    with setup_repos(archiver, monkeypatch) as other_repo1:
        contents_1 = random.randbytes(1 * BLKSIZE)
        contents_255 = random.randbytes(255 * BLKSIZE)
        contents_1024 = random.randbytes(1024 * BLKSIZE)
        create_regular_file(archiver.input_path, "file_1", contents=contents_1)
        create_regular_file(archiver.input_path, "file_256", contents=contents_255 + contents_1)
        create_regular_file(archiver.input_path, "file_1280", contents=contents_1024 + contents_255 + contents_1)

        cmd(archiver, "create", f"--chunker-params={source_chunker_params}", "archive", "input")

        # Get metadata from source archive
        source_info = json.loads(cmd(archiver, "info", "--json", "archive"))
        source_chunker_params_info = source_info["archives"][0]["chunker_params"]

        # Calculate SHA256 hashes of file contents from source archive
        source_archive_obj, source_repo = open_archive(archiver.repository_path, "archive")
        with source_repo:
            source_files = _regular_file_digests(source_archive_obj)

    # Transfer with rechunking
    cmd(archiver, "transfer", other_repo1, f"--chunker-params={dest_chunker_params}")

    # Get metadata from destination archive
    dest_info = json.loads(cmd(archiver, "info", "--json", "archive"))
    dest_chunker_params_info = dest_info["archives"][0]["chunker_params"]

    # chunker params in metadata must reflect the chunker params given on the CLI
    assert tuple(source_chunker_params_info) == ChunkerParams(source_chunker_params)
    assert tuple(dest_chunker_params_info) == ChunkerParams(dest_chunker_params)

    # Compare file hashes between source and destination archives, also check expected chunk counts.
    dest_archive_obj, dest_repo = open_archive(archiver.repository_path, "archive")
    with dest_repo:
        dest_files = _regular_file_digests(dest_archive_obj)

    # without this check, the test would pass vacuously if files were lost by the transfer
    assert set(dest_files) == set(expected_chunk_counts)
    for path, (dest_hash, chunk_count) in dest_files.items():
        # Verify expected chunk count for each file
        assert chunk_count == expected_chunk_counts[path]
        # Verify that the file hash is identical to the source
        assert path in source_files, f"File {path} not found in source archive"
        assert dest_hash == source_files[path][0], f"Content hash mismatch for {path}"
+ new_metadata["chunker_params"] = self.args.chunker_params return new_metadata class UpgraderFrom12To20: borg1_header_fmt = Struct(">I") - def __init__(self, *, cache): + def __init__(self, *, cache, args): self.cache = cache + self.args = args def new_archive(self, *, archive): self.archive = archive @@ -144,10 +149,15 @@ class UpgraderFrom12To20: for attr in ("hostname", "username", "comment", "chunker_params"): if hasattr(metadata, attr): new_metadata[attr] = getattr(metadata, attr) - if chunker_params := new_metadata.get("chunker_params"): - if len(chunker_params) == 4 and isinstance(chunker_params[0], int): - # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash: - new_metadata["chunker_params"] = (CH_BUZHASH,) + chunker_params + rechunking = self.args.chunker_params is not None + if rechunking: + # if we are rechunking while transferring, we take the new chunker_params. + new_metadata["chunker_params"] = self.args.chunker_params + else: + if chunker_params := new_metadata.get("chunker_params"): + if len(chunker_params) == 4 and isinstance(chunker_params[0], int): + # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash: + new_metadata["chunker_params"] = (CH_BUZHASH,) + chunker_params # old borg used UTC timestamps, but did not have the explicit tz offset in them. for attr in ("time", "time_end"): if hasattr(metadata, attr):