diff --git a/attic/crypto.pyx b/attic/crypto.pyx index 61dbc42d5..171619758 100644 --- a/attic/crypto.pyx +++ b/attic/crypto.pyx @@ -7,6 +7,9 @@ from libc.stdlib cimport malloc, free API_VERSION = 2 +TAG_SIZE = 16 # bytes; 128 bits is the maximum allowed value. see "hack" below. +IV_SIZE = 16 # bytes; 128 bits + cdef extern from "openssl/rand.h": int RAND_bytes(unsigned char *buf, int num) @@ -22,7 +25,7 @@ cdef extern from "openssl/evp.h": ctypedef struct ENGINE: pass const EVP_MD *EVP_sha256() - const EVP_CIPHER *EVP_aes_256_ctr() + const EVP_CIPHER *EVP_aes_256_gcm() void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a) void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a) @@ -36,11 +39,14 @@ cdef extern from "openssl/evp.h": const unsigned char *in_, int inl) int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl) int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl) - + int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, unsigned char *ptr) int PKCS5_PBKDF2_HMAC(const char *password, int passwordlen, const unsigned char *salt, int saltlen, int iter, const EVP_MD *digest, int keylen, unsigned char *out) + int EVP_CTRL_GCM_GET_TAG + int EVP_CTRL_GCM_SET_TAG + int EVP_CTRL_GCM_SET_IVLEN import struct @@ -98,7 +104,7 @@ cdef class AES: EVP_CIPHER_CTX_init(&self.ctx) self.is_encrypt = is_encrypt # Set cipher type and mode - cipher_mode = EVP_aes_256_ctr() + cipher_mode = EVP_aes_256_gcm() if self.is_encrypt: if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL): raise Exception('EVP_EncryptInit_ex failed') @@ -117,6 +123,9 @@ cdef class AES: key2 = key if iv: iv2 = iv + # Set IV length (bytes) + if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL): + raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed') # Initialise key and IV if self.is_encrypt: if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2): @@ -125,16 +134,24 @@ cdef class AES: if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2): raise Exception('EVP_DecryptInit_ex failed') - @property - def iv(self): - return self.ctx.iv[:16] + def add(self, aad): + cdef int aadl = len(aad) + cdef int outl + # Zero or more calls to specify any AAD + if self.is_encrypt: + if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl): + raise Exception('EVP_EncryptUpdate failed') + else: # decrypt + if not EVP_DecryptUpdate(&self.ctx, NULL, &outl, aad, aadl): + raise Exception('EVP_DecryptUpdate failed') - def encrypt(self, data): + def compute_tag_and_encrypt(self, data): cdef int inl = len(data) cdef int ctl = 0 cdef int outl = 0 - # note: modes that use padding, need up to one extra AES block (16b) + # note: modes that use padding, need up to one extra AES block (16B) cdef unsigned char *out = malloc(inl+16) + cdef unsigned char *tag = malloc(TAG_SIZE) if not out: raise MemoryError try: @@ -144,15 +161,20 @@ cdef class AES: if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl): raise Exception('EVP_EncryptFinal failed') ctl += outl - return out[:ctl] + # Get tag + if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag): + raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed') + # hack: caller wants 32B tags (256b), so we give back that amount + return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl] finally: + free(tag) free(out) - def decrypt(self, data): + def check_tag_and_decrypt(self, tag, data): cdef int inl = len(data) cdef int ptl = 0 cdef int outl = 0 - # note: modes that use padding, need up to one extra AES block (16b). + # note: modes that use padding, need up to one extra AES block (16B). # This is what the openssl docs say. I am not sure this is correct, # but OTOH it will not cause any harm if our buffer is a little bigger. cdef unsigned char *out = malloc(inl+16) @@ -162,10 +184,11 @@ cdef class AES: if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl): raise Exception('EVP_DecryptUpdate failed') ptl = outl + # Set expected tag value. + if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag): + raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed') if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0: - # this error check is very important for modes with padding or - # authentication. for them, a failure here means corrupted data. - # CTR mode does not use padding nor authentication. + # a failure here means corrupted / tampered tag or data raise Exception('EVP_DecryptFinal failed') ptl += outl return out[:ptl] diff --git a/attic/key.py b/attic/key.py index 1dbd279a9..680015c20 100644 --- a/attic/key.py +++ b/attic/key.py @@ -50,7 +50,7 @@ class KeyBase(object): self.TYPE_STR = bytes([self.TYPE]) def id_hash(self, data): - """Return HMAC hash using the "id" HMAC key + """Return a HASH (no id_key) or a MAC (using the "id_key" key) """ def encrypt(self, data): @@ -92,9 +92,9 @@ class PlaintextKey(KeyBase): class AESKeyBase(KeyBase): """Common base class shared by KeyfileKey and PassphraseKey - Chunks are encrypted using 256bit AES in Counter Mode (CTR) + Chunks are encrypted using 256bit AES in Galois Counter Mode (GCM) - Payload layout: TYPE(1) + HMAC(32) + NONCE(8) + CIPHERTEXT + Payload layout: TYPE(1) + TAG(32) + NONCE(8) + CIPHERTEXT To reduce payload size only 8 bytes of the 16 bytes nonce is saved in the payload, the first 8 bytes are always zeros. This does not @@ -105,45 +105,68 @@ class AESKeyBase(KeyBase): PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE def id_hash(self, data): - """Return HMAC hash using the "id" HMAC key """ - return HMAC(self.id_key, data, sha256).digest() + Return GMAC using the "id_key" GMAC key + + XXX do we need a cryptographic hash function here or is a keyed hash + function like GMAC / GHASH good enough? See NIST SP 800-38D. + + IMPORTANT: in 1 repo, there should be only 1 kind of id_hash, otherwise + data hashed/maced with one id_hash might result in same ID as already + exists in the repo for other data created with another id_hash method. + somehow unlikely considering 128 or 256bits, but still. + """ + mac_cipher = AES(is_encrypt=True, key=self.id_key, iv=b'\0'*16) # XXX do we need an IV here? + # GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data + mac_cipher.add(bytes(data)) + tag, _ = mac_cipher.compute_tag_and_encrypt(b'') + return tag def encrypt(self, data): data = zlib.compress(data) - self.enc_cipher.reset() - data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data))) - hmac = HMAC(self.enc_hmac_key, data, sha256).digest() - return b''.join((self.TYPE_STR, hmac, data)) + self.enc_cipher.reset(iv=self.enc_iv) + iv_last8 = self.enc_iv[8:] + self.enc_cipher.add(iv_last8) + tag, data = self.enc_cipher.compute_tag_and_encrypt(data) + # increase the IV (counter) value so same value is never used twice + current_iv = bytes_to_long(iv_last8) + self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data))) + return b''.join((self.TYPE_STR, tag, iv_last8, data)) def decrypt(self, id, data): if data[0] != self.TYPE: raise IntegrityError('Invalid encryption envelope') - hmac = memoryview(data)[1:33] - if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hmac: + iv_last8 = data[1+32:1+40] + iv = PREFIX + iv_last8 + self.dec_cipher.reset(iv=iv) + self.dec_cipher.add(iv_last8) + tag, data = data[1:1+32], data[1+40:] + try: + data = self.dec_cipher.check_tag_and_decrypt(tag, data) + except Exception: raise IntegrityError('Encryption envelope checksum mismatch') - self.dec_cipher.reset(iv=PREFIX + data[33:41]) - data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview - if id and HMAC(self.id_key, data, sha256).digest() != id: + data = zlib.decompress(data) + if id and self.id_hash(data) != id: raise IntegrityError('Chunk id verification failed') return data def extract_nonce(self, payload): if payload[0] != self.TYPE: - raise IntegrityError('Invalid encryption envelope') + raise IntegrityError('Invalid encryption envelope') nonce = bytes_to_long(payload[33:41]) return nonce def init_from_random_data(self, data): self.enc_key = data[0:32] - self.enc_hmac_key = data[32:64] + self.enc_hmac_key = data[32:64] # XXX enc_hmac_key not used for AES-GCM self.id_key = data[64:96] self.chunk_seed = bytes_to_int(data[96:100]) # Convert to signed int32 if self.chunk_seed & 0x80000000: self.chunk_seed = self.chunk_seed - 0xffffffff - 1 - def init_ciphers(self, enc_iv=b''): + def init_ciphers(self, enc_iv=PREFIX * 2): # default IV = 16B zero + self.enc_iv = enc_iv self.enc_cipher = AES(is_encrypt=True, key=self.enc_key, iv=enc_iv) self.dec_cipher = AES(is_encrypt=False, key=self.enc_key) @@ -242,25 +265,25 @@ class KeyfileKey(AESKeyBase): def decrypt_key_file(self, data, passphrase): d = msgpack.unpackb(data) assert d[b'version'] == 1 - assert d[b'algorithm'] == b'sha256' + assert d[b'algorithm'] == b'gmac' key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32) - data = AES(is_encrypt=False, key=key).decrypt(d[b'data']) - if HMAC(key, data, sha256).digest() != d[b'hash']: + try: + data = AES(is_encrypt=False, key=key, iv=b'\0'*16).check_tag_and_decrypt(d[b'hash'], d[b'data']) + return data + except Exception: return None - return data def encrypt_key_file(self, data, passphrase): salt = get_random_bytes(32) iterations = 100000 key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32) - hash = HMAC(key, data, sha256).digest() - cdata = AES(is_encrypt=True, key=key).encrypt(data) + tag, cdata = AES(is_encrypt=True, key=key, iv=b'\0'*16).compute_tag_and_encrypt(data) d = { 'version': 1, 'salt': salt, 'iterations': iterations, - 'algorithm': 'sha256', - 'hash': hash, + 'algorithm': 'gmac', + 'hash': tag, 'data': cdata, } return msgpack.packb(d) diff --git a/attic/testsuite/crypto.py b/attic/testsuite/crypto.py index 304ef97c0..f4b9a9cb6 100644 --- a/attic/testsuite/crypto.py +++ b/attic/testsuite/crypto.py @@ -27,18 +27,20 @@ class CryptoTestCase(AtticTestCase): self.assert_equal(len(bytes2), 10) self.assert_not_equal(bytes, bytes2) - def test_aes(self): + def test_aes_gcm(self): key = b'X' * 32 + iv = b'A' * 16 data = b'foo' * 10 # encrypt - aes = AES(is_encrypt=True, key=key) - self.assert_equal(bytes_to_long(aes.iv, 8), 0) - cdata = aes.encrypt(data) - self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466') - self.assert_equal(bytes_to_long(aes.iv, 8), 2) - # decrypt - aes = AES(is_encrypt=False, key=key) - self.assert_equal(bytes_to_long(aes.iv, 8), 0) - pdata = aes.decrypt(cdata) + aes = AES(is_encrypt=True, key=key, iv=iv) + tag, cdata = aes.compute_tag_and_encrypt(data) + self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000') + self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741') + # decrypt (correct tag/cdata) + aes = AES(is_encrypt=False, key=key, iv=iv) + pdata = aes.check_tag_and_decrypt(tag, cdata) self.assert_equal(data, pdata) - self.assert_equal(bytes_to_long(aes.iv, 8), 2) + # decrypt (incorrect tag/cdata) + aes = AES(is_encrypt=False, key=key, iv=iv) + cdata = b'x' + cdata[1:] # corrupt cdata + self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata) diff --git a/attic/testsuite/key.py b/attic/testsuite/key.py index 543d1be36..de31f2721 100644 --- a/attic/testsuite/key.py +++ b/attic/testsuite/key.py @@ -15,20 +15,20 @@ class KeyTestCase(AtticTestCase): repository = Location(tempfile.mkstemp()[1]) keyfile2_key_file = """ - ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000 - hqppdGVyYXRpb25zzgABhqCkaGFzaNoAIMyonNI+7Cjv0qHi0AOBM6bLGxACJhfgzVD2oq - bIS9SFqWFsZ29yaXRobaZzaGEyNTakc2FsdNoAINNK5qqJc1JWSUjACwFEWGTdM7Nd0a5l - 1uBGPEb+9XM9p3ZlcnNpb24BpGRhdGHaANAYDT5yfPpU099oBJwMomsxouKyx/OG4QIXK2 - hQCG2L2L/9PUu4WIuKvGrsXoP7syemujNfcZws5jLp2UPva4PkQhQsrF1RYDEMLh2eF9Ol - rwtkThq1tnh7KjWMG9Ijt7/aoQtq0zDYP/xaFF8XXSJxiyP5zjH5+spB6RL0oQHvbsliSh - /cXJq7jrqmrJ1phd6dg4SHAM/i+hubadZoS6m25OQzYAW09wZD/phG8OVa698Z5ed3HTaT - SmrtgJL3EoOKgUI9d6BLE4dJdBqntifo""".strip() +ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000 +hqppdGVyYXRpb25zzgABhqCkc2FsdNoAICiRWfijWqIuvr+70VzOsUS4Y6NM45FWm6LgCu +2GyalGqWFsZ29yaXRobaRnbWFjpGhhc2jaACDgCK7u30Pi+Du1qHRyWBupAAAAAAAAAAAA +AAAAAAAAAKd2ZXJzaW9uAaRkYXRh2gDQrlCtq2mzdmkuhwIoko5+amxYqnlfNHHZxRFiX9 +F8AliP7H6S0j9uHyrBKRDWtj7VGYWVW8COy/FncLRgRhspB59rH3y/GS6pfeEw7RWUPd32 +eOcB6v8q+IHUvGttyFRcN6PxSFHBhOKN0jqStP0UqXLv+d9rGWi6X/HNZGu9WPkqs/g0G9 +xnf48i9pOy19aQo3HV//ubf+VYWmc1J8zjCS2Og0JkMtxbqM6j4mShPjkURZZBXSJGtORV +5IzNAzixJWmr8LR12TmFGVb0U9P79A==""".strip() keyfile2_cdata = unhexlify(re.sub('\W', '', """ - 0055f161493fcfc16276e8c31493c4641e1eb19a79d0326fad0291e5a9c98e5933 - 00000000000003e8d21eaf9b86c297a8cd56432e1915bb + 004078370be366ac3ad9d147992be8ebee000000000000000000000000000000000000000000000000 + b94bfb5d0a63b0c47cf74e2d0585aa """)) - keyfile2_id = unhexlify('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314') + keyfile2_id = unhexlify('45f309b4ef353c467d16a19039b87e5400000000000000000000000000000000') def setUp(self): self.tmppath = tempfile.mkdtemp() @@ -44,6 +44,15 @@ class KeyTestCase(AtticTestCase): _location = _Location() id = bytes(32) + def _test_make_testdata(self): + # modify tearDown to not kill the key file first, before using this + os.environ['ATTIC_PASSPHRASE'] = 'passphrase' + key = KeyfileKey.create(self.MockRepository(), self.MockArgs()) + print("keyfile2_key_file: find the it in the filesystem, see location in test log output") + print("keyfile2_cdata:", hexlify(key.encrypt(b'payload'))) + print("keyfile2_id:", hexlify(key.id_hash(b'payload'))) + assert False + def test_plaintext(self): key = PlaintextKey.create(None, None) data = b'foo' @@ -53,7 +62,7 @@ class KeyTestCase(AtticTestCase): def test_keyfile(self): os.environ['ATTIC_PASSPHRASE'] = 'test' key = KeyfileKey.create(self.MockRepository(), self.MockArgs()) - self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0) + self.assert_equal(bytes_to_long(key.enc_iv, 8), 0) manifest = key.encrypt(b'XXX') self.assert_equal(key.extract_nonce(manifest), 0) manifest2 = key.encrypt(b'XXX') @@ -62,7 +71,7 @@ class KeyTestCase(AtticTestCase): self.assert_equal(key.extract_nonce(manifest2), 1) iv = key.extract_nonce(manifest) key2 = KeyfileKey.detect(self.MockRepository(), manifest) - self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)) + self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)) # Key data sanity check self.assert_equal(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3) self.assert_equal(key2.chunk_seed == 0, False) @@ -79,7 +88,7 @@ class KeyTestCase(AtticTestCase): def test_passphrase(self): os.environ['ATTIC_PASSPHRASE'] = 'test' key = PassphraseKey.create(self.MockRepository(), None) - self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0) + self.assert_equal(bytes_to_long(key.enc_iv, 8), 0) self.assert_equal(hexlify(key.id_key), b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6') self.assert_equal(hexlify(key.enc_hmac_key), b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901') self.assert_equal(hexlify(key.enc_key), b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a') @@ -92,11 +101,11 @@ class KeyTestCase(AtticTestCase): self.assert_equal(key.extract_nonce(manifest2), 1) iv = key.extract_nonce(manifest) key2 = PassphraseKey.detect(self.MockRepository(), manifest) - self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD)) + self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD)) self.assert_equal(key.id_key, key2.id_key) self.assert_equal(key.enc_hmac_key, key2.enc_hmac_key) self.assert_equal(key.enc_key, key2.enc_key) self.assert_equal(key.chunk_seed, key2.chunk_seed) data = b'foo' - self.assert_equal(hexlify(key.id_hash(data)), b'818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990') + self.assert_equal(hexlify(key.id_hash(data)), b'a409d69859b8a07625f066e42cde050100000000000000000000000000000000') self.assert_equal(data, key2.decrypt(key2.id_hash(data), key.encrypt(data)))