key: unify keyfile/repokey classes, locate key independent of type byte (#9743)

Borg used to read the manifest's key-type byte and then look for the key in
exactly one place (keyfile or repokey) depending on the key class that byte
selected. As a result every crypto suite was duplicated into a keyfile class
and a repokey class that differed only in TYPE, NAME, ARG_NAME and STORAGE.

Now key *location* is independent of the type byte: detection tries keyfiles
first and repokeys afterwards until a passphrase unlocks a key. The type byte
still selects the crypto suite (id hash, MAC, cipher) to instantiate. Where a
key is stored (keyfile vs repokey) is therefore a per-key property
(self.storage), not a separate class, so a repository may even hold a mix of
keyfile- and repo-stored borg keys.

With storage decoupled from class identity, the keyfile/repokey class pairs
collapse into one class per crypto suite:
- modern AEAD: AESOCBKey, CHPOKey, Blake3AESOCBKey, Blake3CHPOKey
- legacy borg 1.x (read-only): AESCTRKey, Blake2AESCTRKey
There is now exactly one type byte per modern crypto suite (the old separate
repokey type bytes 0x11/0x21/0x31/0x41 were removed; borg2 is beta and only
needs to read repos it created). identify_key() matches on TYPES_ACCEPTABLE.

CLI: --encryption selects only the crypto suite (aes-ocb, chacha20-poly1305,
blake3-aes-ocb, blake3-chacha20-poly1305, authenticated*, none); the storage
location is chosen with the new --key-location=repokey|keyfile (default
repokey). The old combined modes (repokey-aes-ocb etc.) were removed.
borg key import also gained --key-location. borg key change-location no longer
swaps key classes or rewrites the manifest; it just re-saves the unlocked key
at the new location.

Keyfile removal (key remove, change-location) now overwrites the keyfile with
random data via secure_erase() before unlinking, consistent with save().

borg 1.x legacy read compatibility is preserved (the legacy class merge is a
behavior-preserving rename; the legacy type bytes incl. PASSPHRASE stay in
TYPES_ACCEPTABLE).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Thomas Waldmann 2026-06-12 23:07:57 +02:00
parent e0c7a68a3e
commit d2bc45f56d
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
21 changed files with 349 additions and 343 deletions

View file

@ -168,6 +168,15 @@ above.
New features:
- key: unify keyfile/repokey key classes and locate the key independently of the
manifest key-type byte. Borg now tries keyfiles first and repokeys afterwards until
a passphrase unlocks a key, so where a key is stored (keyfile vs repokey) is a
per-key property rather than a separate key class. The key-type byte still selects
the crypto suite (id hash, MAC, cipher). ``borg repo-create --encryption`` now takes
only the crypto suite (e.g. ``aes-ocb``, ``chacha20-poly1305``, ``blake3-aes-ocb``);
choose the storage location with the new ``--key-location=repokey|keyfile`` option
(default: ``repokey``). The old combined modes (``repokey-aes-ocb`` etc.) were
removed. ``borg key import`` also gained ``--key-location``. #9743
- WIP packs project, major repo format changes, you must create new repos! #8572
- rest:// repository URLs - connect via ssh to remote borgstore REST server,
talking http via stdio, #9593

View file

@ -1,6 +1,6 @@
1. Before a backup can be made, a repository has to be initialized::
$ borg -r /path/to/repo repo-create --encryption=repokey-aes-ocb
$ borg -r /path/to/repo repo-create --encryption=aes-ocb
2. Back up the ``~/src`` and ``~/Documents`` directories into an archive called
*docs*::

View file

@ -9,7 +9,7 @@ Examples
::
# Create a key file protected repository
$ borg repo-create --encryption=keyfile-aes-ocb -v
$ borg repo-create --encryption=aes-ocb --key-location=keyfile -v
Initializing repository at "/path/to/repo"
Enter new passphrase:
Enter same passphrase again:
@ -46,7 +46,7 @@ Fully automated using environment variables:
::
$ BORG_NEW_PASSPHRASE=old borg repo-create --encryption=repokey-aes-ocb
$ BORG_NEW_PASSPHRASE=old borg repo-create --encryption=aes-ocb
# now "old" is the current passphrase.
$ BORG_PASSPHRASE=old BORG_NEW_PASSPHRASE=new borg key change-passphrase
# now "new" is the current passphrase.

View file

@ -8,21 +8,21 @@ Examples
# Local repository
$ export BORG_REPO=/path/to/repo
# Recommended repokey AEAD cryptographic modes
$ borg repo-create --encryption=repokey-aes-ocb
$ borg repo-create --encryption=repokey-chacha20-poly1305
$ borg repo-create --encryption=repokey-blake2-aes-ocb
$ borg repo-create --encryption=repokey-blake2-chacha20-poly1305
# Recommended AEAD cryptographic modes (key stored in the repository by default)
$ borg repo-create --encryption=aes-ocb
$ borg repo-create --encryption=chacha20-poly1305
$ borg repo-create --encryption=blake3-aes-ocb
$ borg repo-create --encryption=blake3-chacha20-poly1305
# No encryption (not recommended)
$ borg repo-create --encryption=authenticated
$ borg repo-create --encryption=authenticated-blake2
$ borg repo-create --encryption=authenticated-blake3
$ borg repo-create --encryption=none
# Remote repository (accesses a remote Borg via SSH)
$ export BORG_REPO=ssh://user@hostname/~/backup
# repokey: stores the encrypted key in <REPO_DIR>/config
$ borg repo-create --encryption=repokey-aes-ocb
# The crypto suite (--encryption) and where the key is stored (--key-location) are
# chosen independently. --key-location defaults to repokey.
# repokey: stores the encrypted key inside the repository
$ borg repo-create --encryption=aes-ocb --key-location=repokey
# keyfile: stores the encrypted key in the config dir's keys/ subdir
# (e.g. ~/.config/borg/keys/ on Linux, ~/Library/Application Support/borg/keys/ on macOS)
$ borg repo-create --encryption=keyfile-aes-ocb
$ borg repo-create --encryption=aes-ocb --key-location=keyfile

View file

@ -21,13 +21,13 @@ locations and passphrases first:
# 1. Create a new "related" repository:
# Here, the existing Borg 1.x repository used repokey (and AES-CTR mode),
# thus we use repokey-aes-ocb for the new Borg 2.0 repository.
# thus we use aes-ocb for the new Borg 2.0 repository.
# Staying with the same chunk ID algorithm (hmac-sha256) and with the same
# key material (via BORG_OTHER_REPO) will make deduplication work
# between old archives (copied with borg transfer) and future ones.
# The AEAD cipher does not matter (everything must be re-encrypted and
# re-authenticated anyway); you could also choose repokey-chacha20-poly1305.
$ borg repo-create -e repokey-aes-ocb
# re-authenticated anyway); you could also choose chacha20-poly1305.
$ borg repo-create -e aes-ocb
# 2. Check what and how much it would transfer:
$ borg transfer --from-borg1 --dry-run
@ -46,15 +46,15 @@ locations and passphrases first:
# 1. Create a new "related" repository:
# Here, the existing Borg 1.x repository used repokey-blake2 (and AES-CTR mode),
# thus we use repokey-blake3-aes-ocb for the new Borg 2.0 repository.
# thus we use blake3-aes-ocb for the new Borg 2.0 repository.
# We need to change from blake2 to blake3, because blake2 is not supported
# for borg2 repos (blake3 is much faster). Because we change how chunk IDs are
# computed, we need to re-chunk everything while doing the transfer.
# The chunker parameters you provide here should be the same as you will
# use for all future Borg 2.0 archives.
# The AEAD cipher does not matter (everything must be re-encrypted and
# re-authenticated anyway); you could also choose repokey-blake3-chacha20-poly1305.
$ borg repo-create -e repokey-blake3-aes-ocb
# re-authenticated anyway); you could also choose blake3-chacha20-poly1305.
$ borg repo-create -e blake3-aes-ocb
$ export CHUNKER_PARAMS="buzhash64,19,23,21,4095"
# 2. Check what and how much it would transfer:

View file

@ -1,8 +1,7 @@
import os
from ..constants import * # NOQA
from ..crypto.key import AESOCBRepoKey, CHPORepoKey, Blake3AESOCBRepoKey, Blake3CHPORepoKey
from ..crypto.key import AESOCBKeyfileKey, CHPOKeyfileKey, Blake3AESOCBKeyfileKey, Blake3CHPOKeyfileKey
from ..crypto.key import KEY_LOCATIONS
from ..crypto.keymanager import KeyManager
from ..helpers import PathSpec, CommandError
from ..helpers.argparsing import ArgumentParser
@ -62,40 +61,24 @@ class KeysMixIn:
@with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.CHECK,))
def do_key_change_location(self, args, repository, manifest, cache):
"""Changes the repository key location."""
"""Changes the location of the borg key used to unlock this repository."""
key = manifest.key
if not hasattr(key, "change_passphrase"):
raise CommandError("This repository is not encrypted, cannot change the key location.")
if not getattr(key, "LOCATION_CONFIGURABLE", False):
raise CommandError("This key's location cannot be changed (it has no keyfile/repokey storage).")
if args.key_mode == "keyfile":
if isinstance(key, AESOCBRepoKey):
key_new = AESOCBKeyfileKey(repository)
elif isinstance(key, CHPORepoKey):
key_new = CHPOKeyfileKey(repository)
elif isinstance(key, Blake3AESOCBRepoKey):
key_new = Blake3AESOCBKeyfileKey(repository)
elif isinstance(key, Blake3CHPORepoKey):
key_new = Blake3CHPOKeyfileKey(repository)
else:
print("Change not needed or not supported.")
return
if args.key_mode == "repokey":
if isinstance(key, AESOCBKeyfileKey):
key_new = AESOCBRepoKey(repository)
elif isinstance(key, CHPOKeyfileKey):
key_new = CHPORepoKey(repository)
elif isinstance(key, Blake3AESOCBKeyfileKey):
key_new = Blake3AESOCBRepoKey(repository)
elif isinstance(key, Blake3CHPOKeyfileKey):
key_new = Blake3CHPORepoKey(repository)
else:
print("Change not needed or not supported.")
return
new_storage = KEY_LOCATIONS[args.key_mode]
if key.storage == new_storage:
print(f"The borg key is already stored as {args.key_mode}, nothing to do.")
return
# the crypto class / manifest key-type byte does not change - only the storage location does.
# build a same-class key with the same key material and store it at the new location.
key_new = type(key)(repository)
for name in ("repository_id", "crypt_key", "id_key", "chunk_seed", "sessionid", "cipher"):
value = getattr(key, name)
setattr(key_new, name, value)
setattr(key_new, name, getattr(key, name))
key_new.storage = new_storage
key_new.target = key_new.get_new_target(args)
# save with same passphrase, algorithm and label (keep the unlocked borg key's label)
key_new.save(
@ -106,18 +89,16 @@ class KeysMixIn:
label=key._loaded_label,
)
# rewrite the manifest with the new key, so that the key-type byte of the manifest changes
# the new key (same crypto material, new storage) is the canonical key going forward
manifest.key = key_new
manifest.repo_objs.key = key_new
manifest.write()
cache.key = key_new
loc = key_new.find_key() if hasattr(key_new, "find_key") else None
loc = key_new.find_key()
if args.keep:
logger.info(f"Key copied to {loc}")
else:
key.remove(key.target) # remove key from current location
key.remove(key.target) # remove the borg key from its previous location only
logger.info(f"Key moved to {loc}")
@with_repository(lock=False, manifest=False, cache=False)
@ -256,6 +237,15 @@ class KeysMixIn:
action="store_true",
help="interactively import from a backup done with ``--paper``",
)
subparser.add_argument(
"--key-location",
metavar="LOCATION",
dest="key_location",
choices=("repokey", "keyfile"),
default="repokey",
help="where to store the imported key: 'repokey' (in the repository, default) or "
"'keyfile' (in the local keys directory)",
)
change_passphrase_epilog = process_epilog(
"""

View file

@ -77,7 +77,7 @@ class RepoCreateMixIn:
::
borg repo-create --encryption repokey-aes-ocb
borg repo-create --encryption aes-ocb --key-location repokey
Borg will:
@ -125,11 +125,17 @@ class RepoCreateMixIn:
Depending on your hardware, hashing and crypto performance may vary widely.
The easiest way to find out what is fastest is to run ``borg benchmark cpu``.
`repokey` modes: if you want ease-of-use and "passphrase" security is good enough -
the key will be stored in the repository (in ``repo_dir/config``).
The encryption mode (``--encryption``) only selects the crypto suite (id hash, encryption
and authentication). Where the key is stored is chosen separately with ``--key-location``:
`keyfile` modes: if you want "passphrase and having-the-key" security -
the key will be stored in your home directory (in ``~/.config/borg/keys``).
- ``repokey`` (default): the key is stored in the repository (under ``keys/``). Pick this
if you want ease-of-use and "passphrase" security is good enough.
- ``keyfile``: the key is stored in your home directory (in ``~/.config/borg/keys``). Pick
this if you want "passphrase and having-the-key" security.
You can move the key between these locations later with ``borg key change-location``.
``--key-location`` is ignored for ``none`` and ``authenticated*`` modes (those have no
separate keyfile/repokey storage).
The following table is roughly sorted in order of preference, the better ones are
in the upper part of the table, in the lower part is the old and/or unsafe(r) stuff:
@ -137,17 +143,17 @@ class RepoCreateMixIn:
.. nanorst: inline-fill
+-----------------------------------+--------------+----------------+--------------------+
| Mode (K = keyfile or repokey) | ID-Hash | Encryption | Authentication |
| Encryption mode | ID-Hash | Encryption | Authentication |
+-----------------------------------+--------------+----------------+--------------------+
| K-blake2-chacha20-poly1305 | BLAKE2b | CHACHA20 | POLY1305 |
| blake3-chacha20-poly1305 | BLAKE3 | CHACHA20 | POLY1305 |
+-----------------------------------+--------------+----------------+--------------------+
| K-chacha20-poly1305 | HMAC-SHA-256 | CHACHA20 | POLY1305 |
| chacha20-poly1305 | HMAC-SHA-256 | CHACHA20 | POLY1305 |
+-----------------------------------+--------------+----------------+--------------------+
| K-blake2-aes-ocb | BLAKE2b | AES256-OCB | AES256-OCB |
| blake3-aes-ocb | BLAKE3 | AES256-OCB | AES256-OCB |
+-----------------------------------+--------------+----------------+--------------------+
| K-aes-ocb | HMAC-SHA-256 | AES256-OCB | AES256-OCB |
| aes-ocb | HMAC-SHA-256 | AES256-OCB | AES256-OCB |
+-----------------------------------+--------------+----------------+--------------------+
| authenticated-blake2 | BLAKE2b | none | BLAKE2b |
| authenticated-blake3 | BLAKE3 | none | BLAKE3 |
+-----------------------------------+--------------+----------------+--------------------+
| authenticated | HMAC-SHA-256 | none | HMAC-SHA256 |
+-----------------------------------+--------------+----------------+--------------------+
@ -214,7 +220,17 @@ class RepoCreateMixIn:
required=True,
choices=key_argument_names(),
action=Highlander,
help="select encryption key mode **(required)**",
help="select encryption crypto suite **(required)**",
)
subparser.add_argument(
"--key-location",
metavar="LOCATION",
dest="key_location",
choices=("repokey", "keyfile"),
default="repokey",
action=Highlander,
help="where to store the key: 'repokey' (in the repository, default) or 'keyfile' "
"(in the local keys directory). Ignored for 'none' and 'authenticated*' modes.",
)
subparser.add_argument(
"--copy-crypt-key",

View file

@ -25,9 +25,13 @@ class RepoInfoMixIn:
if key.NAME in ("plaintext", "authenticated"):
encryption += "No"
else:
encryption += "Yes (%s)" % key.NAME
if key.NAME.startswith("key file"):
encryption += "\nKey file: %s" % key.find_key()
# storage (keyfile/repokey) is a per-key property now; the crypto suite is key.NAME.
mode = {KeyBlobStorage.KEYFILE: "keyfile", KeyBlobStorage.REPO: "repokey"}.get(
getattr(key, "storage", None)
)
encryption += "Yes (%s, %s)" % (mode, key.NAME) if mode else "Yes (%s)" % key.NAME
if getattr(key, "storage", None) == KeyBlobStorage.KEYFILE:
encryption += "\nKey file: %s" % key.find_key()
info["encryption"] = encryption
output = (

View file

@ -182,7 +182,7 @@ class KeyType:
# repos with PASSPHRASE mode could not be created any more since borg 1.0, see #97.
# in borg 2. all of its code and also the "borg key migrate-to-repokey" command was removed.
# if you still need to, you can use "borg key migrate-to-repokey" with borg 1.0, 1.1 and 1.2.
# Nowadays, we just dispatch this to RepoKey and assume the passphrase was migrated to a repokey.
# Nowadays, we just dispatch this to the legacy AES-CTR key and assume the passphrase was migrated.
PASSPHRASE = 0x01 # legacy, borg < 1.0
PLAINTEXT = 0x02
REPO = 0x03
@ -191,15 +191,13 @@ class KeyType:
BLAKE2AUTHENTICATED = 0x06
AUTHENTICATED = 0x07
# new crypto
# upper 4 bits are ciphersuite, lower 4 bits are keytype
AESOCBKEYFILE = 0x10
AESOCBREPO = 0x11
CHPOKEYFILE = 0x20
CHPOREPO = 0x21
BLAKE3AESOCBKEYFILE = 0x30
BLAKE3AESOCBREPO = 0x31
BLAKE3CHPOKEYFILE = 0x40
BLAKE3CHPOREPO = 0x41
# upper 4 bits are ciphersuite, lower 4 bits are reserved (0).
# the type byte only identifies the crypto suite; where the key is stored (keyfile vs
# repokey) is not encoded here any more, so there is only one type byte per suite.
AESOCB = 0x10
CHPO = 0x20
BLAKE3AESOCB = 0x30
BLAKE3CHPO = 0x40
BLAKE3AUTHENTICATED = 0x50

View file

@ -121,6 +121,11 @@ class UnsupportedKeyFormatError(Error):
exit_mcode = 49
# map the user-facing key location names ("borg repo-create --key-location", "borg key change-location")
# to the internal KeyBlobStorage values. Note "repokey" != KeyBlobStorage.REPO's string value.
KEY_LOCATIONS = {"keyfile": KeyBlobStorage.KEYFILE, "repokey": KeyBlobStorage.REPO}
def key_creator(repository, args, *, other_key=None):
for key in AVAILABLE_KEY_TYPES:
if key.ARG_NAME == args.encryption:
@ -135,15 +140,14 @@ def key_argument_names():
def identify_key(manifest_data):
# the key-type byte only identifies the crypto suite (id hash, MAC, cipher), NOT where the key is
# stored: keyfile and repokey share one class now and accept both historic type bytes. The legacy
# PASSPHRASE byte (0x01) is part of AESCTRKey.TYPES_ACCEPTABLE. All TYPES_ACCEPTABLE sets are disjoint.
key_type = manifest_data[0]
if key_type == KeyType.PASSPHRASE: # legacy, see comment in KeyType class.
return RepoKey
for key in LEGACY_KEY_TYPES + AVAILABLE_KEY_TYPES:
if key.TYPE == key_type:
if key_type in key.TYPES_ACCEPTABLE:
return key
else:
raise UnsupportedPayloadError(key_type)
raise UnsupportedPayloadError(key_type)
def key_factory(repository, manifest_chunk, *, other=False, ro_cls=RepoObj):
@ -164,16 +168,10 @@ def uses_same_id_hash(other_key, key):
# avoid breaking the deduplication by changing the id hash
old_sha256_ids = (PlaintextKey,)
new_sha256_ids = (PlaintextKey,)
old_hmac_sha256_ids = (RepoKey, KeyfileKey, AuthenticatedKey)
new_hmac_sha256_ids = (AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey, AuthenticatedKey)
old_hmac_sha256_ids = (AESCTRKey, AuthenticatedKey)
new_hmac_sha256_ids = (AESOCBKey, CHPOKey, AuthenticatedKey)
# note: we do not support blake2b for new repos, see #8867
new_blake3_ids = (
Blake3AESOCBRepoKey,
Blake3AESOCBKeyfileKey,
Blake3CHPORepoKey,
Blake3CHPOKeyfileKey,
Blake3AuthenticatedKey,
)
new_blake3_ids = (Blake3AESOCBKey, Blake3CHPOKey, Blake3AuthenticatedKey)
same_ids = (
isinstance(other_key, old_hmac_sha256_ids + new_hmac_sha256_ids)
and isinstance(key, new_hmac_sha256_ids)
@ -197,9 +195,14 @@ class KeyBase:
# Name used in command line / API (e.g. borg init --encryption=...)
ARG_NAME = "UNDEFINED"
# Storage type (no key blob storage / keyfile / repo)
# Storage type (no key blob storage / keyfile / repo). This is only a default seed for the
# per-instance self.storage; keyfile vs repokey is a property of an individual key, not the class.
STORAGE: ClassVar[str] = KeyBlobStorage.NO_STORAGE
# Whether a key of this class may be stored as a keyfile or as a repokey (configurable at
# repo creation via --key-location and changeable later via "borg key change-location").
LOCATION_CONFIGURABLE = False
# Seed for the buzhash chunker (borg.algorithms.chunker.Chunker)
# type is int
chunk_seed: int = None
@ -223,6 +226,9 @@ class KeyBase:
self.TYPE_STR = bytes([self.TYPE])
self.repository = repository
self.target = None # key location file path / repo obj
# where this particular key is/will be stored (keyfile or repo); seeded from the class default,
# overwritten when a key is loaded (see FlexiKey._try_key) or created (see --key-location).
self.storage = self.STORAGE
self.copy_crypt_key = False
def id_hash(self, data):
@ -546,7 +552,7 @@ class FlexiKey:
# (for repokey) delete the previously-loaded borg key (keyfile mode auto-erases it in save()).
old_id = self._loaded_key_id
self.save(self.target, passphrase, algorithm=self._encrypted_key_algorithm, label=self._loaded_label)
if self.STORAGE == KeyBlobStorage.REPO and old_id and hasattr(self.repository, "delete_key"):
if self.storage == KeyBlobStorage.REPO and old_id and hasattr(self.repository, "delete_key"):
if self._loaded_key_id != old_id:
self.repository.delete_key(old_id)
@ -554,6 +560,9 @@ class FlexiKey:
def create(cls, repository, args, *, other_key=None):
key = cls(repository)
key.repository_id = repository.id
if cls.LOCATION_CONFIGURABLE:
# choose initial storage (keyfile or repokey) from --key-location (default: repokey).
key.storage = KEY_LOCATIONS.get(getattr(args, "key_location", None), cls.STORAGE)
if other_key is not None:
if isinstance(other_key, PlaintextKey):
raise Error("Copying key material from an unencrypted repository is not possible.")
@ -605,23 +614,18 @@ class FlexiKey:
return filename
def find_key(self):
if self.STORAGE == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return self.sanity_check(keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir())
elif self.STORAGE == KeyBlobStorage.REPO:
loc = self.repository._location.canonical_path()
key = self.repository.load_key()
if not key:
# if we got an empty key, it means there is no key.
raise RepoKeyNotFoundError(loc) from None
# storage-agnostic: report the location of any existing borg key for this repo, preferring a
# keyfile (checked first) over a repokey. Used for the passphrase prompt and for logging.
env_keyfile = self._find_key_file_from_environment()
if env_keyfile is not None:
return self.sanity_check(env_keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
loc = self.repository._location.canonical_path()
if self.repository.load_key():
return loc
else:
raise TypeError("Unsupported borg key storage type")
raise RepoKeyNotFoundError(loc) from None
def get_existing_or_new_target(self, args):
keyfile = self._find_key_file_from_environment()
@ -659,12 +663,12 @@ class FlexiKey:
return found
def get_new_target(self, args):
if self.STORAGE == KeyBlobStorage.KEYFILE:
if self.storage == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return get_keys_dir()
elif self.STORAGE == KeyBlobStorage.REPO:
elif self.storage == KeyBlobStorage.REPO:
return self.repository
else:
raise TypeError("Unsupported borg key storage type")
@ -703,12 +707,9 @@ class FlexiKey:
def _iter_keys(self):
# return [(key_id, blob_text, keyfile_path_or_None)] for all borg keys of this repo.
if self.STORAGE == KeyBlobStorage.KEYFILE:
return self._keyfile_candidates()
elif self.STORAGE == KeyBlobStorage.REPO:
return self._repo_candidates()
else:
raise TypeError("Unsupported borg key storage type")
# storage-agnostic: we look at keyfiles first and repokeys afterwards, regardless of the
# manifest key-type byte. The first key a passphrase unlocks wins (see load_any).
return self._keyfile_candidates() + self._repo_candidates()
def _key_envelope(self, blob_text):
# decode the (unencrypted) EncryptedKey envelope of a borg key without decrypting it.
@ -738,7 +739,14 @@ class FlexiKey:
logger.debug("Borg key %s could not be loaded (corrupted?), skipping it: %s", key_id[:12], exc)
return False
if loaded:
self.target = keyfile_path if self.STORAGE == KeyBlobStorage.KEYFILE else self.repository
# remember where this particular key actually lives (keyfile vs repokey), independent of
# the manifest key-type byte, so save/remove/list operate on the right storage afterwards.
self.storage = KeyBlobStorage.KEYFILE if keyfile_path is not None else KeyBlobStorage.REPO
self.target = keyfile_path if self.storage == KeyBlobStorage.KEYFILE else self.repository
if self.storage == KeyBlobStorage.REPO:
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != "" # nosec B105
self._loaded_key_id = key_id
self._loaded_label = self._encrypted_key_label
return True
@ -746,10 +754,6 @@ class FlexiKey:
def load_any(self, passphrase):
"""Try the passphrase against every borg key of this repository."""
if self.STORAGE == KeyBlobStorage.REPO:
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != "" # nosec B105
for key_id, blob_text, keyfile_path in self._iter_keys():
if self._try_key(key_id, blob_text, keyfile_path, passphrase):
return True
@ -758,23 +762,21 @@ class FlexiKey:
def load(self, target, passphrase):
# load a specific borg key: for keyfiles, the explicit file given as target; for repokey,
# any of the repository's borg keys (which are addressed by passphrase, not by target).
if self.STORAGE == KeyBlobStorage.KEYFILE:
if self.storage == KeyBlobStorage.KEYFILE:
try:
with open(target, "rb") as fd:
blob = fd.read()
except OSError:
return False
return self._try_key(sha256(blob).hexdigest(), blob.decode("utf-8"), str(target), passphrase)
elif self.STORAGE == KeyBlobStorage.REPO:
return self.load_any(passphrase)
else:
raise TypeError("Unsupported borg key storage type")
return self.load_any(passphrase)
def save(self, target, passphrase, algorithm, create=False, label=None, replace=True):
# replace=True replaces the previously-loaded borg key (change-passphrase semantics);
# replace=False adds an additional borg key, keeping the existing ones (key add).
key_data = self._save(passphrase, algorithm, label=label)
if self.STORAGE == KeyBlobStorage.KEYFILE:
if self.storage == KeyBlobStorage.KEYFILE:
old_target = getattr(self, "target", None)
keys_dir = get_keys_dir()
keyfile_data = keyfile_format(bin_to_hex(self.repository_id), key_data)
@ -801,7 +803,7 @@ class FlexiKey:
except OSError as exc:
logger.debug('Could not remove previous keyfile "%s": %s', old_target, exc)
self._loaded_key_id = sha256(keyfile_data.encode()).hexdigest()
elif self.STORAGE == KeyBlobStorage.REPO:
elif self.storage == KeyBlobStorage.REPO:
self.logically_encrypted = passphrase != "" # nosec B105
key_data = keyfile_format(bin_to_hex(self.repository_id), key_data)
key_data = key_data.encode("utf-8") # remote repo: msgpack issue #99, giving bytes
@ -814,13 +816,15 @@ class FlexiKey:
self._loaded_key_id = sha256(key_data).hexdigest()
else:
raise TypeError("Unsupported borg key storage type")
self.target = target if self.STORAGE != KeyBlobStorage.REPO else self.repository
self.target = target if self.storage != KeyBlobStorage.REPO else self.repository
self._loaded_label = label
def remove(self, target):
if self.STORAGE == KeyBlobStorage.KEYFILE:
os.remove(target) # the keyfile of the borg key we unlocked; other borg keys are separate files
elif self.STORAGE == KeyBlobStorage.REPO:
if self.storage == KeyBlobStorage.KEYFILE:
# the keyfile of the borg key we unlocked; other borg keys are separate files.
# overwrite it with random data before unlinking (same as save() does for old keyfiles).
secure_erase(target, avoid_collateral_damage=True)
elif self.storage == KeyBlobStorage.REPO:
# remove only the borg key we unlocked, leaving the repository's other borg keys alone.
if hasattr(target, "delete_key") and self._loaded_key_id:
target.delete_key(self._loaded_key_id)
@ -831,9 +835,11 @@ class FlexiKey:
def list_keys(self):
"""Return metadata for all borg keys of this repository (no decryption)."""
mode = "keyfile" if self.STORAGE == KeyBlobStorage.KEYFILE else "repokey"
result = []
for key_id, blob_text, keyfile_path in self._iter_keys():
# storage is a per-key property now, so report each key's actual mode (a repository may
# hold a mix of keyfile- and repo-stored borg keys).
mode = "keyfile" if keyfile_path is not None else "repokey"
try:
env = self._key_envelope(blob_text)
label, algorithm = env.get("label"), env.get("algorithm")
@ -854,9 +860,9 @@ class FlexiKey:
def add_key(self, passphrase=None, label=None):
"""Add an additional borg key protecting the same key material with a new passphrase."""
if self.STORAGE == KeyBlobStorage.REPO and not hasattr(self.repository, "store_key"):
if self.storage == KeyBlobStorage.REPO and not hasattr(self.repository, "store_key"):
raise Error("This repository type does not support multiple borg keys.")
if self.STORAGE == KeyBlobStorage.KEYFILE and os.environ.get("BORG_KEY_FILE"):
if self.storage == KeyBlobStorage.KEYFILE and os.environ.get("BORG_KEY_FILE"):
raise Error(
"Cannot add a borg key while BORG_KEY_FILE points to a single keyfile; "
"unset it so the new keyfile can be stored in the keys directory."
@ -889,12 +895,11 @@ class FlexiKey:
victim = matches[0]
if victim["label"] == ADMIN_LABEL:
raise Error('The "%s" borg key is protected and cannot be removed.' % ADMIN_LABEL)
if self.STORAGE == KeyBlobStorage.REPO:
# remove from the victim's own storage (which may differ from the unlocked key's storage)
if victim["mode"] == "repokey":
self.repository.delete_key(victim["id"])
elif self.STORAGE == KeyBlobStorage.KEYFILE:
os.remove(victim["path"])
else:
raise TypeError("Unsupported borg key storage type")
secure_erase(victim["path"], avoid_collateral_damage=True) # overwrite the keyfile before unlinking
return victim
@ -944,8 +949,8 @@ class AuthenticatedKeyBase(AESKeyBase, FlexiKey):
# legacy imports placed after FlexiKey/AESKeyBase/KeyBase/AuthenticatedKeyBase so those names are already
# in the partial module when legacy/crypto/key.py imports them back during circular load
from ..legacy.crypto.key import KeyfileKey, RepoKey
from ..legacy.crypto.key import Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey # noqa: F401
from ..legacy.crypto.key import AESCTRKey, Blake2AESCTRKey # noqa: F401
from ..legacy.crypto.key import Blake2AuthenticatedKey # noqa: F401
from ..legacy.crypto.key import LEGACY_KEY_TYPES # noqa: E402
from ..legacy.crypto.key import ID_BLAKE2b_256 # noqa: F401
@ -987,7 +992,7 @@ class AEADKeyBase(KeyBase):
Offsets:0 1 2 8 32 48 [bytes]
suite: 1010b for new AEAD crypto, 0000b is old crypto
keytype: see constants.KeyType (suite+keytype)
keytype: always 0 for new crypto (the key storage location is no longer encoded in the type byte)
reserved: all-zero, for future use
messageIV: a counter starting from 0 for all new encrypted messages of one session
sessionID: 192bit random, computed once per session (the session key is derived from this)
@ -1003,6 +1008,11 @@ class AEADKeyBase(KeyBase):
MAX_IV = 2**48 - 1
# default storage; an individual key's actual storage is tracked per-instance in self.storage.
STORAGE = KeyBlobStorage.REPO
# an AEAD key may be stored as a keyfile or inside the repository (see borg key change-location).
LOCATION_CONFIGURABLE = True
def assert_id(self, id, data):
# Comparing the id hash here would not be needed any more for the new AEAD crypto **IF** we
# could be sure that chunks were created by normal (not tampered, not evil) borg code:
@ -1087,75 +1097,41 @@ class AEADKeyBase(KeyBase):
self.cipher = self._get_cipher(self.sessionid, iv=0)
class AESOCBKeyfileKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.AESOCBKEYFILE, KeyType.AESOCBREPO}
TYPE = KeyType.AESOCBKEYFILE
NAME = "key file AES-OCB"
ARG_NAME = "keyfile-aes-ocb"
STORAGE = KeyBlobStorage.KEYFILE
# Each of these is one unified key class per crypto suite. A key of this class may be stored either as
# a keyfile or inside the repository (repokey) - that is a per-key storage property (self.storage), not
# a class distinction. The class is selected from the manifest's key-type byte (see identify_key), which
# only encodes the crypto suite (there is exactly one type byte per suite now).
class AESOCBKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPE = KeyType.AESOCB
TYPES_ACCEPTABLE = {TYPE}
NAME = "AES-OCB"
ARG_NAME = "aes-ocb"
CIPHERSUITE = AES256_OCB
class AESOCBRepoKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.AESOCBKEYFILE, KeyType.AESOCBREPO}
TYPE = KeyType.AESOCBREPO
NAME = "repokey AES-OCB"
ARG_NAME = "repokey-aes-ocb"
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_OCB
class CHPOKeyfileKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.CHPOKEYFILE, KeyType.CHPOREPO}
TYPE = KeyType.CHPOKEYFILE
NAME = "key file ChaCha20-Poly1305"
ARG_NAME = "keyfile-chacha20-poly1305"
STORAGE = KeyBlobStorage.KEYFILE
class CHPOKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPE = KeyType.CHPO
TYPES_ACCEPTABLE = {TYPE}
NAME = "ChaCha20-Poly1305"
ARG_NAME = "chacha20-poly1305"
CIPHERSUITE = CHACHA20_POLY1305
class CHPORepoKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.CHPOKEYFILE, KeyType.CHPOREPO}
TYPE = KeyType.CHPOREPO
NAME = "repokey ChaCha20-Poly1305"
ARG_NAME = "repokey-chacha20-poly1305"
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = CHACHA20_POLY1305
class Blake3AESOCBKeyfileKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE3AESOCBKEYFILE, KeyType.BLAKE3AESOCBREPO}
TYPE = KeyType.BLAKE3AESOCBKEYFILE
NAME = "key file BLAKE3 AES-OCB"
ARG_NAME = "keyfile-blake3-aes-ocb"
STORAGE = KeyBlobStorage.KEYFILE
class Blake3AESOCBKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPE = KeyType.BLAKE3AESOCB
TYPES_ACCEPTABLE = {TYPE}
NAME = "BLAKE3 AES-OCB"
ARG_NAME = "blake3-aes-ocb"
CIPHERSUITE = AES256_OCB
class Blake3AESOCBRepoKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE3AESOCBKEYFILE, KeyType.BLAKE3AESOCBREPO}
TYPE = KeyType.BLAKE3AESOCBREPO
NAME = "repokey BLAKE3 AES-OCB"
ARG_NAME = "repokey-blake3-aes-ocb"
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_OCB
class Blake3CHPOKeyfileKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE3CHPOKEYFILE, KeyType.BLAKE3CHPOREPO}
TYPE = KeyType.BLAKE3CHPOKEYFILE
NAME = "key file BLAKE3 ChaCha20-Poly1305"
ARG_NAME = "keyfile-blake3-chacha20-poly1305"
STORAGE = KeyBlobStorage.KEYFILE
CIPHERSUITE = CHACHA20_POLY1305
class Blake3CHPORepoKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE3CHPOKEYFILE, KeyType.BLAKE3CHPOREPO}
TYPE = KeyType.BLAKE3CHPOREPO
NAME = "repokey BLAKE3 ChaCha20-Poly1305"
ARG_NAME = "repokey-blake3-chacha20-poly1305"
STORAGE = KeyBlobStorage.REPO
class Blake3CHPOKey(ID_BLAKE3_256, AEADKeyBase, FlexiKey):
TYPE = KeyType.BLAKE3CHPO
TYPES_ACCEPTABLE = {TYPE}
NAME = "BLAKE3 ChaCha20-Poly1305"
ARG_NAME = "blake3-chacha20-poly1305"
CIPHERSUITE = CHACHA20_POLY1305
@ -1166,12 +1142,8 @@ AVAILABLE_KEY_TYPES = (
AuthenticatedKey,
# new crypto
Blake3AuthenticatedKey,
AESOCBKeyfileKey,
AESOCBRepoKey,
CHPOKeyfileKey,
CHPORepoKey,
Blake3AESOCBKeyfileKey,
Blake3AESOCBRepoKey,
Blake3CHPOKeyfileKey,
Blake3CHPORepoKey,
AESOCBKey,
CHPOKey,
Blake3AESOCBKey,
Blake3CHPOKey,
)

View file

@ -9,7 +9,7 @@ from ..repoobj import RepoObj
from .key import keyfile_format, keyfile_parse, is_keyfile
from .key import RepoKeyNotFoundError, KeyBlobStorage, identify_key, keyfile_name_for
from .key import RepoKeyNotFoundError, KeyBlobStorage, KEY_LOCATIONS, identify_key, keyfile_name_for
class NotABorgKeyFile(Error):
@ -103,17 +103,19 @@ class KeyManager:
self.loaded_label = selected["label"]
def store_keyblob(self, args):
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
from .key import CHPOKeyfileKey
# storage location for the imported key: --key-location wins, else the class default.
storage = KEY_LOCATIONS.get(getattr(args, "key_location", None), self.keyblob_storage)
if storage == KeyBlobStorage.KEYFILE:
from .key import CHPOKey
k = CHPOKeyfileKey(self.repository)
k = CHPOKey(self.repository)
target = k.get_existing_or_new_target(args)
keyfile_data = self.get_keyfile_data()
if not os.environ.get("BORG_KEY_FILE") and os.path.samefile(target, get_keys_dir()):
target = os.path.join(target, keyfile_name_for(keyfile_data.encode()))
with dash_open(target, "w") as fd:
fd.write(keyfile_data)
elif self.keyblob_storage == KeyBlobStorage.REPO:
elif storage == KeyBlobStorage.REPO:
key_data = keyfile_format(bin_to_hex(self.repository.id), self.keyblob.strip())
self.repository.save_key(key_data.encode("utf-8"))

View file

@ -92,40 +92,30 @@ class Blake2AuthenticatedKey(ID_BLAKE2b_256, AuthenticatedKeyBase): # type: ign
ARG_NAME = "authenticated-blake2"
class KeyfileKey(Pbkdf2FileMixin, ID_HMAC_SHA_256, AESKeyBase, FlexiKey): # type: ignore[misc]
# borg 1.x AES-CTR keys. keyfile and repokey are no longer separate classes - storage is a per-key
# property (self.storage), tracked when the key is loaded. These classes are read-only (borg 2 only
# reads borg 1.x repos, e.g. via borg transfer; it never creates them), so the canonical TYPE byte is
# never written - only TYPES_ACCEPTABLE matters, and it keeps the historic keyfile/repokey/passphrase bytes.
class AESCTRKey(Pbkdf2FileMixin, ID_HMAC_SHA_256, AESKeyBase, FlexiKey): # type: ignore[misc]
TYPES_ACCEPTABLE = {KeyType.KEYFILE, KeyType.REPO, KeyType.PASSPHRASE}
TYPE = KeyType.KEYFILE
NAME = "key file"
ARG_NAME = "keyfile"
STORAGE = KeyBlobStorage.KEYFILE
NAME = "AES-CTR HMAC-SHA256"
ARG_NAME = None # not creatable: borg 1.x compatibility (read-only)
STORAGE = KeyBlobStorage.REPO # seed default; actual per-key storage is tracked in self.storage on load
LOCATION_CONFIGURABLE = True # borg 1.x had keyfile and repokey variants
CIPHERSUITE = AES256_CTR_HMAC_SHA256
class RepoKey(Pbkdf2FileMixin, ID_HMAC_SHA_256, AESKeyBase, FlexiKey): # type: ignore[misc]
TYPES_ACCEPTABLE = {KeyType.KEYFILE, KeyType.REPO, KeyType.PASSPHRASE}
TYPE = KeyType.REPO
NAME = "repokey"
ARG_NAME = "repokey"
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_CTR_HMAC_SHA256
class Blake2KeyfileKey(Pbkdf2FileMixin, ID_BLAKE2b_256, AESKeyBase, FlexiKey): # type: ignore[misc]
class Blake2AESCTRKey(Pbkdf2FileMixin, ID_BLAKE2b_256, AESKeyBase, FlexiKey): # type: ignore[misc]
TYPES_ACCEPTABLE = {KeyType.BLAKE2KEYFILE, KeyType.BLAKE2REPO}
TYPE = KeyType.BLAKE2KEYFILE
NAME = "key file BLAKE2b"
ARG_NAME = "keyfile-blake2"
STORAGE = KeyBlobStorage.KEYFILE
NAME = "AES-CTR BLAKE2b"
ARG_NAME = None # not creatable: borg 1.x compatibility (read-only)
STORAGE = KeyBlobStorage.REPO # seed default; actual per-key storage is tracked in self.storage on load
LOCATION_CONFIGURABLE = True # borg 1.x had keyfile and repokey variants
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2RepoKey(Pbkdf2FileMixin, ID_BLAKE2b_256, AESKeyBase, FlexiKey): # type: ignore[misc]
TYPES_ACCEPTABLE = {KeyType.BLAKE2KEYFILE, KeyType.BLAKE2REPO}
TYPE = KeyType.BLAKE2REPO
NAME = "repokey BLAKE2b"
ARG_NAME = "repokey-blake2"
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_CTR_BLAKE2b
LEGACY_KEY_TYPES = (KeyfileKey, RepoKey, Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey)
LEGACY_KEY_TYPES = (AESCTRKey, Blake2AESCTRKey, Blake2AuthenticatedKey)

View file

@ -31,8 +31,12 @@ from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_suppor
from ..platform.platform_test import is_win32
from ...xattr import get_all
RK_ENCRYPTION = "--encryption=repokey-aes-ocb"
KF_ENCRYPTION = "--encryption=keyfile-chacha20-poly1305"
# --encryption now selects only the crypto suite; key storage is chosen with --key-location
# (default: repokey). RK_* stays a single token (repokey is the default); for keyfile storage,
# pass KF_ENCRYPTION together with KF_LOCATION.
RK_ENCRYPTION = "--encryption=aes-ocb"
KF_ENCRYPTION = "--encryption=chacha20-poly1305"
KF_LOCATION = "--key-location=keyfile"
# This points to the ``src/borg/archiver`` directory (small, with only a few files).
# There are quite a lot of files in there, because there is a __pycache__ subdirectory.

View file

@ -356,7 +356,7 @@ def test_extra_chunks(archivers, request):
cmd(archiver, "check", "-v", exit_code=0) # check does not deal with orphans anymore
@pytest.mark.parametrize("init_args", [["--encryption=repokey-aes-ocb"], ["--encryption", "none"]])
@pytest.mark.parametrize("init_args", [["--encryption=aes-ocb"], ["--encryption", "none"]])
def test_verify_data(archivers, request, init_args):
archiver = request.getfixturevalue(archivers)
if archiver.get_kind() != "local":
@ -392,7 +392,7 @@ def test_verify_data(archivers, request, init_args):
assert f"{src_file}: Missing file chunk detected" in output
@pytest.mark.parametrize("init_args", [["--encryption=repokey-aes-ocb"], ["--encryption", "none"]])
@pytest.mark.parametrize("init_args", [["--encryption=aes-ocb"], ["--encryption", "none"]])
def test_corrupted_file_chunk(archivers, request, init_args):
## similar to test_verify_data, but here we let the low level repository-only checks discover the issue.

View file

@ -5,14 +5,23 @@ from hashlib import sha256
import pytest
from ...constants import * # NOQA
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPOKeyfileKey, Passphrase, is_keyfile, keyfile_parse
from ...constants import KeyBlobStorage
from ...crypto.key import AESOCBKey, CHPOKey, Passphrase, is_keyfile, keyfile_parse
from ...crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
from ...helpers import CommandError
from ...helpers import bin_to_hex, hex_to_bin
from ...helpers import msgpack
from ...repository import Repository
from ..crypto.key_test import TestKey
from . import RK_ENCRYPTION, KF_ENCRYPTION, cmd, _extract_repository_id, _set_repository_id, generate_archiver_tests
from . import (
RK_ENCRYPTION,
KF_ENCRYPTION,
KF_LOCATION,
cmd,
_extract_repository_id,
_set_repository_id,
generate_archiver_tests,
)
pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA
@ -34,24 +43,24 @@ def test_change_location_to_keyfile(archivers, request):
assert "(repokey" in log
cmd(archiver, "key", "change-location", "keyfile")
log = cmd(archiver, "repo-info")
assert "(key file" in log
assert "(keyfile" in log
def test_change_location_to_b3keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", "--encryption=repokey-blake3-aes-ocb")
cmd(archiver, "repo-create", "--encryption=blake3-aes-ocb")
log = cmd(archiver, "repo-info")
assert "(repokey BLAKE3" in log
assert "(repokey, BLAKE3" in log
cmd(archiver, "key", "change-location", "keyfile")
log = cmd(archiver, "repo-info")
assert "(key file BLAKE3" in log
assert "(keyfile, BLAKE3" in log
def test_change_location_to_repokey(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
log = cmd(archiver, "repo-info")
assert "(key file" in log
assert "(keyfile" in log
cmd(archiver, "key", "change-location", "repokey")
log = cmd(archiver, "repo-info")
assert "(repokey" in log
@ -59,17 +68,17 @@ def test_change_location_to_repokey(archivers, request):
def test_change_location_to_b3repokey(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", "--encryption=keyfile-blake3-aes-ocb")
cmd(archiver, "repo-create", "--encryption=blake3-aes-ocb", KF_LOCATION)
log = cmd(archiver, "repo-info")
assert "(key file BLAKE3" in log
assert "(keyfile, BLAKE3" in log
cmd(archiver, "key", "change-location", "repokey")
log = cmd(archiver, "repo-info")
assert "(repokey BLAKE3" in log
assert "(repokey, BLAKE3" in log
def test_keyfile_name_is_content_sha256(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
[key_filename] = os.listdir(archiver.keys_path)
key_path = os.path.join(archiver.keys_path, key_filename)
with open(key_path, "rb") as fd:
@ -79,7 +88,7 @@ def test_keyfile_name_is_content_sha256(archivers, request):
def test_change_passphrase_renames_keyfile_to_new_sha256(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
[old_key_filename] = os.listdir(archiver.keys_path)
old_key_path = os.path.join(archiver.keys_path, old_key_filename)
os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase"
@ -99,7 +108,7 @@ def test_borg_key_file_env_keeps_explicit_path(archivers, request, monkeypatch):
archiver = request.getfixturevalue(archivers)
explicit_key_path = os.path.join(archiver.output_path, "explicit-key")
monkeypatch.setenv("BORG_KEY_FILE", explicit_key_path)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
assert os.path.isfile(explicit_key_path)
assert os.listdir(archiver.keys_path) == []
@ -107,7 +116,7 @@ def test_borg_key_file_env_keeps_explicit_path(archivers, request, monkeypatch):
def test_key_export_keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
export_file = archiver.output_path + "/exported"
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
repo_id = _extract_repository_id(archiver.repository_path)
cmd(archiver, "key", "export", export_file)
@ -125,7 +134,7 @@ def test_key_export_keyfile(archivers, request):
os.unlink(key_file)
cmd(archiver, "key", "import", export_file)
cmd(archiver, "key", "import", export_file, "--key-location=keyfile")
with open(key_file) as fd:
key_contents2 = fd.read()
@ -135,7 +144,7 @@ def test_key_export_keyfile(archivers, request):
def test_key_import_keyfile_with_borg_key_file(archivers, request, monkeypatch):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
exported_key_file = os.path.join(archiver.output_path, "exported")
cmd(archiver, "key", "export", exported_key_file)
@ -147,7 +156,7 @@ def test_key_import_keyfile_with_borg_key_file(archivers, request, monkeypatch):
imported_key_file = os.path.join(archiver.output_path, "imported")
monkeypatch.setenv("BORG_KEY_FILE", imported_key_file)
cmd(archiver, "key", "import", exported_key_file)
cmd(archiver, "key", "import", exported_key_file, "--key-location=keyfile")
assert not os.path.isfile(key_file), '"borg key import" should respect BORG_KEY_FILE'
with open(imported_key_file) as fd:
@ -168,10 +177,11 @@ def test_key_export_repokey(archivers, request):
assert is_keyfile(export_contents, bin_to_hex(repo_id))
with Repository(archiver.repository_path) as repository:
repo_key = AESOCBRepoKey(repository)
repo_key = AESOCBKey(repository) # default storage (repokey): load_any finds the repo's key
repo_key.load(None, Passphrase.env_passphrase())
backup_key = AESOCBKeyfileKey(TestKey.MockRepository(id=repo_id))
backup_key = AESOCBKey(TestKey.MockRepository(id=repo_id))
backup_key.storage = KeyBlobStorage.KEYFILE # load explicitly from the exported keyfile
backup_key.load(export_file, Passphrase.env_passphrase())
assert repo_key.crypt_key == backup_key.crypt_key
@ -182,7 +192,7 @@ def test_key_export_repokey(archivers, request):
cmd(archiver, "key", "import", export_file)
with Repository(archiver.repository_path) as repository:
repo_key2 = AESOCBRepoKey(repository)
repo_key2 = AESOCBKey(repository)
repo_key2.load(None, Passphrase.env_passphrase())
assert repo_key2.crypt_key == repo_key.crypt_key
@ -232,7 +242,7 @@ def test_key_export_qr_directory(archivers, request):
def test_key_import_errors(archivers, request):
archiver = request.getfixturevalue(archivers)
export_file = archiver.output_path + "/exported"
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
if archiver.FORK_DEFAULT:
expected_ec = CommandError().exit_code
cmd(archiver, "key", "import", export_file, exit_code=expected_ec)
@ -265,12 +275,12 @@ def test_key_export_paperkey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
export_file = archiver.output_path + "/exported"
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
_set_repository_id(archiver.repository_path, hex_to_bin(repo_id))
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
with open(key_file, "w") as fd:
fd.write(CHPOKeyfileKey.FILE_ID + " " + repo_id + "\n")
fd.write(CHPOKey.FILE_ID + " " + repo_id + "\n")
fd.write(binascii.b2a_base64(b"abcdefghijklmnopqrstu").decode())
cmd(archiver, "key", "export", "--paper", export_file)
@ -293,12 +303,12 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
def test_key_import_paperkey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
_set_repository_id(archiver.repository_path, hex_to_bin(repo_id))
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
with open(key_file, "w") as fd:
fd.write(AESOCBKeyfileKey.FILE_ID + " " + repo_id + "\n")
fd.write(AESOCBKey.FILE_ID + " " + repo_id + "\n")
fd.write(binascii.b2a_base64(b"abcdefghijklmnopqrstu").decode())
typed_input = (
@ -362,7 +372,7 @@ def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request):
def test_change_location_does_not_change_algorithm_argon2(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
cmd(archiver, "key", "change-location", "repokey")
with Repository(archiver.repository_path) as repository:
@ -408,23 +418,27 @@ def _key_id_for_label(archiver, label):
raise AssertionError(f"label {label!r} not found in:\n{out}")
@pytest.mark.parametrize("encryption", [RK_ENCRYPTION, KF_ENCRYPTION])
def test_key_first_key_is_admin(archivers, request, encryption):
# crypto suite + key storage combinations used to parametrize the multi-key tests below.
ENC_ARGS_AND_MODE = [((RK_ENCRYPTION,), "repokey"), ((KF_ENCRYPTION, KF_LOCATION), "keyfile")]
ENC_ARGS = [args for args, _mode in ENC_ARGS_AND_MODE]
@pytest.mark.parametrize("enc_args, mode", ENC_ARGS_AND_MODE)
def test_key_first_key_is_admin(archivers, request, enc_args, mode):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", encryption)
cmd(archiver, "repo-create", *enc_args)
out = cmd(archiver, "key", "list")
assert "admin" in out
mode = "keyfile" if encryption == KF_ENCRYPTION else "repokey"
rows = [ln for ln in out.splitlines() if "argon2" in ln]
assert len(rows) == 1
assert rows[0].lstrip().startswith("*")
assert mode in rows[0]
@pytest.mark.parametrize("encryption", [RK_ENCRYPTION, KF_ENCRYPTION])
def test_key_add(archivers, request, encryption):
@pytest.mark.parametrize("enc_args", ENC_ARGS)
def test_key_add(archivers, request, enc_args):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", encryption) # admin = DEFAULT_PASSPHRASE
cmd(archiver, "repo-create", *enc_args) # admin = DEFAULT_PASSPHRASE
os.environ["BORG_NEW_PASSPHRASE"] = "alicepass"
cmd(archiver, "key", "add", "--label", "alice")
@ -450,10 +464,10 @@ def test_key_add_rejects_duplicate_and_reserved(archivers, request):
_expect_error(archiver, "key", "add", "--label", "admin") # reserved
@pytest.mark.parametrize("encryption", [RK_ENCRYPTION, KF_ENCRYPTION])
def test_key_remove_by_label(archivers, request, encryption):
@pytest.mark.parametrize("enc_args", ENC_ARGS)
def test_key_remove_by_label(archivers, request, enc_args):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", encryption)
cmd(archiver, "repo-create", *enc_args)
os.environ["BORG_NEW_PASSPHRASE"] = "alicepass"
cmd(archiver, "key", "add", "--label", "alice")
@ -577,10 +591,12 @@ def test_key_change_location_keeps_label(archivers, request):
cmd(archiver, "key", "change-location", "keyfile")
out = cmd(archiver, "key", "list") # still unlocked as xxx
rows = [ln for ln in out.splitlines() if "argon2" in ln]
assert len(rows) == 1
assert "keyfile" in rows[0]
assert "xxx" in rows[0] # label preserved, not lost
# the migrated xxx key now lives in a keyfile; admin stays a repokey (mixed storage is allowed now)
xxx_rows = [ln for ln in out.splitlines() if "xxx" in ln]
assert len(xxx_rows) == 1
assert "keyfile" in xxx_rows[0] # storage changed
assert "xxx" in xxx_rows[0] # label preserved, not lost
assert "admin" in out # the other borg key is untouched
def _exported_label(path, repo_id):
@ -649,7 +665,7 @@ def test_key_export_rejects_ambiguous_key_selector(archivers, request):
def _store_corrupted_borg_key(repository_path, repo_id):
"""Store a borg key whose envelope is unparseable but which still passes the
keyfile header / repo-id check (so it is enumerated). Returns its key id."""
header = CHPOKeyfileKey.FILE_ID + " " + bin_to_hex(repo_id) + "\n"
header = CHPOKey.FILE_ID + " " + bin_to_hex(repo_id) + "\n"
body = binascii.b2a_base64(b"this is not a valid key envelope").decode() # valid base64, not msgpack
blob = (header + body).encode("utf-8")
with Repository(repository_path) as repository:

View file

@ -6,7 +6,7 @@ import pytest
from ...helpers.errors import Error, CancelledByUser
from ...constants import * # NOQA
from ...crypto.key import FlexiKey
from . import cmd, generate_archiver_tests, RK_ENCRYPTION, KF_ENCRYPTION
from . import cmd, generate_archiver_tests, RK_ENCRYPTION, KF_ENCRYPTION, KF_LOCATION
pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA
@ -42,11 +42,11 @@ def test_repo_create_refuse_to_overwrite_keyfile(archivers, request, monkeypatch
monkeypatch.setenv("BORG_KEY_FILE", keyfile)
original_location = archiver.repository_location
archiver.repository_location = original_location + "0"
cmd(archiver, "repo-create", KF_ENCRYPTION)
cmd(archiver, "repo-create", KF_ENCRYPTION, KF_LOCATION)
with open(keyfile) as file:
before = file.read()
archiver.repository_location = original_location + "1"
arg = ("repo-create", KF_ENCRYPTION)
arg = ("repo-create", KF_ENCRYPTION, KF_LOCATION)
if archiver.FORK_DEFAULT:
cmd(archiver, *arg, exit_code=2)
else:

View file

@ -27,7 +27,7 @@ def repo_url(request, tmpdir, monkeypatch):
tmpdir.remove(rec=1)
@pytest.fixture(params=["none", "repokey-aes-ocb"])
@pytest.fixture(params=["none", "aes-ocb"])
def repo(request, cmd_fixture, repo_url):
cmd_fixture(f"--repo={repo_url}", "repo-create", "--encryption", request.param)
return repo_url

View file

@ -7,7 +7,7 @@ from .hashindex_test import H
from .crypto.key_test import TestKey
from ..archive import Statistics
from ..cache import AdHocWithFilesCache, FileCacheEntry, delete_chunkindex_cache, read_chunkindex_from_repo_cache
from ..crypto.key import AESOCBRepoKey
from ..crypto.key import AESOCBKey
from ..helpers import safe_ns
from ..helpers.msgpack import int_to_timestamp
from ..manifest import Manifest
@ -25,7 +25,7 @@ class TestAdHocWithFilesCache:
@pytest.fixture
def key(self, repository, monkeypatch):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
key = AESOCBRepoKey.create(repository, TestKey.MockArgs())
key = AESOCBKey.create(repository, TestKey.MockArgs())
return key
@pytest.fixture

View file

@ -9,8 +9,8 @@ from ...crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes
from ...crypto.low_level import hmac_sha256
from ...legacy.crypto.low_level import AES
from hashlib import sha256
from ...crypto.key import CHPOKeyfileKey, AESOCBRepoKey, KeyBase, PlaintextKey
from ...legacy.crypto.key import KeyfileKey as LegacyKeyfileKey
from ...crypto.key import CHPOKey, AESOCBKey, KeyBase, PlaintextKey
from ...legacy.crypto.key import AESCTRKey as LegacyAESCTRKey
from ...helpers import msgpack, bin_to_hex
from .. import BaseTestCase
@ -217,7 +217,7 @@ def test_decrypt_key_file_argon2_chacha20_poly1305():
"data": envelope,
}
)
key = CHPOKeyfileKey(None)
key = CHPOKey(None)
decrypted = key.decrypt_key_file(encrypted, "hello, pass phrase")
@ -228,13 +228,13 @@ def test_decrypt_key_file_pbkdf2_sha256_aes256_ctr_hmac_sha256():
plain = b"hello"
salt = b"salt" * 4
passphrase = "hello, pass phrase"
key = LegacyKeyfileKey.pbkdf2(passphrase, salt, 1, 32)
key = LegacyAESCTRKey.pbkdf2(passphrase, salt, 1, 32)
hash = hmac_sha256(key, plain)
data = AES(key, b"\0" * 16).encrypt(plain)
encrypted = msgpack.packb(
{"version": 1, "algorithm": "sha256", "iterations": 1, "salt": salt, "data": data, "hash": hash}
)
key = LegacyKeyfileKey(None)
key = LegacyAESCTRKey(None)
decrypted = key.decrypt_key_file(encrypted, passphrase)
@ -275,11 +275,11 @@ def test_repo_key_detect_does_not_raise_integrity_error(getpass, monkeypatch):
repository = MagicMock(id=b"repository_id")
getpass.return_value = "hello, pass phrase"
monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm="argon2"))
AESOCBKey.create(repository, args=MagicMock(key_algorithm="argon2"))
saved = repository.store_key.call_args.args[0]
repository.load_keys.return_value = [("key0", saved)]
AESOCBRepoKey.detect(repository, manifest_data=None)
AESOCBKey.detect(repository, manifest_data=None)
class TestDeriveKey(BaseTestCase):

View file

@ -6,10 +6,9 @@ from unittest.mock import MagicMock
import pytest
from ...crypto.key import PlaintextKey, AuthenticatedKey, Blake2AuthenticatedKey, keyfile_parse
from ...crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey
from ...crypto.key import AESCTRKey, Blake2AESCTRKey
from ...crypto.key import AEADKeyBase
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
from ...crypto.key import Blake3AESOCBRepoKey, Blake3AESOCBKeyfileKey, Blake3CHPORepoKey, Blake3CHPOKeyfileKey
from ...crypto.key import AESOCBKey, CHPOKey, Blake3AESOCBKey, Blake3CHPOKey
from ...crypto.key import Blake3AuthenticatedKey
from ...crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256, ID_BLAKE3_256
from ...crypto.key import UnsupportedManifestError, UnsupportedKeyFormatError
@ -26,6 +25,14 @@ class TestKey:
class MockArgs:
location = Location(tempfile.mkstemp()[1])
key_algorithm = "argon2"
key_location = "repokey" # default storage; tests that want a keyfile use kf_args() below
@classmethod
def kf_args(cls):
# like MockArgs(), but selects keyfile storage (the unified key classes default to repokey).
args = cls.MockArgs()
args.key_location = "keyfile"
return args
keyfile2_key_file = """
BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000
@ -75,25 +82,21 @@ class TestKey:
@pytest.fixture(
params=(
# keyfile and repokey are no longer separate classes (storage is a per-key property),
# so each crypto suite appears once here.
# not encrypted
PlaintextKey,
AuthenticatedKey,
Blake3AuthenticatedKey,
# legacy crypto
KeyfileKey,
Blake2KeyfileKey,
RepoKey,
Blake2RepoKey,
AESCTRKey,
Blake2AESCTRKey,
Blake2AuthenticatedKey,
# new crypto
AESOCBKeyfileKey,
AESOCBRepoKey,
Blake3AESOCBKeyfileKey,
Blake3AESOCBRepoKey,
CHPOKeyfileKey,
CHPORepoKey,
Blake3CHPOKeyfileKey,
Blake3CHPORepoKey,
AESOCBKey,
Blake3AESOCBKey,
CHPOKey,
Blake3CHPOKey,
)
)
def key(self, request, monkeypatch):
@ -118,7 +121,9 @@ class TestKey:
self.key_data = data
def load_key(self):
return self.key_data
# mirror a real repository: no repokey stored yet -> empty bytes (not an error). Detection
# is storage-agnostic now and always probes repo candidates, even for keyfile keys.
return getattr(self, "key_data", b"")
def test_plaintext(self):
key = PlaintextKey.create(None, None)
@ -129,7 +134,7 @@ class TestKey:
def test_keyfile(self, monkeypatch, keys_dir):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
key = AESCTRKey.create(self.MockRepository(), self.kf_args())
assert key.cipher.next_iv() == 0
chunk = b"ABC"
id = key.id_hash(chunk)
@ -140,8 +145,8 @@ class TestKey:
assert key.decrypt(id, manifest) == key.decrypt(id, manifest2)
assert key.cipher.extract_iv(manifest2) == 1
iv = key.cipher.extract_iv(manifest)
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
key2 = AESCTRKey.detect(self.MockRepository(), manifest)
assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - AESCTRKey.PAYLOAD_OVERHEAD)
# Key data sanity check
assert len({key2.id_key, key2.crypt_key}) == 2
assert key2.chunk_seed != 0
@ -154,22 +159,22 @@ class TestKey:
monkeypatch.setenv("BORG_KEY_FILE", str(keyfile))
monkeypatch.setenv("BORG_PASSPHRASE", "testkf")
assert not keyfile.exists()
key = CHPOKeyfileKey.create(self.MockRepository(), self.MockArgs())
key = CHPOKey.create(self.MockRepository(), self.kf_args())
assert keyfile.exists()
chunk = b"ABC"
chunk_id = key.id_hash(chunk)
chunk_cdata = key.encrypt(chunk_id, chunk)
key = CHPOKeyfileKey.detect(self.MockRepository(), chunk_cdata)
key = CHPOKey.detect(self.MockRepository(), chunk_cdata)
assert chunk == key.decrypt(chunk_id, chunk_cdata)
keyfile.remove()
with pytest.raises(FileNotFoundError):
CHPOKeyfileKey.detect(self.MockRepository(), chunk_cdata)
CHPOKey.detect(self.MockRepository(), chunk_cdata)
def test_keyfile2(self, monkeypatch, keys_dir):
with keys_dir.join("keyfile").open("w") as fd:
fd.write(self.keyfile2_key_file)
monkeypatch.setenv("BORG_PASSPHRASE", "passphrase")
key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
key = AESCTRKey.detect(self.MockRepository(), self.keyfile2_cdata)
assert key.decrypt(self.keyfile2_id, self.keyfile2_cdata) == b"payload"
def test_keyfile2_kfenv(self, tmpdir, monkeypatch):
@ -178,23 +183,23 @@ class TestKey:
fd.write(self.keyfile2_key_file)
monkeypatch.setenv("BORG_KEY_FILE", str(keyfile))
monkeypatch.setenv("BORG_PASSPHRASE", "passphrase")
key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
key = AESCTRKey.detect(self.MockRepository(), self.keyfile2_cdata)
assert key.decrypt(self.keyfile2_id, self.keyfile2_cdata) == b"payload"
def test_keyfile_blake2(self, monkeypatch, keys_dir):
with keys_dir.join("keyfile").open("w") as fd:
fd.write(self.keyfile_blake2_key_file)
monkeypatch.setenv("BORG_PASSPHRASE", "passphrase")
key = Blake2KeyfileKey.detect(self.MockRepository(), self.keyfile_blake2_cdata)
key = Blake2AESCTRKey.detect(self.MockRepository(), self.keyfile_blake2_cdata)
assert key.decrypt(self.keyfile_blake2_id, self.keyfile_blake2_cdata) == b"payload"
def test_legacy_named_keyfile_still_loads(self, monkeypatch, keys_dir):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
key = CHPOKeyfileKey.create(self.MockRepository(), self.MockArgs())
key = CHPOKey.create(self.MockRepository(), self.kf_args())
hashed_keyfile = key.target
legacy_keyfile = str(keys_dir.join("legacy-name"))
os.replace(hashed_keyfile, legacy_keyfile)
key2 = CHPOKeyfileKey.detect(self.MockRepository(), key.encrypt(b"", b"payload"))
key2 = CHPOKey.detect(self.MockRepository(), key.encrypt(b"", b"payload"))
assert key2.target == legacy_keyfile
def _corrupt_byte(self, key, data, offset):
@ -209,7 +214,7 @@ class TestKey:
with keys_dir.join("keyfile").open("w") as fd:
fd.write(self.keyfile2_key_file)
monkeypatch.setenv("BORG_PASSPHRASE", "passphrase")
key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
key = AESCTRKey.detect(self.MockRepository(), self.keyfile2_cdata)
data = self.keyfile2_cdata
for i in range(len(data)):
@ -284,7 +289,7 @@ class TestTAM:
@pytest.fixture
def key(self, monkeypatch):
monkeypatch.setenv("BORG_PASSPHRASE", "test")
return CHPOKeyfileKey.create(TestKey.MockRepository(), TestKey.MockArgs())
return CHPOKey.create(TestKey.MockRepository(), TestKey.MockArgs())
def test_unpack_future(self, key):
blob = b"\xc1\xc1\xc1\xc1foobar"
@ -312,7 +317,7 @@ class TestTAM:
def test_decrypt_key_file_unsupported_algorithm():
"""We will add more algorithms in the future. We should raise a helpful error."""
key = CHPOKeyfileKey(None)
key = CHPOKey(None)
encrypted = msgpack.packb({"algorithm": "THIS ALGORITHM IS NOT SUPPORTED", "version": 1})
with pytest.raises(UnsupportedKeyFormatError):
@ -321,7 +326,7 @@ def test_decrypt_key_file_unsupported_algorithm():
def test_decrypt_key_file_v2_is_unsupported():
"""There may eventually be a version 2 of the format. For now we should raise a helpful error."""
key = CHPOKeyfileKey(None)
key = CHPOKey(None)
encrypted = msgpack.packb({"version": 2})
with pytest.raises(UnsupportedKeyFormatError):
@ -336,10 +341,10 @@ def test_key_file_roundtrip(monkeypatch):
repository = MagicMock(id=b"repository_id")
monkeypatch.setenv("BORG_PASSPHRASE", "hello, pass phrase")
save_me = AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm="argon2"))
save_me = AESOCBKey.create(repository, args=MagicMock(key_algorithm="argon2"))
saved = repository.store_key.call_args.args[0]
repository.load_keys.return_value = [("key0", saved)]
load_me = AESOCBRepoKey.detect(repository, manifest_data=None)
load_me = AESOCBKey.detect(repository, manifest_data=None)
assert to_dict(load_me) == to_dict(save_me)
_, saved_b64 = keyfile_parse(saved)
@ -351,7 +356,7 @@ def test_argon2_wrong_passphrase_returns_none(monkeypatch):
# decrypt_key_file signals this by returning None, not by raising (refs #8036)
repository = MagicMock(id=b"repository_id")
monkeypatch.setenv("BORG_PASSPHRASE", "correct passphrase")
key = AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm="argon2"))
key = AESOCBKey.create(repository, args=MagicMock(key_algorithm="argon2"))
saved = repository.store_key.call_args.args[0]
_, saved_b64 = keyfile_parse(saved)
assert key.decrypt_key_file(a2b_base64(saved_b64), "wrong passphrase") is None

View file

@ -4,7 +4,7 @@ import pytest
from ...crypto.key import UnsupportedKeyFormatError
from ...helpers import msgpack
from ...legacy.crypto.key import KeyfileKey as LegacyKeyfileKey
from ...legacy.crypto.key import AESCTRKey as LegacyAESCTRKey
# ── Pbkdf2FileMixin ───────────────────────────────────────────────────────────
@ -13,7 +13,7 @@ from ...legacy.crypto.key import KeyfileKey as LegacyKeyfileKey
def test_pbkdf2_encrypt_decrypt_roundtrip():
# encrypt_key_file dispatches to encrypt_key_file_pbkdf2; decrypt_key_file
# dispatches back — the round-trip must recover the original plaintext
key_obj = LegacyKeyfileKey(None)
key_obj = LegacyAESCTRKey(None)
plaintext = b"secret key material"
blob = key_obj.encrypt_key_file(plaintext, "correct passphrase", "sha256")
assert key_obj.decrypt_key_file(blob, "correct passphrase") == plaintext
@ -22,7 +22,7 @@ def test_pbkdf2_encrypt_decrypt_roundtrip():
def test_pbkdf2_wrong_passphrase_returns_none():
# a wrong passphrase derives a different key, so the HMAC check fails;
# decrypt_key_file signals this by returning None, not by raising
key_obj = LegacyKeyfileKey(None)
key_obj = LegacyAESCTRKey(None)
blob = key_obj.encrypt_key_file(b"secret key material", "correct passphrase", "sha256")
assert key_obj.decrypt_key_file(blob, "wrong passphrase") is None
@ -31,4 +31,4 @@ def test_pbkdf2_unsupported_version_raises():
# only version 1 is defined in the borg 1.x format; anything else must raise
blob = msgpack.packb({"version": 99})
with pytest.raises(UnsupportedKeyFormatError):
LegacyKeyfileKey(None).decrypt_key_file(blob, "pass")
LegacyAESCTRKey(None).decrypt_key_file(blob, "pass")