mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 09:59:19 -04:00
tests: move helpers.fs tests to testsuite.helpers.fs_test
This commit is contained in:
parent
6e57763345
commit
a965ce6dea
2 changed files with 431 additions and 421 deletions
428
src/borg/testsuite/helpers/fs_test.py
Normal file
428
src/borg/testsuite/helpers/fs_test.py
Normal file
|
|
@ -0,0 +1,428 @@
|
|||
import errno
|
||||
import os
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
|
||||
from ...constants import * # NOQA
|
||||
from ...constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS
|
||||
from ...helpers.fs import (
|
||||
dir_is_tagged,
|
||||
get_base_dir,
|
||||
get_cache_dir,
|
||||
get_keys_dir,
|
||||
get_security_dir,
|
||||
get_config_dir,
|
||||
get_runtime_dir,
|
||||
dash_open,
|
||||
safe_unlink,
|
||||
remove_dotdot_prefixes,
|
||||
make_path_safe,
|
||||
)
|
||||
from ...platform import is_win32, is_darwin
|
||||
from .. import are_hardlinks_supported
|
||||
from .. import rejected_dotdot_paths
|
||||
|
||||
|
||||
def test_get_base_dir(monkeypatch):
|
||||
"""test that get_base_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("HOME", raising=False)
|
||||
monkeypatch.delenv("USER", raising=False)
|
||||
assert get_base_dir(legacy=True) == os.path.expanduser("~")
|
||||
monkeypatch.setenv("USER", "root")
|
||||
assert get_base_dir(legacy=True) == os.path.expanduser("~root")
|
||||
monkeypatch.setenv("HOME", "/var/tmp/home")
|
||||
assert get_base_dir(legacy=True) == "/var/tmp/home"
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=True) == "/var/tmp/base"
|
||||
# non-legacy is much easier:
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
assert get_base_dir(legacy=False) is None
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=False) == "/var/tmp/base"
|
||||
|
||||
|
||||
def test_get_base_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
# old way: if BORG_BASE_DIR is not set, make something up with HOME/USER/~
|
||||
# new way: if BORG_BASE_DIR is not set, return None and let caller deal with it.
|
||||
assert get_base_dir(legacy=False) is None
|
||||
# new and old way: BORG_BASE_DIR overrides all other "base path determination".
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=False) == get_base_dir(legacy=True)
|
||||
|
||||
|
||||
def test_get_config_dir(monkeypatch):
|
||||
"""test that get_config_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", home_dir)
|
||||
assert get_config_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
|
||||
assert get_config_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, ".config", "borg")
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
|
||||
assert get_config_dir(create=False) == os.path.join("/var/tmp/.config", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
|
||||
assert get_config_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_config_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
if not is_darwin and not is_win32:
|
||||
# fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/Users/tw/.config/borg'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/xdg.config.d")
|
||||
# fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/var/tmp/xdg.config.d'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp/borg.config.d")
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
|
||||
|
||||
def test_get_cache_dir(monkeypatch):
|
||||
"""test that get_cache_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "Cache")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", home_dir)
|
||||
assert get_cache_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "borg")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
|
||||
assert get_cache_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, ".cache", "borg")
|
||||
monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/.cache")
|
||||
assert get_cache_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
|
||||
assert get_cache_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_cache_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
if not is_darwin and not is_win32:
|
||||
# fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/Users/tw/.cache/borg'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
# fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/var/tmp/xdg.cache.d'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/xdg.cache.d")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp/borg.cache.d")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
|
||||
|
||||
def test_get_keys_dir(monkeypatch):
|
||||
"""test that get_keys_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", home_dir)
|
||||
assert get_keys_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
|
||||
assert get_keys_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, ".config", "borg", "keys")
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
|
||||
assert get_keys_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
|
||||
assert get_keys_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_security_dir(monkeypatch):
|
||||
"""test that get_security_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "security")
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, "AppData", "Local", "borg", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", home_dir)
|
||||
assert get_security_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(
|
||||
home_dir, "Library", "Application Support", "borg", "security"
|
||||
)
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, "Library", "Application Support", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
|
||||
assert get_security_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_DATA_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(home_dir, ".local", "share", "borg", "security")
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, ".local", "share", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("XDG_DATA_HOME", "/var/tmp/.config")
|
||||
assert get_security_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "security")
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
|
||||
assert get_security_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_runtime_dir(monkeypatch):
|
||||
"""test that get_runtime_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
assert get_runtime_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "Temp", "borg", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", home_dir)
|
||||
assert get_runtime_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
assert get_runtime_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "TemporaryItems", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
|
||||
assert get_runtime_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_RUNTIME_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
uid = str(os.getuid())
|
||||
assert get_runtime_dir(create=False) in [
|
||||
os.path.join("/run/user", uid, "borg"),
|
||||
os.path.join("/var/run/user", uid, "borg"),
|
||||
os.path.join(f"/tmp/runtime-{uid}", "borg"),
|
||||
]
|
||||
monkeypatch.setenv("XDG_RUNTIME_DIR", "/var/tmp/.cache")
|
||||
assert get_runtime_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
|
||||
assert get_runtime_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_dash_open():
|
||||
assert dash_open("-", "r") is sys.stdin
|
||||
assert dash_open("-", "w") is sys.stdout
|
||||
assert dash_open("-", "rb") is sys.stdin.buffer
|
||||
assert dash_open("-", "wb") is sys.stdout.buffer
|
||||
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
||||
def test_safe_unlink_is_safe(tmpdir):
|
||||
contents = b"Hello, world\n"
|
||||
victim = tmpdir / "victim"
|
||||
victim.write_binary(contents)
|
||||
hard_link = tmpdir / "hardlink"
|
||||
os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
|
||||
|
||||
safe_unlink(hard_link)
|
||||
|
||||
assert victim.read_binary() == contents
|
||||
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
||||
def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch):
|
||||
contents = b"Hello, world\n"
|
||||
victim = tmpdir / "victim"
|
||||
victim.write_binary(contents)
|
||||
hard_link = tmpdir / "hardlink"
|
||||
os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
|
||||
|
||||
def os_unlink(_):
|
||||
raise OSError(errno.ENOSPC, "Pretend that we ran out of space")
|
||||
|
||||
monkeypatch.setattr(os, "unlink", os_unlink)
|
||||
|
||||
with pytest.raises(OSError):
|
||||
safe_unlink(hard_link)
|
||||
|
||||
assert victim.read_binary() == contents
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"original_path, expected_path",
|
||||
[("foo", "foo"), ("foo/bar", "foo/bar"), ("/foo/bar", "foo/bar"), ("../foo/bar", "foo/bar")],
|
||||
)
|
||||
def test_remove_dotdot_prefixes(original_path, expected_path):
|
||||
assert remove_dotdot_prefixes(original_path) == expected_path
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"original_path, expected_path",
|
||||
[
|
||||
(".", "."),
|
||||
("./", "."),
|
||||
("/foo", "foo"),
|
||||
("//foo", "foo"),
|
||||
(".//foo//bar//", "foo/bar"),
|
||||
("/foo/bar", "foo/bar"),
|
||||
("//foo/bar", "foo/bar"),
|
||||
("//foo/./bar", "foo/bar"),
|
||||
(".test", ".test"),
|
||||
(".test.", ".test."),
|
||||
("..test..", "..test.."),
|
||||
("/te..st/foo/bar", "te..st/foo/bar"),
|
||||
("/..test../abc//", "..test../abc"),
|
||||
],
|
||||
)
|
||||
def test_valid_make_path_safe(original_path, expected_path):
|
||||
assert make_path_safe(original_path) == expected_path
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path", rejected_dotdot_paths)
|
||||
def test_invalid_make_path_safe(path):
|
||||
with pytest.raises(ValueError, match="unexpected '..' element in path"):
|
||||
make_path_safe(path)
|
||||
|
||||
|
||||
def test_dir_is_tagged(tmpdir):
|
||||
"""Test dir_is_tagged with both path-based and file descriptor-based operations."""
|
||||
|
||||
@contextmanager
|
||||
def open_dir(path):
|
||||
fd = os.open(path, os.O_RDONLY)
|
||||
try:
|
||||
yield fd
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
# Create directories for testing exclude_caches
|
||||
cache_dir = tmpdir.mkdir("cache_dir")
|
||||
cache_tag_path = cache_dir.join(CACHE_TAG_NAME)
|
||||
cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
||||
|
||||
invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir")
|
||||
invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME)
|
||||
invalid_cache_tag_path.write_binary(b"invalid signature")
|
||||
|
||||
# Create directories for testing exclude_if_present
|
||||
tagged_dir = tmpdir.mkdir("tagged_dir")
|
||||
tag_file = tagged_dir.join(".NOBACKUP")
|
||||
tag_file.write("test")
|
||||
|
||||
other_tagged_dir = tmpdir.mkdir("other_tagged_dir")
|
||||
other_tag_file = other_tagged_dir.join(".DONOTBACKUP")
|
||||
other_tag_file.write("test")
|
||||
|
||||
# Create a directory with both a CACHEDIR.TAG and a custom tag file
|
||||
both_dir = tmpdir.mkdir("both_dir")
|
||||
cache_tag_path = both_dir.join(CACHE_TAG_NAME)
|
||||
cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
||||
custom_tag_path = both_dir.join(".NOBACKUP")
|
||||
custom_tag_path.write("test")
|
||||
|
||||
# Create a directory without any tag files
|
||||
normal_dir = tmpdir.mkdir("normal_dir")
|
||||
|
||||
# Test edge cases
|
||||
test_dir = tmpdir.mkdir("test_dir")
|
||||
assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == []
|
||||
assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == []
|
||||
|
||||
# Test with non-existent directory (should not raise an exception)
|
||||
non_existent_dir = str(tmpdir.join("non_existent"))
|
||||
result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
|
||||
assert result == []
|
||||
|
||||
# Test 1: exclude_caches with path-based operations
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME]
|
||||
assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == []
|
||||
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == []
|
||||
assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == []
|
||||
|
||||
# Test 2: exclude_caches with file-descriptor-based operations
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME]
|
||||
with open_dir(str(invalid_cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
||||
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
with open_dir(str(invalid_cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
|
||||
# Test 3: exclude_if_present with path-based operations
|
||||
tags = [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
||||
|
||||
tags = [".NOBACKUP", ".DONOTBACKUP"]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"]
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
||||
|
||||
# Test 4: exclude_if_present with file descriptor-based operations
|
||||
tags = [".NOBACKUP"]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
||||
with open_dir(str(other_tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
|
||||
tags = [".NOBACKUP", ".DONOTBACKUP"]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
||||
with open_dir(str(other_tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"]
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
|
||||
# Test 5: both exclude types with path-based operations
|
||||
assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
|
||||
".NOBACKUP",
|
||||
CACHE_TAG_NAME,
|
||||
]
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|
||||
|
||||
# Test 6: both exclude types with file descriptor-based operations
|
||||
with open_dir(str(both_dir)) as fd:
|
||||
assert sorted(dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
|
||||
".NOBACKUP",
|
||||
CACHE_TAG_NAME,
|
||||
]
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|
||||
|
|
@ -1,12 +1,10 @@
|
|||
import base64
|
||||
import errno
|
||||
import getpass
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from argparse import ArgumentTypeError
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from io import StringIO, BytesIO
|
||||
|
||||
|
|
@ -15,8 +13,6 @@ import pytest
|
|||
from ..archiver.prune_cmd import prune_within, prune_split
|
||||
from .. import platform
|
||||
from ..constants import * # NOQA
|
||||
from ..constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS
|
||||
from ..helpers.fs import dir_is_tagged
|
||||
from ..helpers import Location
|
||||
from ..helpers import Buffer
|
||||
from ..helpers import (
|
||||
|
|
@ -28,9 +24,8 @@ from ..helpers import (
|
|||
PlaceholderError,
|
||||
replace_placeholders,
|
||||
)
|
||||
from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines
|
||||
from ..helpers import clean_lines
|
||||
from ..helpers import interval
|
||||
from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir
|
||||
from ..helpers import is_slow_msgpack
|
||||
from ..helpers import msgpack
|
||||
from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
|
||||
|
|
@ -42,16 +37,13 @@ from ..helpers import swidth_slice
|
|||
from ..helpers import chunkit
|
||||
from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS
|
||||
from ..helpers import popen_with_error_handling
|
||||
from ..helpers import dash_open
|
||||
from ..helpers import iter_separated
|
||||
from ..helpers import eval_escapes
|
||||
from ..helpers import safe_unlink
|
||||
from ..helpers import text_to_json, binary_to_json
|
||||
from ..helpers import classify_ec, max_ec
|
||||
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
|
||||
from ..platform import is_cygwin, is_win32, is_darwin
|
||||
from . import FakeInputs, are_hardlinks_supported
|
||||
from . import rejected_dotdot_paths
|
||||
from ..platform import is_cygwin
|
||||
from . import FakeInputs
|
||||
|
||||
|
||||
def test_bin_to_hex():
|
||||
|
|
@ -404,51 +396,6 @@ def test_invalid_chunkerparams(invalid_chunker_params):
|
|||
ChunkerParams(invalid_chunker_params)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"original_path, expected_path",
|
||||
[
|
||||
(".", "."),
|
||||
("..", "."),
|
||||
("/", "."),
|
||||
("//", "."),
|
||||
("foo", "foo"),
|
||||
("foo/bar", "foo/bar"),
|
||||
("/foo/bar", "foo/bar"),
|
||||
("../foo/bar", "foo/bar"),
|
||||
],
|
||||
)
|
||||
def test_remove_dotdot_prefixes(original_path, expected_path):
|
||||
assert remove_dotdot_prefixes(original_path) == expected_path
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"original_path, expected_path",
|
||||
[
|
||||
(".", "."),
|
||||
("./", "."),
|
||||
("/foo", "foo"),
|
||||
("//foo", "foo"),
|
||||
(".//foo//bar//", "foo/bar"),
|
||||
("/foo/bar", "foo/bar"),
|
||||
("//foo/bar", "foo/bar"),
|
||||
("//foo/./bar", "foo/bar"),
|
||||
(".test", ".test"),
|
||||
(".test.", ".test."),
|
||||
("..test..", "..test.."),
|
||||
("/te..st/foo/bar", "te..st/foo/bar"),
|
||||
("/..test../abc//", "..test../abc"),
|
||||
],
|
||||
)
|
||||
def test_valid_make_path_safe(original_path, expected_path):
|
||||
assert make_path_safe(original_path) == expected_path
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path", rejected_dotdot_paths)
|
||||
def test_invalid_make_path_safe(path):
|
||||
with pytest.raises(ValueError, match="unexpected '..' element in path"):
|
||||
make_path_safe(path)
|
||||
|
||||
|
||||
class MockArchive:
|
||||
def __init__(self, ts, id):
|
||||
self.ts = ts
|
||||
|
|
@ -635,210 +582,6 @@ def test_parse_timestamp():
|
|||
assert parse_timestamp("2015-04-19T20:25:00") == datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc)
|
||||
|
||||
|
||||
def test_get_base_dir(monkeypatch):
|
||||
"""test that get_base_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("HOME", raising=False)
|
||||
monkeypatch.delenv("USER", raising=False)
|
||||
assert get_base_dir(legacy=True) == os.path.expanduser("~")
|
||||
monkeypatch.setenv("USER", "root")
|
||||
assert get_base_dir(legacy=True) == os.path.expanduser("~root")
|
||||
monkeypatch.setenv("HOME", "/var/tmp/home")
|
||||
assert get_base_dir(legacy=True) == "/var/tmp/home"
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=True) == "/var/tmp/base"
|
||||
# non-legacy is much easier:
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
assert get_base_dir(legacy=False) is None
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=False) == "/var/tmp/base"
|
||||
|
||||
|
||||
def test_get_base_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
# old way: if BORG_BASE_DIR is not set, make something up with HOME/USER/~
|
||||
# new way: if BORG_BASE_DIR is not set, return None and let caller deal with it.
|
||||
assert get_base_dir(legacy=False) is None
|
||||
# new and old way: BORG_BASE_DIR overrides all other "base path determination".
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_base_dir(legacy=False) == get_base_dir(legacy=True)
|
||||
|
||||
|
||||
def test_get_config_dir(monkeypatch):
|
||||
"""test that get_config_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", home_dir)
|
||||
assert get_config_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
|
||||
assert get_config_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
assert get_config_dir(create=False) == os.path.join(home_dir, ".config", "borg")
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
|
||||
assert get_config_dir(create=False) == os.path.join("/var/tmp/.config", "borg")
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
|
||||
assert get_config_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_config_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
if not is_darwin and not is_win32:
|
||||
# fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/Users/tw/.config/borg'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/xdg.config.d")
|
||||
# fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/var/tmp/xdg.config.d'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp/borg.config.d")
|
||||
assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
|
||||
|
||||
|
||||
def test_get_cache_dir(monkeypatch):
|
||||
"""test that get_cache_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "Cache")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", home_dir)
|
||||
assert get_cache_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "borg")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
|
||||
assert get_cache_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
assert get_cache_dir(create=False) == os.path.join(home_dir, ".cache", "borg")
|
||||
monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/.cache")
|
||||
assert get_cache_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
|
||||
assert get_cache_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_cache_dir_compat(monkeypatch):
|
||||
"""test that it works the same for legacy and for non-legacy implementation"""
|
||||
monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
|
||||
if not is_darwin and not is_win32:
|
||||
# fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/Users/tw/.cache/borg'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
# fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/var/tmp/xdg.cache.d'
|
||||
# fails on win32 MSYS2 (but we do not need legacy compat there).
|
||||
monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/xdg.cache.d")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp/borg.cache.d")
|
||||
assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
|
||||
|
||||
|
||||
def test_get_keys_dir(monkeypatch):
|
||||
"""test that get_keys_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", home_dir)
|
||||
assert get_keys_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
|
||||
assert get_keys_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
|
||||
assert get_keys_dir(create=False) == os.path.join(home_dir, ".config", "borg", "keys")
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
|
||||
assert get_keys_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "keys")
|
||||
monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
|
||||
assert get_keys_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_security_dir(monkeypatch):
|
||||
"""test that get_security_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "security")
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, "AppData", "Local", "borg", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", home_dir)
|
||||
assert get_security_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(
|
||||
home_dir, "Library", "Application Support", "borg", "security"
|
||||
)
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, "Library", "Application Support", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
|
||||
assert get_security_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_DATA_HOME", raising=False)
|
||||
monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
|
||||
assert get_security_dir(create=False) == os.path.join(home_dir, ".local", "share", "borg", "security")
|
||||
assert get_security_dir(repository_id="1234", create=False) == os.path.join(
|
||||
home_dir, ".local", "share", "borg", "security", "1234"
|
||||
)
|
||||
monkeypatch.setenv("XDG_DATA_HOME", "/var/tmp/.config")
|
||||
assert get_security_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "security")
|
||||
monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
|
||||
assert get_security_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
def test_get_runtime_dir(monkeypatch):
|
||||
"""test that get_runtime_dir respects environment"""
|
||||
monkeypatch.delenv("BORG_BASE_DIR", raising=False)
|
||||
home_dir = os.path.expanduser("~")
|
||||
if is_win32:
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
assert get_runtime_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "Temp", "borg", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", home_dir)
|
||||
assert get_runtime_dir(create=False) == home_dir
|
||||
elif is_darwin:
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
assert get_runtime_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "TemporaryItems", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
|
||||
assert get_runtime_dir(create=False) == "/var/tmp"
|
||||
else:
|
||||
monkeypatch.delenv("XDG_RUNTIME_DIR", raising=False)
|
||||
monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
|
||||
uid = str(os.getuid())
|
||||
assert get_runtime_dir(create=False) in [
|
||||
os.path.join("/run/user", uid, "borg"),
|
||||
os.path.join("/var/run/user", uid, "borg"),
|
||||
os.path.join(f"/tmp/runtime-{uid}", "borg"),
|
||||
]
|
||||
monkeypatch.setenv("XDG_RUNTIME_DIR", "/var/tmp/.cache")
|
||||
assert get_runtime_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
|
||||
monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
|
||||
assert get_runtime_dir(create=False) == "/var/tmp"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"size, fmt",
|
||||
[
|
||||
|
|
@ -1321,13 +1064,6 @@ class TestPopenWithErrorHandling:
|
|||
popen_with_error_handling("", shell=True)
|
||||
|
||||
|
||||
def test_dash_open():
|
||||
assert dash_open("-", "r") is sys.stdin
|
||||
assert dash_open("-", "w") is sys.stdout
|
||||
assert dash_open("-", "rb") is sys.stdin.buffer
|
||||
assert dash_open("-", "wb") is sys.stdout.buffer
|
||||
|
||||
|
||||
def test_iter_separated():
|
||||
# newline and utf-8
|
||||
sep, items = "\n", ["foo", "bar/baz", "αáčő"]
|
||||
|
|
@ -1352,38 +1088,6 @@ def test_eval_escapes():
|
|||
assert eval_escapes("äç\\n") == "äç\n"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
||||
def test_safe_unlink_is_safe(tmpdir):
|
||||
contents = b"Hello, world\n"
|
||||
victim = tmpdir / "victim"
|
||||
victim.write_binary(contents)
|
||||
hard_link = tmpdir / "hardlink"
|
||||
os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
|
||||
|
||||
safe_unlink(hard_link)
|
||||
|
||||
assert victim.read_binary() == contents
|
||||
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
|
||||
def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch):
|
||||
contents = b"Hello, world\n"
|
||||
victim = tmpdir / "victim"
|
||||
victim.write_binary(contents)
|
||||
hard_link = tmpdir / "hardlink"
|
||||
os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
|
||||
|
||||
def os_unlink(_):
|
||||
raise OSError(errno.ENOSPC, "Pretend that we ran out of space")
|
||||
|
||||
monkeypatch.setattr(os, "unlink", os_unlink)
|
||||
|
||||
with pytest.raises(OSError):
|
||||
safe_unlink(hard_link)
|
||||
|
||||
assert victim.read_binary() == contents
|
||||
|
||||
|
||||
class TestPassphrase:
|
||||
def test_passphrase_new_verification(self, capsys, monkeypatch):
|
||||
monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü")
|
||||
|
|
@ -1522,125 +1226,3 @@ def test_ec_invalid():
|
|||
)
|
||||
def test_max_ec(ec1, ec2, ec_max):
|
||||
assert max_ec(ec1, ec2) == ec_max
|
||||
|
||||
|
||||
def test_dir_is_tagged(tmpdir):
|
||||
"""Test dir_is_tagged with both path-based and file descriptor-based operations."""
|
||||
|
||||
@contextmanager
|
||||
def open_dir(path):
|
||||
fd = os.open(path, os.O_RDONLY)
|
||||
try:
|
||||
yield fd
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
# Create directories for testing exclude_caches
|
||||
cache_dir = tmpdir.mkdir("cache_dir")
|
||||
cache_tag_path = cache_dir.join(CACHE_TAG_NAME)
|
||||
cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
||||
|
||||
invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir")
|
||||
invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME)
|
||||
invalid_cache_tag_path.write_binary(b"invalid signature")
|
||||
|
||||
# Create directories for testing exclude_if_present
|
||||
tagged_dir = tmpdir.mkdir("tagged_dir")
|
||||
tag_file = tagged_dir.join(".NOBACKUP")
|
||||
tag_file.write("test")
|
||||
|
||||
other_tagged_dir = tmpdir.mkdir("other_tagged_dir")
|
||||
other_tag_file = other_tagged_dir.join(".DONOTBACKUP")
|
||||
other_tag_file.write("test")
|
||||
|
||||
# Create a directory with both a CACHEDIR.TAG and a custom tag file
|
||||
both_dir = tmpdir.mkdir("both_dir")
|
||||
cache_tag_path = both_dir.join(CACHE_TAG_NAME)
|
||||
cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
||||
custom_tag_path = both_dir.join(".NOBACKUP")
|
||||
custom_tag_path.write("test")
|
||||
|
||||
# Create a directory without any tag files
|
||||
normal_dir = tmpdir.mkdir("normal_dir")
|
||||
|
||||
# Test edge cases
|
||||
test_dir = tmpdir.mkdir("test_dir")
|
||||
assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == []
|
||||
assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == []
|
||||
|
||||
# Test with non-existent directory (should not raise an exception)
|
||||
non_existent_dir = str(tmpdir.join("non_existent"))
|
||||
result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
|
||||
assert result == []
|
||||
|
||||
# Test 1: exclude_caches with path-based operations
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME]
|
||||
assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == []
|
||||
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == []
|
||||
assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == []
|
||||
|
||||
# Test 2: exclude_caches with file-descriptor-based operations
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME]
|
||||
with open_dir(str(invalid_cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
||||
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
with open_dir(str(invalid_cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
||||
|
||||
# Test 3: exclude_if_present with path-based operations
|
||||
tags = [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == []
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
||||
|
||||
tags = [".NOBACKUP", ".DONOTBACKUP"]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"]
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
||||
|
||||
# Test 4: exclude_if_present with file descriptor-based operations
|
||||
tags = [".NOBACKUP"]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
||||
with open_dir(str(other_tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
|
||||
tags = [".NOBACKUP", ".DONOTBACKUP"]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
||||
with open_dir(str(other_tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"]
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
||||
|
||||
# Test 5: both exclude types with path-based operations
|
||||
assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
|
||||
".NOBACKUP",
|
||||
CACHE_TAG_NAME,
|
||||
]
|
||||
assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
||||
assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
||||
assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|
||||
|
||||
# Test 6: both exclude types with file descriptor-based operations
|
||||
with open_dir(str(both_dir)) as fd:
|
||||
result = dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
|
||||
assert sorted(result) == [".NOBACKUP", CACHE_TAG_NAME]
|
||||
with open_dir(str(cache_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
||||
with open_dir(str(tagged_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
||||
with open_dir(str(normal_dir)) as fd:
|
||||
assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|
||||
|
|
|
|||
Loading…
Reference in a new issue