From f25c580ec4764d2788a14c5b806ff75b5050beda Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 14 Jun 2026 14:02:33 +0200 Subject: [PATCH 1/2] use PEP 604 union types (X | None) instead of typing.Optional Replace the remaining typing.Optional[...] annotations with the X | None syntax (PEP 604, Python 3.10) and drop the now-unused Optional/List imports; List[str] -> list[str] in cockpit/runner. Co-Authored-By: Claude Opus 4.8 --- src/borg/cockpit/runner.py | 6 +++--- src/borg/crypto/key.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/borg/cockpit/runner.py b/src/borg/cockpit/runner.py index 8119e6fb9..6cdafc8c8 100644 --- a/src/borg/cockpit/runner.py +++ b/src/borg/cockpit/runner.py @@ -6,7 +6,7 @@ import asyncio import logging import os import sys -from typing import Optional, Callable, List +from typing import Callable class BorgRunner: @@ -14,10 +14,10 @@ class BorgRunner: Manages the execution of the borg subprocess and parses its JSON output. """ - def __init__(self, command: List[str], log_callback: Callable[[dict], None]): + def __init__(self, command: list[str], log_callback: Callable[[dict], None]): self.command = command self.log_callback = log_callback - self.process: Optional[asyncio.subprocess.Process] = None + self.process: asyncio.subprocess.Process | None = None self.logger = logging.getLogger(__name__) async def start(self): diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 6927539c6..d0750bdc0 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -4,7 +4,7 @@ import os import textwrap from hashlib import sha256 from pathlib import Path -from typing import Literal, ClassVar, Optional +from typing import Literal, ClassVar from collections.abc import Callable from ..logger import create_logger @@ -45,7 +45,7 @@ KEYFILE_ID = "BORG_KEY" ADMIN_LABEL = "admin" -def is_keyfile(data: str | bytes, repoid: Optional[str] = None) -> bool: +def is_keyfile(data: str | bytes, repoid: str | None = None) -> bool: # repoid is a hex str, if given. if given, we only accept keyfiles for that repo. header = f"{KEYFILE_ID} {repoid or ''}" if isinstance(data, str): @@ -61,7 +61,7 @@ def keyfile_format(repoid: str, b64data: str) -> str: return f"{KEYFILE_ID} {repoid}\n{b64data}\n" -def keyfile_parse(data: str | bytes, repoid: Optional[str] = None) -> tuple[str, str]: +def keyfile_parse(data: str | bytes, repoid: str | None = None) -> tuple[str, str]: if repoid is None: if not is_keyfile(data): raise ValueError("Not a keyfile") From b46bfdab18b96a5e29e71e299d6a1d7cf43d0318 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 14 Jun 2026 14:04:25 +0200 Subject: [PATCH 2/2] use structural pattern matching (match/case, Python 3.10) Convert if/elif chains that dispatch on a single value to match statements: repository.borg_permissions (permission preset), CompressionSpec.__init__/compressor/__str__ (compression name), calculate_relative_offset (relative ts unit) and the cockpit widget file-status counter. Membership checks like 'in ("none", "lz4")' become alternative patterns ('none' | 'lz4'). Co-Authored-By: Claude Opus 4.8 --- src/borg/cockpit/widgets.py | 21 +++--- src/borg/helpers/parseformat.py | 117 ++++++++++++++++---------------- src/borg/helpers/time.py | 29 ++++---- src/borg/repository.py | 59 ++++++++-------- 4 files changed, 116 insertions(+), 110 deletions(-) diff --git a/src/borg/cockpit/widgets.py b/src/borg/cockpit/widgets.py index efae7b42b..4aef72e88 100644 --- a/src/borg/cockpit/widgets.py +++ b/src/borg/cockpit/widgets.py @@ -147,16 +147,17 @@ class StandardLog(Vertical): status_panel = self.app.query_one("#status") status_panel.files_count += 1 status = line[0] - if status == "E": - status_panel.error_count += 1 - elif status in "U-": - status_panel.unchanged_count += 1 - elif status in "M": - status_panel.modified_count += 1 - elif status in "A+": - status_panel.added_count += 1 - elif status in "dcbs": - status_panel.other_count += 1 + match status: + case "E": + status_panel.error_count += 1 + case "U" | "-": + status_panel.unchanged_count += 1 + case "M": + status_panel.modified_count += 1 + case "A" | "+": + status_panel.added_count += 1 + case "d" | "c" | "b" | "s": + status_panel.other_count += 1 markup_tag = { "E": "red", # Error diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index f105e689d..9f0ab56fa 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -176,71 +176,74 @@ class CompressionSpec: raise ArgumentTypeError("not enough arguments") # --compression algo[,level] self.name = values[0] - if self.name in ("none", "lz4"): - return - elif self.name in ("zlib", "lzma", "zlib_legacy"): # zlib_legacy just for testing - if count < 2: - level = 6 # default compression level in py stdlib - elif count == 2: - level = int(values[1]) - if not 0 <= level <= 9: - raise ArgumentTypeError("level must be >= 0 and <= 9") - else: - raise ArgumentTypeError("too many arguments") - self.level = level - elif self.name in ("zstd",): - if count < 2: - level = 3 # default compression level in zstd - elif count == 2: - level = int(values[1]) - if not 1 <= level <= 22: - raise ArgumentTypeError("level must be >= 1 and <= 22") - else: - raise ArgumentTypeError("too many arguments") - self.level = level - elif self.name == "auto": - if 2 <= count <= 3: - compression = ",".join(values[1:]) - else: - raise ArgumentTypeError("bad arguments") - self.inner = CompressionSpec(compression) - elif self.name == "obfuscate": - if 3 <= count <= 5: - level = int(values[1]) - if not ((1 <= level <= 6) or (110 <= level <= 123) or (level == 250)): - raise ArgumentTypeError("level must be (inclusively) within 1...6, 110...123 or equal to 250") + match self.name: + case "none" | "lz4": + return + case "zlib" | "lzma" | "zlib_legacy": # zlib_legacy just for testing + if count < 2: + level = 6 # default compression level in py stdlib + elif count == 2: + level = int(values[1]) + if not 0 <= level <= 9: + raise ArgumentTypeError("level must be >= 0 and <= 9") + else: + raise ArgumentTypeError("too many arguments") self.level = level - compression = ",".join(values[2:]) - else: - raise ArgumentTypeError("bad arguments") - self.inner = CompressionSpec(compression) - else: - raise ArgumentTypeError("unsupported compression type") + case "zstd": + if count < 2: + level = 3 # default compression level in zstd + elif count == 2: + level = int(values[1]) + if not 1 <= level <= 22: + raise ArgumentTypeError("level must be >= 1 and <= 22") + else: + raise ArgumentTypeError("too many arguments") + self.level = level + case "auto": + if 2 <= count <= 3: + compression = ",".join(values[1:]) + else: + raise ArgumentTypeError("bad arguments") + self.inner = CompressionSpec(compression) + case "obfuscate": + if 3 <= count <= 5: + level = int(values[1]) + if not ((1 <= level <= 6) or (110 <= level <= 123) or (level == 250)): + raise ArgumentTypeError("level must be (inclusively) within 1...6, 110...123 or equal to 250") + self.level = level + compression = ",".join(values[2:]) + else: + raise ArgumentTypeError("bad arguments") + self.inner = CompressionSpec(compression) + case _: + raise ArgumentTypeError("unsupported compression type") @property def compressor(self): from ..compress import get_compressor - if self.name in ("none", "lz4"): - return get_compressor(self.name) - elif self.name in ("zlib", "lzma", "zstd", "zlib_legacy"): - return get_compressor(self.name, level=self.level) - elif self.name == "auto": - return get_compressor(self.name, compressor=self.inner.compressor) - elif self.name == "obfuscate": - return get_compressor(self.name, level=self.level, compressor=self.inner.compressor) + match self.name: + case "none" | "lz4": + return get_compressor(self.name) + case "zlib" | "lzma" | "zstd" | "zlib_legacy": + return get_compressor(self.name, level=self.level) + case "auto": + return get_compressor(self.name, compressor=self.inner.compressor) + case "obfuscate": + return get_compressor(self.name, level=self.level, compressor=self.inner.compressor) def __str__(self): - if self.name in ("none", "lz4"): - return f"{self.name}" - elif self.name in ("zlib", "lzma", "zstd", "zlib_legacy"): - return f"{self.name},{self.level}" - elif self.name == "auto": - return f"auto,{self.inner}" - elif self.name == "obfuscate": - return f"obfuscate,{self.level},{self.inner}" - else: - raise ValueError(f"unsupported compression type: {self.name}") + match self.name: + case "none" | "lz4": + return f"{self.name}" + case "zlib" | "lzma" | "zstd" | "zlib_legacy": + return f"{self.name},{self.level}" + case "auto": + return f"auto,{self.inner}" + case "obfuscate": + return f"obfuscate,{self.level},{self.inner}" + case _: + raise ValueError(f"unsupported compression type: {self.name}") def ChunkerParams(s): diff --git a/src/borg/helpers/time.py b/src/borg/helpers/time.py index d9bccde57..e54394cdc 100644 --- a/src/borg/helpers/time.py +++ b/src/borg/helpers/time.py @@ -139,20 +139,21 @@ def calculate_relative_offset(format_string, from_ts, earlier=False): offset = int(match.group("offset")) offset *= -1 if earlier else 1 - if unit == "y": - return from_ts.replace(year=from_ts.year + offset) - elif unit == "m": - return offset_n_months(from_ts, offset) - elif unit == "w": - return from_ts + timedelta(days=offset * 7) - elif unit == "d": - return from_ts + timedelta(days=offset) - elif unit == "H": - return from_ts + timedelta(seconds=offset * 60 * 60) - elif unit == "M": - return from_ts + timedelta(seconds=offset * 60) - elif unit == "S": - return from_ts + timedelta(seconds=offset) + match unit: + case "y": + return from_ts.replace(year=from_ts.year + offset) + case "m": + return offset_n_months(from_ts, offset) + case "w": + return from_ts + timedelta(days=offset * 7) + case "d": + return from_ts + timedelta(days=offset) + case "H": + return from_ts + timedelta(seconds=offset * 60 * 60) + case "M": + return from_ts + timedelta(seconds=offset * 60) + case "S": + return from_ts + timedelta(seconds=offset) raise ValueError(f"Invalid relative ts offset format: {format_string}") diff --git a/src/borg/repository.py b/src/borg/repository.py index 543864083..df9b4d2a3 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -41,35 +41,36 @@ def borg_permissions(permissions): The namespaces match the borg repository layout (see Repository.__init__ ns_config). """ - if permissions == "all": - return None # permissions system will not be used - elif permissions == "no-delete": # mostly no delete, no overwrite - return { - "": "lr", - "archives": "lrw", - "cache": "lrwWD", # WD for chunks., last-key-checked, ... - "config": "lrW", # W for manifest - "keys": "lr", - "locks": "lrwD", # borg needs to create/delete a shared lock here - "packs": "lrw", - } - elif permissions == "write-only": # mostly no reading - return { - "": "l", - "archives": "lw", - "cache": "lrwWD", # read allowed, e.g. for chunks. cache - "config": "lrW", # W for manifest - "keys": "lr", - "locks": "lrwD", # borg needs to create/delete a shared lock here - "packs": "lw", # no r! - } - elif permissions == "read-only": # mostly r/o - return {"": "lr", "locks": "lrwD"} - else: - raise Error( - f"Invalid BORG_REPO_PERMISSIONS value: {permissions}, should be one of: " - f"all, no-delete, write-only, read-only." - ) + match permissions: + case "all": + return None # permissions system will not be used + case "no-delete": # mostly no delete, no overwrite + return { + "": "lr", + "archives": "lrw", + "cache": "lrwWD", # WD for chunks., last-key-checked, ... + "config": "lrW", # W for manifest + "keys": "lr", + "locks": "lrwD", # borg needs to create/delete a shared lock here + "packs": "lrw", + } + case "write-only": # mostly no reading + return { + "": "l", + "archives": "lw", + "cache": "lrwWD", # read allowed, e.g. for chunks. cache + "config": "lrW", # W for manifest + "keys": "lr", + "locks": "lrwD", # borg needs to create/delete a shared lock here + "packs": "lw", # no r! + } + case "read-only": # mostly r/o + return {"": "lr", "locks": "lrwD"} + case _: + raise Error( + f"Invalid BORG_REPO_PERMISSIONS value: {permissions}, should be one of: " + f"all, no-delete, write-only, read-only." + ) def rest_serve_command(location):