Reimplement Certbot zope.interfaces into abstract base classes (#8950)

* Implement certbot services

* Various fixes

* Local oldest requirements

* Clean imports

* Add unit tests for certbot.services

* Clean code

* Protect against nullity of global services

* Fix CLI

* Fix tests

* Consistent test behavior

* Define new ABC classes

* Reimplement services with new ABC classes

* Adapt plugins discovery and selection

* Remove zope interfaces from plugins

* Re-enable delegation for simplicity

* Fix interfaces declaration

* Remove interface implementer

* Interfaces ordering

* Extract zope logic from discovery

* Cleanup imports

* Fixing tests

* Fix main_test

* Finish certbot unit tests

* Fix lint

* Various fixes thanks to mypy

* Fix lint

* Order imports

* Various fixes

* Clean code

* Remove reporter service, migrate display service in certbot.display.util.

* Fix test

* Fix apache compatibility test

* Fix oldest test

* Setup certbot.display.service module

* Reintegrate in util

* Fix imports

* Fix tests and documentation

* Refactor

* Cleanup

* Cleanup

* Clean imports

* Add unit tests

* Borrow sphinx build fix from #8863

* Align zope interfaces on ABC

* Various fixes

* Fix type

* Fix type

* Some cleanup

* Fix lint

* Update certbot/certbot/_internal/configuration.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/certbot/_internal/configuration.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Fix imports

* Fix Config contract (accounts_dir property)

* Remove unnecessary interface

* Set NamespaceConfig public, remove Config interface

* Remove Display ABC and implementation of IDisplay

* Clean lint

* Cleanup old decorators

* Contract on plugin constructor only

* Update certbot/certbot/tests/util.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/certbot/configuration.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/certbot/interfaces.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Some corrections

* Add changelog

* Fix --authenticators and --installers flags on plugins subcommand

* Fix multiheritance on the interface Plugin

* Update certbot/certbot/_internal/plugins/manual.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Update certbot/certbot/_internal/plugins/disco.py

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>

* Add warnings in logger also

* Add deprecation warnings also when plugins are verified.

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>
This commit is contained in:
Adrien Ferrand 2021-07-29 22:45:29 +02:00 committed by GitHub
parent 8133d3e70a
commit 979e21dcbf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
57 changed files with 574 additions and 602 deletions

View file

@ -15,8 +15,6 @@ from typing import Optional
from typing import Set
from typing import Union
import zope.interface
from acme import challenges
from certbot import errors
from certbot import interfaces
@ -119,10 +117,7 @@ class OsOptions:
# TODO: Add directives to sites-enabled... not sites-available.
# sites-available doesn't allow immediate find_dir search even with save()
# and load()
@zope.interface.implementer(interfaces.IAuthenticator, interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class ApacheConfigurator(common.Installer):
class ApacheConfigurator(common.Installer, interfaces.Authenticator):
"""Apache configurator.
:ivar config: Configuration.

View file

@ -1,12 +1,8 @@
""" Distribution specific override class for Arch Linux """
import zope.interface
from certbot import interfaces
from certbot_apache._internal import configurator
from certbot_apache._internal.configurator import OsOptions
@zope.interface.provider(interfaces.IPluginFactory)
class ArchConfigurator(configurator.ApacheConfigurator):
"""Arch Linux specific ApacheConfigurator override class"""

View file

@ -3,10 +3,7 @@ import logging
from typing import cast
from typing import List
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.errors import MisconfigurationError
from certbot_apache._internal import apache_util
@ -17,7 +14,6 @@ from certbot_apache._internal.configurator import OsOptions
logger = logging.getLogger(__name__)
@zope.interface.provider(interfaces.IPluginFactory)
class CentOSConfigurator(configurator.ApacheConfigurator):
"""CentOS specific ApacheConfigurator override class"""

View file

@ -1,12 +1,8 @@
""" Distribution specific override class for macOS """
import zope.interface
from certbot import interfaces
from certbot_apache._internal import configurator
from certbot_apache._internal.configurator import OsOptions
@zope.interface.provider(interfaces.IPluginFactory)
class DarwinConfigurator(configurator.ApacheConfigurator):
"""macOS specific ApacheConfigurator override class"""

View file

@ -1,10 +1,7 @@
""" Distribution specific override class for Debian family (Ubuntu/Debian) """
import logging
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat import filesystem
from certbot.compat import os
@ -15,7 +12,6 @@ from certbot_apache._internal.configurator import OsOptions
logger = logging.getLogger(__name__)
@zope.interface.provider(interfaces.IPluginFactory)
class DebianConfigurator(configurator.ApacheConfigurator):
"""Debian specific ApacheConfigurator override class"""

View file

@ -1,8 +1,5 @@
""" Distribution specific override class for Fedora 29+ """
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
@ -10,7 +7,6 @@ from certbot_apache._internal import parser
from certbot_apache._internal.configurator import OsOptions
@zope.interface.provider(interfaces.IPluginFactory)
class FedoraConfigurator(configurator.ApacheConfigurator):
"""Fedora 29+ specific ApacheConfigurator override class"""

View file

@ -1,14 +1,10 @@
""" Distribution specific override class for Gentoo Linux """
import zope.interface
from certbot import interfaces
from certbot_apache._internal import apache_util
from certbot_apache._internal import configurator
from certbot_apache._internal import parser
from certbot_apache._internal.configurator import OsOptions
@zope.interface.provider(interfaces.IPluginFactory)
class GentooConfigurator(configurator.ApacheConfigurator):
"""Gentoo specific ApacheConfigurator override class"""

View file

@ -1,12 +1,8 @@
""" Distribution specific override class for OpenSUSE """
import zope.interface
from certbot import interfaces
from certbot_apache._internal import configurator
from certbot_apache._internal.configurator import OsOptions
@zope.interface.provider(interfaces.IPluginFactory)
class OpenSUSEConfigurator(configurator.ApacheConfigurator):
"""OpenSUSE specific ApacheConfigurator override class"""

View file

@ -6,9 +6,8 @@ from unittest import mock
import zope.interface
from certbot import errors as le_errors
from certbot import errors as le_errors, configuration
from certbot import util as certbot_util
from certbot._internal import configuration
from certbot_apache._internal import entrypoint
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces

View file

@ -6,7 +6,7 @@ from typing import Set
import zope.interface
from certbot._internal import configuration
from certbot import configuration
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces
from certbot_compatibility_test import util

View file

@ -6,10 +6,8 @@ from typing import List
from typing import Optional
import CloudFlare
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins.dns_common import CredentialsConfiguration
@ -18,8 +16,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://dash.cloudflare.com/?to=/:account/profile/api-tokens'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Cloudflare

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import cloudxns
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://www.cloudxns.net/en/AccountManage/apimanage.html'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for CloudXNS DNS

View file

@ -3,18 +3,14 @@ import logging
from typing import Optional
import digitalocean
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins.dns_common import CredentialsConfiguration
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for DigitalOcean

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import dnsimple
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://dnsimple.com/user'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for DNSimple

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import dnsmadeeasy
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://cp.dnsmadeeasy.com/account/info'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for DNS Made Easy

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import gehirn
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -15,8 +13,7 @@ logger = logging.getLogger(__name__)
DASHBOARD_URL = "https://gis.gehirn.jp/"
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Gehirn Infrastructure Service DNS

View file

@ -6,10 +6,8 @@ from googleapiclient import discovery
from googleapiclient import errors as googleapiclient_errors
import httplib2
from oauth2client.service_account import ServiceAccountCredentials
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
@ -20,8 +18,6 @@ METADATA_URL = 'http://metadata.google.internal/computeMetadata/v1/'
METADATA_HEADERS = {'Metadata-Flavor': 'Google'}
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Google Cloud DNS

View file

@ -5,10 +5,8 @@ from typing import Optional
from lexicon.providers import linode
from lexicon.providers import linode4
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -18,8 +16,7 @@ logger = logging.getLogger(__name__)
API_KEY_URL = 'https://manager.linode.com/profile/api'
API_KEY_URL_V4 = 'https://cloud.linode.com/profile/tokens'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Linode

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import luadns
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://api.luadns.com/settings'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for LuaDNS

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import nsone
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
ACCOUNT_URL = 'https://my.nsone.net/#/account/settings'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for NS1

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import ovh
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
TOKEN_URL = 'https://eu.api.ovh.com/createToken/ or https://ca.api.ovh.com/createToken/'
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for OVH

View file

@ -11,10 +11,8 @@ import dns.rdatatype
import dns.tsig
import dns.tsigkeyring
import dns.update
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins.dns_common import CredentialsConfiguration
@ -22,8 +20,7 @@ logger = logging.getLogger(__name__)
DEFAULT_NETWORK_TIMEOUT = 45
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator using RFC 2136 Dynamic Updates

View file

@ -9,10 +9,8 @@ from typing import List
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
@ -23,8 +21,6 @@ INSTRUCTIONS = (
"and add the necessary permissions for Route53 access.")
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""Route53 Authenticator

View file

@ -1,14 +1,9 @@
"""Shim around `~certbot_dns_route53._internal.dns_route53` for backwards compatibility."""
import warnings
import zope.interface
from certbot import interfaces
from certbot_dns_route53._internal import dns_route53
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_route53.Authenticator):
"""Shim around `~certbot_dns_route53._internal.dns_route53.Authenticator`
for backwards compatibility."""

View file

@ -3,10 +3,8 @@ import logging
from typing import Optional
from lexicon.providers import sakuracloud
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from certbot.plugins import dns_common_lexicon
from certbot.plugins.dns_common import CredentialsConfiguration
@ -16,8 +14,6 @@ logger = logging.getLogger(__name__)
APIKEY_URL = "https://secure.sakura.ad.jp/cloud/#!/apikey/top/"
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Sakura Cloud DNS

View file

@ -16,7 +16,6 @@ from typing import Tuple
import OpenSSL
import pkg_resources
import zope.interface
from acme import challenges
from acme import crypto_util as acme_crypto_util
@ -44,9 +43,7 @@ NO_SSL_MODIFIER = 4
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator, interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class NginxConfigurator(common.Installer):
class NginxConfigurator(common.Installer, interfaces.Authenticator):
"""Nginx configurator.
.. todo:: Add proper support for comments in the config. Currently,

View file

@ -9,6 +9,14 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
* New functions that Certbot plugins can use to interact with the user have
been added to `certbot.display.util`. We plan to deprecate using `IDisplay`
with `zope` in favor of these new functions in the future.
* The `Plugin`, `Authenticator` and `Installer` classes are added to
`certbot.interfaces` module as alternatives to Certbot's current `zope` based
plugin interfaces. The API of these interfaces is identical, but they are
based on Python's `abc` module instead of `zope`. Certbot will continue to
detect plugins that implement either interface, but we plan to drop support
for `zope` based interfaces in a future version of Certbot.
* The class `certbot.configuration.NamespaceConfig` is added to the Certbot's
public API.
### Changed
@ -22,6 +30,11 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
* The `certbot.tests.patch_get_utility*` functions have been deprecated.
Plugins should now patch `certbot.display.util` themselves in their tests or
use `certbot.tests.util.patch_display_util` as a temporary workaround.
* In order to simplify the transition to Certbot's new plugin interfaces, the
classes `Plugin` and `Installer` in `certbot.plugins.common` module and
`certbot.plugins.dns_common.DNSAuthenticator` now implement Certbot's new
plugin interfaces. The Certbot plugins based on these classes are now
automatically detected as implementing these interfaces.
### Fixed

View file

@ -307,8 +307,7 @@ def human_readable_cert_info(config, cert, skip_filter_checks=False):
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
"""Get certname from flag, interactively, or error out.
"""
"""Get certname from flag, interactively, or error out."""
certname = config.certname
if certname:
certnames = [certname]

View file

@ -1,12 +1,11 @@
"""Certbot command line util function"""
import argparse
import copy
import zope.interface.interface # pylint: disable=unused-import
import inspect
from acme import challenges
from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot._internal import constants
from certbot.compat import os
@ -59,11 +58,10 @@ def flag_default(name):
def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
"""Extract the help message for a `configuration.NamespaceConfig` property docstring."""
if hidden:
return argparse.SUPPRESS
field: zope.interface.interface.Attribute = interfaces.IConfig.__getitem__(name)
return field.__doc__
return inspect.getdoc(getattr(configuration.NamespaceConfig, name))
class HelpfulArgumentGroup:

View file

@ -61,8 +61,8 @@ def _create_subparsers(helpful):
helpful.add("plugins",
"--authenticators", action="append_const", dest="ifaces",
default=flag_default("ifaces"),
const=interfaces.IAuthenticator, help="Limit to authenticator plugins only.")
const=interfaces.Authenticator, help="Limit to authenticator plugins only.")
helpful.add("plugins",
"--installers", action="append_const", dest="ifaces",
default=flag_default("ifaces"),
const=interfaces.IInstaller, help="Limit to installer plugins only.")
const=interfaces.Installer, help="Limit to installer plugins only.")

View file

@ -4,6 +4,7 @@ import sys
import textwrap
from typing import Any
from typing import Optional
from typing import Union
import zope.component
import zope.interface
@ -26,13 +27,12 @@ logger = logging.getLogger(__name__)
# object to happen first avoiding this potential bug.
class _DisplayService:
def __init__(self):
self.display: Optional[interfaces.IDisplay] = None
self.display: Optional[Union[FileDisplay, NoninteractiveDisplay]] = None
_SERVICE = _DisplayService()
@zope.interface.implementer(interfaces.IDisplay)
class FileDisplay:
"""File-based display."""
# see https://github.com/certbot/certbot/issues/3915
@ -381,7 +381,6 @@ class FileDisplay:
return util.OK, selection
@zope.interface.implementer(interfaces.IDisplay)
class NoninteractiveDisplay:
"""An iDisplay implementation that never asks for interactive user input"""
@ -390,7 +389,7 @@ class NoninteractiveDisplay:
self.outfile = outfile
def _interaction_fail(self, message, cli_flag, extra=""):
"Error out in case of an attempt to interact in noninteractive mode"
"""Error out in case of an attempt to interact in noninteractive mode"""
msg = "Missing command line flag or config entry for this setting:\n"
msg += message
if extra:
@ -543,7 +542,7 @@ def set_display(display: Any) -> None:
"""
# This call is done only for retro-compatibility purposes.
# TODO: Remove this call once zope dependencies are removed from Certbot.
zope.component.provideUtility(display)
zope.component.provideUtility(display, interfaces.IDisplay)
_SERVICE.display = display

View file

@ -16,10 +16,11 @@ from typing import Union
import configobj
import josepy as jose
import zope.component
import zope.interface
from acme import errors as acme_errors
import certbot
from certbot import crypto_util
from certbot import crypto_util, configuration
from certbot import errors
from certbot import interfaces
from certbot import util
@ -27,7 +28,6 @@ from certbot._internal import account
from certbot._internal import cert_manager
from certbot._internal import cli
from certbot._internal import client
from certbot._internal import configuration
from certbot._internal import constants
from certbot._internal import eff
from certbot._internal import hooks
@ -1540,7 +1540,7 @@ def main(cli_args=None):
# This call is done only for retro-compatibility purposes.
# TODO: Remove this call once zope dependencies are removed from Certbot.
zope.component.provideUtility(config)
zope.component.provideUtility(config, interfaces.IConfig)
# On windows, shell without administrative right cannot create symlinks required by certbot.
# So we check the rights before continuing.
@ -1557,7 +1557,7 @@ def main(cli_args=None):
# These calls are done only for retro-compatibility purposes.
# TODO: Remove these calls once zope dependencies are removed from Certbot.
report = reporter.Reporter(config)
zope.component.provideUtility(report)
zope.component.provideUtility(report, interfaces.IReporter)
util.atexit_register(report.print_messages)
with make_displayer(config) as displayer:

View file

@ -5,7 +5,9 @@ import logging
import sys
from typing import Dict
from typing import Optional
from typing import Type
from typing import Union
import warnings
import pkg_resources
import zope.interface
@ -49,10 +51,10 @@ class PluginEntryPoint:
def __init__(self, entry_point: pkg_resources.EntryPoint, with_prefix=False):
self.name = self.entry_point_to_plugin_name(entry_point, with_prefix)
self.plugin_cls: interfaces.IPluginFactory = entry_point.load()
self.plugin_cls: Type[interfaces.Plugin] = entry_point.load()
self.entry_point = entry_point
self.warning_message: Optional[str] = None
self._initialized: Optional[interfaces.IPlugin] = None
self._initialized: Optional[interfaces.Plugin] = None
self._prepared: Optional[Union[bool, Error]] = None
self._hidden = False
self._long_description: Optional[str] = None
@ -87,10 +89,7 @@ class PluginEntryPoint:
"""Long description of the plugin."""
if self._long_description:
return self._long_description
try:
return self.plugin_cls.long_description
except AttributeError:
return self.description
return getattr(self.plugin_cls, "long_description", self.description)
@long_description.setter
def long_description(self, description):
@ -108,7 +107,7 @@ class PluginEntryPoint:
def ifaces(self, *ifaces_groups):
"""Does plugin implements specified interface groups?"""
return not ifaces_groups or any(
all(iface.implementedBy(self.plugin_cls)
all(_implements(self.plugin_cls, iface)
for iface in ifaces)
for ifaces in ifaces_groups)
@ -121,9 +120,9 @@ class PluginEntryPoint:
"""Memoized plugin initialization."""
if not self.initialized:
self.entry_point.require() # fetch extras!
# TODO: remove type ignore once the interface becomes a proper
# abstract class (using abc) that mypy understands.
self._initialized = self.plugin_cls(config, self.name) # type: ignore
# For plugins implementing ABCs Plugin, Authenticator or Installer, the following
# line will raise an exception if some implementations of abstract methods are missing.
self._initialized = self.plugin_cls(config, self.name)
return self._initialized
def verify(self, ifaces):
@ -131,14 +130,9 @@ class PluginEntryPoint:
if not self.initialized:
raise ValueError("Plugin is not initialized.")
for iface in ifaces: # zope.interface.providedBy(plugin)
try:
zope.interface.verify.verifyObject(iface, self.init())
except zope.interface.exceptions.BrokenImplementation as error:
if iface.implementedBy(self.plugin_cls):
logger.debug(
"%s implements %s but object does not verify: %s",
self.plugin_cls, iface.__name__, error, exc_info=True)
if not _verify(self.init(), self.plugin_cls, iface):
return False
return True
@property
@ -196,8 +190,9 @@ class PluginEntryPoint:
"* {0}".format(self.name),
"Description: {0}".format(self.plugin_cls.description),
"Interfaces: {0}".format(", ".join(
iface.__name__ for iface in zope.interface.implementedBy(
self.plugin_cls))),
cls.__name__ for cls in self.plugin_cls.mro()
if cls.__module__ == 'certbot.interfaces'
)),
"Entry point: {0}".format(self.entry_point),
]
@ -260,11 +255,11 @@ class PluginsRegistry(Mapping):
plugin2 = other_ep.entry_point.dist.key if other_ep.entry_point.dist else "unknown"
raise Exception("Duplicate plugin name {0} from {1} and {2}.".format(
plugin_ep.name, plugin1, plugin2))
if interfaces.IPluginFactory.providedBy(plugin_ep.plugin_cls):
if _provides(plugin_ep.plugin_cls, interfaces.Plugin):
plugins[plugin_ep.name] = plugin_ep
else: # pragma: no cover
logger.warning(
"%r does not provide IPluginFactory, skipping", plugin_ep)
"%r does not inherit from Plugin, skipping", plugin_ep)
return plugin_ep
@ -339,3 +334,88 @@ class PluginsRegistry(Mapping):
if not self._plugins:
return "No plugins"
return "\n\n".join(str(p_ep) for p_ep in self._plugins.values())
_DEPRECATION_PLUGIN = ("Zope interface certbot.interfaces.IPlugin is deprecated, "
"use ABC certbot.interface.Plugin instead.")
_DEPRECATION_AUTHENTICATOR = ("Zope interface certbot.interfaces.IAuthenticator is deprecated, "
"use ABC certbot.interface.Authenticator instead.")
_DEPRECATION_INSTALLER = ("Zope interface certbot.interfaces.IInstaller is deprecated, "
"use ABC certbot.interface.Installer instead.")
_DEPRECATION_FACTORY = ("Zope interface certbot.interfaces.IPluginFactory is deprecated, "
"use ABC certbot.interface.Plugin instead.")
def _provides(target_class: Type[interfaces.Plugin], iface: Type) -> bool:
if issubclass(target_class, iface):
return True
if iface == interfaces.Plugin and interfaces.IPluginFactory.providedBy(target_class):
logging.warning(_DEPRECATION_FACTORY)
warnings.warn(_DEPRECATION_FACTORY, DeprecationWarning)
return True
return False
def _implements(target_class: Type[interfaces.Plugin], iface: Type) -> bool:
if issubclass(target_class, iface):
return True
if iface == interfaces.Plugin and interfaces.IPlugin.implementedBy(target_class):
logging.warning(_DEPRECATION_PLUGIN)
warnings.warn(_DEPRECATION_PLUGIN, DeprecationWarning)
return True
if iface == interfaces.Authenticator and interfaces.IAuthenticator.implementedBy(target_class):
logging.warning(_DEPRECATION_AUTHENTICATOR)
warnings.warn(_DEPRECATION_AUTHENTICATOR, DeprecationWarning)
return True
if iface == interfaces.Installer and interfaces.IInstaller.implementedBy(target_class):
logging.warning(_DEPRECATION_INSTALLER)
warnings.warn(_DEPRECATION_INSTALLER, DeprecationWarning)
return True
return False
def _verify(target_instance: interfaces.Plugin, target_class: Type[interfaces.Plugin],
iface: Type) -> bool:
if issubclass(target_class, iface):
# No need to trigger some verify logic for ABCs: when the object is instantiated,
# an error would be raised if implementation is not done properly.
# So the checks have been done effectively when the plugin has been initialized.
return True
zope_iface: Optional[Type[zope.interface.Interface]] = None
message = ""
if iface == interfaces.Plugin:
zope_iface = interfaces.IPlugin
message = _DEPRECATION_PLUGIN
if iface == interfaces.Authenticator:
zope_iface = interfaces.IAuthenticator
message = _DEPRECATION_AUTHENTICATOR
if iface == interfaces.Installer:
zope_iface = interfaces.IInstaller
message = _DEPRECATION_INSTALLER
if not zope_iface:
raise ValueError(f"Unexpected type: {iface.__name__}")
try:
zope.interface.verify.verifyObject(zope_iface, target_instance)
logging.warning(message)
warnings.warn(message, DeprecationWarning)
return True
except zope.interface.exceptions.BrokenImplementation as error:
if zope_iface.implementedBy(target_class):
logger.debug(
"%s implements %s but object does not verify: %s",
target_class, zope_iface.__name__, error, exc_info=True)
return False

View file

@ -2,8 +2,6 @@
import logging
from typing import Dict
import zope.interface
from acme import challenges
from certbot import achallenges
from certbot import errors
@ -21,9 +19,7 @@ from certbot.plugins import common
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
class Authenticator(common.Plugin, interfaces.Authenticator):
"""Manual authenticator
This plugin allows the user to perform the domain validation

View file

@ -1,22 +1,22 @@
"""Null plugin."""
import logging
import zope.interface
from certbot import interfaces
from certbot.plugins import common
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class Installer(common.Plugin):
class Installer(common.Plugin, interfaces.Installer):
"""Null installer."""
description = "Null Installer"
hidden = True
@classmethod
def add_parser_arguments(cls, add):
pass
# pylint: disable=missing-function-docstring
def prepare(self):

View file

@ -14,28 +14,28 @@ logger = logging.getLogger(__name__)
def pick_configurator(
config, default, plugins,
question="How would you like to authenticate and install "
"certificates?"):
config, default, plugins,
question="How would you like to authenticate and install "
"certificates?"):
"""Pick configurator plugin."""
return pick_plugin(
config, default, plugins, question,
(interfaces.IAuthenticator, interfaces.IInstaller))
(interfaces.Authenticator, interfaces.Installer))
def pick_installer(config, default, plugins,
question="How would you like to install certificates?"):
"""Pick installer plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IInstaller,))
config, default, plugins, question, (interfaces.Installer,))
def pick_authenticator(
config, default, plugins, question="How would you "
"like to authenticate with the ACME CA?"):
config, default, plugins, question="How would you "
"like to authenticate with the ACME CA?"):
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IAuthenticator,))
config, default, plugins, question, (interfaces.Authenticator,))
def get_unprepared_installer(config, plugins):
@ -55,7 +55,7 @@ def get_unprepared_installer(config, plugins):
return None
installers = plugins.filter(lambda p_ep: p_ep.check_name(req_inst))
installers.init(config)
installers = installers.verify((interfaces.IInstaller,))
installers = installers.verify((interfaces.Installer,))
if len(installers) > 1:
raise errors.PluginSelectionError(
"Found multiple installers with the name %s, Certbot is unable to "

View file

@ -11,7 +11,6 @@ from typing import Tuple
from typing import TYPE_CHECKING
import OpenSSL
import zope.interface
from acme import challenges
from acme import standalone as acme_standalone
@ -108,9 +107,7 @@ class ServerManager:
return self._instances.copy()
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
class Authenticator(common.Plugin, interfaces.Authenticator):
"""Standalone Authenticator.
This authenticator creates its own ephemeral TCP listener on the

View file

@ -8,8 +8,6 @@ from typing import Dict
from typing import List
from typing import Set
import zope.interface
from acme import challenges
from certbot import errors
from certbot import interfaces
@ -26,9 +24,7 @@ from certbot.util import safe_open
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
class Authenticator(common.Plugin, interfaces.Authenticator):
"""Webroot Authenticator."""
description = "Place files in webroot directory"

View file

@ -184,7 +184,7 @@ def restore_required_config_elements(config, renewalparams):
for item_name, restore_func in required_items:
if item_name in renewalparams and not cli.set_by_cli(item_name):
value = restore_func(item_name, renewalparams[item_name])
setattr(config, item_name, value)
setattr(config.namespace, item_name, value)
def _remove_deprecated_config_elements(renewalparams):
@ -451,7 +451,7 @@ def handle_renewal_request(config):
else:
# This call is done only for retro-compatibility purposes.
# TODO: Remove this call once zope dependencies are removed from Certbot.
zope.component.provideUtility(lineage_config)
zope.component.provideUtility(lineage_config, interfaces.IConfig)
renewal_candidate.ensure_deployed()
from certbot._internal import main
plugins = plugins_disco.PluginsRegistry.find_all()

View file

@ -5,15 +5,11 @@ import queue
import sys
import textwrap
import zope.interface
from certbot import interfaces
from certbot import util
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IReporter)
class Reporter:
"""Collects and displays information to the user.

View file

@ -1,18 +1,16 @@
"""Certbot user-supplied configuration."""
import copy
from typing import List
from typing import Optional
from urllib import parse
import zope.interface
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot._internal import constants
from certbot.compat import misc
from certbot.compat import os
@zope.interface.implementer(interfaces.IConfig)
class NamespaceConfig:
"""Configuration wrapper around :class:`argparse.Namespace`.
@ -50,7 +48,9 @@ class NamespaceConfig:
self.namespace.logs_dir = os.path.abspath(self.namespace.logs_dir)
# Check command line parameters sanity, and error out in case of problem.
check_config_sanity(self)
_check_config_sanity(self)
# Delegate any attribute not explicitly defined to the underlying namespace object.
def __getattr__(self, name):
return getattr(self.namespace, name)
@ -58,49 +58,179 @@ class NamespaceConfig:
def __setattr__(self, name, value):
setattr(self.namespace, name, value)
@property
def server(self) -> str:
"""ACME Directory Resource URI."""
return self.namespace.server
@property
def email(self) -> Optional[str]:
"""Email used for registration and recovery contact.
Use comma to register multiple emails,
ex: u1@example.com,u2@example.com. (default: Ask).
"""
return self.namespace.email
@property
def rsa_key_size(self) -> int:
"""Size of the RSA key."""
return self.namespace.rsa_key_size
@property
def elliptic_curve(self) -> str:
"""The SECG elliptic curve name to use.
Please see RFC 8446 for supported values.
"""
return self.namespace.elliptic_curve
@property
def key_type(self) -> str:
"""Type of generated private key.
Only *ONE* per invocation can be provided at this time.
"""
return self.namespace.key_type
@property
def must_staple(self) -> bool:
"""Adds the OCSP Must Staple extension to the certificate.
Autoconfigures OCSP Stapling for supported setups
(Apache version >= 2.3.3 ).
"""
return self.namespace.must_staple
@property
def config_dir(self) -> str:
"""Configuration directory."""
return self.namespace.config_dir
@property
def work_dir(self) -> str:
"""Working directory."""
return self.namespace.work_dir
@property
def accounts_dir(self):
"""Directory where all account information is stored."""
return self.accounts_dir_for_server_path(self.server_path)
@property
def backup_dir(self):
"""Configuration backups directory."""
return os.path.join(self.namespace.work_dir, constants.BACKUP_DIR)
@property
def csr_dir(self):
"""Directory where new Certificate Signing Requests (CSRs) are saved."""
return os.path.join(self.namespace.config_dir, constants.CSR_DIR)
@property
def in_progress_dir(self):
"""Directory used before a permanent checkpoint is finalized."""
return os.path.join(self.namespace.work_dir, constants.IN_PROGRESS_DIR)
@property
def key_dir(self):
"""Keys storage."""
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
@property
def temp_checkpoint_dir(self):
"""Temporary checkpoint directory."""
return os.path.join(
self.namespace.work_dir, constants.TEMP_CHECKPOINT_DIR)
@property
def no_verify_ssl(self) -> bool:
"""Disable verification of the ACME server's certificate."""
return self.namespace.no_verify_ssl
@property
def http01_port(self) -> int:
"""Port used in the http-01 challenge.
This only affects the port Certbot listens on.
A conforming ACME server will still attempt to connect on port 80.
"""
return self.namespace.http01_port
@property
def http01_address(self) -> str:
"""The address the server listens to during http-01 challenge."""
return self.namespace.http01_address
@property
def https_port(self) -> int:
"""Port used to serve HTTPS.
This affects which port Nginx will listen on after a LE certificate
is installed.
"""
return self.namespace.https_port
@property
def pref_challs(self) -> List[str]:
"""List of user specified preferred challenges.
Sorted with the most preferred challenge listed first.
"""
return self.namespace.pref_challs
@property
def allow_subset_of_names(self) -> bool:
"""Allow only a subset of names to be authorized to perform validations.
When performing domain validation, do not consider it a failure
if authorizations can not be obtained for a strict subset of
the requested domains. This may be useful for allowing renewals for
multiple domains to succeed even if some domains no longer point
at this system.
"""
return self.namespace.allow_subset_of_names
@property
def strict_permissions(self) -> bool:
"""Enable strict permissions checks.
Require that all configuration files are owned by the current
user; only needed if your config is somewhere unsafe like /tmp/.
"""
return self.namespace.strict_permissions
@property
def disable_renew_updates(self) -> bool:
"""Disable renewal updates.
If updates provided by installer enhancements when Certbot is being run
with \"renew\" verb should be disabled.
"""
return self.namespace.disable_renew_updates
@property
def preferred_chain(self) -> Optional[str]:
"""Set the preferred certificate chain.
If the CA offers multiple certificate chains, prefer the chain whose
topmost certificate was issued from this Subject Common Name.
If no match, the default offered chain will be used.
"""
return self.namespace.preferred_chain
@property
def server_path(self):
"""File path based on ``server``."""
parsed = parse.urlparse(self.namespace.server)
return (parsed.netloc + parsed.path).replace('/', os.path.sep)
@property
def accounts_dir(self): # pylint: disable=missing-function-docstring
return self.accounts_dir_for_server_path(self.server_path)
def accounts_dir_for_server_path(self, server_path):
"""Path to accounts directory based on server_path"""
server_path = misc.underscores_for_unsupported_characters_in_path(server_path)
return os.path.join(
self.namespace.config_dir, constants.ACCOUNTS_DIR, server_path)
@property
def backup_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.work_dir, constants.BACKUP_DIR)
@property
def csr_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.config_dir, constants.CSR_DIR)
@property
def in_progress_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.work_dir, constants.IN_PROGRESS_DIR)
@property
def key_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.config_dir, constants.KEY_DIR)
@property
def temp_checkpoint_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(
self.namespace.work_dir, constants.TEMP_CHECKPOINT_DIR)
def __deepcopy__(self, _memo):
# Work around https://bugs.python.org/issue1515 for py26 tests :( :(
# https://travis-ci.org/letsencrypt/letsencrypt/jobs/106900743#L3276
new_ns = copy.deepcopy(self.namespace)
return type(self)(new_ns)
@property
def default_archive_dir(self): # pylint: disable=missing-function-docstring
return os.path.join(self.namespace.config_dir, constants.ARCHIVE_DIR)
@ -138,8 +268,16 @@ class NamespaceConfig:
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_POST_HOOKS_DIR)
# Magic methods
def check_config_sanity(config):
def __deepcopy__(self, _memo):
# Work around https://bugs.python.org/issue1515 for py26 tests :( :(
# https://travis-ci.org/letsencrypt/letsencrypt/jobs/106900743#L3276
new_ns = copy.deepcopy(self.namespace)
return type(self)(new_ns)
def _check_config_sanity(config):
"""Validate command line options and display error message if
requirements are not met.

View file

@ -1,16 +1,23 @@
"""Certbot client interfaces."""
import abc
from abc import ABCMeta
from abc import abstractmethod
from argparse import ArgumentParser
from typing import Iterable
from typing import List
from typing import Optional
import zope.interface
# pylint: disable=no-self-argument,no-method-argument,inherit-non-class
from acme.challenges import Challenge
from acme.challenges import ChallengeResponse
from certbot.achallenges import AnnotatedChallenge
from certbot import configuration
class AccountStorage(object, metaclass=abc.ABCMeta):
class AccountStorage(metaclass=ABCMeta):
"""Accounts storage interface."""
@abc.abstractmethod
@abstractmethod
def find_all(self): # pragma: no cover
"""Find all accounts.
@ -20,7 +27,7 @@ class AccountStorage(object, metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
@abc.abstractmethod
@abstractmethod
def load(self, account_id): # pragma: no cover
"""Load an account by its id.
@ -30,7 +37,7 @@ class AccountStorage(object, metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
@abc.abstractmethod
@abstractmethod
def save(self, account, client): # pragma: no cover
"""Save account.
@ -40,8 +47,20 @@ class AccountStorage(object, metaclass=abc.ABCMeta):
raise NotImplementedError()
class IPluginFactory(zope.interface.Interface):
"""IPlugin factory.
class IConfig(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Deprecated, use certbot.configuration.NamespaceConfig instead."""
class IPluginFactory(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Deprecated, use certbot.interfaces.Plugin as ABC instead."""
class IPlugin(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Deprecated, use certbot.interfaces.Plugin as ABC instead."""
class Plugin(metaclass=ABCMeta):
"""Certbot plugin.
Objects providing this interface will be called without satisfying
any entry point "extras" (extra dependencies) you might have defined
@ -70,35 +89,21 @@ class IPluginFactory(zope.interface.Interface):
"""
description = zope.interface.Attribute("Short plugin description")
description: str = NotImplemented
"""Short plugin description"""
def __call__(config, name): # pylint: disable=signature-differs
"""Create new `IPlugin`.
@abstractmethod
def __init__(self, config: configuration.NamespaceConfig, name: str):
"""Create a new `Plugin`.
:param IConfig config: Configuration.
:param configuration.NamespaceConfig config: Configuration.
:param str name: Unique plugin name.
"""
super().__init__()
def inject_parser_options(parser, name):
"""Inject argument parser options (flags).
1. Be nice and prepend all options and destinations with
`~.common.option_namespace` and `~common.dest_namespace`.
2. Inject options (flags) only. Positional arguments are not
allowed, as this would break the CLI.
:param ArgumentParser parser: (Almost) top-level CLI parser.
:param str name: Unique plugin name.
"""
class IPlugin(zope.interface.Interface):
"""Certbot plugin."""
def prepare(): # type: ignore
@abstractmethod
def prepare(self) -> None:
"""Prepare the plugin.
Finish up any additional initialization.
@ -117,7 +122,8 @@ class IPlugin(zope.interface.Interface):
"""
def more_info(): # type: ignore
@abstractmethod
def more_info(self) -> str:
"""Human-readable string to help the user.
Should describe the steps taken and any relevant info to help the user
@ -127,8 +133,28 @@ class IPlugin(zope.interface.Interface):
"""
@classmethod
@abstractmethod
def inject_parser_options(cls, parser: ArgumentParser, name: str) -> None:
"""Inject argument parser options (flags).
class IAuthenticator(IPlugin):
1. Be nice and prepend all options and destinations with
`~.common.option_namespace` and `~common.dest_namespace`.
2. Inject options (flags) only. Positional arguments are not
allowed, as this would break the CLI.
:param ArgumentParser parser: (Almost) top-level CLI parser.
:param str name: Unique plugin name.
"""
class IAuthenticator(IPlugin): # pylint: disable=inherit-non-class
"""Deprecated, use certbot.interfaces.Authenticator as ABC instead."""
class Authenticator(Plugin):
"""Generic Certbot Authenticator.
Class represents all possible tools processes that have the
@ -136,7 +162,8 @@ class IAuthenticator(IPlugin):
"""
def get_chall_pref(domain):
@abstractmethod
def get_chall_pref(self, domain: str) -> Iterable[Challenge]:
"""Return `collections.Iterable` of challenge preferences.
:param str domain: Domain for which challenge preferences are sought.
@ -149,7 +176,8 @@ class IAuthenticator(IPlugin):
"""
def perform(achalls):
@abstractmethod
def perform(self, achalls: List[AnnotatedChallenge]) -> Iterable[ChallengeResponse]:
"""Perform the given challenge.
:param list achalls: Non-empty (guaranteed) list of
@ -169,7 +197,8 @@ class IAuthenticator(IPlugin):
"""
def cleanup(achalls):
@abstractmethod
def cleanup(self, achalls: List[AnnotatedChallenge]) -> None:
"""Revert changes and shutdown after challenges complete.
This method should be able to revert all changes made by
@ -184,90 +213,11 @@ class IAuthenticator(IPlugin):
"""
class IConfig(zope.interface.Interface):
"""Certbot user-supplied configuration.
.. warning:: The values stored in the configuration have not been
filtered, stripped or sanitized.
"""
server = zope.interface.Attribute("ACME Directory Resource URI.")
email = zope.interface.Attribute(
"Email used for registration and recovery contact. Use comma to "
"register multiple emails, ex: u1@example.com,u2@example.com. "
"(default: Ask).")
rsa_key_size = zope.interface.Attribute("Size of the RSA key.")
elliptic_curve = zope.interface.Attribute(
"The SECG elliptic curve name to use. Please see RFC 8446 "
"for supported values."
)
key_type = zope.interface.Attribute(
"Type of generated private key"
"(Only *ONE* per invocation can be provided at this time)")
must_staple = zope.interface.Attribute(
"Adds the OCSP Must Staple extension to the certificate. "
"Autoconfigures OCSP Stapling for supported setups "
"(Apache version >= 2.3.3 ).")
config_dir = zope.interface.Attribute("Configuration directory.")
work_dir = zope.interface.Attribute("Working directory.")
accounts_dir = zope.interface.Attribute(
"Directory where all account information is stored.")
backup_dir = zope.interface.Attribute("Configuration backups directory.")
csr_dir = zope.interface.Attribute(
"Directory where newly generated Certificate Signing Requests "
"(CSRs) are saved.")
in_progress_dir = zope.interface.Attribute(
"Directory used before a permanent checkpoint is finalized.")
key_dir = zope.interface.Attribute("Keys storage.")
temp_checkpoint_dir = zope.interface.Attribute(
"Temporary checkpoint directory.")
no_verify_ssl = zope.interface.Attribute(
"Disable verification of the ACME server's certificate.")
http01_port = zope.interface.Attribute(
"Port used in the http-01 challenge. "
"This only affects the port Certbot listens on. "
"A conforming ACME server will still attempt to connect on port 80.")
http01_address = zope.interface.Attribute(
"The address the server listens to during http-01 challenge.")
https_port = zope.interface.Attribute(
"Port used to serve HTTPS. "
"This affects which port Nginx will listen on after a LE certificate "
"is installed.")
pref_challs = zope.interface.Attribute(
"Sorted user specified preferred challenges"
"type strings with the most preferred challenge listed first")
allow_subset_of_names = zope.interface.Attribute(
"When performing domain validation, do not consider it a failure "
"if authorizations can not be obtained for a strict subset of "
"the requested domains. This may be useful for allowing renewals for "
"multiple domains to succeed even if some domains no longer point "
"at this system. This is a boolean")
strict_permissions = zope.interface.Attribute(
"Require that all configuration files are owned by the current "
"user; only needed if your config is somewhere unsafe like /tmp/."
"This is a boolean")
disable_renew_updates = zope.interface.Attribute(
"If updates provided by installer enhancements when Certbot is being run"
" with \"renew\" verb should be disabled.")
preferred_chain = zope.interface.Attribute(
"If the CA offers multiple certificate chains, prefer the chain whose "
"topmost certificate was issued from this Subject Common Name. "
"If no match, the default offered chain will be used."
)
class IInstaller(IPlugin): # pylint: disable=inherit-non-class
"""Deprecated, use certbot.interfaces.Installer as ABC instead."""
class IInstaller(IPlugin):
class Installer(Plugin):
"""Generic Certbot Installer Interface.
Represents any server that an X509 certificate can be placed.
@ -282,14 +232,17 @@ class IInstaller(IPlugin):
"""
def get_all_names(): # type: ignore
@abstractmethod
def get_all_names(self) -> Iterable[str]:
"""Returns all names that may be authenticated.
:rtype: `collections.Iterable` of `str`
"""
def deploy_cert(domain, cert_path, key_path, chain_path, fullchain_path):
@abstractmethod
def deploy_cert(self, domain: str, cert_path: str, key_path: str,
chain_path: str, fullchain_path: str) -> None:
"""Deploy certificate.
:param str domain: domain to deploy certificate file
@ -303,7 +256,8 @@ class IInstaller(IPlugin):
"""
def enhance(domain, enhancement, options=None):
@abstractmethod
def enhance(self, domain: str, enhancement: str, options: Optional[List[str]] = None) -> None:
"""Perform a configuration enhancement.
:param str domain: domain for which to provide enhancement
@ -319,7 +273,8 @@ class IInstaller(IPlugin):
"""
def supported_enhancements(): # type: ignore
@abstractmethod
def supported_enhancements(self) -> List[str]:
"""Returns a `collections.Iterable` of supported enhancements.
:returns: supported enhancements which should be a subset of
@ -328,7 +283,8 @@ class IInstaller(IPlugin):
"""
def save(title: Optional[str] = None, temporary: bool = False):
@abstractmethod
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
"""Saves all changes to the configuration files.
Both title and temporary are needed because a save may be
@ -350,14 +306,16 @@ class IInstaller(IPlugin):
"""
def rollback_checkpoints(rollback: int = 1):
@abstractmethod
def rollback_checkpoints(self, rollback: int = 1) -> None:
"""Revert `rollback` number of configuration checkpoints.
:raises .PluginError: when configuration cannot be fully reverted
"""
def recovery_routine(): # type: ignore
@abstractmethod
def recovery_routine(self) -> None:
"""Revert configuration to most recent finalized checkpoint.
Remove all changes (temporary and permanent) that have not been
@ -368,14 +326,16 @@ class IInstaller(IPlugin):
"""
def config_test(): # type: ignore
@abstractmethod
def config_test(self) -> None:
"""Make sure the configuration is valid.
:raises .MisconfigurationError: when the config is not in a usable state
"""
def restart(): # type: ignore
@abstractmethod
def restart(self) -> None:
"""Restart or refresh the server content.
:raises .PluginError: when server cannot be restarted
@ -383,174 +343,19 @@ class IInstaller(IPlugin):
"""
class IDisplay(zope.interface.Interface):
"""Generic display."""
# see https://github.com/certbot/certbot/issues/3915
def notification(message, pause, wrap=True, force_interactive=False):
"""Displays a string message
:param str message: Message to display
:param bool pause: Whether or not the application should pause for
confirmation (if available)
:param bool wrap: Whether or not the application should wrap text
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
"""
def menu(message, choices, ok_label=None,
cancel_label=None, help_label=None,
default=None, cli_flag=None, force_interactive=False):
"""Displays a generic menu.
When not setting force_interactive=True, you must provide a
default value.
:param str message: message to display
:param choices: choices
:type choices: :class:`list` of :func:`tuple` or :class:`str`
:param str ok_label: label for OK button (UNUSED)
:param str cancel_label: label for Cancel button (UNUSED)
:param str help_label: label for Help button (UNUSED)
:param int default: default (non-interactive) choice from the menu
:param str cli_flag: to automate choice from the menu, eg "--keep"
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
:returns: tuple of (`code`, `index`) where
`code` - str display exit code
`index` - int index of the user's selection
:raises errors.MissingCommandlineFlag: if called in non-interactive
mode without a default set
"""
def input(message, default=None, cli_args=None, force_interactive=False):
"""Accept input from the user.
When not setting force_interactive=True, you must provide a
default value.
:param str message: message to display to the user
:param str default: default (non-interactive) response to prompt
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
:returns: tuple of (`code`, `input`) where
`code` - str display exit code
`input` - str of the user's input
:rtype: tuple
:raises errors.MissingCommandlineFlag: if called in non-interactive
mode without a default set
"""
def yesno(message, yes_label="Yes", no_label="No", default=None,
cli_args=None, force_interactive=False):
"""Query the user with a yes/no question.
Yes and No label must begin with different letters.
When not setting force_interactive=True, you must provide a
default value.
:param str message: question for the user
:param str default: default (non-interactive) choice from the menu
:param str cli_flag: to automate choice from the menu, eg "--redirect / --no-redirect"
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
:returns: True for "Yes", False for "No"
:rtype: bool
:raises errors.MissingCommandlineFlag: if called in non-interactive
mode without a default set
"""
def checklist(message, tags, default=None, cli_args=None, force_interactive=False):
"""Allow for multiple selections from a menu.
When not setting force_interactive=True, you must provide a
default value.
:param str message: message to display to the user
:param list tags: where each is of type :class:`str` len(tags) > 0
:param str default: default (non-interactive) state of the checklist
:param str cli_flag: to automate choice from the menu, eg "--domains"
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
:returns: tuple of the form (code, list_tags) where
`code` - int display exit code
`list_tags` - list of str tags selected by the user
:rtype: tuple
:raises errors.MissingCommandlineFlag: if called in non-interactive
mode without a default set
"""
def directory_select(self, message, default=None,
cli_flag=None, force_interactive=False):
"""Display a directory selection screen.
When not setting force_interactive=True, you must provide a
default value.
:param str message: prompt to give the user
:param default: the default value to return, if one exists, when
using the NoninteractiveDisplay
:param str cli_flag: option used to set this value with the CLI,
if one exists, to be included in error messages given by
NoninteractiveDisplay
:param bool force_interactive: True if it's safe to prompt the user
because it won't cause any workflow regressions
:returns: tuple of the form (`code`, `string`) where
`code` - int display exit code
`string` - input entered by the user
"""
class IDisplay(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Deprecated, use your own Display implementation instead."""
class IReporter(zope.interface.Interface):
"""Interface to collect and display information to the user."""
HIGH_PRIORITY = zope.interface.Attribute(
"Used to denote high priority messages")
MEDIUM_PRIORITY = zope.interface.Attribute(
"Used to denote medium priority messages")
LOW_PRIORITY = zope.interface.Attribute(
"Used to denote low priority messages")
def add_message(self, msg, priority, on_crash=True):
"""Adds msg to the list of messages to be printed.
:param str msg: Message to be displayed to the user.
:param int priority: One of HIGH_PRIORITY, MEDIUM_PRIORITY, or
LOW_PRIORITY.
:param bool on_crash: Whether or not the message should be printed if
the program exits abnormally.
"""
def print_messages(self):
"""Prints messages to the user and clears the message queue."""
class IReporter(zope.interface.Interface): # pylint: disable=inherit-non-class
"""Deprecated, use your own Reporter implementation instead."""
class RenewableCert(object, metaclass=abc.ABCMeta):
class RenewableCert(metaclass=ABCMeta):
"""Interface to a certificate lineage."""
@property
@abc.abstractmethod
@abstractmethod
def cert_path(self):
"""Path to the certificate file.
@ -559,7 +364,7 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
"""
@property
@abc.abstractmethod
@abstractmethod
def key_path(self):
"""Path to the private key file.
@ -568,7 +373,7 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
"""
@property
@abc.abstractmethod
@abstractmethod
def chain_path(self):
"""Path to the certificate chain file.
@ -577,7 +382,7 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
"""
@property
@abc.abstractmethod
@abstractmethod
def fullchain_path(self):
"""Path to the full chain file.
@ -588,7 +393,7 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
"""
@property
@abc.abstractmethod
@abstractmethod
def lineagename(self):
"""Name given to the certificate lineage.
@ -596,7 +401,7 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
@abstractmethod
def names(self):
"""What are the subject names of this certificate?
@ -616,7 +421,8 @@ class RenewableCert(object, metaclass=abc.ABCMeta):
# an update during the run or install subcommand, it should do so when
# :func:`IInstaller.deploy_cert` is called.
class GenericUpdater(object, metaclass=abc.ABCMeta):
class GenericUpdater(metaclass=ABCMeta):
"""Interface for update types not currently specified by Certbot.
This class allows plugins to perform types of updates that Certbot hasn't
@ -631,7 +437,7 @@ class GenericUpdater(object, metaclass=abc.ABCMeta):
interface methods of `interfaces.IInstaller` such as prepare() and restart()
"""
@abc.abstractmethod
@abstractmethod
def generic_updates(self, lineage, *args, **kwargs):
"""Perform any update types defined by the installer.
@ -648,7 +454,7 @@ class GenericUpdater(object, metaclass=abc.ABCMeta):
"""
class RenewDeployer(object, metaclass=abc.ABCMeta):
class RenewDeployer(metaclass=ABCMeta):
"""Interface for update types run when a lineage is renewed
This class allows plugins to perform types of updates that need to run at
@ -659,7 +465,7 @@ class RenewDeployer(object, metaclass=abc.ABCMeta):
be called from the installer code.
"""
@abc.abstractmethod
@abstractmethod
def renew_deploy(self, lineage, *args, **kwargs):
"""Perform updates defined by installer when a certificate has been renewed

View file

@ -1,4 +1,5 @@
"""Plugin common functions."""
from abc import ABCMeta
import logging
import re
import shutil
@ -7,16 +8,16 @@ from typing import List
from josepy import util as jose_util
import pkg_resources
import zope.interface
from certbot import achallenges
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import reverter
from certbot._internal import constants
from certbot.compat import filesystem
from certbot.compat import os
from certbot.interfaces import Installer as AbstractInstaller
from certbot.interfaces import Plugin as AbstractPlugin
from certbot.plugins.storage import PluginStorage
logger = logging.getLogger(__name__)
@ -39,13 +40,11 @@ hostname_regex = re.compile(
r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$", re.IGNORECASE)
@zope.interface.implementer(interfaces.IPlugin)
class Plugin:
class Plugin(AbstractPlugin, metaclass=ABCMeta):
"""Generic plugin."""
# provider is not inherited, subclasses must define it on their own
# @zope.interface.provider(interfaces.IPluginFactory)
def __init__(self, config, name):
super().__init__(config, name)
self.config = config
self.name = name
@ -106,7 +105,7 @@ class Plugin:
Should describe, in simple language, what the authenticator tried to do, what went
wrong and what the user should try as their "next steps".
TODO: auth_hint belongs in IAuthenticator but can't be added until the next major
TODO: auth_hint belongs in Authenticator but can't be added until the next major
version of Certbot. For now, it lives in .Plugin and auth_handler will only call it
on authenticators that subclass .Plugin. For now, inherit from `.Plugin` to implement
and/or override the method.
@ -125,7 +124,7 @@ class Plugin:
.format(name=self.name, challs=challs))
class Installer(Plugin):
class Installer(AbstractInstaller, Plugin, metaclass=ABCMeta):
"""An installer base class with reverter and ssl_dhparam methods defined.
Installer plugins do not have to inherit from this class.

View file

@ -5,7 +5,6 @@ import logging
from time import sleep
import configobj
import zope.interface
from acme import challenges
from certbot import errors
@ -19,10 +18,8 @@ from certbot.plugins import common
logger = logging.getLogger(__name__)
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class DNSAuthenticator(common.Plugin):
"""Base class for DNS Authenticators"""
class DNSAuthenticator(common.Plugin, interfaces.Authenticator, metaclass=abc.ABCMeta):
"""Base class for DNS Authenticators"""
def __init__(self, config, name):
super().__init__(config, name)
@ -38,6 +35,7 @@ class DNSAuthenticator(common.Plugin):
'to verify the DNS record.')
def auth_hint(self, failed_achalls):
"""See certbot.plugins.common.Plugin.auth_hint."""
delay = self.conf('propagation-seconds')
return (
'The Certificate Authority failed to verify the DNS TXT records created by --{name}. '

View file

@ -7,6 +7,9 @@ from multiprocessing import Process
import shutil
import sys
import tempfile
from typing import Iterable
from typing import List
from typing import Optional
import unittest
import warnings
@ -16,15 +19,16 @@ import josepy as jose
import OpenSSL
import pkg_resources
from certbot import interfaces
from certbot import configuration
from certbot import util
from certbot._internal import configuration
from certbot._internal import constants
from certbot._internal import lock
from certbot._internal import storage
from certbot._internal.display import obj as display_obj
from certbot.compat import filesystem
from certbot.compat import os
from certbot.display import util as display_util
from certbot.plugins import common
try:
# When we remove this deprecated import, we should also remove the
@ -41,6 +45,41 @@ except ImportError: # pragma: no cover
from unittest import mock # type: ignore
class DummyInstaller(common.Installer):
"""Dummy installer plugin for test purpose."""
def get_all_names(self) -> Iterable[str]:
pass
def deploy_cert(self, domain: str, cert_path: str, key_path: str, chain_path: str,
fullchain_path: str) -> None:
pass
def enhance(self, domain: str, enhancement: str, options: Optional[List[str]] = None) -> None:
pass
def supported_enhancements(self) -> List[str]:
pass
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
pass
def config_test(self) -> None:
pass
def restart(self) -> None:
pass
@classmethod
def add_parser_arguments(cls, add):
pass
def prepare(self) -> None:
pass
def more_info(self) -> str:
pass
def vector_path(*names):
"""Path to a test vector."""
return pkg_resources.resource_filename(
@ -313,10 +352,13 @@ class FreezableMock:
def _create_display_util_mock():
display = FreezableMock()
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names():
if name != 'notification':
method_list = [func for func in dir(display_obj.FileDisplay)
if callable(getattr(display_obj.FileDisplay, func))
and not func.startswith("__")]
for method in method_list:
if method != 'notification':
frozen_mock = FreezableMock(frozen=True, func=_assert_valid_call)
setattr(display, name, frozen_mock)
setattr(display, method, frozen_mock)
display.freeze()
return FreezableMock(frozen=True, return_value=display)
@ -337,14 +379,17 @@ def _create_display_util_mock_with_stdout(stdout):
display = FreezableMock()
# Use pylint code for disable to keep on single line under line length limit
for name in interfaces.IDisplay.names():
if name == 'notification':
method_list = [func for func in dir(display_obj.FileDisplay)
if callable(getattr(display_obj.FileDisplay, func))
and not func.startswith("__")]
for method in method_list:
if method == 'notification':
frozen_mock = FreezableMock(frozen=True,
func=_write_msg)
else:
frozen_mock = FreezableMock(frozen=True,
func=mock_method)
setattr(display, name, frozen_mock)
setattr(display, method, frozen_mock)
display.freeze()
return FreezableMock(frozen=True, return_value=display)
@ -389,14 +434,14 @@ class ConfigTestCase(TempDirTestCase):
self.config = configuration.NamespaceConfig(
mock.MagicMock(**constants.CLI_DEFAULTS)
)
self.config.verb = "certonly"
self.config.config_dir = os.path.join(self.tempdir, 'config')
self.config.work_dir = os.path.join(self.tempdir, 'work')
self.config.logs_dir = os.path.join(self.tempdir, 'logs')
self.config.cert_path = constants.CLI_DEFAULTS['auth_cert_path']
self.config.fullchain_path = constants.CLI_DEFAULTS['auth_chain_path']
self.config.chain_path = constants.CLI_DEFAULTS['auth_chain_path']
self.config.server = "https://example.com"
self.config.namespace.verb = "certonly"
self.config.namespace.config_dir = os.path.join(self.tempdir, 'config')
self.config.namespace.work_dir = os.path.join(self.tempdir, 'work')
self.config.namespace.logs_dir = os.path.join(self.tempdir, 'logs')
self.config.namespace.cert_path = constants.CLI_DEFAULTS['auth_cert_path']
self.config.namespace.fullchain_path = constants.CLI_DEFAULTS['auth_chain_path']
self.config.namespace.chain_path = constants.CLI_DEFAULTS['auth_chain_path']
self.config.namespace.server = "https://example.com"
def _handle_lock(event_in, event_out, path):

View file

@ -3,15 +3,11 @@
For full examples, see `certbot.plugins`.
"""
import zope.interface
from certbot import interfaces
from certbot.plugins import common
@zope.interface.implementer(interfaces.IAuthenticator)
@zope.interface.provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
class Authenticator(common.Plugin, interfaces.Authenticator):
"""Example Authenticator."""
description = "Example Authenticator plugin"
@ -20,9 +16,7 @@ class Authenticator(common.Plugin):
# "self" as first argument, e.g. def prepare(self)...
@zope.interface.implementer(interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class Installer(common.Plugin):
class Installer(common.Plugin, interfaces.Installer):
"""Example Installer."""
description = "Example Installer plugin"

View file

@ -12,8 +12,7 @@ try:
except ImportError: # pragma: no cover
from unittest import mock
from certbot import errors
from certbot._internal import configuration
from certbot import errors, configuration
from certbot._internal.storage import ALL_FOUR
from certbot.compat import filesystem
from certbot.compat import os

View file

@ -1,4 +1,4 @@
"""Tests for certbot._internal.configuration."""
"""Tests for certbot.configuration."""
import unittest
try:
@ -14,7 +14,7 @@ from certbot.tests import util as test_util
class NamespaceConfigTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.configuration.NamespaceConfig."""
"""Tests for certbot.configuration.NamespaceConfig."""
def setUp(self):
super().setUp()
@ -25,7 +25,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
def test_init_same_ports(self):
self.config.namespace.https_port = 4321
from certbot._internal.configuration import NamespaceConfig
from certbot.configuration import NamespaceConfig
self.assertRaises(errors.Error, NamespaceConfig, self.config.namespace)
def test_proxy_getattr(self):
@ -41,7 +41,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.assertEqual(['user:pass@acme.server:443', 'p', 'a', 't', 'h'],
self.config.server_path.split(os.path.sep))
@mock.patch('certbot._internal.configuration.constants')
@mock.patch('certbot.configuration.constants')
def test_dynamic_dirs(self, mock_constants):
mock_constants.ACCOUNTS_DIR = 'acc'
mock_constants.BACKUP_DIR = 'backups'
@ -73,7 +73,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
os.path.normpath(os.path.join(self.config.work_dir, 't')))
def test_absolute_paths(self):
from certbot._internal.configuration import NamespaceConfig
from certbot.configuration import NamespaceConfig
config_base = "foo"
work_base = "bar"
@ -106,7 +106,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.assertTrue(os.path.isabs(config.key_dir))
self.assertTrue(os.path.isabs(config.temp_checkpoint_dir))
@mock.patch('certbot._internal.configuration.constants')
@mock.patch('certbot.configuration.constants')
def test_renewal_dynamic_dirs(self, mock_constants):
mock_constants.ARCHIVE_DIR = 'a'
mock_constants.LIVE_DIR = 'l'
@ -121,7 +121,7 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.config.config_dir, 'renewal_configs'))
def test_renewal_absolute_paths(self):
from certbot._internal.configuration import NamespaceConfig
from certbot.configuration import NamespaceConfig
config_base = "foo"
work_base = "bar"

View file

@ -2,15 +2,18 @@
import contextlib
import signal
import sys
from typing import Callable
from typing import Dict
from typing import Union
import unittest
from typing import Callable, Dict, Union
from certbot.compat import os
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
from certbot.compat import os
def get_signals(signums):

View file

@ -4,8 +4,8 @@ import logging
import logging.handlers
import sys
import time
import unittest
from typing import Optional
import unittest
from acme import messages
from certbot import errors

View file

@ -10,19 +10,18 @@ import shutil
import sys
import tempfile
import traceback
import unittest
from typing import List
import unittest
import josepy as jose
import pytz
from certbot import crypto_util
from certbot import crypto_util, configuration
from certbot import errors
from certbot import interfaces # pylint: disable=unused-import
from certbot import util
from certbot._internal import account
from certbot._internal import cli
from certbot._internal import configuration
from certbot._internal import constants
from certbot._internal import main
from certbot._internal import updater
@ -891,7 +890,7 @@ class MainTest(test_util.ConfigTestCase):
@mock.patch('certbot._internal.main.plugins_disco')
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args(self, _det, mock_disco):
ifaces: List[interfaces.IPlugin] = []
ifaces: List[interfaces.Plugin] = []
plugins = mock_disco.PluginsRegistry.find_all()
stdout = io.StringIO()
@ -906,7 +905,7 @@ class MainTest(test_util.ConfigTestCase):
@mock.patch('certbot._internal.main.plugins_disco')
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args_unprivileged(self, _det, mock_disco):
ifaces: List[interfaces.IPlugin] = []
ifaces: List[interfaces.Plugin] = []
plugins = mock_disco.PluginsRegistry.find_all()
def throw_error(directory, mode, strict):
@ -928,7 +927,7 @@ class MainTest(test_util.ConfigTestCase):
@mock.patch('certbot._internal.main.plugins_disco')
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_init(self, _det, mock_disco):
ifaces: List[interfaces.IPlugin] = []
ifaces: List[interfaces.Plugin] = []
plugins = mock_disco.PluginsRegistry.find_all()
stdout = io.StringIO()
@ -946,7 +945,7 @@ class MainTest(test_util.ConfigTestCase):
@mock.patch('certbot._internal.main.plugins_disco')
@mock.patch('certbot._internal.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_prepare(self, _det, mock_disco):
ifaces: List[interfaces.IPlugin] = []
ifaces: List[interfaces.Plugin] = []
plugins = mock_disco.PluginsRegistry.find_all()
stdout = io.StringIO()

View file

@ -10,15 +10,17 @@ from cryptography.exceptions import InvalidSignature
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes # type: ignore
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
import pytz
from certbot import errors
from certbot.tests import util as test_util
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
try:
# Only cryptography>=2.5 has ocsp module
# and signature_hash_algorithm attribute in OCSPResponse class

View file

@ -24,6 +24,7 @@ ACHALL = achallenges.KeyAuthorizationAnnotatedChallenge(
"pending"),
domain="encryption-example.demo", account_key=AUTH_KEY)
class NamespaceFunctionsTest(unittest.TestCase):
"""Tests for certbot.plugins.common.*_namespace functions."""
@ -47,6 +48,12 @@ class PluginTest(unittest.TestCase):
from certbot.plugins.common import Plugin
class MockPlugin(Plugin): # pylint: disable=missing-docstring
def prepare(self) -> None:
pass
def more_info(self) -> str:
pass
@classmethod
def add_parser_arguments(cls, add):
add("foo-bar", dest="different_to_foo_bar", x=1, y=None)
@ -97,9 +104,9 @@ class InstallerTest(test_util.ConfigTestCase):
def setUp(self):
super().setUp()
filesystem.mkdir(self.config.config_dir)
from certbot.plugins.common import Installer
from certbot.tests.util import DummyInstaller
self.installer = Installer(config=self.config,
self.installer = DummyInstaller(config=self.config,
name="Installer")
self.reverter = self.installer.reverter

View file

@ -1,13 +1,9 @@
"""Tests for certbot._internal.plugins.disco."""
import functools
import string
import unittest
from typing import List
import unittest
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
import pkg_resources
import zope.interface
@ -17,6 +13,12 @@ from certbot._internal.plugins import null
from certbot._internal.plugins import standalone
from certbot._internal.plugins import webroot
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
EP_SA = pkg_resources.EntryPoint(
"sa", "certbot._internal.plugins.standalone",
attrs=("Authenticator",),
@ -95,10 +97,10 @@ class PluginEntryPointTest(unittest.TestCase):
"Long desc not found", self.plugin_ep.long_description)
def test_ifaces(self):
self.assertTrue(self.plugin_ep.ifaces((interfaces.IAuthenticator,)))
self.assertFalse(self.plugin_ep.ifaces((interfaces.IInstaller,)))
self.assertTrue(self.plugin_ep.ifaces((interfaces.Authenticator,)))
self.assertFalse(self.plugin_ep.ifaces((interfaces.Installer,)))
self.assertFalse(self.plugin_ep.ifaces((
interfaces.IInstaller, interfaces.IAuthenticator)))
interfaces.Installer, interfaces.Authenticator)))
def test__init__(self):
self.assertIs(self.plugin_ep.initialized, False)
@ -135,16 +137,16 @@ class PluginEntryPointTest(unittest.TestCase):
self.plugin_ep._initialized = plugin = mock.MagicMock()
exceptions = zope.interface.exceptions
with mock.patch("certbot._internal.plugins."
"disco.zope.interface") as mock_zope:
mock_zope.exceptions = exceptions
with mock.patch("certbot._internal.plugins.disco._verify") as mock_verify:
mock_verify.exceptions = exceptions
def verify_object(iface, obj): # pylint: disable=missing-docstring
def verify_object(obj, cls, iface): # pylint: disable=missing-docstring
assert obj is plugin
assert iface is iface1 or iface is iface2 or iface is iface3
if iface is iface3:
raise mock_zope.exceptions.BrokenImplementation(None, None)
mock_zope.verify.verifyObject.side_effect = verify_object
return False
return True
mock_verify.side_effect = verify_object
self.assertTrue(self.plugin_ep.verify((iface1,)))
self.assertTrue(self.plugin_ep.verify((iface1, iface2)))
self.assertFalse(self.plugin_ep.verify((iface3,)))

