Replace Union[S, T] with S | T

Generated with: ruff check --extend-select UP007 --fix && black .

(cherry picked from commit ce9c9a1a9c)
This commit is contained in:
Štěpán Balážik 2026-02-09 15:40:36 +01:00
parent a7ef013f6d
commit 95f49d58b9
8 changed files with 37 additions and 50 deletions

View file

@ -12,16 +12,7 @@ information regarding copyright ownership.
"""
from dataclasses import dataclass, field
from typing import (
Any,
AsyncGenerator,
Callable,
Coroutine,
Optional,
Sequence,
Union,
cast,
)
from typing import Any, AsyncGenerator, Callable, Coroutine, Optional, Sequence, cast
import abc
import asyncio
@ -318,7 +309,7 @@ class ResponseAction(abc.ABC):
"""
@abc.abstractmethod
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
This method is expected to carry out arbitrary actions (e.g. wait for a
specific amount of time, modify the answer, etc.) and then return the
@ -345,7 +336,7 @@ class DnsResponseSend(ResponseAction):
delay: float = 0.0
acknowledge_hand_rolled_response: bool = False
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
Yield a potentially delayed response that is a dns.message.Message.
"""
@ -391,7 +382,7 @@ class BytesResponseSend(ResponseAction):
response: bytes
delay: float = 0.0
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
Yield a potentially delayed response that is a sequence of bytes.
"""
@ -408,7 +399,7 @@ class ResponseDrop(ResponseAction):
Action which does nothing - as if a packet was dropped.
"""
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> Optional[dns.message.Message | bytes]:
return None
@ -426,7 +417,7 @@ class CloseConnection(ResponseAction):
delay: float = 0.0
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> Optional[dns.message.Message | bytes]:
if self.delay > 0:
logging.info("Waiting %.1fs before closing TCP connection", self.delay)
await asyncio.sleep(self.delay)
@ -1050,9 +1041,9 @@ class AsyncDnsServer(AsyncServer):
/,
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = False,
keyring: Union[
dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
] = _NoKeyringType(),
keyring: (
dict[dns.name.Name, dns.tsig.Key] | None | _NoKeyringType
) = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
) -> None:
super().__init__(self._handle_udp, self._handle_tcp, "ans.pid")
@ -1295,7 +1286,7 @@ class AsyncDnsServer(AsyncServer):
)
def _log_response(
self, qctx: QueryContext, response: Optional[Union[dns.message.Message, bytes]]
self, qctx: QueryContext, response: Optional[dns.message.Message | bytes]
) -> None:
if not response:
logging.info(
@ -1395,7 +1386,7 @@ class AsyncDnsServer(AsyncServer):
async def _prepare_responses(
self, qctx: QueryContext
) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
"""
Yield response(s) either from response handlers or zone data.
"""
@ -1609,7 +1600,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
async def _prepare_responses(
self, qctx: QueryContext
) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
"""
Detect and handle control queries, falling back to normal processing
for non-control queries.

View file

@ -11,7 +11,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Union
from warnings import warn
import collections.abc
@ -40,9 +39,7 @@ def dns_names(
draw,
*,
prefix: dns.name.Name = dns.name.empty,
suffix: Union[
dns.name.Name, collections.abc.Iterable[dns.name.Name]
] = dns.name.root,
suffix: dns.name.Name | collections.abc.Iterable[dns.name.Name] = dns.name.root,
min_labels: int = 1,
max_labels: int = 128,
) -> dns.name.Name:

View file

@ -13,7 +13,7 @@ from datetime import datetime, timedelta, timezone
from functools import total_ordering
from pathlib import Path
from re import compile as Re
from typing import Optional, Union
from typing import Optional
import glob
import os
@ -132,26 +132,26 @@ class KeyTimingMetadata:
def __str__(self) -> str:
return self.value.strftime(self.FORMAT)
def __add__(self, other: Union[timedelta, int]):
def __add__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value + other
return result
def __sub__(self, other: Union[timedelta, int]):
def __sub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value - other
return result
def __iadd__(self, other: Union[timedelta, int]):
def __iadd__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value += other
def __isub__(self, other: Union[timedelta, int]):
def __isub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value -= other
@ -186,7 +186,7 @@ class KeyProperties:
flags: int = 257,
keytag_min: int = 0,
keytag_max: int = 65535,
offset: Union[timedelta, int] = 0,
offset: timedelta | int = 0,
):
self.name = name
self.key = None
@ -324,7 +324,7 @@ class Key:
operations for KASP tests.
"""
def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
def __init__(self, name: str, keydir: Optional[str | Path] = None):
self.name = name
if keydir is None:
self.keydir = Path()
@ -1570,7 +1570,7 @@ def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]:
line = key.split()
# defaults
metadata: dict[str, Union[str, int]] = {}
metadata: dict[str, str | int] = {}
timing: dict[str, KeyTimingMetadata] = {}
private = True
legacy = False

View file

@ -13,7 +13,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Optional
import jinja2
@ -26,7 +26,7 @@ class TemplateEngine:
Engine for rendering jinja2 templates in system test directories.
"""
def __init__(self, directory: Union[str, Path], env_vars=ALL):
def __init__(self, directory: str | Path, env_vars=ALL):
"""
Initialize the template engine for `directory`, optionally overriding
the `env_vars` that will be used when rendering the templates (defaults

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import NamedTuple, Optional, Union
from typing import NamedTuple, Optional
import os
import platform
@ -69,14 +69,14 @@ class Algorithm(NamedTuple):
class AlgorithmSet(NamedTuple):
"""Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
default: Union[Algorithm, list[Algorithm]]
default: Algorithm | list[Algorithm]
"""DEFAULT is the algorithm for testing."""
alternative: Union[Algorithm, list[Algorithm]]
alternative: Algorithm | list[Algorithm]
"""ALTERNATIVE is an alternative algorithm for test cases that require more
than one algorithm (for example algorithm rollover)."""
disabled: Union[Algorithm, list[Algorithm]]
disabled: Algorithm | list[Algorithm]
"""DISABLED is an algorithm that is used for tests against the
"disable-algorithms" configuration option."""

View file

@ -10,10 +10,9 @@
# information regarding copyright ownership.
from pathlib import Path
from typing import Dict
def load_ac_vars_from_files() -> Dict[str, str]:
def load_ac_vars_from_files() -> dict[str, str]:
ac_vars = {}
ac_vars_dir = Path(__file__).resolve().parent / ".ac_vars"
var_paths = [

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator, Union
from typing import AsyncGenerator
import dns.name
import dns.rcode
@ -52,7 +52,7 @@ class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
def _cname_rrsets(
qname: Union[dns.name.Name, str],
qname: dns.name.Name | str,
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
return (
rrset(qname, dns.rdatatype.CNAME, f"{qname}"),

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator, NamedTuple, Union
from typing import AsyncGenerator, NamedTuple
import abc
@ -29,7 +29,7 @@ from isctest.asyncserver import (
def rrset(
qname: Union[dns.name.Name, str],
qname: dns.name.Name | str,
rtype: dns.rdatatype.RdataType,
rdata: str,
ttl: int = 300,
@ -38,7 +38,7 @@ def rrset(
def rrset_from_list(
qname: Union[dns.name.Name, str],
qname: dns.name.Name | str,
rtype: dns.rdatatype.RdataType,
rdata_list: list[str],
ttl: int = 300,
@ -46,7 +46,7 @@ def rrset_from_list(
return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset:
def soa_rrset(qname: dns.name.Name | str) -> dns.rrset.RRset:
return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
@ -56,9 +56,9 @@ class DelegationRRsets(NamedTuple):
def delegation_rrsets(
owner: Union[dns.name.Name, str],
owner: dns.name.Name | str,
server_number: int,
ns_name: Union[dns.name.Name, str, None] = None,
ns_name: dns.name.Name | str | None = None,
) -> DelegationRRsets:
if ns_name is None:
ns_name = f"ns.{owner}"
@ -68,7 +68,7 @@ def delegation_rrsets(
def setup_delegation(
qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int
qctx: QueryContext, owner: dns.name.Name | str, server_number: int
) -> None:
delegation = delegation_rrsets(owner, server_number)
qctx.response.authority.append(delegation.ns_rrset)