mirror of
https://github.com/borgbackup/borg.git
synced 2026-06-11 01:41:57 -04:00
remove now-dead RepositoryCache / force_cache from remote.py
After removing the modern RemoteRepository, cache_if_remote always returned RepositoryNoCache in production (the only RepositoryCache path was the removed isinstance(RemoteRepository) check; force_cache=True was used only by a test).

Delete the vestigial RepositoryCache class and simplify cache_if_remote: drop the pack/unpack/force_cache parameters and the LZ4/xxh64 cache-file machinery, keep building the decrypted_cache -> transform closure, and always return RepositoryNoCache. Remove the imports that only RepositoryCache used.

Replace the RepositoryCache tests with a focused test of the surviving cache_if_remote path (plain passthrough and decrypted (csize, plaintext) tuples).

The legacy copy in borg/legacy/remote.py is intentionally left untouched (its RepositoryCache is still used for LegacyRemoteRepository).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
cac7237d3f
commit
4a4a8e4e72
2 changed files with 26 additions and 317 deletions
|
|
@ -1,28 +1,17 @@
|
|||
import errno
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import select
|
||||
import shutil
|
||||
import struct
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from xxhash import xxh64
|
||||
|
||||
import borg.logger
|
||||
from . import __version__
|
||||
from .compress import Compressor
|
||||
from .constants import * # NOQA
|
||||
from .helpers import Error, IntegrityError
|
||||
from .helpers import bin_to_hex
|
||||
from .helpers import get_limited_unpacker
|
||||
from .helpers import sysinfo
|
||||
from .helpers import format_file_size
|
||||
from .helpers import safe_unlink
|
||||
from .logger import create_logger, borg_serve_log_queue
|
||||
from .helpers import msgpack
|
||||
from .repository import Repository, StoreObjectNotFound
|
||||
|
|
@ -296,9 +285,9 @@ class RepositoryServer: # pragma: no cover
|
|||
|
||||
|
||||
class RepositoryNoCache:
|
||||
"""A not caching Repository wrapper, passes through to repository.
|
||||
"""A Repository wrapper that passes through to the repository.
|
||||
|
||||
Just to have same API (including the context manager) as RepositoryCache.
|
||||
It applies an optional *transform* and provides a uniform context-manager API.
|
||||
|
||||
*transform* is a callable taking two arguments, key and raw repository data.
|
||||
The return value is returned from get()/get_many(). By default, the raw
|
||||
|
|
@ -329,176 +318,22 @@ class RepositoryNoCache:
|
|||
pass
|
||||
|
||||
|
||||
class RepositoryCache(RepositoryNoCache):
|
||||
def cache_if_remote(repository, *, decrypted_cache=False, transform=None):
|
||||
"""
|
||||
A caching Repository wrapper.
|
||||
|
||||
Caches Repository GET operations locally.
|
||||
|
||||
*pack* and *unpack* complement *transform* of the base class.
|
||||
*pack* receives the output of *transform* and should return bytes,
|
||||
which are stored in the cache. *unpack* receives these bytes and
|
||||
should return the initial data (as returned by *transform*).
|
||||
"""
|
||||
|
||||
def __init__(self, repository, pack=None, unpack=None, transform=None):
|
||||
super().__init__(repository, transform)
|
||||
self.pack = pack or (lambda data: data)
|
||||
self.unpack = unpack or (lambda data: data)
|
||||
self.cache = set()
|
||||
self.basedir = tempfile.mkdtemp(prefix="borg-cache-")
|
||||
self.query_size_limit()
|
||||
self.size = 0
|
||||
# Instrumentation
|
||||
self.hits = 0
|
||||
self.misses = 0
|
||||
self.slow_misses = 0
|
||||
self.slow_lat = 0.0
|
||||
self.evictions = 0
|
||||
self.enospc = 0
|
||||
|
||||
def query_size_limit(self):
|
||||
available_space = shutil.disk_usage(self.basedir).free
|
||||
self.size_limit = int(min(available_space * 0.25, 2**31))
|
||||
|
||||
def prefixed_key(self, key, complete):
|
||||
# just prefix another byte telling whether this key refers to a complete chunk
|
||||
# or a without-data-metadata-only chunk (see also read_data param).
|
||||
prefix = b"\x01" if complete else b"\x00"
|
||||
return prefix + key
|
||||
|
||||
def key_filename(self, key):
|
||||
return os.path.join(self.basedir, bin_to_hex(key))
|
||||
|
||||
def backoff(self):
|
||||
self.query_size_limit()
|
||||
target_size = int(0.9 * self.size_limit)
|
||||
while self.size > target_size and self.cache:
|
||||
key = self.cache.pop()
|
||||
file = self.key_filename(key)
|
||||
self.size -= os.stat(file).st_size
|
||||
os.unlink(file)
|
||||
self.evictions += 1
|
||||
|
||||
def add_entry(self, key, data, cache, complete):
|
||||
transformed = self.transform(key, data)
|
||||
if not cache:
|
||||
return transformed
|
||||
packed = self.pack(transformed)
|
||||
pkey = self.prefixed_key(key, complete=complete)
|
||||
file = self.key_filename(pkey)
|
||||
try:
|
||||
with open(file, "wb") as fd:
|
||||
fd.write(packed)
|
||||
except OSError as os_error:
|
||||
try:
|
||||
safe_unlink(file)
|
||||
except FileNotFoundError:
|
||||
pass # open() could have failed as well
|
||||
if os_error.errno == errno.ENOSPC:
|
||||
self.enospc += 1
|
||||
self.backoff()
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
self.size += len(packed)
|
||||
self.cache.add(pkey)
|
||||
if self.size > self.size_limit:
|
||||
self.backoff()
|
||||
return transformed
|
||||
|
||||
def log_instrumentation(self):
|
||||
logger.debug(
|
||||
"RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), "
|
||||
"%d evictions, %d ENOSPC hit",
|
||||
len(self.cache),
|
||||
format_file_size(self.size),
|
||||
format_file_size(self.size_limit),
|
||||
self.hits,
|
||||
self.misses,
|
||||
self.slow_misses,
|
||||
self.slow_lat,
|
||||
self.evictions,
|
||||
self.enospc,
|
||||
)
|
||||
|
||||
def close(self):
|
||||
self.log_instrumentation()
|
||||
self.cache.clear()
|
||||
shutil.rmtree(self.basedir)
|
||||
|
||||
def get_many(self, keys, read_data=True, raise_missing=True, cache=True):
|
||||
# It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
|
||||
unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache]
|
||||
repository_iterator = zip(
|
||||
unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing)
|
||||
)
|
||||
for key in keys:
|
||||
pkey = self.prefixed_key(key, complete=read_data)
|
||||
if pkey in self.cache:
|
||||
file = self.key_filename(pkey)
|
||||
with open(file, "rb") as fd:
|
||||
self.hits += 1
|
||||
yield self.unpack(fd.read())
|
||||
else:
|
||||
for key_, data in repository_iterator:
|
||||
if key_ == key:
|
||||
transformed = self.add_entry(key, data, cache, complete=read_data)
|
||||
self.misses += 1
|
||||
yield transformed
|
||||
break
|
||||
else:
|
||||
# slow path: eviction during this get_many removed this key from the cache
|
||||
t0 = time.perf_counter()
|
||||
data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing)
|
||||
self.slow_lat += time.perf_counter() - t0
|
||||
transformed = self.add_entry(key, data, cache, complete=read_data)
|
||||
self.slow_misses += 1
|
||||
yield transformed
|
||||
# Consume any pending requests
|
||||
for _ in repository_iterator:
|
||||
pass
|
||||
|
||||
|
||||
def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
|
||||
"""
|
||||
Return a Repository(No)Cache for *repository*.
|
||||
Return a RepositoryNoCache wrapping *repository*.
|
||||
|
||||
If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple
|
||||
(csize, plaintext) instead of the actual data in the repository. The cache will
|
||||
store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
|
||||
and more importantly MAC and ID checking cached objects).
|
||||
Internally, objects are compressed with LZ4.
|
||||
(csize, plaintext) instead of the actual data in the repository (the objects are
|
||||
parsed/decrypted via the *transform* derived from it).
|
||||
"""
|
||||
if decrypted_cache and (pack or unpack or transform):
|
||||
raise ValueError("decrypted_cache and pack/unpack/transform are incompatible")
|
||||
if decrypted_cache and transform:
|
||||
raise ValueError("decrypted_cache and transform are incompatible")
|
||||
elif decrypted_cache:
|
||||
repo_objs = decrypted_cache
|
||||
# 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel
|
||||
cache_struct = struct.Struct("=I8sBB")
|
||||
compressor = Compressor("lz4")
|
||||
|
||||
def pack(data):
|
||||
csize, decrypted = data
|
||||
meta, compressed = compressor.compress({}, decrypted)
|
||||
return cache_struct.pack(csize, xxh64(compressed).digest(), meta["ctype"], meta["clevel"]) + compressed
|
||||
|
||||
def unpack(data):
|
||||
data = memoryview(data)
|
||||
csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size])
|
||||
compressed = data[cache_struct.size :]
|
||||
if checksum != xxh64(compressed).digest():
|
||||
raise IntegrityError("detected corrupted data in metadata cache")
|
||||
meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed))
|
||||
_, decrypted = compressor.decompress(meta, compressed)
|
||||
return csize, decrypted
|
||||
|
||||
def transform(id_, data):
|
||||
meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
|
||||
csize = meta.get("csize", len(data))
|
||||
return csize, decrypted
|
||||
|
||||
if force_cache:
|
||||
return RepositoryCache(repository, pack, unpack, transform)
|
||||
else:
|
||||
return RepositoryNoCache(repository, transform)
|
||||
return RepositoryNoCache(repository, transform)
|
||||
|
|
|
|||
|
|
@ -1,22 +1,18 @@
|
|||
import errno
|
||||
import os
|
||||
import io
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from ..constants import ROBJ_FILE_STREAM
|
||||
from ..remote import RepositoryCache, cache_if_remote
|
||||
from ..remote import cache_if_remote
|
||||
from ..repository import Repository
|
||||
from ..crypto.key import PlaintextKey
|
||||
from ..helpers import IntegrityError
|
||||
from ..repoobj import RepoObj
|
||||
from .hashindex_test import H
|
||||
from .repository_test import fchunk, pdchunk
|
||||
from .crypto.key_test import TestKey
|
||||
|
||||
|
||||
class TestRepositoryCache:
|
||||
class TestCacheIfRemote:
|
||||
@pytest.fixture
|
||||
def repository(self, tmpdir):
|
||||
self.repository_location = os.path.join(str(tmpdir), "repository")
|
||||
|
|
@ -26,124 +22,16 @@ class TestRepositoryCache:
|
|||
repository.put(H(3), fchunk(bytes(100)))
|
||||
yield repository
|
||||
|
||||
@pytest.fixture
|
||||
def cache(self, repository):
|
||||
return RepositoryCache(repository)
|
||||
|
||||
def test_simple(self, cache: RepositoryCache):
|
||||
# Single get()s are not cached, since they are used for unique objects like archives.
|
||||
assert pdchunk(cache.get(H(1))) == b"1234"
|
||||
assert cache.misses == 1
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 1
|
||||
|
||||
assert pdchunk(cache.get(H(1))) == b"1234"
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 2
|
||||
|
||||
def test_meta(self, cache: RepositoryCache):
|
||||
# Same as test_simple, but not reading the chunk data (metadata only).
|
||||
# Single get()s are not cached, since they are used for unique objects like archives.
|
||||
assert pdchunk(cache.get(H(1), read_data=False)) == b""
|
||||
assert cache.misses == 1
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 1
|
||||
|
||||
assert pdchunk(cache.get(H(1), read_data=False)) == b""
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 2
|
||||
|
||||
def test_mixed(self, cache: RepositoryCache):
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||
assert cache.misses == 1
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 1
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
|
||||
assert cache.misses == 2
|
||||
assert cache.hits == 2
|
||||
|
||||
def test_backoff(self, cache: RepositoryCache):
|
||||
def query_size_limit():
|
||||
cache.size_limit = 0
|
||||
|
||||
assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"]
|
||||
assert cache.misses == 2
|
||||
assert cache.evictions == 0
|
||||
iterator = cache.get_many([H(1), H(3), H(2)])
|
||||
assert pdchunk(next(iterator)) == b"1234"
|
||||
|
||||
# Force cache to back off
|
||||
qsl = cache.query_size_limit
|
||||
cache.query_size_limit = query_size_limit # type: ignore[assignment]
|
||||
cache.backoff()
|
||||
cache.query_size_limit = qsl # type: ignore[assignment]
|
||||
# Evicted H(1) and H(2)
|
||||
assert cache.evictions == 2
|
||||
assert H(1) not in cache.cache
|
||||
assert H(2) not in cache.cache
|
||||
assert pdchunk(next(iterator)) == bytes(100)
|
||||
assert cache.slow_misses == 0
|
||||
# Since H(2) was in the cache when we called get_many(), but has
|
||||
# been evicted during iterating the generator, it will be a slow miss.
|
||||
assert pdchunk(next(iterator)) == b"5678"
|
||||
assert cache.slow_misses == 1
|
||||
|
||||
def test_enospc(self, cache: RepositoryCache):
|
||||
class enospc_open:
|
||||
def __init__(self, *args):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
raise OSError(errno.ENOSPC, "foo")
|
||||
|
||||
def truncate(self, n=None):
|
||||
pass
|
||||
|
||||
iterator = cache.get_many([H(1), H(2), H(3)])
|
||||
assert pdchunk(next(iterator)) == b"1234"
|
||||
|
||||
with patch("builtins.open", enospc_open):
|
||||
assert pdchunk(next(iterator)) == b"5678"
|
||||
assert cache.enospc == 1
|
||||
# We didn't patch query_size_limit, which would set size_limit to a low
|
||||
# value, so nothing was actually evicted.
|
||||
assert cache.evictions == 0
|
||||
|
||||
assert pdchunk(next(iterator)) == bytes(100)
|
||||
def test_passthrough(self, repository):
|
||||
# Without decrypted_cache, raw repository data is passed through unchanged.
|
||||
with cache_if_remote(repository) as cached:
|
||||
assert pdchunk(cached.get(H(1))) == b"1234"
|
||||
assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"]
|
||||
|
||||
@pytest.fixture
|
||||
def key(self, repository, monkeypatch):
|
||||
monkeypatch.setenv("BORG_PASSPHRASE", "test")
|
||||
key = PlaintextKey.create(repository, TestKey.MockArgs())
|
||||
return key
|
||||
return PlaintextKey.create(repository, TestKey.MockArgs())
|
||||
|
||||
@pytest.fixture
|
||||
def repo_objs(self, key):
|
||||
|
|
@ -162,27 +50,13 @@ class TestRepositoryCache:
|
|||
def H2(self, repo_objs, repository):
|
||||
return self._put_encrypted_object(repo_objs, repository, b"5678")
|
||||
|
||||
@pytest.fixture
|
||||
def H3(self, repo_objs, repository):
|
||||
return self._put_encrypted_object(repo_objs, repository, bytes(100))
|
||||
def test_decrypted_cache(self, repo_objs, repository, H1, H2):
|
||||
# With decrypted_cache, get/get_many return (csize, plaintext) tuples.
|
||||
with cache_if_remote(repository, decrypted_cache=repo_objs) as cached:
|
||||
csize, plaintext = cached.get(H1)
|
||||
assert plaintext == b"1234"
|
||||
assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"]
|
||||
|
||||
@pytest.fixture
|
||||
def decrypted_cache(self, repo_objs, repository):
|
||||
return cache_if_remote(repository, decrypted_cache=repo_objs, force_cache=True)
|
||||
|
||||
def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3):
|
||||
list(decrypted_cache.get_many([H1, H2, H3]))
|
||||
|
||||
iterator = decrypted_cache.get_many([H1, H2, H3])
|
||||
assert next(iterator) == (4, b"1234")
|
||||
|
||||
pkey = decrypted_cache.prefixed_key(H2, complete=True)
|
||||
with open(decrypted_cache.key_filename(pkey), "a+b") as fd:
|
||||
fd.seek(-1, io.SEEK_END)
|
||||
corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little")
|
||||
fd.seek(-1, io.SEEK_END)
|
||||
fd.write(corrupted)
|
||||
fd.truncate()
|
||||
|
||||
with pytest.raises(IntegrityError):
|
||||
assert next(iterator) == (4, b"5678")
|
||||
def test_decrypted_cache_and_transform_incompatible(self, repository, repo_objs):
|
||||
with pytest.raises(ValueError):
|
||||
cache_if_remote(repository, decrypted_cache=repo_objs, transform=lambda key, data: data)
|
||||
|
|
|
|||
Loading…
Reference in a new issue