Adding performance statistics to borg create (#6991)

- file status A/M/E counters
- chunking time
- hashing time
- rx_bytes / tx_bytes

Note: the sleep() in the test is needed due to timestamp granularity on linux being much more coarse than expected (uses the system timer, 100Hz or 250Hz).
This commit is contained in:
Franco Ayala 2022-10-19 16:40:02 -03:00 committed by GitHub
parent b8a0e0d6bd
commit 2ed7f317d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 2 deletions

View file

@ -4,7 +4,7 @@ import os
import stat
import sys
import time
from collections import OrderedDict
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import partial
@ -60,6 +60,11 @@ class Statistics:
self.osize = self.usize = self.nfiles = 0
self.osize_parts = self.usize_parts = self.nfiles_parts = 0
self.last_progress = 0 # timestamp when last progress was shown
self.files_stats = defaultdict(int)
self.chunking_time = 0.0
self.hashing_time = 0.0
self.rx_bytes = 0
self.tx_bytes = 0
def update(self, size, unique, part=False):
if not part:
@ -81,15 +86,36 @@ class Statistics:
stats.osize_parts = self.osize_parts + other.osize_parts
stats.usize_parts = self.usize_parts + other.usize_parts
stats.nfiles_parts = self.nfiles_parts + other.nfiles_parts
stats.chunking_time = self.chunking_time + other.chunking_time
stats.hashing_time = self.hashing_time + other.hashing_time
for key in other.files_stats:
stats.files_stats[key] = self.files_stats[key] + other.files_stats[key]
return stats
def __str__(self):
hashing_time = format_timedelta(timedelta(seconds=self.hashing_time))
chunking_time = format_timedelta(timedelta(seconds=self.chunking_time))
return """\
Number of files: {stats.nfiles}
Original size: {stats.osize_fmt}
Deduplicated size: {stats.usize_fmt}
Time spent in hashing: {hashing_time}
Time spent in chunking: {chunking_time}
Added files: {added_files}
Unchanged files: {unchanged_files}
Modified files: {modified_files}
Error files: {error_files}
Bytes read from remote: {stats.rx_bytes}
Bytes sent to remote: {stats.tx_bytes}
""".format(
stats=self
stats=self,
hashing_time=hashing_time,
chunking_time=chunking_time,
added_files=self.files_stats["A"],
unchanged_files=self.files_stats["U"],
modified_files=self.files_stats["M"],
error_files=self.files_stats["E"],
)
def __repr__(self):
@ -102,6 +128,9 @@ Deduplicated size: {stats.usize_fmt}
"original_size": FileSize(self.osize, iec=self.iec),
"deduplicated_size": FileSize(self.usize, iec=self.iec),
"nfiles": self.nfiles,
"hashing_time": self.hashing_time,
"chunking_time": self.chunking_time,
"files_stats": self.files_stats,
}
def as_raw_dict(self):
@ -1237,7 +1266,9 @@ class ChunksProcessor:
if not chunk_processor:
def chunk_processor(chunk):
started_hashing = time.monotonic()
chunk_id, data = cached_hash(chunk, self.key.id_hash)
stats.hashing_time += time.monotonic() - started_hashing
chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
self.cache.repository.async_response(wait=False)
return chunk_entry
@ -1411,7 +1442,9 @@ class FilesystemObjectProcessors:
else: # normal case, no "2nd+" hardlink
if not is_special_file:
hashed_path = safe_encode(os.path.join(self.cwd, path))
started_hashing = time.monotonic()
path_hash = self.key.id_hash(hashed_path)
self.stats.hashing_time += time.monotonic() - started_hashing
known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
else:
# in --read-special mode, we may be called for special files.
@ -1434,6 +1467,7 @@ class FilesystemObjectProcessors:
else:
status = "M" if known else "A" # regular file, modified or added
self.print_file_status(status, path)
self.stats.files_stats[status] += 1
status = None # we already printed the status
# Only chunkify the file if needed
if chunks is not None:
@ -1447,6 +1481,7 @@ class FilesystemObjectProcessors:
self.show_progress,
backup_io_iter(self.chunker.chunkify(None, fd)),
)
self.stats.chunking_time = self.chunker.chunking_time
if is_win32:
changed_while_backup = False # TODO
else:

