Merge pull request #9690 from ThomasWaldmann/sha256-key-filenames

keyfile: name key files by sha256(keyfile_contents)
This commit is contained in:
TW 2026-05-31 17:18:49 +02:00 committed by GitHub
commit 76b77c748f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 96 additions and 126 deletions

View file

@ -16,7 +16,7 @@ import argon2.low_level
from ..constants import * # NOQA
from ..helpers import StableDict
from ..helpers import Error, IntegrityError
from ..helpers import get_keys_dir
from ..helpers import get_keys_dir, secure_erase
from ..helpers import get_limited_unpacker
from ..helpers import bin_to_hex
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong
@ -32,6 +32,11 @@ from .low_level import bytes_to_int, num_cipher_blocks, hmac_sha256, blake2b_256
from .low_level import AES256_OCB, CHACHA20_POLY1305
from . import low_level
def keyfile_name_for(content: bytes) -> str:
return sha256(content).hexdigest()
# workaround for lost passphrase or key in "authenticated" or "authenticated-blake2" mode
AUTHENTICATED_NO_KEY = "authenticated_no_key" in workarounds
@ -547,7 +552,7 @@ class FlexiKey:
key.init_ciphers()
target = key.get_new_target(args)
key.save(target, passphrase, create=True, algorithm=KEY_ALGORITHMS["argon2"])
logger.info('Key in "%s" created.' % target)
logger.info('Key in "%s" created.' % key.target)
logger.info("Keep this key safe. Your data will be inaccessible without it.")
return key
@ -613,7 +618,7 @@ class FlexiKey:
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
return get_keys_dir()
def _find_key_in_keys_dir(self):
id = self.repository.id
@ -630,7 +635,7 @@ class FlexiKey:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
return get_keys_dir()
elif self.STORAGE == KeyBlobStorage.REPO:
return self.repository
else:
@ -641,15 +646,6 @@ class FlexiKey:
if keyfile:
return os.path.abspath(keyfile)
def _get_new_target_in_keys_dir(self, args):
filename = args.location.to_key_filename()
path = Path(filename)
i = 1
while path.exists():
i += 1
path = Path(filename + ".%d" % i)
return str(path)
def load(self, target, passphrase):
if self.STORAGE == KeyBlobStorage.KEYFILE:
with open(target) as fd:
@ -677,15 +673,30 @@ class FlexiKey:
def save(self, target, passphrase, algorithm, create=False):
key_data = self._save(passphrase, algorithm)
if self.STORAGE == KeyBlobStorage.KEYFILE:
old_target = getattr(self, "target", None)
keys_dir = get_keys_dir()
keyfile_data = f"{self.FILE_ID} {bin_to_hex(self.repository_id)}\n{key_data}\n"
target_dir = target if os.path.isdir(target) else os.path.dirname(target)
auto_named = not os.environ.get("BORG_KEY_FILE") and os.path.samefile(target_dir, keys_dir)
if auto_named:
target = os.path.join(keys_dir, keyfile_name_for(keyfile_data.encode()))
if create and os.path.isfile(target):
# if a new keyfile key repository is created, ensure that an existing keyfile of another
# keyfile key repo is not accidentally overwritten by careless use of the BORG_KEY_FILE env var.
# see issue #6036
raise Error('Aborting because key in "%s" already exists.' % target)
with SaveFile(target) as fd:
fd.write(f"{self.FILE_ID} {bin_to_hex(self.repository_id)}\n")
fd.write(key_data)
fd.write("\n")
fd.write(keyfile_data)
if auto_named and isinstance(old_target, str) and old_target != target:
try:
in_keys_dir = os.path.samefile(os.path.dirname(old_target), keys_dir)
except OSError:
in_keys_dir = False
if in_keys_dir:
try:
secure_erase(old_target, avoid_collateral_damage=True)
except OSError as exc:
logger.debug('Could not remove previous keyfile "%s": %s', old_target, exc)
elif self.STORAGE == KeyBlobStorage.REPO:
self.logically_encrypted = passphrase != "" # nosec B105
key_data = key_data.encode("utf-8") # remote repo: msgpack issue #99, giving bytes

