init ciphersuites with header_len and aad_offset

it's needed for extract_iv already, so it should be given to init, not encrypt/decrypt
This commit is contained in:
Thomas Waldmann 2016-09-03 01:35:34 +02:00
parent e9bbf9307d
commit 37cf3ef469
3 changed files with 71 additions and 55 deletions

View file

@ -359,14 +359,14 @@ class AESKeyBase(KeyBase):
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
self.nonce_manager.ensure_reservation(self.cipher.block_count(len(data)))
return self.cipher.encrypt(data, header=self.TYPE_STR, aad_offset=1)
return self.cipher.encrypt(data, header=self.TYPE_STR)
def decrypt(self, id, data, decompress=True):
if not (data[0] == self.TYPE or
data[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
payload = self.cipher.decrypt(data, header_len=1, aad_offset=1)
payload = self.cipher.decrypt(data)
if not decompress:
return payload
data = self.decompress(payload)
@ -385,7 +385,7 @@ class AESKeyBase(KeyBase):
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
def init_ciphers(self, manifest_data=None):
self.cipher = CIPHERSUITE(mac_key=self.enc_hmac_key, enc_key=self.enc_key)
self.cipher = CIPHERSUITE(mac_key=self.enc_hmac_key, enc_key=self.enc_key, header_len=1, aad_offset=1)
if manifest_data is None:
nonce = 0
else:

View file

@ -201,12 +201,13 @@ cdef Py_buffer ro_buffer(object data) except *:
class UNENCRYPTED:
# Layout: HEADER + PlainText
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert mac_key is None
assert enc_key is None
self.header_len = header_len
self.set_iv(iv)
def encrypt(self, data, header=b'', aad_offset=0, iv=None):
def encrypt(self, data, header=b'', iv=None):
"""
IMPORTANT: it is called encrypt to satisfy the crypto api naming convention,
but this does NOT encrypt and it does NOT compute and store a MAC either.
@ -216,13 +217,13 @@ class UNENCRYPTED:
assert self.iv is not None, 'iv needs to be set before encrypt is called'
return header + data
def decrypt(self, envelope, header_len=0, aad_offset=0):
def decrypt(self, envelope):
"""
IMPORTANT: it is called decrypt to satisfy the crypto api naming convention,
but this does NOT decrypt and it does NOT verify a MAC either, because data
is not encrypted and there is no MAC.
"""
return memoryview(envelope)[header_len:]
return memoryview(envelope)[self.header_len:]
def block_count(self, length):
return 0
@ -246,16 +247,21 @@ cdef class AES256_CTR_HMAC_SHA256:
cdef unsigned char *enc_key
cdef int cipher_blk_len
cdef int iv_len, iv_len_short
cdef int aad_offset
cdef int header_len
cdef int mac_len
cdef unsigned char iv[16]
cdef long long blocks
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert isinstance(mac_key, bytes) and len(mac_key) == 32
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.cipher_blk_len = 16
self.iv_len = sizeof(self.iv)
self.iv_len_short = 8
assert aad_offset <= header_len
self.aad_offset = aad_offset
self.header_len = header_len
self.mac_len = 32
self.mac_key = mac_key
self.enc_key = enc_key
@ -264,7 +270,7 @@ cdef class AES256_CTR_HMAC_SHA256:
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, mac_key, enc_key, iv=None):
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.ctx = EVP_CIPHER_CTX_new()
self.hmac_ctx = HMAC_CTX_new()
@ -272,7 +278,7 @@ cdef class AES256_CTR_HMAC_SHA256:
EVP_CIPHER_CTX_free(self.ctx)
HMAC_CTX_free(self.hmac_ctx)
def encrypt(self, data, header=b'', aad_offset=0, iv=None):
def encrypt(self, data, header=b'', iv=None):
"""
encrypt data, compute mac over aad + iv + cdata, prepend header.
aad_offset is the offset into the header where aad starts.
@ -282,7 +288,8 @@ cdef class AES256_CTR_HMAC_SHA256:
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef int ilen = len(data)
cdef int hlen = len(header)
cdef int aoffset = aad_offset
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len_short +
ilen + self.cipher_blk_len) # play safe, 1 extra blk
@ -326,13 +333,14 @@ cdef class AES256_CTR_HMAC_SHA256:
PyBuffer_Release(&hdata)
PyBuffer_Release(&idata)
def decrypt(self, envelope, header_len=0, aad_offset=0):
def decrypt(self, envelope):
"""
authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
"""
cdef int ilen = len(envelope)
cdef int hlen = header_len
cdef int aoffset = aad_offset
cdef int hlen = self.header_len
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
@ -399,7 +407,7 @@ cdef class AES256_CTR_HMAC_SHA256:
iv_out[i] = iv[(self.iv_len-self.iv_len_short)+i]
def extract_iv(self, envelope):
offset = 1 + self.mac_len
offset = self.header_len + self.mac_len
return bytes_to_long(envelope[offset:offset+self.iv_len_short])
@ -414,14 +422,20 @@ cdef class _AEAD_BASE:
cdef unsigned char *enc_key
cdef int cipher_blk_len
cdef int iv_len
cdef int aad_offset
cdef int header_len
cdef int mac_len
cdef unsigned char iv[12]
cdef long long blocks
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert mac_key is None
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.iv_len = sizeof(self.iv)
self.header_len = 1
assert aad_offset <= header_len
self.aad_offset = aad_offset
self.header_len = header_len
self.mac_len = 16
self.enc_key = enc_key
if iv is not None:
@ -429,13 +443,13 @@ cdef class _AEAD_BASE:
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, mac_key, enc_key, iv=None):
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.ctx = EVP_CIPHER_CTX_new()
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, header=b'', aad_offset=0, iv=None):
def encrypt(self, data, header=b'', iv=None):
"""
encrypt data, compute mac over aad + iv + cdata, prepend header.
aad_offset is the offset into the header where aad starts.
@ -445,7 +459,8 @@ cdef class _AEAD_BASE:
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef int ilen = len(data)
cdef int hlen = len(header)
cdef int aoffset = aad_offset
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len +
ilen + self.cipher_blk_len)
@ -493,13 +508,14 @@ cdef class _AEAD_BASE:
PyBuffer_Release(&hdata)
PyBuffer_Release(&idata)
def decrypt(self, envelope, header_len=0, aad_offset=0):
def decrypt(self, envelope):
"""
authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
"""
cdef int ilen = len(envelope)
cdef int hlen = header_len
cdef int aoffset = aad_offset
cdef int hlen = self.header_len
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
@ -573,7 +589,7 @@ cdef class _AEAD_BASE:
iv_out[i] = iv[i]
def extract_iv(self, envelope):
offset = 1 + self.mac_len # XXX 1 -> self.header_len
offset = self.header_len + self.mac_len
return bytes_to_long(envelope[offset:offset+self.iv_len])
@ -590,27 +606,27 @@ cdef class _CHACHA_BASE(_AEAD_BASE):
cdef class AES256_GCM(_AES_BASE):
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
if OPENSSL_VERSION_NUMBER < 0x10001040:
raise ValueError('AES GCM requires OpenSSL >= 1.0.1d. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
self.cipher = EVP_aes_256_gcm
super().__init__(mac_key, enc_key, iv=iv)
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class AES256_OCB(_AES_BASE):
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
if OPENSSL_VERSION_NUMBER < 0x10100000:
raise ValueError('AES OCB requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
self.cipher = EVP_aes_256_ocb
super().__init__(mac_key, enc_key, iv=iv)
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class CHACHA20_POLY1305(_CHACHA_BASE):
def __init__(self, mac_key, enc_key, iv=None):
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
if OPENSSL_VERSION_NUMBER < 0x10100000:
raise ValueError('CHACHA20-POLY1305 requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
self.cipher = EVP_chacha20_poly1305
super().__init__(mac_key, enc_key, iv=iv)
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class AES:

View file

@ -45,10 +45,10 @@ class CryptoTestCase(BaseTestCase):
iv = b'' # any IV is ok, it just must be set and not None
data = b'data'
header = b'header'
cs = UNENCRYPTED(None, None, iv)
cs = UNENCRYPTED(None, None, iv, header_len=6)
envelope = cs.encrypt(data, header=header)
self.assert_equal(envelope, header + data)
got_data = cs.decrypt(envelope, header_len=len(header))
got_data = cs.decrypt(envelope)
self.assert_equal(got_data, data)
def test_AES256_CTR_HMAC_SHA256(self):
@ -59,8 +59,8 @@ class CryptoTestCase(BaseTestCase):
data = b'foo' * 10
header = b'\x42'
# encrypt-then-mac
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv)
hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv, header_len=1, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:1]
mac = hdr_mac_iv_cdata[1:33]
iv = hdr_mac_iv_cdata[33:41]
@ -71,15 +71,15 @@ class CryptoTestCase(BaseTestCase):
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
# auth-failure due to corruption (corrupted data)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:41] + b'\0' + hdr_mac_iv_cdata[42:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AES256_CTR_HMAC_SHA256_aad(self):
mac_key = b'Y' * 32
@ -88,8 +88,8 @@ class CryptoTestCase(BaseTestCase):
data = b'foo' * 10
header = b'\x12\x34\x56'
# encrypt-then-mac
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv)
hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv, header_len=3, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:3]
mac = hdr_mac_iv_cdata[3:35]
iv = hdr_mac_iv_cdata[35:43]
@ -100,15 +100,15 @@ class CryptoTestCase(BaseTestCase):
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
# auth-failure due to corruption (corrupted aad)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AE(self):
# used in legacy-like layout (1 type byte, no aad)
@ -134,8 +134,8 @@ class CryptoTestCase(BaseTestCase):
]
for cs_cls, exp_mac, exp_cdata in tests:
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv)
hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
cs = cs_cls(mac_key, enc_key, iv, header_len=1, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:1]
mac = hdr_mac_iv_cdata[1:17]
iv = hdr_mac_iv_cdata[17:29]
@ -146,15 +146,15 @@ class CryptoTestCase(BaseTestCase):
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
# auth/decrypt
cs = cs_cls(mac_key, enc_key)
pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
# auth-failure due to corruption (corrupted data)
cs = cs_cls(mac_key, enc_key)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AEAD(self):
# test with aad
@ -180,8 +180,8 @@ class CryptoTestCase(BaseTestCase):
]
for cs_cls, exp_mac, exp_cdata in tests:
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv)
hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad_offset=1)
cs = cs_cls(mac_key, enc_key, iv, header_len=3, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:3]
mac = hdr_mac_iv_cdata[3:19]
iv = hdr_mac_iv_cdata[19:31]
@ -192,15 +192,15 @@ class CryptoTestCase(BaseTestCase):
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
# auth/decrypt
cs = cs_cls(mac_key, enc_key)
pdata = cs.decrypt(hdr_mac_iv_cdata, header_len=len(header), aad_offset=1)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
# auth-failure due to corruption (corrupted aad)
cs = cs_cls(mac_key, enc_key)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted, header_len=len(header), aad_offset=1))
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_hmac_sha256(self):
# RFC 4231 test vectors