From 012d6448d2a8005d3a636f6115b3effc3fd378f4 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 23 Mar 2015 00:05:57 +0100 Subject: [PATCH] compute mac over all meta, make IV storage flexible, still support legacy legacy: - last 8 bytes of IV are stored, upper 8 are assumed to be zero - mac is computed over iv_last8 + data new: - store complete IV - mac is computed over complete meta + data refactored some code into separate increment_iv and get_aad functions --- attic/key.py | 106 ++++++++++++++++++++++-------------- attic/testsuite/archiver.py | 2 +- attic/testsuite/key.py | 49 +++++++++-------- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/attic/key.py b/attic/key.py index 20b88191a..340124691 100644 --- a/attic/key.py +++ b/attic/key.py @@ -24,7 +24,7 @@ from attic.helpers import IntegrityError, get_keys_dir, Error # zero anyway as the full IV is a 128bit counter. PREFIX are the upper 8 bytes, # stored_iv are the lower 8 Bytes. PREFIX = b'\0' * 8 -Meta = namedtuple('Meta', 'compr_type, key_type, mac_type, cipher_type, stored_iv') +Meta = namedtuple('Meta', 'compr_type, key_type, mac_type, cipher_type, iv, legacy') class UnsupportedPayloadError(Error): @@ -198,17 +198,46 @@ COMPR_DEFAULT = NullCompressor.TYPE # no compression class PLAIN: TYPE = 0 + enc_iv = None # dummy def __init__(self, **kw): pass - def compute_mac_and_encrypt(self, data): - return b'', b'', data + def compute_mac_and_encrypt(self, meta, data): + return None, data - def check_mac_and_decrypt(self, mac, iv_last8, data): + def check_mac_and_decrypt(self, mac, meta, data): return data +def increment_iv(iv, amount): + """ + increment the given IV considering that bytes of data was + encrypted based on it. In CTR / GCM mode, the IV is just a counter and + must never repeat. + + :param iv: current IV, 16 bytes (128 bit) + :param amount: amount of data (in bytes) that was encrypted + :return: new IV, 16 bytes (128 bit) + """ + # TODO: code assumes that the last 8 bytes are enough, the upper 8 always zero + iv_last8 = iv[8:] + current_iv = bytes_to_long(iv_last8) + new_iv = current_iv + num_aes_blocks(amount) + iv_last8 = long_to_bytes(new_iv) + iv = PREFIX + iv_last8 + return iv + + +def get_aad(meta): + """get additional authenticated data for AEAD ciphers""" + if meta.legacy: + # legacy format computed the mac over (iv_last8 + data) + return meta.iv[8:] + else: + return msgpack.packb(meta) + + class AES_CTR_HMAC: TYPE = 1 @@ -218,21 +247,19 @@ class AES_CTR_HMAC: self.enc_cipher = AES(mode=AES_CTR_MODE, is_encrypt=True, key=enc_key, iv=enc_iv) self.dec_cipher = AES(mode=AES_CTR_MODE, is_encrypt=False, key=enc_key) - def compute_mac_and_encrypt(self, data): - self.enc_cipher.reset(iv=self.enc_iv) - iv_last8 = self.enc_iv[8:] + def compute_mac_and_encrypt(self, meta, data): + self.enc_cipher.reset(iv=meta.iv) _, data = self.enc_cipher.compute_mac_and_encrypt(data) - # increase the IV (counter) value so same value is never used twice - current_iv = bytes_to_long(iv_last8) - self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data))) - mac = HMAC(self.hmac_key, iv_last8 + data, sha256).digest() # XXX mac / hash flexibility - return mac, iv_last8, data + self.enc_iv = increment_iv(meta.iv, len(data)) + aad = get_aad(meta) + mac = HMAC(self.hmac_key, aad + data, sha256).digest() # XXX mac / hash flexibility + return mac, data - def check_mac_and_decrypt(self, mac, iv_last8, data): - iv = PREFIX + iv_last8 - if HMAC(self.hmac_key, iv_last8 + data, sha256).digest() != mac: + def check_mac_and_decrypt(self, mac, meta, data): + aad = get_aad(meta) + if HMAC(self.hmac_key, aad + data, sha256).digest() != mac: raise IntegrityError('Encryption envelope checksum mismatch') - self.dec_cipher.reset(iv=iv) + self.dec_cipher.reset(iv=meta.iv) data = self.dec_cipher.check_mac_and_decrypt(None, data) return data @@ -246,20 +273,18 @@ class AES_GCM: self.enc_cipher = AES(mode=AES_GCM_MODE, is_encrypt=True, key=enc_key, iv=enc_iv) self.dec_cipher = AES(mode=AES_GCM_MODE, is_encrypt=False, key=enc_key) - def compute_mac_and_encrypt(self, data): - self.enc_cipher.reset(iv=self.enc_iv) - iv_last8 = self.enc_iv[8:] - self.enc_cipher.add(iv_last8) + def compute_mac_and_encrypt(self, meta, data): + self.enc_cipher.reset(iv=meta.iv) + aad = get_aad(meta) + self.enc_cipher.add(aad) mac, data = self.enc_cipher.compute_mac_and_encrypt(data) - # increase the IV (counter) value so same value is never used twice - current_iv = bytes_to_long(iv_last8) - self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data))) - return mac, iv_last8, data + self.enc_iv = increment_iv(meta.iv, len(data)) + return mac, data - def check_mac_and_decrypt(self, mac, iv_last8, data): - iv = PREFIX + iv_last8 - self.dec_cipher.reset(iv=iv) - self.dec_cipher.add(iv_last8) + def check_mac_and_decrypt(self, mac, meta, data): + self.dec_cipher.reset(iv=meta.iv) + aad = get_aad(meta) + self.dec_cipher.add(aad) try: data = self.dec_cipher.check_mac_and_decrypt(mac, data) except Exception: @@ -300,10 +325,10 @@ class KeyBase(object): def encrypt(self, data): data = self.compressor.compress(data) - mac, iv_last8, data = self.cipher.compute_mac_and_encrypt(data) meta = Meta(compr_type=self.compressor.TYPE, key_type=self.TYPE, mac_type=self.maccer_cls.TYPE, cipher_type=self.cipher.TYPE, - stored_iv=iv_last8) + iv=self.cipher.enc_iv, legacy=False) + mac, data = self.cipher.compute_mac_and_encrypt(meta, data) return generate(mac, meta, data) def decrypt(self, id, data): @@ -312,7 +337,7 @@ class KeyBase(object): assert isinstance(self, keyer) assert self.maccer_cls is maccer assert self.cipher_cls is cipher - data = self.cipher.check_mac_and_decrypt(mac, meta.stored_iv, data) + data = self.cipher.check_mac_and_decrypt(mac, meta, data) data = self.compressor.decompress(data) if id and self.id_hash(data) != id: raise IntegrityError('Chunk id verification failed') @@ -352,10 +377,9 @@ class AESKeyBase(KeyBase): affect security but limits the maximum repository capacity to only 295 exabytes! """ - def extract_nonce(self, payload): - mac, meta, data = parser(payload) - nonce = bytes_to_long(meta.stored_iv) - return nonce + def extract_iv(self, payload): + _, meta, _ = parser(payload) + return meta.iv def init_from_random_data(self, data): self.enc_key = data[0:32] @@ -416,8 +440,7 @@ class PassphraseKey(AESKeyBase): key.init(repository, passphrase) try: key.decrypt(None, manifest_data) - num_blocks = num_aes_blocks(len(data)) - key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks)) + key.init_ciphers(increment_iv(key.extract_iv(manifest_data), len(data))) return key except IntegrityError: passphrase = getpass(prompt) @@ -447,8 +470,7 @@ class KeyfileKey(AESKeyBase): passphrase = os.environ.get('ATTIC_PASSPHRASE', '') while not key.load(path, passphrase): passphrase = getpass(prompt) - num_blocks = num_aes_blocks(len(data)) - key.init_ciphers(PREFIX + long_to_bytes(key.extract_nonce(manifest_data) + num_blocks)) + key.init_ciphers(increment_iv(key.extract_iv(manifest_data), len(data))) return key @classmethod @@ -631,15 +653,15 @@ def legacy_parser(all_data, key_type): # all rather hardcoded offset = 1 if key_type == PlaintextKey.TYPE: mac = None - stored_iv = None + iv = None data = all_data[offset:] else: mac = all_data[offset:offset+32] - stored_iv = all_data[offset+32:offset+40] + iv = PREFIX + all_data[offset+32:offset+40] data = all_data[offset+40:] meta = Meta(compr_type=6, key_type=key_type, mac_type=HMAC_SHA256.TYPE, cipher_type=AES_CTR_HMAC.TYPE, - stored_iv=stored_iv) + iv=iv, legacy=True) return mac, meta, data def parser00(all_data): diff --git a/attic/testsuite/archiver.py b/attic/testsuite/archiver.py index 5d4c21949..39ae21515 100644 --- a/attic/testsuite/archiver.py +++ b/attic/testsuite/archiver.py @@ -385,7 +385,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): seen.add(hash) mac, meta, data = parser(data) num_blocks = num_aes_blocks(len(data)) - nonce = bytes_to_long(meta.stored_iv) + nonce = bytes_to_long(meta.iv, 8) for counter in range(nonce, nonce + num_blocks): self.assert_not_in(counter, used) used.add(counter) diff --git a/attic/testsuite/key.py b/attic/testsuite/key.py index cde0b79fd..11c87100d 100644 --- a/attic/testsuite/key.py +++ b/attic/testsuite/key.py @@ -5,7 +5,7 @@ import tempfile from binascii import hexlify from attic.crypto import bytes_to_long from attic.testsuite import AtticTestCase -from attic.key import PlaintextKey, PassphraseKey, KeyfileKey, COMPR_DEFAULT +from attic.key import PlaintextKey, PassphraseKey, KeyfileKey, COMPR_DEFAULT, increment_iv from attic.helpers import Location, unhexlify @@ -19,19 +19,20 @@ class KeyTestCase(AtticTestCase): keyfile2_key_file = """ ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000 -hqppdGVyYXRpb25zzgABhqCkc2FsdNoAIDq9JP02h8kcifnmD32O8kvEVHvgfjz3XgxeTt -wEZNGupGRhdGHaANDXW3xga6hSj1Ix8a41jQKIeX9kZo2Zvyy8XTxX7hbgQKm82649nAfm -hNMTrukDNyrwYN5dUGlS60XUccmfOa+rVJZkQhEiblpC7teFrQvYYUB5in83vDJK8XG8yS -6yHh6uQC5IdTdofTRN41JkQvXyd2wSzvWnqCrVTS8IEN4fmVXbNdJpHHzFxGDtsLRPP1FX -MdB35RjBHsHocJs+uk0syXQwfuVhq/AJQg24GznHpM4rnli8UTe82jM/7BXDAMOUDvTicF -cuzUZa5TlKphowp3ZlcnNpb24BqWFsZ29yaXRobaRnbWFjpGhhc2jaACBkWGoI42Vpa7c7 -yeZwRQ7VAAAAAAAAAAAAAAAAAAAAAA==""".strip() +hqlhbGdvcml0aG2kZ21hY6RoYXNo2gAgY7jwSMnBwpqD3Fk/aAdSAgAAAAAAAAAAAAAAAA +AAAACqaXRlcmF0aW9uc84AAYagp3ZlcnNpb24BpHNhbHTaACASqCq8G6a/K/W+bOrNDW65 +Sfl9ZHrTEtq6l+AMUmATxKRkYXRh2gDQuDVCijDzeZDD/JLPrOtsQL/vrZEWvCt5RuXFOt +tTZfbCJDmv2nt4KvYToVsp82pffZDcsLaOOBCTGurpkdefsdiLMgGiLlbrsXlES+fbKZfq +Tx2x2DjU4L1bFxuoypDIdk2lB3S98ZpFZ6yd1XtDBVTQ34FZTlDXIZ5HyuxAJBrGKYj/Un +Fk24N5xSoPfeQhE3r7hqEsGwEEX0s6sg0LHMGyc4xSBb13iZxWRlSdnvBC7teIeevhT/DU +scOrlrX0NO2eqe5jQF+zj1Q6OtBvRA== +""".strip() keyfile2_cdata = unhexlify(re.sub('\W', '', """ - 0393c420cff16872afba0a609bfa4b458e9ea4e900000000000000000000000000000000 - 9500001402c4080000000000000000c407e04fb0a78f1a39 + 0393c420fd6e9ac6f8c49c4789d1c924c14c309200000000000000000000000000000000 + 9600001402c41000000000000000000000000000000000c2c4071352fe2286e3ed """)) - keyfile2_id = unhexlify('7cf9e207968deea8ea54f14ccf814cfe00000000000000000000000000000000') + keyfile2_id = unhexlify('d4954bcf8d7b1762356e91b2611c727800000000000000000000000000000000') def setUp(self): self.tmppath = tempfile.mkdtemp() @@ -65,17 +66,18 @@ yeZwRQ7VAAAAAAAAAAAAAAAAAAAAAA==""".strip() def test_keyfile(self): os.environ['ATTIC_PASSPHRASE'] = 'test' key = KeyfileKey.create(self.MockRepository(), self.MockArgs()) - self.assert_equal(bytes_to_long(key.enc_iv, 8), 0) + self.assert_equal(key.enc_iv, b'\0'*16) manifest = key.encrypt(b'XXX') - self.assert_equal(key.extract_nonce(manifest), 0) + self.assert_equal(key.extract_iv(manifest), b'\0'*16) manifest2 = key.encrypt(b'XXX') self.assert_not_equal(manifest, manifest2) self.assert_equal(key.decrypt(None, manifest), key.decrypt(None, manifest2)) - self.assert_equal(key.extract_nonce(manifest2), 1) - iv = key.extract_nonce(manifest) + self.assert_equal(key.extract_iv(manifest2), b'\0'*15+b'\x01') + iv = key.extract_iv(manifest) key2 = KeyfileKey.detect(self.MockRepository(), manifest) - # we just assume that the payload fits into 1 AES block (which is given for b'XXX'). - self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + 1) + # we assume that the payload fits into one 16B AES block (which is given for b'XXX'). + iv_plus_1 = increment_iv(iv, 16) + self.assert_equal(key2.enc_iv, iv_plus_1) # Key data sanity check self.assert_equal(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3) self.assert_equal(key2.chunk_seed == 0, False) @@ -92,21 +94,22 @@ yeZwRQ7VAAAAAAAAAAAAAAAAAAAAAA==""".strip() def test_passphrase(self): os.environ['ATTIC_PASSPHRASE'] = 'test' key = PassphraseKey.create(self.MockRepository(), self.MockArgs()) - self.assert_equal(bytes_to_long(key.enc_iv, 8), 0) + self.assert_equal(key.enc_iv, b'\0'*16) self.assert_equal(hexlify(key.id_key), b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6') self.assert_equal(hexlify(key.enc_hmac_key), b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901') self.assert_equal(hexlify(key.enc_key), b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a') self.assert_equal(key.chunk_seed, -775740477) manifest = key.encrypt(b'XXX') - self.assert_equal(key.extract_nonce(manifest), 0) + self.assert_equal(key.extract_iv(manifest), b'\0'*16) manifest2 = key.encrypt(b'XXX') self.assert_not_equal(manifest, manifest2) self.assert_equal(key.decrypt(None, manifest), key.decrypt(None, manifest2)) - self.assert_equal(key.extract_nonce(manifest2), 1) - iv = key.extract_nonce(manifest) + self.assert_equal(key.extract_iv(manifest2), b'\0'*15+b'\x01') + iv = key.extract_iv(manifest) key2 = PassphraseKey.detect(self.MockRepository(), manifest) - # we just assume that the payload fits into 1 AES block (which is given for b'XXX'). - self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + 1) + # we assume that the payload fits into one 16B AES block (which is given for b'XXX'). + iv_plus_1 = increment_iv(iv, 16) + self.assert_equal(key2.enc_iv, iv_plus_1) self.assert_equal(key.id_key, key2.id_key) self.assert_equal(key.enc_hmac_key, key2.enc_hmac_key) self.assert_equal(key.enc_key, key2.enc_key)