Merge branch 'master' into typed-jose-fields

This commit is contained in:
Adrien Ferrand 2021-11-17 22:39:07 +01:00
commit 14f1f99352
101 changed files with 610 additions and 327 deletions

View file

@ -4,7 +4,7 @@ jobs:
- name: IMAGE_NAME
value: ubuntu-18.04
- name: PYTHON_VERSION
value: 3.9
value: 3.10
- group: certbot-common
strategy:
matrix:
@ -17,6 +17,9 @@ jobs:
linux-py38:
PYTHON_VERSION: 3.8
TOXENV: py38
linux-py39:
PYTHON_VERSION: 3.9
TOXENV: py39
linux-py37-nopin:
PYTHON_VERSION: 3.7
TOXENV: py37
@ -71,6 +74,14 @@ jobs:
PYTHON_VERSION: 3.9
TOXENV: integration
ACME_SERVER: boulder-v2
linux-boulder-v1-py310-integration:
PYTHON_VERSION: 3.10
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py310-integration:
PYTHON_VERSION: 3.10
TOXENV: integration
ACME_SERVER: boulder-v2
nginx-compat:
TOXENV: nginx_compat
linux-integration-rfc2136:

View file

@ -1,16 +1,16 @@
jobs:
- job: test
variables:
PYTHON_VERSION: 3.9
PYTHON_VERSION: 3.10
strategy:
matrix:
macos-py36:
IMAGE_NAME: macOS-10.15
PYTHON_VERSION: 3.6
TOXENV: py36
macos-py39:
macos-py310:
IMAGE_NAME: macOS-10.15
PYTHON_VERSION: 3.9
PYTHON_VERSION: 3.10
TOXENV: py39
windows-py36:
IMAGE_NAME: vs2017-win2016
@ -36,17 +36,17 @@ jobs:
IMAGE_NAME: ubuntu-18.04
PYTHON_VERSION: 3.6
TOXENV: py36
linux-py39-cover:
linux-py310-cover:
IMAGE_NAME: ubuntu-18.04
PYTHON_VERSION: 3.9
TOXENV: py39-cover
linux-py39-lint:
PYTHON_VERSION: 3.10
TOXENV: py310-cover
linux-py310-lint:
IMAGE_NAME: ubuntu-18.04
PYTHON_VERSION: 3.9
PYTHON_VERSION: 3.10
TOXENV: lint-posix
linux-py39-mypy:
linux-py310-mypy:
IMAGE_NAME: ubuntu-18.04
PYTHON_VERSION: 3.9
PYTHON_VERSION: 3.10
TOXENV: mypy-posix
linux-integration:
IMAGE_NAME: ubuntu-18.04

View file

@ -348,7 +348,7 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
not_before: Optional[int] = None,
validity: int = (7 * 24 * 60 * 60), force_san: bool = True,
extensions: Optional[List[crypto.X509Extension]] = None,
ips: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv4Address]]] = None
ips: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None
) -> crypto.X509:
"""Generate new self-signed certificate.

0
acme/acme/py.typed Normal file
View file

View file

@ -7,4 +7,7 @@
# in --editable mode (-e), just "pip install acme[docs]" does not work as
# expected and "pip install -e acme[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme[docs]

View file

@ -3,7 +3,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'cryptography>=2.1.4',
@ -45,6 +45,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
],

View file

@ -2,7 +2,6 @@
# pylint: disable=too-many-lines
from collections import defaultdict
import copy
from distutils.version import LooseVersion
import fnmatch
import logging
import re
@ -154,9 +153,10 @@ class ApacheConfigurator(common.Installer, interfaces.Authenticator):
"""
# Disabling TLS session tickets is supported by Apache 2.4.11+ and OpenSSL 1.0.2l+.
# So for old versions of Apache we pick a configuration without this option.
min_openssl_version = util.parse_loose_version('1.0.2l')
openssl_version = self.openssl_version(warn_on_no_mod_ssl)
if self.version < (2, 4, 11) or not openssl_version or\
LooseVersion(openssl_version) < LooseVersion('1.0.2l'):
util.parse_loose_version(openssl_version) < min_openssl_version:
return apache_util.find_ssl_apache_conf("old")
return apache_util.find_ssl_apache_conf("current")
@ -2437,10 +2437,9 @@ class ApacheConfigurator(common.Installer, interfaces.Authenticator):
except errors.SubprocessError as err:
logger.warning("Unable to restart apache using %s",
self.options.restart_cmd)
alt_restart = self.options.restart_cmd_alt
if alt_restart:
if self.options.restart_cmd_alt:
logger.debug("Trying alternative restart command: %s",
alt_restart)
self.options.restart_cmd_alt)
# There is an alternative restart command available
# This usually is "restart" verb while original is "graceful"
try:

View file

@ -1,6 +1,4 @@
""" Entry point for Apache Plugin """
from distutils.version import LooseVersion
from certbot import util
from certbot_apache._internal import configurator
from certbot_apache._internal import override_arch
@ -47,7 +45,8 @@ def get_configurator():
override_class = None
# Special case for older Fedora versions
if os_name == 'fedora' and LooseVersion(os_version) < LooseVersion('29'):
min_version = util.parse_loose_version('29')
if os_name == 'fedora' and util.parse_loose_version(os_version) < min_version:
os_name = 'fedora_old'
try:

View file

@ -118,7 +118,8 @@ class DebianConfigurator(configurator.ApacheConfigurator):
# Generate reversal command.
# Try to be safe here... check that we can probably reverse before
# applying enmod command
if not util.exe_exists(self.options.dismod):
if (self.options.dismod is None or self.options.enmod is None
or not util.exe_exists(self.options.dismod)):
raise errors.MisconfigurationError(
"Unable to find a2dismod, please make sure a2enmod and "
"a2dismod are configured correctly for certbot.")

View file

View file

@ -1,7 +1,7 @@
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
# We specify the minimum acme and certbot version as the current plugin
@ -38,6 +38,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python
"""Module to call certbot in test mode"""
from distutils.version import LooseVersion
import os
import pkg_resources
import subprocess
import sys
@ -85,7 +85,7 @@ def _compute_additional_args(workspace, environ, force_renew):
cwd=workspace, env=environ)
# Typical response is: output = 'certbot 0.31.0.dev0'
version_str = output.split(' ')[1].strip()
if LooseVersion(version_str) >= LooseVersion('0.30.0'):
if pkg_resources.parse_version(version_str) >= pkg_resources.parse_version('0.30.0'):
additional_args.append('--no-random-sleep-on-renew')
if force_renew:

View file

@ -1,5 +1,4 @@
from distutils.version import LooseVersion
from pkg_resources import parse_version
from setuptools import __version__ as setuptools_version
from setuptools import find_packages
from setuptools import setup
@ -10,7 +9,7 @@ version = '0.32.0.dev0'
min_setuptools_version='36.2'
# This conditional isn't necessary, but it provides better error messages to
# people who try to install this package with older versions of setuptools.
if LooseVersion(setuptools_version) < LooseVersion(min_setuptools_version):
if parse_version(setuptools_version) < parse_version(min_setuptools_version):
raise RuntimeError(f'setuptools {min_setuptools_version}+ is required')
install_requires = [
@ -29,6 +28,7 @@ install_requires = [
'pywin32>=300 ; sys_platform == "win32"',
'pyyaml',
'requests',
'setuptools',
'types-python-dateutil'
]
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
],

View file

@ -1,6 +1,7 @@
"""Validators to determine the current webserver configuration"""
import logging
import socket
from typing import cast
import requests
@ -29,7 +30,8 @@ class Validator:
logger.exception(str(error))
return False
return presented_cert.digest("sha256") == cert.digest("sha256")
# Despite documentation saying that bytes are expected for digest(), we must provide a str.
return presented_cert.digest(cast(bytes, "sha256")) == cert.digest("sha256")
def redirect(self, name, port=80, headers=None):
"""Test whether webserver redirects to secure connection."""

View file

@ -1,7 +1,7 @@
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'certbot',
@ -29,6 +29,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
],

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-cloudflare[docs]" does not work as
# expected and "pip install -e certbot-dns-cloudflare[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-cloudflare[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'cloudflare>=1.5.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-cloudxns[docs]" does not work as
# expected and "pip install -e certbot-dns-cloudxns[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-cloudxns[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-digitalocean[docs]" does not work as
# expected and "pip install -e certbot-dns-digitalocean[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-digitalocean[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'python-digitalocean>=1.11', # 1.15.0 or newer is recommended for TTL support
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-dnsimple[docs]" does not work as
# expected and "pip install -e certbot-dns-dnsimple[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-dnsimple[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
# This version of lexicon is required to address the problem described in
@ -53,6 +53,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-dnsmadeeasy[docs]" does not work as
# expected and "pip install -e certbot-dns-dnsmadeeasy[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-dnsmadeeasy[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-gehirn[docs]" does not work as
# expected and "pip install -e certbot-dns-gehirn[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-gehirn[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-google[docs]" does not work as
# expected and "pip install -e certbot-dns-google[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-google[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'google-api-python-client>=1.5.5',
@ -54,6 +54,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-linode[docs]" does not work as
# expected and "pip install -e certbot-dns-linode[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-linode[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-luadns[docs]" does not work as
# expected and "pip install -e certbot-dns-luadns[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-luadns[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-nsone[docs]" does not work as
# expected and "pip install -e certbot-dns-nsone[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-nsone[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-ovh[docs]" does not work as
# expected and "pip install -e certbot-dns-ovh[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-ovh[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-rfc2136[docs]" does not work as
# expected and "pip install -e certbot-dns-rfc2136[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-rfc2136[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dnspython>=1.15.0',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-route53[docs]" does not work as
# expected and "pip install -e certbot-dns-route53[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-route53[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'boto3',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -177,5 +177,5 @@ texinfo_documents = [
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'acme': ('https://acme-python.readthedocs.org/en/latest/', None),
'certbot': ('https://certbot.eff.org/docs/', None),
'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),
}

View file

@ -7,6 +7,9 @@
# in --editable mode (-e), just "pip install certbot-dns-sakuracloud[docs]" does not work as
# expected and "pip install -e certbot-dns-sakuracloud[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot
-e certbot-dns-sakuracloud[docs]

View file

@ -4,7 +4,7 @@ import sys
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
'dns-lexicon>=3.2.1',
@ -51,6 +51,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -1,6 +1,5 @@
# pylint: disable=too-many-lines
"""Nginx Configuration"""
from distutils.version import LooseVersion
import logging
import re
import socket
@ -142,8 +141,9 @@ class NginxConfigurator(common.Installer, interfaces.Authenticator):
# For a complete history, check out https://github.com/certbot/certbot/issues/7322
use_tls13 = self.version >= (1, 13, 0)
min_openssl_version = util.parse_loose_version('1.0.2l')
session_tix_off = self.version >= (1, 5, 9) and self.openssl_version and\
LooseVersion(self.openssl_version) >= LooseVersion('1.0.2l')
util.parse_loose_version(self.openssl_version) >= min_openssl_version
if use_tls13:
if session_tix_off:

