use pep585 annotations in acme (#10407)

this is part of https://github.com/certbot/certbot/issues/10195 where i
posted [my general
plan](https://github.com/certbot/certbot/issues/10195#issuecomment-3176661347)
here

these changes were done automatically with the command:
```
ruff check --fix --extend-select UP006 --unsafe-fixes acme
```
This commit is contained in:
Brad Warren 2025-08-11 13:19:39 -07:00 committed by GitHub
parent 14a7a97f5b
commit dd99dc30f2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 67 additions and 90 deletions

View file

@ -5,7 +5,6 @@ import datetime
import http.client as http_client
import json
import sys
from typing import Dict
import unittest
from unittest import mock
@ -55,7 +54,7 @@ class ClientV2Test(unittest.TestCase):
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public_key())
the_arg: Dict = dict(reg)
the_arg: dict = dict(reg)
self.new_reg = messages.NewRegistration(**the_arg)
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')

View file

@ -2,7 +2,6 @@
import ipaddress
import itertools
import sys
from typing import List
import unittest
from unittest import mock
import warnings
@ -110,7 +109,7 @@ class GenMakeSelfSignedCertTest(unittest.TestCase):
def setUp(self):
self.cert_count = 5
self.serial_num: List[int] = []
self.serial_num: list[int] = []
self.privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048)
def test_sn_collisions(self):

View file

@ -1,7 +1,6 @@
"""Tests for acme.messages."""
import sys
import json
from typing import Dict
import unittest
from unittest import mock
@ -118,7 +117,7 @@ class ConstantTest(unittest.TestCase):
from acme.messages import _Constant
class MockConstant(_Constant): # pylint: disable=missing-docstring
POSSIBLE_NAMES: Dict = {}
POSSIBLE_NAMES: dict = {}
self.MockConstant = MockConstant # pylint: disable=invalid-name
self.const_a = MockConstant('a')

View file

@ -4,7 +4,6 @@ import socket
import socketserver
import sys
import threading
from typing import Set
import unittest
from unittest import mock
@ -23,7 +22,7 @@ class HTTP01ServerTest(unittest.TestCase):
def setUp(self):
self.account_key = jose.JWK.load(
test_util.load_vector('rsa1024_key.pem'))
self.resources: Set = set()
self.resources: set = set()
from acme.standalone import HTTP01Server
self.server = HTTP01Server(('', 0), resources=self.resources)
@ -154,7 +153,7 @@ class HTTP01DualNetworkedServersTest(unittest.TestCase):
def setUp(self):
self.account_key = jose.JWK.load(
test_util.load_vector('rsa1024_key.pem'))
self.resources: Set = set()
self.resources: set = set()
from acme.standalone import HTTP01DualNetworkedServers
self.servers = HTTP01DualNetworkedServers(('', 0), resources=self.resources)

View file

@ -5,11 +5,8 @@ import hashlib
import logging
from typing import Any
from typing import cast
from typing import Dict
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import Union
@ -25,10 +22,10 @@ GenericChallenge = TypeVar('GenericChallenge', bound='Challenge')
class Challenge(jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge."""
TYPES: Dict[str, Type['Challenge']] = {}
TYPES: dict[str, type['Challenge']] = {}
@classmethod
def from_json(cls: Type[GenericChallenge],
def from_json(cls: type[GenericChallenge],
jobj: Mapping[str, Any]) -> Union[GenericChallenge, 'UnrecognizedChallenge']:
try:
return cast(GenericChallenge, super().from_json(jobj))
@ -40,9 +37,9 @@ class Challenge(jose.TypedJSONObjectWithFields):
class ChallengeResponse(jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge response."""
TYPES: Dict[str, Type['ChallengeResponse']] = {}
TYPES: dict[str, type['ChallengeResponse']] = {}
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
# Removes the `type` field which is inserted by TypedJSONObjectWithFields.to_partial_json.
# This field breaks RFC8555 compliance.
jobj = super().to_partial_json()
@ -62,13 +59,13 @@ class UnrecognizedChallenge(Challenge):
:ivar jobj: Original JSON decoded object.
"""
jobj: Dict[str, Any]
jobj: dict[str, Any]
def __init__(self, jobj: Mapping[str, Any]) -> None:
super().__init__()
object.__setattr__(self, "jobj", jobj)
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
return self.jobj # pylint: disable=no-member
@classmethod
@ -147,7 +144,7 @@ class KeyAuthorizationChallengeResponse(ChallengeResponse):
return True
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
jobj = super().to_partial_json()
jobj.pop('keyAuthorization', None)
return jobj
@ -164,7 +161,7 @@ class KeyAuthorizationChallenge(_TokenChallenge, metaclass=abc.ABCMeta):
:param str typ: type of the challenge
"""
typ: str = NotImplemented
response_cls: Type[KeyAuthorizationChallengeResponse] = NotImplemented
response_cls: type[KeyAuthorizationChallengeResponse] = NotImplemented
thumbprint_hash_function = (
KeyAuthorizationChallengeResponse.thumbprint_hash_function)
@ -207,7 +204,7 @@ class KeyAuthorizationChallenge(_TokenChallenge, metaclass=abc.ABCMeta):
raise NotImplementedError() # pragma: no cover
def response_and_validation(self, account_key: jose.JWK, *args: Any, **kwargs: Any
) -> Tuple[KeyAuthorizationChallengeResponse, Any]:
) -> tuple[KeyAuthorizationChallengeResponse, Any]:
"""Generate response and validation.
Convenience function that return results of `response` and

View file

@ -9,11 +9,8 @@ import random
import time
from typing import Any
from typing import cast
from typing import List
from typing import Mapping
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
from cryptography import x509
@ -156,7 +153,7 @@ class ClientV2:
csr_pem=csr_pem)
def poll(self, authzr: messages.AuthorizationResource
) -> Tuple[messages.AuthorizationResource, requests.Response]:
) -> tuple[messages.AuthorizationResource, requests.Response]:
"""Poll Authorization Resource for status.
:param authzr: Authorization Resource
@ -314,7 +311,7 @@ class ClientV2:
return self.poll_finalization(orderr, deadline, fetch_alternative_chains)
def renewal_time(self, cert_pem: bytes
) -> Tuple[Optional[datetime.datetime], datetime.datetime]:
) -> tuple[Optional[datetime.datetime], datetime.datetime]:
"""Return an appropriate time to attempt renewal of the certificate,
and the next time to ask the ACME server for renewal info.
@ -408,7 +405,7 @@ class ClientV2:
new_args = args[:1] + (None,) + args[1:]
return self._post(*new_args, **kwargs)
def _get_links(self, response: requests.Response, relation_type: str) -> List[str]:
def _get_links(self, response: requests.Response, relation_type: str) -> list[str]:
"""
Retrieves all Link URIs of relation_type from the response.
:param requests.Response response: The requests HTTP response.
@ -621,7 +618,7 @@ class ClientNetwork:
self.account = account
self.alg = alg
self.verify_ssl = verify_ssl
self._nonces: Set[str] = set()
self._nonces: set[str] = set()
self.user_agent = user_agent
self.session = requests.Session()
self._default_timeout = timeout

View file

@ -4,10 +4,8 @@ from datetime import datetime, timedelta, timezone
import ipaddress
import logging
import typing
from typing import List
from typing import Literal
from typing import Optional
from typing import Set
from typing import Union
from cryptography import x509
@ -56,9 +54,9 @@ CertificateIssuerPrivateKeyTypesTpl = (
def make_csr(
private_key_pem: bytes,
domains: Optional[Union[Set[str], List[str]]] = None,
domains: Optional[Union[set[str], list[str]]] = None,
must_staple: bool = False,
ipaddrs: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None,
ipaddrs: Optional[list[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None,
) -> bytes:
"""Generate a CSR containing domains or IPs as subjectAltNames.
@ -112,7 +110,7 @@ def make_csr(
def get_names_from_subject_and_extensions(
subject: x509.Name, exts: x509.Extensions
) -> List[str]:
) -> list[str]:
"""Gets all DNS SAN names as well as the first Common Name from subject.
:param subject: Name of the x509 object, which may include Common Name
@ -146,7 +144,7 @@ def get_names_from_subject_and_extensions(
def _cryptography_cert_or_req_san(
cert_or_req: Union[x509.Certificate, x509.CertificateSigningRequest],
) -> List[str]:
) -> list[str]:
"""Get Subject Alternative Names from certificate or CSR using cryptography.
.. note:: Although this is `acme` internal API, it is used by
@ -177,11 +175,11 @@ def _now() -> datetime:
def make_self_signed_cert(private_key: types.CertificateIssuerPrivateKeyTypes,
domains: Optional[List[str]] = None,
domains: Optional[list[str]] = None,
not_before: Optional[datetime] = None,
validity: Optional[timedelta] = None, force_san: bool = True,
extensions: Optional[List[x509.Extension]] = None,
ips: Optional[List[Union[ipaddress.IPv4Address,
extensions: Optional[list[x509.Extension]] = None,
ips: Optional[list[Union[ipaddress.IPv4Address,
ipaddress.IPv6Address]]] = None
) -> x509.Certificate:
"""Generate new self-signed certificate.
@ -228,7 +226,7 @@ def make_self_signed_cert(private_key: types.CertificateIssuerPrivateKeyTypes,
builder = builder.subject_name(x509.Name(name_attrs))
builder = builder.issuer_name(x509.Name(name_attrs))
sanlist: List[x509.GeneralName] = []
sanlist: list[x509.GeneralName] = []
for address in domains:
sanlist.append(x509.DNSName(address))
for ip in ips:
@ -252,7 +250,7 @@ def make_self_signed_cert(private_key: types.CertificateIssuerPrivateKeyTypes,
def dump_cryptography_chain(
chain: List[x509.Certificate],
chain: list[x509.Certificate],
encoding: Literal[Encoding.PEM, Encoding.DER] = Encoding.PEM,
) -> bytes:
"""Dump certificate chain into a bundle.

View file

@ -2,9 +2,7 @@
import datetime
import typing
from typing import Any
from typing import List
from typing import Mapping
from typing import Set
from josepy import errors as jose_errors
import requests
@ -82,7 +80,7 @@ class PollError(ClientError):
to the most recently updated one
"""
def __init__(self, exhausted: Set['messages.AuthorizationResource'],
def __init__(self, exhausted: set['messages.AuthorizationResource'],
updated: Mapping['messages.AuthorizationResource',
'messages.AuthorizationResource']
) -> None:
@ -104,7 +102,7 @@ class ValidationError(Error):
"""Error for authorization failures. Contains a list of authorization
resources, each of which is invalid and should have an error field.
"""
def __init__(self, failed_authzrs: List['messages.AuthorizationResource']) -> None:
def __init__(self, failed_authzrs: list['messages.AuthorizationResource']) -> None:
self.failed_authzrs = failed_authzrs
super().__init__()

View file

@ -3,14 +3,10 @@ from collections.abc import Hashable
import datetime
import json
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Mapping
from typing import MutableMapping
from typing import Optional
from typing import Tuple
from typing import Type
from typing import TypeVar
from cryptography import x509
@ -73,7 +69,7 @@ def is_acme_error(err: BaseException) -> bool:
class _Constant(jose.JSONDeSerializable, Hashable):
"""ACME constant."""
__slots__ = ('name',)
POSSIBLE_NAMES: Dict[str, '_Constant'] = NotImplemented
POSSIBLE_NAMES: dict[str, '_Constant'] = NotImplemented
def __init__(self, name: str) -> None:
super().__init__()
@ -101,7 +97,7 @@ class _Constant(jose.JSONDeSerializable, Hashable):
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES: Dict[str, _Constant] = {}
POSSIBLE_NAMES: dict[str, _Constant] = {}
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
@ -140,12 +136,12 @@ class Error(jose.JSONObjectWithFields, errors.Error):
detail: str = jose.field('detail', omitempty=True)
identifier: Optional['Identifier'] = jose.field(
'identifier', decoder=Identifier.from_json, omitempty=True)
subproblems: Optional[Tuple['Error', ...]] = jose.field('subproblems', omitempty=True)
subproblems: Optional[tuple['Error', ...]] = jose.field('subproblems', omitempty=True)
# Mypy does not understand the josepy magic happening here, and falsely claims
# that subproblems is redefined. Let's ignore the type check here.
@subproblems.decoder # type: ignore
def subproblems(value: List[Dict[str, Any]]) -> Tuple['Error', ...]: # pylint: disable=no-self-argument,missing-function-docstring
def subproblems(value: list[dict[str, Any]]) -> tuple['Error', ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(Error.from_json(subproblem) for subproblem in value)
@classmethod
@ -208,7 +204,7 @@ class Error(jose.JSONObjectWithFields, errors.Error):
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES: Dict[str, _Constant] = {}
POSSIBLE_NAMES: dict[str, _Constant] = {}
STATUS_UNKNOWN = Status('unknown')
@ -231,9 +227,9 @@ class Directory(jose.JSONDeSerializable):
"""Directory Meta."""
_terms_of_service: str = jose.field('termsOfService', omitempty=True)
website: str = jose.field('website', omitempty=True)
caa_identities: List[str] = jose.field('caaIdentities', omitempty=True)
caa_identities: list[str] = jose.field('caaIdentities', omitempty=True)
external_account_required: bool = jose.field('externalAccountRequired', omitempty=True)
profiles: Dict[str, str] = jose.field('profiles', omitempty=True)
profiles: dict[str, str] = jose.field('profiles', omitempty=True)
def __init__(self, **kwargs: Any) -> None:
kwargs = {self._internal_name(k): v for k, v in kwargs.items()}
@ -268,7 +264,7 @@ class Directory(jose.JSONDeSerializable):
except KeyError:
raise KeyError(f'Directory field "{name}" not found')
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
return util.map_keys(self._jobj, lambda k: k)
@classmethod
@ -304,7 +300,7 @@ class ExternalAccountBinding:
@classmethod
def from_data(cls, account_public_key: jose.JWK, kid: str, hmac_key: str,
directory: Directory, hmac_alg: str = "HS256") -> Dict[str, Any]:
directory: Directory, hmac_alg: str = "HS256") -> dict[str, Any]:
"""Create External Account Binding Resource from contact details, kid and hmac."""
key_json = json.dumps(account_public_key.to_partial_json()).encode()
@ -347,21 +343,21 @@ class Registration(ResourceBody):
# Contact field implements special behavior to allow messages that clear existing
# contacts while not expecting the `contact` field when loading from json.
# This is implemented in the constructor and *_json methods.
contact: Tuple[str, ...] = jose.field('contact', omitempty=True, default=())
contact: tuple[str, ...] = jose.field('contact', omitempty=True, default=())
agreement: str = jose.field('agreement', omitempty=True)
status: Status = jose.field('status', omitempty=True)
terms_of_service_agreed: bool = jose.field('termsOfServiceAgreed', omitempty=True)
only_return_existing: bool = jose.field('onlyReturnExisting', omitempty=True)
external_account_binding: Dict[str, Any] = jose.field('externalAccountBinding',
external_account_binding: dict[str, Any] = jose.field('externalAccountBinding',
omitempty=True)
phone_prefix = 'tel:'
email_prefix = 'mailto:'
@classmethod
def from_data(cls: Type[GenericRegistration], phone: Optional[str] = None,
def from_data(cls: type[GenericRegistration], phone: Optional[str] = None,
email: Optional[str] = None,
external_account_binding: Optional[Dict[str, Any]] = None,
external_account_binding: Optional[dict[str, Any]] = None,
**kwargs: Any) -> GenericRegistration:
"""
Create registration resource from contact details.
@ -398,12 +394,12 @@ class Registration(ResourceBody):
object.__setattr__(self, '_add_contact', True)
super().__init__(**kwargs)
def _filter_contact(self, prefix: str) -> Tuple[str, ...]:
def _filter_contact(self, prefix: str) -> tuple[str, ...]:
return tuple(
detail[len(prefix):] for detail in self.contact # pylint: disable=not-an-iterable
if detail.startswith(prefix))
def _add_contact_if_appropriate(self, jobj: Dict[str, Any]) -> Dict[str, Any]:
def _add_contact_if_appropriate(self, jobj: dict[str, Any]) -> dict[str, Any]:
"""
The `contact` member of Registration objects should not be required when
de-serializing (as it would be if the Fields' `omitempty` flag were `False`), but
@ -420,23 +416,23 @@ class Registration(ResourceBody):
return jobj
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
"""Modify josepy.JSONDeserializable.to_partial_json()"""
jobj = super().to_partial_json()
return self._add_contact_if_appropriate(jobj)
def fields_to_partial_json(self) -> Dict[str, Any]:
def fields_to_partial_json(self) -> dict[str, Any]:
"""Modify josepy.JSONObjectWithFields.fields_to_partial_json()"""
jobj = super().fields_to_partial_json()
return self._add_contact_if_appropriate(jobj)
@property
def phones(self) -> Tuple[str, ...]:
def phones(self) -> tuple[str, ...]:
"""All phones found in the ``contact`` field."""
return self._filter_contact(self.phone_prefix)
@property
def emails(self) -> Tuple[str, ...]:
def emails(self) -> tuple[str, ...]:
"""All emails found in the ``contact`` field."""
return self._filter_contact(self.email_prefix)
@ -498,13 +494,13 @@ class ChallengeBody(ResourceBody):
def encode(self, name: str) -> Any:
return super().encode(self._internal_name(name))
def to_partial_json(self) -> Dict[str, Any]:
def to_partial_json(self) -> dict[str, Any]:
jobj = super().to_partial_json()
jobj.update(self.chall.to_partial_json())
return jobj
@classmethod
def fields_from_json(cls, jobj: Mapping[str, Any]) -> Dict[str, Any]:
def fields_from_json(cls, jobj: Mapping[str, Any]) -> dict[str, Any]:
jobj_fields = super().fields_from_json(jobj)
jobj_fields['chall'] = challenges.Challenge.from_json(jobj)
return jobj_fields
@ -553,7 +549,7 @@ class Authorization(ResourceBody):
"""
identifier: Identifier = jose.field('identifier', decoder=Identifier.from_json, omitempty=True)
challenges: List[ChallengeBody] = jose.field('challenges', omitempty=True)
challenges: list[ChallengeBody] = jose.field('challenges', omitempty=True)
status: Status = jose.field('status', omitempty=True, decoder=Status.from_json)
# TODO: 'expires' is allowed for Authorization Resources in
@ -566,7 +562,7 @@ class Authorization(ResourceBody):
# Mypy does not understand the josepy magic happening here, and falsely claims
# that challenge is redefined. Let's ignore the type check here.
@challenges.decoder # type: ignore
def challenges(value: List[Dict[str, Any]]) -> Tuple[ChallengeBody, ...]: # pylint: disable=no-self-argument,missing-function-docstring
def challenges(value: list[dict[str, Any]]) -> tuple[ChallengeBody, ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(ChallengeBody.from_json(chall) for chall in value)
@ -608,7 +604,7 @@ class CertificateResource(ResourceWithURI):
"""
cert_chain_uri: str = jose.field('cert_chain_uri')
authzrs: Tuple[AuthorizationResource, ...] = jose.field('authzrs')
authzrs: tuple[AuthorizationResource, ...] = jose.field('authzrs')
class Revocation(jose.JSONObjectWithFields):
@ -640,9 +636,9 @@ class Order(ResourceBody):
"""
# https://datatracker.ietf.org/doc/draft-aaron-acme-profiles/
profile: str = jose.field('profile', omitempty=True)
identifiers: List[Identifier] = jose.field('identifiers', omitempty=True)
identifiers: list[Identifier] = jose.field('identifiers', omitempty=True)
status: Status = jose.field('status', decoder=Status.from_json, omitempty=True)
authorizations: List[str] = jose.field('authorizations', omitempty=True)
authorizations: list[str] = jose.field('authorizations', omitempty=True)
certificate: str = jose.field('certificate', omitempty=True)
finalize: str = jose.field('finalize', omitempty=True)
expires: datetime.datetime = fields.rfc3339('expires', omitempty=True)
@ -651,7 +647,7 @@ class Order(ResourceBody):
# Mypy does not understand the josepy magic happening here, and falsely claims
# that identifiers is redefined. Let's ignore the type check here.
@identifiers.decoder # type: ignore
def identifiers(value: List[Dict[str, Any]]) -> Tuple[Identifier, ...]: # pylint: disable=no-self-argument,missing-function-docstring
def identifiers(value: list[dict[str, Any]]) -> tuple[Identifier, ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(Identifier.from_json(identifier) for identifier in value)
@ -681,15 +677,15 @@ class OrderResource(ResourceWithURI):
decoder=lambda s: s.encode("utf-8"),
encoder=lambda b: b.decode("utf-8"))
authorizations: List[AuthorizationResource] = jose.field('authorizations')
authorizations: list[AuthorizationResource] = jose.field('authorizations')
fullchain_pem: str = jose.field('fullchain_pem', omitempty=True)
alternative_fullchains_pem: List[str] = jose.field('alternative_fullchains_pem',
alternative_fullchains_pem: list[str] = jose.field('alternative_fullchains_pem',
omitempty=True)
# Mypy does not understand the josepy magic happening here, and falsely claims
# that authorizations is redefined. Let's ignore the type check here.
@authorizations.decoder # type: ignore
def authorizations(value: List[Dict[str, Any]]) -> Tuple[AuthorizationResource, ...]: # pylint: disable=no-self-argument,missing-function-docstring
def authorizations(value: list[dict[str, Any]]) -> tuple[AuthorizationResource, ...]: # pylint: disable=no-self-argument,missing-function-docstring
return tuple(AuthorizationResource.from_json(authz) for authz in value)

View file

@ -10,11 +10,7 @@ import socket
import socketserver
import threading
from typing import Any
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from acme import challenges
@ -36,11 +32,11 @@ class BaseDualNetworkedServers:
If two servers are instantiated, they will serve on the same port.
"""
def __init__(self, ServerClass: Type[socketserver.TCPServer], server_address: Tuple[str, int],
def __init__(self, ServerClass: type[socketserver.TCPServer], server_address: tuple[str, int],
*remaining_args: Any, **kwargs: Any) -> None:
port = server_address[1]
self.threads: List[threading.Thread] = []
self.servers: List[socketserver.BaseServer] = []
self.threads: list[threading.Thread] = []
self.servers: list[socketserver.BaseServer] = []
# Preserve socket error for re-raising, if no servers can be started
last_socket_err: Optional[socket.error] = None
@ -94,7 +90,7 @@ class BaseDualNetworkedServers:
thread.start()
self.threads.append(thread)
def getsocknames(self) -> List[Tuple[str, int]]:
def getsocknames(self) -> list[tuple[str, int]]:
"""Wraps socketserver.TCPServer.socket.getsockname"""
return [server.socket.getsockname() for server in self.servers]
@ -124,8 +120,8 @@ class HTTPServer(BaseHTTPServer.HTTPServer):
class HTTP01Server(HTTPServer, ACMEServerMixin):
"""HTTP01 Server."""
def __init__(self, server_address: Tuple[str, int],
resources: Set[HTTP01RequestHandler.HTTP01Resource],
def __init__(self, server_address: tuple[str, int],
resources: set[HTTP01RequestHandler.HTTP01Resource],
ipv6: bool = False, timeout: int = 30) -> None:
super().__init__(
server_address, HTTP01RequestHandler.partial_init(
@ -220,7 +216,7 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.path)
@classmethod
def partial_init(cls, simple_http_resources: Set[HTTP01RequestHandler.HTTP01Resource],
def partial_init(cls, simple_http_resources: set[HTTP01RequestHandler.HTTP01Resource],
timeout: int) -> functools.partial[HTTP01RequestHandler]:
"""Partially initialize this handler.

View file

@ -1,10 +1,9 @@
"""ACME utilities."""
from typing import Any
from typing import Callable
from typing import Dict
from typing import Mapping
def map_keys(dikt: Mapping[Any, Any], func: Callable[[Any], Any]) -> Dict[Any, Any]:
def map_keys(dikt: Mapping[Any, Any], func: Callable[[Any], Any]) -> dict[Any, Any]:
"""Map dictionary keys."""
return {func(key): value for key, value in dikt.items()}