diff --git a/docs/internals/security.rst b/docs/internals/security.rst index b7b247d0e..e8b9bbf05 100644 --- a/docs/internals/security.rst +++ b/docs/internals/security.rst @@ -180,8 +180,6 @@ Decryption:: decompressed = decompress(decrypted) - ASSERT( CONSTANT-TIME-COMPARISON( chunk-id, MAC(id_key, decompressed) ) ) - Notable: - More modern and often faster AEAD ciphers instead of self-assembled stuff. diff --git a/src/borg/archive.py b/src/borg/archive.py index f6a7a900d..7c6b832b0 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -21,7 +21,7 @@ logger = create_logger() from . import xattr from .chunker import get_chunker, Chunk from .cache import ChunkListEntry -from .crypto.key import key_factory +from .crypto.key import key_factory, AEADKeyBase from .compress import Compressor, CompressionSpec from .constants import * # NOQA from .crypto.low_level import IntegrityError as IntegrityErrorBase @@ -1684,6 +1684,12 @@ class ArchiveChecker: chunks_count_index = len(self.chunks) chunks_count_segments = 0 errors = 0 + # for the new crypto, derived from AEADKeyBase, we know that it checks authenticity on + # the crypto.low_level level - invalid chunks will fail to AEAD authenticate. + # for these key types, we know that there is no need to decompress the data afterwards. + # for all other modes, we assume that we must decompress, so we can verify authenticity + # based on the plaintext MAC (via calling ._assert_id(id, plaintext)). + decompress = not isinstance(self.key, AEADKeyBase) defect_chunks = [] pi = ProgressIndicatorPercent( total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01, msgid="check.verify_data" @@ -1714,7 +1720,7 @@ class ArchiveChecker: chunk_data_iter = self.repository.get_many(chunk_ids) else: try: - self.key.decrypt(chunk_id, encrypted_data) + self.key.decrypt(chunk_id, encrypted_data, decompress=decompress) except IntegrityErrorBase as integrity_error: self.error_found = True errors += 1 @@ -1745,7 +1751,7 @@ class ArchiveChecker: # from the underlying media. try: encrypted_data = self.repository.get(defect_chunk) - self.key.decrypt(defect_chunk, encrypted_data) + self.key.decrypt(defect_chunk, encrypted_data, decompress=decompress) except IntegrityErrorBase: # failed twice -> get rid of this chunk del self.chunks[defect_chunk] diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 882c595e2..2def782f0 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -871,7 +871,10 @@ class AEADKeyBase(KeyBase): if not decompress: return payload data = self.decompress(memoryview(payload)) - self.assert_id(id, data) + # note: calling self.assert_id(id, data) is not needed any more for the new AEAD crypto. + # we put the id into AAD when storing the chunk, so it gets into the authentication tag computation. + # when decrypting, we provide the id we **want** as AAD for the auth tag verification, so + # decrypting only succeeds if we got the ciphertext we wrote **for that chunk id**. return data def init_from_given_data(self, *, enc_key, enc_hmac_key, id_key, chunk_seed): diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index 512649232..a0b41ca6c 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -256,6 +256,27 @@ class TestKey: with pytest.raises(IntegrityError): key.assert_id(id, plaintext_changed) + def test_getting_wrong_chunk_fails(self, key): + # for the new AEAD crypto, we provide the chunk id as AAD when encrypting/authenticating, + # we provide the id **we want** as AAD when authenticating/decrypting the data we got from the repo. + # only if the id used for encrypting matches the id we want, the AEAD crypto authentication will succeed. + # thus, there is no need any more for calling self._assert_id() for the new crypto. + # the old crypto as well as plaintext and authenticated modes still need to call self._assert_id(). + plaintext_wanted = b"123456789" + id_wanted = key.id_hash(plaintext_wanted) + ciphertext_wanted = key.encrypt(id_wanted, plaintext_wanted) + plaintext_other = b"xxxxxxxxx" + id_other = key.id_hash(plaintext_other) + ciphertext_other = key.encrypt(id_other, plaintext_other) + # both ciphertexts are authentic and decrypting them should succeed: + key.decrypt(id_wanted, ciphertext_wanted) + key.decrypt(id_other, ciphertext_other) + # but if we wanted the one and got the other, it must fail. + # the new crypto will fail due to AEAD auth failure, + # the old crypto and plaintext, authenticated modes will fail due to ._assert_id() check failing: + with pytest.raises(IntegrityErrorBase): + key.decrypt(id_wanted, ciphertext_other) + def test_authenticated_encrypt(self, monkeypatch): monkeypatch.setenv("BORG_PASSPHRASE", "test") key = AuthenticatedKey.create(self.MockRepository(), self.MockArgs())