From 8aa745ddbd8ee1cddb3374673eb3eb08a9d5a8da Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 10 Jun 2017 17:59:41 +0200 Subject: [PATCH] create: --no-cache-sync --- conftest.py | 6 +- docs/internals/frontends.rst | 1 + src/borg/archiver.py | 4 +- src/borg/cache.py | 259 +++++++++++++++++++++++++++++---- src/borg/hashindex.pyx | 19 ++- src/borg/helpers.py | 10 +- src/borg/repository.py | 2 +- src/borg/testsuite/archiver.py | 20 ++- 8 files changed, 282 insertions(+), 39 deletions(-) diff --git a/conftest.py b/conftest.py index e85ae6ef1..cc428be1f 100644 --- a/conftest.py +++ b/conftest.py @@ -62,16 +62,16 @@ def pytest_report_header(config, startdir): class DefaultPatches: def __init__(self, request): - self.org_cache_wipe_cache = borg.cache.Cache.wipe_cache + self.org_cache_wipe_cache = borg.cache.LocalCache.wipe_cache def wipe_should_not_be_called(*a, **kw): raise AssertionError("Cache wipe was triggered, if this is part of the test add @pytest.mark.allow_cache_wipe") if 'allow_cache_wipe' not in request.keywords: - borg.cache.Cache.wipe_cache = wipe_should_not_be_called + borg.cache.LocalCache.wipe_cache = wipe_should_not_be_called request.addfinalizer(self.undo) def undo(self): - borg.cache.Cache.wipe_cache = self.org_cache_wipe_cache + borg.cache.LocalCache.wipe_cache = self.org_cache_wipe_cache @pytest.fixture(autouse=True) diff --git a/docs/internals/frontends.rst b/docs/internals/frontends.rst index 4000bede7..c41d427eb 100644 --- a/docs/internals/frontends.rst +++ b/docs/internals/frontends.rst @@ -504,6 +504,7 @@ Errors Operations - cache.begin_transaction + - cache.download_chunks, appears with ``borg create --no-cache-sync`` - cache.commit - cache.sync diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 4536b83d8..579a0bfc7 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -504,7 +504,7 @@ class Archiver: t0_monotonic = time.monotonic() if not dry_run: with Cache(repository, key, manifest, do_files=args.cache_files, progress=args.progress, - lock_wait=self.lock_wait) as cache: + lock_wait=self.lock_wait, permit_adhoc_cache=args.no_cache_sync) as cache: archive = Archive(repository, key, manifest, args.location.archive, cache=cache, create=True, checkpoint_interval=args.checkpoint_interval, numeric_owner=args.numeric_owner, noatime=args.noatime, noctime=args.noctime, @@ -2826,6 +2826,8 @@ class Archiver: help='only display items with the given status characters') subparser.add_argument('--json', action='store_true', help='output stats as JSON (implies --stats)') + subparser.add_argument('--no-cache-sync', dest='no_cache_sync', action='store_true', + help='experimental: do not synchronize the cache') exclude_group = subparser.add_argument_group('Exclusion options') exclude_group.add_argument('-e', '--exclude', dest='patterns', diff --git a/src/borg/cache.py b/src/borg/cache.py index 70d6f029b..280df6c74 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -4,6 +4,7 @@ import shutil import stat from binascii import unhexlify from collections import namedtuple +from time import perf_counter import msgpack @@ -30,6 +31,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityChecke from .locking import Lock from .platform import SaveFile from .remote import cache_if_remote +from .repository import LIST_SCAN_LIMIT FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size mtime chunk_ids') @@ -347,6 +349,69 @@ class Cache: os.remove(config) # kill config first shutil.rmtree(path) + def __new__(cls, repository, key, 
manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True, + progress=False, lock_wait=None, permit_adhoc_cache=False): + def local(): + return LocalCache(repository=repository, key=key, manifest=manifest, path=path, sync=sync, + do_files=do_files, warn_if_unencrypted=warn_if_unencrypted, progress=progress, + lock_wait=lock_wait) + + def adhoc(): + return AdHocCache(repository=repository, key=key, manifest=manifest) + + if not permit_adhoc_cache: + return local() + + # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate + # it by needlessly using the ad-hoc cache. + # Check if the local cache exists and is in sync. + + cache_config = CacheConfig(repository, path, lock_wait) + if cache_config.exists(): + with cache_config: + cache_in_sync = cache_config.manifest_id == manifest.id + # Don't nest cache locks + if cache_in_sync: + # Local cache is in sync, use it + logger.debug('Cache: choosing local cache (in sync)') + return local() + logger.debug('Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)') + return adhoc() + + +class CacheStatsMixin: + str_format = """\ +All archives: {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s} + + Unique chunks Total chunks +Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" + + def __str__(self): + return self.str_format.format(self.format_tuple()) + + Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks', + 'total_chunks']) + + def stats(self): + # XXX: this should really be moved down to `hashindex.pyx` + stats = self.Summary(*self.chunks.summarize())._asdict() + return stats + + def format_tuple(self): + stats = self.stats() + for field in ['total_size', 'total_csize', 'unique_csize']: + stats[field] = format_file_size(stats[field]) + return self.Summary(**stats) + + def chunks_stored_size(self): + return self.stats()['unique_csize'] + + +class LocalCache(CacheStatsMixin): + """ + Persistent, local (client-side) cache. + """ + def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True, progress=False, lock_wait=None): """ @@ -394,31 +459,6 @@ class Cache: def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def __str__(self): - fmt = """\ -All archives: {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s} - - Unique chunks Total chunks -Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" - return fmt.format(self.format_tuple()) - - Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks', - 'total_chunks']) - - def stats(self): - # XXX: this should really be moved down to `hashindex.pyx` - stats = self.Summary(*self.chunks.summarize())._asdict() - return stats - - def format_tuple(self): - stats = self.stats() - for field in ['total_size', 'total_csize', 'unique_csize']: - stats[field] = format_file_size(stats[field]) - return self.Summary(**stats) - - def chunks_stored_size(self): - return self.stats()['unique_csize'] - def create(self): """Create a new empty cache at `self.path` """ @@ -547,10 +587,14 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" archive indexes. 
""" archive_path = os.path.join(self.path, 'chunks.archive.d') + # An index of chunks were the size had to be fetched + chunks_fetched_size_index = ChunkIndex() # Instrumentation processed_item_metadata_bytes = 0 processed_item_metadata_chunks = 0 compact_chunks_archive_saved_space = 0 + fetched_chunks_for_csize = 0 + fetched_bytes_for_csize = 0 def mkpath(id, suffix=''): id_hex = bin_to_hex(id) @@ -588,6 +632,34 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" except FileNotFoundError: pass + def fetch_missing_csize(chunk_idx): + """ + Archives created with AdHocCache will have csize=0 in all chunk list entries whose + chunks were already in the repository. + + Scan *chunk_idx* for entries where csize=0 and fill in the correct information. + """ + nonlocal fetched_chunks_for_csize + nonlocal fetched_bytes_for_csize + + all_missing_ids = chunk_idx.zero_csize_ids() + fetch_ids = [] + for id_ in all_missing_ids: + already_fetched_entry = chunks_fetched_size_index.get(id_) + if already_fetched_entry: + entry = chunk_idx[id_]._replace(csize=already_fetched_entry.csize) + assert entry.size == already_fetched_entry.size, 'Chunk size mismatch' + chunk_idx[id_] = entry + else: + fetch_ids.append(id_) + + for id_, data in zip(fetch_ids, decrypted_repository.repository.get_many(fetch_ids)): + entry = chunk_idx[id_]._replace(csize=len(data)) + chunk_idx[id_] = entry + chunks_fetched_size_index[id_] = entry + fetched_chunks_for_csize += 1 + fetched_bytes_for_csize += len(data) + def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx): nonlocal processed_item_metadata_bytes nonlocal processed_item_metadata_chunks @@ -603,6 +675,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" processed_item_metadata_chunks += 1 sync.feed(data) if self.do_cache: + fetch_missing_csize(chunk_idx) write_archive_index(archive_id, chunk_idx) def write_archive_index(archive_id, chunk_idx): @@ -698,8 +771,13 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" chunk_idx = chunk_idx or ChunkIndex(master_index_capacity) logger.info('Fetching archive index for %s ...', archive_name) fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx) + if not self.do_cache: + fetch_missing_csize(chunk_idx) pi.finish() - logger.debug('Cache sync: processed %s bytes (%d chunks) of metadata', + logger.debug('Cache sync: had to fetch %s (%d chunks) because no archive had a csize set for them ' + '(due to --no-cache-sync)', + format_file_size(fetched_bytes_for_csize), fetched_chunks_for_csize) + logger.debug('Cache sync: processed %s (%d chunks) of metadata', format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks) logger.debug('Cache sync: compact chunks.archive.d storage saved %s bytes', format_file_size(compact_chunks_archive_saved_space)) @@ -843,3 +921,132 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, mtime=int_to_bigint(mtime_ns), chunk_ids=ids) self.files[path_hash] = msgpack.packb(entry) self._newest_mtime = max(self._newest_mtime or 0, mtime_ns) + + +class AdHocCache(CacheStatsMixin): + """ + Ad-hoc, non-persistent cache. + + Compared to the standard LocalCache the AdHocCache does not maintain accurate reference count, + nor does it provide a files cache (which would require persistence). Chunks that were not added + during the current AdHocCache lifetime won't have correct size/csize set (0 bytes) and will + have an infinite reference count (MAX_VALUE). 
+ """ + + str_format = """\ +All archives: unknown unknown unknown + + Unique chunks Total chunks +Chunk index: {0.total_unique_chunks:20d} unknown""" + + def __init__(self, repository, key, manifest, warn_if_unencrypted=True): + self.repository = repository + self.key = key + self.manifest = manifest + self._txn_active = False + + self.security_manager = SecurityManager(repository) + self.security_manager.assert_secure(manifest, key) + + logger.warning('Note: --no-cache-sync is an experimental feature.') + + # Public API + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + files = None + do_files = False + + def file_known_and_unchanged(self, path_hash, st, ignore_inode=False): + pass + + def memorize_file(self, path_hash, st, ids): + pass + + def add_chunk(self, id, chunk, stats, overwrite=False, wait=True): + assert not overwrite, 'AdHocCache does not permit overwrites — trying to use it for recreate?' + if not self._txn_active: + self._begin_txn() + size = len(chunk) + refcount = self.seen_chunk(id, size) + if refcount: + return self.chunk_incref(id, stats) + data = self.key.encrypt(chunk) + csize = len(data) + self.repository.put(id, data, wait=wait) + self.chunks.add(id, 1, size, csize) + stats.update(size, csize, not refcount) + return ChunkListEntry(id, size, csize) + + def seen_chunk(self, id, size=None): + return self.chunks.get(id, ChunkIndexEntry(0, None, None)).refcount + + def chunk_incref(self, id, stats): + if not self._txn_active: + self._begin_txn() + count, size, csize = self.chunks.incref(id) + stats.update(size, csize, False) + return ChunkListEntry(id, size, csize) + + def chunk_decref(self, id, stats, wait=True): + if not self._txn_active: + self._begin_txn() + count, size, csize = self.chunks.decref(id) + if count == 0: + del self.chunks[id] + self.repository.delete(id, wait=wait) + stats.update(-size, -csize, True) + else: + stats.update(-size, -csize, False) + + def commit(self): + if not self._txn_active: + return + self.security_manager.save(self.manifest, self.key) + self._txn_active = False + + def rollback(self): + self._txn_active = False + del self.chunks + + # Private API + + def _begin_txn(self): + self._txn_active = True + # Explicitly set the initial hash table capacity to avoid performance issues + # due to hash table "resonance". + # Since we're creating an archive, add 10 % from the start. + num_chunks = len(self.repository) + capacity = int(num_chunks / ChunkIndex.MAX_LOAD_FACTOR * 1.1) + self.chunks = ChunkIndex(capacity) + pi = ProgressIndicatorPercent(total=num_chunks, msg='Downloading chunk list... %3.0f%%', + msgid='cache.download_chunks') + t0 = perf_counter() + num_requests = 0 + marker = None + while True: + result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) + num_requests += 1 + if not result: + break + pi.show(increase=len(result)) + marker = result[-1] + # All chunks from the repository have a refcount of MAX_VALUE, which is sticky, + # therefore we can't/won't delete them. Chunks we added ourselves in this transaction + # (e.g. checkpoint archives) are tracked correctly. + init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0, csize=0) + for id_ in result: + self.chunks[id_] = init_entry + assert len(self.chunks) == num_chunks + # LocalCache does not contain the manifest, either. 
+ del self.chunks[self.manifest.MANIFEST_ID] + duration = perf_counter() - t0 + pi.finish() + logger.debug('AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s', + num_chunks, duration, num_requests, format_file_size(num_chunks * 34 / duration)) + # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. + # Protocol overhead is neglected in this calculation. diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index b8e86d142..0d271ad65 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -8,8 +8,9 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t from libc.errno cimport errno from cpython.exc cimport PyErr_SetFromErrnoWithFilename from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release +from cpython.bytes cimport PyBytes_FromStringAndSize -API_VERSION = '1.1_05' +API_VERSION = '1.1_06' cdef extern from "_hashindex.c": @@ -410,6 +411,22 @@ cdef class ChunkIndex(IndexBase): break self._add(key, <uint32_t*> (key + self.key_size)) + def zero_csize_ids(self): + cdef void *key = NULL + cdef uint32_t *values + entries = [] + while True: + key = hashindex_next_key(self.index, key) + if not key: + break + values = <uint32_t*> (key + self.key_size) + refcount = _le32toh(values[0]) + assert refcount <= _MAX_VALUE, "invalid reference count" + if _le32toh(values[2]) == 0: + # csize == 0 + entries.append(PyBytes_FromStringAndSize(<char *> key, self.key_size)) + return entries + cdef class ChunkKeyIterator: cdef ChunkIndex idx diff --git a/src/borg/helpers.py b/src/borg/helpers.py index 9100bedf5..870d20219 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -131,7 +131,7 @@ class MandatoryFeatureUnsupported(Error): def check_extension_modules(): from . import platform, compress, item - if hashindex.API_VERSION != '1.1_05': + if hashindex.API_VERSION != '1.1_06': raise ExtensionModuleError if chunker.API_VERSION != '1.1_01': raise ExtensionModuleError @@ -2010,7 +2010,7 @@ class BorgJsonEncoder(json.JSONEncoder): from .repository import Repository from .remote import RemoteRepository from .archive import Archive - from .cache import Cache + from .cache import LocalCache, AdHocCache if isinstance(o, Repository) or isinstance(o, RemoteRepository): return { 'id': bin_to_hex(o.id), @@ -2018,11 +2018,15 @@ class BorgJsonEncoder(json.JSONEncoder): } if isinstance(o, Archive): return o.info() - if isinstance(o, Cache): + if isinstance(o, LocalCache): return { 'path': o.path, 'stats': o.stats(), } + if isinstance(o, AdHocCache): + return { + 'stats': o.stats(), + } return super().default(o) diff --git a/src/borg/repository.py b/src/borg/repository.py index d15d51b1e..f73e9cf54 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -34,7 +34,7 @@ TAG_PUT = 0 TAG_DELETE = 1 TAG_COMMIT = 2 -LIST_SCAN_LIMIT = 10000 # repo.list() / .scan() result count limit the borg client uses +LIST_SCAN_LIMIT = 100000 # repo.list() / .scan() result count limit the borg client uses FreeSpace = partial(defaultdict, int) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index e9bcef5e6..4e7cb5a58 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -34,7 +34,7 @@ import borg from ..
import xattr, helpers, platform from ..archive import Archive, ChunkBuffer, flags_noatime, flags_normal from ..archiver import Archiver, parse_storage_quota -from ..cache import Cache +from ..cache import Cache, LocalCache from ..constants import * # NOQA from ..crypto.low_level import bytes_to_long, num_aes_blocks from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError @@ -1031,6 +1031,18 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert out_list.index('d x/a') < out_list.index('- x/a/foo_a') assert out_list.index('d x/b') < out_list.index('- x/b/foo_b') + def test_create_no_cache_sync(self): + self.create_test_files() + self.cmd('init', '--encryption=repokey', self.repository_location) + self.cmd('delete', '--cache-only', self.repository_location) + create_json = json.loads(self.cmd('create', '--no-cache-sync', self.repository_location + '::test', 'input', + '--json', '--error')) # ignore experimental warning + info_json = json.loads(self.cmd('info', self.repository_location + '::test', '--json')) + create_stats = create_json['cache']['stats'] + info_stats = info_json['cache']['stats'] + assert create_stats == info_stats + self.cmd('check', self.repository_location) + def test_extract_pattern_opt(self): self.cmd('init', '--encryption=repokey', self.repository_location) self.create_regular_file('file1', size=1024 * 80) @@ -1509,14 +1521,14 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd('create', self.repository_location + '::test', 'input') else: called = False - wipe_cache_safe = Cache.wipe_cache + wipe_cache_safe = LocalCache.wipe_cache def wipe_wrapper(*args): nonlocal called called = True wipe_cache_safe(*args) - with patch.object(Cache, 'wipe_cache', wipe_wrapper): + with patch.object(LocalCache, 'wipe_cache', wipe_wrapper): self.cmd('create', self.repository_location + '::test', 'input') assert called @@ -2223,7 +2235,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) with Cache(repository, key, manifest, sync=False) as cache: original_chunks = cache.chunks - cache.destroy(repository) + Cache.destroy(repository) with Cache(repository, key, manifest) as cache: correct_chunks = cache.chunks assert original_chunks is not correct_chunks
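
A quick check of the sizing math used by AdHocCache._begin_txn(), as a standalone sketch (not part of the patch itself): it reproduces the initial hash table capacity calculation and the "34 bytes per chunk ID" figure from the debug message. MAX_LOAD_FACTOR and num_chunks below are assumed stand-ins for ChunkIndex.MAX_LOAD_FACTOR and len(repository).

    # Standalone sketch; requires only the msgpack library.
    import msgpack

    # Hash table sizing as in AdHocCache._begin_txn(): reserve slots for every
    # chunk already in the repository, plus 10 % headroom for chunks the new
    # archive will add, so the table never has to grow mid-transaction.
    MAX_LOAD_FACTOR = 0.93   # assumed stand-in for ChunkIndex.MAX_LOAD_FACTOR
    num_chunks = 1_000_000   # assumed stand-in for len(repository)
    capacity = int(num_chunks / MAX_LOAD_FACTOR * 1.1)
    print(capacity)          # 1182795 slots for one million chunks

    # The 34-byte figure: a 32-byte chunk ID packed by msgpack costs
    # 1 format byte (0xc4) + 1 length byte (0x20) + 32 payload bytes.
    chunk_id = b'\x00' * 32
    assert len(msgpack.packb(chunk_id, use_bin_type=True)) == 34

With LIST_SCAN_LIMIT raised to 100000, a single repository.list() round trip therefore carries at most about 100000 * 34 bytes, i.e. roughly 3.4 MB of chunk IDs.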