repokey: use same format as with external keyfile

- always have a starting line with FILE_ID repoid
- store repkeys content-addressed, name is sha256(content)
- search by repo id on load
- add keyfile_format / keyfile_parse / is_keyfile helpers
This commit is contained in:
Thomas Waldmann 2026-05-31 17:36:58 +02:00
parent a0448abde0
commit f13dd4ddc9
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
7 changed files with 139 additions and 72 deletions

View file

@ -51,9 +51,9 @@ data/
0000... .. ffff...
keys/
repokey
When using encryption in repokey mode, the encrypted, passphrase protected
key is stored here as a base64 encoded text.
key is stored here as a base64 encoded text. The sha256 content hash is
used for the name.
locks/
used by the locking system to manage shared and exclusive locks.
@ -615,7 +615,7 @@ b) with ``create --chunker-params buzhash,19,23,21,4095`` (default):
it can not skip unmodified files then.
.. _internals_hashindex:
HashIndex
---------
@ -721,8 +721,10 @@ removed in a future release.
Both modes
~~~~~~~~~~
Encryption keys (and other secrets) are kept either in a key file on the client
('keyfile' mode) or in the repository under keys/repokey ('repokey' mode).
Encryption keys (and other secrets) are kept either in the keys directory on
the client ('keyfile' mode) or under the keys/ namespace in the repository
('repokey' mode) using the sha256 of the file content as the name.
In both cases, the secrets are generated from random and then encrypted by a
key derived from your passphrase (this happens on the client before the key
is stored into the keyfile or as repokey).

View file

@ -228,7 +228,7 @@ class KeysMixIn:
Change the location of a Borg key. The key can be stored at different locations:
- keyfile: locally, usually in the home directory
- repokey: inside the repository (in the repository config)
- repokey: inside the repository
Please note:

View file

