diff --git a/src/borg/crypto/file_integrity.py b/src/borg/crypto/file_integrity.py index 701c7f9a2..489b34a6f 100644 --- a/src/borg/crypto/file_integrity.py +++ b/src/borg/crypto/file_integrity.py @@ -60,13 +60,14 @@ class FileHashingWrapper(FileLikeWrapper): ALGORITHM: str = None FACTORY: Callable = None - def __init__(self, backing_fd, write): + def __init__(self, backing_fd, write, *, pure_hash: bool = False): super().__init__(backing_fd) self.writing = write self.hash = self.FACTORY() + self.pure_hash = pure_hash # if True, we don't hash the length of the file def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is None: + if exc_type is None and not self.pure_hash: self.hash_length() super().__exit__(exc_type, exc_val, exc_tb) diff --git a/src/borg/testsuite/crypto/file_integrity_test.py b/src/borg/testsuite/crypto/file_integrity_test.py index d1c59dadf..5db556900 100644 --- a/src/borg/testsuite/crypto/file_integrity_test.py +++ b/src/borg/testsuite/crypto/file_integrity_test.py @@ -1,6 +1,10 @@ +import hashlib +import io + import pytest from ...crypto.file_integrity import DetachedIntegrityCheckedFile, FileIntegrityError, IntegrityCheckedFile +from ...crypto.file_integrity import SHA256FileHashingWrapper from ...platform import SyncFile @@ -147,3 +151,29 @@ class TestIntegrityCheckedFileWithSyncFile: # verify the written data can be read back with integrity check with IntegrityCheckedFile(path=path, write=False, integrity_data=integrity_data) as fd: assert fd.read() == b"test data for integrity check" + + +class TestSHA256FileHashingWrapper: + def test_pure_hash_write(self): + bio = io.BytesIO() + data = b"hello world" + with SHA256FileHashingWrapper(bio, write=True, pure_hash=True) as wrapper: + wrapper.write(data) + assert bio.getvalue() == data + assert wrapper.hexdigest() == hashlib.sha256(data).hexdigest() + + def test_pure_hash_read(self): + data = b"hello world" + bio = io.BytesIO(data) + with SHA256FileHashingWrapper(bio, write=False, pure_hash=True) as wrapper: + assert wrapper.read() == data + assert wrapper.hexdigest() == hashlib.sha256(data).hexdigest() + + def test_impure_hash_write(self): + bio = io.BytesIO() + data = b"hello world" + with SHA256FileHashingWrapper(bio, write=True, pure_hash=False) as wrapper: + wrapper.write(data) + # pure_hash=False appends the file length ("11" in this case) at exit + expected_hash = hashlib.sha256(data + b"11").hexdigest() + assert wrapper.hexdigest() == expected_hash