From f58732290a0532024fb4ee547f93c2fd4522b227 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Sat, 23 May 2026 13:57:53 +0530 Subject: [PATCH] legacy: move AES to borg.legacy.crypto.low_level, refs #9556 --- pyproject.toml | 1 + setup.py | 7 ++ src/borg/crypto/low_level.pyx | 119 --------------------- src/borg/legacy/crypto/low_level.pyx | 150 +++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 119 deletions(-) create mode 100644 src/borg/legacy/crypto/low_level.pyx diff --git a/pyproject.toml b/pyproject.toml index de1f852d5..f330feaeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,6 +167,7 @@ module = [ "pyfuse3", "trio", "borg.crypto.low_level", + "borg.legacy.crypto.low_level", "borg.platform.*", ] ignore_missing_imports = true diff --git a/setup.py b/setup.py index c95c09ed5..7ee2353b5 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ cflags = ["-Wall", "-Wextra", "-Wpointer-arith", "-Wno-unreachable-code-fallthro compress_source = "src/borg/compress.pyx" crypto_ll_source = "src/borg/crypto/low_level.pyx" +crypto_legacy_ll_source = "src/borg/legacy/crypto/low_level.pyx" buzhash_source = "src/borg/chunkers/buzhash.pyx" buzhash64_source = "src/borg/chunkers/buzhash64.pyx" reader_source = "src/borg/chunkers/reader.pyx" @@ -66,6 +67,7 @@ platform_windows_source = "src/borg/platform/windows.pyx" cython_sources = [ compress_source, crypto_ll_source, + crypto_legacy_ll_source, buzhash_source, buzhash64_source, reader_source, @@ -155,6 +157,10 @@ if not on_rtd: dict(sources=[crypto_ll_source]), crypto_ext_lib, dict(extra_compile_args=cflags) ) + crypto_legacy_ext_kwargs = members_appended( + dict(sources=[crypto_legacy_ll_source]), crypto_ext_lib, dict(extra_compile_args=cflags) + ) + compress_ext_kwargs = members_appended( dict(sources=[compress_source]), lib_ext_kwargs(pc, "BORG_LIBLZ4_PREFIX", "lz4", "liblz4", ">= 1.7.0"), @@ -174,6 +180,7 @@ if not on_rtd: ext_modules += [ Extension("borg.crypto.low_level", **crypto_ext_kwargs), + Extension("borg.legacy.crypto.low_level", **crypto_legacy_ext_kwargs), Extension("borg.compress", **compress_ext_kwargs), Extension("borg.hashindex", [hashindex_source], extra_compile_args=cflags), Extension("borg.item", [item_source], extra_compile_args=cflags), diff --git a/src/borg/crypto/low_level.pyx b/src/borg/crypto/low_level.pyx index 0d56717fa..149ea1d73 100644 --- a/src/borg/crypto/low_level.pyx +++ b/src/borg/crypto/low_level.pyx @@ -657,125 +657,6 @@ cdef class CHACHA20_POLY1305(_AEAD_BASE): super().__init__(key, iv=iv, header_len=header_len, aad_offset=aad_offset) -cdef class AES: # legacy - """A thin wrapper around the OpenSSL EVP cipher API - for legacy code, like key file encryption""" - cdef CIPHER cipher - cdef EVP_CIPHER_CTX *ctx - cdef unsigned char enc_key[32] - cdef int cipher_blk_len - cdef int iv_len - cdef unsigned char iv[16] - cdef long long blocks - - def __init__(self, enc_key, iv=None): - assert isinstance(enc_key, bytes) and len(enc_key) == 32 - self.enc_key = enc_key - self.iv_len = 16 - assert sizeof(self.iv) == self.iv_len - self.cipher = EVP_aes_256_ctr - self.cipher_blk_len = 16 - if iv is not None: - self.set_iv(iv) - else: - self.blocks = -1 # make sure set_iv is called before encrypt - - def __cinit__(self, enc_key, iv=None): - self.ctx = EVP_CIPHER_CTX_new() - - def __dealloc__(self): - EVP_CIPHER_CTX_free(self.ctx) - - def encrypt(self, data, iv=None): - if iv is not None: - self.set_iv(iv) - assert self.blocks == 0, 'iv needs to be set before encrypt is called' - cdef Py_buffer idata - cdef bint idata_acquired = False - cdef unsigned char *odata = NULL - cdef int ilen = len(data) - cdef int olen = 0 - cdef int offset - - try: - odata = PyMem_Malloc(ilen + self.cipher_blk_len) - if not odata: - raise MemoryError - - idata = ro_buffer(data) - idata_acquired = True - - if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv): - raise Exception('EVP_EncryptInit_ex failed') - offset = 0 - if not EVP_EncryptUpdate(self.ctx, odata, &olen, idata.buf, ilen): - raise Exception('EVP_EncryptUpdate failed') - offset += olen - if not EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen): - raise Exception('EVP_EncryptFinal failed') - offset += olen - self.blocks = self.block_count(offset) - return odata[:offset] - finally: - if odata: - PyMem_Free(odata) - if idata_acquired: - PyBuffer_Release(&idata) - - def decrypt(self, data): - cdef Py_buffer idata - cdef bint idata_acquired = False - cdef unsigned char *odata = NULL - cdef int ilen = len(data) - cdef int offset - cdef int olen = 0 - - try: - odata = PyMem_Malloc(ilen + self.cipher_blk_len) - if not odata: - raise MemoryError - - idata = ro_buffer(data) - idata_acquired = True - - # Set cipher type and mode - if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv): - raise Exception('EVP_DecryptInit_ex failed') - offset = 0 - if not EVP_DecryptUpdate(self.ctx, odata, &olen, idata.buf, ilen): - raise Exception('EVP_DecryptUpdate failed') - offset += olen - if not EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen): - # this error check is very important for modes with padding or - # authentication. for them, a failure here means corrupted data. - # CTR mode does not use padding nor authentication. - raise Exception('EVP_DecryptFinal failed') - offset += olen - self.blocks = self.block_count(ilen) - return odata[:offset] - finally: - if odata: - PyMem_Free(odata) - if idata_acquired: - PyBuffer_Release(&idata) - - def block_count(self, length): - return num_cipher_blocks(length, self.cipher_blk_len) - - def set_iv(self, iv): - # set_iv needs to be called before each encrypt() call, - # because encrypt does a full initialisation of the cipher context. - if isinstance(iv, int): - iv = iv.to_bytes(self.iv_len, byteorder='big') - assert isinstance(iv, bytes) and len(iv) == self.iv_len - self.iv = iv - self.blocks = 0 # number of cipher blocks encrypted with this IV - - def next_iv(self): - # call this after encrypt() to get the next iv (int) for the next encrypt() call - iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') - return iv + self.blocks - - def hmac_sha256(key, data): return hmac.digest(key, data, 'sha256') diff --git a/src/borg/legacy/crypto/low_level.pyx b/src/borg/legacy/crypto/low_level.pyx new file mode 100644 index 000000000..10af7891e --- /dev/null +++ b/src/borg/legacy/crypto/low_level.pyx @@ -0,0 +1,150 @@ +from cpython cimport PyMem_Malloc, PyMem_Free +from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release + +from ..crypto.low_level import num_cipher_blocks + + +cdef extern from "openssl/evp.h": + ctypedef struct EVP_CIPHER: + pass + ctypedef struct EVP_CIPHER_CTX: + pass + ctypedef struct ENGINE: + pass + + const EVP_CIPHER *EVP_aes_256_ctr() + + EVP_CIPHER_CTX *EVP_CIPHER_CTX_new() + void EVP_CIPHER_CTX_free(EVP_CIPHER_CTX *a) + + int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl, + const unsigned char *key, const unsigned char *iv) + int EVP_DecryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl, + const unsigned char *key, const unsigned char *iv) + int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl, + const unsigned char *in_, int inl) + int EVP_DecryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl, + const unsigned char *in_, int inl) + int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl) + int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl) + + +ctypedef const EVP_CIPHER * (* CIPHER)() + + +cdef Py_buffer ro_buffer(object data) except *: + cdef Py_buffer view + PyObject_GetBuffer(data, &view, PyBUF_SIMPLE) + return view + + +cdef class AES: + """A thin wrapper around the OpenSSL EVP cipher API - for legacy key file encryption.""" + cdef CIPHER cipher + cdef EVP_CIPHER_CTX *ctx + cdef unsigned char enc_key[32] + cdef int cipher_blk_len + cdef int iv_len + cdef unsigned char iv[16] + cdef long long blocks + + def __init__(self, enc_key, iv=None): + assert isinstance(enc_key, bytes) and len(enc_key) == 32 + self.enc_key = enc_key + self.iv_len = 16 + assert sizeof(self.iv) == self.iv_len + self.cipher = EVP_aes_256_ctr + self.cipher_blk_len = 16 + if iv is not None: + self.set_iv(iv) + else: + self.blocks = -1 # make sure set_iv is called before encrypt + + def __cinit__(self, enc_key, iv=None): + self.ctx = EVP_CIPHER_CTX_new() + + def __dealloc__(self): + EVP_CIPHER_CTX_free(self.ctx) + + def encrypt(self, data, iv=None): + if iv is not None: + self.set_iv(iv) + assert self.blocks == 0, 'iv needs to be set before encrypt is called' + cdef Py_buffer idata + cdef bint idata_acquired = False + cdef unsigned char *odata = NULL + cdef int ilen = len(data) + cdef int olen = 0 + cdef int offset + + try: + odata = PyMem_Malloc(ilen + self.cipher_blk_len) + if not odata: + raise MemoryError + + idata = ro_buffer(data) + idata_acquired = True + + if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv): + raise Exception('EVP_EncryptInit_ex failed') + offset = 0 + if not EVP_EncryptUpdate(self.ctx, odata, &olen, idata.buf, ilen): + raise Exception('EVP_EncryptUpdate failed') + offset += olen + if not EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen): + raise Exception('EVP_EncryptFinal failed') + offset += olen + self.blocks = self.block_count(offset) + return odata[:offset] + finally: + if odata: + PyMem_Free(odata) + if idata_acquired: + PyBuffer_Release(&idata) + + def decrypt(self, data): + cdef Py_buffer idata + cdef bint idata_acquired = False + cdef unsigned char *odata = NULL + cdef int ilen = len(data) + cdef int offset + cdef int olen = 0 + + try: + odata = PyMem_Malloc(ilen + self.cipher_blk_len) + if not odata: + raise MemoryError + + idata = ro_buffer(data) + idata_acquired = True + + if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv): + raise Exception('EVP_DecryptInit_ex failed') + offset = 0 + if not EVP_DecryptUpdate(self.ctx, odata, &olen, idata.buf, ilen): + raise Exception('EVP_DecryptUpdate failed') + offset += olen + if not EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen): + raise Exception('EVP_DecryptFinal failed') + offset += olen + self.blocks = self.block_count(ilen) + return odata[:offset] + finally: + if odata: + PyMem_Free(odata) + if idata_acquired: + PyBuffer_Release(&idata) + + def block_count(self, length): + return num_cipher_blocks(length, self.cipher_blk_len) + + def set_iv(self, iv): + if isinstance(iv, int): + iv = iv.to_bytes(self.iv_len, byteorder='big') + assert isinstance(iv, bytes) and len(iv) == self.iv_len + self.iv = iv + self.blocks = 0 + + def next_iv(self): + iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') + return iv + self.blocks