import configparser
import io
import os
import shutil
import stat
from collections import namedtuple
from datetime import datetime, timezone, timedelta
from time import perf_counter

from .logger import create_logger

logger = create_logger()

files_cache_logger = create_logger("borg.debug.files_cache")

from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
from .checksums import xxh64
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Error
from .helpers import get_cache_dir, get_security_dir
from .helpers import hex_to_bin, bin_to_hex, parse_stringified_list
from .helpers import format_file_size, safe_encode
from .helpers import safe_ns
from .helpers import yes
from .helpers import ProgressIndicatorMessage
from .helpers import msgpack
from .helpers.msgpack import int_to_timestamp, timestamp_to_int
from .item import ChunkListEntry
from .crypto.key import PlaintextKey
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister

# chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
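
# For illustration (a sketch, not an on-disk format spec): entries are kept in the files
# cache as msgpack-serialized FileCacheEntry tuples, roughly like this:
#
#     entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size,
#                            ctime=int_to_timestamp(ctime_ns), mtime=int_to_timestamp(mtime_ns),
#                            chunks=[ChunkListEntry(id, size), ...])
#     packed = msgpack.packb(entry)                    # ~240 bytes per file (see below)
#     entry = FileCacheEntry(*msgpack.unpackb(packed))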


class SecurityManager:
    """
    Tracks repositories. Ensures that nothing bad happens (repository swaps,
    replay attacks, unknown repositories etc.).

    This is complicated by the Cache being initially used for this, while
    only some commands actually use the Cache, which meant that other commands
    did not perform these checks.

    Further complications were created by the Cache being a cache, so it
    could be legitimately deleted, which is annoying because Borg didn't
    recognize repositories after that.

    Therefore a second location, the security database (see get_security_dir),
    was introduced which stores this information. However, this means that
    the code has to deal with a cache existing but no security DB entry,
    or inconsistencies between the security DB and the cache which have to
    be reconciled, and also with no cache existing but a security DB entry.
    """

    def __init__(self, repository):
        self.repository = repository
        self.dir = get_security_dir(repository.id_str, legacy=(repository.version == 1))
        self.cache_dir = cache_dir(repository)
        self.key_type_file = os.path.join(self.dir, "key-type")
        self.location_file = os.path.join(self.dir, "location")
        self.manifest_ts_file = os.path.join(self.dir, "manifest-timestamp")

    @staticmethod
    def destroy(repository, path=None):
        """destroy the security dir for ``repository`` or at ``path``"""
        path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
        if os.path.exists(path):
            shutil.rmtree(path)

    def known(self):
        return all(os.path.exists(f) for f in (self.key_type_file, self.location_file, self.manifest_ts_file))

    def key_matches(self, key):
        if not self.known():
            return False
        try:
            with open(self.key_type_file) as fd:
                type = fd.read()
                return type == str(key.TYPE)
        except OSError as exc:
            logger.warning("Could not read/parse key type file: %s", exc)

    def save(self, manifest, key):
        logger.debug("security: saving state for %s to %s", self.repository.id_str, self.dir)
        current_location = self.repository._location.canonical_path()
        logger.debug("security: current location %s", current_location)
        logger.debug("security: key type %s", str(key.TYPE))
        logger.debug("security: manifest timestamp %s", manifest.timestamp)
        with SaveFile(self.location_file) as fd:
            fd.write(current_location)
        with SaveFile(self.key_type_file) as fd:
            fd.write(str(key.TYPE))
        with SaveFile(self.manifest_ts_file) as fd:
            fd.write(manifest.timestamp)

    def assert_location_matches(self):
        # Warn user before sending data to a relocated repository
        try:
            with open(self.location_file) as fd:
                previous_location = fd.read()
            logger.debug("security: read previous location %r", previous_location)
        except FileNotFoundError:
            logger.debug("security: previous location file %s not found", self.location_file)
            previous_location = None
        except OSError as exc:
            logger.warning("Could not read previous location file: %s", exc)
            previous_location = None

        repository_location = self.repository._location.canonical_path()
        if previous_location and previous_location != repository_location:
            msg = (
                "Warning: The repository at location {} was previously located at {}\n".format(
                    repository_location, previous_location
                )
                + "Do you want to continue? [yN] "
            )
            if not yes(
                msg,
                false_msg="Aborting.",
                invalid_msg="Invalid answer, aborting.",
                retry=False,
                env_var_override="BORG_RELOCATED_REPO_ACCESS_IS_OK",
            ):
                raise Cache.RepositoryAccessAborted()
            # adapt on-disk config immediately if the new location was accepted
            logger.debug("security: updating location stored in security dir")
            with SaveFile(self.location_file) as fd:
                fd.write(repository_location)

    def assert_no_manifest_replay(self, manifest, key):
        try:
            with open(self.manifest_ts_file) as fd:
                timestamp = fd.read()
            logger.debug("security: read manifest timestamp %r", timestamp)
        except FileNotFoundError:
            logger.debug("security: manifest timestamp file %s not found", self.manifest_ts_file)
            timestamp = ""
        except OSError as exc:
            logger.warning("Could not read manifest timestamp file: %s", exc)
            timestamp = ""
        logger.debug("security: determined newest manifest timestamp as %s", timestamp)
        # If repository is older than the cache or security dir something fishy is going on
        if timestamp and timestamp > manifest.timestamp:
            if isinstance(key, PlaintextKey):
                raise Cache.RepositoryIDNotUnique()
            else:
                raise Cache.RepositoryReplay()

    def assert_key_type(self, key):
        # Make sure an encrypted repository has not been swapped for an unencrypted repository
        if self.known() and not self.key_matches(key):
            raise Cache.EncryptionMethodMismatch()

    def assert_secure(self, manifest, key, *, warn_if_unencrypted=True):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # This avoids asking about a repository that is currently being initialized.
        self.assert_access_unknown(warn_if_unencrypted, manifest, key)
        self._assert_secure(manifest, key)
        logger.debug("security: repository checks ok, allowing access")

    def _assert_secure(self, manifest, key):
        self.assert_location_matches()
        self.assert_key_type(key)
        self.assert_no_manifest_replay(manifest, key)
        if not self.known():
            logger.debug("security: remembering previously unknown repository")
            self.save(manifest, key)

    def assert_access_unknown(self, warn_if_unencrypted, manifest, key):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # This avoids asking about a repository that is currently being initialized.
        if not key.logically_encrypted and not self.known():
            msg = (
                "Warning: Attempting to access a previously unknown unencrypted repository!\n"
                + "Do you want to continue? [yN] "
            )
            allow_access = not warn_if_unencrypted or yes(
                msg,
                false_msg="Aborting.",
                invalid_msg="Invalid answer, aborting.",
                retry=False,
                env_var_override="BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK",
            )
            if allow_access:
                if warn_if_unencrypted:
                    logger.debug("security: remembering unknown unencrypted repository (explicitly allowed)")
                else:
                    logger.debug("security: initializing unencrypted repository")
                self.save(manifest, key)
            else:
                raise Cache.CacheInitAbortedError()
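
    # Illustrative sketch (derived from __init__/save() above, not normative): the security
    # dir holds three small text files per repository:
    #
    #   key-type            <- str(key.TYPE)
    #   location            <- repository._location.canonical_path()
    #   manifest-timestamp  <- manifest.timestamp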


def assert_secure(repository, manifest):
    sm = SecurityManager(repository)
    sm.assert_secure(manifest, manifest.key)


def cache_dir(repository, path=None):
    return path or os.path.join(get_cache_dir(), repository.id_str)
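
# For example (an assumption depending on get_cache_dir() / BORG_CACHE_DIR): this typically
# resolves to something like ~/.cache/borg/<repository id as hex>.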


class CacheConfig:
    def __init__(self, repository, path=None):
        self.repository = repository
        self.path = cache_dir(repository, path)
        logger.debug("Using %s as cache", self.path)
        self.config_path = os.path.join(self.path, "config")

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def exists(self):
        return os.path.exists(self.config_path)

    def create(self):
        assert not self.exists()
        config = configparser.ConfigParser(interpolation=None)
        config.add_section("cache")
        config.set("cache", "version", "1")
        config.set("cache", "repository", self.repository.id_str)
        config.set("cache", "manifest", "")
        config.add_section("integrity")
        config.set("integrity", "manifest", "")
        with SaveFile(self.config_path) as fd:
            config.write(fd)

    def open(self):
        self.load()

    def load(self):
        self._config = configparser.ConfigParser(interpolation=None)
        with open(self.config_path) as fd:
            self._config.read_file(fd)
        self._check_upgrade(self.config_path)
        self.id = self._config.get("cache", "repository")
        self.manifest_id = hex_to_bin(self._config.get("cache", "manifest"))
        self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback="")))
        self.mandatory_features = set(
            parse_stringified_list(self._config.get("cache", "mandatory_features", fallback=""))
        )
        try:
            self.integrity = dict(self._config.items("integrity"))
            if self._config.get("cache", "manifest") != self.integrity.pop("manifest"):
                # The cache config file is updated (parsed with ConfigParser, the state of the ConfigParser
                # is modified and then written out), not re-created.
                # Thus, older versions will leave our [integrity] section alone, making the section's data invalid.
                # Therefore, we also add the manifest ID to this section and
                # can discern whether an older version interfered by comparing the manifest IDs of this section
                # and the main [cache] section.
                self.integrity = {}
                logger.warning("Cache integrity data not available: old Borg version modified the cache.")
        except configparser.NoSectionError:
            logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.")
            self.integrity = {}

    def save(self, manifest=None):
        if manifest:
            self._config.set("cache", "manifest", manifest.id_str)
            self._config.set("cache", "ignored_features", ",".join(self.ignored_features))
            self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features))
            if not self._config.has_section("integrity"):
                self._config.add_section("integrity")
            for file, integrity_data in self.integrity.items():
                self._config.set("integrity", file, integrity_data)
            self._config.set("integrity", "manifest", manifest.id_str)
        with SaveFile(self.config_path) as fd:
            self._config.write(fd)

    def close(self):
        pass

    def _check_upgrade(self, config_path):
        try:
            cache_version = self._config.getint("cache", "version")
            wanted_version = 1
            if cache_version != wanted_version:
                self.close()
                raise Exception(
                    "%s has unexpected cache version %d (wanted: %d)." % (config_path, cache_version, wanted_version)
                )
        except configparser.NoSectionError:
            self.close()
            raise Exception("%s does not look like a Borg cache." % config_path) from None
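
# For illustration, a rough sketch of the INI file written by CacheConfig.create()/save()
# (exact keys and values depend on the repository and Borg version):
#
#   [cache]
#   version = 1
#   repository = <repository id hex>
#   manifest = <manifest id hex>
#   ignored_features =
#   mandatory_features =
#
#   [integrity]
#   files.<suffix> = <integrity data of the files cache>
#   manifest = <manifest id hex>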


class Cache:
    """Client-side cache"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

        exit_mcode = 60

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

        exit_mcode = 61

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

        exit_mcode = 62

    class RepositoryIDNotUnique(Error):
        """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""

        exit_mcode = 63

    class RepositoryReplay(Error):
        """Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""

        exit_mcode = 64

    @staticmethod
    def break_lock(repository, path=None):
        pass

    @staticmethod
    def destroy(repository, path=None):
        """destroy the cache for ``repository`` or at ``path``"""
        path = path or os.path.join(get_cache_dir(), repository.id_str)
        config = os.path.join(path, "config")
        if os.path.exists(config):
            os.remove(config)  # kill config first
            shutil.rmtree(path)

    def __new__(
        cls,
        repository,
        manifest,
        path=None,
        sync=True,
        warn_if_unencrypted=True,
        progress=False,
        cache_mode=FILES_CACHE_MODE_DISABLED,
        iec=False,
        archive_name=None,
        start_backup=None,
    ):
        return AdHocWithFilesCache(
            manifest=manifest,
            path=path,
            warn_if_unencrypted=warn_if_unencrypted,
            progress=progress,
            iec=iec,
            cache_mode=cache_mode,
            archive_name=archive_name,
            start_backup=start_backup,
        )
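
# Note: Cache(...) currently always constructs an AdHocWithFilesCache (see __new__ above).
# A typical use, sketched with a hypothetical archive name and an already open repository/manifest:
#
#     with Cache(repository, manifest, archive_name="myhost-home") as cache:
#         known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)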


class FilesCacheMixin:
    """
    Massively accelerate processing of unchanged files.
    We read the "files cache" (either from cache directory or from previous archive
    in repo) that has metadata for all "already stored" files, like size, ctime/mtime,
    inode number and chunks id/size list.
    When finding a file on disk, we use the metadata to determine if the file is unchanged.
    If so, we use the cached chunks list and skip reading/chunking the file contents.
    """

    FILES_CACHE_NAME = "files"

    def __init__(self, cache_mode, archive_name=None, start_backup=None):
        self.archive_name = archive_name  # ideally a SERIES name
        assert not ("c" in cache_mode and "m" in cache_mode)
        assert "d" in cache_mode or "c" in cache_mode or "m" in cache_mode
        self.cache_mode = cache_mode
        self._files = None
        self._newest_cmtime = 0
        self._newest_path_hashes = set()
        self.start_backup = start_backup

    @property
    def files(self):
        if self._files is None:
            self._files = self._read_files_cache()  # try loading from cache dir
        if self._files is None:
            self._files = self._build_files_cache()  # try loading from repository
        if self._files is None:
            self._files = {}  # start from scratch
        return self._files

    def _build_files_cache(self):
        """rebuild the files cache by reading previous archive from repository"""
        if "d" in self.cache_mode:  # d(isabled)
            return

        if not self.archive_name:
            return

        from .archive import Archive

        # get the latest archive with the IDENTICAL name, supporting archive series:
        archives = self.manifest.archives.list(match=[self.archive_name], sort_by=["ts"], last=1)
        if not archives:
            # nothing found
            return
        prev_archive = archives[0]

        files = {}
        logger.debug(
            f"Building files cache from {prev_archive.name} {prev_archive.ts} {bin_to_hex(prev_archive.id)} ..."
        )
        files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
        archive = Archive(self.manifest, prev_archive.id)
        for item in archive.iter_items(preload=False):
            # only put regular files' infos into the files cache:
            if stat.S_ISREG(item.mode):
                path_hash = self.key.id_hash(safe_encode(item.path))
                # keep track of the key(s) for the most recent timestamp(s):
                ctime_ns = item.ctime
                if ctime_ns > self._newest_cmtime:
                    self._newest_cmtime = ctime_ns
                    self._newest_path_hashes = {path_hash}
                elif ctime_ns == self._newest_cmtime:
                    self._newest_path_hashes.add(path_hash)
                mtime_ns = item.mtime
                if mtime_ns > self._newest_cmtime:
                    self._newest_cmtime = mtime_ns
                    self._newest_path_hashes = {path_hash}
                elif mtime_ns == self._newest_cmtime:
                    self._newest_path_hashes.add(path_hash)
                # add the file to the in-memory files cache
                entry = FileCacheEntry(
                    age=0,
                    inode=item.get("inode", 0),
                    size=item.size,
                    ctime=int_to_timestamp(ctime_ns),
                    mtime=int_to_timestamp(mtime_ns),
                    chunks=item.chunks,
                )
                files[path_hash] = msgpack.packb(entry)  # takes about 240 Bytes per file
        # deal with special snapshot / timestamp granularity case, see FAQ:
        for path_hash in self._newest_path_hashes:
            del files[path_hash]
        files_cache_logger.debug("FILES-CACHE-BUILD: finished, %d entries loaded.", len(files))
        return files

    def files_cache_name(self):
        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
        # when using archive series, we automatically make up a separate cache file per series.
        # when not, the user may manually do that by using the env var.
        if not suffix:
            # avoid issues with too complex or long archive_name by hashing it:
            suffix = bin_to_hex(xxh64(self.archive_name.encode()))
        return self.FILES_CACHE_NAME + "." + suffix
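
    # For example (sketch): with BORG_FILES_CACHE_SUFFIX unset and a hypothetical archive
    # name "myhost-home", the cache file is named "files." + bin_to_hex(xxh64(b"myhost-home")),
    # i.e. "files." followed by 16 hex digits.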

    def discover_files_cache_names(self, path):
        return [fn for fn in os.listdir(path) if fn.startswith(self.FILES_CACHE_NAME + ".")]

    def _read_files_cache(self):
        """read files cache from cache directory"""
        if "d" in self.cache_mode:  # d(isabled)
            return

        files = {}
        logger.debug("Reading files cache ...")
        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
        msg = None
        try:
            with IntegrityCheckedFile(
                path=os.path.join(self.path, self.files_cache_name()),
                write=False,
                integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
            ) as fd:
                u = msgpack.Unpacker(use_list=True)
                while True:
                    data = fd.read(64 * 1024)
                    if not data:
                        break
                    u.feed(data)
                    try:
                        for path_hash, item in u:
                            entry = FileCacheEntry(*item)
                            # in the end, this takes about 240 Bytes per file
                            files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
                    except (TypeError, ValueError) as exc:
                        msg = "The files cache seems invalid. [%s]" % str(exc)
                        break
        except OSError as exc:
            msg = "The files cache can't be read. [%s]" % str(exc)
        except FileIntegrityError as fie:
            msg = "The files cache is corrupted. [%s]" % str(fie)
        if msg is not None:
            logger.debug(msg)
            files = None
        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(files or {}))
        return files

    def _write_files_cache(self, files):
        """write files cache to cache directory"""
        max_time_ns = 2**63 - 1  # nanoseconds, good until y2262
        # self._newest_cmtime might be None if it was never set because no files were modified/added.
        newest_cmtime = self._newest_cmtime if self._newest_cmtime is not None else max_time_ns
        start_backup_time = self.start_backup - TIME_DIFFERS2_NS if self.start_backup is not None else max_time_ns
        # we don't want to persist files cache entries of potentially problematic files:
        discard_after = min(newest_cmtime, start_backup_time)
        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 2))
        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
        # TODO: use something like SaveFile here, but that didn't work due to SyncFile missing .seek().
        with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
            entries = 0
            age_discarded = 0
            race_discarded = 0
            for path_hash, item in files.items():
                entry = FileCacheEntry(*msgpack.unpackb(item))
                if entry.age == 0:  # current entries
                    if max(timestamp_to_int(entry.ctime), timestamp_to_int(entry.mtime)) < discard_after:
                        # Only keep files seen in this backup that are old enough not to suffer race conditions
                        # relating to filesystem snapshots and ctime/mtime granularity or being modified while we
                        # read them.
                        keep = True
                    else:
                        keep = False
                        race_discarded += 1
                else:  # old entries
                    if entry.age < ttl:
                        # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                        keep = True
                    else:
                        keep = False
                        age_discarded += 1
                if keep:
                    msgpack.pack((path_hash, entry), fd)
                    entries += 1
        files_cache_logger.debug(f"FILES-CACHE-KILL: removed {age_discarded} entries with age >= TTL [{ttl}]")
        t_str = datetime.fromtimestamp(discard_after / 1e9, timezone.utc).isoformat()
        files_cache_logger.debug(f"FILES-CACHE-KILL: removed {race_discarded} entries with ctime/mtime >= {t_str}")
        files_cache_logger.debug(f"FILES-CACHE-SAVE: finished, {entries} remaining entries saved.")
        return fd.integrity_data
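
    # Sketch of the ageing logic above, assuming the default BORG_FILES_CACHE_TTL=2:
    # an entry not seen in the current backup has its age bumped to 1 on load and is still
    # kept (1 < ttl); if it is not seen in the next backup either, its age reaches 2 (>= ttl)
    # and it is discarded.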

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        """
        Check if we know the file that has this path_hash (know == it is in our files cache) and
        whether it is unchanged (the size/inode number/cmtime is the same for stuff we check in this cache_mode).

        :param hashed_path: the file's path as we gave it to hash(hashed_path)
        :param path_hash: hash(hashed_path), to save some memory in the files cache
        :param st: the file's stat() result
        :return: known, chunks (known is True if we have infos about this file in the cache,
                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
        """
        if not stat.S_ISREG(st.st_mode):
            return False, None
        cache_mode = self.cache_mode
        if "d" in cache_mode:  # d(isabled)
            files_cache_logger.debug("UNKNOWN: files cache disabled")
            return False, None
        # note: r(echunk) does not need the files cache in this method, but the files cache will
        # be updated and saved to disk to memorize the files. To preserve previous generations in
        # the cache, this means that it also needs to get loaded from disk first.
        if "r" in cache_mode:  # r(echunk)
            files_cache_logger.debug("UNKNOWN: rechunking enforced")
            return False, None
        entry = self.files.get(path_hash)
        if not entry:
            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
            return False, None
        # we know the file!
        entry = FileCacheEntry(*msgpack.unpackb(entry))
        if "s" in cache_mode and entry.size != st.st_size:
            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
            return True, None
        if "i" in cache_mode and entry.inode != st.st_ino:
            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
            return True, None
        ctime = int_to_timestamp(safe_ns(st.st_ctime_ns))
        if "c" in cache_mode and entry.ctime != ctime:
            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
            return True, None
        mtime = int_to_timestamp(safe_ns(st.st_mtime_ns))
        if "m" in cache_mode and entry.mtime != mtime:
            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
            return True, None
        # V = any of the inode number, mtime, ctime values.
        # we ignored V in the comparison above or it is still the same value.
        # if it is still the same, replacing it in the tuple doesn't change it.
        # if we ignored it, a reason for doing that is that files were moved/copied to
        # a new disk / new fs (so a one-time change of V is expected) and we wanted
        # to avoid everything getting chunked again. to be able to re-enable the
        # V comparison in a future backup run (and avoid chunking everything again at
        # that time), we need to update V in the cache with what we see in the filesystem.
        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0))
        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
        return True, chunks
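
    # Cache mode letters checked above: d = disabled, s = size, i = inode number,
    # c = ctime, m = mtime, r = force rechunking. For example (a sketch; see the
    # FILES_CACHE_MODE_* constants in .constants for the authoritative definition),
    # a mode of "cis" compares ctime, inode number and size.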

    def memorize_file(self, hashed_path, path_hash, st, chunks):
        if not stat.S_ISREG(st.st_mode):
            return
        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
        if "d" in self.cache_mode:
            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
            return
        ctime_ns = safe_ns(st.st_ctime_ns)
        mtime_ns = safe_ns(st.st_mtime_ns)
        entry = FileCacheEntry(
            age=0,
            inode=st.st_ino,
            size=st.st_size,
            ctime=int_to_timestamp(ctime_ns),
            mtime=int_to_timestamp(mtime_ns),
            chunks=chunks,
        )
        self.files[path_hash] = msgpack.packb(entry)
        self._newest_cmtime = max(self._newest_cmtime or 0, ctime_ns)
        self._newest_cmtime = max(self._newest_cmtime or 0, mtime_ns)
        files_cache_logger.debug(
            "FILES-CACHE-UPDATE: put %r <- %r", entry._replace(chunks="[%d entries]" % len(entry.chunks)), hashed_path
        )


def load_chunks_hash(repository) -> bytes:
    try:
        hash = repository.store_load("cache/chunks_hash")
        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
    except (Repository.ObjectNotFound, StoreObjectNotFound):
        # TODO: ^ seems like RemoteRepository raises Repository.ONF instead of StoreONF
        hash = b""
        logger.debug("cache/chunks_hash missing!")
    return hash


def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
    cached_hash = load_chunks_hash(repository)
    if compact:
        # if we don't need the in-memory chunks index anymore:
        chunks.compact()  # vacuum the hash table
    with io.BytesIO() as f:
        chunks.write(f)
        data = f.getvalue()
    if clear:
        # if we don't need the in-memory chunks index anymore:
        chunks.clear()  # free memory, immediately
    new_hash = xxh64(data)
    if force_write or new_hash != cached_hash:
        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
        # when a client is loading the chunks index from a cache, it has to compare its xxh64
        # hash against cache/chunks_hash in the repository. if it is the same, the cache
        # is valid. If it is different, the cache is either corrupted or out of date and
        # has to be discarded.
        # when some functionality is DELETING chunks from the repository, it has to either update
        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
        # so that all clients will discard any client-local chunks index caches.
        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
        repository.store_store("cache/chunks", data)
        repository.store_store("cache/chunks_hash", new_hash)
    return new_hash


def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
    chunks = None
    # first, try to load a pre-computed and centrally cached chunks index:
    if not disable_caches:
        wanted_hash = load_chunks_hash(repository)
        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
        try:
            chunks_data = repository.store_load("cache/chunks")
        except (Repository.ObjectNotFound, StoreObjectNotFound):
            # TODO: ^ seems like RemoteRepository raises Repository.ONF instead of StoreONF
            logger.debug("cache/chunks not found in the repository.")
        else:
            if xxh64(chunks_data) == wanted_hash:
                logger.debug("cache/chunks is valid.")
                with io.BytesIO(chunks_data) as f:
                    chunks = ChunkIndex.read(f)
                return chunks
            else:
                logger.debug("cache/chunks is invalid.")
    # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
    logger.debug("querying the chunk IDs list from the repo...")
    chunks = ChunkIndex()
    t0 = perf_counter()
    num_chunks = 0
    # The repo says it has these chunks, so we assume they are referenced chunks.
    # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
    # We do not know the plaintext size (!= stored_size), thus we set size = 0.
    init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
    for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
        num_chunks += 1
        chunks[id] = init_entry
    # Cache does not contain the manifest.
    if not isinstance(repository, (Repository, RemoteRepository)):
        del chunks[Manifest.MANIFEST_ID]
    duration = perf_counter() - t0 or 0.001
    # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
    # Protocol overhead is neglected in this calculation.
    speed = format_file_size(num_chunks * 34 / duration)
    logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
    if cache_immediately:
        # immediately update cache/chunks, so we only rarely have to do it the slow way:
        write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
    return chunks
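
# Rough flow (sketch): callers like ChunksMixin.chunks below first try the repo-cached index
# ("cache/chunks", validated by comparing its xxh64 against "cache/chunks_hash") and only fall
# back to listing all chunk IDs; with cache_immediately=True the freshly built index is written
# back, so the next client can take the fast path again.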


class ChunksMixin:
    """
    Chunks index related code for misc. Cache implementations.
    """

    def __init__(self):
        self._chunks = None
        self.last_refresh_dt = datetime.now(timezone.utc)
        self.refresh_td = timedelta(seconds=60)

    @property
    def chunks(self):
        if self._chunks is None:
            self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
        return self._chunks

    def seen_chunk(self, id, size=None):
        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
        if entry.refcount and size is not None:
            assert isinstance(entry.size, int)
            if not entry.size:
                # AdHocWithFilesCache:
                # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
                self.chunks[id] = entry._replace(size=size)
        return entry.refcount != 0

    def reuse_chunk(self, id, size, stats):
        assert isinstance(size, int) and size > 0
        stats.update(size, False)
        return ChunkListEntry(id, size)

    def add_chunk(
        self,
        id,
        meta,
        data,
        *,
        stats,
        wait=True,
        compress=True,
        size=None,
        ctype=None,
        clevel=None,
        ro_type=ROBJ_FILE_STREAM,
    ):
        assert ro_type is not None
        if size is None:
            if compress:
                size = len(data)  # data is still uncompressed
            else:
                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
        now = datetime.now(timezone.utc)
        exists = self.seen_chunk(id, size)
        if exists:
            # if borg create is processing lots of unchanged files (no content and no metadata changes),
            # there could be a long time without any repository operations and the repo lock would get stale.
            self.refresh_lock(now)
            return self.reuse_chunk(id, size, stats)
        cdata = self.repo_objs.format(
            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
        )
        self.repository.put(id, cdata, wait=wait)
        self.last_refresh_dt = now  # .put also refreshed the lock
        self.chunks.add(id, ChunkIndex.MAX_VALUE, size)
        stats.update(size, not exists)
        return ChunkListEntry(id, size)
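
    # Deduplication in a nutshell (a sketch of add_chunk above): if seen_chunk() finds the id
    # in the chunk index, only statistics are updated and the already stored chunk is referenced
    # via reuse_chunk(); otherwise the object is formatted, put() into the repository and the id
    # is added to the in-memory chunk index.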

    def _write_chunks_cache(self, chunks):
        # this is called from .close, so we can clear/compact here:
        write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
        self._chunks = None  # nothing there (cleared!)

    def refresh_lock(self, now):
        if now > self.last_refresh_dt + self.refresh_td:
            # the repository lock needs to get refreshed regularly, or it will be killed as stale.
            # refreshing the lock is not part of the repository API, so we do it indirectly via repository.info.
            self.repository.info()
            self.last_refresh_dt = now


class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
    """
    An ad-hoc chunks and files cache.

    Chunks: it does not maintain accurate reference count.
    Chunks that were not added during the current lifetime won't have correct size set (0 bytes)
    and will have an infinite reference count (MAX_VALUE).

    Files: if an archive_name is given, ad-hoc build an in-memory files cache from that archive.
    """

    def __init__(
        self,
        manifest,
        path=None,
        warn_if_unencrypted=True,
        progress=False,
        cache_mode=FILES_CACHE_MODE_DISABLED,
        iec=False,
        archive_name=None,
        start_backup=None,
    ):
        """
        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
        """
        FilesCacheMixin.__init__(self, cache_mode, archive_name, start_backup)
        ChunksMixin.__init__(self)
        assert isinstance(manifest, Manifest)
        self.manifest = manifest
        self.repository = manifest.repository
        self.key = manifest.key
        self.repo_objs = manifest.repo_objs
        self.progress = progress

        self.path = cache_dir(self.repository, path)
        self.security_manager = SecurityManager(self.repository)
        self.cache_config = CacheConfig(self.repository, self.path)

        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
            self.create()

        self.open()
        try:
            self.security_manager.assert_secure(manifest, self.key)

            if not self.check_cache_compatibility():
                self.wipe_cache()

            self.update_compatibility()
        except:  # noqa
            self.close()
            raise

    def __enter__(self):
        self._chunks = None
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        self._chunks = None

    def create(self):
        """Create a new empty cache at `self.path`"""
        os.makedirs(self.path)
        with open(os.path.join(self.path, "README"), "w") as fd:
            fd.write(CACHE_README)
        self.cache_config.create()

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception("%s does not look like a Borg cache" % self.path)
        self.cache_config.open()
        self.cache_config.load()

    def close(self):
        self.security_manager.save(self.manifest, self.key)
        pi = ProgressIndicatorMessage(msgid="cache.close")
        if self._files is not None:
            pi.output("Saving files cache")
            integrity_data = self._write_files_cache(self._files)
            self.cache_config.integrity[self.files_cache_name()] = integrity_data
        if self._chunks is not None:
            pi.output("Saving chunks cache")
            self._write_chunks_cache(self._chunks)  # cache/chunks in repo has a different integrity mechanism
        pi.output("Saving cache config")
        self.cache_config.save(self.manifest)
        self.cache_config.close()
        pi.finish()
        self.cache_config = None

    def check_cache_compatibility(self):
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        if self.cache_config.ignored_features & my_features:
            # The cache might not contain references to chunks that need a feature that is mandatory for some
            # operation and which this version supports. To avoid corruption while executing that operation,
            # force a rebuild.
            return False
        if not self.cache_config.mandatory_features <= my_features:
            # The cache was built with consideration to at least one feature that this version does not understand.
            # This client might misinterpret the cache. Thus force a rebuild.
            return False
        return True

    def wipe_cache(self):
        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
        self._chunks = ChunkIndex()
        self.cache_config.manifest_id = ""
        self.cache_config._config.set("cache", "manifest", "")

        self.cache_config.ignored_features = set()
        self.cache_config.mandatory_features = set()

    def update_compatibility(self):
        operation_to_features_map = self.manifest.get_all_mandatory_features()
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        repo_features = set()
        for operation, features in operation_to_features_map.items():
            repo_features.update(features)

        self.cache_config.ignored_features.update(repo_features - my_features)
        self.cache_config.mandatory_features.update(repo_features & my_features)