mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 09:59:19 -04:00
Merge pull request #1888 from enkore/f/secdir
Rename BORG_NONCES_DIR to BORG_SECURITY_DIR and then some
This commit is contained in:
commit
734f8a9417
9 changed files with 226 additions and 56 deletions
|
|
@ -189,9 +189,10 @@ Directories and files:
|
|||
Default to '~/.config/borg/keys'. This directory contains keys for encrypted repositories.
|
||||
BORG_KEY_FILE
|
||||
When set, use the given filename as repository key file.
|
||||
BORG_NONCES_DIR
|
||||
Default to '~/.config/borg/key-nonces'. This directory contains information borg uses to
|
||||
track its usage of NONCES ("numbers used once" - usually in encryption context).
|
||||
BORG_SECURITY_DIR
|
||||
Default to '~/.config/borg/security'. This directory contains information borg uses to
|
||||
track its usage of NONCES ("numbers used once" - usually in encryption context) and other
|
||||
security relevant data.
|
||||
BORG_CACHE_DIR
|
||||
Default to '~/.cache/borg'. This directory contains the local cache and might need a lot
|
||||
of space for dealing with big repositories).
|
||||
|
|
|
|||
|
|
@ -970,6 +970,7 @@ class Archiver:
|
|||
if key.NAME.startswith('key file'):
|
||||
print('Key file: %s' % key.find_key())
|
||||
print('Cache: %s' % cache.path)
|
||||
print('Security dir: %s' % cache.security_manager.dir)
|
||||
print(DASHES)
|
||||
print(STATS_HEADER)
|
||||
print(str(cache))
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ from .constants import CACHE_README
|
|||
from .hashindex import ChunkIndex, ChunkIndexEntry
|
||||
from .helpers import Location
|
||||
from .helpers import Error
|
||||
from .helpers import get_cache_dir
|
||||
from .helpers import get_cache_dir, get_security_dir
|
||||
from .helpers import decode_dict, int_to_bigint, bigint_to_int, bin_to_hex
|
||||
from .helpers import format_file_size
|
||||
from .helpers import yes
|
||||
|
|
@ -29,6 +29,113 @@ ChunkListEntry = namedtuple('ChunkListEntry', 'id size csize')
|
|||
FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size mtime chunk_ids')
|
||||
|
||||
|
||||
class SecurityManager:
|
||||
def __init__(self, repository):
|
||||
self.repository = repository
|
||||
self.dir = get_security_dir(repository.id_str)
|
||||
self.key_type_file = os.path.join(self.dir, 'key-type')
|
||||
self.location_file = os.path.join(self.dir, 'location')
|
||||
self.manifest_ts_file = os.path.join(self.dir, 'manifest-timestamp')
|
||||
|
||||
def known(self):
|
||||
return os.path.exists(self.key_type_file)
|
||||
|
||||
def key_matches(self, key):
|
||||
if not self.known():
|
||||
return False
|
||||
try:
|
||||
with open(self.key_type_file, 'r') as fd:
|
||||
type = fd.read()
|
||||
return type == str(key.TYPE)
|
||||
except OSError as exc:
|
||||
logger.warning('Could not read/parse key type file: %s', exc)
|
||||
|
||||
def save(self, manifest, key, cache):
|
||||
logger.debug('security: saving state for %s to %s', self.repository.id_str, self.dir)
|
||||
current_location = cache.repository._location.canonical_path()
|
||||
logger.debug('security: current location %s', current_location)
|
||||
logger.debug('security: key type %s', str(key.TYPE))
|
||||
logger.debug('security: manifest timestamp %s', manifest.timestamp)
|
||||
with open(self.location_file, 'w') as fd:
|
||||
fd.write(current_location)
|
||||
with open(self.key_type_file, 'w') as fd:
|
||||
fd.write(str(key.TYPE))
|
||||
with open(self.manifest_ts_file, 'w') as fd:
|
||||
fd.write(manifest.timestamp)
|
||||
|
||||
def assert_location_matches(self, cache):
|
||||
# Warn user before sending data to a relocated repository
|
||||
try:
|
||||
with open(self.location_file) as fd:
|
||||
previous_location = fd.read()
|
||||
logger.debug('security: read previous_location %r', previous_location)
|
||||
except FileNotFoundError:
|
||||
logger.debug('security: previous_location file %s not found', self.location_file)
|
||||
previous_location = None
|
||||
except OSError as exc:
|
||||
logger.warning('Could not read previous location file: %s', exc)
|
||||
previous_location = None
|
||||
if cache.previous_location and previous_location != cache.previous_location:
|
||||
# Reconcile cache and security dir; we take the cache location.
|
||||
previous_location = cache.previous_location
|
||||
logger.debug('security: using previous_location of cache: %r', previous_location)
|
||||
if previous_location and previous_location != self.repository._location.canonical_path():
|
||||
msg = ("Warning: The repository at location {} was previously located at {}\n".format(
|
||||
self.repository._location.canonical_path(), previous_location) +
|
||||
"Do you want to continue? [yN] ")
|
||||
if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
|
||||
retry=False, env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
|
||||
raise Cache.RepositoryAccessAborted()
|
||||
# adapt on-disk config immediately if the new location was accepted
|
||||
logger.debug('security: updating location stored in cache and security dir')
|
||||
with open(self.location_file, 'w') as fd:
|
||||
fd.write(cache.repository._location.canonical_path())
|
||||
cache.begin_txn()
|
||||
cache.commit()
|
||||
|
||||
def assert_no_manifest_replay(self, manifest, key, cache):
|
||||
try:
|
||||
with open(self.manifest_ts_file) as fd:
|
||||
timestamp = fd.read()
|
||||
logger.debug('security: read manifest timestamp %r', timestamp)
|
||||
except FileNotFoundError:
|
||||
logger.debug('security: manifest timestamp file %s not found', self.manifest_ts_file)
|
||||
timestamp = ''
|
||||
except OSError as exc:
|
||||
logger.warning('Could not read previous location file: %s', exc)
|
||||
timestamp = ''
|
||||
timestamp = max(timestamp, cache.timestamp or '')
|
||||
logger.debug('security: determined newest manifest timestamp as %s', timestamp)
|
||||
# If repository is older than the cache or security dir something fishy is going on
|
||||
if timestamp and timestamp > manifest.timestamp:
|
||||
if isinstance(key, PlaintextKey):
|
||||
raise Cache.RepositoryIDNotUnique()
|
||||
else:
|
||||
raise Cache.RepositoryReplay()
|
||||
|
||||
def assert_key_type(self, key, cache):
|
||||
# Make sure an encrypted repository has not been swapped for an unencrypted repository
|
||||
if cache.key_type is not None and cache.key_type != str(key.TYPE):
|
||||
raise Cache.EncryptionMethodMismatch()
|
||||
if self.known() and not self.key_matches(key):
|
||||
raise Cache.EncryptionMethodMismatch()
|
||||
|
||||
def assert_secure(self, manifest, key, cache):
|
||||
self.assert_location_matches(cache)
|
||||
self.assert_key_type(key, cache)
|
||||
self.assert_no_manifest_replay(manifest, key, cache)
|
||||
if not self.known():
|
||||
self.save(manifest, key, cache)
|
||||
|
||||
def assert_access_unknown(self, warn_if_unencrypted, key):
|
||||
if warn_if_unencrypted and isinstance(key, PlaintextKey) and not self.known():
|
||||
msg = ("Warning: Attempting to access a previously unknown unencrypted repository!\n" +
|
||||
"Do you want to continue? [yN] ")
|
||||
if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
|
||||
retry=False, env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
|
||||
raise Cache.CacheInitAbortedError()
|
||||
|
||||
|
||||
class Cache:
|
||||
"""Client Side cache
|
||||
"""
|
||||
|
|
@ -77,44 +184,19 @@ class Cache:
|
|||
self.key = key
|
||||
self.manifest = manifest
|
||||
self.path = path or os.path.join(get_cache_dir(), repository.id_str)
|
||||
self.security_manager = SecurityManager(repository)
|
||||
self.hostname_is_unique = yes(env_var_override='BORG_HOSTNAME_IS_UNIQUE', prompt=False, env_msg=None)
|
||||
if self.hostname_is_unique:
|
||||
logger.info('Enabled removal of stale cache locks')
|
||||
self.do_files = do_files
|
||||
# Warn user before sending data to a never seen before unencrypted repository
|
||||
if not os.path.exists(self.path):
|
||||
if warn_if_unencrypted and isinstance(key, PlaintextKey):
|
||||
msg = ("Warning: Attempting to access a previously unknown unencrypted repository!" +
|
||||
"\n" +
|
||||
"Do you want to continue? [yN] ")
|
||||
if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
|
||||
retry=False, env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
|
||||
raise self.CacheInitAbortedError()
|
||||
self.security_manager.assert_access_unknown(warn_if_unencrypted, key)
|
||||
self.create()
|
||||
self.open(lock_wait=lock_wait)
|
||||
try:
|
||||
# Warn user before sending data to a relocated repository
|
||||
if self.previous_location and self.previous_location != repository._location.canonical_path():
|
||||
msg = ("Warning: The repository at location {} was previously located at {}".format(repository._location.canonical_path(), self.previous_location) +
|
||||
"\n" +
|
||||
"Do you want to continue? [yN] ")
|
||||
if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
|
||||
retry=False, env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
|
||||
raise self.RepositoryAccessAborted()
|
||||
# adapt on-disk config immediately if the new location was accepted
|
||||
self.begin_txn()
|
||||
self.commit()
|
||||
|
||||
self.security_manager.assert_secure(manifest, key, self)
|
||||
if sync and self.manifest.id != self.manifest_id:
|
||||
# If repository is older than the cache something fishy is going on
|
||||
if self.timestamp and self.timestamp > manifest.timestamp:
|
||||
if isinstance(key, PlaintextKey):
|
||||
raise self.RepositoryIDNotUnique()
|
||||
else:
|
||||
raise self.RepositoryReplay()
|
||||
# Make sure an encrypted repository has not been swapped for an unencrypted repository
|
||||
if self.key_type is not None and self.key_type != str(key.TYPE):
|
||||
raise self.EncryptionMethodMismatch()
|
||||
self.sync()
|
||||
self.commit()
|
||||
except:
|
||||
|
|
@ -252,6 +334,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
|
|||
"""
|
||||
if not self.txn_active:
|
||||
return
|
||||
self.security_manager.save(self.manifest, self.key, self)
|
||||
pi = ProgressIndicatorMessage()
|
||||
if self.files is not None:
|
||||
if self._newest_mtime is None:
|
||||
|
|
|
|||
|
|
@ -288,15 +288,17 @@ def get_keys_dir():
|
|||
return keys_dir
|
||||
|
||||
|
||||
def get_nonces_dir():
|
||||
"""Determine where to store the local nonce high watermark"""
|
||||
def get_security_dir(repository_id=None):
|
||||
"""Determine where to store local security information."""
|
||||
|
||||
xdg_config = os.environ.get('XDG_CONFIG_HOME', os.path.join(get_home_dir(), '.config'))
|
||||
nonces_dir = os.environ.get('BORG_NONCES_DIR', os.path.join(xdg_config, 'borg', 'key-nonces'))
|
||||
if not os.path.exists(nonces_dir):
|
||||
os.makedirs(nonces_dir)
|
||||
os.chmod(nonces_dir, stat.S_IRWXU)
|
||||
return nonces_dir
|
||||
security_dir = os.environ.get('BORG_SECURITY_DIR', os.path.join(xdg_config, 'borg', 'security'))
|
||||
if repository_id:
|
||||
security_dir = os.path.join(security_dir, repository_id)
|
||||
if not os.path.exists(security_dir):
|
||||
os.makedirs(security_dir)
|
||||
os.chmod(security_dir, stat.S_IRWXU)
|
||||
return security_dir
|
||||
|
||||
|
||||
def get_cache_dir():
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import sys
|
|||
from binascii import unhexlify
|
||||
|
||||
from .crypto import bytes_to_long, long_to_bytes
|
||||
from .helpers import get_nonces_dir
|
||||
from .helpers import get_security_dir
|
||||
from .helpers import bin_to_hex
|
||||
from .platform import SaveFile
|
||||
from .remote import InvalidRPCMethod
|
||||
|
|
@ -19,7 +19,7 @@ class NonceManager:
|
|||
self.enc_cipher = enc_cipher
|
||||
self.end_of_nonce_reservation = None
|
||||
self.manifest_nonce = manifest_nonce
|
||||
self.nonce_file = os.path.join(get_nonces_dir(), self.repository.id_str)
|
||||
self.nonce_file = os.path.join(get_security_dir(self.repository.id_str), 'nonce')
|
||||
|
||||
def get_local_free_nonce(self):
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ from ..archiver import Archiver
|
|||
from ..cache import Cache
|
||||
from ..constants import * # NOQA
|
||||
from ..crypto import bytes_to_long, num_aes_blocks
|
||||
from ..helpers import PatternMatcher, parse_pattern, Location
|
||||
from ..helpers import PatternMatcher, parse_pattern, Location, get_security_dir
|
||||
from ..helpers import Chunk, Manifest
|
||||
from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
|
||||
from ..helpers import bin_to_hex
|
||||
|
|
@ -382,8 +382,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|||
item_count = 4 if has_lchflags else 5 # one file is UF_NODUMP
|
||||
self.assert_in('Number of files: %d' % item_count, info_output)
|
||||
shutil.rmtree(self.cache_path)
|
||||
with environment_variable(BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK='yes'):
|
||||
info_output2 = self.cmd('info', self.repository_location + '::test')
|
||||
info_output2 = self.cmd('info', self.repository_location + '::test')
|
||||
|
||||
def filter(output):
|
||||
# filter for interesting "info" output, ignore cache rebuilding related stuff
|
||||
|
|
@ -563,6 +562,89 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|||
else:
|
||||
self.assert_raises(Cache.RepositoryAccessAborted, lambda: self.cmd('create', self.repository_location + '_encrypted::test.2', 'input'))
|
||||
|
||||
def test_repository_swap_detection_no_cache(self):
|
||||
self.create_test_files()
|
||||
os.environ['BORG_PASSPHRASE'] = 'passphrase'
|
||||
self.cmd('init', '--encryption=repokey', self.repository_location)
|
||||
repository_id = self._extract_repository_id(self.repository_path)
|
||||
self.cmd('create', self.repository_location + '::test', 'input')
|
||||
shutil.rmtree(self.repository_path)
|
||||
self.cmd('init', '--encryption=none', self.repository_location)
|
||||
self._set_repository_id(self.repository_path, repository_id)
|
||||
self.assert_equal(repository_id, self._extract_repository_id(self.repository_path))
|
||||
self.cmd('delete', '--cache-only', self.repository_location)
|
||||
if self.FORK_DEFAULT:
|
||||
self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=EXIT_ERROR)
|
||||
else:
|
||||
self.assert_raises(Cache.EncryptionMethodMismatch, lambda: self.cmd('create', self.repository_location + '::test.2', 'input'))
|
||||
|
||||
def test_repository_swap_detection2_no_cache(self):
|
||||
self.create_test_files()
|
||||
self.cmd('init', '--encryption=none', self.repository_location + '_unencrypted')
|
||||
os.environ['BORG_PASSPHRASE'] = 'passphrase'
|
||||
self.cmd('init', '--encryption=repokey', self.repository_location + '_encrypted')
|
||||
self.cmd('create', self.repository_location + '_encrypted::test', 'input')
|
||||
self.cmd('delete', '--cache-only', self.repository_location + '_unencrypted')
|
||||
self.cmd('delete', '--cache-only', self.repository_location + '_encrypted')
|
||||
shutil.rmtree(self.repository_path + '_encrypted')
|
||||
os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted')
|
||||
if self.FORK_DEFAULT:
|
||||
self.cmd('create', self.repository_location + '_encrypted::test.2', 'input', exit_code=EXIT_ERROR)
|
||||
else:
|
||||
with pytest.raises(Cache.RepositoryAccessAborted):
|
||||
self.cmd('create', self.repository_location + '_encrypted::test.2', 'input')
|
||||
|
||||
def test_repository_move(self):
|
||||
self.cmd('init', self.repository_location)
|
||||
repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
|
||||
os.rename(self.repository_path, self.repository_path + '_new')
|
||||
with environment_variable(BORG_RELOCATED_REPO_ACCESS_IS_OK='yes'):
|
||||
self.cmd('info', self.repository_location + '_new')
|
||||
security_dir = get_security_dir(repository_id)
|
||||
with open(os.path.join(security_dir, 'location')) as fd:
|
||||
location = fd.read()
|
||||
assert location == Location(self.repository_location + '_new').canonical_path()
|
||||
# Needs no confirmation anymore
|
||||
self.cmd('info', self.repository_location + '_new')
|
||||
shutil.rmtree(self.cache_path)
|
||||
self.cmd('info', self.repository_location + '_new')
|
||||
shutil.rmtree(security_dir)
|
||||
self.cmd('info', self.repository_location + '_new')
|
||||
for file in ('location', 'key-type', 'manifest-timestamp'):
|
||||
assert os.path.exists(os.path.join(security_dir, file))
|
||||
|
||||
def test_security_dir_compat(self):
|
||||
self.cmd('init', self.repository_location)
|
||||
repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
|
||||
security_dir = get_security_dir(repository_id)
|
||||
with open(os.path.join(security_dir, 'location'), 'w') as fd:
|
||||
fd.write('something outdated')
|
||||
# This is fine, because the cache still has the correct information. security_dir and cache can disagree
|
||||
# if older versions are used to confirm a renamed repository.
|
||||
self.cmd('info', self.repository_location)
|
||||
|
||||
def test_unknown_unencrypted(self):
|
||||
self.cmd('init', '--encryption=none', self.repository_location)
|
||||
repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
|
||||
security_dir = get_security_dir(repository_id)
|
||||
# Ok: repository is known
|
||||
self.cmd('info', self.repository_location)
|
||||
|
||||
# Ok: repository is still known (through security_dir)
|
||||
shutil.rmtree(self.cache_path)
|
||||
self.cmd('info', self.repository_location)
|
||||
|
||||
# Needs confirmation: cache and security dir both gone (eg. another host or rm -rf ~)
|
||||
shutil.rmtree(self.cache_path)
|
||||
shutil.rmtree(security_dir)
|
||||
if self.FORK_DEFAULT:
|
||||
self.cmd('info', self.repository_location, exit_code=EXIT_ERROR)
|
||||
else:
|
||||
with pytest.raises(Cache.CacheInitAbortedError):
|
||||
self.cmd('info', self.repository_location)
|
||||
with environment_variable(BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK='yes'):
|
||||
self.cmd('info', self.repository_location)
|
||||
|
||||
def test_strip_components(self):
|
||||
self.cmd('init', self.repository_location)
|
||||
self.create_regular_file('dir/file')
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ from ..helpers import Buffer
|
|||
from ..helpers import partial_format, format_file_size, parse_file_size, format_timedelta, format_line, PlaceholderError, replace_placeholders
|
||||
from ..helpers import make_path_safe, clean_lines
|
||||
from ..helpers import prune_within, prune_split
|
||||
from ..helpers import get_cache_dir, get_keys_dir, get_nonces_dir
|
||||
from ..helpers import get_cache_dir, get_keys_dir, get_security_dir
|
||||
from ..helpers import is_slow_msgpack
|
||||
from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
|
||||
from ..helpers import StableDict, int_to_bigint, bigint_to_int, bin_to_hex
|
||||
|
|
@ -660,14 +660,15 @@ def test_get_keys_dir(monkeypatch):
|
|||
assert get_keys_dir() == '/var/tmp'
|
||||
|
||||
|
||||
def test_get_nonces_dir(monkeypatch):
|
||||
"""test that get_nonces_dir respects environment"""
|
||||
def test_get_security_dir(monkeypatch):
|
||||
"""test that get_security_dir respects environment"""
|
||||
monkeypatch.delenv('XDG_CONFIG_HOME', raising=False)
|
||||
assert get_nonces_dir() == os.path.join(os.path.expanduser('~'), '.config', 'borg', 'key-nonces')
|
||||
assert get_security_dir() == os.path.join(os.path.expanduser('~'), '.config', 'borg', 'security')
|
||||
assert get_security_dir(repository_id='1234') == os.path.join(os.path.expanduser('~'), '.config', 'borg', 'security', '1234')
|
||||
monkeypatch.setenv('XDG_CONFIG_HOME', '/var/tmp/.config')
|
||||
assert get_nonces_dir() == os.path.join('/var/tmp/.config', 'borg', 'key-nonces')
|
||||
monkeypatch.setenv('BORG_NONCES_DIR', '/var/tmp')
|
||||
assert get_nonces_dir() == '/var/tmp'
|
||||
assert get_security_dir() == os.path.join('/var/tmp/.config', 'borg', 'security')
|
||||
monkeypatch.setenv('BORG_SECURITY_DIR', '/var/tmp')
|
||||
assert get_security_dir() == '/var/tmp'
|
||||
|
||||
|
||||
def test_file_size():
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from ..crypto import bytes_to_long, num_aes_blocks
|
|||
from ..helpers import Location
|
||||
from ..helpers import Chunk
|
||||
from ..helpers import IntegrityError
|
||||
from ..helpers import get_nonces_dir
|
||||
from ..helpers import get_security_dir
|
||||
from ..key import PlaintextKey, PassphraseKey, KeyfileKey, RepoKey, Blake2KeyfileKey, Blake2RepoKey, AuthenticatedKey
|
||||
from ..key import Passphrase, PasswordRetriesExceeded, bin_to_hex
|
||||
|
||||
|
|
@ -118,7 +118,7 @@ class TestKey:
|
|||
def test_keyfile_nonce_rollback_protection(self, monkeypatch, keys_dir):
|
||||
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
|
||||
repository = self.MockRepository()
|
||||
with open(os.path.join(get_nonces_dir(), repository.id_str), "w") as fd:
|
||||
with open(os.path.join(get_security_dir(repository.id_str), 'nonce'), "w") as fd:
|
||||
fd.write("0000000000002000")
|
||||
key = KeyfileKey.create(repository, self.MockArgs())
|
||||
data = key.encrypt(Chunk(b'ABC'))
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import os.path
|
|||
|
||||
import pytest
|
||||
|
||||
from ..helpers import get_nonces_dir
|
||||
from ..helpers import get_security_dir
|
||||
from ..key import bin_to_hex
|
||||
from ..nonces import NonceManager
|
||||
from ..remote import InvalidRPCMethod
|
||||
|
|
@ -61,11 +61,11 @@ class TestNonceManager:
|
|||
self.repository = None
|
||||
|
||||
def cache_nonce(self):
|
||||
with open(os.path.join(get_nonces_dir(), self.repository.id_str), "r") as fd:
|
||||
with open(os.path.join(get_security_dir(self.repository.id_str), 'nonce'), "r") as fd:
|
||||
return fd.read()
|
||||
|
||||
def set_cache_nonce(self, nonce):
|
||||
with open(os.path.join(get_nonces_dir(), self.repository.id_str), "w") as fd:
|
||||
with open(os.path.join(get_security_dir(self.repository.id_str), 'nonce'), "w") as fd:
|
||||
assert fd.write(nonce)
|
||||
|
||||
def test_empty_cache_and_old_server(self, monkeypatch):
|
||||
|
|
|
|||
Loading…
Reference in a new issue