diff --git a/src/borg/testsuite/archiver/prune_cmd_test.py b/src/borg/testsuite/archiver/prune_cmd_test.py index 207f2bdb7..9dda19a0b 100644 --- a/src/borg/testsuite/archiver/prune_cmd_test.py +++ b/src/borg/testsuite/archiver/prune_cmd_test.py @@ -1,8 +1,12 @@ import re -from datetime import datetime +from datetime import datetime, timezone, timedelta + +import pytest from ...constants import * # NOQA +from ...archiver.prune_cmd import prune_split, prune_within from . import cmd, RK_ENCRYPTION, src_dir, generate_archiver_tests +from ...helpers import interval pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -257,3 +261,142 @@ def test_prune_ignore_protected(archivers, request): output = cmd(archiver, "repo-list") assert "archive1" in output # @PROT protected archive1 from deletion assert "archive3" in output # last one + + +class MockArchive: + def __init__(self, ts, id): + self.ts = ts + self.id = id + + def __repr__(self): + return f"{self.id}: {self.ts.isoformat()}" + + +# This is the local timezone of the system running the tests. +# We need this e.g. to construct archive timestamps for the prune tests, +# because borg prune operates in the local timezone (it first converts the +# archive timestamp to the local timezone). So, if we want the y/m/d/h/m/s +# values which prune uses to be exactly the ones we give [and NOT shift them +# by tzoffset], we need to give the timestamps in the same local timezone. +# Please note that the timestamps in a real borg archive or manifest are +# stored in UTC timezone. +local_tz = datetime.now(tz=timezone.utc).astimezone(tz=None).tzinfo + + +def test_prune_within(): + def subset(lst, indices): + return {lst[i] for i in indices} + + def dotest(test_archives, within, indices): + for ta in test_archives, reversed(test_archives): + kept_because = {} + keep = prune_within(ta, interval(within), kept_because) + assert set(keep) == subset(test_archives, indices) + assert all("within" == kept_because[a.id][0] for a in keep) + + # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours + test_offsets = [60, 90 * 60, 150 * 60, 210 * 60, 25 * 60 * 60, 49 * 60 * 60] + now = datetime.now(timezone.utc) + test_dates = [now - timedelta(seconds=s) for s in test_offsets] + test_archives = [MockArchive(date, i) for i, date in enumerate(test_dates)] + + dotest(test_archives, "15S", []) + dotest(test_archives, "2M", [0]) + dotest(test_archives, "1H", [0]) + dotest(test_archives, "2H", [0, 1]) + dotest(test_archives, "3H", [0, 1, 2]) + dotest(test_archives, "24H", [0, 1, 2, 3]) + dotest(test_archives, "26H", [0, 1, 2, 3, 4]) + dotest(test_archives, "2d", [0, 1, 2, 3, 4]) + dotest(test_archives, "50H", [0, 1, 2, 3, 4, 5]) + dotest(test_archives, "3d", [0, 1, 2, 3, 4, 5]) + dotest(test_archives, "1w", [0, 1, 2, 3, 4, 5]) + dotest(test_archives, "1m", [0, 1, 2, 3, 4, 5]) + dotest(test_archives, "1y", [0, 1, 2, 3, 4, 5]) + + +@pytest.mark.parametrize( + "rule,num_to_keep,expected_ids", + [ + ("yearly", 3, (13, 2, 1)), + ("monthly", 3, (13, 8, 4)), + ("weekly", 2, (13, 8)), + ("daily", 3, (13, 8, 7)), + ("hourly", 3, (13, 10, 8)), + ("minutely", 3, (13, 10, 9)), + ("secondly", 4, (13, 12, 11, 10)), + ("daily", 0, []), + ], +) +def test_prune_split(rule, num_to_keep, expected_ids): + def subset(lst, ids): + return {i for i in lst if i.id in ids} + + archives = [ + # years apart + MockArchive(datetime(2015, 1, 1, 10, 0, 0, tzinfo=local_tz), 1), + MockArchive(datetime(2016, 1, 1, 10, 0, 0, tzinfo=local_tz), 2), + MockArchive(datetime(2017, 1, 1, 10, 0, 0, tzinfo=local_tz), 3), + # months apart + MockArchive(datetime(2017, 2, 1, 10, 0, 0, tzinfo=local_tz), 4), + MockArchive(datetime(2017, 3, 1, 10, 0, 0, tzinfo=local_tz), 5), + # days apart + MockArchive(datetime(2017, 3, 2, 10, 0, 0, tzinfo=local_tz), 6), + MockArchive(datetime(2017, 3, 3, 10, 0, 0, tzinfo=local_tz), 7), + MockArchive(datetime(2017, 3, 4, 10, 0, 0, tzinfo=local_tz), 8), + # minutes apart + MockArchive(datetime(2017, 10, 1, 9, 45, 0, tzinfo=local_tz), 9), + MockArchive(datetime(2017, 10, 1, 9, 55, 0, tzinfo=local_tz), 10), + # seconds apart + MockArchive(datetime(2017, 10, 1, 10, 0, 1, tzinfo=local_tz), 11), + MockArchive(datetime(2017, 10, 1, 10, 0, 3, tzinfo=local_tz), 12), + MockArchive(datetime(2017, 10, 1, 10, 0, 5, tzinfo=local_tz), 13), + ] + kept_because = {} + keep = prune_split(archives, rule, num_to_keep, kept_because) + + assert set(keep) == subset(archives, expected_ids) + for item in keep: + assert kept_because[item.id][0] == rule + + +def test_prune_split_keep_oldest(): + def subset(lst, ids): + return {i for i in lst if i.id in ids} + + archives = [ + # oldest backup, but not last in its year + MockArchive(datetime(2018, 1, 1, 10, 0, 0, tzinfo=local_tz), 1), + # an interim backup + MockArchive(datetime(2018, 12, 30, 10, 0, 0, tzinfo=local_tz), 2), + # year-end backups + MockArchive(datetime(2018, 12, 31, 10, 0, 0, tzinfo=local_tz), 3), + MockArchive(datetime(2019, 12, 31, 10, 0, 0, tzinfo=local_tz), 4), + ] + + # Keep oldest when retention target can't otherwise be met + kept_because = {} + keep = prune_split(archives, "yearly", 3, kept_because) + + assert set(keep) == subset(archives, [1, 3, 4]) + assert kept_because[1][0] == "yearly[oldest]" + assert kept_because[3][0] == "yearly" + assert kept_because[4][0] == "yearly" + + # Otherwise, prune it + kept_because = {} + keep = prune_split(archives, "yearly", 2, kept_because) + + assert set(keep) == subset(archives, [3, 4]) + assert kept_because[3][0] == "yearly" + assert kept_because[4][0] == "yearly" + + +def test_prune_split_no_archives(): + archives = [] + + kept_because = {} + keep = prune_split(archives, "yearly", 3, kept_because) + + assert keep == [] + assert kept_because == {} diff --git a/src/borg/testsuite/helpers/__init__.py b/src/borg/testsuite/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/borg/testsuite/helpers/__init__test.py b/src/borg/testsuite/helpers/__init__test.py new file mode 100644 index 000000000..7eee4ddf5 --- /dev/null +++ b/src/borg/testsuite/helpers/__init__test.py @@ -0,0 +1,64 @@ +import pytest + +from ...constants import * # NOQA +from ...helpers import classify_ec, max_ec + + +@pytest.mark.parametrize( + "ec_range,ec_class", + ( + # inclusive range start, exclusive range end + ((0, 1), "success"), + ((1, 2), "warning"), + ((2, 3), "error"), + ((EXIT_ERROR_BASE, EXIT_WARNING_BASE), "error"), + ((EXIT_WARNING_BASE, EXIT_SIGNAL_BASE), "warning"), + ((EXIT_SIGNAL_BASE, 256), "signal"), + ), +) +def test_classify_ec(ec_range, ec_class): + for ec in range(*ec_range): + classify_ec(ec) == ec_class + + +def test_ec_invalid(): + with pytest.raises(ValueError): + classify_ec(666) + with pytest.raises(ValueError): + classify_ec(-1) + with pytest.raises(TypeError): + classify_ec(None) + + +@pytest.mark.parametrize( + "ec1,ec2,ec_max", + ( + # same for modern / legacy + (EXIT_SUCCESS, EXIT_SUCCESS, EXIT_SUCCESS), + (EXIT_SUCCESS, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), + # legacy exit codes + (EXIT_SUCCESS, EXIT_WARNING, EXIT_WARNING), + (EXIT_SUCCESS, EXIT_ERROR, EXIT_ERROR), + (EXIT_WARNING, EXIT_SUCCESS, EXIT_WARNING), + (EXIT_WARNING, EXIT_WARNING, EXIT_WARNING), + (EXIT_WARNING, EXIT_ERROR, EXIT_ERROR), + (EXIT_WARNING, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), + (EXIT_ERROR, EXIT_SUCCESS, EXIT_ERROR), + (EXIT_ERROR, EXIT_WARNING, EXIT_ERROR), + (EXIT_ERROR, EXIT_ERROR, EXIT_ERROR), + (EXIT_ERROR, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), + # some modern codes + (EXIT_SUCCESS, EXIT_WARNING_BASE, EXIT_WARNING_BASE), + (EXIT_SUCCESS, EXIT_ERROR_BASE, EXIT_ERROR_BASE), + (EXIT_WARNING_BASE, EXIT_SUCCESS, EXIT_WARNING_BASE), + (EXIT_WARNING_BASE + 1, EXIT_WARNING_BASE + 2, EXIT_WARNING_BASE + 1), + (EXIT_WARNING_BASE, EXIT_ERROR_BASE, EXIT_ERROR_BASE), + (EXIT_WARNING_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), + (EXIT_ERROR_BASE, EXIT_SUCCESS, EXIT_ERROR_BASE), + (EXIT_ERROR_BASE, EXIT_WARNING_BASE, EXIT_ERROR_BASE), + (EXIT_ERROR_BASE + 1, EXIT_ERROR_BASE + 2, EXIT_ERROR_BASE + 1), + (EXIT_ERROR_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), + ), +) +def test_max_ec(ec1, ec2, ec_max): + assert max_ec(ec1, ec2) == ec_max diff --git a/src/borg/testsuite/helpers/datastruct_test.py b/src/borg/testsuite/helpers/datastruct_test.py new file mode 100644 index 000000000..b5403e5a2 --- /dev/null +++ b/src/borg/testsuite/helpers/datastruct_test.py @@ -0,0 +1,66 @@ +import hashlib +import pytest + +from ...helpers.datastruct import StableDict, Buffer +from ...helpers import msgpack + + +def test_stable_dict(): + d = StableDict(foo=1, bar=2, boo=3, baz=4) + assert list(d.items()) == [("bar", 2), ("baz", 4), ("boo", 3), ("foo", 1)] + assert hashlib.md5(msgpack.packb(d)).hexdigest() == "fc78df42cd60691b3ac3dd2a2b39903f" + + +class TestBuffer: + def test_type(self): + buffer = Buffer(bytearray) + assert isinstance(buffer.get(), bytearray) + buffer = Buffer(bytes) # don't do that in practice + assert isinstance(buffer.get(), bytes) + + def test_len(self): + buffer = Buffer(bytearray, size=0) + b = buffer.get() + assert len(buffer) == len(b) == 0 + buffer = Buffer(bytearray, size=1234) + b = buffer.get() + assert len(buffer) == len(b) == 1234 + + def test_resize(self): + buffer = Buffer(bytearray, size=100) + assert len(buffer) == 100 + b1 = buffer.get() + buffer.resize(200) + assert len(buffer) == 200 + b2 = buffer.get() + assert b2 is not b1 # new, bigger buffer + buffer.resize(100) + assert len(buffer) >= 100 + b3 = buffer.get() + assert b3 is b2 # still same buffer (200) + buffer.resize(100, init=True) + assert len(buffer) == 100 # except on init + b4 = buffer.get() + assert b4 is not b3 # new, smaller buffer + + def test_limit(self): + buffer = Buffer(bytearray, size=100, limit=200) + buffer.resize(200) + assert len(buffer) == 200 + with pytest.raises(Buffer.MemoryLimitExceeded): + buffer.resize(201) + assert len(buffer) == 200 + + def test_get(self): + buffer = Buffer(bytearray, size=100, limit=200) + b1 = buffer.get(50) + assert len(b1) >= 50 # == 100 + b2 = buffer.get(100) + assert len(b2) >= 100 # == 100 + assert b2 is b1 # did not need resizing yet + b3 = buffer.get(200) + assert len(b3) == 200 + assert b3 is not b2 # new, resized buffer + with pytest.raises(Buffer.MemoryLimitExceeded): + buffer.get(201) # beyond limit + assert len(buffer) == 200 diff --git a/src/borg/testsuite/helpers/fs_test.py b/src/borg/testsuite/helpers/fs_test.py new file mode 100644 index 000000000..eb5b63973 --- /dev/null +++ b/src/borg/testsuite/helpers/fs_test.py @@ -0,0 +1,437 @@ +import errno +import os +import sys +from contextlib import contextmanager + +import pytest + +from ...constants import * # NOQA +from ...constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS +from ...helpers.fs import ( + dir_is_tagged, + get_base_dir, + get_cache_dir, + get_keys_dir, + get_security_dir, + get_config_dir, + get_runtime_dir, + dash_open, + safe_unlink, + remove_dotdot_prefixes, + make_path_safe, +) +from ...platform import is_win32, is_darwin +from .. import are_hardlinks_supported +from .. import rejected_dotdot_paths + + +def test_get_base_dir(monkeypatch): + """test that get_base_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + monkeypatch.delenv("HOME", raising=False) + monkeypatch.delenv("USER", raising=False) + assert get_base_dir(legacy=True) == os.path.expanduser("~") + monkeypatch.setenv("USER", "root") + assert get_base_dir(legacy=True) == os.path.expanduser("~root") + monkeypatch.setenv("HOME", "/var/tmp/home") + assert get_base_dir(legacy=True) == "/var/tmp/home" + monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base") + assert get_base_dir(legacy=True) == "/var/tmp/base" + # non-legacy is much easier: + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + assert get_base_dir(legacy=False) is None + monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base") + assert get_base_dir(legacy=False) == "/var/tmp/base" + + +def test_get_base_dir_compat(monkeypatch): + """test that it works the same for legacy and for non-legacy implementation""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + # old way: if BORG_BASE_DIR is not set, make something up with HOME/USER/~ + # new way: if BORG_BASE_DIR is not set, return None and let caller deal with it. + assert get_base_dir(legacy=False) is None + # new and old way: BORG_BASE_DIR overrides all other "base path determination". + monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base") + assert get_base_dir(legacy=False) == get_base_dir(legacy=True) + + +def test_get_config_dir(monkeypatch): + """test that get_config_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + home_dir = os.path.expanduser("~") + if is_win32: + monkeypatch.delenv("BORG_CONFIG_DIR", raising=False) + assert get_config_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg") + monkeypatch.setenv("BORG_CONFIG_DIR", home_dir) + assert get_config_dir(create=False) == home_dir + elif is_darwin: + monkeypatch.delenv("BORG_CONFIG_DIR", raising=False) + assert get_config_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg") + monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp") + assert get_config_dir(create=False) == "/var/tmp" + else: + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + monkeypatch.delenv("BORG_CONFIG_DIR", raising=False) + assert get_config_dir(create=False) == os.path.join(home_dir, ".config", "borg") + monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config") + assert get_config_dir(create=False) == os.path.join("/var/tmp/.config", "borg") + monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp") + assert get_config_dir(create=False) == "/var/tmp" + + +def test_get_config_dir_compat(monkeypatch): + """test that it works the same for legacy and for non-legacy implementation""" + monkeypatch.delenv("BORG_CONFIG_DIR", raising=False) + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + if not is_darwin and not is_win32: + # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/Users/tw/.config/borg' + # fails on win32 MSYS2 (but we do not need legacy compat there). + assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False) + monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/xdg.config.d") + # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/var/tmp/xdg.config.d' + # fails on win32 MSYS2 (but we do not need legacy compat there). + assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False) + monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base") + assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False) + monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp/borg.config.d") + assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False) + + +def test_get_cache_dir(monkeypatch): + """test that get_cache_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + home_dir = os.path.expanduser("~") + if is_win32: + monkeypatch.delenv("BORG_CACHE_DIR", raising=False) + assert get_cache_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "Cache") + monkeypatch.setenv("BORG_CACHE_DIR", home_dir) + assert get_cache_dir(create=False) == home_dir + elif is_darwin: + monkeypatch.delenv("BORG_CACHE_DIR", raising=False) + assert get_cache_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "borg") + monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp") + assert get_cache_dir(create=False) == "/var/tmp" + else: + monkeypatch.delenv("XDG_CACHE_HOME", raising=False) + monkeypatch.delenv("BORG_CACHE_DIR", raising=False) + assert get_cache_dir(create=False) == os.path.join(home_dir, ".cache", "borg") + monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/.cache") + assert get_cache_dir(create=False) == os.path.join("/var/tmp/.cache", "borg") + monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp") + assert get_cache_dir(create=False) == "/var/tmp" + + +def test_get_cache_dir_compat(monkeypatch): + """test that it works the same for legacy and for non-legacy implementation""" + monkeypatch.delenv("BORG_CACHE_DIR", raising=False) + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + monkeypatch.delenv("XDG_CACHE_HOME", raising=False) + if not is_darwin and not is_win32: + # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/Users/tw/.cache/borg' + # fails on win32 MSYS2 (but we do not need legacy compat there). + assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False) + # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/var/tmp/xdg.cache.d' + # fails on win32 MSYS2 (but we do not need legacy compat there). + monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/xdg.cache.d") + assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False) + monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base") + assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False) + monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp/borg.cache.d") + assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False) + + +def test_get_keys_dir(monkeypatch): + """test that get_keys_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + home_dir = os.path.expanduser("~") + if is_win32: + monkeypatch.delenv("BORG_KEYS_DIR", raising=False) + assert get_keys_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "keys") + monkeypatch.setenv("BORG_KEYS_DIR", home_dir) + assert get_keys_dir(create=False) == home_dir + elif is_darwin: + monkeypatch.delenv("BORG_KEYS_DIR", raising=False) + assert get_keys_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg", "keys") + monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp") + assert get_keys_dir(create=False) == "/var/tmp" + else: + monkeypatch.delenv("XDG_CONFIG_HOME", raising=False) + monkeypatch.delenv("BORG_KEYS_DIR", raising=False) + assert get_keys_dir(create=False) == os.path.join(home_dir, ".config", "borg", "keys") + monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config") + assert get_keys_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "keys") + monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp") + assert get_keys_dir(create=False) == "/var/tmp" + + +def test_get_security_dir(monkeypatch): + """test that get_security_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + home_dir = os.path.expanduser("~") + if is_win32: + monkeypatch.delenv("BORG_SECURITY_DIR", raising=False) + assert get_security_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "security") + assert get_security_dir(repository_id="1234", create=False) == os.path.join( + home_dir, "AppData", "Local", "borg", "borg", "security", "1234" + ) + monkeypatch.setenv("BORG_SECURITY_DIR", home_dir) + assert get_security_dir(create=False) == home_dir + elif is_darwin: + monkeypatch.delenv("BORG_SECURITY_DIR", raising=False) + assert get_security_dir(create=False) == os.path.join( + home_dir, "Library", "Application Support", "borg", "security" + ) + assert get_security_dir(repository_id="1234", create=False) == os.path.join( + home_dir, "Library", "Application Support", "borg", "security", "1234" + ) + monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp") + assert get_security_dir(create=False) == "/var/tmp" + else: + monkeypatch.delenv("XDG_DATA_HOME", raising=False) + monkeypatch.delenv("BORG_SECURITY_DIR", raising=False) + assert get_security_dir(create=False) == os.path.join(home_dir, ".local", "share", "borg", "security") + assert get_security_dir(repository_id="1234", create=False) == os.path.join( + home_dir, ".local", "share", "borg", "security", "1234" + ) + monkeypatch.setenv("XDG_DATA_HOME", "/var/tmp/.config") + assert get_security_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "security") + monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp") + assert get_security_dir(create=False) == "/var/tmp" + + +def test_get_runtime_dir(monkeypatch): + """test that get_runtime_dir respects environment""" + monkeypatch.delenv("BORG_BASE_DIR", raising=False) + home_dir = os.path.expanduser("~") + if is_win32: + monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False) + assert get_runtime_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "Temp", "borg", "borg") + monkeypatch.setenv("BORG_RUNTIME_DIR", home_dir) + assert get_runtime_dir(create=False) == home_dir + elif is_darwin: + monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False) + assert get_runtime_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "TemporaryItems", "borg") + monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp") + assert get_runtime_dir(create=False) == "/var/tmp" + else: + monkeypatch.delenv("XDG_RUNTIME_DIR", raising=False) + monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False) + uid = str(os.getuid()) + assert get_runtime_dir(create=False) in [ + os.path.join("/run/user", uid, "borg"), + os.path.join("/var/run/user", uid, "borg"), + os.path.join(f"/tmp/runtime-{uid}", "borg"), + ] + monkeypatch.setenv("XDG_RUNTIME_DIR", "/var/tmp/.cache") + assert get_runtime_dir(create=False) == os.path.join("/var/tmp/.cache", "borg") + monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp") + assert get_runtime_dir(create=False) == "/var/tmp" + + +def test_dash_open(): + assert dash_open("-", "r") is sys.stdin + assert dash_open("-", "w") is sys.stdout + assert dash_open("-", "rb") is sys.stdin.buffer + assert dash_open("-", "wb") is sys.stdout.buffer + + +@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") +def test_safe_unlink_is_safe(tmpdir): + contents = b"Hello, world\n" + victim = tmpdir / "victim" + victim.write_binary(contents) + hard_link = tmpdir / "hardlink" + os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 + + safe_unlink(hard_link) + + assert victim.read_binary() == contents + + +@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") +def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch): + contents = b"Hello, world\n" + victim = tmpdir / "victim" + victim.write_binary(contents) + hard_link = tmpdir / "hardlink" + os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 + + def os_unlink(_): + raise OSError(errno.ENOSPC, "Pretend that we ran out of space") + + monkeypatch.setattr(os, "unlink", os_unlink) + + with pytest.raises(OSError): + safe_unlink(hard_link) + + assert victim.read_binary() == contents + + +@pytest.mark.parametrize( + "original_path, expected_path", + [ + (".", "."), + ("..", "."), + ("/", "."), + ("//", "."), + ("foo", "foo"), + ("foo/bar", "foo/bar"), + ("/foo/bar", "foo/bar"), + ("../foo/bar", "foo/bar"), + ], +) +def test_remove_dotdot_prefixes(original_path, expected_path): + assert remove_dotdot_prefixes(original_path) == expected_path + + +@pytest.mark.parametrize( + "original_path, expected_path", + [ + (".", "."), + ("./", "."), + ("/foo", "foo"), + ("//foo", "foo"), + (".//foo//bar//", "foo/bar"), + ("/foo/bar", "foo/bar"), + ("//foo/bar", "foo/bar"), + ("//foo/./bar", "foo/bar"), + (".test", ".test"), + (".test.", ".test."), + ("..test..", "..test.."), + ("/te..st/foo/bar", "te..st/foo/bar"), + ("/..test../abc//", "..test../abc"), + ], +) +def test_valid_make_path_safe(original_path, expected_path): + assert make_path_safe(original_path) == expected_path + + +@pytest.mark.parametrize("path", rejected_dotdot_paths) +def test_invalid_make_path_safe(path): + with pytest.raises(ValueError, match="unexpected '..' element in path"): + make_path_safe(path) + + +def test_dir_is_tagged(tmpdir): + """Test dir_is_tagged with both path-based and file descriptor-based operations.""" + + @contextmanager + def open_dir(path): + fd = os.open(path, os.O_RDONLY) + try: + yield fd + finally: + os.close(fd) + + # Create directories for testing exclude_caches + cache_dir = tmpdir.mkdir("cache_dir") + cache_tag_path = cache_dir.join(CACHE_TAG_NAME) + cache_tag_path.write_binary(CACHE_TAG_CONTENTS) + + invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir") + invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME) + invalid_cache_tag_path.write_binary(b"invalid signature") + + # Create directories for testing exclude_if_present + tagged_dir = tmpdir.mkdir("tagged_dir") + tag_file = tagged_dir.join(".NOBACKUP") + tag_file.write("test") + + other_tagged_dir = tmpdir.mkdir("other_tagged_dir") + other_tag_file = other_tagged_dir.join(".DONOTBACKUP") + other_tag_file.write("test") + + # Create a directory with both a CACHEDIR.TAG and a custom tag file + both_dir = tmpdir.mkdir("both_dir") + cache_tag_path = both_dir.join(CACHE_TAG_NAME) + cache_tag_path.write_binary(CACHE_TAG_CONTENTS) + custom_tag_path = both_dir.join(".NOBACKUP") + custom_tag_path.write("test") + + # Create a directory without any tag files + normal_dir = tmpdir.mkdir("normal_dir") + + # Test edge cases + test_dir = tmpdir.mkdir("test_dir") + assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == [] + assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == [] + + # Test with non-existent directory (should not raise an exception) + non_existent_dir = str(tmpdir.join("non_existent")) + result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) + assert result == [] + + # Test 1: exclude_caches with path-based operations + assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME] + assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == [] + assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == [] + + assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == [] + assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == [] + assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == [] + + # Test 2: exclude_caches with file-descriptor-based operations + with open_dir(str(cache_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME] + with open_dir(str(invalid_cache_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [] + with open_dir(str(normal_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [] + + with open_dir(str(cache_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] + with open_dir(str(invalid_cache_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] + with open_dir(str(normal_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] + + # Test 3: exclude_if_present with path-based operations + tags = [".NOBACKUP"] + assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"] + assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [] + assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == [] + + tags = [".NOBACKUP", ".DONOTBACKUP"] + assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"] + assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"] + assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == [] + + # Test 4: exclude_if_present with file descriptor-based operations + tags = [".NOBACKUP"] + with open_dir(str(tagged_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"] + with open_dir(str(other_tagged_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] + with open_dir(str(normal_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] + + tags = [".NOBACKUP", ".DONOTBACKUP"] + with open_dir(str(tagged_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"] + with open_dir(str(other_tagged_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"] + with open_dir(str(normal_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] + + # Test 5: both exclude types with path-based operations + assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [ + ".NOBACKUP", + CACHE_TAG_NAME, + ] + assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME] + assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"] + assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [] + + # Test 6: both exclude types with file descriptor-based operations + with open_dir(str(both_dir)) as fd: + assert sorted(dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [ + ".NOBACKUP", + CACHE_TAG_NAME, + ] + with open_dir(str(cache_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME] + with open_dir(str(tagged_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"] + with open_dir(str(normal_dir)) as fd: + assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [] diff --git a/src/borg/testsuite/helpers/misc_test.py b/src/borg/testsuite/helpers/misc_test.py new file mode 100644 index 000000000..7fa28a6ff --- /dev/null +++ b/src/borg/testsuite/helpers/misc_test.py @@ -0,0 +1,52 @@ +from io import StringIO, BytesIO + +import pytest + +from ...helpers.misc import ChunkIteratorFileWrapper, chunkit, iter_separated + + +def test_chunk_file_wrapper(): + cfw = ChunkIteratorFileWrapper(iter([b"abc", b"def"])) + assert cfw.read(2) == b"ab" + assert cfw.read(50) == b"cdef" + assert cfw.exhausted + + cfw = ChunkIteratorFileWrapper(iter([])) + assert cfw.read(2) == b"" + assert cfw.exhausted + + +def test_chunkit(): + it = chunkit("abcdefg", 3) + assert next(it) == ["a", "b", "c"] + assert next(it) == ["d", "e", "f"] + assert next(it) == ["g"] + with pytest.raises(StopIteration): + next(it) + with pytest.raises(StopIteration): + next(it) + + it = chunkit("ab", 3) + assert list(it) == [["a", "b"]] + + it = chunkit("", 3) + assert list(it) == [] + + +def test_iter_separated(): + # newline and utf-8 + sep, items = "\n", ["foo", "bar/baz", "αáčő"] + fd = StringIO(sep.join(items)) + assert list(iter_separated(fd)) == items + # null and bogus ending + sep, items = "\0", ["foo/bar", "baz", "spam"] + fd = StringIO(sep.join(items) + "\0") + assert list(iter_separated(fd, sep=sep)) == ["foo/bar", "baz", "spam"] + # multichar + sep, items = "SEP", ["foo/bar", "baz", "spam"] + fd = StringIO(sep.join(items)) + assert list(iter_separated(fd, sep=sep)) == items + # bytes + sep, items = b"\n", [b"foo", b"blop\t", b"gr\xe4ezi"] + fd = BytesIO(sep.join(items)) + assert list(iter_separated(fd)) == items diff --git a/src/borg/testsuite/helpers/msgpack_test.py b/src/borg/testsuite/helpers/msgpack_test.py new file mode 100644 index 000000000..fe14a3a58 --- /dev/null +++ b/src/borg/testsuite/helpers/msgpack_test.py @@ -0,0 +1,36 @@ +import sys +import pytest + +from ...helpers.msgpack import is_slow_msgpack +from ...platform import is_cygwin + + +def expected_py_mp_slow_combination(): + """do we expect msgpack to be slow in this environment?""" + # we need to import upstream msgpack package here, not helpers.msgpack: + import msgpack + + # msgpack is slow on cygwin + if is_cygwin: + return True + # msgpack < 1.0.6 did not have py312 wheels + if sys.version_info[:2] == (3, 12) and msgpack.version < (1, 0, 6): + return True + # otherwise we expect msgpack to be fast! + return False + + +@pytest.mark.skipif(expected_py_mp_slow_combination(), reason="ignore expected slow msgpack") +def test_is_slow_msgpack(): + # we need to import upstream msgpack package here, not helpers.msgpack: + import msgpack + import msgpack.fallback + + saved_packer = msgpack.Packer + try: + msgpack.Packer = msgpack.fallback.Packer + assert is_slow_msgpack() + finally: + msgpack.Packer = saved_packer + # this tests that we have fast msgpack on test platform: + assert not is_slow_msgpack() diff --git a/src/borg/testsuite/helpers/parseformat_test.py b/src/borg/testsuite/helpers/parseformat_test.py new file mode 100644 index 000000000..d7cd002d0 --- /dev/null +++ b/src/borg/testsuite/helpers/parseformat_test.py @@ -0,0 +1,606 @@ +import base64 +import os +from argparse import ArgumentTypeError +from datetime import datetime, timezone + +import pytest + +from ...constants import * # NOQA +from ...helpers.parseformat import ( + bin_to_hex, + binary_to_json, + text_to_json, + Location, + archivename_validator, + text_validator, + format_file_size, + parse_file_size, + interval, + partial_format, + clean_lines, + format_line, + PlaceholderError, + replace_placeholders, + swidth_slice, + eval_escapes, + ChunkerParams, +) +from ...helpers.time import format_timedelta, parse_timestamp + + +def test_bin_to_hex(): + assert bin_to_hex(b"") == "" + assert bin_to_hex(b"\x00\x01\xff") == "0001ff" + + +@pytest.mark.parametrize( + "key,value", + [("key", b"\x00\x01\x02\x03"), ("key", b"\x00\x01\x02"), ("key", b"\x00\x01"), ("key", b"\x00"), ("key", b"")], +) +def test_binary_to_json(key, value): + key_b64 = key + "_b64" + d = binary_to_json(key, value) + assert key_b64 in d + assert base64.b64decode(d[key_b64]) == value + + +@pytest.mark.parametrize( + "key,value,strict", + [ + ("key", "abc", True), + ("key", "äöü", True), + ("key", "", True), + ("key", b"\x00\xff".decode("utf-8", errors="surrogateescape"), False), + ("key", "äöü".encode("latin1").decode("utf-8", errors="surrogateescape"), False), + ], +) +def test_text_to_json(key, value, strict): + key_b64 = key + "_b64" + d = text_to_json(key, value) + value_b = value.encode("utf-8", errors="surrogateescape") + if strict: + # no surrogate-escapes, just unicode text + assert key in d + assert d[key] == value_b.decode("utf-8", errors="strict") + assert d[key].encode("utf-8", errors="strict") == value_b + assert key_b64 not in d # not needed. pure valid unicode. + else: + # requiring surrogate-escapes. text has replacement chars, base64 representation is present. + assert key in d + assert d[key] == value.encode("utf-8", errors="replace").decode("utf-8", errors="strict") + assert d[key].encode("utf-8", errors="strict") == value.encode("utf-8", errors="replace") + assert key_b64 in d + assert base64.b64decode(d[key_b64]) == value_b + + +class TestLocationWithoutEnv: + @pytest.fixture + def keys_dir(self, tmpdir, monkeypatch): + tmpdir = str(tmpdir) + monkeypatch.setenv("BORG_KEYS_DIR", tmpdir) + if not tmpdir.endswith(os.path.sep): + tmpdir += os.path.sep + return tmpdir + + def test_ssh(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("ssh://user@host:1234//absolute/path")) + == "Location(proto='ssh', user='user', host='host', port=1234, path='/absolute/path')" + ) + assert Location("ssh://user@host:1234//absolute/path").to_key_filename() == keys_dir + "host___absolute_path" + assert ( + repr(Location("ssh://user@host:1234/relative/path")) + == "Location(proto='ssh', user='user', host='host', port=1234, path='relative/path')" + ) + assert Location("ssh://user@host:1234/relative/path").to_key_filename() == keys_dir + "host__relative_path" + assert ( + repr(Location("ssh://user@host/relative/path")) + == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[::]:1234/relative/path")) + == "Location(proto='ssh', user='user', host='::', port=1234, path='relative/path')" + ) + assert Location("ssh://user@[::]:1234/relative/path").to_key_filename() == keys_dir + "____relative_path" + assert ( + repr(Location("ssh://user@[::]/relative/path")) + == "Location(proto='ssh', user='user', host='::', port=None, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2001:db8::]:1234/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='relative/path')" + ) + assert ( + Location("ssh://user@[2001:db8::]:1234/relative/path").to_key_filename() + == keys_dir + "2001_db8____relative_path" + ) + assert ( + repr(Location("ssh://user@[2001:db8::]/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::', port=None, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2001:db8::c0:ffee]/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=None, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2001:db8::192.0.2.1]/relative/path")) + == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=None, path='relative/path')" + ) + assert ( + Location("ssh://user@[2001:db8::192.0.2.1]/relative/path").to_key_filename() + == keys_dir + "2001_db8__192_0_2_1__relative_path" + ) + assert ( + repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/relative/path")) + == "Location(proto='ssh', user='user', " + "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=None, path='relative/path')" + ) + assert ( + repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]:1234/relative/path")) + == "Location(proto='ssh', user='user', " + "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='relative/path')" + ) + + def test_rclone(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("rclone:remote:path")) + == "Location(proto='rclone', user=None, host=None, port=None, path='remote:path')" + ) + assert Location("rclone:remote:path").to_key_filename() == keys_dir + "remote_path" + + def test_sftp(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + # relative path + assert ( + repr(Location("sftp://user@host:1234/rel/path")) + == "Location(proto='sftp', user='user', host='host', port=1234, path='rel/path')" + ) + assert Location("sftp://user@host:1234/rel/path").to_key_filename() == keys_dir + "host__rel_path" + # absolute path + assert ( + repr(Location("sftp://user@host:1234//abs/path")) + == "Location(proto='sftp', user='user', host='host', port=1234, path='/abs/path')" + ) + assert Location("sftp://user@host:1234//abs/path").to_key_filename() == keys_dir + "host___abs_path" + + def test_socket(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("socket:///repo/path")) + == "Location(proto='socket', user=None, host=None, port=None, path='/repo/path')" + ) + assert Location("socket:///some/path").to_key_filename() == keys_dir + "_some_path" + + def test_file(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("file:///some/path")) + == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" + ) + assert ( + repr(Location("file:///some/path")) + == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" + ) + assert Location("file:///some/path").to_key_filename() == keys_dir + "_some_path" + + def test_smb(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("file:////server/share/path")) + == "Location(proto='file', user=None, host=None, port=None, path='//server/share/path')" + ) + assert Location("file:////server/share/path").to_key_filename() == keys_dir + "__server_share_path" + + def test_folder(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + rel_path = "path" + abs_path = os.path.abspath(rel_path) + assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')" + assert Location("path").to_key_filename().endswith(rel_path) + + def test_abspath(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("/some/absolute/path")) + == "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path')" + ) + assert Location("/some/absolute/path").to_key_filename() == keys_dir + "_some_absolute_path" + assert ( + repr(Location("/some/../absolute/path")) + == "Location(proto='file', user=None, host=None, port=None, path='/absolute/path')" + ) + assert Location("/some/../absolute/path").to_key_filename() == keys_dir + "_absolute_path" + + def test_relpath(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + # for a local path, borg creates a Location instance with an absolute path + rel_path = "relative/path" + abs_path = os.path.abspath(rel_path) + assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')" + assert Location(rel_path).to_key_filename().endswith("relative_path") + assert ( + repr(Location("ssh://user@host/relative/path")) + == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')" + ) + assert Location("ssh://user@host/relative/path").to_key_filename() == keys_dir + "host__relative_path" + + def test_with_colons(self, monkeypatch, keys_dir): + monkeypatch.delenv("BORG_REPO", raising=False) + assert ( + repr(Location("/abs/path:w:cols")) + == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')" + ) + assert Location("/abs/path:w:cols").to_key_filename() == keys_dir + "_abs_path_w_cols" + assert ( + repr(Location("file:///abs/path:w:cols")) + == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')" + ) + assert Location("file:///abs/path:w:cols").to_key_filename() == keys_dir + "_abs_path_w_cols" + assert ( + repr(Location("ssh://user@host/abs/path:w:cols")) + == "Location(proto='ssh', user='user', host='host', port=None, path='abs/path:w:cols')" + ) + assert Location("ssh://user@host/abs/path:w:cols").to_key_filename() == keys_dir + "host__abs_path_w_cols" + + def test_canonical_path(self, monkeypatch): + monkeypatch.delenv("BORG_REPO", raising=False) + locations = [ + "relative/path", + "/absolute/path", + "file:///absolute/path", + "socket:///absolute/path", + "ssh://host/relative/path", + "ssh://host//absolute/path", + "ssh://user@host:1234/relative/path", + "sftp://host/relative/path", + "sftp://host//absolute/path", + "sftp://user@host:1234/relative/path", + "rclone:remote:path", + ] + for location in locations: + assert ( + Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path() + ), ("failed: %s" % location) + + def test_bad_syntax(self): + with pytest.raises(ValueError): + # this is invalid due to the 2nd colon, correct: 'ssh://user@host/path' + Location("ssh://user@host:/path") + + +@pytest.mark.parametrize( + "name", + [ + "foobar", + # placeholders + "foobar-{now}", + ], +) +def test_archivename_ok(name): + assert archivename_validator(name) == name + + +@pytest.mark.parametrize( + "name", + [ + "", # too short + "x" * 201, # too long + # invalid chars: + "foo/bar", + "foo\\bar", + ">foo", + " datetime(2038, 1, 1) + assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2038, 1, 1) + else: + # ns fit into int64 + assert safe_ns(2**64) <= 2**63 - 1 + assert safe_ns(-1) == 0 + # s are so that their ns conversion fits into int64 + assert safe_s(2**64) * 1000000000 <= 2**63 - 1 + assert safe_s(-1) == 0 + # datetime won't fall over its y10k problem + beyond_y10k = 2**100 + with pytest.raises(OverflowError): + utcfromtimestamp(beyond_y10k) + assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1) + assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1) diff --git a/src/borg/testsuite/helpers/yes_no_test.py b/src/borg/testsuite/helpers/yes_no_test.py new file mode 100644 index 000000000..508de36f7 --- /dev/null +++ b/src/borg/testsuite/helpers/yes_no_test.py @@ -0,0 +1,104 @@ +import pytest + +from ...helpers.yes_no import yes, TRUISH, FALSISH, DEFAULTISH +from .. import FakeInputs + + +def test_yes_input(): + inputs = list(TRUISH) + input = FakeInputs(inputs) + for i in inputs: + assert yes(input=input) + inputs = list(FALSISH) + input = FakeInputs(inputs) + for i in inputs: + assert not yes(input=input) + + +def test_yes_input_defaults(): + inputs = list(DEFAULTISH) + input = FakeInputs(inputs) + for i in inputs: + assert yes(default=True, input=input) + input = FakeInputs(inputs) + for i in inputs: + assert not yes(default=False, input=input) + + +def test_yes_input_custom(): + input = FakeInputs(["YES", "SURE", "NOPE"]) + assert yes(truish=("YES",), input=input) + assert yes(truish=("SURE",), input=input) + assert not yes(falsish=("NOPE",), input=input) + + +def test_yes_env(monkeypatch): + for value in TRUISH: + monkeypatch.setenv("OVERRIDE_THIS", value) + assert yes(env_var_override="OVERRIDE_THIS") + for value in FALSISH: + monkeypatch.setenv("OVERRIDE_THIS", value) + assert not yes(env_var_override="OVERRIDE_THIS") + + +def test_yes_env_default(monkeypatch): + for value in DEFAULTISH: + monkeypatch.setenv("OVERRIDE_THIS", value) + assert yes(env_var_override="OVERRIDE_THIS", default=True) + assert not yes(env_var_override="OVERRIDE_THIS", default=False) + + +def test_yes_defaults(): + input = FakeInputs(["invalid", "", " "]) + assert not yes(input=input) # default=False + assert not yes(input=input) + assert not yes(input=input) + input = FakeInputs(["invalid", "", " "]) + assert yes(default=True, input=input) + assert yes(default=True, input=input) + assert yes(default=True, input=input) + input = FakeInputs([]) + assert yes(default=True, input=input) + assert not yes(default=False, input=input) + with pytest.raises(ValueError): + yes(default=None) + + +def test_yes_retry(): + input = FakeInputs(["foo", "bar", TRUISH[0]]) + assert yes(retry_msg="Retry: ", input=input) + input = FakeInputs(["foo", "bar", FALSISH[0]]) + assert not yes(retry_msg="Retry: ", input=input) + + +def test_yes_no_retry(): + input = FakeInputs(["foo", "bar", TRUISH[0]]) + assert not yes(retry=False, default=False, input=input) + input = FakeInputs(["foo", "bar", FALSISH[0]]) + assert yes(retry=False, default=True, input=input) + + +def test_yes_output(capfd): + input = FakeInputs(["invalid", "y", "n"]) + assert yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) + out, err = capfd.readouterr() + assert out == "" + assert "intro-msg" in err + assert "retry-msg" in err + assert "true-msg" in err + assert not yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) + out, err = capfd.readouterr() + assert out == "" + assert "intro-msg" in err + assert "retry-msg" not in err + assert "false-msg" in err + + +def test_yes_env_output(capfd, monkeypatch): + env_var = "OVERRIDE_SOMETHING" + monkeypatch.setenv(env_var, "yes") + assert yes(env_var_override=env_var) + out, err = capfd.readouterr() + assert out == "" + assert env_var in err + assert "yes" in err diff --git a/src/borg/testsuite/helpers_test.py b/src/borg/testsuite/helpers_test.py deleted file mode 100644 index 4e0736ad0..000000000 --- a/src/borg/testsuite/helpers_test.py +++ /dev/null @@ -1,1646 +0,0 @@ -import base64 -import errno -import getpass -import hashlib -import os -import shutil -import sys -from argparse import ArgumentTypeError -from contextlib import contextmanager -from datetime import datetime, timezone, timedelta -from io import StringIO, BytesIO - -import pytest - -from ..archiver.prune_cmd import prune_within, prune_split -from .. import platform -from ..constants import * # NOQA -from ..constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS -from ..helpers.fs import dir_is_tagged -from ..helpers import Location -from ..helpers import Buffer -from ..helpers import ( - partial_format, - format_file_size, - parse_file_size, - format_timedelta, - format_line, - PlaceholderError, - replace_placeholders, -) -from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines -from ..helpers import interval -from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir -from ..helpers import is_slow_msgpack -from ..helpers import msgpack -from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH -from ..helpers import StableDict, bin_to_hex -from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams -from ..helpers import archivename_validator, text_validator -from ..helpers import ProgressIndicatorPercent -from ..helpers import swidth_slice -from ..helpers import chunkit -from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS -from ..helpers import popen_with_error_handling -from ..helpers import dash_open -from ..helpers import iter_separated -from ..helpers import eval_escapes -from ..helpers import safe_unlink -from ..helpers import text_to_json, binary_to_json -from ..helpers import classify_ec, max_ec -from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded -from ..platform import is_cygwin, is_win32, is_darwin -from . import FakeInputs, are_hardlinks_supported -from . import rejected_dotdot_paths - - -def test_bin_to_hex(): - assert bin_to_hex(b"") == "" - assert bin_to_hex(b"\x00\x01\xff") == "0001ff" - - -@pytest.mark.parametrize( - "key,value", - [("key", b"\x00\x01\x02\x03"), ("key", b"\x00\x01\x02"), ("key", b"\x00\x01"), ("key", b"\x00"), ("key", b"")], -) -def test_binary_to_json(key, value): - key_b64 = key + "_b64" - d = binary_to_json(key, value) - assert key_b64 in d - assert base64.b64decode(d[key_b64]) == value - - -@pytest.mark.parametrize( - "key,value,strict", - [ - ("key", "abc", True), - ("key", "äöü", True), - ("key", "", True), - ("key", b"\x00\xff".decode("utf-8", errors="surrogateescape"), False), - ("key", "äöü".encode("latin1").decode("utf-8", errors="surrogateescape"), False), - ], -) -def test_text_to_json(key, value, strict): - key_b64 = key + "_b64" - d = text_to_json(key, value) - value_b = value.encode("utf-8", errors="surrogateescape") - if strict: - # no surrogate-escapes, just unicode text - assert key in d - assert d[key] == value_b.decode("utf-8", errors="strict") - assert d[key].encode("utf-8", errors="strict") == value_b - assert key_b64 not in d # not needed. pure valid unicode. - else: - # requiring surrogate-escapes. text has replacement chars, base64 representation is present. - assert key in d - assert d[key] == value.encode("utf-8", errors="replace").decode("utf-8", errors="strict") - assert d[key].encode("utf-8", errors="strict") == value.encode("utf-8", errors="replace") - assert key_b64 in d - assert base64.b64decode(d[key_b64]) == value_b - - -class TestLocationWithoutEnv: - @pytest.fixture - def keys_dir(self, tmpdir, monkeypatch): - tmpdir = str(tmpdir) - monkeypatch.setenv("BORG_KEYS_DIR", tmpdir) - if not tmpdir.endswith(os.path.sep): - tmpdir += os.path.sep - return tmpdir - - def test_ssh(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("ssh://user@host:1234//absolute/path")) - == "Location(proto='ssh', user='user', host='host', port=1234, path='/absolute/path')" - ) - assert Location("ssh://user@host:1234//absolute/path").to_key_filename() == keys_dir + "host___absolute_path" - assert ( - repr(Location("ssh://user@host:1234/relative/path")) - == "Location(proto='ssh', user='user', host='host', port=1234, path='relative/path')" - ) - assert Location("ssh://user@host:1234/relative/path").to_key_filename() == keys_dir + "host__relative_path" - assert ( - repr(Location("ssh://user@host/relative/path")) - == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[::]:1234/relative/path")) - == "Location(proto='ssh', user='user', host='::', port=1234, path='relative/path')" - ) - assert Location("ssh://user@[::]:1234/relative/path").to_key_filename() == keys_dir + "____relative_path" - assert ( - repr(Location("ssh://user@[::]/relative/path")) - == "Location(proto='ssh', user='user', host='::', port=None, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2001:db8::]:1234/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='relative/path')" - ) - assert ( - Location("ssh://user@[2001:db8::]:1234/relative/path").to_key_filename() - == keys_dir + "2001_db8____relative_path" - ) - assert ( - repr(Location("ssh://user@[2001:db8::]/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::', port=None, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2001:db8::c0:ffee]/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=None, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2001:db8::192.0.2.1]/relative/path")) - == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=None, path='relative/path')" - ) - assert ( - Location("ssh://user@[2001:db8::192.0.2.1]/relative/path").to_key_filename() - == keys_dir + "2001_db8__192_0_2_1__relative_path" - ) - assert ( - repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/relative/path")) - == "Location(proto='ssh', user='user', " - "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=None, path='relative/path')" - ) - assert ( - repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]:1234/relative/path")) - == "Location(proto='ssh', user='user', " - "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='relative/path')" - ) - - def test_rclone(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("rclone:remote:path")) - == "Location(proto='rclone', user=None, host=None, port=None, path='remote:path')" - ) - assert Location("rclone:remote:path").to_key_filename() == keys_dir + "remote_path" - - def test_sftp(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - # relative path - assert ( - repr(Location("sftp://user@host:1234/rel/path")) - == "Location(proto='sftp', user='user', host='host', port=1234, path='rel/path')" - ) - assert Location("sftp://user@host:1234/rel/path").to_key_filename() == keys_dir + "host__rel_path" - # absolute path - assert ( - repr(Location("sftp://user@host:1234//abs/path")) - == "Location(proto='sftp', user='user', host='host', port=1234, path='/abs/path')" - ) - assert Location("sftp://user@host:1234//abs/path").to_key_filename() == keys_dir + "host___abs_path" - - def test_socket(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("socket:///repo/path")) - == "Location(proto='socket', user=None, host=None, port=None, path='/repo/path')" - ) - assert Location("socket:///some/path").to_key_filename() == keys_dir + "_some_path" - - def test_file(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("file:///some/path")) - == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" - ) - assert ( - repr(Location("file:///some/path")) - == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" - ) - assert Location("file:///some/path").to_key_filename() == keys_dir + "_some_path" - - def test_smb(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("file:////server/share/path")) - == "Location(proto='file', user=None, host=None, port=None, path='//server/share/path')" - ) - assert Location("file:////server/share/path").to_key_filename() == keys_dir + "__server_share_path" - - def test_folder(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - rel_path = "path" - abs_path = os.path.abspath(rel_path) - assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')" - assert Location("path").to_key_filename().endswith(rel_path) - - def test_abspath(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("/absolute/path")) - == "Location(proto='file', user=None, host=None, port=None, path='/absolute/path')" - ) - assert Location("/absolute/path").to_key_filename() == keys_dir + "_absolute_path" - assert ( - repr(Location("ssh://user@host//absolute/path")) - == "Location(proto='ssh', user='user', host='host', port=None, path='/absolute/path')" - ) - assert Location("ssh://user@host//absolute/path").to_key_filename() == keys_dir + "host___absolute_path" - - def test_relpath(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - # for a local path, borg creates a Location instance with an absolute path - rel_path = "relative/path" - abs_path = os.path.abspath(rel_path) - assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')" - assert Location(rel_path).to_key_filename().endswith("relative_path") - assert ( - repr(Location("ssh://user@host/relative/path")) - == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')" - ) - assert Location("ssh://user@host/relative/path").to_key_filename() == keys_dir + "host__relative_path" - - def test_with_colons(self, monkeypatch, keys_dir): - monkeypatch.delenv("BORG_REPO", raising=False) - assert ( - repr(Location("/abs/path:w:cols")) - == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')" - ) - assert ( - repr(Location("/abs/path:with:colons")) - == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')" - ) - assert ( - repr(Location("/abs/path:with:colons")) - == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')" - ) - assert Location("/abs/path:with:colons").to_key_filename() == keys_dir + "_abs_path_with_colons" - - def test_canonical_path(self, monkeypatch): - monkeypatch.delenv("BORG_REPO", raising=False) - locations = [ - "relative/path", - "/absolute/path", - "file:///absolute/path", - "socket:///absolute/path", - "ssh://host/relative/path", - "ssh://host//absolute/path", - "ssh://user@host:1234/relative/path", - "sftp://host/relative/path", - "sftp://host//absolute/path", - "sftp://user@host:1234/relative/path", - "rclone:remote:path", - ] - for location in locations: - assert ( - Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path() - ), ("failed: %s" % location) - - def test_bad_syntax(self): - with pytest.raises(ValueError): - # this is invalid due to the 2nd colon, correct: 'ssh://user@host/path' - Location("ssh://user@host:/path") - - -@pytest.mark.parametrize( - "name", - [ - "foobar", - # placeholders - "foobar-{now}", - ], -) -def test_archivename_ok(name): - archivename_validator(name) # must not raise an exception - - -@pytest.mark.parametrize( - "name", - [ - "", # too short - "x" * 201, # too long - # invalid chars: - "foo/bar", - "foo\\bar", - ">foo", - "= 100 - b3 = buffer.get() - assert b3 is b2 # still same buffer (200) - buffer.resize(100, init=True) - assert len(buffer) == 100 # except on init - b4 = buffer.get() - assert b4 is not b3 # new, smaller buffer - - def test_limit(self): - buffer = Buffer(bytearray, size=100, limit=200) - buffer.resize(200) - assert len(buffer) == 200 - with pytest.raises(Buffer.MemoryLimitExceeded): - buffer.resize(201) - assert len(buffer) == 200 - - def test_get(self): - buffer = Buffer(bytearray, size=100, limit=200) - b1 = buffer.get(50) - assert len(b1) >= 50 # == 100 - b2 = buffer.get(100) - assert len(b2) >= 100 # == 100 - assert b2 is b1 # did not need resizing yet - b3 = buffer.get(200) - assert len(b3) == 200 - assert b3 is not b2 # new, resized buffer - with pytest.raises(Buffer.MemoryLimitExceeded): - buffer.get(201) # beyond limit - assert len(buffer) == 200 - - -def test_yes_input(): - inputs = list(TRUISH) - input = FakeInputs(inputs) - for i in inputs: - assert yes(input=input) - inputs = list(FALSISH) - input = FakeInputs(inputs) - for i in inputs: - assert not yes(input=input) - - -def test_yes_input_defaults(): - inputs = list(DEFAULTISH) - input = FakeInputs(inputs) - for i in inputs: - assert yes(default=True, input=input) - input = FakeInputs(inputs) - for i in inputs: - assert not yes(default=False, input=input) - - -def test_yes_input_custom(): - input = FakeInputs(["YES", "SURE", "NOPE"]) - assert yes(truish=("YES",), input=input) - assert yes(truish=("SURE",), input=input) - assert not yes(falsish=("NOPE",), input=input) - - -def test_yes_env(monkeypatch): - for value in TRUISH: - monkeypatch.setenv("OVERRIDE_THIS", value) - assert yes(env_var_override="OVERRIDE_THIS") - for value in FALSISH: - monkeypatch.setenv("OVERRIDE_THIS", value) - assert not yes(env_var_override="OVERRIDE_THIS") - - -def test_yes_env_default(monkeypatch): - for value in DEFAULTISH: - monkeypatch.setenv("OVERRIDE_THIS", value) - assert yes(env_var_override="OVERRIDE_THIS", default=True) - assert not yes(env_var_override="OVERRIDE_THIS", default=False) - - -def test_yes_defaults(): - input = FakeInputs(["invalid", "", " "]) - assert not yes(input=input) # default=False - assert not yes(input=input) - assert not yes(input=input) - input = FakeInputs(["invalid", "", " "]) - assert yes(default=True, input=input) - assert yes(default=True, input=input) - assert yes(default=True, input=input) - input = FakeInputs([]) - assert yes(default=True, input=input) - assert not yes(default=False, input=input) - with pytest.raises(ValueError): - yes(default=None) - - -def test_yes_retry(): - input = FakeInputs(["foo", "bar", TRUISH[0]]) - assert yes(retry_msg="Retry: ", input=input) - input = FakeInputs(["foo", "bar", FALSISH[0]]) - assert not yes(retry_msg="Retry: ", input=input) - - -def test_yes_no_retry(): - input = FakeInputs(["foo", "bar", TRUISH[0]]) - assert not yes(retry=False, default=False, input=input) - input = FakeInputs(["foo", "bar", FALSISH[0]]) - assert yes(retry=False, default=True, input=input) - - -def test_yes_output(capfd): - input = FakeInputs(["invalid", "y", "n"]) - assert yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) - out, err = capfd.readouterr() - assert out == "" - assert "intro-msg" in err - assert "retry-msg" in err - assert "true-msg" in err - assert not yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) - out, err = capfd.readouterr() - assert out == "" - assert "intro-msg" in err - assert "retry-msg" not in err - assert "false-msg" in err - - -def test_yes_env_output(capfd, monkeypatch): - env_var = "OVERRIDE_SOMETHING" - monkeypatch.setenv(env_var, "yes") - assert yes(env_var_override=env_var) - out, err = capfd.readouterr() - assert out == "" - assert env_var in err - assert "yes" in err - - -def test_progress_percentage(capfd): - pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") - pi.logger.setLevel("INFO") - pi.show(0) - out, err = capfd.readouterr() - assert err == " 0%\n" - pi.show(420) - pi.show(680) - out, err = capfd.readouterr() - assert err == " 42%\n 68%\n" - pi.show(1000) - out, err = capfd.readouterr() - assert err == "100%\n" - pi.finish() - out, err = capfd.readouterr() - assert err == "\n" - - -def test_progress_percentage_step(capfd): - pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%") - pi.logger.setLevel("INFO") - pi.show() - out, err = capfd.readouterr() - assert err == " 0%\n" - pi.show() - out, err = capfd.readouterr() - assert err == "" # no output at 1% as we have step == 2 - pi.show() - out, err = capfd.readouterr() - assert err == " 2%\n" - - -def test_progress_percentage_quiet(capfd): - pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") - pi.logger.setLevel("WARN") - pi.show(0) - out, err = capfd.readouterr() - assert err == "" - pi.show(1000) - out, err = capfd.readouterr() - assert err == "" - pi.finish() - out, err = capfd.readouterr() - assert err == "" - - -@pytest.mark.parametrize( - "fmt, items_map, expected_result", - [ - ("{space:10}", {"space": " "}, " " * 10), - ("{foobar}", {"bar": "wrong", "foobar": "correct"}, "correct"), - ("{unknown_key}", {}, "{unknown_key}"), - ("{key}{{escaped_key}}", {}, "{key}{{escaped_key}}"), - ("{{escaped_key}}", {"escaped_key": 1234}, "{{escaped_key}}"), - ], -) -def test_partial_format(fmt, items_map, expected_result): - assert partial_format(fmt, items_map) == expected_result - - -def test_chunk_file_wrapper(): - cfw = ChunkIteratorFileWrapper(iter([b"abc", b"def"])) - assert cfw.read(2) == b"ab" - assert cfw.read(50) == b"cdef" - assert cfw.exhausted - - cfw = ChunkIteratorFileWrapper(iter([])) - assert cfw.read(2) == b"" - assert cfw.exhausted - - -def test_chunkit(): - it = chunkit("abcdefg", 3) - assert next(it) == ["a", "b", "c"] - assert next(it) == ["d", "e", "f"] - assert next(it) == ["g"] - with pytest.raises(StopIteration): - next(it) - with pytest.raises(StopIteration): - next(it) - - it = chunkit("ab", 3) - assert list(it) == [["a", "b"]] - - it = chunkit("", 3) - assert list(it) == [] - - -def test_clean_lines(): - conf = """\ -#comment -data1 #data1 -data2 - - data3 -""".splitlines( - keepends=True - ) - assert list(clean_lines(conf)) == ["data1 #data1", "data2", "data3"] - assert list(clean_lines(conf, lstrip=False)) == ["data1 #data1", "data2", " data3"] - assert list(clean_lines(conf, rstrip=False)) == ["data1 #data1\n", "data2\n", "data3\n"] - assert list(clean_lines(conf, remove_empty=False)) == ["data1 #data1", "data2", "", "data3"] - assert list(clean_lines(conf, remove_comments=False)) == ["#comment", "data1 #data1", "data2", "data3"] - - -def test_format_line(): - data = dict(foo="bar baz") - assert format_line("", data) == "" - assert format_line("{foo}", data) == "bar baz" - assert format_line("foo{foo}foo", data) == "foobar bazfoo" - - -def test_format_line_erroneous(): - data = dict() - with pytest.raises(PlaceholderError): - assert format_line("{invalid}", data) - with pytest.raises(PlaceholderError): - assert format_line("{}", data) - with pytest.raises(PlaceholderError): - assert format_line("{now!r}", data) - with pytest.raises(PlaceholderError): - assert format_line("{now.__class__.__module__.__builtins__}", data) - - -def test_replace_placeholders(): - replace_placeholders.reset() # avoid overrides are spoiled by previous tests - now = datetime.now() - assert " " not in replace_placeholders("{now}") - assert int(replace_placeholders("{now:%Y}")) == now.year - - -def test_override_placeholders(): - assert replace_placeholders("{uuid4}", overrides={"uuid4": "overridden"}) == "overridden" - - -def working_swidth(): - return platform.swidth("선") == 2 - - -@pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active") -def test_swidth_slice(): - string = "나윤선나윤선나윤선나윤선나윤선" - assert swidth_slice(string, 1) == "" - assert swidth_slice(string, -1) == "" - assert swidth_slice(string, 4) == "나윤" - assert swidth_slice(string, -4) == "윤선" - - -@pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active") -def test_swidth_slice_mixed_characters(): - string = "나윤a선나윤선나윤선나윤선나윤선" - assert swidth_slice(string, 5) == "나윤a" - assert swidth_slice(string, 6) == "나윤a" - - -def utcfromtimestamp(timestamp): - """Returns a naive datetime instance representing the timestamp in the UTC timezone""" - return datetime.fromtimestamp(timestamp, timezone.utc).replace(tzinfo=None) - - -def test_safe_timestamps(): - if SUPPORT_32BIT_PLATFORMS: - # ns fit into int64 - assert safe_ns(2**64) <= 2**63 - 1 - assert safe_ns(-1) == 0 - # s fit into int32 - assert safe_s(2**64) <= 2**31 - 1 - assert safe_s(-1) == 0 - # datetime won't fall over its y10k problem - beyond_y10k = 2**100 - with pytest.raises(OverflowError): - utcfromtimestamp(beyond_y10k) - assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2038, 1, 1) - assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2038, 1, 1) - else: - # ns fit into int64 - assert safe_ns(2**64) <= 2**63 - 1 - assert safe_ns(-1) == 0 - # s are so that their ns conversion fits into int64 - assert safe_s(2**64) * 1000000000 <= 2**63 - 1 - assert safe_s(-1) == 0 - # datetime won't fall over its y10k problem - beyond_y10k = 2**100 - with pytest.raises(OverflowError): - utcfromtimestamp(beyond_y10k) - assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1) - assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1) - - -class TestPopenWithErrorHandling: - @pytest.mark.skipif(not shutil.which("test"), reason='"test" binary is needed') - def test_simple(self): - proc = popen_with_error_handling("test 1") - assert proc.wait() == 0 - - @pytest.mark.skipif( - shutil.which("borg-foobar-test-notexist"), reason='"borg-foobar-test-notexist" binary exists (somehow?)' - ) - def test_not_found(self): - proc = popen_with_error_handling("borg-foobar-test-notexist 1234") - assert proc is None - - @pytest.mark.parametrize("cmd", ('mismatched "quote', 'foo --bar="baz', "")) - def test_bad_syntax(self, cmd): - proc = popen_with_error_handling(cmd) - assert proc is None - - def test_shell(self): - with pytest.raises(AssertionError): - popen_with_error_handling("", shell=True) - - -def test_dash_open(): - assert dash_open("-", "r") is sys.stdin - assert dash_open("-", "w") is sys.stdout - assert dash_open("-", "rb") is sys.stdin.buffer - assert dash_open("-", "wb") is sys.stdout.buffer - - -def test_iter_separated(): - # newline and utf-8 - sep, items = "\n", ["foo", "bar/baz", "αáčő"] - fd = StringIO(sep.join(items)) - assert list(iter_separated(fd)) == items - # null and bogus ending - sep, items = "\0", ["foo/bar", "baz", "spam"] - fd = StringIO(sep.join(items) + "\0") - assert list(iter_separated(fd, sep=sep)) == ["foo/bar", "baz", "spam"] - # multichar - sep, items = "SEP", ["foo/bar", "baz", "spam"] - fd = StringIO(sep.join(items)) - assert list(iter_separated(fd, sep=sep)) == items - # bytes - sep, items = b"\n", [b"foo", b"blop\t", b"gr\xe4ezi"] - fd = BytesIO(sep.join(items)) - assert list(iter_separated(fd)) == items - - -def test_eval_escapes(): - assert eval_escapes("\\n\\0\\x23") == "\n\0#" - assert eval_escapes("äç\\n") == "äç\n" - - -@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") -def test_safe_unlink_is_safe(tmpdir): - contents = b"Hello, world\n" - victim = tmpdir / "victim" - victim.write_binary(contents) - hard_link = tmpdir / "hardlink" - os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 - - safe_unlink(hard_link) - - assert victim.read_binary() == contents - - -@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") -def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch): - contents = b"Hello, world\n" - victim = tmpdir / "victim" - victim.write_binary(contents) - hard_link = tmpdir / "hardlink" - os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 - - def os_unlink(_): - raise OSError(errno.ENOSPC, "Pretend that we ran out of space") - - monkeypatch.setattr(os, "unlink", os_unlink) - - with pytest.raises(OSError): - safe_unlink(hard_link) - - assert victim.read_binary() == contents - - -class TestPassphrase: - def test_passphrase_new_verification(self, capsys, monkeypatch): - monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü") - monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no") - Passphrase.new() - out, err = capsys.readouterr() - assert "1234" not in out - assert "1234" not in err - - monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes") - passphrase = Passphrase.new() - out, err = capsys.readouterr() - assert "3132333461c3b6c3a4c3bc" not in out - assert "3132333461c3b6c3a4c3bc" in err - assert passphrase == "1234aöäü" - - monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=") - Passphrase.new() - out, err = capsys.readouterr() - assert "1234/@=" not in out - assert "1234/@=" in err - - def test_passphrase_new_empty(self, capsys, monkeypatch): - monkeypatch.delenv("BORG_PASSPHRASE", False) - monkeypatch.setattr(getpass, "getpass", lambda prompt: "") - with pytest.raises(PasswordRetriesExceeded): - Passphrase.new(allow_empty=False) - out, err = capsys.readouterr() - assert "must not be blank" in err - - def test_passphrase_new_retries(self, monkeypatch): - monkeypatch.delenv("BORG_PASSPHRASE", False) - ascending_numbers = iter(range(20)) - monkeypatch.setattr(getpass, "getpass", lambda prompt: str(next(ascending_numbers))) - with pytest.raises(PasswordRetriesExceeded): - Passphrase.new() - - def test_passphrase_repr(self): - assert "secret" not in repr(Passphrase("secret")) - - def test_passphrase_wrong_debug(self, capsys, monkeypatch): - passphrase = "wrong_passphrase" - monkeypatch.setenv("BORG_DEBUG_PASSPHRASE", "YES") - monkeypatch.setenv("BORG_PASSPHRASE", "env_passphrase") - monkeypatch.setenv("BORG_PASSCOMMAND", "command") - monkeypatch.setenv("BORG_PASSPHRASE_FD", "fd_value") - - Passphrase.display_debug_info(passphrase) - - out, err = capsys.readouterr() - assert "Incorrect passphrase!" in err - assert passphrase in err - assert bin_to_hex(passphrase.encode("utf-8")) in err - assert 'BORG_PASSPHRASE = "env_passphrase"' in err - assert 'BORG_PASSCOMMAND = "command"' in err - assert 'BORG_PASSPHRASE_FD = "fd_value"' in err - - monkeypatch.delenv("BORG_DEBUG_PASSPHRASE", raising=False) - Passphrase.display_debug_info(passphrase) - out, err = capsys.readouterr() - - assert "Incorrect passphrase!" not in err - assert passphrase not in err - - def test_verification(self, capsys, monkeypatch): - passphrase = "test_passphrase" - hex_value = passphrase.encode("utf-8").hex() - - monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no") - Passphrase.verification(passphrase) - out, err = capsys.readouterr() - assert passphrase not in err - - monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes") - Passphrase.verification(passphrase) - out, err = capsys.readouterr() - assert passphrase in err - assert hex_value in err - - -@pytest.mark.parametrize( - "ec_range,ec_class", - ( - # inclusive range start, exclusive range end - ((0, 1), "success"), - ((1, 2), "warning"), - ((2, 3), "error"), - ((EXIT_ERROR_BASE, EXIT_WARNING_BASE), "error"), - ((EXIT_WARNING_BASE, EXIT_SIGNAL_BASE), "warning"), - ((EXIT_SIGNAL_BASE, 256), "signal"), - ), -) -def test_classify_ec(ec_range, ec_class): - for ec in range(*ec_range): - classify_ec(ec) == ec_class - - -def test_ec_invalid(): - with pytest.raises(ValueError): - classify_ec(666) - with pytest.raises(ValueError): - classify_ec(-1) - with pytest.raises(TypeError): - classify_ec(None) - - -@pytest.mark.parametrize( - "ec1,ec2,ec_max", - ( - # same for modern / legacy - (EXIT_SUCCESS, EXIT_SUCCESS, EXIT_SUCCESS), - (EXIT_SUCCESS, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), - # legacy exit codes - (EXIT_SUCCESS, EXIT_WARNING, EXIT_WARNING), - (EXIT_SUCCESS, EXIT_ERROR, EXIT_ERROR), - (EXIT_WARNING, EXIT_SUCCESS, EXIT_WARNING), - (EXIT_WARNING, EXIT_WARNING, EXIT_WARNING), - (EXIT_WARNING, EXIT_ERROR, EXIT_ERROR), - (EXIT_WARNING, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), - (EXIT_ERROR, EXIT_SUCCESS, EXIT_ERROR), - (EXIT_ERROR, EXIT_WARNING, EXIT_ERROR), - (EXIT_ERROR, EXIT_ERROR, EXIT_ERROR), - (EXIT_ERROR, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), - # some modern codes - (EXIT_SUCCESS, EXIT_WARNING_BASE, EXIT_WARNING_BASE), - (EXIT_SUCCESS, EXIT_ERROR_BASE, EXIT_ERROR_BASE), - (EXIT_WARNING_BASE, EXIT_SUCCESS, EXIT_WARNING_BASE), - (EXIT_WARNING_BASE + 1, EXIT_WARNING_BASE + 2, EXIT_WARNING_BASE + 1), - (EXIT_WARNING_BASE, EXIT_ERROR_BASE, EXIT_ERROR_BASE), - (EXIT_WARNING_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), - (EXIT_ERROR_BASE, EXIT_SUCCESS, EXIT_ERROR_BASE), - (EXIT_ERROR_BASE, EXIT_WARNING_BASE, EXIT_ERROR_BASE), - (EXIT_ERROR_BASE + 1, EXIT_ERROR_BASE + 2, EXIT_ERROR_BASE + 1), - (EXIT_ERROR_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), - ), -) -def test_max_ec(ec1, ec2, ec_max): - assert max_ec(ec1, ec2) == ec_max - - -def test_dir_is_tagged(tmpdir): - """Test dir_is_tagged with both path-based and file descriptor-based operations.""" - - @contextmanager - def open_dir(path): - fd = os.open(path, os.O_RDONLY) - try: - yield fd - finally: - os.close(fd) - - # Create directories for testing exclude_caches - cache_dir = tmpdir.mkdir("cache_dir") - cache_tag_path = cache_dir.join(CACHE_TAG_NAME) - cache_tag_path.write_binary(CACHE_TAG_CONTENTS) - - invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir") - invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME) - invalid_cache_tag_path.write_binary(b"invalid signature") - - # Create directories for testing exclude_if_present - tagged_dir = tmpdir.mkdir("tagged_dir") - tag_file = tagged_dir.join(".NOBACKUP") - tag_file.write("test") - - other_tagged_dir = tmpdir.mkdir("other_tagged_dir") - other_tag_file = other_tagged_dir.join(".DONOTBACKUP") - other_tag_file.write("test") - - # Create a directory with both a CACHEDIR.TAG and a custom tag file - both_dir = tmpdir.mkdir("both_dir") - cache_tag_path = both_dir.join(CACHE_TAG_NAME) - cache_tag_path.write_binary(CACHE_TAG_CONTENTS) - custom_tag_path = both_dir.join(".NOBACKUP") - custom_tag_path.write("test") - - # Create a directory without any tag files - normal_dir = tmpdir.mkdir("normal_dir") - - # Test edge cases - test_dir = tmpdir.mkdir("test_dir") - assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == [] - assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == [] - - # Test with non-existent directory (should not raise an exception) - non_existent_dir = str(tmpdir.join("non_existent")) - result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) - assert result == [] - - # Test 1: exclude_caches with path-based operations - assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME] - assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == [] - assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == [] - - assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == [] - assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == [] - assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == [] - - # Test 2: exclude_caches with file-descriptor-based operations - with open_dir(str(cache_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME] - with open_dir(str(invalid_cache_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [] - with open_dir(str(normal_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [] - - with open_dir(str(cache_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] - with open_dir(str(invalid_cache_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] - with open_dir(str(normal_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == [] - - # Test 3: exclude_if_present with path-based operations - tags = [".NOBACKUP"] - assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"] - assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [] - assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == [] - - tags = [".NOBACKUP", ".DONOTBACKUP"] - assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"] - assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"] - assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == [] - - # Test 4: exclude_if_present with file descriptor-based operations - tags = [".NOBACKUP"] - with open_dir(str(tagged_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"] - with open_dir(str(other_tagged_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] - with open_dir(str(normal_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] - - tags = [".NOBACKUP", ".DONOTBACKUP"] - with open_dir(str(tagged_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"] - with open_dir(str(other_tagged_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"] - with open_dir(str(normal_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [] - - # Test 5: both exclude types with path-based operations - assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [ - ".NOBACKUP", - CACHE_TAG_NAME, - ] - assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME] - assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"] - assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [] - - # Test 6: both exclude types with file descriptor-based operations - with open_dir(str(both_dir)) as fd: - result = dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) - assert sorted(result) == [".NOBACKUP", CACHE_TAG_NAME] - with open_dir(str(cache_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME] - with open_dir(str(tagged_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"] - with open_dir(str(normal_dir)) as fd: - assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []