diff --git a/setup.cfg b/setup.cfg index d6fc1ddd8..127e86356 100644 --- a/setup.cfg +++ b/setup.cfg @@ -127,6 +127,7 @@ per_file_ignores = src/borg/archiver/help_cmd.py:E501,F405 src/borg/archiver/key_cmds.py:F405 src/borg/archiver/prune_cmd.py:F405 + src/borg/archiver/rcompress_cmd.py:F405 src/borg/archiver/recreate_cmd.py:F405 src/borg/archiver/rdelete_cmd.py:F405 src/borg/archiver/rlist_cmd.py:E501 @@ -163,6 +164,7 @@ per_file_ignores = src/borg/testsuite/archiver/extract_cmd.py:F405 src/borg/testsuite/archiver/mount_cmds.py:E501,E722 src/borg/testsuite/archiver/prune_cmd.py:F405 + src/borg/testsuite/archiver/rcompress_cmd.py:F405 src/borg/testsuite/archiver/recreate_cmd.py:F405 src/borg/testsuite/archiver/return_codes.py:F401,F405,F811 src/borg/testsuite/benchmark.py:F401,F811 diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 07cfedfec..635e86835 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -79,6 +79,7 @@ from .list_cmd import ListMixIn from .lock_cmds import LocksMixIn from .mount_cmds import MountMixIn from .prune_cmd import PruneMixIn +from .rcompress_cmd import RCompressMixIn from .recreate_cmd import RecreateMixIn from .rename_cmd import RenameMixIn from .rcreate_cmd import RCreateMixIn @@ -109,6 +110,7 @@ class Archiver( PruneMixIn, RecreateMixIn, RenameMixIn, + RCompressMixIn, RCreateMixIn, RDeleteMixIn, RInfoMixIn, @@ -327,6 +329,7 @@ class Archiver( self.build_parser_locks(subparsers, common_parser, mid_common_parser) self.build_parser_mount_umount(subparsers, common_parser, mid_common_parser) self.build_parser_prune(subparsers, common_parser, mid_common_parser) + self.build_parser_rcompress(subparsers, common_parser, mid_common_parser) self.build_parser_rcreate(subparsers, common_parser, mid_common_parser) self.build_parser_rdelete(subparsers, common_parser, mid_common_parser) self.build_parser_rinfo(subparsers, common_parser, mid_common_parser) diff --git a/src/borg/archiver/rcompress_cmd.py b/src/borg/archiver/rcompress_cmd.py new file mode 100644 index 000000000..ff43193cf --- /dev/null +++ b/src/borg/archiver/rcompress_cmd.py @@ -0,0 +1,246 @@ +import argparse +from collections import defaultdict + +from ._common import with_repository +from ..constants import * # NOQA +from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE +from ..helpers import sig_int, ProgressIndicatorPercent + +from ..manifest import Manifest + +from ..logger import create_logger + +logger = create_logger() + + +def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel): + """find chunks that need processing (usually: recompression).""" + # to do it this way is maybe not obvious, thus keeping the essential design criteria here: + # - determine the chunk ids at one point in time (== do a **full** scan in one go) **before** + # writing to the repo (and especially before doing a compaction, which moves segment files around) + # - get the chunk ids in **on-disk order** (so we can efficiently compact while processing the chunks) + # - only put the ids into the list that actually need recompression (keeps it a little shorter in some cases) + recompress_ids = [] + compr_keys = stats["compr_keys"] = set() + compr_wanted = ctype, clevel, olevel + state = None + chunks_count = len(repository) + chunks_limit = min(1000, max(100, chunks_count // 1000)) + pi = ProgressIndicatorPercent( + total=chunks_count, + msg="Searching for recompression candidates %3.1f%%", + step=0.1, + msgid="rcompress.find_chunks", + ) + while True: + chunk_ids, state = repository.scan(limit=chunks_limit, state=state) + if not chunk_ids: + break + for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)): + meta = repo_objs.parse_meta(id, chunk_no_data) + compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1) + if compr_found != compr_wanted: + recompress_ids.append(id) + compr_keys.add(compr_found) + stats[compr_found] += 1 + stats["checked_count"] += 1 + pi.show(increase=1) + pi.finish() + return recompress_ids + + +def process_chunks(repository, repo_objs, stats, recompress_ids, olevel): + """process some chunks (usually: recompress)""" + compr_keys = stats["compr_keys"] + if compr_keys == 0: # work around defaultdict(int) + compr_keys = stats["compr_keys"] = set() + for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)): + old_size = len(chunk) + stats["old_size"] += old_size + meta, data = repo_objs.parse(id, chunk) + compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1) + if olevel == -1: + # if the chunk was obfuscated, but should not be in future, remove related metadata + meta.pop("olevel", None) + meta.pop("psize", None) + chunk = repo_objs.format(id, meta, data) + compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1) + if compr_done != compr_old: + # we actually changed something + repository.put(id, chunk, wait=False) + repository.async_response(wait=False) + stats["new_size"] += len(chunk) + compr_keys.add(compr_done) + stats[compr_done] += 1 + stats["recompressed_count"] += 1 + else: + # It might be that the old chunk used compression none or lz4 (for whatever reason, + # including the old compressor being a DecidingCompressor) AND we used a + # DecidingCompressor now, which did NOT compress like we wanted, but decided + # to use the same compression (and obfuscation) we already had. + # In this case, we just keep the old chunk and do not rewrite it - + # This is important to avoid rewriting such chunks **again and again**. + stats["new_size"] += old_size + compr_keys.add(compr_old) + stats[compr_old] += 1 + stats["kept_count"] += 1 + + +def format_compression_spec(ctype, clevel, olevel): + obfuscation = "" if olevel == -1 else f"obfuscate,{olevel}," + for cname, cls in COMPRESSOR_TABLE.items(): + if cls.ID == ctype: + cname = f"{cname}" + break + else: + cname = f"{ctype}" + clevel = f",{clevel}" if clevel != 255 else "" + return obfuscation + cname + clevel + + +class RCompressMixIn: + @with_repository(cache=False, manifest=True, exclusive=True, compatibility=(Manifest.Operation.CHECK,)) + def do_rcompress(self, args, repository, manifest): + """Repository (re-)compression""" + + def get_csettings(c): + if isinstance(c, Auto): + return get_csettings(c.compressor) + if isinstance(c, ObfuscateSize): + ctype, clevel, _ = get_csettings(c.compressor) + olevel = c.level + return ctype, clevel, olevel + ctype, clevel, olevel = c.ID, c.level, -1 + return ctype, clevel, olevel + + repo_objs = manifest.repo_objs + ctype, clevel, olevel = get_csettings(repo_objs.compressor) # desired compression set by --compression + + def checkpoint_func(): + while repository.async_response(wait=True) is not None: + pass + repository.commit(compact=True) + + stats_find = defaultdict(int) + stats_process = defaultdict(int) + recompress_ids = find_chunks(repository, repo_objs, stats_find, ctype, clevel, olevel) + recompress_candidate_count = len(recompress_ids) + chunks_limit = min(1000, max(100, recompress_candidate_count // 1000)) + uncommitted_chunks = 0 + + # start a new transaction + data = repository.get(Manifest.MANIFEST_ID) + repository.put(Manifest.MANIFEST_ID, data) + uncommitted_chunks += 1 + + pi = ProgressIndicatorPercent( + total=len(recompress_ids), msg="Recompressing %3.1f%%", step=0.1, msgid="rcompress.process_chunks" + ) + while recompress_ids: + if sig_int and sig_int.action_done(): + break + ids, recompress_ids = recompress_ids[:chunks_limit], recompress_ids[chunks_limit:] + process_chunks(repository, repo_objs, stats_process, ids, olevel) + pi.show(increase=len(ids)) + checkpointed = self.maybe_checkpoint( + checkpoint_func=checkpoint_func, checkpoint_interval=args.checkpoint_interval + ) + uncommitted_chunks = 0 if checkpointed else (uncommitted_chunks + len(ids)) + pi.finish() + if sig_int: + # Ctrl-C / SIGINT: do not checkpoint (commit) again, we already have a checkpoint in this case. + self.print_error("Got Ctrl-C / SIGINT.") + elif uncommitted_chunks > 0: + checkpoint_func() + if args.stats: + print() + print("Recompression stats:") + print(f"Size: previously {stats_process['old_size']} -> now {stats_process['new_size']} bytes.") + print( + f"Change: " + f"{stats_process['new_size'] - stats_process['old_size']} bytes == " + f"{100.0 * stats_process['new_size'] / stats_process['old_size']:3.2f}%" + ) + print("Found chunks stats (before processing):") + for ck in stats_find["compr_keys"]: + pretty_ck = format_compression_spec(*ck) + print(f"{pretty_ck}: {stats_find[ck]}") + print(f"Total: {stats_find['checked_count']}") + + print(f"Candidates for recompression: {recompress_candidate_count}") + + print("Processed chunks stats (after processing):") + for ck in stats_process["compr_keys"]: + pretty_ck = format_compression_spec(*ck) + print(f"{pretty_ck}: {stats_process[ck]}") + print(f"Recompressed and rewritten: {stats_process['recompressed_count']}") + print(f"Kept as is: {stats_process['kept_count']}") + print(f"Total: {stats_process['recompressed_count'] + stats_process['kept_count']}") + + return self.exit_code + + def build_parser_rcompress(self, subparsers, common_parser, mid_common_parser): + from ._common import process_epilog + + rcompress_epilog = process_epilog( + """ + Repository (re-)compression (and/or re-obfuscation). + + Reads all chunks in the repository (in on-disk order, this is important for + compaction) and recompresses them if they are not already using the compression + type/level and obfuscation level given via ``--compression``. + + If the outcome of the chunk processing indicates a change in compression + type/level or obfuscation level, the processed chunk is written to the repository. + Please note that the outcome might not always be the desired compression + type/level - if no compression gives a shorter output, that might be chosen. + + Every ``--checkpoint-interval``, progress is committed to the repository and + the repository is compacted (this is to keep temporary repo space usage in bounds). + A lower checkpoint interval means lower temporary repo space usage, but also + slower progress due to higher overhead (and vice versa). + + Please note that this command can not work in low (or zero) free disk space + conditions. + + If the ``borg rcompress``process receives a SIGINT signal (Ctrl-C), the repo + will be committed and compacted and borg will terminate cleanly afterwards. + + Both ``--progress`` and ``--stats`` are recommended when ``borg rcompress`` + is used interactively. + + You do **not** need to run ``borg compact`` after ``borg rcompress``. + """ + ) + subparser = subparsers.add_parser( + "rcompress", + parents=[common_parser], + add_help=False, + description=self.do_rcompress.__doc__, + epilog=rcompress_epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, + help=self.do_rcompress.__doc__, + ) + subparser.set_defaults(func=self.do_rcompress) + + subparser.add_argument( + "-C", + "--compression", + metavar="COMPRESSION", + dest="compression", + type=CompressionSpec, + default=CompressionSpec("lz4"), + help="select compression algorithm, see the output of the " '"borg help compression" command for details.', + ) + + subparser.add_argument("-s", "--stats", dest="stats", action="store_true", help="print statistics") + + subparser.add_argument( + "-c", + "--checkpoint-interval", + metavar="SECONDS", + dest="checkpoint_interval", + type=int, + default=1800, + help="write checkpoint every SECONDS seconds (Default: 1800)", + ) diff --git a/src/borg/compress.pyi b/src/borg/compress.pyi index 34569f27a..010e5bfc9 100644 --- a/src/borg/compress.pyi +++ b/src/borg/compress.pyi @@ -61,3 +61,5 @@ class ZSTD(DecidingCompressor): LZ4_COMPRESSOR: Type[LZ4] NONE_COMPRESSOR: Type[CNONE] + +COMPRESSOR_TABLE: dict diff --git a/src/borg/compress.pyx b/src/borg/compress.pyx index ea2ad0f63..5d5e84181 100644 --- a/src/borg/compress.pyx +++ b/src/borg/compress.pyx @@ -491,21 +491,29 @@ class Auto(CompressorBase): return self._decide(meta, data)[0] def compress(self, meta, data): + def get_meta(from_meta, to_meta): + for key in "ctype", "clevel", "csize": + if key in from_meta: + to_meta[key] = from_meta[key] + compressor, (cheap_meta, cheap_compressed_data) = self._decide(dict(meta), data) if compressor in (LZ4_COMPRESSOR, NONE_COMPRESSOR): # we know that trying to compress with expensive compressor is likely pointless, # so we fallback to return the cheap compressed data. - return cheap_meta, cheap_compressed_data + get_meta(cheap_meta, meta) + return meta, cheap_compressed_data # if we get here, the decider decided to try the expensive compressor. # we also know that the compressed data returned by the decider is lz4 compressed. expensive_meta, expensive_compressed_data = compressor.compress(dict(meta), data) ratio = len(expensive_compressed_data) / len(cheap_compressed_data) if ratio < 0.99: # the expensive compressor managed to squeeze the data significantly better than lz4. - return expensive_meta, expensive_compressed_data + get_meta(expensive_meta, meta) + return meta, expensive_compressed_data else: # otherwise let's just store the lz4 data, which decompresses extremely fast. - return cheap_meta, cheap_compressed_data + get_meta(cheap_meta, meta) + return meta, cheap_compressed_data def decompress(self, data): raise NotImplementedError @@ -527,6 +535,7 @@ class ObfuscateSize(CompressorBase): def __init__(self, level=None, compressor=None, legacy_mode=False): super().__init__(level=level, legacy_mode=legacy_mode) # data will be encrypted, so we can tell the level self.compressor = compressor + self.level = level if level is None: pass # decompression elif 1 <= level <= 6: @@ -557,7 +566,6 @@ class ObfuscateSize(CompressorBase): def compress(self, meta, data): assert not self.legacy_mode # we never call this in legacy mode - meta = dict(meta) # make a copy, do not modify caller's dict meta, compressed_data = self.compressor.compress(meta, data) # compress data compr_size = len(compressed_data) assert "csize" in meta, repr(meta) @@ -568,6 +576,7 @@ class ObfuscateSize(CompressorBase): trailer = bytes(addtl_size) obfuscated_data = compressed_data + trailer meta["csize"] = len(obfuscated_data) # csize is the overall output size of this "obfuscation compressor" + meta["olevel"] = self.level # remember the obfuscation level, useful for rcompress return meta, obfuscated_data # for borg2 it is enough that we have the payload size in meta["psize"] def decompress(self, meta, data): diff --git a/src/borg/testsuite/archiver/rcompress_cmd.py b/src/borg/testsuite/archiver/rcompress_cmd.py new file mode 100644 index 000000000..8b5f57952 --- /dev/null +++ b/src/borg/testsuite/archiver/rcompress_cmd.py @@ -0,0 +1,83 @@ +import os +from binascii import hexlify + +from ...constants import * # NOQA +from ...repository import Repository +from ...manifest import Manifest +from ...compress import ZSTD, ZLIB, LZ4, CNONE + +from . import ArchiverTestCaseBase, RK_ENCRYPTION + + +class ArchiverTestCase(ArchiverTestCaseBase): + def test_rcompress(self): + def check_compression(ctype, clevel, olevel): + """check if all the chunks in the repo are compressed/obfuscated like expected""" + repository = Repository(self.repository_path, exclusive=True) + with repository: + manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) + state = None + while True: + ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state) + if not ids: + break + for id in ids: + chunk = repository.get(id, read_data=True) + meta, data = manifest.repo_objs.parse(id, chunk) # will also decompress according to metadata + m_olevel = meta.get("olevel", -1) + m_psize = meta.get("psize", -1) + print( + hexlify(id).decode(), + meta["ctype"], + meta["clevel"], + meta["csize"], + meta["size"], + m_olevel, + m_psize, + ) + # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of + # (desired compressed, lz4 compressed, not compressed). + assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID) + assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255 + if olevel != -1: # we expect obfuscation + assert "psize" in meta + assert m_olevel == olevel + else: + assert "psize" not in meta + assert "olevel" not in meta + + self.create_regular_file("file1", size=1024 * 10) + self.create_regular_file("file2", contents=os.urandom(1024 * 10)) + self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION) + + cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 3, -1 + self.cmd(f"--repo={self.repository_location}", "create", "test", "input", "-C", f"{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 1, -1 # change compressor (and level) + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1 # only change level + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 110 # only change to obfuscated + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 112 # only change obfuscation level + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1 # change to not obfuscated + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 1, -1 + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"auto,{cname},{clevel}") + check_compression(ctype, clevel, olevel) + + cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 2, 111 + self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},auto,{cname},{clevel}") + check_compression(ctype, clevel, olevel) diff --git a/src/borg/testsuite/compress.py b/src/borg/testsuite/compress.py index cfb16a85a..0b39d82e6 100644 --- a/src/borg/testsuite/compress.py +++ b/src/borg/testsuite/compress.py @@ -206,19 +206,18 @@ def test_obfuscate(): def test_obfuscate_meta(): compressor = CompressionSpec("obfuscate,3,lz4").compressor - meta_in = {} + meta = {} data = bytes(10000) - meta_out, compressed = compressor.compress(meta_in, data) - assert "ctype" not in meta_in # do not modify dict of caller - assert "ctype" in meta_out - assert meta_out["ctype"] == LZ4.ID - assert "clevel" in meta_out - assert meta_out["clevel"] == 0xFF - assert "csize" in meta_out - csize = meta_out["csize"] + meta, compressed = compressor.compress(meta, data) + assert "ctype" in meta + assert meta["ctype"] == LZ4.ID + assert "clevel" in meta + assert meta["clevel"] == 0xFF + assert "csize" in meta + csize = meta["csize"] assert csize == len(compressed) # this is the overall size - assert "psize" in meta_out - psize = meta_out["psize"] + assert "psize" in meta + psize = meta["psize"] assert 0 < psize < 100 assert csize - psize >= 0 # there is a obfuscation trailer trailer = compressed[psize:]