mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
use pep585 types in private certbot files (#10412)
this is another part of https://github.com/certbot/certbot/issues/10195 the first commit was done automatically with the command: ``` ruff check --fix --extend-select UP006 --unsafe-fixes certbot/src/certbot/_internal ``` this was unfortunately insufficient as it left a line in webroot.py with over 100 characters. i fixed this manually in my second commit
This commit is contained in:
parent
1943889119
commit
c519307569
33 changed files with 202 additions and 268 deletions
|
|
@ -8,8 +8,6 @@ import socket
|
|||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
|
||||
|
|
@ -107,10 +105,10 @@ class Account:
|
|||
class AccountMemoryStorage(interfaces.AccountStorage):
|
||||
"""In-memory account storage."""
|
||||
|
||||
def __init__(self, initial_accounts: Optional[Dict[str, Account]] = None) -> None:
|
||||
def __init__(self, initial_accounts: Optional[dict[str, Account]] = None) -> None:
|
||||
self.accounts = initial_accounts if initial_accounts is not None else {}
|
||||
|
||||
def find_all(self) -> List[Account]:
|
||||
def find_all(self) -> list[Account]:
|
||||
return list(self.accounts.values())
|
||||
|
||||
def save(self, account: Account, client: ClientV2) -> None:
|
||||
|
|
@ -154,7 +152,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
|||
def _metadata_path(cls, account_dir_path: str) -> str:
|
||||
return os.path.join(account_dir_path, "meta.json")
|
||||
|
||||
def _find_all_for_server_path(self, server_path: str) -> List[Account]:
|
||||
def _find_all_for_server_path(self, server_path: str) -> list[Account]:
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
try:
|
||||
candidates = os.listdir(accounts_dir)
|
||||
|
|
@ -181,7 +179,7 @@ class AccountFileStorage(interfaces.AccountStorage):
|
|||
accounts = prev_accounts
|
||||
return accounts
|
||||
|
||||
def find_all(self) -> List[Account]:
|
||||
def find_all(self) -> list[Account]:
|
||||
return self._find_all_for_server_path(self.config.server_path)
|
||||
|
||||
def _symlink_to_account_dir(self, prev_server_path: str, server_path: str,
|
||||
|
|
|
|||
|
|
@ -2,13 +2,9 @@
|
|||
import datetime
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
import josepy
|
||||
from requests.models import Response
|
||||
|
|
@ -46,7 +42,7 @@ class AuthHandler:
|
|||
|
||||
"""
|
||||
def __init__(self, auth: interfaces.Authenticator, acme_client: Optional[client.ClientV2],
|
||||
account: Optional[Account], pref_challs: List[str]) -> None:
|
||||
account: Optional[Account], pref_challs: list[str]) -> None:
|
||||
self.auth = auth
|
||||
self.acme = acme_client
|
||||
|
||||
|
|
@ -56,7 +52,7 @@ class AuthHandler:
|
|||
def handle_authorizations(self, orderr: messages.OrderResource,
|
||||
config: configuration.NamespaceConfig, best_effort: bool = False,
|
||||
max_retries: int = 30,
|
||||
max_time_mins: float = 30) -> List[messages.AuthorizationResource]:
|
||||
max_time_mins: float = 30) -> list[messages.AuthorizationResource]:
|
||||
"""
|
||||
Retrieve all authorizations, perform all challenges required to validate
|
||||
these authorizations, then poll and wait for the authorization to be checked.
|
||||
|
|
@ -117,7 +113,7 @@ class AuthHandler:
|
|||
|
||||
raise errors.Error("An unexpected error occurred while handling the authorizations.")
|
||||
|
||||
def deactivate_valid_authorizations(self, orderr: messages.OrderResource) -> Tuple[List, List]:
|
||||
def deactivate_valid_authorizations(self, orderr: messages.OrderResource) -> tuple[list, list]:
|
||||
"""
|
||||
Deactivate all `valid` authorizations in the order, so that they cannot be reused
|
||||
in subsequent orders.
|
||||
|
|
@ -144,7 +140,7 @@ class AuthHandler:
|
|||
|
||||
return (deactivated, failed)
|
||||
|
||||
def _poll_authorizations(self, authzrs: List[messages.AuthorizationResource], max_retries: int,
|
||||
def _poll_authorizations(self, authzrs: list[messages.AuthorizationResource], max_retries: int,
|
||||
deadline_minutes: float, best_effort: bool) -> None:
|
||||
"""
|
||||
Poll the ACME CA server, to wait for confirmation that authorizations have their challenges
|
||||
|
|
@ -154,7 +150,7 @@ class AuthHandler:
|
|||
if not self.acme:
|
||||
raise errors.Error("No ACME client defined, cannot poll authorizations.")
|
||||
|
||||
authzrs_to_check: Dict[int, Tuple[messages.AuthorizationResource,
|
||||
authzrs_to_check: dict[int, tuple[messages.AuthorizationResource,
|
||||
Optional[Response]]] = {index: (authzr, None)
|
||||
for index, authzr in enumerate(authzrs)}
|
||||
authzrs_failed_to_report = []
|
||||
|
|
@ -216,7 +212,7 @@ class AuthHandler:
|
|||
raise errors.AuthorizationError('All authorizations were not finalized by the CA.')
|
||||
|
||||
def _choose_challenges(self, authzrs: Iterable[messages.AuthorizationResource]
|
||||
) -> List[achallenges.AnnotatedChallenge]:
|
||||
) -> list[achallenges.AnnotatedChallenge]:
|
||||
"""
|
||||
Retrieve necessary and pending challenges to satisfy server.
|
||||
NB: Necessary and already validated challenges are not retrieved,
|
||||
|
|
@ -227,7 +223,7 @@ class AuthHandler:
|
|||
|
||||
pending_authzrs = [authzr for authzr in authzrs
|
||||
if authzr.body.status != messages.STATUS_VALID]
|
||||
achalls: List[achallenges.AnnotatedChallenge] = []
|
||||
achalls: list[achallenges.AnnotatedChallenge] = []
|
||||
if pending_authzrs:
|
||||
logger.info("Performing the following challenges:")
|
||||
for authzr in pending_authzrs:
|
||||
|
|
@ -241,7 +237,7 @@ class AuthHandler:
|
|||
|
||||
return achalls
|
||||
|
||||
def _get_chall_pref(self, domain: str) -> List[Type[challenges.Challenge]]:
|
||||
def _get_chall_pref(self, domain: str) -> list[type[challenges.Challenge]]:
|
||||
"""Return list of challenge preferences.
|
||||
|
||||
:param str domain: domain for which you are requesting preferences
|
||||
|
|
@ -263,7 +259,7 @@ class AuthHandler:
|
|||
chall_prefs.extend(plugin_pref)
|
||||
return chall_prefs
|
||||
|
||||
def _cleanup_challenges(self, achalls: List[achallenges.AnnotatedChallenge]) -> None:
|
||||
def _cleanup_challenges(self, achalls: list[achallenges.AnnotatedChallenge]) -> None:
|
||||
"""Cleanup challenges.
|
||||
|
||||
:param achalls: annotated challenges to cleanup
|
||||
|
|
@ -274,7 +270,7 @@ class AuthHandler:
|
|||
self.auth.cleanup(achalls)
|
||||
|
||||
def _challenge_factory(self, authzr: messages.AuthorizationResource,
|
||||
path: Sequence[int]) -> List[achallenges.AnnotatedChallenge]:
|
||||
path: Sequence[int]) -> list[achallenges.AnnotatedChallenge]:
|
||||
"""Construct Namedtuple Challenges
|
||||
|
||||
:param messages.AuthorizationResource authzr: authorization
|
||||
|
|
@ -299,11 +295,11 @@ class AuthHandler:
|
|||
|
||||
return achalls
|
||||
|
||||
def _report_failed_authzrs(self, failed_authzrs: List[messages.AuthorizationResource]) -> None:
|
||||
def _report_failed_authzrs(self, failed_authzrs: list[messages.AuthorizationResource]) -> None:
|
||||
"""Notifies the user about failed authorizations."""
|
||||
if not self.account:
|
||||
raise errors.Error("Account is not set.")
|
||||
problems: Dict[str, List[achallenges.AnnotatedChallenge]] = {}
|
||||
problems: dict[str, list[achallenges.AnnotatedChallenge]] = {}
|
||||
failed_achalls = [challb_to_achall(challb, self.account.key, authzr.body.identifier.value)
|
||||
for authzr in failed_authzrs for challb in authzr.body.challenges
|
||||
if challb.error]
|
||||
|
|
@ -325,7 +321,7 @@ class AuthHandler:
|
|||
|
||||
display_util.notify("".join(msg))
|
||||
|
||||
def _debug_challenges_msg(self, achalls: List[achallenges.AnnotatedChallenge],
|
||||
def _debug_challenges_msg(self, achalls: list[achallenges.AnnotatedChallenge],
|
||||
config: configuration.NamespaceConfig) -> str:
|
||||
"""Construct message for debug challenges prompt
|
||||
|
||||
|
|
@ -388,8 +384,8 @@ def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
|
|||
return achallenges.Other(challb=challb, domain=domain)
|
||||
|
||||
|
||||
def gen_challenge_path(challbs: List[messages.ChallengeBody],
|
||||
preferences: List[Type[challenges.Challenge]]) -> Tuple[int, ...]:
|
||||
def gen_challenge_path(challbs: list[messages.ChallengeBody],
|
||||
preferences: list[type[challenges.Challenge]]) -> tuple[int, ...]:
|
||||
"""Generate a plan to get authority over the identity.
|
||||
|
||||
:param tuple challbs: A tuple of challenges
|
||||
|
|
@ -417,7 +413,7 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody],
|
|||
|
||||
# max_cost is now equal to sum(indices) + 1
|
||||
|
||||
best_combo: Optional[Tuple[int, ...]] = None
|
||||
best_combo: Optional[tuple[int, ...]] = None
|
||||
# Set above completing all of the available challenges
|
||||
best_combo_cost = max_cost
|
||||
|
||||
|
|
@ -441,7 +437,7 @@ def gen_challenge_path(challbs: List[messages.ChallengeBody],
|
|||
return best_combo
|
||||
|
||||
|
||||
def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.AuthorizationError:
|
||||
def _report_no_chall_path(challbs: list[messages.ChallengeBody]) -> errors.AuthorizationError:
|
||||
"""Logs and return a raisable error reporting that no satisfiable chall path exists.
|
||||
|
||||
:param challbs: challenges from the authorization that can't be satisfied
|
||||
|
|
@ -460,7 +456,7 @@ def _report_no_chall_path(challbs: List[messages.ChallengeBody]) -> errors.Autho
|
|||
return errors.AuthorizationError(msg)
|
||||
|
||||
|
||||
def _generate_failed_chall_msg(failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
|
||||
def _generate_failed_chall_msg(failed_achalls: list[achallenges.AnnotatedChallenge]) -> str:
|
||||
"""Creates a user friendly error message about failed challenges.
|
||||
|
||||
:param list failed_achalls: A list of failed
|
||||
|
|
|
|||
|
|
@ -6,9 +6,7 @@ import traceback
|
|||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
|
|
@ -122,14 +120,14 @@ def lineage_for_certname(cli_config: configuration.NamespaceConfig,
|
|||
|
||||
|
||||
def domains_for_certname(config: configuration.NamespaceConfig,
|
||||
certname: str) -> Optional[List[str]]:
|
||||
certname: str) -> Optional[list[str]]:
|
||||
"""Find the domains in the cert with name certname."""
|
||||
lineage = lineage_for_certname(config, certname)
|
||||
return lineage.names() if lineage else None
|
||||
|
||||
|
||||
def find_duplicative_certs(config: configuration.NamespaceConfig,
|
||||
domains: List[str]) -> Tuple[Optional[storage.RenewableCert],
|
||||
domains: list[str]) -> tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
"""Find existing certs that match the given domain names.
|
||||
|
||||
|
|
@ -154,9 +152,9 @@ def find_duplicative_certs(config: configuration.NamespaceConfig,
|
|||
|
||||
"""
|
||||
def update_certs_for_domain_matches(candidate_lineage: storage.RenewableCert,
|
||||
rv: Tuple[Optional[storage.RenewableCert],
|
||||
rv: tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]
|
||||
) -> Tuple[Optional[storage.RenewableCert],
|
||||
) -> tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
"""Return cert as identical_names_cert if it matches,
|
||||
or subset_names_cert if it matches as subset
|
||||
|
|
@ -176,12 +174,12 @@ def find_duplicative_certs(config: configuration.NamespaceConfig,
|
|||
subset_names_cert = candidate_lineage
|
||||
return (identical_names_cert, subset_names_cert)
|
||||
|
||||
init: Tuple[Optional[storage.RenewableCert], Optional[storage.RenewableCert]] = (None, None)
|
||||
init: tuple[Optional[storage.RenewableCert], Optional[storage.RenewableCert]] = (None, None)
|
||||
|
||||
return _search_lineages(config, update_certs_for_domain_matches, init)
|
||||
|
||||
|
||||
def _archive_files(candidate_lineage: storage.RenewableCert, filetype: str) -> Optional[List[str]]:
|
||||
def _archive_files(candidate_lineage: storage.RenewableCert, filetype: str) -> Optional[list[str]]:
|
||||
""" In order to match things like:
|
||||
/etc/letsencrypt/archive/example.com/chain1.pem.
|
||||
|
||||
|
|
@ -203,8 +201,8 @@ def _archive_files(candidate_lineage: storage.RenewableCert, filetype: str) -> O
|
|||
return None
|
||||
|
||||
|
||||
def _acceptable_matches() -> List[Union[Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]]:
|
||||
def _acceptable_matches() -> list[Union[Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[list[str]]]]]:
|
||||
""" Generates the list that's passed to match_and_check_overlaps. Is its own function to
|
||||
make unit testing easier.
|
||||
|
||||
|
|
@ -235,9 +233,9 @@ def cert_path_to_lineage(cli_config: configuration.NamespaceConfig) -> str:
|
|||
def match_and_check_overlaps(cli_config: configuration.NamespaceConfig,
|
||||
acceptable_matches: Iterable[Union[
|
||||
Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]],
|
||||
Callable[[storage.RenewableCert], Optional[list[str]]]]],
|
||||
match_func: Callable[[storage.RenewableCert], str],
|
||||
rv_func: Callable[[storage.RenewableCert], str]) -> List[str]:
|
||||
rv_func: Callable[[storage.RenewableCert], str]) -> list[str]:
|
||||
""" Searches through all lineages for a match, and checks for duplicates.
|
||||
If a duplicate is found, an error is raised, as performing operations on lineages
|
||||
that have their properties incorrectly duplicated elsewhere is probably a bad idea.
|
||||
|
|
@ -248,13 +246,13 @@ def match_and_check_overlaps(cli_config: configuration.NamespaceConfig,
|
|||
:param function rv_func: specifies what to return
|
||||
|
||||
"""
|
||||
def find_matches(candidate_lineage: storage.RenewableCert, return_value: List[str],
|
||||
def find_matches(candidate_lineage: storage.RenewableCert, return_value: list[str],
|
||||
acceptable_matches: Iterable[Union[
|
||||
Callable[[storage.RenewableCert], str],
|
||||
Callable[[storage.RenewableCert], Optional[List[str]]]]]) -> List[str]:
|
||||
Callable[[storage.RenewableCert], Optional[list[str]]]]]) -> list[str]:
|
||||
"""Returns a list of matches using _search_lineages."""
|
||||
acceptable_matches_resolved = [func(candidate_lineage) for func in acceptable_matches]
|
||||
acceptable_matches_rv: List[str] = []
|
||||
acceptable_matches_rv: list[str] = []
|
||||
for item in acceptable_matches_resolved:
|
||||
if isinstance(item, list):
|
||||
acceptable_matches_rv += item
|
||||
|
|
@ -265,7 +263,7 @@ def match_and_check_overlaps(cli_config: configuration.NamespaceConfig,
|
|||
return_value.append(rv_func(candidate_lineage))
|
||||
return return_value
|
||||
|
||||
matched: List[str] = _search_lineages(cli_config, find_matches, [], acceptable_matches)
|
||||
matched: list[str] = _search_lineages(cli_config, find_matches, [], acceptable_matches)
|
||||
if not matched:
|
||||
raise errors.Error(f"No match found for cert-path {cli_config.cert_path}!")
|
||||
elif len(matched) > 1:
|
||||
|
|
@ -317,7 +315,7 @@ def human_readable_cert_info(config: configuration.NamespaceConfig, cert: storag
|
|||
|
||||
|
||||
def get_certnames(config: configuration.NamespaceConfig, verb: str, allow_multiple: bool = False,
|
||||
custom_prompt: Optional[str] = None) -> List[str]:
|
||||
custom_prompt: Optional[str] = None) -> list[str]:
|
||||
"""Get certname from flag, interactively, or error out."""
|
||||
certname = config.certname
|
||||
if certname:
|
||||
|
|
@ -375,7 +373,7 @@ def _describe_certs(config: configuration.NamespaceConfig,
|
|||
parsed_certs: Iterable[storage.RenewableCert],
|
||||
parse_failures: Iterable[str]) -> None:
|
||||
"""Print information about the certs we know about"""
|
||||
out: List[str] = []
|
||||
out: list[str] = []
|
||||
|
||||
notify = out.append
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ logger = logging.getLogger(__name__)
|
|||
helpful_parser: Optional[HelpfulArgumentParser] = None
|
||||
|
||||
|
||||
def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[str]
|
||||
def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: list[str]
|
||||
) -> NamespaceConfig:
|
||||
"""Returns parsed command line arguments.
|
||||
|
||||
|
|
@ -491,7 +491,7 @@ def prepare_and_parse_args(plugins: plugins_disco.PluginsRegistry, args: List[st
|
|||
return helpful.parse_args()
|
||||
|
||||
|
||||
def argparse_type(variable: Any) -> Type:
|
||||
def argparse_type(variable: Any) -> type:
|
||||
"""Return our argparse type function for a config variable (default: str)"""
|
||||
# pylint: disable=protected-access
|
||||
if helpful_parser is not None:
|
||||
|
|
|
|||
|
|
@ -5,10 +5,8 @@ import glob
|
|||
import inspect
|
||||
from typing import Any
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Tuple
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Union
|
||||
|
||||
|
|
@ -23,7 +21,7 @@ if TYPE_CHECKING:
|
|||
from certbot._internal.cli import helpful
|
||||
|
||||
|
||||
def read_file(filename: str, mode: str = "rb") -> Tuple[str, Any]:
|
||||
def read_file(filename: str, mode: str = "rb") -> tuple[str, Any]:
|
||||
"""Returns the given file's contents.
|
||||
|
||||
:param str filename: path to file
|
||||
|
|
@ -105,7 +103,7 @@ class _DomainsAction(argparse.Action):
|
|||
|
||||
|
||||
def add_domains(args_or_config: Union[argparse.Namespace, configuration.NamespaceConfig],
|
||||
domains: Optional[str]) -> List[str]:
|
||||
domains: Optional[str]) -> list[str]:
|
||||
"""Registers new domains to be used during the current client run.
|
||||
|
||||
Domains are not added to the list of requested domains if they have
|
||||
|
|
@ -120,7 +118,7 @@ def add_domains(args_or_config: Union[argparse.Namespace, configuration.Namespac
|
|||
:rtype: `list` of `str`
|
||||
|
||||
"""
|
||||
validated_domains: List[str] = []
|
||||
validated_domains: list[str] = []
|
||||
if not domains:
|
||||
return validated_domains
|
||||
|
||||
|
|
@ -164,7 +162,7 @@ class _EncodeReasonAction(argparse.Action):
|
|||
setattr(namespace, self.dest, code)
|
||||
|
||||
|
||||
def parse_preferred_challenges(pref_challs: Iterable[str]) -> List[str]:
|
||||
def parse_preferred_challenges(pref_challs: Iterable[str]) -> list[str]:
|
||||
"""Translate and validate preferred challenges.
|
||||
|
||||
:param pref_challs: list of preferred challenge types
|
||||
|
|
|
|||
|
|
@ -4,11 +4,8 @@ import argparse
|
|||
import functools
|
||||
import sys
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
import configargparse
|
||||
|
|
@ -42,7 +39,7 @@ class HelpfulArgumentParser:
|
|||
'certbot --help security' for security options.
|
||||
|
||||
"""
|
||||
def __init__(self, args: List[str], plugins: Iterable[str]) -> None:
|
||||
def __init__(self, args: list[str], plugins: Iterable[str]) -> None:
|
||||
from certbot._internal import main
|
||||
self.VERBS = {
|
||||
"auth": main.certonly,
|
||||
|
|
@ -67,14 +64,14 @@ class HelpfulArgumentParser:
|
|||
# Get notification function for printing
|
||||
self.notify = display_obj.NoninteractiveDisplay(sys.stdout).notification
|
||||
|
||||
self.actions: List[configargparse.Action] = []
|
||||
self.actions: list[configargparse.Action] = []
|
||||
|
||||
# List of topics for which additional help can be provided
|
||||
HELP_TOPICS: List[Optional[str]] = ["all", "security", "paths", "automation", "testing"]
|
||||
HELP_TOPICS: list[Optional[str]] = ["all", "security", "paths", "automation", "testing"]
|
||||
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
|
||||
|
||||
plugin_names: List[Optional[str]] = list(plugins)
|
||||
self.help_topics: List[Optional[str]] = HELP_TOPICS + plugin_names + [None]
|
||||
plugin_names: list[Optional[str]] = list(plugins)
|
||||
self.help_topics: list[Optional[str]] = HELP_TOPICS + plugin_names + [None]
|
||||
|
||||
self.args = args
|
||||
|
||||
|
|
@ -95,7 +92,7 @@ class HelpfulArgumentParser:
|
|||
self.visible_topics = self.determine_help_topics(self.help_arg)
|
||||
|
||||
# elements are added by .add_group()
|
||||
self.groups: Dict[str, argparse._ArgumentGroup] = {}
|
||||
self.groups: dict[str, argparse._ArgumentGroup] = {}
|
||||
|
||||
self.parser = configargparse.ArgParser(
|
||||
prog="certbot",
|
||||
|
|
@ -168,7 +165,7 @@ class HelpfulArgumentParser:
|
|||
self.verb == "renew"):
|
||||
config.domains = []
|
||||
|
||||
def _build_sources_dict(self) -> Dict[str, ArgumentSource]:
|
||||
def _build_sources_dict(self) -> dict[str, ArgumentSource]:
|
||||
# ConfigArgparse's get_source_to_settings_dict doesn't actually create
|
||||
# default entries for each argument with a default value, omitting many
|
||||
# args we'd otherwise care about. So in general, unless an argument was
|
||||
|
|
@ -176,7 +173,7 @@ class HelpfulArgumentParser:
|
|||
# consider it as having a "default" value
|
||||
result = { action.dest: ArgumentSource.DEFAULT for action in self.actions }
|
||||
|
||||
source_to_settings_dict: Dict[str, Dict[str, Tuple[configargparse.Action, str]]]
|
||||
source_to_settings_dict: dict[str, dict[str, tuple[configargparse.Action, str]]]
|
||||
source_to_settings_dict = self.parser.get_source_to_settings_dict()
|
||||
|
||||
# We'll process the sources dict in order of each source's "priority",
|
||||
|
|
@ -187,7 +184,7 @@ class HelpfulArgumentParser:
|
|||
# 3. env vars (shouldn't be any)
|
||||
# 4. command line
|
||||
|
||||
def update_result(settings_dict: Dict[str, Tuple[configargparse.Action, str]],
|
||||
def update_result(settings_dict: dict[str, tuple[configargparse.Action, str]],
|
||||
source: ArgumentSource) -> None:
|
||||
actions = [self._find_action_for_arg(arg) if action is None else action
|
||||
for arg, (action, _) in settings_dict.items()]
|
||||
|
|
@ -202,7 +199,7 @@ class HelpfulArgumentParser:
|
|||
|
||||
# The command line settings dict is weird, so handle it separately
|
||||
if 'command_line' in source_to_settings_dict:
|
||||
settings_dict: Dict[str, Tuple[None, List[str]]]
|
||||
settings_dict: dict[str, tuple[None, list[str]]]
|
||||
settings_dict = source_to_settings_dict['command_line'] # type: ignore
|
||||
(_, unprocessed_args) = settings_dict['']
|
||||
args = []
|
||||
|
|
@ -401,7 +398,7 @@ class HelpfulArgumentParser:
|
|||
pass
|
||||
return True
|
||||
|
||||
def add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any,
|
||||
def add(self, topics: Optional[Union[list[Optional[str]], str]], *args: Any,
|
||||
**kwargs: Any) -> None:
|
||||
"""Add a new command line argument.
|
||||
|
||||
|
|
@ -416,7 +413,7 @@ class HelpfulArgumentParser:
|
|||
"""
|
||||
self.actions.append(self._add(topics, *args, **kwargs))
|
||||
|
||||
def _add(self, topics: Optional[Union[List[Optional[str]], str]], *args: Any,
|
||||
def _add(self, topics: Optional[Union[list[Optional[str]], str]], *args: Any,
|
||||
**kwargs: Any) -> configargparse.Action:
|
||||
action = kwargs.get("action")
|
||||
if action is util.DeprecatedArgumentAction:
|
||||
|
|
@ -511,7 +508,7 @@ class HelpfulArgumentParser:
|
|||
plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name)
|
||||
|
||||
def determine_help_topics(self, chosen_topic: Union[str, bool]
|
||||
) -> Dict[Optional[str], bool]:
|
||||
) -> dict[Optional[str], bool]:
|
||||
"""
|
||||
|
||||
The user may have requested help on a topic, return a dict of which
|
||||
|
|
|
|||
|
|
@ -5,11 +5,8 @@ import platform
|
|||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import IO
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
|
@ -152,7 +149,7 @@ def sample_user_agent() -> str:
|
|||
|
||||
def register(config: configuration.NamespaceConfig, account_storage: AccountStorage,
|
||||
tos_cb: Optional[Callable[[str], None]] = None
|
||||
) -> Tuple[account.Account, acme_client.ClientV2]:
|
||||
) -> tuple[account.Account, acme_client.ClientV2]:
|
||||
"""Register new account with an ACME CA.
|
||||
|
||||
This function takes care of generating fresh private key,
|
||||
|
|
@ -232,7 +229,7 @@ def perform_registration(acme: acme_client.ClientV2, config: configuration.Names
|
|||
raise errors.Error("acme client with no private key cannot register account.")
|
||||
|
||||
eab_credentials_supplied = config.eab_kid and config.eab_hmac_key
|
||||
eab: Optional[Dict[str, Any]]
|
||||
eab: Optional[dict[str, Any]]
|
||||
if eab_credentials_supplied:
|
||||
account_public_key = acme.net.key.public_key()
|
||||
eab = messages.ExternalAccountBinding.from_data(account_public_key=account_public_key,
|
||||
|
|
@ -308,7 +305,7 @@ class Client:
|
|||
|
||||
def obtain_certificate_from_csr(self, csr: util.CSR,
|
||||
orderr: Optional[messages.OrderResource] = None
|
||||
) -> Tuple[bytes, bytes]:
|
||||
) -> tuple[bytes, bytes]:
|
||||
"""Obtain certificate.
|
||||
|
||||
:param .util.CSR csr: PEM-encoded Certificate Signing
|
||||
|
|
@ -351,8 +348,8 @@ class Client:
|
|||
cert, chain = crypto_util.cert_and_chain_from_fullchain(fullchain)
|
||||
return cert.encode(), chain.encode()
|
||||
|
||||
def obtain_certificate(self, domains: List[str], old_keypath: Optional[str] = None
|
||||
) -> Tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
def obtain_certificate(self, domains: list[str], old_keypath: Optional[str] = None
|
||||
) -> tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
"""Obtains a certificate from the ACME server.
|
||||
|
||||
`.register` must be called before `.obtain_certificate`
|
||||
|
|
@ -508,7 +505,7 @@ class Client:
|
|||
authzr = self.auth_handler.handle_authorizations(orderr, self.config, best_effort)
|
||||
return orderr.update(authorizations=authzr)
|
||||
|
||||
def obtain_and_enroll_certificate(self, domains: List[str], certname: Optional[str]
|
||||
def obtain_and_enroll_certificate(self, domains: list[str], certname: Optional[str]
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Obtain and enroll certificate.
|
||||
|
||||
|
|
@ -542,8 +539,8 @@ class Client:
|
|||
key.pem, chain,
|
||||
self.config)
|
||||
|
||||
def _successful_domains_from_error(self, error: messages.Error, domains: List[str],
|
||||
) -> List[str]:
|
||||
def _successful_domains_from_error(self, error: messages.Error, domains: list[str],
|
||||
) -> list[str]:
|
||||
if error.subproblems is not None:
|
||||
failed_domains = [problem.identifier.value for problem in error.subproblems
|
||||
if problem.identifier is not None]
|
||||
|
|
@ -551,16 +548,16 @@ class Client:
|
|||
return successful_domains
|
||||
return []
|
||||
|
||||
def _retry_obtain_certificate(self, domains: List[str], successful_domains: List[str],
|
||||
def _retry_obtain_certificate(self, domains: list[str], successful_domains: list[str],
|
||||
old_keypath: Optional[str]
|
||||
) -> Tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
) -> tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
failed_domains = [d for d in domains if d not in successful_domains]
|
||||
domains_list = ", ".join(failed_domains)
|
||||
display_util.notify("Unable to obtain a certificate with every requested "
|
||||
f"domain. Retrying without: {domains_list}")
|
||||
return self.obtain_certificate(successful_domains, old_keypath)
|
||||
|
||||
def _choose_lineagename(self, domains: List[str], certname: Optional[str]) -> str:
|
||||
def _choose_lineagename(self, domains: list[str], certname: Optional[str]) -> str:
|
||||
"""Chooses a name for the new lineage.
|
||||
|
||||
:param domains: domains in certificate request
|
||||
|
|
@ -609,7 +606,7 @@ class Client:
|
|||
|
||||
def save_certificate(self, cert_pem: bytes, chain_pem: bytes,
|
||||
cert_path: str, chain_path: str, fullchain_path: str
|
||||
) -> Tuple[str, str, str]:
|
||||
) -> tuple[str, str, str]:
|
||||
"""Saves the certificate received from the ACME server.
|
||||
|
||||
:param bytes cert_pem:
|
||||
|
|
@ -644,7 +641,7 @@ class Client:
|
|||
|
||||
return abs_cert_path, abs_chain_path, abs_fullchain_path
|
||||
|
||||
def deploy_certificate(self, domains: List[str], privkey_path: str, cert_path: str,
|
||||
def deploy_certificate(self, domains: list[str], privkey_path: str, cert_path: str,
|
||||
chain_path: str, fullchain_path: str) -> None:
|
||||
"""Install certificate
|
||||
|
||||
|
|
@ -683,7 +680,7 @@ class Client:
|
|||
# sites may have been enabled / final cleanup
|
||||
self.installer.restart()
|
||||
|
||||
def enhance_config(self, domains: List[str], chain_path: str,
|
||||
def enhance_config(self, domains: list[str], chain_path: str,
|
||||
redirect_default: bool = True) -> None:
|
||||
"""Enhance the configuration.
|
||||
|
||||
|
|
@ -727,7 +724,7 @@ class Client:
|
|||
with error_handler.ErrorHandler(self._rollback_and_restart, msg):
|
||||
self.installer.restart()
|
||||
|
||||
def apply_enhancement(self, domains: List[str], enhancement: str,
|
||||
def apply_enhancement(self, domains: list[str], enhancement: str,
|
||||
options: Optional[str] = None) -> None:
|
||||
"""Applies an enhancement on all domains.
|
||||
|
||||
|
|
@ -859,7 +856,7 @@ def rollback(default_installer: str, checkpoints: int,
|
|||
|
||||
|
||||
def _open_pem_file(config: configuration.NamespaceConfig,
|
||||
cli_arg_path: str, pem_path: str) -> Tuple[IO, str]:
|
||||
cli_arg_path: str, pem_path: str) -> tuple[IO, str]:
|
||||
"""Open a pem file.
|
||||
|
||||
If cli_arg_path was set by the client, open that.
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import importlib.resources
|
|||
import logging
|
||||
from contextlib import ExitStack
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
|
||||
from acme import challenges
|
||||
from certbot.compat import misc
|
||||
|
|
@ -16,7 +15,7 @@ SETUPTOOLS_PLUGINS_ENTRY_POINT = "certbot.plugins"
|
|||
OLD_SETUPTOOLS_PLUGINS_ENTRY_POINT = "letsencrypt.plugins"
|
||||
"""Plugins Setuptools entry point before rename."""
|
||||
|
||||
CLI_DEFAULTS: Dict[str, Any] = dict( # pylint: disable=use-dict-literal
|
||||
CLI_DEFAULTS: dict[str, Any] = dict( # pylint: disable=use-dict-literal
|
||||
config_files=[
|
||||
os.path.join(misc.get_default_folder('config'), 'cli.ini'),
|
||||
# https://freedesktop.org/wiki/Software/xdg-user-dirs/
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ from typing import Callable
|
|||
from typing import Iterator
|
||||
from typing import Literal
|
||||
from typing import Optional
|
||||
from typing import Type
|
||||
|
||||
# readline module is not available on all systems
|
||||
try:
|
||||
|
|
@ -65,7 +64,7 @@ class Completer:
|
|||
else:
|
||||
readline.parse_and_bind('tab: complete')
|
||||
|
||||
def __exit__(self, unused_type: Optional[Type[BaseException]],
|
||||
def __exit__(self, unused_type: Optional[type[BaseException]],
|
||||
unused_value: Optional[BaseException],
|
||||
unused_traceback: Optional[TracebackType]) -> 'Literal[False]':
|
||||
readline.set_completer_delims(self._original_delims)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""A dummy module with no effect for use on systems without readline."""
|
||||
from typing import Callable
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
|
||||
|
|
@ -9,7 +8,7 @@ def get_completer() -> Optional[Callable[[], str]]:
|
|||
"""An empty implementation of readline.get_completer."""
|
||||
|
||||
|
||||
def get_completer_delims() -> List[str]:
|
||||
def get_completer_delims() -> list[str]:
|
||||
"""An empty implementation of readline.get_completer_delims."""
|
||||
return []
|
||||
|
||||
|
|
|
|||
|
|
@ -3,10 +3,8 @@ import logging
|
|||
import sys
|
||||
from typing import Any
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import TextIO
|
||||
from typing import Tuple
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
|
|
@ -90,11 +88,11 @@ class FileDisplay:
|
|||
else:
|
||||
logger.debug("Not pausing for user confirmation")
|
||||
|
||||
def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
|
||||
def menu(self, message: str, choices: Union[list[tuple[str, str]], list[str]],
|
||||
ok_label: Optional[str] = None, cancel_label: Optional[str] = None, # pylint: disable=unused-argument
|
||||
help_label: Optional[str] = None, default: Optional[int] = None, # pylint: disable=unused-argument
|
||||
cli_flag: Optional[str] = None, force_interactive: bool = False,
|
||||
**unused_kwargs: Any) -> Tuple[str, int]:
|
||||
**unused_kwargs: Any) -> tuple[str, int]:
|
||||
"""Display a menu.
|
||||
|
||||
.. todo:: This doesn't enable the help label/button (I wasn't sold on
|
||||
|
|
@ -127,7 +125,7 @@ class FileDisplay:
|
|||
return code, selection - 1
|
||||
|
||||
def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
|
||||
force_interactive: bool = False, **unused_kwargs: Any) -> Tuple[str, str]:
|
||||
force_interactive: bool = False, **unused_kwargs: Any) -> tuple[str, str]:
|
||||
"""Accept input from the user.
|
||||
|
||||
:param str message: message to display to the user
|
||||
|
|
@ -199,9 +197,9 @@ class FileDisplay:
|
|||
ans.startswith(no_label[0].upper())):
|
||||
return False
|
||||
|
||||
def checklist(self, message: str, tags: List[str], default: Optional[List[str]] = None,
|
||||
def checklist(self, message: str, tags: list[str], default: Optional[list[str]] = None,
|
||||
cli_flag: Optional[str] = None, force_interactive: bool = False,
|
||||
**unused_kwargs: Any) -> Tuple[str, List[str]]:
|
||||
**unused_kwargs: Any) -> tuple[str, list[str]]:
|
||||
"""Display a checklist.
|
||||
|
||||
:param str message: Message to display to user
|
||||
|
|
@ -294,7 +292,7 @@ class FileDisplay:
|
|||
|
||||
def directory_select(self, message: str, default: Optional[str] = None,
|
||||
cli_flag: Optional[str] = None, force_interactive: bool = False,
|
||||
**unused_kwargs: Any) -> Tuple[str, str]:
|
||||
**unused_kwargs: Any) -> tuple[str, str]:
|
||||
"""Display a directory selection screen.
|
||||
|
||||
:param str message: prompt to give the user
|
||||
|
|
@ -312,7 +310,7 @@ class FileDisplay:
|
|||
return self.input(message, default, cli_flag, force_interactive)
|
||||
|
||||
def _scrub_checklist_input(self, indices: Iterable[Union[str, int]],
|
||||
tags: List[str]) -> List[str]:
|
||||
tags: list[str]) -> list[str]:
|
||||
"""Validate input and transform indices to appropriate tags.
|
||||
|
||||
:param list indices: input
|
||||
|
|
@ -340,7 +338,7 @@ class FileDisplay:
|
|||
return [tags[index - 1] for index in indices_int]
|
||||
|
||||
def _print_menu(self, message: str,
|
||||
choices: Union[List[Tuple[str, str]], List[str]]) -> None:
|
||||
choices: Union[list[tuple[str, str]], list[str]]) -> None:
|
||||
"""Print a menu on the screen.
|
||||
|
||||
:param str message: title of menu
|
||||
|
|
@ -368,7 +366,7 @@ class FileDisplay:
|
|||
self.outfile.write(SIDE_FRAME + os.linesep)
|
||||
self.outfile.flush()
|
||||
|
||||
def _get_valid_int_ans(self, max_: int) -> Tuple[str, int]:
|
||||
def _get_valid_int_ans(self, max_: int) -> tuple[str, int]:
|
||||
"""Get a numerical selection.
|
||||
|
||||
:param int max: The maximum entry (len of choices), must be positive
|
||||
|
|
@ -446,10 +444,10 @@ class NoninteractiveDisplay:
|
|||
)
|
||||
self.outfile.flush()
|
||||
|
||||
def menu(self, message: str, choices: Union[List[Tuple[str, str]], List[str]],
|
||||
def menu(self, message: str, choices: Union[list[tuple[str, str]], list[str]],
|
||||
ok_label: Optional[str] = None, cancel_label: Optional[str] = None,
|
||||
help_label: Optional[str] = None, default: Optional[int] = None,
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, int]:
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> tuple[str, int]:
|
||||
# pylint: disable=unused-argument
|
||||
"""Avoid displaying a menu.
|
||||
|
||||
|
|
@ -473,7 +471,7 @@ class NoninteractiveDisplay:
|
|||
return OK, default
|
||||
|
||||
def input(self, message: str, default: Optional[str] = None, cli_flag: Optional[str] = None,
|
||||
**unused_kwargs: Any) -> Tuple[str, str]:
|
||||
**unused_kwargs: Any) -> tuple[str, str]:
|
||||
"""Accept input from the user.
|
||||
|
||||
:param str message: message to display to the user
|
||||
|
|
@ -506,8 +504,8 @@ class NoninteractiveDisplay:
|
|||
raise self._interaction_fail(message, cli_flag)
|
||||
return default
|
||||
|
||||
def checklist(self, message: str, tags: Iterable[str], default: Optional[List[str]] = None,
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, List[str]]:
|
||||
def checklist(self, message: str, tags: Iterable[str], default: Optional[list[str]] = None,
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> tuple[str, list[str]]:
|
||||
"""Display a checklist.
|
||||
|
||||
:param str message: Message to display to user
|
||||
|
|
@ -525,7 +523,7 @@ class NoninteractiveDisplay:
|
|||
return OK, default
|
||||
|
||||
def directory_select(self, message: str, default: Optional[str] = None,
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> Tuple[str, str]:
|
||||
cli_flag: Optional[str] = None, **unused_kwargs: Any) -> tuple[str, str]:
|
||||
"""Simulate prompting the user for a directory.
|
||||
|
||||
This function returns default if it is not ``None``, otherwise,
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""Internal Certbot display utilities."""
|
||||
import sys
|
||||
import textwrap
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from acme import messages as acme_messages
|
||||
|
|
@ -69,7 +68,7 @@ def input_with_timeout(prompt: Optional[str] = None, timeout: float = 36000.0) -
|
|||
return line.rstrip('\n')
|
||||
|
||||
|
||||
def separate_list_input(input_: str) -> List[str]:
|
||||
def separate_list_input(input_: str) -> list[str]:
|
||||
"""Separate a comma or space separated list.
|
||||
|
||||
:param str input_: input from the user
|
||||
|
|
@ -84,7 +83,7 @@ def separate_list_input(input_: str) -> List[str]:
|
|||
return [str(string) for string in no_commas.split()]
|
||||
|
||||
|
||||
def summarize_domain_list(domains: List[str]) -> str:
|
||||
def summarize_domain_list(domains: list[str]) -> str:
|
||||
"""Summarizes a list of domains in the format of:
|
||||
example.com.com and N more domains
|
||||
or if there is are only two domains:
|
||||
|
|
|
|||
|
|
@ -6,10 +6,7 @@ import traceback
|
|||
from types import TracebackType
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from certbot import errors
|
||||
|
|
@ -80,9 +77,9 @@ class ErrorHandler:
|
|||
def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
|
||||
self.call_on_regular_exit = False
|
||||
self.body_executed = False
|
||||
self.funcs: List[Callable[[], Any]] = []
|
||||
self.prev_handlers: Dict[int, Union[int, None, Callable]] = {}
|
||||
self.received_signals: List[int] = []
|
||||
self.funcs: list[Callable[[], Any]] = []
|
||||
self.prev_handlers: dict[int, Union[int, None, Callable]] = {}
|
||||
self.received_signals: list[int] = []
|
||||
if func is not None:
|
||||
self.register(func, *args, **kwargs)
|
||||
|
||||
|
|
@ -90,7 +87,7 @@ class ErrorHandler:
|
|||
self.body_executed = False
|
||||
self._set_signal_handlers()
|
||||
|
||||
def __exit__(self, exec_type: Optional[Type[BaseException]],
|
||||
def __exit__(self, exec_type: Optional[type[BaseException]],
|
||||
exec_value: Optional[BaseException],
|
||||
trace: Optional[TracebackType]) -> bool:
|
||||
self.body_executed = True
|
||||
|
|
|
|||
|
|
@ -1,10 +1,7 @@
|
|||
"""Facilities for implementing hooks that call shell commands."""
|
||||
|
||||
import logging
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
|
|
@ -75,14 +72,14 @@ def pre_hook(config: configuration.NamespaceConfig) -> None:
|
|||
:param configuration.NamespaceConfig config: Certbot settings
|
||||
|
||||
"""
|
||||
all_hooks: List[str] = (list_hooks(config.renewal_pre_hooks_dir) if config.directory_hooks
|
||||
all_hooks: list[str] = (list_hooks(config.renewal_pre_hooks_dir) if config.directory_hooks
|
||||
else [])
|
||||
all_hooks += [config.pre_hook] if config.pre_hook else []
|
||||
for hook in all_hooks:
|
||||
_run_pre_hook_if_necessary(hook)
|
||||
|
||||
|
||||
executed_pre_hooks: Set[str] = set()
|
||||
executed_pre_hooks: set[str] = set()
|
||||
|
||||
|
||||
def _run_pre_hook_if_necessary(command: str) -> None:
|
||||
|
|
@ -103,7 +100,7 @@ def _run_pre_hook_if_necessary(command: str) -> None:
|
|||
|
||||
def post_hook(
|
||||
config: configuration.NamespaceConfig,
|
||||
renewed_domains: List[str]
|
||||
renewed_domains: list[str]
|
||||
) -> None:
|
||||
|
||||
"""Run post-hooks if defined.
|
||||
|
|
@ -123,7 +120,7 @@ def post_hook(
|
|||
|
||||
"""
|
||||
|
||||
all_hooks: List[str] = (list_hooks(config.renewal_post_hooks_dir) if config.directory_hooks
|
||||
all_hooks: list[str] = (list_hooks(config.renewal_post_hooks_dir) if config.directory_hooks
|
||||
else [])
|
||||
all_hooks += [config.post_hook] if config.post_hook else []
|
||||
# In the "renew" case, we save these up to run at the end
|
||||
|
|
@ -150,7 +147,7 @@ def post_hook(
|
|||
)
|
||||
|
||||
|
||||
post_hooks: List[str] = []
|
||||
post_hooks: list[str] = []
|
||||
|
||||
|
||||
def _run_eventually(command: str) -> None:
|
||||
|
|
@ -166,7 +163,7 @@ def _run_eventually(command: str) -> None:
|
|||
post_hooks.append(command)
|
||||
|
||||
|
||||
def run_saved_post_hooks(renewed_domains: List[str], failed_domains: List[str]) -> None:
|
||||
def run_saved_post_hooks(renewed_domains: list[str], failed_domains: list[str]) -> None:
|
||||
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
|
||||
|
||||
renewed_domains_str = ' '.join(renewed_domains)
|
||||
|
|
@ -192,7 +189,7 @@ def run_saved_post_hooks(renewed_domains: List[str], failed_domains: List[str])
|
|||
)
|
||||
|
||||
|
||||
def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
|
||||
def deploy_hook(config: configuration.NamespaceConfig, domains: list[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-issuance hook if defined.
|
||||
|
||||
|
|
@ -207,7 +204,7 @@ def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
|
|||
lineage_path, config.dry_run, config.run_deploy_hooks)
|
||||
|
||||
|
||||
def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
|
||||
def renew_hook(config: configuration.NamespaceConfig, domains: list[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-renewal hooks.
|
||||
|
||||
|
|
@ -226,7 +223,7 @@ def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
|
|||
|
||||
"""
|
||||
executed_hooks = set()
|
||||
all_hooks: List[str] = (list_hooks(config.renewal_deploy_hooks_dir)if config.directory_hooks
|
||||
all_hooks: list[str] = (list_hooks(config.renewal_deploy_hooks_dir)if config.directory_hooks
|
||||
else [])
|
||||
all_hooks += [config.renew_hook] if config.renew_hook else []
|
||||
for hook in all_hooks:
|
||||
|
|
@ -237,7 +234,7 @@ def renew_hook(config: configuration.NamespaceConfig, domains: List[str],
|
|||
executed_hooks.add(hook)
|
||||
|
||||
|
||||
def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_run: bool,
|
||||
def _run_deploy_hook(command: str, domains: list[str], lineage_path: str, dry_run: bool,
|
||||
run_deploy_hooks: bool) -> None:
|
||||
"""Run the specified deploy-hook (if not doing a dry run).
|
||||
|
||||
|
|
@ -263,7 +260,7 @@ def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_ru
|
|||
_run_hook("deploy-hook", command)
|
||||
|
||||
|
||||
def _run_hook(cmd_name: str, shell_cmd: str, extra_env: Optional[Dict[str, str]] = None) -> str:
|
||||
def _run_hook(cmd_name: str, shell_cmd: str, extra_env: Optional[dict[str, str]] = None) -> str:
|
||||
"""Run a hook command.
|
||||
|
||||
:param str cmd_name: the user facing name of the hook being run
|
||||
|
|
@ -281,7 +278,7 @@ def _run_hook(cmd_name: str, shell_cmd: str, extra_env: Optional[Dict[str, str]]
|
|||
return err
|
||||
|
||||
|
||||
def list_hooks(dir_path: str) -> List[str]:
|
||||
def list_hooks(dir_path: str) -> list[str]:
|
||||
"""List paths to all hooks found in dir_path in sorted order.
|
||||
|
||||
:param str dir_path: directory to search
|
||||
|
|
|
|||
|
|
@ -33,8 +33,6 @@ from typing import Any
|
|||
from typing import cast
|
||||
from typing import IO
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
from acme import messages
|
||||
from certbot import configuration
|
||||
|
|
@ -148,7 +146,7 @@ def post_arg_parse_setup(config: configuration.NamespaceConfig) -> None:
|
|||
|
||||
|
||||
def setup_log_file_handler(config: configuration.NamespaceConfig, logfile: str,
|
||||
fmt: str) -> Tuple[logging.Handler, str]:
|
||||
fmt: str) -> tuple[logging.Handler, str]:
|
||||
"""Setup file debug logging.
|
||||
|
||||
:param certbot.configuration.NamespaceConfig config: Configuration object
|
||||
|
|
@ -332,7 +330,7 @@ def pre_arg_parse_except_hook(memory_handler: MemoryHandler,
|
|||
memory_handler.flush(force=True)
|
||||
|
||||
|
||||
def post_arg_parse_except_hook(exc_type: Type[BaseException], exc_value: BaseException,
|
||||
def post_arg_parse_except_hook(exc_type: type[BaseException], exc_value: BaseException,
|
||||
trace: TracebackType, debug: bool, quiet: bool,
|
||||
log_path: str) -> None:
|
||||
"""Logs fatal exceptions and reports them to the user.
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ from typing import cast
|
|||
from typing import Generator
|
||||
from typing import IO
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
|
|
@ -87,7 +85,7 @@ def _suggest_donation_if_appropriate(config: configuration.NamespaceConfig) -> N
|
|||
|
||||
|
||||
def _get_and_save_cert(le_client: client.Client, config: configuration.NamespaceConfig,
|
||||
domains: Optional[List[str]] = None, certname: Optional[str] = None,
|
||||
domains: Optional[list[str]] = None, certname: Optional[str] = None,
|
||||
lineage: Optional[storage.RenewableCert] = None
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Authenticate and enroll certificate.
|
||||
|
|
@ -115,7 +113,7 @@ def _get_and_save_cert(le_client: client.Client, config: configuration.Namespace
|
|||
|
||||
"""
|
||||
hooks.pre_hook(config)
|
||||
renewed_domains: List[str] = []
|
||||
renewed_domains: list[str] = []
|
||||
|
||||
try:
|
||||
if lineage is not None:
|
||||
|
|
@ -200,7 +198,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
|
|||
def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
||||
domains: Iterable[str],
|
||||
cert: storage.RenewableCert
|
||||
) -> Tuple[str, Optional[storage.RenewableCert]]:
|
||||
) -> tuple[str, Optional[storage.RenewableCert]]:
|
||||
"""Figure out what to do if a previous cert had a subset of the names now requested
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -247,7 +245,7 @@ def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
|||
|
||||
def _handle_identical_cert_request(config: configuration.NamespaceConfig,
|
||||
lineage: storage.RenewableCert,
|
||||
) -> Tuple[str, Optional[storage.RenewableCert]]:
|
||||
) -> tuple[str, Optional[storage.RenewableCert]]:
|
||||
"""Figure out what to do if a lineage has the same names as a previously obtained one
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -305,8 +303,8 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
|
|||
raise AssertionError('This is impossible')
|
||||
|
||||
|
||||
def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: List[str]
|
||||
) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: list[str]
|
||||
) -> tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Determine whether there are duplicated names and how to handle
|
||||
them (renew, reinstall, newcert, or raising an error to stop
|
||||
the client run if the user chooses to cancel the operation when
|
||||
|
|
@ -346,8 +344,8 @@ def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: Li
|
|||
return None, None
|
||||
|
||||
|
||||
def _find_cert(config: configuration.NamespaceConfig, domains: List[str], certname: str
|
||||
) -> Tuple[bool, Optional[storage.RenewableCert]]:
|
||||
def _find_cert(config: configuration.NamespaceConfig, domains: list[str], certname: str
|
||||
) -> tuple[bool, Optional[storage.RenewableCert]]:
|
||||
"""Finds an existing certificate object given domains and/or a certificate name.
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -372,8 +370,8 @@ def _find_cert(config: configuration.NamespaceConfig, domains: List[str], certna
|
|||
|
||||
|
||||
def _find_lineage_for_domains_and_certname(
|
||||
config: configuration.NamespaceConfig, domains: List[str],
|
||||
certname: str) -> Tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
config: configuration.NamespaceConfig, domains: list[str],
|
||||
certname: str) -> tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Find appropriate lineage based on given domains and/or certname.
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -417,7 +415,7 @@ def _find_lineage_for_domains_and_certname(
|
|||
T = TypeVar("T")
|
||||
|
||||
|
||||
def _get_added_removed(after: Iterable[T], before: Iterable[T]) -> Tuple[List[T], List[T]]:
|
||||
def _get_added_removed(after: Iterable[T], before: Iterable[T]) -> tuple[list[T], list[T]]:
|
||||
"""Get lists of items removed from `before`
|
||||
and a lists of items added to `after`
|
||||
"""
|
||||
|
|
@ -482,7 +480,7 @@ def _ask_user_to_confirm_new_names(config: configuration.NamespaceConfig,
|
|||
|
||||
def _find_domains_or_certname(config: configuration.NamespaceConfig,
|
||||
installer: Optional[interfaces.Installer],
|
||||
question: Optional[str] = None) -> Tuple[List[str], str]:
|
||||
question: Optional[str] = None) -> tuple[list[str], str]:
|
||||
"""Retrieve domains and certname from config or user input.
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -543,7 +541,7 @@ def _report_next_steps(config: configuration.NamespaceConfig, installer_err: Opt
|
|||
being saved (created or renewed).
|
||||
|
||||
"""
|
||||
steps: List[str] = []
|
||||
steps: list[str] = []
|
||||
|
||||
# If the installation or enhancement raised an error, show advice on trying again
|
||||
if installer_err:
|
||||
|
|
@ -696,7 +694,7 @@ def _csr_report_new_cert(config: configuration.NamespaceConfig, cert_path: Optio
|
|||
|
||||
|
||||
def _determine_account(config: configuration.NamespaceConfig
|
||||
) -> Tuple[account.Account,
|
||||
) -> tuple[account.Account,
|
||||
Optional[acme_client.ClientV2]]:
|
||||
"""Determine which account to use.
|
||||
|
||||
|
|
@ -1037,7 +1035,7 @@ def _cert_name_from_config_or_lineage(config: configuration.NamespaceConfig,
|
|||
|
||||
|
||||
def _install_cert(config: configuration.NamespaceConfig, le_client: client.Client,
|
||||
domains: List[str], lineage: Optional[storage.RenewableCert] = None) -> None:
|
||||
domains: list[str], lineage: Optional[storage.RenewableCert] = None) -> None:
|
||||
"""Install a cert
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -1469,7 +1467,7 @@ def run(config: configuration.NamespaceConfig,
|
|||
|
||||
|
||||
def _csr_get_and_save_cert(config: configuration.NamespaceConfig,
|
||||
le_client: client.Client) -> Tuple[
|
||||
le_client: client.Client) -> tuple[
|
||||
Optional[str], Optional[str], Optional[str]]:
|
||||
"""Obtain a cert using a user-supplied CSR
|
||||
|
||||
|
|
@ -1823,7 +1821,7 @@ def make_displayer(config: configuration.NamespaceConfig
|
|||
devnull.close()
|
||||
|
||||
|
||||
def main(cli_args: Optional[List[str]] = None) -> Optional[Union[str, int]]:
|
||||
def main(cli_args: Optional[list[str]] = None) -> Optional[Union[str, int]]:
|
||||
"""Run Certbot.
|
||||
|
||||
:param cli_args: command line to Certbot, defaults to ``sys.argv[1:]``
|
||||
|
|
|
|||
|
|
@ -3,13 +3,10 @@ import logging
|
|||
import sys
|
||||
from typing import Callable
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import Iterator
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from certbot import configuration
|
||||
|
|
@ -39,7 +36,7 @@ class PluginEntryPoint:
|
|||
|
||||
def __init__(self, entry_point: importlib_metadata.EntryPoint) -> None:
|
||||
self.name = self.entry_point_to_plugin_name(entry_point)
|
||||
self.plugin_cls: Type[interfaces.Plugin] = entry_point.load()
|
||||
self.plugin_cls: type[interfaces.Plugin] = entry_point.load()
|
||||
self.entry_point = entry_point
|
||||
self.warning_message: Optional[str] = None
|
||||
self._initialized: Optional[interfaces.Plugin] = None
|
||||
|
|
@ -76,7 +73,7 @@ class PluginEntryPoint:
|
|||
"""Should this plugin be hidden from UI?"""
|
||||
return getattr(self.plugin_cls, "hidden", False)
|
||||
|
||||
def ifaces(self, *ifaces_groups: Iterable[Type]) -> bool:
|
||||
def ifaces(self, *ifaces_groups: Iterable[type]) -> bool:
|
||||
"""Does plugin implement specified interface groups?"""
|
||||
return not ifaces_groups or any(
|
||||
all(issubclass(self.plugin_cls, iface)
|
||||
|
|
@ -182,7 +179,7 @@ class PluginsRegistry(Mapping):
|
|||
entry points.
|
||||
|
||||
"""
|
||||
plugins: Dict[str, PluginEntryPoint] = {}
|
||||
plugins: dict[str, PluginEntryPoint] = {}
|
||||
plugin_paths_string = os.getenv('CERTBOT_PLUGIN_PATH')
|
||||
plugin_paths = plugin_paths_string.split(':') if plugin_paths_string else []
|
||||
# XXX should ensure this only happens once
|
||||
|
|
@ -204,7 +201,7 @@ class PluginsRegistry(Mapping):
|
|||
|
||||
@classmethod
|
||||
def _load_entry_point(cls, entry_point: importlib_metadata.EntryPoint,
|
||||
plugins: Dict[str, PluginEntryPoint]) -> None:
|
||||
plugins: dict[str, PluginEntryPoint]) -> None:
|
||||
plugin_ep = PluginEntryPoint(entry_point)
|
||||
if plugin_ep.name in plugins:
|
||||
other_ep = plugins[plugin_ep.name]
|
||||
|
|
@ -230,7 +227,7 @@ class PluginsRegistry(Mapping):
|
|||
def __len__(self) -> int:
|
||||
return len(self._plugins)
|
||||
|
||||
def init(self, config: configuration.NamespaceConfig) -> List[interfaces.Plugin]:
|
||||
def init(self, config: configuration.NamespaceConfig) -> list[interfaces.Plugin]:
|
||||
"""Initialize all plugins in the registry."""
|
||||
return [plugin_ep.init(config) for plugin_ep
|
||||
in self._plugins.values()]
|
||||
|
|
@ -244,11 +241,11 @@ class PluginsRegistry(Mapping):
|
|||
"""Filter plugins based on visibility."""
|
||||
return self.filter(lambda plugin_ep: not plugin_ep.hidden)
|
||||
|
||||
def ifaces(self, *ifaces_groups: Iterable[Type]) -> "PluginsRegistry":
|
||||
def ifaces(self, *ifaces_groups: Iterable[type]) -> "PluginsRegistry":
|
||||
"""Filter plugins based on interfaces."""
|
||||
return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))
|
||||
|
||||
def prepare(self) -> List[Union[bool, Error]]:
|
||||
def prepare(self) -> list[Union[bool, Error]]:
|
||||
"""Prepare all plugins in the registry."""
|
||||
return [plugin_ep.prepare() for plugin_ep in self._plugins.values()]
|
||||
|
||||
|
|
|
|||
|
|
@ -2,11 +2,7 @@
|
|||
import logging
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
|
||||
from acme import challenges
|
||||
from certbot import achallenges
|
||||
|
|
@ -98,7 +94,7 @@ permitted by DNS standards.)
|
|||
super().__init__(*args, **kwargs)
|
||||
self.reverter = reverter.Reverter(self.config)
|
||||
self.reverter.recovery_routine()
|
||||
self.env: Dict[achallenges.AnnotatedChallenge, Dict[str, str]] = {}
|
||||
self.env: dict[achallenges.AnnotatedChallenge, dict[str, str]] = {}
|
||||
self.subsequent_dns_challenge = False
|
||||
self.subsequent_any_challenge = False
|
||||
|
||||
|
|
@ -132,7 +128,7 @@ permitted by DNS standards.)
|
|||
'the user or by performing the setup manually.')
|
||||
|
||||
def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str:
|
||||
def has_chall(cls: Type[challenges.Challenge]) -> bool:
|
||||
def has_chall(cls: type[challenges.Challenge]) -> bool:
|
||||
return any(isinstance(achall.chall, cls) for achall in failed_achalls)
|
||||
|
||||
has_dns = has_chall(challenges.DNS01)
|
||||
|
|
@ -167,12 +163,12 @@ permitted by DNS standards.)
|
|||
)
|
||||
)
|
||||
|
||||
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
|
||||
def get_chall_pref(self, domain: str) -> Iterable[type[challenges.Challenge]]:
|
||||
# pylint: disable=unused-argument,missing-function-docstring
|
||||
return [challenges.HTTP01, challenges.DNS01]
|
||||
|
||||
def perform(self, achalls: List[achallenges.AnnotatedChallenge]
|
||||
) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
def perform(self, achalls: list[achallenges.AnnotatedChallenge]
|
||||
) -> list[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
responses = []
|
||||
last_dns_achall = 0
|
||||
for i, achall in enumerate(achalls):
|
||||
|
|
@ -187,7 +183,7 @@ permitted by DNS standards.)
|
|||
return responses
|
||||
|
||||
def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge,
|
||||
achalls: List[achallenges.AnnotatedChallenge]) -> None:
|
||||
achalls: list[achallenges.AnnotatedChallenge]) -> None:
|
||||
env = {
|
||||
"CERTBOT_DOMAIN": achall.domain,
|
||||
"CERTBOT_VALIDATION": achall.validation(achall.account_key),
|
||||
|
|
@ -245,7 +241,7 @@ permitted by DNS standards.)
|
|||
self._execute_hook('cleanup-hook', achall.domain)
|
||||
self.reverter.recovery_routine()
|
||||
|
||||
def _execute_hook(self, hook_name: str, achall_domain: str) -> Tuple[str, str]:
|
||||
def _execute_hook(self, hook_name: str, achall_domain: str) -> tuple[str, str]:
|
||||
returncode, err, out = misc.execute_command_status(
|
||||
self.option_name(hook_name), self.conf(hook_name),
|
||||
env=util.env_no_snap_for_external_calls()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""Null plugin."""
|
||||
import logging
|
||||
from typing import Callable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
|
|
@ -29,7 +28,7 @@ class Installer(common.Plugin, interfaces.Installer):
|
|||
def more_info(self) -> str:
|
||||
return "Installer that doesn't do anything (for testing)."
|
||||
|
||||
def get_all_names(self) -> List[str]:
|
||||
def get_all_names(self) -> list[str]:
|
||||
return []
|
||||
|
||||
def deploy_cert(self, domain: str, cert_path: str, key_path: str,
|
||||
|
|
@ -37,10 +36,10 @@ class Installer(common.Plugin, interfaces.Installer):
|
|||
pass # pragma: no cover
|
||||
|
||||
def enhance(self, domain: str, enhancement: str,
|
||||
options: Optional[Union[List[str], str]] = None) -> None:
|
||||
options: Optional[Union[list[str], str]] = None) -> None:
|
||||
pass # pragma: no cover
|
||||
|
||||
def supported_enhancements(self) -> List[str]:
|
||||
def supported_enhancements(self) -> list[str]:
|
||||
return []
|
||||
|
||||
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
|
||||
|
|
|
|||
|
|
@ -3,10 +3,7 @@
|
|||
import logging
|
||||
from typing import cast
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
|
||||
from certbot import configuration
|
||||
|
|
@ -82,7 +79,7 @@ P = TypeVar('P', bound=interfaces.Plugin)
|
|||
|
||||
def pick_plugin(config: configuration.NamespaceConfig, default: Optional[str],
|
||||
plugins: disco.PluginsRegistry, question: str,
|
||||
ifaces: Iterable[Type]) -> Optional[P]:
|
||||
ifaces: Iterable[type]) -> Optional[P]:
|
||||
"""Pick plugin.
|
||||
|
||||
:param certbot.configuration.NamespaceConfig config: Configuration
|
||||
|
|
@ -136,7 +133,7 @@ def pick_plugin(config: configuration.NamespaceConfig, default: Optional[str],
|
|||
return None
|
||||
|
||||
|
||||
def choose_plugin(prepared: List[disco.PluginEntryPoint],
|
||||
def choose_plugin(prepared: list[disco.PluginEntryPoint],
|
||||
question: str) -> Optional[disco.PluginEntryPoint]:
|
||||
"""Allow the user to choose their plugin.
|
||||
|
||||
|
|
@ -193,7 +190,7 @@ def record_chosen_plugins(config: configuration.NamespaceConfig, plugins: disco.
|
|||
|
||||
def choose_configurator_plugins(config: configuration.NamespaceConfig,
|
||||
plugins: disco.PluginsRegistry,
|
||||
verb: str) -> Tuple[Optional[interfaces.Installer],
|
||||
verb: str) -> tuple[Optional[interfaces.Installer],
|
||||
Optional[interfaces.Authenticator]]:
|
||||
"""
|
||||
Figure out which configurator we're going to use, modifies
|
||||
|
|
@ -290,7 +287,7 @@ def set_configurator(previously: Optional[str], now: Optional[str]) -> Optional[
|
|||
|
||||
|
||||
def cli_plugin_requests(config: configuration.NamespaceConfig
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
) -> tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
Figure out which plugins the user requested with CLI and config options
|
||||
|
||||
|
|
|
|||
|
|
@ -4,13 +4,7 @@ import errno
|
|||
import logging
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import DefaultDict
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from acme import challenges
|
||||
|
|
@ -24,9 +18,9 @@ from certbot.plugins import common
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
ServedType = DefaultDict[
|
||||
ServedType = collections.defaultdict[
|
||||
acme_standalone.BaseDualNetworkedServers,
|
||||
Set[achallenges.AnnotatedChallenge]
|
||||
set[achallenges.AnnotatedChallenge]
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -34,12 +28,12 @@ class ServerManager:
|
|||
"""Manager for HTTP-01 standalone server instances."""
|
||||
|
||||
def __init__(self,
|
||||
http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
|
||||
http_01_resources: set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
|
||||
) -> None:
|
||||
self._instances: Dict[int, acme_standalone.HTTP01DualNetworkedServers] = {}
|
||||
self._instances: dict[int, acme_standalone.HTTP01DualNetworkedServers] = {}
|
||||
self.http_01_resources = http_01_resources
|
||||
|
||||
def run(self, port: int, challenge_type: Type[challenges.Challenge],
|
||||
def run(self, port: int, challenge_type: type[challenges.Challenge],
|
||||
listenaddr: str = "") -> acme_standalone.HTTP01DualNetworkedServers:
|
||||
"""Run ACME server on specified ``port``.
|
||||
|
||||
|
|
@ -87,7 +81,7 @@ class ServerManager:
|
|||
instance.shutdown_and_server_close()
|
||||
del self._instances[port]
|
||||
|
||||
def running(self) -> Dict[int, acme_standalone.HTTP01DualNetworkedServers]:
|
||||
def running(self) -> dict[int, acme_standalone.HTTP01DualNetworkedServers]:
|
||||
"""Return all running instances.
|
||||
|
||||
Once the server is stopped using `stop`, it will not be
|
||||
|
|
@ -122,7 +116,7 @@ running. HTTP challenge only (wildcards not supported)."""
|
|||
# values, main thread writes). Due to the nature of CPython's
|
||||
# GIL, the operations are safe, c.f.
|
||||
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
|
||||
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set()
|
||||
self.http_01_resources: set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set()
|
||||
|
||||
self.servers = ServerManager(self.http_01_resources)
|
||||
|
||||
|
|
@ -139,12 +133,12 @@ running. HTTP challenge only (wildcards not supported)."""
|
|||
def prepare(self) -> None: # pylint: disable=missing-function-docstring
|
||||
pass
|
||||
|
||||
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
|
||||
def get_chall_pref(self, domain: str) -> Iterable[type[challenges.Challenge]]:
|
||||
# pylint: disable=unused-argument,missing-function-docstring
|
||||
return [challenges.HTTP01]
|
||||
|
||||
def perform(self, achalls: Iterable[achallenges.AnnotatedChallenge]
|
||||
) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
) -> list[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
return [self._try_perform_single(achall) for achall in achalls]
|
||||
|
||||
def _try_perform_single(self,
|
||||
|
|
@ -162,7 +156,7 @@ running. HTTP challenge only (wildcards not supported)."""
|
|||
return response
|
||||
|
||||
def _perform_http_01(self, achall: achallenges.AnnotatedChallenge
|
||||
) -> Tuple[acme_standalone.HTTP01DualNetworkedServers,
|
||||
) -> tuple[acme_standalone.HTTP01DualNetworkedServers,
|
||||
challenges.ChallengeResponse]:
|
||||
port = self.config.http01_port
|
||||
addr = self.config.http01_address
|
||||
|
|
@ -183,7 +177,7 @@ running. HTTP challenge only (wildcards not supported)."""
|
|||
if not self.served[servers]:
|
||||
self.servers.stop(port)
|
||||
|
||||
def auth_hint(self, failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
|
||||
def auth_hint(self, failed_achalls: list[achallenges.AnnotatedChallenge]) -> str:
|
||||
port, addr = self.config.http01_port, self.config.http01_address
|
||||
neat_addr = f"{addr}:{port}" if addr else f"port {port}"
|
||||
return ("The Certificate Authority failed to download the challenge files from "
|
||||
|
|
|
|||
|
|
@ -5,14 +5,9 @@ import json
|
|||
import logging
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import DefaultDict
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Set
|
||||
from typing import Type
|
||||
from typing import Union
|
||||
|
||||
from acme import challenges
|
||||
|
|
@ -88,27 +83,29 @@ to serve all files under specified web root ({0})."""
|
|||
"file, it needs to be on a single line, like: webroot-map = "
|
||||
'{"example.com":"/var/www"}.')
|
||||
|
||||
def auth_hint(self, failed_achalls: List[AnnotatedChallenge]) -> str: # pragma: no cover
|
||||
def auth_hint(self, failed_achalls: list[AnnotatedChallenge]) -> str: # pragma: no cover
|
||||
return ("The Certificate Authority failed to download the temporary challenge files "
|
||||
"created by Certbot. Ensure that the listed domains serve their content from "
|
||||
"the provided --webroot-path/-w and that files created there can be downloaded "
|
||||
"from the internet.")
|
||||
|
||||
def get_chall_pref(self, domain: str) -> Iterable[Type[challenges.Challenge]]:
|
||||
def get_chall_pref(self, domain: str) -> Iterable[type[challenges.Challenge]]:
|
||||
# pylint: disable=unused-argument,missing-function-docstring
|
||||
return [challenges.HTTP01]
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.full_roots: Dict[str, str] = {}
|
||||
self.performed: DefaultDict[str, Set[AnnotatedChallenge]] = collections.defaultdict(set)
|
||||
self.full_roots: dict[str, str] = {}
|
||||
self.performed: collections.defaultdict[str, set[AnnotatedChallenge]] = (
|
||||
collections.defaultdict(set)
|
||||
)
|
||||
# stack of dirs successfully created by this authenticator
|
||||
self._created_dirs: List[str] = []
|
||||
self._created_dirs: list[str] = []
|
||||
|
||||
def prepare(self) -> None: # pylint: disable=missing-function-docstring
|
||||
pass
|
||||
|
||||
def perform(self, achalls: List[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
def perform(self, achalls: list[AnnotatedChallenge]) -> list[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
|
||||
self._set_webroots(achalls)
|
||||
|
||||
self._create_challenge_dirs()
|
||||
|
|
@ -137,7 +134,7 @@ to serve all files under specified web root ({0})."""
|
|||
known_webroots.insert(0, new_webroot)
|
||||
self.conf("map")[achall.domain] = new_webroot
|
||||
|
||||
def _prompt_for_webroot(self, domain: str, known_webroots: List[str]) -> Optional[str]:
|
||||
def _prompt_for_webroot(self, domain: str, known_webroots: list[str]) -> Optional[str]:
|
||||
webroot = None
|
||||
|
||||
while webroot is None:
|
||||
|
|
@ -153,7 +150,7 @@ to serve all files under specified web root ({0})."""
|
|||
return webroot
|
||||
|
||||
def _prompt_with_webroot_list(self, domain: str,
|
||||
known_webroots: List[str]) -> Optional[str]:
|
||||
known_webroots: list[str]) -> Optional[str]:
|
||||
path_flag = "--" + self.option_name("path")
|
||||
|
||||
while True:
|
||||
|
|
@ -253,7 +250,7 @@ to serve all files under specified web root ({0})."""
|
|||
self.performed[root_path].add(achall)
|
||||
return response
|
||||
|
||||
def cleanup(self, achalls: List[AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
|
||||
def cleanup(self, achalls: list[AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
|
||||
for achall in achalls:
|
||||
root_path = self.full_roots.get(achall.domain, None)
|
||||
if root_path is not None:
|
||||
|
|
@ -274,7 +271,7 @@ to serve all files under specified web root ({0})."""
|
|||
logger.info("Not cleaning up the web.config file in %s "
|
||||
"because it is not generated by Certbot.", root_path)
|
||||
|
||||
not_removed: List[str] = []
|
||||
not_removed: list[str] = []
|
||||
while self._created_dirs:
|
||||
path = self._created_dirs.pop()
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -9,9 +9,7 @@ import sys
|
|||
import time
|
||||
import traceback
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
|
@ -70,7 +68,7 @@ class AriClientPool:
|
|||
def __init__(self, cli_config: configuration.NamespaceConfig):
|
||||
self._verify_ssl = not cli_config.no_verify_ssl
|
||||
self._user_agent = client.determine_user_agent(cli_config)
|
||||
self._pool: Dict[str, acme_client.ClientV2] = {}
|
||||
self._pool: dict[str, acme_client.ClientV2] = {}
|
||||
|
||||
def get(self, server: str) -> acme_client.ClientV2:
|
||||
"""
|
||||
|
|
@ -196,7 +194,7 @@ def _restore_plugin_configs(config: configuration.NamespaceConfig,
|
|||
# longer defined, stored copies of that parameter will be
|
||||
# deserialized as strings by this logic even if they were
|
||||
# originally meant to be some other type.
|
||||
plugin_prefixes: List[str] = []
|
||||
plugin_prefixes: list[str] = []
|
||||
if renewalparams["authenticator"] == "webroot":
|
||||
_restore_webroot_config(config, renewalparams)
|
||||
else:
|
||||
|
|
@ -245,7 +243,7 @@ def restore_required_config_elements(config: configuration.NamespaceConfig,
|
|||
setattr(config, key, value)
|
||||
|
||||
|
||||
def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> Dict[str, Any]:
|
||||
def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> dict[str, Any]:
|
||||
"""Removes deprecated config options from the parsed renewalparams.
|
||||
|
||||
:param dict renewalparams: list of parsed renewalparams
|
||||
|
|
@ -258,7 +256,7 @@ def _remove_deprecated_config_elements(renewalparams: Mapping[str, Any]) -> Dict
|
|||
if option_name not in cli.DEPRECATED_OPTIONS}
|
||||
|
||||
|
||||
def _restore_pref_challs(unused_name: str, value: Union[List[str], str]) -> List[str]:
|
||||
def _restore_pref_challs(unused_name: str, value: Union[list[str], str]) -> list[str]:
|
||||
"""Restores preferred challenges from a renewal config file.
|
||||
|
||||
If value is a `str`, it should be a single challenge type.
|
||||
|
|
@ -525,7 +523,7 @@ def _avoid_reuse_key_conflicts(config: configuration.NamespaceConfig,
|
|||
"add --new-key.")
|
||||
|
||||
|
||||
def renew_cert(config: configuration.NamespaceConfig, domains: Optional[List[str]],
|
||||
def renew_cert(config: configuration.NamespaceConfig, domains: Optional[list[str]],
|
||||
le_client: client.Client, lineage: storage.RenewableCert) -> None:
|
||||
"""Renew a certificate lineage."""
|
||||
renewal_params = lineage.configuration["renewalparams"]
|
||||
|
|
@ -560,9 +558,9 @@ def report(msgs: Iterable[str], category: str) -> str:
|
|||
return " " + "\n ".join(lines)
|
||||
|
||||
|
||||
def _renew_describe_results(config: configuration.NamespaceConfig, renew_successes: List[str],
|
||||
renew_failures: List[str], renew_skipped: List[str],
|
||||
parse_failures: List[str]) -> None:
|
||||
def _renew_describe_results(config: configuration.NamespaceConfig, renew_successes: list[str],
|
||||
renew_failures: list[str], renew_skipped: list[str],
|
||||
parse_failures: list[str]) -> None:
|
||||
"""
|
||||
Print a report to the terminal about the results of the renewal process.
|
||||
|
||||
|
|
|
|||
|
|
@ -3,9 +3,7 @@ from __future__ import annotations
|
|||
import logging
|
||||
import socket
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
from requests import PreparedRequest, Session
|
||||
|
|
@ -39,7 +37,7 @@ CURRENT_PYTHON_VERSION_STRING = 'python3.12'
|
|||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def prepare_env(cli_args: List[str]) -> List[str]:
|
||||
def prepare_env(cli_args: list[str]) -> list[str]:
|
||||
"""
|
||||
Prepare runtime environment for a certbot execution in snap.
|
||||
:param list cli_args: List of command line arguments
|
||||
|
|
@ -142,7 +140,7 @@ class _SnapdAdapter(HTTPAdapter):
|
|||
def get_connection_with_tls_context(self, request: PreparedRequest,
|
||||
verify: bool | str | None,
|
||||
proxies: Optional[Iterable[str]] = None,
|
||||
cert: Optional[Union[str, Tuple[str,str]]] = None
|
||||
cert: Optional[Union[str, tuple[str,str]]] = None
|
||||
) -> _SnapdConnectionPool:
|
||||
"""Required method for creating a new connection pool. Simply return our
|
||||
shim that forces a UNIX socket connection to snapd."""
|
||||
|
|
|
|||
|
|
@ -8,12 +8,9 @@ import shutil
|
|||
import stat
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
import configobj
|
||||
|
|
@ -47,7 +44,7 @@ BASE_PRIVKEY_MODE = 0o600
|
|||
# pylint: disable=too-many-lines
|
||||
|
||||
|
||||
def renewal_conf_files(config: configuration.NamespaceConfig) -> List[str]:
|
||||
def renewal_conf_files(config: configuration.NamespaceConfig) -> list[str]:
|
||||
"""Build a list of all renewal configuration files.
|
||||
|
||||
:param configuration.NamespaceConfig config: Configuration object
|
||||
|
|
@ -266,7 +263,7 @@ def _relevant(namespaces: Iterable[str], option: str) -> bool:
|
|||
any(option.startswith(namespace) for namespace in namespaces))
|
||||
|
||||
|
||||
def relevant_values(config: configuration.NamespaceConfig) -> Dict[str, Any]:
|
||||
def relevant_values(config: configuration.NamespaceConfig) -> dict[str, Any]:
|
||||
"""Return a new dict containing only items relevant for renewal.
|
||||
|
||||
:param .NamespaceConfig config: parsed command line
|
||||
|
|
@ -656,7 +653,7 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
# happen as a result of random tampering by a sysadmin, or
|
||||
# filesystem errors, or crashes.)
|
||||
|
||||
def _previous_symlinks(self) -> List[Tuple[str, str]]:
|
||||
def _previous_symlinks(self) -> list[tuple[str, str]]:
|
||||
"""Returns the kind and path of all symlinks used in recovery.
|
||||
|
||||
:returns: list of (kind, symlink) tuples
|
||||
|
|
@ -760,7 +757,7 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
where = os.path.dirname(link)
|
||||
return os.path.join(where, "{0}{1}.pem".format(kind, version))
|
||||
|
||||
def available_versions(self, kind: str) -> List[int]:
|
||||
def available_versions(self, kind: str) -> list[int]:
|
||||
"""Which alternative versions of the specified kind of item exist?
|
||||
|
||||
The archive directory where the current version is stored is
|
||||
|
|
@ -849,7 +846,7 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
:rtype: bool
|
||||
|
||||
"""
|
||||
all_versions: List[int] = []
|
||||
all_versions: list[int] = []
|
||||
for item in ALL_FOUR:
|
||||
version = self.current_version(item)
|
||||
if version is None:
|
||||
|
|
@ -906,7 +903,7 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
for _, link in previous_links:
|
||||
os.unlink(link)
|
||||
|
||||
def names(self) -> List[str]:
|
||||
def names(self) -> list[str]:
|
||||
"""What are the subject names of this certificate?
|
||||
|
||||
:returns: the subject names
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import copy
|
|||
import io
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Any, List
|
||||
from typing import Any
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -85,17 +85,17 @@ class ParseTest(unittest.TestCase):
|
|||
'''Test the cli args entrypoint'''
|
||||
|
||||
@staticmethod
|
||||
def _unmocked_parse(args: List[str]) -> NamespaceConfig:
|
||||
def _unmocked_parse(args: list[str]) -> NamespaceConfig:
|
||||
"""Get result of cli.prepare_and_parse_args."""
|
||||
return cli.prepare_and_parse_args(PLUGINS, args)
|
||||
|
||||
@staticmethod
|
||||
def parse(args: List[str]) -> NamespaceConfig:
|
||||
def parse(args: list[str]) -> NamespaceConfig:
|
||||
"""Mocks certbot._internal.display.obj.get_display and calls _unmocked_parse."""
|
||||
with test_util.patch_display_util():
|
||||
return ParseTest._unmocked_parse(args)
|
||||
|
||||
def _help_output(self, args: List[str]):
|
||||
def _help_output(self, args: list[str]):
|
||||
"Run a command, and return the output string for scrutiny"
|
||||
|
||||
output = io.StringIO()
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
from importlib import reload as reload_module
|
||||
import string
|
||||
import sys
|
||||
from typing import List
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -30,7 +29,7 @@ class CompleterTest(test_util.TempDirTestCase):
|
|||
if self.tempdir[-1] != os.sep:
|
||||
self.tempdir += os.sep
|
||||
|
||||
self.paths: List[str] = []
|
||||
self.paths: list[str] = []
|
||||
# create some files and directories in temp_dir
|
||||
for c in string.ascii_lowercase:
|
||||
path = os.path.join(self.tempdir, c)
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import contextlib
|
|||
import signal
|
||||
import sys
|
||||
from typing import Callable
|
||||
from typing import Dict
|
||||
from typing import Union
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
|
@ -28,7 +27,7 @@ def set_signals(sig_handler_dict):
|
|||
def signal_receiver(signums):
|
||||
"""Context manager to catch signals"""
|
||||
signals = []
|
||||
prev_handlers: Dict[int, Union[int, None, Callable]] = get_signals(signums)
|
||||
prev_handlers: dict[int, Union[int, None, Callable]] = get_signals(signums)
|
||||
set_signals({s: lambda s, _: signals.append(s) for s in signums})
|
||||
yield signals
|
||||
set_signals(prev_handlers)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ from certbot import util
|
|||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
from certbot.tests import util as test_util
|
||||
from typing import List
|
||||
|
||||
|
||||
def pyver_lt(major: int, minor: int):
|
||||
|
|
@ -292,7 +291,7 @@ class RunSavedPostHooksTest(HookTest):
|
|||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.eventually: List[str] = []
|
||||
self.eventually: list[str] = []
|
||||
|
||||
def test_empty(self):
|
||||
assert not self._call_with_mock_execute_and_eventually([], []).called
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import shutil
|
|||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
from typing import List
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -1237,7 +1236,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plugins_disco')
|
||||
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_no_args(self, _det, mock_disco):
|
||||
ifaces: List[interfaces.Plugin] = []
|
||||
ifaces: list[interfaces.Plugin] = []
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = io.StringIO()
|
||||
|
|
@ -1252,7 +1251,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plugins_disco')
|
||||
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_no_args_unprivileged(self, _det, mock_disco):
|
||||
ifaces: List[interfaces.Plugin] = []
|
||||
ifaces: list[interfaces.Plugin] = []
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
def throw_error(directory, mode, strict):
|
||||
|
|
@ -1274,7 +1273,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plugins_disco')
|
||||
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_init(self, _det, mock_disco):
|
||||
ifaces: List[interfaces.Plugin] = []
|
||||
ifaces: list[interfaces.Plugin] = []
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = io.StringIO()
|
||||
|
|
@ -1290,7 +1289,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plugins_disco')
|
||||
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
|
||||
def test_plugins_prepare(self, _det, mock_disco):
|
||||
ifaces: List[interfaces.Plugin] = []
|
||||
ifaces: list[interfaces.Plugin] = []
|
||||
plugins = mock_disco.PluginsRegistry.find_all()
|
||||
|
||||
stdout = io.StringIO()
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
import functools
|
||||
import string
|
||||
import sys
|
||||
from typing import List
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -264,7 +263,7 @@ class PluginsRegistryTest(unittest.TestCase):
|
|||
self.plugin_ep.prepare.assert_called_once_with()
|
||||
|
||||
def test_prepare_order(self):
|
||||
order: List[str] = []
|
||||
order: list[str] = []
|
||||
plugins = {
|
||||
c: mock.MagicMock(prepare=functools.partial(order.append, c))
|
||||
for c in string.ascii_letters
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""Tests for letsencrypt.plugins.selection"""
|
||||
import sys
|
||||
from typing import List
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -50,7 +49,7 @@ class PickPluginTest(unittest.TestCase):
|
|||
self.default = None
|
||||
self.reg = mock.MagicMock()
|
||||
self.question = "Question?"
|
||||
self.ifaces: List[interfaces.Plugin] = []
|
||||
self.ifaces: list[interfaces.Plugin] = []
|
||||
|
||||
def _call(self):
|
||||
from certbot._internal.plugins.selection import pick_plugin
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
import errno
|
||||
import socket
|
||||
import sys
|
||||
from typing import Set
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
|
|
@ -22,7 +21,7 @@ class ServerManagerTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
from certbot._internal.plugins.standalone import ServerManager
|
||||
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = {}
|
||||
self.http_01_resources: set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = {}
|
||||
self.mgr = ServerManager(self.http_01_resources)
|
||||
|
||||
def test_init(self):
|
||||
|
|
|
|||
Loading…
Reference in a new issue