View file

View file

@ -1,7 +1,7 @@
from setuptools import find_packages
from setuptools import setup
version = '1.21.0.dev0'
version = '1.22.0.dev0'
install_requires = [
# We specify the minimum acme and certbot version as the current plugin
@ -35,6 +35,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -2,7 +2,28 @@
Certbot adheres to [Semantic Versioning](https://semver.org/).
## 1.21.0 - master
## 1.22.0 - master
### Added
* Support for Python 3.10 was added to Certbot and all of its components.
* The function certbot.util.parse_loose_version was added to parse version
strings in the same way as the now deprecated distutils.version.LooseVersion
class from the Python standard library.
### Changed
* The function certbot.util.get_strict_version was deprecated and will be
removed in a future release.
### Fixed
* Fixed an issue on Windows where the `web.config` created by Certbot would sometimes
conflict with preexisting configurations (#9088).
More details about these changes can be found on our GitHub repo.
## 1.21.0 - 2021-11-02
### Added

View file

@ -1,3 +1,3 @@
"""Certbot client."""
# version number like 1.2.3a0, must have at least 2 parts, like 1.2
__version__ = '1.21.0.dev0'
__version__ = '1.22.0.dev0'

View file

@ -2,6 +2,7 @@
import datetime
import logging
import platform
from typing import cast
from typing import Any
from typing import Dict
from typing import List
@ -224,8 +225,10 @@ def perform_registration(acme, config, tos_cb):
raise errors.Error(msg)
try:
newreg = messages.NewRegistration.from_data(email=config.email,
external_account_binding=eab)
# TODO: Remove the cast once certbot package is fully typed
newreg = messages.NewRegistration.from_data(
email=config.email,
external_account_binding=cast(Optional[messages.ExternalAccountBinding], eab))
return acme.new_account_and_tos(newreg, tos_cb)
except messages.Error as e:
if e.code in ("invalidEmail", "invalidContact"):

View file

@ -31,6 +31,7 @@ _WEB_CONFIG_CONTENT = """\
<configuration>
<system.webServer>
<staticContent>
<remove fileExtension="."/>
<mimeMap fileExtension="." mimeType="text/plain" />
</staticContent>
</system.webServer>
@ -39,7 +40,10 @@ _WEB_CONFIG_CONTENT = """\
# This list references the hashes of all versions of the web.config files that Certbot could
# have generated during an HTTP-01 challenge. If you modify _WEB_CONFIG_CONTENT, you MUST add
# the new hash in this list.
_WEB_CONFIG_SHA256SUMS = ["20c5ca1bd58fa8ad5f07a2f1be8b7cbb707c20fcb607a8fc8db9393952846a97"]
_WEB_CONFIG_SHA256SUMS = [
"20c5ca1bd58fa8ad5f07a2f1be8b7cbb707c20fcb607a8fc8db9393952846a97",
"8d31383d3a079d2098a9d0c0921f4ab87e708b9868dc3f314d54094c2fe70336"
]
class Authenticator(common.Plugin, interfaces.Authenticator):

View file

@ -5,6 +5,7 @@ import logging
import re
import shutil
import stat
from typing import cast
from typing import Optional
import configobj
@ -12,6 +13,7 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import parsedatetime
import pkg_resources
import pytz
import certbot
@ -33,7 +35,7 @@ logger = logging.getLogger(__name__)
ALL_FOUR = ("cert", "privkey", "chain", "fullchain")
README = "README"
CURRENT_VERSION = util.get_strict_version(certbot.__version__)
CURRENT_VERSION = pkg_resources.parse_version(certbot.__version__)
BASE_PRIVKEY_MODE = 0o600
@ -457,7 +459,7 @@ class RenewableCert(interfaces.RenewableCert):
conf_version = self.configuration.get("version")
if (conf_version is not None and
util.get_strict_version(conf_version) > CURRENT_VERSION):
pkg_resources.parse_version(conf_version) > CURRENT_VERSION):
logger.info(
"Attempting to parse the version %s renewal configuration "
"file found at %s with version %s of Certbot. This might not "
@ -894,7 +896,8 @@ class RenewableCert(interfaces.RenewableCert):
if target is None:
raise errors.CertStorageError("could not find the certificate file")
with open(target) as f:
return crypto_util.get_names_from_cert(f.read())
# TODO: Remove the cast once certbot package is fully typed
return crypto_util.get_names_from_cert(cast(bytes, f.read()))
def ocsp_revoked(self, version):
"""Is the specified cert version revoked according to OCSP?

View file

@ -18,6 +18,7 @@ Note, that all annotated challenges act as a proxy objects::
"""
import logging
from typing import Any
from typing import Type
import josepy as jose
@ -40,7 +41,7 @@ class AnnotatedChallenge(jose.ImmutableMap):
__slots__ = ('challb',)
_acme_type: Type[Challenge] = NotImplemented
def __getattr__(self, name):
def __getattr__(self, name: str) -> Any:
return getattr(self.challb, name)
@ -48,7 +49,7 @@ class KeyAuthorizationAnnotatedChallenge(AnnotatedChallenge):
"""Client annotated `KeyAuthorizationChallenge` challenge."""
__slots__ = ('challb', 'domain', 'account_key')
def response_and_validation(self, *args, **kwargs):
def response_and_validation(self, *args: Any, **kwargs: Any) -> Any:
"""Generate response and validation."""
return self.challb.chall.response_and_validation(
self.account_key, *args, **kwargs)

View file

@ -1,5 +1,7 @@
"""Certbot user-supplied configuration."""
import argparse
import copy
from typing import Any
from typing import List
from typing import Optional
from urllib import parse
@ -38,7 +40,9 @@ class NamespaceConfig:
"""
def __init__(self, namespace):
def __init__(self, namespace: argparse.Namespace) -> None:
self.namespace: argparse.Namespace
# Avoid recursion loop because of the delegation defined in __setattr__
object.__setattr__(self, 'namespace', namespace)
self.namespace.config_dir = os.path.abspath(self.namespace.config_dir)
@ -50,10 +54,10 @@ class NamespaceConfig:
# Delegate any attribute not explicitly defined to the underlying namespace object.
def __getattr__(self, name):
def __getattr__(self, name: str) -> Any:
return getattr(self.namespace, name)
def __setattr__(self, name, value):
def __setattr__(self, name: str, value: Any) -> None:
setattr(self.namespace, name, value)
@property
@ -61,6 +65,10 @@ class NamespaceConfig:
"""ACME Directory Resource URI."""
return self.namespace.server
@server.setter
def server(self, server_: str) -> None:
self.namespace.server = server_
@property
def email(self) -> Optional[str]:
"""Email used for registration and recovery contact.
@ -70,6 +78,10 @@ class NamespaceConfig:
"""
return self.namespace.email
@email.setter
def email(self, mail: str) -> None:
self.namespace.email = mail
@property
def rsa_key_size(self) -> int:
"""Size of the RSA key."""
@ -126,32 +138,32 @@ class NamespaceConfig:
return self.namespace.work_dir
@property
def accounts_dir(self):
def accounts_dir(self) -> str:
"""Directory where all account information is stored."""
return self.accounts_dir_for_server_path(self.server_path)
@property
def backup_dir(self):
def backup_dir(self) -> str:
"""Configuration backups directory."""
return os.path.join(self.namespace.work_dir, constants.BACKUP_DIR)
@property
def csr_dir(self):
def csr_dir(self) -> str:
"""Directory where new Certificate Signing Requests (CSRs) are saved."""
return os.path.join(self.namespace.config_dir, constants.CSR_DIR)
@property
def in_progress_dir(self):
def in_progress_dir(self) -> str:
"""Directory used before a permanent checkpoint is finalized."""
return os.path.join(self.namespace.work_dir, constants.IN_PROGRESS_DIR)
@property
def key_dir(self):
def key_dir(self) -> str:
"""Keys storage."""
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
@property
def temp_checkpoint_dir(self):
def temp_checkpoint_dir(self) -> str:
"""Temporary checkpoint directory."""
return os.path.join(
self.namespace.work_dir, constants.TEMP_CHECKPOINT_DIR)
@ -233,64 +245,64 @@ class NamespaceConfig:
return self.namespace.preferred_chain
@property
def server_path(self):
def server_path(self) -> str:
"""File path based on ``server``."""
parsed = parse.urlparse(self.namespace.server)
return (parsed.netloc + parsed.path).replace('/', os.path.sep)
def accounts_dir_for_server_path(self, server_path):
def accounts_dir_for_server_path(self, server_path: str) -> str:
"""Path to accounts directory based on server_path"""
server_path = misc.underscores_for_unsupported_characters_in_path(server_path)
return os.path.join(
self.namespace.config_dir, constants.ACCOUNTS_DIR, server_path)
@property
def default_archive_dir(self): # pylint: disable=missing-function-docstring
def default_archive_dir(self) -> str: # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.config_dir, constants.ARCHIVE_DIR)
@property
def live_dir(self): # pylint: disable=missing-function-docstring
def live_dir(self) -> str: # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.config_dir, constants.LIVE_DIR)
@property
def renewal_configs_dir(self): # pylint: disable=missing-function-docstring
def renewal_configs_dir(self) -> str: # pylint: disable=missing-function-docstring
return os.path.join(
self.namespace.config_dir, constants.RENEWAL_CONFIGS_DIR)
@property
def renewal_hooks_dir(self):
def renewal_hooks_dir(self) -> str:
"""Path to directory with hooks to run with the renew subcommand."""
return os.path.join(self.namespace.config_dir,
constants.RENEWAL_HOOKS_DIR)
@property
def renewal_pre_hooks_dir(self):
def renewal_pre_hooks_dir(self) -> str:
"""Path to the pre-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_PRE_HOOKS_DIR)
@property
def renewal_deploy_hooks_dir(self):
def renewal_deploy_hooks_dir(self) -> str:
"""Path to the deploy-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_DEPLOY_HOOKS_DIR)
@property
def renewal_post_hooks_dir(self):
def renewal_post_hooks_dir(self) -> str:
"""Path to the post-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_POST_HOOKS_DIR)
# Magic methods
def __deepcopy__(self, _memo):
def __deepcopy__(self, _memo: Any) -> 'NamespaceConfig':
# Work around https://bugs.python.org/issue1515 for py26 tests :( :(
# https://travis-ci.org/letsencrypt/letsencrypt/jobs/106900743#L3276
new_ns = copy.deepcopy(self.namespace)
return type(self)(new_ns)
def _check_config_sanity(config):
def _check_config_sanity(config: NamespaceConfig) -> None:
"""Validate command line options and display error message if
requirements are not met.

View file

@ -4,18 +4,26 @@
is capable of handling the signatures.
"""
import datetime
import hashlib
import logging
import re
from typing import Callable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
import warnings
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
@ -23,6 +31,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.primitives.serialization import PrivateFormat
import josepy
from OpenSSL import crypto
from OpenSSL import SSL
import pyrfc3339
@ -34,6 +43,11 @@ from certbot import interfaces
from certbot import util
from certbot.compat import os
# Cryptography ed448 and ed25519 modules do not exist on oldest tests
if TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
logger = logging.getLogger(__name__)
@ -87,8 +101,9 @@ def generate_key(key_size: int, key_dir: str, key_type: str = "rsa",
# TODO: Remove this call once zope dependencies are removed from Certbot.
def init_save_key(key_size, key_dir, key_type="rsa", elliptic_curve="secp256r1",
keyname="key-certbot.pem"):
def init_save_key(key_size: int, key_dir: str, key_type: str = "rsa",
elliptic_curve: str = "secp256r1",
keyname: str = "key-certbot.pem") -> util.Key:
"""Initializes and saves a privkey.
Inits key and saves it in PEM format on the filesystem.
@ -120,7 +135,7 @@ def init_save_key(key_size, key_dir, key_type="rsa", elliptic_curve="secp256r1",
keyname=keyname, strict_permissions=config.strict_permissions)
def generate_csr(privkey: util.Key, names: Set[str], path: str,
def generate_csr(privkey: util.Key, names: Union[List[str], Set[str]], path: str,
must_staple: bool = False, strict_permissions: bool = True) -> util.CSR:
"""Initialize a CSR with the given private key.
@ -151,7 +166,7 @@ def generate_csr(privkey: util.Key, names: Set[str], path: str,
# TODO: Remove this call once zope dependencies are removed from Certbot.
def init_save_csr(privkey, names, path):
def init_save_csr(privkey: util.Key, names: Set[str], path: str) -> util.CSR:
"""Initialize a CSR with the given private key.
.. deprecated:: 1.16.0
@ -182,12 +197,12 @@ def init_save_csr(privkey, names, path):
# A. Do more checks to verify that the CSR is trusted/valid
# B. Audit the parsing code for vulnerabilities
def valid_csr(csr):
def valid_csr(csr: bytes) -> bool:
"""Validate CSR.
Check if `csr` is a valid CSR for the given domains.
:param str csr: CSR in PEM.
:param bytes csr: CSR in PEM.
:returns: Validity of CSR.
:rtype: bool
@ -202,11 +217,11 @@ def valid_csr(csr):
return False
def csr_matches_pubkey(csr, privkey):
def csr_matches_pubkey(csr: bytes, privkey: bytes) -> bool:
"""Does private key correspond to the subject public key in the CSR?
:param str csr: CSR in PEM.
:param str privkey: Private key file contents (PEM)
:param bytes csr: CSR in PEM.
:param bytes privkey: Private key file contents (PEM)
:returns: Correspondence of private key to CSR subject public key.
:rtype: bool
@ -222,11 +237,11 @@ def csr_matches_pubkey(csr, privkey):
return False
def import_csr_file(csrfile, data):
def import_csr_file(csrfile: str, data: bytes) -> Tuple[int, util.CSR, List[str]]:
"""Import a CSR file, which can be either PEM or DER.
:param str csrfile: CSR filename
:param str data: contents of the CSR file
:param bytes data: contents of the CSR file
:returns: (`crypto.FILETYPE_PEM`,
util.CSR object representing the CSR,
@ -251,12 +266,13 @@ def import_csr_file(csrfile, data):
return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
def make_key(bits=1024, key_type="rsa", elliptic_curve=None):
def make_key(bits: int = 1024, key_type: str = "rsa",
elliptic_curve: Optional[str] = None) -> bytes:
"""Generate PEM encoded RSA|EC key.
:param int bits: Number of bits if key_type=rsa. At least 1024 for RSA.
:param str ec_curve: The elliptic curve to use.
:param str key_type: The type of key to generate, but be rsa or ecdsa
:param str elliptic_curve: The elliptic curve to use.
:returns: new RSA or ECDSA key in PEM form with specified number of bits
or of type ec_curve when key_type ecdsa is used.
@ -269,6 +285,8 @@ def make_key(bits=1024, key_type="rsa", elliptic_curve=None):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, bits)
elif key_type == 'ecdsa':
if not elliptic_curve:
raise errors.Error("When key_type == ecdsa, elliptic_curve must be set.")
try:
name = elliptic_curve.upper()
if name in ('SECP256R1', 'SECP384R1', 'SECP521R1'):
@ -297,7 +315,7 @@ def make_key(bits=1024, key_type="rsa", elliptic_curve=None):
return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
def valid_privkey(privkey):
def valid_privkey(privkey: str) -> bool:
"""Is valid RSA private key?
:param str privkey: Private key file contents in PEM
@ -313,7 +331,7 @@ def valid_privkey(privkey):
return False
def verify_renewable_cert(renewable_cert):
def verify_renewable_cert(renewable_cert: interfaces.RenewableCert) -> None:
"""For checking that your certs were not corrupted on disk.
Several things are checked:
@ -331,7 +349,7 @@ def verify_renewable_cert(renewable_cert):
verify_cert_matches_priv_key(renewable_cert.cert_path, renewable_cert.key_path)
def verify_renewable_cert_sig(renewable_cert):
def verify_renewable_cert_sig(renewable_cert: interfaces.RenewableCert) -> None:
"""Verifies the signature of a RenewableCert object.
:param renewable_cert: cert to verify
@ -355,14 +373,16 @@ def verify_renewable_cert_sig(renewable_cert):
raise errors.Error(error_str)
def verify_signed_payload(public_key, signature, payload, signature_hash_algorithm):
def verify_signed_payload(public_key: Union[DSAPublicKey, 'Ed25519PublicKey', 'Ed448PublicKey',
EllipticCurvePublicKey, RSAPublicKey],
signature: bytes, payload: bytes,
signature_hash_algorithm: hashes.HashAlgorithm) -> None:
"""Check the signature of a payload.
:param RSAPublicKey/EllipticCurvePublicKey public_key: the public_key to check signature
:param bytes signature: the signature bytes
:param bytes payload: the payload bytes
:param cryptography.hazmat.primitives.hashes.HashAlgorithm \
signature_hash_algorithm: algorithm used to hash the payload
:param hashes.HashAlgorithm signature_hash_algorithm: algorithm used to hash the payload
:raises InvalidSignature: If signature verification fails.
:raises errors.Error: If public key type is not supported
@ -382,10 +402,10 @@ def verify_signed_payload(public_key, signature, payload, signature_hash_algorit
verifier.update(payload)
verifier.verify()
else:
raise errors.Error("Unsupported public key type")
raise errors.Error("Unsupported public key type.")
def verify_cert_matches_priv_key(cert_path, key_path):
def verify_cert_matches_priv_key(cert_path: str, key_path: str) -> None:
""" Verifies that the private key and cert match.
:param str cert_path: path to a cert in PEM format
@ -407,7 +427,7 @@ def verify_cert_matches_priv_key(cert_path, key_path):
raise errors.Error(error_str)
def verify_fullchain(renewable_cert):
def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None:
""" Verifies that fullchain is indeed cert concatenated with chain.
:param renewable_cert: cert to verify
@ -434,7 +454,7 @@ def verify_fullchain(renewable_cert):
raise e
def pyopenssl_load_certificate(data):
def pyopenssl_load_certificate(data: bytes) -> Tuple[crypto.X509, int]:
"""Load PEM/DER certificate.
:raises errors.Error:
@ -452,8 +472,9 @@ def pyopenssl_load_certificate(data):
str(error) for error in openssl_errors)))
def _load_cert_or_req(cert_or_req_str, load_func,
typ=crypto.FILETYPE_PEM):
def _load_cert_or_req(cert_or_req_str: bytes,
load_func: Callable[[int, bytes], Union[crypto.X509, crypto.X509Req]],
typ: int = crypto.FILETYPE_PEM) -> Union[crypto.X509, crypto.X509Req]:
try:
return load_func(typ, cert_or_req_str)
except crypto.Error as err:
@ -462,14 +483,16 @@ def _load_cert_or_req(cert_or_req_str, load_func,
raise
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
typ=crypto.FILETYPE_PEM):
def _get_sans_from_cert_or_req(cert_or_req_str: bytes,
load_func: Callable[[int, bytes], Union[crypto.X509,
crypto.X509Req]],
typ: int = crypto.FILETYPE_PEM) -> List[str]:
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
cert_or_req_str, load_func, typ))
def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM):
def get_sans_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
"""Get a list of Subject Alternative Names from a certificate.
:param str cert: Certificate (encoded).
@ -483,17 +506,21 @@ def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM):
cert, crypto.load_certificate, typ)
def _get_names_from_cert_or_req(cert_or_req, load_func, typ):
def _get_names_from_cert_or_req(cert_or_req: bytes,
load_func: Callable[[int, bytes], Union[crypto.X509,
crypto.X509Req]],
typ: int) -> List[str]:
loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ)
return _get_names_from_loaded_cert_or_req(loaded_cert_or_req)
def _get_names_from_loaded_cert_or_req(loaded_cert_or_req):
def _get_names_from_loaded_cert_or_req(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]
) -> List[str]:
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM):
def get_names_from_cert(cert: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
"""Get a list of domains from a cert, including the CN if it is set.
:param str cert: Certificate (encoded).
@ -504,13 +531,13 @@ def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM):
"""
return _get_names_from_cert_or_req(
csr, crypto.load_certificate, typ)
cert, crypto.load_certificate, typ)
def get_names_from_req(csr: str, typ: int = crypto.FILETYPE_PEM) -> List[str]:
def get_names_from_req(csr: bytes, typ: int = crypto.FILETYPE_PEM) -> List[str]:
"""Get a list of domains from a CSR, including the CN if it is set.
:param str cert: CSR (encoded).
:param str csr: CSR (encoded).
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
:returns: A list of domain names.
:rtype: list
@ -519,7 +546,8 @@ def get_names_from_req(csr: str, typ: int = crypto.FILETYPE_PEM) -> List[str]:
return _get_names_from_cert_or_req(csr, crypto.load_certificate_request, typ)
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
def dump_pyopenssl_chain(chain: Union[List[crypto.X509], List[josepy.ComparableX509]],
filetype: int = crypto.FILETYPE_PEM) -> bytes:
"""Dump certificate chain into a bundle.
:param list chain: List of `crypto.X509` (or wrapped in
@ -531,7 +559,7 @@ def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
def notBefore(cert_path):
def notBefore(cert_path: str) -> datetime.datetime:
"""When does the cert at cert_path start being valid?
:param str cert_path: path to a cert in PEM format
@ -543,7 +571,7 @@ def notBefore(cert_path):
return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
def notAfter(cert_path):
def notAfter(cert_path: str) -> datetime.datetime:
"""When does the cert at cert_path stop being valid?
:param str cert_path: path to a cert in PEM format
@ -555,7 +583,8 @@ def notAfter(cert_path):
return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
def _notAfterBefore(cert_path, method):
def _notAfterBefore(cert_path: str,
method: Callable[[crypto.X509], Optional[bytes]]) -> datetime.datetime:
"""Internal helper function for finding notbefore/notafter.
:param str cert_path: path to a cert in PEM format
@ -571,6 +600,8 @@ def _notAfterBefore(cert_path, method):
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
# pyopenssl always returns bytes
timestamp = method(x509)
if not timestamp:
raise errors.Error("Error while invoking timestamp method, None has been returned.")
reformatted_timestamp = [timestamp[0:4], b"-", timestamp[4:6], b"-",
timestamp[6:8], b"T", timestamp[8:10], b":",
timestamp[10:12], b":", timestamp[12:]]
@ -580,7 +611,7 @@ def _notAfterBefore(cert_path, method):
return pyrfc3339.parse(timestamp_str)
def sha256sum(filename):
def sha256sum(filename: str) -> str:
"""Compute a sha256sum of a file.
NB: In given file, platform specific newlines characters will be converted
@ -607,7 +638,7 @@ CERT_PEM_REGEX = re.compile(
)
def cert_and_chain_from_fullchain(fullchain_pem):
def cert_and_chain_from_fullchain(fullchain_pem: str) -> Tuple[str, str]:
"""Split fullchain_pem into cert_pem and chain_pem
:param str fullchain_pem: concatenated cert + chain
@ -635,7 +666,7 @@ def cert_and_chain_from_fullchain(fullchain_pem):
return (certs_normalized[0], "".join(certs_normalized[1:]))
def get_serial_from_cert(cert_path):
def get_serial_from_cert(cert_path: str) -> int:
"""Retrieve the serial number of a certificate from certificate path
:param str cert_path: path to a cert in PEM format
@ -649,7 +680,8 @@ def get_serial_from_cert(cert_path):
return x509.get_serial_number()
def find_chain_with_issuer(fullchains, issuer_cn, warn_on_no_match=False):
def find_chain_with_issuer(fullchains: List[str], issuer_cn: str,
warn_on_no_match: bool = False) -> str:
"""Chooses the first certificate chain from fullchains whose topmost
intermediate has an Issuer Common Name matching issuer_cn (in other words
the first chain which chains to a root whose name matches issuer_cn).