View file

@ -1,13 +1,14 @@
import binascii
import os
import pkgutil
import textwrap
from hashlib import sha256
from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open
from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open, get_keys_dir
from ..repoobj import RepoObj
from .key import CHPOKeyfileKey, RepoKeyNotFoundError, KeyBlobStorage, identify_key
from .key import CHPOKeyfileKey, RepoKeyNotFoundError, KeyBlobStorage, identify_key, keyfile_name_for
class NotABorgKeyFile(Error):
@ -72,8 +73,11 @@ class KeyManager:
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
k = CHPOKeyfileKey(self.repository)
target = k.get_existing_or_new_target(args)
self.store_keyfile(target)
keyfile_data = self.get_keyfile_data()
if not os.environ.get("BORG_KEY_FILE") and os.path.samefile(target, get_keys_dir()):
target = os.path.join(target, keyfile_name_for(keyfile_data.encode()))
with dash_open(target, "w") as fd:
fd.write(keyfile_data)
elif self.keyblob_storage == KeyBlobStorage.REPO:
self.repository.save_key(self.keyblob.encode("utf-8"))

View file

@ -24,7 +24,7 @@ logger = create_logger()
import yaml
from .errors import Error
from .fs import get_keys_dir, make_path_safe, slashify
from .fs import make_path_safe, slashify
from .argparsing import Action, ArgumentError, ArgumentTypeError, register_type
from .msgpack import Timestamp
from .time import OutputTimestamp, format_time, safe_timestamp
@ -678,17 +678,6 @@ class Location:
]
return ", ".join(items)
def to_key_filename(self):
name = re.sub(r"[^\w]", "_", self.path.rstrip("/"))
if self.proto not in ("file", "socket", "rclone"):
name = re.sub(r"[^\w]", "_", self.host) + "__" + name
if len(name) > 120:
# Limit file names to some reasonable length. Most file systems
# limit them to 255 [unit of choice]; due to variations in unicode
# handling we truncate to 100 *characters*.
name = name[:120]
return os.path.join(get_keys_dir(), name)
def __repr__(self):
return "Location(%s)" % self

View file

