pathlib refactor fs

This commit is contained in:
Thomas Waldmann 2025-06-10 12:03:10 +02:00
parent 9a65d5245d
commit a0f73d7a61
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 26 additions and 22 deletions

View file

@ -7,6 +7,7 @@ import stat
import subprocess
import sys
import textwrap
from pathlib import Path
import platformdirs
@ -34,7 +35,7 @@ def ensure_dir(path, mode=stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, pretty_dea
An exception otherwise. If a deadly exception happened it is reraised.
"""
try:
os.makedirs(path, mode=mode, exist_ok=True)
Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
except OSError as e:
if pretty_deadly:
raise Error(str(e))
@ -52,12 +53,12 @@ def get_base_dir(*, legacy=False):
"""
if legacy:
base_dir = os.environ.get("BORG_BASE_DIR") or os.environ.get("HOME")
# os.path.expanduser() behaves differently for '~' and '~someuser' as
# Path.expanduser() behaves differently for '~' and '~someuser' as
# parameters: when called with an explicit username, the possibly set
# environment variable HOME is no longer respected. So we have to check if
# it is set and only expand the user's home directory if HOME is unset.
if not base_dir:
base_dir = os.path.expanduser("~%s" % os.environ.get("USER", ""))
base_dir = str(Path(f"~{os.environ.get('USER', '')}").expanduser())
else:
# we only care for BORG_BASE_DIR here, as it can be used to override the base dir
# and not use any more or less platform specific way to determine the base dir.
@ -68,7 +69,7 @@ def get_base_dir(*, legacy=False):
def join_base_dir(*paths, **kw):
legacy = kw.get("legacy", True)
base_dir = get_base_dir(legacy=legacy)
return None if base_dir is None else os.path.join(base_dir, *paths)
return None if base_dir is None else str(Path(base_dir).joinpath(*paths))
def get_keys_dir(*, legacy=False, create=True):
@ -76,7 +77,7 @@ def get_keys_dir(*, legacy=False, create=True):
keys_dir = os.environ.get("BORG_KEYS_DIR")
if keys_dir is None:
# note: do not just give this as default to the environment.get(), see issue #5979.
keys_dir = os.path.join(get_config_dir(legacy=legacy), "keys")
keys_dir = str(Path(get_config_dir(legacy=legacy)) / "keys")
if create:
ensure_dir(keys_dir)
return keys_dir
@ -88,9 +89,9 @@ def get_security_dir(repository_id=None, *, legacy=False, create=True):
if security_dir is None:
get_dir = get_config_dir if legacy else get_data_dir
# note: do not just give this as default to the environment.get(), see issue #5979.
security_dir = os.path.join(get_dir(legacy=legacy), "security")
security_dir = str(Path(get_dir(legacy=legacy)) / "security")
if repository_id:
security_dir = os.path.join(security_dir, repository_id)
security_dir = str(Path(security_dir) / repository_id)
if create:
ensure_dir(security_dir)
return security_dir
@ -119,7 +120,7 @@ def get_runtime_dir(*, legacy=False, create=True):
def get_socket_filename():
return os.path.join(get_runtime_dir(), "borg.sock")
return str(Path(get_runtime_dir()) / "borg.sock")
def get_cache_dir(*, legacy=False, create=True):
@ -132,15 +133,15 @@ def get_cache_dir(*, legacy=False, create=True):
if not os.environ.get("BORG_BASE_DIR"):
cache_home = os.environ.get("XDG_CACHE_HOME", cache_home)
# Use BORG_CACHE_DIR if set, otherwise assemble final path from cache home path
cache_dir = os.environ.get("BORG_CACHE_DIR", os.path.join(cache_home, "borg"))
cache_dir = os.environ.get("BORG_CACHE_DIR", str(Path(cache_home) / "borg"))
else:
cache_dir = os.environ.get(
"BORG_CACHE_DIR", join_base_dir(".cache", "borg", legacy=legacy) or platformdirs.user_cache_dir("borg")
)
if create:
ensure_dir(cache_dir)
cache_tag_fn = os.path.join(cache_dir, CACHE_TAG_NAME)
if not os.path.exists(cache_tag_fn):
cache_tag_fn = Path(cache_dir) / CACHE_TAG_NAME
if not cache_tag_fn.exists():
cache_tag_contents = (
CACHE_TAG_CONTENTS
+ textwrap.dedent(
@ -168,7 +169,7 @@ def get_config_dir(*, legacy=False, create=True):
if not os.environ.get("BORG_BASE_DIR"):
config_home = os.environ.get("XDG_CONFIG_HOME", config_home)
# Use BORG_CONFIG_DIR if set, otherwise assemble final path from config home path
config_dir = os.environ.get("BORG_CONFIG_DIR", os.path.join(config_home, "borg"))
config_dir = os.environ.get("BORG_CONFIG_DIR", str(Path(config_home) / "borg"))
else:
config_dir = os.environ.get(
"BORG_CONFIG_DIR", join_base_dir(".config", "borg", legacy=legacy) or platformdirs.user_config_dir("borg")
@ -191,7 +192,7 @@ def dir_is_cachedir(path=None, dir_fd=None):
if dir_fd is not None:
tag_fd = os.open(CACHE_TAG_NAME, os.O_RDONLY, dir_fd=dir_fd)
else:
tag_fd = os.open(os.path.join(path, CACHE_TAG_NAME), os.O_RDONLY)
tag_fd = os.open(str(Path(path) / CACHE_TAG_NAME), os.O_RDONLY)
return os.read(tag_fd, len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS
except (FileNotFoundError, OSError):
return False
@ -228,8 +229,8 @@ def dir_is_tagged(path=None, exclude_caches=None, exclude_if_present=None, dir_f
tag_names.append(CACHE_TAG_NAME)
if exclude_if_present is not None:
for tag in exclude_if_present:
tag_path = os.path.join(path, tag)
if os.path.exists(tag_path):
tag_path = Path(path) / tag
if tag_path.exists():
tag_names.append(tag)
return tag_names
@ -417,13 +418,14 @@ def secure_erase(path, *, avoid_collateral_damage):
If avoid_collateral_damage is False, we always secure erase.
If there are hardlinks pointing to the same inode as <path>, they will contain random garbage afterwards.
"""
with open(path, "r+b") as fd:
path_obj = Path(path)
with path_obj.open("r+b") as fd:
st = os.stat(fd.fileno())
if not (st.st_nlink > 1 and avoid_collateral_damage):
fd.write(os.urandom(st.st_size))
fd.flush()
os.fsync(fd.fileno())
os.unlink(path)
path_obj.unlink()
def safe_unlink(path):
@ -438,14 +440,15 @@ def safe_unlink(path):
recover. Refer to the "File system interaction" section
in legacyrepository.py for further explanations.
"""
path_obj = Path(path)
try:
os.unlink(path)
path_obj.unlink()
except OSError as unlink_err:
if unlink_err.errno != errno.ENOSPC:
# not free space related, give up here.
raise
# we ran out of space while trying to delete the file.
st = os.stat(path)
st = path_obj.stat()
if st.st_nlink > 1:
# rather give up here than cause collateral damage to the other hardlink.
raise
@ -459,7 +462,7 @@ def safe_unlink(path):
raise unlink_err
else:
# successfully truncated the file, try again deleting it:
os.unlink(path)
path_obj.unlink()
def dash_open(path, mode):

View file

@ -2,6 +2,7 @@ import errno
import os
import sys
from contextlib import contextmanager
from pathlib import Path
import pytest
@ -257,10 +258,10 @@ def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch):
hard_link = tmpdir / "hardlink"
os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
def os_unlink(_):
def Path_unlink(_):
raise OSError(errno.ENOSPC, "Pretend that we ran out of space")
monkeypatch.setattr(os, "unlink", os_unlink)
monkeypatch.setattr(Path, "unlink", Path_unlink)
with pytest.raises(OSError):
safe_unlink(hard_link)