From 2ed7f317d3e9a65c1e5de17e403ba877d27d3754 Mon Sep 17 00:00:00 2001
From: Franco Ayala
Date: Wed, 19 Oct 2022 16:40:02 -0300
Subject: [PATCH] Adding performance statistics to borg create (#6991)

- file status A/M/E counters
- chunking time
- hashing time
- rx_bytes / tx_bytes

Note: the sleep() in the test is needed due to timestamp granularity
on linux being much more coarse than expected (uses the system timer,
100Hz or 250Hz).
---
Review notes (ignored by git-am, not part of the commit message):
- Statistics.__add__ merges files_stats over the keys of both operands;
  previously counters that existed only in the left operand were dropped.
- Chunker.chunking_time is declared as C double instead of C float.
- create_cmd.py only counts a status when it is not None (process_file
  resets status to None after counting it itself) and when an fso object
  is available (presumably it is None for --dry-run; TODO confirm).
- Hunk headers and diffstat were adjusted for the added lines; the blob
  ids on the "index" lines still refer to the unamended change.

 src/borg/archive.py                       | 41 +++++++++++-
 src/borg/archiver/create_cmd.py           |  8 ++
 src/borg/chunker.pyx                      |  9 +++
 src/borg/testsuite/archive.py             |  8 +++
 src/borg/testsuite/archiver/create_cmd.py | 77 +++++++++++++++++++++++
 5 files changed, 141 insertions(+), 2 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index f5b5cd403..325f2abd7 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -4,7 +4,7 @@ import os
 import stat
 import sys
 import time
-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
 from contextlib import contextmanager
 from datetime import datetime, timedelta
 from functools import partial
@@ -60,6 +60,11 @@ class Statistics:
         self.osize = self.usize = self.nfiles = 0
         self.osize_parts = self.usize_parts = self.nfiles_parts = 0
         self.last_progress = 0  # timestamp when last progress was shown
+        self.files_stats = defaultdict(int)
+        self.chunking_time = 0.0
+        self.hashing_time = 0.0
+        self.rx_bytes = 0
+        self.tx_bytes = 0
 
     def update(self, size, unique, part=False):
         if not part:
@@ -81,15 +86,38 @@ class Statistics:
         stats.osize_parts = self.osize_parts + other.osize_parts
         stats.usize_parts = self.usize_parts + other.usize_parts
         stats.nfiles_parts = self.nfiles_parts + other.nfiles_parts
+        stats.chunking_time = self.chunking_time + other.chunking_time
+        stats.hashing_time = self.hashing_time + other.hashing_time
+        # Merge the counters of both operands: iterating only `other` would drop keys seen
+        # only in `self`. .get() avoids inserting missing keys into the operands' defaultdicts.
+        for key in (*self.files_stats, *other.files_stats):
+            stats.files_stats[key] = self.files_stats.get(key, 0) + other.files_stats.get(key, 0)
+
         return stats
 
     def __str__(self):
+        hashing_time = format_timedelta(timedelta(seconds=self.hashing_time))
+        chunking_time = format_timedelta(timedelta(seconds=self.chunking_time))
         return """\
 Number of files: {stats.nfiles}
 Original size: {stats.osize_fmt}
 Deduplicated size: {stats.usize_fmt}
+Time spent in hashing: {hashing_time}
+Time spent in chunking: {chunking_time}
+Added files: {added_files}
+Unchanged files: {unchanged_files}
+Modified files: {modified_files}
+Error files: {error_files}
+Bytes read from remote: {stats.rx_bytes}
+Bytes sent to remote: {stats.tx_bytes}
 """.format(
-            stats=self
+            stats=self,
+            hashing_time=hashing_time,
+            chunking_time=chunking_time,
+            added_files=self.files_stats["A"],
+            unchanged_files=self.files_stats["U"],
+            modified_files=self.files_stats["M"],
+            error_files=self.files_stats["E"],
         )
 
     def __repr__(self):
@@ -102,6 +130,9 @@ Deduplicated size: {stats.usize_fmt}
             "original_size": FileSize(self.osize, iec=self.iec),
             "deduplicated_size": FileSize(self.usize, iec=self.iec),
             "nfiles": self.nfiles,
+            "hashing_time": self.hashing_time,
+            "chunking_time": self.chunking_time,
+            "files_stats": self.files_stats,
         }
 
     def as_raw_dict(self):
@@ -1237,7 +1268,9 @@ class ChunksProcessor:
         if not chunk_processor:
 
             def chunk_processor(chunk):
+                started_hashing = time.monotonic()
                 chunk_id, data = cached_hash(chunk, self.key.id_hash)
+                stats.hashing_time += time.monotonic() - started_hashing
                 chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
                 self.cache.repository.async_response(wait=False)
                 return chunk_entry
@@ -1411,7 +1444,9 @@ class FilesystemObjectProcessors:
                 else:  # normal case, no "2nd+" hardlink
                     if not is_special_file:
                         hashed_path = safe_encode(os.path.join(self.cwd, path))
+                        started_hashing = time.monotonic()
                         path_hash = self.key.id_hash(hashed_path)
+                        self.stats.hashing_time += time.monotonic() - started_hashing
                         known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                     else:
                         # in --read-special mode, we may be called for special files.
@@ -1434,6 +1469,7 @@ class FilesystemObjectProcessors:
                     else:
                         status = "M" if known else "A"  # regular file, modified or added
                     self.print_file_status(status, path)
+                    self.stats.files_stats[status] += 1
                     status = None  # we already printed the status
                     # Only chunkify the file if needed
                     if chunks is not None:
@@ -1447,6 +1483,7 @@ class FilesystemObjectProcessors:
                                 self.show_progress,
                                 backup_io_iter(self.chunker.chunkify(None, fd)),
                             )
+                            self.stats.chunking_time = self.chunker.chunking_time
                         if is_win32:
                             changed_while_backup = False  # TODO
                         else:
diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py
index 02f5fc79e..e7e48d053 100644
--- a/src/borg/archiver/create_cmd.py
+++ b/src/borg/archiver/create_cmd.py
@@ -119,6 +119,8 @@ class CreateMixIn:
                         if status == "C":
                             self.print_warning("%s: file changed while we backed it up", path)
                         self.print_file_status(status, path)
+                        if fso is not None and status is not None:  # None status: already counted
+                            fso.stats.files_stats[status] += 1
                     if args.paths_from_command:
                         rc = proc.wait()
                         if rc != 0:
@@ -142,6 +144,8 @@ class CreateMixIn:
                             else:
                                 status = "-"
                             self.print_file_status(status, path)
+                            if fso is not None:  # no stats object to update without fso
+                                fso.stats.files_stats[status] += 1
                             continue
                         path = os.path.normpath(path)
                         parent_dir = os.path.dirname(path) or "."
@@ -185,6 +189,8 @@ class CreateMixIn:
                 if args.progress:
                     archive.stats.show_progress(final=True)
                 archive.stats += fso.stats
+                archive.stats.rx_bytes = getattr(repository, "rx_bytes", 0)
+                archive.stats.tx_bytes = getattr(repository, "tx_bytes", 0)
                 if sig_int:
                     # do not save the archive if the user ctrl-c-ed - it is valid, but incomplete.
                     # we already have a checkpoint archive in this case.
@@ -469,6 +475,8 @@ class CreateMixIn:
             self.print_warning("%s: file changed while we backed it up", path)
         if not recurse_excluded_dir:
             self.print_file_status(status, path)
+            if fso is not None and status is not None:  # None status: already counted
+                fso.stats.files_stats[status] += 1
 
     def build_parser_create(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
diff --git a/src/borg/chunker.pyx b/src/borg/chunker.pyx
index f11e93b2d..5cad2e36d 100644
--- a/src/borg/chunker.pyx
+++ b/src/borg/chunker.pyx
@@ -2,6 +2,7 @@ API_VERSION = '1.2_01'
 
 import errno
 import os
+import time
 from collections import namedtuple
 
 from .constants import CH_DATA, CH_ALLOC, CH_HOLE, zeros
@@ -145,6 +146,7 @@ class ChunkerFixed:
     def __init__(self, block_size, header_size=0, sparse=False):
         self.block_size = block_size
         self.header_size = header_size
+        self.chunking_time = 0.0
         # should borg try to do sparse input processing?
         # whether it actually can be done depends on the input file being seekable.
         self.try_sparse = sparse and has_seek_hole
@@ -198,6 +200,7 @@ class ChunkerFixed:
                 offset = range_start
                 dseek(offset, os.SEEK_SET, fd, fh)
             while range_size:
+                started_chunking = time.monotonic()
                 wanted = min(range_size, self.block_size)
                 if is_data:
                     # read block from the range
@@ -217,6 +220,7 @@ class ChunkerFixed:
                 if got > 0:
                     offset += got
                     range_size -= got
+                    self.chunking_time += time.monotonic() - started_chunking
                     yield Chunk(data, size=got, allocation=allocation)
                 if got < wanted:
                     # we did not get enough data, looks like EOF.
@@ -236,6 +240,7 @@ cdef class Chunker:
     It also uses a per-repo random seed to avoid some chunk length fingerprinting attacks.
     """
     cdef _Chunker *chunker
+    cdef readonly double chunking_time  # C double, same precision as a Python float (C float is 32-bit)
 
     def __cinit__(self, int seed, int chunk_min_exp, int chunk_max_exp, int hash_mask_bits, int hash_window_size):
         min_size = 1 << chunk_min_exp
@@ -245,6 +250,8 @@ cdef class Chunker:
         assert hash_window_size + min_size + 1 <= max_size, "too small max_size"
         hash_mask = (1 << hash_mask_bits) - 1
         self.chunker = chunker_init(hash_window_size, hash_mask, min_size, max_size, seed & 0xffffffff)
+        self.chunking_time = 0.0
+
 
     def chunkify(self, fd, fh=-1):
         """
@@ -265,6 +272,7 @@ cdef class Chunker:
         return self
 
     def __next__(self):
+        started_chunking = time.monotonic()
         data = chunker_process(self.chunker)
         got = len(data)
         # we do not have SEEK_DATA/SEEK_HOLE support in chunker_process C code,
@@ -275,6 +283,7 @@ cdef class Chunker:
             allocation = CH_ALLOC
         else:
             allocation = CH_DATA
+        self.chunking_time += time.monotonic() - started_chunking
         return Chunk(data, size=got, allocation=allocation)
 
 
diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py
index 0a98e386b..3c518ffd3 100644
--- a/src/borg/testsuite/archive.py
+++ b/src/borg/testsuite/archive.py
@@ -60,6 +60,14 @@ def test_stats_format(stats):
 Number of files: 1
 Original size: 20 B
 Deduplicated size: 20 B
+Time spent in hashing: 0.00 seconds
+Time spent in chunking: 0.00 seconds
+Added files: 0
+Unchanged files: 0
+Modified files: 0
+Error files: 0
+Bytes read from remote: 0
+Bytes sent to remote: 0
 """
     )
     s = f"{stats.osize_fmt}"
diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py
index 16ebcf137..261921d7f 100644
--- a/src/borg/testsuite/archiver/create_cmd.py
+++ b/src/borg/testsuite/archiver/create_cmd.py
@@ -1,6 +1,7 @@
 import errno
 import json
 import os
+from random import randbytes
 import shutil
 import socket
 import stat
@@ -626,6 +627,46 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         if has_lchflags:
             self.assert_in("x input/file3", output)
 
+    def test_file_status_counters(self):
+        """Test file status counters in the stats of `borg create --stats`"""
+
+        def to_dict(borg_create_output):
+            borg_create_output = borg_create_output.strip().splitlines()
+            borg_create_output = [line.split(":", 1) for line in borg_create_output]
+            borg_create_output = {
+                key: int(value)
+                for key, value in borg_create_output
+                if key in ("Added files", "Unchanged files", "Modified files")
+            }
+            return borg_create_output
+
+        # Test case set up: create a repository
+        self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
+        # Archive an empty dir
+        result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
+        result = to_dict(result)
+        assert result["Added files"] == 0
+        assert result["Unchanged files"] == 0
+        assert result["Modified files"] == 0
+        # Archive a dir with two added files
+        self.create_regular_file("testfile1", contents=b"test1")
+        time.sleep(0.01)  # testfile2 must have newer timestamps than testfile1
+        self.create_regular_file("testfile2", contents=b"test2")
+        result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive2", self.input_path)
+        result = to_dict(result)
+        assert result["Added files"] == 2
+        assert result["Unchanged files"] == 0
+        assert result["Modified files"] == 0
+        # Archive a dir with 1 unmodified file and 1 modified
+        self.create_regular_file("testfile1", contents=b"new data")
+        result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive3", self.input_path)
+        result = to_dict(result)
+        # Should process testfile2 as added because of
+        # https://borgbackup.readthedocs.io/en/stable/faq.html#i-am-seeing-a-added-status-for-an-unchanged-file
+        assert result["Added files"] == 1
+        assert result["Unchanged files"] == 0
+        assert result["Modified files"] == 1
+
     def test_create_json(self):
         self.create_regular_file("file1", size=1024 * 80)
         self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
@@ -731,6 +772,42 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         log = self.cmd(f"--repo={self.repository_location}", "--debug", "create", "test", "input")
         assert "security: read previous location" in log
 
+    def test_hashing_time(self):
+        def extract_hashing_time(borg_create_output):
+            borg_create_output = borg_create_output.strip().splitlines()
+            borg_create_output = [line.split(":", 1) for line in borg_create_output]
+            hashing_time = [line for line in borg_create_output if line[0] == "Time spent in hashing"].pop()
+            hashing_time = hashing_time[1]
+            hashing_time = float(hashing_time.removesuffix(" seconds"))
+            return hashing_time
+
+        # Test case set up: create a repository and a file
+        self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
+        self.create_regular_file("testfile", contents=randbytes(6000000))
+        # Archive
+        result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
+        hashing_time = extract_hashing_time(result)
+
+        assert hashing_time > 0.0
+
+    def test_chunking_time(self):
+        def extract_chunking_time(borg_create_output):
+            borg_create_output = borg_create_output.strip().splitlines()
+            borg_create_output = [line.split(":", 1) for line in borg_create_output]
+            chunking_time = [line for line in borg_create_output if line[0] == "Time spent in chunking"].pop()
+            chunking_time = chunking_time[1]
+            chunking_time = float(chunking_time.removesuffix(" seconds"))
+            return chunking_time
+
+        # Test case set up: create a repository and a file
+        self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
+        self.create_regular_file("testfile", contents=randbytes(5000000))
+        # Archive
+        result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
+        chunking_time = extract_chunking_time(result)
+
+        assert chunking_time > 0.0
+
 
 class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
     """run the same tests, but with a remote repository"""