mirror of
https://github.com/borgbackup/borg.git
synced 2026-05-28 04:03:21 -04:00
Merge pull request #7789 from ThomasWaldmann/archive-tam-verify-master
Archive tam verify security fix (master)
This commit is contained in:
commit
3eb070191d
9 changed files with 311 additions and 40 deletions
|
|
@ -5,6 +5,74 @@ Important notes 1.x
|
|||
|
||||
This section provides information about security and corruption issues.
|
||||
|
||||
.. _archives_tam_vuln:
|
||||
|
||||
Pre-1.2.5 archives spoofing vulnerability (CVE-2023-36811)
|
||||
----------------------------------------------------------
|
||||
|
||||
A flaw in the cryptographic authentication scheme in Borg allowed an attacker to
|
||||
fake archives and potentially indirectly cause backup data loss in the repository.
|
||||
|
||||
The attack requires an attacker to be able to
|
||||
|
||||
1. insert files (with no additional headers) into backups
|
||||
2. gain write access to the repository
|
||||
|
||||
This vulnerability does not disclose plaintext to the attacker, nor does it
|
||||
affect the authenticity of existing archives.
|
||||
|
||||
Creating plausible fake archives may be feasible for empty or small archives,
|
||||
but is unlikely for large archives.
|
||||
|
||||
The fix enforces checking the TAM authentication tag of archives at critical
|
||||
places. Borg now considers archives without TAM as garbage or an attack.
|
||||
|
||||
We are not aware of others having discovered, disclosed or exploited this vulnerability.
|
||||
|
||||
Below, if we speak of borg 1.2.5, we mean a borg version >= 1.2.5 **or** a
|
||||
borg version that has the relevant security patches for this vulnerability applied
|
||||
(could be also an older version in that case).
|
||||
|
||||
Steps you must take to upgrade a repository:
|
||||
|
||||
1. Upgrade all clients using this repository to borg 1.2.5.
|
||||
Note: it is not required to upgrade a server, except if the server-side borg
|
||||
is also used as a client (and not just for "borg serve").
|
||||
|
||||
Do **not** run ``borg check`` with borg 1.2.5 before completing the upgrade steps.
|
||||
|
||||
2. Run ``borg info --debug <repository> 2>&1 | grep TAM | grep -i manifest``.
|
||||
|
||||
a) If you get "TAM-verified manifest", continue with 3.
|
||||
b) If you get "Manifest TAM not found and not required", run
|
||||
``borg upgrade --tam --force <repository>`` *on every client*.
|
||||
|
||||
3. Run ``borg list --format='{name} {time} tam:{tam}{NL}' <repository>``.
|
||||
"tam:verified" means that the archive has a valid TAM authentication.
|
||||
"tam:none" is expected as output for archives created by borg <1.0.9.
|
||||
"tam:none" could also come from archives created by an attacker.
|
||||
You should verify that "tam:none" archives are authentic and not malicious
|
||||
(== have good content, have correct timestamp, can be extracted successfully).
|
||||
In case you find crappy/malicious archives, you must delete them before proceeding.
|
||||
In low-risk, trusted environments, you may decide on your own risk to skip step 3
|
||||
and just trust in everything being OK.
|
||||
|
||||
4. If there are no tam:non archives left at this point, you can skip this step.
|
||||
Run ``borg upgrade --archives-tam <repository>``.
|
||||
This will make sure all archives are TAM authenticated (an archive TAM will be added
|
||||
for all archives still missing one).
|
||||
``borg check`` would consider TAM-less archives as garbage or a potential attack.
|
||||
Optionally run the same command as in step 3 to see that all archives now are "tam:verified".
|
||||
|
||||
|
||||
Vulnerability time line:
|
||||
|
||||
* 2023-06-13: Vulnerability discovered during code review by Thomas Waldmann
|
||||
* 2023-06-13...: Work on fixing the issue, upgrade procedure, docs.
|
||||
* 2023-06-30: CVE was assigned via Github CNA
|
||||
* 2023-06-30 .. 2023-08-29: Fixed issue, code review, docs, testing.
|
||||
* 2023-08-30: Released fixed version 1.2.5
|
||||
|
||||
.. _hashindex_set_bug:
|
||||
|
||||
Pre-1.1.11 potential index corruption / data loss issue
|
||||
|
|
|
|||
|
|
@ -493,6 +493,7 @@ class Archive:
|
|||
self.name = name # overwritten later with name from archive metadata
|
||||
self.name_in_manifest = name # can differ from .name later (if borg check fixed duplicate archive names)
|
||||
self.comment = None
|
||||
self.tam_verified = False
|
||||
self.numeric_ids = numeric_ids
|
||||
self.noatime = noatime
|
||||
self.noctime = noctime
|
||||
|
|
@ -532,7 +533,9 @@ class Archive:
|
|||
def _load_meta(self, id):
|
||||
cdata = self.repository.get(id)
|
||||
_, data = self.repo_objs.parse(id, cdata)
|
||||
metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
|
||||
# we do not require TAM for archives, otherwise we can not even borg list a repo with old archives.
|
||||
archive, self.tam_verified, _ = self.key.unpack_and_verify_archive(data, force_tam_not_required=True)
|
||||
metadata = ArchiveItem(internal_dict=archive)
|
||||
if metadata.version not in (1, 2): # legacy: still need to read v1 archives
|
||||
raise Exception("Unknown archive metadata version")
|
||||
# note: metadata.items must not get written to disk!
|
||||
|
|
@ -1024,7 +1027,7 @@ Duration: {0.duration}
|
|||
setattr(metadata, key, value)
|
||||
if "items" in metadata:
|
||||
del metadata.items
|
||||
data = msgpack.packb(metadata.as_dict())
|
||||
data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
|
||||
new_id = self.key.id_hash(data)
|
||||
self.cache.add_chunk(new_id, {}, data, stats=self.stats)
|
||||
self.manifest.archives[self.name] = (new_id, metadata.time)
|
||||
|
|
@ -1992,6 +1995,19 @@ class ArchiveChecker:
|
|||
except msgpack.UnpackException:
|
||||
continue
|
||||
if valid_archive(archive):
|
||||
# **after** doing the low-level checks and having a strong indication that we
|
||||
# are likely looking at an archive item here, also check the TAM authentication:
|
||||
try:
|
||||
archive, verified, _ = self.key.unpack_and_verify_archive(data, force_tam_not_required=False)
|
||||
except IntegrityError:
|
||||
# TAM issues - do not accept this archive!
|
||||
# either somebody is trying to attack us with a fake archive data or
|
||||
# we have an ancient archive made before TAM was a thing (borg < 1.0.9) **and** this repo
|
||||
# was not correctly upgraded to borg 1.2.5 (see advisory at top of the changelog).
|
||||
# borg can't tell the difference, so it has to assume this archive might be an attack
|
||||
# and drops this archive.
|
||||
continue
|
||||
# note: if we get here and verified is False, a TAM is not required.
|
||||
archive = ArchiveItem(internal_dict=archive)
|
||||
name = archive.name
|
||||
logger.info("Found archive %s", name)
|
||||
|
|
@ -2248,7 +2264,17 @@ class ArchiveChecker:
|
|||
self.error_found = True
|
||||
del self.manifest.archives[info.name]
|
||||
continue
|
||||
archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
|
||||
try:
|
||||
archive, verified, salt = self.key.unpack_and_verify_archive(data, force_tam_not_required=False)
|
||||
except IntegrityError as integrity_error:
|
||||
# looks like there is a TAM issue with this archive, this might be an attack!
|
||||
# when upgrading to borg 1.2.5, users are expected to TAM-authenticate all archives they
|
||||
# trust, so there shouldn't be any without TAM.
|
||||
logger.error("Archive TAM authentication issue for archive %s: %s", info.name, integrity_error)
|
||||
self.error_found = True
|
||||
del self.manifest.archives[info.name]
|
||||
continue
|
||||
archive = ArchiveItem(internal_dict=archive)
|
||||
if archive.version != 2:
|
||||
raise Exception("Unknown archive metadata version")
|
||||
items_buffer = ChunkBuffer(self.key)
|
||||
|
|
@ -2267,7 +2293,7 @@ class ArchiveChecker:
|
|||
archive.item_ptrs = archive_put_items(
|
||||
items_buffer.chunks, repo_objs=self.repo_objs, add_reference=add_reference
|
||||
)
|
||||
data = msgpack.packb(archive.as_dict())
|
||||
data = self.key.pack_and_authenticate_metadata(archive.as_dict(), context=b"archive", salt=salt)
|
||||
new_archive_id = self.key.id_hash(data)
|
||||
cdata = self.repo_objs.format(new_archive_id, {}, data)
|
||||
add_reference(new_archive_id, len(data), cdata)
|
||||
|
|
|
|||
|
|
@ -755,7 +755,8 @@ class LocalCache(CacheStatsMixin):
|
|||
nonlocal processed_item_metadata_chunks
|
||||
csize, data = decrypted_repository.get(archive_id)
|
||||
chunk_idx.add(archive_id, 1, len(data))
|
||||
archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
|
||||
archive, verified, _ = self.key.unpack_and_verify_archive(data, force_tam_not_required=True)
|
||||
archive = ArchiveItem(internal_dict=archive)
|
||||
if archive.version not in (1, 2): # legacy
|
||||
raise Exception("Unknown archive metadata version")
|
||||
if archive.version == 1:
|
||||
|
|
|
|||
|
|
@ -72,6 +72,15 @@ class TAMRequiredError(IntegrityError):
|
|||
traceback = False
|
||||
|
||||
|
||||
class ArchiveTAMRequiredError(TAMRequiredError):
|
||||
__doc__ = textwrap.dedent(
|
||||
"""
|
||||
Archive '{}' is unauthenticated, but it is required for this repository.
|
||||
"""
|
||||
).strip()
|
||||
traceback = False
|
||||
|
||||
|
||||
class TAMInvalid(IntegrityError):
|
||||
__doc__ = IntegrityError.__doc__
|
||||
traceback = False
|
||||
|
|
@ -81,6 +90,15 @@ class TAMInvalid(IntegrityError):
|
|||
super().__init__("Manifest authentication did not verify")
|
||||
|
||||
|
||||
class ArchiveTAMInvalid(IntegrityError):
|
||||
__doc__ = IntegrityError.__doc__
|
||||
traceback = False
|
||||
|
||||
def __init__(self):
|
||||
# Error message becomes: "Data integrity error: Archive authentication did not verify"
|
||||
super().__init__("Archive authentication did not verify")
|
||||
|
||||
|
||||
class TAMUnsupportedSuiteError(IntegrityError):
|
||||
"""Could not verify manifest: Unsupported suite {!r}; a newer version is needed."""
|
||||
|
||||
|
|
@ -225,11 +243,13 @@ class KeyBase:
|
|||
output_length=64,
|
||||
)
|
||||
|
||||
def pack_and_authenticate_metadata(self, metadata_dict, context=b"manifest"):
|
||||
def pack_and_authenticate_metadata(self, metadata_dict, context=b"manifest", salt=None):
|
||||
if salt is None:
|
||||
salt = os.urandom(64)
|
||||
metadata_dict = StableDict(metadata_dict)
|
||||
tam = metadata_dict["tam"] = StableDict({"type": "HKDF_HMAC_SHA512", "hmac": bytes(64), "salt": os.urandom(64)})
|
||||
tam = metadata_dict["tam"] = StableDict({"type": "HKDF_HMAC_SHA512", "hmac": bytes(64), "salt": salt})
|
||||
packed = msgpack.packb(metadata_dict)
|
||||
tam_key = self._tam_key(tam["salt"], context)
|
||||
tam_key = self._tam_key(salt, context)
|
||||
tam["hmac"] = hmac.digest(tam_key, packed, "sha512")
|
||||
return msgpack.packb(metadata_dict)
|
||||
|
||||
|
|
@ -252,7 +272,7 @@ class KeyBase:
|
|||
if tam_required:
|
||||
raise TAMRequiredError(self.repository._location.canonical_path())
|
||||
else:
|
||||
logger.debug("TAM not found and not required")
|
||||
logger.debug("Manifest TAM not found and not required")
|
||||
return unpacked, False
|
||||
tam = unpacked.pop("tam", None)
|
||||
if not isinstance(tam, dict):
|
||||
|
|
@ -262,7 +282,9 @@ class KeyBase:
|
|||
if tam_required:
|
||||
raise TAMUnsupportedSuiteError(repr(tam_type))
|
||||
else:
|
||||
logger.debug("Ignoring TAM made with unsupported suite, since TAM is not required: %r", tam_type)
|
||||
logger.debug(
|
||||
"Ignoring manifest TAM made with unsupported suite, since TAM is not required: %r", tam_type
|
||||
)
|
||||
return unpacked, False
|
||||
tam_hmac = tam.get("hmac")
|
||||
tam_salt = tam.get("salt")
|
||||
|
|
@ -279,6 +301,52 @@ class KeyBase:
|
|||
logger.debug("TAM-verified manifest")
|
||||
return unpacked, True
|
||||
|
||||
def unpack_and_verify_archive(self, data, force_tam_not_required=False):
|
||||
"""Unpack msgpacked *data* and return (object, did_verify)."""
|
||||
tam_required = self.tam_required
|
||||
if force_tam_not_required and tam_required:
|
||||
# for a long time, borg only checked manifest for "tam_required" and
|
||||
# people might have archives without TAM, so don't be too annoyingly loud here:
|
||||
logger.debug("Archive authentication DISABLED.")
|
||||
tam_required = False
|
||||
data = bytearray(data)
|
||||
unpacker = get_limited_unpacker("archive")
|
||||
unpacker.feed(data)
|
||||
unpacked = unpacker.unpack()
|
||||
if "tam" not in unpacked:
|
||||
if tam_required:
|
||||
archive_name = unpacked.get("name", "<unknown>")
|
||||
raise ArchiveTAMRequiredError(archive_name)
|
||||
else:
|
||||
logger.debug("Archive TAM not found and not required")
|
||||
return unpacked, False, None
|
||||
tam = unpacked.pop("tam", None)
|
||||
if not isinstance(tam, dict):
|
||||
raise ArchiveTAMInvalid()
|
||||
tam_type = tam.get("type", "<none>")
|
||||
if tam_type != "HKDF_HMAC_SHA512":
|
||||
if tam_required:
|
||||
raise TAMUnsupportedSuiteError(repr(tam_type))
|
||||
else:
|
||||
logger.debug(
|
||||
"Ignoring archive TAM made with unsupported suite, since TAM is not required: %r", tam_type
|
||||
)
|
||||
return unpacked, False, None
|
||||
tam_hmac = tam.get("hmac")
|
||||
tam_salt = tam.get("salt")
|
||||
if not isinstance(tam_salt, (bytes, str)) or not isinstance(tam_hmac, (bytes, str)):
|
||||
raise ArchiveTAMInvalid()
|
||||
tam_hmac = want_bytes(tam_hmac) # legacy
|
||||
tam_salt = want_bytes(tam_salt) # legacy
|
||||
offset = data.index(tam_hmac)
|
||||
data[offset : offset + 64] = bytes(64)
|
||||
tam_key = self._tam_key(tam_salt, context=b"archive")
|
||||
calculated_hmac = hmac.digest(tam_key, data, "sha512")
|
||||
if not hmac.compare_digest(calculated_hmac, tam_hmac):
|
||||
raise ArchiveTAMInvalid()
|
||||
logger.debug("TAM-verified archive")
|
||||
return unpacked, True, tam_salt
|
||||
|
||||
|
||||
class PlaintextKey(KeyBase):
|
||||
TYPE = KeyType.PLAINTEXT
|
||||
|
|
|
|||
|
|
@ -219,10 +219,10 @@ def get_limited_unpacker(kind):
|
|||
args = dict(use_list=False, max_buffer_size=3 * max(BUFSIZE, MAX_OBJECT_SIZE)) # return tuples, not lists
|
||||
if kind in ("server", "client"):
|
||||
pass # nothing special
|
||||
elif kind in ("manifest", "key"):
|
||||
elif kind in ("manifest", "archive", "key"):
|
||||
args.update(dict(use_list=True, object_hook=StableDict)) # default value
|
||||
else:
|
||||
raise ValueError('kind must be "server", "client", "manifest" or "key"')
|
||||
raise ValueError('kind must be "server", "client", "manifest", "archive" or "key"')
|
||||
return Unpacker(**args)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -723,11 +723,12 @@ class ArchiveFormatter(BaseFormatter):
|
|||
"id": "internal ID of the archive",
|
||||
"hostname": "hostname of host on which this archive was created",
|
||||
"username": "username of user who created this archive",
|
||||
"tam": "TAM authentication state of this archive",
|
||||
"size": "size of this archive (data plus metadata, not considering compression and deduplication)",
|
||||
"nfiles": "count of files in this archive",
|
||||
}
|
||||
KEY_GROUPS = (
|
||||
("archive", "name", "comment", "id"),
|
||||
("archive", "name", "comment", "id", "tam"),
|
||||
("start", "time", "end", "command_line"),
|
||||
("hostname", "username"),
|
||||
("size", "nfiles"),
|
||||
|
|
@ -750,6 +751,7 @@ class ArchiveFormatter(BaseFormatter):
|
|||
"username": partial(self.get_meta, "username", ""),
|
||||
"comment": partial(self.get_meta, "comment", ""),
|
||||
"command_line": partial(self.get_meta, "command_line", ""),
|
||||
"tam": self.get_tam,
|
||||
"size": partial(self.get_meta, "size", 0),
|
||||
"nfiles": partial(self.get_meta, "nfiles", 0),
|
||||
"end": self.get_ts_end,
|
||||
|
|
@ -795,6 +797,9 @@ class ArchiveFormatter(BaseFormatter):
|
|||
def get_ts_end(self):
|
||||
return self.format_time(self.archive.ts_end)
|
||||
|
||||
def get_tam(self):
|
||||
return "verified" if self.archive.tam_verified else "none"
|
||||
|
||||
def format_time(self, ts):
|
||||
return OutputTimestamp(ts)
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import pytest
|
|||
from ...archive import ChunkBuffer
|
||||
from ...constants import * # NOQA
|
||||
from ...helpers import bin_to_hex
|
||||
from ...helpers import msgpack
|
||||
from ...manifest import Manifest
|
||||
from ...repository import Repository
|
||||
from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
|
||||
|
|
@ -233,17 +232,16 @@ def test_manifest_rebuild_duplicate_archive(archivers, request):
|
|||
manifest = repository.get(Manifest.MANIFEST_ID)
|
||||
corrupted_manifest = manifest + b"corrupted!"
|
||||
repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
|
||||
archive = msgpack.packb(
|
||||
{
|
||||
"command_line": "",
|
||||
"item_ptrs": [],
|
||||
"hostname": "foo",
|
||||
"username": "bar",
|
||||
"name": "archive1",
|
||||
"time": "2016-12-15T18:49:51.849711",
|
||||
"version": 2,
|
||||
}
|
||||
)
|
||||
archive_dict = {
|
||||
"command_line": "",
|
||||
"item_ptrs": [],
|
||||
"hostname": "foo",
|
||||
"username": "bar",
|
||||
"name": "archive1",
|
||||
"time": "2016-12-15T18:49:51.849711",
|
||||
"version": 2,
|
||||
}
|
||||
archive = repo_objs.key.pack_and_authenticate_metadata(archive_dict, context=b"archive")
|
||||
archive_id = repo_objs.id_hash(archive)
|
||||
repository.put(archive_id, repo_objs.format(archive_id, {}, archive))
|
||||
repository.commit(compact=False)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import pytest
|
|||
from ...cache import Cache, LocalCache
|
||||
from ...constants import * # NOQA
|
||||
from ...crypto.key import TAMRequiredError
|
||||
from ...helpers import Location, get_security_dir, bin_to_hex
|
||||
from ...helpers import Location, get_security_dir, bin_to_hex, archive_ts_now
|
||||
from ...helpers import EXIT_ERROR
|
||||
from ...helpers import msgpack
|
||||
from ...manifest import Manifest, MandatoryFeatureUnsupported
|
||||
|
|
@ -322,7 +322,7 @@ def test_check_cache(archivers, request):
|
|||
check_cache(archiver)
|
||||
|
||||
|
||||
# Begin manifest tests
|
||||
# Begin manifest TAM tests
|
||||
def spoof_manifest(repository):
|
||||
with repository:
|
||||
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
|
||||
|
|
@ -380,6 +380,62 @@ def test_not_required(archiver):
|
|||
cmd(archiver, "rlist")
|
||||
|
||||
|
||||
# Begin archive TAM tests
|
||||
def write_archive_without_tam(repository, archive_name):
|
||||
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
|
||||
archive_data = msgpack.packb(
|
||||
{
|
||||
"version": 2,
|
||||
"name": archive_name,
|
||||
"item_ptrs": [],
|
||||
"command_line": "",
|
||||
"hostname": "",
|
||||
"username": "",
|
||||
"time": archive_ts_now().isoformat(timespec="microseconds"),
|
||||
"size": 0,
|
||||
"nfiles": 0,
|
||||
}
|
||||
)
|
||||
archive_id = manifest.repo_objs.id_hash(archive_data)
|
||||
cdata = manifest.repo_objs.format(archive_id, {}, archive_data)
|
||||
repository.put(archive_id, cdata)
|
||||
manifest.archives[archive_name] = (archive_id, datetime.now())
|
||||
manifest.write()
|
||||
repository.commit(compact=False)
|
||||
|
||||
|
||||
def test_check_rebuild_manifest(archiver):
|
||||
cmd(archiver, "rcreate", RK_ENCRYPTION)
|
||||
create_src_archive(archiver, "archive_tam")
|
||||
repository = Repository(archiver.repository_path, exclusive=True)
|
||||
with repository:
|
||||
write_archive_without_tam(repository, "archive_no_tam")
|
||||
repository.delete(Manifest.MANIFEST_ID) # kill manifest, so check has to rebuild it
|
||||
repository.commit(compact=False)
|
||||
cmd(archiver, "check", "--repair")
|
||||
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
|
||||
assert "archive_tam tam:verified" in output # TAM-verified archive is in rebuilt manifest
|
||||
assert "archive_no_tam" not in output # check got rid of untrusted not TAM-verified archive
|
||||
|
||||
|
||||
def test_check_rebuild_refcounts(archiver):
|
||||
cmd(archiver, "rcreate", RK_ENCRYPTION)
|
||||
create_src_archive(archiver, "archive_tam")
|
||||
archive_id_pre_check = cmd(archiver, "rlist", "--format='{name} {id}{NL}'")
|
||||
repository = Repository(archiver.repository_path, exclusive=True)
|
||||
with repository:
|
||||
write_archive_without_tam(repository, "archive_no_tam")
|
||||
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
|
||||
assert "archive_tam tam:verified" in output # good
|
||||
assert "archive_no_tam tam:none" in output # could be borg < 1.0.9 archive or fake
|
||||
cmd(archiver, "check", "--repair")
|
||||
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
|
||||
assert "archive_tam tam:verified" in output # TAM-verified archive still there
|
||||
assert "archive_no_tam" not in output # check got rid of untrusted not TAM-verified archive
|
||||
archive_id_post_check = cmd(archiver, "rlist", "--format='{name} {id}{NL}'")
|
||||
assert archive_id_post_check == archive_id_pre_check # rebuild_refcounts didn't change archive_tam archive id
|
||||
|
||||
|
||||
# Begin Remote Tests
|
||||
def test_remote_repo_restrict_to_path(remote_archiver):
|
||||
original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
|
||||
|
|
|
|||
|
|
@ -11,13 +11,8 @@ from ..crypto.key import AEADKeyBase
|
|||
from ..crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
|
||||
from ..crypto.key import Blake2AESOCBRepoKey, Blake2AESOCBKeyfileKey, Blake2CHPORepoKey, Blake2CHPOKeyfileKey
|
||||
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
|
||||
from ..crypto.key import (
|
||||
TAMRequiredError,
|
||||
TAMInvalid,
|
||||
TAMUnsupportedSuiteError,
|
||||
UnsupportedManifestError,
|
||||
UnsupportedKeyFormatError,
|
||||
)
|
||||
from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, ArchiveTAMInvalid
|
||||
from ..crypto.key import UnsupportedManifestError, UnsupportedKeyFormatError
|
||||
from ..crypto.key import identify_key
|
||||
from ..crypto.low_level import IntegrityError as IntegrityErrorBase
|
||||
from ..helpers import IntegrityError
|
||||
|
|
@ -281,6 +276,8 @@ class TestTAM:
|
|||
blob = msgpack.packb({})
|
||||
with pytest.raises(TAMRequiredError):
|
||||
key.unpack_and_verify_manifest(blob)
|
||||
with pytest.raises(TAMRequiredError):
|
||||
key.unpack_and_verify_archive(blob)
|
||||
|
||||
def test_missing(self, key):
|
||||
blob = msgpack.packb({})
|
||||
|
|
@ -288,11 +285,16 @@ class TestTAM:
|
|||
unpacked, verified = key.unpack_and_verify_manifest(blob)
|
||||
assert unpacked == {}
|
||||
assert not verified
|
||||
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
|
||||
assert unpacked == {}
|
||||
assert not verified
|
||||
|
||||
def test_unknown_type_when_required(self, key):
|
||||
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
|
||||
with pytest.raises(TAMUnsupportedSuiteError):
|
||||
key.unpack_and_verify_manifest(blob)
|
||||
with pytest.raises(TAMUnsupportedSuiteError):
|
||||
key.unpack_and_verify_archive(blob)
|
||||
|
||||
def test_unknown_type(self, key):
|
||||
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
|
||||
|
|
@ -300,6 +302,9 @@ class TestTAM:
|
|||
unpacked, verified = key.unpack_and_verify_manifest(blob)
|
||||
assert unpacked == {}
|
||||
assert not verified
|
||||
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
|
||||
assert unpacked == {}
|
||||
assert not verified
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tam, exc",
|
||||
|
|
@ -310,11 +315,25 @@ class TestTAM:
|
|||
(1234, TAMInvalid),
|
||||
),
|
||||
)
|
||||
def test_invalid(self, key, tam, exc):
|
||||
def test_invalid_manifest(self, key, tam, exc):
|
||||
blob = msgpack.packb({"tam": tam})
|
||||
with pytest.raises(exc):
|
||||
key.unpack_and_verify_manifest(blob)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tam, exc",
|
||||
(
|
||||
({}, TAMUnsupportedSuiteError),
|
||||
({"type": b"\xff"}, TAMUnsupportedSuiteError),
|
||||
(None, ArchiveTAMInvalid),
|
||||
(1234, ArchiveTAMInvalid),
|
||||
),
|
||||
)
|
||||
def test_invalid_archive(self, key, tam, exc):
|
||||
blob = msgpack.packb({"tam": tam})
|
||||
with pytest.raises(exc):
|
||||
key.unpack_and_verify_archive(blob)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"hmac, salt",
|
||||
(({}, bytes(64)), (bytes(64), {}), (None, bytes(64)), (bytes(64), None)),
|
||||
|
|
@ -330,10 +349,12 @@ class TestTAM:
|
|||
blob = msgpack.packb(data)
|
||||
with pytest.raises(TAMInvalid):
|
||||
key.unpack_and_verify_manifest(blob)
|
||||
with pytest.raises(ArchiveTAMInvalid):
|
||||
key.unpack_and_verify_archive(blob)
|
||||
|
||||
def test_round_trip(self, key):
|
||||
def test_round_trip_manifest(self, key):
|
||||
data = {"foo": "bar"}
|
||||
blob = key.pack_and_authenticate_metadata(data)
|
||||
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
|
||||
assert blob.startswith(b"\x82")
|
||||
|
||||
unpacked = msgpack.unpackb(blob)
|
||||
|
|
@ -344,10 +365,23 @@ class TestTAM:
|
|||
assert unpacked["foo"] == "bar"
|
||||
assert "tam" not in unpacked
|
||||
|
||||
@pytest.mark.parametrize("which", ("hmac", "salt"))
|
||||
def test_tampered(self, key, which):
|
||||
def test_round_trip_archive(self, key):
|
||||
data = {"foo": "bar"}
|
||||
blob = key.pack_and_authenticate_metadata(data)
|
||||
blob = key.pack_and_authenticate_metadata(data, context=b"archive")
|
||||
assert blob.startswith(b"\x82")
|
||||
|
||||
unpacked = msgpack.unpackb(blob)
|
||||
assert unpacked["tam"]["type"] == "HKDF_HMAC_SHA512"
|
||||
|
||||
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
|
||||
assert verified
|
||||
assert unpacked["foo"] == "bar"
|
||||
assert "tam" not in unpacked
|
||||
|
||||
@pytest.mark.parametrize("which", ("hmac", "salt"))
|
||||
def test_tampered_manifest(self, key, which):
|
||||
data = {"foo": "bar"}
|
||||
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
|
||||
assert blob.startswith(b"\x82")
|
||||
|
||||
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
|
||||
|
|
@ -359,6 +393,21 @@ class TestTAM:
|
|||
with pytest.raises(TAMInvalid):
|
||||
key.unpack_and_verify_manifest(blob)
|
||||
|
||||
@pytest.mark.parametrize("which", ("hmac", "salt"))
|
||||
def test_tampered_archive(self, key, which):
|
||||
data = {"foo": "bar"}
|
||||
blob = key.pack_and_authenticate_metadata(data, context=b"archive")
|
||||
assert blob.startswith(b"\x82")
|
||||
|
||||
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
|
||||
assert len(unpacked["tam"][which]) == 64
|
||||
unpacked["tam"][which] = unpacked["tam"][which][0:32] + bytes(32)
|
||||
assert len(unpacked["tam"][which]) == 64
|
||||
blob = msgpack.packb(unpacked)
|
||||
|
||||
with pytest.raises(ArchiveTAMInvalid):
|
||||
key.unpack_and_verify_archive(blob)
|
||||
|
||||
|
||||
def test_decrypt_key_file_unsupported_algorithm():
|
||||
"""We will add more algorithms in the future. We should raise a helpful error."""
|
||||
|
|
|
|||
Loading…
Reference in a new issue