Merge pull request #7037 from ThomasWaldmann/rcompress

rcompress: do a repo-wide (re)compression
This commit is contained in:
TW 2022-09-27 19:13:58 +02:00 committed by GitHub
commit ba7b9cfd30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 359 additions and 15 deletions

View file

@ -127,6 +127,7 @@ per_file_ignores =
src/borg/archiver/help_cmd.py:E501,F405
src/borg/archiver/key_cmds.py:F405
src/borg/archiver/prune_cmd.py:F405
src/borg/archiver/rcompress_cmd.py:F405
src/borg/archiver/recreate_cmd.py:F405
src/borg/archiver/rdelete_cmd.py:F405
src/borg/archiver/rlist_cmd.py:E501
@ -163,6 +164,7 @@ per_file_ignores =
src/borg/testsuite/archiver/extract_cmd.py:F405
src/borg/testsuite/archiver/mount_cmds.py:E501,E722
src/borg/testsuite/archiver/prune_cmd.py:F405
src/borg/testsuite/archiver/rcompress_cmd.py:F405
src/borg/testsuite/archiver/recreate_cmd.py:F405
src/borg/testsuite/archiver/return_codes.py:F401,F405,F811
src/borg/testsuite/benchmark.py:F401,F811

View file

@ -79,6 +79,7 @@ from .list_cmd import ListMixIn
from .lock_cmds import LocksMixIn
from .mount_cmds import MountMixIn
from .prune_cmd import PruneMixIn
from .rcompress_cmd import RCompressMixIn
from .recreate_cmd import RecreateMixIn
from .rename_cmd import RenameMixIn
from .rcreate_cmd import RCreateMixIn
@ -109,6 +110,7 @@ class Archiver(
PruneMixIn,
RecreateMixIn,
RenameMixIn,
RCompressMixIn,
RCreateMixIn,
RDeleteMixIn,
RInfoMixIn,
@ -327,6 +329,7 @@ class Archiver(
self.build_parser_locks(subparsers, common_parser, mid_common_parser)
self.build_parser_mount_umount(subparsers, common_parser, mid_common_parser)
self.build_parser_prune(subparsers, common_parser, mid_common_parser)
self.build_parser_rcompress(subparsers, common_parser, mid_common_parser)
self.build_parser_rcreate(subparsers, common_parser, mid_common_parser)
self.build_parser_rdelete(subparsers, common_parser, mid_common_parser)
self.build_parser_rinfo(subparsers, common_parser, mid_common_parser)

View file

@ -0,0 +1,246 @@
import argparse
from collections import defaultdict
from ._common import with_repository
from ..constants import * # NOQA
from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE
from ..helpers import sig_int, ProgressIndicatorPercent
from ..manifest import Manifest
from ..logger import create_logger
logger = create_logger()
def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
"""find chunks that need processing (usually: recompression)."""
# to do it this way is maybe not obvious, thus keeping the essential design criteria here:
# - determine the chunk ids at one point in time (== do a **full** scan in one go) **before**
# writing to the repo (and especially before doing a compaction, which moves segment files around)
# - get the chunk ids in **on-disk order** (so we can efficiently compact while processing the chunks)
# - only put the ids into the list that actually need recompression (keeps it a little shorter in some cases)
recompress_ids = []
compr_keys = stats["compr_keys"] = set()
compr_wanted = ctype, clevel, olevel
state = None
chunks_count = len(repository)
chunks_limit = min(1000, max(100, chunks_count // 1000))
pi = ProgressIndicatorPercent(
total=chunks_count,
msg="Searching for recompression candidates %3.1f%%",
step=0.1,
msgid="rcompress.find_chunks",
)
while True:
chunk_ids, state = repository.scan(limit=chunks_limit, state=state)
if not chunk_ids:
break
for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)):
meta = repo_objs.parse_meta(id, chunk_no_data)
compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
if compr_found != compr_wanted:
recompress_ids.append(id)
compr_keys.add(compr_found)
stats[compr_found] += 1
stats["checked_count"] += 1
pi.show(increase=1)
pi.finish()
return recompress_ids
def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
"""process some chunks (usually: recompress)"""
compr_keys = stats["compr_keys"]
if compr_keys == 0: # work around defaultdict(int)
compr_keys = stats["compr_keys"] = set()
for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)):
old_size = len(chunk)
stats["old_size"] += old_size
meta, data = repo_objs.parse(id, chunk)
compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
if olevel == -1:
# if the chunk was obfuscated, but should not be in future, remove related metadata
meta.pop("olevel", None)
meta.pop("psize", None)
chunk = repo_objs.format(id, meta, data)
compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
if compr_done != compr_old:
# we actually changed something
repository.put(id, chunk, wait=False)
repository.async_response(wait=False)
stats["new_size"] += len(chunk)
compr_keys.add(compr_done)
stats[compr_done] += 1
stats["recompressed_count"] += 1
else:
# It might be that the old chunk used compression none or lz4 (for whatever reason,
# including the old compressor being a DecidingCompressor) AND we used a
# DecidingCompressor now, which did NOT compress like we wanted, but decided
# to use the same compression (and obfuscation) we already had.
# In this case, we just keep the old chunk and do not rewrite it -
# This is important to avoid rewriting such chunks **again and again**.
stats["new_size"] += old_size
compr_keys.add(compr_old)
stats[compr_old] += 1
stats["kept_count"] += 1
def format_compression_spec(ctype, clevel, olevel):
obfuscation = "" if olevel == -1 else f"obfuscate,{olevel},"
for cname, cls in COMPRESSOR_TABLE.items():
if cls.ID == ctype:
cname = f"{cname}"
break
else:
cname = f"{ctype}"
clevel = f",{clevel}" if clevel != 255 else ""
return obfuscation + cname + clevel
class RCompressMixIn:
@with_repository(cache=False, manifest=True, exclusive=True, compatibility=(Manifest.Operation.CHECK,))
def do_rcompress(self, args, repository, manifest):
"""Repository (re-)compression"""
def get_csettings(c):
if isinstance(c, Auto):
return get_csettings(c.compressor)
if isinstance(c, ObfuscateSize):
ctype, clevel, _ = get_csettings(c.compressor)
olevel = c.level
return ctype, clevel, olevel
ctype, clevel, olevel = c.ID, c.level, -1
return ctype, clevel, olevel
repo_objs = manifest.repo_objs
ctype, clevel, olevel = get_csettings(repo_objs.compressor) # desired compression set by --compression
def checkpoint_func():
while repository.async_response(wait=True) is not None:
pass
repository.commit(compact=True)
stats_find = defaultdict(int)
stats_process = defaultdict(int)
recompress_ids = find_chunks(repository, repo_objs, stats_find, ctype, clevel, olevel)
recompress_candidate_count = len(recompress_ids)
chunks_limit = min(1000, max(100, recompress_candidate_count // 1000))
uncommitted_chunks = 0
# start a new transaction
data = repository.get(Manifest.MANIFEST_ID)
repository.put(Manifest.MANIFEST_ID, data)
uncommitted_chunks += 1
pi = ProgressIndicatorPercent(
total=len(recompress_ids), msg="Recompressing %3.1f%%", step=0.1, msgid="rcompress.process_chunks"
)
while recompress_ids:
if sig_int and sig_int.action_done():
break
ids, recompress_ids = recompress_ids[:chunks_limit], recompress_ids[chunks_limit:]
process_chunks(repository, repo_objs, stats_process, ids, olevel)
pi.show(increase=len(ids))
checkpointed = self.maybe_checkpoint(
checkpoint_func=checkpoint_func, checkpoint_interval=args.checkpoint_interval
)
uncommitted_chunks = 0 if checkpointed else (uncommitted_chunks + len(ids))
pi.finish()
if sig_int:
# Ctrl-C / SIGINT: do not checkpoint (commit) again, we already have a checkpoint in this case.
self.print_error("Got Ctrl-C / SIGINT.")
elif uncommitted_chunks > 0:
checkpoint_func()
if args.stats:
print()
print("Recompression stats:")
print(f"Size: previously {stats_process['old_size']} -> now {stats_process['new_size']} bytes.")
print(
f"Change: "
f"{stats_process['new_size'] - stats_process['old_size']} bytes == "
f"{100.0 * stats_process['new_size'] / stats_process['old_size']:3.2f}%"
)
print("Found chunks stats (before processing):")
for ck in stats_find["compr_keys"]:
pretty_ck = format_compression_spec(*ck)
print(f"{pretty_ck}: {stats_find[ck]}")
print(f"Total: {stats_find['checked_count']}")
print(f"Candidates for recompression: {recompress_candidate_count}")
print("Processed chunks stats (after processing):")
for ck in stats_process["compr_keys"]:
pretty_ck = format_compression_spec(*ck)
print(f"{pretty_ck}: {stats_process[ck]}")
print(f"Recompressed and rewritten: {stats_process['recompressed_count']}")
print(f"Kept as is: {stats_process['kept_count']}")
print(f"Total: {stats_process['recompressed_count'] + stats_process['kept_count']}")
return self.exit_code
def build_parser_rcompress(self, subparsers, common_parser, mid_common_parser):
from ._common import process_epilog
rcompress_epilog = process_epilog(
"""
Repository (re-)compression (and/or re-obfuscation).
Reads all chunks in the repository (in on-disk order, this is important for
compaction) and recompresses them if they are not already using the compression
type/level and obfuscation level given via ``--compression``.
If the outcome of the chunk processing indicates a change in compression
type/level or obfuscation level, the processed chunk is written to the repository.
Please note that the outcome might not always be the desired compression
type/level - if no compression gives a shorter output, that might be chosen.
Every ``--checkpoint-interval``, progress is committed to the repository and
the repository is compacted (this is to keep temporary repo space usage in bounds).
A lower checkpoint interval means lower temporary repo space usage, but also
slower progress due to higher overhead (and vice versa).
Please note that this command can not work in low (or zero) free disk space
conditions.
If the ``borg rcompress``process receives a SIGINT signal (Ctrl-C), the repo
will be committed and compacted and borg will terminate cleanly afterwards.
Both ``--progress`` and ``--stats`` are recommended when ``borg rcompress``
is used interactively.
You do **not** need to run ``borg compact`` after ``borg rcompress``.
"""
)
subparser = subparsers.add_parser(
"rcompress",
parents=[common_parser],
add_help=False,
description=self.do_rcompress.__doc__,
epilog=rcompress_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
help=self.do_rcompress.__doc__,
)
subparser.set_defaults(func=self.do_rcompress)
subparser.add_argument(
"-C",
"--compression",
metavar="COMPRESSION",
dest="compression",
type=CompressionSpec,
default=CompressionSpec("lz4"),
help="select compression algorithm, see the output of the " '"borg help compression" command for details.',
)
subparser.add_argument("-s", "--stats", dest="stats", action="store_true", help="print statistics")
subparser.add_argument(
"-c",
"--checkpoint-interval",
metavar="SECONDS",
dest="checkpoint_interval",
type=int,
default=1800,
help="write checkpoint every SECONDS seconds (Default: 1800)",
)

View file

@ -61,3 +61,5 @@ class ZSTD(DecidingCompressor):
LZ4_COMPRESSOR: Type[LZ4]
NONE_COMPRESSOR: Type[CNONE]
COMPRESSOR_TABLE: dict

View file

@ -491,21 +491,29 @@ class Auto(CompressorBase):
return self._decide(meta, data)[0]
def compress(self, meta, data):
def get_meta(from_meta, to_meta):
for key in "ctype", "clevel", "csize":
if key in from_meta:
to_meta[key] = from_meta[key]
compressor, (cheap_meta, cheap_compressed_data) = self._decide(dict(meta), data)
if compressor in (LZ4_COMPRESSOR, NONE_COMPRESSOR):
# we know that trying to compress with expensive compressor is likely pointless,
# so we fallback to return the cheap compressed data.
return cheap_meta, cheap_compressed_data
get_meta(cheap_meta, meta)
return meta, cheap_compressed_data
# if we get here, the decider decided to try the expensive compressor.
# we also know that the compressed data returned by the decider is lz4 compressed.
expensive_meta, expensive_compressed_data = compressor.compress(dict(meta), data)
ratio = len(expensive_compressed_data) / len(cheap_compressed_data)
if ratio < 0.99:
# the expensive compressor managed to squeeze the data significantly better than lz4.
return expensive_meta, expensive_compressed_data
get_meta(expensive_meta, meta)
return meta, expensive_compressed_data
else:
# otherwise let's just store the lz4 data, which decompresses extremely fast.
return cheap_meta, cheap_compressed_data
get_meta(cheap_meta, meta)
return meta, cheap_compressed_data
def decompress(self, data):
raise NotImplementedError
@ -527,6 +535,7 @@ class ObfuscateSize(CompressorBase):
def __init__(self, level=None, compressor=None, legacy_mode=False):
super().__init__(level=level, legacy_mode=legacy_mode) # data will be encrypted, so we can tell the level
self.compressor = compressor
self.level = level
if level is None:
pass # decompression
elif 1 <= level <= 6:
@ -557,7 +566,6 @@ class ObfuscateSize(CompressorBase):
def compress(self, meta, data):
assert not self.legacy_mode # we never call this in legacy mode
meta = dict(meta) # make a copy, do not modify caller's dict
meta, compressed_data = self.compressor.compress(meta, data) # compress data
compr_size = len(compressed_data)
assert "csize" in meta, repr(meta)
@ -568,6 +576,7 @@ class ObfuscateSize(CompressorBase):
trailer = bytes(addtl_size)
obfuscated_data = compressed_data + trailer
meta["csize"] = len(obfuscated_data) # csize is the overall output size of this "obfuscation compressor"
meta["olevel"] = self.level # remember the obfuscation level, useful for rcompress
return meta, obfuscated_data # for borg2 it is enough that we have the payload size in meta["psize"]
def decompress(self, meta, data):

View file

@ -0,0 +1,83 @@
import os
from binascii import hexlify
from ...constants import * # NOQA
from ...repository import Repository
from ...manifest import Manifest
from ...compress import ZSTD, ZLIB, LZ4, CNONE
from . import ArchiverTestCaseBase, RK_ENCRYPTION
class ArchiverTestCase(ArchiverTestCaseBase):
def test_rcompress(self):
def check_compression(ctype, clevel, olevel):
"""check if all the chunks in the repo are compressed/obfuscated like expected"""
repository = Repository(self.repository_path, exclusive=True)
with repository:
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
state = None
while True:
ids, state = repository.scan(limit=LIST_SCAN_LIMIT, state=state)
if not ids:
break
for id in ids:
chunk = repository.get(id, read_data=True)
meta, data = manifest.repo_objs.parse(id, chunk) # will also decompress according to metadata
m_olevel = meta.get("olevel", -1)
m_psize = meta.get("psize", -1)
print(
hexlify(id).decode(),
meta["ctype"],
meta["clevel"],
meta["csize"],
meta["size"],
m_olevel,
m_psize,
)
# this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
# (desired compressed, lz4 compressed, not compressed).
assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255
if olevel != -1: # we expect obfuscation
assert "psize" in meta
assert m_olevel == olevel
else:
assert "psize" not in meta
assert "olevel" not in meta
self.create_regular_file("file1", size=1024 * 10)
self.create_regular_file("file2", contents=os.urandom(1024 * 10))
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 3, -1
self.cmd(f"--repo={self.repository_location}", "create", "test", "input", "-C", f"{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 1, -1 # change compressor (and level)
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1 # only change level
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 110 # only change to obfuscated
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, 112 # only change obfuscation level
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZSTD.name, ZSTD.ID, 3, -1 # change to not obfuscated
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 1, -1
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"auto,{cname},{clevel}")
check_compression(ctype, clevel, olevel)
cname, ctype, clevel, olevel = ZLIB.name, ZLIB.ID, 2, 111
self.cmd(f"--repo={self.repository_location}", "rcompress", "-C", f"obfuscate,{olevel},auto,{cname},{clevel}")
check_compression(ctype, clevel, olevel)

View file

@ -206,19 +206,18 @@ def test_obfuscate():
def test_obfuscate_meta():
compressor = CompressionSpec("obfuscate,3,lz4").compressor
meta_in = {}
meta = {}
data = bytes(10000)
meta_out, compressed = compressor.compress(meta_in, data)
assert "ctype" not in meta_in # do not modify dict of caller
assert "ctype" in meta_out
assert meta_out["ctype"] == LZ4.ID
assert "clevel" in meta_out
assert meta_out["clevel"] == 0xFF
assert "csize" in meta_out
csize = meta_out["csize"]
meta, compressed = compressor.compress(meta, data)
assert "ctype" in meta
assert meta["ctype"] == LZ4.ID
assert "clevel" in meta
assert meta["clevel"] == 0xFF
assert "csize" in meta
csize = meta["csize"]
assert csize == len(compressed) # this is the overall size
assert "psize" in meta_out
psize = meta_out["psize"]
assert "psize" in meta
psize = meta["psize"]
assert 0 < psize < 100
assert csize - psize >= 0 # there is a obfuscation trailer
trailer = compressed[psize:]