@ -4,7 +4,7 @@ import os
import textwrap
from hashlib import sha256
from pathlib import Path
from typing import Literal, ClassVar
from typing import Literal, ClassVar, Optional
from collections.abc import Callable
from ..logger import create_logger
@ -37,6 +37,39 @@ def keyfile_name_for(content: bytes) -> str:
return sha256(content).hexdigest()
KEYFILE_ID = "BORG_KEY"
def is_keyfile(data: str | bytes, repoid: Optional[str] = None) -> bool:
# repoid is a hex str, if given. if given, we only accept keyfiles for that repo.
header = f"{KEYFILE_ID} {repoid or ''}"
if isinstance(data, str):
return data.startswith(header)
elif isinstance(data, bytes):
# data can be given as bytes to avoid decoding issues for invalid files.
return data.startswith(header.encode())
else:
raise TypeError(f"Expected str or bytes, got {type(data)}")
def keyfile_format(repoid: str, b64data: str) -> str:
return f"{KEYFILE_ID} {repoid}\n{b64data}\n"
def keyfile_parse(data: str | bytes, repoid: Optional[str] = None) -> tuple[str, str]:
if repoid is None:
if not is_keyfile(data):
raise ValueError("Not a keyfile")
else:
if not is_keyfile(data, repoid):
raise ValueError("Not a keyfile for repo %s" % repoid)
if isinstance(data, bytes):
data = data.decode()
header, b64data = data.split("\n", 1)
repoid = header[len(KEYFILE_ID) + 1 :]
return repoid, b64data
# workaround for lost passphrase or key in "authenticated" or "authenticated-blake2" mode
AUTHENTICATED_NO_KEY = "authenticated_no_key" in workarounds
@ -386,7 +419,7 @@ class AESKeyBase(KeyBase):
class FlexiKey:
FILE_ID = "BORG_KEY"
FILE_ID = KEYFILE_ID
STORAGE: ClassVar[str] = KeyBlobStorage.NO_STORAGE # override in subclass
@classmethod
@ -557,38 +590,23 @@ class FlexiKey:
return key
def sanity_check(self, filename, id):
file_id = self.FILE_ID.encode() + b" "
repo_id = bin_to_hex(id).encode("ascii")
repo_id_hex = bin_to_hex(id)
with open(filename, "rb") as fd:
# we do the magic / id check in binary mode to avoid stumbling over
# decoding errors if somebody has binary files in the keys dir for some reason.
if fd.read(len(file_id)) != file_id:
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
if fd.read(len(repo_id)) != repo_id:
raise KeyfileMismatchError(self.repository._location.canonical_path(), filename)
data = fd.read(10000)
if not is_keyfile(data):
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
if not is_keyfile(data, repo_id_hex):
raise KeyfileMismatchError(self.repository._location.canonical_path(), filename)
# we get here if it really looks like a borg key for this repo,
# do some more checks that are close to how borg reads/parses the key.
with open(filename) as fd:
lines = fd.readlines()
if len(lines) < 2:
logger.warning(f"borg key sanity check: expected 2+ lines total. [{filename}]")
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
if len(lines[0].rstrip()) > len(file_id) + len(repo_id):
logger.warning(f"borg key sanity check: key line 1 seems too long. [{filename}]")
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
key_b64 = "".join(lines[1:])
try:
key = binascii.a2b_base64(key_b64)
except (ValueError, binascii.Error):
logger.warning(f"borg key sanity check: key line 2+ does not look like base64. [{filename}]")
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename) from None
if len(key) < 20:
# this is in no way a precise check, usually we have about 400b key data.
logger.warning(
f"borg key sanity check: binary encrypted key data from key line 2+ suspiciously short."
f" [{filename}]"
)
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
_, key_b64 = keyfile_parse(data, repo_id_hex)
try:
binascii.a2b_base64(key_b64)
except (ValueError, binascii.Error):
logger.warning(f"borg key sanity check: key line 2+ does not look like base64. [{filename}]")
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename) from None
# looks good!
return filename
@ -649,7 +667,8 @@ class FlexiKey:
def load(self, target, passphrase):
if self.STORAGE == KeyBlobStorage.KEYFILE:
with open(target) as fd:
key_data = "".join(fd.readlines()[1:])
key_data = fd.read()
_, key_data = keyfile_parse(key_data, bin_to_hex(self.repository.id))
elif self.STORAGE == KeyBlobStorage.REPO:
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
@ -663,6 +682,8 @@ class FlexiKey:
loc = target._location.canonical_path()
raise RepoKeyNotFoundError(loc) from None
key_data = key_data.decode("utf-8") # remote repo: msgpack issue #99, getting bytes
if is_keyfile(key_data):
_, key_data = keyfile_parse(key_data, bin_to_hex(self.repository.id))
else:
raise TypeError("Unsupported borg key storage type")
success = self._load(key_data, passphrase)
@ -675,7 +696,7 @@ class FlexiKey:
if self.STORAGE == KeyBlobStorage.KEYFILE:
old_target = getattr(self, "target", None)
keys_dir = get_keys_dir()
keyfile_data = f"{self.FILE_ID} {bin_to_hex(self.repository_id)}\n{key_data}\n"
keyfile_data = keyfile_format(bin_to_hex(self.repository_id), key_data)
target_dir = target if os.path.isdir(target) else os.path.dirname(target)
auto_named = not os.environ.get("BORG_KEY_FILE") and os.path.samefile(target_dir, keys_dir)
if auto_named:
@ -699,6 +720,7 @@ class FlexiKey:
logger.debug('Could not remove previous keyfile "%s": %s', old_target, exc)
elif self.STORAGE == KeyBlobStorage.REPO:
self.logically_encrypted = passphrase != "" # nosec B105
key_data = keyfile_format(bin_to_hex(self.repository_id), key_data)
key_data = key_data.encode("utf-8") # remote repo: msgpack issue #99, giving bytes
target.save_key(key_data)
else:

View file

