Fix remaining types

This commit is contained in:
Adrien Ferrand 2022-01-19 23:02:08 +01:00
parent d27a3a3216
commit 7eb6bbae82
6 changed files with 25 additions and 14 deletions

View file

@ -48,7 +48,8 @@ def _suppress_x509_verification_warnings() -> None:
# Handle old versions of request with vendorized urllib3
# pylint: disable=no-member
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
requests.packages.urllib3.disable_warnings( # type: ignore[attr-defined]
InsecureRequestWarning)
def check_until_timeout(url: str, attempts: int = 30) -> None:

View file

@ -138,7 +138,7 @@ class _RFC2136Client:
except Exception as e:
raise errors.PluginError('Encountered error adding TXT record: {0}'
.format(e))
rcode = response.rcode()
rcode = response.rcode() # type: ignore[attr-defined]
if rcode == dns.rcode.NOERROR:
logger.debug('Successfully added TXT record %s', record_name)
@ -173,7 +173,7 @@ class _RFC2136Client:
except Exception as e:
raise errors.PluginError('Encountered error deleting TXT record: {0}'
.format(e))
rcode = response.rcode()
rcode = response.rcode() # type: ignore[attr-defined]
if rcode == dns.rcode.NOERROR:
logger.debug('Successfully deleted TXT record %s', record_name)
@ -223,11 +223,13 @@ class _RFC2136Client:
except (OSError, dns.exception.Timeout) as e:
logger.debug('TCP query failed, fallback to UDP: %s', e)
response = dns.query.udp(request, self.server, self._default_timeout, self.port)
rcode = response.rcode()
rcode = response.rcode() # type: ignore[attr-defined]
# Authoritative Answer bit should be set
if (rcode == dns.rcode.NOERROR and response.get_rrset(response.answer,
domain, dns.rdataclass.IN, dns.rdatatype.SOA) and response.flags & dns.flags.AA):
if (rcode == dns.rcode.NOERROR
and response.get_rrset(response.answer, # type: ignore[attr-defined]
domain, dns.rdataclass.IN, dns.rdatatype.SOA)
and response.flags & dns.flags.AA):
logger.debug('Received authoritative SOA response for %s', domain_name)
return True

View file

@ -1180,7 +1180,7 @@ class NginxConfigurator(common.Configurator):
# Entry point in main.py for performing challenges
def perform(self, achalls: List[achallenges.AnnotatedChallenge]
) -> List[challenges.HTTP01Response]:
) -> List[challenges.ChallengeResponse]:
"""Perform the configuration related challenge.
This function currently assumes all challenges will be fulfilled.
@ -1189,10 +1189,13 @@ class NginxConfigurator(common.Configurator):
"""
self._chall_out += len(achalls)
responses: List[Optional[challenges.HTTP01Response]] = [None] * len(achalls)
responses: List[Optional[challenges.ChallengeResponse]] = [None] * len(achalls)
http_doer = http_01.NginxHttp01(self)
key_achalls = [achall for achall in achalls
if isinstance(achall, achallenges.KeyAuthorizationAnnotatedChallenge)]
for i, achall in enumerate(achalls):
for i, achall in enumerate(key_achalls):
# Currently also have chall_doer hold associated index of the
# challenge. This helps to put all of the responses back together
# when they are all complete.

View file

@ -11,7 +11,7 @@ from certbot_nginx._internal import nginxparser
from certbot_nginx._internal.obj import Addr
from acme import challenges
from acme.challenges import HTTP01Response
from acme.challenges import KeyAuthorizationChallengeResponse
from certbot import errors
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
from certbot.compat import os
@ -49,10 +49,10 @@ class NginxHttp01(common.ChallengePerformer):
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_http_01_cert_challenge.conf")
def perform(self) -> List[HTTP01Response]:
def perform(self) -> List[KeyAuthorizationChallengeResponse]:
"""Perform a challenge on Nginx.
:returns: list of :class:`certbot.acme.challenges.HTTP01Response`
:returns: list of :class:`acme.challenges.KeyAuthorizationChallengeResponse`
:rtype: list
"""

View file

@ -1,6 +1,7 @@
"""Very low-level nginx config parser based on pyparsing."""
# Forked from https://github.com/fatiherikli/nginxparser (MIT Licensed)
import copy
from distutils.log import error
import logging
import typing
from typing import Any
@ -167,8 +168,10 @@ class UnspacedList(List[Any]):
inbound = UnspacedList(inbound)
return inbound, inbound.spaced
def insert(self, i: int, x: Any) -> None:
def insert(self, i: SupportsIndex, x: Any) -> None:
"""Insert object before index."""
if not isinstance(i, int):
raise ValueError("Only integers are supported")
item, spaced_item = self._coerce(x)
slicepos = self._spaced_position(i) if i < len(self) else len(self.spaced)
self.spaced.insert(slicepos, spaced_item)

View file

@ -16,6 +16,8 @@ from typing import Tuple
import pkg_resources
from acme import challenges
from certbot import achallenges
from certbot import configuration
from certbot import crypto_util
@ -372,7 +374,7 @@ class ChallengePerformer:
if idx is not None:
self.indices.append(idx)
def perform(self) -> List[achallenges.KeyAuthorizationAnnotatedChallenge]:
def perform(self) -> List[challenges.KeyAuthorizationChallengeResponse]:
"""Perform all added challenges.
:returns: challenge responses