Merge pull request #9773 from ThomasWaldmann/py310-modernize

use Python 3.10 features: match statements and PEP 604 unions
This commit is contained in:
TW 2026-06-14 15:36:29 +02:00 committed by GitHub
commit 46d5c96cb4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 122 additions and 116 deletions

View file

@ -6,7 +6,7 @@ import asyncio
import logging
import os
import sys
from typing import Optional, Callable, List
from typing import Callable
class BorgRunner:
@ -14,10 +14,10 @@ class BorgRunner:
Manages the execution of the borg subprocess and parses its JSON output.
"""
def __init__(self, command: List[str], log_callback: Callable[[dict], None]):
def __init__(self, command: list[str], log_callback: Callable[[dict], None]):
self.command = command
self.log_callback = log_callback
self.process: Optional[asyncio.subprocess.Process] = None
self.process: asyncio.subprocess.Process | None = None
self.logger = logging.getLogger(__name__)
async def start(self):

View file

@ -147,16 +147,17 @@ class StandardLog(Vertical):
status_panel = self.app.query_one("#status")
status_panel.files_count += 1
status = line[0]
if status == "E":
status_panel.error_count += 1
elif status in "U-":
status_panel.unchanged_count += 1
elif status in "M":
status_panel.modified_count += 1
elif status in "A+":
status_panel.added_count += 1
elif status in "dcbs":
status_panel.other_count += 1
match status:
case "E":
status_panel.error_count += 1
case "U" | "-":
status_panel.unchanged_count += 1
case "M":
status_panel.modified_count += 1
case "A" | "+":
status_panel.added_count += 1
case "d" | "c" | "b" | "s":
status_panel.other_count += 1
markup_tag = {
"E": "red", # Error

View file

@ -4,7 +4,7 @@ import os
import textwrap
from hashlib import sha256
from pathlib import Path
from typing import Literal, ClassVar, Optional
from typing import Literal, ClassVar
from collections.abc import Callable
from ..logger import create_logger
@ -45,7 +45,7 @@ KEYFILE_ID = "BORG_KEY"
ADMIN_LABEL = "admin"
def is_keyfile(data: str | bytes, repoid: Optional[str] = None) -> bool:
def is_keyfile(data: str | bytes, repoid: str | None = None) -> bool:
# repoid is a hex str, if given. if given, we only accept keyfiles for that repo.
header = f"{KEYFILE_ID} {repoid or ''}"
if isinstance(data, str):
@ -61,7 +61,7 @@ def keyfile_format(repoid: str, b64data: str) -> str:
return f"{KEYFILE_ID} {repoid}\n{b64data}\n"
def keyfile_parse(data: str | bytes, repoid: Optional[str] = None) -> tuple[str, str]:
def keyfile_parse(data: str | bytes, repoid: str | None = None) -> tuple[str, str]:
if repoid is None:
if not is_keyfile(data):
raise ValueError("Not a keyfile")

View file

@ -176,71 +176,74 @@ class CompressionSpec:
raise ArgumentTypeError("not enough arguments")
# --compression algo[,level]
self.name = values[0]
if self.name in ("none", "lz4"):
return
elif self.name in ("zlib", "lzma", "zlib_legacy"): # zlib_legacy just for testing
if count < 2:
level = 6 # default compression level in py stdlib
elif count == 2:
level = int(values[1])
if not 0 <= level <= 9:
raise ArgumentTypeError("level must be >= 0 and <= 9")
else:
raise ArgumentTypeError("too many arguments")
self.level = level
elif self.name in ("zstd",):
if count < 2:
level = 3 # default compression level in zstd
elif count == 2:
level = int(values[1])
if not 1 <= level <= 22:
raise ArgumentTypeError("level must be >= 1 and <= 22")
else:
raise ArgumentTypeError("too many arguments")
self.level = level
elif self.name == "auto":
if 2 <= count <= 3:
compression = ",".join(values[1:])
else:
raise ArgumentTypeError("bad arguments")
self.inner = CompressionSpec(compression)
elif self.name == "obfuscate":
if 3 <= count <= 5:
level = int(values[1])
if not ((1 <= level <= 6) or (110 <= level <= 123) or (level == 250)):
raise ArgumentTypeError("level must be (inclusively) within 1...6, 110...123 or equal to 250")
match self.name:
case "none" | "lz4":
return
case "zlib" | "lzma" | "zlib_legacy": # zlib_legacy just for testing
if count < 2:
level = 6 # default compression level in py stdlib
elif count == 2:
level = int(values[1])
if not 0 <= level <= 9:
raise ArgumentTypeError("level must be >= 0 and <= 9")
else:
raise ArgumentTypeError("too many arguments")
self.level = level
compression = ",".join(values[2:])
else:
raise ArgumentTypeError("bad arguments")
self.inner = CompressionSpec(compression)
else:
raise ArgumentTypeError("unsupported compression type")
case "zstd":
if count < 2:
level = 3 # default compression level in zstd
elif count == 2:
level = int(values[1])
if not 1 <= level <= 22:
raise ArgumentTypeError("level must be >= 1 and <= 22")
else:
raise ArgumentTypeError("too many arguments")
self.level = level
case "auto":
if 2 <= count <= 3:
compression = ",".join(values[1:])
else:
raise ArgumentTypeError("bad arguments")
self.inner = CompressionSpec(compression)
case "obfuscate":
if 3 <= count <= 5:
level = int(values[1])
if not ((1 <= level <= 6) or (110 <= level <= 123) or (level == 250)):
raise ArgumentTypeError("level must be (inclusively) within 1...6, 110...123 or equal to 250")
self.level = level
compression = ",".join(values[2:])
else:
raise ArgumentTypeError("bad arguments")
self.inner = CompressionSpec(compression)
case _:
raise ArgumentTypeError("unsupported compression type")
@property
def compressor(self):
from ..compress import get_compressor
if self.name in ("none", "lz4"):
return get_compressor(self.name)
elif self.name in ("zlib", "lzma", "zstd", "zlib_legacy"):
return get_compressor(self.name, level=self.level)
elif self.name == "auto":
return get_compressor(self.name, compressor=self.inner.compressor)
elif self.name == "obfuscate":
return get_compressor(self.name, level=self.level, compressor=self.inner.compressor)
match self.name:
case "none" | "lz4":
return get_compressor(self.name)
case "zlib" | "lzma" | "zstd" | "zlib_legacy":
return get_compressor(self.name, level=self.level)
case "auto":
return get_compressor(self.name, compressor=self.inner.compressor)
case "obfuscate":
return get_compressor(self.name, level=self.level, compressor=self.inner.compressor)
def __str__(self):
if self.name in ("none", "lz4"):
return f"{self.name}"
elif self.name in ("zlib", "lzma", "zstd", "zlib_legacy"):
return f"{self.name},{self.level}"
elif self.name == "auto":
return f"auto,{self.inner}"
elif self.name == "obfuscate":
return f"obfuscate,{self.level},{self.inner}"
else:
raise ValueError(f"unsupported compression type: {self.name}")
match self.name:
case "none" | "lz4":
return f"{self.name}"
case "zlib" | "lzma" | "zstd" | "zlib_legacy":
return f"{self.name},{self.level}"
case "auto":
return f"auto,{self.inner}"
case "obfuscate":
return f"obfuscate,{self.level},{self.inner}"
case _:
raise ValueError(f"unsupported compression type: {self.name}")
def ChunkerParams(s):

View file

@ -139,20 +139,21 @@ def calculate_relative_offset(format_string, from_ts, earlier=False):
offset = int(match.group("offset"))
offset *= -1 if earlier else 1
if unit == "y":
return from_ts.replace(year=from_ts.year + offset)
elif unit == "m":
return offset_n_months(from_ts, offset)
elif unit == "w":
return from_ts + timedelta(days=offset * 7)
elif unit == "d":
return from_ts + timedelta(days=offset)
elif unit == "H":
return from_ts + timedelta(seconds=offset * 60 * 60)
elif unit == "M":
return from_ts + timedelta(seconds=offset * 60)
elif unit == "S":
return from_ts + timedelta(seconds=offset)
match unit:
case "y":
return from_ts.replace(year=from_ts.year + offset)
case "m":
return offset_n_months(from_ts, offset)
case "w":
return from_ts + timedelta(days=offset * 7)
case "d":
return from_ts + timedelta(days=offset)
case "H":
return from_ts + timedelta(seconds=offset * 60 * 60)
case "M":
return from_ts + timedelta(seconds=offset * 60)
case "S":
return from_ts + timedelta(seconds=offset)
raise ValueError(f"Invalid relative ts offset format: {format_string}")

View file

@ -41,35 +41,36 @@ def borg_permissions(permissions):
The namespaces match the borg repository layout (see Repository.__init__ ns_config).
"""
if permissions == "all":
return None # permissions system will not be used
elif permissions == "no-delete": # mostly no delete, no overwrite
return {
"": "lr",
"archives": "lrw",
"cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ...
"config": "lrW", # W for manifest
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lrw",
}
elif permissions == "write-only": # mostly no reading
return {
"": "l",
"archives": "lw",
"cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache
"config": "lrW", # W for manifest
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lw", # no r!
}
elif permissions == "read-only": # mostly r/o
return {"": "lr", "locks": "lrwD"}
else:
raise Error(
f"Invalid BORG_REPO_PERMISSIONS value: {permissions}, should be one of: "
f"all, no-delete, write-only, read-only."
)
match permissions:
case "all":
return None # permissions system will not be used
case "no-delete": # mostly no delete, no overwrite
return {
"": "lr",
"archives": "lrw",
"cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ...
"config": "lrW", # W for manifest
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lrw",
}
case "write-only": # mostly no reading
return {
"": "l",
"archives": "lw",
"cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache
"config": "lrW", # W for manifest
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lw", # no r!
}
case "read-only": # mostly r/o
return {"": "lr", "locks": "lrwD"}
case _:
raise Error(
f"Invalid BORG_REPO_PERMISSIONS value: {permissions}, should be one of: "
f"all, no-delete, write-only, read-only."
)
def rest_serve_command(location):