View file

@ -1,4 +1,9 @@
"""Certbot client errors."""
from typing import Set
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from certbot.achallenges import AnnotatedChallenge
class Error(Exception):
@ -50,12 +55,12 @@ class FailedChallenges(AuthorizationError):
:ivar set failed_achalls: Failed `.AnnotatedChallenge` instances.
"""
def __init__(self, failed_achalls):
def __init__(self, failed_achalls: Set['AnnotatedChallenge']) -> None:
assert failed_achalls
self.failed_achalls = failed_achalls
super().__init__()
def __str__(self):
def __str__(self) -> str:
return "Failed authorization procedure. {0}".format(
", ".join(
"{0} ({1}): {2}".format(achall.domain, achall.typ, achall.error)
@ -94,7 +99,7 @@ class PluginStorageError(PluginError):
class StandaloneBindError(Error):
"""Standalone plugin bind error."""
def __init__(self, socket_error, port):
def __init__(self, socket_error: OSError, port: int) -> None:
super().__init__(
"Problem binding to port {0}: {1}".format(port, socket_error))
self.socket_error = socket_error

View file

@ -4,25 +4,33 @@ from abc import abstractmethod
from argparse import ArgumentParser
import sys
from types import ModuleType
from typing import Any
from typing import cast
from typing import Iterable
from typing import List
from typing import Optional
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
import warnings
import zope.interface
from acme.challenges import Challenge
from acme.challenges import ChallengeResponse
from certbot.achallenges import AnnotatedChallenge
from acme.client import ClientBase
from certbot import configuration
from certbot.achallenges import AnnotatedChallenge
if TYPE_CHECKING:
from certbot._internal.account import Account
class AccountStorage(metaclass=ABCMeta):
"""Accounts storage interface."""
@abstractmethod
def find_all(self): # pragma: no cover
def find_all(self) -> List['Account']: # pragma: no cover
"""Find all accounts.
:returns: All found accounts.
@ -32,17 +40,20 @@ class AccountStorage(metaclass=ABCMeta):
raise NotImplementedError()
@abstractmethod
def load(self, account_id): # pragma: no cover
def load(self, account_id: str) -> 'Account': # pragma: no cover
"""Load an account by its id.
:raises .AccountNotFound: if account could not be found
:raises .AccountStorageError: if account could not be loaded
:returns: The account loaded
:rtype: .Account
"""
raise NotImplementedError()
@abstractmethod
def save(self, account, client): # pragma: no cover
def save(self, account: 'Account', client: ClientBase) -> None: # pragma: no cover
"""Save account.
:raises .AccountStorageError: if account could not be saved
@ -96,8 +107,11 @@ class Plugin(metaclass=ABCMeta):
description: str = NotImplemented
"""Short plugin description"""
name: str = NotImplemented
"""Unique name of the plugin"""
@abstractmethod
def __init__(self, config: configuration.NamespaceConfig, name: str):
def __init__(self, config: Optional[configuration.NamespaceConfig], name: str) -> None:
"""Create a new `Plugin`.
:param configuration.NamespaceConfig config: Configuration.
@ -167,7 +181,7 @@ class Authenticator(Plugin):
"""
@abstractmethod
def get_chall_pref(self, domain: str) -> Iterable[Challenge]:
def get_chall_pref(self, domain: str) -> Iterable[Type[Challenge]]:
"""Return `collections.Iterable` of challenge preferences.
:param str domain: Domain for which challenge preferences are sought.
@ -181,7 +195,7 @@ class Authenticator(Plugin):
"""
@abstractmethod
def perform(self, achalls: List[AnnotatedChallenge]) -> Iterable[ChallengeResponse]:
def perform(self, achalls: List[AnnotatedChallenge]) -> List[ChallengeResponse]:
"""Perform the given challenge.
:param list achalls: Non-empty (guaranteed) list of
@ -189,10 +203,10 @@ class Authenticator(Plugin):
instances, such that it contains types found within
:func:`get_chall_pref` only.
:returns: `collections.Iterable` of ACME
:returns: list of ACME
:class:`~acme.challenges.ChallengeResponse` instances corresponding to each provided
:class:`~acme.challenges.Challenge`.
:rtype: :class:`collections.Iterable` of
:rtype: :class:`collections.List` of
:class:`acme.challenges.ChallengeResponse`,
where responses are required to be returned in
the same order as corresponding input challenges
@ -261,7 +275,8 @@ class Installer(Plugin):
"""
@abstractmethod
def enhance(self, domain: str, enhancement: str, options: Optional[List[str]] = None) -> None:
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
"""Perform a configuration enhancement.
:param str domain: domain for which to provide enhancement
@ -360,7 +375,7 @@ class RenewableCert(metaclass=ABCMeta):
@property
@abstractmethod
def cert_path(self):
def cert_path(self) -> str:
"""Path to the certificate file.
:rtype: str
@ -369,7 +384,7 @@ class RenewableCert(metaclass=ABCMeta):
@property
@abstractmethod
def key_path(self):
def key_path(self) -> str:
"""Path to the private key file.
:rtype: str
@ -378,7 +393,7 @@ class RenewableCert(metaclass=ABCMeta):
@property
@abstractmethod
def chain_path(self):
def chain_path(self) -> str:
"""Path to the certificate chain file.
:rtype: str
@ -387,7 +402,7 @@ class RenewableCert(metaclass=ABCMeta):
@property
@abstractmethod
def fullchain_path(self):
def fullchain_path(self) -> str:
"""Path to the full chain file.
The full chain is the certificate file plus the chain file.
@ -398,7 +413,7 @@ class RenewableCert(metaclass=ABCMeta):
@property
@abstractmethod
def lineagename(self):
def lineagename(self) -> str:
"""Name given to the certificate lineage.
:rtype: str
@ -406,7 +421,7 @@ class RenewableCert(metaclass=ABCMeta):
"""
@abstractmethod
def names(self):
def names(self) -> List[str]:
"""What are the subject names of this certificate?
:returns: the subject names
@ -442,7 +457,7 @@ class GenericUpdater(metaclass=ABCMeta):
"""
@abstractmethod
def generic_updates(self, lineage, *args, **kwargs):
def generic_updates(self, lineage: RenewableCert, *args: Any, **kwargs: Any) -> None:
"""Perform any update types defined by the installer.
If an installer is a subclass of the class containing this method, this
@ -470,7 +485,7 @@ class RenewDeployer(metaclass=ABCMeta):
"""
@abstractmethod
def renew_deploy(self, lineage, *args, **kwargs):
def renew_deploy(self, lineage: RenewableCert, *args: Any, **kwargs: Any) -> None:
"""Perform updates defined by installer when a certificate has been renewed
If an installer is a subclass of the class containing this method, this
@ -494,10 +509,10 @@ class _ZopeInterfacesDeprecationModule:
Internal class delegating to a module, and displaying warnings when
attributes related to Zope interfaces are accessed.
"""
def __init__(self, module):
def __init__(self, module: ModuleType) -> None:
self.__dict__['_module'] = module
def __getattr__(self, attr):
def __getattr__(self, attr: str) -> None:
if attr in ('IConfig', 'IPlugin', 'IPluginFactory', 'IAuthenticator',
'IInstaller', 'IDisplay', 'IReporter'):
warnings.warn('{0} attribute in certbot.interfaces module is deprecated '
@ -505,13 +520,13 @@ class _ZopeInterfacesDeprecationModule:
DeprecationWarning, stacklevel=2)
return getattr(self._module, attr)
def __setattr__(self, attr, value): # pragma: no cover
def __setattr__(self, attr: str, value: Any) -> None: # pragma: no cover
setattr(self._module, attr, value)
def __delattr__(self, attr): # pragma: no cover
def __delattr__(self, attr: str) -> None: # pragma: no cover
delattr(self._module, attr)
def __dir__(self): # pragma: no cover
def __dir__(self) -> List[str]: # pragma: no cover
return ['_module'] + dir(self._module)

View file

@ -1,8 +1,12 @@
"""Certbot main public entry point."""
from typing import List
from typing import Optional
from typing import Union
from certbot._internal import main as internal_main
def main(cli_args=None):
def main(cli_args: Optional[List[str]] = None) -> Optional[Union[str, int]]:
"""Run Certbot.
:param cli_args: command line to Certbot, defaults to ``sys.argv[1:]``

View file

@ -38,7 +38,7 @@ logger = logging.getLogger(__name__)
class RevocationChecker:
"""This class figures out OCSP checking on this system, and performs it."""
def __init__(self, enforce_openssl_binary_usage=False):
def __init__(self, enforce_openssl_binary_usage: bool = False) -> None:
self.broken = False
self.use_openssl_binary = enforce_openssl_binary_usage or not ocsp
@ -215,7 +215,8 @@ def _check_ocsp_cryptography(cert_path: str, chain_path: str, url: str, timeout:
return False
def _check_ocsp_response(response_ocsp, request_ocsp, issuer_cert, cert_path):
def _check_ocsp_response(response_ocsp: 'ocsp.OCSPResponse', request_ocsp: 'ocsp.OCSPRequest',
issuer_cert: x509.Certificate, cert_path: str) -> None:
"""Verify that the OCSP is valid for several criteria"""
# Assert OCSP response corresponds to the certificate we are talking about
if response_ocsp.serial_number != request_ocsp.serial_number:
@ -249,13 +250,14 @@ def _check_ocsp_response(response_ocsp, request_ocsp, issuer_cert, cert_path):
raise AssertionError('param nextUpdate is in the past.')
def _check_ocsp_response_signature(response_ocsp, issuer_cert, cert_path):
def _check_ocsp_response_signature(response_ocsp: 'ocsp.OCSPResponse',
issuer_cert: x509.Certificate, cert_path: str) -> None:
"""Verify an OCSP response signature against certificate issuer or responder"""
def _key_hash(cert):
def _key_hash(cert: x509.Certificate) -> bytes:
return x509.SubjectKeyIdentifier.from_public_key(cert.public_key()).digest
if response_ocsp.responder_name == issuer_cert.subject or \
response_ocsp.responder_key_hash == _key_hash(issuer_cert):
if (response_ocsp.responder_name == issuer_cert.subject
or response_ocsp.responder_key_hash == _key_hash(issuer_cert)):
# Case where the OCSP responder is also the certificate issuer
logger.debug('OCSP response for certificate %s is signed by the certificate\'s issuer.',
cert_path)
@ -289,21 +291,23 @@ def _check_ocsp_response_signature(response_ocsp, issuer_cert, cert_path):
raise AssertionError('responder is not authorized by issuer to sign OCSP responses')
# Following line may raise UnsupportedAlgorithm
chosen_hash = responder_cert.signature_hash_algorithm
chosen_cert_hash = responder_cert.signature_hash_algorithm
# For a delegate OCSP responder, we need first check that its certificate is effectively
# signed by the certificate issuer.
crypto_util.verify_signed_payload(issuer_cert.public_key(), responder_cert.signature,
responder_cert.tbs_certificate_bytes, chosen_hash)
responder_cert.tbs_certificate_bytes, chosen_cert_hash)
# Following line may raise UnsupportedAlgorithm
chosen_hash = response_ocsp.signature_hash_algorithm
chosen_response_hash = response_ocsp.signature_hash_algorithm
# We check that the OSCP response is effectively signed by the responder
# (an authorized delegate one or the certificate issuer itself).
if not chosen_response_hash:
raise AssertionError("no signature hash algorithm defined")
crypto_util.verify_signed_payload(responder_cert.public_key(), response_ocsp.signature,
response_ocsp.tbs_response_bytes, chosen_hash)
response_ocsp.tbs_response_bytes, chosen_response_hash)
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
def _translate_ocsp_query(cert_path: str, ocsp_output: str, ocsp_errors: str) -> bool:
"""Parse openssl's weird output to work out what it means."""
states = ("good", "revoked", "unknown")