@ -1,5 +1,6 @@
import binascii
import os
from hashlib import sha256
import pytest
@ -66,6 +67,43 @@ def test_change_location_to_b2repokey(archivers, request):
assert "(repokey BLAKE2b" in log
def test_keyfile_name_is_content_sha256(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
[key_filename] = os.listdir(archiver.keys_path)
key_path = os.path.join(archiver.keys_path, key_filename)
with open(key_path, "rb") as fd:
key_content = fd.read()
assert key_filename == sha256(key_content).hexdigest()
def test_change_passphrase_renames_keyfile_to_new_sha256(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
[old_key_filename] = os.listdir(archiver.keys_path)
old_key_path = os.path.join(archiver.keys_path, old_key_filename)
os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase"
cmd(archiver, "key", "change-passphrase")
os.environ["BORG_PASSPHRASE"] = "newpassphrase"
[new_key_filename] = os.listdir(archiver.keys_path)
new_key_path = os.path.join(archiver.keys_path, new_key_filename)
assert old_key_filename != new_key_filename
assert not os.path.exists(old_key_path)
with open(new_key_path, "rb") as fd:
key_content = fd.read()
assert new_key_filename == sha256(key_content).hexdigest()
cmd(archiver, "repo-list")
def test_borg_key_file_env_keeps_explicit_path(archivers, request, monkeypatch):
archiver = request.getfixturevalue(archivers)
explicit_key_path = os.path.join(archiver.output_path, "explicit-key")
monkeypatch.setenv("BORG_KEY_FILE", explicit_key_path)
cmd(archiver, "repo-create", KF_ENCRYPTION)
assert os.path.isfile(explicit_key_path)
assert os.listdir(archiver.keys_path) == []
def test_key_export_keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
export_file = archiver.output_path + "/exported"

View file

@ -55,51 +55,3 @@ def test_repo_create_refuse_to_overwrite_keyfile(archivers, request, monkeypatch
with open(keyfile) as file:
after = file.read()
assert before == after
def test_repo_create_keyfile_same_path_creates_new_keys(archivers, request):
"""Regression test for GH issue #6230.
When creating a new keyfile-encrypted repository at the same filesystem path
multiple times (e.g., after moving/unmounting the previous one), Borg must not
overwrite or reuse the existing key file. Instead, it should create a new key
file in the keys directory, appending a numeric suffix like .2, .3, ...
"""
archiver = request.getfixturevalue(archivers)
# First creation at path A
cmd(archiver, "repo-create", KF_ENCRYPTION)
keys = sorted(os.listdir(archiver.keys_path))
assert len(keys) == 1
base_key = keys[0]
base_path = os.path.join(archiver.keys_path, base_key)
with open(base_path, "rb") as f:
base_contents = f.read()
# Simulate moving/unmounting the repo by removing the path to allow re-create at the same path
import shutil
shutil.rmtree(archiver.repository_path)
cmd(archiver, "repo-create", KF_ENCRYPTION)
keys = sorted(os.listdir(archiver.keys_path))
assert len(keys) == 2
assert base_key in keys
# The new file should be base_key suffixed with .2
assert any(k == base_key + ".2" for k in keys)
second_path = os.path.join(archiver.keys_path, base_key + ".2")
with open(second_path, "rb") as f:
second_contents = f.read()
assert second_contents != base_contents
# Remove repo again and create a third time at same path
shutil.rmtree(archiver.repository_path)
cmd(archiver, "repo-create", KF_ENCRYPTION)
keys = sorted(os.listdir(archiver.keys_path))
assert len(keys) == 3
assert any(k == base_key + ".3" for k in keys)
third_path = os.path.join(archiver.keys_path, base_key + ".3")
with open(third_path, "rb") as f:
third_contents = f.read()
# Ensure all keys are distinct
assert third_contents != base_contents
assert third_contents != second_contents

View file

@ -1,3 +1,4 @@
import os
import tempfile
from binascii import a2b_base64
from unittest.mock import MagicMock
@ -183,6 +184,15 @@ class TestKey:
key = Blake2KeyfileKey.detect(self.MockRepository(), self.keyfile_blake2_cdata)
assert key.decrypt(self.keyfile_blake2_id, self.keyfile_blake2_cdata) == b"payload"
def test_legacy_named_keyfile_still_loads(self, monkeypatch, keys_dir):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
key = CHPOKeyfileKey.create(self.MockRepository(), self.MockArgs())
hashed_keyfile = key.target
legacy_keyfile = str(keys_dir.join("legacy-name"))
os.replace(hashed_keyfile, legacy_keyfile)
key2 = CHPOKeyfileKey.detect(self.MockRepository(), key.encrypt(b"", b"payload"))
assert key2.target == legacy_keyfile
def _corrupt_byte(self, key, data, offset):
data = bytearray(data)
# note: we corrupt in a way so that even corruption of the unauthenticated encryption type byte

View file

@ -76,26 +76,16 @@ def test_text_to_json(key, value, strict):
class TestLocationWithoutEnv:
@pytest.fixture
def keys_dir(self, tmpdir, monkeypatch):
tmpdir = str(tmpdir)
monkeypatch.setenv("BORG_KEYS_DIR", tmpdir)
if not tmpdir.endswith(os.path.sep):
tmpdir += os.path.sep
return tmpdir
def test_ssh(self, monkeypatch, keys_dir):
def test_ssh(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("ssh://user@host:1234//absolute/path"))
== "Location(proto='ssh', user='user', pass=None, host='host', port=1234, path='/absolute/path')"
)
assert Location("ssh://user@host:1234//absolute/path").to_key_filename() == keys_dir + "host___absolute_path"
assert (
repr(Location("ssh://user@host:1234/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='host', port=1234, path='relative/path')"
)
assert Location("ssh://user@host:1234/relative/path").to_key_filename() == keys_dir + "host__relative_path"
assert (
repr(Location("ssh://user@host/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='host', port=None, path='relative/path')"
@ -104,7 +94,6 @@ class TestLocationWithoutEnv:
repr(Location("ssh://user@[::]:1234/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='::', port=1234, path='relative/path')"
)
assert Location("ssh://user@[::]:1234/relative/path").to_key_filename() == keys_dir + "____relative_path"
assert (
repr(Location("ssh://user@[::]/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='::', port=None, path='relative/path')"
@ -113,10 +102,6 @@ class TestLocationWithoutEnv:
repr(Location("ssh://user@[2001:db8::]:1234/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='2001:db8::', port=1234, path='relative/path')"
)
assert (
Location("ssh://user@[2001:db8::]:1234/relative/path").to_key_filename()
== keys_dir + "2001_db8____relative_path"
)
assert (
repr(Location("ssh://user@[2001:db8::]/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='2001:db8::', port=None, path='relative/path')"
@ -137,10 +122,6 @@ class TestLocationWithoutEnv:
repr(Location("ssh://user@[2001:db8::192.0.2.1]/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='2001:db8::192.0.2.1', port=None, path='relative/path')" # noqa: E501
)
assert (
Location("ssh://user@[2001:db8::192.0.2.1]/relative/path").to_key_filename()
== keys_dir + "2001_db8__192_0_2_1__relative_path"
)
assert (
repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/relative/path"))
== "Location(proto='ssh', user='user', pass=None, "
@ -152,7 +133,7 @@ class TestLocationWithoutEnv:
"host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='relative/path')"
)
def test_s3(self, monkeypatch, keys_dir):
def test_s3(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("s3:/test/path"))
@ -171,38 +152,34 @@ class TestLocationWithoutEnv:
== "Location(proto='b2', user='user', pass='REDACTED', host='s3.us-east-005.backblazeb2.com', port=None, path='test/path')" # noqa: E501
)
def test_rclone(self, monkeypatch, keys_dir):
def test_rclone(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("rclone:remote:path"))
== "Location(proto='rclone', user=None, pass=None, host=None, port=None, path='remote:path')"
)
assert Location("rclone:remote:path").to_key_filename() == keys_dir + "remote_path"
def test_sftp(self, monkeypatch, keys_dir):
def test_sftp(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
# relative path
assert (
repr(Location("sftp://user@host:1234/rel/path"))
== "Location(proto='sftp', user='user', pass=None, host='host', port=1234, path='rel/path')"
)
assert Location("sftp://user@host:1234/rel/path").to_key_filename() == keys_dir + "host__rel_path"
# absolute path
assert (
repr(Location("sftp://user@host:1234//abs/path"))
== "Location(proto='sftp', user='user', pass=None, host='host', port=1234, path='/abs/path')"
)
assert Location("sftp://user@host:1234//abs/path").to_key_filename() == keys_dir + "host___abs_path"
def test_http(self, monkeypatch, keys_dir):
def test_http(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("http://user:pass@host:1234/"))
== "Location(proto='http', user='user', pass='REDACTED', host='host', port=1234, path='/')"
)
assert Location("http://user:pass@host:1234/").to_key_filename() == keys_dir + "host__"
def test_socket(self, monkeypatch, keys_dir):
def test_socket(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
url = "socket:///c:/repo/path" if is_win32 else "socket:///repo/path"
path = "c:/repo/path" if is_win32 else "/repo/path"
@ -210,27 +187,24 @@ class TestLocationWithoutEnv:
repr(Location(url))
== f"Location(proto='socket', user=None, pass=None, host=None, port=None, path='{path}')"
)
assert Location(url).to_key_filename().endswith("_repo_path")
def test_file(self, monkeypatch, keys_dir):
def test_file(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
url = "file:///c:/repo/path" if is_win32 else "file:///repo/path"
path = "c:/repo/path" if is_win32 else "/repo/path"
assert (
repr(Location(url)) == f"Location(proto='file', user=None, pass=None, host=None, port=None, path='{path}')"
)
assert Location(url).to_key_filename().endswith("_repo_path")
@pytest.mark.skipif(is_win32, reason="still broken")
def test_smb(self, monkeypatch, keys_dir):
def test_smb(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("file:////server/share/path"))
== "Location(proto='file', user=None, pass=None, host=None, port=None, path='//server/share/path')"
)
assert Location("file:////server/share/path").to_key_filename().endswith("__server_share_path")
def test_folder(self, monkeypatch, keys_dir):
def test_folder(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
rel_path = "path"
abs_path = os.path.abspath(rel_path)
@ -238,23 +212,20 @@ class TestLocationWithoutEnv:
repr(Location(rel_path))
== f"Location(proto='file', user=None, pass=None, host=None, port=None, path='{abs_path}')"
)
assert Location("path").to_key_filename().endswith(rel_path)
@pytest.mark.skipif(is_win32, reason="Windows has drive letters in abs paths")
def test_abspath(self, monkeypatch, keys_dir):
def test_abspath(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("/some/absolute/path"))
== "Location(proto='file', user=None, pass=None, host=None, port=None, path='/some/absolute/path')"
)
assert Location("/some/absolute/path").to_key_filename() == keys_dir + "_some_absolute_path"
assert (
repr(Location("/some/../absolute/path"))
== "Location(proto='file', user=None, pass=None, host=None, port=None, path='/absolute/path')"
)
assert Location("/some/../absolute/path").to_key_filename() == keys_dir + "_absolute_path"
def test_relpath(self, monkeypatch, keys_dir):
def test_relpath(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
# For a local path, Borg creates a Location instance with an absolute path.
rel_path = "relative/path"
@ -263,31 +234,26 @@ class TestLocationWithoutEnv:
repr(Location(rel_path))
== f"Location(proto='file', user=None, pass=None, host=None, port=None, path='{abs_path}')"
)
assert Location(rel_path).to_key_filename().endswith("relative_path")
assert (
repr(Location("ssh://user@host/relative/path"))
== "Location(proto='ssh', user='user', pass=None, host='host', port=None, path='relative/path')"
)
assert Location("ssh://user@host/relative/path").to_key_filename() == keys_dir + "host__relative_path"
@pytest.mark.skipif(is_win32, reason="Windows does not support colons in paths")
def test_with_colons(self, monkeypatch, keys_dir):
def test_with_colons(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)
assert (
repr(Location("/abs/path:w:cols"))
== "Location(proto='file', user=None, pass=None, host=None, port=None, path='/abs/path:w:cols')"
)
assert Location("/abs/path:w:cols").to_key_filename() == keys_dir + "_abs_path_w_cols"
assert (
repr(Location("file:///abs/path:w:cols"))
== "Location(proto='file', user=None, pass=None, host=None, port=None, path='/abs/path:w:cols')"
)
assert Location("file:///abs/path:w:cols").to_key_filename() == keys_dir + "_abs_path_w_cols"
assert (
repr(Location("ssh://user@host/abs/path:w:cols"))
== "Location(proto='ssh', user='user', pass=None, host='host', port=None, path='abs/path:w:cols')"
)
assert Location("ssh://user@host/abs/path:w:cols").to_key_filename() == keys_dir + "host__abs_path_w_cols"
def test_canonical_path(self, monkeypatch):
monkeypatch.delenv("BORG_REPO", raising=False)