mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Upgrade to mypy 0.812 (#8748)
Fixes #8425 This PR upgrades mypy to the latest version available, 0.812. Given the advanced type inference capabilities provided by this newer version, this PRs also fixes various type inconsistencies that are now detected. Here are the non obvious changes done to fix types: * typing in mixins has been solved using `Protocol` classes, as recommended by mypy (https://mypy.readthedocs.io/en/latest/more_types.html#mixin-classes, https://mypy.readthedocs.io/en/stable/protocols.html) * `cast` when we are playing with `Union` types This PR also disables the strict optional checks that have been enable by default in recent versions of mypy. Once this PR is merged, I will create an issue to study how these checks can be enabled. `typing.Protocol` is available only since Python 3.8. To keep compatibility with Python 3.6, I try to import the class `Protocol` from `typing`, and fallback to assign `object` to `Protocol` if that fails. This way the code is working with all versions of Python, but the mypy check can be run only with Python 3.8+ because it needs the protocol feature. As a consequence, tox runs mypy under Python 3.8. Alternatives are: * importing `typing_extensions`, that proposes backport of newest typing features to Python 3.6, but this implies to add a dependency to Certbot just to run mypy * redesign the concerned classes to not use mixins, or use them differently, but this implies to modify the code itself even if there is nothing wrong with it and it is just a matter of instructing mypy to understand in which context the mixins can be used * ignoring type for these classes with `# type: ignore` but we loose the benefit of mypy for them * Upgrade mypy * First step for acme * Cast for the rescue * Fixing types for certbot * Fix typing for certbot-nginx * Finalize type fixes, configure no optional strict check for mypy in tox * Align requirements * Isort * Pylint * Protocol for python 3.6 * Use Python 3.9 for mypy, make code compatible with Python 3.8< * Pylint and mypy * Pragma no cover * Pythonic NotImplemented constant * More type definitions * Add comments * Simplify typing logic * Use vararg tuple * Relax constraints on mypy * Add more type * Do not silence error if target is not defined * Conditionally import Protocol for type checking only * Clean up imports * Add comments * Align python version linting with mypy and coverage * Just ignore types in an unused module * Add comments * Fix lint
This commit is contained in:
parent
584a1a3ece
commit
06a53cb7df
35 changed files with 248 additions and 111 deletions
|
|
@ -40,13 +40,13 @@ jobs:
|
|||
IMAGE_NAME: ubuntu-18.04
|
||||
PYTHON_VERSION: 3.9
|
||||
TOXENV: py39-cover
|
||||
linux-py37-lint:
|
||||
linux-py39-lint:
|
||||
IMAGE_NAME: ubuntu-18.04
|
||||
PYTHON_VERSION: 3.7
|
||||
PYTHON_VERSION: 3.9
|
||||
TOXENV: lint
|
||||
linux-py36-mypy:
|
||||
linux-py39-mypy:
|
||||
IMAGE_NAME: ubuntu-18.04
|
||||
PYTHON_VERSION: 3.6
|
||||
PYTHON_VERSION: 3.9
|
||||
TOXENV: mypy
|
||||
linux-integration:
|
||||
IMAGE_NAME: ubuntu-18.04
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import functools
|
|||
import hashlib
|
||||
import logging
|
||||
import socket
|
||||
from typing import Type
|
||||
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
import josepy as jose
|
||||
|
|
@ -152,8 +153,8 @@ class KeyAuthorizationChallenge(_TokenChallenge, metaclass=abc.ABCMeta):
|
|||
that will be used to generate ``response``.
|
||||
:param str typ: type of the challenge
|
||||
"""
|
||||
typ = NotImplemented
|
||||
response_cls = NotImplemented
|
||||
typ: str = NotImplemented
|
||||
response_cls: Type[KeyAuthorizationChallengeResponse] = NotImplemented
|
||||
thumbprint_hash_function = (
|
||||
KeyAuthorizationChallengeResponse.thumbprint_hash_function)
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,12 @@ import http.client as http_client
|
|||
import logging
|
||||
import re
|
||||
import time
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Set
|
||||
from typing import Text
|
||||
from typing import Union
|
||||
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
|
|
@ -818,6 +820,7 @@ class BackwardsCompatibleClientV2:
|
|||
def __init__(self, net, key, server):
|
||||
directory = messages.Directory.from_json(net.get(server).json())
|
||||
self.acme_version = self._acme_version_from_directory(directory)
|
||||
self.client: Union[Client, ClientV2]
|
||||
if self.acme_version == 1:
|
||||
self.client = Client(directory, key=key, net=net)
|
||||
else:
|
||||
|
|
@ -837,16 +840,18 @@ class BackwardsCompatibleClientV2:
|
|||
if check_tos_cb is not None:
|
||||
check_tos_cb(tos)
|
||||
if self.acme_version == 1:
|
||||
regr = self.client.register(regr)
|
||||
client_v1 = cast(Client, self.client)
|
||||
regr = client_v1.register(regr)
|
||||
if regr.terms_of_service is not None:
|
||||
_assess_tos(regr.terms_of_service)
|
||||
return self.client.agree_to_tos(regr)
|
||||
return client_v1.agree_to_tos(regr)
|
||||
return regr
|
||||
else:
|
||||
if "terms_of_service" in self.client.directory.meta:
|
||||
_assess_tos(self.client.directory.meta.terms_of_service)
|
||||
client_v2 = cast(ClientV2, self.client)
|
||||
if "terms_of_service" in client_v2.directory.meta:
|
||||
_assess_tos(client_v2.directory.meta.terms_of_service)
|
||||
regr = regr.update(terms_of_service_agreed=True)
|
||||
return self.client.new_account(regr)
|
||||
return client_v2.new_account(regr)
|
||||
|
||||
def new_order(self, csr_pem):
|
||||
"""Request a new Order object from the server.
|
||||
|
|
@ -864,14 +869,15 @@ class BackwardsCompatibleClientV2:
|
|||
|
||||
"""
|
||||
if self.acme_version == 1:
|
||||
client_v1 = cast(Client, self.client)
|
||||
csr = OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)
|
||||
# pylint: disable=protected-access
|
||||
dnsNames = crypto_util._pyopenssl_cert_or_req_all_names(csr)
|
||||
authorizations = []
|
||||
for domain in dnsNames:
|
||||
authorizations.append(self.client.request_domain_challenges(domain))
|
||||
authorizations.append(client_v1.request_domain_challenges(domain))
|
||||
return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem)
|
||||
return self.client.new_order(csr_pem)
|
||||
return cast(ClientV2, self.client).new_order(csr_pem)
|
||||
|
||||
def finalize_order(self, orderr, deadline, fetch_alternative_chains=False):
|
||||
"""Finalize an order and obtain a certificate.
|
||||
|
|
@ -886,8 +892,9 @@ class BackwardsCompatibleClientV2:
|
|||
|
||||
"""
|
||||
if self.acme_version == 1:
|
||||
client_v1 = cast(Client, self.client)
|
||||
csr_pem = orderr.csr_pem
|
||||
certr = self.client.request_issuance(
|
||||
certr = client_v1.request_issuance(
|
||||
jose.ComparableX509(
|
||||
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)),
|
||||
orderr.authorizations)
|
||||
|
|
@ -895,7 +902,7 @@ class BackwardsCompatibleClientV2:
|
|||
chain = None
|
||||
while datetime.datetime.now() < deadline:
|
||||
try:
|
||||
chain = self.client.fetch_chain(certr)
|
||||
chain = client_v1.fetch_chain(certr)
|
||||
break
|
||||
except errors.Error:
|
||||
time.sleep(1)
|
||||
|
|
@ -910,7 +917,8 @@ class BackwardsCompatibleClientV2:
|
|||
chain = crypto_util.dump_pyopenssl_chain(chain).decode()
|
||||
|
||||
return orderr.update(fullchain_pem=(cert + chain))
|
||||
return self.client.finalize_order(orderr, deadline, fetch_alternative_chains)
|
||||
return cast(ClientV2, self.client).finalize_order(
|
||||
orderr, deadline, fetch_alternative_chains)
|
||||
|
||||
def revoke(self, cert, rsn):
|
||||
"""Revoke certificate.
|
||||
|
|
@ -936,7 +944,7 @@ class BackwardsCompatibleClientV2:
|
|||
Always return False for ACMEv1 servers, as it doesn't use External Account Binding."""
|
||||
if self.acme_version == 1:
|
||||
return False
|
||||
return self.client.external_account_required()
|
||||
return cast(ClientV2, self.client).external_account_required()
|
||||
|
||||
|
||||
class ClientNetwork:
|
||||
|
|
@ -1129,6 +1137,7 @@ class ClientNetwork:
|
|||
|
||||
# If content is DER, log the base64 of it instead of raw bytes, to keep
|
||||
# binary data out of the logs.
|
||||
debug_content: Union[bytes, str]
|
||||
if response.headers.get("Content-Type") == DER_CONTENT_TYPE:
|
||||
debug_content = base64.b64encode(response.content)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -14,7 +14,9 @@ class Header(jose.Header):
|
|||
kid = jose.Field('kid', omitempty=True)
|
||||
url = jose.Field('url', omitempty=True)
|
||||
|
||||
@nonce.decoder
|
||||
# Mypy does not understand the josepy magic happening here, and falsely claims
|
||||
# that nonce is redefined. Let's ignore the type check here.
|
||||
@nonce.decoder # type: ignore
|
||||
def nonce(value): # pylint: disable=no-self-argument,missing-function-docstring
|
||||
try:
|
||||
return jose.decode_b64jose(value)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
"""ACME protocol messages."""
|
||||
import json
|
||||
from collections.abc import Hashable
|
||||
import json
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Type
|
||||
|
||||
import josepy as jose
|
||||
|
||||
|
|
@ -87,7 +90,9 @@ class Error(jose.JSONObjectWithFields, errors.Error):
|
|||
raise ValueError("The supplied code: %s is not a known ACME error"
|
||||
" code" % code)
|
||||
typ = ERROR_PREFIX + code
|
||||
return cls(typ=typ, **kwargs)
|
||||
# Mypy will not understand that the Error constructor accepts a named argument
|
||||
# "typ" because of josepy magic. Let's ignore the type check here.
|
||||
return cls(typ=typ, **kwargs) # type: ignore
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
|
|
@ -124,7 +129,7 @@ class Error(jose.JSONObjectWithFields, errors.Error):
|
|||
class _Constant(jose.JSONDeSerializable, Hashable): # type: ignore
|
||||
"""ACME constant."""
|
||||
__slots__ = ('name',)
|
||||
POSSIBLE_NAMES = NotImplemented
|
||||
POSSIBLE_NAMES: Dict[str, '_Constant'] = NotImplemented
|
||||
|
||||
def __init__(self, name):
|
||||
super(_Constant, self).__init__()
|
||||
|
|
@ -166,7 +171,7 @@ STATUS_DEACTIVATED = Status('deactivated')
|
|||
|
||||
class IdentifierType(_Constant):
|
||||
"""ACME identifier type."""
|
||||
POSSIBLE_NAMES: dict = {}
|
||||
POSSIBLE_NAMES: Dict[str, 'IdentifierType'] = {}
|
||||
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
|
||||
|
||||
|
||||
|
|
@ -184,7 +189,7 @@ class Identifier(jose.JSONObjectWithFields):
|
|||
class Directory(jose.JSONDeSerializable):
|
||||
"""Directory."""
|
||||
|
||||
_REGISTERED_TYPES: dict = {}
|
||||
_REGISTERED_TYPES: Dict[str, Type[Any]] = {}
|
||||
|
||||
class Meta(jose.JSONObjectWithFields):
|
||||
"""Directory Meta."""
|
||||
|
|
@ -218,7 +223,7 @@ class Directory(jose.JSONDeSerializable):
|
|||
return getattr(key, 'resource_type', key)
|
||||
|
||||
@classmethod
|
||||
def register(cls, resource_body_cls):
|
||||
def register(cls, resource_body_cls: Type[Any]) -> Type[Any]:
|
||||
"""Register resource."""
|
||||
resource_type = resource_body_cls.resource_type
|
||||
assert resource_type not in cls._REGISTERED_TYPES
|
||||
|
|
@ -528,7 +533,9 @@ class Authorization(ResourceBody):
|
|||
expires = fields.RFC3339Field('expires', omitempty=True)
|
||||
wildcard = jose.Field('wildcard', omitempty=True)
|
||||
|
||||
@challenges.decoder
|
||||
# Mypy does not understand the josepy magic happening here, and falsely claims
|
||||
# that challenge is redefined. Let's ignore the type check here.
|
||||
@challenges.decoder # type: ignore
|
||||
def challenges(value): # pylint: disable=no-self-argument,missing-function-docstring
|
||||
return tuple(ChallengeBody.from_json(chall) for chall in value)
|
||||
|
||||
|
|
@ -627,7 +634,9 @@ class Order(ResourceBody):
|
|||
expires = fields.RFC3339Field('expires', omitempty=True)
|
||||
error = jose.Field('error', omitempty=True, decoder=Error.from_json)
|
||||
|
||||
@identifiers.decoder
|
||||
# Mypy does not understand the josepy magic happening here, and falsely claims
|
||||
# that identifiers is redefined. Let's ignore the type check here.
|
||||
@identifiers.decoder # type: ignore
|
||||
def identifiers(value): # pylint: disable=no-self-argument,missing-function-docstring
|
||||
return tuple(Identifier.from_json(identifier) for identifier in value)
|
||||
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ class BaseDualNetworkedServers:
|
|||
def __init__(self, ServerClass, server_address, *remaining_args, **kwargs):
|
||||
port = server_address[1]
|
||||
self.threads: List[threading.Thread] = []
|
||||
self.servers: List[ACMEServerMixin] = []
|
||||
self.servers: List[socketserver.BaseServer] = []
|
||||
|
||||
# Must try True first.
|
||||
# Ubuntu, for example, will fail to bind to IPv4 if we've already bound
|
||||
|
|
@ -203,8 +203,24 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
|||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.simple_http_resources = kwargs.pop("simple_http_resources", set())
|
||||
self.timeout = kwargs.pop('timeout', 30)
|
||||
self._timeout = kwargs.pop('timeout', 30)
|
||||
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
|
||||
self.server: HTTP01Server
|
||||
|
||||
# In parent class BaseHTTPRequestHandler, 'timeout' is a class-level property but we
|
||||
# need to define its value during the initialization phase in HTTP01RequestHandler.
|
||||
# However MyPy does not appreciate that we dynamically shadow a class-level property
|
||||
# with an instance-level property (eg. self.timeout = ... in __init__()). So to make
|
||||
# everyone happy, we statically redefine 'timeout' as a method property, and set the
|
||||
# timeout value in a new internal instance-level property _timeout.
|
||||
@property
|
||||
def timeout(self):
|
||||
"""
|
||||
The default timeout this server should apply to requests.
|
||||
:return: timeout to apply
|
||||
:rtype: int
|
||||
"""
|
||||
return self._timeout
|
||||
|
||||
def log_message(self, format, *args): # pylint: disable=redefined-builtin
|
||||
"""Log arbitrary message."""
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
""" apacheconfig implementation of the ParserNode interfaces """
|
||||
from typing import Tuple
|
||||
|
||||
from certbot_apache._internal import assertions
|
||||
from certbot_apache._internal import interfaces
|
||||
|
|
@ -21,7 +22,7 @@ class ApacheParserNode(interfaces.ParserNode):
|
|||
self.metadata = metadata
|
||||
self._raw = self.metadata["ac_ast"]
|
||||
|
||||
def save(self, msg): # pragma: no cover
|
||||
def save(self, msg): # pragma: no cover
|
||||
pass
|
||||
|
||||
def find_ancestors(self, name): # pylint: disable=unused-variable
|
||||
|
|
@ -83,7 +84,7 @@ class ApacheBlockNode(ApacheDirectiveNode):
|
|||
|
||||
def __init__(self, **kwargs):
|
||||
super(ApacheBlockNode, self).__init__(**kwargs)
|
||||
self.children = ()
|
||||
self.children: Tuple[ApacheParserNode, ...] = ()
|
||||
|
||||
def __eq__(self, other): # pragma: no cover
|
||||
if isinstance(other, self.__class__):
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import logging
|
|||
import re
|
||||
import socket
|
||||
import time
|
||||
from typing import cast
|
||||
from typing import DefaultDict
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
|
|
@ -156,9 +157,9 @@ class ApacheConfigurator(common.Installer):
|
|||
self.options[o] = self.OS_DEFAULTS[o]
|
||||
|
||||
# Special cases
|
||||
self.options["version_cmd"][0] = self.option("ctl")
|
||||
self.options["restart_cmd"][0] = self.option("ctl")
|
||||
self.options["conftest_cmd"][0] = self.option("ctl")
|
||||
cast(List[str], self.options["version_cmd"])[0] = self.option("ctl")
|
||||
cast(List[str], self.options["restart_cmd"])[0] = self.option("ctl")
|
||||
cast(List[str], self.options["conftest_cmd"])[0] = self.option("ctl")
|
||||
|
||||
@classmethod
|
||||
def add_parser_arguments(cls, add):
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
""" Distribution specific override class for CentOS family (RHEL, Fedora) """
|
||||
import logging
|
||||
from typing import cast
|
||||
from typing import List
|
||||
|
||||
import zope.interface
|
||||
|
|
@ -76,7 +77,7 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
|
|||
alternative restart cmd used in CentOS.
|
||||
"""
|
||||
super(CentOSConfigurator, self)._prepare_options()
|
||||
self.options["restart_cmd_alt"][0] = self.option("ctl")
|
||||
cast(List[str], self.options["restart_cmd_alt"])[0] = self.option("ctl")
|
||||
|
||||
def get_parser(self):
|
||||
"""Initializes the ApacheParser"""
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
""" Distribution specific override class for Fedora 29+ """
|
||||
from typing import cast
|
||||
from typing import List
|
||||
|
||||
import zope.interface
|
||||
|
||||
from certbot import errors
|
||||
|
|
@ -69,9 +72,9 @@ class FedoraConfigurator(configurator.ApacheConfigurator):
|
|||
of Fedora to restart httpd.
|
||||
"""
|
||||
super(FedoraConfigurator, self)._prepare_options()
|
||||
self.options["restart_cmd"][0] = 'apachectl'
|
||||
self.options["restart_cmd_alt"][0] = 'apachectl'
|
||||
self.options["conftest_cmd"][0] = 'apachectl'
|
||||
cast(List[str], self.options["restart_cmd"])[0] = 'apachectl'
|
||||
cast(List[str], self.options["restart_cmd_alt"])[0] = 'apachectl'
|
||||
cast(List[str], self.options["conftest_cmd"])[0] = 'apachectl'
|
||||
|
||||
|
||||
class FedoraParser(parser.ApacheParser):
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
""" Distribution specific override class for Gentoo Linux """
|
||||
from typing import cast
|
||||
from typing import List
|
||||
|
||||
import zope.interface
|
||||
|
||||
from certbot import interfaces
|
||||
|
|
@ -36,7 +39,7 @@ class GentooConfigurator(configurator.ApacheConfigurator):
|
|||
alternative restart cmd used in Gentoo.
|
||||
"""
|
||||
super(GentooConfigurator, self)._prepare_options()
|
||||
self.options["restart_cmd_alt"][0] = self.option("ctl")
|
||||
cast(List[str], self.options["restart_cmd_alt"])[0] = self.option("ctl")
|
||||
|
||||
def get_parser(self):
|
||||
"""Initializes the ApacheParser"""
|
||||
|
|
|
|||
|
|
@ -8,19 +8,20 @@ import shutil
|
|||
import subprocess
|
||||
import time
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1, SECP384R1, SECP521R1
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import SECP521R1
|
||||
from cryptography.x509 import NameOID
|
||||
|
||||
import pytest
|
||||
|
||||
from certbot_integration_tests.certbot_tests import context as certbot_context
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_cert_count_for_lineage
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_elliptic_key
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_rsa_key
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_equals_group_owner
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_equals_group_permissions
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_equals_world_read_permissions
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_hook_execution
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_rsa_key
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_saved_renew_hook
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_world_no_permissions
|
||||
from certbot_integration_tests.certbot_tests.assertions import assert_world_read_permissions
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
#!/usr/bin/env python
|
||||
"""Module to call certbot in test mode"""
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
import certbot_integration_tests
|
||||
# pylint: disable=wildcard-import,unused-wildcard-import
|
||||
|
|
|
|||
|
|
@ -29,10 +29,7 @@ class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
|||
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediates/0', verify=False)
|
||||
issuer_cert = x509.load_pem_x509_certificate(request.content, default_backend())
|
||||
|
||||
try:
|
||||
content_len = int(self.headers.getheader('content-length', 0))
|
||||
except AttributeError:
|
||||
content_len = int(self.headers.get('Content-Length'))
|
||||
content_len = int(self.headers.get('Content-Length'))
|
||||
|
||||
ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len))
|
||||
response = requests.get('{0}/cert-status-by-serial/{1}'.format(
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""Module contains classes used by the Nginx Configurator."""
|
||||
import re
|
||||
|
||||
|
||||
from certbot.plugins import common
|
||||
|
||||
ADD_HEADER_DIRECTIVE = 'add_header'
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ from certbot import errors
|
|||
from certbot.compat import os
|
||||
from certbot_nginx._internal import nginxparser
|
||||
from certbot_nginx._internal import obj
|
||||
from certbot_nginx._internal.nginxparser import UnspacedList
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -243,6 +244,8 @@ class NginxParser:
|
|||
tree = self.parsed[filename]
|
||||
if ext:
|
||||
filename = filename + os.path.extsep + ext
|
||||
if not isinstance(tree, UnspacedList):
|
||||
raise ValueError(f"Error tree {tree} is not an UnspacedList")
|
||||
try:
|
||||
if lazy and not tree.is_dirty():
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
# type: ignore
|
||||
# This module is not used for now, so we just skip type check for the sake of simplicity.
|
||||
""" This file contains parsing routines and object classes to help derive meaning from
|
||||
raw lists of tokens from pyparsing. """
|
||||
|
||||
|
|
@ -204,7 +206,7 @@ class Sentence(Parsable):
|
|||
:returns: whether this lists is parseable by `Sentence`.
|
||||
"""
|
||||
return isinstance(lists, list) and len(lists) > 0 and \
|
||||
all(isinstance(elem, str) for elem in lists)
|
||||
all(isinstance(elem, str) for elem in lists)
|
||||
|
||||
def parse(self, raw_list, add_spaces=False):
|
||||
""" Parses a list of string types into this object.
|
||||
|
|
@ -212,7 +214,7 @@ class Sentence(Parsable):
|
|||
if add_spaces:
|
||||
raw_list = _space_list(raw_list)
|
||||
if not isinstance(raw_list, list) or \
|
||||
any(not isinstance(elem, str) for elem in raw_list):
|
||||
any(not isinstance(elem, str) for elem in raw_list):
|
||||
raise errors.MisconfigurationError("Sentence parsing expects a list of string types.")
|
||||
self._data = raw_list
|
||||
|
||||
|
|
@ -283,7 +285,7 @@ class Block(Parsable):
|
|||
|
||||
:returns: whether this lists is parseable by `Block`. """
|
||||
return isinstance(lists, list) and len(lists) == 2 and \
|
||||
Sentence.should_parse(lists[0]) and isinstance(lists[1], list)
|
||||
Sentence.should_parse(lists[0]) and isinstance(lists[1], list)
|
||||
|
||||
def set_tabs(self, tabs=" "):
|
||||
""" Sets tabs by setting equivalent tabbing on names, then adding tabbing
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ from certbot._internal.plugins import disco as plugins_disco
|
|||
import certbot._internal.plugins.selection as plugin_selection
|
||||
import certbot.plugins.enhancements as enhancements
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -116,6 +116,8 @@ class HelpfulArgumentParser:
|
|||
# This is the only way to turn off overly verbose config flag documentation
|
||||
self.parser._add_config_file_help = False
|
||||
|
||||
self.verb: str
|
||||
|
||||
# Help that are synonyms for --help subcommands
|
||||
COMMANDS_TOPICS = ["command", "commands", "subcommand", "subcommands", "verbs"]
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
import datetime
|
||||
import logging
|
||||
import platform
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
|
@ -391,7 +390,7 @@ class Client:
|
|||
return cert, chain, key, csr
|
||||
|
||||
def _get_order_and_authorizations(self, csr_pem: str,
|
||||
best_effort: bool) -> List[messages.OrderResource]:
|
||||
best_effort: bool) -> messages.OrderResource:
|
||||
"""Request a new order and complete its authorizations.
|
||||
|
||||
:param str csr_pem: A CSR in PEM format.
|
||||
|
|
|
|||
|
|
@ -109,11 +109,12 @@ def post_arg_parse_setup(config):
|
|||
|
||||
root_logger.addHandler(file_handler)
|
||||
root_logger.removeHandler(memory_handler)
|
||||
temp_handler = memory_handler.target # pylint: disable=no-member
|
||||
temp_handler = getattr(memory_handler, 'target', None)
|
||||
memory_handler.setTarget(file_handler) # pylint: disable=no-member
|
||||
memory_handler.flush(force=True) # pylint: disable=unexpected-keyword-arg
|
||||
memory_handler.close()
|
||||
temp_handler.close()
|
||||
if temp_handler:
|
||||
temp_handler.close()
|
||||
|
||||
if config.quiet:
|
||||
level = constants.QUIET_LOGGING_LEVEL
|
||||
|
|
@ -205,7 +206,7 @@ class MemoryHandler(logging.handlers.MemoryHandler):
|
|||
"""Close the memory handler, but don't set the target to None."""
|
||||
# This allows the logging module which may only have a weak
|
||||
# reference to the target handler to properly flush and close it.
|
||||
target = self.target
|
||||
target = getattr(self, 'target')
|
||||
super(MemoryHandler, self).close()
|
||||
self.target = target
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class Reporter:
|
|||
LOW_PRIORITY = 2
|
||||
"""Low priority constant. See `add_message`."""
|
||||
|
||||
_msg_type = collections.namedtuple('ReporterMsg', 'priority text on_crash')
|
||||
_msg_type = collections.namedtuple('_msg_type', 'priority text on_crash')
|
||||
|
||||
def __init__(self, config):
|
||||
self.messages: queue.PriorityQueue[Reporter._msg_type] = queue.PriorityQueue()
|
||||
|
|
|
|||
|
|
@ -5,14 +5,14 @@ import logging
|
|||
import re
|
||||
import shutil
|
||||
import stat
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import configobj
|
||||
import parsedatetime
|
||||
import pytz
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
import parsedatetime
|
||||
import pytz
|
||||
|
||||
import certbot
|
||||
from certbot import crypto_util
|
||||
|
|
|
|||
|
|
@ -18,15 +18,16 @@ Note, that all annotated challenges act as a proxy objects::
|
|||
|
||||
"""
|
||||
import logging
|
||||
from typing import Type
|
||||
|
||||
import josepy as jose
|
||||
|
||||
from acme import challenges
|
||||
from acme.challenges import Challenge
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class AnnotatedChallenge(jose.ImmutableMap):
|
||||
"""Client annotated challenge.
|
||||
|
||||
|
|
@ -37,7 +38,7 @@ class AnnotatedChallenge(jose.ImmutableMap):
|
|||
|
||||
"""
|
||||
__slots__ = ('challb',)
|
||||
acme_type = NotImplemented
|
||||
_acme_type: Type[Challenge] = NotImplemented
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.challb, name)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,10 @@ This compat module wraps os.path to forbid some functions.
|
|||
|
||||
isort:skip_file
|
||||
"""
|
||||
|
||||
# NB: Each function defined in compat._path is marked with "type: ignore" to avoid mypy
|
||||
# to complain that a function is redefined (because we imported if first from os.path).
|
||||
|
||||
# pylint: disable=function-redefined
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
|
@ -29,7 +33,7 @@ del ourselves, std_os_path, std_sys
|
|||
|
||||
|
||||
# Function os.path.realpath is broken on some versions of Python for Windows.
|
||||
def realpath(*unused_args, **unused_kwargs):
|
||||
def realpath(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.path.realpath() is forbidden"""
|
||||
raise RuntimeError('Usage of os.path.realpath() is forbidden. '
|
||||
'Use certbot.compat.filesystem.realpath() instead.')
|
||||
|
|
|
|||
|
|
@ -6,12 +6,15 @@ This module is intended to replace standard os module throughout certbot project
|
|||
This module has the same API as the os module in the Python standard library
|
||||
except for the functions defined below.
|
||||
|
||||
isort:skip_file
|
||||
"""
|
||||
|
||||
# NOTE: If adding a new documented function to compat.os, ensure that it is added to the
|
||||
# NB1: If adding a new documented function to compat.os, ensure that it is added to the
|
||||
# ':members:' list in certbot/docs/api/certbot.compat.os.rst.
|
||||
|
||||
# isort:skip_file
|
||||
# NB2: Each function defined in compat.os is marked with "type: ignore" to avoid mypy
|
||||
# to complain that a function is redefined (because we imported if first from os).
|
||||
|
||||
# pylint: disable=function-redefined
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
|
@ -60,7 +63,7 @@ del ourselves, std_os, std_sys
|
|||
# Basically, it states that appropriate permissions will be set for the owner, nothing for the
|
||||
# group, appropriate permissions for the "Everyone" group, and all permissions to the
|
||||
# "Administrators" group + "System" user, as they can do everything anyway.
|
||||
def chmod(*unused_args, **unused_kwargs):
|
||||
def chmod(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.chmod() is forbidden"""
|
||||
raise RuntimeError('Usage of os.chmod() is forbidden. '
|
||||
'Use certbot.compat.filesystem.chmod() instead.')
|
||||
|
|
@ -70,7 +73,7 @@ def chmod(*unused_args, **unused_kwargs):
|
|||
# this platform. In order to have a consistent behavior between Linux and Windows on Certbot files
|
||||
# and directories, the filesystem umask method must be used instead, since it implements umask for
|
||||
# Windows.
|
||||
def umask(*unused_args, **unused_kwargs):
|
||||
def umask(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.chmod() is forbidden"""
|
||||
raise RuntimeError('Usage of os.umask() is forbidden. '
|
||||
'Use certbot.compat.filesystem.umask() instead.')
|
||||
|
|
@ -79,7 +82,7 @@ def umask(*unused_args, **unused_kwargs):
|
|||
# Because uid is not a concept on Windows, chown is useless. In fact, it is not even available
|
||||
# on Python for Windows. So to be consistent on both platforms for Certbot, this method is
|
||||
# always forbidden.
|
||||
def chown(*unused_args, **unused_kwargs):
|
||||
def chown(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.chown() is forbidden"""
|
||||
raise RuntimeError('Usage of os.chown() is forbidden.'
|
||||
'Use certbot.compat.filesystem.copy_ownership_and_apply_mode() instead.')
|
||||
|
|
@ -90,7 +93,7 @@ def chown(*unused_args, **unused_kwargs):
|
|||
# filesystem.open invokes the Windows native API `CreateFile` to ensure that permissions are
|
||||
# atomically set in case of file creation, or invokes filesystem.chmod to properly set the
|
||||
# permissions for the other cases.
|
||||
def open(*unused_args, **unused_kwargs):
|
||||
def open(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.open() is forbidden"""
|
||||
raise RuntimeError('Usage of os.open() is forbidden. '
|
||||
'Use certbot.compat.filesystem.open() instead.')
|
||||
|
|
@ -98,7 +101,7 @@ def open(*unused_args, **unused_kwargs):
|
|||
|
||||
# Very similarly to os.open, os.mkdir has the same effects on Windows and creates an unsecured
|
||||
# folder. So a similar mitigation to security.chmod is provided on this platform.
|
||||
def mkdir(*unused_args, **unused_kwargs):
|
||||
def mkdir(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.mkdir() is forbidden"""
|
||||
raise RuntimeError('Usage of os.mkdir() is forbidden. '
|
||||
'Use certbot.compat.filesystem.mkdir() instead.')
|
||||
|
|
@ -109,7 +112,7 @@ def mkdir(*unused_args, **unused_kwargs):
|
|||
# that our modified os.mkdir is called on Windows, by monkey patching temporarily the mkdir method
|
||||
# on the original os module, executing the modified logic to correctly protect newly created
|
||||
# folders, then restoring original mkdir method in the os module.
|
||||
def makedirs(*unused_args, **unused_kwargs):
|
||||
def makedirs(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.makedirs() is forbidden"""
|
||||
raise RuntimeError('Usage of os.makedirs() is forbidden. '
|
||||
'Use certbot.compat.filesystem.makedirs() instead.')
|
||||
|
|
@ -117,7 +120,7 @@ def makedirs(*unused_args, **unused_kwargs):
|
|||
|
||||
# Because of the blocking strategy on file handlers on Windows, rename does not behave as expected
|
||||
# with POSIX systems: an exception will be raised if dst already exists.
|
||||
def rename(*unused_args, **unused_kwargs):
|
||||
def rename(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.rename() is forbidden"""
|
||||
raise RuntimeError('Usage of os.rename() is forbidden. '
|
||||
'Use certbot.compat.filesystem.replace() instead.')
|
||||
|
|
@ -125,7 +128,7 @@ def rename(*unused_args, **unused_kwargs):
|
|||
|
||||
# Behavior of os.replace is consistent between Windows and Linux. However, it is not supported on
|
||||
# Python 2.x. So, as for os.rename, we forbid it in favor of filesystem.replace.
|
||||
def replace(*unused_args, **unused_kwargs):
|
||||
def replace(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.replace() is forbidden"""
|
||||
raise RuntimeError('Usage of os.replace() is forbidden. '
|
||||
'Use certbot.compat.filesystem.replace() instead.')
|
||||
|
|
@ -133,7 +136,7 @@ def replace(*unused_args, **unused_kwargs):
|
|||
|
||||
# Results given by os.access are inconsistent or partial on Windows, because this platform is not
|
||||
# following the POSIX approach.
|
||||
def access(*unused_args, **unused_kwargs):
|
||||
def access(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.access() is forbidden"""
|
||||
raise RuntimeError('Usage of os.access() is forbidden. '
|
||||
'Use certbot.compat.filesystem.check_mode() or '
|
||||
|
|
@ -142,7 +145,7 @@ def access(*unused_args, **unused_kwargs):
|
|||
|
||||
# On Windows os.stat call result is inconsistent, with a lot of flags that are not set or
|
||||
# meaningless. We need to use specialized functions from the certbot.compat.filesystem module.
|
||||
def stat(*unused_args, **unused_kwargs):
|
||||
def stat(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.stat() is forbidden"""
|
||||
raise RuntimeError('Usage of os.stat() is forbidden. '
|
||||
'Use certbot.compat.filesystem functions instead '
|
||||
|
|
@ -151,7 +154,7 @@ def stat(*unused_args, **unused_kwargs):
|
|||
|
||||
# Method os.fstat has the same problem than os.stat, since it is the same function,
|
||||
# but accepting a file descriptor instead of a path.
|
||||
def fstat(*unused_args, **unused_kwargs):
|
||||
def fstat(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.stat() is forbidden"""
|
||||
raise RuntimeError('Usage of os.fstat() is forbidden. '
|
||||
'Use certbot.compat.filesystem functions instead '
|
||||
|
|
@ -163,7 +166,7 @@ def fstat(*unused_args, **unused_kwargs):
|
|||
# unconditionally, which allows to use more than 259 characters, and its string
|
||||
# representation is prepended with "\\?\". Problem is that it does it for any path,
|
||||
# and will make equality comparison fail with paths that will use the simple form.
|
||||
def readlink(*unused_args, **unused_kwargs):
|
||||
def readlink(*unused_args, **unused_kwargs): # type: ignore
|
||||
"""Method os.readlink() is forbidden"""
|
||||
raise RuntimeError('Usage of os.readlink() is forbidden. '
|
||||
'Use certbot.compat.filesystem.realpath() instead.')
|
||||
|
|
|
|||
|
|
@ -349,7 +349,7 @@ class IInstaller(IPlugin):
|
|||
|
||||
"""
|
||||
|
||||
def rollback_checkpoints(rollback=1):
|
||||
def rollback_checkpoints(rollback: int = 1):
|
||||
"""Revert `rollback` number of configuration checkpoints.
|
||||
|
||||
:raises .PluginError: when configuration cannot be fully reverted
|
||||
|
|
|
|||
|
|
@ -43,6 +43,9 @@ class DNSAuthenticator(common.Plugin):
|
|||
def prepare(self): # pylint: disable=missing-function-docstring
|
||||
pass
|
||||
|
||||
def more_info(self) -> str: # pylint: disable=missing-function-docstring
|
||||
raise NotImplementedError()
|
||||
|
||||
def perform(self, achalls): # pylint: disable=missing-function-docstring
|
||||
self._setup_credentials()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,22 +1,52 @@
|
|||
"""Base test class for DNS authenticators."""
|
||||
import typing
|
||||
|
||||
import configobj
|
||||
import josepy as jose
|
||||
try:
|
||||
import mock
|
||||
except ImportError: # pragma: no cover
|
||||
from unittest import mock # type: ignore
|
||||
|
||||
from acme import challenges
|
||||
from certbot import achallenges
|
||||
from certbot.compat import filesystem
|
||||
from certbot.plugins.dns_common import DNSAuthenticator
|
||||
from certbot.tests import acme_util
|
||||
from certbot.tests import util as test_util
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from typing import Protocol
|
||||
else:
|
||||
Protocol = object # type: ignore
|
||||
|
||||
|
||||
|
||||
try:
|
||||
import mock
|
||||
except ImportError: # pragma: no cover
|
||||
from unittest import mock # type: ignore
|
||||
|
||||
|
||||
DOMAIN = 'example.com'
|
||||
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
|
||||
|
||||
|
||||
class _AuthenticatorCallableTestCase(Protocol):
|
||||
"""Protocol describing a TestCase able to call a real DNSAuthenticator instance."""
|
||||
auth: DNSAuthenticator
|
||||
|
||||
def assertTrue(self, *unused_args) -> None:
|
||||
"""
|
||||
See
|
||||
https://docs.python.org/3/library/unittest.html#unittest.TestCase.assertTrue
|
||||
"""
|
||||
...
|
||||
|
||||
def assertEqual(self, *unused_args) -> None:
|
||||
"""
|
||||
See
|
||||
https://docs.python.org/3/library/unittest.html#unittest.TestCase.assertEqual
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class BaseAuthenticatorTest:
|
||||
"""
|
||||
A base test class to reduce duplication between test code for DNS Authenticator Plugins.
|
||||
|
|
@ -29,13 +59,13 @@ class BaseAuthenticatorTest:
|
|||
achall = achallenges.KeyAuthorizationAnnotatedChallenge(
|
||||
challb=acme_util.DNS01, domain=DOMAIN, account_key=KEY)
|
||||
|
||||
def test_more_info(self):
|
||||
def test_more_info(self: _AuthenticatorCallableTestCase):
|
||||
self.assertTrue(isinstance(self.auth.more_info(), str)) # pylint: disable=no-member
|
||||
|
||||
def test_get_chall_pref(self):
|
||||
def test_get_chall_pref(self: _AuthenticatorCallableTestCase):
|
||||
self.assertEqual(self.auth.get_chall_pref(None), [challenges.DNS01]) # pylint: disable=no-member
|
||||
|
||||
def test_parser_arguments(self):
|
||||
def test_parser_arguments(self: _AuthenticatorCallableTestCase):
|
||||
m = mock.MagicMock()
|
||||
self.auth.add_parser_arguments(m) # pylint: disable=no-member
|
||||
|
||||
|
|
|
|||
|
|
@ -1,32 +1,79 @@
|
|||
"""Base test class for DNS authenticators built on Lexicon."""
|
||||
import typing
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import josepy as jose
|
||||
try:
|
||||
import mock
|
||||
except ImportError: # pragma: no cover
|
||||
from unittest import mock # type: ignore
|
||||
from requests.exceptions import HTTPError
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from acme.challenges import Challenge
|
||||
from certbot import errors
|
||||
from certbot.plugins import dns_test_common
|
||||
from certbot.plugins.dns_common_lexicon import LexiconClient
|
||||
from certbot.plugins.dns_test_common import _AuthenticatorCallableTestCase
|
||||
from certbot.tests import util as test_util
|
||||
|
||||
try:
|
||||
import mock
|
||||
except ImportError: # pragma: no cover
|
||||
from unittest import mock # type: ignore
|
||||
if typing.TYPE_CHECKING:
|
||||
from typing import Protocol
|
||||
else:
|
||||
Protocol = object # type: ignore
|
||||
|
||||
|
||||
|
||||
|
||||
DOMAIN = 'example.com'
|
||||
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
|
||||
|
||||
|
||||
class _AuthenticatorCallableLexiconTestCase(_AuthenticatorCallableTestCase, Protocol):
|
||||
"""
|
||||
Protocol describing a TestCase suitable to test challenges against
|
||||
a mocked LexiconClient instance.
|
||||
"""
|
||||
mock_client: MagicMock
|
||||
achall: Challenge
|
||||
|
||||
|
||||
class _LexiconAwareTestCase(Protocol):
|
||||
"""
|
||||
Protocol describing a TestCase suitable to test a real LexiconClient instance.
|
||||
"""
|
||||
client: LexiconClient
|
||||
provider_mock: MagicMock
|
||||
|
||||
record_prefix: str
|
||||
record_name: str
|
||||
record_content: str
|
||||
|
||||
DOMAIN_NOT_FOUND: Exception
|
||||
GENERIC_ERROR: Exception
|
||||
LOGIN_ERROR: Exception
|
||||
UNKNOWN_LOGIN_ERROR: Exception
|
||||
|
||||
def assertRaises(self, *unused_args) -> None:
|
||||
"""
|
||||
See
|
||||
https://docs.python.org/3/library/unittest.html#unittest.TestCase.assertRaises
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
# These classes are intended to be subclassed/mixed in, so not all members are defined.
|
||||
# pylint: disable=no-member
|
||||
|
||||
class BaseLexiconAuthenticatorTest(dns_test_common.BaseAuthenticatorTest):
|
||||
|
||||
def test_perform(self):
|
||||
def test_perform(self: _AuthenticatorCallableLexiconTestCase):
|
||||
self.auth.perform([self.achall])
|
||||
|
||||
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
|
||||
self.assertEqual(expected, self.mock_client.mock_calls)
|
||||
|
||||
def test_cleanup(self):
|
||||
def test_cleanup(self: _AuthenticatorCallableLexiconTestCase):
|
||||
self.auth._attempt_cleanup = True # _attempt_cleanup | pylint: disable=protected-access
|
||||
self.auth.cleanup([self.achall])
|
||||
|
||||
|
|
@ -44,14 +91,14 @@ class BaseLexiconClientTest:
|
|||
record_name = record_prefix + "." + DOMAIN
|
||||
record_content = "bar"
|
||||
|
||||
def test_add_txt_record(self):
|
||||
def test_add_txt_record(self: _LexiconAwareTestCase):
|
||||
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
self.provider_mock.create_record.assert_called_with(type='TXT',
|
||||
name=self.record_name,
|
||||
content=self.record_content)
|
||||
|
||||
def test_add_txt_record_try_twice_to_find_domain(self):
|
||||
def test_add_txt_record_try_twice_to_find_domain(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, '']
|
||||
|
||||
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
|
@ -60,7 +107,7 @@ class BaseLexiconClientTest:
|
|||
name=self.record_name,
|
||||
content=self.record_content)
|
||||
|
||||
def test_add_txt_record_fail_to_find_domain(self):
|
||||
def test_add_txt_record_fail_to_find_domain(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
|
||||
self.DOMAIN_NOT_FOUND,
|
||||
self.DOMAIN_NOT_FOUND,]
|
||||
|
|
@ -69,64 +116,64 @@ class BaseLexiconClientTest:
|
|||
self.client.add_txt_record,
|
||||
DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_add_txt_record_fail_to_authenticate(self):
|
||||
def test_add_txt_record_fail_to_authenticate(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
|
||||
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.client.add_txt_record,
|
||||
DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_add_txt_record_fail_to_authenticate_with_unknown_error(self):
|
||||
def test_add_txt_record_fail_to_authenticate_with_unknown_error(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
|
||||
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.client.add_txt_record,
|
||||
DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_add_txt_record_error_finding_domain(self):
|
||||
def test_add_txt_record_error_finding_domain(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
|
||||
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.client.add_txt_record,
|
||||
DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_add_txt_record_error_adding_record(self):
|
||||
def test_add_txt_record_error_adding_record(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.create_record.side_effect = self.GENERIC_ERROR
|
||||
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.client.add_txt_record,
|
||||
DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_del_txt_record(self):
|
||||
def test_del_txt_record(self: _LexiconAwareTestCase):
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
self.provider_mock.delete_record.assert_called_with(type='TXT',
|
||||
name=self.record_name,
|
||||
content=self.record_content)
|
||||
|
||||
def test_del_txt_record_fail_to_find_domain(self):
|
||||
def test_del_txt_record_fail_to_find_domain(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
|
||||
self.DOMAIN_NOT_FOUND,
|
||||
self.DOMAIN_NOT_FOUND, ]
|
||||
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_del_txt_record_fail_to_authenticate(self):
|
||||
def test_del_txt_record_fail_to_authenticate(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
|
||||
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_del_txt_record_fail_to_authenticate_with_unknown_error(self):
|
||||
def test_del_txt_record_fail_to_authenticate_with_unknown_error(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
|
||||
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_del_txt_record_error_finding_domain(self):
|
||||
def test_del_txt_record_error_finding_domain(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
|
||||
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
||||
def test_del_txt_record_error_deleting_record(self):
|
||||
def test_del_txt_record_error_deleting_record(self: _LexiconAwareTestCase):
|
||||
self.provider_mock.delete_record.side_effect = self.GENERIC_ERROR
|
||||
|
||||
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import shutil
|
|||
import time
|
||||
import traceback
|
||||
|
||||
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import constants
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ MarkupSafe==1.1.1
|
|||
mccabe==0.6.1
|
||||
more-itertools==5.0.0
|
||||
msrest==0.6.18
|
||||
mypy==0.710
|
||||
mypy==0.812
|
||||
mypy-extensions==0.4.3
|
||||
ndg-httpsclient==0.3.2
|
||||
oauth2client==4.0.0
|
||||
|
|
@ -120,6 +120,7 @@ traitlets==4.3.3
|
|||
twine==1.11.0
|
||||
typed-ast==1.4.1
|
||||
typing==3.6.4
|
||||
typing-extensions==3.7.4.3
|
||||
uritemplate==3.0.0
|
||||
virtualenv==16.6.2
|
||||
wcwidth==0.1.8
|
||||
|
|
|
|||
|
|
@ -35,8 +35,6 @@ acme = {path = "../../acme", extras = ["dev", "docs"]}
|
|||
windows-installer = {path = "../../windows-installer"}
|
||||
|
||||
# Extra dependencies
|
||||
# See https://github.com/certbot/certbot/issues/8425.
|
||||
mypy = "0.710"
|
||||
# Upgrading coverage, pylint, pytest, and some of pytest's plugins causes many
|
||||
# test failures so let's pin these packages back for now.
|
||||
coverage = "4.5.4"
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ mccabe==0.6.1; python_version >= "3.6"
|
|||
msgpack==1.0.2; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.5.0"
|
||||
msrest==0.6.21; python_version >= "3.6"
|
||||
mypy-extensions==0.4.3; python_version >= "3.6"
|
||||
mypy==0.710
|
||||
mypy==0.812; python_version >= "3.6"
|
||||
oauth2client==4.1.3; python_version >= "3.6"
|
||||
oauthlib==3.1.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.4.0"
|
||||
packaging==20.9; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.6.0"
|
||||
|
|
@ -160,7 +160,8 @@ tox==3.23.0; python_version >= "3.6" and python_full_version < "3.0.0" or python
|
|||
tqdm==4.59.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.4.0"
|
||||
traitlets==4.3.3
|
||||
twine==3.3.0; python_version >= "3.6"
|
||||
typed-ast==1.4.2; python_version >= "3.6" and implementation_name == "cpython" and python_version < "3.8"
|
||||
typed-ast==1.4.2; implementation_name == "cpython" and python_version < "3.8" and python_version >= "3.6"
|
||||
typing-extensions==3.7.4.3; python_version >= "3.6"
|
||||
uritemplate==3.0.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6"
|
||||
urllib3==1.26.4; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version < "4" and python_version >= "3.6"
|
||||
virtualenv==20.4.3; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.5.0"
|
||||
|
|
|
|||
3
tox.ini
3
tox.ini
|
|
@ -151,11 +151,12 @@ commands =
|
|||
{[base]install_packages}
|
||||
python -m pylint --reports=n --rcfile=.pylintrc {[base]source_paths}
|
||||
|
||||
// TODO: Re-enable strict checks for optionals with appropriate type corrections or code redesign.
|
||||
[testenv:mypy]
|
||||
basepython = python3
|
||||
commands =
|
||||
{[base]install_packages}
|
||||
mypy {[base]source_paths}
|
||||
mypy --no-strict-optional {[base]source_paths}
|
||||
|
||||
[testenv:apacheconftest]
|
||||
commands =
|
||||
|
|
|
|||
Loading…
Reference in a new issue