From 1b6f9289174d42ddf6fe340d7ff5a1f02557aef3 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 15 Sep 2023 22:19:29 +0200 Subject: [PATCH] ro_type: typed repo objects, see #7670 writing: put type into repoobj metadata reading: check wanted type against type we got repoobj metadata is encrypted and authenticated. repoobj data is encrypted and authenticated, also (separately). encryption and decryption of both metadata and data get the same "chunk ID" as AAD, so both are "bound" to that (same) ID. a repo-side attacker can neither see cleartext metadata/data, nor successfully tamper with it (AEAD decryption would fail). also, a repo-side attacker could not replace a repoobj A with a differently typed repoobj B without borg noticing: - the metadata/data is cryptographically bound to its ID. authentication/decryption would fail on mismatch. - the type check would fail. thus, the problem (see CVEs in changelog) solved in borg 1 by the manifest and archive TAMs is now already solved by the type check. --- src/borg/archive.py | 59 +++++++++++--------- src/borg/archiver/debug_cmd.py | 20 ++++--- src/borg/archiver/rcompress_cmd.py | 7 ++- src/borg/archiver/tar_cmds.py | 4 +- src/borg/archiver/transfer_cmd.py | 18 +++++- src/borg/cache.py | 27 +++++++-- src/borg/constants.py | 8 +++ src/borg/fuse.py | 3 +- src/borg/helpers/misc.py | 3 +- src/borg/helpers/parseformat.py | 2 +- src/borg/manifest.py | 4 +- src/borg/remote.py | 2 +- src/borg/repoobj.py | 18 +++++- src/borg/testsuite/archive.py | 3 +- src/borg/testsuite/archiver/check_cmd.py | 2 +- src/borg/testsuite/archiver/checks.py | 4 +- src/borg/testsuite/archiver/rcompress_cmd.py | 4 +- src/borg/testsuite/remote.py | 3 +- src/borg/testsuite/repoobj.py | 52 +++++++++++++---- 19 files changed, 171 insertions(+), 72 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 8d85724c8..8db9e28c9 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -296,7 +296,7 @@ class DownloadPipeline: """ hlids_preloaded = set() unpacker = msgpack.Unpacker(use_list=False) - for data in self.fetch_many(ids): + for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM): unpacker.feed(data) for _item in unpacker: item = Item(internal_dict=_item) @@ -318,9 +318,10 @@ class DownloadPipeline: self.repository.preload([c.id for c in item.chunks]) yield item - def fetch_many(self, ids, is_preloaded=False): + def fetch_many(self, ids, is_preloaded=False, ro_type=None): + assert ro_type is not None for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)): - _, data = self.repo_objs.parse(id_, cdata) + _, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type) yield data @@ -393,7 +394,9 @@ class CacheChunkBuffer(ChunkBuffer): self.stats = stats def write_chunk(self, chunk): - id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False) + id_, _ = self.cache.add_chunk( + self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM + ) logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}") self.cache.repository.async_response(wait=False) return id_ @@ -422,7 +425,7 @@ def archive_get_items(metadata, *, repo_objs, repository): assert "items" not in metadata items = [] for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)): - _, data = repo_objs.parse(id, cdata) + _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_CHUNKIDS) ids = msgpack.unpackb(data) items.extend(ids) return items @@ -440,9 +443,9 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer id = repo_objs.id_hash(data) logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}") if cache is not None and stats is not None: - cache.add_chunk(id, {}, data, stats=stats) + cache.add_chunk(id, {}, data, stats=stats, ro_type=ROBJ_ARCHIVE_CHUNKIDS) elif add_reference is not None: - cdata = repo_objs.format(id, {}, data) + cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_ARCHIVE_CHUNKIDS) add_reference(id, len(data), cdata) else: raise NotImplementedError @@ -531,7 +534,7 @@ class Archive: def _load_meta(self, id): cdata = self.repository.get(id) - _, data = self.repo_objs.parse(id, cdata) + _, data = self.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META) archive, _ = self.key.unpack_and_verify_archive(data) metadata = ArchiveItem(internal_dict=archive) if metadata.version not in (1, 2): # legacy: still need to read v1 archives @@ -702,7 +705,7 @@ Duration: {0.duration} data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive") self.id = self.repo_objs.id_hash(data) try: - self.cache.add_chunk(self.id, {}, data, stats=self.stats) + self.cache.add_chunk(self.id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META) except IntegrityError as err: err_msg = str(err) # hack to avoid changing the RPC protocol by introducing new (more specific) exception class @@ -740,7 +743,7 @@ Duration: {0.duration} for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)): pi.show(increase=1) add(id) - _, data = self.repo_objs.parse(id, chunk) + _, data = self.repo_objs.parse(id, chunk, ro_type=ROBJ_ARCHIVE_STREAM) sync.feed(data) unique_size = archive_index.stats_against(cache.chunks)[1] pi.finish() @@ -826,7 +829,9 @@ Duration: {0.duration} # it would get stuck. if "chunks" in item: item_chunks_size = 0 - for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True): + for data in self.pipeline.fetch_many( + [c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM + ): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) if stdout: @@ -878,7 +883,7 @@ Duration: {0.duration} fd = open(path, "wb") with fd: ids = [c.id for c in item.chunks] - for data in self.pipeline.fetch_many(ids, is_preloaded=True): + for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) with backup_io("write"): @@ -1027,7 +1032,7 @@ Duration: {0.duration} del metadata.items data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive") new_id = self.key.id_hash(data) - self.cache.add_chunk(new_id, {}, data, stats=self.stats) + self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META) self.manifest.archives[self.name] = (new_id, metadata.time) self.cache.chunk_decref(self.id, self.stats) self.id = new_id @@ -1076,7 +1081,7 @@ Duration: {0.duration} for i, (items_id, data) in enumerate(zip(items_ids, self.repository.get_many(items_ids))): if progress: pi.show(i) - _, data = self.repo_objs.parse(items_id, data) + _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM) unpacker.feed(data) chunk_decref(items_id, stats) try: @@ -1132,8 +1137,8 @@ Duration: {0.duration} path, item1, item2, - archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])]), - archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])]), + archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM), + archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM), can_compare_chunk_ids=can_compare_chunk_ids, ) @@ -1319,7 +1324,7 @@ class ChunksProcessor: started_hashing = time.monotonic() chunk_id, data = cached_hash(chunk, self.key.id_hash) stats.hashing_time += time.monotonic() - started_hashing - chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False) + chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM) self.cache.repository.async_response(wait=False) return chunk_entry @@ -1898,7 +1903,7 @@ class ArchiveChecker: else: try: # we must decompress, so it'll call assert_id() in there: - self.repo_objs.parse(chunk_id, encrypted_data, decompress=True) + self.repo_objs.parse(chunk_id, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE) except IntegrityErrorBase as integrity_error: self.error_found = True errors += 1 @@ -1930,7 +1935,7 @@ class ArchiveChecker: try: encrypted_data = self.repository.get(defect_chunk) # we must decompress, so it'll call assert_id() in there: - self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True) + self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE) except IntegrityErrorBase: # failed twice -> get rid of this chunk del self.chunks[defect_chunk] @@ -1978,7 +1983,7 @@ class ArchiveChecker: pi.show() cdata = self.repository.get(chunk_id) try: - _, data = self.repo_objs.parse(chunk_id, cdata) + _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE) except IntegrityErrorBase as exc: logger.error("Skipping corrupted chunk: %s", exc) self.error_found = True @@ -2043,7 +2048,7 @@ class ArchiveChecker: def add_callback(chunk): id_ = self.key.id_hash(chunk) - cdata = self.repo_objs.format(id_, {}, chunk) + cdata = self.repo_objs.format(id_, {}, chunk, ro_type=ROBJ_ARCHIVE_STREAM) add_reference(id_, len(chunk), cdata) return id_ @@ -2066,7 +2071,7 @@ class ArchiveChecker: def replacement_chunk(size): chunk = Chunk(None, allocation=CH_ALLOC, size=size) chunk_id, data = cached_hash(chunk, self.key.id_hash) - cdata = self.repo_objs.format(chunk_id, {}, data) + cdata = self.repo_objs.format(chunk_id, {}, data, ro_type=ROBJ_FILE_STREAM) return chunk_id, size, cdata offset = 0 @@ -2197,7 +2202,7 @@ class ArchiveChecker: unpacker.resync() for chunk_id, cdata in zip(items, repository.get_many(items)): try: - _, data = self.repo_objs.parse(chunk_id, cdata) + _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM) unpacker.feed(data) for item in unpacker: valid, reason = valid_item(item) @@ -2260,7 +2265,7 @@ class ArchiveChecker: mark_as_possibly_superseded(archive_id) cdata = self.repository.get(archive_id) try: - _, data = self.repo_objs.parse(archive_id, cdata) + _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META) except IntegrityError as integrity_error: logger.error("Archive metadata block %s is corrupted: %s", bin_to_hex(archive_id), integrity_error) self.error_found = True @@ -2298,7 +2303,7 @@ class ArchiveChecker: ) data = self.key.pack_and_authenticate_metadata(archive.as_dict(), context=b"archive", salt=salt) new_archive_id = self.key.id_hash(data) - cdata = self.repo_objs.format(new_archive_id, {}, data) + cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META) add_reference(new_archive_id, len(data), cdata) self.manifest.archives[info.name] = (new_archive_id, info.ts) pi.finish() @@ -2434,13 +2439,13 @@ class ArchiveRecreater: chunk_id, data = cached_hash(chunk, self.key.id_hash) if chunk_id in self.seen_chunks: return self.cache.chunk_incref(chunk_id, target.stats) - chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False) + chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM) self.cache.repository.async_response(wait=False) self.seen_chunks.add(chunk_entry.id) return chunk_entry def iter_chunks(self, archive, target, chunks): - chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks]) + chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM) if target.recreate_rechunkify: # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk # (does not load the entire file into memory) diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py index 6c0d1892f..cab8075c8 100644 --- a/src/borg/archiver/debug_cmd.py +++ b/src/borg/archiver/debug_cmd.py @@ -35,7 +35,7 @@ class DebugMixIn: repo_objs = manifest.repo_objs archive = Archive(manifest, args.name) for i, item_id in enumerate(archive.metadata.items): - _, data = repo_objs.parse(item_id, repository.get(item_id)) + _, data = repo_objs.parse(item_id, repository.get(item_id), ro_type=ROBJ_ARCHIVE_STREAM) filename = "%06d_%s.items" % (i, bin_to_hex(item_id)) print("Dumping", filename) with open(filename, "wb") as fd: @@ -65,7 +65,8 @@ class DebugMixIn: fd.write(do_indent(prepare_dump_dict(archive_meta_orig))) fd.write(",\n") - _, data = repo_objs.parse(archive_meta_orig["id"], repository.get(archive_meta_orig["id"])) + archive_id = archive_meta_orig["id"] + _, data = repo_objs.parse(archive_id, repository.get(archive_id), ro_type=ROBJ_ARCHIVE_META) archive_org_dict = msgpack.unpackb(data, object_hook=StableDict) fd.write(' "_meta":\n') @@ -77,10 +78,10 @@ class DebugMixIn: first = True items = [] for chunk_id in archive_org_dict["item_ptrs"]: - _, data = repo_objs.parse(chunk_id, repository.get(chunk_id)) + _, data = repo_objs.parse(chunk_id, repository.get(chunk_id), ro_type=ROBJ_ARCHIVE_CHUNKIDS) items.extend(msgpack.unpackb(data)) for item_id in items: - _, data = repo_objs.parse(item_id, repository.get(item_id)) + _, data = repo_objs.parse(item_id, repository.get(item_id), ro_type=ROBJ_ARCHIVE_STREAM) unpacker.feed(data) for item in unpacker: item = prepare_dump_dict(item) @@ -101,7 +102,7 @@ class DebugMixIn: def do_debug_dump_manifest(self, args, repository, manifest): """dump decoded repository manifest""" repo_objs = manifest.repo_objs - _, data = repo_objs.parse(manifest.MANIFEST_ID, repository.get(manifest.MANIFEST_ID)) + _, data = repo_objs.parse(manifest.MANIFEST_ID, repository.get(manifest.MANIFEST_ID), ro_type=ROBJ_MANIFEST) meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict)) @@ -116,7 +117,7 @@ class DebugMixIn: def decrypt_dump(i, id, cdata, tag=None, segment=None, offset=None): if cdata is not None: - _, data = repo_objs.parse(id, cdata) + _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE) else: _, data = {}, b"" tag_str = "" if tag is None else "_" + tag @@ -211,7 +212,7 @@ class DebugMixIn: break for id in ids: cdata = repository.get(id) - _, data = repo_objs.parse(id, cdata) + _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE) # try to locate wanted sequence crossing the border of last_data and data boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1] @@ -284,7 +285,7 @@ class DebugMixIn: cdata = f.read() repo_objs = manifest.repo_objs - meta, data = repo_objs.parse(id=id, cdata=cdata) + meta, data = repo_objs.parse(id=id, cdata=cdata, ro_type=ROBJ_DONTCARE) with open(args.json_path, "w") as f: json.dump(meta, f) @@ -315,7 +316,8 @@ class DebugMixIn: meta = json.load(f) repo_objs = manifest.repo_objs - data_encrypted = repo_objs.format(id=id, meta=meta, data=data) + # TODO: support misc repo object types other than ROBJ_FILE_STREAM + data_encrypted = repo_objs.format(id=id, meta=meta, data=data, ro_type=ROBJ_FILE_STREAM) with open(args.object_path, "wb") as f: f.write(data_encrypted) diff --git a/src/borg/archiver/rcompress_cmd.py b/src/borg/archiver/rcompress_cmd.py index 038fd70be..40d7571d0 100644 --- a/src/borg/archiver/rcompress_cmd.py +++ b/src/borg/archiver/rcompress_cmd.py @@ -37,7 +37,7 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel): if not chunk_ids: break for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)): - meta = repo_objs.parse_meta(id, chunk_no_data) + meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE) compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1) if compr_found != compr_wanted: recompress_ids.append(id) @@ -57,13 +57,14 @@ def process_chunks(repository, repo_objs, stats, recompress_ids, olevel): for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)): old_size = len(chunk) stats["old_size"] += old_size - meta, data = repo_objs.parse(id, chunk) + meta, data = repo_objs.parse(id, chunk, ro_type=ROBJ_DONTCARE) + ro_type = meta.pop("type", None) compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1) if olevel == -1: # if the chunk was obfuscated, but should not be in future, remove related metadata meta.pop("olevel", None) meta.pop("psize", None) - chunk = repo_objs.format(id, meta, data) + chunk = repo_objs.format(id, meta, data, ro_type=ro_type) compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1) if compr_done != compr_old: # we actually changed something diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index b18cb24d0..0504f97e2 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -115,7 +115,9 @@ class TarMixIn: """ Return a file-like object that reads from the chunks of *item*. """ - chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in item.chunks], is_preloaded=True) + chunk_iterator = archive.pipeline.fetch_many( + [chunk_id for chunk_id, _ in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM + ) if pi: info = [remove_surrogates(item.path)] return ChunkIteratorFileWrapper( diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py index 49529a6cf..f820ef63f 100644 --- a/src/borg/archiver/transfer_cmd.py +++ b/src/borg/archiver/transfer_cmd.py @@ -111,7 +111,11 @@ class TransferMixIn: # keep compressed payload same, verify via assert_id (that will # decompress, but avoid needing to compress it again): meta, data = other_manifest.repo_objs.parse( - chunk_id, cdata, decompress=True, want_compressed=True + chunk_id, + cdata, + decompress=True, + want_compressed=True, + ro_type=ROBJ_FILE_STREAM, ) meta, data = upgrader.upgrade_compressed_chunk(meta, data) chunk_entry = cache.add_chunk( @@ -124,12 +128,20 @@ class TransferMixIn: size=size, ctype=meta["ctype"], clevel=meta["clevel"], + ro_type=ROBJ_FILE_STREAM, ) elif args.recompress == "always": # always decompress and re-compress file data chunks - meta, data = other_manifest.repo_objs.parse(chunk_id, cdata) + meta, data = other_manifest.repo_objs.parse( + chunk_id, cdata, ro_type=ROBJ_FILE_STREAM + ) chunk_entry = cache.add_chunk( - chunk_id, meta, data, stats=archive.stats, wait=False + chunk_id, + meta, + data, + stats=archive.stats, + wait=False, + ro_type=ROBJ_FILE_STREAM, ) else: raise ValueError(f"unsupported recompress mode: {args.recompress}") diff --git a/src/borg/cache.py b/src/borg/cache.py index 4f0032e16..fb99309aa 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -12,7 +12,7 @@ logger = create_logger() files_cache_logger = create_logger("borg.debug.files_cache") -from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED +from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer from .helpers import Location from .helpers import Error @@ -939,7 +939,21 @@ class LocalCache(CacheStatsMixin): self.cache_config.ignored_features.update(repo_features - my_features) self.cache_config.mandatory_features.update(repo_features & my_features) - def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ctype=None, clevel=None): + def add_chunk( + self, + id, + meta, + data, + *, + stats, + wait=True, + compress=True, + size=None, + ctype=None, + clevel=None, + ro_type=ROBJ_FILE_STREAM, + ): + assert ro_type is not None if not self.txn_active: self.begin_txn() if size is None and compress: @@ -949,7 +963,9 @@ class LocalCache(CacheStatsMixin): return self.chunk_incref(id, stats) if size is None: raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also") - cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel) + cdata = self.repo_objs.format( + id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type + ) self.repository.put(id, cdata, wait=wait) self.chunks.add(id, 1, size) stats.update(size, not refcount) @@ -1113,7 +1129,8 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" def memorize_file(self, hashed_path, path_hash, st, ids): pass - def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None): + def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM): + assert ro_type is not None if not self._txn_active: self.begin_txn() if size is None and compress: @@ -1123,7 +1140,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown""" refcount = self.seen_chunk(id, size) if refcount: return self.chunk_incref(id, stats, size=size) - cdata = self.repo_objs.format(id, meta, data, compress=compress) + cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type) self.repository.put(id, cdata, wait=wait) self.chunks.add(id, 1, size) stats.update(size, not refcount) diff --git a/src/borg/constants.py b/src/borg/constants.py index 54528b61c..7f4cbc31d 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -33,6 +33,14 @@ UMASK_DEFAULT = 0o077 # forcing to 0o100XXX later STDIN_MODE_DEFAULT = 0o660 +# RepoObj types +ROBJ_MANIFEST = "M" # Manifest (directory of archives, other metadata) object +ROBJ_ARCHIVE_META = "A" # main archive metadata object +ROBJ_ARCHIVE_CHUNKIDS = "C" # objects with a list of archive metadata stream chunkids +ROBJ_ARCHIVE_STREAM = "S" # archive metadata stream chunk (containing items) +ROBJ_FILE_STREAM = "F" # file content stream chunk (containing user data) +ROBJ_DONTCARE = "*" # used to parse without type assertion (= accept any type) + # in borg < 1.3, this has been defined like this: # 20 MiB minus 41 bytes for a PUT header (because the "size" field in the Repository includes # the header, and the total size was set to precisely 20 MiB for borg < 1.3). diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 376020d90..92f145874 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -10,6 +10,7 @@ import time from collections import defaultdict from signal import SIGINT +from .constants import ROBJ_FILE_STREAM from .fuse_impl import llfuse, has_pyfuse3 @@ -688,7 +689,7 @@ class FuseOperations(llfuse.Operations, FuseBackend): # evict fully read chunk from cache del self.data_cache[id] else: - _, data = self.repo_objs.parse(id, self.repository_uncached.get(id)) + _, data = self.repo_objs.parse(id, self.repository_uncached.get(id), ro_type=ROBJ_FILE_STREAM) if offset + n < len(data): # chunk was only partially read, cache it self.data_cache[id] = data diff --git a/src/borg/helpers/misc.py b/src/borg/helpers/misc.py index d844b7634..1f687a0bb 100644 --- a/src/borg/helpers/misc.py +++ b/src/borg/helpers/misc.py @@ -13,6 +13,7 @@ logger = create_logger() from . import msgpack from .. import __version__ as borg_version +from ..constants import ROBJ_FILE_STREAM def sysinfo(): @@ -123,7 +124,7 @@ class ChunkIteratorFileWrapper: def open_item(archive, item): """Return file-like object for archived item (with chunks).""" - chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks]) + chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM) return ChunkIteratorFileWrapper(chunk_iterator) diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index b4a455a04..7f8456044 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -933,7 +933,7 @@ class ItemFormatter(BaseFormatter): hash = self.xxh64() elif hash_function in self.hash_algorithms: hash = hashlib.new(hash_function) - for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks]): + for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM): hash.update(data) return hash.hexdigest() diff --git a/src/borg/manifest.py b/src/borg/manifest.py index a9609884e..047a2a4fb 100644 --- a/src/borg/manifest.py +++ b/src/borg/manifest.py @@ -250,7 +250,7 @@ class Manifest: if not key: key = key_factory(repository, cdata, ro_cls=ro_cls) manifest = cls(key, repository, ro_cls=ro_cls) - _, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata) + _, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata, ro_type=ROBJ_MANIFEST) manifest_dict = key.unpack_and_verify_manifest(data) m = ManifestItem(internal_dict=manifest_dict) manifest.id = manifest.repo_objs.id_hash(data) @@ -315,4 +315,4 @@ class Manifest: ) data = self.key.pack_and_authenticate_metadata(manifest.as_dict()) self.id = self.repo_objs.id_hash(data) - self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data)) + self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data, ro_type=ROBJ_MANIFEST)) diff --git a/src/borg/remote.py b/src/borg/remote.py index 40b8b62ff..4a1551f05 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1204,7 +1204,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None return csize, decrypted def transform(id_, data): - meta, decrypted = repo_objs.parse(id_, data) + meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) csize = meta.get("csize", len(data)) return csize, decrypted diff --git a/src/borg/repoobj.py b/src/borg/repoobj.py index 557d4a604..8514db4d1 100644 --- a/src/borg/repoobj.py +++ b/src/borg/repoobj.py @@ -1,5 +1,6 @@ from struct import Struct +from .constants import * # NOQA from .helpers import msgpack, workarounds from .compress import Compressor, LZ4_COMPRESSOR, get_compressor @@ -35,7 +36,11 @@ class RepoObj: size: int = None, ctype: int = None, clevel: int = None, + ro_type: str = None, ) -> bytes: + assert isinstance(ro_type, str) + assert ro_type != ROBJ_DONTCARE + meta["type"] = ro_type assert isinstance(id, bytes) assert isinstance(meta, dict) assert isinstance(data, (bytes, memoryview)) @@ -58,11 +63,12 @@ class RepoObj: hdr = self.meta_len_hdr.pack(len(meta_encrypted)) return hdr + meta_encrypted + data_encrypted - def parse_meta(self, id: bytes, cdata: bytes) -> dict: + def parse_meta(self, id: bytes, cdata: bytes, ro_type: str) -> dict: # when calling parse_meta, enough cdata needs to be supplied to contain completely the # meta_len_hdr and the encrypted, packed metadata. it is allowed to provide more cdata. assert isinstance(id, bytes) assert isinstance(cdata, bytes) + assert isinstance(ro_type, str) obj = memoryview(cdata) offs = self.meta_len_hdr.size hdr = obj[:offs] @@ -71,10 +77,11 @@ class RepoObj: meta_encrypted = obj[offs : offs + len_meta_encrypted] meta_packed = self.key.decrypt(id, meta_encrypted) meta = msgpack.unpackb(meta_packed) + assert ro_type == ROBJ_DONTCARE or meta["type"] == ro_type return meta def parse( - self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False + self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None ) -> tuple[dict, bytes]: """ Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext @@ -86,6 +93,7 @@ class RepoObj: - decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse). - decompress=False, want_compressed=False: invalid """ + assert isinstance(ro_type, str) assert not (not decompress and not want_compressed), "invalid parameter combination!" assert isinstance(id, bytes) assert isinstance(cdata, bytes) @@ -98,6 +106,7 @@ class RepoObj: offs += len_meta_encrypted meta_packed = self.key.decrypt(id, meta_encrypted) meta_compressed = msgpack.unpackb(meta_packed) # means: before adding more metadata in decompress block + assert ro_type == ROBJ_DONTCARE or meta_compressed["type"] == ro_type data_encrypted = obj[offs:] data_compressed = self.key.decrypt(id, data_encrypted) # does not include the type/level bytes if decompress: @@ -142,10 +151,12 @@ class RepoObj1: # legacy size: int = None, ctype: int = None, clevel: int = None, + ro_type: str = None, ) -> bytes: assert isinstance(id, bytes) assert meta == {} assert isinstance(data, (bytes, memoryview)) + assert ro_type is not None assert compress or size is not None and ctype is not None and clevel is not None if compress: assert size is None or size == len(data) @@ -160,11 +171,12 @@ class RepoObj1: # legacy raise NotImplementedError("parse_meta is not available for RepoObj1") def parse( - self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False + self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None ) -> tuple[dict, bytes]: assert not (not decompress and not want_compressed), "invalid parameter combination!" assert isinstance(id, bytes) assert isinstance(cdata, bytes) + assert ro_type is not None data_compressed = self.key.decrypt(id, cdata) compressor_cls, compression_level = Compressor.detect(data_compressed[:2]) compressor = compressor_cls(level=compression_level, legacy_mode=True) diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index cefd6293b..361a82892 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -127,7 +127,8 @@ class MockCache: self.objects = {} self.repository = self.MockRepo() - def add_chunk(self, id, meta, data, stats=None, wait=True): + def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None): + assert ro_type is not None self.objects[id] = data return id, len(data) diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py index 5ddce3437..01c313d56 100644 --- a/src/borg/testsuite/archiver/check_cmd.py +++ b/src/borg/testsuite/archiver/check_cmd.py @@ -243,7 +243,7 @@ def test_manifest_rebuild_duplicate_archive(archivers, request): } archive = repo_objs.key.pack_and_authenticate_metadata(archive_dict, context=b"archive") archive_id = repo_objs.id_hash(archive) - repository.put(archive_id, repo_objs.format(archive_id, {}, archive)) + repository.put(archive_id, repo_objs.format(archive_id, {}, archive, ro_type=ROBJ_ARCHIVE_META)) repository.commit(compact=False) cmd(archiver, "check", exit_code=1) cmd(archiver, "check", "--repair", exit_code=0) diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py index f6311c8e3..c3ed2720a 100644 --- a/src/borg/testsuite/archiver/checks.py +++ b/src/borg/testsuite/archiver/checks.py @@ -337,6 +337,7 @@ def spoof_manifest(repository): "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"), } ), + ro_type=ROBJ_MANIFEST, ) repository.put(Manifest.MANIFEST_ID, cdata) repository.commit(compact=False) @@ -357,6 +358,7 @@ def test_fresh_init_tam_required(archiver): "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"), } ), + ro_type=ROBJ_MANIFEST, ) repository.put(Manifest.MANIFEST_ID, cdata) repository.commit(compact=False) @@ -397,7 +399,7 @@ def write_archive_without_tam(repository, archive_name): } ) archive_id = manifest.repo_objs.id_hash(archive_data) - cdata = manifest.repo_objs.format(archive_id, {}, archive_data) + cdata = manifest.repo_objs.format(archive_id, {}, archive_data, ro_type=ROBJ_ARCHIVE_META) repository.put(archive_id, cdata) manifest.archives[archive_name] = (archive_id, datetime.now()) manifest.write() diff --git a/src/borg/testsuite/archiver/rcompress_cmd.py b/src/borg/testsuite/archiver/rcompress_cmd.py index 3fa8a34bf..dbae44837 100644 --- a/src/borg/testsuite/archiver/rcompress_cmd.py +++ b/src/borg/testsuite/archiver/rcompress_cmd.py @@ -22,7 +22,9 @@ def test_rcompress(archiver): break for id in ids: chunk = repository.get(id, read_data=True) - meta, data = manifest.repo_objs.parse(id, chunk) # will also decompress according to metadata + meta, data = manifest.repo_objs.parse( + id, chunk, ro_type=ROBJ_DONTCARE + ) # will also decompress according to metadata m_olevel = meta.get("olevel", -1) m_psize = meta.get("psize", -1) print( diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index b5cb2e642..a2d84bcee 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -6,6 +6,7 @@ from unittest.mock import patch import pytest +from ..constants import ROBJ_FILE_STREAM from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey @@ -205,7 +206,7 @@ class TestRepositoryCache: def _put_encrypted_object(self, repo_objs, repository, data): id_ = repo_objs.id_hash(data) - repository.put(id_, repo_objs.format(id_, {}, data)) + repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM)) return id_ @pytest.fixture diff --git a/src/borg/testsuite/repoobj.py b/src/borg/testsuite/repoobj.py index 5d11ad931..7f923f57a 100644 --- a/src/borg/testsuite/repoobj.py +++ b/src/borg/testsuite/repoobj.py @@ -1,5 +1,6 @@ import pytest +from ..constants import ROBJ_FILE_STREAM, ROBJ_MANIFEST, ROBJ_ARCHIVE_META from ..crypto.key import PlaintextKey from ..repository import Repository from ..repoobj import RepoObj, RepoObj1 @@ -21,14 +22,14 @@ def test_format_parse_roundtrip(key): data = b"foobar" * 10 id = repo_objs.id_hash(data) meta = {"custom": "something"} # size and csize are computed automatically - cdata = repo_objs.format(id, meta, data) + cdata = repo_objs.format(id, meta, data, ro_type=ROBJ_FILE_STREAM) - got_meta = repo_objs.parse_meta(id, cdata) + got_meta = repo_objs.parse_meta(id, cdata, ro_type=ROBJ_FILE_STREAM) assert got_meta["size"] == len(data) assert got_meta["csize"] < len(data) assert got_meta["custom"] == "something" - got_meta, got_data = repo_objs.parse(id, cdata) + got_meta, got_data = repo_objs.parse(id, cdata, ro_type=ROBJ_FILE_STREAM) assert got_meta["size"] == len(data) assert got_meta["csize"] < len(data) assert got_meta["custom"] == "something" @@ -44,11 +45,11 @@ def test_format_parse_roundtrip_borg1(key): # legacy data = b"foobar" * 10 id = repo_objs.id_hash(data) meta = {} # borg1 does not support this kind of metadata - cdata = repo_objs.format(id, meta, data) + cdata = repo_objs.format(id, meta, data, ro_type=ROBJ_FILE_STREAM) # borg1 does not support separate metadata and borg2 does not invoke parse_meta for borg1 repos - got_meta, got_data = repo_objs.parse(id, cdata) + got_meta, got_data = repo_objs.parse(id, cdata, ro_type=ROBJ_FILE_STREAM) assert got_meta["size"] == len(data) assert got_meta["csize"] < len(data) assert data == got_data @@ -67,9 +68,9 @@ def test_borg1_borg2_transition(key): len_data = len(data) repo_objs1 = RepoObj1(key) id = repo_objs1.id_hash(data) - borg1_cdata = repo_objs1.format(id, meta, data) + borg1_cdata = repo_objs1.format(id, meta, data, ro_type=ROBJ_FILE_STREAM) meta1, compr_data1 = repo_objs1.parse( - id, borg1_cdata, decompress=True, want_compressed=True + id, borg1_cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM ) # avoid re-compression # in borg 1, we can only get this metadata after decrypting the whole chunk (and we do not have "size" here): assert meta1["ctype"] == LZ4.ID # default compression @@ -80,18 +81,49 @@ def test_borg1_borg2_transition(key): # note: as we did not decompress, we do not have "size" and we need to get it from somewhere else. # here, we just use len_data. for borg transfer, we also know the size from another metadata source. borg2_cdata = repo_objs2.format( - id, dict(meta1), compr_data1[2:], compress=False, size=len_data, ctype=meta1["ctype"], clevel=meta1["clevel"] + id, + dict(meta1), + compr_data1[2:], + compress=False, + size=len_data, + ctype=meta1["ctype"], + clevel=meta1["clevel"], + ro_type=ROBJ_FILE_STREAM, ) - meta2, data2 = repo_objs2.parse(id, borg2_cdata) + meta2, data2 = repo_objs2.parse(id, borg2_cdata, ro_type=ROBJ_FILE_STREAM) assert data2 == data assert meta2["ctype"] == LZ4.ID assert meta2["clevel"] == 0xFF assert meta2["csize"] == meta1["csize"] - 2 # borg2 does not store the type/level bytes there assert meta2["size"] == len_data - meta2 = repo_objs2.parse_meta(id, borg2_cdata) + meta2 = repo_objs2.parse_meta(id, borg2_cdata, ro_type=ROBJ_FILE_STREAM) # now, in borg 2, we have nice and separately decrypted metadata (no need to decrypt the whole chunk): assert meta2["ctype"] == LZ4.ID assert meta2["clevel"] == 0xFF assert meta2["csize"] == meta1["csize"] - 2 # borg2 does not store the type/level bytes there assert meta2["size"] == len_data + + +def test_spoof_manifest(key): + repo_objs = RepoObj(key) + data = b"fake or malicious manifest data" # file content could be provided by attacker. + id = repo_objs.id_hash(data) + # create a repo object containing user data (file content data). + cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM) + # let's assume an attacker somehow managed to replace the manifest with that repo object. + # as borg always give the ro_type it wants to read, this should fail: + with pytest.raises(AssertionError): + repo_objs.parse(id, cdata, ro_type=ROBJ_MANIFEST) + + +def test_spoof_archive(key): + repo_objs = RepoObj(key) + data = b"fake or malicious archive data" # file content could be provided by attacker. + id = repo_objs.id_hash(data) + # create a repo object containing user data (file content data). + cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM) + # let's assume an attacker somehow managed to replace an archive with that repo object. + # as borg always give the ro_type it wants to read, this should fail: + with pytest.raises(AssertionError): + repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)