View file

@ -119,6 +119,7 @@ class CreateMixIn:
if status == "C":
self.print_warning("%s: file changed while we backed it up", path)
self.print_file_status(status, path)
fso.stats.files_stats[status] += 1
if args.paths_from_command:
rc = proc.wait()
if rc != 0:
@ -142,6 +143,7 @@ class CreateMixIn:
else:
status = "-"
self.print_file_status(status, path)
fso.stats.files_stats[status] += 1
continue
path = os.path.normpath(path)
parent_dir = os.path.dirname(path) or "."
@ -185,6 +187,8 @@ class CreateMixIn:
if args.progress:
archive.stats.show_progress(final=True)
archive.stats += fso.stats
archive.stats.rx_bytes = getattr(repository, "rx_bytes", 0)
archive.stats.tx_bytes = getattr(repository, "tx_bytes", 0)
if sig_int:
# do not save the archive if the user ctrl-c-ed - it is valid, but incomplete.
# we already have a checkpoint archive in this case.
@ -469,6 +473,8 @@ class CreateMixIn:
self.print_warning("%s: file changed while we backed it up", path)
if not recurse_excluded_dir:
self.print_file_status(status, path)
if status is not None:
fso.stats.files_stats[status] += 1
def build_parser_create(self, subparsers, common_parser, mid_common_parser):
from ._common import process_epilog

View file

@ -2,6 +2,7 @@ API_VERSION = '1.2_01'
import errno
import os
import time
from collections import namedtuple
from .constants import CH_DATA, CH_ALLOC, CH_HOLE, zeros
@ -145,6 +146,7 @@ class ChunkerFixed:
def __init__(self, block_size, header_size=0, sparse=False):
self.block_size = block_size
self.header_size = header_size
self.chunking_time = 0.0
# should borg try to do sparse input processing?
# whether it actually can be done depends on the input file being seekable.
self.try_sparse = sparse and has_seek_hole
@ -198,6 +200,7 @@ class ChunkerFixed:
offset = range_start
dseek(offset, os.SEEK_SET, fd, fh)
while range_size:
started_chunking = time.monotonic()
wanted = min(range_size, self.block_size)
if is_data:
# read block from the range
@ -217,6 +220,7 @@ class ChunkerFixed:
if got > 0:
offset += got
range_size -= got
self.chunking_time += time.monotonic() - started_chunking
yield Chunk(data, size=got, allocation=allocation)
if got < wanted:
# we did not get enough data, looks like EOF.
@ -236,6 +240,7 @@ cdef class Chunker:
It also uses a per-repo random seed to avoid some chunk length fingerprinting attacks.
"""
cdef _Chunker *chunker
cdef readonly float chunking_time
def __cinit__(self, int seed, int chunk_min_exp, int chunk_max_exp, int hash_mask_bits, int hash_window_size):
min_size = 1 << chunk_min_exp
@ -245,6 +250,8 @@ cdef class Chunker:
assert hash_window_size + min_size + 1 <= max_size, "too small max_size"
hash_mask = (1 << hash_mask_bits) - 1
self.chunker = chunker_init(hash_window_size, hash_mask, min_size, max_size, seed & 0xffffffff)
self.chunking_time = 0.0
def chunkify(self, fd, fh=-1):
"""
@ -265,6 +272,7 @@ cdef class Chunker:
return self
def __next__(self):
started_chunking = time.monotonic()
data = chunker_process(self.chunker)
got = len(data)
# we do not have SEEK_DATA/SEEK_HOLE support in chunker_process C code,
@ -275,6 +283,7 @@ cdef class Chunker:
allocation = CH_ALLOC
else:
allocation = CH_DATA
self.chunking_time += time.monotonic() - started_chunking
return Chunk(data, size=got, allocation=allocation)

