mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Revert "Initial IP address support: use san.SAN types internally (#10468)"
This reverts commit d638200d12.
This commit is contained in:
parent
b1cf53ff6b
commit
66ace42a43
19 changed files with 350 additions and 435 deletions
|
|
@ -16,7 +16,6 @@ from certbot import errors
|
|||
from certbot import ocsp
|
||||
from certbot import util
|
||||
from certbot._internal import storage
|
||||
from certbot._internal import san
|
||||
from certbot.compat import os
|
||||
from certbot.display import util as display_util
|
||||
|
||||
|
|
@ -94,16 +93,16 @@ def lineage_for_certname(cli_config: configuration.NamespaceConfig,
|
|||
return None
|
||||
|
||||
|
||||
def sans_for_certname(config: configuration.NamespaceConfig,
|
||||
certname: str) -> Optional[list[san.SAN]]:
|
||||
"""Find the domains and/or IP addresses in the cert with name certname."""
|
||||
def domains_for_certname(config: configuration.NamespaceConfig,
|
||||
certname: str) -> Optional[list[str]]:
|
||||
"""Find the domains in the cert with name certname."""
|
||||
lineage = lineage_for_certname(config, certname)
|
||||
return lineage.sans() if lineage else None
|
||||
return lineage.names() if lineage else None
|
||||
|
||||
|
||||
def find_duplicative_certs(config: configuration.NamespaceConfig,
|
||||
sans: list[san.SAN]) -> tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
domains: list[str]) -> tuple[Optional[storage.RenewableCert],
|
||||
Optional[storage.RenewableCert]]:
|
||||
"""Find existing certs that match the given domain names.
|
||||
|
||||
This function searches for certificates whose domains are equal to
|
||||
|
|
@ -137,15 +136,15 @@ def find_duplicative_certs(config: configuration.NamespaceConfig,
|
|||
# TODO: Handle these differently depending on whether they are
|
||||
# expired or still valid?
|
||||
identical_names_cert, subset_names_cert = rv
|
||||
candidate_names = set(candidate_lineage.sans())
|
||||
if candidate_names == set(sans):
|
||||
candidate_names = set(candidate_lineage.names())
|
||||
if candidate_names == set(domains):
|
||||
identical_names_cert = candidate_lineage
|
||||
elif candidate_names.issubset(set(sans)):
|
||||
elif candidate_names.issubset(set(domains)):
|
||||
# This logic finds and returns the largest subset-names cert
|
||||
# in the case where there are several available.
|
||||
if subset_names_cert is None:
|
||||
subset_names_cert = candidate_lineage
|
||||
elif len(candidate_names) > len(subset_names_cert.sans()):
|
||||
elif len(candidate_names) > len(subset_names_cert.names()):
|
||||
subset_names_cert = candidate_lineage
|
||||
return (identical_names_cert, subset_names_cert)
|
||||
|
||||
|
|
@ -252,11 +251,9 @@ def human_readable_cert_info(config: configuration.NamespaceConfig, cert: storag
|
|||
certinfo = []
|
||||
checker = ocsp.RevocationChecker()
|
||||
|
||||
config_sans = set(config.domains)
|
||||
|
||||
if config.certname and cert.lineagename != config.certname and not skip_filter_checks:
|
||||
return None
|
||||
if config_sans and not config_sans.issubset(cert.sans()):
|
||||
if config.domains and not set(config.domains).issubset(cert.names()):
|
||||
return None
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
|
||||
|
|
@ -284,7 +281,7 @@ def human_readable_cert_info(config: configuration.NamespaceConfig, cert: storag
|
|||
certinfo.append(f" Certificate Name: {cert.lineagename}\n"
|
||||
f" Serial Number: {serial}\n"
|
||||
f" Key Type: {cert.private_key_type}\n"
|
||||
f' Identifiers: {" ".join(map(str, cert.sans()))}\n'
|
||||
f' Domains: {" ".join(cert.names())}\n'
|
||||
f" Expiry Date: {valid_string}\n"
|
||||
f" Certificate Path: {cert.fullchain}\n"
|
||||
f" Private Key Path: {cert.privkey}")
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ from typing import Union
|
|||
from acme import challenges
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import san
|
||||
from certbot.compat import os
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -103,7 +103,7 @@ class _DomainsAction(argparse.Action):
|
|||
|
||||
|
||||
def add_domains(args_or_config: Union[argparse.Namespace, configuration.NamespaceConfig],
|
||||
domains: Optional[str]) -> list[san.DNSName]:
|
||||
domains: Optional[str]) -> list[str]:
|
||||
"""Registers new domains to be used during the current client run.
|
||||
|
||||
Domains are not added to the list of requested domains if they have
|
||||
|
|
@ -118,12 +118,12 @@ def add_domains(args_or_config: Union[argparse.Namespace, configuration.Namespac
|
|||
:rtype: `list` of `str`
|
||||
|
||||
"""
|
||||
validated_domains: list[san.DNSName] = []
|
||||
validated_domains: list[str] = []
|
||||
if not domains:
|
||||
return validated_domains
|
||||
|
||||
for d in domains.split(","):
|
||||
domain = san.DNSName(d.strip())
|
||||
for domain in domains.split(","):
|
||||
domain = util.enforce_domain_sanity(domain.strip())
|
||||
validated_domains.append(domain)
|
||||
if domain not in args_or_config.domains:
|
||||
args_or_config.domains.append(domain)
|
||||
|
|
|
|||
|
|
@ -9,17 +9,16 @@ from typing import Optional
|
|||
from typing import Union
|
||||
|
||||
import configargparse
|
||||
from cryptography import x509
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import hooks
|
||||
from certbot._internal import san
|
||||
from certbot._internal.cli.cli_constants import COMMAND_OVERVIEW
|
||||
from certbot._internal.cli.cli_constants import HELP_AND_VERSION_USAGE
|
||||
from certbot._internal.cli.cli_constants import SHORT_USAGE
|
||||
from certbot._internal.cli.cli_utils import add_domains
|
||||
from certbot._internal.cli.cli_utils import CustomHelpFormatter
|
||||
from certbot._internal.cli.cli_utils import flag_default
|
||||
from certbot._internal.cli.cli_utils import HelpfulArgumentGroup
|
||||
|
|
@ -299,7 +298,7 @@ class HelpfulArgumentParser:
|
|||
hooks.validate_hooks(config)
|
||||
|
||||
if config.allow_subset_of_names:
|
||||
if any(d.is_wildcard() for d in config.domains):
|
||||
if any(util.is_wildcard_domain(d) for d in config.domains):
|
||||
raise errors.Error("Using --allow-subset-of-names with a"
|
||||
" wildcard domain is not supported.")
|
||||
|
||||
|
|
@ -330,13 +329,12 @@ class HelpfulArgumentParser:
|
|||
raise errors.Error("--allow-subset-of-names cannot be used with --csr")
|
||||
|
||||
csrfile, contents = config.csr[0:2]
|
||||
typ, util_csr, _ = crypto_util.import_csr_file(csrfile, contents)
|
||||
x509_req = x509.load_pem_x509_csr(util_csr.data)
|
||||
domains, _ = san.from_x509(x509_req.subject, x509_req.extensions)
|
||||
typ, csr, domains = crypto_util.import_csr_file(csrfile, contents)
|
||||
|
||||
# The SANs from the CSR are added to the domains from command line flags as this config
|
||||
# setting is where main.certonly gets the list of identifiers to request.
|
||||
config.domains.extend(domains)
|
||||
# This is not necessary for webroot to work, however,
|
||||
# obtain_certificate_from_csr requires config.domains to be set
|
||||
for domain in domains:
|
||||
add_domains(config, domain)
|
||||
|
||||
if not domains:
|
||||
# TODO: add CN to domains instead:
|
||||
|
|
@ -344,15 +342,14 @@ class HelpfulArgumentParser:
|
|||
"Unfortunately, your CSR %s needs to have a SubjectAltName for every domain"
|
||||
% config.csr[0])
|
||||
|
||||
config.actual_csr = (util_csr, typ)
|
||||
config.actual_csr = (csr, typ)
|
||||
|
||||
# Check that the original values for --domain set by the user were
|
||||
# a subset of the domains listed in the CSR.
|
||||
if set(config.domains) != set(domains):
|
||||
csr_domains = {d.lower() for d in domains}
|
||||
config_domains = set(config.domains)
|
||||
if csr_domains != config_domains:
|
||||
raise errors.ConfigurationError(
|
||||
"Inconsistent requests:\nFrom the CSR: {0}\nFrom command line/config: {1}"
|
||||
.format(san.display(domains),
|
||||
san.display(config.domains)))
|
||||
"Inconsistent domain requests:\nFrom the CSR: {0}\nFrom command line/config: {1}"
|
||||
.format(", ".join(csr_domains), ", ".join(config_domains)))
|
||||
|
||||
|
||||
def determine_verb(self) -> None:
|
||||
|
|
|
|||
|
|
@ -34,7 +34,6 @@ from certbot._internal import cli
|
|||
from certbot._internal import constants
|
||||
from certbot._internal import eff
|
||||
from certbot._internal import error_handler
|
||||
from certbot._internal import san
|
||||
from certbot._internal import storage
|
||||
from certbot._internal.plugins import disco as plugin_disco
|
||||
from certbot._internal.plugins import selection as plugin_selection
|
||||
|
|
@ -349,13 +348,13 @@ class Client:
|
|||
cert, chain = crypto_util.cert_and_chain_from_fullchain(fullchain)
|
||||
return cert.encode(), chain.encode()
|
||||
|
||||
def obtain_certificate(self, sans: list[san.SAN], old_keypath: Optional[str] = None
|
||||
def obtain_certificate(self, domains: list[str], old_keypath: Optional[str] = None
|
||||
) -> tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
"""Obtains a certificate from the ACME server.
|
||||
|
||||
`.register` must be called before `.obtain_certificate`
|
||||
|
||||
:param list sans: domains and/or IP addresses for which to get a certificate.
|
||||
:param list domains: domains to get a certificate
|
||||
|
||||
:returns: certificate as PEM string, chain as PEM string,
|
||||
newly generated private key (`.util.Key`), and DER-encoded
|
||||
|
|
@ -399,9 +398,6 @@ class Client:
|
|||
elif self.config.rsa_key_size and self.config.key_type.lower() == 'rsa':
|
||||
key_size = self.config.rsa_key_size
|
||||
|
||||
domains, _ = san.split(sans)
|
||||
domains_str = [d.dns_name for d in domains]
|
||||
|
||||
# Create CSR from names
|
||||
if self.config.dry_run:
|
||||
key = key or util.Key(
|
||||
|
|
@ -415,8 +411,7 @@ class Client:
|
|||
)
|
||||
csr = util.CSR(file=None, form="pem",
|
||||
data=acme_crypto_util.make_csr(
|
||||
key.pem, domains_str, self.config.must_staple,
|
||||
))
|
||||
key.pem, domains, self.config.must_staple))
|
||||
else:
|
||||
key = key or crypto_util.generate_key(
|
||||
key_size=key_size,
|
||||
|
|
@ -426,43 +421,43 @@ class Client:
|
|||
strict_permissions=self.config.strict_permissions,
|
||||
)
|
||||
csr = crypto_util.generate_csr(
|
||||
key, domains_str, None, self.config.must_staple, self.config.strict_permissions)
|
||||
key, domains, None, self.config.must_staple, self.config.strict_permissions)
|
||||
|
||||
try:
|
||||
orderr = self._get_order_and_authorizations(csr.data, self.config.allow_subset_of_names)
|
||||
except messages.Error as error:
|
||||
# Some sans may be rejected during order creation.
|
||||
# Some domains may be rejected during order creation.
|
||||
# Certbot can retry the operation without the rejected
|
||||
# sans contained within subproblems.
|
||||
# domains contained within subproblems.
|
||||
if self.config.allow_subset_of_names:
|
||||
successful_sans = self._successful_sans_from_error(error, sans)
|
||||
if successful_sans != sans and len(successful_sans) != 0:
|
||||
return self._retry_obtain_certificate(sans, successful_sans, old_keypath)
|
||||
successful_domains = self._successful_domains_from_error(error, domains)
|
||||
if successful_domains != domains and len(successful_domains) != 0:
|
||||
return self._retry_obtain_certificate(domains, successful_domains, old_keypath)
|
||||
raise
|
||||
authzr = orderr.authorizations
|
||||
auth_ident_values = {a.body.identifier.value for a in authzr}
|
||||
successful_sans = [s for s in sans if str(s) in auth_ident_values]
|
||||
auth_domains = {a.body.identifier.value for a in authzr}
|
||||
successful_domains = [d for d in domains if d in auth_domains]
|
||||
|
||||
# allow_subset_of_names is currently disabled for wildcard
|
||||
# certificates. The reason for this and checking allow_subset_of_names
|
||||
# below is because successful_sans == sans is never true if
|
||||
# sans contains a wildcard because the ACME spec forbids identifiers
|
||||
# below is because successful_domains == domains is never true if
|
||||
# domains contains a wildcard because the ACME spec forbids identifiers
|
||||
# in authzs from containing a wildcard character.
|
||||
if self.config.allow_subset_of_names and successful_sans != sans:
|
||||
return self._retry_obtain_certificate(sans, successful_sans, old_keypath)
|
||||
if self.config.allow_subset_of_names and successful_domains != domains:
|
||||
return self._retry_obtain_certificate(domains, successful_domains, old_keypath)
|
||||
else:
|
||||
try:
|
||||
cert, chain = self.obtain_certificate_from_csr(csr, orderr)
|
||||
return cert, chain, key, csr
|
||||
except messages.Error as error:
|
||||
# Some sans may be rejected during the very late stage of
|
||||
# Some domains may be rejected during the very late stage of
|
||||
# order finalization. Certbot can retry the operation without
|
||||
# the rejected sans contained within subproblems.
|
||||
# the rejected domains contained within subproblems.
|
||||
if self.config.allow_subset_of_names:
|
||||
successful_sans = self._successful_sans_from_error(error, sans)
|
||||
if successful_sans != sans and len(successful_sans) != 0:
|
||||
successful_domains = self._successful_domains_from_error(error, domains)
|
||||
if successful_domains != domains and len(successful_domains) != 0:
|
||||
return self._retry_obtain_certificate(
|
||||
sans, successful_sans, old_keypath)
|
||||
domains, successful_domains, old_keypath)
|
||||
raise
|
||||
|
||||
def _get_order_and_authorizations(self, csr_pem: bytes,
|
||||
|
|
@ -510,16 +505,16 @@ class Client:
|
|||
authzr = self.auth_handler.handle_authorizations(orderr, self.config, best_effort)
|
||||
return orderr.update(authorizations=authzr)
|
||||
|
||||
def obtain_and_enroll_certificate(self, sans: list[san.SAN], certname: Optional[str]
|
||||
def obtain_and_enroll_certificate(self, domains: list[str], certname: Optional[str]
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Obtain and enroll certificate.
|
||||
|
||||
Get a new certificate for the specified domains and/or IP addresses using the specified
|
||||
Get a new certificate for the specified domains using the specified
|
||||
authenticator and installer, and then create a new renewable lineage
|
||||
containing it.
|
||||
|
||||
:param sans: domains and/or IP addresses to request a certificate for
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: domains to request a certificate for
|
||||
:type domains: `list` of `str`
|
||||
:param certname: requested name of lineage
|
||||
:type certname: `str` or `None`
|
||||
|
||||
|
|
@ -527,8 +522,8 @@ class Client:
|
|||
referred to the enrolled cert lineage, or None if doing a successful dry run.
|
||||
|
||||
"""
|
||||
new_name = self._choose_lineagename(sans, certname)
|
||||
cert, chain, key, _ = self.obtain_certificate(sans)
|
||||
new_name = self._choose_lineagename(domains, certname)
|
||||
cert, chain, key, _ = self.obtain_certificate(domains)
|
||||
|
||||
if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or
|
||||
self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]):
|
||||
|
|
@ -544,38 +539,29 @@ class Client:
|
|||
key.pem, chain,
|
||||
self.config)
|
||||
|
||||
def _successful_sans_from_error(self, error: messages.Error, sans: list[san.SAN],
|
||||
) -> list[san.SAN]:
|
||||
def _successful_domains_from_error(self, error: messages.Error, domains: list[str],
|
||||
) -> list[str]:
|
||||
if error.subproblems is not None:
|
||||
failed_sans: list[san.SAN] = []
|
||||
for problem in error.subproblems:
|
||||
if not problem.identifier:
|
||||
continue
|
||||
match problem.identifier.typ:
|
||||
case messages.IDENTIFIER_FQDN:
|
||||
failed_sans.append(san.DNSName(problem.identifier.value))
|
||||
case messages.IDENTIFIER_IP:
|
||||
failed_sans.append(san.IPAddress(problem.identifier.value))
|
||||
case _:
|
||||
raise TypeError(f"invalid identifier type {problem.identifier.typ}")
|
||||
successful_sans = [x for x in sans if x not in failed_sans]
|
||||
return successful_sans
|
||||
failed_domains = [problem.identifier.value for problem in error.subproblems
|
||||
if problem.identifier is not None]
|
||||
successful_domains = [x for x in domains if x not in failed_domains]
|
||||
return successful_domains
|
||||
return []
|
||||
|
||||
def _retry_obtain_certificate(self, sans: list[san.SAN], successful_sans: list[san.SAN],
|
||||
def _retry_obtain_certificate(self, domains: list[str], successful_domains: list[str],
|
||||
old_keypath: Optional[str]
|
||||
) -> tuple[bytes, bytes, util.Key, util.CSR]:
|
||||
failed_sans = [s for s in sans if s not in successful_sans]
|
||||
failed_sans_list = san.display(failed_sans)
|
||||
failed_domains = [d for d in domains if d not in successful_domains]
|
||||
domains_list = ", ".join(failed_domains)
|
||||
display_util.notify("Unable to obtain a certificate with every requested "
|
||||
f"domain. Retrying without: {failed_sans_list}")
|
||||
return self.obtain_certificate(successful_sans, old_keypath)
|
||||
f"domain. Retrying without: {domains_list}")
|
||||
return self.obtain_certificate(successful_domains, old_keypath)
|
||||
|
||||
def _choose_lineagename(self, sans: list[san.SAN], certname: Optional[str]) -> str:
|
||||
def _choose_lineagename(self, domains: list[str], certname: Optional[str]) -> str:
|
||||
"""Chooses a name for the new lineage.
|
||||
|
||||
:param sans: domains and/or IP addresses in certificate request
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: domains in certificate request
|
||||
:type domains: `list` of `str`
|
||||
:param certname: requested name of lineage
|
||||
:type certname: `str` or `None`
|
||||
|
||||
|
|
@ -589,11 +575,11 @@ class Client:
|
|||
lineagename = None
|
||||
if certname:
|
||||
lineagename = certname
|
||||
elif sans[0].is_wildcard():
|
||||
elif util.is_wildcard_domain(domains[0]):
|
||||
# Don't make files and directories starting with *.
|
||||
lineagename = str(sans[0])[2:]
|
||||
lineagename = domains[0][2:]
|
||||
else:
|
||||
lineagename = str(sans[0])
|
||||
lineagename = domains[0]
|
||||
# Verify whether chosen lineage is valid
|
||||
if self._is_valid_lineagename(lineagename):
|
||||
return lineagename
|
||||
|
|
@ -655,11 +641,11 @@ class Client:
|
|||
|
||||
return abs_cert_path, abs_chain_path, abs_fullchain_path
|
||||
|
||||
def deploy_certificate(self, sans: list[san.DNSName], privkey_path: str, cert_path: str,
|
||||
def deploy_certificate(self, domains: list[str], privkey_path: str, cert_path: str,
|
||||
chain_path: str, fullchain_path: str) -> None:
|
||||
"""Install certificate
|
||||
|
||||
:param list sans: list of domains/and or IP addresses to install the certificate
|
||||
:param list domains: list of domains to install the certificate
|
||||
:param str privkey_path: path to certificate private key
|
||||
:param str cert_path: certificate file path (optional)
|
||||
:param str fullchain_path: path to the full chain of the certificate
|
||||
|
|
@ -676,13 +662,10 @@ class Client:
|
|||
display_util.notify("Deploying certificate")
|
||||
|
||||
msg = "Could not install certificate"
|
||||
domains, ip_addresses = san.split(sans)
|
||||
if ip_addresses:
|
||||
raise TypeError("deploy of IP address certificate not supported")
|
||||
with error_handler.ErrorHandler(self._recovery_routine_with_msg, msg):
|
||||
for dom in domains:
|
||||
self.installer.deploy_cert(
|
||||
domain=dom.dns_name, cert_path=os.path.abspath(cert_path),
|
||||
domain=dom, cert_path=os.path.abspath(cert_path),
|
||||
key_path=os.path.abspath(privkey_path),
|
||||
chain_path=chain_path,
|
||||
fullchain_path=fullchain_path)
|
||||
|
|
@ -697,17 +680,18 @@ class Client:
|
|||
# sites may have been enabled / final cleanup
|
||||
self.installer.restart()
|
||||
|
||||
def enhance_config(self, domains: list[san.DNSName], chain_path: str,
|
||||
def enhance_config(self, domains: list[str], chain_path: str,
|
||||
redirect_default: bool = True) -> None:
|
||||
"""Enhance the configuration.
|
||||
|
||||
:param list domains: list of domains to configure.
|
||||
:param list domains: list of domains to configure
|
||||
:param chain_path: chain file path
|
||||
:type chain_path: `str` or `None`
|
||||
:param redirect_default: boolean value that the "redirect" flag should default to
|
||||
|
||||
:raises .errors.Error: if no installer is specified in the
|
||||
client.
|
||||
|
||||
"""
|
||||
if self.installer is None:
|
||||
logger.error("No installer is specified, there isn't any "
|
||||
|
|
@ -740,11 +724,11 @@ class Client:
|
|||
with error_handler.ErrorHandler(self._rollback_and_restart, msg):
|
||||
self.installer.restart()
|
||||
|
||||
def apply_enhancement(self, domains: list[san.DNSName], enhancement: str,
|
||||
def apply_enhancement(self, domains: list[str], enhancement: str,
|
||||
options: Optional[str] = None) -> None:
|
||||
"""Applies an enhancement on all domains.
|
||||
|
||||
:param list domains: list of ssl_vhosts (as san.DNSName)
|
||||
:param list domains: list of ssl_vhosts (as strings)
|
||||
:param str enhancement: name of enhancement, e.g. ensure-http-header
|
||||
:param str options: options to enhancement, e.g. Strict-Transport-Security
|
||||
|
||||
|
|
@ -761,7 +745,7 @@ class Client:
|
|||
with error_handler.ErrorHandler(self._recovery_routine_with_msg, None):
|
||||
for dom in domains:
|
||||
try:
|
||||
self.installer.enhance(dom.dns_name, enhancement, options)
|
||||
self.installer.enhance(dom, enhancement, options)
|
||||
except errors.PluginEnhancementAlreadyPresent:
|
||||
logger.info("Enhancement %s was already set.", enh_label)
|
||||
except errors.PluginError:
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import textwrap
|
|||
from typing import Optional
|
||||
|
||||
from acme import messages as acme_messages
|
||||
from certbot._internal import san
|
||||
from certbot.compat import misc
|
||||
|
||||
|
||||
|
|
@ -84,28 +83,28 @@ def separate_list_input(input_: str) -> list[str]:
|
|||
return [str(string) for string in no_commas.split()]
|
||||
|
||||
|
||||
def summarize_sans(sans: list[san.SAN]) -> str:
|
||||
"""Summarizes a list of identifiers in the format of:
|
||||
example.com.com and N more
|
||||
or if there are only two identifiers:
|
||||
example.com and 192.0.2.77
|
||||
or if there is only one identifier:
|
||||
def summarize_domain_list(domains: list[str]) -> str:
|
||||
"""Summarizes a list of domains in the format of:
|
||||
example.com.com and N more domains
|
||||
or if there is are only two domains:
|
||||
example.com and www.example.com
|
||||
or if there is only one domain:
|
||||
example.com
|
||||
|
||||
:param list sans: `san.SAN` list of domains and/or IP addresses
|
||||
:returns: a summary
|
||||
:param list domains: `str` list of domains
|
||||
:returns: the domain list summary
|
||||
:rtype: str
|
||||
"""
|
||||
if not sans:
|
||||
if not domains:
|
||||
return ""
|
||||
|
||||
length = len(sans)
|
||||
length = len(domains)
|
||||
if length == 1:
|
||||
return str(sans[0])
|
||||
return domains[0]
|
||||
elif length == 2:
|
||||
return f"{sans[0]} and {sans[1]}"
|
||||
return " and ".join(domains)
|
||||
else:
|
||||
return f"{sans[0]} and {length - 1} more"
|
||||
return "{0} and {1} more domains".format(domains[0], length-1)
|
||||
|
||||
|
||||
def describe_acme_error(error: acme_messages.Error) -> str:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ from typing import Optional
|
|||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import san
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import misc
|
||||
from certbot.compat import os
|
||||
|
|
@ -101,7 +100,7 @@ def _run_pre_hook_if_necessary(command: str) -> None:
|
|||
|
||||
def post_hook(
|
||||
config: configuration.NamespaceConfig,
|
||||
renewed_sans: list[san.SAN]
|
||||
renewed_domains: list[str]
|
||||
) -> None:
|
||||
|
||||
"""Run post-hooks if defined.
|
||||
|
|
@ -130,17 +129,17 @@ def post_hook(
|
|||
_run_eventually(hook)
|
||||
# certonly / run
|
||||
else:
|
||||
renewed_sans_str = ' '.join(map(str, renewed_sans))
|
||||
renewed_domains_str = ' '.join(renewed_domains)
|
||||
# 32k is reasonable on Windows and likely quite conservative on other platforms
|
||||
if len(renewed_sans_str) > 32_000:
|
||||
if len(renewed_domains_str) > 32_000:
|
||||
logger.warning("Limiting RENEWED_DOMAINS environment variable to 32k characters")
|
||||
renewed_sans_str = renewed_sans_str[:32_000]
|
||||
renewed_domains_str = renewed_domains_str[:32_000]
|
||||
for hook in all_hooks:
|
||||
_run_hook(
|
||||
"post-hook",
|
||||
hook,
|
||||
{
|
||||
'RENEWED_DOMAINS': renewed_sans_str,
|
||||
'RENEWED_DOMAINS': renewed_domains_str,
|
||||
# Since other commands stop certbot execution on failure,
|
||||
# it doesn't make sense to have a FAILED_DOMAINS variable
|
||||
'FAILED_DOMAINS': ""
|
||||
|
|
@ -164,48 +163,48 @@ def _run_eventually(command: str) -> None:
|
|||
post_hooks.append(command)
|
||||
|
||||
|
||||
def run_saved_post_hooks(renewed_sans: list[san.SAN], failed_sans: list[san.SAN]) -> None:
|
||||
def run_saved_post_hooks(renewed_domains: list[str], failed_domains: list[str]) -> None:
|
||||
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
|
||||
|
||||
renewed_sans_str = ' '.join(map(str, renewed_sans))
|
||||
failed_sans_str = ' '.join(map(str, failed_sans))
|
||||
renewed_domains_str = ' '.join(renewed_domains)
|
||||
failed_domains_str = ' '.join(failed_domains)
|
||||
|
||||
# 32k combined is reasonable on Windows and likely quite conservative on other platforms
|
||||
if len(renewed_sans_str) > 16_000:
|
||||
if len(renewed_domains_str) > 16_000:
|
||||
logger.warning("Limiting RENEWED_DOMAINS environment variable to 16k characters")
|
||||
renewed_sans_str = renewed_sans_str[:16_000]
|
||||
renewed_domains_str = renewed_domains_str[:16_000]
|
||||
|
||||
if len(failed_sans_str) > 16_000:
|
||||
if len(failed_domains_str) > 16_000:
|
||||
logger.warning("Limiting FAILED_DOMAINS environment variable to 16k characters")
|
||||
renewed_sans_str = failed_sans_str[:16_000]
|
||||
renewed_domains_str = failed_domains_str[:16_000]
|
||||
|
||||
for cmd in post_hooks:
|
||||
_run_hook(
|
||||
"post-hook",
|
||||
cmd,
|
||||
{
|
||||
'RENEWED_DOMAINS': renewed_sans_str,
|
||||
'FAILED_DOMAINS': failed_sans_str
|
||||
'RENEWED_DOMAINS': renewed_domains_str,
|
||||
'FAILED_DOMAINS': failed_domains_str
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def deploy_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
|
||||
def deploy_hook(config: configuration.NamespaceConfig, domains: list[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-issuance hook if defined.
|
||||
|
||||
:param configuration.NamespaceConfig config: Certbot settings
|
||||
:param sans: domains and/or IP addresses in the obtained certificate
|
||||
:type sans: `list` of `str`
|
||||
:param domains: domains in the obtained certificate
|
||||
:type domains: `list` of `str`
|
||||
:param str lineage_path: live directory path for the new cert
|
||||
|
||||
"""
|
||||
if config.deploy_hook:
|
||||
_run_deploy_hook(config.deploy_hook, sans,
|
||||
_run_deploy_hook(config.deploy_hook, domains,
|
||||
lineage_path, config.dry_run, config.run_deploy_hooks)
|
||||
|
||||
|
||||
def renew_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
|
||||
def renew_hook(config: configuration.NamespaceConfig, domains: list[str],
|
||||
lineage_path: str) -> None:
|
||||
"""Run post-renewal hooks.
|
||||
|
||||
|
|
@ -218,8 +217,8 @@ def renew_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
|
|||
logged saying that they were skipped.
|
||||
|
||||
:param configuration.NamespaceConfig config: Certbot settings
|
||||
:param sans: domains and/or IP addresses in the obtained certificate
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: domains in the obtained certificate
|
||||
:type domains: `list` of `str`
|
||||
:param str lineage_path: live directory path for the new cert
|
||||
|
||||
"""
|
||||
|
|
@ -231,11 +230,11 @@ def renew_hook(config: configuration.NamespaceConfig, sans: list[san.SAN],
|
|||
if hook in executed_hooks:
|
||||
logger.info("Skipping deploy-hook '%s' as it was already run.", hook)
|
||||
else:
|
||||
_run_deploy_hook(hook, sans, lineage_path, config.dry_run, config.run_deploy_hooks)
|
||||
_run_deploy_hook(hook, domains, lineage_path, config.dry_run, config.run_deploy_hooks)
|
||||
executed_hooks.add(hook)
|
||||
|
||||
|
||||
def _run_deploy_hook(command: str, sans: list[san.SAN], lineage_path: str, dry_run: bool,
|
||||
def _run_deploy_hook(command: str, domains: list[str], lineage_path: str, dry_run: bool,
|
||||
run_deploy_hooks: bool) -> None:
|
||||
"""Run the specified deploy-hook (if not doing a dry run).
|
||||
|
||||
|
|
@ -244,8 +243,8 @@ def _run_deploy_hook(command: str, sans: list[san.SAN], lineage_path: str, dry_r
|
|||
after setting the appropriate environment variables.
|
||||
|
||||
:param str command: command to run as a deploy-hook
|
||||
:param sans: domains and/or IP addresses in the obtained certificate
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: domains in the obtained certificate
|
||||
:type domains: `list` of `str`
|
||||
:param str lineage_path: live directory path for the new cert
|
||||
:param bool dry_run: True iff Certbot is doing a dry run
|
||||
:param bool run_deploy_hooks: True if deploy hooks should run despite Certbot doing a dry run
|
||||
|
|
@ -256,7 +255,7 @@ def _run_deploy_hook(command: str, sans: list[san.SAN], lineage_path: str, dry_r
|
|||
command)
|
||||
return
|
||||
|
||||
os.environ["RENEWED_DOMAINS"] = " ".join(map(str, sans))
|
||||
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
|
||||
os.environ["RENEWED_LINEAGE"] = lineage_path
|
||||
_run_hook("deploy-hook", command)
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ from certbot._internal import eff
|
|||
from certbot._internal import hooks
|
||||
from certbot._internal import log
|
||||
from certbot._internal import renewal
|
||||
from certbot._internal import san
|
||||
from certbot._internal import snap_config
|
||||
from certbot._internal import storage
|
||||
from certbot._internal import updater
|
||||
|
|
@ -86,7 +85,7 @@ def _suggest_donation_if_appropriate(config: configuration.NamespaceConfig) -> N
|
|||
|
||||
|
||||
def _get_and_save_cert(le_client: client.Client, config: configuration.NamespaceConfig,
|
||||
sans: Optional[list[san.SAN]] = None, certname: Optional[str] = None,
|
||||
domains: Optional[list[str]] = None, certname: Optional[str] = None,
|
||||
lineage: Optional[storage.RenewableCert] = None
|
||||
) -> Optional[storage.RenewableCert]:
|
||||
"""Authenticate and enroll certificate.
|
||||
|
|
@ -98,9 +97,8 @@ def _get_and_save_cert(le_client: client.Client, config: configuration.Namespace
|
|||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param sans: List of domain names and/or IP addresses for which to get a certificate.
|
||||
Defaults to `None`
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: List of domain names to get a certificate. Defaults to `None`
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:param certname: Name of new certificate. Defaults to `None`
|
||||
:type certname: str
|
||||
|
|
@ -115,37 +113,37 @@ def _get_and_save_cert(le_client: client.Client, config: configuration.Namespace
|
|||
|
||||
"""
|
||||
hooks.pre_hook(config)
|
||||
renewed_sans: list[san.SAN] = []
|
||||
renewed_domains: list[str] = []
|
||||
|
||||
try:
|
||||
if lineage is not None:
|
||||
# Renewal, where we already know the specific lineage we're
|
||||
# interested in
|
||||
display_util.notify(
|
||||
"{action} for {identifiers}".format(
|
||||
"{action} for {domains}".format(
|
||||
action="Simulating renewal of an existing certificate"
|
||||
if config.dry_run else "Renewing an existing certificate",
|
||||
identifiers=internal_display_util.summarize_sans(sans or lineage.sans())
|
||||
domains=internal_display_util.summarize_domain_list(domains or lineage.names())
|
||||
)
|
||||
)
|
||||
renewal.renew_cert(config, sans, le_client, lineage)
|
||||
renewal.renew_cert(config, domains, le_client, lineage)
|
||||
else:
|
||||
# TREAT AS NEW REQUEST
|
||||
if sans is None:
|
||||
if domains is None:
|
||||
raise errors.Error("Domain list cannot be none if the lineage is not set.")
|
||||
display_util.notify(
|
||||
"{action} for {identifiers}".format(
|
||||
"{action} for {domains}".format(
|
||||
action="Simulating a certificate request" if config.dry_run else
|
||||
"Requesting a certificate",
|
||||
identifiers=internal_display_util.summarize_sans(sans)
|
||||
domains=internal_display_util.summarize_domain_list(domains)
|
||||
)
|
||||
)
|
||||
lineage = le_client.obtain_and_enroll_certificate(sans, certname)
|
||||
lineage = le_client.obtain_and_enroll_certificate(domains, certname)
|
||||
if lineage is not None:
|
||||
hooks.deploy_hook(config, lineage.sans(), lineage.live_dir)
|
||||
renewed_sans.extend(sans)
|
||||
hooks.deploy_hook(config, lineage.names(), lineage.live_dir)
|
||||
renewed_domains.extend(domains)
|
||||
finally:
|
||||
hooks.post_hook(config, renewed_sans)
|
||||
hooks.post_hook(config, renewed_domains)
|
||||
|
||||
return lineage
|
||||
|
||||
|
|
@ -198,7 +196,7 @@ def _handle_unexpected_key_type_migration(config: configuration.NamespaceConfig,
|
|||
|
||||
|
||||
def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
||||
sans: Iterable[san.SAN],
|
||||
domains: Iterable[str],
|
||||
cert: storage.RenewableCert
|
||||
) -> tuple[str, Optional[storage.RenewableCert]]:
|
||||
"""Figure out what to do if a previous cert had a subset of the names now requested
|
||||
|
|
@ -206,19 +204,20 @@ def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
|||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param sans: List of domain names and/or IP addresses
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: List of domain names
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:param cert: Certificate object
|
||||
:type cert: storage.RenewableCert
|
||||
|
||||
:returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_sans_and_certname
|
||||
:returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_domains_and_certname
|
||||
action can be: "newcert" | "renew" | "reinstall"
|
||||
:rtype: `tuple` of `str`
|
||||
|
||||
"""
|
||||
_handle_unexpected_key_type_migration(config, cert)
|
||||
|
||||
existing = ", ".join(cert.names())
|
||||
question = (
|
||||
"You have an existing certificate that contains a portion of "
|
||||
"the domains you requested (ref: {0}){br}{br}It contains these "
|
||||
|
|
@ -226,8 +225,8 @@ def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
|||
"certificate: {2}.{br}{br}Do you want to expand and replace this existing "
|
||||
"certificate with the new certificate?"
|
||||
).format(cert.configfile.filename,
|
||||
san.display(cert.sans()),
|
||||
san.display(sans),
|
||||
existing,
|
||||
", ".join(domains),
|
||||
br=os.linesep)
|
||||
if config.expand or config.renew_by_default or display_util.yesno(
|
||||
question, "Expand", "Cancel", cli_flag="--expand", force_interactive=True):
|
||||
|
|
@ -237,7 +236,7 @@ def _handle_subset_cert_request(config: configuration.NamespaceConfig,
|
|||
"replacing your existing certificate for {0}, you must use the "
|
||||
"--duplicate option.{br}{br}"
|
||||
"For example:{br}{br}{1} --duplicate {2}".format(
|
||||
san.display(cert.sans()),
|
||||
existing,
|
||||
cli.cli_command, " ".join(sys.argv[1:]),
|
||||
br=os.linesep
|
||||
))
|
||||
|
|
@ -255,7 +254,7 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
|
|||
:param lineage: Certificate lineage object
|
||||
:type lineage: storage.RenewableCert
|
||||
|
||||
:returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_sans_and_certname
|
||||
:returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_domains_and_certname
|
||||
action can be: "newcert" | "renew" | "reinstall"
|
||||
:rtype: `tuple` of `str`
|
||||
|
||||
|
|
@ -304,8 +303,8 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
|
|||
raise AssertionError('This is impossible')
|
||||
|
||||
|
||||
def _find_lineage_for_sans(config: configuration.NamespaceConfig, sans: list[san.SAN]
|
||||
) -> tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
def _find_lineage_for_domains(config: configuration.NamespaceConfig, domains: list[str]
|
||||
) -> tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Determine whether there are duplicated names and how to handle
|
||||
them (renew, reinstall, newcert, or raising an error to stop
|
||||
the client run if the user chooses to cancel the operation when
|
||||
|
|
@ -314,8 +313,8 @@ def _find_lineage_for_sans(config: configuration.NamespaceConfig, sans: list[san
|
|||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param sans: List of domain names and/or IP addresses
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: List of domain names
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:returns: Two-element tuple containing desired new-certificate behavior as
|
||||
a string token ("reinstall", "renew", or "newcert"), plus either
|
||||
|
|
@ -332,7 +331,7 @@ def _find_lineage_for_sans(config: configuration.NamespaceConfig, sans: list[san
|
|||
if config.duplicate:
|
||||
return "newcert", None
|
||||
# TODO: Also address superset case
|
||||
ident_names_cert, subset_names_cert = cert_manager.find_duplicative_certs(config, sans)
|
||||
ident_names_cert, subset_names_cert = cert_manager.find_duplicative_certs(config, domains)
|
||||
# XXX ^ schoen is not sure whether that correctly reads the systemwide
|
||||
# configuration file.
|
||||
if ident_names_cert is None and subset_names_cert is None:
|
||||
|
|
@ -341,19 +340,19 @@ def _find_lineage_for_sans(config: configuration.NamespaceConfig, sans: list[san
|
|||
if ident_names_cert is not None:
|
||||
return _handle_identical_cert_request(config, ident_names_cert)
|
||||
elif subset_names_cert is not None:
|
||||
return _handle_subset_cert_request(config, sans, subset_names_cert)
|
||||
return _handle_subset_cert_request(config, domains, subset_names_cert)
|
||||
return None, None
|
||||
|
||||
|
||||
def _find_cert(config: configuration.NamespaceConfig, sans: list[san.SAN], certname: str
|
||||
def _find_cert(config: configuration.NamespaceConfig, domains: list[str], certname: str
|
||||
) -> tuple[bool, Optional[storage.RenewableCert]]:
|
||||
"""Finds an existing certificate object given domains and/or a certificate name.
|
||||
|
||||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param sans: List of domain names and/or IP addresses
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: List of domain names
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:param certname: Name of certificate
|
||||
:type certname: str
|
||||
|
|
@ -364,22 +363,22 @@ def _find_cert(config: configuration.NamespaceConfig, sans: list[san.SAN], certn
|
|||
:rtype: `tuple` of `bool` and :class:`storage.RenewableCert` or `None`
|
||||
|
||||
"""
|
||||
action, lineage = _find_lineage_for_sans_and_certname(config, sans, certname)
|
||||
action, lineage = _find_lineage_for_domains_and_certname(config, domains, certname)
|
||||
if action == "reinstall":
|
||||
logger.info("Keeping the existing certificate")
|
||||
return (action != "reinstall"), lineage
|
||||
|
||||
|
||||
def _find_lineage_for_sans_and_certname(
|
||||
config: configuration.NamespaceConfig, sans: list[san.SAN],
|
||||
def _find_lineage_for_domains_and_certname(
|
||||
config: configuration.NamespaceConfig, domains: list[str],
|
||||
certname: str) -> tuple[Optional[str], Optional[storage.RenewableCert]]:
|
||||
"""Find appropriate lineage based on given domains and/or certname.
|
||||
|
||||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param sans: List of domain names and/or IP addresses
|
||||
:type sans: `list` of `san.SAN`
|
||||
:param domains: List of domain names
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:param certname: Name of certificate
|
||||
:type certname: str
|
||||
|
|
@ -394,19 +393,19 @@ def _find_lineage_for_sans_and_certname(
|
|||
|
||||
"""
|
||||
if not certname:
|
||||
return _find_lineage_for_sans(config, sans)
|
||||
return _find_lineage_for_domains(config, domains)
|
||||
lineage = cert_manager.lineage_for_certname(config, certname)
|
||||
if lineage:
|
||||
if sans:
|
||||
computed_domains = cert_manager.sans_for_certname(config, certname)
|
||||
if computed_domains and set(computed_domains) != set(sans):
|
||||
if domains:
|
||||
computed_domains = cert_manager.domains_for_certname(config, certname)
|
||||
if computed_domains and set(computed_domains) != set(domains):
|
||||
_handle_unexpected_key_type_migration(config, lineage)
|
||||
_ask_user_to_confirm_new_sans(config, sans, certname,
|
||||
lineage.sans()) # raises if no
|
||||
_ask_user_to_confirm_new_names(config, domains, certname,
|
||||
lineage.names()) # raises if no
|
||||
return "renew", lineage
|
||||
# unnecessarily specified domains or no domains specified
|
||||
return _handle_identical_cert_request(config, lineage)
|
||||
elif sans:
|
||||
elif domains:
|
||||
return "newcert", None
|
||||
raise errors.ConfigurationError("No certificate with name {0} found. "
|
||||
"Use -d to specify domains, or run certbot certificates to see "
|
||||
|
|
@ -440,23 +439,22 @@ def _format_list(character: str, strings: Iterable[str]) -> str:
|
|||
)
|
||||
|
||||
|
||||
def _ask_user_to_confirm_new_sans(config: configuration.NamespaceConfig,
|
||||
new_sans: Iterable[san.SAN],
|
||||
certname: str,
|
||||
old_sans: Iterable[san.SAN]) -> None:
|
||||
"""Ask user to confirm update cert certname to contain new_sans.
|
||||
def _ask_user_to_confirm_new_names(config: configuration.NamespaceConfig,
|
||||
new_domains: Iterable[str], certname: str,
|
||||
old_domains: Iterable[str]) -> None:
|
||||
"""Ask user to confirm update cert certname to contain new_domains.
|
||||
|
||||
:param config: Configuration object
|
||||
:type config: configuration.NamespaceConfig
|
||||
|
||||
:param new_sans: List of new domain names and/or IP addresses
|
||||
:type new_sans: `list` of `san.SAN`
|
||||
:param new_domains: List of new domain names
|
||||
:type new_domains: `list` of `str`
|
||||
|
||||
:param certname: Name of certificate
|
||||
:type certname: str
|
||||
|
||||
:param old_sans: List of old domain names and/or IP addresses
|
||||
:type old_sans: `list` of `san.SAN`
|
||||
:param old_domains: List of old domain names
|
||||
:type old_domains: `list` of `str`
|
||||
|
||||
:returns: None
|
||||
:rtype: None
|
||||
|
|
@ -467,7 +465,7 @@ def _ask_user_to_confirm_new_sans(config: configuration.NamespaceConfig,
|
|||
if config.renew_with_new_domains:
|
||||
return
|
||||
|
||||
added, removed = _get_added_removed(map(str, new_sans), map(str, old_sans))
|
||||
added, removed = _get_added_removed(new_domains, old_domains)
|
||||
|
||||
msg = ("You are updating certificate {0} to include new domain(s): {1}{br}{br}"
|
||||
"You are also removing previously included domain(s): {2}{br}{br}"
|
||||
|
|
@ -480,9 +478,9 @@ def _ask_user_to_confirm_new_sans(config: configuration.NamespaceConfig,
|
|||
raise errors.ConfigurationError("Specified mismatched certificate name and domains.")
|
||||
|
||||
|
||||
def _find_sans_or_certname(config: configuration.NamespaceConfig,
|
||||
installer: Optional[interfaces.Installer],
|
||||
question: Optional[str] = None) -> tuple[list[san.SAN], str]:
|
||||
def _find_domains_or_certname(config: configuration.NamespaceConfig,
|
||||
installer: Optional[interfaces.Installer],
|
||||
question: Optional[str] = None) -> tuple[list[str], str]:
|
||||
"""Retrieve domains and certname from config or user input.
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -500,29 +498,27 @@ def _find_sans_or_certname(config: configuration.NamespaceConfig,
|
|||
:raises errors.Error: Usage message, if parameters are not used correctly
|
||||
|
||||
"""
|
||||
domains = None
|
||||
certname = config.certname
|
||||
sans: Optional[list[san.SAN]] = None
|
||||
|
||||
# first, try to get domains from the config
|
||||
if config.domains:
|
||||
sans = config.domains
|
||||
|
||||
# if we can't do that but we have a certname, get the sans
|
||||
# by loading the latest certificate with that certname
|
||||
if certname and not sans:
|
||||
sans = cert_manager.sans_for_certname(config, certname)
|
||||
domains = config.domains
|
||||
# if we can't do that but we have a certname, get the domains
|
||||
# with that certname
|
||||
elif certname:
|
||||
domains = cert_manager.domains_for_certname(config, certname)
|
||||
|
||||
# that certname might not have existed, or there was a problem.
|
||||
# try to get domains from the user.
|
||||
if not sans:
|
||||
sans = san.guess(display_ops.choose_names(installer, question))
|
||||
if not domains:
|
||||
domains = display_ops.choose_names(installer, question)
|
||||
|
||||
if not sans:
|
||||
if not domains and not certname:
|
||||
raise errors.Error("Please specify --domains, or --installer that "
|
||||
"will help in domain names autodiscovery, or "
|
||||
"--cert-name for an existing certificate name.")
|
||||
|
||||
return sans, certname
|
||||
return domains, certname
|
||||
|
||||
|
||||
def _report_next_steps(config: configuration.NamespaceConfig, installer_err: Optional[errors.Error],
|
||||
|
|
@ -1039,7 +1035,7 @@ def _cert_name_from_config_or_lineage(config: configuration.NamespaceConfig,
|
|||
|
||||
|
||||
def _install_cert(config: configuration.NamespaceConfig, le_client: client.Client,
|
||||
sans: list[san.SAN], lineage: Optional[storage.RenewableCert] = None) -> None:
|
||||
domains: list[str], lineage: Optional[storage.RenewableCert] = None) -> None:
|
||||
"""Install a cert
|
||||
|
||||
:param config: Configuration object
|
||||
|
|
@ -1048,8 +1044,8 @@ def _install_cert(config: configuration.NamespaceConfig, le_client: client.Clien
|
|||
:param le_client: Client object
|
||||
:type le_client: client.Client
|
||||
|
||||
:param sans: List of domains
|
||||
:type sans: `list` of `str`
|
||||
:param domains: List of domains
|
||||
:type domains: `list` of `str`
|
||||
|
||||
:param lineage: Certificate lineage object. Defaults to `None`
|
||||
:type lineage: storage.RenewableCert
|
||||
|
|
@ -1062,16 +1058,8 @@ def _install_cert(config: configuration.NamespaceConfig, le_client: client.Clien
|
|||
configuration.NamespaceConfig] = lineage if lineage else config
|
||||
assert path_provider.cert_path is not None
|
||||
|
||||
domains, ip_addresses = san.split(sans)
|
||||
if len(ip_addresses) > 0:
|
||||
# Our apache and nginx plugins are currently relying on this check for a user friendly error
|
||||
# message about their lack of support for IP certificates. If you're removing this check,
|
||||
# please check that the plugins can process IP addresses.
|
||||
raise errors.ConfigurationError("Enhancements not supported for IP address certificates")
|
||||
|
||||
le_client.deploy_certificate(domains, path_provider.key_path, path_provider.cert_path,
|
||||
path_provider.chain_path, path_provider.fullchain_path)
|
||||
|
||||
le_client.enhance_config(domains, path_provider.chain_path)
|
||||
|
||||
|
||||
|
|
@ -1120,9 +1108,9 @@ def install(config: configuration.NamespaceConfig,
|
|||
|
||||
if config.key_path and config.cert_path:
|
||||
_check_certificate_and_key(config)
|
||||
sans, _ = _find_sans_or_certname(config, installer)
|
||||
domains, _ = _find_domains_or_certname(config, installer)
|
||||
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
||||
_install_cert(config, le_client, sans)
|
||||
_install_cert(config, le_client, domains)
|
||||
else:
|
||||
raise errors.ConfigurationError("Path to certificate or key was not defined. "
|
||||
"If your certificate is managed by Certbot, please use --cert-name "
|
||||
|
|
@ -1131,11 +1119,7 @@ def install(config: configuration.NamespaceConfig,
|
|||
if enhancements.are_requested(config):
|
||||
# In the case where we don't have certname, we have errored out already
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
domains, ip_addresses = san.split(sans)
|
||||
if ip_addresses:
|
||||
raise TypeError("enhancements not supported for IP address certificates")
|
||||
domains_str = [d.dns_name for d in domains]
|
||||
enhancements.enable(lineage, domains_str, installer, config)
|
||||
enhancements.enable(lineage, domains, installer, config)
|
||||
|
||||
return None
|
||||
|
||||
|
|
@ -1242,26 +1226,18 @@ def enhance(config: configuration.NamespaceConfig,
|
|||
config.certname = cert_manager.get_certnames(
|
||||
config, "enhance", allow_multiple=False,
|
||||
custom_prompt=certname_question)[0]
|
||||
cert_sans = cert_manager.sans_for_certname(config, config.certname)
|
||||
if cert_sans is None:
|
||||
cert_domains = cert_manager.domains_for_certname(config, config.certname)
|
||||
if cert_domains is None:
|
||||
raise errors.Error("Could not find the list of domains for the given certificate name.")
|
||||
cert_domains, ip_addresses = san.split(cert_sans)
|
||||
if len(ip_addresses) > 0:
|
||||
# Our apache and nginx plugins are currently relying on this check for a user friendly error
|
||||
# message about their lack of support for IP certificates. If you're removing this check,
|
||||
# please check that the plugins can process IP addresses.
|
||||
raise errors.ConfigurationError("Enhancements not supported for IP address certificates")
|
||||
|
||||
if config.noninteractive_mode:
|
||||
domains = cert_domains
|
||||
else:
|
||||
domain_question = ("Which domain names would you like to enable the "
|
||||
"selected enhancements for?")
|
||||
domain_strs = display_ops.choose_values(list(map(str, cert_domains)), domain_question)
|
||||
if not domain_strs:
|
||||
domains = display_ops.choose_values(cert_domains, domain_question)
|
||||
if not domains:
|
||||
raise errors.Error("User cancelled the domain selection. No domains "
|
||||
"defined, exiting.")
|
||||
domains = list(map(san.DNSName, domain_strs))
|
||||
|
||||
lineage = cert_manager.lineage_for_certname(config, config.certname)
|
||||
if not lineage:
|
||||
|
|
@ -1272,7 +1248,7 @@ def enhance(config: configuration.NamespaceConfig,
|
|||
le_client = _init_le_client(config, authenticator=None, installer=installer)
|
||||
le_client.enhance_config(domains, config.chain_path, redirect_default=False)
|
||||
if enhancements.are_requested(config):
|
||||
enhancements.enable(lineage, [d.dns_name for d in domains], installer, config)
|
||||
enhancements.enable(lineage, domains, installer, config)
|
||||
|
||||
return None
|
||||
|
||||
|
|
@ -1428,17 +1404,12 @@ def run(config: configuration.NamespaceConfig,
|
|||
# TODO: Handle errors from _init_le_client?
|
||||
le_client = _init_le_client(config, authenticator, installer)
|
||||
|
||||
sans, certname = _find_sans_or_certname(config, installer)
|
||||
|
||||
domains, ip_addresses = san.split(sans)
|
||||
if ip_addresses:
|
||||
raise errors.Error("installation of IP address certificate not supported")
|
||||
|
||||
should_get_cert, lineage = _find_cert(config, sans, certname)
|
||||
domains, certname = _find_domains_or_certname(config, installer)
|
||||
should_get_cert, lineage = _find_cert(config, domains, certname)
|
||||
|
||||
new_lineage = lineage
|
||||
if should_get_cert:
|
||||
new_lineage = _get_and_save_cert(le_client, config, sans,
|
||||
new_lineage = _get_and_save_cert(le_client, config, domains,
|
||||
certname, lineage)
|
||||
|
||||
cert_path = new_lineage.cert_path if new_lineage else None
|
||||
|
|
@ -1452,16 +1423,15 @@ def run(config: configuration.NamespaceConfig,
|
|||
# relevant advice in a nice way, before re-raising the error for normal processing.
|
||||
installer_err: Optional[errors.Error] = None
|
||||
try:
|
||||
_install_cert(config, le_client, sans, new_lineage)
|
||||
_install_cert(config, le_client, domains, new_lineage)
|
||||
|
||||
if enhancements.are_requested(config) and new_lineage:
|
||||
enhancements.enable(new_lineage, [d.dns_name for d in domains], installer, config)
|
||||
enhancements.enable(new_lineage, domains, installer, config)
|
||||
|
||||
sans_strs = list(map(str, sans))
|
||||
if lineage is None or not should_get_cert:
|
||||
display_ops.success_installation(sans_strs)
|
||||
display_ops.success_installation(domains)
|
||||
else:
|
||||
display_ops.success_renewal(sans_strs)
|
||||
display_ops.success_renewal(domains)
|
||||
except errors.Error as e:
|
||||
installer_err = e
|
||||
finally:
|
||||
|
|
@ -1496,17 +1466,16 @@ def _csr_get_and_save_cert(config: configuration.NamespaceConfig,
|
|||
:rtype: `tuple` of `str`
|
||||
|
||||
"""
|
||||
util_csr, _ = config.actual_csr
|
||||
x509_req = x509.load_pem_x509_csr(util_csr.data)
|
||||
domains, ip_addresses = san.from_x509(x509_req.subject, x509_req.extensions)
|
||||
csr, _ = config.actual_csr
|
||||
csr_names = crypto_util.get_names_from_req(csr.data)
|
||||
display_util.notify(
|
||||
"{action} for {sans}".format(
|
||||
"{action} for {domains}".format(
|
||||
action="Simulating a certificate request" if config.dry_run else
|
||||
"Requesting a certificate",
|
||||
sans=internal_display_util.summarize_sans(san.join(domains, ip_addresses))
|
||||
domains=internal_display_util.summarize_domain_list(csr_names)
|
||||
)
|
||||
)
|
||||
cert, chain = le_client.obtain_certificate_from_csr(util_csr)
|
||||
cert, chain = le_client.obtain_certificate_from_csr(csr)
|
||||
if config.dry_run:
|
||||
logger.debug(
|
||||
"Dry run: skipping saving certificate to %s", config.cert_path)
|
||||
|
|
@ -1582,15 +1551,15 @@ def certonly(config: configuration.NamespaceConfig, plugins: plugins_disco.Plugi
|
|||
eff.handle_subscription(config, le_client.account)
|
||||
return
|
||||
|
||||
sans, certname = _find_sans_or_certname(config, installer)
|
||||
should_get_cert, lineage = _find_cert(config, sans, certname)
|
||||
domains, certname = _find_domains_or_certname(config, installer)
|
||||
should_get_cert, lineage = _find_cert(config, domains, certname)
|
||||
|
||||
if not should_get_cert:
|
||||
display_util.notification("Certificate not yet due for renewal; no action taken.",
|
||||
pause=False)
|
||||
return
|
||||
|
||||
lineage = _get_and_save_cert(le_client, config, sans, certname, lineage)
|
||||
lineage = _get_and_save_cert(le_client, config, domains, certname, lineage)
|
||||
|
||||
# If a new cert was issued and we were passed an installer, we can safely
|
||||
# run `installer.restart()` to load the newly issued certificate
|
||||
|
|
|
|||
|
|
@ -298,7 +298,7 @@ class _WebrootMapAction(argparse.Action):
|
|||
for domains, webroot_path in json.loads(str(webroot_map)).items():
|
||||
webroot_path = _validate_webroot(webroot_path)
|
||||
namespace.webroot_map.update(
|
||||
(d.dns_name, webroot_path) for d in cli.add_domains(namespace, domains))
|
||||
(d, webroot_path) for d in cli.add_domains(namespace, domains))
|
||||
|
||||
|
||||
class _WebrootPathAction(argparse.Action):
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
"""Functionality for autorenewal and associated juggling of configurations"""
|
||||
|
||||
import configobj
|
||||
import copy
|
||||
import datetime
|
||||
|
|
@ -30,7 +31,6 @@ from certbot._internal import cli
|
|||
from certbot._internal import client
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import hooks
|
||||
from certbot._internal import san
|
||||
from certbot._internal import storage
|
||||
from certbot._internal import updater
|
||||
from certbot._internal.display import obj as display_obj
|
||||
|
|
@ -148,8 +148,8 @@ def reconstitute(config: configuration.NamespaceConfig,
|
|||
return None
|
||||
|
||||
try:
|
||||
domains, _ = san.split(renewal_candidate.sans())
|
||||
config.domains = domains
|
||||
config.domains = [util.enforce_domain_sanity(d)
|
||||
for d in renewal_candidate.names()]
|
||||
except errors.ConfigurationError as error:
|
||||
logger.error("Renewal configuration file %s references a certificate "
|
||||
"that contains an invalid domain name. The problem "
|
||||
|
|
@ -494,7 +494,7 @@ def _avoid_invalidating_lineage(config: configuration.NamespaceConfig,
|
|||
if util.is_staging(config.server):
|
||||
if not util.is_staging(original_server):
|
||||
if not config.break_my_certs:
|
||||
names = san.display(lineage.sans())
|
||||
names = ", ".join(lineage.names())
|
||||
raise errors.Error(
|
||||
"You've asked to renew/replace a seemingly valid certificate with "
|
||||
f"a test certificate (domains: {names}). We will not do that "
|
||||
|
|
@ -545,15 +545,15 @@ def _avoid_reuse_key_conflicts(config: configuration.NamespaceConfig,
|
|||
"add --new-key.")
|
||||
|
||||
|
||||
def renew_cert(config: configuration.NamespaceConfig, sans: Optional[list[san.SAN]],
|
||||
def renew_cert(config: configuration.NamespaceConfig, domains: Optional[list[str]],
|
||||
le_client: client.Client, lineage: storage.RenewableCert) -> None:
|
||||
"""Renew a certificate lineage."""
|
||||
renewal_params = lineage.configuration["renewalparams"]
|
||||
original_server = renewal_params.get("server", cli.flag_default("server"))
|
||||
_avoid_invalidating_lineage(config, lineage, original_server)
|
||||
_avoid_reuse_key_conflicts(config, lineage)
|
||||
if not sans:
|
||||
sans = lineage.sans()
|
||||
if not domains:
|
||||
domains = lineage.names()
|
||||
# The private key is the existing lineage private key if reuse_key is set.
|
||||
# Otherwise, generate a fresh private key by passing None.
|
||||
if config.reuse_key and not config.new_key:
|
||||
|
|
@ -561,7 +561,7 @@ def renew_cert(config: configuration.NamespaceConfig, sans: Optional[list[san.SA
|
|||
_update_renewal_params_from_key(new_key, config)
|
||||
else:
|
||||
new_key = None
|
||||
new_cert, new_chain, new_key, _ = le_client.obtain_certificate(sans, new_key)
|
||||
new_cert, new_chain, new_key, _ = le_client.obtain_certificate(domains, new_key)
|
||||
if config.dry_run:
|
||||
logger.debug("Dry run: skipping updating lineage at %s", os.path.dirname(lineage.cert))
|
||||
else:
|
||||
|
|
@ -571,7 +571,7 @@ def renew_cert(config: configuration.NamespaceConfig, sans: Optional[list[san.SA
|
|||
lineage.update_all_links_to(lineage.latest_common_version())
|
||||
lineage.truncate()
|
||||
|
||||
hooks.renew_hook(config, sans, lineage.live_dir)
|
||||
hooks.renew_hook(config, domains, lineage.live_dir)
|
||||
|
||||
|
||||
def report(msgs: Iterable[str], category: str) -> str:
|
||||
|
|
@ -632,7 +632,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
|
|||
"""Examine each lineage; renew if due and report results"""
|
||||
|
||||
# This is trivially False if config.domains is empty
|
||||
if any(domain.dns_name not in config.webroot_map for domain in config.domains):
|
||||
if any(domain not in config.webroot_map for domain in config.domains):
|
||||
# If more plugins start using cli.add_domains,
|
||||
# we may want to only log a warning here
|
||||
raise errors.Error("Currently, the renew verb is capable of either "
|
||||
|
|
@ -710,7 +710,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
|
|||
# and we have a lineage in renewal_candidate
|
||||
main.renew_cert(lineage_config, plugins, renewal_candidate)
|
||||
renew_successes.append(renewal_candidate.fullchain)
|
||||
renewed_domains.extend(renewal_candidate.sans())
|
||||
renewed_domains.extend(renewal_candidate.names())
|
||||
else:
|
||||
expiry = crypto_util.notAfter(renewal_candidate.version(
|
||||
"cert", renewal_candidate.latest_common_version()))
|
||||
|
|
@ -721,7 +721,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
|
|||
plugins)
|
||||
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
# obtain_certificate (presumably) encountered an unanticipated problem.
|
||||
# obtain_cert (presumably) encountered an unanticipated problem.
|
||||
logger.error(
|
||||
"Failed to renew certificate %s with error: %s",
|
||||
lineagename, e
|
||||
|
|
@ -729,7 +729,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
|
|||
logger.debug("Traceback was:\n%s", traceback.format_exc())
|
||||
if renewal_candidate:
|
||||
renew_failures.append(renewal_candidate.fullchain)
|
||||
failed_domains.extend(renewal_candidate.sans())
|
||||
failed_domains.extend(renewal_candidate.names())
|
||||
|
||||
# Describe all the results
|
||||
_renew_describe_results(config, renew_successes, renew_failures,
|
||||
|
|
|
|||
|
|
@ -123,16 +123,6 @@ class IPAddress(SAN):
|
|||
"""Always False."""
|
||||
return False
|
||||
|
||||
def guess(names: Iterable[str]) -> list[SAN]:
|
||||
"""Turn a list of strings in to a list of SANs based on how they parse."""
|
||||
sans: list[SAN] = []
|
||||
for name in names:
|
||||
try:
|
||||
sans.append(IPAddress(name))
|
||||
except ValueError:
|
||||
sans.append(DNSName(name))
|
||||
return sans
|
||||
|
||||
def split(sans: Iterable[SAN]) -> tuple[list[DNSName], list[IPAddress]]:
|
||||
"""Split a list of SANs into a list of DNSNames and one of IPAddress, in that order."""
|
||||
domains = []
|
||||
|
|
@ -147,10 +137,6 @@ def split(sans: Iterable[SAN]) -> tuple[list[DNSName], list[IPAddress]]:
|
|||
raise TypeError(f"SAN of type {type(s)}")
|
||||
return domains, ip_addresses
|
||||
|
||||
def join(dns_names: Iterable[DNSName], ip_addresses: Iterable[IPAddress]) -> list[SAN]:
|
||||
"""Combine a list of DNS names and a list of IP addresses."""
|
||||
return list(dns_names) + list(ip_addresses)
|
||||
|
||||
def display(sans: Iterable[SAN]) -> str:
|
||||
"""Return the list of SANs in string form, separated by comma and space."""
|
||||
return ", ".join(map(str, sans))
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ from typing import Optional
|
|||
from typing import Union
|
||||
|
||||
import configobj
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
||||
|
|
@ -29,7 +28,6 @@ from certbot import interfaces
|
|||
from certbot import ocsp
|
||||
from certbot import util
|
||||
from certbot._internal import error_handler
|
||||
from certbot._internal import san
|
||||
from certbot._internal.plugins import disco as plugins_disco
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
|
@ -915,30 +913,18 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
os.unlink(link)
|
||||
|
||||
def names(self) -> list[str]:
|
||||
"""Return the DNS names and IP addresses from this certificate as strings.
|
||||
"""What are the subject names of this certificate?
|
||||
|
||||
:returns: the subject names
|
||||
:rtype: `list` of `str`
|
||||
:raises .CertStorageError: if could not find cert file.
|
||||
"""
|
||||
return list(map(str, self.sans()))
|
||||
|
||||
def sans(self) -> list[san.SAN]:
|
||||
"""Return the DNS names and IP addresses from this certificate as SAN objects.
|
||||
|
||||
:returns: the subject names
|
||||
:rtype: `list` of `san.SAN`
|
||||
:raises .CertStorageError: if could not find cert file.
|
||||
|
||||
"""
|
||||
target = self.current_target("cert")
|
||||
if target is None:
|
||||
raise errors.CertStorageError("could not find the certificate file")
|
||||
with open(target, "rb") as f:
|
||||
cert_bytes = f.read()
|
||||
x509_cert = x509.load_pem_x509_certificate(cert_bytes)
|
||||
dns_names, ip_addrs = san.from_x509(x509_cert.subject, x509_cert.extensions)
|
||||
return cast(list[san.SAN], dns_names + ip_addrs)
|
||||
return crypto_util.get_names_from_cert(f.read())
|
||||
|
||||
def ocsp_revoked(self, version: int) -> bool:
|
||||
"""Is the specified cert version revoked according to OCSP?
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import pytest
|
|||
|
||||
from certbot import configuration
|
||||
from certbot import errors
|
||||
from certbot._internal import san
|
||||
from certbot._internal.storage import ALL_FOUR
|
||||
from certbot._internal.tests import storage_test
|
||||
from certbot.compat import filesystem
|
||||
|
|
@ -219,12 +218,11 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
|
||||
cert = mock.MagicMock(lineagename="nameone")
|
||||
cert.target_expiry = expiry
|
||||
cert.sans.return_value = [san.DNSName("nameone"), san.DNSName("nametwo")]
|
||||
cert.names.return_value = ["nameone", "nametwo"]
|
||||
cert.is_test_cert = False
|
||||
parsed_certs = [cert]
|
||||
|
||||
mock_config = mock.MagicMock(certname=None, lineagename=None)
|
||||
mock_config.domains = []
|
||||
# pylint: disable=protected-access
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
|
@ -263,16 +261,16 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
|
||||
cert = mock.MagicMock(lineagename="indescribable")
|
||||
cert.target_expiry = expiry
|
||||
cert.sans.return_value = [san.DNSName("nameone"), san.DNSName("thrice.named")]
|
||||
cert.names.return_value = ["nameone", "thrice.named"]
|
||||
cert.is_test_cert = True
|
||||
parsed_certs.append(cert)
|
||||
|
||||
out = get_report()
|
||||
assert len(re.findall("INVALID:", out)) == 2
|
||||
mock_config.domains = [san.DNSName("thrice.named")]
|
||||
mock_config.domains = ["thrice.named"]
|
||||
out = get_report()
|
||||
assert len(re.findall("INVALID:", out)) == 1
|
||||
mock_config.domains = [san.DNSName("nameone")]
|
||||
mock_config.domains = ["nameone"]
|
||||
out = get_report()
|
||||
assert len(re.findall("INVALID:", out)) == 2
|
||||
mock_config.certname = "indescribable"
|
||||
|
|
@ -333,7 +331,7 @@ class LineageForCertnameTest(BaseCertManagerTest):
|
|||
|
||||
|
||||
class DomainsForCertnameTest(BaseCertManagerTest):
|
||||
"""Tests for certbot._internal.cert_manager.sans_for_certname"""
|
||||
"""Tests for certbot._internal.cert_manager.domains_for_certname"""
|
||||
|
||||
@mock.patch('certbot.util.make_or_verify_dir')
|
||||
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
|
||||
|
|
@ -342,12 +340,12 @@ class DomainsForCertnameTest(BaseCertManagerTest):
|
|||
mock_make_or_verify_dir):
|
||||
mock_renewal_conf_file.return_value = "somefile.conf"
|
||||
mock_match = mock.Mock(lineagename="example.com")
|
||||
domains = [san.DNSName("example.com"), san.DNSName("example.org")]
|
||||
mock_match.sans.return_value = domains
|
||||
domains = ["example.com", "example.org"]
|
||||
mock_match.names.return_value = domains
|
||||
mock_renewable_cert.return_value = mock_match
|
||||
from certbot._internal import cert_manager
|
||||
assert cert_manager.sans_for_certname(self.config, "example.com") == \
|
||||
domains
|
||||
assert cert_manager.domains_for_certname(self.config, "example.com") == \
|
||||
domains
|
||||
assert mock_make_or_verify_dir.called
|
||||
|
||||
@mock.patch('certbot.util.make_or_verify_dir')
|
||||
|
|
@ -355,7 +353,7 @@ class DomainsForCertnameTest(BaseCertManagerTest):
|
|||
def test_no_match(self, mock_renewal_conf_file, mock_make_or_verify_dir):
|
||||
mock_renewal_conf_file.return_value = "somefile.conf"
|
||||
from certbot._internal import cert_manager
|
||||
assert cert_manager.sans_for_certname(self.config, "other.com") is None
|
||||
assert cert_manager.domains_for_certname(self.config, "other.com") is None
|
||||
assert mock_make_or_verify_dir.called
|
||||
|
||||
|
||||
|
|
@ -376,25 +374,24 @@ class DuplicativeCertsTest(storage_test.BaseRenewableCertTest):
|
|||
|
||||
# No overlap at all
|
||||
result = find_duplicative_certs(
|
||||
self.config, [san.DNSName('wow.net'), san.DNSName('hooray.org')])
|
||||
self.config, ['wow.net', 'hooray.org'])
|
||||
assert result == (None, None)
|
||||
|
||||
# Totally identical
|
||||
result = find_duplicative_certs(
|
||||
self.config, [san.DNSName('example.com'), san.DNSName('www.example.com')])
|
||||
self.config, ['example.com', 'www.example.com'])
|
||||
assert result[0].configfile.filename.endswith('example.org.conf')
|
||||
assert result[1] is None
|
||||
|
||||
# Superset
|
||||
result = find_duplicative_certs(
|
||||
self.config, [san.DNSName('example.com'), san.DNSName('www.example.com'),
|
||||
san.DNSName('something.new')])
|
||||
self.config, ['example.com', 'www.example.com', 'something.new'])
|
||||
assert result[0] is None
|
||||
assert result[1].configfile.filename.endswith('example.org.conf')
|
||||
|
||||
# Partial overlap doesn't count
|
||||
result = find_duplicative_certs(
|
||||
self.config, [san.DNSName('example.com'), san.DNSName('something.new')])
|
||||
self.config, ['example.com', 'something.new'])
|
||||
assert result == (None, None)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ from certbot import errors
|
|||
from certbot.configuration import ArgumentSource, NamespaceConfig
|
||||
from certbot._internal import cli
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import san
|
||||
from certbot._internal.cli.cli_utils import flag_default
|
||||
from certbot._internal.plugins import disco
|
||||
from certbot.compat import filesystem
|
||||
|
|
@ -129,7 +128,7 @@ class ParseTest(unittest.TestCase):
|
|||
with open(tmp_config.name, 'w') as file_h:
|
||||
file_h.write("domains = example.com")
|
||||
namespace = self.parse(["certonly"])
|
||||
assert_value_and_source(namespace, 'domains', [san.DNSName("example.com")], ArgumentSource.CONFIG_FILE)
|
||||
assert_value_and_source(namespace, 'domains', ["example.com"], ArgumentSource.CONFIG_FILE)
|
||||
namespace = self.parse(["renew"])
|
||||
assert_value_and_source(namespace, 'domains', [], ArgumentSource.RUNTIME)
|
||||
|
||||
|
|
@ -235,28 +234,28 @@ class ParseTest(unittest.TestCase):
|
|||
def test_parse_domains(self):
|
||||
short_args = ['-d', 'example.com']
|
||||
namespace = self.parse(short_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('example.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['example.com'])
|
||||
|
||||
short_args = ['-d', 'trailing.period.com.']
|
||||
namespace = self.parse(short_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('trailing.period.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['trailing.period.com'])
|
||||
|
||||
short_args = ['-d', 'example.com,another.net,third.org,example.com']
|
||||
namespace = self.parse(short_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains',
|
||||
[san.DNSName('example.com'), san.DNSName('another.net'), san.DNSName('third.org')])
|
||||
['example.com', 'another.net', 'third.org'])
|
||||
|
||||
long_args = ['--domains', 'example.com']
|
||||
namespace = self.parse(long_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('example.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['example.com'])
|
||||
|
||||
long_args = ['--domains', 'trailing.period.com.']
|
||||
namespace = self.parse(long_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('trailing.period.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['trailing.period.com'])
|
||||
|
||||
long_args = ['--domains', 'example.com,another.net,example.com']
|
||||
namespace = self.parse(long_args)
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('example.com'), san.DNSName('another.net')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['example.com', 'another.net'])
|
||||
|
||||
def test_preferred_challenges(self):
|
||||
short_args = ['--preferred-challenges', 'http, dns']
|
||||
|
|
@ -482,7 +481,7 @@ class ParseTest(unittest.TestCase):
|
|||
assert_value_and_source(namespace, 'pref_challs', [], ArgumentSource.DEFAULT)
|
||||
|
||||
namespace.pref_challs = [challenges.HTTP01.typ]
|
||||
namespace.domains = [san.DNSName('example.com')]
|
||||
namespace.domains = ['example.com']
|
||||
|
||||
namespace = self.parse([])
|
||||
assert_value_and_source(namespace, 'domains', [], ArgumentSource.DEFAULT)
|
||||
|
|
@ -573,7 +572,7 @@ class ParseTest(unittest.TestCase):
|
|||
@mock.patch('certbot._internal.hooks.validate_hooks')
|
||||
def test_argument_with_equals(self, unsused_mock_validate_hooks):
|
||||
namespace = self.parse('-d=example.com')
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('example.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['example.com'])
|
||||
|
||||
# make sure it doesn't choke on equals signs being present in the argument value
|
||||
plugins = disco.PluginsRegistry.find_all()
|
||||
|
|
@ -599,7 +598,7 @@ class ParseTest(unittest.TestCase):
|
|||
# in double quotes, or as its own line in a docker-compose.yml file (as
|
||||
# in #9811)
|
||||
namespace = self.parse(['certonly', '-d foo.com'])
|
||||
assert_set_by_user_with_value(namespace, 'domains', [san.DNSName('foo.com')])
|
||||
assert_set_by_user_with_value(namespace, 'domains', ['foo.com'])
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ from certbot import errors
|
|||
from certbot import util
|
||||
from certbot._internal import account
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import san
|
||||
from certbot._internal.display import obj as display_obj
|
||||
from certbot.compat import os
|
||||
import certbot.tests.util as test_util
|
||||
|
|
@ -276,7 +275,6 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.dry_run = False
|
||||
self.config.strict_permissions = True
|
||||
self.eg_domains = ["example.com", "www.example.com"]
|
||||
self.eg_sans = list(map(san.DNSName, self.eg_domains))
|
||||
self.eg_order = mock.MagicMock(
|
||||
authorizations=[None],
|
||||
csr_pem=mock.sentinel.csr_pem)
|
||||
|
|
@ -478,7 +476,7 @@ class ClientTest(ClientTestCommon):
|
|||
mock_crypto_util.generate_key.return_value = key
|
||||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
authzr = (self._authzr_from_sans([san.DNSName("example.com")]))
|
||||
authzr = self._authzr_from_domains(["example.com"])
|
||||
self.config.allow_subset_of_names = True
|
||||
self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2)
|
||||
|
||||
|
|
@ -496,7 +494,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -508,15 +506,15 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with test_util.patch_display_util():
|
||||
result = self.client.obtain_certificate(self.eg_sans)
|
||||
result = self.client.obtain_certificate(self.eg_domains)
|
||||
|
||||
assert result == \
|
||||
(mock.sentinel.cert, mock.sentinel.chain, key, csr)
|
||||
assert self.client.auth_handler.handle_authorizations.call_count == 2
|
||||
assert self.acme.finalize_order.call_count == 2
|
||||
|
||||
assert mock_crypto_util.generate_key.call_count == 2
|
||||
successful_domains = [d for d in self.eg_domains if d != 'example.com']
|
||||
assert mock_crypto_util.generate_key.call_count == 2
|
||||
mock_crypto_util.generate_csr.assert_has_calls([
|
||||
mock.call(key, self.eg_domains, None, self.config.must_staple, self.config.strict_permissions),
|
||||
mock.call(key, successful_domains, None, self.config.must_staple, self.config.strict_permissions)])
|
||||
|
|
@ -532,7 +530,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -546,7 +544,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with pytest.raises(messages.Error):
|
||||
self.client.obtain_certificate(self.eg_sans)
|
||||
self.client.obtain_certificate(self.eg_domains)
|
||||
assert self.client.auth_handler.handle_authorizations.call_count == 1
|
||||
assert self.acme.finalize_order.call_count == 1
|
||||
assert mock_crypto_util.generate_key.call_count == 1
|
||||
|
|
@ -562,7 +560,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -572,7 +570,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with pytest.raises(messages.Error):
|
||||
self.client.obtain_certificate(self.eg_sans)
|
||||
self.client.obtain_certificate(self.eg_domains)
|
||||
assert self.client.auth_handler.handle_authorizations.call_count == 1
|
||||
assert self.acme.finalize_order.call_count == 1
|
||||
assert mock_crypto_util.generate_key.call_count == 1
|
||||
|
|
@ -588,7 +586,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -600,7 +598,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with test_util.patch_display_util():
|
||||
result = self.client.obtain_certificate(self.eg_sans)
|
||||
result = self.client.obtain_certificate(self.eg_domains)
|
||||
|
||||
assert result == \
|
||||
(mock.sentinel.cert, mock.sentinel.chain, key, csr)
|
||||
|
|
@ -624,7 +622,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -638,7 +636,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with pytest.raises(messages.Error):
|
||||
self.client.obtain_certificate(self.eg_sans)
|
||||
self.client.obtain_certificate(self.eg_domains)
|
||||
assert self.client.auth_handler.handle_authorizations.call_count == 0
|
||||
assert self.acme.new_order.call_count == 1
|
||||
assert mock_crypto_util.generate_key.call_count == 1
|
||||
|
|
@ -654,7 +652,7 @@ class ClientTest(ClientTestCommon):
|
|||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
self._mock_obtain_certificate()
|
||||
authzr = self._authzr_from_sans(self.eg_sans)
|
||||
authzr = self._authzr_from_domains(self.eg_domains)
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
|
|
@ -664,7 +662,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.config.allow_subset_of_names = True
|
||||
|
||||
with pytest.raises(messages.Error):
|
||||
self.client.obtain_certificate(self.eg_sans)
|
||||
self.client.obtain_certificate(self.eg_domains)
|
||||
assert self.client.auth_handler.handle_authorizations.call_count == 0
|
||||
assert self.acme.new_order.call_count == 1
|
||||
assert mock_crypto_util.generate_key.call_count == 1
|
||||
|
|
@ -687,10 +685,8 @@ class ClientTest(ClientTestCommon):
|
|||
elliptic_curve="secp256r1",
|
||||
key_type=self.config.key_type,
|
||||
)
|
||||
# Assumes all of eg_sans are DNSNames.
|
||||
eg_domains = list(map(str, self.eg_sans))
|
||||
mock_acme_crypto.make_csr.assert_called_once_with(
|
||||
mock.sentinel.key_pem, eg_domains, self.config.must_staple)
|
||||
mock.sentinel.key_pem, self.eg_domains, self.config.must_staple)
|
||||
mock_crypto.generate_key.assert_not_called()
|
||||
mock_crypto.generate_csr.assert_not_called()
|
||||
assert mock_crypto.cert_and_chain_from_fullchain.call_count == 1
|
||||
|
|
@ -711,7 +707,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.client.config.dry_run = True
|
||||
|
||||
# Two authzs that are already valid and should get deactivated (dry run)
|
||||
authzrs = self._authzr_from_sans([san.DNSName("example.com"), san.DNSName("www.example.com")])
|
||||
authzrs = self._authzr_from_domains(["example.com", "www.example.com"])
|
||||
for authzr in authzrs:
|
||||
authzr.body.status = messages.STATUS_VALID
|
||||
|
||||
|
|
@ -723,7 +719,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.eg_order.authorizations = authzrs
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzrs
|
||||
with test_util.patch_display_util():
|
||||
result = self.client.obtain_certificate(self.eg_sans)
|
||||
result = self.client.obtain_certificate(self.eg_domains)
|
||||
assert result == (mock.sentinel.cert, mock.sentinel.chain, key, csr)
|
||||
self._check_obtain_certificate(1)
|
||||
|
||||
|
|
@ -745,7 +741,7 @@ class ClientTest(ClientTestCommon):
|
|||
mock_crypto_util.generate_key.return_value = new_key
|
||||
self._set_mock_from_fullchain(mock_crypto_util.cert_and_chain_from_fullchain)
|
||||
|
||||
authzr = self._authzr_from_sans([san.DNSName("example.com")])
|
||||
authzr = self._authzr_from_domains(["example.com"])
|
||||
self.config.allow_subset_of_names = True
|
||||
self.config.reuse_key = True
|
||||
|
||||
|
|
@ -757,7 +753,7 @@ class ClientTest(ClientTestCommon):
|
|||
with test_util.patch_display_util():
|
||||
mocked_open = mock.mock_open(read_data="old_key_pem")
|
||||
with mock.patch('builtins.open', mocked_open):
|
||||
result = self.client.obtain_certificate(self.eg_sans, "old_key_file")
|
||||
result = self.client.obtain_certificate(self.eg_domains, "old_key_file")
|
||||
|
||||
assert result == \
|
||||
(mock.sentinel.cert, mock.sentinel.chain, old_key, csr)
|
||||
|
|
@ -772,19 +768,16 @@ class ClientTest(ClientTestCommon):
|
|||
mock_chain.encode.return_value = mock.sentinel.chain
|
||||
mock_from_fullchain.return_value = (mock_cert, mock_chain)
|
||||
|
||||
def _authzr_from_sans(self, sans):
|
||||
def _authzr_from_domains(self, domains):
|
||||
authzr = []
|
||||
|
||||
# domain ordering should not be affected by authorization order
|
||||
for s in reversed(sans):
|
||||
if type(s) is not san.DNSName:
|
||||
raise TypeError(f"expected DNSName but got {type(s)}")
|
||||
for domain in reversed(domains):
|
||||
authzr.append(
|
||||
mock.MagicMock(
|
||||
body=mock.MagicMock(
|
||||
identifier=mock.MagicMock(
|
||||
value=str(s),
|
||||
typ="dns"))))
|
||||
value=domain))))
|
||||
return authzr
|
||||
|
||||
def _test_obtain_certificate_common(self, key, csr, authzr_ret=None, auth_count=1):
|
||||
|
|
@ -793,13 +786,13 @@ class ClientTest(ClientTestCommon):
|
|||
# return_value is essentially set to (None, None) in
|
||||
# _mock_obtain_certificate(), which breaks this test.
|
||||
# Thus fixed by the next line.
|
||||
authzr = authzr_ret or self._authzr_from_sans(self.eg_sans)
|
||||
authzr = authzr_ret or self._authzr_from_domains(self.eg_domains)
|
||||
|
||||
self.eg_order.authorizations = authzr
|
||||
self.client.auth_handler.handle_authorizations.return_value = authzr
|
||||
|
||||
with test_util.patch_display_util():
|
||||
result = self.client.obtain_certificate(self.eg_sans)
|
||||
result = self.client.obtain_certificate(self.eg_domains)
|
||||
|
||||
assert result == \
|
||||
(mock.sentinel.cert, mock.sentinel.chain, key, csr)
|
||||
|
|
@ -809,19 +802,19 @@ class ClientTest(ClientTestCommon):
|
|||
@mock.patch('certbot._internal.storage.RenewableCert.new_lineage')
|
||||
def test_obtain_and_enroll_certificate(self,
|
||||
mock_storage, mock_obtain_certificate):
|
||||
sans = [san.DNSName("*.example.com"), san.DNSName("example.com")]
|
||||
domains = ["*.example.com", "example.com"]
|
||||
mock_obtain_certificate.return_value = (mock.MagicMock(),
|
||||
mock.MagicMock(), mock.MagicMock(), None)
|
||||
|
||||
self.client.config.dry_run = False
|
||||
assert self.client.obtain_and_enroll_certificate(sans, "example_cert")
|
||||
assert self.client.obtain_and_enroll_certificate(domains, "example_cert")
|
||||
|
||||
assert self.client.obtain_and_enroll_certificate(sans, None)
|
||||
assert self.client.obtain_and_enroll_certificate(sans[1:], None)
|
||||
assert self.client.obtain_and_enroll_certificate(domains, None)
|
||||
assert self.client.obtain_and_enroll_certificate(domains[1:], None)
|
||||
|
||||
self.client.config.dry_run = True
|
||||
|
||||
assert not self.client.obtain_and_enroll_certificate(sans, None)
|
||||
assert not self.client.obtain_and_enroll_certificate(domains, None)
|
||||
|
||||
names = [call[0][0] for call in mock_storage.call_args_list]
|
||||
assert names == ["example_cert", "example.com", "example.com"]
|
||||
|
|
@ -866,12 +859,12 @@ class ClientTest(ClientTestCommon):
|
|||
@test_util.patch_display_util()
|
||||
def test_deploy_certificate_success(self, mock_util):
|
||||
with pytest.raises(errors.Error):
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
|
||||
installer = mock.MagicMock()
|
||||
self.client.installer = installer
|
||||
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
installer.deploy_cert.assert_called_once_with(
|
||||
cert_path=os.path.abspath("cert"),
|
||||
chain_path=os.path.abspath("chain"),
|
||||
|
|
@ -890,7 +883,7 @@ class ClientTest(ClientTestCommon):
|
|||
|
||||
installer.deploy_cert.side_effect = errors.PluginError
|
||||
with pytest.raises(errors.PluginError):
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
installer.recovery_routine.assert_called_once_with()
|
||||
|
||||
mock_notify.assert_any_call('Deploying certificate')
|
||||
|
|
@ -903,7 +896,7 @@ class ClientTest(ClientTestCommon):
|
|||
|
||||
installer.save.side_effect = errors.PluginError
|
||||
with pytest.raises(errors.PluginError):
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
installer.recovery_routine.assert_called_once_with()
|
||||
|
||||
@mock.patch('certbot._internal.client.display_util.notify')
|
||||
|
|
@ -914,7 +907,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.client.installer = installer
|
||||
|
||||
with pytest.raises(errors.PluginError):
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
mock_notify.assert_called_with(
|
||||
'We were unable to install your certificate, however, we successfully restored '
|
||||
'your server to its prior configuration.')
|
||||
|
|
@ -930,7 +923,7 @@ class ClientTest(ClientTestCommon):
|
|||
self.client.installer = installer
|
||||
|
||||
with pytest.raises(errors.PluginError):
|
||||
self.client.deploy_certificate([san.DNSName("foo.bar")], "key", "cert", "chain", "fullchain")
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain", "fullchain")
|
||||
assert mock_logger.error.call_count == 1
|
||||
assert 'An error occurred and we failed to restore your config' in \
|
||||
mock_logger.error.call_args[0][0]
|
||||
|
|
@ -939,11 +932,11 @@ class ClientTest(ClientTestCommon):
|
|||
|
||||
def test_choose_lineage_name(self):
|
||||
sep = os.path.sep
|
||||
invalid_domains = [san.DNSName(f"exam{sep}ple.com")]
|
||||
valid_domains = [san.DNSName("example.com")]
|
||||
invalid_domains = [f"exam{sep}ple.com"]
|
||||
valid_domains = ["example.com"]
|
||||
invalid_certname = f"foo{sep}.bar"
|
||||
valid_certname = "foo.bar"
|
||||
invalid_wildcard_domain = [san.DNSName(f"*.exam{sep}ple.com")]
|
||||
invalid_wildcard_domain = [f"*.exam{sep}ple.com"]
|
||||
# Verify errors are raised when invalid lineagename is chosen.
|
||||
with pytest.raises(errors.Error):
|
||||
self.client._choose_lineagename(invalid_domains, None)
|
||||
|
|
@ -971,7 +964,7 @@ class EnhanceConfigTest(ClientTestCommon):
|
|||
|
||||
def test_no_installer(self):
|
||||
with pytest.raises(errors.Error):
|
||||
self.client.enhance_config([san.DNSName(self.domain)], None)
|
||||
self.client.enhance_config([self.domain], None)
|
||||
|
||||
def test_unsupported(self):
|
||||
self.client.installer = mock.MagicMock()
|
||||
|
|
@ -1084,7 +1077,7 @@ class EnhanceConfigTest(ClientTestCommon):
|
|||
self.client.installer = mock.MagicMock()
|
||||
self.client.installer.supported_enhancements.return_value = [
|
||||
"ensure-http-header", "redirect", "staple-ocsp"]
|
||||
self.client.enhance_config([san.DNSName(self.domain)], None)
|
||||
self.client.enhance_config([self.domain], None)
|
||||
assert self.client.installer.save.call_count == 1
|
||||
assert self.client.installer.restart.call_count == 1
|
||||
|
||||
|
|
@ -1093,7 +1086,7 @@ class EnhanceConfigTest(ClientTestCommon):
|
|||
self.client.installer.supported_enhancements.return_value = [
|
||||
"ensure-http-header", "redirect", "staple-ocsp"]
|
||||
self.client.installer.enhance.side_effect = errors.PluginEnhancementAlreadyPresent()
|
||||
self.client.enhance_config([san.DNSName(self.domain)], None)
|
||||
self.client.enhance_config([self.domain], None)
|
||||
|
||||
|
||||
class RollbackTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -105,11 +105,11 @@ class SeparateListInputTest(unittest.TestCase):
|
|||
assert act == self.exp
|
||||
|
||||
|
||||
class SummarizeSANsTest(unittest.TestCase):
|
||||
class SummarizeDomainListTest(unittest.TestCase):
|
||||
@classmethod
|
||||
def _call(cls, domains):
|
||||
from certbot._internal.display.util import summarize_sans
|
||||
return summarize_sans(domains)
|
||||
from certbot._internal.display.util import summarize_domain_list
|
||||
return summarize_domain_list(domains)
|
||||
|
||||
def test_single_domain(self):
|
||||
assert "example.com" == self._call(["example.com"])
|
||||
|
|
@ -119,7 +119,7 @@ class SummarizeSANsTest(unittest.TestCase):
|
|||
self._call(["example.com", "example.org"])
|
||||
|
||||
def test_many_domains(self):
|
||||
assert "example.com and 2 more" == \
|
||||
assert "example.com and 2 more domains" == \
|
||||
self._call(["example.com", "example.org", "a.example.com"])
|
||||
|
||||
def test_empty_domains(self):
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ from certbot._internal import cli
|
|||
from certbot._internal import constants
|
||||
from certbot._internal import main
|
||||
from certbot._internal import updater
|
||||
from certbot._internal import san
|
||||
from certbot._internal.plugins import disco
|
||||
from certbot._internal.plugins import manual
|
||||
from certbot._internal.plugins import null
|
||||
|
|
@ -78,7 +77,7 @@ class TestHandleCerts(unittest.TestCase):
|
|||
mock_config = mock.Mock()
|
||||
mock_config.expand = True
|
||||
mock_lineage = mock.Mock()
|
||||
mock_lineage.sans.return_value = [san.DNSName("dummy1"), san.DNSName("dummy2")]
|
||||
mock_lineage.names.return_value = ["dummy1", "dummy2"]
|
||||
ret = main._handle_subset_cert_request(mock_config, ["dummy1"], mock_lineage)
|
||||
assert ret == ("renew", mock_lineage)
|
||||
assert mock_handle_migration.called
|
||||
|
|
@ -254,20 +253,20 @@ class CertonlyTest(unittest.TestCase):
|
|||
|
||||
@mock.patch('certbot._internal.main._report_next_steps')
|
||||
@mock.patch('certbot._internal.cert_manager.lineage_for_certname')
|
||||
@mock.patch('certbot._internal.cert_manager.sans_for_certname')
|
||||
@mock.patch('certbot._internal.cert_manager.domains_for_certname')
|
||||
@mock.patch('certbot._internal.renewal.renew_cert')
|
||||
@mock.patch('certbot._internal.main._handle_unexpected_key_type_migration')
|
||||
@mock.patch('certbot._internal.main._report_new_cert')
|
||||
def test_find_lineage_for_sans_and_certname(self, mock_report_cert,
|
||||
mock_handle_type, mock_renew_cert, mock_sans_for_certname, mock_lineage, mock_report_next_steps):
|
||||
domains = [san.DNSName('example.com'), san.DNSName('test.org')]
|
||||
mock_sans_for_certname.return_value = domains
|
||||
mock_lineage.sans.return_value = domains
|
||||
def test_find_lineage_for_domains_and_certname(self, mock_report_cert,
|
||||
mock_handle_type, mock_renew_cert, mock_domains, mock_lineage, mock_report_next_steps):
|
||||
domains = ['example.com', 'test.org']
|
||||
mock_domains.return_value = domains
|
||||
mock_lineage.names.return_value = domains
|
||||
self._call(('certonly --webroot -d example.com -d test.org '
|
||||
'--cert-name example.com --no-directory-hooks').split())
|
||||
|
||||
assert mock_lineage.call_count == 1
|
||||
assert mock_sans_for_certname.call_count == 1
|
||||
assert mock_domains.call_count == 1
|
||||
assert mock_renew_cert.call_count == 1
|
||||
assert mock_report_cert.call_count == 1
|
||||
assert mock_handle_type.call_count == 1
|
||||
|
|
@ -278,7 +277,7 @@ class CertonlyTest(unittest.TestCase):
|
|||
self._call(('certonly --webroot -d example.com -d test.com '
|
||||
'--cert-name example.com --no-directory-hooks').split())
|
||||
assert mock_lineage.call_count == 2
|
||||
assert mock_sans_for_certname.call_count == 2
|
||||
assert mock_domains.call_count == 2
|
||||
assert mock_renew_cert.call_count == 2
|
||||
assert mock_report_cert.call_count == 2
|
||||
assert mock_handle_type.call_count == 2
|
||||
|
|
@ -290,12 +289,12 @@ class CertonlyTest(unittest.TestCase):
|
|||
' --no-directory-hooks'.split())
|
||||
|
||||
@mock.patch('certbot._internal.main._report_next_steps')
|
||||
@mock.patch('certbot._internal.cert_manager.sans_for_certname')
|
||||
@mock.patch('certbot._internal.cert_manager.domains_for_certname')
|
||||
@mock.patch('certbot.display.ops.choose_names')
|
||||
@mock.patch('certbot._internal.cert_manager.lineage_for_certname')
|
||||
@mock.patch('certbot._internal.main._report_new_cert')
|
||||
def test_find_lineage_for_sans_new_certname(self, mock_report_cert,
|
||||
mock_lineage, mock_choose_names, mock_sans_for_certname, unused_mock_report_next_steps):
|
||||
def test_find_lineage_for_domains_new_certname(self, mock_report_cert,
|
||||
mock_lineage, mock_choose_names, mock_domains_for_certname, unused_mock_report_next_steps):
|
||||
mock_lineage.return_value = None
|
||||
|
||||
# no lineage with this name but we specified domains so create a new cert
|
||||
|
|
@ -306,7 +305,7 @@ class CertonlyTest(unittest.TestCase):
|
|||
|
||||
# no lineage with this name and we didn't give domains
|
||||
mock_choose_names.return_value = ["somename"]
|
||||
mock_sans_for_certname.return_value = None
|
||||
mock_domains_for_certname.return_value = None
|
||||
self._call(('certonly --webroot --cert-name example.com --no-directory-hooks').split())
|
||||
assert mock_choose_names.called is True
|
||||
|
||||
|
|
@ -367,31 +366,31 @@ class CertonlyTest(unittest.TestCase):
|
|||
'-i standalone -d example.com').split())
|
||||
|
||||
|
||||
class FindSansOrCertnameTest(unittest.TestCase):
|
||||
"""Tests for certbot._internal.main._find_sans_or_certname."""
|
||||
class FindDomainsOrCertnameTest(unittest.TestCase):
|
||||
"""Tests for certbot._internal.main._find_domains_or_certname."""
|
||||
|
||||
@mock.patch('certbot.display.ops.choose_names')
|
||||
def test_display_ops(self, mock_choose_names):
|
||||
mock_config = mock.Mock(domains=None, certname=None)
|
||||
mock_choose_names.return_value = ["example.com"]
|
||||
mock_choose_names.return_value = "domainname"
|
||||
# pylint: disable=protected-access
|
||||
assert main._find_sans_or_certname(mock_config, None) == ([san.DNSName("example.com")], None)
|
||||
assert main._find_domains_or_certname(mock_config, None) == ("domainname", None)
|
||||
|
||||
@mock.patch('certbot.display.ops.choose_names')
|
||||
def test_no_results(self, mock_choose_names):
|
||||
mock_config = mock.Mock(domains=None, ip_addresses=None, certname=None)
|
||||
mock_config = mock.Mock(domains=None, certname=None)
|
||||
mock_choose_names.return_value = []
|
||||
# pylint: disable=protected-access
|
||||
with pytest.raises(errors.Error):
|
||||
main._find_sans_or_certname(mock_config, None)
|
||||
main._find_domains_or_certname(mock_config, None)
|
||||
|
||||
@mock.patch('certbot._internal.cert_manager.sans_for_certname')
|
||||
@mock.patch('certbot._internal.cert_manager.domains_for_certname')
|
||||
def test_grab_domains(self, mock_domains):
|
||||
mock_config = mock.Mock(domains=None, ip_addresses=None, certname="one.com")
|
||||
mock_config = mock.Mock(domains=None, certname="one.com")
|
||||
mock_domains.return_value = ["one.com", "two.com"]
|
||||
# pylint: disable=protected-access
|
||||
assert main._find_sans_or_certname(mock_config, None) == \
|
||||
(["one.com", "two.com"], "one.com")
|
||||
assert main._find_domains_or_certname(mock_config, None) == \
|
||||
(["one.com", "two.com"], "one.com")
|
||||
|
||||
|
||||
class RevokeTest(test_util.TempDirTestCase):
|
||||
|
|
@ -537,7 +536,7 @@ class ReconfigureTest(test_util.TempDirTestCase):
|
|||
self.mock_get_utility = self.get_utility_patch.start()
|
||||
self.patchers = {
|
||||
'check_symlinks': mock.patch('certbot._internal.storage.RenewableCert._check_symlinks'),
|
||||
'cert_sans': mock.patch('certbot._internal.storage.RenewableCert.sans'),
|
||||
'cert_names': mock.patch('certbot._internal.storage.RenewableCert.names'),
|
||||
'pick_installer': mock.patch('certbot._internal.plugins.selection.pick_installer'),
|
||||
'pick_auth': mock.patch('certbot._internal.plugins.selection.pick_authenticator'),
|
||||
'find_init': mock.patch('certbot._internal.plugins.disco.PluginsRegistry.find_init'),
|
||||
|
|
@ -546,7 +545,7 @@ class ReconfigureTest(test_util.TempDirTestCase):
|
|||
'list_hooks': mock.patch('certbot._internal.hooks.list_hooks'),
|
||||
}
|
||||
self.mocks = {k: v.start() for k, v in self.patchers.items()}
|
||||
self.mocks['cert_sans'].return_value = [san.DNSName('example.com')]
|
||||
self.mocks['cert_names'].return_value = ['example.com']
|
||||
|
||||
self.config_dir = os.path.join(self.tempdir, 'config')
|
||||
renewal_configs_dir = os.path.join(self.config_dir, 'renewal')
|
||||
|
|
@ -1367,7 +1366,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
self._call('certonly -d example.org --csr {0}'.format(CSR).split())
|
||||
|
||||
def _certonly_new_request_common(self, mock_client, args=None):
|
||||
with mock.patch('certbot._internal.main._find_lineage_for_sans_and_certname') \
|
||||
with mock.patch('certbot._internal.main._find_lineage_for_domains_and_certname') \
|
||||
as mock_renewal:
|
||||
mock_renewal.return_value = ("newcert", None)
|
||||
with mock.patch('certbot._internal.main._init_le_client') as mock_init:
|
||||
|
|
@ -1419,7 +1418,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
mock_lineage = mock.MagicMock(cert=cert_path, fullchain=chain_path,
|
||||
cert_path=cert_path, fullchain_path=chain_path)
|
||||
mock_lineage.has_pending_deployment.return_value = False
|
||||
mock_lineage.sans.return_value = [san.DNSName('isnot.org')]
|
||||
mock_lineage.names.return_value = ['isnot.org']
|
||||
mock_lineage.private_key_type = 'ecdsa'
|
||||
mock_lineage.elliptic_curve = 'secp256r1'
|
||||
mock_lineage.reuse_key = reuse_key
|
||||
|
|
@ -1635,7 +1634,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
if renewalparams is not None:
|
||||
mock_lineage.configuration = {'renewalparams': renewalparams}
|
||||
if names is not None:
|
||||
mock_lineage.sans.return_value = [san.DNSName(n) for n in names]
|
||||
mock_lineage.names.return_value = names
|
||||
mock_rc.return_value = mock_lineage
|
||||
with mock.patch('certbot._internal.main.renew_cert') as mock_renew_cert:
|
||||
kwargs.setdefault('args', ['renew'])
|
||||
|
|
@ -1666,6 +1665,12 @@ class MainTest(test_util.ConfigTestCase):
|
|||
self._test_renew_common(renewalparams=renewalparams,
|
||||
assert_oc_called=True)
|
||||
|
||||
def test_renew_with_bad_domain(self):
|
||||
renewalparams = {'authenticator': 'webroot'}
|
||||
names = ['uniçodé.com']
|
||||
self._test_renew_common(renewalparams=renewalparams, error_expected=True,
|
||||
names=names, assert_oc_called=False)
|
||||
|
||||
@mock.patch('certbot._internal.plugins.selection.choose_configurator_plugins')
|
||||
def test_renew_with_configurator(self, mock_sel):
|
||||
mock_sel.return_value = (mock.MagicMock(), mock.MagicMock())
|
||||
|
|
@ -1721,7 +1726,7 @@ class MainTest(test_util.ConfigTestCase):
|
|||
assert 'No hooks were run.' in stdout.getvalue()
|
||||
|
||||
@test_util.patch_display_util()
|
||||
@mock.patch('certbot._internal.main._find_lineage_for_sans_and_certname')
|
||||
@mock.patch('certbot._internal.main._find_lineage_for_domains_and_certname')
|
||||
@mock.patch('certbot._internal.main._init_le_client')
|
||||
@mock.patch('certbot._internal.main._report_new_cert')
|
||||
def test_certonly_reinstall(self, mock_report_new_cert, mock_init,
|
||||
|
|
@ -1969,8 +1974,8 @@ class EnhanceTest(test_util.ConfigTestCase):
|
|||
|
||||
with mock.patch('certbot._internal.cert_manager.get_certnames') as mock_certs:
|
||||
mock_certs.return_value = ['example.com']
|
||||
with mock.patch('certbot._internal.cert_manager.sans_for_certname') as mock_dom:
|
||||
mock_dom.return_value = [san.DNSName('example.com')]
|
||||
with mock.patch('certbot._internal.cert_manager.domains_for_certname') as mock_dom:
|
||||
mock_dom.return_value = ['example.com']
|
||||
with mock.patch('certbot._internal.main._init_le_client') as mock_init:
|
||||
mock_client = mock.MagicMock()
|
||||
mock_client.config = config
|
||||
|
|
@ -1981,7 +1986,7 @@ class EnhanceTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot._internal.cert_manager.lineage_for_certname')
|
||||
@mock.patch('certbot._internal.main.display_ops.choose_values')
|
||||
@mock.patch('certbot._internal.main._find_sans_or_certname')
|
||||
@mock.patch('certbot._internal.main._find_domains_or_certname')
|
||||
def test_selection_question(self, mock_find, mock_choose, mock_lineage, _rec):
|
||||
mock_lineage.return_value = mock.MagicMock(chain_path="/tmp/nonexistent")
|
||||
mock_choose.return_value = ['example.com']
|
||||
|
|
@ -1995,7 +2000,7 @@ class EnhanceTest(test_util.ConfigTestCase):
|
|||
@mock.patch('certbot._internal.main.plug_sel.record_chosen_plugins')
|
||||
@mock.patch('certbot._internal.cert_manager.lineage_for_certname')
|
||||
@mock.patch('certbot._internal.main.display_ops.choose_values')
|
||||
@mock.patch('certbot._internal.main._find_sans_or_certname')
|
||||
@mock.patch('certbot._internal.main._find_domains_or_certname')
|
||||
def test_selection_auth_warning(self, mock_find, mock_choose, mock_lineage, _rec):
|
||||
mock_lineage.return_value = mock.MagicMock(chain_path="/tmp/nonexistent")
|
||||
mock_choose.return_value = ["example.com"]
|
||||
|
|
@ -2020,7 +2025,7 @@ class EnhanceTest(test_util.ConfigTestCase):
|
|||
assert mock_client.enhance_config.called
|
||||
assert all(getattr(mock_client.config, e) for e in req_enh)
|
||||
assert not any(getattr(mock_client.config, e) for e in not_req_enh)
|
||||
assert san.DNSName("example.com") in mock_client.enhance_config.call_args[0][0]
|
||||
assert "example.com" in mock_client.enhance_config.call_args[0][0]
|
||||
|
||||
@mock.patch('certbot._internal.cert_manager.lineage_for_certname')
|
||||
@mock.patch('certbot._internal.main.display_ops.choose_values')
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import pytest
|
|||
import certbot
|
||||
from certbot import errors
|
||||
from certbot._internal.storage import ALL_FOUR
|
||||
from certbot._internal import san
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
import certbot.tests.util as test_util
|
||||
|
|
@ -433,17 +432,17 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
else:
|
||||
assert not self.test_rc.has_pending_deployment()
|
||||
|
||||
def test_sans(self):
|
||||
def test_names(self):
|
||||
# Trying the current version
|
||||
self._write_out_kind("cert", 12, test_util.load_vector("cert-san_512.pem"))
|
||||
|
||||
assert self.test_rc.sans() == \
|
||||
[san.DNSName("example.com"), san.DNSName("www.example.com")]
|
||||
assert self.test_rc.names() == \
|
||||
["example.com", "www.example.com"]
|
||||
|
||||
# Trying missing cert
|
||||
os.unlink(self.test_rc.cert)
|
||||
with pytest.raises(errors.CertStorageError):
|
||||
self.test_rc.sans()
|
||||
self.test_rc.names()
|
||||
|
||||
def test_autorenewal_is_enabled(self):
|
||||
self.test_rc.configuration["renewalparams"] = {}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from typing import Optional
|
|||
from urllib import parse
|
||||
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal import constants
|
||||
from certbot.compat import misc
|
||||
from certbot.compat import os
|
||||
|
|
@ -482,6 +483,12 @@ def _check_config_sanity(config: NamespaceConfig) -> None:
|
|||
"Trying to run http-01 and https-port "
|
||||
"on the same port ({0})".format(config.https_port))
|
||||
|
||||
# Domain checks
|
||||
if config.namespace.domains is not None:
|
||||
for domain in config.namespace.domains:
|
||||
# This may be redundant, but let's be paranoid
|
||||
util.enforce_domain_sanity(domain)
|
||||
|
||||
|
||||
def _is_immutable(value: Any) -> bool:
|
||||
"""Is value of an immutable type?"""
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import configargparse
|
|||
from certbot import errors
|
||||
from certbot._internal import constants
|
||||
from certbot._internal import lock
|
||||
from certbot._internal import san
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
|
||||
|
|
@ -577,8 +576,7 @@ def enforce_le_validity(domain: str) -> str:
|
|||
|
||||
"""
|
||||
|
||||
# Do basic validation on a DNSName
|
||||
domain = san.DNSName(domain).dns_name
|
||||
domain = enforce_domain_sanity(domain)
|
||||
if not re.match("^[A-Za-z0-9.-]*$", domain):
|
||||
raise errors.ConfigurationError(
|
||||
"{0} contains an invalid character. "
|
||||
|
|
|
|||
Loading…
Reference in a new issue