Merge pull request #1034 from ThomasWaldmann/crypto-aead

new crypto code, blackbox, aead internally
This commit is contained in:
enkore 2017-07-29 12:18:38 +02:00 committed by GitHub
commit 7d02c7e453
13 changed files with 1038 additions and 296 deletions

View file

@ -52,6 +52,7 @@ from distutils.command.clean import clean
compress_source = 'src/borg/compress.pyx'
crypto_ll_source = 'src/borg/crypto/low_level.pyx'
crypto_helpers = 'src/borg/crypto/_crypto_helpers.c'
chunker_source = 'src/borg/chunker.pyx'
hashindex_source = 'src/borg/hashindex.pyx'
item_source = 'src/borg/item.pyx'
@ -730,7 +731,7 @@ ext_modules = []
if not on_rtd:
ext_modules += [
Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros),
Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros),
Extension('borg.crypto.low_level', [crypto_ll_source, crypto_helpers], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros),
Extension('borg.hashindex', [hashindex_source]),
Extension('borg.item', [item_source]),
Extension('borg.chunker', [chunker_source]),

View file

@ -25,6 +25,7 @@ from .cache import ChunkListEntry
from .crypto.key import key_factory
from .compress import Compressor, CompressionSpec
from .constants import * # NOQA
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Manifest
from .helpers import hardlinkable
@ -1148,7 +1149,7 @@ class ArchiveChecker:
else:
try:
self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
except IntegrityError as exc:
except IntegrityErrorBase as exc:
logger.error('Repository manifest is corrupted: %s', exc)
self.error_found = True
del self.chunks[Manifest.MANIFEST_ID]
@ -1211,11 +1212,11 @@ class ArchiveChecker:
chunk_id = chunk_ids_revd.pop(-1) # better efficiency
try:
encrypted_data = next(chunk_data_iter)
except (Repository.ObjectNotFound, IntegrityError) as err:
except (Repository.ObjectNotFound, IntegrityErrorBase) as err:
self.error_found = True
errors += 1
logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
if isinstance(err, IntegrityError):
if isinstance(err, IntegrityErrorBase):
defect_chunks.append(chunk_id)
# as the exception killed our generator, make a new one for remaining chunks:
if chunk_ids_revd:
@ -1225,7 +1226,7 @@ class ArchiveChecker:
_chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
try:
self.key.decrypt(_chunk_id, encrypted_data)
except IntegrityError as integrity_error:
except IntegrityErrorBase as integrity_error:
self.error_found = True
errors += 1
logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
@ -1254,7 +1255,7 @@ class ArchiveChecker:
encrypted_data = self.repository.get(defect_chunk)
_chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
self.key.decrypt(_chunk_id, encrypted_data)
except IntegrityError:
except IntegrityErrorBase:
# failed twice -> get rid of this chunk
del self.chunks[defect_chunk]
self.repository.delete(defect_chunk)
@ -1295,7 +1296,7 @@ class ArchiveChecker:
cdata = self.repository.get(chunk_id)
try:
data = self.key.decrypt(chunk_id, cdata)
except IntegrityError as exc:
except IntegrityErrorBase as exc:
logger.error('Skipping corrupted chunk: %s', exc)
self.error_found = True
continue

View file

@ -0,0 +1,35 @@
/* some helpers, so our code also works with OpenSSL 1.0.x */
#include <string.h>
#include <openssl/opensslv.h>
#include <openssl/hmac.h>
#if OPENSSL_VERSION_NUMBER < 0x10100000L
HMAC_CTX *HMAC_CTX_new(void)
{
HMAC_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
if (ctx != NULL) {
memset(ctx, 0, sizeof *ctx);
HMAC_CTX_cleanup(ctx);
}
return ctx;
}
void HMAC_CTX_free(HMAC_CTX *ctx)
{
if (ctx != NULL) {
HMAC_CTX_cleanup(ctx);
OPENSSL_free(ctx);
}
}
const EVP_CIPHER *EVP_aes_256_ocb(void){ /* dummy, so that code compiles */
return NULL;
}
const EVP_CIPHER *EVP_chacha20_poly1305(void){ /* dummy, so that code compiles */
return NULL;
}
#endif

View file

@ -0,0 +1,15 @@
/* some helpers, so our code also works with OpenSSL 1.0.x */
#include <openssl/opensslv.h>
#include <openssl/hmac.h>
#include <openssl/evp.h>
#if OPENSSL_VERSION_NUMBER < 0x10100000L
HMAC_CTX *HMAC_CTX_new(void);
void HMAC_CTX_free(HMAC_CTX *ctx);
const EVP_CIPHER *EVP_aes_256_ocb(void); /* dummy, so that code compiles */
const EVP_CIPHER *EVP_chacha20_poly1305(void); /* dummy, so that code compiles */
#endif

View file