View file

@ -33,16 +33,16 @@ class ConveniencePickPluginTest(unittest.TestCase):
def test_authenticator(self):
from certbot._internal.plugins.selection import pick_authenticator
self._test(pick_authenticator, (interfaces.IAuthenticator,))
self._test(pick_authenticator, (interfaces.Authenticator,))
def test_installer(self):
from certbot._internal.plugins.selection import pick_installer
self._test(pick_installer, (interfaces.IInstaller,))
self._test(pick_installer, (interfaces.Installer,))
def test_configurator(self):
from certbot._internal.plugins.selection import pick_configurator
self._test(pick_configurator,
(interfaces.IAuthenticator, interfaces.IInstaller))
(interfaces.Authenticator, interfaces.Installer))
class PickPluginTest(unittest.TestCase):

View file

@ -1,17 +1,20 @@
"""Tests for certbot.plugins.storage.PluginStorage"""
import json
from typing import Iterable
from typing import List
from typing import Optional
import unittest
from certbot import errors
from certbot.compat import filesystem
from certbot.compat import os
from certbot.tests import util as test_util
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
from certbot import errors
from certbot.compat import filesystem
from certbot.compat import os
from certbot.plugins import common
from certbot.tests import util as test_util
class PluginStorageTest(test_util.ConfigTestCase):
@ -19,7 +22,7 @@ class PluginStorageTest(test_util.ConfigTestCase):
def setUp(self):
super().setUp()
self.plugin_cls = common.Installer
self.plugin_cls = test_util.DummyInstaller
filesystem.mkdir(self.config.config_dir)
with mock.patch("certbot.reverter.util"):
self.plugin = self.plugin_cls(config=self.config, name="mockplugin")
@ -101,7 +104,6 @@ class PluginStorageTest(test_util.ConfigTestCase):
plugin2.storage.fetch, "first")
self.assertEqual(plugin1.storage.fetch("first_key"), "first_value")
def test_saved_state(self):
self.plugin.storage.put("testkey", "testvalue")
# Write to disk

View file

@ -1,18 +1,17 @@
"""Tests for certbot._internal.renewal"""
import copy
import unittest
from acme import challenges
from certbot import errors, configuration
from certbot._internal import storage
import certbot.tests.util as test_util
try:
import mock
except ImportError: # pragma: no cover
from unittest import mock
from acme import challenges
from certbot import errors
from certbot._internal import configuration
from certbot._internal import storage
import certbot.tests.util as test_util
class RenewalTest(test_util.ConfigTestCase):