0
certbot/certbot/py.typed Normal file
View file

View file

@ -5,7 +5,13 @@ import logging
import shutil
import time
import traceback
from typing import Iterable
from typing import List
from typing import Set
from typing import TextIO
from typing import Tuple
from certbot import configuration
from certbot import errors
from certbot import util
from certbot._internal import constants
@ -57,13 +63,13 @@ class Reverter:
:type config: :class:`certbot.configuration.NamespaceConfig`
"""
def __init__(self, config):
def __init__(self, config: configuration.NamespaceConfig) -> None:
self.config = config
util.make_or_verify_dir(
config.backup_dir, constants.CONFIG_DIRS_MODE, self.config.strict_permissions)
def revert_temporary_config(self):
def revert_temporary_config(self) -> None:
"""Reload users original configuration files after a temporary save.
This function should reinstall the users original configuration files
@ -83,7 +89,7 @@ class Reverter:
)
raise errors.ReverterError("Unable to revert temporary config")
def rollback_checkpoints(self, rollback=1):
def rollback_checkpoints(self, rollback: int = 1) -> None:
"""Revert 'rollback' number of configuration checkpoints.
:param int rollback: Number of checkpoints to reverse. A str num will be
@ -125,7 +131,7 @@ class Reverter:
"Unable to load checkpoint during rollback")
rollback -= 1
def add_to_temp_checkpoint(self, save_files, save_notes):
def add_to_temp_checkpoint(self, save_files: Set[str], save_notes: str) -> None:
"""Add files to temporary checkpoint.
:param set save_files: set of filepaths to save
@ -135,7 +141,7 @@ class Reverter:
self._add_to_checkpoint_dir(
self.config.temp_checkpoint_dir, save_files, save_notes)
def add_to_checkpoint(self, save_files, save_notes):
def add_to_checkpoint(self, save_files: Set[str], save_notes: str) -> None:
"""Add files to a permanent checkpoint.
:param set save_files: set of filepaths to save
@ -147,7 +153,7 @@ class Reverter:
self._add_to_checkpoint_dir(
self.config.in_progress_dir, save_files, save_notes)
def _add_to_checkpoint_dir(self, cp_dir, save_files, save_notes):
def _add_to_checkpoint_dir(self, cp_dir: str, save_files: Set[str], save_notes: str) -> None:
"""Add save files to checkpoint directory.
:param str cp_dir: Checkpoint directory filepath
@ -192,7 +198,7 @@ class Reverter:
with open(os.path.join(cp_dir, "CHANGES_SINCE"), "a") as notes_fd:
notes_fd.write(save_notes)
def _read_and_append(self, filepath):
def _read_and_append(self, filepath: str) -> Tuple[TextIO, List[str]]:
"""Reads the file lines and returns a file obj.
Read the file returning the lines, and a pointer to the end of the file.
@ -209,7 +215,7 @@ class Reverter:
return op_fd, lines
def _recover_checkpoint(self, cp_dir):
def _recover_checkpoint(self, cp_dir: str) -> None:
"""Recover a specific checkpoint.
Recover a specific checkpoint provided by cp_dir
@ -248,7 +254,7 @@ class Reverter:
raise errors.ReverterError(
"Unable to remove directory: %s" % cp_dir)
def _run_undo_commands(self, filepath):
def _run_undo_commands(self, filepath: str) -> None:
"""Run all commands in a file."""
# NOTE: csv module uses native strings. That is unicode on Python 3
# It is strongly advised to set newline = '' on Python 3 with CSV,
@ -263,7 +269,7 @@ class Reverter:
logger.error(
"Unable to run undo command: %s", " ".join(command))
def _check_tempfile_saves(self, save_files):
def _check_tempfile_saves(self, save_files: Set[str]) -> None:
"""Verify save isn't overwriting any temporary files.
:param set save_files: Set of files about to be saved.
@ -293,7 +299,7 @@ class Reverter:
"Attempting to overwrite challenge "
"file - %s" % filename)
def register_file_creation(self, temporary, *files):
def register_file_creation(self, temporary: bool, *files: str) -> None:
r"""Register the creation of all files during certbot execution.
Call this method before writing to the file to make sure that the
@ -332,7 +338,7 @@ class Reverter:
if new_fd is not None:
new_fd.close()
def register_undo_command(self, temporary, command):
def register_undo_command(self, temporary: bool, command: Iterable[str]) -> None:
"""Register a command to be run to undo actions taken.
.. warning:: This function does not enforce order of operations in terms
@ -362,7 +368,7 @@ class Reverter:
raise errors.ReverterError(
"Unable to register undo command.")
def _get_cp_dir(self, temporary):
def _get_cp_dir(self, temporary: bool) -> str:
"""Return the proper reverter directory."""
if temporary:
cp_dir = self.config.temp_checkpoint_dir
@ -374,7 +380,7 @@ class Reverter:
return cp_dir
def recovery_routine(self):
def recovery_routine(self) -> None:
"""Revert configuration to most recent finalized checkpoint.
Remove all changes (temporary and permanent) that have not been
@ -402,7 +408,7 @@ class Reverter:
"Incomplete or failed recovery for IN_PROGRESS checkpoint "
"- %s" % self.config.in_progress_dir)
def _remove_contained_files(self, file_list):
def _remove_contained_files(self, file_list: str) -> bool:
"""Erase all files contained within file_list.
:param str file_list: file containing list of file paths to be deleted
@ -440,7 +446,7 @@ class Reverter:
return True
def finalize_checkpoint(self, title):
def finalize_checkpoint(self, title: str) -> None:
"""Finalize the checkpoint.
Timestamps and permanently saves all changes made through the use
@ -481,7 +487,7 @@ class Reverter:
# rename the directory as a timestamp
self._timestamp_progress_dir()
def _checkpoint_timestamp(self):
def _checkpoint_timestamp(self) -> str:
"Determine the timestamp of the checkpoint, enforcing monotonicity."
timestamp = str(time.time())
others = glob.glob(os.path.join(self.config.backup_dir, "[0-9]*"))
@ -502,7 +508,7 @@ class Reverter:
timestamp = timetravel
return timestamp
def _timestamp_progress_dir(self):
def _timestamp_progress_dir(self) -> None:
"""Timestamp the checkpoint."""
# It is possible save checkpoints faster than 1 per second resulting in
# collisions in the naming convention.

View file

