mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-10 17:32:13 -04:00
add own CSPRNG based on AES256-CTR and a 256bit key/seed.
the stuff in Python stdlib "random.Random" is not cryptographically strong and the stuff in Python stdlib "secrets" can't be seeded and does not offer shuffle.
This commit is contained in:
parent
17a5326c35
commit
bb7a4647ea
2 changed files with 345 additions and 0 deletions
|
|
@ -40,6 +40,10 @@ from math import ceil
|
|||
|
||||
from cpython cimport PyMem_Malloc, PyMem_Free
|
||||
from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
|
||||
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString
|
||||
from libc.stdlib cimport malloc, free
|
||||
from libc.stdint cimport uint8_t, uint32_t, uint64_t
|
||||
from libc.string cimport memset, memcpy
|
||||
|
||||
API_VERSION = '1.3_01'
|
||||
|
||||
|
|
@ -714,3 +718,161 @@ def blake2b_256(key, data):
|
|||
|
||||
def blake2b_128(data):
|
||||
return hashlib.blake2b(data, digest_size=16).digest()
|
||||
|
||||
|
||||
cdef class CSPRNG:
|
||||
"""
|
||||
Cryptographically Secure Pseudo-Random Number Generator based on AES-CTR mode.
|
||||
|
||||
This class provides methods for generating random bytes and shuffling lists
|
||||
using a deterministic algorithm seeded with a 256-bit key.
|
||||
|
||||
The implementation uses AES-256 in CTR mode, which is a well-established
|
||||
method for creating a CSPRNG.
|
||||
"""
|
||||
cdef EVP_CIPHER_CTX *ctx
|
||||
cdef uint8_t key[32]
|
||||
cdef uint8_t iv[16]
|
||||
cdef uint8_t zeros[4096] # Static buffer for zeros
|
||||
cdef uint8_t buffer[4096] # Static buffer for random bytes
|
||||
cdef size_t buffer_size
|
||||
cdef size_t buffer_pos
|
||||
|
||||
def __cinit__(self, bytes seed_key):
|
||||
"""
|
||||
Initialize the CSPRNG with a 256-bit key.
|
||||
|
||||
:param seed_key: A 32-byte key used as the seed for the CSPRNG
|
||||
"""
|
||||
if len(seed_key) != 32:
|
||||
raise ValueError("Seed key must be 32 bytes (256 bits)")
|
||||
|
||||
# Initialize context
|
||||
self.ctx = EVP_CIPHER_CTX_new()
|
||||
if self.ctx == NULL:
|
||||
raise MemoryError("Failed to allocate cipher context")
|
||||
|
||||
self.key = seed_key[:32]
|
||||
|
||||
# Initialize to zeros
|
||||
memset(self.iv, 0, 16)
|
||||
memset(self.zeros, 0, 4096)
|
||||
|
||||
self.buffer_size = 4096
|
||||
self.buffer_pos = self.buffer_size # Force refill on first use
|
||||
|
||||
# Initialize the cipher
|
||||
if not EVP_EncryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.key, self.iv):
|
||||
EVP_CIPHER_CTX_free(self.ctx)
|
||||
raise CryptoError("Failed to initialize AES-CTR cipher")
|
||||
|
||||
def __dealloc__(self):
|
||||
"""Free resources when the object is deallocated."""
|
||||
if self.ctx != NULL:
|
||||
EVP_CIPHER_CTX_free(self.ctx)
|
||||
self.ctx = NULL
|
||||
|
||||
cdef _refill_buffer(self):
|
||||
"""Refill the internal buffer with random bytes."""
|
||||
cdef int outlen = 0
|
||||
|
||||
# Encrypt zeros to get random bytes
|
||||
if not EVP_EncryptUpdate(self.ctx, self.buffer, &outlen, self.zeros, self.buffer_size):
|
||||
raise CryptoError("Failed to generate random bytes")
|
||||
if outlen != self.buffer_size:
|
||||
raise CryptoError("Unexpected length of random bytes")
|
||||
|
||||
self.buffer_pos = 0
|
||||
|
||||
def random_bytes(self, size_t n):
|
||||
"""
|
||||
Generate n random bytes.
|
||||
|
||||
:param n: Number of bytes to generate
|
||||
:return: a bytes object containing the random bytes
|
||||
"""
|
||||
# Directly create a Python bytes object of the required size
|
||||
cdef object py_bytes = PyBytes_FromStringAndSize(NULL, n)
|
||||
cdef uint8_t *result = <uint8_t *>PyBytes_AsString(py_bytes)
|
||||
cdef size_t remaining
|
||||
cdef size_t pos
|
||||
cdef size_t to_copy
|
||||
cdef size_t available
|
||||
|
||||
remaining = n
|
||||
pos = 0
|
||||
|
||||
while remaining > 0:
|
||||
if self.buffer_pos >= self.buffer_size:
|
||||
self._refill_buffer()
|
||||
|
||||
# Calculate how many bytes we can copy
|
||||
available = self.buffer_size - self.buffer_pos
|
||||
to_copy = remaining if remaining < available else available
|
||||
|
||||
# Copy bytes from buffer to result
|
||||
memcpy(result + pos, &self.buffer[self.buffer_pos], to_copy)
|
||||
|
||||
self.buffer_pos += to_copy
|
||||
pos += to_copy
|
||||
remaining -= to_copy
|
||||
|
||||
return py_bytes
|
||||
|
||||
def random_int(self, n):
|
||||
"""
|
||||
Generate a random integer in the range [0, n).
|
||||
|
||||
:param n: Upper bound (exclusive)
|
||||
:return: Random integer
|
||||
"""
|
||||
if n <= 0:
|
||||
raise ValueError("Upper bound must be positive")
|
||||
if n == 1:
|
||||
return 0
|
||||
|
||||
# Calculate the number of bits and bytes needed
|
||||
bits_needed = 0
|
||||
temp = n - 1
|
||||
while temp > 0:
|
||||
bits_needed += 1
|
||||
temp >>= 1
|
||||
bytes_needed = (bits_needed + 7) // 8
|
||||
|
||||
# Generate random bytes
|
||||
mask = (1 << bits_needed) - 1
|
||||
max_attempts = 1000 # Prevent infinite loop
|
||||
|
||||
# Rejection sampling to avoid bias
|
||||
attempts = 0
|
||||
while attempts < max_attempts:
|
||||
attempts += 1
|
||||
random_data = self.random_bytes(bytes_needed)
|
||||
result = int.from_bytes(random_data, byteorder='big')
|
||||
|
||||
# Apply mask to get the right number of bits
|
||||
result &= mask
|
||||
if result < n:
|
||||
return result
|
||||
|
||||
# If we reach here, we've made too many attempts
|
||||
# Fall back to a slightly biased but guaranteed-to-terminate method
|
||||
random_data = self.random_bytes(bytes_needed)
|
||||
result = int.from_bytes(random_data, byteorder='big')
|
||||
return result % n
|
||||
|
||||
def shuffle(self, list items):
|
||||
"""
|
||||
Shuffle a list in-place using the Fisher-Yates algorithm.
|
||||
|
||||
:param items: List to shuffle
|
||||
"""
|
||||
cdef size_t n = len(items)
|
||||
cdef size_t i, j
|
||||
|
||||
for i in range(n - 1, 0, -1):
|
||||
# Generate random index j such that 0 <= j <= i
|
||||
j = self.random_int(i + 1)
|
||||
|
||||
# Swap items[i] and items[j]
|
||||
items[i], items[j] = items[j], items[i]
|
||||
|
|
|
|||
183
src/borg/testsuite/crypto/csprng_test.py
Normal file
183
src/borg/testsuite/crypto/csprng_test.py
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
import pytest
|
||||
|
||||
from ...crypto.low_level import CSPRNG
|
||||
|
||||
|
||||
# Test keys (32 bytes each)
|
||||
key1 = bytes.fromhex("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
|
||||
key2 = bytes.fromhex("fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210")
|
||||
|
||||
|
||||
def test_deterministic_output():
|
||||
"""Test that the same key produces the same random sequence."""
|
||||
# Create two CSPRNGs with the same key
|
||||
rng1 = CSPRNG(key1)
|
||||
rng2 = CSPRNG(key1)
|
||||
|
||||
# Generate random bytes from both
|
||||
bytes1 = rng1.random_bytes(100)
|
||||
bytes2 = rng2.random_bytes(100)
|
||||
|
||||
# They should be identical
|
||||
assert bytes1 == bytes2
|
||||
|
||||
# Different keys should produce different outputs
|
||||
rng3 = CSPRNG(key2)
|
||||
bytes3 = rng3.random_bytes(100)
|
||||
assert bytes1 != bytes3
|
||||
|
||||
|
||||
def test_random_bytes():
|
||||
"""Test the random_bytes method."""
|
||||
rng = CSPRNG(key1)
|
||||
|
||||
# Test different sizes
|
||||
for size in [1, 10, 100, 1000, 10000]:
|
||||
random_data = rng.random_bytes(size)
|
||||
|
||||
# Check type
|
||||
assert isinstance(random_data, bytes)
|
||||
|
||||
# Check length
|
||||
assert len(random_data) == size
|
||||
|
||||
|
||||
def test_random_int():
|
||||
"""Test the random_int method."""
|
||||
rng = CSPRNG(key1)
|
||||
|
||||
# Test different ranges
|
||||
for upper_bound in [2, 10, 100, 1000, 1000000, 1000000000, 1000000000000]:
|
||||
# Generate multiple random integers
|
||||
for _ in range(10):
|
||||
random_int = rng.random_int(upper_bound)
|
||||
|
||||
# Check range
|
||||
assert 0 <= random_int < upper_bound
|
||||
|
||||
# Check type
|
||||
assert isinstance(random_int, int)
|
||||
|
||||
|
||||
def test_random_int_edge_cases():
|
||||
"""Test the random_int method with edge cases."""
|
||||
rng = CSPRNG(key1)
|
||||
|
||||
# Test error case: upper_bound <= 0
|
||||
with pytest.raises(ValueError):
|
||||
rng.random_int(-1)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
rng.random_int(0)
|
||||
|
||||
# Test with upper bound 1
|
||||
assert rng.random_int(1) == 0
|
||||
|
||||
# Test with upper bound 2
|
||||
for _ in range(10):
|
||||
result = rng.random_int(2)
|
||||
assert 0 <= result < 2
|
||||
|
||||
# Test with upper bound that is a power of 2
|
||||
power_of_2 = 256
|
||||
for _ in range(10):
|
||||
result = rng.random_int(power_of_2)
|
||||
assert 0 <= result < power_of_2
|
||||
|
||||
# Test with upper bound that is one less than a power of 2
|
||||
almost_power_of_2 = 255
|
||||
for _ in range(10):
|
||||
result = rng.random_int(almost_power_of_2)
|
||||
assert 0 <= result < almost_power_of_2
|
||||
|
||||
# Test with upper bound that is one more than a power of 2
|
||||
just_over_power_of_2 = 257
|
||||
for _ in range(10):
|
||||
result = rng.random_int(just_over_power_of_2)
|
||||
assert 0 <= result < just_over_power_of_2
|
||||
|
||||
# Test with a large upper bound
|
||||
large_bound = 1000000000
|
||||
for _ in range(10):
|
||||
result = rng.random_int(large_bound)
|
||||
assert 0 <= result < large_bound
|
||||
|
||||
|
||||
def test_shuffle():
|
||||
"""Test the shuffle method."""
|
||||
rng1 = CSPRNG(key1)
|
||||
rng2 = CSPRNG(key1)
|
||||
|
||||
# Create two identical lists
|
||||
list1 = list(range(100))
|
||||
list2 = list(range(100))
|
||||
|
||||
# Shuffle both lists with the same key
|
||||
rng1.shuffle(list1)
|
||||
rng2.shuffle(list2)
|
||||
|
||||
# They should be identical after shuffling
|
||||
assert list1 == list2
|
||||
|
||||
# The shuffled list should be a permutation of the original
|
||||
assert sorted(list1) == list(range(100))
|
||||
|
||||
# Different keys should produce different shuffles
|
||||
rng3 = CSPRNG(key2)
|
||||
list3 = list(range(100))
|
||||
rng3.shuffle(list3)
|
||||
assert list1 != list3
|
||||
|
||||
# Getting another shuffled list by an already used RNG should produce a different shuffle
|
||||
list4 = list(range(100))
|
||||
rng1.shuffle(list4)
|
||||
assert list1 != list4
|
||||
|
||||
|
||||
def test_statistical_properties():
|
||||
"""Test basic statistical properties of the random output."""
|
||||
rng = CSPRNG(key1)
|
||||
|
||||
# Generate a large number of random bytes
|
||||
data = rng.random_bytes(10000)
|
||||
|
||||
# Count occurrences of each byte value
|
||||
counts = [0] * 256
|
||||
for byte in data:
|
||||
counts[byte] += 1
|
||||
|
||||
# Check that each byte value appears with roughly equal frequency
|
||||
# For 10000 bytes, each value should appear about 39 times (10000/256)
|
||||
# We allow a generous margin of error (±50%)
|
||||
for count in counts:
|
||||
assert 19 <= count <= 59, "Byte distribution is not uniform"
|
||||
|
||||
# Test bit distribution
|
||||
bits_set = 0
|
||||
for byte in data:
|
||||
bits_set += bin(byte).count("1")
|
||||
|
||||
# For random data, approximately 50% of bits should be set
|
||||
# 10000 bytes = 80000 bits, so about 40000 should be set
|
||||
# Allow ±5% margin
|
||||
assert 38000 <= bits_set <= 42000, "Bit distribution is not uniform"
|
||||
|
||||
|
||||
def test_large_shuffle():
|
||||
"""Test shuffling a large list."""
|
||||
rng = CSPRNG(key1)
|
||||
|
||||
# Create a large list
|
||||
large_list = list(range(10000))
|
||||
|
||||
# Make a copy for comparison
|
||||
original = large_list.copy()
|
||||
|
||||
# Shuffle the list
|
||||
rng.shuffle(large_list)
|
||||
|
||||
# The shuffled list should be different from the original
|
||||
assert large_list != original
|
||||
|
||||
# The shuffled list should be a permutation of the original
|
||||
assert sorted(large_list) == original
|
||||
Loading…
Reference in a new issue