diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py
index c3fa8c9b7..dcde6b883 100644
--- a/src/borg/archiver/transfer_cmd.py
+++ b/src/borg/archiver/transfer_cmd.py
@@ -106,8 +106,11 @@ class TransferMixIn:
                             if refcount == 0:  # target repo does not yet have this chunk
                                 if not dry_run:
                                     cdata = other_repository.get(chunk_id)
-                                    # keep compressed payload same, avoid decompression / recompression
-                                    meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, decompress=False)
+                                    # keep compressed payload same, verify via assert_id (that will
+                                    # decompress, but avoid needing to compress it again):
+                                    meta, data = other_manifest.repo_objs.parse(
+                                        chunk_id, cdata, decompress=True, want_compressed=True
+                                    )
                                     meta, data = upgrader.upgrade_compressed_chunk(meta, data)
                                     chunk_entry = cache.add_chunk(
                                         chunk_id,
diff --git a/src/borg/repoobj.py b/src/borg/repoobj.py
index 0abb26988..edfa070ae 100644
--- a/src/borg/repoobj.py
+++ b/src/borg/repoobj.py
@@ -70,7 +70,20 @@ class RepoObj:
         meta = msgpack.unpackb(meta_packed)
         return meta

-    def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
+    def parse(
+        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
+    ) -> tuple[dict, bytes]:
+        """
+        Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext
+        corresponds to the chunk id via assert_id()).
+
+        Tweaking options (default is usually fine):
+        - decompress=True, want_compressed=False: slow, verifying. returns decompressed data (default).
+        - decompress=True, want_compressed=True: slow, verifying. returns compressed data (caller wants to reuse it).
+        - decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse).
+        - decompress=False, want_compressed=False: invalid
+        """
+        assert not (not decompress and not want_compressed), "invalid parameter combination!"
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
         obj = memoryview(cdata)
@@ -81,24 +94,26 @@ class RepoObj:
         meta_encrypted = obj[offs : offs + len_meta_encrypted]
         offs += len_meta_encrypted
         meta_packed = self.key.decrypt(id, meta_encrypted)
-        meta = msgpack.unpackb(meta_packed)
+        meta_compressed = msgpack.unpackb(meta_packed)  # means: before adding more metadata in decompress block
         data_encrypted = obj[offs:]
-        data_compressed = self.key.decrypt(id, data_encrypted)
+        data_compressed = self.key.decrypt(id, data_encrypted)  # does not include the type/level bytes
         if decompress:
-            ctype = meta["ctype"]
-            clevel = meta["clevel"]
-            csize = meta["csize"]  # always the overall size
+            ctype = meta_compressed["ctype"]
+            clevel = meta_compressed["clevel"]
+            csize = meta_compressed["csize"]  # always the overall size
             assert csize == len(data_compressed)
-            psize = meta.get("psize", csize)  # obfuscation: psize (payload size) is potentially less than csize.
+            psize = meta_compressed.get(
+                "psize", csize
+            )  # obfuscation: psize (payload size) is potentially less than csize.
             assert psize <= csize
             compr_hdr = bytes((ctype, clevel))
             compressor_cls, compression_level = Compressor.detect(compr_hdr)
             compressor = compressor_cls(level=compression_level)
-            meta, data = compressor.decompress(meta, data_compressed[:psize])
+            meta, data = compressor.decompress(dict(meta_compressed), data_compressed[:psize])
             self.key.assert_id(id, data)
         else:
-            data = data_compressed  # does not include the type/level bytes
-        return meta, data
+            meta, data = None, None
+        return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data


 class RepoObj1:  # legacy
@@ -140,19 +155,22 @@ class RepoObj1:  # legacy
     def parse_meta(self, id: bytes, cdata: bytes) -> dict:
         raise NotImplementedError("parse_meta is not available for RepoObj1")

-    def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
+    def parse(
+        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
+    ) -> tuple[dict, bytes]:
+        assert not (not decompress and not want_compressed), "invalid parameter combination!"
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
         data_compressed = self.key.decrypt(id, cdata)
         compressor_cls, compression_level = Compressor.detect(data_compressed[:2])
         compressor = compressor_cls(level=compression_level, legacy_mode=True)
+        meta_compressed = {}
+        meta_compressed["ctype"] = compressor.ID
+        meta_compressed["clevel"] = compressor.level
+        meta_compressed["csize"] = len(data_compressed)
         if decompress:
             meta, data = compressor.decompress(None, data_compressed)
             self.key.assert_id(id, data)
         else:
-            meta = {}
-            meta["ctype"] = compressor.ID
-            meta["clevel"] = compressor.level
-            meta["csize"] = len(data_compressed)
-            data = data_compressed
-        return meta, data
+            meta, data = None, None
+        return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data
diff --git a/src/borg/testsuite/repoobj.py b/src/borg/testsuite/repoobj.py
index 1f3d17f2e..5d11ad931 100644
--- a/src/borg/testsuite/repoobj.py
+++ b/src/borg/testsuite/repoobj.py
@@ -68,7 +68,9 @@ def test_borg1_borg2_transition(key):
     repo_objs1 = RepoObj1(key)
     id = repo_objs1.id_hash(data)
     borg1_cdata = repo_objs1.format(id, meta, data)
-    meta1, compr_data1 = repo_objs1.parse(id, borg1_cdata, decompress=False)  # borg transfer avoids (de)compression
+    meta1, compr_data1 = repo_objs1.parse(
+        id, borg1_cdata, decompress=True, want_compressed=True
+    )  # avoid re-compression
     # in borg 1, we can only get this metadata after decrypting the whole chunk (and we do not have "size" here):
     assert meta1["ctype"] == LZ4.ID  # default compression
     assert meta1["clevel"] == 0xFF  # lz4 does not know levels (yet?)
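
For reference, the decompress/want_compressed matrix from the new docstring can be summarized in a small self-contained sketch. This is not borg code: zlib stands in for borg's Compressor machinery, sha256 for the id hash, and a plain assert for key.assert_id(); only the branching logic mirrors RepoObj.parse() above, and all names are illustrative.

```python
# Toy mirror of RepoObj.parse()'s decompress/want_compressed decision matrix.
# zlib replaces borg's Compressor, sha256 replaces the chunk id hash,
# and the digest comparison replaces key.assert_id() (hypothetical stand-ins).
import hashlib
import zlib


def parse(chunk_id: bytes, data_compressed: bytes, decompress: bool = True,
          want_compressed: bool = False) -> tuple[dict | None, bytes | None]:
    assert not (not decompress and not want_compressed), "invalid parameter combination!"
    meta_compressed = {"csize": len(data_compressed)}
    if decompress:
        data = zlib.decompress(data_compressed)            # slow path: decompress ...
        assert hashlib.sha256(data).digest() == chunk_id   # ... so the plaintext can be verified
        meta = dict(meta_compressed, size=len(data))
    else:
        meta, data = None, None                            # quick path: no verification possible
    return (meta_compressed if want_compressed else meta,
            data_compressed if want_compressed else data)


payload = b"some chunk payload" * 100
cdata = zlib.compress(payload)
cid = hashlib.sha256(payload).digest()

meta, data = parse(cid, cdata)                                              # default: verified plaintext
cmeta, creuse = parse(cid, cdata, decompress=True, want_compressed=True)    # verified, compressed out
qmeta, qreuse = parse(cid, cdata, decompress=False, want_compressed=True)   # quick, unverified
assert data == payload and creuse == qreuse == cdata
```

The transfer path now takes the second row of the matrix: it pays for one decompression so assert_id() can verify the plaintext, yet still hands the untouched compressed payload to upgrade_compressed_chunk(), avoiding any re-compression.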