@ -8,7 +8,8 @@ from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open, get_keys_di
from ..repoobj import RepoObj
from .key import CHPOKeyfileKey, RepoKeyNotFoundError, KeyBlobStorage, identify_key, keyfile_name_for
from .key import keyfile_format, keyfile_parse, is_keyfile
from .key import RepoKeyNotFoundError, KeyBlobStorage, identify_key, keyfile_name_for
class NotABorgKeyFile(Error):
@ -56,10 +57,14 @@ class KeyManager:
def load_keyblob(self):
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
from .key import CHPOKeyfileKey
k = CHPOKeyfileKey(self.repository)
target = k.find_key()
with open(target) as fd:
self.keyblob = "".join(fd.readlines()[1:])
key_data = fd.read()
_, key_data = keyfile_parse(key_data, bin_to_hex(self.repository.id))
self.keyblob = key_data
elif self.keyblob_storage == KeyBlobStorage.REPO:
key_data = self.repository.load_key().decode()
@ -67,10 +72,14 @@ class KeyManager:
# if we got an empty key, it means there is no key.
loc = self.repository._location.canonical_path()
raise RepoKeyNotFoundError(loc) from None
if is_keyfile(key_data):
_, key_data = keyfile_parse(key_data, bin_to_hex(self.repository.id))
self.keyblob = key_data
def store_keyblob(self, args):
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
from .key import CHPOKeyfileKey
k = CHPOKeyfileKey(self.repository)
target = k.get_existing_or_new_target(args)
keyfile_data = self.get_keyfile_data()
@ -79,14 +88,11 @@ class KeyManager:
with dash_open(target, "w") as fd:
fd.write(keyfile_data)
elif self.keyblob_storage == KeyBlobStorage.REPO:
self.repository.save_key(self.keyblob.encode("utf-8"))
key_data = keyfile_format(bin_to_hex(self.repository.id), self.keyblob.strip())
self.repository.save_key(key_data.encode("utf-8"))
def get_keyfile_data(self):
data = f"{CHPOKeyfileKey.FILE_ID} {bin_to_hex(self.repository.id)}\n"
data += self.keyblob
if not self.keyblob.endswith("\n"):
data += "\n"
return data
return keyfile_format(bin_to_hex(self.repository.id), self.keyblob.strip())
def store_keyfile(self, target):
with dash_open(target, "w") as fd:
@ -147,17 +153,15 @@ class KeyManager:
fd.write(export)
def import_keyfile(self, args):
file_id = CHPOKeyfileKey.FILE_ID
first_line = file_id + " " + bin_to_hex(self.repository.id) + "\n"
with dash_open(args.path, "r") as fd:
file_first_line = fd.read(len(first_line))
if file_first_line != first_line:
if not file_first_line.startswith(file_id):
raise NotABorgKeyFile()
else:
raise RepoIdMismatch()
self.keyblob = fd.read()
key_data = fd.read()
try:
repoid, b64data = keyfile_parse(key_data, bin_to_hex(self.repository.id))
except ValueError:
if not is_keyfile(key_data):
raise NotABorgKeyFile() from None
raise RepoIdMismatch() from None
self.keyblob = b64data
self.store_keyblob(args)
def import_paperkey(self, args):

View file

@ -1,6 +1,7 @@
import os
import time
from pathlib import Path
from hashlib import sha256
from borgstore.store import Store
from borgstore.store import ObjectNotFound as StoreObjectNotFound
@ -17,6 +18,7 @@ from .storelocking import Lock
from .logger import create_logger
from .manifest import NoManifestError
from .repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from .crypto.key import is_keyfile
logger = create_logger(__name__)
@ -233,13 +235,39 @@ class Repository:
self.lock.refresh()
def save_key(self, keydata):
# note: saving an empty key means that there is no repokey anymore
self.store.store("keys/repokey", keydata)
# currently, there is only one repokey,
# thus we delete all old/outdated keys stored in this repository.
try:
infos = list(self.store.list("keys"))
except StoreObjectNotFound:
pass
else:
for info in infos:
try:
self.store.delete(f"keys/{info.name}")
except StoreObjectNotFound:
pass
# note: saving an empty key means that there is no repokey for this repo anymore.
if keydata:
digest = sha256(keydata).hexdigest()
self.store.store(f"keys/{digest}", keydata)
def load_key(self):
keydata = self.store.load("keys/repokey")
# note: if we return an empty string, it means there is no repo key
return keydata
repo_id_hex = bin_to_hex(self.id)
# search for a key matching this repository's ID in the keys/ namespace
try:
infos = list(self.store.list("keys"))
except StoreObjectNotFound:
pass
else:
for info in infos:
try:
keydata = self.store.load(f"keys/{info.name}")
if is_keyfile(keydata, repo_id_hex):
return keydata
except StoreObjectNotFound:
pass
return b""
def destroy(self):
"""Destroy the repository"""

View file