@ -11,7 +11,7 @@ from hmac import HMAC, compare_digest
import msgpack
from borg.logger import create_logger
from ..logger import create_logger
logger = create_logger()
@ -25,10 +25,10 @@ from ..helpers import get_limited_unpacker
from ..helpers import bin_to_hex
from ..item import Key, EncryptedKey
from ..platform import SaveFile
from .nonces import NonceManager
from .low_level import AES, bytes_to_long, bytes_to_int, num_aes_blocks, hmac_sha256, blake2b_256, hkdf_hmac_sha512
PREFIX = b'\0' * 8
from .nonces import NonceManager
from .low_level import AES, bytes_to_long, long_to_bytes, bytes_to_int, num_cipher_blocks, hmac_sha256, blake2b_256, hkdf_hmac_sha512
from .low_level import AES256_CTR_HMAC_SHA256, AES256_CTR_BLAKE2b
class PassphraseWrong(Error):
@ -352,48 +352,31 @@ class AESKeyBase(KeyBase):
PAYLOAD_OVERHEAD = 1 + 32 + 8 # TYPE + HMAC + NONCE
MAC = hmac_sha256
CIPHERSUITE = AES256_CTR_HMAC_SHA256
logically_encrypted = True
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
self.nonce_manager.ensure_reservation(num_aes_blocks(len(data)))
self.enc_cipher.reset()
data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
assert (self.MAC is blake2b_256 and len(self.enc_hmac_key) == 128 or
self.MAC is hmac_sha256 and len(self.enc_hmac_key) == 32)
hmac = self.MAC(self.enc_hmac_key, data)
return b''.join((self.TYPE_STR, hmac, data))
next_iv = self.nonce_manager.ensure_reservation(self.cipher.next_iv(),
self.cipher.block_count(len(data)))
return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv)
def decrypt(self, id, data, decompress=True):
if not (data[0] == self.TYPE or
data[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
data_view = memoryview(data)
hmac_given = data_view[1:33]
assert (self.MAC is blake2b_256 and len(self.enc_hmac_key) == 128 or
self.MAC is hmac_sha256 and len(self.enc_hmac_key) == 32)
hmac_computed = memoryview(self.MAC(self.enc_hmac_key, data_view[33:]))
if not compare_digest(hmac_computed, hmac_given):
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Encryption envelope checksum mismatch' % id_str)
self.dec_cipher.reset(iv=PREFIX + data[33:41])
payload = self.dec_cipher.decrypt(data_view[41:])
try:
payload = self.cipher.decrypt(data)
except IntegrityError as e:
raise IntegrityError("Chunk %s: Could not decrypt [%s]" % (bin_to_hex(id), str(e)))
if not decompress:
return payload
data = self.decompress(payload)
self.assert_id(id, data)
return data
def extract_nonce(self, payload):
if not (payload[0] == self.TYPE or
payload[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
raise IntegrityError('Manifest: Invalid encryption envelope')
nonce = bytes_to_long(payload[33:41])
return nonce
def init_from_random_data(self, data=None):
if data is None:
data = os.urandom(100)
@ -405,10 +388,21 @@ class AESKeyBase(KeyBase):
if self.chunk_seed & 0x80000000:
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
def init_ciphers(self, manifest_nonce=0):
self.enc_cipher = AES(is_encrypt=True, key=self.enc_key, iv=manifest_nonce.to_bytes(16, byteorder='big'))
self.nonce_manager = NonceManager(self.repository, self.enc_cipher, manifest_nonce)
self.dec_cipher = AES(is_encrypt=False, key=self.enc_key)
def init_ciphers(self, manifest_data=None):
self.cipher = self.CIPHERSUITE(mac_key=self.enc_hmac_key, enc_key=self.enc_key, header_len=1, aad_offset=1)
if manifest_data is None:
nonce = 0
else:
if not (manifest_data[0] == self.TYPE or
manifest_data[0] == PassphraseKey.TYPE and isinstance(self, RepoKey)):
raise IntegrityError('Manifest: Invalid encryption envelope')
# manifest_blocks is a safe upper bound on the amount of cipher blocks needed
# to encrypt the manifest. depending on the ciphersuite and overhead, it might
# be a bit too high, but that does not matter.
manifest_blocks = num_cipher_blocks(len(manifest_data))
nonce = self.cipher.extract_iv(manifest_data) + manifest_blocks
self.cipher.set_iv(nonce)
self.nonce_manager = NonceManager(self.repository, nonce)
class Passphrase(str):
@ -528,8 +522,7 @@ class PassphraseKey(ID_HMAC_SHA_256, AESKeyBase):
key.init(repository, passphrase)
try:
key.decrypt(None, manifest_data)
num_blocks = num_aes_blocks(len(manifest_data) - 41)
key.init_ciphers(key.extract_nonce(manifest_data) + num_blocks)
key.init_ciphers(manifest_data)
key._passphrase = passphrase
return key
except IntegrityError:
@ -568,8 +561,7 @@ class KeyfileKeyBase(AESKeyBase):
else:
if not key.load(target, passphrase):
raise PassphraseWrong
num_blocks = num_aes_blocks(len(manifest_data) - 41)
key.init_ciphers(key.extract_nonce(manifest_data) + num_blocks)
key.init_ciphers(manifest_data)
key._passphrase = passphrase
return key
@ -604,7 +596,7 @@ class KeyfileKeyBase(AESKeyBase):
assert enc_key.version == 1
assert enc_key.algorithm == 'sha256'
key = passphrase.kdf(enc_key.salt, enc_key.iterations, 32)
data = AES(is_encrypt=False, key=key).decrypt(enc_key.data)
data = AES(key, b'\0'*16).decrypt(enc_key.data)
if hmac_sha256(key, data) == enc_key.hash:
return data
@ -613,7 +605,7 @@ class KeyfileKeyBase(AESKeyBase):
iterations = PBKDF2_ITERATIONS
key = passphrase.kdf(salt, iterations, 32)
hash = hmac_sha256(key, data)
cdata = AES(is_encrypt=True, key=key).encrypt(data)
cdata = AES(key, b'\0'*16).encrypt(data)
enc_key = EncryptedKey(
version=1,
salt=salt,
@ -772,7 +764,7 @@ class Blake2KeyfileKey(ID_BLAKE2b_256, KeyfileKey):
STORAGE = KeyBlobStorage.KEYFILE
FILE_ID = 'BORG_KEY'
MAC = blake2b_256
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2RepoKey(ID_BLAKE2b_256, RepoKey):
@ -781,7 +773,7 @@ class Blake2RepoKey(ID_BLAKE2b_256, RepoKey):
ARG_NAME = 'repokey-blake2'
STORAGE = KeyBlobStorage.REPO
MAC = blake2b_256
CIPHERSUITE = AES256_CTR_BLAKE2b
class AuthenticatedKeyBase(RepoKey):
@ -799,16 +791,9 @@ class AuthenticatedKeyBase(RepoKey):
super().save(target, passphrase)
self.logically_encrypted = False
def extract_nonce(self, payload):
# This is called during set-up of the AES ciphers we're not actually using for this
# key. Therefore the return value of this method doesn't matter; it's just around
# to not have it crash should key identification be run against a very small chunk
# by "borg check" when the manifest is lost. (The manifest is always large enough
# to have the original method read some garbage from bytes 33-41). (Also, the return
# value must be larger than the 41 byte bloat of the original format).
if payload[0] != self.TYPE:
def init_ciphers(self, manifest_data=None):
if manifest_data is not None and manifest_data[0] != self.TYPE:
raise IntegrityError('Manifest: Invalid encryption envelope')
return 42
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
@ -816,7 +801,8 @@ class AuthenticatedKeyBase(RepoKey):
def decrypt(self, id, data, decompress=True):
if data[0] != self.TYPE:
raise IntegrityError('Chunk %s: Invalid envelope' % bin_to_hex(id))
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid envelope' % id_str)
payload = memoryview(data)[1:]
if not decompress:
return payload

View file

@ -1,15 +1,51 @@
"""A thin OpenSSL wrapper"""
"""An AEAD style OpenSSL wrapper
API:
encrypt(data, header=b'', aad_offset=0) -> envelope
decrypt(envelope, header_len=0, aad_offset=0) -> data
Envelope layout:
|<--------------------------- envelope ------------------------------------------>|
|<------------ header ----------->|<---------- ciphersuite specific ------------->|
|<-- not auth data -->|<-- aad -->|<-- e.g.: S(aad, iv, E(data)), iv, E(data) -->|
|--- #aad_offset ---->|
|------------- #header_len ------>|
S means a cryptographic signature function (like HMAC or GMAC).
E means a encryption function (like AES).
iv is the initialization vector / nonce, if needed.
The split of header into not authenticated data and aad (additional authenticated
data) is done to support the legacy envelope layout as used in attic and early borg
(where the TYPE byte was not authenticated) and avoid unneeded memcpy and string
garbage.
Newly designed envelope layouts can just authenticate the whole header.
IV handling:
iv = ... # just never repeat!
cs = CS(hmac_key, enc_key, iv=iv)
envelope = cs.encrypt(data, header, aad_offset)
iv = cs.next_iv(len(data))
(repeat)
"""
import hashlib
import hmac
from math import ceil
from libc.stdlib cimport malloc, free
from cpython cimport PyMem_Malloc, PyMem_Free
from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
from cpython.bytes cimport PyBytes_FromStringAndSize
API_VERSION = '1.1_02'
cdef extern from "openssl/crypto.h":
int CRYPTO_memcmp(const void *a, const void *b, size_t len)
cdef extern from "../algorithms/blake2-libselect.h":
ctypedef struct blake2b_state:
@ -29,9 +65,14 @@ cdef extern from "openssl/evp.h":
pass
ctypedef struct ENGINE:
pass
const EVP_CIPHER *EVP_aes_256_ctr()
EVP_CIPHER_CTX *EVP_CIPHER_CTX_new()
void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a)
const EVP_CIPHER *EVP_aes_256_gcm()
const EVP_CIPHER *EVP_aes_256_ocb()
const EVP_CIPHER *EVP_chacha20_poly1305()
void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
const unsigned char *key, const unsigned char *iv)
@ -44,58 +85,83 @@ cdef extern from "openssl/evp.h":
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
EVP_MD *EVP_sha256() nogil
int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, void *ptr)
int EVP_CTRL_GCM_GET_TAG
int EVP_CTRL_GCM_SET_TAG
int EVP_CTRL_GCM_SET_IVLEN
const EVP_MD *EVP_sha256() nogil
EVP_CIPHER_CTX *EVP_CIPHER_CTX_new()
void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a)
cdef extern from "openssl/hmac.h":
ctypedef struct HMAC_CTX:
pass
void HMAC_CTX_init(HMAC_CTX *ctx)
void HMAC_CTX_cleanup(HMAC_CTX *ctx)
HMAC_CTX *HMAC_CTX_new()
void HMAC_CTX_free(HMAC_CTX *a)
int HMAC_Init_ex(HMAC_CTX *ctx, const void *key, int key_len, const EVP_MD *md, ENGINE *impl)
int HMAC_Update(HMAC_CTX *ctx, const unsigned char *data, int len)
int HMAC_Final(HMAC_CTX *ctx, unsigned char *md, unsigned int *len)
unsigned char *HMAC(const EVP_MD *evp_md,
const void *key, int key_len,
const unsigned char *data, int data_len,
unsigned char *md, unsigned int *md_len) nogil
cdef extern from "_crypto_helpers.h":
long OPENSSL_VERSION_NUMBER
ctypedef struct HMAC_CTX:
pass
HMAC_CTX *HMAC_CTX_new()
void HMAC_CTX_free(HMAC_CTX *a)
const EVP_CIPHER *EVP_aes_256_ocb() # dummy
const EVP_CIPHER *EVP_chacha20_poly1305() # dummy
openssl10 = OPENSSL_VERSION_NUMBER < 0x10100000
import struct
_int = struct.Struct('>I')
_long = struct.Struct('>Q')
_2long = struct.Struct('>QQ')
bytes_to_int = lambda x, offset=0: _int.unpack_from(x, offset)[0]
bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0]
long_to_bytes = lambda x: _long.pack(x)
def bytes16_to_int(b, offset=0):
h, l = _2long.unpack_from(b, offset)
return (h << 64) + l
def num_cipher_blocks(length, blocksize=16):
"""Return the number of cipher blocks required to encrypt/decrypt <length> bytes of data.
For a precise computation, <blocksize> must be the used cipher's block size (AES: 16, CHACHA20: 64).
def int_to_bytes16(i):
max_uint64 = 0xffffffffffffffff
l = i & max_uint64
h = (i >> 64) & max_uint64
return _2long.pack(h, l)
For a safe-upper-boundary computation, <blocksize> must be the MINIMUM of the block sizes (in
bytes) of ALL supported ciphers. This can be used to adjust a counter if the used cipher is not
known (yet).
The default value of blocksize must be adjusted so it reflects this minimum, so a call of this
function without a blocksize is "safe-upper-boundary by default".
def increment_iv(iv, amount=1):
Padding cipher modes are not supported.
"""
Increment the IV by the given amount (default 1).
:param iv: input IV, 16 bytes (128 bit)
:param amount: increment value
:return: input_IV + amount, 16 bytes (128 bit)
"""
assert len(iv) == 16
iv = bytes16_to_int(iv)
iv += amount
iv = int_to_bytes16(iv)
return iv
return (length + blocksize - 1) // blocksize
def num_aes_blocks(int length):
"""Return the number of AES blocks required to encrypt/decrypt *length* bytes of data.
Note: this is only correct for modes without padding, like AES-CTR.
"""
return (length + 15) // 16
class CryptoError(Exception):
"""Malfunction in the crypto module."""
class IntegrityError(CryptoError):
"""Integrity checks failed. Corrupted or tampered data."""
cdef Py_buffer ro_buffer(object data) except *:
@ -104,101 +170,641 @@ cdef Py_buffer ro_buffer(object data) except *:
return view
cdef class AES:
"""A thin wrapper around the OpenSSL EVP cipher API
"""
class UNENCRYPTED:
# Layout: HEADER + PlainText
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert mac_key is None
assert enc_key is None
self.header_len = header_len
self.set_iv(iv)
def encrypt(self, data, header=b'', iv=None):
"""
IMPORTANT: it is called encrypt to satisfy the crypto api naming convention,
but this does NOT encrypt and it does NOT compute and store a MAC either.
"""
if iv is not None:
self.set_iv(iv)
assert self.iv is not None, 'iv needs to be set before encrypt is called'
return header + data
def decrypt(self, envelope):
"""
IMPORTANT: it is called decrypt to satisfy the crypto api naming convention,
but this does NOT decrypt and it does NOT verify a MAC either, because data
is not encrypted and there is no MAC.
"""
return memoryview(envelope)[self.header_len:]
def block_count(self, length):
return 0
def set_iv(self, iv):
self.iv = iv
def next_iv(self):
return self.iv
def extract_iv(self, envelope):
return 0
cdef class AES256_CTR_BASE:
# Layout: HEADER + MAC 32 + IV 8 + CT (same as attic / borg < 1.2 IF HEADER = TYPE_BYTE, no AAD)
cdef EVP_CIPHER_CTX *ctx
cdef int is_encrypt
cdef unsigned char iv_orig[16]
cdef unsigned char *enc_key
cdef int cipher_blk_len
cdef int iv_len, iv_len_short
cdef int aad_offset
cdef int header_len
cdef int mac_len
cdef unsigned char iv[16]
cdef long long blocks
def __cinit__(self, is_encrypt, key, iv=None):
@staticmethod
def requirements_check():
if OPENSSL_VERSION_NUMBER < 0x10000000:
raise ValueError('AES CTR requires OpenSSL >= 1.0.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.requirements_check()
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.cipher_blk_len = 16
self.iv_len = sizeof(self.iv)
self.iv_len_short = 8
assert aad_offset <= header_len
self.aad_offset = aad_offset
self.header_len = header_len
self.mac_len = 32
self.enc_key = enc_key
if iv is not None:
self.set_iv(iv)
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.ctx = EVP_CIPHER_CTX_new()
self.is_encrypt = is_encrypt
# Set cipher type and mode
cipher_mode = EVP_aes_256_ctr()
if self.is_encrypt:
if not EVP_EncryptInit_ex(self.ctx, cipher_mode, NULL, NULL, NULL):
raise Exception('EVP_EncryptInit_ex failed')
else: # decrypt
if not EVP_DecryptInit_ex(self.ctx, cipher_mode, NULL, NULL, NULL):
raise Exception('EVP_DecryptInit_ex failed')
self.reset(key, iv)
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def reset(self, key=None, iv=None):
cdef const unsigned char *key2 = NULL
cdef const unsigned char *iv2 = NULL
if key:
key2 = key
if iv:
iv2 = iv
assert isinstance(iv, bytes) and len(iv) == 16
for i in range(16):
self.iv_orig[i] = iv[i]
self.blocks = 0 # number of AES blocks encrypted starting with iv_orig
# Initialise key and IV
if self.is_encrypt:
if not EVP_EncryptInit_ex(self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_EncryptInit_ex failed')
else: # decrypt
if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, key2, iv2):
raise Exception('EVP_DecryptInit_ex failed')
cdef mac_compute(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf):
raise NotImplementedError
@property
def iv(self):
return increment_iv(self.iv_orig[:16], self.blocks)
cdef mac_verify(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf, const unsigned char *mac_wanted):
raise NotImplementedError
def encrypt(self, data):
cdef Py_buffer data_buf = ro_buffer(data)
cdef int inl = len(data)
cdef int ctl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16b)
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
if not out:
def encrypt(self, data, header=b'', iv=None):
"""
encrypt data, compute mac over aad + iv + cdata, prepend header.
aad_offset is the offset into the header where aad starts.
"""
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef int ilen = len(data)
cdef int hlen = len(header)
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len_short +
ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
cdef int olen
cdef int offset
cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer hdata = ro_buffer(header)
try:
offset = 0
for i in range(hlen):
odata[offset+i] = header[i]
offset += hlen
offset += self.mac_len
self.store_iv(odata+offset, self.iv)
offset += self.iv_len_short
rc = EVP_EncryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.enc_key, self.iv)
if not rc:
raise CryptoError('EVP_EncryptInit_ex failed')
rc = EVP_EncryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf, ilen)
if not rc:
raise CryptoError('EVP_EncryptUpdate failed')
offset += olen
rc = EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen)
if not rc:
raise CryptoError('EVP_EncryptFinal_ex failed')
offset += olen
self.mac_compute(<const unsigned char *> hdata.buf+aoffset, alen,
odata+hlen+self.mac_len, offset-hlen-self.mac_len,
odata+hlen)
self.blocks += self.block_count(ilen)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&hdata)
PyBuffer_Release(&idata)
def decrypt(self, envelope):
"""
authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
"""
cdef int ilen = len(envelope)
cdef int hlen = self.header_len
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
cdef int olen
cdef int offset
cdef unsigned char mac_buf[32]
assert sizeof(mac_buf) == self.mac_len
cdef Py_buffer idata = ro_buffer(envelope)
try:
self.mac_verify(<const unsigned char *> idata.buf+aoffset, alen,
<const unsigned char *> idata.buf+hlen+self.mac_len, ilen-hlen-self.mac_len,
mac_buf, <const unsigned char *> idata.buf+hlen)
iv = self.fetch_iv(<unsigned char *> idata.buf+hlen+self.mac_len)
self.set_iv(iv)
if not EVP_DecryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.enc_key, iv):
raise CryptoError('EVP_DecryptInit_ex failed')
offset = 0
rc = EVP_DecryptUpdate(self.ctx, odata+offset, &olen,
<const unsigned char*> idata.buf+hlen+self.mac_len+self.iv_len_short,
ilen-hlen-self.mac_len-self.iv_len_short)
if not rc:
raise CryptoError('EVP_DecryptUpdate failed')
offset += olen
rc = EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen)
if rc <= 0:
raise CryptoError('EVP_DecryptFinal_ex failed')
offset += olen
self.blocks += self.block_count(offset)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
for i in range(self.iv_len):
self.iv[i] = iv[i]
self.blocks = 0 # how many AES blocks got encrypted with this IV?
def next_iv(self):
# call this after encrypt() to get the next iv (int) for the next encrypt() call
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks
cdef fetch_iv(self, unsigned char * iv_in):
# fetch lower self.iv_len_short bytes of iv and add upper zero bytes
return b'\0' * (self.iv_len - self.iv_len_short) + iv_in[0:self.iv_len_short]
cdef store_iv(self, unsigned char * iv_out, unsigned char * iv):
# store only lower self.iv_len_short bytes, upper bytes are assumed to be 0
cdef int i
for i in range(self.iv_len_short):
iv_out[i] = iv[(self.iv_len-self.iv_len_short)+i]
def extract_iv(self, envelope):
offset = self.header_len + self.mac_len
return bytes_to_long(envelope[offset:offset+self.iv_len_short])
cdef class AES256_CTR_HMAC_SHA256(AES256_CTR_BASE):
cdef HMAC_CTX *hmac_ctx
cdef unsigned char *mac_key
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert isinstance(mac_key, bytes) and len(mac_key) == 32
self.mac_key = mac_key
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.hmac_ctx = HMAC_CTX_new()
def __dealloc__(self):
HMAC_CTX_free(self.hmac_ctx)
cdef mac_compute(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf):
if not HMAC_Init_ex(self.hmac_ctx, self.mac_key, self.mac_len, EVP_sha256(), NULL):
raise CryptoError('HMAC_Init_ex failed')
if not HMAC_Update(self.hmac_ctx, data1, data1_len):
raise CryptoError('HMAC_Update failed')
if not HMAC_Update(self.hmac_ctx, data2, data2_len):
raise CryptoError('HMAC_Update failed')
if not HMAC_Final(self.hmac_ctx, mac_buf, NULL):
raise CryptoError('HMAC_Final failed')
cdef mac_verify(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf, const unsigned char *mac_wanted):
self.mac_compute(data1, data1_len, data2, data2_len, mac_buf)
if CRYPTO_memcmp(mac_buf, mac_wanted, self.mac_len):
raise IntegrityError('MAC Authentication failed')
cdef class AES256_CTR_BLAKE2b(AES256_CTR_BASE):
cdef unsigned char *mac_key
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert isinstance(mac_key, bytes) and len(mac_key) == 128
self.mac_key = mac_key
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
pass
def __dealloc__(self):
pass
cdef mac_compute(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf):
cdef blake2b_state state
cdef int rc
rc = blake2b_init(&state, self.mac_len)
if rc == -1:
raise Exception('blake2b_init() failed')
with nogil:
rc = blake2b_update(&state, self.mac_key, 128)
if rc != -1:
rc = blake2b_update(&state, data1, data1_len)
if rc != -1:
rc = blake2b_update(&state, data2, data2_len)
if rc == -1:
raise Exception('blake2b_update() failed')
rc = blake2b_final(&state, mac_buf, self.mac_len)
if rc == -1:
raise Exception('blake2b_final() failed')
cdef mac_verify(self, const unsigned char *data1, int data1_len,
const unsigned char *data2, int data2_len,
const unsigned char *mac_buf, const unsigned char *mac_wanted):
self.mac_compute(data1, data1_len, data2, data2_len, mac_buf)
if CRYPTO_memcmp(mac_buf, mac_wanted, self.mac_len):
raise IntegrityError('MAC Authentication failed')
ctypedef const EVP_CIPHER * (* CIPHER)()
cdef class _AEAD_BASE:
# Layout: HEADER + MAC 16 + IV 12 + CT
cdef CIPHER cipher
cdef EVP_CIPHER_CTX *ctx
cdef unsigned char *enc_key
cdef int cipher_blk_len
cdef int iv_len
cdef int aad_offset
cdef int header_len
cdef int mac_len
cdef unsigned char iv[12]
cdef long long blocks
@staticmethod
def requirements_check():
"""check whether library requirements for this ciphersuite are satisfied"""
raise NotImplemented # override / implement in child class
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
assert mac_key is None
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.iv_len = sizeof(self.iv)
self.header_len = 1
assert aad_offset <= header_len
self.aad_offset = aad_offset
self.header_len = header_len
self.mac_len = 16
self.enc_key = enc_key
if iv is not None:
self.set_iv(iv)
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.ctx = EVP_CIPHER_CTX_new()
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, header=b'', iv=None):
"""
encrypt data, compute mac over aad + iv + cdata, prepend header.
aad_offset is the offset into the header where aad starts.
"""
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
# AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte)
# IV we provide, thus we must not encrypt more than 2^32 cipher blocks with same IV).
block_count = self.block_count(len(data))
if block_count > 2**32:
raise ValueError('too much data, would overflow internal 32bit counter')
cdef int ilen = len(data)
cdef int hlen = len(header)
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len +
ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
cdef int olen
cdef int offset
cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer hdata = ro_buffer(header)
try:
offset = 0
for i in range(hlen):
odata[offset+i] = header[i]
offset += hlen
offset += self.mac_len
self.store_iv(odata+offset, self.iv)
rc = EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL)
if not rc:
raise CryptoError('EVP_EncryptInit_ex failed')
if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, self.iv_len, NULL):
raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
rc = EVP_EncryptInit_ex(self.ctx, NULL, NULL, self.enc_key, self.iv)
if not rc:
raise CryptoError('EVP_EncryptInit_ex failed')
rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> hdata.buf+aoffset, alen)
if not rc:
raise CryptoError('EVP_EncryptUpdate failed')
if not EVP_EncryptUpdate(self.ctx, NULL, &olen, odata+offset, self.iv_len):
raise CryptoError('EVP_EncryptUpdate failed')
offset += self.iv_len
rc = EVP_EncryptUpdate(self.ctx, odata+offset, &olen, <const unsigned char*> idata.buf, ilen)
if not rc:
raise CryptoError('EVP_EncryptUpdate failed')
offset += olen
rc = EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen)
if not rc:
raise CryptoError('EVP_EncryptFinal_ex failed')
offset += olen
if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_GET_TAG, self.mac_len, odata+hlen):
raise CryptoError('EVP_CIPHER_CTX_ctrl GET TAG failed')
self.blocks = block_count
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&hdata)
PyBuffer_Release(&idata)
def decrypt(self, envelope):
"""
authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
"""
# AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte)
# IV we provide, thus we must not decrypt more than 2^32 cipher blocks with same IV):
approx_block_count = self.block_count(len(envelope)) # sloppy, but good enough for borg
if approx_block_count > 2**32:
raise ValueError('too much data, would overflow internal 32bit counter')
cdef int ilen = len(envelope)
cdef int hlen = self.header_len
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
cdef int olen
cdef int offset
cdef Py_buffer idata = ro_buffer(envelope)
try:
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL):
raise CryptoError('EVP_DecryptInit_ex failed')
iv = self.fetch_iv(<unsigned char *> idata.buf+hlen+self.mac_len)
self.set_iv(iv)
if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, self.iv_len, NULL):
raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, self.enc_key, iv):
raise CryptoError('EVP_DecryptInit_ex failed')
if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_TAG, self.mac_len, <void *> idata.buf+hlen):
raise CryptoError('EVP_CIPHER_CTX_ctrl SET TAG failed')
rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> idata.buf+aoffset, alen)
if not rc:
raise CryptoError('EVP_DecryptUpdate failed')
if not EVP_DecryptUpdate(self.ctx, NULL, &olen,
<const unsigned char*> idata.buf+hlen+self.mac_len, self.iv_len):
raise CryptoError('EVP_DecryptUpdate failed')
offset = 0
rc = EVP_DecryptUpdate(self.ctx, odata+offset, &olen,
<const unsigned char*> idata.buf+hlen+self.mac_len+self.iv_len,
ilen-hlen-self.mac_len-self.iv_len)
if not rc:
raise CryptoError('EVP_DecryptUpdate failed')
offset += olen
rc = EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen)
if rc <= 0:
# a failure here means corrupted or tampered tag (mac) or data.
raise IntegrityError('Authentication / EVP_DecryptFinal_ex failed')
offset += olen
self.blocks = self.block_count(offset)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call,
# because encrypt does a full initialisation of the cipher context.
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
for i in range(self.iv_len):
self.iv[i] = iv[i]
self.blocks = 0 # number of cipher blocks encrypted with this IV
def next_iv(self):
# call this after encrypt() to get the next iv (int) for the next encrypt() call
# AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit
# (12 byte) IV we provide, thus we only need to increment the IV by 1.
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + 1
cdef fetch_iv(self, unsigned char * iv_in):
return iv_in[0:self.iv_len]
cdef store_iv(self, unsigned char * iv_out, unsigned char * iv):
cdef int i
for i in range(self.iv_len):
iv_out[i] = iv[i]
def extract_iv(self, envelope):
offset = self.header_len + self.mac_len
return bytes_to_long(envelope[offset:offset+self.iv_len])
cdef class _AES_BASE(_AEAD_BASE):
def __init__(self, *args, **kwargs):
self.cipher_blk_len = 16
super().__init__(*args, **kwargs)
cdef class _CHACHA_BASE(_AEAD_BASE):
def __init__(self, *args, **kwargs):
self.cipher_blk_len = 64
super().__init__(*args, **kwargs)
cdef class AES256_GCM(_AES_BASE):
@staticmethod
def requirements_check():
if OPENSSL_VERSION_NUMBER < 0x10001040:
raise ValueError('AES GCM requires OpenSSL >= 1.0.1d. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.requirements_check()
self.cipher = EVP_aes_256_gcm
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class AES256_OCB(_AES_BASE):
@staticmethod
def requirements_check():
if OPENSSL_VERSION_NUMBER < 0x10100000:
raise ValueError('AES OCB requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.requirements_check()
self.cipher = EVP_aes_256_ocb
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class CHACHA20_POLY1305(_CHACHA_BASE):
@staticmethod
def requirements_check():
if OPENSSL_VERSION_NUMBER < 0x10100000:
raise ValueError('CHACHA20-POLY1305 requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER)
def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1):
self.requirements_check()
self.cipher = EVP_chacha20_poly1305
super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class AES:
"""A thin wrapper around the OpenSSL EVP cipher API - for legacy code, like key file encryption"""
cdef CIPHER cipher
cdef EVP_CIPHER_CTX *ctx
cdef unsigned char *enc_key
cdef int cipher_blk_len
cdef int iv_len
cdef unsigned char iv[16]
cdef long long blocks
def __init__(self, enc_key, iv=None):
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.enc_key = enc_key
self.iv_len = 16
assert sizeof(self.iv) == self.iv_len
self.cipher = EVP_aes_256_ctr
self.cipher_blk_len = 16
if iv is not None:
self.set_iv(iv)
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, enc_key, iv=None):
self.ctx = EVP_CIPHER_CTX_new()
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, iv=None):
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef Py_buffer idata = ro_buffer(data)
cdef int ilen = len(data)
cdef int offset
cdef int olen
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
try:
if not EVP_EncryptUpdate(self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_EncryptInit_ex failed')
offset = 0
if not EVP_EncryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_EncryptUpdate failed')
ctl = outl
if not EVP_EncryptFinal_ex(self.ctx, out+ctl, &outl):
offset += olen
if not EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen):
raise Exception('EVP_EncryptFinal failed')
ctl += outl
self.blocks += num_aes_blocks(ctl)
return out[:ctl]
offset += olen
self.blocks = self.block_count(offset)
return odata[:offset]
finally:
free(out)
PyBuffer_Release(&data_buf)
PyMem_Free(odata)
PyBuffer_Release(&idata)
def decrypt(self, data):
cdef Py_buffer data_buf = ro_buffer(data)
cdef int inl = len(data)
cdef int ptl = 0
cdef int outl = 0
# note: modes that use padding, need up to one extra AES block (16b).
# This is what the openssl docs say. I am not sure this is correct,
# but OTOH it will not cause any harm if our buffer is a little bigger.
cdef unsigned char *out = <unsigned char *>malloc(inl+16)
if not out:
cdef Py_buffer idata = ro_buffer(data)
cdef int ilen = len(data)
cdef int offset
cdef int olen
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
try:
if not EVP_DecryptUpdate(self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
# Set cipher type and mode
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_DecryptInit_ex failed')
offset = 0
if not EVP_DecryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_DecryptUpdate failed')
ptl = outl
if EVP_DecryptFinal_ex(self.ctx, out+ptl, &outl) <= 0:
offset += olen
if EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen) <= 0:
# this error check is very important for modes with padding or
# authentication. for them, a failure here means corrupted data.
# CTR mode does not use padding nor authentication.
raise Exception('EVP_DecryptFinal failed')
ptl += outl
self.blocks += num_aes_blocks(inl)
return out[:ptl]
offset += olen
self.blocks = self.block_count(ilen)
return odata[:offset]
finally:
free(out)
PyBuffer_Release(&data_buf)
PyMem_Free(odata)
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call,
# because encrypt does a full initialisation of the cipher context.
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
for i in range(self.iv_len):
self.iv[i] = iv[i]
self.blocks = 0 # number of cipher blocks encrypted with this IV
def next_iv(self):
# call this after encrypt() to get the next iv (int) for the next encrypt() call
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks
def hmac_sha256(key, data):
@ -210,7 +816,7 @@ def hmac_sha256(key, data):
with nogil:
rc = HMAC(EVP_sha256(), key_ptr, key_len, <const unsigned char*> data_buf.buf, data_buf.len, md, NULL)
if rc != md:
raise Exception('HMAC(EVP_sha256) failed')
raise CryptoError('HMAC(EVP_sha256) failed')
finally:
PyBuffer_Release(&data_buf)
return PyBytes_FromStringAndSize(<char*> &md[0], 32)

View file

@ -14,9 +14,8 @@ NONCE_SPACE_RESERVATION = 2**28 # This in units of AES blocksize (16 bytes)
class NonceManager:
def __init__(self, repository, enc_cipher, manifest_nonce):
def __init__(self, repository, manifest_nonce):
self.repository = repository
self.enc_cipher = enc_cipher
self.end_of_nonce_reservation = None
self.manifest_nonce = manifest_nonce
self.nonce_file = os.path.join(get_security_dir(self.repository.id_str), 'nonce')
@ -47,7 +46,15 @@ class NonceManager:
def commit_repo_nonce_reservation(self, next_unreserved, start_nonce):
self.repository.commit_nonce_reservation(next_unreserved, start_nonce)
def ensure_reservation(self, nonce_space_needed):
def ensure_reservation(self, nonce, nonce_space_needed):
"""
Call this before doing encryption, give current, yet unused, integer IV as <nonce>
and the amount of subsequent (counter-like) IVs needed as <nonce_space_needed>.
Return value is the IV (counter) integer you shall use for encryption.
Note: this method may return the <nonce> you gave, if a reservation for it exists or
can be established, so make sure you give a unused nonce.
"""
# Nonces may never repeat, even if a transaction aborts or the system crashes.
# Therefore a part of the nonce space is reserved before any nonce is used for encryption.
# As these reservations are committed to permanent storage before any nonce is used, this protects
@ -64,24 +71,17 @@ class NonceManager:
if self.end_of_nonce_reservation:
# we already got a reservation, if nonce_space_needed still fits everything is ok
next_nonce = int.from_bytes(self.enc_cipher.iv, byteorder='big')
next_nonce = nonce
assert next_nonce <= self.end_of_nonce_reservation
if next_nonce + nonce_space_needed <= self.end_of_nonce_reservation:
return
return next_nonce
repo_free_nonce = self.get_repo_free_nonce()
local_free_nonce = self.get_local_free_nonce()
free_nonce_space = max(x for x in (repo_free_nonce, local_free_nonce, self.manifest_nonce, self.end_of_nonce_reservation) if x is not None)
reservation_end = free_nonce_space + nonce_space_needed + NONCE_SPACE_RESERVATION
assert reservation_end < MAX_REPRESENTABLE_NONCE
if self.end_of_nonce_reservation is None:
# initialization, reset the encryption cipher to the start of the reservation
self.enc_cipher.reset(None, free_nonce_space.to_bytes(16, byteorder='big'))
else:
# expand existing reservation if possible
if free_nonce_space != self.end_of_nonce_reservation:
# some other client got an interleaved reservation, skip partial space in old reservation to avoid overlap
self.enc_cipher.reset(None, free_nonce_space.to_bytes(16, byteorder='big'))
self.commit_repo_nonce_reservation(reservation_end, repo_free_nonce)
self.commit_local_nonce_reservation(reservation_end, local_free_nonce)
self.end_of_nonce_reservation = reservation_end
return free_nonce_space

View file

@ -91,7 +91,7 @@ class ErrorWithTraceback(Error):
traceback = True
class IntegrityError(ErrorWithTraceback):
class IntegrityError(ErrorWithTraceback, borg.crypto.low_level.IntegrityError):
"""Data integrity error: {}"""

View file

@ -30,7 +30,7 @@ SELFTEST_CASES = [
ChunkerTestCase,
]
SELFTEST_COUNT = 35
SELFTEST_COUNT = 37
class SelfTestResult(TestResult):

View file

@ -36,7 +36,7 @@ from ..archive import Archive, ChunkBuffer, flags_noatime, flags_normal
from ..archiver import Archiver, parse_storage_quota
from ..cache import Cache, LocalCache
from ..constants import * # NOQA
from ..crypto.low_level import bytes_to_long, num_aes_blocks
from ..crypto.low_level import bytes_to_long, num_cipher_blocks
from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError
from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
from ..crypto.file_integrity import FileIntegrityError
@ -2169,7 +2169,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
hash = sha256(data).digest()
if hash not in seen:
seen.add(hash)
num_blocks = num_aes_blocks(len(data) - 41)
num_blocks = num_cipher_blocks(len(data) - 41)
nonce = bytes_to_long(data[33:41])
for counter in range(nonce, nonce + num_blocks):
self.assert_not_in(counter, used)

View file

@ -1,8 +1,10 @@
from binascii import hexlify, unhexlify
from ..crypto.low_level import AES, bytes_to_long, bytes_to_int, long_to_bytes, hmac_sha256, blake2b_256
from ..crypto.low_level import increment_iv, bytes16_to_int, int_to_bytes16
from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_GCM, AES256_OCB, CHACHA20_POLY1305, UNENCRYPTED, \
IntegrityError, blake2b_256, hmac_sha256, openssl10
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes
from ..crypto.low_level import hkdf_hmac_sha512
from . import BaseTestCase
# Note: these tests are part of the self test, do not use or import py.test functionality here.
@ -18,42 +20,168 @@ class CryptoTestCase(BaseTestCase):
self.assert_equal(bytes_to_long(b'\0\0\0\0\0\0\0\1'), 1)
self.assert_equal(long_to_bytes(1), b'\0\0\0\0\0\0\0\1')
def test_bytes16_to_int(self):
self.assert_equal(bytes16_to_int(b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1'), 1)
self.assert_equal(int_to_bytes16(1), b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1')
self.assert_equal(bytes16_to_int(b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0'), 2 ** 64)
self.assert_equal(int_to_bytes16(2 ** 64), b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0')
def test_UNENCRYPTED(self):
iv = b'' # any IV is ok, it just must be set and not None
data = b'data'
header = b'header'
cs = UNENCRYPTED(None, None, iv, header_len=6)
envelope = cs.encrypt(data, header=header)
self.assert_equal(envelope, header + data)
got_data = cs.decrypt(envelope)
self.assert_equal(got_data, data)
def test_increment_iv(self):
iv0 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
iv1 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1'
iv2 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\2'
self.assert_equal(increment_iv(iv0, 0), iv0)
self.assert_equal(increment_iv(iv0, 1), iv1)
self.assert_equal(increment_iv(iv0, 2), iv2)
iva = b'\0\0\0\0\0\0\0\0\xff\xff\xff\xff\xff\xff\xff\xff'
ivb = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x00'
ivc = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x01'
self.assert_equal(increment_iv(iva, 0), iva)
self.assert_equal(increment_iv(iva, 1), ivb)
self.assert_equal(increment_iv(iva, 2), ivc)
self.assert_equal(increment_iv(iv0, 2**64), ivb)
def test_aes(self):
key = b'X' * 32
def test_AES256_CTR_HMAC_SHA256(self):
# this tests the layout as in attic / borg < 1.2 (1 type byte, no aad)
mac_key = b'Y' * 32
enc_key = b'X' * 32
iv = 0
data = b'foo' * 10
# encrypt
aes = AES(is_encrypt=True, key=key)
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
cdata = aes.encrypt(data)
header = b'\x42'
# encrypt-then-mac
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv, header_len=1, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:1]
mac = hdr_mac_iv_cdata[1:33]
iv = hdr_mac_iv_cdata[33:41]
cdata = hdr_mac_iv_cdata[41:]
self.assert_equal(hexlify(hdr), b'42')
self.assert_equal(hexlify(mac), b'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
# decrypt
aes = AES(is_encrypt=False, key=key)
self.assert_equal(bytes_to_long(aes.iv, 8), 0)
pdata = aes.decrypt(cdata)
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(bytes_to_long(aes.iv, 8), 2)
self.assert_equal(cs.next_iv(), 2)
# auth-failure due to corruption (corrupted data)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:41] + b'\0' + hdr_mac_iv_cdata[42:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AES256_CTR_HMAC_SHA256_aad(self):
mac_key = b'Y' * 32
enc_key = b'X' * 32
iv = 0
data = b'foo' * 10
header = b'\x12\x34\x56'
# encrypt-then-mac
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, iv, header_len=3, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:3]
mac = hdr_mac_iv_cdata[3:35]
iv = hdr_mac_iv_cdata[35:43]
cdata = hdr_mac_iv_cdata[43:]
self.assert_equal(hexlify(hdr), b'123456')
self.assert_equal(hexlify(mac), b'7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(cs.next_iv(), 2)
# auth-failure due to corruption (corrupted aad)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AE(self):
# used in legacy-like layout (1 type byte, no aad)
mac_key = None
enc_key = b'X' * 32
iv = 0
data = b'foo' * 10
header = b'\x23'
tests = [
# ciphersuite class, exp_mac, exp_cdata
(AES256_GCM,
b'66a438843aa41a087d6a7ed1dc1f3c4c',
b'5bbb40be14e4bcbfc75715b77b1242d590d2bf9f7f8a8a910b4469888689', )
]
if not openssl10:
tests += [
(AES256_OCB,
b'b6909c23c9aaebd9abbe1ff42097652d',
b'877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493', ),
(CHACHA20_POLY1305,
b'fd08594796e0706cde1e8b461e3e0555',
b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', )
]
for cs_cls, exp_mac, exp_cdata in tests:
# print(repr(cs_cls))
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv, header_len=1, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:1]
mac = hdr_mac_iv_cdata[1:17]
iv = hdr_mac_iv_cdata[17:29]
cdata = hdr_mac_iv_cdata[29:]
self.assert_equal(hexlify(hdr), b'23')
self.assert_equal(hexlify(mac), exp_mac)
self.assert_equal(hexlify(iv), b'000000000000000000000000')
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(cs.next_iv(), 1)
# auth/decrypt
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(cs.next_iv(), 1)
# auth-failure due to corruption (corrupted data)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AEAD(self):
# test with aad
mac_key = None
enc_key = b'X' * 32
iv = 0
data = b'foo' * 10
header = b'\x12\x34\x56'
tests = [
# ciphersuite class, exp_mac, exp_cdata
(AES256_GCM,
b'4fb0e5b0a0bca57527352cc6240e7cca',
b'5bbb40be14e4bcbfc75715b77b1242d590d2bf9f7f8a8a910b4469888689', )
]
if not openssl10:
tests += [
(AES256_OCB,
b'f2748c412af1c7ead81863a18c2c1893',
b'877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493', ),
(CHACHA20_POLY1305,
b'b7e7c9a79f2404e14f9aad156bf091dd',
b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', )
]
for cs_cls, exp_mac, exp_cdata in tests:
# print(repr(cs_cls))
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv, header_len=3, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
hdr = hdr_mac_iv_cdata[0:3]
mac = hdr_mac_iv_cdata[3:19]
iv = hdr_mac_iv_cdata[19:31]
cdata = hdr_mac_iv_cdata[31:]
self.assert_equal(hexlify(hdr), b'123456')
self.assert_equal(hexlify(mac), exp_mac)
self.assert_equal(hexlify(iv), b'000000000000000000000000')
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(cs.next_iv(), 1)
# auth/decrypt
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(cs.next_iv(), 1)
# auth-failure due to corruption (corrupted aad)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_hmac_sha256(self):
# RFC 4231 test vectors

View file

@ -13,7 +13,8 @@ from ..crypto.key import PlaintextKey, PassphraseKey, AuthenticatedKey, RepoKey,
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, UnsupportedManifestError
from ..crypto.key import identify_key
from ..crypto.low_level import bytes_to_long, num_aes_blocks
from ..crypto.low_level import bytes_to_long
from ..crypto.low_level import IntegrityError as IntegrityErrorBase
from ..helpers import IntegrityError
from ..helpers import Location
from ..helpers import StableDict
@ -75,6 +76,7 @@ class TestKey:
AuthenticatedKey,
KeyfileKey,
RepoKey,
AuthenticatedKey,
Blake2KeyfileKey,
Blake2RepoKey,
Blake2AuthenticatedKey,
@ -115,16 +117,16 @@ class TestKey:
def test_keyfile(self, monkeypatch, keys_dir):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
assert bytes_to_long(key.enc_cipher.iv, 8) == 0
assert key.cipher.next_iv() == 0
manifest = key.encrypt(b'ABC')
assert key.extract_nonce(manifest) == 0
assert key.cipher.extract_iv(manifest) == 0
manifest2 = key.encrypt(b'ABC')
assert manifest != manifest2
assert key.decrypt(None, manifest) == key.decrypt(None, manifest2)
assert key.extract_nonce(manifest2) == 1
iv = key.extract_nonce(manifest)
assert key.cipher.extract_iv(manifest2) == 1
iv = key.cipher.extract_iv(manifest)
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
assert bytes_to_long(key2.enc_cipher.iv, 8) >= iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
# Key data sanity check
assert len({key2.id_key, key2.enc_key, key2.enc_hmac_key}) == 3
assert key2.chunk_seed != 0
@ -138,7 +140,7 @@ class TestKey:
fd.write("0000000000002000")
key = KeyfileKey.create(repository, self.MockArgs())
data = key.encrypt(b'ABC')
assert key.extract_nonce(data) == 0x2000
assert key.cipher.extract_iv(data) == 0x2000
assert key.decrypt(None, data) == b'ABC'
def test_keyfile_kfenv(self, tmpdir, monkeypatch):
@ -183,20 +185,20 @@ class TestKey:
def test_passphrase(self, keys_dir, monkeypatch):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = PassphraseKey.create(self.MockRepository(), None)
assert bytes_to_long(key.enc_cipher.iv, 8) == 0
assert key.cipher.next_iv() == 0
assert hexlify(key.id_key) == b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6'
assert hexlify(key.enc_hmac_key) == b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901'
assert hexlify(key.enc_key) == b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a'
assert key.chunk_seed == -775740477
manifest = key.encrypt(b'ABC')
assert key.extract_nonce(manifest) == 0
assert key.cipher.extract_iv(manifest) == 0
manifest2 = key.encrypt(b'ABC')
assert manifest != manifest2
assert key.decrypt(None, manifest) == key.decrypt(None, manifest2)
assert key.extract_nonce(manifest2) == 1
iv = key.extract_nonce(manifest)
assert key.cipher.extract_iv(manifest2) == 1
iv = key.cipher.extract_iv(manifest)
key2 = PassphraseKey.detect(self.MockRepository(), manifest)
assert bytes_to_long(key2.enc_cipher.iv, 8) == iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD)
assert key2.cipher.next_iv() == iv + key2.cipher.block_count(len(manifest))
assert key.id_key == key2.id_key
assert key.enc_hmac_key == key2.enc_hmac_key
assert key.enc_key == key2.enc_key
@ -208,7 +210,7 @@ class TestKey:
def _corrupt_byte(self, key, data, offset):
data = bytearray(data)
data[offset] ^= 1
with pytest.raises(IntegrityError):
with pytest.raises(IntegrityErrorBase):
key.decrypt(b'', data)
def test_decrypt_integrity(self, monkeypatch, keys_dir):

View file

@ -33,29 +33,6 @@ class TestNonceManager:
def commit_nonce_reservation(self, next_unreserved, start_nonce):
pytest.fail("commit_nonce_reservation should never be called on an old repository")
class MockEncCipher:
def __init__(self, iv):
self.iv_set = False # placeholder, this is never a valid iv
self.iv = iv
def reset(self, key, iv):
assert key is None
assert iv is not False
self.iv_set = iv
self.iv = iv
def expect_iv_and_advance(self, expected_iv, advance):
expected_iv = expected_iv.to_bytes(16, byteorder='big')
iv_set = self.iv_set
assert iv_set == expected_iv
self.iv_set = False
self.iv = advance.to_bytes(16, byteorder='big')
def expect_no_reset_and_advance(self, advance):
iv_set = self.iv_set
assert iv_set is False
self.iv = advance.to_bytes(16, byteorder='big')
def setUp(self):
self.repository = None
@ -70,74 +47,70 @@ class TestNonceManager:
def test_empty_cache_and_old_server(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockOldRepository()
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2013)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
def test_empty_cache(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockRepository()
self.repository.next_free = 0x2000
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2013)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
def test_empty_nonce(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockRepository()
self.repository.next_free = None
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
# enough space in reservation
manager.ensure_reservation(13)
enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13)
next_nonce = manager.ensure_reservation(0x2013, 13)
assert next_nonce == 0x2013
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
# just barely enough space in reservation
manager.ensure_reservation(19)
enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19)
next_nonce = manager.ensure_reservation(0x2020, 19)
assert next_nonce == 0x2020
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
# no space in reservation
manager.ensure_reservation(16)
enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16)
next_nonce = manager.ensure_reservation(0x2033, 16)
assert next_nonce == 0x2033
assert self.cache_nonce() == "0000000000002063"
assert self.repository.next_free == 0x2063
# spans reservation boundary
manager.ensure_reservation(64)
enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16 + 64)
next_nonce = manager.ensure_reservation(0x2043, 64)
assert next_nonce == 0x2063
assert self.cache_nonce() == "00000000000020c3"
assert self.repository.next_free == 0x20c3
def test_sync_nonce(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockRepository()
self.repository.next_free = 0x2000
self.set_cache_nonce("0000000000002000")
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
@ -145,14 +118,13 @@ class TestNonceManager:
def test_server_just_upgraded(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockRepository()
self.repository.next_free = None
self.set_cache_nonce("0000000000002000")
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
@ -160,13 +132,12 @@ class TestNonceManager:
def test_transaction_abort_no_cache(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x1000)
self.repository = self.MockRepository()
self.repository.next_free = 0x2000
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x1000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
@ -174,27 +145,25 @@ class TestNonceManager:
def test_transaction_abort_old_server(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x1000)
self.repository = self.MockOldRepository()
self.set_cache_nonce("0000000000002000")
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x1000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
def test_transaction_abort_on_other_client(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x1000)
self.repository = self.MockRepository()
self.repository.next_free = 0x2000
self.set_cache_nonce("0000000000001000")
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x1000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
@ -202,14 +171,13 @@ class TestNonceManager:
def test_interleaved(self, monkeypatch):
monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)
enc_cipher = self.MockEncCipher(0x2000)
self.repository = self.MockRepository()
self.repository.next_free = 0x2000
self.set_cache_nonce("0000000000002000")
manager = NonceManager(self.repository, enc_cipher, 0x2000)
manager.ensure_reservation(19)
enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)
manager = NonceManager(self.repository, 0x2000)
next_nonce = manager.ensure_reservation(0x2000, 19)
assert next_nonce == 0x2000
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x2033
@ -218,13 +186,13 @@ class TestNonceManager:
self.repository.next_free = 0x4000
# enough space in reservation
manager.ensure_reservation(12)
enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 12)
next_nonce = manager.ensure_reservation(0x2013, 12)
assert next_nonce == 0x2013
assert self.cache_nonce() == "0000000000002033"
assert self.repository.next_free == 0x4000
# spans reservation boundary
manager.ensure_reservation(21)
enc_cipher.expect_iv_and_advance(0x4000, 0x4000 + 21)
next_nonce = manager.ensure_reservation(0x201f, 21)
assert next_nonce == 0x4000
assert self.cache_nonce() == "0000000000004035"
assert self.repository.next_free == 0x4035