Merge branch 'aes_gcm' into merge-all

Note: this is just a rather quick merge, just to get things working as
they did in aes-gcm branch. As gcm replaced ctr, more work is needed to
re-integrate aes-ctr (for backwards compat). And also to use self.maccer
for the id_hash.

Conflicts:
	attic/key.py
	attic/testsuite/key.py
This commit is contained in:
Thomas Waldmann 2015-03-12 17:07:16 +01:00
commit 20d1a74aaa
4 changed files with 137 additions and 65 deletions

View file

@ -7,6 +7,9 @@ from libc.stdlib cimport malloc, free
API_VERSION = 2
TAG_SIZE = 16 # bytes; 128 bits is the maximum allowed value. see "hack" below.
IV_SIZE = 16 # bytes; 128 bits
cdef extern from "openssl/rand.h":
int RAND_bytes(unsigned char *buf, int num)
@ -22,7 +25,7 @@ cdef extern from "openssl/evp.h":
ctypedef struct ENGINE:
pass
const EVP_MD *EVP_sha256()
const EVP_CIPHER *EVP_aes_256_ctr()
const EVP_CIPHER *EVP_aes_256_gcm()
void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
@ -36,11 +39,14 @@ cdef extern from "openssl/evp.h":
const unsigned char *in_, int inl)
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, unsigned char *ptr)
int PKCS5_PBKDF2_HMAC(const char *password, int passwordlen,
const unsigned char *salt, int saltlen, int iter,
const EVP_MD *digest,
int keylen, unsigned char *out)
int EVP_CTRL_GCM_GET_TAG
int EVP_CTRL_GCM_SET_TAG
int EVP_CTRL_GCM_SET_IVLEN
import struct
@ -98,7 +104,7 @@ cdef class AES:
EVP_CIPHER_CTX_init(&self.ctx)
self.is_encrypt = is_encrypt
# Set cipher type and mode
cipher_mode = EVP_aes_256_ctr()
cipher_mode = EVP_aes_256_gcm()
if self.is_encrypt:
if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
raise Exception('EVP_EncryptInit_ex failed')
@ -117,6 +123,9 @@ cdef class AES:
key2 = key
if iv:
iv2 = iv
# Set IV length (bytes)
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL):
raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
# Initialise key and IV
if self.is_encrypt:
if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
@ -125,16 +134,24 @@ cdef class AES:
if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_DecryptInit_ex failed')
@property
def iv(self):
return self.ctx.iv[:16]
def add(self, aad):
cdef int aadl = len(aad)
cdef int outl
# Zero or more calls to specify any AAD
if self.is_encrypt:
if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
raise Exception('EVP_EncryptUpdate failed')
else: # decrypt
if not EVP_DecryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
raise Exception('EVP_DecryptUpdate failed')
def encrypt(self, data):
def compute_tag_and_encrypt(self, data):
cdef int inl = len(data)
cdef int ctl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16b)
# note: modes that use padding, need up to one extra AES block (16B)
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
cdef unsigned char *tag = <unsigned char *>malloc(TAG_SIZE)
if not out:
raise MemoryError
try:
@ -144,15 +161,20 @@ cdef class AES:
if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
raise Exception('EVP_EncryptFinal failed')
ctl += outl
return out[:ctl]
# Get tag
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag):
raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed')
# hack: caller wants 32B tags (256b), so we give back that amount
return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl]
finally:
free(tag)
free(out)
def decrypt(self, data):
def check_tag_and_decrypt(self, tag, data):
cdef int inl = len(data)
cdef int ptl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16b).
# note: modes that use padding, need up to one extra AES block (16B).
# This is what the openssl docs say. I am not sure this is correct,
# but OTOH it will not cause any harm if our buffer is a little bigger.
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
@ -162,10 +184,11 @@ cdef class AES:
if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
raise Exception('EVP_DecryptUpdate failed')
ptl = outl
# Set expected tag value.
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag):
raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed')
if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
# this error check is very important for modes with padding or
# authentication. for them, a failure here means corrupted data.
# CTR mode does not use padding nor authentication.
# a failure here means corrupted / tampered tag or data
raise Exception('EVP_DecryptFinal failed')
ptl += outl
return out[:ptl]

View file

@ -114,6 +114,21 @@ class HMAC_SHA512_256(HMAC):
super().__init__(key, data, sha512_256)
class GMAC:
def __init__(self, key, data):
if key is None:
raise Exception("do not use GMAC if you don't have a key")
self.key = key
self.data = data
def digest(self):
mac_cipher = AES(is_encrypt=True, key=self.key, iv=b'\0'*16) # XXX do we need an IV here?
# GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data
mac_cipher.add(bytes(self.data))
tag, _ = mac_cipher.compute_tag_and_encrypt(b'')
return tag
MAC_DEFAULT = HMAC_SHA256.TYPE
@ -156,7 +171,15 @@ class KeyBase(object):
self.maccer = maccer
def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""Return a HASH (no id_key) or a MAC (using the "id_key" key)
XXX do we need a cryptographic hash function here or is a keyed hash
function like GMAC / GHASH good enough? See NIST SP 800-38D.
IMPORTANT: in 1 repo, there should be only 1 kind of id_hash, otherwise
data hashed/maced with one id_hash might result in same ID as already
exists in the repo for other data created with another id_hash method.
somehow unlikely considering 128 or 256bits, but still.
"""
def encrypt(self, data):
@ -205,32 +228,46 @@ class PlaintextKey(KeyBase):
class AESKeyBase(KeyBase):
"""Common base class shared by KeyfileKey and PassphraseKey
Chunks are encrypted using 256bit AES in Counter Mode (CTR)
Chunks are encrypted using 256bit AES in Galois Counter Mode (GCM)
Payload layout: TYPE(1) + TAG(32) + NONCE(8) + CIPHERTEXT
To reduce payload size only 8 bytes of the 16 bytes nonce is saved
in the payload, the first 8 bytes are always zeros. This does not
affect security but limits the maximum repository capacity to
only 295 exabytes!
"""
def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""
return self.maccer(self.id_key, data).digest()
return GMAC(self.id_key, data).digest()
#return self.maccer(self.id_key, data).digest()
def encrypt(self, data):
data = self.compressor.compress(data)
self.enc_cipher.reset()
stored_iv = self.enc_cipher.iv[8:]
data = self.enc_cipher.encrypt(data)
hmac = self.maccer(self.enc_hmac_key, stored_iv + data).digest()
self.enc_cipher.reset(iv=self.enc_iv)
iv_last8 = self.enc_iv[8:]
self.enc_cipher.add(iv_last8)
tag, data = self.enc_cipher.compute_tag_and_encrypt(data)
# increase the IV (counter) value so same value is never used twice
current_iv = bytes_to_long(iv_last8)
self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data)))
meta = Meta(compr_type=self.compressor.TYPE, crypt_type=self.TYPE, mac_type=self.maccer.TYPE,
hmac=hmac, stored_iv=stored_iv)
hmac=tag, stored_iv=iv_last8)
return generate(meta, data)
def decrypt(self, id, data):
meta, data, compressor, crypter, maccer = parser(data)
assert isinstance(self, crypter)
assert self.maccer is maccer
computed_hmac = self.maccer(self.enc_hmac_key, meta.stored_iv + data).digest()
if computed_hmac != meta.hmac:
iv_last8 = meta.stored_iv
iv = PREFIX + iv_last8
self.dec_cipher.reset(iv=iv)
self.dec_cipher.add(iv_last8)
tag = meta.hmac # TODO rename Meta element name to be generic
try:
data = self.dec_cipher.check_tag_and_decrypt(tag, data)
except Exception:
raise IntegrityError('Encryption envelope checksum mismatch')
self.dec_cipher.reset(iv=PREFIX + meta.stored_iv)
data = self.compressor.decompress(self.dec_cipher.decrypt(data))
data = self.compressor.decompress(data)
if id and self.id_hash(data) != id:
raise IntegrityError('Chunk id verification failed')
return data
@ -243,14 +280,15 @@ class AESKeyBase(KeyBase):
def init_from_random_data(self, data):
self.enc_key = data[0:32]
self.enc_hmac_key = data[32:64]
self.enc_hmac_key = data[32:64] # XXX enc_hmac_key not used for AES-GCM
self.id_key = data[64:96]
self.chunk_seed = bytes_to_int(data[96:100])
# Convert to signed int32
if self.chunk_seed & 0x80000000:
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
def init_ciphers(self, enc_iv=b''):
def init_ciphers(self, enc_iv=PREFIX * 2): # default IV = 16B zero
self.enc_iv = enc_iv
self.enc_cipher = AES(is_encrypt=True, key=self.enc_key, iv=enc_iv)
self.dec_cipher = AES(is_encrypt=False, key=self.enc_key)
@ -359,25 +397,25 @@ class KeyfileKey(AESKeyBase):
def decrypt_key_file(self, data, passphrase):
d = msgpack.unpackb(data)
assert d[b'version'] == 1
assert d[b'algorithm'] == b'sha256'
assert d[b'algorithm'] == b'gmac'
key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
data = AES(is_encrypt=False, key=key).decrypt(d[b'data'])
if HMAC(key, data, sha256).digest() != d[b'hash']:
try:
data = AES(is_encrypt=False, key=key, iv=b'\0'*16).check_tag_and_decrypt(d[b'hash'], d[b'data'])
return data
except Exception:
return None
return data
def encrypt_key_file(self, data, passphrase):
salt = get_random_bytes(32)
iterations = 100000
key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
hash = HMAC(key, data, sha256).digest()
cdata = AES(is_encrypt=True, key=key).encrypt(data)
tag, cdata = AES(is_encrypt=True, key=key, iv=b'\0'*16).compute_tag_and_encrypt(data)
d = {
'version': 1,
'salt': salt,
'iterations': iterations,
'algorithm': 'sha256',
'hash': hash,
'algorithm': 'gmac',
'hash': tag,
'data': cdata,
}
return msgpack.packb(d)

View file

@ -27,18 +27,20 @@ class CryptoTestCase(AtticTestCase):
self.assert_equal(len(bytes2), 10)
self.assert_not_equal(bytes, bytes2)
def test_aes(self):
def test_aes_gcm(self):
key = b'X' * 32
iv = b'A' * 16
data = b'foo' * 10
# encrypt
aes = AES(is_encrypt=True, key=key)
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
cdata = aes.encrypt(data)
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
# decrypt
aes = AES(is_encrypt=False, key=key)
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
pdata = aes.decrypt(cdata)
aes = AES(is_encrypt=True, key=key, iv=iv)
tag, cdata = aes.compute_tag_and_encrypt(data)
self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000')
self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741')
# decrypt (correct tag/cdata)
aes = AES(is_encrypt=False, key=key, iv=iv)
pdata = aes.check_tag_and_decrypt(tag, cdata)
self.assert_equal(data, pdata)
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
# decrypt (incorrect tag/cdata)
aes = AES(is_encrypt=False, key=key, iv=iv)
cdata = b'x' + cdata[1:] # corrupt cdata
self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata)

View file

@ -17,20 +17,20 @@ class KeyTestCase(AtticTestCase):
mac = None
keyfile2_key_file = """
ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000
hqppdGVyYXRpb25zzgABhqCkaGFzaNoAIMyonNI+7Cjv0qHi0AOBM6bLGxACJhfgzVD2oq
bIS9SFqWFsZ29yaXRobaZzaGEyNTakc2FsdNoAINNK5qqJc1JWSUjACwFEWGTdM7Nd0a5l
1uBGPEb+9XM9p3ZlcnNpb24BpGRhdGHaANAYDT5yfPpU099oBJwMomsxouKyx/OG4QIXK2
hQCG2L2L/9PUu4WIuKvGrsXoP7syemujNfcZws5jLp2UPva4PkQhQsrF1RYDEMLh2eF9Ol
rwtkThq1tnh7KjWMG9Ijt7/aoQtq0zDYP/xaFF8XXSJxiyP5zjH5+spB6RL0oQHvbsliSh
/cXJq7jrqmrJ1phd6dg4SHAM/i+hubadZoS6m25OQzYAW09wZD/phG8OVa698Z5ed3HTaT
SmrtgJL3EoOKgUI9d6BLE4dJdBqntifo""".strip()
ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000
hqppdGVyYXRpb25zzgABhqCkc2FsdNoAICiRWfijWqIuvr+70VzOsUS4Y6NM45FWm6LgCu
2GyalGqWFsZ29yaXRobaRnbWFjpGhhc2jaACDgCK7u30Pi+Du1qHRyWBupAAAAAAAAAAAA
AAAAAAAAAKd2ZXJzaW9uAaRkYXRh2gDQrlCtq2mzdmkuhwIoko5+amxYqnlfNHHZxRFiX9
F8AliP7H6S0j9uHyrBKRDWtj7VGYWVW8COy/FncLRgRhspB59rH3y/GS6pfeEw7RWUPd32
eOcB6v8q+IHUvGttyFRcN6PxSFHBhOKN0jqStP0UqXLv+d9rGWi6X/HNZGu9WPkqs/g0G9
xnf48i9pOy19aQo3HV//ubf+VYWmc1J8zjCS2Og0JkMtxbqM6j4mShPjkURZZBXSJGtORV
5IzNAzixJWmr8LR12TmFGVb0U9P79A==""".strip()
keyfile2_cdata = unhexlify(re.sub('\W', '', """
0055f161493fcfc16276e8c31493c4641e1eb19a79d0326fad0291e5a9c98e5933
00000000000003e8d21eaf9b86c297a8cd56432e1915bb
004078370be366ac3ad9d147992be8ebee000000000000000000000000000000000000000000000000
b94bfb5d0a63b0c47cf74e2d0585aa
"""))
keyfile2_id = unhexlify('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314')
keyfile2_id = unhexlify('45f309b4ef353c467d16a19039b87e5400000000000000000000000000000000')
def setUp(self):
self.tmppath = tempfile.mkdtemp()
@ -46,6 +46,15 @@ class KeyTestCase(AtticTestCase):
_location = _Location()
id = bytes(32)
def _test_make_testdata(self):
# modify tearDown to not kill the key file first, before using this
os.environ['ATTIC_PASSPHRASE'] = 'passphrase'
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
print("keyfile2_key_file: find the it in the filesystem, see location in test log output")
print("keyfile2_cdata:", hexlify(key.encrypt(b'payload')))
print("keyfile2_id:", hexlify(key.id_hash(b'payload')))
assert False
def test_plaintext(self):
key = PlaintextKey.create(None, self.MockArgs())
data = b'foo'
@ -55,7 +64,7 @@ class KeyTestCase(AtticTestCase):
def test_keyfile(self):
os.environ['ATTIC_PASSPHRASE'] = 'test'
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0)
self.assert_equal(bytes_to_long(key.enc_iv, 8), 0)
manifest = key.encrypt(b'XXX')
self.assert_equal(key.extract_nonce(manifest), 0)
manifest2 = key.encrypt(b'XXX')
@ -65,7 +74,7 @@ class KeyTestCase(AtticTestCase):
iv = key.extract_nonce(manifest)
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
# we just assume that the payload fits into 1 AES block (which is given for b'XXX').
self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1)
self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + 1)
# Key data sanity check
self.assert_equal(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
self.assert_equal(key2.chunk_seed == 0, False)
@ -82,7 +91,7 @@ class KeyTestCase(AtticTestCase):
def test_passphrase(self):
os.environ['ATTIC_PASSPHRASE'] = 'test'
key = PassphraseKey.create(self.MockRepository(), self.MockArgs())
self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0)
self.assert_equal(bytes_to_long(key.enc_iv, 8), 0)
self.assert_equal(hexlify(key.id_key), b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6')
self.assert_equal(hexlify(key.enc_hmac_key), b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901')
self.assert_equal(hexlify(key.enc_key), b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a')
@ -96,11 +105,11 @@ class KeyTestCase(AtticTestCase):
iv = key.extract_nonce(manifest)
key2 = PassphraseKey.detect(self.MockRepository(), manifest)
# we just assume that the payload fits into 1 AES block (which is given for b'XXX').
self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1)
self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + 1)
self.assert_equal(key.id_key, key2.id_key)
self.assert_equal(key.enc_hmac_key, key2.enc_hmac_key)
self.assert_equal(key.enc_key, key2.enc_key)
self.assert_equal(key.chunk_seed, key2.chunk_seed)
data = b'foo'
self.assert_equal(hexlify(key.id_hash(data)), b'818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990')
self.assert_equal(hexlify(key.id_hash(data)), b'a409d69859b8a07625f066e42cde050100000000000000000000000000000000')
self.assert_equal(data, key2.decrypt(key2.id_hash(data), key.encrypt(data)))