@ -5,7 +5,7 @@ from hashlib import sha256
import pytest
from ...constants import * # NOQA
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPOKeyfileKey, Passphrase
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPOKeyfileKey, Passphrase, is_keyfile, keyfile_parse
from ...crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
from ...helpers import CommandError
from ...helpers import bin_to_hex, hex_to_bin
@ -114,7 +114,7 @@ def test_key_export_keyfile(archivers, request):
with open(export_file) as fd:
export_contents = fd.read()
assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n")
assert is_keyfile(export_contents, bin_to_hex(repo_id))
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
@ -165,13 +165,13 @@ def test_key_export_repokey(archivers, request):
with open(export_file) as fd:
export_contents = fd.read()
assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n")
assert is_keyfile(export_contents, bin_to_hex(repo_id))
with Repository(archiver.repository_path) as repository:
repo_key = AESOCBRepoKey(repository)
repo_key.load(None, Passphrase.env_passphrase())
backup_key = AESOCBKeyfileKey(TestKey.MockRepository())
backup_key = AESOCBKeyfileKey(TestKey.MockRepository(id=repo_id))
backup_key.load(export_file, Passphrase.env_passphrase())
assert repo_key.crypt_key == backup_key.crypt_key
@ -341,7 +341,9 @@ def test_init_defaults_to_argon2(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
with Repository(archiver.repository_path) as repository:
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
key_data = repository.load_key()
_, key_data = keyfile_parse(key_data, bin_to_hex(repository.id))
key = msgpack.unpackb(binascii.a2b_base64(key_data))
assert key["algorithm"] == "argon2 chacha20-poly1305"
@ -352,7 +354,9 @@ def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request):
cmd(archiver, "key", "change-passphrase")
with Repository(archiver.repository_path) as repository:
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
key_data = repository.load_key()
_, key_data = keyfile_parse(key_data, bin_to_hex(repository.id))
key = msgpack.unpackb(binascii.a2b_base64(key_data))
assert key["algorithm"] == "argon2 chacha20-poly1305"
@ -362,5 +366,8 @@ def test_change_location_does_not_change_algorithm_argon2(archivers, request):
cmd(archiver, "key", "change-location", "repokey")
with Repository(archiver.repository_path) as repository:
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
key_data = repository.load_key()
if is_keyfile(key_data):
_, key_data = keyfile_parse(key_data, bin_to_hex(repository.id))
key = msgpack.unpackb(binascii.a2b_base64(key_data))
assert key["algorithm"] == "argon2 chacha20-poly1305"

View file

@ -5,7 +5,7 @@ from unittest.mock import MagicMock
import pytest
from ...crypto.key import PlaintextKey, AuthenticatedKey, Blake2AuthenticatedKey
from ...crypto.key import PlaintextKey, AuthenticatedKey, Blake2AuthenticatedKey, keyfile_parse
from ...crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey
from ...crypto.key import AEADKeyBase
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
@ -105,9 +105,11 @@ class TestKey:
def canonical_path(self):
return self.processed
def __init__(self, id=bytes(32)):
self.id = id
self.id_str = bin_to_hex(id)
_location = _Location()
id = bytes(32)
id_str = bin_to_hex(id)
version = 2
def save_key(self, data):
@ -327,7 +329,8 @@ def test_key_file_roundtrip(monkeypatch):
load_me = AESOCBRepoKey.detect(repository, manifest_data=None)
assert to_dict(load_me) == to_dict(save_me)
assert msgpack.unpackb(a2b_base64(saved))["algorithm"] == KEY_ALGORITHMS["argon2"]
_, saved_b64 = keyfile_parse(saved)
assert msgpack.unpackb(a2b_base64(saved_b64))["algorithm"] == KEY_ALGORITHMS["argon2"]
def test_argon2_wrong_passphrase_returns_none(monkeypatch):
@ -337,4 +340,5 @@ def test_argon2_wrong_passphrase_returns_none(monkeypatch):
monkeypatch.setenv("BORG_PASSPHRASE", "correct passphrase")
key = AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm="argon2"))
saved = repository.save_key.call_args.args[0]
assert key.decrypt_key_file(a2b_base64(saved), "wrong passphrase") is None
_, saved_b64 = keyfile_parse(saved)
assert key.decrypt_key_file(a2b_base64(saved_b64), "wrong passphrase") is None