From 3c173cc03b9a9a625d487b23ee247c9153967d9b Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 1 Jul 2018 02:34:48 +0200 Subject: [PATCH] wrap msgpack, fixes #3632, fixes #2738 wrap msgpack to avoid future upstream api changes making troubles or that we would have to globally spoil our code with extra params. make sure the packing is always with use_bin_type=False, thus generating "old" msgpack format (as borg always did) from bytes objects. make sure the unpacking is always with raw=True, thus generating bytes objects. note: safe unicode encoding/decoding for some kinds of data types is done in Item class (see item.pyx), so it is enough if we care for bytes objects on the msgpack level. also wrap exception handling, so borg code can catch msgpack specific exceptions even if the upstream msgpack code raises way too generic exceptions typed Exception, TypeError or ValueError. We use own Exception classes for this, upstream classes are deprecated --- src/borg/archive.py | 35 +++----- src/borg/archiver.py | 12 +-- src/borg/cache.py | 3 +- src/borg/crypto/key.py | 7 +- src/borg/fuse.py | 2 +- src/borg/helpers/__init__.py | 4 +- src/borg/helpers/msgpack.py | 146 +++++++++++++++++++++++++++++-- src/borg/remote.py | 3 +- src/borg/repository.py | 5 +- src/borg/testsuite/archive.py | 2 +- src/borg/testsuite/archiver.py | 2 +- src/borg/testsuite/helpers.py | 7 +- src/borg/testsuite/key.py | 4 +- src/borg/testsuite/repository.py | 3 +- 14 files changed, 177 insertions(+), 58 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 7cc86c93d..c22a655f7 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -13,8 +13,6 @@ from io import BytesIO from itertools import groupby, zip_longest from shutil import get_terminal_size -import msgpack - from .logger import create_logger logger = create_logger() @@ -39,6 +37,7 @@ from .helpers import StableDict from .helpers import bin_to_hex from .helpers import safe_ns from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi +from .helpers import msgpack from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname @@ -239,7 +238,7 @@ class ChunkBuffer: def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS): self.buffer = BytesIO() - self.packer = msgpack.Packer(unicode_errors='surrogateescape') + self.packer = msgpack.Packer() self.chunks = [] self.key = key self.chunker = Chunker(self.key.chunk_seed, *chunker_params) @@ -348,7 +347,7 @@ class Archive: def _load_meta(self, id): data = self.key.decrypt(id, self.repository.get(id)) - metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape')) + metadata = ArchiveItem(internal_dict=msgpack.unpackb(data)) if metadata.version != 1: raise Exception('Unknown archive metadata version') return metadata @@ -735,7 +734,7 @@ Utilization of max. archive size: {csize_max:.0%} def set_meta(self, key, value): metadata = self._load_meta(self.id) setattr(metadata, key, value) - data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape') + data = msgpack.packb(metadata.as_dict()) new_id = self.key.id_hash(data) self.cache.add_chunk(new_id, data, self.stats) self.manifest.archives[self.name] = (new_id, metadata.time) @@ -1207,9 +1206,6 @@ def valid_msgpacked_dict(d, keys_serialized): class RobustUnpacker: """A restartable/robust version of the streaming msgpack unpacker """ - class UnpackerCrashed(Exception): - """raise if unpacker crashed""" - def __init__(self, validator, item_keys): super().__init__() self.item_keys = [msgpack.packb(name.encode()) for name in item_keys] @@ -1232,14 +1228,6 @@ class RobustUnpacker: return self def __next__(self): - def unpack_next(): - try: - return next(self._unpacker) - except (TypeError, ValueError) as err: - # transform exceptions that might be raised when feeding - # msgpack with invalid data to a more specific exception - raise self.UnpackerCrashed(str(err)) - if self._resync: data = b''.join(self._buffered_data) while self._resync: @@ -1252,8 +1240,8 @@ class RobustUnpacker: self._unpacker = msgpack.Unpacker(object_hook=StableDict) self._unpacker.feed(data) try: - item = unpack_next() - except (self.UnpackerCrashed, StopIteration): + item = next(self._unpacker) + except (msgpack.UnpackException, StopIteration): # as long as we are resyncing, we also ignore StopIteration pass else: @@ -1262,7 +1250,7 @@ class RobustUnpacker: return item data = data[1:] else: - return unpack_next() + return next(self._unpacker) class ArchiveChecker: @@ -1459,9 +1447,8 @@ class ArchiveChecker: continue try: archive = msgpack.unpackb(data) - # Ignore exceptions that might be raised when feeding - # msgpack with invalid data - except (TypeError, ValueError, StopIteration): + # Ignore exceptions that might be raised when feeding msgpack with invalid data + except msgpack.UnpackException: continue if valid_archive(archive): archive = ArchiveItem(internal_dict=archive) @@ -1640,7 +1627,7 @@ class ArchiveChecker: yield Item(internal_dict=item) else: report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i) - except RobustUnpacker.UnpackerCrashed as err: + except msgpack.UnpackException as err: report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i) unpacker.resync() except Exception: @@ -1696,7 +1683,7 @@ class ArchiveChecker: for previous_item_id in archive.items: mark_as_possibly_superseded(previous_item_id) archive.items = items_buffer.chunks - data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape') + data = msgpack.packb(archive.as_dict()) new_archive_id = self.key.id_hash(data) cdata = self.key.encrypt(data) add_reference(new_archive_id, len(data), len(cdata), cdata) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 100b57486..305f73c54 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -29,8 +29,6 @@ from .logger import create_logger, setup_logging logger = create_logger() -import msgpack - import borg from . import __version__ from . import helpers @@ -68,6 +66,7 @@ from .helpers import ChunkIteratorFileWrapper from .helpers import popen_with_error_handling, prepare_subprocess_env from .helpers import dash_open from .helpers import umount +from .helpers import msgpack from .nanorst import rst_to_terminal from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern from .patterns import PatternMatcher @@ -1705,7 +1704,7 @@ class Archiver: fd.write(',\n') data = key.decrypt(archive_meta_orig[b'id'], repository.get(archive_meta_orig[b'id'])) - archive_org_dict = msgpack.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape') + archive_org_dict = msgpack.unpackb(data, object_hook=StableDict) fd.write(' "_meta":\n') fd.write(do_indent(prepare_dump_dict(archive_org_dict))) @@ -1738,7 +1737,7 @@ class Archiver: data = key.decrypt(None, repository.get(manifest.MANIFEST_ID)) - meta = prepare_dump_dict(msgpack.fallback.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape')) + meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict)) with dash_open(args.path, 'w') as fd: json.dump(meta, fd, indent=4) @@ -1844,7 +1843,7 @@ class Archiver: """convert Borg profile to Python profile""" import marshal with args.output, args.input: - marshal.dump(msgpack.unpack(args.input, use_list=False, encoding='utf-8'), args.output) + marshal.dump(msgpack.mp_unpack(args.input, use_list=False, raw=False), args.output) return EXIT_SUCCESS @with_repository(lock=False, manifest=False) @@ -4107,7 +4106,8 @@ class Archiver: # into a marshal file that can be read by e.g. pyprof2calltree. # For local use it's unnecessary hassle, though, that's why .pyprof makes # it compatible (see above). - msgpack.pack(profiler.stats, fd, use_bin_type=True) + # We do not use our msgpack wrapper here, but directly call mp_pack. + msgpack.mp_pack(profiler.stats, fd, use_bin_type=True) else: return set_ec(func(args)) diff --git a/src/borg/cache.py b/src/borg/cache.py index 6bc06adfd..ef1b733cf 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -6,8 +6,6 @@ from binascii import unhexlify from collections import namedtuple from time import perf_counter -import msgpack - from .logger import create_logger logger = create_logger() @@ -26,6 +24,7 @@ from .helpers import remove_surrogates from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage from .helpers import set_ec, EXIT_WARNING from .helpers import truncate_and_unlink +from .helpers import msgpack from .item import ArchiveItem, ChunkListEntry from .crypto.key import PlaintextKey from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 28b34c25b..498eb142f 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -9,8 +9,6 @@ from binascii import a2b_base64, b2a_base64, hexlify from hashlib import sha256, sha512, pbkdf2_hmac from hmac import HMAC, compare_digest -import msgpack - from ..logger import create_logger logger = create_logger() @@ -24,6 +22,7 @@ from ..helpers import get_keys_dir, get_security_dir from ..helpers import get_limited_unpacker from ..helpers import bin_to_hex from ..helpers import prepare_subprocess_env +from ..helpers import msgpack from ..item import Key, EncryptedKey from ..platform import SaveFile @@ -212,10 +211,10 @@ class KeyBase: 'hmac': bytes(64), 'salt': os.urandom(64), }) - packed = msgpack.packb(metadata_dict, unicode_errors='surrogateescape') + packed = msgpack.packb(metadata_dict) tam_key = self._tam_key(tam['salt'], context) tam['hmac'] = HMAC(tam_key, packed, sha512).digest() - return msgpack.packb(metadata_dict, unicode_errors='surrogateescape') + return msgpack.packb(metadata_dict) def unpack_and_verify_manifest(self, data, force_tam_not_required=False): """Unpack msgpacked *data* and return (object, did_verify).""" diff --git a/src/borg/fuse.py b/src/borg/fuse.py index c6638e4cb..ba5284d35 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -11,7 +11,6 @@ from signal import SIGINT from distutils.version import LooseVersion import llfuse -import msgpack from .logger import create_logger logger = create_logger() @@ -21,6 +20,7 @@ from .archiver import Archiver from .archive import Archive from .hashindex import FuseVersionsIndex from .helpers import daemonize, hardlinkable, signal_handler, format_file_size +from .helpers import msgpack from .item import Item from .lrucache import LRUCache from .remote import RemoteRepository diff --git a/src/borg/helpers/__init__.py b/src/borg/helpers/__init__.py index b1c875a4e..d8e74d344 100644 --- a/src/borg/helpers/__init__.py +++ b/src/borg/helpers/__init__.py @@ -12,7 +12,6 @@ from .errors import * # NOQA from .fs import * # NOQA from .manifest import * # NOQA from .misc import * # NOQA -from .msgpack import * # NOQA from .parseformat import * # NOQA from .process import * # NOQA from .progress import * # NOQA @@ -20,6 +19,9 @@ from .time import * # NOQA from .usergroup import * # NOQA from .yes import * # NOQA +from .msgpack import is_slow_msgpack, int_to_bigint, bigint_to_int, get_limited_unpacker +from . import msgpack + """ The global exit_code variable is used so that modules other than archiver can increase the program exit code if a warning or error occurred during their operation. This is different from archiver.exit_code, which is only accessible diff --git a/src/borg/helpers/msgpack.py b/src/borg/helpers/msgpack.py index 72bc1f45e..eec48494b 100644 --- a/src/borg/helpers/msgpack.py +++ b/src/borg/helpers/msgpack.py @@ -1,11 +1,147 @@ -import msgpack -import msgpack.fallback - from .datastruct import StableDict from ..constants import * # NOQA +# wrapping msgpack --------------------------------------------------------------------------------------------------- +# +# due to the planned breaking api changes in upstream msgpack, we wrap it the way we need it - +# to avoid having lots of clutter in the calling code. see tickets #968 and #3632. +# +# Packing +# ------- +# use_bin_type = False is needed to generate the old msgpack format (not msgpack 2.0 spec) as borg always did. +# encoding = None is needed because usage of it is deprecated +# unicode_errors = None is needed because usage of it is deprecated +# +# Unpacking +# --------- +# raw = True is needed to unpack the old msgpack format to bytes (not str, about the decoding see item.pyx). +# encoding = None is needed because usage of it is deprecated +# unicode_errors = None is needed because usage of it is deprecated + +from msgpack import Packer as mp_Packer +from msgpack import packb as mp_packb +from msgpack import pack as mp_pack +from msgpack import Unpacker as mp_Unpacker +from msgpack import unpackb as mp_unpackb +from msgpack import unpack as mp_unpack + +from msgpack import ExtType +from msgpack import OutOfData + + +class PackException(Exception): + """Exception while msgpack packing""" + + +class UnpackException(Exception): + """Exception while msgpack unpacking""" + + +class Packer(mp_Packer): + def __init__(self, *, default=None, encoding=None, unicode_errors=None, + use_single_float=False, autoreset=True, use_bin_type=False, + strict_types=False): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + super().__init__(default=default, encoding=encoding, unicode_errors=unicode_errors, + use_single_float=use_single_float, autoreset=autoreset, use_bin_type=use_bin_type, + strict_types=strict_types) + + def pack(self, obj): + try: + return super().pack(obj) + except Exception as e: + raise PackException(e) + + +def packb(o, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + try: + return mp_packb(o, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise PackException(e) + + +def pack(o, stream, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + try: + return mp_pack(o, stream, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise PackException(e) + + +class Unpacker(mp_Unpacker): + def __init__(self, file_like=None, *, read_size=0, use_list=True, raw=True, + object_hook=None, object_pairs_hook=None, list_hook=None, + encoding=None, unicode_errors=None, max_buffer_size=0, + ext_hook=ExtType, + max_str_len=2147483647, # 2**32-1 + max_bin_len=2147483647, + max_array_len=2147483647, + max_map_len=2147483647, + max_ext_len=2147483647): + assert raw is True + assert encoding is None + assert unicode_errors is None + super().__init__(file_like=file_like, read_size=read_size, use_list=use_list, raw=raw, + object_hook=object_hook, object_pairs_hook=object_pairs_hook, list_hook=list_hook, + encoding=encoding, unicode_errors=unicode_errors, max_buffer_size=max_buffer_size, + ext_hook=ext_hook, + max_str_len=max_str_len, + max_bin_len=max_bin_len, + max_array_len=max_array_len, + max_map_len=max_map_len, + max_ext_len=max_ext_len) + + def unpack(self): + try: + return super().unpack() + except OutOfData: + raise + except Exception as e: + raise UnpackException(e) + + def __next__(self): + try: + return super().__next__() + except StopIteration: + raise + except Exception as e: + raise UnpackException(e) + + next = __next__ + + +def unpackb(packed, *, raw=True, encoding=None, unicode_errors=None, **kwargs): + assert raw is True + assert encoding is None + assert unicode_errors is None + try: + return mp_unpackb(packed, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise UnpackException(e) + + +def unpack(stream, *, raw=True, encoding=None, unicode_errors=None, **kwargs): + assert raw is True + assert encoding is None + assert unicode_errors is None + try: + return mp_unpack(stream, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise UnpackException(e) + + +# msgpacking related utilities ----------------------------------------------- def is_slow_msgpack(): + import msgpack + import msgpack.fallback return msgpack.Packer is msgpack.fallback.Packer @@ -31,7 +167,6 @@ def get_limited_unpacker(kind): max_map_len=MAX_ARCHIVES, # list of archives max_str_len=255, # archive name object_hook=StableDict, - unicode_errors='surrogateescape', )) elif kind == 'key': args.update(dict(use_list=True, # default value @@ -39,11 +174,10 @@ def get_limited_unpacker(kind): max_map_len=10, # EncryptedKey dict max_str_len=4000, # inner key data object_hook=StableDict, - unicode_errors='surrogateescape', )) else: raise ValueError('kind must be "server", "client", "manifest" or "key"') - return msgpack.Unpacker(**args) + return Unpacker(**args) def bigint_to_int(mtime): diff --git a/src/borg/remote.py b/src/borg/remote.py index 44a7d8e87..787b4bdb9 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -15,8 +15,6 @@ import time import traceback from subprocess import Popen, PIPE -import msgpack - from . import __version__ from .compress import LZ4 from .constants import * # NOQA @@ -31,6 +29,7 @@ from .helpers import format_file_size from .helpers import truncate_and_unlink from .helpers import prepare_subprocess_env from .logger import create_logger, setup_logging +from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version from .algorithms.checksums import xxh64 diff --git a/src/borg/repository.py b/src/borg/repository.py index 0b2a20bf2..22cd01efe 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -11,8 +11,6 @@ from datetime import datetime from functools import partial from itertools import islice -import msgpack - from .constants import * # NOQA from .hashindex import NSIndex from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size @@ -22,6 +20,7 @@ from .helpers import bin_to_hex from .helpers import hostname_is_unique from .helpers import secure_erase, truncate_and_unlink from .helpers import Manifest +from .helpers import msgpack from .locking import Lock, LockError, LockErrorT from .logger import create_logger from .lrucache import LRUCache @@ -513,7 +512,7 @@ class Repository: try: with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd: hints = msgpack.unpack(fd) - except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e: + except (msgpack.UnpackException, FileNotFoundError, FileIntegrityError) as e: logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e) if not isinstance(e, FileNotFoundError): os.unlink(hints_path) diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index bc1133524..0decb9af9 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -3,7 +3,6 @@ from datetime import datetime, timezone from io import StringIO from unittest.mock import Mock -import msgpack import pytest from . import BaseTestCase @@ -11,6 +10,7 @@ from ..crypto.key import PlaintextKey from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics from ..archive import BackupOSError, backup_io, backup_io_iter from ..helpers import Manifest +from ..helpers import msgpack from ..item import Item, ArchiveItem diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index b9a003883..48278d1d2 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -23,7 +23,6 @@ from hashlib import sha256 from io import BytesIO, StringIO from unittest.mock import patch -import msgpack import pytest try: @@ -46,6 +45,7 @@ from ..helpers import Manifest, MandatoryFeatureUnsupported from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR from ..helpers import bin_to_hex from ..helpers import MAX_S +from ..helpers import msgpack from ..nanorst import RstToTextLazy, rst_to_terminal from ..patterns import IECommand, PatternMatcher, parse_pattern from ..item import Item, ItemDiff diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index 222115d2a..2d329c075 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -8,9 +8,6 @@ from time import sleep import pytest -import msgpack -import msgpack.fallback - from .. import platform from ..helpers import Location from ..helpers import Buffer @@ -19,6 +16,7 @@ from ..helpers import make_path_safe, clean_lines from ..helpers import interval, prune_within, prune_split from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir from ..helpers import is_slow_msgpack +from ..helpers import msgpack from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH from ..helpers import StableDict, int_to_bigint, bigint_to_int, bin_to_hex from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams @@ -594,6 +592,9 @@ def test_parse_file_size_invalid(string): def test_is_slow_msgpack(): + # we need to import upstream msgpack package here, not helpers.msgpack: + import msgpack + import msgpack.fallback saved_packer = msgpack.Packer try: msgpack.Packer = msgpack.fallback.Packer diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index 075311744..1046e1ad1 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -4,7 +4,6 @@ import re import tempfile from binascii import hexlify, unhexlify -import msgpack import pytest from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex @@ -19,6 +18,7 @@ from ..helpers import IntegrityError from ..helpers import Location from ..helpers import StableDict from ..helpers import get_security_dir +from ..helpers import msgpack class TestKey: @@ -333,7 +333,7 @@ class TestTAM: key.unpack_and_verify_manifest(blob) blob = b'\xc1\xc1\xc1' - with pytest.raises((ValueError, msgpack.UnpackException)): + with pytest.raises(msgpack.UnpackException): key.unpack_and_verify_manifest(blob) def test_missing_when_required(self, key): diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 6af9da6bb..9cdd1322f 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -6,13 +6,12 @@ import sys import tempfile from unittest.mock import patch -import msgpack - import pytest from ..hashindex import NSIndex from ..helpers import Location from ..helpers import IntegrityError +from ..helpers import msgpack from ..locking import Lock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE