Merge remote-tracking branch 'upstream/master' into cli_new_cert_reporting

This commit is contained in:
Alex Zorin 2021-03-22 10:04:30 +11:00
commit 714c4d9208
197 changed files with 1685 additions and 1531 deletions

View file

@ -254,7 +254,7 @@ ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis
ignored-modules=pkg_resources,confargparse,argparse,six.moves,six.moves.urllib
ignored-modules=pkg_resources,confargparse,argparse
# import errors ignored only in 1.4.4
# https://bitbucket.org/logilab/pylint/commits/cd000904c9e2

View file

@ -8,15 +8,15 @@ import socket
from cryptography.hazmat.primitives import hashes # type: ignore
import josepy as jose
import requests
import six
from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052
from OpenSSL import crypto
from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052
import requests
from acme import crypto_util
from acme import errors
from acme import fields
from acme.mixins import ResourceMixin, TypeMixin
from acme.mixins import ResourceMixin
from acme.mixins import TypeMixin
logger = logging.getLogger(__name__)
@ -24,7 +24,7 @@ logger = logging.getLogger(__name__)
class Challenge(jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge."""
TYPES = {} # type: dict
TYPES: dict = {}
@classmethod
def from_json(cls, jobj):
@ -38,7 +38,7 @@ class Challenge(jose.TypedJSONObjectWithFields):
class ChallengeResponse(ResourceMixin, TypeMixin, jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge response."""
TYPES = {} # type: dict
TYPES: dict = {}
resource_type = 'challenge'
resource = fields.Resource(resource_type)
@ -145,8 +145,7 @@ class KeyAuthorizationChallengeResponse(ChallengeResponse):
return jobj
@six.add_metaclass(abc.ABCMeta)
class KeyAuthorizationChallenge(_TokenChallenge):
class KeyAuthorizationChallenge(_TokenChallenge, metaclass=abc.ABCMeta):
"""Challenge based on Key Authorization.
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`

View file

@ -4,9 +4,14 @@ import collections
import datetime
from email.utils import parsedate_tz
import heapq
import http.client as http_client
import logging
import re
import time
from typing import Dict
from typing import List
from typing import Set
from typing import Text
import josepy as jose
import OpenSSL
@ -14,17 +19,11 @@ import requests
from requests.adapters import HTTPAdapter
from requests.utils import parse_header_links
from requests_toolbelt.adapters.source import SourceAddressAdapter
import six
from six.moves import http_client
from acme import crypto_util
from acme import errors
from acme import jws
from acme import messages
from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Set
from acme.magic_typing import Text
from acme.mixins import VersionedLEACMEMixin
logger = logging.getLogger(__name__)
@ -34,7 +33,7 @@ DEFAULT_NETWORK_TIMEOUT = 45
DER_CONTENT_TYPE = 'application/pkix-cert'
class ClientBase(object):
class ClientBase:
"""ACME client base object.
:ivar messages.Directory directory:
@ -113,8 +112,9 @@ class ClientBase(object):
"""
return self.update_registration(regr, update={'status': 'deactivated'})
def deactivate_authorization(self, authzr):
# type: (messages.AuthorizationResource) -> messages.AuthorizationResource
def deactivate_authorization(self,
authzr: messages.AuthorizationResource
) -> messages.AuthorizationResource:
"""Deactivate authorization.
:param messages.AuthorizationResource authzr: The Authorization resource
@ -248,7 +248,7 @@ class Client(ClientBase):
if net is None:
net = ClientNetwork(key, alg=alg, verify_ssl=verify_ssl)
if isinstance(directory, six.string_types):
if isinstance(directory, str):
directory = messages.Directory.from_json(
net.get(directory).json())
super(Client, self).__init__(directory=directory,
@ -424,7 +424,7 @@ class Client(ClientBase):
"""
assert max_attempts > 0
attempts = collections.defaultdict(int) # type: Dict[messages.AuthorizationResource, int]
attempts: Dict[messages.AuthorizationResource, int] = collections.defaultdict(int)
exhausted = set()
# priority queue with datetime.datetime (based on Retry-After) as key,
@ -463,7 +463,7 @@ class Client(ClientBase):
exhausted.add(authzr)
if exhausted or any(authzr.body.status == messages.STATUS_INVALID
for authzr in six.itervalues(updated)):
for authzr in updated.values()):
raise errors.PollError(exhausted, updated)
updated_authzrs = tuple(updated[authzr] for authzr in authzrs)
@ -537,7 +537,7 @@ class Client(ClientBase):
:rtype: `list` of `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
"""
chain = [] # type: List[jose.ComparableX509]
chain: List[jose.ComparableX509] = []
uri = certr.cert_chain_uri
while uri is not None and len(chain) < max_length:
response, cert = self._get_cert(uri)
@ -796,7 +796,7 @@ class ClientV2(ClientBase):
if 'rel' in l and 'url' in l and l['rel'] == relation_type]
class BackwardsCompatibleClientV2(object):
class BackwardsCompatibleClientV2:
"""ACME client wrapper that tends towards V2-style calls, but
supports V1 servers.
@ -939,7 +939,7 @@ class BackwardsCompatibleClientV2(object):
return self.client.external_account_required()
class ClientNetwork(object):
class ClientNetwork:
"""Wrapper around requests that signs POSTs for authentication.
Also adds user agent, and handles Content-Type.
@ -969,7 +969,7 @@ class ClientNetwork(object):
self.account = account
self.alg = alg
self.verify_ssl = verify_ssl
self._nonces = set() # type: Set[Text]
self._nonces: Set[Text] = set()
self.user_agent = user_agent
self.session = requests.Session()
self._default_timeout = timeout

View file

@ -5,15 +5,15 @@ import logging
import os
import re
import socket
from typing import Callable
from typing import Tuple
from typing import Union
import josepy as jose
from OpenSSL import crypto
from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052
from acme import errors
from acme.magic_typing import Callable
from acme.magic_typing import Tuple
from acme.magic_typing import Union
logger = logging.getLogger(__name__)
@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
_DEFAULT_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore
class _DefaultCertSelection(object):
class _DefaultCertSelection:
def __init__(self, certs):
self.certs = certs
@ -36,7 +36,7 @@ class _DefaultCertSelection(object):
return self.certs.get(server_name, None)
class SSLSocket(object): # pylint: disable=too-few-public-methods
class SSLSocket: # pylint: disable=too-few-public-methods
"""SSL wrapper for sockets.
:ivar socket sock: Original wrapped socket.
@ -93,7 +93,7 @@ class SSLSocket(object): # pylint: disable=too-few-public-methods
new_context.set_alpn_select_callback(self.alpn_selection)
connection.set_context(new_context)
class FakeConnection(object):
class FakeConnection:
"""Fake OpenSSL.SSL.Connection."""
# pylint: disable=missing-function-docstring
@ -168,7 +168,7 @@ def probe_sni(name, host, port=443, timeout=300, # pylint: disable=too-many-argu
source_address[1]
) if any(source_address) else ""
)
socket_tuple = (host, port) # type: Tuple[str, int]
socket_tuple: Tuple[str, int] = (host, port)
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore
except socket.error as error:
raise errors.Error(error)
@ -256,7 +256,7 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
if isinstance(cert_or_req, crypto.X509):
# pylint: disable=line-too-long
func = crypto.dump_certificate # type: Union[Callable[[int, crypto.X509Req], bytes], Callable[[int, crypto.X509], bytes]]
func: Union[Callable[[int, crypto.X509Req], bytes], Callable[[int, crypto.X509], bytes]] = crypto.dump_certificate
else:
func = crypto.dump_certificate_request
text = func(crypto.FILETYPE_TEXT, cert_or_req).decode("utf-8")

View file

@ -1,16 +1,17 @@
"""Shim class to not have to depend on typing module in prod."""
import sys
"""Simple shim around the typing module.
This was useful when this code supported Python 2 and typing wasn't always
available. This code is being kept for now for backwards compatibility.
class TypingClass(object):
"""
import warnings
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import Collection, IO # type: ignore
warnings.warn("acme.magic_typing is deprecated and will be removed in a future release.",
DeprecationWarning)
class TypingClass:
"""Ignore import errors by getting anything"""
def __getattr__(self, name):
return None
try:
# mypy doesn't respect modifying sys.modules
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import Collection, IO # type: ignore
except ImportError:
# mypy complains because TypingClass is not a module
sys.modules[__name__] = TypingClass() # type: ignore
return None # pragma: no cover

View file

@ -1,8 +1,8 @@
"""ACME protocol messages."""
import json
from collections.abc import Hashable
import josepy as jose
import six
from acme import challenges
from acme import errors
@ -11,13 +11,6 @@ from acme import jws
from acme import util
from acme.mixins import ResourceMixin
try:
from collections.abc import Hashable
except ImportError: # pragma: no cover
from collections import Hashable
OLD_ERROR_PREFIX = "urn:acme:error:"
ERROR_PREFIX = "urn:ietf:params:acme:error:"
@ -68,7 +61,6 @@ def is_acme_error(err):
return False
@six.python_2_unicode_compatible
class Error(jose.JSONObjectWithFields, errors.Error):
"""ACME error.
@ -158,13 +150,10 @@ class _Constant(jose.JSONDeSerializable, Hashable): # type: ignore
def __hash__(self):
return hash((self.__class__, self.name))
def __ne__(self, other):
return not self == other
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES = {} # type: dict
POSSIBLE_NAMES: dict = {}
STATUS_UNKNOWN = Status('unknown')
STATUS_PENDING = Status('pending')
STATUS_PROCESSING = Status('processing')
@ -177,7 +166,7 @@ STATUS_DEACTIVATED = Status('deactivated')
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES = {} # type: dict
POSSIBLE_NAMES: dict = {}
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
@ -195,7 +184,7 @@ class Identifier(jose.JSONObjectWithFields):
class Directory(jose.JSONDeSerializable):
"""Directory."""
_REGISTERED_TYPES = {} # type: dict
_REGISTERED_TYPES: dict = {}
class Meta(jose.JSONObjectWithFields):
"""Directory Meta."""
@ -285,7 +274,7 @@ class ResourceBody(jose.JSONObjectWithFields):
"""ACME Resource Body."""
class ExternalAccountBinding(object):
class ExternalAccountBinding:
"""ACME External Account Binding"""
@classmethod

View file

@ -1,7 +1,7 @@
"""Useful mixins for Challenge and Resource objects"""
class VersionedLEACMEMixin(object):
class VersionedLEACMEMixin:
"""This mixin stores the version of Let's Encrypt's endpoint being used."""
@property
def le_acme_version(self):

View file

@ -1,17 +1,16 @@
"""Support for standalone client challenge solvers. """
import collections
import functools
import http.client as http_client
import http.server as BaseHTTPServer
import logging
import socket
import socketserver
import threading
from six.moves import BaseHTTPServer # type: ignore
from six.moves import http_client
from six.moves import socketserver # type: ignore
from typing import List
from acme import challenges
from acme import crypto_util
from acme.magic_typing import List
logger = logging.getLogger(__name__)
@ -54,7 +53,7 @@ class ACMEServerMixin:
allow_reuse_address = True
class BaseDualNetworkedServers(object):
class BaseDualNetworkedServers:
"""Base class for a pair of IPv6 and IPv4 servers that tries to do everything
it's asked for both servers, but where failures in one server don't
affect the other.
@ -64,8 +63,8 @@ class BaseDualNetworkedServers(object):
def __init__(self, ServerClass, server_address, *remaining_args, **kwargs):
port = server_address[1]
self.threads = [] # type: List[threading.Thread]
self.servers = [] # type: List[ACMEServerMixin]
self.threads: List[threading.Thread] = []
self.servers: List[ACMEServerMixin] = []
# Must try True first.
# Ubuntu, for example, will fail to bind to IPv4 if we've already bound

View file

@ -1,7 +1,6 @@
"""ACME utilities."""
import six
def map_keys(dikt, func):
"""Map dictionary keys."""
return {func(key): value for key, value in six.iteritems(dikt)}
return {func(key): value for key, value in dikt.items()}

View file

@ -87,7 +87,6 @@ language = 'en'
# directories to ignore when looking for source files.
exclude_patterns = [
'_build',
'man/*'
]
# The reST default role (used for this markup: `text`) to use for all

View file

@ -1 +1,3 @@
:orphan:
.. literalinclude:: ../jws-help.txt

View file

@ -3,7 +3,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Please update tox.ini when modifying dependency version requirements
install_requires = [
@ -18,7 +18,6 @@ install_requires = [
'requests>=2.6.0',
'requests-toolbelt>=0.3.0',
'setuptools>=39.0.1',
'six>=1.11.0',
]
dev_extras = [

View file

@ -1,11 +1,11 @@
"""Tests for acme.challenges."""
import urllib.parse as urllib_parse
import unittest
from unittest import mock
import josepy as jose
import OpenSSL
import requests
from six.moves.urllib import parse as urllib_parse
from acme import errors

View file

@ -2,6 +2,7 @@
# pylint: disable=too-many-lines
import copy
import datetime
import http.client as http_client
import json
import unittest
from unittest import mock
@ -9,7 +10,6 @@ from unittest import mock
import josepy as jose
import OpenSSL
import requests
from six.moves import http_client # pylint: disable=import-error
from acme import challenges
from acme import errors
@ -61,7 +61,7 @@ class ClientTestBase(unittest.TestCase):
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public_key())
the_arg = dict(reg) # type: Dict
the_arg: Dict = dict(reg)
self.new_reg = messages.NewRegistration(**the_arg)
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')

View file

@ -1,14 +1,13 @@
"""Tests for acme.crypto_util."""
import itertools
import socket
import socketserver
import threading
import time
import unittest
import josepy as jose
import OpenSSL
import six
from six.moves import socketserver # type: ignore # pylint: disable=import-error
from acme import errors
import test_util
@ -27,8 +26,6 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
class _TestServer(socketserver.TCPServer):
# six.moves.* | pylint: disable=attribute-defined-outside-init,no-init
def server_bind(self): # pylint: disable=missing-docstring
self.socket = SSLSocket(socket.socket(),
certs)
@ -62,7 +59,6 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
self.assertRaises(errors.Error, self._probe, b'bar')
def test_probe_connection_error(self):
# pylint has a hard time with six
self.server.server_close()
original_timeout = socket.getdefaulttimeout()
try:
@ -121,9 +117,9 @@ class PyOpenSSLCertOrReqSANTest(unittest.TestCase):
@classmethod
def _get_idn_names(cls):
"""Returns expected names from '{cert,csr}-idnsans.pem'."""
chars = [six.unichr(i) for i in itertools.chain(range(0x3c3, 0x400),
range(0x641, 0x6fc),
range(0x1820, 0x1877))]
chars = [chr(i) for i in itertools.chain(range(0x3c3, 0x400),
range(0x641, 0x6fc),
range(0x1820, 0x1877))]
return [''.join(chars[i: i + 45]) + '.invalid'
for i in range(0, len(chars), 45)]
@ -184,7 +180,7 @@ class RandomSnTest(unittest.TestCase):
def setUp(self):
self.cert_count = 5
self.serial_num = [] # type: List[int]
self.serial_num: List[int] = []
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

View file

@ -1,6 +1,7 @@
"""Tests for acme.magic_typing."""
import sys
import unittest
import warnings
from unittest import mock
@ -9,32 +10,21 @@ class MagicTypingTest(unittest.TestCase):
def test_import_success(self):
try:
import typing as temp_typing
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
typing_class_mock = mock.MagicMock()
text_mock = mock.MagicMock()
typing_class_mock.Text = text_mock
sys.modules['typing'] = typing_class_mock
if 'acme.magic_typing' in sys.modules:
del sys.modules['acme.magic_typing'] # pragma: no cover
from acme.magic_typing import Text
del sys.modules['acme.magic_typing'] # pragma: no cover
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from acme.magic_typing import Text
self.assertEqual(Text, text_mock)
del sys.modules['acme.magic_typing']
sys.modules['typing'] = temp_typing
def test_import_failure(self):
try:
import typing as temp_typing
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
sys.modules['typing'] = None
if 'acme.magic_typing' in sys.modules:
del sys.modules['acme.magic_typing'] # pragma: no cover
from acme.magic_typing import Text
self.assertTrue(Text is None)
del sys.modules['acme.magic_typing']
sys.modules['typing'] = temp_typing
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,4 +1,5 @@
"""Tests for acme.messages."""
from typing import Dict
import unittest
from unittest import mock
@ -81,7 +82,7 @@ class ConstantTest(unittest.TestCase):
from acme.messages import _Constant
class MockConstant(_Constant): # pylint: disable=missing-docstring
POSSIBLE_NAMES = {} # type: Dict
POSSIBLE_NAMES: Dict = {}
self.MockConstant = MockConstant # pylint: disable=invalid-name
self.const_a = MockConstant('a')

View file

@ -1,13 +1,13 @@
"""Tests for acme.standalone."""
import http.client as http_client
import socket
import socketserver
import threading
import unittest
from unittest import mock
import josepy as jose
import requests
from six.moves import http_client # pylint: disable=import-error
from six.moves import socketserver # type: ignore # pylint: disable=import-error
from acme import challenges
from acme import crypto_util
@ -41,7 +41,7 @@ class HTTP01ServerTest(unittest.TestCase):
def setUp(self):
self.account_key = jose.JWK.load(
test_util.load_vector('rsa1024_key.pem'))
self.resources = set() # type: Set
self.resources: Set = set()
from acme.standalone import HTTP01Server
self.server = HTTP01Server(('', 0), resources=self.resources)
@ -218,7 +218,7 @@ class HTTP01DualNetworkedServersTest(unittest.TestCase):
def setUp(self):
self.account_key = jose.JWK.load(
test_util.load_vector('rsa1024_key.pem'))
self.resources = set() # type: Set
self.resources: Set = set()
from acme.standalone import HTTP01DualNetworkedServers
self.servers = HTTP01DualNetworkedServers(('', 0), resources=self.resources)

View file

@ -9,7 +9,6 @@ import pkg_resources
from certbot import errors
from certbot import util
from certbot.compat import os
logger = logging.getLogger(__name__)

View file

@ -3,7 +3,6 @@ import fnmatch
from certbot_apache._internal import interfaces
PASS = "CERTBOT_PASS_ASSERT"

View file

@ -64,10 +64,10 @@ Translates over to:
"/files/etc/apache2/apache2.conf/bLoCk[1]",
]
"""
from acme.magic_typing import Set
from typing import Set
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
from certbot_apache._internal import assertions
from certbot_apache._internal import interfaces
@ -355,7 +355,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
ownpath = self.metadata.get("augeaspath")
directives = self.parser.find_dir(name, start=ownpath, exclude=exclude)
already_parsed = set() # type: Set[str]
already_parsed: Set[str] = set()
for directive in directives:
# Remove the /arg part from the Augeas path
directive = directive.partition("/arg")[0]

View file

@ -1,28 +1,23 @@
"""Apache Configurator."""
# pylint: disable=too-many-lines
from collections import defaultdict
from distutils.version import LooseVersion
import copy
from distutils.version import LooseVersion
import fnmatch
import logging
import re
import socket
import time
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Set
from typing import Union
import zope.component
import zope.interface
try:
import apacheconfig
HAS_APACHECONFIG = True
except ImportError: # pragma: no cover
HAS_APACHECONFIG = False
from acme import challenges
from acme.magic_typing import DefaultDict
from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Set
from acme.magic_typing import Union
from certbot import errors
from certbot import interfaces
from certbot import util
@ -41,6 +36,13 @@ from certbot_apache._internal import http_01
from certbot_apache._internal import obj
from certbot_apache._internal import parser
try:
import apacheconfig
HAS_APACHECONFIG = True
except ImportError: # pragma: no cover
HAS_APACHECONFIG = False
logger = logging.getLogger(__name__)
@ -210,23 +212,23 @@ class ApacheConfigurator(common.Installer):
super(ApacheConfigurator, self).__init__(*args, **kwargs)
# Add name_server association dict
self.assoc = {} # type: Dict[str, obj.VirtualHost]
self.assoc: Dict[str, obj.VirtualHost] = {}
# Outstanding challenges
self._chall_out = set() # type: Set[KeyAuthorizationAnnotatedChallenge]
self._chall_out: Set[KeyAuthorizationAnnotatedChallenge] = set()
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts = {} # type: Dict[str, List[obj.VirtualHost]]
self._wildcard_vhosts: Dict[str, List[obj.VirtualHost]] = {}
# Maps enhancements to vhosts we've enabled the enhancement for
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
self._enhanced_vhosts: DefaultDict[str, Set[obj.VirtualHost]] = defaultdict(set)
# Temporary state for AutoHSTS enhancement
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
self._autohsts: Dict[str, Dict[str, Union[int, float]]] = {}
# Reverter save notes
self.save_notes = ""
# Should we use ParserNode implementation instead of the old behavior
self.USE_PARSERNODE = use_parsernode
# Saves the list of file paths that were parsed initially, and
# not added to parser tree by self.conf("vhost-root") for example.
self.parsed_paths = [] # type: List[str]
self.parsed_paths: List[str] = []
# These will be set in the prepare function
self._prepared = False
self.parser = None
@ -832,7 +834,7 @@ class ApacheConfigurator(common.Installer):
:rtype: set
"""
all_names = set() # type: Set[str]
all_names: Set[str] = set()
vhost_macro = []
@ -996,8 +998,8 @@ class ApacheConfigurator(common.Installer):
"""
# Search base config, and all included paths for VirtualHosts
file_paths = {} # type: Dict[str, str]
internal_paths = defaultdict(set) # type: DefaultDict[str, Set[str]]
file_paths: Dict[str, str] = {}
internal_paths: DefaultDict[str, Set[str]] = defaultdict(set)
vhs = []
# Make a list of parser paths because the parser_paths
# dictionary may be modified during the loop.
@ -2156,7 +2158,7 @@ class ApacheConfigurator(common.Installer):
# There can be other RewriteRule directive lines in vhost config.
# rewrite_args_dict keys are directive ids and the corresponding value
# for each is a list of arguments to that directive.
rewrite_args_dict = defaultdict(list) # type: DefaultDict[str, List[str]]
rewrite_args_dict: DefaultDict[str, List[str]] = defaultdict(list)
pat = r'(.*directive\[\d+\]).*'
for match in rewrite_path:
m = re.match(pat, match)
@ -2250,7 +2252,7 @@ class ApacheConfigurator(common.Installer):
if ssl_vhost.aliases:
serveralias = "ServerAlias " + " ".join(ssl_vhost.aliases)
rewrite_rule_args = [] # type: List[str]
rewrite_rule_args: List[str] = []
if self.get_version() >= (2, 3, 9):
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS_WITH_END
else:

View file

@ -1,10 +1,10 @@
""" Dual ParserNode implementation """
from certbot_apache._internal import apacheparser
from certbot_apache._internal import assertions
from certbot_apache._internal import augeasparser
from certbot_apache._internal import apacheparser
class DualNodeBase(object):
class DualNodeBase:
""" Dual parser interface for in development testing. This is used as the
base class for dual parser interface classes. This class handles runtime
attribute value assertions."""

View file

@ -1,9 +1,9 @@
"""A class that performs HTTP-01 challenges for Apache"""
import logging
import errno
import logging
from typing import List
from typing import Set
from acme.magic_typing import List
from acme.magic_typing import Set
from certbot import errors
from certbot.compat import filesystem
from certbot.compat import os
@ -57,7 +57,7 @@ class ApacheHttp01(common.ChallengePerformer):
self.challenge_dir = os.path.join(
self.configurator.config.work_dir,
"http_challenges")
self.moded_vhosts = set() # type: Set[VirtualHost]
self.moded_vhosts: Set[VirtualHost] = set()
def perform(self):
"""Perform all HTTP-01 challenges."""
@ -93,7 +93,7 @@ class ApacheHttp01(common.ChallengePerformer):
self.configurator.enable_mod(mod, temp=True)
def _mod_config(self):
selected_vhosts = [] # type: List[VirtualHost]
selected_vhosts: List[VirtualHost] = []
http_port = str(self.configurator.config.http01_port)
for chall in self.achalls:
# Search for matching VirtualHosts

View file

@ -100,12 +100,9 @@ For this reason the internal representation of data should not ignore the case.
"""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ParserNode(object):
class ParserNode(object, metaclass=abc.ABCMeta):
"""
ParserNode is the basic building block of the tree of such nodes,
representing the structure of the configuration. It is largely meant to keep
@ -204,9 +201,7 @@ class ParserNode(object):
"""
# Linter rule exclusion done because of https://github.com/PyCQA/pylint/issues/179
@six.add_metaclass(abc.ABCMeta) # pylint: disable=abstract-method
class CommentNode(ParserNode):
class CommentNode(ParserNode, metaclass=abc.ABCMeta):
"""
CommentNode class is used for representation of comments within the parsed
configuration structure. Because of the nature of comments, it is not able
@ -249,8 +244,7 @@ class CommentNode(ParserNode):
metadata=kwargs.get('metadata', {})) # pragma: no cover
@six.add_metaclass(abc.ABCMeta)
class DirectiveNode(ParserNode):
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
"""
DirectiveNode class represents a configuration directive within the configuration.
It can have zero or more parameters attached to it. Because of the nature of
@ -325,8 +319,7 @@ class DirectiveNode(ParserNode):
"""
@six.add_metaclass(abc.ABCMeta)
class BlockNode(DirectiveNode):
class BlockNode(DirectiveNode, metaclass=abc.ABCMeta):
"""
BlockNode class represents a block of nested configuration directives, comments
and other blocks as its children. A BlockNode can have zero or more parameters

View file

@ -1,7 +1,7 @@
"""Module contains classes used by the Apache Configurator."""
import re
from typing import Set
from acme.magic_typing import Set
from certbot.plugins import common
@ -20,9 +20,6 @@ class Addr(common.Addr):
self.is_wildcard() and other.is_wildcard()))
return False
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return "certbot_apache._internal.obj.Addr(" + repr(self.tup) + ")"
@ -98,7 +95,7 @@ class Addr(common.Addr):
return self.get_addr_obj(port)
class VirtualHost(object):
class VirtualHost:
"""Represents an Apache Virtualhost.
:ivar str filep: file path of VH
@ -140,7 +137,7 @@ class VirtualHost(object):
def get_names(self):
"""Return a set of all names."""
all_names = set() # type: Set[str]
all_names: Set[str] = set()
all_names.update(self.aliases)
# Strip out any scheme:// and <port> field from servername
if self.name is not None:
@ -191,9 +188,6 @@ class VirtualHost(object):
return False
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.filep, self.path,
tuple(self.addrs), tuple(self.get_names()),
@ -251,7 +245,7 @@ class VirtualHost(object):
# already_found acts to keep everything very conservative.
# Don't allow multiple ip:ports in same set.
already_found = set() # type: Set[str]
already_found: Set[str] = set()
for addr in vhost.addrs:
for local_addr in self.addrs:

View file

@ -1,9 +1,9 @@
""" Distribution specific override class for CentOS family (RHEL, Fedora) """
import logging
from typing import List
import zope.interface
from acme.magic_typing import List
from certbot import errors
from certbot import interfaces
from certbot import util
@ -102,9 +102,9 @@ class CentOSConfigurator(configurator.ApacheConfigurator):
loadmods = self.parser.find_dir("LoadModule", "ssl_module", exclude=False)
correct_ifmods = [] # type: List[str]
loadmod_args = [] # type: List[str]
loadmod_paths = [] # type: List[str]
correct_ifmods: List[str] = []
loadmod_args: List[str] = []
loadmod_paths: List[str] = []
for m in loadmods:
noarg_path = m.rpartition("/")[0]
path_args = self.parser.get_all_args(noarg_path)

View file

@ -3,12 +3,9 @@ import copy
import fnmatch
import logging
import re
import sys
from typing import Dict
from typing import List
import six
from acme.magic_typing import Dict
from acme.magic_typing import List
from certbot import errors
from certbot.compat import os
from certbot_apache._internal import apache_util
@ -17,7 +14,7 @@ from certbot_apache._internal import constants
logger = logging.getLogger(__name__)
class ApacheParser(object):
class ApacheParser:
"""Class handles the fine details of parsing the Apache Configuration.
.. todo:: Make parsing general... remove sites-available etc...
@ -51,9 +48,9 @@ class ApacheParser(object):
"version 1.2.0 or higher, please make sure you have you have "
"those installed.")
self.modules = {} # type: Dict[str, str]
self.parser_paths = {} # type: Dict[str, List[str]]
self.variables = {} # type: Dict[str, str]
self.modules: Dict[str, str] = {}
self.parser_paths: Dict[str, List[str]] = {}
self.variables: Dict[str, str] = {}
# Find configuration root and make sure augeas can parse it.
self.root = os.path.abspath(root)
@ -266,7 +263,7 @@ class ApacheParser(object):
the iteration issue. Else... parse and enable mods at same time.
"""
mods = {} # type: Dict[str, str]
mods: Dict[str, str] = {}
matches = self.find_dir("LoadModule")
iterator = iter(matches)
# Make sure prev_size != cur_size for do: while: iteration
@ -275,7 +272,7 @@ class ApacheParser(object):
while len(mods) != prev_size:
prev_size = len(mods)
for match_name, match_filename in six.moves.zip(
for match_name, match_filename in zip(
iterator, iterator):
mod_name = self.get_arg(match_name)
mod_filename = self.get_arg(match_filename)
@ -553,7 +550,7 @@ class ApacheParser(object):
else:
arg_suffix = "/*[self::arg=~regexp('%s')]" % case_i(arg)
ordered_matches = [] # type: List[str]
ordered_matches: List[str] = []
# TODO: Wildcards should be included in alphabetical order
# https://httpd.apache.org/docs/2.4/mod/core.html#include
@ -738,9 +735,6 @@ class ApacheParser(object):
:rtype: str
"""
if sys.version_info < (3, 6):
# This strips off final /Z(?ms)
return fnmatch.translate(clean_fn_match)[:-7] # pragma: no cover
# Since Python 3.6, it returns a different pattern like (?s:.*\.load)\Z
return fnmatch.translate(clean_fn_match)[4:-3] # pragma: no cover

View file

@ -1,7 +1,7 @@
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -107,7 +107,7 @@ class AugeasParserNodeTest(util.ApacheTest): # pylint: disable=too-many-public-
def test_set_parameters(self):
servernames = self.config.parser_root.find_directives("servername")
names = [] # type: List[str]
names: List[str] = []
for servername in servernames:
names += servername.parameters
self.assertFalse("going_to_set_this" in names)

View file

@ -7,7 +7,6 @@ try:
import mock
except ImportError: # pragma: no cover
from unittest import mock # type: ignore
import six # pylint: disable=unused-import # six is used in mock.patch()
from certbot import errors
from certbot_apache._internal import constants

View file

@ -10,7 +10,6 @@ try:
import mock
except ImportError: # pragma: no cover
from unittest import mock # type: ignore
import six # pylint: disable=unused-import # six is used in mock.patch()
from acme import challenges
from certbot import achallenges
@ -726,7 +725,7 @@ class MultipleVhostsTest(util.ApacheTest):
# This calls open
self.config.reverter.register_file_creation = mock.Mock()
mock_open.side_effect = IOError
with mock.patch("six.moves.builtins.open", mock_open):
with mock.patch("builtins.open", mock_open):
self.assertRaises(
errors.PluginError,
self.config.make_vhost_ssl, self.vh_truth[0])
@ -1834,7 +1833,7 @@ class InstallSslOptionsConfTest(util.ApacheTest):
def test_open_module_file(self):
mock_open = mock.mock_open(read_data="testing 12 3")
with mock.patch("six.moves.builtins.open", mock_open):
with mock.patch("builtins.open", mock_open):
self.assertEqual(self.config._open_module_file("/nonsense/"), "testing 12 3")
if __name__ == "__main__":

View file

@ -26,7 +26,7 @@ class ApacheHttp01Test(util.ApacheTest):
super(ApacheHttp01Test, self).setUp(*args, **kwargs)
self.account_key = self.rsa512jwk
self.achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
self.achalls: List[achallenges.KeyAuthorizationAnnotatedChallenge] = []
vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
# Takes the vhosts for encryption-example.demo, certbot.demo

View file

@ -31,7 +31,7 @@ if [ -z "$VENV_PATH" ]; then
fi
VENV_BIN="$VENV_PATH/bin"
BOOTSTRAP_VERSION_PATH="$VENV_PATH/certbot-auto-bootstrap-version.txt"
LE_AUTO_VERSION="1.12.0"
LE_AUTO_VERSION="1.13.0"
BASENAME=$(basename $0)
USAGE="Usage: $BASENAME [OPTIONS]
A self-updating wrapper script for the Certbot ACME client. When run, updates
@ -806,6 +806,7 @@ elif [ -f /etc/mageia-release ]; then
NO_SELF_UPGRADE=1
elif [ -f /etc/redhat-release ]; then
DEPRECATED_OS=1
NO_SELF_UPGRADE=1
# Run DeterminePythonVersion to decide on the basis of available Python versions
# whether to use 2.x or 3.x on RedHat-like systems.
# Then, revert LE_PYTHON to its previous state.
@ -1487,18 +1488,18 @@ letsencrypt==0.7.0 \
--hash=sha256:105a5fb107e45bcd0722eb89696986dcf5f08a86a321d6aef25a0c7c63375ade \
--hash=sha256:c36e532c486a7e92155ee09da54b436a3c420813ec1c590b98f635d924720de9
certbot==1.12.0 \
--hash=sha256:f4bb3da5391e4a28e9a2e52ab54986171c0864feff17eaaaca6729a1d4c433a6 \
--hash=sha256:5ee738773479bcb7794e43fedd2415acc0969b75bdd2a21f451e3bff9d99df59
acme==1.12.0 \
--hash=sha256:ca4ad044429f1b8b670b958e5c7ea38159def9d601f4af2359355993918c3317 \
--hash=sha256:aa363474d50e9fdda27acb8b1aa7efb26fecc5650e02039a0de3a3f0e696c2f2
certbot-apache==1.12.0 \
--hash=sha256:38899f6fa08799de9535795d919acf968f288d7208909baf7733f9a763c15227 \
--hash=sha256:e5679b40d99bd241f4fcd9fe44b73e6e25ccc969a617131ff6ebc90d562a49f2
certbot-nginx==1.12.0 \
--hash=sha256:332cd70067bbcf6db52a002650ffa4844d0bd9780279d662aa6725b43f776c14 \
--hash=sha256:3fb6a55290d37ad466681a89a85ceca4c4026fdd8702f3010b87a74266a6fe7b
certbot==1.13.0 \
--hash=sha256:082eb732e1318bb9605afa7aea8db2c2f4c5029d523c73f24c6aa98f03caff76 \
--hash=sha256:64cf41b57df7667d9d849fcaa9031a4f151788246733d1f4c3f37a5aa5e2f458
acme==1.13.0 \
--hash=sha256:93b6365c9425de03497a6b8aee1107814501d2974499b42e9bcc9a7378771143 \
--hash=sha256:6b4257dfd6a6d5f01e8cd4f0b10422c17836bed7c67e9c5b0a0ad6c7d651c088
certbot-apache==1.13.0 \
--hash=sha256:36ed02ac7d2d91febee8dd3181ae9095b3f06434c9ed8959fbc6db24ab4da2e8 \
--hash=sha256:4b5a16e80c1418e2edc05fc2578f522fb24974b2c13eb747cdfeef69e5bd5ae1
certbot-nginx==1.13.0 \
--hash=sha256:3ff271f65321b25c77a868af21f76f58754a7d61529ad565a1d66e29c711120f \
--hash=sha256:9e972cc19c0fa9e5b7863da0423b156fbfb5623fd30b558fd2fd6d21c24c0b08
UNLIKELY_EOF
# -------------------------------------------------------------------------

View file

@ -1,5 +1,4 @@
#!/usr/bin/env python
from __future__ import print_function
import os
import sys

View file

@ -7,7 +7,7 @@ import tempfile
from certbot_integration_tests.utils import certbot_call
class IntegrationTestsContext(object):
class IntegrationTestsContext:
"""General fixture describing a certbot integration tests context"""
def __init__(self, request):
self.request = request

View file

@ -1,5 +1,4 @@
"""Module executing integration tests against certbot core."""
from __future__ import print_function
import os
from os.path import exists

View file

@ -6,7 +6,6 @@ for a directory a specific configuration using built-in pytest hooks.
See https://docs.pytest.org/en/latest/reference.html#hook-reference
"""
from __future__ import print_function
import contextlib
import subprocess
import sys

View file

@ -1,8 +1,8 @@
"""Module executing integration tests against certbot with nginx plugin."""
import os
import ssl
from typing import List
import pytest
from certbot_integration_tests.nginx_tests import context as nginx_context
@ -32,8 +32,8 @@ def test_context(request):
'--preferred-challenges', 'http'
], {'default_server': False}),
], indirect=['context'])
def test_certificate_deployment(certname_pattern, params, context):
# type: (str, List[str], nginx_context.IntegrationTestsContext) -> None
def test_certificate_deployment(certname_pattern: str, params: List[str],
context: nginx_context.IntegrationTestsContext) -> None:
"""
Test various scenarios to deploy a certificate to nginx using certbot.
"""

View file

@ -1,7 +1,7 @@
"""Module to handle the context of RFC2136 integration tests."""
import tempfile
from contextlib import contextmanager
import tempfile
from pkg_resources import resource_filename
from pytest import skip

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python
"""Module to setup an ACME CA server environment able to run multiple tests in parallel"""
from __future__ import print_function
import argparse
import errno
@ -12,18 +11,18 @@ import subprocess
import sys
import tempfile
import time
from typing import List
import requests
# pylint: disable=wildcard-import,unused-wildcard-import
from certbot_integration_tests.utils import misc
from certbot_integration_tests.utils import pebble_artifacts
from certbot_integration_tests.utils import proxy
# pylint: disable=wildcard-import,unused-wildcard-import
from certbot_integration_tests.utils.constants import *
class ACMEServer(object):
class ACMEServer:
"""
ACMEServer configures and handles the lifecycle of an ACME CA server and an HTTP reverse proxy
instance, to allow parallel execution of integration tests against the unique http-01 port
@ -52,7 +51,7 @@ class ACMEServer(object):
self._acme_type = 'pebble' if acme_server == 'pebble' else 'boulder'
self._proxy = http_proxy
self._workspace = tempfile.mkdtemp()
self._processes = [] # type: List[subprocess.Popen]
self._processes: List[subprocess.Popen] = []
self._stdout = sys.stdout if stdout else open(os.devnull, 'w')
self._dns_server = dns_server
self._http_01_port = http_01_port

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python
"""Module to call certbot in test mode"""
from __future__ import absolute_import
import os
import subprocess

View file

@ -1,7 +1,5 @@
#!/usr/bin/env python
"""Module to setup an RFC2136-capable DNS server"""
from __future__ import print_function
import os
import os.path
import shutil
@ -21,7 +19,7 @@ BIND_BIND_ADDRESS = ("127.0.0.1", 45953)
BIND_TEST_QUERY = bytearray.fromhex("0011cb37000000010000000000000000010003")
class DNSServer(object):
class DNSServer:
"""
DNSServer configures and handles the lifetime of an RFC2136-capable server.
DNServer provides access to the dns_xdist parameter, listing the address and port
@ -40,7 +38,7 @@ class DNSServer(object):
self.bind_root = tempfile.mkdtemp()
self.process = None # type: subprocess.Popen
self.process: subprocess.Popen = None
self.dns_xdist = {"address": BIND_BIND_ADDRESS[0], "port": BIND_BIND_ADDRESS[1]}
@ -113,8 +111,7 @@ class DNSServer(object):
self.stop()
raise
def _wait_until_ready(self, attempts=30):
# type: (int) -> None
def _wait_until_ready(self, attempts: int = 30) -> None:
"""
Polls the DNS server over TCP until it gets a response, or until
it runs out of attempts and raises a ValueError.

View file

@ -4,10 +4,12 @@ or outside during setup/teardown of the integration tests environment.
"""
import contextlib
import errno
import http.server as SimpleHTTPServer
import multiprocessing
import os
import re
import shutil
import socketserver
import stat
import sys
import tempfile
@ -23,11 +25,9 @@ from cryptography.x509 import load_pem_x509_certificate
from OpenSSL import crypto
import pkg_resources
import requests
from six.moves import SimpleHTTPServer
from six.moves import socketserver
from certbot_integration_tests.utils.constants import \
PEBBLE_ALTERNATE_ROOTS, PEBBLE_MANAGEMENT_URL
from certbot_integration_tests.utils.constants import PEBBLE_ALTERNATE_ROOTS
from certbot_integration_tests.utils.constants import PEBBLE_MANAGEMENT_URL
RSA_KEY_TYPE = 'rsa'
ECDSA_KEY_TYPE = 'ecdsa'
@ -311,7 +311,7 @@ def echo(keyword, path=None):
if not re.match(r'^\w+$', keyword):
raise ValueError('Error, keyword `{0}` is not a single keyword.'
.format(keyword))
return '{0} -c "from __future__ import print_function; print(\'{1}\')"{2}'.format(
return '{0} -c "print(\'{1}\')"{2}'.format(
os.path.basename(sys.executable), keyword, ' >> "{0}"'.format(path) if path else '')

View file

@ -7,7 +7,8 @@ import stat
import pkg_resources
import requests
from certbot_integration_tests.utils.constants import DEFAULT_HTTP_01_PORT, MOCK_OCSP_SERVER_PORT
from certbot_integration_tests.utils.constants import DEFAULT_HTTP_01_PORT
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
PEBBLE_VERSION = 'v2.3.0'
ASSETS_PATH = pkg_resources.resource_filename('certbot_integration_tests', 'assets')

View file

@ -4,6 +4,7 @@ This runnable module interfaces itself with the Pebble management interface in o
to serve a mock OCSP responder during integration tests against Pebble.
"""
import datetime
import http.server as BaseHTTPServer
import re
from cryptography import x509
@ -13,7 +14,6 @@ from cryptography.hazmat.primitives import serialization
from cryptography.x509 import ocsp
from dateutil import parser
import requests
from six.moves import BaseHTTPServer
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
from certbot_integration_tests.utils.constants import PEBBLE_MANAGEMENT_URL

View file

@ -1,12 +1,12 @@
#!/usr/bin/env python
# pylint: disable=missing-module-docstring
import http.server as BaseHTTPServer
import json
import re
import sys
import requests
from six.moves import BaseHTTPServer
from certbot_integration_tests.utils.misc import GracefulTCPServer

View file

@ -18,7 +18,6 @@ install_requires = [
'python-dateutil',
'pyyaml',
'requests',
'six'
]
# Add pywin32 on Windows platforms to handle low-level system calls.

View file

@ -1,9 +1,10 @@
#!/usr/bin/env python3
import pytest
import subprocess
import glob
import os
import re
import subprocess
import pytest
@pytest.fixture(autouse=True, scope="module")

View file

@ -6,7 +6,7 @@ for a directory a specific configuration using built-in pytest hooks.
See https://docs.pytest.org/en/latest/reference.html#hook-reference
"""
from __future__ import print_function
import os
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

View file

@ -1,8 +1,8 @@
import os
import re
import subprocess
import time
import unittest
import subprocess
import re
@unittest.skipIf(os.name != 'nt', reason='Windows installer tests must be run on Windows.')

View file

@ -11,7 +11,7 @@ from certbot_compatibility_test import util
logger = logging.getLogger(__name__)
class Proxy(object):
class Proxy:
"""A common base for compatibility test configurators"""
@classmethod

View file

@ -2,10 +2,10 @@
import os
import shutil
import subprocess
from typing import Set
import zope.interface
from acme.magic_typing import Set
from certbot._internal import configuration
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces
@ -68,7 +68,7 @@ def _get_server_root(config):
def _get_names(config):
"""Returns all and testable domain names in config"""
all_names = set() # type: Set[str]
all_names: Set[str] = set()
for root, _dirs, files in os.walk(config):
for this_file in files:
update_names = _get_server_names(root, this_file)

View file

@ -8,6 +8,8 @@ import shutil
import sys
import tempfile
import time
from typing import List
from typing import Tuple
import OpenSSL
from urllib3.util import connection
@ -15,8 +17,6 @@ from urllib3.util import connection
from acme import challenges
from acme import crypto_util
from acme import messages
from acme.magic_typing import List
from acme.magic_typing import Tuple
from certbot import achallenges
from certbot import errors as le_errors
from certbot.tests import acme_util
@ -178,7 +178,7 @@ def test_enhancements(plugin, domains):
"enhancements")
return False
domains_and_info = [(domain, []) for domain in domains] # type: List[Tuple[str, List[bool]]]
domains_and_info: List[Tuple[str, List[bool]]] = [(domain, []) for domain in domains]
for domain, info in domains_and_info:
try:

View file

@ -3,8 +3,6 @@ import logging
import socket
import requests
import six
from six.moves import xrange
from acme import crypto_util
from acme import errors as acme_errors
@ -12,18 +10,18 @@ from acme import errors as acme_errors
logger = logging.getLogger(__name__)
class Validator(object):
class Validator:
"""Collection of functions to test a live webserver's configuration"""
def certificate(self, cert, name, alt_host=None, port=443):
"""Verifies the certificate presented at name is cert"""
if alt_host is None:
host = socket.gethostbyname(name).encode()
elif isinstance(alt_host, six.binary_type):
elif isinstance(alt_host, bytes):
host = alt_host
else:
host = alt_host.encode()
name = name if isinstance(name, six.binary_type) else name.encode()
name = name if isinstance(name, bytes) else name.encode()
try:
presented_cert = crypto_util.probe_sni(name, host, port)
@ -62,7 +60,7 @@ class Validator(object):
else:
response = requests.get(url, allow_redirects=False)
return response.status_code in xrange(300, 309)
return response.status_code in range(300, 309)
def hsts(self, name):
"""Test for HTTP Strict Transport Security header"""

View file

@ -3,12 +3,11 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
install_requires = [
'certbot',
'certbot-apache',
'six',
'requests',
'zope.interface',
]

View file

@ -1,13 +1,12 @@
"""DNS Authenticator for Cloudflare."""
import logging
from typing import Any
from typing import Dict
from typing import List
import CloudFlare
import zope.interface
from acme.magic_typing import Any
from acme.magic_typing import Dict
from acme.magic_typing import List
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
@ -85,7 +84,7 @@ class Authenticator(dns_common.DNSAuthenticator):
return _CloudflareClient(self.credentials.conf('email'), self.credentials.conf('api-key'))
class _CloudflareClient(object):
class _CloudflareClient:
"""
Encapsulates all communication with the Cloudflare API.
"""
@ -173,7 +172,7 @@ class _CloudflareClient(object):
"""
zone_name_guesses = dns_common.base_domain_name_guesses(domain)
zones = [] # type: List[Dict[str, Any]]
zones: List[Dict[str, Any]] = []
code = msg = None
for zone_name in zone_name_guesses:

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -21,6 +21,7 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are ' + \
'using DigitalOcean for DNS).'
ttl = 30
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
@ -45,7 +46,8 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation,
self.ttl)
def _cleanup(self, domain, validation_name, validation):
self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
@ -54,7 +56,7 @@ class Authenticator(dns_common.DNSAuthenticator):
return _DigitalOceanClient(self.credentials.conf('token'))
class _DigitalOceanClient(object):
class _DigitalOceanClient:
"""
Encapsulates all communication with the DigitalOcean API.
"""
@ -62,13 +64,15 @@ class _DigitalOceanClient(object):
def __init__(self, token):
self.manager = digitalocean.Manager(token=token)
def add_txt_record(self, domain_name, record_name, record_content):
def add_txt_record(self, domain_name: str, record_name: str, record_content: str,
record_ttl: int):
"""
Add a TXT record using the supplied information.
:param str domain_name: The domain to use to associate the record with.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL.
:raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
API
"""
@ -89,7 +93,8 @@ class _DigitalOceanClient(object):
result = domain.create_new_domain_record(
type='TXT',
name=self._compute_record_name(domain, record_name),
data=record_content)
data=record_content,
ttl=record_ttl) # ttl kwarg is only effective starting python-digitalocean 1.15.0
record_id = result['domain_record']['id']
@ -99,7 +104,7 @@ class _DigitalOceanClient(object):
raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
.format(e))
def del_txt_record(self, domain_name, record_name, record_content):
def del_txt_record(self, domain_name: str, record_name: str, record_content: str):
"""
Delete a TXT record using the supplied information.

View file

@ -4,14 +4,13 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.
install_requires = [
'python-digitalocean>=1.11',
'python-digitalocean>=1.11', # 1.15.0 or newer is recommended for TTL support
'setuptools>=39.0.1',
'six>=1.11.0',
'zope.interface',
]

View file

@ -40,7 +40,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, 30)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
@ -58,6 +58,7 @@ class DigitalOceanClientTest(unittest.TestCase):
record_prefix = "_acme-challenge"
record_name = record_prefix + "." + DOMAIN
record_content = "bar"
record_ttl = 60
def setUp(self):
from certbot_dns_digitalocean._internal.dns_digitalocean import _DigitalOceanClient
@ -78,25 +79,27 @@ class DigitalOceanClientTest(unittest.TestCase):
self.manager.get_all_domains.return_value = [wrong_domain_mock, domain_mock]
self.digitalocean_client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.add_txt_record(DOMAIN, self.record_name, self.record_content,
self.record_ttl)
domain_mock.create_new_domain_record.assert_called_with(type='TXT',
name=self.record_prefix,
data=self.record_content)
data=self.record_content,
ttl=self.record_ttl)
def test_add_txt_record_fail_to_find_domain(self):
self.manager.get_all_domains.return_value = []
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
DOMAIN, self.record_name, self.record_content, self.record_ttl)
def test_add_txt_record_error_finding_domain(self):
self.manager.get_all_domains.side_effect = API_ERROR
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
DOMAIN, self.record_name, self.record_content, self.record_ttl)
def test_add_txt_record_error_creating_record(self):
domain_mock = mock.MagicMock()
@ -107,7 +110,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
DOMAIN, self.record_name, self.record_content, self.record_ttl)
def test_del_txt_record(self):
first_record_mock = mock.MagicMock()

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Please update tox.ini when modifying dependency version requirements
install_requires = [

View file

@ -76,7 +76,7 @@ class Authenticator(dns_common.DNSAuthenticator):
return _GoogleClient(self.conf('credentials'))
class _GoogleClient(object):
class _GoogleClient:
"""
Encapsulates all communication with the Google Cloud DNS API.
"""

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -401,7 +401,7 @@ class GoogleClientTest(unittest.TestCase):
self.assertRaises(ServerNotFoundError, _GoogleClient.get_project_id)
class DummyResponse(object):
class DummyResponse:
"""
Dummy object to create a fake HTTPResponse (the actual one requires a socket and we only
need the status attribute)

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Please update tox.ini when modifying dependency version requirements
install_requires = [

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -88,7 +88,7 @@ class Authenticator(dns_common.DNSAuthenticator):
dns.tsig.HMAC_MD5))
class _RFC2136Client(object):
class _RFC2136Client:
"""
Encapsulates all communication with the target DNS server.
"""

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -2,15 +2,15 @@
import collections
import logging
import time
from typing import DefaultDict
from typing import Dict
from typing import List
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
import zope.interface
from acme.magic_typing import DefaultDict
from acme.magic_typing import Dict
from acme.magic_typing import List
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
@ -39,7 +39,7 @@ class Authenticator(dns_common.DNSAuthenticator):
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._resource_records = collections.defaultdict(list) # type: DefaultDict[str, List[Dict[str, str]]]
self._resource_records: DefaultDict[str, List[Dict[str, str]]] = collections.defaultdict(list)
def more_info(self): # pylint: disable=missing-function-docstring
return "Solve a DNS01 challenge using AWS Route53"

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Please update tox.ini when modifying dependency version requirements
install_requires = [

View file

@ -1,3 +1,4 @@
# pylint: disable=too-many-lines
"""Nginx Configuration"""
from distutils.version import LooseVersion
import logging
@ -6,6 +7,12 @@ import socket
import subprocess
import tempfile
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Text
from typing import Tuple
import OpenSSL
import pkg_resources
@ -13,10 +20,6 @@ import zope.interface
from acme import challenges
from acme import crypto_util as acme_crypto_util
from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Set
from acme.magic_typing import Text
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
@ -105,18 +108,18 @@ class NginxConfigurator(common.Installer):
self.save_notes = ""
# For creating new vhosts if no names match
self.new_vhost = None
self.new_vhost: Optional[obj.VirtualHost] = None
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts = {} # type: Dict[str, List[obj.VirtualHost]]
self._wildcard_redirect_vhosts = {} # type: Dict[str, List[obj.VirtualHost]]
self._wildcard_vhosts: Dict[str, List[obj.VirtualHost]] = {}
self._wildcard_redirect_vhosts: Dict[str, List[obj.VirtualHost]] = {}
# Add number of outstanding challenges
self._chall_out = 0
# These will be set in the prepare function
self.parser = None
self.parser: Optional[parser.NginxParser] = None
self.version = version
self.openssl_version = openssl_version
self._enhance_func = {"redirect": self._enable_redirect,
@ -377,10 +380,13 @@ class NginxConfigurator(common.Installer):
ipv6only_present = True
return (ipv6_active, ipv6only_present)
def _vhost_from_duplicated_default(self, domain, allow_port_mismatch, port):
def _vhost_from_duplicated_default(self, domain: str, allow_port_mismatch: bool, port: str
) -> obj.VirtualHost:
"""if allow_port_mismatch is False, only server blocks with matching ports will be
used as a default server block template.
"""
assert self.parser is not None # prepare should already have been called here
if self.new_vhost is None:
default_vhost = self._get_default_vhost(domain, allow_port_mismatch, port)
self.new_vhost = self.parser.duplicate_vhost(default_vhost,
@ -509,7 +515,7 @@ class NginxConfigurator(common.Installer):
match['rank'] += NO_SSL_MODIFIER
return sorted(matches, key=lambda x: x['rank'])
def choose_redirect_vhosts(self, target_name, port, create_if_no_match=False):
def choose_redirect_vhosts(self, target_name: str, port: str) -> List[obj.VirtualHost]:
"""Chooses a single virtual host for redirect enhancement.
Chooses the vhost most closely matching target_name that is
@ -523,9 +529,6 @@ class NginxConfigurator(common.Installer):
:param str target_name: domain name
:param str port: port number
:param bool create_if_no_match: If we should create a new vhost from default
when there is no match found. If we can't choose a default, raise a
MisconfigurationError.
:returns: vhosts associated with name
:rtype: list of :class:`~certbot_nginx._internal.obj.VirtualHost`
@ -538,32 +541,75 @@ class NginxConfigurator(common.Installer):
else:
matches = self._get_redirect_ranked_matches(target_name, port)
vhosts = [x for x in [self._select_best_name_match(matches)]if x is not None]
if not vhosts and create_if_no_match:
vhosts = [self._vhost_from_duplicated_default(target_name, False, port)]
return vhosts
def _port_matches(self, test_port, matching_port):
def choose_auth_vhosts(self, target_name: str) -> Tuple[List[obj.VirtualHost],
List[obj.VirtualHost]]:
"""Returns a list of HTTP and HTTPS vhosts with a server_name matching target_name.
If no HTTP vhost exists, one will be cloned from the default vhost. If that fails, no HTTP
vhost will be returned.
:param str target_name: non-wildcard domain name
:returns: tuple of HTTP and HTTPS virtualhosts
:rtype: tuple of :class:`~certbot_nginx._internal.obj.VirtualHost`
"""
vhosts = [m['vhost'] for m in self._get_ranked_matches(target_name) if m and 'vhost' in m]
http_vhosts = [vh for vh in vhosts if
self._vhost_listening(vh, str(self.config.http01_port), False)]
https_vhosts = [vh for vh in vhosts if
self._vhost_listening(vh, str(self.config.https_port), True)]
# If no HTTP vhost matches, try create one from the default_server on http01_port.
if not http_vhosts:
try:
http_vhosts = [self._vhost_from_duplicated_default(target_name, False,
str(self.config.http01_port))]
except errors.MisconfigurationError:
http_vhosts = []
return http_vhosts, https_vhosts
def _port_matches(self, test_port: str, matching_port: str) -> bool:
# test_port is a number, matching is a number or "" or None
if matching_port == "" or matching_port is None:
# if no port is specified, Nginx defaults to listening on port 80.
return test_port == self.DEFAULT_LISTEN_PORT
return test_port == matching_port
def _vhost_listening_on_port_no_ssl(self, vhost, port):
found_matching_port = False
if not vhost.addrs:
# if there are no listen directives at all, Nginx defaults to
# listening on port 80.
found_matching_port = (port == self.DEFAULT_LISTEN_PORT)
else:
for addr in vhost.addrs:
if self._port_matches(port, addr.get_port()) and not addr.ssl:
found_matching_port = True
def _vhost_listening(self, vhost: obj.VirtualHost, port: str, ssl: bool) -> bool:
"""Tests whether a vhost has an address listening on a port with SSL enabled or disabled.
if found_matching_port:
# make sure we don't have an 'ssl on' directive
return not self.parser.has_ssl_on_directive(vhost)
return False
:param `obj.VirtualHost` vhost: The vhost whose addresses will be tested
:param port str: The port number as a string that the address should be bound to
:param bool ssl: Whether SSL should be enabled or disabled on the address
:returns: Whether the vhost has an address listening on the port and protocol.
:rtype: bool
"""
assert self.parser is not None # prepare should already have been called here
# if the 'ssl on' directive is present on the vhost, all its addresses have SSL enabled
all_addrs_are_ssl = self.parser.has_ssl_on_directive(vhost)
# if we want ssl vhosts: either 'ssl on' or 'addr.ssl' should be enabled
# if we want plaintext vhosts: neither 'ssl on' nor 'addr.ssl' should be enabled
_ssl_matches = lambda addr: addr.ssl or all_addrs_are_ssl if ssl else \
not addr.ssl and not all_addrs_are_ssl
# if there are no listen directives at all, Nginx defaults to
# listening on port 80.
if not vhost.addrs:
return port == self.DEFAULT_LISTEN_PORT and ssl == all_addrs_are_ssl
return any(self._port_matches(port, addr.get_port()) and _ssl_matches(addr)
for addr in vhost.addrs)
def _vhost_listening_on_port_no_ssl(self, vhost: obj.VirtualHost, port: str) -> bool:
return self._vhost_listening(vhost, port, False)
def _get_redirect_ranked_matches(self, target_name, port):
"""Gets a ranked list of plaintextish port-listening vhosts matching target_name
@ -595,7 +641,7 @@ class NginxConfigurator(common.Installer):
:rtype: set
"""
all_names = set() # type: Set[str]
all_names: Set[str] = set()
for vhost in self.parser.get_vhosts():
try:
@ -1176,7 +1222,7 @@ def nginx_restart(nginx_ctl, nginx_conf, sleep_duration):
"""
try:
reload_output = u"" # type: Text
reload_output: Text = u""
with tempfile.TemporaryFile() as out:
proc = subprocess.Popen([nginx_ctl, "-c", nginx_conf, "-s", "reload"],
env=util.env_no_snap_for_external_calls(),

View file

@ -1,8 +1,7 @@
"""nginx plugin constants."""
import platform
from acme.magic_typing import Any
from acme.magic_typing import Dict
from typing import Any
from typing import Dict
FREEBSD_DARWIN_SERVER_ROOT = "/usr/local/etc/nginx"
LINUX_SERVER_ROOT = "/etc/nginx"
@ -15,11 +14,11 @@ elif platform.system() in ('NetBSD',):
else:
server_root_tmp = LINUX_SERVER_ROOT
CLI_DEFAULTS = dict(
CLI_DEFAULTS: Dict[str, Any] = dict(
server_root=server_root_tmp,
ctl="nginx",
sleep_seconds=1
) # type: Dict[str, Any]
)
"""CLI defaults."""

View file

@ -2,9 +2,11 @@
import io
import logging
from typing import List
from typing import Optional
from acme import challenges
from acme.magic_typing import List
from certbot import achallenges
from certbot import errors
from certbot.compat import os
from certbot.plugins import common
@ -111,7 +113,7 @@ class NginxHttp01(common.ChallengePerformer):
:returns: list of :class:`certbot_nginx._internal.obj.Addr` to apply
:rtype: list
"""
addresses = [] # type: List[obj.Addr]
addresses: List[obj.Addr] = []
default_addr = "%s" % self.configurator.config.http01_port
ipv6_addr = "[::]:{0}".format(
self.configurator.config.http01_port)
@ -138,13 +140,12 @@ class NginxHttp01(common.ChallengePerformer):
def _get_validation_path(self, achall):
return os.sep + os.path.join(challenges.HTTP01.URI_ROOT_PATH, achall.chall.encode("token"))
def _make_server_block(self, achall):
def _make_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge) -> List:
"""Creates a server block for a challenge.
:param achall: Annotated HTTP-01 challenge
:type achall:
:class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
:param list addrs: addresses of challenged domain
:class:`list` of type :class:`~nginx.obj.Addr`
:type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
:returns: server block for the challenge host
:rtype: list
"""
@ -172,34 +173,35 @@ class NginxHttp01(common.ChallengePerformer):
return location_directive
def _make_or_mod_server_block(self, achall):
"""Modifies a server block to respond to a challenge.
def _make_or_mod_server_block(self, achall: achallenges.KeyAuthorizationAnnotatedChallenge
) -> Optional[List]:
"""Modifies server blocks to respond to a challenge. Returns a new HTTP server block
to add to the configuration if an existing one can't be found.
:param achall: Annotated HTTP-01 challenge
:type achall:
:class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
:type achall: :class:`certbot.achallenges.KeyAuthorizationAnnotatedChallenge`
:returns: new server block to be added, if any
:rtype: list
"""
try:
vhosts = self.configurator.choose_redirect_vhosts(achall.domain,
'%i' % self.configurator.config.http01_port, create_if_no_match=True)
except errors.MisconfigurationError:
http_vhosts, https_vhosts = self.configurator.choose_auth_vhosts(achall.domain)
new_vhost: Optional[list] = None
if not http_vhosts:
# Couldn't find either a matching name+port server block
# or a port+default_server block, so create a dummy block
return self._make_server_block(achall)
new_vhost = self._make_server_block(achall)
# len is max 1 because Nginx doesn't authenticate wildcards
# if len were or vhosts None, we would have errored
vhost = vhosts[0]
# Modify any existing server blocks
for vhost in set(http_vhosts + https_vhosts):
location_directive = [self._location_directive_for_achall(achall)]
# Modify existing server block
location_directive = [self._location_directive_for_achall(achall)]
self.configurator.parser.add_server_directives(vhost, location_directive)
self.configurator.parser.add_server_directives(vhost,
location_directive)
rewrite_directive = [['rewrite', ' ', '^(/.well-known/acme-challenge/.*)',
' ', '$1', ' ', 'break']]
self.configurator.parser.add_server_directives(vhost,
rewrite_directive, insert_at_top=True)
rewrite_directive = [['rewrite', ' ', '^(/.well-known/acme-challenge/.*)',
' ', '$1', ' ', 'break']]
self.configurator.parser.add_server_directives(vhost,
rewrite_directive, insert_at_top=True)
return None
return new_vhost

View file

@ -2,6 +2,8 @@
# Forked from https://github.com/fatiherikli/nginxparser (MIT Licensed)
import copy
import logging
from typing import Any
from typing import IO
from pyparsing import Combine
from pyparsing import Forward
@ -15,12 +17,11 @@ from pyparsing import restOfLine
from pyparsing import stringEnd
from pyparsing import White
from pyparsing import ZeroOrMore
import six
from acme.magic_typing import IO, Any # pylint: disable=unused-import
logger = logging.getLogger(__name__)
class RawNginxParser(object):
class RawNginxParser:
# pylint: disable=pointless-statement
"""A class that parses nginx configuration with pyparsing."""
@ -70,7 +71,7 @@ class RawNginxParser(object):
"""Returns the parsed tree as a list."""
return self.parse().asList()
class RawNginxDumper(object):
class RawNginxDumper:
"""A class that dumps nginx configuration from the provided tree."""
def __init__(self, blocks):
self.blocks = blocks
@ -79,7 +80,7 @@ class RawNginxDumper(object):
"""Iterates the dumped nginx content."""
blocks = blocks or self.blocks
for b0 in blocks:
if isinstance(b0, six.string_types):
if isinstance(b0, str):
yield b0
continue
item = copy.deepcopy(b0)
@ -96,7 +97,7 @@ class RawNginxDumper(object):
yield '}'
else: # not a block - list of strings
semicolon = ";"
if isinstance(item[0], six.string_types) and item[0].strip() == '#': # comment
if isinstance(item[0], str) and item[0].strip() == '#': # comment
semicolon = ""
yield "".join(item) + semicolon
@ -105,56 +106,8 @@ class RawNginxDumper(object):
return ''.join(self)
# Shortcut functions to respect Python's serialization interface
# (like pyyaml, picker or json)
spacey = lambda x: (isinstance(x, str) and x.isspace()) or x == ''
def loads(source):
"""Parses from a string.
:param str source: The string to parse
:returns: The parsed tree
:rtype: list
"""
return UnspacedList(RawNginxParser(source).as_list())
def load(_file):
"""Parses from a file.
:param file _file: The file to parse
:returns: The parsed tree
:rtype: list
"""
return loads(_file.read())
def dumps(blocks):
# type: (UnspacedList) -> six.text_type
"""Dump to a Unicode string.
:param UnspacedList block: The parsed tree
:rtype: six.text_type
"""
return six.text_type(RawNginxDumper(blocks.spaced))
def dump(blocks, _file):
# type: (UnspacedList, IO[Any]) -> None
"""Dump to a file.
:param UnspacedList block: The parsed tree
:param IO[Any] _file: The file stream to dump to. It must be opened with
Unicode encoding.
:rtype: None
"""
_file.write(dumps(blocks))
spacey = lambda x: (isinstance(x, six.string_types) and x.isspace()) or x == ''
class UnspacedList(list):
"""Wrap a list [of lists], making any whitespace entries magically invisible"""
@ -274,3 +227,50 @@ class UnspacedList(list):
idx -= 1
pos += 1
return idx0 + spaces
# Shortcut functions to respect Python's serialization interface
# (like pyyaml, picker or json)
def loads(source):
"""Parses from a string.
:param str source: The string to parse
:returns: The parsed tree
:rtype: list
"""
return UnspacedList(RawNginxParser(source).as_list())
def load(_file):
"""Parses from a file.
:param file _file: The file to parse
:returns: The parsed tree
:rtype: list
"""
return loads(_file.read())
def dumps(blocks: UnspacedList) -> str:
"""Dump to a Unicode string.
:param UnspacedList block: The parsed tree
:rtype: six.text_type
"""
return str(RawNginxDumper(blocks.spaced))
def dump(blocks: UnspacedList, _file: IO[Any]) -> None:
"""Dump to a file.
:param UnspacedList block: The parsed tree
:param IO[Any] _file: The file stream to dump to. It must be opened with
Unicode encoding.
:rtype: None
"""
_file.write(dumps(blocks))

View file

@ -1,7 +1,6 @@
"""Module contains classes used by the Nginx Configurator."""
import re
import six
from certbot.plugins import common
@ -144,7 +143,7 @@ class Addr(common.Addr):
return False
class VirtualHost(object):
class VirtualHost:
"""Represents an Nginx Virtualhost.
:ivar str filep: file path of VH
@ -211,7 +210,7 @@ class VirtualHost(object):
def contains_list(self, test):
"""Determine if raw server block contains test list at top level
"""
for i in six.moves.range(0, len(self.raw) - len(test) + 1):
for i in range(0, len(self.raw) - len(test) + 1):
if self.raw[i:i + len(test)] == test:
return True
return False
@ -250,7 +249,7 @@ def _find_directive(directives, directive_name, match_content=None):
"""Find a directive of type directive_name in directives. If match_content is given,
Searches for `match_content` in the directive arguments.
"""
if not directives or isinstance(directives, six.string_types):
if not directives or isinstance(directives, str):
return None
# If match_content is None, just match on directive type. Otherwise, match on

View file

@ -5,15 +5,15 @@ import glob
import io
import logging
import re
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
import pyparsing
import six
from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Set
from acme.magic_typing import Tuple
from acme.magic_typing import Union
from certbot import errors
from certbot.compat import os
from certbot_nginx._internal import nginxparser
@ -22,7 +22,7 @@ from certbot_nginx._internal import obj
logger = logging.getLogger(__name__)
class NginxParser(object):
class NginxParser:
"""Class handles the fine details of parsing the Nginx Configuration.
:ivar str root: Normalized absolute path to the server root
@ -32,7 +32,7 @@ class NginxParser(object):
"""
def __init__(self, root):
self.parsed = {} # type: Dict[str, Union[List, nginxparser.UnspacedList]]
self.parsed: Dict[str, Union[List, nginxparser.UnspacedList]] = {}
self.root = os.path.abspath(root)
self.config_root = self._find_config_root()
@ -94,7 +94,7 @@ class NginxParser(object):
"""
servers = self._get_raw_servers()
addr_to_ssl = {} # type: Dict[Tuple[str, str], bool]
addr_to_ssl: Dict[Tuple[str, str], bool] = {}
for filename in servers:
for server, _ in servers[filename]:
# Parse the server block to save addr info
@ -106,12 +106,11 @@ class NginxParser(object):
addr_to_ssl[addr_tuple] = addr.ssl or addr_to_ssl[addr_tuple]
return addr_to_ssl
def _get_raw_servers(self):
def _get_raw_servers(self) -> Dict:
# pylint: disable=cell-var-from-loop
# type: () -> Dict
"""Get a map of unparsed all server blocks
"""
servers = {} # type: Dict[str, Union[List, nginxparser.UnspacedList]]
servers: Dict[str, Union[List, nginxparser.UnspacedList]] = {}
for filename in self.parsed:
tree = self.parsed[filename]
servers[filename] = []
@ -362,8 +361,9 @@ class NginxParser(object):
except errors.MisconfigurationError as err:
raise errors.MisconfigurationError("Problem in %s: %s" % (filename, str(err)))
def duplicate_vhost(self, vhost_template, remove_singleton_listen_params=False,
only_directives=None):
def duplicate_vhost(self, vhost_template: obj.VirtualHost,
remove_singleton_listen_params: bool = False,
only_directives: Optional[List] = None) -> obj.VirtualHost:
"""Duplicate the vhost in the configuration files.
:param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost_template: The vhost
@ -549,7 +549,7 @@ def _is_include_directive(entry):
"""
return (isinstance(entry, list) and
len(entry) == 2 and entry[0] == 'include' and
isinstance(entry[1], six.string_types))
isinstance(entry[1], str))
def _is_ssl_on_directive(entry):
"""Checks if an nginx parsed entry is an 'ssl on' directive.
@ -654,7 +654,7 @@ def _add_directive(block, directive, insert_at_top):
directive_name = directive[0]
def can_append(loc, dir_name):
""" Can we append this directive to the block? """
return loc is None or (isinstance(dir_name, six.string_types)
return loc is None or (isinstance(dir_name, str)
and dir_name in REPEATABLE_DIRECTIVES)
err_fmt = 'tried to insert directive "{0}" but found conflicting "{1}".'
@ -740,9 +740,9 @@ def _parse_server_raw(server):
:rtype: dict
"""
addrs = set() # type: Set[obj.Addr]
ssl = False # type: bool
names = set() # type: Set[str]
addrs: Set[obj.Addr] = set()
ssl: bool = False
names: Set[str] = set()
apply_ssl_to_all_addrs = False

View file

@ -3,10 +3,8 @@ raw lists of tokens from pyparsing. """
import abc
import logging
from typing import List
import six
from acme.magic_typing import List
from certbot import errors
logger = logging.getLogger(__name__)
@ -14,7 +12,7 @@ COMMENT = " managed by Certbot"
COMMENT_BLOCK = ["#", COMMENT]
class Parsable(object):
class Parsable:
""" Abstract base class for "Parsable" objects whose underlying representation
is a tree of lists.
@ -24,7 +22,7 @@ class Parsable(object):
__metaclass__ = abc.ABCMeta
def __init__(self, parent=None):
self._data = [] # type: List[object]
self._data: List[object] = []
self._tabs = None
self.parent = parent
@ -152,7 +150,7 @@ class Statements(Parsable):
if not isinstance(raw_list, list):
raise errors.MisconfigurationError("Statements parsing expects a list!")
# If there's a trailing whitespace in the list of statements, keep track of it.
if raw_list and isinstance(raw_list[-1], six.string_types) and raw_list[-1].isspace():
if raw_list and isinstance(raw_list[-1], str) and raw_list[-1].isspace():
self._trailing_whitespace = raw_list[-1]
raw_list = raw_list[:-1]
self._data = [parse_raw(elem, self, add_spaces) for elem in raw_list]
@ -183,8 +181,8 @@ class Statements(Parsable):
def _space_list(list_):
""" Inserts whitespace between adjacent non-whitespace tokens. """
spaced_statement = [] # type: List[str]
for i in reversed(six.moves.xrange(len(list_))):
spaced_statement: List[str] = []
for i in reversed(range(len(list_))):
spaced_statement.insert(0, list_[i])
if i > 0 and not list_[i].isspace() and not list_[i-1].isspace():
spaced_statement.insert(0, " ")
@ -206,7 +204,7 @@ class Sentence(Parsable):
:returns: whether this lists is parseable by `Sentence`.
"""
return isinstance(lists, list) and len(lists) > 0 and \
all(isinstance(elem, six.string_types) for elem in lists)
all(isinstance(elem, str) for elem in lists)
def parse(self, raw_list, add_spaces=False):
""" Parses a list of string types into this object.
@ -214,7 +212,7 @@ class Sentence(Parsable):
if add_spaces:
raw_list = _space_list(raw_list)
if not isinstance(raw_list, list) or \
any(not isinstance(elem, six.string_types) for elem in raw_list):
any(not isinstance(elem, str) for elem in raw_list):
raise errors.MisconfigurationError("Sentence parsing expects a list of string types.")
self._data = raw_list
@ -272,8 +270,8 @@ class Block(Parsable):
"""
def __init__(self, parent=None):
super(Block, self).__init__(parent)
self.names = None # type: Sentence
self.contents = None # type: Block
self.names: Sentence = None
self.contents: Block = None
@staticmethod
def should_parse(lists):

View file

@ -1,7 +1,7 @@
from setuptools import find_packages
from setuptools import setup
version = '1.13.0.dev0'
version = '1.14.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.

View file

@ -39,7 +39,7 @@ class NginxConfiguratorTest(util.NginxTest):
def test_prepare(self):
self.assertEqual((1, 6, 2), self.config.version)
self.assertEqual(12, len(self.config.parser.parsed))
self.assertEqual(13, len(self.config.parser.parsed))
@mock.patch("certbot_nginx._internal.configurator.util.exe_exists")
@mock.patch("certbot_nginx._internal.configurator.subprocess.Popen")
@ -89,7 +89,7 @@ class NginxConfiguratorTest(util.NginxTest):
"155.225.50.69.nephoscale.net", "www.example.org", "another.alias",
"migration.com", "summer.com", "geese.com", "sslon.com",
"globalssl.com", "globalsslsetssl.com", "ipv6.com", "ipv6ssl.com",
"headers.com", "example.net"})
"headers.com", "example.net", "ssl.both.com"})
def test_supported_enhancements(self):
self.assertEqual(['redirect', 'ensure-http-header', 'staple-ocsp'],
@ -935,7 +935,19 @@ class NginxConfiguratorTest(util.NginxTest):
prefer_ssl=False,
no_ssl_filter_port='80')
# Check that the dialog was called with only port 80 vhosts
self.assertEqual(len(mock_select_vhs.call_args[0][0]), 6)
self.assertEqual(len(mock_select_vhs.call_args[0][0]), 8)
def test_choose_auth_vhosts(self):
"""choose_auth_vhosts correctly selects duplicative and HTTP/HTTPS vhosts"""
http, https = self.config.choose_auth_vhosts('ssl.both.com')
self.assertEqual(len(http), 4)
self.assertEqual(len(https), 2)
self.assertEqual(http[0].names, {'ssl.both.com'})
self.assertEqual(http[1].names, {'ssl.both.com'})
self.assertEqual(http[2].names, {'ssl.both.com'})
self.assertEqual(http[3].names, {'*.both.com'})
self.assertEqual(https[0].names, {'ssl.both.com'})
self.assertEqual(https[1].names, {'*.both.com'})
class InstallSslOptionsConfTest(util.NginxTest):

View file

@ -6,7 +6,6 @@ try:
import mock
except ImportError: # pragma: no cover
from unittest import mock # type: ignore
import six
from acme import challenges
from certbot import achallenges
@ -45,6 +44,10 @@ class HttpPerformTest(util.NginxTest):
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
domain="migration.com", account_key=account_key),
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
domain="ipv6ssl.com", account_key=account_key),
]
def setUp(self):
@ -78,8 +81,8 @@ class HttpPerformTest(util.NginxTest):
http_responses = self.http01.perform()
self.assertEqual(len(http_responses), 4)
for i in six.moves.range(4):
self.assertEqual(len(http_responses), 5)
for i in range(5):
self.assertEqual(http_responses[i], acme_responses[i])
def test_mod_config(self):
@ -106,6 +109,43 @@ class HttpPerformTest(util.NginxTest):
# self.assertEqual(vhost.addrs, set(v_addr2_print))
# self.assertEqual(vhost.names, set([response.z_domain.decode('ascii')]))
@mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
def test_mod_config_http_and_https(self, mock_add_server_directives):
"""A server_name with both HTTP and HTTPS vhosts should get modded in both vhosts"""
self.configuration.https_port = 443
self.http01.add_chall(self.achalls[3]) # migration.com
self.http01._mod_config() # pylint: disable=protected-access
# Domain has an HTTP and HTTPS vhost
# 2 * 'rewrite' + 2 * 'return 200 keyauthz' = 4
self.assertEqual(mock_add_server_directives.call_count, 4)
@mock.patch('certbot_nginx._internal.parser.nginxparser.dump')
@mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
def test_mod_config_only_https(self, mock_add_server_directives, mock_dump):
"""A server_name with only an HTTPS vhost should get modded"""
self.http01.add_chall(self.achalls[4]) # ipv6ssl.com
self.http01._mod_config() # pylint: disable=protected-access
# It should modify the existing HTTPS vhost
self.assertEqual(mock_add_server_directives.call_count, 2)
# since there was no suitable HTTP vhost or default HTTP vhost, a non-empty one
# should have been created and written to the challenge conf file
self.assertNotEqual(mock_dump.call_args[0][0], [])
@mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
def test_mod_config_deduplicate(self, mock_add_server_directives):
"""A vhost that appears in both HTTP and HTTPS vhosts only gets modded once"""
achall = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), "pending"),
domain="ssl.both.com", account_key=AUTH_KEY)
self.http01.add_chall(achall)
self.http01._mod_config() # pylint: disable=protected-access
# Should only get called 5 times, rather than 6, because two vhosts are the same
self.assertEqual(mock_add_server_directives.call_count, 5*2)
@mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
def test_default_listen_addresses_no_memoization(self, ipv6_info):
# pylint: disable=protected-access

View file

@ -51,6 +51,7 @@ class NginxParserTest(util.NginxTest):
self.assertEqual({nparser.abs_path(x) for x in
['foo.conf', 'nginx.conf', 'server.conf',
'sites-enabled/default',
'sites-enabled/both.com',
'sites-enabled/example.com',
'sites-enabled/headers.com',
'sites-enabled/migration.com',
@ -88,7 +89,7 @@ class NginxParserTest(util.NginxTest):
parsed = nparser._parse_files(nparser.abs_path(
'sites-enabled/example.com.test'))
self.assertEqual(3, len(glob.glob(nparser.abs_path('*.test'))))
self.assertEqual(9, len(
self.assertEqual(10, len(
glob.glob(nparser.abs_path('sites-enabled/*.test'))))
self.assertEqual([[['server'], [['listen', '69.50.225.155:9000'],
['listen', '127.0.0.1'],
@ -111,7 +112,7 @@ class NginxParserTest(util.NginxTest):
([[[0], [3], [4]], [[5], [3], [0]]], [])]
for mylist, result in mylists:
paths = [] # type: List[List[int]]
paths: List[List[int]] = []
parser._do_for_subarray(mylist,
lambda x: isinstance(x, list) and
len(x) >= 1 and
@ -171,7 +172,7 @@ class NginxParserTest(util.NginxTest):
'*.www.example.com'},
[], [2, 1, 0])
self.assertEqual(14, len(vhosts))
self.assertEqual(19, len(vhosts))
example_com = [x for x in vhosts if 'example.com' in x.filep][0]
self.assertEqual(vhost3, example_com)
default = [x for x in vhosts if 'default' in x.filep][0]

View file

@ -1,125 +0,0 @@
"""Backport for `TestCase.assertLogs()`.
Most of the idea and code are from CPython implementation.
https://github.com/python/cpython/blob/b76518d43fb82ed9e5d27025d18c90a23d525c90/Lib/unittest/case.py
"""
import logging
import collections
__all__ = ['AssertLogsMixin']
LoggingWatcher = collections.namedtuple('LoggingWatcher', ['records', 'output'])
class CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""
def __init__(self):
super(CapturingHandler, self).__init__()
self.watcher = LoggingWatcher([], [])
def flush(self):
pass
def emit(self, record):
self.watcher.records.append(record)
self.watcher.output.append(self.format(record))
class AssertLogsContext(object):
"""
A context manager used to implement `TestCase.assertLogs()`.
"""
LOGGING_FORMAT = '%(levelname)s:%(name)s:%(message)s'
def __init__(self, test_case, logger_name, level):
self.test_case = test_case
self.logger_name = logger_name
self.logger_states = None
self.logger = None
if level:
# pylint: disable=protected-access,no-member
try:
# In Python 3.x
name_to_level = logging._nameToLevel # type: ignore
except AttributeError:
# In Python 2.7
name_to_level = logging._levelNames # type: ignore
self.level = name_to_level.get(level, level)
else:
self.level = logging.INFO
self.watcher = None
def _save_logger_states(self):
self.logger_states = (self.logger.handlers[:], self.logger.level, self.logger.propagate)
def _restore_logger_states(self):
self.logger.handlers, self.logger.level, self.logger.propagate = self.logger_states
def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
self.logger = self.logger_name
else:
self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = CapturingHandler()
handler.setFormatter(formatter)
self._save_logger_states()
self.logger.handlers = [handler]
self.logger.setLevel(self.level)
self.logger.propagate = False
self.watcher = handler.watcher
return handler.watcher
def __exit__(self, exc_type, exc_value, tb):
self._restore_logger_states()
if exc_type is not None:
# let unexpected exceptions pass through
return
if not self.watcher.records:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
def _raiseFailure(self, message):
message = self.test_case._formatMessage(None, message) # pylint: disable=protected-access
raise self.test_case.failureException(message)
class AssertLogsMixin(object):
"""
A mixin that implements `TestCase.assertLogs()`.
"""
def assertLogs(self, logger=None, level=None):
"""Fail unless a log message of level *level* or higher is emitted
on *logger_name* or its children. If omitted, *level* defaults to
INFO and *logger* defaults to the root logger.
This method must be used as a context manager, and will yield
a recording object with two attributes: `output` and `records`.
At the end of the context manager, the `output` attribute will
be a list of the matching formatted log messages and the
`records` attribute will be a list of the corresponding LogRecord
objects.
Example::
with self.assertLogs('foo', level='INFO') as cm:
logging.getLogger('foo').info('first message')
logging.getLogger('foo.bar').error('second message')
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
"""
return AssertLogsContext(self, logger, level)

View file

@ -17,10 +17,9 @@ from certbot.plugins import common
from certbot.tests import util as test_util
from certbot_nginx._internal import configurator
from certbot_nginx._internal import nginxparser
import test_log_util
class NginxTest(test_log_util.AssertLogsMixin, test_util.ConfigTestCase):
class NginxTest(test_util.ConfigTestCase):
def setUp(self):
super(NginxTest, self).setUp()

View file

@ -0,0 +1,32 @@
server {
server_name ssl.both.com;
}
# a duplicate vhost
server {
server_name ssl.both.com;
}
# a duplicate by means of wildcard
server {
server_name *.both.com;
}
# combined HTTP and HTTPS
server {
server_name ssl.both.com;
listen 80;
listen 5001 ssl;
ssl_certificate cert.pem;
ssl_certificate_key cert.key;
}
# HTTPS, duplicate by means of wildcard
server {
server_name *.both.com;
listen 5001 ssl;
ssl_certificate cert.pem;
ssl_certificate_key cert.key;
}

View file

@ -2,7 +2,7 @@
Certbot adheres to [Semantic Versioning](https://semver.org/).
## 1.13.0 - master
## 1.14.0 - master
### Added
@ -10,6 +10,28 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
### Changed
* certbot-auto no longer checks for updates on any operating system.
* The module `acme.magic_typing` is deprecated and will be removed in a future release.
Please use the built-in module `typing` instead.
* The DigitalOcean plugin now creates TXT records for the DNS-01 challenge with a lower 30s TTL.
### Fixed
* Don't output an empty line for a hidden certificate when `certbot certificates` is being used
in combination with `--cert-name` or `-d`.
More details about these changes can be found on our GitHub repo.
## 1.13.0 - 2021-03-02
### Added
*
### Changed
* CLI flags `--os-packages-only`, `--no-self-upgrade`, `--no-bootstrap` and `--no-permissions-check`,
which are related to certbot-auto, are deprecated and will be removed in a future release.
* Certbot no longer conditionally depends on an external mock module. Certbot's
test API will continue to use it if it is available for backwards
compatibility, however, this behavior has been deprecated and will be removed
@ -17,6 +39,12 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
* The acme library no longer depends on the `security` extras from `requests`
which was needed to support SNI in TLS requests when using old versions of
Python 2.
* Certbot and all of its components no longer depend on the library `six`.
* The update of certbot-auto itself is now disabled on all RHEL-like systems.
* When revoking a certificate by `--cert-name`, it is no longer necessary to specify the `--server`
if the certificate was obtained from a non-default ACME server.
* The nginx authenticator now configures all matching HTTP and HTTPS vhosts for the HTTP-01
challenge. It is now compatible with external HTTPS redirection by a CDN or load balancer.
### Fixed

View file

@ -1,3 +1,3 @@
"""Certbot client."""
# version number like 1.2.3a0, must have at least 2 parts, like 1.2
__version__ = '1.13.0.dev0'
__version__ = '1.14.0.dev0'

View file

@ -10,7 +10,6 @@ from cryptography.hazmat.primitives import serialization
import josepy as jose
import pyrfc3339
import pytz
import six
from acme import fields as acme_fields
from acme import messages
@ -19,13 +18,13 @@ from certbot import errors
from certbot import interfaces
from certbot import util
from certbot._internal import constants
from certbot.compat import os
from certbot.compat import filesystem
from certbot.compat import os
logger = logging.getLogger(__name__)
class Account(object):
class Account:
"""ACME protocol registration.
:ivar .RegistrationResource regr: Registration Resource
@ -101,7 +100,7 @@ class AccountMemoryStorage(interfaces.AccountStorage):
self.accounts = initial_accounts if initial_accounts is not None else {}
def find_all(self):
return list(six.itervalues(self.accounts))
return list(self.accounts.values())
def save(self, account, client):
if account.id in self.accounts:
@ -230,8 +229,7 @@ class AccountFileStorage(interfaces.AccountStorage):
def load(self, account_id):
return self._load_for_server_path(account_id, self.config.server_path)
def save(self, account, client):
# type: (Account, ClientBase) -> None
def save(self, account: Account, client: ClientBase) -> None:
"""Create a new account.
:param Account account: account to create
@ -246,8 +244,7 @@ class AccountFileStorage(interfaces.AccountStorage):
except IOError as error:
raise errors.AccountStorageError(error)
def update_regr(self, account, client):
# type: (Account, ClientBase) -> None
def update_regr(self, account: Account, client: ClientBase) -> None:
"""Update the registration resource.
:param Account account: account to update
@ -260,8 +257,7 @@ class AccountFileStorage(interfaces.AccountStorage):
except IOError as error:
raise errors.AccountStorageError(error)
def update_meta(self, account):
# type: (Account) -> None
def update_meta(self, account: Account) -> None:
"""Update the meta resource.
:param Account account: account to update
@ -339,19 +335,16 @@ class AccountFileStorage(interfaces.AccountStorage):
return dir_path
def _prepare(self, account):
# type: (Account) -> str
def _prepare(self, account: Account) -> str:
account_dir_path = self._account_dir_path(account.id)
util.make_or_verify_dir(account_dir_path, 0o700, self.config.strict_permissions)
return account_dir_path
def _create(self, account, dir_path):
# type: (Account, str) -> None
def _create(self, account: Account, dir_path: str) -> None:
with util.safe_open(self._key_path(dir_path), "w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
def _update_regr(self, account, acme, dir_path):
# type: (Account, ClientBase, str) -> None
def _update_regr(self, account: Account, acme: ClientBase, dir_path: str) -> None:
with open(self._regr_path(dir_path), "w") as regr_file:
regr = account.regr
# If we have a value for new-authz, save it for forwards

View file

@ -2,15 +2,15 @@
import datetime
import logging
import time
from typing import Dict
from typing import List
from typing import Tuple
import zope.component
from acme import challenges
from acme import errors as acme_errors
from acme import messages
from acme.magic_typing import Dict
from acme.magic_typing import List
from acme.magic_typing import Tuple
from certbot import achallenges
from certbot import errors
from certbot import interfaces
@ -19,7 +19,7 @@ from certbot._internal import error_handler
logger = logging.getLogger(__name__)
class AuthHandler(object):
class AuthHandler:
"""ACME Authorization Handler for a client.
:ivar auth: Authenticator capable of solving
@ -98,8 +98,7 @@ class AuthHandler(object):
return authzrs_validated
def deactivate_valid_authorizations(self, orderr):
# type: (messages.OrderResource) -> Tuple[List, List]
def deactivate_valid_authorizations(self, orderr: messages.OrderResource) -> Tuple[List, List]:
"""
Deactivate all `valid` authorizations in the order, so that they cannot be re-used
in subsequent orders.
@ -191,7 +190,7 @@ class AuthHandler(object):
"""
pending_authzrs = [authzr for authzr in authzrs
if authzr.body.status != messages.STATUS_VALID]
achalls = [] # type: List[achallenges.AnnotatedChallenge]
achalls: List[achallenges.AnnotatedChallenge] = []
if pending_authzrs:
logger.info("Performing the following challenges:")
for authzr in pending_authzrs:
@ -428,7 +427,7 @@ _ERROR_HELP = {
def _report_failed_authzrs(failed_authzrs, account_key):
"""Notifies the user about failed authorizations."""
problems = {} # type: Dict[str, List[achallenges.AnnotatedChallenge]]
problems: Dict[str, List[achallenges.AnnotatedChallenge]] = {}
failed_achalls = [challb_to_achall(challb, account_key, authzr.body.identifier.value)
for authzr in failed_authzrs for challb in authzr.body.challenges
if challb.error]

View file

@ -3,11 +3,11 @@ import datetime
import logging
import re
import traceback
from typing import List
import pytz
import zope.component
from acme.magic_typing import List
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
@ -223,7 +223,7 @@ def cert_path_to_lineage(cli_config):
"""
acceptable_matches = _acceptable_matches()
match = match_and_check_overlaps(cli_config, acceptable_matches,
lambda x: cli_config.cert_path[0], lambda x: x.lineagename)
lambda x: cli_config.cert_path, lambda x: x.lineagename)
return match[0]
@ -241,7 +241,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
def find_matches(candidate_lineage, return_value, acceptable_matches):
"""Returns a list of matches using _search_lineages."""
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
acceptable_matches_rv = [] # type: List[str]
acceptable_matches_rv: List[str] = []
for item in acceptable_matches:
if isinstance(item, list):
acceptable_matches_rv += item
@ -254,7 +254,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
matched = _search_lineages(cli_config, find_matches, [], acceptable_matches)
if not matched:
raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path[0]))
raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path))
elif len(matched) > 1:
raise errors.OverlappingMatchFound()
return matched
@ -266,9 +266,9 @@ def human_readable_cert_info(config, cert, skip_filter_checks=False):
checker = ocsp.RevocationChecker()
if config.certname and cert.lineagename != config.certname and not skip_filter_checks:
return ""
return None
if config.domains and not set(config.domains).issubset(cert.names()):
return ""
return None
now = pytz.UTC.fromutc(datetime.datetime.utcnow())
reasons = []
@ -358,13 +358,15 @@ def _report_human_readable(config, parsed_certs):
"""Format a results report for a parsed cert"""
certinfo = []
for cert in parsed_certs:
certinfo.append(human_readable_cert_info(config, cert))
cert_info = human_readable_cert_info(config, cert)
if cert_info is not None:
certinfo.append(cert_info)
return "\n".join(certinfo)
def _describe_certs(config, parsed_certs, parse_failures):
"""Print information about the certs we know about"""
out = [] # type: List[str]
out: List[str] = []
notify = out.append

View file

@ -1,73 +1,58 @@
"""Certbot command line argument & config processing."""
# pylint: disable=too-many-lines
from __future__ import print_function
import argparse
import logging
import logging.handlers
import argparse
import sys
import certbot._internal.plugins.selection as plugin_selection
from certbot._internal.plugins import disco as plugins_disco
from typing import Optional
from acme.magic_typing import Optional
# pylint: disable=ungrouped-imports
import certbot
from certbot._internal import constants
import certbot.plugins.enhancements as enhancements
from certbot._internal.cli.cli_constants import (
LEAUTO,
old_path_fragment,
new_path_prefix,
cli_command,
SHORT_USAGE,
COMMAND_OVERVIEW,
HELP_AND_VERSION_USAGE,
ARGPARSE_PARAMS_TO_REMOVE,
EXIT_ACTIONS,
ZERO_ARG_ACTIONS,
VAR_MODIFIERS,
DEPRECATED_OPTIONS
)
from certbot._internal.cli.cli_utils import (
_Default,
read_file,
flag_default,
config_help,
HelpfulArgumentGroup,
CustomHelpFormatter,
_DomainsAction,
add_domains,
CaseInsensitiveList,
_user_agent_comment_type,
_EncodeReasonAction,
parse_preferred_challenges,
_PrefChallAction,
_DeployHookAction,
_RenewHookAction,
nonnegative_int
)
# These imports depend on cli_constants and cli_utils.
from certbot._internal.cli.verb_help import VERB_HELP, VERB_HELP_MAP
from certbot._internal.cli.cli_constants import ARGPARSE_PARAMS_TO_REMOVE
from certbot._internal.cli.cli_constants import cli_command
from certbot._internal.cli.cli_constants import COMMAND_OVERVIEW
from certbot._internal.cli.cli_constants import DEPRECATED_OPTIONS
from certbot._internal.cli.cli_constants import EXIT_ACTIONS
from certbot._internal.cli.cli_constants import HELP_AND_VERSION_USAGE
from certbot._internal.cli.cli_constants import LEAUTO
from certbot._internal.cli.cli_constants import new_path_prefix
from certbot._internal.cli.cli_constants import old_path_fragment
from certbot._internal.cli.cli_constants import SHORT_USAGE
from certbot._internal.cli.cli_constants import VAR_MODIFIERS
from certbot._internal.cli.cli_constants import ZERO_ARG_ACTIONS
from certbot._internal.cli.cli_utils import _Default
from certbot._internal.cli.cli_utils import _DeployHookAction
from certbot._internal.cli.cli_utils import _DomainsAction
from certbot._internal.cli.cli_utils import _EncodeReasonAction
from certbot._internal.cli.cli_utils import _PrefChallAction
from certbot._internal.cli.cli_utils import _RenewHookAction
from certbot._internal.cli.cli_utils import _user_agent_comment_type
from certbot._internal.cli.cli_utils import add_domains
from certbot._internal.cli.cli_utils import CaseInsensitiveList
from certbot._internal.cli.cli_utils import config_help
from certbot._internal.cli.cli_utils import CustomHelpFormatter
from certbot._internal.cli.cli_utils import flag_default
from certbot._internal.cli.cli_utils import HelpfulArgumentGroup
from certbot._internal.cli.cli_utils import nonnegative_int
from certbot._internal.cli.cli_utils import parse_preferred_challenges
from certbot._internal.cli.cli_utils import read_file
from certbot._internal.cli.group_adder import _add_all_groups
from certbot._internal.cli.subparsers import _create_subparsers
from certbot._internal.cli.helpful import HelpfulArgumentParser
from certbot._internal.cli.paths_parser import _paths_parser
from certbot._internal.cli.plugins_parsing import _plugins_parsing
# These imports depend on some or all of the submodules for cli.
from certbot._internal.cli.helpful import HelpfulArgumentParser
# pylint: enable=ungrouped-imports
from certbot._internal.cli.subparsers import _create_subparsers
from certbot._internal.cli.verb_help import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP_MAP
from certbot._internal.plugins import disco as plugins_disco
import certbot._internal.plugins.selection as plugin_selection
import certbot.plugins.enhancements as enhancements
logger = logging.getLogger(__name__)
# Global, to save us from a lot of argument passing within the scope of this module
helpful_parser = None # type: Optional[HelpfulArgumentParser]
helpful_parser: Optional[HelpfulArgumentParser] = None
def prepare_and_parse_args(plugins, args, detect_defaults=False):
@ -249,27 +234,6 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
default=flag_default("duplicate"),
help="Allow making a certificate lineage that duplicates an existing one "
"(both can be renewed in parallel)")
helpful.add(
"automation", "--os-packages-only", action="store_true",
default=flag_default("os_packages_only"),
help="(certbot-auto only) install OS package dependencies and then stop")
helpful.add(
"automation", "--no-self-upgrade", action="store_true",
default=flag_default("no_self_upgrade"),
help="(certbot-auto only) prevent the certbot-auto script from"
" upgrading itself to newer released versions (default: Upgrade"
" automatically)")
helpful.add(
"automation", "--no-bootstrap", action="store_true",
default=flag_default("no_bootstrap"),
help="(certbot-auto only) prevent the certbot-auto script from"
" installing OS-level dependencies (default: Prompt to install "
" OS-wide dependencies, but exit if the user says 'No')")
helpful.add(
"automation", "--no-permissions-check", action="store_true",
default=flag_default("no_permissions_check"),
help="(certbot-auto only) skip the check on the file system"
" permissions of the certbot-auto script")
helpful.add(
["automation", "renew", "certonly", "run"],
"-q", "--quiet", dest="quiet", action="store_true",
@ -451,6 +415,12 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
default=flag_default("autorenew"), dest="autorenew",
help="Disable auto renewal of certificates.")
# Deprecated arguments
helpful.add_deprecated_argument("--os-packages-only", 0)
helpful.add_deprecated_argument("--no-self-upgrade", 0)
helpful.add_deprecated_argument("--no-bootstrap", 0)
helpful.add_deprecated_argument("--no-permissions-check", 0)
# Populate the command line parameters for new style enhancements
enhancements.populate_cli(helpful.add)

View file

@ -109,4 +109,10 @@ VAR_MODIFIERS = {"account": {"server",},
# This is a list of all CLI options that we have ever deprecated. It lets us
# opt out of the default detection, which can interact strangely with option
# deprecation. See https://github.com/certbot/certbot/issues/8540 for more info.
DEPRECATED_OPTIONS = {"manual_public_ip_logging_ok",}
DEPRECATED_OPTIONS = {
"manual_public_ip_logging_ok",
"os_packages_only",
"no_self_upgrade",
"no_bootstrap",
"no_permissions_check",
}

View file

@ -5,14 +5,14 @@ import copy
import zope.interface.interface # pylint: disable=unused-import
from acme import challenges
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot import errors
from certbot.compat import os
from certbot._internal import constants
from certbot.compat import os
class _Default(object):
class _Default:
"""A class to use as a default to detect if a value is set by a user"""
def __bool__(self):
@ -62,11 +62,11 @@ def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
if hidden:
return argparse.SUPPRESS
field = interfaces.IConfig.__getitem__(name) # type: zope.interface.interface.Attribute
field: zope.interface.interface.Attribute = interfaces.IConfig.__getitem__(name)
return field.__doc__
class HelpfulArgumentGroup(object):
class HelpfulArgumentGroup:
"""Emulates an argparse group for use with HelpfulArgumentParser.
This class is used in the add_group method of HelpfulArgumentParser.

View file

@ -1,6 +1,6 @@
"""This module contains a function to add the groups of arguments for the help
display"""
from certbot._internal.cli import VERB_HELP
from certbot._internal.cli.verb_help import VERB_HELP
def _add_all_groups(helpful):

Some files were not shown because too many files have changed in this diff Show more