diff --git a/scripts/fuzz-cache-sync/HOWTO b/scripts/fuzz-cache-sync/HOWTO new file mode 100644 index 000000000..ae144b287 --- /dev/null +++ b/scripts/fuzz-cache-sync/HOWTO @@ -0,0 +1,10 @@ +- Install AFL and the requirements for LLVM mode (see docs) +- Compile the fuzzing target, e.g. + + AFL_HARDEN=1 afl-clang-fast main.c -o fuzz-target -O3 + + (other options, like using ASan or MSan are possible as well) +- Add additional test cases to testcase_dir +- Run afl, easiest (but inefficient) way; + + afl-fuzz -i testcase_dir -o findings_dir ./fuzz-target diff --git a/scripts/fuzz-cache-sync/main.c b/scripts/fuzz-cache-sync/main.c new file mode 100644 index 000000000..b25d925db --- /dev/null +++ b/scripts/fuzz-cache-sync/main.c @@ -0,0 +1,30 @@ +#include "../../src/borg/_hashindex.c" +#include "../../src/borg/cache_sync/cache_sync.c" + +#define BUFSZ 32768 + +int main() { + char buf[BUFSZ]; + int len, ret; + CacheSyncCtx *ctx; + HashIndex *idx; + + /* capacity, key size, value size */ + idx = hashindex_init(0, 32, 12); + ctx = cache_sync_init(idx); + + while (1) { + len = read(0, buf, BUFSZ); + if (!len) { + break; + } + ret = cache_sync_feed(ctx, buf, len); + if(!ret && cache_sync_error(ctx)) { + fprintf(stderr, "error: %s\n", cache_sync_error(ctx)); + return 1; + } + } + hashindex_free(idx); + cache_sync_free(ctx); + return 0; +} diff --git a/scripts/fuzz-cache-sync/testcase_dir/test_simple b/scripts/fuzz-cache-sync/testcase_dir/test_simple new file mode 100644 index 000000000..0bf5a0ea1 Binary files /dev/null and b/scripts/fuzz-cache-sync/testcase_dir/test_simple differ diff --git a/setup.py b/setup.py index d7746e4dc..864b352a5 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,8 @@ try: 'src/borg/crypto/low_level.c', 'src/borg/chunker.c', 'src/borg/_chunker.c', 'src/borg/hashindex.c', 'src/borg/_hashindex.c', + 'src/borg/cache_sync/cache_sync.c', 'src/borg/cache_sync/sysdep.h', 'src/borg/cache_sync/unpack.h', + 'src/borg/cache_sync/unpack_define.h', 'src/borg/cache_sync/unpack_template.h', 'src/borg/item.c', 'src/borg/algorithms/checksums.c', 'src/borg/algorithms/crc32_dispatch.c', 'src/borg/algorithms/crc32_clmul.c', 'src/borg/algorithms/crc32_slice_by_8.c', diff --git a/src/borg/_hashindex.c b/src/borg/_hashindex.c index 335ccac2e..1a61d67b9 100644 --- a/src/borg/_hashindex.c +++ b/src/borg/_hashindex.c @@ -1,6 +1,6 @@ -#include #include +#include #include #include #include @@ -56,8 +56,10 @@ typedef struct { int lower_limit; int upper_limit; int min_empty; +#ifdef Py_PYTHON_H /* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */ Py_buffer buckets_buffer; +#endif } HashIndex; /* prime (or w/ big prime factors) hash table sizes @@ -106,8 +108,11 @@ static int hash_sizes[] = { #define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno)) #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno)) +#ifdef Py_PYTHON_H static HashIndex *hashindex_read(PyObject *file_py); static void hashindex_write(HashIndex *index, PyObject *file_py); +#endif + static HashIndex *hashindex_init(int capacity, int key_size, int value_size); static const void *hashindex_get(HashIndex *index, const void *key); static int hashindex_set(HashIndex *index, const void *key, const void *value); @@ -120,9 +125,12 @@ static void hashindex_free(HashIndex *index); static void hashindex_free_buckets(HashIndex *index) { +#ifdef Py_PYTHON_H if(index->buckets_buffer.buf) { PyBuffer_Release(&index->buckets_buffer); - } else { + } else +#endif + { free(index->buckets); } } @@ -263,6 +271,7 @@ count_empty(HashIndex *index) /* Public API */ +#ifdef Py_PYTHON_H static HashIndex * hashindex_read(PyObject *file_py) { @@ -418,6 +427,7 @@ fail_decref_header: fail: return index; } +#endif static HashIndex * hashindex_init(int capacity, int key_size, int value_size) @@ -444,7 +454,9 @@ hashindex_init(int capacity, int key_size, int value_size) index->lower_limit = get_lower_limit(index->num_buckets); index->upper_limit = get_upper_limit(index->num_buckets); index->min_empty = get_min_empty(index->num_buckets); +#ifdef Py_PYTHON_H index->buckets_buffer.buf = NULL; +#endif for(i = 0; i < capacity; i++) { BUCKET_MARK_EMPTY(index, i); } @@ -458,7 +470,7 @@ hashindex_free(HashIndex *index) free(index); } - +#ifdef Py_PYTHON_H static void hashindex_write(HashIndex *index, PyObject *file_py) { @@ -521,6 +533,7 @@ hashindex_write(HashIndex *index, PyObject *file_py) return; } } +#endif static const void * hashindex_get(HashIndex *index, const void *key) diff --git a/src/borg/cache.py b/src/borg/cache.py index 13045f0e9..7a9fce029 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -12,7 +12,7 @@ from .logger import create_logger logger = create_logger() from .constants import CACHE_README -from .hashindex import ChunkIndex, ChunkIndexEntry +from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer from .helpers import Location from .helpers import Error from .helpers import get_cache_dir, get_security_dir @@ -564,24 +564,16 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" except FileNotFoundError: pass - def fetch_and_build_idx(archive_id, repository, key, chunk_idx): - cdata = repository.get(archive_id) - data = key.decrypt(archive_id, cdata) - chunk_idx.add(archive_id, 1, len(data), len(cdata)) + def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx): + csize, data = decrypted_repository.get(archive_id) + chunk_idx.add(archive_id, 1, len(data), csize) archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) if archive.version != 1: raise Exception('Unknown archive metadata version') - unpacker = msgpack.Unpacker() - for item_id, chunk in zip(archive.items, repository.get_many(archive.items)): - data = key.decrypt(item_id, chunk) - chunk_idx.add(item_id, 1, len(data), len(chunk)) - unpacker.feed(data) - for item in unpacker: - if not isinstance(item, dict): - logger.error('Error: Did not get expected metadata dict - archive corrupted!') - continue # XXX: continue?! - for chunk_id, size, csize in item.get(b'chunks', []): - chunk_idx.add(chunk_id, 1, size, csize) + sync = CacheSynchronizer(chunk_idx) + for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)): + chunk_idx.add(item_id, 1, len(data), csize) + sync.feed(data) if self.do_cache: fn = mkpath(archive_id) fn_tmp = mkpath(archive_id, suffix='.tmp') @@ -594,10 +586,17 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: os.rename(fn_tmp, fn) - def lookup_name(archive_id): + def get_archive_ids_to_names(archive_ids): + # Pass once over all archives and build a mapping from ids to names. + # The easier approach, doing a similar loop for each archive, has + # square complexity and does about a dozen million functions calls + # with 1100 archives (which takes 30s CPU seconds _alone_). + archive_names = {} for info in self.manifest.archives.list(): - if info.id == archive_id: - return info.name + if info.id in archive_ids: + archive_names[info.id] = info.name + assert len(archive_names) == len(archive_ids) + return archive_names def create_master_idx(chunk_idx): logger.info('Synchronizing chunks cache...') @@ -609,16 +608,17 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # deallocates old hashindex, creates empty hashindex: chunk_idx.clear() cleanup_outdated(cached_ids - archive_ids) + # Explicitly set the initial hash table capacity to avoid performance issues + # due to hash table "resonance". + master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR) if archive_ids: chunk_idx = None - if self.progress: - pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1, - msg='%3.0f%% Syncing chunks cache. Processing archive %s', - msgid='cache.sync') - for archive_id in archive_ids: - archive_name = lookup_name(archive_id) - if self.progress: - pi.show(info=[remove_surrogates(archive_name)]) + pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1, + msg='%3.0f%% Syncing chunks cache. Processing archive %s', + msgid='cache.sync') + archive_ids_to_names = get_archive_ids_to_names(archive_ids) + for archive_id, archive_name in archive_ids_to_names.items(): + pi.show(info=[remove_surrogates(archive_name)]) if self.do_cache: if archive_id in cached_ids: archive_chunk_idx_path = mkpath(archive_id) @@ -637,7 +637,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) archive_chunk_idx = ChunkIndex() - fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx) + fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: # we just use the first archive's idx as starting point, @@ -647,11 +647,10 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" else: chunk_idx.merge(archive_chunk_idx) else: - chunk_idx = chunk_idx or ChunkIndex() + chunk_idx = chunk_idx or ChunkIndex(master_index_capacity) logger.info('Fetching archive index for %s ...', archive_name) - fetch_and_build_idx(archive_id, repository, self.key, chunk_idx) - if self.progress: - pi.finish() + fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx) + pi.finish() logger.info('Done.') return chunk_idx @@ -671,7 +670,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" pass self.begin_txn() - with cache_if_remote(self.repository) as repository: + with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository: legacy_cleanup() # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d - # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk) diff --git a/src/borg/cache_sync/cache_sync.c b/src/borg/cache_sync/cache_sync.c new file mode 100644 index 000000000..e4bc653b2 --- /dev/null +++ b/src/borg/cache_sync/cache_sync.c @@ -0,0 +1,122 @@ +/* + * Borg cache synchronizer, + * high level interface. + * + * These routines parse msgpacked item metadata and update a HashIndex + * with all chunks that are referenced from the items. + * + * This file only contains some initialization and buffer management. + * + * The parser is split in two parts, somewhat similar to lexer/parser combinations: + * + * unpack_template.h munches msgpack and calls a specific callback for each object + * encountered (e.g. beginning of a map, an integer, a string, a map item etc.). + * + * unpack.h implements these callbacks and uses another state machine to + * extract chunk references from it. + */ + +#include "unpack.h" + +typedef struct { + unpack_context ctx; + + char *buf; + size_t head; + size_t tail; + size_t size; +} CacheSyncCtx; + +static CacheSyncCtx * +cache_sync_init(HashIndex *chunks) +{ + CacheSyncCtx *ctx; + if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) { + return NULL; + } + + unpack_init(&ctx->ctx); + /* needs to be set only once */ + ctx->ctx.user.chunks = chunks; + ctx->buf = NULL; + ctx->head = 0; + ctx->tail = 0; + ctx->size = 0; + + return ctx; +} + +static void +cache_sync_free(CacheSyncCtx *ctx) +{ + if(ctx->buf) { + free(ctx->buf); + } + free(ctx); +} + +static const char * +cache_sync_error(CacheSyncCtx *ctx) +{ + return ctx->ctx.user.last_error; +} + +/** + * feed data to the cache synchronizer + * 0 = abort, 1 = continue + * abort is a regular condition, check cache_sync_error + */ +static int +cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) +{ + size_t new_size; + int ret; + char *new_buf; + + if(ctx->tail + length > ctx->size) { + if((ctx->tail - ctx->head) + length <= ctx->size) { + /* | XXXXX| -> move data in buffer backwards -> |XXXXX | */ + memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + ctx->tail -= ctx->head; + ctx->head = 0; + } else { + /* must expand buffer to fit all data */ + new_size = (ctx->tail - ctx->head) + length; + new_buf = (char*) malloc(new_size); + if(!new_buf) { + ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer"; + return 0; + } + memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + free(ctx->buf); + ctx->buf = new_buf; + ctx->tail -= ctx->head; + ctx->head = 0; + ctx->size = new_size; + } + } + + memcpy(ctx->buf + ctx->tail, data, length); + ctx->tail += length; + + while(1) { + if(ctx->head >= ctx->tail) { + return 1; /* request more bytes */ + } + + ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head); + if(ret == 1) { + unpack_init(&ctx->ctx); + continue; + } else if(ret == 0) { + return 1; + } else { + if(!ctx->ctx.user.last_error) { + ctx->ctx.user.last_error = "Unknown error"; + } + return 0; + } + } + /* unreachable */ + return 1; +} diff --git a/src/borg/cache_sync/sysdep.h b/src/borg/cache_sync/sysdep.h new file mode 100644 index 000000000..ed9c1bc0b --- /dev/null +++ b/src/borg/cache_sync/sysdep.h @@ -0,0 +1,194 @@ +/* + * MessagePack system dependencies + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MSGPACK_SYSDEP_H__ +#define MSGPACK_SYSDEP_H__ + +#include +#include +#if defined(_MSC_VER) && _MSC_VER < 1600 +typedef __int8 int8_t; +typedef unsigned __int8 uint8_t; +typedef __int16 int16_t; +typedef unsigned __int16 uint16_t; +typedef __int32 int32_t; +typedef unsigned __int32 uint32_t; +typedef __int64 int64_t; +typedef unsigned __int64 uint64_t; +#elif defined(_MSC_VER) // && _MSC_VER >= 1600 +#include +#else +#include +#include +#endif + +#ifdef _WIN32 +#define _msgpack_atomic_counter_header +typedef long _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) InterlockedDecrement(ptr) +#define _msgpack_sync_incr_and_fetch(ptr) InterlockedIncrement(ptr) +#elif defined(__GNUC__) && ((__GNUC__*10 + __GNUC_MINOR__) < 41) +#define _msgpack_atomic_counter_header "gcc_atomic.h" +#else +typedef unsigned int _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) __sync_sub_and_fetch(ptr, 1) +#define _msgpack_sync_incr_and_fetch(ptr) __sync_add_and_fetch(ptr, 1) +#endif + +#ifdef _WIN32 + +#ifdef __cplusplus +/* numeric_limits::min,max */ +#ifdef max +#undef max +#endif +#ifdef min +#undef min +#endif +#endif + +#else +#include /* __BYTE_ORDER */ +#endif + +#if !defined(__LITTLE_ENDIAN__) && !defined(__BIG_ENDIAN__) +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define __LITTLE_ENDIAN__ +#elif __BYTE_ORDER == __BIG_ENDIAN +#define __BIG_ENDIAN__ +#elif _WIN32 +#define __LITTLE_ENDIAN__ +#endif +#endif + + +#ifdef __LITTLE_ENDIAN__ + +#ifdef _WIN32 +# if defined(ntohs) +# define _msgpack_be16(x) ntohs(x) +# elif defined(_byteswap_ushort) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be16(x) ((uint16_t)_byteswap_ushort((unsigned short)x)) +# else +# define _msgpack_be16(x) ( \ + ((((uint16_t)x) << 8) ) | \ + ((((uint16_t)x) >> 8) ) ) +# endif +#else +# define _msgpack_be16(x) ntohs(x) +#endif + +#ifdef _WIN32 +# if defined(ntohl) +# define _msgpack_be32(x) ntohl(x) +# elif defined(_byteswap_ulong) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be32(x) ((uint32_t)_byteswap_ulong((unsigned long)x)) +# else +# define _msgpack_be32(x) \ + ( ((((uint32_t)x) << 24) ) | \ + ((((uint32_t)x) << 8) & 0x00ff0000U ) | \ + ((((uint32_t)x) >> 8) & 0x0000ff00U ) | \ + ((((uint32_t)x) >> 24) ) ) +# endif +#else +# define _msgpack_be32(x) ntohl(x) +#endif + +#if defined(_byteswap_uint64) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be64(x) (_byteswap_uint64(x)) +#elif defined(bswap_64) +# define _msgpack_be64(x) bswap_64(x) +#elif defined(__DARWIN_OSSwapInt64) +# define _msgpack_be64(x) __DARWIN_OSSwapInt64(x) +#else +#define _msgpack_be64(x) \ + ( ((((uint64_t)x) << 56) ) | \ + ((((uint64_t)x) << 40) & 0x00ff000000000000ULL ) | \ + ((((uint64_t)x) << 24) & 0x0000ff0000000000ULL ) | \ + ((((uint64_t)x) << 8) & 0x000000ff00000000ULL ) | \ + ((((uint64_t)x) >> 8) & 0x00000000ff000000ULL ) | \ + ((((uint64_t)x) >> 24) & 0x0000000000ff0000ULL ) | \ + ((((uint64_t)x) >> 40) & 0x000000000000ff00ULL ) | \ + ((((uint64_t)x) >> 56) ) ) +#endif + +#define _msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)(from))[0]) << 8) | \ + (((uint16_t)((uint8_t*)(from))[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)(from))[0]) << 24) | \ + (((uint32_t)((uint8_t*)(from))[1]) << 16) | \ + (((uint32_t)((uint8_t*)(from))[2]) << 8) | \ + (((uint32_t)((uint8_t*)(from))[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)(from))[0]) << 56) | \ + (((uint64_t)((uint8_t*)(from))[1]) << 48) | \ + (((uint64_t)((uint8_t*)(from))[2]) << 40) | \ + (((uint64_t)((uint8_t*)(from))[3]) << 32) | \ + (((uint64_t)((uint8_t*)(from))[4]) << 24) | \ + (((uint64_t)((uint8_t*)(from))[5]) << 16) | \ + (((uint64_t)((uint8_t*)(from))[6]) << 8) | \ + (((uint64_t)((uint8_t*)(from))[7]) ) )) + +#else + +#define _msgpack_be16(x) (x) +#define _msgpack_be32(x) (x) +#define _msgpack_be64(x) (x) + +#define _msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)from)[0]) << 8) | \ + (((uint16_t)((uint8_t*)from)[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)from)[0]) << 24) | \ + (((uint32_t)((uint8_t*)from)[1]) << 16) | \ + (((uint32_t)((uint8_t*)from)[2]) << 8) | \ + (((uint32_t)((uint8_t*)from)[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)from)[0]) << 56) | \ + (((uint64_t)((uint8_t*)from)[1]) << 48) | \ + (((uint64_t)((uint8_t*)from)[2]) << 40) | \ + (((uint64_t)((uint8_t*)from)[3]) << 32) | \ + (((uint64_t)((uint8_t*)from)[4]) << 24) | \ + (((uint64_t)((uint8_t*)from)[5]) << 16) | \ + (((uint64_t)((uint8_t*)from)[6]) << 8) | \ + (((uint64_t)((uint8_t*)from)[7]) ) )) +#endif + + +#define _msgpack_store16(to, num) \ + do { uint16_t val = _msgpack_be16(num); memcpy(to, &val, 2); } while(0) +#define _msgpack_store32(to, num) \ + do { uint32_t val = _msgpack_be32(num); memcpy(to, &val, 4); } while(0) +#define _msgpack_store64(to, num) \ + do { uint64_t val = _msgpack_be64(num); memcpy(to, &val, 8); } while(0) + +/* +#define _msgpack_load16(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 2); _msgpack_be16(val); }) +#define _msgpack_load32(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 4); _msgpack_be32(val); }) +#define _msgpack_load64(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 8); _msgpack_be64(val); }) +*/ + + +#endif /* msgpack/sysdep.h */ diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h new file mode 100644 index 000000000..94172d424 --- /dev/null +++ b/src/borg/cache_sync/unpack.h @@ -0,0 +1,390 @@ +/* + * Borg cache synchronizer, + * based on a MessagePack for Python unpacking routine + * + * Copyright (C) 2009 Naoki INADA + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This limits the depth of the structures we can unpack, i.e. how many containers + * are nestable. + */ +#define MSGPACK_EMBED_STACK_SIZE (16) +#include "unpack_define.h" + +// 2**32 - 1025 +#define _MAX_VALUE ( (uint32_t) 4294966271 ) + +#define MIN(x, y) ((x) < (y) ? (x): (y)) + +#ifdef DEBUG +#define SET_LAST_ERROR(msg) \ + fprintf(stderr, "cache_sync parse error: %s\n", (msg)); \ + u->last_error = (msg); +#else +#define SET_LAST_ERROR(msg) \ + u->last_error = (msg); +#endif + +typedef struct unpack_user { + /* Item.chunks is at the top level; we don't care about anything else, + * only need to track the current level to navigate arbitrary and unknown structure. + * To discern keys from everything else on the top level we use expect_map_item_end. + */ + int level; + + const char *last_error; + + HashIndex *chunks; + + /* + * We don't care about most stuff. This flag tells us whether we're at the chunks structure, + * meaning: + * {'foo': 'bar', 'chunks': [...], 'stuff': ... } + * ^-HERE-^ + */ + int inside_chunks; + enum { + /* the next thing is a map key at the Item root level, + * and it might be the "chunks" key we're looking for */ + expect_chunks_map_key, + + /* blocking state to expect_chunks_map_key + * { 'stuff': , 'chunks': [ + * ecmk -> emie -> -> -> -> ecmk ecb eeboce + * (nested containers are tracked via level) + * ecmk=expect_chunks_map_key, emie=expect_map_item_end, ecb=expect_chunks_begin, + * eeboce=expect_entry_begin_or_chunks_end + */ + expect_map_item_end, + + /* next thing must be the chunks array (array) */ + expect_chunks_begin, + + /* next thing must either be another CLE (array) or end of Item.chunks (array_end) */ + expect_entry_begin_or_chunks_end, + + /* + * processing ChunkListEntry tuple: + * expect_key, expect_size, expect_csize, expect_entry_end + */ + /* next thing must be the key (raw, l=32) */ + expect_key, + /* next thing must be the size (int) */ + expect_size, + /* next thing must be the csize (int) */ + expect_csize, + /* next thing must be the end of the CLE (array_end) */ + expect_entry_end, + + expect_item_begin + } expect; + + struct { + char key[32]; + uint32_t csize; + uint32_t size; + } current; +} unpack_user; + +struct unpack_context; +typedef struct unpack_context unpack_context; +typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, size_t* off); + +#define UNEXPECTED(what) \ + if(u->inside_chunks || u->expect == expect_chunks_map_key) { \ + SET_LAST_ERROR("Unexpected object: " what); \ + return -1; \ + } + +static inline void unpack_init_user_state(unpack_user *u) +{ + u->last_error = NULL; + u->level = 0; + u->inside_chunks = false; + u->expect = expect_item_begin; +} + +static inline int unpack_callback_int64(unpack_user* u, int64_t d) +{ + switch(u->expect) { + case expect_size: + u->current.size = d; + u->expect = expect_csize; + break; + case expect_csize: + u->current.csize = d; + u->expect = expect_entry_end; + break; + default: + UNEXPECTED("integer"); + } + return 0; +} + +static inline int unpack_callback_uint16(unpack_user* u, uint16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint8(unpack_user* u, uint8_t d) +{ + return unpack_callback_int64(u, d); +} + + +static inline int unpack_callback_uint32(unpack_user* u, uint32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint64(unpack_user* u, uint64_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int32(unpack_user* u, int32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int16(unpack_user* u, int16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int8(unpack_user* u, int8_t d) +{ + return unpack_callback_int64(u, d); +} + +/* Ain't got anything to do with those floats */ +static inline int unpack_callback_double(unpack_user* u, double d) +{ + (void)d; + UNEXPECTED("double"); + return 0; +} + +static inline int unpack_callback_float(unpack_user* u, float d) +{ + (void)d; + UNEXPECTED("float"); + return 0; +} + +/* nil/true/false — I/don't/care */ +static inline int unpack_callback_nil(unpack_user* u) +{ + UNEXPECTED("nil"); + return 0; +} + +static inline int unpack_callback_true(unpack_user* u) +{ + UNEXPECTED("true"); + return 0; +} + +static inline int unpack_callback_false(unpack_user* u) +{ + UNEXPECTED("false"); + return 0; +} + +static inline int unpack_callback_array(unpack_user* u, unsigned int n) +{ + switch(u->expect) { + case expect_chunks_begin: + /* b'chunks': [ + * ^ */ + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ( + * ^ */ + if(n != 3) { + SET_LAST_ERROR("Invalid chunk list entry length"); + return -1; + } + u->expect = expect_key; + break; + default: + if(u->inside_chunks) { + SET_LAST_ERROR("Unexpected array start"); + return -1; + } else { + u->level++; + return 0; + } + } + return 0; +} + +static inline int unpack_callback_array_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + return 0; +} + +static inline int unpack_callback_array_end(unpack_user* u) +{ + uint32_t *cache_entry; + uint32_t cache_values[3]; + uint64_t refcount; + + switch(u->expect) { + case expect_entry_end: + /* b'chunks': [ ( b'1234...', 123, 345 ) + * ^ */ + cache_entry = (uint32_t*) hashindex_get(u->chunks, u->current.key); + if(cache_entry) { + refcount = _le32toh(cache_entry[0]); + if(refcount > _MAX_VALUE) { + SET_LAST_ERROR("invalid reference count"); + return -1; + } + refcount += 1; + cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); + } else { + /* refcount, size, csize */ + cache_values[0] = _htole32(1); + cache_values[1] = _htole32(u->current.size); + cache_values[2] = _htole32(u->current.csize); + if(!hashindex_set(u->chunks, u->current.key, cache_values)) { + SET_LAST_ERROR("hashindex_set failed"); + return -1; + } + } + + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ] + * ^ */ + /* end of Item.chunks */ + u->inside_chunks = 0; + u->expect = expect_map_item_end; + break; + default: + if(u->inside_chunks) { + SET_LAST_ERROR("Invalid state transition (unexpected array end)"); + return -1; + } else { + u->level--; + return 0; + } + } + return 0; +} + +static inline int unpack_callback_map(unpack_user* u, unsigned int n) +{ + (void)n; + + if(u->level == 0) { + if(u->expect != expect_item_begin) { + SET_LAST_ERROR("Invalid state transition"); /* unreachable */ + return -1; + } + /* This begins a new Item */ + u->expect = expect_chunks_map_key; + } + + if(u->inside_chunks) { + UNEXPECTED("map"); + } + + u->level++; + + return 0; +} + +static inline int unpack_callback_map_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + + if(u->level == 1) { + switch(u->expect) { + case expect_map_item_end: + u->expect = expect_chunks_map_key; + break; + default: + SET_LAST_ERROR("Unexpected map item"); + return -1; + } + } + return 0; +} + +static inline int unpack_callback_map_end(unpack_user* u) +{ + u->level--; + if(u->inside_chunks) { + SET_LAST_ERROR("Unexpected map end"); + return -1; + } + return 0; +} + +static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int length) +{ + /* raw = what Borg uses for binary stuff and strings as well */ + /* Note: p points to an internal buffer which contains l bytes. */ + (void)b; + + switch(u->expect) { + case expect_key: + if(length != 32) { + SET_LAST_ERROR("Incorrect key length"); + return -1; + } + memcpy(u->current.key, p, 32); + u->expect = expect_size; + break; + case expect_chunks_map_key: + if(length == 6 && !memcmp("chunks", p, 6)) { + u->expect = expect_chunks_begin; + u->inside_chunks = 1; + } else { + u->expect = expect_map_item_end; + } + break; + default: + if(u->inside_chunks) { + SET_LAST_ERROR("Unexpected bytes in chunks structure"); + return -1; + } + } + return 0; +} + +static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int length) +{ + (void)u; (void)b; (void)p; (void)length; + UNEXPECTED("bin"); + return 0; +} + +static inline int unpack_callback_ext(unpack_user* u, const char* base, const char* pos, + unsigned int length) +{ + (void)u; (void)base; (void)pos; (void)length; + UNEXPECTED("ext"); + return 0; +} + +#include "unpack_template.h" diff --git a/src/borg/cache_sync/unpack_define.h b/src/borg/cache_sync/unpack_define.h new file mode 100644 index 000000000..d681277bb --- /dev/null +++ b/src/borg/cache_sync/unpack_define.h @@ -0,0 +1,95 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MSGPACK_UNPACK_DEFINE_H__ +#define MSGPACK_UNPACK_DEFINE_H__ + +#include "sysdep.h" +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + + +#ifndef MSGPACK_EMBED_STACK_SIZE +#define MSGPACK_EMBED_STACK_SIZE 32 +#endif + + +// CS is first byte & 0x1f +typedef enum { + CS_HEADER = 0x00, // nil + + //CS_ = 0x01, + //CS_ = 0x02, // false + //CS_ = 0x03, // true + + CS_BIN_8 = 0x04, + CS_BIN_16 = 0x05, + CS_BIN_32 = 0x06, + + CS_EXT_8 = 0x07, + CS_EXT_16 = 0x08, + CS_EXT_32 = 0x09, + + CS_FLOAT = 0x0a, + CS_DOUBLE = 0x0b, + CS_UINT_8 = 0x0c, + CS_UINT_16 = 0x0d, + CS_UINT_32 = 0x0e, + CS_UINT_64 = 0x0f, + CS_INT_8 = 0x10, + CS_INT_16 = 0x11, + CS_INT_32 = 0x12, + CS_INT_64 = 0x13, + + //CS_FIXEXT1 = 0x14, + //CS_FIXEXT2 = 0x15, + //CS_FIXEXT4 = 0x16, + //CS_FIXEXT8 = 0x17, + //CS_FIXEXT16 = 0x18, + + CS_RAW_8 = 0x19, + CS_RAW_16 = 0x1a, + CS_RAW_32 = 0x1b, + CS_ARRAY_16 = 0x1c, + CS_ARRAY_32 = 0x1d, + CS_MAP_16 = 0x1e, + CS_MAP_32 = 0x1f, + + ACS_RAW_VALUE, + ACS_BIN_VALUE, + ACS_EXT_VALUE, +} msgpack_unpack_state; + + +typedef enum { + CT_ARRAY_ITEM, + CT_MAP_KEY, + CT_MAP_VALUE, +} msgpack_container_type; + + +#ifdef __cplusplus +} +#endif + +#endif /* msgpack/unpack_define.h */ diff --git a/src/borg/cache_sync/unpack_template.h b/src/borg/cache_sync/unpack_template.h new file mode 100644 index 000000000..a6492da35 --- /dev/null +++ b/src/borg/cache_sync/unpack_template.h @@ -0,0 +1,365 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + * This has been slightly adapted from the vanilla msgpack-{c, python} version. + * Since cache_sync does not intend to build an output data structure, + * msgpack_unpack_object and all of its uses was removed. + */ + +#ifndef USE_CASE_RANGE +#if !defined(_MSC_VER) +#define USE_CASE_RANGE +#endif +#endif + +typedef struct unpack_stack { + size_t size; + size_t count; + unsigned int ct; +} unpack_stack; + +struct unpack_context { + unpack_user user; + unsigned int cs; + unsigned int trail; + unsigned int top; + unpack_stack stack[MSGPACK_EMBED_STACK_SIZE]; +}; + +static inline void unpack_init(unpack_context* ctx) +{ + ctx->cs = CS_HEADER; + ctx->trail = 0; + ctx->top = 0; + unpack_init_user_state(&ctx->user); +} + +#define construct 1 + +static inline int unpack_execute(unpack_context* ctx, const char* data, size_t len, size_t* off) +{ + assert(len >= *off); + + const unsigned char* p = (unsigned char*)data + *off; + const unsigned char* const pe = (unsigned char*)data + len; + const void* n = NULL; + + unsigned int trail = ctx->trail; + unsigned int cs = ctx->cs; + unsigned int top = ctx->top; + unpack_stack* stack = ctx->stack; + unpack_user* user = &ctx->user; + + unpack_stack* c = NULL; + + int ret; + +#define construct_cb(name) \ + construct && unpack_callback ## name + +#define push_simple_value(func) \ + if(construct_cb(func)(user) < 0) { goto _failed; } \ + goto _push +#define push_fixed_value(func, arg) \ + if(construct_cb(func)(user, arg) < 0) { goto _failed; } \ + goto _push +#define push_variable_value(func, base, pos, len) \ + if(construct_cb(func)(user, \ + (const char*)base, (const char*)pos, len) < 0) { goto _failed; } \ + goto _push + +#define again_fixed_trail(_cs, trail_len) \ + trail = trail_len; \ + cs = _cs; \ + goto _fixed_trail_again +#define again_fixed_trail_if_zero(_cs, trail_len, ifzero) \ + trail = trail_len; \ + if(trail == 0) { goto ifzero; } \ + cs = _cs; \ + goto _fixed_trail_again + +#define start_container(func, count_, ct_) \ + if(top >= MSGPACK_EMBED_STACK_SIZE) { goto _failed; } /* FIXME */ \ + if(construct_cb(func)(user, count_) < 0) { goto _failed; } \ + if((count_) == 0) { \ + if (construct_cb(func##_end)(user) < 0) { goto _failed; } \ + goto _push; } \ + stack[top].ct = ct_; \ + stack[top].size = count_; \ + stack[top].count = 0; \ + ++top; \ + goto _header_again + +#define NEXT_CS(p) ((unsigned int)*p & 0x1f) + +#ifdef USE_CASE_RANGE +#define SWITCH_RANGE_BEGIN switch(*p) { +#define SWITCH_RANGE(FROM, TO) case FROM ... TO: +#define SWITCH_RANGE_DEFAULT default: +#define SWITCH_RANGE_END } +#else +#define SWITCH_RANGE_BEGIN { if(0) { +#define SWITCH_RANGE(FROM, TO) } else if(FROM <= *p && *p <= TO) { +#define SWITCH_RANGE_DEFAULT } else { +#define SWITCH_RANGE_END } } +#endif + + if(p == pe) { goto _out; } + do { + switch(cs) { + case CS_HEADER: + SWITCH_RANGE_BEGIN + SWITCH_RANGE(0x00, 0x7f) // Positive Fixnum + push_fixed_value(_uint8, *(uint8_t*)p); + SWITCH_RANGE(0xe0, 0xff) // Negative Fixnum + push_fixed_value(_int8, *(int8_t*)p); + SWITCH_RANGE(0xc0, 0xdf) // Variable + switch(*p) { + case 0xc0: // nil + push_simple_value(_nil); + //case 0xc1: // never used + case 0xc2: // false + push_simple_value(_false); + case 0xc3: // true + push_simple_value(_true); + case 0xc4: // bin 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc5: // bin 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc6: // bin 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xc7: // ext 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc8: // ext 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc9: // ext 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xca: // float + case 0xcb: // double + case 0xcc: // unsigned int 8 + case 0xcd: // unsigned int 16 + case 0xce: // unsigned int 32 + case 0xcf: // unsigned int 64 + case 0xd0: // signed int 8 + case 0xd1: // signed int 16 + case 0xd2: // signed int 32 + case 0xd3: // signed int 64 + again_fixed_trail(NEXT_CS(p), 1 << (((unsigned int)*p) & 0x03)); + case 0xd4: // fixext 1 + case 0xd5: // fixext 2 + case 0xd6: // fixext 4 + case 0xd7: // fixext 8 + again_fixed_trail_if_zero(ACS_EXT_VALUE, + (1 << (((unsigned int)*p) & 0x03))+1, + _ext_zero); + case 0xd8: // fixext 16 + again_fixed_trail_if_zero(ACS_EXT_VALUE, 16+1, _ext_zero); + case 0xd9: // str 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xda: // raw 16 + case 0xdb: // raw 32 + case 0xdc: // array 16 + case 0xdd: // array 32 + case 0xde: // map 16 + case 0xdf: // map 32 + again_fixed_trail(NEXT_CS(p), 2 << (((unsigned int)*p) & 0x01)); + default: + goto _failed; + } + SWITCH_RANGE(0xa0, 0xbf) // FixRaw + again_fixed_trail_if_zero(ACS_RAW_VALUE, ((unsigned int)*p & 0x1f), _raw_zero); + SWITCH_RANGE(0x90, 0x9f) // FixArray + start_container(_array, ((unsigned int)*p) & 0x0f, CT_ARRAY_ITEM); + SWITCH_RANGE(0x80, 0x8f) // FixMap + start_container(_map, ((unsigned int)*p) & 0x0f, CT_MAP_KEY); + + SWITCH_RANGE_DEFAULT + goto _failed; + SWITCH_RANGE_END + // end CS_HEADER + + + _fixed_trail_again: + ++p; + + default: + if((size_t)(pe - p) < trail) { goto _out; } + n = p; p += trail - 1; + switch(cs) { + case CS_EXT_8: + again_fixed_trail_if_zero(ACS_EXT_VALUE, *(uint8_t*)n+1, _ext_zero); + case CS_EXT_16: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load16(uint16_t,n)+1, + _ext_zero); + case CS_EXT_32: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load32(uint32_t,n)+1, + _ext_zero); + case CS_FLOAT: { + union { uint32_t i; float f; } mem; + mem.i = _msgpack_load32(uint32_t,n); + push_fixed_value(_float, mem.f); } + case CS_DOUBLE: { + union { uint64_t i; double f; } mem; + mem.i = _msgpack_load64(uint64_t,n); +#if defined(__arm__) && !(__ARM_EABI__) // arm-oabi + // https://github.com/msgpack/msgpack-perl/pull/1 + mem.i = (mem.i & 0xFFFFFFFFUL) << 32UL | (mem.i >> 32UL); +#endif + push_fixed_value(_double, mem.f); } + case CS_UINT_8: + push_fixed_value(_uint8, *(uint8_t*)n); + case CS_UINT_16: + push_fixed_value(_uint16, _msgpack_load16(uint16_t,n)); + case CS_UINT_32: + push_fixed_value(_uint32, _msgpack_load32(uint32_t,n)); + case CS_UINT_64: + push_fixed_value(_uint64, _msgpack_load64(uint64_t,n)); + + case CS_INT_8: + push_fixed_value(_int8, *(int8_t*)n); + case CS_INT_16: + push_fixed_value(_int16, _msgpack_load16(int16_t,n)); + case CS_INT_32: + push_fixed_value(_int32, _msgpack_load32(int32_t,n)); + case CS_INT_64: + push_fixed_value(_int64, _msgpack_load64(int64_t,n)); + + case CS_BIN_8: + again_fixed_trail_if_zero(ACS_BIN_VALUE, *(uint8_t*)n, _bin_zero); + case CS_BIN_16: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load16(uint16_t,n), _bin_zero); + case CS_BIN_32: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load32(uint32_t,n), _bin_zero); + case ACS_BIN_VALUE: + _bin_zero: + push_variable_value(_bin, data, n, trail); + + case CS_RAW_8: + again_fixed_trail_if_zero(ACS_RAW_VALUE, *(uint8_t*)n, _raw_zero); + case CS_RAW_16: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load16(uint16_t,n), _raw_zero); + case CS_RAW_32: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load32(uint32_t,n), _raw_zero); + case ACS_RAW_VALUE: + _raw_zero: + push_variable_value(_raw, data, n, trail); + + case ACS_EXT_VALUE: + _ext_zero: + push_variable_value(_ext, data, n, trail); + + case CS_ARRAY_16: + start_container(_array, _msgpack_load16(uint16_t,n), CT_ARRAY_ITEM); + case CS_ARRAY_32: + /* FIXME security guard */ + start_container(_array, _msgpack_load32(uint32_t,n), CT_ARRAY_ITEM); + + case CS_MAP_16: + start_container(_map, _msgpack_load16(uint16_t,n), CT_MAP_KEY); + case CS_MAP_32: + /* FIXME security guard */ + start_container(_map, _msgpack_load32(uint32_t,n), CT_MAP_KEY); + + default: + goto _failed; + } + } + +_push: + if(top == 0) { goto _finish; } + c = &stack[top-1]; + switch(c->ct) { + case CT_ARRAY_ITEM: + if(construct_cb(_array_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_array_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + goto _header_again; + case CT_MAP_KEY: + c->ct = CT_MAP_VALUE; + goto _header_again; + case CT_MAP_VALUE: + if(construct_cb(_map_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_map_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + c->ct = CT_MAP_KEY; + goto _header_again; + + default: + goto _failed; + } + +_header_again: + cs = CS_HEADER; + ++p; + } while(p != pe); + goto _out; + + +_finish: + if (!construct) + unpack_callback_nil(user); + ++p; + ret = 1; + /* printf("-- finish --\n"); */ + goto _end; + +_failed: + /* printf("** FAILED **\n"); */ + ret = -1; + goto _end; + +_out: + ret = 0; + goto _end; + +_end: + ctx->cs = cs; + ctx->trail = trail; + ctx->top = top; + *off = p - (const unsigned char*)data; + + return ret; +#undef construct_cb +} + +#undef SWITCH_RANGE_BEGIN +#undef SWITCH_RANGE +#undef SWITCH_RANGE_DEFAULT +#undef SWITCH_RANGE_END +#undef push_simple_value +#undef push_fixed_value +#undef push_variable_value +#undef again_fixed_trail +#undef again_fixed_trail_if_zero +#undef start_container +#undef construct + +#undef NEXT_CS + +/* vim: set ts=4 sw=4 sts=4 expandtab */ diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 28e09689d..4441c4063 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -340,7 +340,7 @@ class FuseOperations(llfuse.Operations): # evict fully read chunk from cache del self.data_cache[id] else: - data = self.key.decrypt(id, self.repository.get(id)) + data = self.key.decrypt(id, self.repository_uncached.get(id)) if offset + n < len(data): # chunk was only partially read, cache it self.data_cache[id] = data diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 2409836fe..90db35d87 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -7,8 +7,9 @@ cimport cython from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t from libc.errno cimport errno from cpython.exc cimport PyErr_SetFromErrnoWithFilename +from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release -API_VERSION = '1.1_01' +API_VERSION = '1.1_02' cdef extern from "_hashindex.c": @@ -31,6 +32,18 @@ cdef extern from "_hashindex.c": double HASH_MAX_LOAD +cdef extern from "cache_sync/cache_sync.c": + ctypedef struct CacheSyncCtx: + pass + + CacheSyncCtx *cache_sync_init(HashIndex *chunks) + const char *cache_sync_error(CacheSyncCtx *ctx) + int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) + void cache_sync_free(CacheSyncCtx *ctx) + + uint32_t _MAX_VALUE + + cdef _NoDefault = object() """ @@ -50,9 +63,6 @@ AssertionError is raised instead. assert UINT32_MAX == 2**32-1 -# module-level constant because cdef's in classes can't have default values -cdef uint32_t _MAX_VALUE = 2**32-1025 - assert _MAX_VALUE % 2 == 1 @@ -375,3 +385,34 @@ cdef class ChunkKeyIterator: cdef uint32_t refcount = _le32toh(value[0]) assert refcount <= _MAX_VALUE, "invalid reference count" return (self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2])) + + +cdef Py_buffer ro_buffer(object data) except *: + cdef Py_buffer view + PyObject_GetBuffer(data, &view, PyBUF_SIMPLE) + return view + + +cdef class CacheSynchronizer: + cdef ChunkIndex chunks + cdef CacheSyncCtx *sync + + def __cinit__(self, chunks): + self.chunks = chunks + self.sync = cache_sync_init(self.chunks.index) + if not self.sync: + raise Exception('cache_sync_init failed') + + def __dealloc__(self): + if self.sync: + cache_sync_free(self.sync) + + def feed(self, chunk): + cdef Py_buffer chunk_buf = ro_buffer(chunk) + cdef int rc + rc = cache_sync_feed(self.sync, chunk_buf.buf, chunk_buf.len) + PyBuffer_Release(&chunk_buf) + if not rc: + error = cache_sync_error(self.sync) + if error != NULL: + raise ValueError('cache_sync_feed failed: ' + error.decode('ascii')) diff --git a/src/borg/helpers.py b/src/borg/helpers.py index c93e5c0a5..53e7f645d 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -125,7 +125,7 @@ def check_python(): def check_extension_modules(): from . import platform, compress, item - if hashindex.API_VERSION != '1.1_01': + if hashindex.API_VERSION != '1.1_02': raise ExtensionModuleError if chunker.API_VERSION != '1.1_01': raise ExtensionModuleError diff --git a/src/borg/remote.py b/src/borg/remote.py index 8c8d21266..201c90b2d 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -7,6 +7,8 @@ import logging import os import select import shlex +import shutil +import struct import sys import tempfile import textwrap @@ -17,15 +19,18 @@ from subprocess import Popen, PIPE import msgpack from . import __version__ +from .compress import LZ4 from .helpers import Error, IntegrityError from .helpers import bin_to_hex from .helpers import get_home_dir from .helpers import hostname_is_unique from .helpers import replace_placeholders from .helpers import sysinfo +from .helpers import format_file_size from .logger import create_logger, setup_logging from .repository import Repository, MAX_OBJECT_SIZE, LIST_SCAN_LIMIT from .version import parse_version, format_version +from .algorithms.checksums import xxh64 logger = create_logger(__name__) @@ -1057,9 +1062,14 @@ class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. Just to have same API (including the context manager) as RepositoryCache. + + *transform* is a callable taking two arguments, key and raw repository data. + The return value is returned from get()/get_many(). By default, the raw + repository data is returned. """ - def __init__(self, repository): + def __init__(self, repository, transform=None): self.repository = repository + self.transform = transform or (lambda key, data: data) def close(self): pass @@ -1071,52 +1081,156 @@ class RepositoryNoCache: self.close() def get(self, key): - return next(self.get_many([key])) + return next(self.get_many([key], cache=False)) - def get_many(self, keys): - for data in self.repository.get_many(keys): - yield data + def get_many(self, keys, cache=True): + for key, data in zip(keys, self.repository.get_many(keys)): + yield self.transform(key, data) class RepositoryCache(RepositoryNoCache): - """A caching Repository wrapper - - Caches Repository GET operations using a local temporary Repository. """ - # maximum object size that will be cached, 64 kiB. - THRESHOLD = 2**16 + A caching Repository wrapper. - def __init__(self, repository): - super().__init__(repository) - tmppath = tempfile.mkdtemp(prefix='borg-tmp') - self.caching_repo = Repository(tmppath, create=True, exclusive=True) - self.caching_repo.__enter__() # handled by context manager in base class + Caches Repository GET operations locally. + + *pack* and *unpack* complement *transform* of the base class. + *pack* receives the output of *transform* and should return bytes, + which are stored in the cache. *unpack* receives these bytes and + should return the initial data (as returned by *transform*). + """ + + def __init__(self, repository, pack=None, unpack=None, transform=None): + super().__init__(repository, transform) + self.pack = pack or (lambda data: data) + self.unpack = unpack or (lambda data: data) + self.cache = set() + self.basedir = tempfile.mkdtemp(prefix='borg-cache-') + self.query_size_limit() + self.size = 0 + # Instrumentation + self.hits = 0 + self.misses = 0 + self.slow_misses = 0 + self.slow_lat = 0.0 + self.evictions = 0 + self.enospc = 0 + + def query_size_limit(self): + stat_fs = os.statvfs(self.basedir) + available_space = stat_fs.f_bsize * stat_fs.f_bavail + self.size_limit = int(min(available_space * 0.25, 2**31)) + + def key_filename(self, key): + return os.path.join(self.basedir, bin_to_hex(key)) + + def backoff(self): + self.query_size_limit() + target_size = int(0.9 * self.size_limit) + while self.size > target_size and self.cache: + key = self.cache.pop() + file = self.key_filename(key) + self.size -= os.stat(file).st_size + os.unlink(file) + self.evictions += 1 + + def add_entry(self, key, data, cache): + transformed = self.transform(key, data) + if not cache: + return transformed + packed = self.pack(transformed) + file = self.key_filename(key) + try: + with open(file, 'wb') as fd: + fd.write(packed) + except OSError as os_error: + if os_error.errno == errno.ENOSPC: + self.enospc += 1 + self.backoff() + else: + raise + else: + self.size += len(packed) + self.cache.add(key) + if self.size > self.size_limit: + self.backoff() + return transformed def close(self): - if self.caching_repo is not None: - self.caching_repo.destroy() - self.caching_repo = None + logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' + '%d evictions, %d ENOSPC hit', + len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), + self.hits, self.misses, self.slow_misses, self.slow_lat, + self.evictions, self.enospc) + self.cache.clear() + shutil.rmtree(self.basedir) - def get_many(self, keys): - unknown_keys = [key for key in keys if key not in self.caching_repo] + def get_many(self, keys, cache=True): + unknown_keys = [key for key in keys if key not in self.cache] repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys)) for key in keys: - try: - yield self.caching_repo.get(key) - except Repository.ObjectNotFound: + if key in self.cache: + file = self.key_filename(key) + with open(file, 'rb') as fd: + self.hits += 1 + yield self.unpack(fd.read()) + else: for key_, data in repository_iterator: if key_ == key: - if len(data) <= self.THRESHOLD: - self.caching_repo.put(key, data) - yield data + transformed = self.add_entry(key, data, cache) + self.misses += 1 + yield transformed break + else: + # slow path: eviction during this get_many removed this key from the cache + t0 = time.perf_counter() + data = self.repository.get(key) + self.slow_lat += time.perf_counter() - t0 + transformed = self.add_entry(key, data, cache) + self.slow_misses += 1 + yield transformed # Consume any pending requests for _ in repository_iterator: pass -def cache_if_remote(repository): - if isinstance(repository, RemoteRepository): - return RepositoryCache(repository) +def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False): + """ + Return a Repository(No)Cache for *repository*. + + If *decrypted_cache* is a key object, then get and get_many will return a tuple + (csize, plaintext) instead of the actual data in the repository. The cache will + store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting + and more importantly MAC and ID checking cached objects). + Internally, objects are compressed with LZ4. + """ + if decrypted_cache and (pack or unpack or transform): + raise ValueError('decrypted_cache and pack/unpack/transform are incompatible') + elif decrypted_cache: + key = decrypted_cache + # 32 bit csize, 64 bit (8 byte) xxh64 + cache_struct = struct.Struct('=I8s') + compressor = LZ4() + + def pack(data): + csize, decrypted = data + compressed = compressor.compress(decrypted) + return cache_struct.pack(csize, xxh64(compressed)) + compressed + + def unpack(data): + data = memoryview(data) + csize, checksum = cache_struct.unpack(data[:cache_struct.size]) + compressed = data[cache_struct.size:] + if checksum != xxh64(compressed): + raise IntegrityError('detected corrupted data in metadata cache') + return csize, compressor.decompress(compressed) + + def transform(id_, data): + csize = len(data) + decrypted = key.decrypt(id_, data) + return csize, decrypted + + if isinstance(repository, RemoteRepository) or force_cache: + return RepositoryCache(repository, pack, unpack, transform) else: - return RepositoryNoCache(repository) + return RepositoryNoCache(repository, transform) diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py new file mode 100644 index 000000000..6f6452a10 --- /dev/null +++ b/src/borg/testsuite/cache.py @@ -0,0 +1,198 @@ +import io + +from msgpack import packb + +import pytest + +from ..hashindex import ChunkIndex, CacheSynchronizer +from .hashindex import H + + +class TestCacheSynchronizer: + @pytest.fixture + def index(self): + return ChunkIndex() + + @pytest.fixture + def sync(self, index): + return CacheSynchronizer(index) + + def test_no_chunks(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [] + }) + sync.feed(data) + assert not len(index) + + def test_simple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + sync.feed(data) + assert len(index) == 2 + assert index[H(1)] == (1, 1, 2) + assert index[H(2)] == (1, 2, 3) + + def test_multiple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ], + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'chunks': [ + (H(3), 1, 2), + ], + }) + data += packb({ + 'chunks': [ + (H(1), 1, 2), + ], + }) + + part1 = data[:70] + part2 = data[70:120] + part3 = data[120:] + sync.feed(part1) + sync.feed(part2) + sync.feed(part3) + assert len(index) == 3 + assert index[H(1)] == (3, 1, 2) + assert index[H(2)] == (2, 2, 3) + assert index[H(3)] == (1, 1, 2) + + @pytest.mark.parametrize('elem,error', ( + ({1: 2}, 'Unexpected object: map'), + (bytes(213), [ + 'Unexpected bytes in chunks structure', # structure 2/3 + 'Incorrect key length']), # structure 3/3 + (1, 'Unexpected object: integer'), + (1.0, 'Unexpected object: double'), + (True, 'Unexpected object: true'), + (False, 'Unexpected object: false'), + (None, 'Unexpected object: nil'), + )) + @pytest.mark.parametrize('structure', ( + lambda elem: {'chunks': elem}, + lambda elem: {'chunks': [elem]}, + lambda elem: {'chunks': [(elem, 1, 2)]}, + )) + def test_corrupted(self, sync, structure, elem, error): + packed = packb(structure(elem)) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + if isinstance(error, str): + error = [error] + possible_errors = ['cache_sync_feed failed: ' + error for error in error] + assert str(excinfo.value) in possible_errors + + @pytest.mark.parametrize('data,error', ( + # Incorrect tuple length + ({'chunks': [(bytes(32), 2, 3, 4)]}, 'Invalid chunk list entry length'), + ({'chunks': [(bytes(32), 2)]}, 'Invalid chunk list entry length'), + # Incorrect types + ({'chunks': [(1, 2, 3)]}, 'Unexpected object: integer'), + ({'chunks': [(1, bytes(32), 2)]}, 'Unexpected object: integer'), + ({'chunks': [(bytes(32), 1.0, 2)]}, 'Unexpected object: double'), + )) + def test_corrupted_ancillary(self, index, sync, data, error): + packed = packb(data) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + assert str(excinfo.value) == 'cache_sync_feed failed: ' + error + + def make_index_with_refcount(self, refcount): + index_data = io.BytesIO() + index_data.write(b'BORG_IDX') + # num_entries + index_data.write((1).to_bytes(4, 'little')) + # num_buckets + index_data.write((1).to_bytes(4, 'little')) + # key_size + index_data.write((32).to_bytes(1, 'little')) + # value_size + index_data.write((3 * 4).to_bytes(1, 'little')) + + index_data.write(H(0)) + index_data.write(refcount.to_bytes(4, 'little')) + index_data.write((1234).to_bytes(4, 'little')) + index_data.write((5678).to_bytes(4, 'little')) + + index_data.seek(0) + index = ChunkIndex.read(index_data) + return index + + def test_corrupted_refcount(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE + 1) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + with pytest.raises(ValueError) as excinfo: + sync.feed(data) + assert str(excinfo.value) == 'cache_sync_feed failed: invalid reference count' + + def test_refcount_max_value(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + sync.feed(data) + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) + + def test_refcount_one_below_max_value(self): + index = self.make_index_with_refcount(ChunkIndex.MAX_VALUE - 1) + sync = CacheSynchronizer(index) + data = packb({ + 'chunks': [ + (H(0), 1, 2), + ] + }) + sync.feed(data) + # Incremented to maximum + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) + sync.feed(data) + assert index[H(0)] == (ChunkIndex.MAX_VALUE, 1234, 5678) diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py index b9eddabd6..dccfdaffb 100644 --- a/src/borg/testsuite/remote.py +++ b/src/borg/testsuite/remote.py @@ -1,9 +1,18 @@ +import errno import os +import io import time +from unittest.mock import patch import pytest -from ..remote import SleepingBandwidthLimiter +from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote +from ..repository import Repository +from ..crypto.key import PlaintextKey +from ..compress import CompressionSpec +from ..helpers import IntegrityError +from .hashindex import H +from .key import TestKey class TestSleepingBandwidthLimiter: @@ -58,3 +67,132 @@ class TestSleepingBandwidthLimiter: now += 10 self.expect_write(5, b"1") it.write(5, b"1") + + +class TestRepositoryCache: + @pytest.yield_fixture + def repository(self, tmpdir): + self.repository_location = os.path.join(str(tmpdir), 'repository') + with Repository(self.repository_location, exclusive=True, create=True) as repository: + repository.put(H(1), b'1234') + repository.put(H(2), b'5678') + repository.put(H(3), bytes(100)) + yield repository + + @pytest.fixture + def cache(self, repository): + return RepositoryCache(repository) + + def test_simple(self, cache: RepositoryCache): + # Single get()s are not cached, since they are used for unique objects like archives. + assert cache.get(H(1)) == b'1234' + assert cache.misses == 1 + assert cache.hits == 0 + + assert list(cache.get_many([H(1)])) == [b'1234'] + assert cache.misses == 2 + assert cache.hits == 0 + + assert list(cache.get_many([H(1)])) == [b'1234'] + assert cache.misses == 2 + assert cache.hits == 1 + + assert cache.get(H(1)) == b'1234' + assert cache.misses == 2 + assert cache.hits == 2 + + def test_backoff(self, cache: RepositoryCache): + def query_size_limit(): + cache.size_limit = 0 + + assert list(cache.get_many([H(1), H(2)])) == [b'1234', b'5678'] + assert cache.misses == 2 + assert cache.evictions == 0 + iterator = cache.get_many([H(1), H(3), H(2)]) + assert next(iterator) == b'1234' + + # Force cache to back off + qsl = cache.query_size_limit + cache.query_size_limit = query_size_limit + cache.backoff() + cache.query_size_limit = qsl + # Evicted H(1) and H(2) + assert cache.evictions == 2 + assert H(1) not in cache.cache + assert H(2) not in cache.cache + assert next(iterator) == bytes(100) + assert cache.slow_misses == 0 + # Since H(2) was in the cache when we called get_many(), but has + # been evicted during iterating the generator, it will be a slow miss. + assert next(iterator) == b'5678' + assert cache.slow_misses == 1 + + def test_enospc(self, cache: RepositoryCache): + class enospc_open: + def __init__(self, *args): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def write(self, data): + raise OSError(errno.ENOSPC, 'foo') + + iterator = cache.get_many([H(1), H(2), H(3)]) + assert next(iterator) == b'1234' + + with patch('builtins.open', enospc_open): + assert next(iterator) == b'5678' + assert cache.enospc == 1 + # We didn't patch query_size_limit which would set size_limit to some low + # value, so nothing was actually evicted. + assert cache.evictions == 0 + + assert next(iterator) == bytes(100) + + @pytest.fixture + def key(self, repository, monkeypatch): + monkeypatch.setenv('BORG_PASSPHRASE', 'test') + key = PlaintextKey.create(repository, TestKey.MockArgs()) + key.compressor = CompressionSpec('none').compressor + return key + + def _put_encrypted_object(self, key, repository, data): + id_ = key.id_hash(data) + repository.put(id_, key.encrypt(data)) + return id_ + + @pytest.fixture + def H1(self, key, repository): + return self._put_encrypted_object(key, repository, b'1234') + + @pytest.fixture + def H2(self, key, repository): + return self._put_encrypted_object(key, repository, b'5678') + + @pytest.fixture + def H3(self, key, repository): + return self._put_encrypted_object(key, repository, bytes(100)) + + @pytest.fixture + def decrypted_cache(self, key, repository): + return cache_if_remote(repository, decrypted_cache=key, force_cache=True) + + def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3): + list(decrypted_cache.get_many([H1, H2, H3])) + + iterator = decrypted_cache.get_many([H1, H2, H3]) + assert next(iterator) == (7, b'1234') + + with open(decrypted_cache.key_filename(H2), 'a+b') as fd: + fd.seek(-1, io.SEEK_END) + corrupted = (int.from_bytes(fd.read(), 'little') ^ 2).to_bytes(1, 'little') + fd.seek(-1, io.SEEK_END) + fd.write(corrupted) + fd.truncate() + + with pytest.raises(IntegrityError): + assert next(iterator) == (7, b'5678')