mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 09:59:19 -04:00
use AES-GCM (an AEAD single-pass mac&cipher)
This has special and extremely fast HW acceleration on e.g recent Intel CPUs: AES-NI and PCLMULQDQ. Notes: a) I had to kill AES.iv method, it just did not work for aes-gcm as done by openssl. As the incremented IV (counter) can't be read back, we have to keep and manually increment it in Key.enc_iv. b) there is a hack in AES.compute_tag_and_encrypt to add 16B of zero to the right of the gmac (which is also 16B) because the current callers expects 32B. AES.check_tag_and_encrypt is tolerant of such a 32B tag, but will only use the left 16B and ignore the right 16B if needed. this is a bit dirty, but I didn't want to change the header layout within this changeset. c) switched from mac&encrypt to encrypt-then-mac (using aes-gcm) for the keyfile 'data' entry d) also added a test that creates the testdata needed for the constants at top of testsuite/key.py e) I kept enc_hmac_key although it is not used by the code in this changeset. But we'll need to keep supporting the old algorithms, too.
This commit is contained in:
parent
1f4077d870
commit
1e1d80c7b0
4 changed files with 123 additions and 66 deletions
|
|
@ -7,6 +7,9 @@ from libc.stdlib cimport malloc, free
|
|||
|
||||
API_VERSION = 2
|
||||
|
||||
TAG_SIZE = 16 # bytes; 128 bits is the maximum allowed value. see "hack" below.
|
||||
IV_SIZE = 16 # bytes; 128 bits
|
||||
|
||||
cdef extern from "openssl/rand.h":
|
||||
int RAND_bytes(unsigned char *buf, int num)
|
||||
|
||||
|
|
@ -22,7 +25,7 @@ cdef extern from "openssl/evp.h":
|
|||
ctypedef struct ENGINE:
|
||||
pass
|
||||
const EVP_MD *EVP_sha256()
|
||||
const EVP_CIPHER *EVP_aes_256_ctr()
|
||||
const EVP_CIPHER *EVP_aes_256_gcm()
|
||||
void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
|
||||
void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
|
||||
|
||||
|
|
@ -36,11 +39,14 @@ cdef extern from "openssl/evp.h":
|
|||
const unsigned char *in_, int inl)
|
||||
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
|
||||
int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
|
||||
|
||||
int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, unsigned char *ptr)
|
||||
int PKCS5_PBKDF2_HMAC(const char *password, int passwordlen,
|
||||
const unsigned char *salt, int saltlen, int iter,
|
||||
const EVP_MD *digest,
|
||||
int keylen, unsigned char *out)
|
||||
int EVP_CTRL_GCM_GET_TAG
|
||||
int EVP_CTRL_GCM_SET_TAG
|
||||
int EVP_CTRL_GCM_SET_IVLEN
|
||||
|
||||
import struct
|
||||
|
||||
|
|
@ -98,7 +104,7 @@ cdef class AES:
|
|||
EVP_CIPHER_CTX_init(&self.ctx)
|
||||
self.is_encrypt = is_encrypt
|
||||
# Set cipher type and mode
|
||||
cipher_mode = EVP_aes_256_ctr()
|
||||
cipher_mode = EVP_aes_256_gcm()
|
||||
if self.is_encrypt:
|
||||
if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
|
||||
raise Exception('EVP_EncryptInit_ex failed')
|
||||
|
|
@ -117,6 +123,9 @@ cdef class AES:
|
|||
key2 = key
|
||||
if iv:
|
||||
iv2 = iv
|
||||
# Set IV length (bytes)
|
||||
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_IVLEN, IV_SIZE, NULL):
|
||||
raise Exception('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
|
||||
# Initialise key and IV
|
||||
if self.is_encrypt:
|
||||
if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
|
||||
|
|
@ -125,16 +134,24 @@ cdef class AES:
|
|||
if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
|
||||
raise Exception('EVP_DecryptInit_ex failed')
|
||||
|
||||
@property
|
||||
def iv(self):
|
||||
return self.ctx.iv[:16]
|
||||
def add(self, aad):
|
||||
cdef int aadl = len(aad)
|
||||
cdef int outl
|
||||
# Zero or more calls to specify any AAD
|
||||
if self.is_encrypt:
|
||||
if not EVP_EncryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
|
||||
raise Exception('EVP_EncryptUpdate failed')
|
||||
else: # decrypt
|
||||
if not EVP_DecryptUpdate(&self.ctx, NULL, &outl, aad, aadl):
|
||||
raise Exception('EVP_DecryptUpdate failed')
|
||||
|
||||
def encrypt(self, data):
|
||||
def compute_tag_and_encrypt(self, data):
|
||||
cdef int inl = len(data)
|
||||
cdef int ctl = 0
|
||||
cdef int outl = 0
|
||||
# note: modes that use padding, need up to one extra AES block (16b)
|
||||
# note: modes that use padding, need up to one extra AES block (16B)
|
||||
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
|
||||
cdef unsigned char *tag = <unsigned char *>malloc(TAG_SIZE)
|
||||
if not out:
|
||||
raise MemoryError
|
||||
try:
|
||||
|
|
@ -144,15 +161,20 @@ cdef class AES:
|
|||
if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
|
||||
raise Exception('EVP_EncryptFinal failed')
|
||||
ctl += outl
|
||||
return out[:ctl]
|
||||
# Get tag
|
||||
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_GET_TAG, TAG_SIZE, tag):
|
||||
raise Exception('EVP_CIPHER_CTX_ctrl GET TAG failed')
|
||||
# hack: caller wants 32B tags (256b), so we give back that amount
|
||||
return (tag[:TAG_SIZE] + b'\x00'*16), out[:ctl]
|
||||
finally:
|
||||
free(tag)
|
||||
free(out)
|
||||
|
||||
def decrypt(self, data):
|
||||
def check_tag_and_decrypt(self, tag, data):
|
||||
cdef int inl = len(data)
|
||||
cdef int ptl = 0
|
||||
cdef int outl = 0
|
||||
# note: modes that use padding, need up to one extra AES block (16b).
|
||||
# note: modes that use padding, need up to one extra AES block (16B).
|
||||
# This is what the openssl docs say. I am not sure this is correct,
|
||||
# but OTOH it will not cause any harm if our buffer is a little bigger.
|
||||
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
|
||||
|
|
@ -162,10 +184,11 @@ cdef class AES:
|
|||
if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
|
||||
raise Exception('EVP_DecryptUpdate failed')
|
||||
ptl = outl
|
||||
# Set expected tag value.
|
||||
if not EVP_CIPHER_CTX_ctrl(&self.ctx, EVP_CTRL_GCM_SET_TAG, TAG_SIZE, tag):
|
||||
raise Exception('EVP_CIPHER_CTX_ctrl SET TAG failed')
|
||||
if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
|
||||
# this error check is very important for modes with padding or
|
||||
# authentication. for them, a failure here means corrupted data.
|
||||
# CTR mode does not use padding nor authentication.
|
||||
# a failure here means corrupted / tampered tag or data
|
||||
raise Exception('EVP_DecryptFinal failed')
|
||||
ptl += outl
|
||||
return out[:ptl]
|
||||
|
|
|
|||
73
attic/key.py
73
attic/key.py
|
|
@ -50,7 +50,7 @@ class KeyBase(object):
|
|||
self.TYPE_STR = bytes([self.TYPE])
|
||||
|
||||
def id_hash(self, data):
|
||||
"""Return HMAC hash using the "id" HMAC key
|
||||
"""Return a HASH (no id_key) or a MAC (using the "id_key" key)
|
||||
"""
|
||||
|
||||
def encrypt(self, data):
|
||||
|
|
@ -92,9 +92,9 @@ class PlaintextKey(KeyBase):
|
|||
class AESKeyBase(KeyBase):
|
||||
"""Common base class shared by KeyfileKey and PassphraseKey
|
||||
|
||||
Chunks are encrypted using 256bit AES in Counter Mode (CTR)
|
||||
Chunks are encrypted using 256bit AES in Galois Counter Mode (GCM)
|
||||
|
||||
Payload layout: TYPE(1) + HMAC(32) + NONCE(8) + CIPHERTEXT
|
||||
Payload layout: TYPE(1) + TAG(32) + NONCE(8) + CIPHERTEXT
|
||||
|
||||
To reduce payload size only 8 bytes of the 16 bytes nonce is saved
|
||||
in the payload, the first 8 bytes are always zeros. This does not
|
||||
|
|
@ -105,45 +105,68 @@ class AESKeyBase(KeyBase):
|
|||
PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE
|
||||
|
||||
def id_hash(self, data):
|
||||
"""Return HMAC hash using the "id" HMAC key
|
||||
"""
|
||||
return HMAC(self.id_key, data, sha256).digest()
|
||||
Return GMAC using the "id_key" GMAC key
|
||||
|
||||
XXX do we need a cryptographic hash function here or is a keyed hash
|
||||
function like GMAC / GHASH good enough? See NIST SP 800-38D.
|
||||
|
||||
IMPORTANT: in 1 repo, there should be only 1 kind of id_hash, otherwise
|
||||
data hashed/maced with one id_hash might result in same ID as already
|
||||
exists in the repo for other data created with another id_hash method.
|
||||
somehow unlikely considering 128 or 256bits, but still.
|
||||
"""
|
||||
mac_cipher = AES(is_encrypt=True, key=self.id_key, iv=b'\0'*16) # XXX do we need an IV here?
|
||||
# GMAC = aes-gcm with all data as AAD, no data as to-be-encrypted data
|
||||
mac_cipher.add(bytes(data))
|
||||
tag, _ = mac_cipher.compute_tag_and_encrypt(b'')
|
||||
return tag
|
||||
|
||||
def encrypt(self, data):
|
||||
data = zlib.compress(data)
|
||||
self.enc_cipher.reset()
|
||||
data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
|
||||
hmac = HMAC(self.enc_hmac_key, data, sha256).digest()
|
||||
return b''.join((self.TYPE_STR, hmac, data))
|
||||
self.enc_cipher.reset(iv=self.enc_iv)
|
||||
iv_last8 = self.enc_iv[8:]
|
||||
self.enc_cipher.add(iv_last8)
|
||||
tag, data = self.enc_cipher.compute_tag_and_encrypt(data)
|
||||
# increase the IV (counter) value so same value is never used twice
|
||||
current_iv = bytes_to_long(iv_last8)
|
||||
self.enc_iv = PREFIX + long_to_bytes(current_iv + num_aes_blocks(len(data)))
|
||||
return b''.join((self.TYPE_STR, tag, iv_last8, data))
|
||||
|
||||
def decrypt(self, id, data):
|
||||
if data[0] != self.TYPE:
|
||||
raise IntegrityError('Invalid encryption envelope')
|
||||
hmac = memoryview(data)[1:33]
|
||||
if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hmac:
|
||||
iv_last8 = data[1+32:1+40]
|
||||
iv = PREFIX + iv_last8
|
||||
self.dec_cipher.reset(iv=iv)
|
||||
self.dec_cipher.add(iv_last8)
|
||||
tag, data = data[1:1+32], data[1+40:]
|
||||
try:
|
||||
data = self.dec_cipher.check_tag_and_decrypt(tag, data)
|
||||
except Exception:
|
||||
raise IntegrityError('Encryption envelope checksum mismatch')
|
||||
self.dec_cipher.reset(iv=PREFIX + data[33:41])
|
||||
data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview
|
||||
if id and HMAC(self.id_key, data, sha256).digest() != id:
|
||||
data = zlib.decompress(data)
|
||||
if id and self.id_hash(data) != id:
|
||||
raise IntegrityError('Chunk id verification failed')
|
||||
return data
|
||||
|
||||
def extract_nonce(self, payload):
|
||||
if payload[0] != self.TYPE:
|
||||
raise IntegrityError('Invalid encryption envelope')
|
||||
raise IntegrityError('Invalid encryption envelope')
|
||||
nonce = bytes_to_long(payload[33:41])
|
||||
return nonce
|
||||
|
||||
def init_from_random_data(self, data):
|
||||
self.enc_key = data[0:32]
|
||||
self.enc_hmac_key = data[32:64]
|
||||
self.enc_hmac_key = data[32:64] # XXX enc_hmac_key not used for AES-GCM
|
||||
self.id_key = data[64:96]
|
||||
self.chunk_seed = bytes_to_int(data[96:100])
|
||||
# Convert to signed int32
|
||||
if self.chunk_seed & 0x80000000:
|
||||
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
|
||||
|
||||
def init_ciphers(self, enc_iv=b''):
|
||||
def init_ciphers(self, enc_iv=PREFIX * 2): # default IV = 16B zero
|
||||
self.enc_iv = enc_iv
|
||||
self.enc_cipher = AES(is_encrypt=True, key=self.enc_key, iv=enc_iv)
|
||||
self.dec_cipher = AES(is_encrypt=False, key=self.enc_key)
|
||||
|
||||
|
|
@ -242,25 +265,25 @@ class KeyfileKey(AESKeyBase):
|
|||
def decrypt_key_file(self, data, passphrase):
|
||||
d = msgpack.unpackb(data)
|
||||
assert d[b'version'] == 1
|
||||
assert d[b'algorithm'] == b'sha256'
|
||||
assert d[b'algorithm'] == b'gmac'
|
||||
key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
|
||||
data = AES(is_encrypt=False, key=key).decrypt(d[b'data'])
|
||||
if HMAC(key, data, sha256).digest() != d[b'hash']:
|
||||
try:
|
||||
data = AES(is_encrypt=False, key=key, iv=b'\0'*16).check_tag_and_decrypt(d[b'hash'], d[b'data'])
|
||||
return data
|
||||
except Exception:
|
||||
return None
|
||||
return data
|
||||
|
||||
def encrypt_key_file(self, data, passphrase):
|
||||
salt = get_random_bytes(32)
|
||||
iterations = 100000
|
||||
key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
|
||||
hash = HMAC(key, data, sha256).digest()
|
||||
cdata = AES(is_encrypt=True, key=key).encrypt(data)
|
||||
tag, cdata = AES(is_encrypt=True, key=key, iv=b'\0'*16).compute_tag_and_encrypt(data)
|
||||
d = {
|
||||
'version': 1,
|
||||
'salt': salt,
|
||||
'iterations': iterations,
|
||||
'algorithm': 'sha256',
|
||||
'hash': hash,
|
||||
'algorithm': 'gmac',
|
||||
'hash': tag,
|
||||
'data': cdata,
|
||||
}
|
||||
return msgpack.packb(d)
|
||||
|
|
|
|||
|
|
@ -27,18 +27,20 @@ class CryptoTestCase(AtticTestCase):
|
|||
self.assert_equal(len(bytes2), 10)
|
||||
self.assert_not_equal(bytes, bytes2)
|
||||
|
||||
def test_aes(self):
|
||||
def test_aes_gcm(self):
|
||||
key = b'X' * 32
|
||||
iv = b'A' * 16
|
||||
data = b'foo' * 10
|
||||
# encrypt
|
||||
aes = AES(is_encrypt=True, key=key)
|
||||
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
|
||||
cdata = aes.encrypt(data)
|
||||
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
|
||||
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
|
||||
# decrypt
|
||||
aes = AES(is_encrypt=False, key=key)
|
||||
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
|
||||
pdata = aes.decrypt(cdata)
|
||||
aes = AES(is_encrypt=True, key=key, iv=iv)
|
||||
tag, cdata = aes.compute_tag_and_encrypt(data)
|
||||
self.assert_equal(hexlify(tag), b'c98aa10eb6b7031bcc2160878d9438fb00000000000000000000000000000000')
|
||||
self.assert_equal(hexlify(cdata), b'841bcce405df769d22ee9f7f012edf5dc7fb2594d924c7400ffd050f2741')
|
||||
# decrypt (correct tag/cdata)
|
||||
aes = AES(is_encrypt=False, key=key, iv=iv)
|
||||
pdata = aes.check_tag_and_decrypt(tag, cdata)
|
||||
self.assert_equal(data, pdata)
|
||||
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
|
||||
# decrypt (incorrect tag/cdata)
|
||||
aes = AES(is_encrypt=False, key=key, iv=iv)
|
||||
cdata = b'x' + cdata[1:] # corrupt cdata
|
||||
self.assertRaises(Exception, aes.check_tag_and_decrypt, tag, cdata)
|
||||
|
|
|
|||
|
|
@ -15,20 +15,20 @@ class KeyTestCase(AtticTestCase):
|
|||
repository = Location(tempfile.mkstemp()[1])
|
||||
|
||||
keyfile2_key_file = """
|
||||
ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000
|
||||
hqppdGVyYXRpb25zzgABhqCkaGFzaNoAIMyonNI+7Cjv0qHi0AOBM6bLGxACJhfgzVD2oq
|
||||
bIS9SFqWFsZ29yaXRobaZzaGEyNTakc2FsdNoAINNK5qqJc1JWSUjACwFEWGTdM7Nd0a5l
|
||||
1uBGPEb+9XM9p3ZlcnNpb24BpGRhdGHaANAYDT5yfPpU099oBJwMomsxouKyx/OG4QIXK2
|
||||
hQCG2L2L/9PUu4WIuKvGrsXoP7syemujNfcZws5jLp2UPva4PkQhQsrF1RYDEMLh2eF9Ol
|
||||
rwtkThq1tnh7KjWMG9Ijt7/aoQtq0zDYP/xaFF8XXSJxiyP5zjH5+spB6RL0oQHvbsliSh
|
||||
/cXJq7jrqmrJ1phd6dg4SHAM/i+hubadZoS6m25OQzYAW09wZD/phG8OVa698Z5ed3HTaT
|
||||
SmrtgJL3EoOKgUI9d6BLE4dJdBqntifo""".strip()
|
||||
ATTIC KEY 0000000000000000000000000000000000000000000000000000000000000000
|
||||
hqppdGVyYXRpb25zzgABhqCkc2FsdNoAICiRWfijWqIuvr+70VzOsUS4Y6NM45FWm6LgCu
|
||||
2GyalGqWFsZ29yaXRobaRnbWFjpGhhc2jaACDgCK7u30Pi+Du1qHRyWBupAAAAAAAAAAAA
|
||||
AAAAAAAAAKd2ZXJzaW9uAaRkYXRh2gDQrlCtq2mzdmkuhwIoko5+amxYqnlfNHHZxRFiX9
|
||||
F8AliP7H6S0j9uHyrBKRDWtj7VGYWVW8COy/FncLRgRhspB59rH3y/GS6pfeEw7RWUPd32
|
||||
eOcB6v8q+IHUvGttyFRcN6PxSFHBhOKN0jqStP0UqXLv+d9rGWi6X/HNZGu9WPkqs/g0G9
|
||||
xnf48i9pOy19aQo3HV//ubf+VYWmc1J8zjCS2Og0JkMtxbqM6j4mShPjkURZZBXSJGtORV
|
||||
5IzNAzixJWmr8LR12TmFGVb0U9P79A==""".strip()
|
||||
|
||||
keyfile2_cdata = unhexlify(re.sub('\W', '', """
|
||||
0055f161493fcfc16276e8c31493c4641e1eb19a79d0326fad0291e5a9c98e5933
|
||||
00000000000003e8d21eaf9b86c297a8cd56432e1915bb
|
||||
004078370be366ac3ad9d147992be8ebee000000000000000000000000000000000000000000000000
|
||||
b94bfb5d0a63b0c47cf74e2d0585aa
|
||||
"""))
|
||||
keyfile2_id = unhexlify('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314')
|
||||
keyfile2_id = unhexlify('45f309b4ef353c467d16a19039b87e5400000000000000000000000000000000')
|
||||
|
||||
def setUp(self):
|
||||
self.tmppath = tempfile.mkdtemp()
|
||||
|
|
@ -44,6 +44,15 @@ class KeyTestCase(AtticTestCase):
|
|||
_location = _Location()
|
||||
id = bytes(32)
|
||||
|
||||
def _test_make_testdata(self):
|
||||
# modify tearDown to not kill the key file first, before using this
|
||||
os.environ['ATTIC_PASSPHRASE'] = 'passphrase'
|
||||
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
|
||||
print("keyfile2_key_file: find the it in the filesystem, see location in test log output")
|
||||
print("keyfile2_cdata:", hexlify(key.encrypt(b'payload')))
|
||||
print("keyfile2_id:", hexlify(key.id_hash(b'payload')))
|
||||
assert False
|
||||
|
||||
def test_plaintext(self):
|
||||
key = PlaintextKey.create(None, None)
|
||||
data = b'foo'
|
||||
|
|
@ -53,7 +62,7 @@ class KeyTestCase(AtticTestCase):
|
|||
def test_keyfile(self):
|
||||
os.environ['ATTIC_PASSPHRASE'] = 'test'
|
||||
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
|
||||
self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0)
|
||||
self.assert_equal(bytes_to_long(key.enc_iv, 8), 0)
|
||||
manifest = key.encrypt(b'XXX')
|
||||
self.assert_equal(key.extract_nonce(manifest), 0)
|
||||
manifest2 = key.encrypt(b'XXX')
|
||||
|
|
@ -62,7 +71,7 @@ class KeyTestCase(AtticTestCase):
|
|||
self.assert_equal(key.extract_nonce(manifest2), 1)
|
||||
iv = key.extract_nonce(manifest)
|
||||
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
|
||||
self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD))
|
||||
self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD))
|
||||
# Key data sanity check
|
||||
self.assert_equal(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
|
||||
self.assert_equal(key2.chunk_seed == 0, False)
|
||||
|
|
@ -79,7 +88,7 @@ class KeyTestCase(AtticTestCase):
|
|||
def test_passphrase(self):
|
||||
os.environ['ATTIC_PASSPHRASE'] = 'test'
|
||||
key = PassphraseKey.create(self.MockRepository(), None)
|
||||
self.assert_equal(bytes_to_long(key.enc_cipher.iv, 8), 0)
|
||||
self.assert_equal(bytes_to_long(key.enc_iv, 8), 0)
|
||||
self.assert_equal(hexlify(key.id_key), b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6')
|
||||
self.assert_equal(hexlify(key.enc_hmac_key), b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901')
|
||||
self.assert_equal(hexlify(key.enc_key), b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a')
|
||||
|
|
@ -92,11 +101,11 @@ class KeyTestCase(AtticTestCase):
|
|||
self.assert_equal(key.extract_nonce(manifest2), 1)
|
||||
iv = key.extract_nonce(manifest)
|
||||
key2 = PassphraseKey.detect(self.MockRepository(), manifest)
|
||||
self.assert_equal(bytes_to_long(key2.enc_cipher.iv, 8), iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD))
|
||||
self.assert_equal(bytes_to_long(key2.enc_iv, 8), iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD))
|
||||
self.assert_equal(key.id_key, key2.id_key)
|
||||
self.assert_equal(key.enc_hmac_key, key2.enc_hmac_key)
|
||||
self.assert_equal(key.enc_key, key2.enc_key)
|
||||
self.assert_equal(key.chunk_seed, key2.chunk_seed)
|
||||
data = b'foo'
|
||||
self.assert_equal(hexlify(key.id_hash(data)), b'818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990')
|
||||
self.assert_equal(hexlify(key.id_hash(data)), b'a409d69859b8a07625f066e42cde050100000000000000000000000000000000')
|
||||
self.assert_equal(data, key2.decrypt(key2.id_hash(data), key.encrypt(data)))
|
||||
|
|
|
|||
Loading…
Reference in a new issue