From 0847c3f9a5acd4f75499ef567f4914167cdbe815 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 1 Apr 2017 00:12:16 +0200 Subject: [PATCH] Unify ComprSpec and CompressionSpec; don't instanciate Compressors right away --- src/borg/archive.py | 4 +- src/borg/archiver.py | 9 ++- src/borg/compress.pyx | 120 ++++++++++++++++++++++----------- src/borg/helpers.py | 2 +- src/borg/testsuite/compress.py | 2 +- 5 files changed, 92 insertions(+), 45 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 57ca44cee..75dbeef33 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1660,14 +1660,14 @@ class ArchiveRecreater: chunk_id = self.key.id_hash(data) if chunk_id in self.seen_chunks: return self.cache.chunk_incref(chunk_id, target.stats) - chunk = Chunk(data, compressor=compressor) overwrite = self.recompress if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks: # Check if this chunk is already compressed the way we want it old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False) - if Compressor.detect(old_chunk.data).name == compressor.name: + if Compressor.detect(old_chunk.data).name == compressor.decide(data).name: # Stored chunk has the same compression we wanted overwrite = False + chunk = Chunk(data, compressor=compressor) chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False) self.cache.repository.async_response(wait=False) self.seen_chunks.add(chunk_entry.id) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 43aa24c1b..ef6c9cee9 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -108,7 +108,14 @@ def with_repository(fake=False, invert_fake=False, create=False, lock=True, excl with repository: if manifest or cache: kwargs['manifest'], kwargs['key'] = Manifest.load(repository) - if args.__dict__.get('compression'): + # do_recreate uses args.compression is None as in band signalling for "don't recompress", + # note that it does not look at key.compressor. In this case the default compressor applies + # to new chunks. + # + # We can't use a check like `'compression' in args` (an argparse.Namespace speciality), + # since the compression attribute is set. So we need to see whether it's set to something + # true-ish, like a CompressionSpec instance. + if getattr(args, 'compression', False): kwargs['key'].compressor = args.compression.compressor if cache: with Cache(repository, kwargs['key'], kwargs['manifest'], diff --git a/src/borg/compress.pyx b/src/borg/compress.pyx index 4f80b83d1..a029a3730 100644 --- a/src/borg/compress.pyx +++ b/src/borg/compress.pyx @@ -28,17 +28,15 @@ decompressor. """ import zlib -from collections import namedtuple try: import lzma except ImportError: lzma = None -from .logger import create_logger from .helpers import Buffer, DecompressionError -API_VERSION = '1.1_02' +API_VERSION = '1.1_03' cdef extern from "lz4.h": int LZ4_compress_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize) nogil @@ -66,11 +64,34 @@ cdef class CompressorBase: def __init__(self, **kwargs): pass + def decide(self, data): + """ + Return which compressor will perform the actual compression for *data*. + + This exists for a very specific case: If borg recreate is instructed to recompress + using Auto compression it needs to determine the _actual_ target compression of a chunk + in order to detect whether it should be recompressed. + + For all Compressors that are not Auto this always returns *self*. + """ + return self + def compress(self, data): + """ + Compress *data* (bytes) and return bytes result. Prepend the ID bytes of this compressor, + which is needed so that the correct decompressor can be used for decompression. + """ # add ID bytes return self.ID + data def decompress(self, data): + """ + Decompress *data* (bytes) and return bytes result. The leading Compressor ID + bytes need to be present. + + Only handles input generated by _this_ Compressor - for a general purpose + decompression method see *Compressor.decompress*. + """ # strip ID bytes return data[2:] @@ -222,22 +243,36 @@ class Auto(CompressorBase): ID = None name = 'auto' - logger = create_logger('borg.debug.file-compression') - def __init__(self, compressor): super().__init__() self.compressor = compressor self.lz4 = get_compressor('lz4') self.none = get_compressor('none') - def compress(self, data): + def _decide(self, data): + """ + Decides what to do with *data*. Returns (compressor, lz4_data). + + *lz4_data* is the LZ4 result if *compressor* is LZ4 as well, otherwise it is None. + """ lz4_data = self.lz4.compress(data) - if len(lz4_data) < 0.97 * len(data): - return self.compressor.compress(data) - elif len(lz4_data) < len(data): - return lz4_data + ratio = len(lz4_data) / len(data) + if ratio < 0.97: + return self.compressor, None + elif ratio < 1: + return self.lz4, lz4_data else: - return self.none.compress(data) + return self.none, None + + def decide(self, data): + return self._decide(data)[0] + + def compress(self, data): + compressor, lz4_data = self._decide(data) + if lz4_data is None: + return compressor.compress(data) + else: + return lz4_data def decompress(self, data): raise NotImplementedError @@ -288,35 +323,40 @@ class Compressor: raise ValueError('No decompressor for this data found: %r.', data[:2]) -ComprSpec = namedtuple('ComprSpec', ('name', 'spec', 'compressor')) - - -def CompressionSpec(s): - values = s.split(',') - count = len(values) - if count < 1: - raise ValueError - # --compression algo[,level] - name = values[0] - if name == 'none': - return ComprSpec(name=name, spec=None, compressor=CNONE()) - elif name == 'lz4': - return ComprSpec(name=name, spec=None, compressor=LZ4()) - if name in ('zlib', 'lzma', ): - if count < 2: - level = 6 # default compression level in py stdlib - elif count == 2: - level = int(values[1]) - if not 0 <= level <= 9: +class CompressionSpec: + def __init__(self, s): + values = s.split(',') + count = len(values) + if count < 1: + raise ValueError + # --compression algo[,level] + self.name = values[0] + if self.name in ('none', 'lz4', ): + return + elif self.name in ('zlib', 'lzma', ): + if count < 2: + level = 6 # default compression level in py stdlib + elif count == 2: + level = int(values[1]) + if not 0 <= level <= 9: + raise ValueError + else: raise ValueError + self.level = level + elif self.name == 'auto': + if 2 <= count <= 3: + compression = ','.join(values[1:]) + else: + raise ValueError + self.inner = CompressionSpec(compression) else: raise ValueError - return ComprSpec(name=name, spec=level, compressor=get_compressor(name, level=level)) - if name == 'auto': - if 2 <= count <= 3: - compression = ','.join(values[1:]) - else: - raise ValueError - inner = CompressionSpec(compression) - return ComprSpec(name=name, spec=inner, compressor=Auto(inner.compressor)) - raise ValueError + + @property + def compressor(self): + if self.name in ('none', 'lz4', ): + return get_compressor(self.name) + elif self.name in ('zlib', 'lzma', ): + return get_compressor(self.name, level=self.level) + elif self.name == 'auto': + return get_compressor(self.name, compressor=self.inner.compressor) diff --git a/src/borg/helpers.py b/src/borg/helpers.py index c1306e013..c168bfd5b 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -123,7 +123,7 @@ def check_extension_modules(): raise ExtensionModuleError if chunker.API_VERSION != '1.1_01': raise ExtensionModuleError - if compress.API_VERSION != '1.1_02': + if compress.API_VERSION != '1.1_03': raise ExtensionModuleError if crypto.API_VERSION != '1.1_01': raise ExtensionModuleError diff --git a/src/borg/testsuite/compress.py b/src/borg/testsuite/compress.py index 9bcf595c1..ee6da55a1 100644 --- a/src/borg/testsuite/compress.py +++ b/src/borg/testsuite/compress.py @@ -7,7 +7,7 @@ except ImportError: import pytest -from ..compress import get_compressor, Compressor, CompressionSpec, ComprSpec, CNONE, ZLIB, LZ4, LZMA, Auto +from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, Auto buffer = bytes(2**16)