Built-in types are now subscriptable

Generated with: ruff check --extend-select UP006 --fix

(cherry picked from commit 790745da18)
This commit is contained in:
Štěpán Balážik 2026-02-09 15:33:22 +01:00
parent 5fc2b9b2f3
commit a7ef013f6d
22 changed files with 129 additions and 134 deletions

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import Dict, List, Optional, Type
from typing import Optional
import abc
@ -30,7 +30,7 @@ from isctest.asyncserver import (
class ResponseSpoofer(ResponseHandler, abc.ABC):
spoofers: Dict[str, Type["ResponseSpoofer"]] = {}
spoofers: dict[str, type["ResponseSpoofer"]] = {}
def __init_subclass__(cls, mode: str) -> None:
assert mode not in cls.spoofers
@ -69,7 +69,7 @@ class SetSpoofingModeCommand(ControlCommand):
self._current_handler: Optional[ResponseSpoofer] = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1:
qctx.response.set_rcode(dns.rcode.SERVFAIL)

View file

@ -9,7 +9,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Dict
import time
@ -22,7 +21,7 @@ import isctest
@pytest.fixture(autouse=True)
def autouse_flush_resolver_cache(servers: Dict[str, NamedInstance]) -> None:
def autouse_flush_resolver_cache(servers: dict[str, NamedInstance]) -> None:
servers["ns4"].rndc("flush")
@ -78,7 +77,7 @@ def check_domain_hijack(ns4: NamedInstance) -> None:
)
def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_sibling_ns_referral(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="sibling-ns", ans2="none")
ns4 = servers["ns4"]
@ -86,7 +85,7 @@ def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> Non
check_domain_hijack(ns4)
def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_unsolicited_authority(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="unsolicited-ns")
ns4 = servers["ns4"]
@ -95,7 +94,7 @@ def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> N
check_domain_hijack(ns4)
def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_parent_glue(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="parent-glue")
ns4 = servers["ns4"]
@ -108,7 +107,7 @@ def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
check_domain_hijack(ns4)
def test_bailiwick_spoofed_dname(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_spoofed_dname(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="dname")
ns4 = servers["ns4"]

View file

@ -13,7 +13,7 @@ information regarding copyright ownership.
from dataclasses import dataclass
from enum import Enum
from typing import AsyncGenerator, List, Optional, Tuple
from typing import AsyncGenerator, Optional
import abc
import logging
@ -123,7 +123,7 @@ class RecordGenerator(abc.ABC):
def __init__(self, name_generator: ChainNameGenerator) -> None:
self._name_generator = name_generator
def get_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def get_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
@ -155,7 +155,7 @@ class RecordGenerator(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
@ -170,7 +170,7 @@ class CnameRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
@ -182,7 +182,7 @@ class DnameRecordGenerator(RecordGenerator):
response_count = 2
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
dname_owner = self._name_generator.current_domain
cname_owner = self._name_generator.current_name
dname_target = self._name_generator.generate_next_sld().to_text()
@ -206,7 +206,7 @@ class XnameRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name_in_next_sld().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
@ -218,7 +218,7 @@ class FinalRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
response = self.create_rrset(owner, dns.rdatatype.A, "10.53.0.4")
signature = self.create_rrset_signature(owner, response.rdtype)
@ -300,7 +300,7 @@ class ChainSetupCommand(ControlCommand):
self._current_handler: Optional[ChainResponseHandler] = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
try:
actions, selectors = self._parse_args(args)
@ -320,8 +320,8 @@ class ChainSetupCommand(ControlCommand):
return "chain response setup successful"
def _parse_args(
self, args: List[str]
) -> Tuple[List[ChainAction], List[ChainSelector]]:
self, args: list[str]
) -> tuple[list[ChainAction], list[ChainSelector]]:
try:
delimiter = args.index("_")
except ValueError as exc:
@ -335,7 +335,7 @@ class ChainSetupCommand(ControlCommand):
return actions, selectors
def _parse_args_actions(self, args_actions: List[str]) -> List[ChainAction]:
def _parse_args_actions(self, args_actions: list[str]) -> list[ChainAction]:
actions = []
for action in args_actions + ["FINAL"]:
@ -347,8 +347,8 @@ class ChainSetupCommand(ControlCommand):
return actions
def _parse_args_selectors(
self, args_selectors: List[str], actions: List[ChainAction]
) -> List[ChainSelector]:
self, args_selectors: list[str], actions: list[ChainAction]
) -> list[ChainSelector]:
max_response_index = self._get_max_response_index(actions)
selectors = []
@ -366,19 +366,19 @@ class ChainSetupCommand(ControlCommand):
return selectors
def _get_max_response_index(self, actions: List[ChainAction]) -> int:
def _get_max_response_index(self, actions: list[ChainAction]) -> int:
rrset_generator_classes = [a.value for a in actions]
return sum(g.response_count for g in rrset_generator_classes)
def _prepare_answer(
self, actions: List[ChainAction], selectors: List[ChainSelector]
) -> List[dns.rrset.RRset]:
self, actions: list[ChainAction], selectors: list[ChainSelector]
) -> list[dns.rrset.RRset]:
all_responses, all_signatures = self._generate_rrsets(actions)
return self._select_rrsets(all_responses, all_signatures, selectors)
def _generate_rrsets(
self, actions: List[ChainAction]
) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
self, actions: list[ChainAction]
) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
all_responses = []
all_signatures = []
name_generator = ChainNameGenerator()
@ -394,10 +394,10 @@ class ChainSetupCommand(ControlCommand):
def _select_rrsets(
self,
all_responses: List[dns.rrset.RRset],
all_signatures: List[dns.rrset.RRset],
selectors: List[ChainSelector],
) -> List[dns.rrset.RRset]:
all_responses: list[dns.rrset.RRset],
all_signatures: list[dns.rrset.RRset],
selectors: list[ChainSelector],
) -> list[dns.rrset.RRset]:
rrsets = []
for selector in selectors:
@ -418,7 +418,7 @@ class ChainResponseHandler(DomainHandler):
domains = ["domain.nil."]
def __init__(self, answer_rrsets: List[dns.rrset.RRset]):
def __init__(self, answer_rrsets: list[dns.rrset.RRset]):
super().__init__()
self._answer_rrsets = answer_rrsets
@ -441,7 +441,7 @@ class ChainResponseHandler(DomainHandler):
qctx.response.use_edns()
yield DnsResponseSend(qctx.response)
def _non_chain_answer(self, qctx: QueryContext) -> List[dns.rrset.RRset]:
def _non_chain_answer(self, qctx: QueryContext) -> list[dns.rrset.RRset]:
owner = qctx.qname
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
@ -449,14 +449,14 @@ class ChainResponseHandler(DomainHandler):
]
@property
def _authority_rrsets(self) -> List[dns.rrset.RRset]:
def _authority_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.NS, "ns1.domain.nil."),
]
@property
def _additional_rrsets(self) -> List[dns.rrset.RRset]:
def _additional_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("ns1.domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),

View file

@ -12,7 +12,7 @@
# information regarding copyright ownership.
from typing import NamedTuple, Tuple
from typing import NamedTuple
import os
import sys
@ -189,7 +189,7 @@ def keystate_check(server, zone, key):
class CheckDSTest(NamedTuple):
zone: str
logs_to_wait_for: Tuple[str]
logs_to_wait_for: tuple[str]
expected_parent_state: str

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator, List, Optional
from typing import AsyncGenerator, Optional
import logging
@ -31,7 +31,7 @@ from isctest.asyncserver import (
class ErraticAxfrHandler(ResponseHandler):
allowed_actions = ["no-response", "partial-axfr", "complete-axfr"]
def __init__(self, actions: List[str]) -> None:
def __init__(self, actions: list[str]) -> None:
self.actions = actions
self.counter = 0
for action in actions:
@ -72,7 +72,7 @@ class ResponseSequenceCommand(ControlCommand):
self._current_handler: Optional[ResponseHandler] = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str:
for action in args:
if action not in ErraticAxfrHandler.allowed_actions:

View file

@ -17,12 +17,8 @@ from typing import (
AsyncGenerator,
Callable,
Coroutine,
Dict,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
@ -57,7 +53,7 @@ import dns.version
import dns.zone
_UdpHandler = Callable[
[bytes, Tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
[bytes, tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
]
@ -84,7 +80,7 @@ class _AsyncUdpHandler(asyncio.DatagramProtocol):
"""
self._transport = cast(asyncio.DatagramTransport, transport)
def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
"""
Called by asyncio when a datagram is received.
"""
@ -133,7 +129,7 @@ class AsyncServer:
logging.info("Setting up IPv4 listener at %s:%d", ipv4_address, port)
logging.info("Setting up IPv6 listener at [%s]:%d", ipv6_address, port)
self._ip_addresses: Tuple[str, str] = (ipv4_address, ipv6_address)
self._ip_addresses: tuple[str, str] = (ipv4_address, ipv6_address)
self._port: int = port
self._udp_handler: Optional[_UdpHandler] = udp_handler
self._tcp_handler: Optional[_TcpHandler] = tcp_handler
@ -186,7 +182,7 @@ class AsyncServer:
loop.set_exception_handler(self._handle_exception)
def _handle_exception(
self, _: asyncio.AbstractEventLoop, context: Dict[str, Any]
self, _: asyncio.AbstractEventLoop, context: dict[str, Any]
) -> None:
assert self._work_done
exception = context.get("exception", RuntimeError(context["message"]))
@ -512,7 +508,7 @@ class IgnoreAllConnections(ConnectionHandler):
client socket, effectively ignoring all incoming connections.
"""
_connections: Set[asyncio.StreamWriter] = field(default_factory=set)
_connections: set[asyncio.StreamWriter] = field(default_factory=set)
async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer: Peer
@ -623,14 +619,14 @@ class QnameHandler(ResponseHandler):
@property
@abc.abstractmethod
def qnames(self) -> List[str]:
def qnames(self) -> list[str]:
"""
A list of QNAMEs handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
self._qnames: List[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
self._qnames: list[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)})"
@ -653,7 +649,7 @@ class QnameQtypeHandler(QnameHandler):
@property
@abc.abstractmethod
def qtypes(self) -> List[dns.rdatatype.RdataType]:
def qtypes(self) -> list[dns.rdatatype.RdataType]:
"""
A list of QTYPEs handled by this class.
"""
@ -661,7 +657,7 @@ class QnameQtypeHandler(QnameHandler):
def __init__(self) -> None:
super().__init__()
self._qtypes: List[dns.rdatatype.RdataType] = self.qtypes
self._qtypes: list[dns.rdatatype.RdataType] = self.qtypes
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)}; QTYPEs: {', '.join(map(str, self.qtypes))})"
@ -755,14 +751,14 @@ class DomainHandler(ResponseHandler):
@property
@abc.abstractmethod
def domains(self) -> List[str]:
def domains(self) -> list[str]:
"""
A list of domain names handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
self._domains: List[dns.name.Name] = sorted(
self._domains: list[dns.name.Name] = sorted(
[dns.name.from_text(d) for d in self.domains], reverse=True
)
self._matched_domain: Optional[dns.name.Name] = None
@ -833,7 +829,7 @@ class ForwarderHandler(ResponseHandler):
logging.debug("[OUT] %s", self._query.hex())
cast(asyncio.DatagramTransport, transport).sendto(self._query)
def datagram_received(self, data: bytes, _: Tuple[str, int]) -> None:
def datagram_received(self, data: bytes, _: tuple[str, int]) -> None:
logging.debug("[IN] %s", data.hex())
self._response.set_result(data)
@ -897,7 +893,7 @@ class _ZoneTreeNode:
"""
zone: Optional[dns.zone.Zone]
children: List["_ZoneTreeNode"] = field(default_factory=list)
children: list["_ZoneTreeNode"] = field(default_factory=list)
class _ZoneTree:
@ -977,7 +973,7 @@ class _DnsMessageWithTsigDisabled(dns.message.Message):
from failing on messages initialized with `dns.message.from_wire(keyring=False)`.
"""
def sign(*_: Any, **__: Any) -> Tuple[dns.rdata.Rdata, None]:
def sign(*_: Any, **__: Any) -> tuple[dns.rdata.Rdata, None]:
assert self.tsig
return self.tsig[0], None
@ -1055,7 +1051,7 @@ class AsyncDnsServer(AsyncServer):
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = False,
keyring: Union[
Dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
] = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
) -> None:
@ -1063,7 +1059,7 @@ class AsyncDnsServer(AsyncServer):
self._zone_tree: _ZoneTree = _ZoneTree()
self._connection_handler: Optional[ConnectionHandler] = None
self._response_handlers: List[ResponseHandler] = []
self._response_handlers: list[ResponseHandler] = []
self._default_rcode = default_rcode
self._default_aa = default_aa
self._keyring = keyring
@ -1172,7 +1168,7 @@ class AsyncDnsServer(AsyncServer):
raise ValueError(error)
async def _handle_udp(
self, wire: bytes, addr: Tuple[str, int], transport: asyncio.DatagramTransport
self, wire: bytes, addr: tuple[str, int], transport: asyncio.DatagramTransport
) -> None:
logging.debug("Received UDP message: %s", wire.hex())
socket_info = transport.get_extra_info("sockname")
@ -1592,7 +1588,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
return dns.name.from_text(self._CONTROL_DOMAIN)
@functools.cached_property
def _commands(self) -> Dict[dns.name.Name, "ControlCommand"]:
def _commands(self) -> dict[dns.name.Name, "ControlCommand"]:
return {}
def install_control_commands(self, *commands: "ControlCommand") -> None:
@ -1703,7 +1699,7 @@ class ControlCommand(abc.ABC):
@abc.abstractmethod
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
"""
This method is expected to carry out arbitrary actions in response to a
@ -1745,7 +1741,7 @@ class ToggleResponsesCommand(ControlCommand):
self._current_handler: Optional[IgnoreAllQueries] = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1:
logging.error("Invalid %s query %s", self, qctx.qname)
@ -1785,11 +1781,11 @@ class SwitchControlCommand(ControlCommand):
control_subdomain = "switch"
def __init__(self, handler_mapping: Dict[str, Sequence[ResponseHandler]]):
def __init__(self, handler_mapping: dict[str, Sequence[ResponseHandler]]):
self._handler_mapping = handler_mapping
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1 or args[0] not in self._handler_mapping:
logging.error("Invalid %s query %s", self, qctx.qname)

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List, Optional, cast
from typing import Optional, cast
import difflib
import shutil
@ -71,10 +71,10 @@ def noraflag(message: dns.message.Message) -> None:
def _extract_ede_options(
message: dns.message.Message,
) -> List[EDEOption]:
) -> list[EDEOption]:
"""Extract EDE options from the DNS message."""
return cast(
List[EDEOption],
list[EDEOption],
[
option
for option in message.options

View file

@ -11,7 +11,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List, Union
from typing import Union
from warnings import warn
import collections.abc
@ -156,7 +156,7 @@ dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns.
@composite
def _partition_bytes_to_labels(
draw, remaining_bytes: int, number_of_labels: int
) -> List[int]:
) -> list[int]:
two_bytes_reserved_for_label = 2
# Reserve two bytes for each label

View file

@ -12,7 +12,7 @@
# information regarding copyright ownership.
from pathlib import Path
from typing import List, NamedTuple, Optional
from typing import NamedTuple, Optional
import os
import re
@ -175,7 +175,7 @@ class NamedInstance:
watcher.wait_for_line("all zones loaded")
return cmd
def stop(self, args: Optional[List[str]] = None) -> None:
def stop(self, args: Optional[list[str]] = None) -> None:
"""Stop the instance."""
args = args or []
perl(
@ -183,7 +183,7 @@ class NamedInstance:
[self.system_test_name, self.identifier] + args,
)
def start(self, args: Optional[List[str]] = None) -> None:
def start(self, args: Optional[list[str]] = None) -> None:
"""Start the instance."""
args = args or []
perl(

View file

@ -13,7 +13,7 @@ from datetime import datetime, timedelta, timezone
from functools import total_ordering
from pathlib import Path
from re import compile as Re
from typing import Dict, List, Optional, Tuple, Union
from typing import Optional, Union
import glob
import os
@ -178,7 +178,7 @@ class KeyProperties:
self,
name: str,
metadata: dict,
timing: Dict[str, KeyTimingMetadata],
timing: dict[str, KeyTimingMetadata],
private: bool = True,
legacy: bool = False,
role: str = "csk",
@ -217,7 +217,7 @@ class KeyProperties:
"KSK": "yes",
"ZSK": "yes",
}
timing: Dict[str, KeyTimingMetadata] = {}
timing: dict[str, KeyTimingMetadata] = {}
result = KeyProperties(name="DEFAULT", metadata=metadata, timing=timing)
result.name = "DEFAULT"
@ -379,7 +379,7 @@ class Key:
def get_signing_state(
self, offline_ksk=False, zsk_missing=False, smooth=False
) -> Tuple[bool, bool]:
) -> tuple[bool, bool]:
"""
This returns the signing state derived from the key states, KRRSIGState
and ZRRSIGState.
@ -1504,7 +1504,7 @@ def next_key_event_equals(server, zone, next_event):
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
) -> List[Key]:
) -> list[Key]:
"""
Retrieve all keys from the key files in a directory. If 'zone' is None,
retrieve all keys in the directory, otherwise only those matching the
@ -1544,11 +1544,11 @@ def keydir_to_keylist(
return [k for k in all_keys if used(k)]
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> list[Key]:
return [Key(name, keydir) for name in keystr.split()]
def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]:
"""
Get the policies from a list of specially formatted strings.
The splitted line should result in the following items:
@ -1570,8 +1570,8 @@ def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
line = key.split()
# defaults
metadata: Dict[str, Union[str, int]] = {}
timing: Dict[str, KeyTimingMetadata] = {}
metadata: dict[str, Union[str, int]] = {}
timing: dict[str, KeyTimingMetadata] = {}
private = True
legacy = False
keytag_min = 0

View file

@ -17,11 +17,11 @@ import textwrap
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
LOG_INDENT = 4
LOGGERS = {
LOGGERS: dict[str, logging.Logger | None] = {
"conftest": None,
"module": None,
"test": None,
} # type: Dict[str, Optional[logging.Logger]]
}
def init_conftest_logger():

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union
from typing import Any, Match, Optional, Pattern, TextIO, TypeVar, Union
import abc
import os
@ -18,7 +18,7 @@ import time
from isctest.text import FlexPattern, LineReader, compile_pattern
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
OneOrMore = Union[T, list[T]]
class WatchLogException(Exception):
@ -71,12 +71,12 @@ class WatchLog(abc.ABC):
self._timeout = timeout
self._deadline = 0.0
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]:
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> list[Pattern]:
self._wait_function_called = True
self._deadline = time.monotonic() + self._timeout
return self._prepare_patterns(patterns)
def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> List[Pattern]:
def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> list[Pattern]:
"""
Convert a mix of string(s) and/or pattern(s) into a list of patterns.
@ -90,7 +90,7 @@ class WatchLog(abc.ABC):
patterns.append(compile_pattern(string))
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
def _wait_for_match(self, regexes: list[Pattern]) -> Match:
if not self._reader:
raise WatchLogException(
"use WatchLog as context manager before calling wait_for_*() functions"
@ -209,7 +209,7 @@ class WatchLog(abc.ABC):
return self._wait_for_match(regexes)
def wait_for_sequence(self, patterns: List[FlexPattern]) -> List[Match]:
def wait_for_sequence(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until the specified pattern sequence is found in the
log file.
@ -285,7 +285,7 @@ class WatchLog(abc.ABC):
return matches
def wait_for_all(self, patterns: List[FlexPattern]) -> List[Match]:
def wait_for_all(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until all the specified patterns are found in the
log file in any order.

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import FrozenSet, Iterable
from typing import Iterable
from dns.name import Name
@ -26,7 +26,7 @@ def len_wire_uncompressed(name: Name) -> int:
return len(name) + sum(map(len, name.labels))
def get_wildcard_names(names: Iterable[Name]) -> FrozenSet[Name]:
def get_wildcard_names(names: Iterable[Name]) -> frozenset[Name]:
return frozenset(name for name in names if name.is_wild())
@ -84,7 +84,7 @@ class ZoneAnalyzer:
.union(self.reachable_dnames)
)
def get_names_with_type(self, rdtype) -> FrozenSet[Name]:
def get_names_with_type(self, rdtype) -> frozenset[Name]:
return frozenset(
name for name in self.zone if self.zone.get_rdataset(name, rdtype)
)
@ -148,7 +148,7 @@ class ZoneAnalyzer:
self.reachable_delegations = frozenset(reachable_delegations)
self.occluded = frozenset(occluded)
def generate_ents(self) -> FrozenSet[Name]:
def generate_ents(self) -> frozenset[Name]:
"""
Generate reachable names of empty nodes "between" all reachable
names with a RR and the origin.

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
from pathlib import Path
from typing import List, Optional
from typing import Optional
import os
import subprocess
@ -98,7 +98,7 @@ class EnvCmd:
def _run_script(
interpreter: str,
script: str,
args: Optional[List[str]] = None,
args: Optional[list[str]] = None,
):
if args is None:
args = []
@ -130,12 +130,12 @@ def _run_script(
isctest.log.debug(" exited with %d", returncode)
def shell(script: str, args: Optional[List[str]] = None) -> None:
def shell(script: str, args: Optional[list[str]] = None) -> None:
"""Run a given script with system's shell interpreter."""
_run_script(os.environ["SHELL"], script, args)
def perl(script: str, args: Optional[List[str]] = None) -> None:
def perl(script: str, args: Optional[list[str]] = None) -> None:
"""Run a given script with system's perl interpreter."""
_run_script(os.environ["PERL"], script, args)

View file

@ -13,7 +13,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
import jinja2
@ -44,7 +44,7 @@ class TemplateEngine:
def render(
self,
output: str,
data: Optional[Dict[str, Any]] = None,
data: Optional[dict[str, Any]] = None,
template: Optional[str] = None,
) -> None:
"""
@ -69,7 +69,7 @@ class TemplateEngine:
stream = self.j2env.get_template(template).stream(data)
stream.dump(output, encoding="utf-8")
def render_auto(self, data: Optional[Dict[str, Any]] = None):
def render_auto(self, data: Optional[dict[str, Any]] = None):
"""
Render all *.j2 templates with default (and optionally the provided)
values and write the output to files without the .j2 extensions.

View file

@ -12,7 +12,7 @@
# information regarding copyright ownership.
from re import compile as Re
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
from typing import Iterator, Match, Optional, Pattern, TextIO, Union
import abc
import re
@ -48,7 +48,7 @@ class Grep(abc.ABC):
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
def grep(self, pattern: FlexPattern) -> list[Match]:
"""
Get list of lines matching the pattern.
"""

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Dict, List, NamedTuple, Optional, Union
from typing import NamedTuple, Optional, Union
import os
import platform
@ -69,14 +69,14 @@ class Algorithm(NamedTuple):
class AlgorithmSet(NamedTuple):
"""Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
default: Union[Algorithm, List[Algorithm]]
default: Union[Algorithm, list[Algorithm]]
"""DEFAULT is the algorithm for testing."""
alternative: Union[Algorithm, List[Algorithm]]
alternative: Union[Algorithm, list[Algorithm]]
"""ALTERNATIVE is an alternative algorithm for test cases that require more
than one algorithm (for example algorithm rollover)."""
disabled: Union[Algorithm, List[Algorithm]]
disabled: Union[Algorithm, list[Algorithm]]
"""DISABLED is an algorithm that is used for tests against the
"disable-algorithms" configuration option."""
@ -160,7 +160,7 @@ CRYPTO_SUPPORTED_VARS = {
"ED448_SUPPORTED": "0",
}
SUPPORTED_ALGORITHMS: List[Algorithm] = []
SUPPORTED_ALGORITHMS: list[Algorithm] = []
def init_crypto_supported():
@ -250,7 +250,7 @@ def _select_random(algs: AlgorithmSet, stable_period=STABLE_PERIOD) -> Algorithm
return AlgorithmSet(default, alternative, disabled)
def _algorithms_env(algs: AlgorithmSet, name: str) -> Dict[str, str]:
def _algorithms_env(algs: AlgorithmSet, name: str) -> dict[str, str]:
"""Return environment variables with selected algorithms as a dict."""
algs_env = {
"ALGORITHM_SET": name,

View file

@ -17,7 +17,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Container, Iterable, Optional, Set, Tuple
from typing import Container, Iterable, Optional
import os
@ -63,7 +63,7 @@ def is_related_to_any(
def do_test_query(
qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
) -> tuple[dns.message.QueryMessage, "NSEC3Checker"]:
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
response = isctest.query.tcp(query, server, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response, query)
@ -349,8 +349,8 @@ class NSEC3Checker:
assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}"
self.params: NSEC3Params = NSEC3Params(**attrs_seen)
self.response: dns.message.Message = response
self.owners_present: Set[dns.name.Name] = owners_seen
self.owners_used: Set[dns.name.Name] = set()
self.owners_present: set[dns.name.Name] = owners_seen
self.owners_used: set[dns.name.Name] = set()
@staticmethod
def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator, Tuple, Union
from typing import AsyncGenerator, Union
import dns.name
import dns.rcode
@ -53,7 +53,7 @@ class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
def _cname_rrsets(
qname: Union[dns.name.Name, str],
) -> Tuple[dns.rrset.RRset, dns.rrset.RRset]:
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
return (
rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
rrset(qname, dns.rdatatype.A, "1.2.3.4"),

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator, List, NamedTuple, Union
from typing import AsyncGenerator, NamedTuple, Union
import abc
@ -40,7 +40,7 @@ def rrset(
def rrset_from_list(
qname: Union[dns.name.Name, str],
rtype: dns.rdatatype.RdataType,
rdata_list: List[str],
rdata_list: list[str],
ttl: int = 300,
) -> dns.rrset.RRset:
return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)

View file

@ -9,8 +9,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List
import shutil
from isctest.kasp import private_type_record
@ -21,7 +19,7 @@ from isctest.vars.algorithms import Algorithm
import isctest
def configure_tld(zonename: str, delegations: List[Zone]) -> Zone:
def configure_tld(zonename: str, delegations: list[Zone]) -> Zone:
templates = isctest.template.TemplateEngine(".")
alg = Algorithm.default()
keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600")
@ -55,7 +53,7 @@ def configure_tld(zonename: str, delegations: List[Zone]) -> Zone:
return Zone(zonename, f"{outfile}.signed", Nameserver("ns2", "10.53.0.2"))
def configure_root(delegations: List[Zone]) -> TrustAnchor:
def configure_root(delegations: list[Zone]) -> TrustAnchor:
templates = isctest.template.TemplateEngine(".")
alg = Algorithm.default()
keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600")
@ -110,7 +108,7 @@ def set_key_relationship(key1: str, key2: str):
def render_and_sign_zone(
zonename: str, keys: List[str], signing: bool = True, extra_options: str = ""
zonename: str, keys: list[str], signing: bool = True, extra_options: str = ""
):
dnskeys = []
privaterrs = []
@ -137,7 +135,7 @@ def render_and_sign_zone(
)
def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> List[Zone]:
def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> list[Zone]:
# The zones at csk-algorithm-roll.$tld represent the various steps
# of a CSK algorithm rollover.
zones = []
@ -285,7 +283,7 @@ def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> List[Zo
return zones
def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> List[Zone]:
def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> list[Zone]:
# The zones at algorithm-roll.$tld represent the various steps of a ZSK/KSK
# algorithm rollover.
zones = []
@ -541,7 +539,7 @@ def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> List[Zone]:
return zones
def configure_cskroll1(tld: str, policy: str) -> List[Zone]:
def configure_cskroll1(tld: str, policy: str) -> list[Zone]:
# The zones at csk-roll1.$tld represent the various steps of a CSK rollover
# (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover).
zones = []
@ -870,7 +868,7 @@ def configure_cskroll1(tld: str, policy: str) -> List[Zone]:
return zones
def configure_cskroll2(tld: str, policy: str) -> List[Zone]:
def configure_cskroll2(tld: str, policy: str) -> list[Zone]:
# The zones at csk-roll2.$tld represent the various steps of a CSK rollover
# (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover).
# This scenario differs from the csk-roll1 one because the zone signatures (ZRRSIG)
@ -1214,7 +1212,7 @@ def configure_cskroll2(tld: str, policy: str) -> List[Zone]:
return zones
def configure_enable_dnssec(tld: str, policy: str) -> List[Zone]:
def configure_enable_dnssec(tld: str, policy: str) -> list[Zone]:
# The zones at enable-dnssec.$tld represent the various steps of the
# initial signing of a zone.
zones = []
@ -1294,7 +1292,7 @@ def configure_enable_dnssec(tld: str, policy: str) -> List[Zone]:
return zones
def configure_going_insecure(tld: str, reconfig: bool = False) -> List[Zone]:
def configure_going_insecure(tld: str, reconfig: bool = False) -> list[Zone]:
zones = []
keygen = EnvCmd("KEYGEN", "-a ECDSA256 -L 7200")
settime = EnvCmd("SETTIME", "-s")
@ -1362,7 +1360,7 @@ def configure_going_insecure(tld: str, reconfig: bool = False) -> List[Zone]:
return zones
def configure_straight2none(tld: str) -> List[Zone]:
def configure_straight2none(tld: str) -> list[Zone]:
# These zones are going straight to "none" policy. This is undefined behavior.
zones = []
keygen = EnvCmd("KEYGEN", "-k default")
@ -1401,7 +1399,7 @@ def configure_straight2none(tld: str) -> List[Zone]:
return zones
def configure_ksk_doubleksk(tld: str) -> List[Zone]:
def configure_ksk_doubleksk(tld: str) -> list[Zone]:
# The zones at ksk-doubleksk.$tld represent the various steps of a KSK
# Double-KSK rollover.
zones = []
@ -1668,7 +1666,7 @@ def configure_ksk_doubleksk(tld: str) -> List[Zone]:
return zones
def configure_ksk_3crowd(tld: str) -> List[Zone]:
def configure_ksk_3crowd(tld: str) -> list[Zone]:
# Test #2375, the "three is a crowd" bug, where a new key is introduced but the
# previous rollover has not finished yet. In other words, we have a key KEY2
# that is the successor of key KEY1, and we introduce a new key KEY3 that is
@ -1729,7 +1727,7 @@ def configure_ksk_3crowd(tld: str) -> List[Zone]:
return zones
def configure_zsk_prepub(tld: str) -> List[Zone]:
def configure_zsk_prepub(tld: str) -> list[Zone]:
# The zones at zsk-prepub.$tld represent the various steps of a ZSK
# Pre-Publication rollover.
zones = []

View file

@ -96,6 +96,8 @@ lint.select = [
"F401",
# sorted __all__
"RUF022",
# unnecessary `typing` imports
"UP006",
# f-strings
"UP031",
"UP032",