diff --git a/src/borg/remote.py b/src/borg/remote.py index 786fc390e..20918f9f4 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,28 +1,17 @@ -import errno import inspect import logging import os import queue import select -import shutil -import struct import sys -import tempfile -import time import traceback -from xxhash import xxh64 - import borg.logger from . import __version__ -from .compress import Compressor from .constants import * # NOQA from .helpers import Error, IntegrityError -from .helpers import bin_to_hex from .helpers import get_limited_unpacker from .helpers import sysinfo -from .helpers import format_file_size -from .helpers import safe_unlink from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository, StoreObjectNotFound @@ -296,9 +285,9 @@ class RepositoryServer: # pragma: no cover class RepositoryNoCache: - """A not caching Repository wrapper, passes through to repository. + """A Repository wrapper that passes through to the repository. - Just to have same API (including the context manager) as RepositoryCache. + It applies an optional *transform* and provides a uniform context-manager API. *transform* is a callable taking two arguments, key and raw repository data. The return value is returned from get()/get_many(). By default, the raw @@ -329,176 +318,22 @@ class RepositoryNoCache: pass -class RepositoryCache(RepositoryNoCache): +def cache_if_remote(repository, *, decrypted_cache=False, transform=None): """ - A caching Repository wrapper. - - Caches Repository GET operations locally. - - *pack* and *unpack* complement *transform* of the base class. - *pack* receives the output of *transform* and should return bytes, - which are stored in the cache. *unpack* receives these bytes and - should return the initial data (as returned by *transform*). - """ - - def __init__(self, repository, pack=None, unpack=None, transform=None): - super().__init__(repository, transform) - self.pack = pack or (lambda data: data) - self.unpack = unpack or (lambda data: data) - self.cache = set() - self.basedir = tempfile.mkdtemp(prefix="borg-cache-") - self.query_size_limit() - self.size = 0 - # Instrumentation - self.hits = 0 - self.misses = 0 - self.slow_misses = 0 - self.slow_lat = 0.0 - self.evictions = 0 - self.enospc = 0 - - def query_size_limit(self): - available_space = shutil.disk_usage(self.basedir).free - self.size_limit = int(min(available_space * 0.25, 2**31)) - - def prefixed_key(self, key, complete): - # just prefix another byte telling whether this key refers to a complete chunk - # or a without-data-metadata-only chunk (see also read_data param). - prefix = b"\x01" if complete else b"\x00" - return prefix + key - - def key_filename(self, key): - return os.path.join(self.basedir, bin_to_hex(key)) - - def backoff(self): - self.query_size_limit() - target_size = int(0.9 * self.size_limit) - while self.size > target_size and self.cache: - key = self.cache.pop() - file = self.key_filename(key) - self.size -= os.stat(file).st_size - os.unlink(file) - self.evictions += 1 - - def add_entry(self, key, data, cache, complete): - transformed = self.transform(key, data) - if not cache: - return transformed - packed = self.pack(transformed) - pkey = self.prefixed_key(key, complete=complete) - file = self.key_filename(pkey) - try: - with open(file, "wb") as fd: - fd.write(packed) - except OSError as os_error: - try: - safe_unlink(file) - except FileNotFoundError: - pass # open() could have failed as well - if os_error.errno == errno.ENOSPC: - self.enospc += 1 - self.backoff() - else: - raise - else: - self.size += len(packed) - self.cache.add(pkey) - if self.size > self.size_limit: - self.backoff() - return transformed - - def log_instrumentation(self): - logger.debug( - "RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), " - "%d evictions, %d ENOSPC hit", - len(self.cache), - format_file_size(self.size), - format_file_size(self.size_limit), - self.hits, - self.misses, - self.slow_misses, - self.slow_lat, - self.evictions, - self.enospc, - ) - - def close(self): - self.log_instrumentation() - self.cache.clear() - shutil.rmtree(self.basedir) - - def get_many(self, keys, read_data=True, raise_missing=True, cache=True): - # It could use different cache keys depending on read_data and cache full vs. meta-only chunks. - unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache] - repository_iterator = zip( - unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing) - ) - for key in keys: - pkey = self.prefixed_key(key, complete=read_data) - if pkey in self.cache: - file = self.key_filename(pkey) - with open(file, "rb") as fd: - self.hits += 1 - yield self.unpack(fd.read()) - else: - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache, complete=read_data) - self.misses += 1 - yield transformed - break - else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing) - self.slow_lat += time.perf_counter() - t0 - transformed = self.add_entry(key, data, cache, complete=read_data) - self.slow_misses += 1 - yield transformed - # Consume any pending requests - for _ in repository_iterator: - pass - - -def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): - """ - Return a Repository(No)Cache for *repository*. + Return a RepositoryNoCache wrapping *repository*. If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple - (csize, plaintext) instead of the actual data in the repository. The cache will - store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting - and more importantly MAC and ID checking cached objects). - Internally, objects are compressed with LZ4. + (csize, plaintext) instead of the actual data in the repository (the objects are + parsed/decrypted via the *transform* derived from it). """ - if decrypted_cache and (pack or unpack or transform): - raise ValueError("decrypted_cache and pack/unpack/transform are incompatible") + if decrypted_cache and transform: + raise ValueError("decrypted_cache and transform are incompatible") elif decrypted_cache: repo_objs = decrypted_cache - # 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel - cache_struct = struct.Struct("=I8sBB") - compressor = Compressor("lz4") - - def pack(data): - csize, decrypted = data - meta, compressed = compressor.compress({}, decrypted) - return cache_struct.pack(csize, xxh64(compressed).digest(), meta["ctype"], meta["clevel"]) + compressed - - def unpack(data): - data = memoryview(data) - csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size]) - compressed = data[cache_struct.size :] - if checksum != xxh64(compressed).digest(): - raise IntegrityError("detected corrupted data in metadata cache") - meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed)) - _, decrypted = compressor.decompress(meta, compressed) - return csize, decrypted def transform(id_, data): meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE) csize = meta.get("csize", len(data)) return csize, decrypted - if force_cache: - return RepositoryCache(repository, pack, unpack, transform) - else: - return RepositoryNoCache(repository, transform) + return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/remote_test.py b/src/borg/testsuite/remote_test.py index 0eb0b79c3..4a5ca5e14 100644 --- a/src/borg/testsuite/remote_test.py +++ b/src/borg/testsuite/remote_test.py @@ -1,22 +1,18 @@ -import errno import os -import io -from unittest.mock import patch import pytest from ..constants import ROBJ_FILE_STREAM -from ..remote import RepositoryCache, cache_if_remote +from ..remote import cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey -from ..helpers import IntegrityError from ..repoobj import RepoObj from .hashindex_test import H from .repository_test import fchunk, pdchunk from .crypto.key_test import TestKey -class TestRepositoryCache: +class TestCacheIfRemote: @pytest.fixture def repository(self, tmpdir): self.repository_location = os.path.join(str(tmpdir), "repository") @@ -26,124 +22,16 @@ class TestRepositoryCache: repository.put(H(3), fchunk(bytes(100))) yield repository - @pytest.fixture - def cache(self, repository): - return RepositoryCache(repository) - - def test_simple(self, cache: RepositoryCache): - # Single get()s are not cached, since they are used for unique objects like archives. - assert pdchunk(cache.get(H(1))) == b"1234" - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 1 - - assert pdchunk(cache.get(H(1))) == b"1234" - assert cache.misses == 2 - assert cache.hits == 2 - - def test_meta(self, cache: RepositoryCache): - # Same as test_simple, but not reading the chunk data (metadata only). - # Single get()s are not cached, since they are used for unique objects like archives. - assert pdchunk(cache.get(H(1), read_data=False)) == b"" - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 1 - - assert pdchunk(cache.get(H(1), read_data=False)) == b"" - assert cache.misses == 2 - assert cache.hits == 2 - - def test_mixed(self, cache: RepositoryCache): - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 1 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""] - assert cache.misses == 2 - assert cache.hits == 1 - - assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"] - assert cache.misses == 2 - assert cache.hits == 2 - - def test_backoff(self, cache: RepositoryCache): - def query_size_limit(): - cache.size_limit = 0 - - assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"] - assert cache.misses == 2 - assert cache.evictions == 0 - iterator = cache.get_many([H(1), H(3), H(2)]) - assert pdchunk(next(iterator)) == b"1234" - - # Force cache to back off - qsl = cache.query_size_limit - cache.query_size_limit = query_size_limit # type: ignore[assignment] - cache.backoff() - cache.query_size_limit = qsl # type: ignore[assignment] - # Evicted H(1) and H(2) - assert cache.evictions == 2 - assert H(1) not in cache.cache - assert H(2) not in cache.cache - assert pdchunk(next(iterator)) == bytes(100) - assert cache.slow_misses == 0 - # Since H(2) was in the cache when we called get_many(), but has - # been evicted during iterating the generator, it will be a slow miss. - assert pdchunk(next(iterator)) == b"5678" - assert cache.slow_misses == 1 - - def test_enospc(self, cache: RepositoryCache): - class enospc_open: - def __init__(self, *args): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def write(self, data): - raise OSError(errno.ENOSPC, "foo") - - def truncate(self, n=None): - pass - - iterator = cache.get_many([H(1), H(2), H(3)]) - assert pdchunk(next(iterator)) == b"1234" - - with patch("builtins.open", enospc_open): - assert pdchunk(next(iterator)) == b"5678" - assert cache.enospc == 1 - # We didn't patch query_size_limit, which would set size_limit to a low - # value, so nothing was actually evicted. - assert cache.evictions == 0 - - assert pdchunk(next(iterator)) == bytes(100) + def test_passthrough(self, repository): + # Without decrypted_cache, raw repository data is passed through unchanged. + with cache_if_remote(repository) as cached: + assert pdchunk(cached.get(H(1))) == b"1234" + assert [pdchunk(ch) for ch in cached.get_many([H(1), H(2)])] == [b"1234", b"5678"] @pytest.fixture def key(self, repository, monkeypatch): monkeypatch.setenv("BORG_PASSPHRASE", "test") - key = PlaintextKey.create(repository, TestKey.MockArgs()) - return key + return PlaintextKey.create(repository, TestKey.MockArgs()) @pytest.fixture def repo_objs(self, key): @@ -162,27 +50,13 @@ class TestRepositoryCache: def H2(self, repo_objs, repository): return self._put_encrypted_object(repo_objs, repository, b"5678") - @pytest.fixture - def H3(self, repo_objs, repository): - return self._put_encrypted_object(repo_objs, repository, bytes(100)) + def test_decrypted_cache(self, repo_objs, repository, H1, H2): + # With decrypted_cache, get/get_many return (csize, plaintext) tuples. + with cache_if_remote(repository, decrypted_cache=repo_objs) as cached: + csize, plaintext = cached.get(H1) + assert plaintext == b"1234" + assert [pt for _csize, pt in cached.get_many([H1, H2])] == [b"1234", b"5678"] - @pytest.fixture - def decrypted_cache(self, repo_objs, repository): - return cache_if_remote(repository, decrypted_cache=repo_objs, force_cache=True) - - def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3): - list(decrypted_cache.get_many([H1, H2, H3])) - - iterator = decrypted_cache.get_many([H1, H2, H3]) - assert next(iterator) == (4, b"1234") - - pkey = decrypted_cache.prefixed_key(H2, complete=True) - with open(decrypted_cache.key_filename(pkey), "a+b") as fd: - fd.seek(-1, io.SEEK_END) - corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little") - fd.seek(-1, io.SEEK_END) - fd.write(corrupted) - fd.truncate() - - with pytest.raises(IntegrityError): - assert next(iterator) == (4, b"5678") + def test_decrypted_cache_and_transform_incompatible(self, repository, repo_objs): + with pytest.raises(ValueError): + cache_if_remote(repository, decrypted_cache=repo_objs, transform=lambda key, data: data)