diff --git a/src/borg/testsuite/archiver/create_cmd_test.py b/src/borg/testsuite/archiver/create_cmd_test.py index 08794a33e..230fa9ce0 100644 --- a/src/borg/testsuite/archiver/create_cmd_test.py +++ b/src/borg/testsuite/archiver/create_cmd_test.py @@ -12,6 +12,7 @@ import pytest from ... import platform from ...constants import * # NOQA +from ...constants import zeros from ...manifest import Manifest from ...platform import is_cygwin, is_win32, is_darwin from ...repository import Repository @@ -926,3 +927,127 @@ def test_common_options(archivers, request): cmd(archiver, "repo-create", RK_ENCRYPTION) log = cmd(archiver, "--debug", "create", "test", "input") assert "security: read previous location" in log + + +def test_create_big_zeros_files(archivers, request): + """Test creating an archive from 10 files with 10MB zeros each.""" + archiver = request.getfixturevalue(archivers) + # Create 10 files with 10,000,000 bytes of zeros each + count, size = 10, 10 * 1000 * 1000 + assert size <= len(zeros) + for i in range(count): + create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size]) + # Create repository and archive + cmd(archiver, "repo-create", RK_ENCRYPTION) + cmd(archiver, "create", "test", "input") + + # Extract the archive to verify contents + with tempfile.TemporaryDirectory() as extract_path: + with changedir(extract_path): + cmd(archiver, "extract", "test") + + # Verify that the extracted files have the correct contents + for i in range(count): + extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}") + with open(extracted_file_path, "rb") as f: + extracted_data = f.read() + # Verify the file contains only zeros and has the correct size + assert extracted_data == bytes(size) + assert len(extracted_data) == size + + # Also verify the directory structure matches + assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input")) + + +def test_create_big_random_files(archivers, request): + """Test creating an archive from 10 files with 10MB random data each.""" + archiver = request.getfixturevalue(archivers) + # Create 10 files with 10,000,000 bytes of random data each + count, size = 10, 10 * 1000 * 1000 + random_data = {} + for i in range(count): + data = os.urandom(size) + random_data[i] = data + create_regular_file(archiver.input_path, f"random_{i}", contents=data) + # Create repository and archive + cmd(archiver, "repo-create", RK_ENCRYPTION) + cmd(archiver, "create", "test", "input") + + # Extract the archive to verify contents + with tempfile.TemporaryDirectory() as extract_path: + with changedir(extract_path): + cmd(archiver, "extract", "test") + + # Verify that the extracted files have the correct contents + for i in range(count): + extracted_file_path = os.path.join(extract_path, "input", f"random_{i}") + with open(extracted_file_path, "rb") as f: + extracted_data = f.read() + # Verify the file contains the original random data and has the correct size + assert extracted_data == random_data[i] + assert len(extracted_data) == size + + # Also verify the directory structure matches + assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input")) + + +def test_create_with_compression_algorithms(archivers, request): + """Test creating archives with different compression algorithms.""" + archiver = request.getfixturevalue(archivers) + + # Create test files: 5 files with zeros (highly compressible) and 5 with random data (incompressible) + count, size = 5, 1 * 1000 * 1000 # 1MB per file + random_data = {} + + # Create zeros files + for i in range(count): + create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size]) + + # Create random files + for i in range(count): + data = os.urandom(size) + random_data[i] = data + create_regular_file(archiver.input_path, f"random_{i}", contents=data) + + # Create repository + cmd(archiver, "repo-create", RK_ENCRYPTION) + + # Test different compression algorithms + algorithms = [ + "none", # No compression + "lz4", # Fast compression + "zlib,6", # Medium compression + "zstd,3", # Good compression/speed balance + "lzma,6", # High compression + ] + + for algo in algorithms: + # Create archive with specific compression algorithm + archive_name = f"test_{algo.replace(',', '_')}" + cmd(archiver, "create", "--compression", algo, archive_name, "input") + + # Extract the archive to verify contents + with tempfile.TemporaryDirectory() as extract_path: + with changedir(extract_path): + cmd(archiver, "extract", archive_name) + + # Verify zeros files + for i in range(count): + extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}") + with open(extracted_file_path, "rb") as f: + extracted_data = f.read() + # Verify the file contains only zeros and has the correct size + assert extracted_data == bytes(size) + assert len(extracted_data) == size + + # Verify random files + for i in range(count): + extracted_file_path = os.path.join(extract_path, "input", f"random_{i}") + with open(extracted_file_path, "rb") as f: + extracted_data = f.read() + # Verify the file contains the original random data and has the correct size + assert extracted_data == random_data[i] + assert len(extracted_data) == size + + # Also verify the directory structure matches + assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))