@ -2,6 +2,8 @@
import datetime
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Tuple
import josepy as jose
@ -22,13 +24,13 @@ DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac")
CHALLENGES = [HTTP01, DNS01]
def gen_combos(challbs):
def gen_combos(challbs: Iterable[messages.ChallengeBody]) -> Tuple[Tuple[int], ...]:
"""Generate natural combinations for challbs."""
# completing a single DV challenge satisfies the CA
return tuple((i,) for i, _ in enumerate(challbs))
def chall_to_challb(chall, status):
def chall_to_challb(chall: challenges.Challenge, status: messages.Status) -> messages.ChallengeBody:
"""Return ChallengeBody from Challenge."""
kwargs = {
"chall": chall,
@ -58,7 +60,9 @@ DNS01_A_2 = auth_handler.challb_to_achall(DNS01_P_2, JWK, "esimerkki.example.org
ACHALLENGES = [HTTP01_A, DNS01_A]
def gen_authzr(authz_status, domain, challs, statuses, combos=True):
def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[challenges.Challenge],
statuses: Iterable[messages.Status],
combos: bool = True) -> messages.AuthorizationResource:
"""Generate an authorization resource.
:param authz_status: Status object

View file

@ -2,21 +2,26 @@
from importlib import reload as reload_module
import io
import logging
from multiprocessing import Event
from multiprocessing import Process
import multiprocessing
from multiprocessing import synchronize
import shutil
import sys
import tempfile
from typing import Any
from typing import Callable
from typing import cast
from typing import IO
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
import unittest
import warnings
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import josepy as jose
import OpenSSL
from OpenSSL import crypto
import pkg_resources
from certbot import configuration
@ -54,7 +59,8 @@ class DummyInstaller(common.Installer):
fullchain_path: str) -> None:
pass
def enhance(self, domain: str, enhancement: str, options: Optional[List[str]] = None) -> None:
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
pass
def supported_enhancements(self) -> List[str]:
@ -70,7 +76,7 @@ class DummyInstaller(common.Installer):
pass
@classmethod
def add_parser_arguments(cls, add):
def add_parser_arguments(cls, add: Callable[..., None]) -> None:
pass
def prepare(self) -> None:
@ -80,13 +86,13 @@ class DummyInstaller(common.Installer):
pass
def vector_path(*names):
def vector_path(*names: str) -> str:
"""Path to a test vector."""
return pkg_resources.resource_filename(
__name__, os.path.join('testdata', *names))
def load_vector(*names):
def load_vector(*names: str) -> bytes:
"""Load contents of a test vector."""
# luckily, resource_string opens file in binary mode
data = pkg_resources.resource_string(
@ -100,7 +106,7 @@ def load_vector(*names):
return data
def _guess_loader(filename, loader_pem, loader_der):
def _guess_loader(filename: str, loader_pem: int, loader_der: int) -> int:
_, ext = os.path.splitext(filename)
if ext.lower() == '.pem':
return loader_pem
@ -109,41 +115,45 @@ def _guess_loader(filename, loader_pem, loader_der):
raise ValueError("Loader could not be recognized based on extension") # pragma: no cover
def load_cert(*names):
def load_cert(*names: str) -> crypto.X509:
"""Load certificate."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_certificate(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_certificate(loader, load_vector(*names))
def load_csr(*names):
def load_csr(*names: str) -> crypto.X509Req:
"""Load certificate request."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_certificate_request(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_certificate_request(loader, load_vector(*names))
def load_comparable_csr(*names):
def load_comparable_csr(*names: str) -> jose.ComparableX509:
"""Load ComparableX509 certificate request."""
return jose.ComparableX509(load_csr(*names))
def load_rsa_private_key(*names):
def load_rsa_private_key(*names: str) -> jose.ComparableRSAKey:
"""Load RSA private key."""
loader = _guess_loader(names[-1], serialization.load_pem_private_key,
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
loader = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
loader_fn: Callable[..., Any]
if loader == crypto.FILETYPE_PEM:
loader_fn = serialization.load_pem_private_key
else:
loader_fn = serialization.load_der_private_key
return jose.ComparableRSAKey(loader_fn(
load_vector(*names), password=None, backend=default_backend()))
def load_pyopenssl_private_key(*names):
def load_pyopenssl_private_key(*names: str) -> crypto.PKey:
"""Load pyOpenSSL private key."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_privatekey(loader, load_vector(*names))
def make_lineage(config_dir, testfile, ec=False):
def make_lineage(config_dir: str, testfile: str, ec: bool = False) -> str:
"""Creates a lineage defined by testfile.
This creates the archive, live, and renewal directories if
@ -151,6 +161,7 @@ def make_lineage(config_dir, testfile, ec=False):
:param str config_dir: path to the configuration directory
:param str testfile: configuration file to base the lineage on
:param bool ec: True if we generate the lineage with an ECDSA key
:returns: path to the renewal conf file for the created lineage
:rtype: str
@ -187,7 +198,7 @@ def make_lineage(config_dir, testfile, ec=False):
return conf_path
def patch_get_utility(target='zope.component.getUtility'):
def patch_get_utility(target: str = 'zope.component.getUtility') -> mock.MagicMock:
"""Deprecated, patch certbot.display.util directly or use patch_display_util instead.
:param str target: path to patch
@ -199,11 +210,11 @@ def patch_get_utility(target='zope.component.getUtility'):
warnings.warn('Decorator certbot.tests.util.patch_get_utility is deprecated. You should now '
'patch certbot.display.util yourself directly or use '
'certbot.tests.util.patch_display_util as a temporary workaround.')
return mock.patch(target, new_callable=_create_display_util_mock)
return cast(mock.MagicMock, mock.patch(target, new_callable=_create_display_util_mock))
def patch_get_utility_with_stdout(target='zope.component.getUtility',
stdout=None):
def patch_get_utility_with_stdout(target: str = 'zope.component.getUtility',
stdout: Optional[IO] = None) -> mock.MagicMock:
"""Deprecated, patch certbot.display.util directly
or use patch_display_util_with_stdout instead.
@ -221,10 +232,10 @@ def patch_get_utility_with_stdout(target='zope.component.getUtility',
'workaround.')
stdout = stdout if stdout else io.StringIO()
freezable_mock = _create_display_util_mock_with_stdout(stdout)
return mock.patch(target, new=freezable_mock)
return cast(mock.MagicMock, mock.patch(target, new=freezable_mock))
def patch_display_util():
def patch_display_util() -> mock.MagicMock:
"""Patch certbot.display.util to use a special mock display utility.
The mock display utility works like a regular mock object, except it also
@ -242,14 +253,15 @@ def patch_display_util():
:returns: patch on the function used internally by certbot.display.util to
get a display utility instance
:rtype: unittest.mock._patch
:rtype: mock.MagicMock
"""
return mock.patch('certbot._internal.display.obj.get_display',
new_callable=_create_display_util_mock)
return cast(mock.MagicMock, mock.patch('certbot._internal.display.obj.get_display',
new_callable=_create_display_util_mock))
def patch_display_util_with_stdout(stdout=None):
def patch_display_util_with_stdout(
stdout: Optional[IO] = None) -> mock.MagicMock:
"""Patch certbot.display.util to use a special mock display utility.
The mock display utility works like a regular mock object, except it also
@ -272,13 +284,13 @@ def patch_display_util_with_stdout(stdout=None):
expected to have a `write` method
:returns: patch on the function used internally by certbot.display.util to
get a display utility instance
:rtype: unittest.mock._patch
:rtype: mock.MagicMock
"""
stdout = stdout if stdout else io.StringIO()
return mock.patch('certbot._internal.display.obj.get_display',
new=_create_display_util_mock_with_stdout(stdout))
return cast(mock.MagicMock, mock.patch('certbot._internal.display.obj.get_display',
new=_create_display_util_mock_with_stdout(stdout)))
class FreezableMock:
@ -294,7 +306,8 @@ class FreezableMock:
value of func is ignored.
"""
def __init__(self, frozen=False, func=None, return_value=mock.sentinel.DEFAULT):
def __init__(self, frozen: bool = False, func: Callable[..., Any] = None,
return_value: Any = mock.sentinel.DEFAULT) -> None:
self._frozen_set = set() if frozen else {'freeze', }
self._func = func
self._mock = mock.MagicMock()
@ -302,16 +315,16 @@ class FreezableMock:
self.return_value = return_value
self._frozen = frozen
def freeze(self):
def freeze(self) -> None:
"""Freeze object preventing further changes."""
self._frozen = True
def __call__(self, *args, **kwargs):
def __call__(self, *args: Any, **kwargs: Any) -> mock.MagicMock:
if self._func is not None:
self._func(*args, **kwargs)
return self._mock(*args, **kwargs)
def __getattribute__(self, name):
def __getattribute__(self, name: str) -> Any:
if name == '_frozen':
try:
return object.__getattribute__(self, name)
@ -324,7 +337,7 @@ class FreezableMock:
else:
return getattr(object.__getattribute__(self, '_mock'), name)
def __setattr__(self, name, value):
def __setattr__(self, name: str, value: Any) -> None:
""" Before it is frozen, attributes are set on the FreezableMock
instance and added to the _frozen_set. Attributes in the _frozen_set
cannot be changed after the FreezableMock is frozen. In this case,
@ -349,7 +362,7 @@ class FreezableMock:
return object.__setattr__(self, name, value)
def _create_display_util_mock():
def _create_display_util_mock() -> FreezableMock:
display = FreezableMock()
# Use pylint code for disable to keep on single line under line length limit
method_list = [func for func in dir(display_obj.FileDisplay)
@ -363,14 +376,14 @@ def _create_display_util_mock():
return FreezableMock(frozen=True, return_value=display)
def _create_display_util_mock_with_stdout(stdout):
def _write_msg(message, *unused_args, **unused_kwargs):
def _create_display_util_mock_with_stdout(stdout: IO) -> FreezableMock:
def _write_msg(message: str, *unused_args: Any, **unused_kwargs: Any) -> None:
"""Write to message to stdout.
"""
if message:
stdout.write(message)
def mock_method(*args, **kwargs):
def mock_method(*args: Any, **kwargs: Any) -> None:
"""
Mock function for display utility methods.
"""
@ -394,7 +407,7 @@ def _create_display_util_mock_with_stdout(stdout):
return FreezableMock(frozen=True, return_value=display)
def _assert_valid_call(*args, **kwargs):
def _assert_valid_call(*args: Any, **kwargs: Any) -> None:
assert_args = [args[0] if args else kwargs['message']]
assert_kwargs = {}
@ -408,11 +421,11 @@ def _assert_valid_call(*args, **kwargs):
class TempDirTestCase(unittest.TestCase):
"""Base test class which sets up and tears down a temporary directory"""
def setUp(self):
def setUp(self) -> None:
"""Execute before test"""
self.tempdir = tempfile.mkdtemp()
def tearDown(self):
def tearDown(self) -> None:
"""Execute after test"""
# Cleanup opened resources after a test. This is usually done through atexit handlers in
# Certbot, but during tests, atexit will not run registered functions before tearDown is
@ -429,7 +442,7 @@ class TempDirTestCase(unittest.TestCase):
class ConfigTestCase(TempDirTestCase):
"""Test class which sets up a NamespaceConfig object."""
def setUp(self):
def setUp(self) -> None:
super().setUp()
self.config = configuration.NamespaceConfig(
mock.MagicMock(**constants.CLI_DEFAULTS)
@ -444,7 +457,7 @@ class ConfigTestCase(TempDirTestCase):
self.config.namespace.server = "https://example.com"
def _handle_lock(event_in, event_out, path):
def _handle_lock(event_in: synchronize.Event, event_out: synchronize.Event, path: str) -> None:
"""
Acquire a file lock on given path, then wait to release it. This worker is coordinated
using events to signal when the lock should be acquired and released.
@ -463,7 +476,7 @@ def _handle_lock(event_in, event_out, path):
my_lock.release()
def lock_and_call(callback, path_to_lock):
def lock_and_call(callback: Callable[[], Any], path_to_lock: str) -> None:
"""
Grab a lock on path_to_lock from a foreign process then execute the callback.
:param callable callback: object to call after acquiring the lock
@ -472,9 +485,10 @@ def lock_and_call(callback, path_to_lock):
# Reload certbot.util module to reset internal _LOCKS dictionary.
reload_module(util)
emit_event = Event()
receive_event = Event()
process = Process(target=_handle_lock, args=(emit_event, receive_event, path_to_lock))
emit_event = multiprocessing.Event()
receive_event = multiprocessing.Event()
process = multiprocessing.Process(target=_handle_lock,
args=(emit_event, receive_event, path_to_lock))
process.start()
# Wait confirmation that lock is acquired
@ -489,15 +503,15 @@ def lock_and_call(callback, path_to_lock):
assert process.exitcode == 0
def skip_on_windows(reason):
def skip_on_windows(reason: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Decorator to skip permanently a test on Windows. A reason is required."""
def wrapper(function):
def wrapper(function: Callable[..., Any]) -> Callable[..., Any]:
"""Wrapped version"""
return unittest.skipIf(sys.platform == 'win32', reason)(function)
return wrapper
def temp_join(path):
def temp_join(path: str) -> str:
"""
Return the given path joined to the tempdir path for the current platform
Eg.: 'cert' => /tmp/cert (Linux) or 'C:\\Users\\currentuser\\AppData\\Temp\\cert' (Windows)

View file

@ -1,10 +1,7 @@
"""Utilities for all Certbot."""
# distutils.version under virtualenv confuses pylint
# For more info, see: https://github.com/PyCQA/pylint/issues/73
import argparse
import atexit
import collections
import distutils.version
import errno
import logging
import platform
@ -12,10 +9,15 @@ import re
import socket
import subprocess
import sys
from typing import Any
from typing import Callable
from typing import Dict
from typing import IO
from typing import Text
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
import warnings
@ -31,6 +33,9 @@ _USE_DISTRO = sys.platform.startswith('linux')
if _USE_DISTRO:
import distro
if TYPE_CHECKING:
import distutils.version
logger = logging.getLogger(__name__)
@ -61,9 +66,9 @@ _INITIAL_PID = os.getpid()
# program exits before the lock is cleaned up, it is automatically
# released, but the file isn't deleted.
_LOCKS: Dict[str, lock.LockFile] = {}
_VERSION_COMPONENT_RE = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
def env_no_snap_for_external_calls():
def env_no_snap_for_external_calls() -> Dict[str, str]:
"""
When Certbot is run inside a Snap, certain environment variables
are modified. But Certbot sometimes calls out to external programs,
@ -88,7 +93,7 @@ def env_no_snap_for_external_calls():
return env
def run_script(params, log=logger.error):
def run_script(params: List[str], log: Callable[[str], None]=logger.error) -> Tuple[str, str]:
"""Run the script with the given params.
:param list params: List of parameters to pass to subprocess.run
@ -118,7 +123,7 @@ def run_script(params, log=logger.error):
return proc.stdout, proc.stderr
def exe_exists(exe):
def exe_exists(exe: Optional[str]) -> bool:
"""Determine whether path/name refers to an executable.
:param str exe: Executable path or name
@ -127,6 +132,9 @@ def exe_exists(exe):
:rtype: bool
"""
if exe is None:
return False
path, _ = os.path.split(exe)
if path:
return filesystem.is_executable(exe)
@ -137,7 +145,7 @@ def exe_exists(exe):
return False
def lock_dir_until_exit(dir_path):
def lock_dir_until_exit(dir_path: str) -> None:
"""Lock the directory at dir_path until program exit.
:param str dir_path: path to directory
@ -152,7 +160,7 @@ def lock_dir_until_exit(dir_path):
_LOCKS[dir_path] = lock.lock_dir(dir_path)
def _release_locks():
def _release_locks() -> None:
for dir_lock in _LOCKS.values():
try:
dir_lock.release()
@ -162,7 +170,7 @@ def _release_locks():
_LOCKS.clear()
def set_up_core_dir(directory, mode, strict):
def set_up_core_dir(directory: str, mode: int, strict: bool) -> None:
"""Ensure directory exists with proper permissions and is locked.
:param str directory: Path to a directory.
@ -181,7 +189,7 @@ def set_up_core_dir(directory, mode, strict):
raise errors.Error(PERM_ERR_FMT.format(error))
def make_or_verify_dir(directory, mode=0o755, strict=False):
def make_or_verify_dir(directory: str, mode: int = 0o755, strict: bool = False) -> None:
"""Make sure directory exists with proper permissions.
:param str directory: Path to a directory.
@ -208,7 +216,7 @@ def make_or_verify_dir(directory, mode=0o755, strict=False):
raise
def safe_open(path: str, mode: str = "w", chmod=None) -> IO:
def safe_open(path: str, mode: str = "w", chmod: Optional[int] = None) -> IO:
"""Safely open a file.
:param str path: Path to a file.
@ -225,12 +233,12 @@ def safe_open(path: str, mode: str = "w", chmod=None) -> IO:
return os.fdopen(fd, mode, *fdopen_args)
def _unique_file(path, filename_pat, count, chmod, mode):
def _unique_file(path: str, filename_pat: Callable[[int], str], count: int,
chmod: int, mode: str) -> Tuple[IO, str]:
while True:
current_path = os.path.join(path, filename_pat(count))
try:
return safe_open(current_path, chmod=chmod, mode=mode),\
os.path.abspath(current_path)
return safe_open(current_path, chmod=chmod, mode=mode), os.path.abspath(current_path)
except OSError as err:
# "File exists," is okay, try a different name.
if err.errno != errno.EEXIST:
@ -238,7 +246,7 @@ def _unique_file(path, filename_pat, count, chmod, mode):
count += 1
def unique_file(path, chmod=0o777, mode="w"):
def unique_file(path: str, chmod: int = 0o777, mode: str = "w") -> Tuple[IO, str]:
"""Safely finds a unique file.
:param str path: path/filename.ext
@ -254,7 +262,8 @@ def unique_file(path, chmod=0o777, mode="w"):
count=0, chmod=chmod, mode=mode)
def unique_lineage_name(path, filename, chmod=0o644, mode="w"):
def unique_lineage_name(path: str, filename: str, chmod: int = 0o644,
mode: str = "w") -> Tuple[IO, str]:
"""Safely finds a unique file using lineage convention.
:param str path: directory path
@ -281,7 +290,7 @@ def unique_lineage_name(path, filename, chmod=0o644, mode="w"):
count=1, chmod=chmod, mode=mode)
def safely_remove(path):
def safely_remove(path: str) -> None:
"""Remove a file that may not exist."""
try:
os.remove(path)
@ -290,7 +299,7 @@ def safely_remove(path):
raise
def get_filtered_names(all_names):
def get_filtered_names(all_names: Set[str]) -> Set[str]:
"""Removes names that aren't considered valid by Let's Encrypt.
:param set all_names: all names found in the configuration
@ -307,7 +316,7 @@ def get_filtered_names(all_names):
logger.debug('Not suggesting name "%s"', name, exc_info=True)
return filtered_names
def get_os_info():
def get_os_info() -> Tuple[str, str]:
"""
Get OS name and version
@ -317,7 +326,7 @@ def get_os_info():
return get_python_os_info(pretty=False)
def get_os_info_ua():
def get_os_info_ua() -> str:
"""
Get OS name and version string for User Agent
@ -331,7 +340,7 @@ def get_os_info_ua():
return " ".join(get_python_os_info(pretty=True))
return os_info
def get_systemd_os_like():
def get_systemd_os_like() -> List[str]:
"""
Get a list of strings that indicate the distribution likeness to
other distributions.
@ -344,7 +353,7 @@ def get_systemd_os_like():
return distro.like().split(" ")
return []
def get_var_from_file(varname, filepath="/etc/os-release"):
def get_var_from_file(varname: str, filepath: str = "/etc/os-release") -> str:
"""
Get single value from a file formatted like systemd /etc/os-release
@ -366,14 +375,14 @@ def get_var_from_file(varname, filepath="/etc/os-release"):
return _normalize_string(line.strip()[len(var_string):])
return ""
def _normalize_string(orig):
def _normalize_string(orig: str) -> str:
"""
Helper function for get_var_from_file() to remove quotes
and whitespaces
"""
return orig.replace('"', '').replace("'", "").strip()
def get_python_os_info(pretty=False):
def get_python_os_info(pretty: bool = False) -> Tuple[str, str]:
"""
Get Operating System type/distribution and major version
using python platform module
@ -432,7 +441,7 @@ def get_python_os_info(pretty=False):
EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$")
def safe_email(email):
def safe_email(email: str) -> bool:
"""Scrub email address before using it."""
if EMAIL_REGEX.match(email) is not None:
return not email.startswith(".") and ".." not in email
@ -442,11 +451,13 @@ def safe_email(email):
class DeprecatedArgumentAction(argparse.Action):
"""Action to log a warning when an argument is used."""
def __call__(self, unused1, unused2, unused3, option_string=None):
def __call__(self, unused1: Any, unused2: Any, unused3: Any,
option_string: Optional[str] = None) -> None:
warnings.warn("Use of %s is deprecated." % option_string, DeprecationWarning)
def add_deprecated_argument(add_argument, argument_name, nargs):
def add_deprecated_argument(add_argument: Callable[..., None], argument_name: str,
nargs: Union[str, int]) -> None:
"""Adds a deprecated argument with the name argument_name.
Deprecated arguments are not shown in the help. If they are used on
@ -472,11 +483,11 @@ def add_deprecated_argument(add_argument, argument_name, nargs):
help=argparse.SUPPRESS, nargs=nargs)
def enforce_le_validity(domain):
def enforce_le_validity(domain: str) -> str:
"""Checks that Let's Encrypt will consider domain to be valid.
:param str domain: FQDN to check
:type domain: `str` or `unicode`
:type domain: `str`
:returns: The domain cast to `str`, with ASCII-only contents
:rtype: str
:raises ConfigurationError: for invalid domains and cases where Let's
@ -505,12 +516,13 @@ def enforce_le_validity(domain):
label, domain))
return domain
def enforce_domain_sanity(domain):
def enforce_domain_sanity(domain: Union[str, bytes]) -> str:
"""Method which validates domain value and errors out if
the requirements are not met.
:param domain: Domain to check
:type domain: `str` or `unicode`
:type domain: `str` or `bytes`
:raises ConfigurationError: for invalid domains and cases where Let's
Encrypt currently will not issue certificates
@ -564,11 +576,11 @@ def enforce_domain_sanity(domain):
return domain
def is_ipaddress(address):
def is_ipaddress(address: str) -> bool:
"""Is given address string form of IP(v4 or v6) address?
:param address: address to check
:type address: `str` or `unicode`
:type address: `str`
:returns: True if address is valid IP address, otherwise return False.
:rtype: bool
@ -587,23 +599,22 @@ def is_ipaddress(address):
return False
def is_wildcard_domain(domain):
def is_wildcard_domain(domain: Union[str, bytes]) -> bool:
""""Is domain a wildcard domain?
:param domain: domain to check
:type domain: `bytes` or `str` or `unicode`
:type domain: `bytes` or `str`
:returns: True if domain is a wildcard, otherwise, False
:rtype: bool
"""
wildcard_marker: Union[Text, bytes] = b"*."
if isinstance(domain, str):
wildcard_marker = "*."
return domain.startswith(wildcard_marker)
return domain.startswith("*.")
return domain.startswith(b"*.")
def get_strict_version(normalized):
def get_strict_version(normalized: str) -> "distutils.version.StrictVersion":
"""Converts a normalized version to a strict version.
:param str normalized: normalized version string
@ -612,11 +623,16 @@ def get_strict_version(normalized):
:rtype: distutils.version.StrictVersion
"""
# strict version ending with "a" and a number designates a pre-release
return distutils.version.StrictVersion(normalized.replace(".dev", "a"))
warnings.warn("certbot.util.get_strict_version is deprecated and will be "
"removed in a future release.", DeprecationWarning)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
import distutils.version
# strict version ending with "a" and a number designates a pre-release
return distutils.version.StrictVersion(normalized.replace(".dev", "a"))
def is_staging(srv):
def is_staging(srv: str) -> bool:
"""
Determine whether a given ACME server is a known test / staging server.
@ -627,7 +643,7 @@ def is_staging(srv):
return srv == constants.STAGING_URI or "staging" in srv
def atexit_register(func, *args, **kwargs):
def atexit_register(func: Callable, *args: Any, **kwargs: Any) -> None:
"""Sets func to be called before the program exits.
Special care is taken to ensure func is only called when the process
@ -639,6 +655,32 @@ def atexit_register(func, *args, **kwargs):
atexit.register(_atexit_call, func, *args, **kwargs)
def _atexit_call(func, *args, **kwargs):
def parse_loose_version(version_string: str) -> List[Union[int, str]]:
"""Parses a version string into its components.
This code and the returned tuple is based on the now deprecated
distutils.version.LooseVersion class from the Python standard library.
Two LooseVersion classes and two lists as returned by this function should
compare in the same way. See
https://github.com/python/cpython/blob/v3.10.0/Lib/distutils/version.py#L205-L347.
:param str version_string: version string
:returns: list of parsed version string components
:rtype: list
"""
components: List[Union[int, str]]
components = [x for x in _VERSION_COMPONENT_RE.split(version_string)
if x and x != '.']
for i, obj in enumerate(components):
try:
components[i] = int(obj)
except ValueError:
pass
return components
def _atexit_call(func: Callable, *args: Any, **kwargs: Any) -> None:
if _INITIAL_PID == os.getpid():
func(*args, **kwargs)

View file

@ -1,4 +1,4 @@
usage:
usage:
certbot [SUBCOMMAND] [options] [-d DOMAIN] [-d DOMAIN] ...
Certbot can obtain and install HTTPS/TLS/SSL certificates. By default,
@ -119,7 +119,7 @@ optional arguments:
case, and to know when to deprecate support for past
Python versions and flags. If you wish to hide this
information from the Let's Encrypt server, set this to
"". (default: CertbotACMEClient/1.20.0 (certbot;
"". (default: CertbotACMEClient/1.21.0 (certbot;
OS_NAME OS_VERSION) Authenticator/XXX Installer/YYY
(SUBCOMMAND; flags: FLAGS) Py/major.minor.patchlevel).
The flags encoded in the user agent are: --duplicate,

View file

@ -7,5 +7,8 @@
# in --editable mode (-e), just "pip install .[docs]" does not work as
# expected and "pip install -e certbot[docs]" must be used instead
# We also pin our dependencies for increased stability.
-c ../tools/requirements.txt
-e acme
-e certbot[docs]

View file

@ -1,9 +1,9 @@
import codecs
from distutils.version import LooseVersion
import os
import re
import sys
from pkg_resources import parse_version
from setuptools import __version__ as setuptools_version
from setuptools import find_packages
from setuptools import setup
@ -11,7 +11,7 @@ from setuptools import setup
min_setuptools_version='39.0.1'
# This conditional isn't necessary, but it provides better error messages to
# people who try to install this package with older versions of setuptools.
if LooseVersion(setuptools_version) < LooseVersion(min_setuptools_version):
if parse_version(setuptools_version) < parse_version(min_setuptools_version):
raise RuntimeError(f'setuptools {min_setuptools_version}+ is required')
# Workaround for https://bugs.python.org/issue8876, see
@ -132,6 +132,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
'Topic :: System :: Installation/Setup',

View file

@ -592,6 +592,19 @@ class OsInfoTest(unittest.TestCase):
self.assertEqual(cbutil.get_python_os_info(), ("testdist", "42"))
class GetStrictVersionTest(unittest.TestCase):
"""Test for certbot.util.get_strict_version."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.util import get_strict_version
return get_strict_version(*args, **kwargs)
def test_it(self):
with self.assertWarnsRegex(DeprecationWarning, "get_strict_version"):
self._call("1.2.3")
class AtexitRegisterTest(unittest.TestCase):
"""Tests for certbot.util.atexit_register."""
def setUp(self):
@ -624,5 +637,38 @@ class AtexitRegisterTest(unittest.TestCase):
atexit_func(*args[1:], **kwargs)
class ParseLooseVersionTest(unittest.TestCase):
"""Test for certbot.util.parse_loose_version.
These tests are based on the original tests for
distutils.version.LooseVersion at
https://github.com/python/cpython/blob/v3.10.0/Lib/distutils/tests/test_version.py#L58-L81.
"""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.util import parse_loose_version
return parse_loose_version(*args, **kwargs)
def test_less_than(self):
comparisons = (('1.5.1', '1.5.2b2'),
('3.4j', '1996.07.12'),
('2g6', '11g'),
('0.960923', '2.2beta29'),
('1.13++', '5.5.kw'))
for v1, v2 in comparisons:
self.assertLess(self._call(v1), self._call(v2))
def test_equal(self):
self.assertEqual(self._call('8.02'), self._call('8.02'))
def test_greater_than(self):
comparisons = (('161', '3.10a'),
('3.2.pl0', '3.1.1.6'))
for v1, v2 in comparisons:
self.assertGreater(self._call(v1), self._call(v2))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -1,8 +1,9 @@
#!/usr/bin/env python
# Test script for OpenSSL version checking
from distutils.version import LooseVersion
import sys
from certbot import util
def main(openssl_version, apache_version):
if not openssl_version.strip():
@ -12,8 +13,8 @@ def main(openssl_version, apache_version):
conf_file_location = "/etc/letsencrypt/options-ssl-apache.conf"
with open(conf_file_location) as f:
contents = f.read()
if LooseVersion(apache_version.strip()) < LooseVersion('2.4.11') or \
LooseVersion(openssl_version.strip()) < LooseVersion('1.0.2l'):
if util.parse_loose_version(apache_version.strip()) < util.parse_loose_version('2.4.11') or \
util.parse_loose_version(openssl_version.strip()) < util.parse_loose_version('1.0.2l'):
# should be old version
# assert SSLSessionTickets not in conf file
if "SSLSessionTickets" in contents:

View file

@ -20,6 +20,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Security',
],

View file

@ -22,6 +22,11 @@
# the certbot.display.util module.
# 5) A deprecation warning is raised in dnspython==1.15.0 in the oldest tests for
# certbot-dns-rfc2136.
# 6) The vendored version of six in botocore causes ImportWarnings in Python
# 3.10+. See https://github.com/boto/botocore/issues/2548.
# 7) botocore's default TLS settings raise deprecation warnings in Python
# 3.10+, but their values are sane from a security perspective. See
# https://github.com/boto/botocore/issues/2550.
filterwarnings =
error
ignore:The external mock module:PendingDeprecationWarning
@ -29,3 +34,5 @@ filterwarnings =
ignore:.*attribute in certbot.interfaces module is deprecated:DeprecationWarning
ignore:.*attribute in certbot.display.util module is deprecated:DeprecationWarning
ignore:decodestring\(\) is a deprecated alias:DeprecationWarning:dns
ignore:_SixMetaPathImporter.:ImportWarning
ignore:ssl.PROTOCOL_TLS:DeprecationWarning:botocore

View file

@ -12,7 +12,7 @@ yes "n" | sphinx-quickstart --dot _ --project $PROJECT --author "Certbot Project
cd $PROJECT/docs
sed -i -e "s|\# import os|import os|" conf.py
sed -i -e "s|\# needs_sphinx = '1.0'|needs_sphinx = '1.0'|" conf.py
sed -i -e "s|intersphinx_mapping = {'https://docs.python.org/': None}|intersphinx_mapping = {\n 'python': ('https://docs.python.org/', None),\n 'acme': ('https://acme-python.readthedocs.org/en/latest/', None),\n 'certbot': ('https://certbot.eff.org/docs/', None),\n}|" conf.py
sed -i -e "s|intersphinx_mapping = {'https://docs.python.org/': None}|intersphinx_mapping = {\n 'python': ('https://docs.python.org/', None),\n 'acme': ('https://acme-python.readthedocs.org/en/latest/', None),\n 'certbot': ('https://eff-certbot.readthedocs.io/en/stable/', None),\n}|" conf.py
sed -i -e "s|html_theme = 'alabaster'|\n# https://docs.readthedocs.io/en/stable/faq.html#i-want-to-use-the-read-the-docs-theme-locally\n# on_rtd is whether we are on readthedocs.org\non_rtd = os.environ.get('READTHEDOCS', None) == 'True'\nif not on_rtd: # only import and set the theme if we're building docs locally\n import sphinx_rtd_theme\n html_theme = 'sphinx_rtd_theme'\n html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]\n# otherwise, readthedocs.org uses their theme by default, so no need to specify it|" conf.py
sed -i -e "s|# Add any paths that contain templates here, relative to this directory.|autodoc_member_order = 'bysource'\nautodoc_default_flags = ['show-inheritance']\n\n# Add any paths that contain templates here, relative to this directory.|" conf.py
sed -i -e "s|# The name of the Pygments (syntax highlighting) style to use.|default_role = 'py:obj'\n\n# The name of the Pygments (syntax highlighting) style to use.|" conf.py

View file

@ -14,7 +14,6 @@ variable VENV_NAME.
from __future__ import print_function
from distutils.version import LooseVersion
import glob
import os
import re
@ -200,31 +199,9 @@ def install_packages(venv_name, pip_args):
# Using the python executable from venv, we ensure to execute following commands in this venv.
py_venv = get_venv_python_path(venv_name)
subprocess_with_print([py_venv, os.path.abspath('tools/pipstrap.py')])
# We only use this value during pip install because:
# 1) We're really only adding it for installing cryptography, which happens here, and
# 2) There are issues with calling it along with VIRTUALENV_NO_DOWNLOAD, which applies at the
# steps above, not during pip install.
env_pip_no_binary = os.environ.get('CERTBOT_PIP_NO_BINARY')
if env_pip_no_binary:
# Check OpenSSL version. If it's too low, don't apply the env variable.
openssl_version_string = str(subprocess_output_with_print(['openssl', 'version']))
matches = re.findall(r'OpenSSL ([^ ]+) ', openssl_version_string)
if not matches:
print('Could not find OpenSSL version, not setting PIP_NO_BINARY.')
else:
openssl_version = matches[0]
if LooseVersion(openssl_version) >= LooseVersion('1.0.2'):
print('Setting PIP_NO_BINARY to {0}'
' as specified in CERTBOT_PIP_NO_BINARY'.format(env_pip_no_binary))
os.environ['PIP_NO_BINARY'] = env_pip_no_binary
else:
print('Not setting PIP_NO_BINARY, as OpenSSL version is too old.')
command = [py_venv, os.path.abspath('tools/pip_install.py')]
command.extend(pip_args)
subprocess_with_print(command)
if 'PIP_NO_BINARY' in os.environ:
del os.environ['PIP_NO_BINARY']
if os.path.isdir(os.path.join(venv_name, 'bin')):
# Linux/OSX specific

Some files were not shown because too many files have changed in this diff Show more