View file

@ -60,6 +60,14 @@ def test_stats_format(stats):
Number of files: 1
Original size: 20 B
Deduplicated size: 20 B
Time spent in hashing: 0.00 seconds
Time spent in chunking: 0.00 seconds
Added files: 0
Unchanged files: 0
Modified files: 0
Error files: 0
Bytes read from remote: 0
Bytes sent to remote: 0
"""
)
s = f"{stats.osize_fmt}"

View file

@ -1,6 +1,7 @@
import errno
import json
import os
from random import randbytes
import shutil
import socket
import stat
@ -626,6 +627,46 @@ class ArchiverTestCase(ArchiverTestCaseBase):
if has_lchflags:
self.assert_in("x input/file3", output)
def test_file_status_counters(self):
"""Test file status counters in the stats of `borg create --stats`"""
def to_dict(borg_create_output):
borg_create_output = borg_create_output.strip().splitlines()
borg_create_output = [line.split(":", 1) for line in borg_create_output]
borg_create_output = {
key: int(value)
for key, value in borg_create_output
if key in ("Added files", "Unchanged files", "Modified files")
}
return borg_create_output
# Test case set up: create a repository
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
# Archive an empty dir
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
result = to_dict(result)
assert result["Added files"] == 0
assert result["Unchanged files"] == 0
assert result["Modified files"] == 0
# Archive a dir with two added files
self.create_regular_file("testfile1", contents=b"test1")
time.sleep(0.01) # testfile2 must have newer timestamps than testfile1
self.create_regular_file("testfile2", contents=b"test2")
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive2", self.input_path)
result = to_dict(result)
assert result["Added files"] == 2
assert result["Unchanged files"] == 0
assert result["Modified files"] == 0
# Archive a dir with 1 unmodified file and 1 modified
self.create_regular_file("testfile1", contents=b"new data")
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive3", self.input_path)
result = to_dict(result)
# Should process testfile2 as added because of
# https://borgbackup.readthedocs.io/en/stable/faq.html#i-am-seeing-a-added-status-for-an-unchanged-file
assert result["Added files"] == 1
assert result["Unchanged files"] == 0
assert result["Modified files"] == 1
def test_create_json(self):
self.create_regular_file("file1", size=1024 * 80)
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
@ -731,6 +772,42 @@ class ArchiverTestCase(ArchiverTestCaseBase):
log = self.cmd(f"--repo={self.repository_location}", "--debug", "create", "test", "input")
assert "security: read previous location" in log
def test_hashing_time(self):
def extract_hashing_time(borg_create_output):
borg_create_output = borg_create_output.strip().splitlines()
borg_create_output = [line.split(":", 1) for line in borg_create_output]
hashing_time = [line for line in borg_create_output if line[0] == "Time spent in hashing"].pop()
hashing_time = hashing_time[1]
hashing_time = float(hashing_time.removesuffix(" seconds"))
return hashing_time
# Test case set up: create a repository and a file
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
self.create_regular_file("testfile", contents=randbytes(6000000))
# Archive
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
hashing_time = extract_hashing_time(result)
assert hashing_time > 0.0
def test_chunking_time(self):
def extract_chunking_time(borg_create_output):
borg_create_output = borg_create_output.strip().splitlines()
borg_create_output = [line.split(":", 1) for line in borg_create_output]
chunking_time = [line for line in borg_create_output if line[0] == "Time spent in chunking"].pop()
chunking_time = chunking_time[1]
chunking_time = float(chunking_time.removesuffix(" seconds"))
return chunking_time
# Test case set up: create a repository and a file
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.create_regular_file("testfile", contents=randbytes(5000000))
# Archive
result = self.cmd(f"--repo={self.repository_location}", "create", "--stats", "test_archive", self.input_path)
chunking_time = extract_chunking_time(result)
assert chunking_time > 0.0
class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
"""run the same tests, but with a remote repository"""