diff --git a/src/borg/archive.py b/src/borg/archive.py index 7cc86c93d..c22a655f7 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -13,8 +13,6 @@ from io import BytesIO from itertools import groupby, zip_longest from shutil import get_terminal_size -import msgpack - from .logger import create_logger logger = create_logger() @@ -39,6 +37,7 @@ from .helpers import StableDict from .helpers import bin_to_hex from .helpers import safe_ns from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi +from .helpers import msgpack from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname @@ -239,7 +238,7 @@ class ChunkBuffer: def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS): self.buffer = BytesIO() - self.packer = msgpack.Packer(unicode_errors='surrogateescape') + self.packer = msgpack.Packer() self.chunks = [] self.key = key self.chunker = Chunker(self.key.chunk_seed, *chunker_params) @@ -348,7 +347,7 @@ class Archive: def _load_meta(self, id): data = self.key.decrypt(id, self.repository.get(id)) - metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape')) + metadata = ArchiveItem(internal_dict=msgpack.unpackb(data)) if metadata.version != 1: raise Exception('Unknown archive metadata version') return metadata @@ -735,7 +734,7 @@ Utilization of max. archive size: {csize_max:.0%} def set_meta(self, key, value): metadata = self._load_meta(self.id) setattr(metadata, key, value) - data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape') + data = msgpack.packb(metadata.as_dict()) new_id = self.key.id_hash(data) self.cache.add_chunk(new_id, data, self.stats) self.manifest.archives[self.name] = (new_id, metadata.time) @@ -1207,9 +1206,6 @@ def valid_msgpacked_dict(d, keys_serialized): class RobustUnpacker: """A restartable/robust version of the streaming msgpack unpacker """ - class UnpackerCrashed(Exception): - """raise if unpacker crashed""" - def __init__(self, validator, item_keys): super().__init__() self.item_keys = [msgpack.packb(name.encode()) for name in item_keys] @@ -1232,14 +1228,6 @@ class RobustUnpacker: return self def __next__(self): - def unpack_next(): - try: - return next(self._unpacker) - except (TypeError, ValueError) as err: - # transform exceptions that might be raised when feeding - # msgpack with invalid data to a more specific exception - raise self.UnpackerCrashed(str(err)) - if self._resync: data = b''.join(self._buffered_data) while self._resync: @@ -1252,8 +1240,8 @@ class RobustUnpacker: self._unpacker = msgpack.Unpacker(object_hook=StableDict) self._unpacker.feed(data) try: - item = unpack_next() - except (self.UnpackerCrashed, StopIteration): + item = next(self._unpacker) + except (msgpack.UnpackException, StopIteration): # as long as we are resyncing, we also ignore StopIteration pass else: @@ -1262,7 +1250,7 @@ class RobustUnpacker: return item data = data[1:] else: - return unpack_next() + return next(self._unpacker) class ArchiveChecker: @@ -1459,9 +1447,8 @@ class ArchiveChecker: continue try: archive = msgpack.unpackb(data) - # Ignore exceptions that might be raised when feeding - # msgpack with invalid data - except (TypeError, ValueError, StopIteration): + # Ignore exceptions that might be raised when feeding msgpack with invalid data + except msgpack.UnpackException: continue if valid_archive(archive): archive = ArchiveItem(internal_dict=archive) @@ -1640,7 +1627,7 @@ class ArchiveChecker: yield Item(internal_dict=item) else: report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i) - except RobustUnpacker.UnpackerCrashed as err: + except msgpack.UnpackException as err: report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i) unpacker.resync() except Exception: @@ -1696,7 +1683,7 @@ class ArchiveChecker: for previous_item_id in archive.items: mark_as_possibly_superseded(previous_item_id) archive.items = items_buffer.chunks - data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape') + data = msgpack.packb(archive.as_dict()) new_archive_id = self.key.id_hash(data) cdata = self.key.encrypt(data) add_reference(new_archive_id, len(data), len(cdata), cdata) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 100b57486..305f73c54 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -29,8 +29,6 @@ from .logger import create_logger, setup_logging logger = create_logger() -import msgpack - import borg from . import __version__ from . import helpers @@ -68,6 +66,7 @@ from .helpers import ChunkIteratorFileWrapper from .helpers import popen_with_error_handling, prepare_subprocess_env from .helpers import dash_open from .helpers import umount +from .helpers import msgpack from .nanorst import rst_to_terminal from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern from .patterns import PatternMatcher @@ -1705,7 +1704,7 @@ class Archiver: fd.write(',\n') data = key.decrypt(archive_meta_orig[b'id'], repository.get(archive_meta_orig[b'id'])) - archive_org_dict = msgpack.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape') + archive_org_dict = msgpack.unpackb(data, object_hook=StableDict) fd.write(' "_meta":\n') fd.write(do_indent(prepare_dump_dict(archive_org_dict))) @@ -1738,7 +1737,7 @@ class Archiver: data = key.decrypt(None, repository.get(manifest.MANIFEST_ID)) - meta = prepare_dump_dict(msgpack.fallback.unpackb(data, object_hook=StableDict, unicode_errors='surrogateescape')) + meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict)) with dash_open(args.path, 'w') as fd: json.dump(meta, fd, indent=4) @@ -1844,7 +1843,7 @@ class Archiver: """convert Borg profile to Python profile""" import marshal with args.output, args.input: - marshal.dump(msgpack.unpack(args.input, use_list=False, encoding='utf-8'), args.output) + marshal.dump(msgpack.mp_unpack(args.input, use_list=False, raw=False), args.output) return EXIT_SUCCESS @with_repository(lock=False, manifest=False) @@ -4107,7 +4106,8 @@ class Archiver: # into a marshal file that can be read by e.g. pyprof2calltree. # For local use it's unnecessary hassle, though, that's why .pyprof makes # it compatible (see above). - msgpack.pack(profiler.stats, fd, use_bin_type=True) + # We do not use our msgpack wrapper here, but directly call mp_pack. + msgpack.mp_pack(profiler.stats, fd, use_bin_type=True) else: return set_ec(func(args)) diff --git a/src/borg/cache.py b/src/borg/cache.py index 6bc06adfd..ef1b733cf 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -6,8 +6,6 @@ from binascii import unhexlify from collections import namedtuple from time import perf_counter -import msgpack - from .logger import create_logger logger = create_logger() @@ -26,6 +24,7 @@ from .helpers import remove_surrogates from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage from .helpers import set_ec, EXIT_WARNING from .helpers import truncate_and_unlink +from .helpers import msgpack from .item import ArchiveItem, ChunkListEntry from .crypto.key import PlaintextKey from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 28b34c25b..498eb142f 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -9,8 +9,6 @@ from binascii import a2b_base64, b2a_base64, hexlify from hashlib import sha256, sha512, pbkdf2_hmac from hmac import HMAC, compare_digest -import msgpack - from ..logger import create_logger logger = create_logger() @@ -24,6 +22,7 @@ from ..helpers import get_keys_dir, get_security_dir from ..helpers import get_limited_unpacker from ..helpers import bin_to_hex from ..helpers import prepare_subprocess_env +from ..helpers import msgpack from ..item import Key, EncryptedKey from ..platform import SaveFile @@ -212,10 +211,10 @@ class KeyBase: 'hmac': bytes(64), 'salt': os.urandom(64), }) - packed = msgpack.packb(metadata_dict, unicode_errors='surrogateescape') + packed = msgpack.packb(metadata_dict) tam_key = self._tam_key(tam['salt'], context) tam['hmac'] = HMAC(tam_key, packed, sha512).digest() - return msgpack.packb(metadata_dict, unicode_errors='surrogateescape') + return msgpack.packb(metadata_dict) def unpack_and_verify_manifest(self, data, force_tam_not_required=False): """Unpack msgpacked *data* and return (object, did_verify).""" diff --git a/src/borg/fuse.py b/src/borg/fuse.py index c6638e4cb..ba5284d35 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -11,7 +11,6 @@ from signal import SIGINT from distutils.version import LooseVersion import llfuse -import msgpack from .logger import create_logger logger = create_logger() @@ -21,6 +20,7 @@ from .archiver import Archiver from .archive import Archive from .hashindex import FuseVersionsIndex from .helpers import daemonize, hardlinkable, signal_handler, format_file_size +from .helpers import msgpack from .item import Item from .lrucache import LRUCache from .remote import RemoteRepository diff --git a/src/borg/helpers/__init__.py b/src/borg/helpers/__init__.py index b1c875a4e..d8e74d344 100644 --- a/src/borg/helpers/__init__.py +++ b/src/borg/helpers/__init__.py @@ -12,7 +12,6 @@ from .errors import * # NOQA from .fs import * # NOQA from .manifest import * # NOQA from .misc import * # NOQA -from .msgpack import * # NOQA from .parseformat import * # NOQA from .process import * # NOQA from .progress import * # NOQA @@ -20,6 +19,9 @@ from .time import * # NOQA from .usergroup import * # NOQA from .yes import * # NOQA +from .msgpack import is_slow_msgpack, int_to_bigint, bigint_to_int, get_limited_unpacker +from . import msgpack + """ The global exit_code variable is used so that modules other than archiver can increase the program exit code if a warning or error occurred during their operation. This is different from archiver.exit_code, which is only accessible diff --git a/src/borg/helpers/msgpack.py b/src/borg/helpers/msgpack.py index 72bc1f45e..eec48494b 100644 --- a/src/borg/helpers/msgpack.py +++ b/src/borg/helpers/msgpack.py @@ -1,11 +1,147 @@ -import msgpack -import msgpack.fallback - from .datastruct import StableDict from ..constants import * # NOQA +# wrapping msgpack --------------------------------------------------------------------------------------------------- +# +# due to the planned breaking api changes in upstream msgpack, we wrap it the way we need it - +# to avoid having lots of clutter in the calling code. see tickets #968 and #3632. +# +# Packing +# ------- +# use_bin_type = False is needed to generate the old msgpack format (not msgpack 2.0 spec) as borg always did. +# encoding = None is needed because usage of it is deprecated +# unicode_errors = None is needed because usage of it is deprecated +# +# Unpacking +# --------- +# raw = True is needed to unpack the old msgpack format to bytes (not str, about the decoding see item.pyx). +# encoding = None is needed because usage of it is deprecated +# unicode_errors = None is needed because usage of it is deprecated + +from msgpack import Packer as mp_Packer +from msgpack import packb as mp_packb +from msgpack import pack as mp_pack +from msgpack import Unpacker as mp_Unpacker +from msgpack import unpackb as mp_unpackb +from msgpack import unpack as mp_unpack + +from msgpack import ExtType +from msgpack import OutOfData + + +class PackException(Exception): + """Exception while msgpack packing""" + + +class UnpackException(Exception): + """Exception while msgpack unpacking""" + + +class Packer(mp_Packer): + def __init__(self, *, default=None, encoding=None, unicode_errors=None, + use_single_float=False, autoreset=True, use_bin_type=False, + strict_types=False): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + super().__init__(default=default, encoding=encoding, unicode_errors=unicode_errors, + use_single_float=use_single_float, autoreset=autoreset, use_bin_type=use_bin_type, + strict_types=strict_types) + + def pack(self, obj): + try: + return super().pack(obj) + except Exception as e: + raise PackException(e) + + +def packb(o, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + try: + return mp_packb(o, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise PackException(e) + + +def pack(o, stream, *, use_bin_type=False, encoding=None, unicode_errors=None, **kwargs): + assert use_bin_type is False + assert encoding is None + assert unicode_errors is None + try: + return mp_pack(o, stream, use_bin_type=use_bin_type, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise PackException(e) + + +class Unpacker(mp_Unpacker): + def __init__(self, file_like=None, *, read_size=0, use_list=True, raw=True, + object_hook=None, object_pairs_hook=None, list_hook=None, + encoding=None, unicode_errors=None, max_buffer_size=0, + ext_hook=ExtType, + max_str_len=2147483647, # 2**32-1 + max_bin_len=2147483647, + max_array_len=2147483647, + max_map_len=2147483647, + max_ext_len=2147483647): + assert raw is True + assert encoding is None + assert unicode_errors is None + super().__init__(file_like=file_like, read_size=read_size, use_list=use_list, raw=raw, + object_hook=object_hook, object_pairs_hook=object_pairs_hook, list_hook=list_hook, + encoding=encoding, unicode_errors=unicode_errors, max_buffer_size=max_buffer_size, + ext_hook=ext_hook, + max_str_len=max_str_len, + max_bin_len=max_bin_len, + max_array_len=max_array_len, + max_map_len=max_map_len, + max_ext_len=max_ext_len) + + def unpack(self): + try: + return super().unpack() + except OutOfData: + raise + except Exception as e: + raise UnpackException(e) + + def __next__(self): + try: + return super().__next__() + except StopIteration: + raise + except Exception as e: + raise UnpackException(e) + + next = __next__ + + +def unpackb(packed, *, raw=True, encoding=None, unicode_errors=None, **kwargs): + assert raw is True + assert encoding is None + assert unicode_errors is None + try: + return mp_unpackb(packed, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise UnpackException(e) + + +def unpack(stream, *, raw=True, encoding=None, unicode_errors=None, **kwargs): + assert raw is True + assert encoding is None + assert unicode_errors is None + try: + return mp_unpack(stream, raw=raw, encoding=encoding, unicode_errors=unicode_errors, **kwargs) + except Exception as e: + raise UnpackException(e) + + +# msgpacking related utilities ----------------------------------------------- def is_slow_msgpack(): + import msgpack + import msgpack.fallback return msgpack.Packer is msgpack.fallback.Packer @@ -31,7 +167,6 @@ def get_limited_unpacker(kind): max_map_len=MAX_ARCHIVES, # list of archives max_str_len=255, # archive name object_hook=StableDict, - unicode_errors='surrogateescape', )) elif kind == 'key': args.update(dict(use_list=True, # default value @@ -39,11 +174,10 @@ def get_limited_unpacker(kind): max_map_len=10, # EncryptedKey dict max_str_len=4000, # inner key data object_hook=StableDict, - unicode_errors='surrogateescape', )) else: raise ValueError('kind must be "server", "client", "manifest" or "key"') - return msgpack.Unpacker(**args) + return Unpacker(**args) def bigint_to_int(mtime): diff --git a/src/borg/remote.py b/src/borg/remote.py index 44a7d8e87..787b4bdb9 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -15,8 +15,6 @@ import time import traceback from subprocess import Popen, PIPE -import msgpack - from . import __version__ from .compress import LZ4 from .constants import * # NOQA @@ -31,6 +29,7 @@ from .helpers import format_file_size from .helpers import truncate_and_unlink from .helpers import prepare_subprocess_env from .logger import create_logger, setup_logging +from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version from .algorithms.checksums import xxh64 diff --git a/src/borg/repository.py b/src/borg/repository.py index 0b2a20bf2..22cd01efe 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -11,8 +11,6 @@ from datetime import datetime from functools import partial from itertools import islice -import msgpack - from .constants import * # NOQA from .hashindex import NSIndex from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size @@ -22,6 +20,7 @@ from .helpers import bin_to_hex from .helpers import hostname_is_unique from .helpers import secure_erase, truncate_and_unlink from .helpers import Manifest +from .helpers import msgpack from .locking import Lock, LockError, LockErrorT from .logger import create_logger from .lrucache import LRUCache @@ -513,7 +512,7 @@ class Repository: try: with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd: hints = msgpack.unpack(fd) - except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e: + except (msgpack.UnpackException, FileNotFoundError, FileIntegrityError) as e: logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e) if not isinstance(e, FileNotFoundError): os.unlink(hints_path) diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index bc1133524..0decb9af9 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -3,7 +3,6 @@ from datetime import datetime, timezone from io import StringIO from unittest.mock import Mock -import msgpack import pytest from . import BaseTestCase @@ -11,6 +10,7 @@ from ..crypto.key import PlaintextKey from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics from ..archive import BackupOSError, backup_io, backup_io_iter from ..helpers import Manifest +from ..helpers import msgpack from ..item import Item, ArchiveItem diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index b9a003883..48278d1d2 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -23,7 +23,6 @@ from hashlib import sha256 from io import BytesIO, StringIO from unittest.mock import patch -import msgpack import pytest try: @@ -46,6 +45,7 @@ from ..helpers import Manifest, MandatoryFeatureUnsupported from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR from ..helpers import bin_to_hex from ..helpers import MAX_S +from ..helpers import msgpack from ..nanorst import RstToTextLazy, rst_to_terminal from ..patterns import IECommand, PatternMatcher, parse_pattern from ..item import Item, ItemDiff diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index 222115d2a..2d329c075 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -8,9 +8,6 @@ from time import sleep import pytest -import msgpack -import msgpack.fallback - from .. import platform from ..helpers import Location from ..helpers import Buffer @@ -19,6 +16,7 @@ from ..helpers import make_path_safe, clean_lines from ..helpers import interval, prune_within, prune_split from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir from ..helpers import is_slow_msgpack +from ..helpers import msgpack from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH from ..helpers import StableDict, int_to_bigint, bigint_to_int, bin_to_hex from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams @@ -594,6 +592,9 @@ def test_parse_file_size_invalid(string): def test_is_slow_msgpack(): + # we need to import upstream msgpack package here, not helpers.msgpack: + import msgpack + import msgpack.fallback saved_packer = msgpack.Packer try: msgpack.Packer = msgpack.fallback.Packer diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index 075311744..1046e1ad1 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -4,7 +4,6 @@ import re import tempfile from binascii import hexlify, unhexlify -import msgpack import pytest from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex @@ -19,6 +18,7 @@ from ..helpers import IntegrityError from ..helpers import Location from ..helpers import StableDict from ..helpers import get_security_dir +from ..helpers import msgpack class TestKey: @@ -333,7 +333,7 @@ class TestTAM: key.unpack_and_verify_manifest(blob) blob = b'\xc1\xc1\xc1' - with pytest.raises((ValueError, msgpack.UnpackException)): + with pytest.raises(msgpack.UnpackException): key.unpack_and_verify_manifest(blob) def test_missing_when_required(self, key): diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 6af9da6bb..9cdd1322f 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -6,13 +6,12 @@ import sys import tempfile from unittest.mock import patch -import msgpack - import pytest from ..hashindex import NSIndex from ..helpers import Location from ..helpers import IntegrityError +from ..helpers import msgpack from ..locking import Lock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE