legacy: move AES to borg.legacy.crypto.low_level, refs #9556

This commit is contained in:
Mrityunjay Raj 2026-05-23 13:57:53 +05:30
parent fb6192bf86
commit f58732290a
4 changed files with 158 additions and 119 deletions

View file

@ -167,6 +167,7 @@ module = [
"pyfuse3",
"trio",
"borg.crypto.low_level",
"borg.legacy.crypto.low_level",
"borg.platform.*",
]
ignore_missing_imports = true

View file

@ -50,6 +50,7 @@ cflags = ["-Wall", "-Wextra", "-Wpointer-arith", "-Wno-unreachable-code-fallthro
compress_source = "src/borg/compress.pyx"
crypto_ll_source = "src/borg/crypto/low_level.pyx"
crypto_legacy_ll_source = "src/borg/legacy/crypto/low_level.pyx"
buzhash_source = "src/borg/chunkers/buzhash.pyx"
buzhash64_source = "src/borg/chunkers/buzhash64.pyx"
reader_source = "src/borg/chunkers/reader.pyx"
@ -66,6 +67,7 @@ platform_windows_source = "src/borg/platform/windows.pyx"
cython_sources = [
compress_source,
crypto_ll_source,
crypto_legacy_ll_source,
buzhash_source,
buzhash64_source,
reader_source,
@ -155,6 +157,10 @@ if not on_rtd:
dict(sources=[crypto_ll_source]), crypto_ext_lib, dict(extra_compile_args=cflags)
)
crypto_legacy_ext_kwargs = members_appended(
dict(sources=[crypto_legacy_ll_source]), crypto_ext_lib, dict(extra_compile_args=cflags)
)
compress_ext_kwargs = members_appended(
dict(sources=[compress_source]),
lib_ext_kwargs(pc, "BORG_LIBLZ4_PREFIX", "lz4", "liblz4", ">= 1.7.0"),
@ -174,6 +180,7 @@ if not on_rtd:
ext_modules += [
Extension("borg.crypto.low_level", **crypto_ext_kwargs),
Extension("borg.legacy.crypto.low_level", **crypto_legacy_ext_kwargs),
Extension("borg.compress", **compress_ext_kwargs),
Extension("borg.hashindex", [hashindex_source], extra_compile_args=cflags),
Extension("borg.item", [item_source], extra_compile_args=cflags),

View file

@ -657,125 +657,6 @@ cdef class CHACHA20_POLY1305(_AEAD_BASE):
super().__init__(key, iv=iv, header_len=header_len, aad_offset=aad_offset)
cdef class AES: # legacy
"""A thin wrapper around the OpenSSL EVP cipher API - for legacy code, like key file encryption"""
cdef CIPHER cipher
cdef EVP_CIPHER_CTX *ctx
cdef unsigned char enc_key[32]
cdef int cipher_blk_len
cdef int iv_len
cdef unsigned char iv[16]
cdef long long blocks
def __init__(self, enc_key, iv=None):
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.enc_key = enc_key
self.iv_len = 16
assert sizeof(self.iv) == self.iv_len
self.cipher = EVP_aes_256_ctr
self.cipher_blk_len = 16
if iv is not None:
self.set_iv(iv)
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, enc_key, iv=None):
self.ctx = EVP_CIPHER_CTX_new()
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, iv=None):
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int olen = 0
cdef int offset
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_EncryptInit_ex failed')
offset = 0
if not EVP_EncryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_EncryptUpdate failed')
offset += olen
if not EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen):
raise Exception('EVP_EncryptFinal failed')
offset += olen
self.blocks = self.block_count(offset)
return odata[:offset]
finally:
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def decrypt(self, data):
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int offset
cdef int olen = 0
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
# Set cipher type and mode
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_DecryptInit_ex failed')
offset = 0
if not EVP_DecryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_DecryptUpdate failed')
offset += olen
if not EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen):
# this error check is very important for modes with padding or
# authentication. for them, a failure here means corrupted data.
# CTR mode does not use padding nor authentication.
raise Exception('EVP_DecryptFinal failed')
offset += olen
self.blocks = self.block_count(ilen)
return odata[:offset]
finally:
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call,
# because encrypt does a full initialisation of the cipher context.
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
self.iv = iv
self.blocks = 0 # number of cipher blocks encrypted with this IV
def next_iv(self):
# call this after encrypt() to get the next iv (int) for the next encrypt() call
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks
def hmac_sha256(key, data):
return hmac.digest(key, data, 'sha256')

View file

@ -0,0 +1,150 @@
from cpython cimport PyMem_Malloc, PyMem_Free
from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
from ..crypto.low_level import num_cipher_blocks
cdef extern from "openssl/evp.h":
ctypedef struct EVP_CIPHER:
pass
ctypedef struct EVP_CIPHER_CTX:
pass
ctypedef struct ENGINE:
pass
const EVP_CIPHER *EVP_aes_256_ctr()
EVP_CIPHER_CTX *EVP_CIPHER_CTX_new()
void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a)
int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
const unsigned char *key, const unsigned char *iv)
int EVP_DecryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
const unsigned char *key, const unsigned char *iv)
int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
const unsigned char *in_, int inl)
int EVP_DecryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
const unsigned char *in_, int inl)
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
ctypedef const EVP_CIPHER * (* CIPHER)()
cdef Py_buffer ro_buffer(object data) except *:
cdef Py_buffer view
PyObject_GetBuffer(data, &view, PyBUF_SIMPLE)
return view
cdef class AES:
"""A thin wrapper around the OpenSSL EVP cipher API - for legacy key file encryption."""
cdef CIPHER cipher
cdef EVP_CIPHER_CTX *ctx
cdef unsigned char enc_key[32]
cdef int cipher_blk_len
cdef int iv_len
cdef unsigned char iv[16]
cdef long long blocks
def __init__(self, enc_key, iv=None):
assert isinstance(enc_key, bytes) and len(enc_key) == 32
self.enc_key = enc_key
self.iv_len = 16
assert sizeof(self.iv) == self.iv_len
self.cipher = EVP_aes_256_ctr
self.cipher_blk_len = 16
if iv is not None:
self.set_iv(iv)
else:
self.blocks = -1 # make sure set_iv is called before encrypt
def __cinit__(self, enc_key, iv=None):
self.ctx = EVP_CIPHER_CTX_new()
def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, iv=None):
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int olen = 0
cdef int offset
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_EncryptInit_ex failed')
offset = 0
if not EVP_EncryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_EncryptUpdate failed')
offset += olen
if not EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen):
raise Exception('EVP_EncryptFinal failed')
offset += olen
self.blocks = self.block_count(offset)
return odata[:offset]
finally:
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def decrypt(self, data):
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int offset
cdef int olen = 0
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_DecryptInit_ex failed')
offset = 0
if not EVP_DecryptUpdate(self.ctx, odata, &olen, <const unsigned char*> idata.buf, ilen):
raise Exception('EVP_DecryptUpdate failed')
offset += olen
if not EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen):
raise Exception('EVP_DecryptFinal failed')
offset += olen
self.blocks = self.block_count(ilen)
return odata[:offset]
finally:
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
def set_iv(self, iv):
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
self.iv = iv
self.blocks = 0
def next_iv(self):
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks