From 740898d83ba9d83589bcbd607050d23007de318a Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Tue, 7 Mar 2017 15:13:59 +0100 Subject: [PATCH 01/17] CacheSynchronizer --- setup.py | 2 +- src/borg/_cache.c | 159 +++++++++++++++++++++++++++++++++++++++++ src/borg/cache.py | 12 +--- src/borg/hashindex.pyx | 38 ++++++++-- src/borg/helpers.py | 2 +- 5 files changed, 198 insertions(+), 15 deletions(-) create mode 100644 src/borg/_cache.c diff --git a/setup.py b/setup.py index 726c849c3..6878ed96b 100644 --- a/setup.py +++ b/setup.py @@ -600,7 +600,7 @@ if not on_rtd: ext_modules += [ Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), - Extension('borg.hashindex', [hashindex_source]), + Extension('borg.hashindex', [hashindex_source], libraries=['msgpackc']), Extension('borg.item', [item_source]), Extension('borg.algorithms.chunker', [chunker_source]), Extension('borg.algorithms.checksums', [checksums_source]), diff --git a/src/borg/_cache.c b/src/borg/_cache.c new file mode 100644 index 000000000..880608ff7 --- /dev/null +++ b/src/borg/_cache.c @@ -0,0 +1,159 @@ + +#include + +// 2**32 - 1025 +#define _MAX_VALUE ( (uint32_t) 4294966271 ) + +#define MIN(x, y) ((x) < (y) ? (x): (y)) + +typedef struct { + HashIndex *chunks; + + msgpack_unpacker unpacker; + msgpack_unpacked unpacked; + const char *error; +} CacheSyncCtx; + +static CacheSyncCtx * +cache_sync_init(HashIndex *chunks) +{ + CacheSyncCtx *ctx; + if (!(ctx = malloc(sizeof(CacheSyncCtx)))) { + return NULL; + } + + ctx->chunks = chunks; + ctx->error = NULL; + + if(!msgpack_unpacker_init(&ctx->unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE)) { + free(ctx); + return NULL; + } + + msgpack_unpacked_init(&ctx->unpacked); + + return ctx; +} + +static void +cache_sync_free(CacheSyncCtx *ctx) +{ + msgpack_unpacker_destroy(&ctx->unpacker); + msgpack_unpacked_destroy(&ctx->unpacked); + free(ctx); +} + +static const char * +cache_sync_error(CacheSyncCtx *ctx) +{ + return ctx->error; +} + +static int +cache_process_chunks(CacheSyncCtx *ctx, msgpack_object_array *array) +{ + uint32_t i; + const char *key; + uint32_t cache_values[3]; + uint32_t *cache_entry; + uint64_t refcount; + msgpack_object *current; + for (i = 0; i < array->size; i++) { + current = &array->ptr[i]; + + if (current->type != MSGPACK_OBJECT_ARRAY || current->via.array.size != 3 + || current->via.array.ptr[0].type != MSGPACK_OBJECT_STR || current->via.array.ptr[0].via.str.size != 32 + || current->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER + || current->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + ctx->error = "Malformed chunk list entry"; + return 0; + } + + key = current->via.array.ptr[0].via.str.ptr; + cache_entry = (uint32_t*) hashindex_get(ctx->chunks, key); + if (cache_entry) { + refcount = _le32toh(cache_entry[0]); + refcount += 1; + cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); + } else { + /* refcount, size, csize */ + cache_values[0] = 1; + cache_values[1] = current->via.array.ptr[1].via.u64; + cache_values[2] = current->via.array.ptr[2].via.u64; + if (!hashindex_set(ctx->chunks, key, cache_values)) { + ctx->error = "hashindex_set failed"; + return 0; + } + } + } + return 1; +} + +/** + * feed data to the cache synchronizer + * 0 = abort, 1 = continue + * abort is a regular condition, check cache_sync_error + */ +static int +cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) +{ + msgpack_unpack_return unpack_status; + + /* grow buffer if necessary */ + if (msgpack_unpacker_buffer_capacity(&ctx->unpacker) < length) { + if (!msgpack_unpacker_reserve_buffer(&ctx->unpacker, length)) { + return 0; + } + } + + memcpy(msgpack_unpacker_buffer(&ctx->unpacker), data, length); + msgpack_unpacker_buffer_consumed(&ctx->unpacker, length); + + do { + unpack_status = msgpack_unpacker_next(&ctx->unpacker, &ctx->unpacked); + + switch (unpack_status) { + case MSGPACK_UNPACK_SUCCESS: + { + uint32_t i; + msgpack_object *item = &ctx->unpacked.data; + msgpack_object_kv *current; + + if (item->type != MSGPACK_OBJECT_MAP) { + ctx->error = "Unexpected data type in item stream"; + return 0; + } + + for (i = 0; i < item->via.map.size; i++) { + current = &item->via.map.ptr[i]; + + if (current->key.type != MSGPACK_OBJECT_STR) { + ctx->error = "Invalid key data type in item"; + return 0; + } + + if (current->key.via.str.size == 6 + && !memcmp(current->key.via.str.ptr, "chunks", 6)) { + + if (current->val.type != MSGPACK_OBJECT_ARRAY) { + ctx->error = "Unexpected value type of item chunks"; + return 0; + } + + if (!cache_process_chunks(ctx, ¤t->val.via.array)) { + return 0; + } + } + } + } + break; + case MSGPACK_UNPACK_PARSE_ERROR: + ctx->error = "Malformed msgpack"; + return 0; + default: + break; + } + } while (unpack_status != MSGPACK_UNPACK_CONTINUE); + + return 1; +} diff --git a/src/borg/cache.py b/src/borg/cache.py index 13045f0e9..c9fa70b7f 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -12,7 +12,7 @@ from .logger import create_logger logger = create_logger() from .constants import CACHE_README -from .hashindex import ChunkIndex, ChunkIndexEntry +from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer from .helpers import Location from .helpers import Error from .helpers import get_cache_dir, get_security_dir @@ -571,17 +571,11 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) if archive.version != 1: raise Exception('Unknown archive metadata version') - unpacker = msgpack.Unpacker() + sync = CacheSynchronizer(chunk_idx) for item_id, chunk in zip(archive.items, repository.get_many(archive.items)): data = key.decrypt(item_id, chunk) chunk_idx.add(item_id, 1, len(data), len(chunk)) - unpacker.feed(data) - for item in unpacker: - if not isinstance(item, dict): - logger.error('Error: Did not get expected metadata dict - archive corrupted!') - continue # XXX: continue?! - for chunk_id, size, csize in item.get(b'chunks', []): - chunk_idx.add(chunk_id, 1, size, csize) + sync.feed(data) if self.do_cache: fn = mkpath(archive_id) fn_tmp = mkpath(archive_id, suffix='.tmp') diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 2409836fe..75ba1df35 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -8,7 +8,7 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t from libc.errno cimport errno from cpython.exc cimport PyErr_SetFromErrnoWithFilename -API_VERSION = '1.1_01' +API_VERSION = '1.1_02' cdef extern from "_hashindex.c": @@ -31,6 +31,18 @@ cdef extern from "_hashindex.c": double HASH_MAX_LOAD +cdef extern from "_cache.c": + ctypedef struct CacheSyncCtx: + pass + + CacheSyncCtx *cache_sync_init(HashIndex *chunks) + const char *cache_sync_error(CacheSyncCtx *ctx) + int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) + void cache_sync_free(CacheSyncCtx *ctx) + + uint32_t _MAX_VALUE + + cdef _NoDefault = object() """ @@ -50,9 +62,6 @@ AssertionError is raised instead. assert UINT32_MAX == 2**32-1 -# module-level constant because cdef's in classes can't have default values -cdef uint32_t _MAX_VALUE = 2**32-1025 - assert _MAX_VALUE % 2 == 1 @@ -375,3 +384,24 @@ cdef class ChunkKeyIterator: cdef uint32_t refcount = _le32toh(value[0]) assert refcount <= _MAX_VALUE, "invalid reference count" return (self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2])) + + +cdef class CacheSynchronizer: + cdef ChunkIndex chunks + cdef CacheSyncCtx *sync + + def __cinit__(self, chunks): + self.chunks = chunks + self.sync = cache_sync_init(self.chunks.index) + if not self.sync: + raise Exception('cache_sync_init failed') + + def __dealloc__(self): + if self.sync: + cache_sync_free(self.sync) + + def feed(self, chunk): + if not cache_sync_feed(self.sync, chunk, len(chunk)): + error = cache_sync_error(self.sync) + if error is not None: + raise Exception('cache_sync_feed failed: ' + error.decode('ascii')) diff --git a/src/borg/helpers.py b/src/borg/helpers.py index db66b822a..7e4d4baf9 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -126,7 +126,7 @@ def check_python(): def check_extension_modules(): from . import platform, compress, item - if hashindex.API_VERSION != '1.1_01': + if hashindex.API_VERSION != '1.1_02': raise ExtensionModuleError if chunker.API_VERSION != '1.1_01': raise ExtensionModuleError From 9f8b967a6f45bb7dbbf1f37cf231ea82f149a0f6 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 26 May 2017 12:30:15 +0200 Subject: [PATCH 02/17] cache sync: initialize master index to known capacity --- src/borg/cache.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index c9fa70b7f..cd3a9951d 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -603,6 +603,9 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # deallocates old hashindex, creates empty hashindex: chunk_idx.clear() cleanup_outdated(cached_ids - archive_ids) + # Explicitly set the initial hash table capacity to avoid performance issues + # due to hash table "resonance". + master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR) if archive_ids: chunk_idx = None if self.progress: @@ -630,7 +633,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # Do not make this an else branch; the FileIntegrityError exception handler # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) - archive_chunk_idx = ChunkIndex() + archive_chunk_idx = ChunkIndex(master_index_capacity) fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: @@ -641,7 +644,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: chunk_idx.merge(archive_chunk_idx) else: - chunk_idx = chunk_idx or ChunkIndex() + chunk_idx = chunk_idx or ChunkIndex(master_index_capacity) logger.info('Fetching archive index for %s ...', archive_name) fetch_and_build_idx(archive_id, repository, self.key, chunk_idx) if self.progress: From 167875b753290f468751a973f0be719972c265a9 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 26 May 2017 13:54:28 +0200 Subject: [PATCH 03/17] cache sync: fix n^2 behaviour in lookup_name --- src/borg/cache.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index cd3a9951d..e34a3426e 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -588,10 +588,16 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: os.rename(fn_tmp, fn) - def lookup_name(archive_id): + def get_archive_ids_to_names(archive_ids): + # Pass once over all archives and build a mapping from ids to names. + # The easier approach, doing a similar loop for each archive, has + # square complexity and does about a dozen million functions calls + # with 1100 archives (which takes 30s CPU seconds _alone_). + archive_names = {} for info in self.manifest.archives.list(): - if info.id == archive_id: - return info.name + if info.id in archive_ids: + archive_names[info.id] = info.name + return archive_names def create_master_idx(chunk_idx): logger.info('Synchronizing chunks cache...') @@ -612,8 +618,9 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1, msg='%3.0f%% Syncing chunks cache. Processing archive %s', msgid='cache.sync') + archive_ids_to_names = get_archive_ids_to_names(archive_ids) for archive_id in archive_ids: - archive_name = lookup_name(archive_id) + archive_name = archive_ids_to_names.pop(archive_id) if self.progress: pi.show(info=[remove_surrogates(archive_name)]) if self.do_cache: From bf895950acb27872b4c084f394df0f7f0c027d60 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 20 May 2016 01:54:42 +0200 Subject: [PATCH 04/17] RepositoryCache: limit cache size Unbounded cache size is inacceptable. I don't see why a full-fledged repository needs to be used here, either, since this cache requires none of the consistency or durability guarantees made by it (and bought with a performance impact). A notable issue is that posix_fadvise is slow (for some reason) on tmpfs, which could eat 30-35 % of the total CPU time of a cache sync. --- src/borg/remote.py | 102 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 20 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index 47c59741e..c991dbefb 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -7,6 +7,7 @@ import logging import os import select import shlex +import shutil import sys import tempfile import textwrap @@ -23,6 +24,7 @@ from .helpers import get_home_dir from .helpers import hostname_is_unique from .helpers import replace_placeholders from .helpers import sysinfo +from .helpers import format_file_size from .logger import create_logger, setup_logging from .repository import Repository, MAX_OBJECT_SIZE, LIST_SCAN_LIMIT from .version import parse_version, format_version @@ -1058,45 +1060,105 @@ class RepositoryNoCache: self.close() def get(self, key): - return next(self.get_many([key])) + return next(self.get_many([key], cache=False)) - def get_many(self, keys): + def get_many(self, keys, cache=True): for data in self.repository.get_many(keys): yield data class RepositoryCache(RepositoryNoCache): - """A caching Repository wrapper - - Caches Repository GET operations using a local temporary Repository. """ - # maximum object size that will be cached, 64 kiB. - THRESHOLD = 2**16 + A caching Repository wrapper. + + Caches Repository GET operations locally. + """ def __init__(self, repository): super().__init__(repository) - tmppath = tempfile.mkdtemp(prefix='borg-tmp') - self.caching_repo = Repository(tmppath, create=True, exclusive=True) - self.caching_repo.__enter__() # handled by context manager in base class + self.cache = set() + self.basedir = tempfile.mkdtemp(prefix='borg-cache-') + self.query_size_limit() + self.size = 0 + # Instrumentation + self.hits = 0 + self.misses = 0 + self.slow_misses = 0 + self.slow_lat = 0.0 + self.evictions = 0 + self.enospc = 0 + + def query_size_limit(self): + stat_fs = os.statvfs(self.basedir) + available_space = stat_fs.f_bsize * stat_fs.f_bavail + self.size_limit = int(min(available_space * 0.25, 2**31)) + + def key_filename(self, key): + return os.path.join(self.basedir, bin_to_hex(key)) + + def backoff(self): + self.query_size_limit() + target_size = int(0.9 * self.size_limit) + while self.size > target_size and self.cache: + key = self.cache.pop() + file = self.key_filename(key) + self.size -= os.stat(file).st_size + os.unlink(file) + self.evictions += 1 + + def add_entry(self, key, data): + file = self.key_filename(key) + try: + with open(file, 'wb') as fd: + fd.write(data) + except OSError as os_error: + if os_error.errno == errno.ENOSPC: + self.enospc += 1 + self.backoff() + else: + raise + else: + self.size += len(data) + self.cache.add(key) + if self.size > self.size_limit: + self.backoff() + return data def close(self): - if self.caching_repo is not None: - self.caching_repo.destroy() - self.caching_repo = None + logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' + '%d evictions, %d ENOSPC hit', + len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), + self.hits, self.misses, self.slow_misses, self.slow_lat, + self.evictions, self.enospc) + self.cache.clear() + shutil.rmtree(self.basedir) - def get_many(self, keys): - unknown_keys = [key for key in keys if key not in self.caching_repo] + def get_many(self, keys, cache=True): + unknown_keys = [key for key in keys if key not in self.cache] repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys)) for key in keys: - try: - yield self.caching_repo.get(key) - except Repository.ObjectNotFound: + if key in self.cache: + file = self.key_filename(key) + with open(file, 'rb') as fd: + self.hits += 1 + yield fd.read() + else: for key_, data in repository_iterator: if key_ == key: - if len(data) <= self.THRESHOLD: - self.caching_repo.put(key, data) + if cache: + self.add_entry(key, data) + self.misses += 1 yield data break + else: + # slow path: eviction during this get_many removed this key from the cache + t0 = time.perf_counter() + data = self.repository.get(key) + self.slow_lat += time.perf_counter() - t0 + if cache: + self.add_entry(key, data) + self.slow_misses += 1 + yield data # Consume any pending requests for _ in repository_iterator: pass From c786a5941eb560b51264c24e18f9c1442fb721bb Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 26 May 2017 22:54:27 +0200 Subject: [PATCH 05/17] CacheSynchronizer: redo as quasi FSM on top of unpack.h This is a (relatively) simple state machine running in the data callbacks invoked by the msgpack unpacking stack machine (the same machine is used in msgpack-c and msgpack-python, changes are minor and cosmetic, e.g. removal of msgpack_unpack_object, removal of the C++ template thus porting to C and so on). Compared to the previous solution this has multiple advantages - msgpack-c dependency is removed - this approach is faster and requires fewer and smaller memory allocations Testability of the two solutions does not differ in my professional opinion(tm). Two other changes were rolled up; _hashindex.c can be compiled without Python.h again (handy for fuzzing and testing); a "small" bug in the cache sync was fixed which allocated too large archive indices, leading to excessive archive.chunks.d disk usage (that actually gave me an idea). --- scripts/fuzz-cache-sync/HOWTO | 10 + scripts/fuzz-cache-sync/main.c | 30 ++ .../fuzz-cache-sync/testcase_dir/test_simple | Bin 0 -> 119 bytes setup.py | 4 +- src/borg/_cache.c | 159 -------- src/borg/_hashindex.c | 19 +- src/borg/cache.py | 2 +- src/borg/cache_sync/cache_sync.c | 108 +++++ src/borg/cache_sync/sysdep.h | 194 +++++++++ src/borg/cache_sync/unpack.h | 374 ++++++++++++++++++ src/borg/cache_sync/unpack_define.h | 95 +++++ src/borg/cache_sync/unpack_template.h | 359 +++++++++++++++++ src/borg/hashindex.pyx | 6 +- src/borg/testsuite/cache.py | 139 +++++++ 14 files changed, 1332 insertions(+), 167 deletions(-) create mode 100644 scripts/fuzz-cache-sync/HOWTO create mode 100644 scripts/fuzz-cache-sync/main.c create mode 100644 scripts/fuzz-cache-sync/testcase_dir/test_simple delete mode 100644 src/borg/_cache.c create mode 100644 src/borg/cache_sync/cache_sync.c create mode 100644 src/borg/cache_sync/sysdep.h create mode 100644 src/borg/cache_sync/unpack.h create mode 100644 src/borg/cache_sync/unpack_define.h create mode 100644 src/borg/cache_sync/unpack_template.h create mode 100644 src/borg/testsuite/cache.py diff --git a/scripts/fuzz-cache-sync/HOWTO b/scripts/fuzz-cache-sync/HOWTO new file mode 100644 index 000000000..ae144b287 --- /dev/null +++ b/scripts/fuzz-cache-sync/HOWTO @@ -0,0 +1,10 @@ +- Install AFL and the requirements for LLVM mode (see docs) +- Compile the fuzzing target, e.g. + + AFL_HARDEN=1 afl-clang-fast main.c -o fuzz-target -O3 + + (other options, like using ASan or MSan are possible as well) +- Add additional test cases to testcase_dir +- Run afl, easiest (but inefficient) way; + + afl-fuzz -i testcase_dir -o findings_dir ./fuzz-target diff --git a/scripts/fuzz-cache-sync/main.c b/scripts/fuzz-cache-sync/main.c new file mode 100644 index 000000000..b25d925db --- /dev/null +++ b/scripts/fuzz-cache-sync/main.c @@ -0,0 +1,30 @@ +#include "../../src/borg/_hashindex.c" +#include "../../src/borg/cache_sync/cache_sync.c" + +#define BUFSZ 32768 + +int main() { + char buf[BUFSZ]; + int len, ret; + CacheSyncCtx *ctx; + HashIndex *idx; + + /* capacity, key size, value size */ + idx = hashindex_init(0, 32, 12); + ctx = cache_sync_init(idx); + + while (1) { + len = read(0, buf, BUFSZ); + if (!len) { + break; + } + ret = cache_sync_feed(ctx, buf, len); + if(!ret && cache_sync_error(ctx)) { + fprintf(stderr, "error: %s\n", cache_sync_error(ctx)); + return 1; + } + } + hashindex_free(idx); + cache_sync_free(ctx); + return 0; +} diff --git a/scripts/fuzz-cache-sync/testcase_dir/test_simple b/scripts/fuzz-cache-sync/testcase_dir/test_simple new file mode 100644 index 0000000000000000000000000000000000000000..0bf5a0ea1616fcbcec4e17de7b347b00f5661ca1 GIT binary patch literal 119 zcmZo&oR*)zI4Q9Rh^x-BTmmuAis>yWElw?3mYh+Vmt72{CQZJ@pkRO>7&0;up~{Gf F82}?xBQF2| literal 0 HcmV?d00001 diff --git a/setup.py b/setup.py index 6878ed96b..951d61aff 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,8 @@ try: 'src/borg/crypto/low_level.c', 'src/borg/algorithms/chunker.c', 'src/borg/algorithms/buzhash.c', 'src/borg/hashindex.c', 'src/borg/_hashindex.c', + 'src/borg/cache_sync/cache_sync.c', 'src/borg/cache_sync/sysdep.h', 'src/borg/cache_sync/unpack.h', + 'src/borg/cache_sync/unpack_define.h', 'src/borg/cache_sync/unpack_template.h', 'src/borg/item.c', 'src/borg/algorithms/checksums.c', 'src/borg/algorithms/crc32_dispatch.c', 'src/borg/algorithms/crc32_clmul.c', 'src/borg/algorithms/crc32_slice_by_8.c', @@ -600,7 +602,7 @@ if not on_rtd: ext_modules += [ Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), - Extension('borg.hashindex', [hashindex_source], libraries=['msgpackc']), + Extension('borg.hashindex', [hashindex_source]), Extension('borg.item', [item_source]), Extension('borg.algorithms.chunker', [chunker_source]), Extension('borg.algorithms.checksums', [checksums_source]), diff --git a/src/borg/_cache.c b/src/borg/_cache.c deleted file mode 100644 index 880608ff7..000000000 --- a/src/borg/_cache.c +++ /dev/null @@ -1,159 +0,0 @@ - -#include - -// 2**32 - 1025 -#define _MAX_VALUE ( (uint32_t) 4294966271 ) - -#define MIN(x, y) ((x) < (y) ? (x): (y)) - -typedef struct { - HashIndex *chunks; - - msgpack_unpacker unpacker; - msgpack_unpacked unpacked; - const char *error; -} CacheSyncCtx; - -static CacheSyncCtx * -cache_sync_init(HashIndex *chunks) -{ - CacheSyncCtx *ctx; - if (!(ctx = malloc(sizeof(CacheSyncCtx)))) { - return NULL; - } - - ctx->chunks = chunks; - ctx->error = NULL; - - if(!msgpack_unpacker_init(&ctx->unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE)) { - free(ctx); - return NULL; - } - - msgpack_unpacked_init(&ctx->unpacked); - - return ctx; -} - -static void -cache_sync_free(CacheSyncCtx *ctx) -{ - msgpack_unpacker_destroy(&ctx->unpacker); - msgpack_unpacked_destroy(&ctx->unpacked); - free(ctx); -} - -static const char * -cache_sync_error(CacheSyncCtx *ctx) -{ - return ctx->error; -} - -static int -cache_process_chunks(CacheSyncCtx *ctx, msgpack_object_array *array) -{ - uint32_t i; - const char *key; - uint32_t cache_values[3]; - uint32_t *cache_entry; - uint64_t refcount; - msgpack_object *current; - for (i = 0; i < array->size; i++) { - current = &array->ptr[i]; - - if (current->type != MSGPACK_OBJECT_ARRAY || current->via.array.size != 3 - || current->via.array.ptr[0].type != MSGPACK_OBJECT_STR || current->via.array.ptr[0].via.str.size != 32 - || current->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER - || current->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - ctx->error = "Malformed chunk list entry"; - return 0; - } - - key = current->via.array.ptr[0].via.str.ptr; - cache_entry = (uint32_t*) hashindex_get(ctx->chunks, key); - if (cache_entry) { - refcount = _le32toh(cache_entry[0]); - refcount += 1; - cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); - } else { - /* refcount, size, csize */ - cache_values[0] = 1; - cache_values[1] = current->via.array.ptr[1].via.u64; - cache_values[2] = current->via.array.ptr[2].via.u64; - if (!hashindex_set(ctx->chunks, key, cache_values)) { - ctx->error = "hashindex_set failed"; - return 0; - } - } - } - return 1; -} - -/** - * feed data to the cache synchronizer - * 0 = abort, 1 = continue - * abort is a regular condition, check cache_sync_error - */ -static int -cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) -{ - msgpack_unpack_return unpack_status; - - /* grow buffer if necessary */ - if (msgpack_unpacker_buffer_capacity(&ctx->unpacker) < length) { - if (!msgpack_unpacker_reserve_buffer(&ctx->unpacker, length)) { - return 0; - } - } - - memcpy(msgpack_unpacker_buffer(&ctx->unpacker), data, length); - msgpack_unpacker_buffer_consumed(&ctx->unpacker, length); - - do { - unpack_status = msgpack_unpacker_next(&ctx->unpacker, &ctx->unpacked); - - switch (unpack_status) { - case MSGPACK_UNPACK_SUCCESS: - { - uint32_t i; - msgpack_object *item = &ctx->unpacked.data; - msgpack_object_kv *current; - - if (item->type != MSGPACK_OBJECT_MAP) { - ctx->error = "Unexpected data type in item stream"; - return 0; - } - - for (i = 0; i < item->via.map.size; i++) { - current = &item->via.map.ptr[i]; - - if (current->key.type != MSGPACK_OBJECT_STR) { - ctx->error = "Invalid key data type in item"; - return 0; - } - - if (current->key.via.str.size == 6 - && !memcmp(current->key.via.str.ptr, "chunks", 6)) { - - if (current->val.type != MSGPACK_OBJECT_ARRAY) { - ctx->error = "Unexpected value type of item chunks"; - return 0; - } - - if (!cache_process_chunks(ctx, ¤t->val.via.array)) { - return 0; - } - } - } - } - break; - case MSGPACK_UNPACK_PARSE_ERROR: - ctx->error = "Malformed msgpack"; - return 0; - default: - break; - } - } while (unpack_status != MSGPACK_UNPACK_CONTINUE); - - return 1; -} diff --git a/src/borg/_hashindex.c b/src/borg/_hashindex.c index b41d57b70..824a6eec3 100644 --- a/src/borg/_hashindex.c +++ b/src/borg/_hashindex.c @@ -1,6 +1,6 @@ -#include #include +#include #include #include #include @@ -56,8 +56,10 @@ typedef struct { int lower_limit; int upper_limit; int min_empty; +#ifdef Py_PYTHON_H /* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */ Py_buffer buckets_buffer; +#endif } HashIndex; /* prime (or w/ big prime factors) hash table sizes @@ -106,8 +108,11 @@ static int hash_sizes[] = { #define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno)) #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno)) +#ifdef Py_PYTHON_H static HashIndex *hashindex_read(PyObject *file_py); static void hashindex_write(HashIndex *index, PyObject *file_py); +#endif + static HashIndex *hashindex_init(int capacity, int key_size, int value_size); static const void *hashindex_get(HashIndex *index, const void *key); static int hashindex_set(HashIndex *index, const void *key, const void *value); @@ -120,9 +125,12 @@ static void hashindex_free(HashIndex *index); static void hashindex_free_buckets(HashIndex *index) { +#ifdef Py_PYTHON_H if(index->buckets_buffer.buf) { PyBuffer_Release(&index->buckets_buffer); - } else { + } else +#endif + { free(index->buckets); } } @@ -263,6 +271,7 @@ count_empty(HashIndex *index) /* Public API */ +#ifdef Py_PYTHON_H static HashIndex * hashindex_read(PyObject *file_py) { @@ -418,6 +427,7 @@ fail_decref_header: fail: return index; } +#endif static HashIndex * hashindex_init(int capacity, int key_size, int value_size) @@ -444,7 +454,9 @@ hashindex_init(int capacity, int key_size, int value_size) index->lower_limit = get_lower_limit(index->num_buckets); index->upper_limit = get_upper_limit(index->num_buckets); index->min_empty = get_min_empty(index->num_buckets); +#ifdef Py_PYTHON_H index->buckets_buffer.buf = NULL; +#endif for(i = 0; i < capacity; i++) { BUCKET_MARK_EMPTY(index, i); } @@ -458,7 +470,7 @@ hashindex_free(HashIndex *index) free(index); } - +#ifdef Py_PYTHON_H static void hashindex_write(HashIndex *index, PyObject *file_py) { @@ -521,6 +533,7 @@ hashindex_write(HashIndex *index, PyObject *file_py) return; } } +#endif static const void * hashindex_get(HashIndex *index, const void *key) diff --git a/src/borg/cache.py b/src/borg/cache.py index e34a3426e..934f12a78 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -640,7 +640,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # Do not make this an else branch; the FileIntegrityError exception handler # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) - archive_chunk_idx = ChunkIndex(master_index_capacity) + archive_chunk_idx = ChunkIndex() fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: diff --git a/src/borg/cache_sync/cache_sync.c b/src/borg/cache_sync/cache_sync.c new file mode 100644 index 000000000..174e4b909 --- /dev/null +++ b/src/borg/cache_sync/cache_sync.c @@ -0,0 +1,108 @@ + +#include "unpack.h" + +typedef struct { + unpack_context ctx; + + char *buf; + size_t head; + size_t tail; + size_t size; +} CacheSyncCtx; + +static CacheSyncCtx * +cache_sync_init(HashIndex *chunks) +{ + CacheSyncCtx *ctx; + if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) { + return NULL; + } + + unpack_init(&ctx->ctx); + + ctx->ctx.user.chunks = chunks; + ctx->ctx.user.last_error = NULL; + ctx->ctx.user.level = 0; + ctx->ctx.user.inside_chunks = false; + ctx->buf = NULL; + ctx->head = 0; + ctx->tail = 0; + ctx->size = 0; + + return ctx; +} + +static void +cache_sync_free(CacheSyncCtx *ctx) +{ + if(ctx->buf) { + free(ctx->buf); + } + free(ctx); +} + +static const char * +cache_sync_error(CacheSyncCtx *ctx) +{ + return ctx->ctx.user.last_error; +} + +/** + * feed data to the cache synchronizer + * 0 = abort, 1 = continue + * abort is a regular condition, check cache_sync_error + */ +static int +cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) +{ + size_t new_size; + int ret; + char *new_buf; + + if(ctx->tail + length > ctx->size) { + if((ctx->tail - ctx->head) + length <= ctx->size) { + /* | XXXXX| -> move data in buffer backwards -> |XXXXX | */ + memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + ctx->tail -= ctx->head; + ctx->head = 0; + } else { + /* must expand buffer to fit all data */ + new_size = (ctx->tail - ctx->head) + length; + new_buf = (char*) malloc(new_size); + if(!new_buf) { + ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer"; + return 0; + } + memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + free(ctx->buf); + ctx->buf = new_buf; + ctx->tail -= ctx->head; + ctx->head = 0; + ctx->size = new_size; + } + } + + memcpy(ctx->buf + ctx->tail, data, length); + ctx->tail += length; + + while(1) { + if(ctx->head >= ctx->tail) { + return 1; /* request more bytes */ + } + + ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head); + if(ret == 1) { + unpack_init(&ctx->ctx); + continue; + } else if(ret == 0) { + return 1; + } else { + if(!ctx->ctx.user.last_error) { + ctx->ctx.user.last_error = "Unknown error"; + } + return 0; + } + } + /* unreachable */ + return 1; +} diff --git a/src/borg/cache_sync/sysdep.h b/src/borg/cache_sync/sysdep.h new file mode 100644 index 000000000..ed9c1bc0b --- /dev/null +++ b/src/borg/cache_sync/sysdep.h @@ -0,0 +1,194 @@ +/* + * MessagePack system dependencies + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MSGPACK_SYSDEP_H__ +#define MSGPACK_SYSDEP_H__ + +#include +#include +#if defined(_MSC_VER) && _MSC_VER < 1600 +typedef __int8 int8_t; +typedef unsigned __int8 uint8_t; +typedef __int16 int16_t; +typedef unsigned __int16 uint16_t; +typedef __int32 int32_t; +typedef unsigned __int32 uint32_t; +typedef __int64 int64_t; +typedef unsigned __int64 uint64_t; +#elif defined(_MSC_VER) // && _MSC_VER >= 1600 +#include +#else +#include +#include +#endif + +#ifdef _WIN32 +#define _msgpack_atomic_counter_header +typedef long _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) InterlockedDecrement(ptr) +#define _msgpack_sync_incr_and_fetch(ptr) InterlockedIncrement(ptr) +#elif defined(__GNUC__) && ((__GNUC__*10 + __GNUC_MINOR__) < 41) +#define _msgpack_atomic_counter_header "gcc_atomic.h" +#else +typedef unsigned int _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) __sync_sub_and_fetch(ptr, 1) +#define _msgpack_sync_incr_and_fetch(ptr) __sync_add_and_fetch(ptr, 1) +#endif + +#ifdef _WIN32 + +#ifdef __cplusplus +/* numeric_limits::min,max */ +#ifdef max +#undef max +#endif +#ifdef min +#undef min +#endif +#endif + +#else +#include /* __BYTE_ORDER */ +#endif + +#if !defined(__LITTLE_ENDIAN__) && !defined(__BIG_ENDIAN__) +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define __LITTLE_ENDIAN__ +#elif __BYTE_ORDER == __BIG_ENDIAN +#define __BIG_ENDIAN__ +#elif _WIN32 +#define __LITTLE_ENDIAN__ +#endif +#endif + + +#ifdef __LITTLE_ENDIAN__ + +#ifdef _WIN32 +# if defined(ntohs) +# define _msgpack_be16(x) ntohs(x) +# elif defined(_byteswap_ushort) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be16(x) ((uint16_t)_byteswap_ushort((unsigned short)x)) +# else +# define _msgpack_be16(x) ( \ + ((((uint16_t)x) << 8) ) | \ + ((((uint16_t)x) >> 8) ) ) +# endif +#else +# define _msgpack_be16(x) ntohs(x) +#endif + +#ifdef _WIN32 +# if defined(ntohl) +# define _msgpack_be32(x) ntohl(x) +# elif defined(_byteswap_ulong) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be32(x) ((uint32_t)_byteswap_ulong((unsigned long)x)) +# else +# define _msgpack_be32(x) \ + ( ((((uint32_t)x) << 24) ) | \ + ((((uint32_t)x) << 8) & 0x00ff0000U ) | \ + ((((uint32_t)x) >> 8) & 0x0000ff00U ) | \ + ((((uint32_t)x) >> 24) ) ) +# endif +#else +# define _msgpack_be32(x) ntohl(x) +#endif + +#if defined(_byteswap_uint64) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be64(x) (_byteswap_uint64(x)) +#elif defined(bswap_64) +# define _msgpack_be64(x) bswap_64(x) +#elif defined(__DARWIN_OSSwapInt64) +# define _msgpack_be64(x) __DARWIN_OSSwapInt64(x) +#else +#define _msgpack_be64(x) \ + ( ((((uint64_t)x) << 56) ) | \ + ((((uint64_t)x) << 40) & 0x00ff000000000000ULL ) | \ + ((((uint64_t)x) << 24) & 0x0000ff0000000000ULL ) | \ + ((((uint64_t)x) << 8) & 0x000000ff00000000ULL ) | \ + ((((uint64_t)x) >> 8) & 0x00000000ff000000ULL ) | \ + ((((uint64_t)x) >> 24) & 0x0000000000ff0000ULL ) | \ + ((((uint64_t)x) >> 40) & 0x000000000000ff00ULL ) | \ + ((((uint64_t)x) >> 56) ) ) +#endif + +#define _msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)(from))[0]) << 8) | \ + (((uint16_t)((uint8_t*)(from))[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)(from))[0]) << 24) | \ + (((uint32_t)((uint8_t*)(from))[1]) << 16) | \ + (((uint32_t)((uint8_t*)(from))[2]) << 8) | \ + (((uint32_t)((uint8_t*)(from))[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)(from))[0]) << 56) | \ + (((uint64_t)((uint8_t*)(from))[1]) << 48) | \ + (((uint64_t)((uint8_t*)(from))[2]) << 40) | \ + (((uint64_t)((uint8_t*)(from))[3]) << 32) | \ + (((uint64_t)((uint8_t*)(from))[4]) << 24) | \ + (((uint64_t)((uint8_t*)(from))[5]) << 16) | \ + (((uint64_t)((uint8_t*)(from))[6]) << 8) | \ + (((uint64_t)((uint8_t*)(from))[7]) ) )) + +#else + +#define _msgpack_be16(x) (x) +#define _msgpack_be32(x) (x) +#define _msgpack_be64(x) (x) + +#define _msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)from)[0]) << 8) | \ + (((uint16_t)((uint8_t*)from)[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)from)[0]) << 24) | \ + (((uint32_t)((uint8_t*)from)[1]) << 16) | \ + (((uint32_t)((uint8_t*)from)[2]) << 8) | \ + (((uint32_t)((uint8_t*)from)[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)from)[0]) << 56) | \ + (((uint64_t)((uint8_t*)from)[1]) << 48) | \ + (((uint64_t)((uint8_t*)from)[2]) << 40) | \ + (((uint64_t)((uint8_t*)from)[3]) << 32) | \ + (((uint64_t)((uint8_t*)from)[4]) << 24) | \ + (((uint64_t)((uint8_t*)from)[5]) << 16) | \ + (((uint64_t)((uint8_t*)from)[6]) << 8) | \ + (((uint64_t)((uint8_t*)from)[7]) ) )) +#endif + + +#define _msgpack_store16(to, num) \ + do { uint16_t val = _msgpack_be16(num); memcpy(to, &val, 2); } while(0) +#define _msgpack_store32(to, num) \ + do { uint32_t val = _msgpack_be32(num); memcpy(to, &val, 4); } while(0) +#define _msgpack_store64(to, num) \ + do { uint64_t val = _msgpack_be64(num); memcpy(to, &val, 8); } while(0) + +/* +#define _msgpack_load16(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 2); _msgpack_be16(val); }) +#define _msgpack_load32(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 4); _msgpack_be32(val); }) +#define _msgpack_load64(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 8); _msgpack_be64(val); }) +*/ + + +#endif /* msgpack/sysdep.h */ diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h new file mode 100644 index 000000000..0d71a9401 --- /dev/null +++ b/src/borg/cache_sync/unpack.h @@ -0,0 +1,374 @@ +/* + * Borg cache synchronizer, + * based on a MessagePack for Python unpacking routine + * + * Copyright (C) 2009 Naoki INADA + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This limits the depth of the structures we can unpack, i.e. how many containers + * are nestable. + */ +#define MSGPACK_EMBED_STACK_SIZE (16) +#include "unpack_define.h" + +// 2**32 - 1025 +#define _MAX_VALUE ( (uint32_t) 4294966271 ) + +#define MIN(x, y) ((x) < (y) ? (x): (y)) + +#ifdef DEBUG +#define set_last_error(msg) \ + fprintf(stderr, "cache_sync parse error: %s\n", (msg)); \ + u->last_error = (msg); +#else +#define set_last_error(msg) \ + u->last_error = (msg); +#endif + +typedef struct unpack_user { + /* Item.chunks is at the top level; we don't care about anything else, + * only need to track the current level to navigate arbitrary and unknown structure. + * To discern keys from everything else on the top level we use expect_map_item_end. + */ + int level; + + const char *last_error; + + HashIndex *chunks; + + /* + * We don't care about most stuff. This flag tells us whether we're at the chunks structure, + * meaning: + * {'foo': 'bar', 'chunks': [...], 'stuff': ... } + * ^-HERE-^ + */ + int inside_chunks; + enum { + /* the next thing is a map key at the Item root level, + * and it might be the "chunks" key we're looking for */ + expect_chunks_map_key, + + /* blocking state to expect_chunks_map_key + * { 'stuff': , 'chunks': [ + * ecmk -> emie -> -> -> -> ecmk ecb eeboce + * (nested containers are tracked via level) + * ecmk=expect_chunks_map_key, emie=expect_map_item_end, ecb=expect_chunks_begin, + * eeboce=expect_entry_begin_or_chunks_end + */ + expect_map_item_end, + + /* next thing must be the chunks array (array) */ + expect_chunks_begin, + + /* next thing must either be another CLE (array) or end of Item.chunks (array_end) */ + expect_entry_begin_or_chunks_end, + + /* + * processing ChunkListEntry tuple: + * expect_key, expect_size, expect_csize, expect_entry_end + */ + /* next thing must be the key (raw, l=32) */ + expect_key, + /* next thing must be the size (int) */ + expect_size, + /* next thing must be the csize (int) */ + expect_csize, + /* next thing must be the end of the CLE (array_end) */ + expect_entry_end, + } expect; + + struct { + char key[32]; + uint32_t csize; + uint32_t size; + } current; +} unpack_user; + +struct unpack_context; +typedef struct unpack_context unpack_context; +typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, size_t* off); + +#define unexpected(what) \ + if(u->inside_chunks || u->expect == expect_chunks_map_key) { \ + set_last_error("Unexpected object: " what); \ + return -1; \ + } + +static inline int unpack_callback_int64(unpack_user* u, int64_t d) +{ + switch(u->expect) { + case expect_size: + u->current.size = d; + u->expect = expect_csize; + break; + case expect_csize: + u->current.csize = d; + u->expect = expect_entry_end; + break; + default: + unexpected("integer"); + } + return 0; +} + +static inline int unpack_callback_uint16(unpack_user* u, uint16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint8(unpack_user* u, uint8_t d) +{ + return unpack_callback_int64(u, d); +} + + +static inline int unpack_callback_uint32(unpack_user* u, uint32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint64(unpack_user* u, uint64_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int32(unpack_user* u, int32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int16(unpack_user* u, int16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int8(unpack_user* u, int8_t d) +{ + return unpack_callback_int64(u, d); +} + +/* Ain't got anything to do with those floats */ +static inline int unpack_callback_double(unpack_user* u, double d) +{ + (void)d; + unexpected("double"); + return 0; +} + +static inline int unpack_callback_float(unpack_user* u, float d) +{ + (void)d; + unexpected("float"); + return 0; +} + +/* nil/true/false — I/don't/care */ +static inline int unpack_callback_nil(unpack_user* u) +{ + unexpected("nil"); + return 0; +} + +static inline int unpack_callback_true(unpack_user* u) +{ + unexpected("true"); + return 0; +} + +static inline int unpack_callback_false(unpack_user* u) +{ + unexpected("false"); + return 0; +} + +static inline int unpack_callback_array(unpack_user* u, unsigned int n) +{ + switch(u->expect) { + case expect_chunks_begin: + /* b'chunks': [ + * ^ */ + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ( + * ^ */ + if(n != 3) { + set_last_error("Invalid chunk list entry length"); + return -1; + } + u->expect = expect_key; + break; + default: + if(u->inside_chunks) { + set_last_error("Unexpected array start"); + return -1; + } else { + u->level++; + return 0; + } + } + return 0; +} + +static inline int unpack_callback_array_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + return 0; +} + +static inline int unpack_callback_array_end(unpack_user* u) +{ + uint32_t *cache_entry; + uint32_t cache_values[3]; + uint64_t refcount; + + switch(u->expect) { + case expect_entry_end: + /* b'chunks': [ ( b'1234...', 123, 345 ) + * ^ */ + cache_entry = (uint32_t*) hashindex_get(u->chunks, u->current.key); + if (cache_entry) { + refcount = _le32toh(cache_entry[0]); + refcount += 1; + cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); + } else { + /* refcount, size, csize */ + cache_values[0] = _htole32(1); + cache_values[1] = _htole32(u->current.size); + cache_values[2] = _htole32(u->current.csize); + if (!hashindex_set(u->chunks, u->current.key, cache_values)) { + set_last_error("hashindex_set failed"); + return -1; + } + } + + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ] + * ^ */ + /* end of Item.chunks */ + u->inside_chunks = 0; + u->expect = expect_map_item_end; + break; + default: + if(u->inside_chunks) { + set_last_error("Invalid state transition (unexpected array end)"); + return -1; + } else { + u->level--; + return 0; + } + } + + return 0; +} + +static inline int unpack_callback_map(unpack_user* u, unsigned int n) +{ + (void)n; + + + if(u->level == 0) { + /* This begins a new Item */ + u->expect = expect_chunks_map_key; + } + + if(u->inside_chunks) { + unexpected("map"); + } + + u->level++; + + return 0; +} + +static inline int unpack_callback_map_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + if(u->level == 1) { + switch(u->expect) { + case expect_map_item_end: + u->expect = expect_chunks_map_key; + break; + default: + set_last_error("Unexpected map item"); + return -1; + } + } + return 0; +} + +static inline int unpack_callback_map_end(unpack_user* u) +{ + u->level--; + if(u->inside_chunks) { + set_last_error("Unexpected map end"); + return -1; + } + return 0; +} + +static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int l) +{ + /* raw = what Borg uses for binary stuff and strings as well */ + /* Note: p points to an internal buffer which contains l bytes. */ + (void)b; + + switch(u->expect) { + case expect_key: + if(l != 32) { + set_last_error("Incorrect key length"); + return -1; + } + memcpy(u->current.key, p, 32); + u->expect = expect_size; + break; + case expect_chunks_map_key: + if(l == 6 && !memcmp("chunks", p, 6)) { + u->expect = expect_chunks_begin; + u->inside_chunks = 1; + } else { + u->expect = expect_map_item_end; + } + break; + default: + if(u->inside_chunks) { + set_last_error("Unexpected bytes in chunks structure"); + return -1; + } + } + + return 0; +} + +static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int l) +{ + (void)u; (void)b; (void)p; (void)l; + unexpected("bin"); + return 0; +} + +static inline int unpack_callback_ext(unpack_user* u, const char* base, const char* pos, + unsigned int length) +{ + (void)u; (void)base; (void)pos; (void)length; + unexpected("ext"); + return 0; +} + +#include "unpack_template.h" diff --git a/src/borg/cache_sync/unpack_define.h b/src/borg/cache_sync/unpack_define.h new file mode 100644 index 000000000..d681277bb --- /dev/null +++ b/src/borg/cache_sync/unpack_define.h @@ -0,0 +1,95 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MSGPACK_UNPACK_DEFINE_H__ +#define MSGPACK_UNPACK_DEFINE_H__ + +#include "sysdep.h" +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + + +#ifndef MSGPACK_EMBED_STACK_SIZE +#define MSGPACK_EMBED_STACK_SIZE 32 +#endif + + +// CS is first byte & 0x1f +typedef enum { + CS_HEADER = 0x00, // nil + + //CS_ = 0x01, + //CS_ = 0x02, // false + //CS_ = 0x03, // true + + CS_BIN_8 = 0x04, + CS_BIN_16 = 0x05, + CS_BIN_32 = 0x06, + + CS_EXT_8 = 0x07, + CS_EXT_16 = 0x08, + CS_EXT_32 = 0x09, + + CS_FLOAT = 0x0a, + CS_DOUBLE = 0x0b, + CS_UINT_8 = 0x0c, + CS_UINT_16 = 0x0d, + CS_UINT_32 = 0x0e, + CS_UINT_64 = 0x0f, + CS_INT_8 = 0x10, + CS_INT_16 = 0x11, + CS_INT_32 = 0x12, + CS_INT_64 = 0x13, + + //CS_FIXEXT1 = 0x14, + //CS_FIXEXT2 = 0x15, + //CS_FIXEXT4 = 0x16, + //CS_FIXEXT8 = 0x17, + //CS_FIXEXT16 = 0x18, + + CS_RAW_8 = 0x19, + CS_RAW_16 = 0x1a, + CS_RAW_32 = 0x1b, + CS_ARRAY_16 = 0x1c, + CS_ARRAY_32 = 0x1d, + CS_MAP_16 = 0x1e, + CS_MAP_32 = 0x1f, + + ACS_RAW_VALUE, + ACS_BIN_VALUE, + ACS_EXT_VALUE, +} msgpack_unpack_state; + + +typedef enum { + CT_ARRAY_ITEM, + CT_MAP_KEY, + CT_MAP_VALUE, +} msgpack_container_type; + + +#ifdef __cplusplus +} +#endif + +#endif /* msgpack/unpack_define.h */ diff --git a/src/borg/cache_sync/unpack_template.h b/src/borg/cache_sync/unpack_template.h new file mode 100644 index 000000000..aa7a4c0bb --- /dev/null +++ b/src/borg/cache_sync/unpack_template.h @@ -0,0 +1,359 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef USE_CASE_RANGE +#if !defined(_MSC_VER) +#define USE_CASE_RANGE +#endif +#endif + +typedef struct unpack_stack { + size_t size; + size_t count; + unsigned int ct; +} unpack_stack; + +struct unpack_context { + unpack_user user; + unsigned int cs; + unsigned int trail; + unsigned int top; + unpack_stack stack[MSGPACK_EMBED_STACK_SIZE]; +}; + +static inline void unpack_init(unpack_context* ctx) +{ + ctx->cs = CS_HEADER; + ctx->trail = 0; + ctx->top = 0; +} + +#define construct 1 + +static inline int unpack_execute(unpack_context* ctx, const char* data, size_t len, size_t* off) +{ + assert(len >= *off); + + const unsigned char* p = (unsigned char*)data + *off; + const unsigned char* const pe = (unsigned char*)data + len; + const void* n = NULL; + + unsigned int trail = ctx->trail; + unsigned int cs = ctx->cs; + unsigned int top = ctx->top; + unpack_stack* stack = ctx->stack; + unpack_user* user = &ctx->user; + + unpack_stack* c = NULL; + + int ret; + +#define construct_cb(name) \ + construct && unpack_callback ## name + +#define push_simple_value(func) \ + if(construct_cb(func)(user) < 0) { goto _failed; } \ + goto _push +#define push_fixed_value(func, arg) \ + if(construct_cb(func)(user, arg) < 0) { goto _failed; } \ + goto _push +#define push_variable_value(func, base, pos, len) \ + if(construct_cb(func)(user, \ + (const char*)base, (const char*)pos, len) < 0) { goto _failed; } \ + goto _push + +#define again_fixed_trail(_cs, trail_len) \ + trail = trail_len; \ + cs = _cs; \ + goto _fixed_trail_again +#define again_fixed_trail_if_zero(_cs, trail_len, ifzero) \ + trail = trail_len; \ + if(trail == 0) { goto ifzero; } \ + cs = _cs; \ + goto _fixed_trail_again + +#define start_container(func, count_, ct_) \ + if(top >= MSGPACK_EMBED_STACK_SIZE) { goto _failed; } /* FIXME */ \ + if(construct_cb(func)(user, count_) < 0) { goto _failed; } \ + if((count_) == 0) { \ + if (construct_cb(func##_end)(user) < 0) { goto _failed; } \ + goto _push; } \ + stack[top].ct = ct_; \ + stack[top].size = count_; \ + stack[top].count = 0; \ + ++top; \ + goto _header_again + +#define NEXT_CS(p) ((unsigned int)*p & 0x1f) + +#ifdef USE_CASE_RANGE +#define SWITCH_RANGE_BEGIN switch(*p) { +#define SWITCH_RANGE(FROM, TO) case FROM ... TO: +#define SWITCH_RANGE_DEFAULT default: +#define SWITCH_RANGE_END } +#else +#define SWITCH_RANGE_BEGIN { if(0) { +#define SWITCH_RANGE(FROM, TO) } else if(FROM <= *p && *p <= TO) { +#define SWITCH_RANGE_DEFAULT } else { +#define SWITCH_RANGE_END } } +#endif + + if(p == pe) { goto _out; } + do { + switch(cs) { + case CS_HEADER: + SWITCH_RANGE_BEGIN + SWITCH_RANGE(0x00, 0x7f) // Positive Fixnum + push_fixed_value(_uint8, *(uint8_t*)p); + SWITCH_RANGE(0xe0, 0xff) // Negative Fixnum + push_fixed_value(_int8, *(int8_t*)p); + SWITCH_RANGE(0xc0, 0xdf) // Variable + switch(*p) { + case 0xc0: // nil + push_simple_value(_nil); + //case 0xc1: // never used + case 0xc2: // false + push_simple_value(_false); + case 0xc3: // true + push_simple_value(_true); + case 0xc4: // bin 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc5: // bin 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc6: // bin 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xc7: // ext 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc8: // ext 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc9: // ext 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xca: // float + case 0xcb: // double + case 0xcc: // unsigned int 8 + case 0xcd: // unsigned int 16 + case 0xce: // unsigned int 32 + case 0xcf: // unsigned int 64 + case 0xd0: // signed int 8 + case 0xd1: // signed int 16 + case 0xd2: // signed int 32 + case 0xd3: // signed int 64 + again_fixed_trail(NEXT_CS(p), 1 << (((unsigned int)*p) & 0x03)); + case 0xd4: // fixext 1 + case 0xd5: // fixext 2 + case 0xd6: // fixext 4 + case 0xd7: // fixext 8 + again_fixed_trail_if_zero(ACS_EXT_VALUE, + (1 << (((unsigned int)*p) & 0x03))+1, + _ext_zero); + case 0xd8: // fixext 16 + again_fixed_trail_if_zero(ACS_EXT_VALUE, 16+1, _ext_zero); + case 0xd9: // str 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xda: // raw 16 + case 0xdb: // raw 32 + case 0xdc: // array 16 + case 0xdd: // array 32 + case 0xde: // map 16 + case 0xdf: // map 32 + again_fixed_trail(NEXT_CS(p), 2 << (((unsigned int)*p) & 0x01)); + default: + goto _failed; + } + SWITCH_RANGE(0xa0, 0xbf) // FixRaw + again_fixed_trail_if_zero(ACS_RAW_VALUE, ((unsigned int)*p & 0x1f), _raw_zero); + SWITCH_RANGE(0x90, 0x9f) // FixArray + start_container(_array, ((unsigned int)*p) & 0x0f, CT_ARRAY_ITEM); + SWITCH_RANGE(0x80, 0x8f) // FixMap + start_container(_map, ((unsigned int)*p) & 0x0f, CT_MAP_KEY); + + SWITCH_RANGE_DEFAULT + goto _failed; + SWITCH_RANGE_END + // end CS_HEADER + + + _fixed_trail_again: + ++p; + + default: + if((size_t)(pe - p) < trail) { goto _out; } + n = p; p += trail - 1; + switch(cs) { + case CS_EXT_8: + again_fixed_trail_if_zero(ACS_EXT_VALUE, *(uint8_t*)n+1, _ext_zero); + case CS_EXT_16: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load16(uint16_t,n)+1, + _ext_zero); + case CS_EXT_32: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load32(uint32_t,n)+1, + _ext_zero); + case CS_FLOAT: { + union { uint32_t i; float f; } mem; + mem.i = _msgpack_load32(uint32_t,n); + push_fixed_value(_float, mem.f); } + case CS_DOUBLE: { + union { uint64_t i; double f; } mem; + mem.i = _msgpack_load64(uint64_t,n); +#if defined(__arm__) && !(__ARM_EABI__) // arm-oabi + // https://github.com/msgpack/msgpack-perl/pull/1 + mem.i = (mem.i & 0xFFFFFFFFUL) << 32UL | (mem.i >> 32UL); +#endif + push_fixed_value(_double, mem.f); } + case CS_UINT_8: + push_fixed_value(_uint8, *(uint8_t*)n); + case CS_UINT_16: + push_fixed_value(_uint16, _msgpack_load16(uint16_t,n)); + case CS_UINT_32: + push_fixed_value(_uint32, _msgpack_load32(uint32_t,n)); + case CS_UINT_64: + push_fixed_value(_uint64, _msgpack_load64(uint64_t,n)); + + case CS_INT_8: + push_fixed_value(_int8, *(int8_t*)n); + case CS_INT_16: + push_fixed_value(_int16, _msgpack_load16(int16_t,n)); + case CS_INT_32: + push_fixed_value(_int32, _msgpack_load32(int32_t,n)); + case CS_INT_64: + push_fixed_value(_int64, _msgpack_load64(int64_t,n)); + + case CS_BIN_8: + again_fixed_trail_if_zero(ACS_BIN_VALUE, *(uint8_t*)n, _bin_zero); + case CS_BIN_16: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load16(uint16_t,n), _bin_zero); + case CS_BIN_32: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load32(uint32_t,n), _bin_zero); + case ACS_BIN_VALUE: + _bin_zero: + push_variable_value(_bin, data, n, trail); + + case CS_RAW_8: + again_fixed_trail_if_zero(ACS_RAW_VALUE, *(uint8_t*)n, _raw_zero); + case CS_RAW_16: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load16(uint16_t,n), _raw_zero); + case CS_RAW_32: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load32(uint32_t,n), _raw_zero); + case ACS_RAW_VALUE: + _raw_zero: + push_variable_value(_raw, data, n, trail); + + case ACS_EXT_VALUE: + _ext_zero: + push_variable_value(_ext, data, n, trail); + + case CS_ARRAY_16: + start_container(_array, _msgpack_load16(uint16_t,n), CT_ARRAY_ITEM); + case CS_ARRAY_32: + /* FIXME security guard */ + start_container(_array, _msgpack_load32(uint32_t,n), CT_ARRAY_ITEM); + + case CS_MAP_16: + start_container(_map, _msgpack_load16(uint16_t,n), CT_MAP_KEY); + case CS_MAP_32: + /* FIXME security guard */ + start_container(_map, _msgpack_load32(uint32_t,n), CT_MAP_KEY); + + default: + goto _failed; + } + } + +_push: + if(top == 0) { goto _finish; } + c = &stack[top-1]; + switch(c->ct) { + case CT_ARRAY_ITEM: + if(construct_cb(_array_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_array_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + goto _header_again; + case CT_MAP_KEY: + c->ct = CT_MAP_VALUE; + goto _header_again; + case CT_MAP_VALUE: + if(construct_cb(_map_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_map_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + c->ct = CT_MAP_KEY; + goto _header_again; + + default: + goto _failed; + } + +_header_again: + cs = CS_HEADER; + ++p; + } while(p != pe); + goto _out; + + +_finish: + if (!construct) + unpack_callback_nil(user); + ++p; + ret = 1; + /* printf("-- finish --\n"); */ + goto _end; + +_failed: + /* printf("** FAILED **\n"); */ + ret = -1; + goto _end; + +_out: + ret = 0; + goto _end; + +_end: + ctx->cs = cs; + ctx->trail = trail; + ctx->top = top; + *off = p - (const unsigned char*)data; + + return ret; +#undef construct_cb +} + +#undef SWITCH_RANGE_BEGIN +#undef SWITCH_RANGE +#undef SWITCH_RANGE_DEFAULT +#undef SWITCH_RANGE_END +#undef push_simple_value +#undef push_fixed_value +#undef push_variable_value +#undef again_fixed_trail +#undef again_fixed_trail_if_zero +#undef start_container +#undef construct + +#undef NEXT_CS + +/* vim: set ts=4 sw=4 sts=4 expandtab */ diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 75ba1df35..338683440 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -31,7 +31,7 @@ cdef extern from "_hashindex.c": double HASH_MAX_LOAD -cdef extern from "_cache.c": +cdef extern from "cache_sync/cache_sync.c": ctypedef struct CacheSyncCtx: pass @@ -403,5 +403,5 @@ cdef class CacheSynchronizer: def feed(self, chunk): if not cache_sync_feed(self.sync, chunk, len(chunk)): error = cache_sync_error(self.sync) - if error is not None: - raise Exception('cache_sync_feed failed: ' + error.decode('ascii')) + if error != NULL: + raise ValueError('cache_sync_feed failed: ' + error.decode('ascii')) diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py new file mode 100644 index 000000000..690e50e31 --- /dev/null +++ b/src/borg/testsuite/cache.py @@ -0,0 +1,139 @@ + +from msgpack import packb + +import pytest + +from ..hashindex import ChunkIndex, CacheSynchronizer +from .hashindex import H + + +class TestCacheSynchronizer: + @pytest.fixture + def index(self): + return ChunkIndex() + + @pytest.fixture + def sync(self, index): + return CacheSynchronizer(index) + + def test_no_chunks(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [] + }) + sync.feed(data) + assert not len(index) + + def test_simple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + sync.feed(data) + assert len(index) == 2 + assert index[H(1)] == (1, 1, 2) + assert index[H(2)] == (1, 2, 3) + + def test_multiple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ], + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'chunks': [ + (H(3), 1, 2), + ], + }) + data += packb({ + 'chunks': [ + (H(1), 1, 2), + ], + }) + + part1 = data[:70] + part2 = data[70:120] + part3 = data[120:] + sync.feed(part1) + sync.feed(part2) + sync.feed(part3) + assert len(index) == 3 + assert index[H(1)] == (3, 1, 2) + assert index[H(2)] == (2, 2, 3) + assert index[H(3)] == (1, 1, 2) + + @pytest.mark.parametrize('elem,error', ( + ({1: 2}, 'Unexpected object: map'), + (bytes(213), [ + 'Unexpected bytes in chunks structure', # structure 2/3 + 'Incorrect key length']), # structure 3/3 + (1, 'Unexpected object: integer'), + (1.0, 'Unexpected object: double'), + (True, 'Unexpected object: true'), + (False, 'Unexpected object: false'), + (None, 'Unexpected object: nil'), + )) + @pytest.mark.parametrize('structure', ( + lambda elem: {'chunks': elem}, + lambda elem: {'chunks': [elem]}, + lambda elem: {'chunks': [(elem, 1, 2)]}, + )) + def test_corrupted(self, sync, structure, elem, error): + packed = packb(structure(elem)) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + if isinstance(error, str): + error = [error] + possible_errors = ['cache_sync_feed failed: ' + error for error in error] + assert str(excinfo.value) in possible_errors + + @pytest.mark.parametrize('data,error', ( + # Incorrect tuple length + ({'chunks': [(bytes(32), 2, 3, 4)]}, 'Invalid chunk list entry length'), + ({'chunks': [(bytes(32), 2)]}, 'Invalid chunk list entry length'), + # Incorrect types + ({'chunks': [(1, 2, 3)]}, 'Unexpected object: integer'), + ({'chunks': [(1, bytes(32), 2)]}, 'Unexpected object: integer'), + ({'chunks': [(bytes(32), 1.0, 2)]}, 'Unexpected object: double'), + )) + def test_corrupted_ancillary(self, index, sync, data, error): + packed = packb(data) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + assert str(excinfo.value) == 'cache_sync_feed failed: ' + error From 835b0e5ee057e07f416817238039b7d29c4de62d Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sun, 28 May 2017 13:16:52 +0200 Subject: [PATCH 06/17] cache sync/remote: compressed, decrypted cache --- src/borg/cache.py | 18 +++++------ src/borg/remote.py | 81 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 70 insertions(+), 29 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 934f12a78..9fb0c540c 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -564,17 +564,15 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" except FileNotFoundError: pass - def fetch_and_build_idx(archive_id, repository, key, chunk_idx): - cdata = repository.get(archive_id) - data = key.decrypt(archive_id, cdata) - chunk_idx.add(archive_id, 1, len(data), len(cdata)) + def fetch_and_build_idx(archive_id, decrypted_repository, key, chunk_idx): + csize, data = decrypted_repository.get(archive_id) + chunk_idx.add(archive_id, 1, len(data), csize) archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) if archive.version != 1: raise Exception('Unknown archive metadata version') sync = CacheSynchronizer(chunk_idx) - for item_id, chunk in zip(archive.items, repository.get_many(archive.items)): - data = key.decrypt(item_id, chunk) - chunk_idx.add(item_id, 1, len(data), len(chunk)) + for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)): + chunk_idx.add(item_id, 1, len(data), csize) sync.feed(data) if self.do_cache: fn = mkpath(archive_id) @@ -641,7 +639,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) archive_chunk_idx = ChunkIndex() - fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx) + fetch_and_build_idx(archive_id, decrypted_repository, self.key, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: # we just use the first archive's idx as starting point, @@ -653,7 +651,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: chunk_idx = chunk_idx or ChunkIndex(master_index_capacity) logger.info('Fetching archive index for %s ...', archive_name) - fetch_and_build_idx(archive_id, repository, self.key, chunk_idx) + fetch_and_build_idx(archive_id, decrypted_repository, self.key, chunk_idx) if self.progress: pi.finish() logger.info('Done.') @@ -675,7 +673,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" pass self.begin_txn() - with cache_if_remote(self.repository) as repository: + with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository: legacy_cleanup() # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d - # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk) diff --git a/src/borg/remote.py b/src/borg/remote.py index c991dbefb..daad2c251 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -8,6 +8,7 @@ import os import select import shlex import shutil +import struct import sys import tempfile import textwrap @@ -18,6 +19,7 @@ from subprocess import Popen, PIPE import msgpack from . import __version__ +from .compress import LZ4 from .helpers import Error, IntegrityError from .helpers import bin_to_hex from .helpers import get_home_dir @@ -1046,9 +1048,14 @@ class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. Just to have same API (including the context manager) as RepositoryCache. + + *transform* is a callable taking two arguments, key and raw repository data. + The return value is returned from get()/get_many(). By default, the raw + repository data is returned. """ - def __init__(self, repository): + def __init__(self, repository, transform=None): self.repository = repository + self.transform = transform or (lambda key, data: data) def close(self): pass @@ -1063,8 +1070,8 @@ class RepositoryNoCache: return next(self.get_many([key], cache=False)) def get_many(self, keys, cache=True): - for data in self.repository.get_many(keys): - yield data + for key, data in zip(keys, self.repository.get_many(keys)): + yield self.transform(key, data) class RepositoryCache(RepositoryNoCache): @@ -1072,10 +1079,17 @@ class RepositoryCache(RepositoryNoCache): A caching Repository wrapper. Caches Repository GET operations locally. + + *pack* and *unpack* complement *transform* of the base class. + *pack* receives the output of *transform* and should return bytes, + which are stored in the cache. *unpack* receives these bytes and + should return the initial data (as returned by *transform*). """ - def __init__(self, repository): - super().__init__(repository) + def __init__(self, repository, pack=None, unpack=None, transform=None): + super().__init__(repository, transform) + self.pack = pack or (lambda data: data) + self.unpack = unpack or (lambda data: data) self.cache = set() self.basedir = tempfile.mkdtemp(prefix='borg-cache-') self.query_size_limit() @@ -1106,11 +1120,15 @@ class RepositoryCache(RepositoryNoCache): os.unlink(file) self.evictions += 1 - def add_entry(self, key, data): + def add_entry(self, key, data, cache): + transformed = self.transform(key, data) + if not cache: + return transformed + packed = self.pack(transformed) file = self.key_filename(key) try: with open(file, 'wb') as fd: - fd.write(data) + fd.write(packed) except OSError as os_error: if os_error.errno == errno.ENOSPC: self.enospc += 1 @@ -1118,11 +1136,11 @@ class RepositoryCache(RepositoryNoCache): else: raise else: - self.size += len(data) + self.size += len(packed) self.cache.add(key) if self.size > self.size_limit: self.backoff() - return data + return transformed def close(self): logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' @@ -1141,31 +1159,56 @@ class RepositoryCache(RepositoryNoCache): file = self.key_filename(key) with open(file, 'rb') as fd: self.hits += 1 - yield fd.read() + yield self.unpack(fd.read()) else: for key_, data in repository_iterator: if key_ == key: - if cache: - self.add_entry(key, data) + transformed = self.add_entry(key, data, cache) self.misses += 1 - yield data + yield transformed break else: # slow path: eviction during this get_many removed this key from the cache t0 = time.perf_counter() data = self.repository.get(key) self.slow_lat += time.perf_counter() - t0 - if cache: - self.add_entry(key, data) + transformed = self.add_entry(key, data, cache) self.slow_misses += 1 - yield data + yield transformed # Consume any pending requests for _ in repository_iterator: pass -def cache_if_remote(repository): +def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None): + """ + Return a Repository(No)Cache for *repository*. + + If *decrypted_cache* is a key object, then get and get_many will return a tuple + (csize, plaintext) instead of the actual data in the repository. The cache will + store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting + and more importantly MAC and ID checking cached objects). + Internally, objects are compressed with LZ4. + """ + if decrypted_cache and (pack or unpack or transform): + raise ValueError('decrypted_cache and pack/unpack/transform are incompatible') + elif decrypted_cache: + key = decrypted_cache + cache_struct = struct.Struct('=I') + compressor = LZ4() + + def pack(data): + return cache_struct.pack(data[0]) + compressor.compress(data[1]) + + def unpack(data): + return cache_struct.unpack(data[:cache_struct.size])[0], compressor.decompress(data[cache_struct.size:]) + + def transform(id_, data): + csize = len(data) + decrypted = key.decrypt(id_, data) + return csize, decrypted + if isinstance(repository, RemoteRepository): - return RepositoryCache(repository) + return RepositoryCache(repository, pack, unpack, transform) else: - return RepositoryNoCache(repository) + return RepositoryNoCache(repository, transform) From 7f04e00ba23157dee8910cc35362098dc98e63df Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sun, 28 May 2017 15:33:36 +0200 Subject: [PATCH 07/17] testsuite: add TestRepositoryCache --- src/borg/testsuite/remote.py | 91 +++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index b9eddabd6..419463bea 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -1,9 +1,13 @@ +import errno import os import time +from unittest.mock import patch import pytest -from ..remote import SleepingBandwidthLimiter +from ..remote import SleepingBandwidthLimiter, RepositoryCache +from ..repository import Repository +from .hashindex import H class TestSleepingBandwidthLimiter: @@ -58,3 +62,88 @@ class TestSleepingBandwidthLimiter: now += 10 self.expect_write(5, b"1") it.write(5, b"1") + + +class TestRepositoryCache: + @pytest.yield_fixture + def repository(self, tmpdir): + self.repository_location = os.path.join(str(tmpdir), 'repository') + with Repository(self.repository_location, exclusive=True, create=True) as repository: + repository.put(H(1), b'1234') + repository.put(H(2), b'5678') + repository.put(H(3), bytes(100)) + yield repository + + @pytest.fixture + def cache(self, repository): + return RepositoryCache(repository) + + def test_simple(self, cache: RepositoryCache): + # Single get()s are not cached, since they are used for unique objects like archives. + assert cache.get(H(1)) == b'1234' + assert cache.misses == 1 + assert cache.hits == 0 + + assert list(cache.get_many([H(1)])) == [b'1234'] + assert cache.misses == 2 + assert cache.hits == 0 + + assert list(cache.get_many([H(1)])) == [b'1234'] + assert cache.misses == 2 + assert cache.hits == 1 + + assert cache.get(H(1)) == b'1234' + assert cache.misses == 2 + assert cache.hits == 2 + + def test_backoff(self, cache: RepositoryCache): + def query_size_limit(): + cache.size_limit = 0 + + assert list(cache.get_many([H(1), H(2)])) == [b'1234', b'5678'] + assert cache.misses == 2 + assert cache.evictions == 0 + iterator = cache.get_many([H(1), H(3), H(2)]) + assert next(iterator) == b'1234' + + # Force cache to back off + qsl = cache.query_size_limit + cache.query_size_limit = query_size_limit + cache.backoff() + cache.query_size_limit = qsl + # Evicted H(1) and H(2) + assert cache.evictions == 2 + assert H(1) not in cache.cache + assert H(2) not in cache.cache + assert next(iterator) == bytes(100) + assert cache.slow_misses == 0 + # Since H(2) was in the cache when we called get_many(), but has + # been evicted during iterating the generator, it will be a slow miss. + assert next(iterator) == b'5678' + assert cache.slow_misses == 1 + + def test_enospc(self, cache: RepositoryCache): + class enospc_open: + def __init__(self, *args): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def write(self, data): + raise OSError(errno.ENOSPC, 'foo') + + iterator = cache.get_many([H(1), H(2), H(3)]) + assert next(iterator) == b'1234' + + with patch('builtins.open', enospc_open): + assert next(iterator) == b'5678' + assert cache.enospc == 1 + # We didn't patch query_size_limit which would set size_limit to some low + # value, so nothing was actually evicted. + assert cache.evictions == 0 + + assert next(iterator) == bytes(100) From 67b97f22231638f1a0276c54b14fab0319742c54 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Wed, 31 May 2017 20:46:57 +0200 Subject: [PATCH 08/17] cache sync: cleanup progress handling, unused parameters --- src/borg/cache.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 9fb0c540c..7a9fce029 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -564,7 +564,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" except FileNotFoundError: pass - def fetch_and_build_idx(archive_id, decrypted_repository, key, chunk_idx): + def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx): csize, data = decrypted_repository.get(archive_id) chunk_idx.add(archive_id, 1, len(data), csize) archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) @@ -595,6 +595,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" for info in self.manifest.archives.list(): if info.id in archive_ids: archive_names[info.id] = info.name + assert len(archive_names) == len(archive_ids) return archive_names def create_master_idx(chunk_idx): @@ -612,15 +613,12 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR) if archive_ids: chunk_idx = None - if self.progress: - pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1, - msg='%3.0f%% Syncing chunks cache. Processing archive %s', - msgid='cache.sync') + pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1, + msg='%3.0f%% Syncing chunks cache. Processing archive %s', + msgid='cache.sync') archive_ids_to_names = get_archive_ids_to_names(archive_ids) - for archive_id in archive_ids: - archive_name = archive_ids_to_names.pop(archive_id) - if self.progress: - pi.show(info=[remove_surrogates(archive_name)]) + for archive_id, archive_name in archive_ids_to_names.items(): + pi.show(info=[remove_surrogates(archive_name)]) if self.do_cache: if archive_id in cached_ids: archive_chunk_idx_path = mkpath(archive_id) @@ -639,7 +637,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) archive_chunk_idx = ChunkIndex() - fetch_and_build_idx(archive_id, decrypted_repository, self.key, archive_chunk_idx) + fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: # we just use the first archive's idx as starting point, @@ -651,9 +649,8 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: chunk_idx = chunk_idx or ChunkIndex(master_index_capacity) logger.info('Fetching archive index for %s ...', archive_name) - fetch_and_build_idx(archive_id, decrypted_repository, self.key, chunk_idx) - if self.progress: - pi.finish() + fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx) + pi.finish() logger.info('Done.') return chunk_idx From cb98cb838d2b73387fa4a311734a2b3c842f6e17 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Wed, 31 May 2017 20:47:16 +0200 Subject: [PATCH 09/17] fuse: fix read(2) caching data in metadata cache The OS page cache is responsible for handling this and is much more empowered to do a good job at that than Borg. --- src/borg/fuse.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 28e09689d..4441c4063 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -340,7 +340,7 @@ class FuseOperations(llfuse.Operations): # evict fully read chunk from cache del self.data_cache[id] else: - data = self.key.decrypt(id, self.repository.get(id)) + data = self.key.decrypt(id, self.repository_uncached.get(id)) if offset + n < len(data): # chunk was only partially read, cache it self.data_cache[id] = data From 795cdfc9abadb7ec4a93e8a01dae7bec1832dcc7 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 2 Jun 2017 19:28:50 +0200 Subject: [PATCH 10/17] cache sync: move stat initialization to main unpack --- src/borg/cache_sync/cache_sync.c | 5 +---- src/borg/cache_sync/unpack.h | 15 ++++++++++++++- src/borg/cache_sync/unpack_template.h | 1 + 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/borg/cache_sync/cache_sync.c b/src/borg/cache_sync/cache_sync.c index 174e4b909..e0c2e0fb8 100644 --- a/src/borg/cache_sync/cache_sync.c +++ b/src/borg/cache_sync/cache_sync.c @@ -19,11 +19,8 @@ cache_sync_init(HashIndex *chunks) } unpack_init(&ctx->ctx); - + /* needs to be set only once */ ctx->ctx.user.chunks = chunks; - ctx->ctx.user.last_error = NULL; - ctx->ctx.user.level = 0; - ctx->ctx.user.inside_chunks = false; ctx->buf = NULL; ctx->head = 0; ctx->tail = 0; diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h index 0d71a9401..62e775f46 100644 --- a/src/borg/cache_sync/unpack.h +++ b/src/borg/cache_sync/unpack.h @@ -89,6 +89,8 @@ typedef struct unpack_user { expect_csize, /* next thing must be the end of the CLE (array_end) */ expect_entry_end, + + expect_item_begin } expect; struct { @@ -108,6 +110,14 @@ typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, siz return -1; \ } +static inline void unpack_init_user_state(unpack_user *u) +{ + u->last_error = NULL; + u->level = 0; + u->inside_chunks = false; + u->expect = expect_item_begin; +} + static inline int unpack_callback_int64(unpack_user* u, int64_t d) { switch(u->expect) { @@ -282,8 +292,11 @@ static inline int unpack_callback_map(unpack_user* u, unsigned int n) { (void)n; - if(u->level == 0) { + if(u->expect != expect_item_begin) { + set_last_error("Invalid state transition"); /* unreachable */ + return -1; + } /* This begins a new Item */ u->expect = expect_chunks_map_key; } diff --git a/src/borg/cache_sync/unpack_template.h b/src/borg/cache_sync/unpack_template.h index aa7a4c0bb..1ac262740 100644 --- a/src/borg/cache_sync/unpack_template.h +++ b/src/borg/cache_sync/unpack_template.h @@ -42,6 +42,7 @@ static inline void unpack_init(unpack_context* ctx) ctx->cs = CS_HEADER; ctx->trail = 0; ctx->top = 0; + unpack_init_user_state(&ctx->user); } #define construct 1 From 5b3667b61760696f5cca985dc556bbb551997aeb Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 2 Jun 2017 19:31:56 +0200 Subject: [PATCH 11/17] cache sync: macros in all-caps --- src/borg/cache_sync/unpack.h | 44 ++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h index 62e775f46..a48559ffd 100644 --- a/src/borg/cache_sync/unpack.h +++ b/src/borg/cache_sync/unpack.h @@ -31,11 +31,11 @@ #define MIN(x, y) ((x) < (y) ? (x): (y)) #ifdef DEBUG -#define set_last_error(msg) \ +#define SET_LAST_ERROR(msg) \ fprintf(stderr, "cache_sync parse error: %s\n", (msg)); \ u->last_error = (msg); #else -#define set_last_error(msg) \ +#define SET_LAST_ERROR(msg) \ u->last_error = (msg); #endif @@ -104,9 +104,9 @@ struct unpack_context; typedef struct unpack_context unpack_context; typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, size_t* off); -#define unexpected(what) \ +#define UNEXPECTED(what) \ if(u->inside_chunks || u->expect == expect_chunks_map_key) { \ - set_last_error("Unexpected object: " what); \ + SET_LAST_ERROR("Unexpected object: " what); \ return -1; \ } @@ -130,7 +130,7 @@ static inline int unpack_callback_int64(unpack_user* u, int64_t d) u->expect = expect_entry_end; break; default: - unexpected("integer"); + UNEXPECTED("integer"); } return 0; } @@ -175,33 +175,33 @@ static inline int unpack_callback_int8(unpack_user* u, int8_t d) static inline int unpack_callback_double(unpack_user* u, double d) { (void)d; - unexpected("double"); + UNEXPECTED("double"); return 0; } static inline int unpack_callback_float(unpack_user* u, float d) { (void)d; - unexpected("float"); + UNEXPECTED("float"); return 0; } /* nil/true/false — I/don't/care */ static inline int unpack_callback_nil(unpack_user* u) { - unexpected("nil"); + UNEXPECTED("nil"); return 0; } static inline int unpack_callback_true(unpack_user* u) { - unexpected("true"); + UNEXPECTED("true"); return 0; } static inline int unpack_callback_false(unpack_user* u) { - unexpected("false"); + UNEXPECTED("false"); return 0; } @@ -217,14 +217,14 @@ static inline int unpack_callback_array(unpack_user* u, unsigned int n) /* b'chunks': [ ( * ^ */ if(n != 3) { - set_last_error("Invalid chunk list entry length"); + SET_LAST_ERROR("Invalid chunk list entry length"); return -1; } u->expect = expect_key; break; default: if(u->inside_chunks) { - set_last_error("Unexpected array start"); + SET_LAST_ERROR("Unexpected array start"); return -1; } else { u->level++; @@ -261,7 +261,7 @@ static inline int unpack_callback_array_end(unpack_user* u) cache_values[1] = _htole32(u->current.size); cache_values[2] = _htole32(u->current.csize); if (!hashindex_set(u->chunks, u->current.key, cache_values)) { - set_last_error("hashindex_set failed"); + SET_LAST_ERROR("hashindex_set failed"); return -1; } } @@ -277,7 +277,7 @@ static inline int unpack_callback_array_end(unpack_user* u) break; default: if(u->inside_chunks) { - set_last_error("Invalid state transition (unexpected array end)"); + SET_LAST_ERROR("Invalid state transition (unexpected array end)"); return -1; } else { u->level--; @@ -294,7 +294,7 @@ static inline int unpack_callback_map(unpack_user* u, unsigned int n) if(u->level == 0) { if(u->expect != expect_item_begin) { - set_last_error("Invalid state transition"); /* unreachable */ + SET_LAST_ERROR("Invalid state transition"); /* unreachable */ return -1; } /* This begins a new Item */ @@ -302,7 +302,7 @@ static inline int unpack_callback_map(unpack_user* u, unsigned int n) } if(u->inside_chunks) { - unexpected("map"); + UNEXPECTED("map"); } u->level++; @@ -319,7 +319,7 @@ static inline int unpack_callback_map_item(unpack_user* u, unsigned int current) u->expect = expect_chunks_map_key; break; default: - set_last_error("Unexpected map item"); + SET_LAST_ERROR("Unexpected map item"); return -1; } } @@ -330,7 +330,7 @@ static inline int unpack_callback_map_end(unpack_user* u) { u->level--; if(u->inside_chunks) { - set_last_error("Unexpected map end"); + SET_LAST_ERROR("Unexpected map end"); return -1; } return 0; @@ -345,7 +345,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* switch(u->expect) { case expect_key: if(l != 32) { - set_last_error("Incorrect key length"); + SET_LAST_ERROR("Incorrect key length"); return -1; } memcpy(u->current.key, p, 32); @@ -361,7 +361,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* break; default: if(u->inside_chunks) { - set_last_error("Unexpected bytes in chunks structure"); + SET_LAST_ERROR("Unexpected bytes in chunks structure"); return -1; } } @@ -372,7 +372,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int l) { (void)u; (void)b; (void)p; (void)l; - unexpected("bin"); + UNEXPECTED("bin"); return 0; } @@ -380,7 +380,7 @@ static inline int unpack_callback_ext(unpack_user* u, const char* base, const ch unsigned int length) { (void)u; (void)base; (void)pos; (void)length; - unexpected("ext"); + UNEXPECTED("ext"); return 0; } From b544af2af15318d3a7473dd2d4168e32abf4779f Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 3 Jun 2017 12:14:17 +0200 Subject: [PATCH 12/17] RepositoryCache: checksum decrypted cache --- src/borg/remote.py | 67 +++++++++++++++++++++++------------- src/borg/testsuite/remote.py | 54 ++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 24 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index daad2c251..d2cf2a437 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -30,6 +30,7 @@ from .helpers import format_file_size from .logger import create_logger, setup_logging from .repository import Repository, MAX_OBJECT_SIZE, LIST_SCAN_LIMIT from .version import parse_version, format_version +from .algorithms.checksums import xxh64 logger = create_logger(__name__) @@ -1086,6 +1087,9 @@ class RepositoryCache(RepositoryNoCache): should return the initial data (as returned by *transform*). """ + class InvalidateCacheEntry(Exception): + pass + def __init__(self, repository, pack=None, unpack=None, transform=None): super().__init__(repository, transform) self.pack = pack or (lambda data: data) @@ -1100,6 +1104,7 @@ class RepositoryCache(RepositoryNoCache): self.slow_misses = 0 self.slow_lat = 0.0 self.evictions = 0 + self.checksum_errors = 0 self.enospc = 0 def query_size_limit(self): @@ -1144,10 +1149,10 @@ class RepositoryCache(RepositoryNoCache): def close(self): logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' - '%d evictions, %d ENOSPC hit', + '%d evictions, %d ENOSPC hit, %d checksum errors', len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), self.hits, self.misses, self.slow_misses, self.slow_lat, - self.evictions, self.enospc) + self.evictions, self.enospc, self.checksum_errors) self.cache.clear() shutil.rmtree(self.basedir) @@ -1157,30 +1162,37 @@ class RepositoryCache(RepositoryNoCache): for key in keys: if key in self.cache: file = self.key_filename(key) - with open(file, 'rb') as fd: - self.hits += 1 - yield self.unpack(fd.read()) - else: - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache) - self.misses += 1 - yield transformed - break - else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key) - self.slow_lat += time.perf_counter() - t0 + try: + with open(file, 'rb') as fd: + self.hits += 1 + yield self.unpack(fd.read()) + continue # go to the next key + except self.InvalidateCacheEntry: + self.cache.remove(key) + self.size -= os.stat(file).st_size + self.checksum_errors += 1 + os.unlink(file) + # fall through to fetch the object again + for key_, data in repository_iterator: + if key_ == key: transformed = self.add_entry(key, data, cache) - self.slow_misses += 1 + self.misses += 1 yield transformed + break + else: + # slow path: eviction during this get_many removed this key from the cache + t0 = time.perf_counter() + data = self.repository.get(key) + self.slow_lat += time.perf_counter() - t0 + transformed = self.add_entry(key, data, cache) + self.slow_misses += 1 + yield transformed # Consume any pending requests for _ in repository_iterator: pass -def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None): +def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): """ Return a Repository(No)Cache for *repository*. @@ -1194,21 +1206,30 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None raise ValueError('decrypted_cache and pack/unpack/transform are incompatible') elif decrypted_cache: key = decrypted_cache - cache_struct = struct.Struct('=I') + # 32 bit csize, 64 bit (8 byte) xxh64 + cache_struct = struct.Struct('=I8s') compressor = LZ4() def pack(data): - return cache_struct.pack(data[0]) + compressor.compress(data[1]) + csize, decrypted = data + compressed = compressor.compress(decrypted) + return cache_struct.pack(csize, xxh64(compressed)) + compressed def unpack(data): - return cache_struct.unpack(data[:cache_struct.size])[0], compressor.decompress(data[cache_struct.size:]) + data = memoryview(data) + csize, checksum = cache_struct.unpack(data[:cache_struct.size]) + compressed = data[cache_struct.size:] + if checksum != xxh64(compressed): + logger.warning('Repository metadata cache: detected corrupted data in cache!') + raise RepositoryCache.InvalidateCacheEntry + return csize, compressor.decompress(compressed) def transform(id_, data): csize = len(data) decrypted = key.decrypt(id_, data) return csize, decrypted - if isinstance(repository, RemoteRepository): + if isinstance(repository, RemoteRepository) or force_cache: return RepositoryCache(repository, pack, unpack, transform) else: return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index 419463bea..681e82cf7 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -1,13 +1,17 @@ import errno import os +import io import time from unittest.mock import patch import pytest -from ..remote import SleepingBandwidthLimiter, RepositoryCache +from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote from ..repository import Repository +from ..crypto.key import PlaintextKey +from ..compress import CompressionSpec from .hashindex import H +from .key import TestKey class TestSleepingBandwidthLimiter: @@ -147,3 +151,51 @@ class TestRepositoryCache: assert cache.evictions == 0 assert next(iterator) == bytes(100) + + @pytest.fixture + def key(self, repository, monkeypatch): + monkeypatch.setenv('BORG_PASSPHRASE', 'test') + key = PlaintextKey.create(repository, TestKey.MockArgs()) + key.compressor = CompressionSpec('none').compressor + return key + + def _put_encrypted_object(self, key, repository, data): + id_ = key.id_hash(data) + repository.put(id_, key.encrypt(data)) + return id_ + + @pytest.fixture + def H1(self, key, repository): + return self._put_encrypted_object(key, repository, b'1234') + + @pytest.fixture + def H2(self, key, repository): + return self._put_encrypted_object(key, repository, b'5678') + + @pytest.fixture + def H3(self, key, repository): + return self._put_encrypted_object(key, repository, bytes(100)) + + @pytest.fixture + def decrypted_cache(self, key, repository): + return cache_if_remote(repository, decrypted_cache=key, force_cache=True) + + def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3): + list(decrypted_cache.get_many([H1, H2, H3])) + + iterator = decrypted_cache.get_many([H1, H2, H3]) + assert next(iterator) == (7, b'1234') + + with open(decrypted_cache.key_filename(H2), 'a+b') as fd: + fd.seek(-1, io.SEEK_END) + corrupted = (int.from_bytes(fd.read(), 'little') ^ 2).to_bytes(1, 'little') + fd.seek(-1, io.SEEK_END) + fd.write(corrupted) + fd.truncate() + + assert next(iterator) == (7, b'5678') + assert decrypted_cache.checksum_errors == 1 + assert decrypted_cache.slow_misses == 1 + assert next(iterator) == (103, bytes(100)) + assert decrypted_cache.hits == 3 + assert decrypted_cache.misses == 3 From 4faaa7d1fa798556a780cb6e9a8f41b8ec106b5a Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 3 Jun 2017 12:27:35 +0200 Subject: [PATCH 13/17] RepositoryCache: abort on data corruption --- src/borg/remote.py | 52 ++++++++++++++---------------------- src/borg/testsuite/remote.py | 9 +++---- 2 files changed, 23 insertions(+), 38 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index d2cf2a437..63b5e817a 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1087,9 +1087,6 @@ class RepositoryCache(RepositoryNoCache): should return the initial data (as returned by *transform*). """ - class InvalidateCacheEntry(Exception): - pass - def __init__(self, repository, pack=None, unpack=None, transform=None): super().__init__(repository, transform) self.pack = pack or (lambda data: data) @@ -1104,7 +1101,6 @@ class RepositoryCache(RepositoryNoCache): self.slow_misses = 0 self.slow_lat = 0.0 self.evictions = 0 - self.checksum_errors = 0 self.enospc = 0 def query_size_limit(self): @@ -1149,10 +1145,10 @@ class RepositoryCache(RepositoryNoCache): def close(self): logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' - '%d evictions, %d ENOSPC hit, %d checksum errors', + '%d evictions, %d ENOSPC hit', len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), self.hits, self.misses, self.slow_misses, self.slow_lat, - self.evictions, self.enospc, self.checksum_errors) + self.evictions, self.enospc) self.cache.clear() shutil.rmtree(self.basedir) @@ -1162,31 +1158,24 @@ class RepositoryCache(RepositoryNoCache): for key in keys: if key in self.cache: file = self.key_filename(key) - try: - with open(file, 'rb') as fd: - self.hits += 1 - yield self.unpack(fd.read()) - continue # go to the next key - except self.InvalidateCacheEntry: - self.cache.remove(key) - self.size -= os.stat(file).st_size - self.checksum_errors += 1 - os.unlink(file) - # fall through to fetch the object again - for key_, data in repository_iterator: - if key_ == key: - transformed = self.add_entry(key, data, cache) - self.misses += 1 - yield transformed - break + with open(file, 'rb') as fd: + self.hits += 1 + yield self.unpack(fd.read()) else: - # slow path: eviction during this get_many removed this key from the cache - t0 = time.perf_counter() - data = self.repository.get(key) - self.slow_lat += time.perf_counter() - t0 - transformed = self.add_entry(key, data, cache) - self.slow_misses += 1 - yield transformed + for key_, data in repository_iterator: + if key_ == key: + transformed = self.add_entry(key, data, cache) + self.misses += 1 + yield transformed + break + else: + # slow path: eviction during this get_many removed this key from the cache + t0 = time.perf_counter() + data = self.repository.get(key) + self.slow_lat += time.perf_counter() - t0 + transformed = self.add_entry(key, data, cache) + self.slow_misses += 1 + yield transformed # Consume any pending requests for _ in repository_iterator: pass @@ -1220,8 +1209,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None csize, checksum = cache_struct.unpack(data[:cache_struct.size]) compressed = data[cache_struct.size:] if checksum != xxh64(compressed): - logger.warning('Repository metadata cache: detected corrupted data in cache!') - raise RepositoryCache.InvalidateCacheEntry + raise IntegrityError('detected corrupted data in metadata cache') return csize, compressor.decompress(compressed) def transform(id_, data): diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index 681e82cf7..dccfdaffb 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -10,6 +10,7 @@ from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote from ..repository import Repository from ..crypto.key import PlaintextKey from ..compress import CompressionSpec +from ..helpers import IntegrityError from .hashindex import H from .key import TestKey @@ -193,9 +194,5 @@ class TestRepositoryCache: fd.write(corrupted) fd.truncate() - assert next(iterator) == (7, b'5678') - assert decrypted_cache.checksum_errors == 1 - assert decrypted_cache.slow_misses == 1 - assert next(iterator) == (103, bytes(100)) - assert decrypted_cache.hits == 3 - assert decrypted_cache.misses == 3 + with pytest.raises(IntegrityError): + assert next(iterator) == (7, b'5678') From 5af66dbb126043e5b861b4f1df21a7762f93332e Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 3 Jun 2017 14:57:24 +0200 Subject: [PATCH 14/17] cache sync: add more refcount tests --- src/borg/cache_sync/unpack.h | 8 +++-- src/borg/testsuite/cache.py | 59 ++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h index a48559ffd..d878dda7d 100644 --- a/src/borg/cache_sync/unpack.h +++ b/src/borg/cache_sync/unpack.h @@ -251,8 +251,12 @@ static inline int unpack_callback_array_end(unpack_user* u) /* b'chunks': [ ( b'1234...', 123, 345 ) * ^ */ cache_entry = (uint32_t*) hashindex_get(u->chunks, u->current.key); - if (cache_entry) { + if(cache_entry) { refcount = _le32toh(cache_entry[0]); + if(refcount > _MAX_VALUE) { + SET_LAST_ERROR("invalid reference count"); + return -1; + } refcount += 1; cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); } else { @@ -260,7 +264,7 @@ static inline int unpack_callback_array_end(unpack_user* u) cache_values[0] = _htole32(1); cache_values[1] = _htole32(u->current.size); cache_values[2] = _htole32(u->current.csize); - if (!hashindex_set(u->chunks, u->current.key, cache_values)) { + if(!hashindex_set(u->chunks, u->current.key, cache_values)) { SET_LAST_ERROR("hashindex_set failed"); return -1; } diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py index 690e50e31..6f6452a10 100644 --- a/src/borg/testsuite/cache.py +++ b/src/borg/testsuite/cache.py @@ -1,3 +1,4 @@ +import io from msgpack import packb @@ -137,3 +138,61 @@ class TestCacheSynchronizer: with pytest.raises(ValueError) as excinfo: sync.feed(packed) assert str(excinfo.value) == 'cache_sync_feed failed: ' + error + + def make_index_with_refcount(self, refcount): + index_data = io.BytesIO() + index_data.write(b'BORG_IDX') + # num_entries + index_data.write((1).to_bytes(4, 'little')) + # num_buckets + index_data.write((1).to_bytes(4, 'little')) + # key_size + index_data.write((32).to_bytes(1, 'little')) + # value_size + index_data.write((3 * 4).to_bytes(1, 'little')) + + index_data.write(H(0)) + index_data.write(refcount.to_bytes(4, 'little')) + index_data.write((1234).to_bytes(4, 'little')) + index_data.write((5678).to_bytes(4, 'little')) + + index_data.seek(0) + index = ChunkIndex.read(index_data) + return index + + def test_corrupted_refcount(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE + 1) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + with pytest.raises(ValueError) as excinfo: + sync.feed(data) + assert str(excinfo.value) == 'cache_sync_feed failed: invalid reference count' + + def test_refcount_max_value(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + sync.feed(data) + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) + + def test_refcount_one_below_max_value(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE - 1) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + sync.feed(data) + # Incremented to maximum + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) + sync.feed(data) + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) From 310a71e4f018116d160c3a226648d33c15d35268 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 10 Jun 2017 09:56:41 +0200 Subject: [PATCH 15/17] cache sync: use ro_buffer to accept bytes, memoryview, ... --- src/borg/hashindex.pyx | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 338683440..90db35d87 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -7,6 +7,7 @@ cimport cython from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t from libc.errno cimport errno from cpython.exc cimport PyErr_SetFromErrnoWithFilename +from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release API_VERSION = '1.1_02' @@ -386,6 +387,12 @@ cdef class ChunkKeyIterator: return (self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2])) +cdef Py_buffer ro_buffer(object data) except *: + cdef Py_buffer view + PyObject_GetBuffer(data, &view, PyBUF_SIMPLE) + return view + + cdef class CacheSynchronizer: cdef ChunkIndex chunks cdef CacheSyncCtx *sync @@ -401,7 +408,11 @@ cdef class CacheSynchronizer: cache_sync_free(self.sync) def feed(self, chunk): - if not cache_sync_feed(self.sync, chunk, len(chunk)): + cdef Py_buffer chunk_buf = ro_buffer(chunk) + cdef int rc + rc = cache_sync_feed(self.sync, chunk_buf.buf, chunk_buf.len) + PyBuffer_Release(&chunk_buf) + if not rc: error = cache_sync_error(self.sync) if error != NULL: raise ValueError('cache_sync_feed failed: ' + error.decode('ascii')) From 5eb43b84640c8fcb9dd0347bf0177f185e2c4511 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 10 Jun 2017 10:05:43 +0200 Subject: [PATCH 16/17] cache sync: give overview of the source's structure --- src/borg/cache_sync/cache_sync.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/borg/cache_sync/cache_sync.c b/src/borg/cache_sync/cache_sync.c index e0c2e0fb8..e4bc653b2 100644 --- a/src/borg/cache_sync/cache_sync.c +++ b/src/borg/cache_sync/cache_sync.c @@ -1,3 +1,20 @@ +/* + * Borg cache synchronizer, + * high level interface. + * + * These routines parse msgpacked item metadata and update a HashIndex + * with all chunks that are referenced from the items. + * + * This file only contains some initialization and buffer management. + * + * The parser is split in two parts, somewhat similar to lexer/parser combinations: + * + * unpack_template.h munches msgpack and calls a specific callback for each object + * encountered (e.g. beginning of a map, an integer, a string, a map item etc.). + * + * unpack.h implements these callbacks and uses another state machine to + * extract chunk references from it. + */ #include "unpack.h" From 0e31f78dd6e29549899290eb2b05bf7bd158b8d7 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 10 Jun 2017 10:12:06 +0200 Subject: [PATCH 17/17] cache sync: avoid "l" and such as a variable name, note vanilla changes --- src/borg/cache_sync/unpack.h | 13 ++++++------- src/borg/cache_sync/unpack_template.h | 5 +++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h index d878dda7d..94172d424 100644 --- a/src/borg/cache_sync/unpack.h +++ b/src/borg/cache_sync/unpack.h @@ -288,7 +288,6 @@ static inline int unpack_callback_array_end(unpack_user* u) return 0; } } - return 0; } @@ -317,6 +316,7 @@ static inline int unpack_callback_map(unpack_user* u, unsigned int n) static inline int unpack_callback_map_item(unpack_user* u, unsigned int current) { (void)u; (void)current; + if(u->level == 1) { switch(u->expect) { case expect_map_item_end: @@ -340,7 +340,7 @@ static inline int unpack_callback_map_end(unpack_user* u) return 0; } -static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int l) +static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int length) { /* raw = what Borg uses for binary stuff and strings as well */ /* Note: p points to an internal buffer which contains l bytes. */ @@ -348,7 +348,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* switch(u->expect) { case expect_key: - if(l != 32) { + if(length != 32) { SET_LAST_ERROR("Incorrect key length"); return -1; } @@ -356,7 +356,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* u->expect = expect_size; break; case expect_chunks_map_key: - if(l == 6 && !memcmp("chunks", p, 6)) { + if(length == 6 && !memcmp("chunks", p, 6)) { u->expect = expect_chunks_begin; u->inside_chunks = 1; } else { @@ -369,13 +369,12 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* return -1; } } - return 0; } -static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int l) +static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int length) { - (void)u; (void)b; (void)p; (void)l; + (void)u; (void)b; (void)p; (void)length; UNEXPECTED("bin"); return 0; } diff --git a/src/borg/cache_sync/unpack_template.h b/src/borg/cache_sync/unpack_template.h index 1ac262740..a6492da35 100644 --- a/src/borg/cache_sync/unpack_template.h +++ b/src/borg/cache_sync/unpack_template.h @@ -15,6 +15,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * + * This has been slightly adapted from the vanilla msgpack-{c, python} version. + * Since cache_sync does not intend to build an output data structure, + * msgpack_unpack_object and all of its uses was removed. */ #ifndef USE_CASE_RANGE