crypto low_level: fix freeing of memory

The previous code performed allocations and buffer acquisitions before the
`try` block. If a later allocation or buffer acquisition failed, execution did
not enter the `finally` block, so resources acquired earlier in the setup path
could leak.

Move allocation and buffer acquisition into the guarded block, initialize raw
output pointers to `NULL`, and only call `PyMem_Free` or `PyBuffer_Release`
for resources that were actually acquired.
This commit is contained in:
Thomas Waldmann 2026-04-24 02:20:46 +02:00
parent c409e767b9
commit 70b3e6ee4c
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01

View file

@ -234,15 +234,25 @@ cdef class AES256_CTR_BASE:
assert hlen == self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len_short +
ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
cdef int olen = 0
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef Py_buffer hdata
cdef bint hdata_acquired = False
cdef unsigned char *odata = NULL
cdef int olen
cdef int offset
cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer hdata = ro_buffer(header)
try:
odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + self.iv_len_short +
ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
hdata = ro_buffer(header)
hdata_acquired = True
offset = 0
for i in range(hlen):
odata[offset+i] = header[i]
@ -264,9 +274,12 @@ cdef class AES256_CTR_BASE:
self.blocks += self.block_count(ilen)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&hdata)
PyBuffer_Release(&idata)
if odata:
PyMem_Free(odata)
if hdata_acquired:
PyBuffer_Release(&hdata)
if idata_acquired:
PyBuffer_Release(&idata)
def decrypt(self, envelope, aad=None):
"""
@ -276,15 +289,22 @@ cdef class AES256_CTR_BASE:
cdef int hlen = self.header_len
cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
cdef int olen = 0
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int olen
cdef int offset
cdef unsigned char mac_buf[32]
assert sizeof(mac_buf) == self.mac_len
cdef Py_buffer idata = ro_buffer(envelope)
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len) # play safe, 1 extra blk
if not odata:
raise MemoryError
idata = ro_buffer(envelope)
idata_acquired = True
self.mac_verify(<const unsigned char *> idata.buf+aoffset, alen,
<const unsigned char *> idata.buf+hlen+self.mac_len, ilen-hlen-self.mac_len,
mac_buf, <const unsigned char *> idata.buf+hlen)
@ -304,8 +324,10 @@ cdef class AES256_CTR_BASE:
self.blocks += self.block_count(offset)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&idata)
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)
@ -639,14 +661,21 @@ cdef class AES: # legacy
if iv is not None:
self.set_iv(iv)
assert self.blocks == 0, 'iv needs to be set before encrypt is called'
cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int offset
cdef int olen = 0
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
cdef int offset
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
if not EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_EncryptInit_ex failed')
offset = 0
@ -659,18 +688,27 @@ cdef class AES: # legacy
self.blocks = self.block_count(offset)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&idata)
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def decrypt(self, data):
cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer idata
cdef bint idata_acquired = False
cdef unsigned char *odata = NULL
cdef int ilen = len(data)
cdef int offset
cdef int olen = 0
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
try:
odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata:
raise MemoryError
idata = ro_buffer(data)
idata_acquired = True
# Set cipher type and mode
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, self.enc_key, self.iv):
raise Exception('EVP_DecryptInit_ex failed')
@ -687,8 +725,10 @@ cdef class AES: # legacy
self.blocks = self.block_count(ilen)
return odata[:offset]
finally:
PyMem_Free(odata)
PyBuffer_Release(&idata)
if odata:
PyMem_Free(odata)
if idata_acquired:
PyBuffer_Release(&idata)
def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len)