commit eff7c2d5c3
Arno Gobbin, 2026-03-23 16:16:58 +01:00, committed by GitHub
6 changed files with 724 additions and 5 deletions


@@ -28,6 +28,11 @@ On POSIX systems, you can usually set environment variables to choose a UTF-8 lo
export LANG=en_US.UTF-8
export LC_CTYPE=en_US.UTF-8
.. note::

    `Pydantic <https://docs.pydantic.dev/>`_ models that can parse Borg's JSON
    log lines and stdout objects are available in ``src/borg/public/cli_api/v1.py``;
    they make the job easier for frontends.
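    For example, a frontend could validate one ``--log-json`` stderr line like this
    (an illustrative sketch; the sample line below is hypothetical)::

        from borg.public.cli_api.v1 import BorgLogLine

        sample = '{"type": "log_message", "time": 1.0, "levelname": "INFO", "name": "borg.archiver", "message": "done"}'
        line = BorgLogLine.model_validate_json(sample)
        assert line.get_level() == 20  # logging.INFO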
Logging
-------


@@ -12,3 +12,4 @@ pytest-cov
pytest-benchmark
Cython
pre-commit
pydantic>=2.0


@@ -143,7 +143,7 @@ class Statistics:
PROGRESS_FMT = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '
-    def show_progress(self, item=None, final=False, stream=None, dt=None):
+    def show_progress(self, item=None, final=False, stream=None, dt=None, override_time=None):
now = time.monotonic()
if dt is None or now - self.last_progress > dt:
stream = stream or sys.stderr
@@ -155,7 +155,7 @@ class Statistics:
else:
data = {}
data.update({
-            'time': time.time(),
+            'time': override_time if override_time is not None else time.time(),
'type': 'archive_progress',
'finished': final,
})
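
A minimal sketch of what override_time enables, mirroring the tests added in
this commit (names taken from this diff):

    from io import StringIO
    from borg.archive import Statistics

    stats = Statistics()
    stats.output_json = True
    out = StringIO()
    # Pinning the timestamp makes the emitted archive_progress JSON reproducible.
    stats.show_progress(stream=out, final=True, override_time=1234.5)
    # out.getvalue() now contains "time": 1234.5 instead of the wall clock.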


@@ -1,10 +1,12 @@
-import logging
+import json
+import logging
import sys
import time
+import typing
from shutil import get_terminal_size
from ..logger import create_logger
logger = create_logger()
from .parseformat import ellipsis_truncate
@@ -75,7 +77,7 @@ class ProgressIndicatorBase:
self.logger.removeHandler(self.handler)
self.handler.close()
-    def output_json(self, *, finished=False, **kwargs):
+    def output_json(self, *, finished=False, override_time: typing.Optional[float] = None, **kwargs):
assert self.json
if not self.emit:
return
@@ -84,7 +86,7 @@
msgid=self.msgid,
type=self.JSON_TYPE,
finished=finished,
-            time=time.time(),
+            time=override_time if override_time is not None else time.time(),
))
print(json.dumps(kwargs), file=sys.stderr, flush=True)
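
The same pattern for progress indicators, as exercised by the new tests:

    from unittest.mock import patch
    from borg.helpers.progress import ProgressIndicatorBase

    p = ProgressIndicatorBase()
    p.json, p.emit = True, True
    with patch("builtins.print") as mock_print:
        p.output_json(finished=False, current=10, override_time=4567.23)
    # The printed JSON line carries time=4567.23 rather than time.time().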


@@ -0,0 +1,196 @@
"""Pydantic models that can parse borg 1.x's JSON output.
The top-level entry points are:

- `BorgLogLine`, which parses any line of borg's logging output,
- the `Borg*Result` classes, which parse the final JSON output of some borg commands.

The different types of log lines are covered by the other models.
"""
import json
import logging
import typing
from datetime import datetime
from pathlib import Path
import pydantic
import typing_extensions
_log = logging.getLogger(__name__)
class BaseBorgLogLine(pydantic.BaseModel):
def get_level(self) -> int:
"""Get the log level for this line as a `logging` level value.
        If this is a `LogMessage` with a levelname, that level is used.
        All other line types (progress, file status, ...) default to `DEBUG`.
"""
return logging.DEBUG
class ArchiveProgressLogLine(BaseBorgLogLine):
original_size: int
compressed_size: int
deduplicated_size: int
nfiles: int
path: Path
time: float
class FinishedArchiveProgress(BaseBorgLogLine):
"""JSON object printed on stdout when an archive is finished."""
time: float
type: typing.Literal["archive_progress"]
finished: bool
class ProgressMessage(BaseBorgLogLine):
operation: int
msgid: typing.Optional[str]
finished: bool
message: typing.Optional[str] = pydantic.Field(None)
time: float
class ProgressPercent(BaseBorgLogLine):
operation: int
msgid: typing.Optional[str] = pydantic.Field(None)
finished: bool
message: typing.Optional[str] = pydantic.Field(None)
current: typing.Optional[float] = pydantic.Field(None)
info: typing.Optional[typing.List[str]] = pydantic.Field(None)
total: typing.Optional[float] = pydantic.Field(None)
time: float
@pydantic.model_validator(mode="after")
def fields_depending_on_finished(self) -> typing_extensions.Self:
if self.finished:
if self.message is not None:
raise ValueError("message must be None if finished is True")
if self.current is not None and self.total is not None and self.current != self.total:
raise ValueError("current must be equal to total if finished is True")
if self.info is not None:
raise ValueError("info must be None if finished is True")
if self.total is not None:
raise ValueError("total must be None if finished is True")
return self
class FileStatus(BaseBorgLogLine):
status: str
path: Path
class LogMessage(BaseBorgLogLine):
time: float
levelname: typing.Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
name: str
message: str
msgid: typing.Optional[str] = pydantic.Field(None)
def get_level(self) -> int:
try:
return getattr(logging, self.levelname)
except AttributeError:
_log.warning(
"could not find log level %s, giving the following message WARNING level: %s",
self.levelname,
                self.model_dump_json(),
)
return logging.WARNING
_BorgLogLinePossibleTypes = typing.Union[
ArchiveProgressLogLine, FinishedArchiveProgress, ProgressMessage, ProgressPercent, FileStatus, LogMessage
]
class BorgLogLine(pydantic.RootModel[_BorgLogLinePossibleTypes]):
"""A log line from Borg with the `--log-json` argument."""
def get_level(self) -> int:
return self.root.get_level()
class _BorgArchive(pydantic.BaseModel):
"""Basic archive attributes."""
name: str
id: str
start: datetime
class _BorgArchiveStatistics(pydantic.BaseModel):
"""Statistics of an archive."""
original_size: int
compressed_size: int
deduplicated_size: int
nfiles: int
class _BorgLimitUsage(pydantic.BaseModel):
"""Usage of borg limits by an archive."""
max_archive_size: float
class _BorgChunkerParams(pydantic.BaseModel):
"""Chunker parameters tuple.
Format: (algorithm, min_exp, max_exp, mask_bits, window_size)
"""
algorithm: typing.Literal["buzhash", "fixed"]
min_exp: int
max_exp: int
mask_bits: int
window_size: int
@pydantic.model_validator(mode="before")
@classmethod
def parse_list(cls, data: typing.Any) -> typing.Any:
"""Parse from list format [algorithm, min_exp, max_exp, mask_bits, window_size]."""
if isinstance(data, list) and len(data) == 5:
return {
"algorithm": data[0],
"min_exp": data[1],
"max_exp": data[2],
"mask_bits": data[3],
"window_size": data[4],
}
return data
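    # Example (taken from the tests added in this commit):
    # ["buzhash", 19, 23, 21, 4095] parses to algorithm="buzhash", min_exp=19,
    # max_exp=23, mask_bits=21, window_size=4095.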
class _BorgDetailedArchive(_BorgArchive):
"""Archive attributes, as printed by `json info` or `json create`."""
end: datetime
duration: float
stats: _BorgArchiveStatistics
limits: _BorgLimitUsage
command_line: typing.List[str]
chunker_params: typing.Optional[_BorgChunkerParams] = None
@pydantic.field_validator("chunker_params", mode="before")
@classmethod
def empty_string_to_none(cls, v: typing.Any) -> typing.Any:
"""Convert empty string to None (for old archives without chunker_params)."""
if v == "":
return None
return v
class BorgCreateResult(pydantic.BaseModel):
"""JSON object printed at the end of `borg create`."""
archive: _BorgDetailedArchive
class BorgListResult(pydantic.BaseModel):
"""JSON object printed at the end of `borg list`."""
archives: typing.List[_BorgArchive]
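# A hedged end-to-end sketch (illustrative only; assumes an initialized
# repository reachable via BORG_REPO):
#
#     import subprocess
#     proc = subprocess.run(
#         ["borg", "create", "--json", "--log-json", "::archive", "."],
#         capture_output=True,
#     )
#     result = BorgCreateResult.model_validate_json(proc.stdout)
#     print(result.archive.stats.nfiles)
#     for raw in proc.stderr.splitlines():
#         print(BorgLogLine.model_validate_json(raw).get_level())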


@@ -0,0 +1,515 @@
import json
import logging
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from typing import Literal
from unittest.mock import patch
import pytest
import borg.public.cli_api.v1 as v1
from borg.archive import Statistics
from borg.archiver import Archiver
from borg.helpers.parseformat import BorgJsonEncoder
from borg.helpers.progress import ProgressIndicatorBase, ProgressIndicatorMessage
from borg.helpers.time import OutputTimestamp, to_localtime
from borg.item import Item
from borg.logger import JsonFormatter
@pytest.fixture(autouse=True)
def reset_progress_operation_id():
"""Reset ProgressIndicatorBase operation ID counter before each test.
This ensures each test gets predictable operation IDs starting from 1.
"""
    ProgressIndicatorBase.operation_id_counter = 0
    yield
def test_parse_progress_percent_unfinished():
percent = ProgressIndicatorBase()
percent.json = True
percent.emit = True
override_time = 4567.23
with patch("builtins.print") as mock_print:
percent.output_json(finished=False, current=10, override_time=override_time)
mock_print.assert_called_once()
json_output = mock_print.call_args[0][0]
assert v1.ProgressPercent.model_validate_json(json_output) == v1.ProgressPercent(
operation=1, msgid=None, finished=False, message=None, current=10, info=None, total=None, time=4567.23
)
def test_parse_progress_percent_finished():
percent = ProgressIndicatorBase()
percent.json = True
percent.emit = True
override_time = 4567.23
with patch("builtins.print") as mock_print:
percent.output_json(finished=True, override_time=override_time)
mock_print.assert_called_once()
json_output = mock_print.call_args[0][0]
assert v1.ProgressPercent.model_validate_json(json_output) == v1.ProgressPercent(
operation=1, msgid=None, finished=True, message=None, current=None, info=None, total=None, time=override_time
)
def test_parse_archive_progress_log_line():
"""Test parsing ArchiveProgressLogLine from Statistics.show_progress()."""
stats = Statistics()
stats.update(20, 10, unique=True)
stats.output_json = True
override_time = 1234.56
item = Item(path="foo/bar.txt")
out = StringIO()
stats.show_progress(item=item, stream=out, override_time=override_time)
json_output = out.getvalue()
parsed = v1.ArchiveProgressLogLine.model_validate_json(json_output)
assert isinstance(parsed.time, float)
assert parsed == v1.ArchiveProgressLogLine(
original_size=20,
compressed_size=10,
deduplicated_size=10,
nfiles=0,
path=Path("foo/bar.txt"),
time=override_time,
)
def test_parse_finished_archive_progress():
"""Test parsing FinishedArchiveProgress from Statistics.show_progress(final=True)."""
stats = Statistics()
stats.output_json = True
override_time = 5678.90
out = StringIO()
stats.show_progress(stream=out, final=True, override_time=override_time)
json_output = out.getvalue()
parsed = v1.FinishedArchiveProgress.model_validate_json(json_output)
assert isinstance(parsed.time, float)
assert parsed == v1.FinishedArchiveProgress(
type="archive_progress",
finished=True,
time=override_time,
)
def test_parse_progress_message_unfinished():
"""Test parsing ProgressMessage from ProgressIndicatorMessage with message."""
progress = ProgressIndicatorMessage()
progress.json = True
progress.emit = True
override_time = 1234.56
with patch("builtins.print") as mock_print:
progress.output_json(message="Processing files", override_time=override_time)
mock_print.assert_called_once()
json_output = mock_print.call_args[0][0]
parsed = v1.ProgressMessage.model_validate_json(json_output)
assert parsed == v1.ProgressMessage(
operation=progress.id,
msgid=None,
finished=False,
message="Processing files",
time=1234.56,
)
def test_parse_progress_message_finished():
"""Test parsing ProgressMessage when finished."""
progress = ProgressIndicatorMessage()
progress.json = True
progress.emit = True
override_time = 5678.90
with patch("builtins.print") as mock_print:
progress.output_json(finished=True, override_time=override_time)
mock_print.assert_called_once()
json_output = mock_print.call_args[0][0]
parsed = v1.ProgressMessage.model_validate_json(json_output)
assert parsed == v1.ProgressMessage(
operation=progress.id,
msgid=None,
finished=True,
message=None,
time=5678.90,
)
def test_parse_file_status():
"""Test parsing FileStatus from Archiver.print_file_status()."""
archiver = Archiver()
archiver.output_list = True
archiver.output_filter = None
archiver.log_json = True
stderr_capture = StringIO()
with patch("sys.stderr", stderr_capture):
archiver.print_file_status("A", "path/to/file.txt")
json_output = stderr_capture.getvalue()
parsed = v1.FileStatus.model_validate_json(json_output)
assert parsed == v1.FileStatus(status="A", path=Path("path/to/file.txt"))
def test_parse_log_message():
"""Test parsing LogMessage from JsonFormatter."""
formatter = JsonFormatter()
test_time = 1234567890.123
# Create a LogRecord with all required fields
record = logging.LogRecord(
name="borg.test",
level=logging.INFO,
pathname="test.py",
lineno=42,
msg="Test message",
args=(),
exc_info=None,
)
record.msgid = "test.msgid"
record.created = test_time
json_output = formatter.format(record)
parsed = v1.LogMessage.model_validate_json(json_output)
assert isinstance(parsed.time, float)
assert parsed == v1.LogMessage(
levelname="INFO",
name="borg.test",
message="Test message",
msgid="test.msgid",
time=test_time,
)
def test_parse_log_message_all_levels():
"""Test parsing LogMessage for all log levels."""
formatter = JsonFormatter()
test_time = 1234567890.456
levels: list[tuple[int, Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]]] = [
(logging.DEBUG, "DEBUG"),
(logging.INFO, "INFO"),
(logging.WARNING, "WARNING"),
(logging.ERROR, "ERROR"),
(logging.CRITICAL, "CRITICAL"),
]
for level_num, level_name in levels:
record = logging.LogRecord(
name="borg.test",
level=level_num,
pathname="test.py",
lineno=1,
msg=f"{level_name} message",
args=(),
exc_info=None,
)
record.created = test_time
json_output = formatter.format(record)
parsed = v1.LogMessage.model_validate_json(json_output)
assert isinstance(parsed.time, float)
assert parsed == v1.LogMessage(
levelname=level_name,
name="borg.test",
message=f"{level_name} message",
msgid=None,
time=test_time,
)
def test_parse_log_message_without_msgid():
"""Test parsing LogMessage without msgid field."""
formatter = JsonFormatter()
test_time = 1234567890.789
record = logging.LogRecord(
name="borg.test",
level=logging.WARNING,
pathname="test.py",
lineno=10,
msg="Warning without msgid",
args=(),
exc_info=None,
)
# Don't set msgid - it should be None or absent
record.created = test_time
json_output = formatter.format(record)
parsed = v1.LogMessage.model_validate_json(json_output)
assert isinstance(parsed.time, float)
assert parsed == v1.LogMessage(
levelname="WARNING",
name="borg.test",
message="Warning without msgid",
msgid=None,
time=test_time,
)
def test_parse_borg_create_result():
"""Test parsing BorgCreateResult from Archive.info() output."""
# Create timestamps in UTC
start_time = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
end_time = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc)
# Build the archive info dict as production code does
archive_info = {
"name": "test-archive",
"id": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"start": OutputTimestamp(start_time),
"end": OutputTimestamp(end_time),
"duration": (end_time - start_time).total_seconds(),
"stats": {
"original_size": 1000000,
"compressed_size": 500000,
"deduplicated_size": 250000,
"nfiles": 42,
},
"limits": {
"max_archive_size": 0.05,
},
"command_line": ["borg", "create", "::test-archive", "/data"],
}
# Use BorgJsonEncoder to serialize as production does
json_output = json.dumps({"archive": archive_info}, cls=BorgJsonEncoder)
parsed = v1.BorgCreateResult.model_validate_json(json_output)
# OutputTimestamp converts UTC to localtime (naive datetime)
# When Pydantic parses the ISO string back, it creates a naive datetime
expected_start = to_localtime(start_time)
expected_end = to_localtime(end_time)
assert parsed == v1.BorgCreateResult(
archive=v1._BorgDetailedArchive(
name="test-archive",
id="1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
start=expected_start,
end=expected_end,
duration=300.0,
stats=v1._BorgArchiveStatistics(
original_size=1000000,
compressed_size=500000,
deduplicated_size=250000,
nfiles=42,
),
limits=v1._BorgLimitUsage(max_archive_size=0.05),
command_line=["borg", "create", "::test-archive", "/data"],
chunker_params=None,
)
)
def test_parse_borg_list_result():
"""Test parsing BorgListResult from ArchiveFormatter.get_item_data() output."""
# Build archive list as production code does
start_time1 = datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc)
start_time2 = datetime(2024, 1, 16, 10, 0, 0, tzinfo=timezone.utc)
archives_list = [
{
"name": "archive-2024-01-15",
"id": "aaaa1111bbbb2222cccc3333dddd4444eeee5555ffff6666aaaa7777bbbb8888",
"start": OutputTimestamp(start_time1),
},
{
"name": "archive-2024-01-16",
"id": "bbbb2222cccc3333dddd4444eeee5555ffff6666aaaa7777bbbb8888cccc9999",
"start": OutputTimestamp(start_time2),
},
]
# Use BorgJsonEncoder to serialize as production does
json_output = json.dumps({"archives": archives_list}, cls=BorgJsonEncoder)
parsed = v1.BorgListResult.model_validate_json(json_output)
# OutputTimestamp converts UTC to localtime (naive datetime)
expected_start1 = to_localtime(start_time1)
expected_start2 = to_localtime(start_time2)
assert parsed == v1.BorgListResult(
archives=[
v1._BorgArchive(
name="archive-2024-01-15",
id="aaaa1111bbbb2222cccc3333dddd4444eeee5555ffff6666aaaa7777bbbb8888",
start=expected_start1,
),
v1._BorgArchive(
name="archive-2024-01-16",
id="bbbb2222cccc3333dddd4444eeee5555ffff6666aaaa7777bbbb8888cccc9999",
start=expected_start2,
),
]
)
def test_parse_borg_list_result_empty():
"""Test parsing BorgListResult with no archives."""
json_output = json.dumps({"archives": []}, cls=BorgJsonEncoder)
parsed = v1.BorgListResult.model_validate_json(json_output)
assert parsed == v1.BorgListResult(archives=[])
def test_parse_chunker_params_empty_string():
"""Test parsing when chunker_params is an empty string (old archives without chunker_params)."""
start_time = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
end_time = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc)
archive_info = {
"name": "old-archive",
"id": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"start": OutputTimestamp(start_time),
"end": OutputTimestamp(end_time),
"duration": (end_time - start_time).total_seconds(),
"stats": {
"original_size": 1000000,
"compressed_size": 500000,
"deduplicated_size": 250000,
"nfiles": 42,
},
"limits": {
"max_archive_size": 0.05,
},
"command_line": ["borg", "create", "::old-archive", "/data"],
"chunker_params": "", # Empty string as set in archive.py line 555
}
json_output = json.dumps({"archive": archive_info}, cls=BorgJsonEncoder)
parsed = v1.BorgCreateResult.model_validate_json(json_output)
# Empty string should be treated as None
assert parsed.archive.chunker_params is None
def test_parse_chunker_params_with_values():
"""Test parsing when chunker_params has actual values."""
start_time = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
end_time = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc)
archive_info = {
"name": "new-archive",
"id": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"start": OutputTimestamp(start_time),
"end": OutputTimestamp(end_time),
"duration": (end_time - start_time).total_seconds(),
"stats": {
"original_size": 1000000,
"compressed_size": 500000,
"deduplicated_size": 250000,
"nfiles": 42,
},
"limits": {
"max_archive_size": 0.05,
},
"command_line": ["borg", "create", "::new-archive", "/data"],
"chunker_params": ["buzhash", 19, 23, 21, 4095], # As a list (JSON serialized tuple)
}
json_output = json.dumps({"archive": archive_info}, cls=BorgJsonEncoder)
parsed = v1.BorgCreateResult.model_validate_json(json_output)
expected_start = to_localtime(start_time)
expected_end = to_localtime(end_time)
assert parsed == v1.BorgCreateResult(
archive=v1._BorgDetailedArchive(
name="new-archive",
id="1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
start=expected_start,
end=expected_end,
duration=300.0,
stats=v1._BorgArchiveStatistics(
original_size=1000000,
compressed_size=500000,
deduplicated_size=250000,
nfiles=42,
),
limits=v1._BorgLimitUsage(max_archive_size=0.05),
command_line=["borg", "create", "::new-archive", "/data"],
chunker_params=v1._BorgChunkerParams(
algorithm="buzhash",
min_exp=19,
max_exp=23,
mask_bits=21,
window_size=4095,
),
)
)
def test_parse_chunker_params_fixed_algorithm():
"""Test parsing chunker_params with 'fixed' algorithm."""
start_time = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
end_time = datetime(2024, 1, 15, 10, 35, 0, tzinfo=timezone.utc)
archive_info = {
"name": "fixed-archive",
"id": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"start": OutputTimestamp(start_time),
"end": OutputTimestamp(end_time),
"duration": (end_time - start_time).total_seconds(),
"stats": {
"original_size": 1000000,
"compressed_size": 500000,
"deduplicated_size": 250000,
"nfiles": 42,
},
"limits": {
"max_archive_size": 0.05,
},
"command_line": ["borg", "create", "::fixed-archive", "/data"],
"chunker_params": ["fixed", 16, 20, 18, 2048],
}
json_output = json.dumps({"archive": archive_info}, cls=BorgJsonEncoder)
parsed = v1.BorgCreateResult.model_validate_json(json_output)
expected_start = to_localtime(start_time)
expected_end = to_localtime(end_time)
assert parsed == v1.BorgCreateResult(
archive=v1._BorgDetailedArchive(
name="fixed-archive",
id="1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
start=expected_start,
end=expected_end,
duration=300.0,
stats=v1._BorgArchiveStatistics(
original_size=1000000,
compressed_size=500000,
deduplicated_size=250000,
nfiles=42,
),
limits=v1._BorgLimitUsage(max_archive_size=0.05),
command_line=["borg", "create", "::fixed-archive", "/data"],
chunker_params=v1._BorgChunkerParams(
algorithm="fixed",
min_exp=16,
max_exp=20,
mask_bits=18,
window_size=2048,
),
)
)