This commit is contained in:
Will Greenberg 2026-05-19 13:10:09 -07:00 committed by GitHub
commit 616e5694e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 518 additions and 117 deletions

View file

@ -1,4 +1,5 @@
"""Tests for acme.challenges."""
import copy
import sys
from typing import TYPE_CHECKING
import unittest
@ -144,6 +145,63 @@ class DNS01Test(unittest.TestCase):
hash(DNS01.from_json(self.jmsg))
class DNSPersist01Test(unittest.TestCase):
def setUp(self):
from acme.challenges import DNSPersist01
self.jmsg = {
'type': 'dns-persist-01',
'accounturi': 'https://ca.example/acct/123',
'issuer-domain-names': ['authority.example', 'ca.example.net']
}
self.msg = DNSPersist01(
account_uri=self.jmsg['accounturi'],
issuer_domain_names=tuple(self.jmsg['issuer-domain-names']))
def test_to_partial_json(self):
expected_json = copy.deepcopy(self.jmsg)
# josepy converts lists into tuples
expected_json['issuer-domain-names'] = tuple(expected_json['issuer-domain-names'])
assert expected_json == self.msg.to_partial_json()
def test_from_json_hashable(self):
from acme.challenges import DNSPersist01
hash(DNSPersist01.from_json(self.jmsg))
def test_from_json(self):
from acme.challenges import DNSPersist01
assert self.msg == DNSPersist01.from_json(self.jmsg)
def test_get_rdata(self):
expected_rdata = 'authority.example; accounturi=https://ca.example/acct/123'
assert expected_rdata == self.msg.get_validation_rdata(False)
assert expected_rdata + '; policy=wildcard' == \
self.msg.get_validation_rdata(True)
def test_validation_name(self):
assert '_validation-persist.example.com' == \
self.msg.validation_domain_name('example.com')
assert '_validation-persist.example.com' == \
self.msg.validation_domain_name(name='*.example.com')
def test_response(self):
from acme.challenges import DNSPersist01Response
assert isinstance(self.msg.response(), DNSPersist01Response)
class DNSPersist01ResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import DNSPersist01Response
self.response = DNSPersist01Response()
def test_truthiness(self):
assert self.response
def test_empty_json(self):
assert {} == self.response
class HTTP01ResponseTest(unittest.TestCase):
def setUp(self):

View file

@ -1,5 +1,6 @@
"""Tests for acme.util."""
import sys
import unittest
import pytest
@ -11,5 +12,23 @@ def test_it():
assert {2: 2, 4: 4} == map_keys({1: 2, 3: 4}, lambda x: x + 1)
class IsWildcardDomainTest(unittest.TestCase):
"""Tests for is_wildcard_domain."""
def setUp(self):
self.wildcard = u"*.example.org"
self.no_wildcard = u"example.org"
def _call(self, domain):
from acme.util import is_wildcard_domain
return is_wildcard_domain(domain)
def test_no_wildcard(self):
assert not self._call(self.no_wildcard)
def test_wildcard(self):
assert self._call(self.wildcard)
if __name__ == '__main__':
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover

View file

@ -11,6 +11,7 @@ from typing import Optional
from typing import TypeVar
from typing import Union
from acme import util
from cryptography.hazmat.primitives import hashes
import josepy as jose
import requests
@ -476,3 +477,59 @@ class DNSResponse(ChallengeResponse):
"""
return chall.check_validation(self.validation, account_public_key)
@ChallengeResponse.register
class DNSPersist01Response(ChallengeResponse):
"""ACME "dns-persist-01" challenge response."""
def __bool__(self) -> bool:
# Because this response yields an empty JSON object whose __len__() == 0, manually set a
# truthy value
return True
@Challenge.register
class DNSPersist01(Challenge):
"""ACME "dns-persist-01" challenge"""
typ = "dns-persist-01"
LABEL = "_validation-persist"
"""Label clients prepend to the domain name being validated."""
account_uri: str = jose.field("accounturi")
issuer_domain_names: tuple[str] = jose.field("issuer-domain-names")
def get_validation_rdata(self, is_wildcard: bool) -> str:
"""Validation TXT record rdata.
:param bool is_wildcard: Whether to set policy=wildcard.
:rtype: str
"""
parts = [
self.issuer_domain_names[0],
"accounturi={0}".format(self.account_uri),
]
if is_wildcard:
parts.append("policy=wildcard")
return '; '.join(parts)
def validation_domain_name(self, name: str) -> str:
"""Domain name for TXT validation record.
:param str name: Domain name being validated.
:rtype: str
"""
if util.is_wildcard_domain(name):
name = name.removeprefix('*.')
return "{0}.{1}".format(self.LABEL, name)
def response(self) -> DNSPersist01Response:
"""ACME "dns-persist-01" challenge response object.
:rtype: DNSPersist01Response
"""
return DNSPersist01Response()

View file

@ -7,3 +7,16 @@ from typing import Mapping
def map_keys(dikt: Mapping[Any, Any], func: Callable[[Any], Any]) -> dict[Any, Any]:
"""Map dictionary keys."""
return {func(key): value for key, value in dikt.items()}
def is_wildcard_domain(domain: str) -> bool:
""""Is domain a wildcard domain?
:param domain: domain to check
:type domain: `str`
:returns: True if domain is a wildcard, otherwise, False
:rtype: bool
"""
return domain.startswith("*.")

View file

@ -3,6 +3,7 @@ import os
import shutil
import sys
import tempfile
import textwrap
from typing import Iterable
import pytest
@ -38,30 +39,43 @@ class IntegrationTestsContext:
os.close(probe[0])
self.hook_probe = probe[1]
self.manual_dns_auth_hook = (
'{0} -c "import os; import requests; import json; '
"assert not os.environ.get('CERTBOT_DOMAIN').startswith('fail'); "
"data = {{'host':'_acme-challenge.{{0}}.'.format(os.environ.get('CERTBOT_DOMAIN')),"
"'value':os.environ.get('CERTBOT_VALIDATION')}}; "
"request = requests.post('{1}/set-txt', data=json.dumps(data)); "
"request.raise_for_status(); "
'"'
).format(sys.executable, self.challtestsrv_url)
self.manual_dns_auth_hook_allow_fail = (
'{0} -c "import os; import requests; import json; '
"data = {{'host':'_acme-challenge.{{0}}.'.format(os.environ.get('CERTBOT_DOMAIN')),"
"'value':os.environ.get('CERTBOT_VALIDATION')}}; "
"request = requests.post('{1}/set-txt', data=json.dumps(data)); "
"request.raise_for_status(); "
'"'
).format(sys.executable, self.challtestsrv_url)
self.manual_dns_cleanup_hook = (
'{0} -c "import os; import requests; import json; '
"data = {{'host':'_acme-challenge.{{0}}.'.format(os.environ.get('CERTBOT_DOMAIN'))}}; "
"request = requests.post('{1}/clear-txt', data=json.dumps(data)); "
"request.raise_for_status(); "
'"'
).format(sys.executable, self.challtestsrv_url)
self.manual_dns_auth_hook = self.generate_dns_auth_hook('_acme-challenge', True)
self.manual_dns_auth_hook_allow_fail = self.generate_dns_auth_hook('_acme-challenge', False)
self.manual_dns_cleanup_hook = self.generate_dns_cleanup_hook('_acme-challenge')
self.manual_dns_persist_setup_hook = self.generate_dns_auth_hook(
'_validation-persist', True)
def generate_dns_auth_hook(self, challenge_subdomain: str, fail_on_subdomain: bool) -> str:
"""Generates a python one-liner script which sets a DNS challenge TXT record challtestsrv
URL, and optionally fails if the subdomain starts with the word "fail" to simulate a faulty
script"""
script = textwrap.dedent(f"""\
import os
import requests
import json
domain = os.environ.get('CERTBOT_DOMAIN')
{"assert not domain.startswith('fail')" if fail_on_subdomain else "# no-op"}
validation = os.environ.get('CERTBOT_VALIDATION')
data = {{'host':'{challenge_subdomain}.{{0}}.'.format(domain), 'value': validation}}
request = requests.post('{self.challtestsrv_url}/set-txt', data=json.dumps(data))
request.raise_for_status()
""")
return f'{sys.executable} -c "{script}"'
def generate_dns_cleanup_hook(self, challenge_subdomain: str) -> str:
"""Generates a python one-liner script which cleans up the TXT record made by
`generate_dns_auth_hook`"""
script = textwrap.dedent(f"""\
import os
import requests
import json
domain = os.environ.get('CERTBOT_DOMAIN')
data = {{'host':'{challenge_subdomain}.{{0}}.'.format(domain)}}
request = requests.post('{self.challtestsrv_url}/clear-txt', data=json.dumps(data))
request.raise_for_status()
""")
return f'{sys.executable} -c "{script}"'
def cleanup(self) -> None:
"""Cleanup the integration test context."""

View file

@ -206,6 +206,29 @@ def test_manual_dns_auth(context: IntegrationTestsContext) -> None:
assert_cert_count_for_lineage(context.config_dir, certname, 2)
def test_manual_dns_persist_auth(context: IntegrationTestsContext) -> None:
"""Test the DNS-PERSIST-01 challenge using manual plugin."""
certname = context.get_domain('dns-persist')
context.certbot([
'-a', 'manual', '-d', certname, '--preferred-challenges', 'dns-persist',
'run', '--cert-name', certname,
'--manual-setup-hook', context.manual_dns_persist_setup_hook,
'--deploy-hook', misc.echo('deploy', context.hook_probe),
])
assert_hook_execution(context.hook_probe, 'deploy')
assert_saved_deploy_hook(context.config_dir, certname)
assert_cert_count_for_lineage(context.config_dir, certname, 1)
# test renewal with a no-op auth hook, as per our docs
context.certbot([
'renew', '--cert-name', certname, '--authenticator', 'manual',
'--manual-auth-hook', '/bin/true'
])
assert_cert_count_for_lineage(context.config_dir, certname, 2)
def test_certonly(context: IntegrationTestsContext) -> None:
"""Test the certonly verb on certbot."""
context.certbot(['certonly', '--cert-name', 'newname', '-d', context.get_domain('newname')])

View file

@ -15,7 +15,7 @@ import requests
from certbot_integration_tests.utils.constants import DEFAULT_HTTP_01_PORT
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
PEBBLE_VERSION = 'v2.8.0'
PEBBLE_VERSION = 'v2.10.1'
def fetch(workspace: str, http_01_port: int = DEFAULT_HTTP_01_PORT) -> tuple[str, str, str]:

View file

@ -70,23 +70,25 @@ standalone_ Y N | Uses a "standalone" webserver to obtain a certificate.
|dns_plugs| Y N | This category of plugins automates obtaining a certificate by dns-01_ (53)
| modifying DNS records to prove you have control over a
| domain. Doing domain validation in this way is
| the only way to obtain wildcard certificates from Let's
| one way to obtain wildcard certificates from Let's
| Encrypt.
manual_ Y N | Obtain a certificate by manually following instructions to http-01_ (80) or
| perform domain validation yourself. Certificates created this dns-01_ (53)
| way do not support autorenewal.
manual_ Y N | Obtain a certificate by manually following instructions to http-01_ (80),
| perform domain validation yourself. Certificates created this dns-01_ (53),
| way do not support autorenewal. or dns-persist-01_ (53)
| Autorenewal may be enabled by providing an authentication
| hook script to automate the domain validation steps.
| hook script to automate the domain validation steps. Using
| the ``dns-persist-01`` challenge type is another way to
| obtain wildcard certificates from Let's Encrypt.
=========== ==== ==== =============================================================== =============================
.. |dns_plugs| replace:: :ref:`DNS plugins <dns_plugins>`
Under the hood, plugins use one of several ACME protocol challenges_ to
prove you control a domain. The options are http-01_ (which uses port 80)
and dns-01_ (requiring configuration of a DNS server on
port 53, though that's often not the same machine as your webserver). A few
plugins support more than one challenge type, in which case you can choose one
with ``--preferred-challenges``.
prove you control a domain. The challenge options are http-01_ (which uses port 80),
and the two DNS-based challenges: dns-01_, or dns-persist-01_ (which both
require configuration of a DNS server on port 53, though that may not be the
same machine as your webserver). A few plugins support more than one challenge
type, in which case you can choose one with ``--preferred-challenges``.
There are also many third-party-plugins_ available. Below we describe in more detail
the circumstances in which each plugin can be used, and how to use it.
@ -94,6 +96,7 @@ the circumstances in which each plugin can be used, and how to use it.
.. _challenges: https://datatracker.ietf.org/doc/html/rfc8555#section-8
.. _http-01: https://datatracker.ietf.org/doc/html/rfc8555#section-8.3
.. _dns-01: https://datatracker.ietf.org/doc/html/rfc8555#section-8.4
.. _dns-persist-01: https://datatracker.ietf.org/doc/draft-ietf-acme-dns-persist/
Apache
------
@ -230,34 +233,86 @@ the UI, you can use the plugin to obtain a certificate by specifying
to copy and paste commands into another terminal session, which may
be on a different computer.
The manual plugin can use either the ``http`` or the ``dns`` challenge. You can use the ``--preferred-challenges`` option
to choose the challenge of your preference.
The manual plugin can use either the ``http``, ``dns``, or ``dns-persist``
challenges. You can use the ``--preferred-challenges`` option to choose the
challenge of your preference.
The ``http`` challenge will ask you to place a file with a specific name and
specific content in the ``/.well-known/acme-challenge/`` directory directly
in the top-level directory (“web root”) containing the files served by your
webserver. In essence it's the same as the webroot_ plugin, but not automated.
When using the ``dns`` challenge, ``certbot`` will ask you to place a TXT DNS
record with specific contents under the domain name consisting of the hostname
for which you want a certificate issued, prepended by ``_acme-challenge``.
When using the ``dns`` or ``dns-persist`` challenges, ``certbot`` will ask you to
place a TXT DNS record with specific contents under the domain name consisting
of the hostname for which you want a certificate issued, prepended by a
subdomain (either ``_acme-challenge`` or ``_validation-persist``, depending on
the challenge).
For example, for the domain ``example.com``, a zone file entry would look like:
To perform a ``dns`` challenge with the manual plugin, you'd invoke Certbot like this:
::
certbot certonly --preferred-challenges dns -d example.com --manual
The CA would then provide Certbot with a single-use validation token, which you (or
your auth hook script) would then use to create a TXT record at the subdomain
``_acme-challenge``, like this:
::
_acme-challenge.example.com. 300 IN TXT "gfj9Xq...Rg85nM"
This validation token is only valid for this challenge, and once the CA
validates it, the TXT record can be removed.
The ``dns-persist`` challenge is different, and allows you to create a single
long-lived DNS record for certificate issuance and renewals. To perform a
``dns-persist`` challenge with the manual plugin, you'd invoke Certbot like
this:
::
certbot certonly --preferred-challenges dns-persist -d example.com --manual
Afterward, you (or your auth hook script) will be prompted to create a TXT record like this:
::
_validation-persist.example.com. IN TXT "authority.example; accounturi=https://ca.example/acct/123"
This record can persist indefinitely, and as long as it's available, any future
certificate issuances at that subdomain will automatically be approved by the
CA. As such, ``dns-persist`` allows for easy automated certificate renewals.
However, because Certbot's manual plugin requires manual intervention unless an
auth hook script is provided, fully automated renewals with ``dns-persist``
requires some sort of dummy auth hook, like this:
::
certbot certonly --preferred-challenges dns-persist-01 -d example.com --manual --manual-auth-hook=/bin/true
The ``dns-persist`` challenge type also supports the issuance of wildcard
certificates. If the manual plugin detects that you're issuing a certificate for
a wildcard domain (e.g. ``*.example.com``), you (or your auth hook) will be
instructed to add ``policy=wildcard`` to the TXT record for the wildcard's
parent domain (e.g. ``example.com``).
.. _manual-renewal:
**Renewal with the manual plugin**
Certificates created using ``--manual`` **do not** support automatic renewal unless
combined with an `authentication hook script <#hooks>`_ via ``--manual-auth-hook``
to automatically set up the required HTTP and/or TXT challenges.
Certificates created using ``--manual`` only support automatic renewal with the
``dns-persist`` challenge, or when combined with an `authentication hook script
<#hooks>`_ via ``--manual-auth-hook`` to automatically set up the required HTTP
and/or TXT challenges.
If you can use one of the other plugins_ which support autorenewal to create
your certificate, doing so is highly recommended.
If your certificate was issued via the ``dns-persist`` challenge, automated renewal
is possible by providing a dummy auth hook, for example:
::
certbot certonly --preferred-challenges dns-persist-01 -d example.com --manual --manual-auth-hook=/bin/true
To manually renew a certificate using ``--manual`` without hooks, repeat the same
``certbot --manual`` command you used to create the certificate originally. As this

View file

@ -17,6 +17,7 @@ from certbot import achallenges
from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot._internal import error_handler
from certbot._internal.account import Account
from certbot.display import util as display_util
@ -338,25 +339,30 @@ class AuthHandler:
if config.verbose_count > 0:
msg = []
http01_achalls = {}
dns01_achalls = {}
dns_achalls = {}
for achall in achalls:
if isinstance(achall.chall, challenges.HTTP01):
http01_achalls[achall.chall.uri(achall.identifier.value)] = (
achall.validation(achall.account_key) + "\n"
)
if isinstance(achall.chall, challenges.DNS01):
dns01_achalls[achall.validation_domain_name(achall.identifier.value)] = (
dns_achalls[achall.validation_domain_name(achall.identifier.value)] = (
achall.validation(achall.account_key) + "\n"
)
if isinstance(achall.chall, challenges.DNSPersist01):
is_wildcard = util.is_wildcard_domain(achall.identifier.value)
dns_achalls[achall.validation_domain_name(achall.identifier.value)] = (
achall.get_validation_rdata(is_wildcard) + "\n"
)
if http01_achalls:
msg.append("The following URLs should be accessible from the "
"internet and return the value mentioned:\n")
for uri, key_authz in http01_achalls.items():
msg.append(f"URL: {uri}\nExpected value: {key_authz}")
if dns01_achalls:
if dns_achalls:
msg.append("The following FQDNs should return a TXT resource "
"record with the value mentioned:\n")
for fqdn, key_authz_hash in dns01_achalls.items():
for fqdn, key_authz_hash in dns_achalls.items():
msg.append(f"FQDN: {fqdn}\nExpected value: {key_authz_hash}")
return "\n" + "\n".join(msg)
else:
@ -383,6 +389,8 @@ def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
challb=challb, account_key=account_key, identifier=identifier)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, identifier=identifier)
elif isinstance(chall, challenges.DNSPersist01):
return achallenges.DNSPersist(challb=challb, identifier=identifier)
else:
return achallenges.Other(challb=challb, identifier=identifier)

View file

@ -194,7 +194,11 @@ def parse_preferred_challenges(pref_challs: Iterable[str]) -> list[str]:
:raises errors.Error: if pref_challs is invalid
"""
aliases = {"dns": "dns-01", "http": "http-01"}
aliases = {
"dns": "dns-01",
"dns-persist": "dns-persist-01",
"http": "http-01",
}
challs = [c.strip() for c in pref_challs]
challs = [aliases.get(c, c) for c in challs]

View file

@ -37,15 +37,16 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
'When using shell scripts, an authenticator script must be provided. '
'The environment variables available to this script depend on the '
'type of challenge. $CERTBOT_IDENTIFIER will always contain the domain or IP address '
'being authenticated. For HTTP-01 and DNS-01, $CERTBOT_VALIDATION '
'being authenticated. For HTTP-01, DNS-01, and DNS-PERSIST-01, $CERTBOT_VALIDATION '
'is the validation string, and $CERTBOT_TOKEN is the filename of the '
'resource requested when performing an HTTP-01 challenge. An additional '
'cleanup script can also be provided and can use the additional variable '
'$CERTBOT_AUTH_OUTPUT which contains the stdout output from the auth script. '
'For both authenticator and cleanup script, on HTTP-01 and DNS-01 challenges, '
'$CERTBOT_REMAINING_CHALLENGES will be equal to the number of challenges that '
'remain after the current one, and $CERTBOT_ALL_IDENTIFIERS contains a comma-separated '
'list of all identifiers that are challenged for the current certificate.')
'For both authenticator and cleanup script, on HTTP-01, DNS-01, and DNS-PERSIST-01 '
'challenges, $CERTBOT_REMAINING_CHALLENGES will be equal to the number of challenges '
'that remain after the current one, and $CERTBOT_ALL_IDENTIFIERS contains a '
'comma-separated list of all identifiers that are challenged for the current '
'certificate.')
# Include the full stop at the end of the FQDN in the instructions below for the null
# label of the DNS root, as stated in section 3.1 of RFC 1035. While not necessary
# for most day to day usage of hostnames, when adding FQDNs to a DNS zone editor, this
@ -61,6 +62,11 @@ Please deploy a DNS TXT record under the name:
with the following value:
{validation}
"""
_DNS_RDATA_TOO_LONG = """
WARNING: Because the above DNS record's value is longer than 255 bytes, you will
need to either split it into multiple substrings, or verify that your DNS provider
does so automatically.
"""
_DNS_VERIFY_INSTRUCTIONS = """
Before continuing, verify the TXT record has been deployed. Depending on the DNS
@ -104,18 +110,15 @@ permitted by DNS standards.)
help='Path or command to execute for the authentication script')
add('cleanup-hook',
help='Path or command to execute for the cleanup script')
add('setup-hook',
help='Path or command to execute just this once. Only used for dns-persist challenges')
def prepare(self) -> None: # pylint: disable=missing-function-docstring
if self.config.noninteractive_mode and not self.conf('auth-hook'):
raise errors.PluginError(
'An authentication script must be provided with --{0} when '
'using the manual plugin non-interactively.'.format(
self.option_name('auth-hook')))
self._validate_hooks()
def _validate_hooks(self) -> None:
if self.config.validate_hooks:
for name in ('auth-hook', 'cleanup-hook'):
for name in ('auth-hook', 'cleanup-hook', 'setup-hook'):
hook = self.conf(name)
if hook is not None:
hook_prefix = self.option_name(name)[:-len('-hook')]
@ -128,13 +131,13 @@ permitted by DNS standards.)
'the user or by performing the setup manually.')
def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str:
def has_chall(cls: type[challenges.Challenge]) -> bool:
def has_chall(cls: tuple[type[challenges.Challenge], ...]) -> bool:
return any(isinstance(achall.chall, cls) for achall in failed_achalls)
has_dns = has_chall(challenges.DNS01)
has_dns = has_chall((challenges.DNS01, challenges.DNSPersist01))
resource_names = {
challenges.DNS01: 'DNS TXT records',
challenges.HTTP01: 'challenge files',
(challenges.DNS01, challenges.DNSPersist01): 'DNS TXT records',
(challenges.HTTP01,): 'challenge files',
}
resources = ' and '.join(sorted([v for k, v in resource_names.items() if has_chall(k)]))
@ -165,30 +168,84 @@ permitted by DNS standards.)
def get_chall_pref(self, identifier: str) -> Iterable[type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01, challenges.DNS01]
return [challenges.HTTP01, challenges.DNS01, challenges.DNSPersist01]
def _check_hook_achall_match(self, achalls: list[achallenges.AnnotatedChallenge]) -> None:
"""Checks whether the user has provided the appropriate hook type for the given list of
challenges, and emits a helpful warning if not. Raises an exception if we're in
non-interactive mode, have at least 1 non-DNS-PERSIST-01 challenge, and no auth hook
was provided."""
num_dns_persist_achalls = sum(1 for achall in achalls \
if isinstance(achall.chall, challenges.DNSPersist01))
num_other_achalls = len(achalls) - num_dns_persist_achalls
auth_hook_provided = self.conf('auth-hook')
setup_hook_provided = self.conf('setup-hook')
if num_other_achalls > 0 and self.config.noninteractive_mode and not auth_hook_provided:
raise errors.PluginError(
'An authentication script must be provided with --{0} when using the manual '
'plugin non-interactively on HTTP-01 or DNS-01 challenges.'.format(
self.option_name('auth-hook')))
if num_dns_persist_achalls == 0 and setup_hook_provided:
msg = ('A setup script was provided with --{0}, but because no DNS-PERSIST-01 '
'challenges are being attempted, it will be ignored.').format(
self.option_name('setup-hook'))
if not auth_hook_provided:
msg += ' Did you mean to use --{0} instead?'.format(
self.option_name('auth-hook'))
logger.warning(msg)
if num_other_achalls == 0 and auth_hook_provided:
msg = ('An authentication script was provided with --{0}, but because only '
'DNS-PERSIST-01 challenges are being attempted, it will be ignored.').format(
self.option_name('auth-hook'))
if not setup_hook_provided:
msg += ' Did you mean to use --{0} instead?'.format(
self.option_name('setup-hook'))
logger.warning(msg)
def _achall_has_hook(self, achall: achallenges.AnnotatedChallenge) -> bool:
if isinstance(achall.chall, challenges.DNSPersist01):
return self.conf('setup-hook')
else:
return self.conf('auth-hook')
def perform(self, achalls: list[achallenges.AnnotatedChallenge]
) -> list[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
self._check_hook_achall_match(achalls)
responses = []
last_dns_achall = 0
for i, achall in enumerate(achalls):
if isinstance(achall.chall, challenges.DNS01):
if isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01)):
last_dns_achall = i
for i, achall in enumerate(achalls):
if self.conf('auth-hook'):
if self._achall_has_hook(achall):
self._perform_achall_with_script(achall, achalls)
else:
self._perform_achall_manually(achall, i == last_dns_achall)
responses.append(achall.response(achall.account_key))
responses.append(self._get_response(achall))
return responses
def _get_response(self, achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
if isinstance(achall.chall, (challenges.HTTP01, challenges.DNS01)):
return achall.response(achall.account_key)
else:
assert isinstance(achall.chall, challenges.DNSPersist01)
return achall.response()
def _get_validation(self, achall: achallenges.AnnotatedChallenge) -> str:
if isinstance(achall.chall, (challenges.HTTP01, challenges.DNS01)):
return achall.validation(achall.account_key)
else:
assert isinstance(achall.chall, challenges.DNSPersist01)
is_wildcard = util.is_wildcard_domain(achall.identifier.value)
return achall.get_validation_rdata(is_wildcard)
def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge,
achalls: list[achallenges.AnnotatedChallenge]) -> None:
identifier_value = achall.identifier.value
env = {
"CERTBOT_DOMAIN": identifier_value,
"CERTBOT_IDENTIFIER": identifier_value,
"CERTBOT_VALIDATION": achall.validation(achall.account_key),
"CERTBOT_VALIDATION": self._get_validation(achall),
"CERTBOT_ALL_DOMAINS": ','.join(one_achall.identifier.value for one_achall in achalls),
"CERTBOT_ALL_IDENTIFIERS":
','.join(one_achall.identifier.value for one_achall in achalls),
@ -199,36 +256,42 @@ permitted by DNS standards.)
else:
os.environ.pop('CERTBOT_TOKEN', None)
os.environ.update(env)
_, out = self._execute_hook('auth-hook', identifier_value)
if isinstance(achall.chall, challenges.DNSPersist01):
hook_name = 'setup-hook'
else:
hook_name = 'auth-hook'
_, out = self._execute_hook(hook_name, identifier_value)
env['CERTBOT_AUTH_OUTPUT'] = out.strip()
self.env[achall] = env
def _perform_achall_manually(self, achall: achallenges.AnnotatedChallenge,
last_dns_achall: bool = False) -> None:
identifier_value = achall.identifier.value
validation = achall.validation(achall.account_key)
validation = self._get_validation(achall)
if isinstance(achall.chall, challenges.HTTP01):
msg = self._HTTP_INSTRUCTIONS.format(
achall=achall, encoded_token=achall.chall.encode('token'),
port=self.config.http01_port,
uri=achall.chall.uri(identifier_value), validation=validation)
else:
assert isinstance(achall.chall, challenges.DNS01)
assert isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01))
assert achall.identifier.typ == messages.IDENTIFIER_FQDN
msg = self._DNS_INSTRUCTIONS.format(
domain=achall.validation_domain_name(identifier_value),
validation=validation)
if isinstance(achall.chall, challenges.DNS01):
if len(validation) > 255:
msg += self._DNS_RDATA_TOO_LONG
if isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01)):
if self.subsequent_dns_challenge:
# 2nd or later dns-01 challenge
# 2nd or later dns challenge
msg += self._SUBSEQUENT_DNS_CHALLENGE_INSTRUCTIONS
elif self.subsequent_any_challenge:
# 1st dns-01 challenge, but 2nd or later *any* challenge, so
# 1st dns challenge, but 2nd or later *any* challenge, so
# instruct user not to remove any previous http-01 challenge
msg += self._SUBSEQUENT_CHALLENGE_INSTRUCTIONS
self.subsequent_dns_challenge = True
if last_dns_achall:
# last dns-01 challenge
# last dns challenge
msg += self._DNS_VERIFY_INSTRUCTIONS.format(
domain=achall.validation_domain_name(identifier_value))
elif self.subsequent_any_challenge:
@ -240,6 +303,8 @@ permitted by DNS standards.)
def cleanup(self, achalls: Iterable[achallenges.AnnotatedChallenge]) -> None: # pylint: disable=missing-function-docstring
if self.conf('cleanup-hook'):
for achall in achalls:
if isinstance(achall.chall, challenges.DNSPersist01):
continue
env = self.env.pop(achall)
if 'CERTBOT_TOKEN' not in env:
os.environ.pop('CERTBOT_TOKEN', None)

View file

@ -280,6 +280,12 @@ def _relevant(namespaces: Iterable[str], option: str) -> bool:
"""
from certbot._internal import renewal
# an awkward special case: future renewals shouldn't depend on whether the user set
# --manual-setup-hook now, since it's meant to represent a (potentially) one-off script.
# if a user wants it to run again in the future, they must set it explicitly via CLI
if option == "manual_setup_hook":
return False
return (option in renewal.CONFIG_ITEMS or
any(option.startswith(namespace) for namespace in namespaces))

View file

@ -25,7 +25,23 @@ class AuthenticatorTest(test_util.TempDirTestCase):
self.http_achall = acme_util.HTTP01_A
self.dns_achall = acme_util.DNS01_A
self.dns_achall_2 = acme_util.DNS01_A_2
self.achalls = [self.http_achall, self.dns_achall, self.dns_achall_2]
self.dns_persist_achall = acme_util.DNS_PERSIST_01_A
self.dns_persist_achall_wildcard = acme_util.DNS_PERSIST_01_A_WILDCARD_A
self.dns_persist_achall_long = acme_util.DNS_PERSIST_01_LONG_A
self.achalls = [
self.http_achall,
self.dns_achall,
self.dns_achall_2,
self.dns_persist_achall,
self.dns_persist_achall_wildcard,
self.dns_persist_achall_long,
]
self.responses: list[challenges.ChallengeResponse] = []
for achall in self.achalls:
if isinstance(achall.chall, challenges.DNSPersist01):
self.responses.append(achall.response())
else:
self.responses.append(achall.response(achall.account_key))
for d in ["config_dir", "work_dir", "in_progress"]:
filesystem.mkdir(os.path.join(self.tempdir, d))
# "backup_dir" and "temp_checkpoint_dir" get created in
@ -33,8 +49,8 @@ class AuthenticatorTest(test_util.TempDirTestCase):
# initialization.
self.config = mock.MagicMock(
http01_port=0, manual_auth_hook=None, manual_cleanup_hook=None,
noninteractive_mode=False, validate_hooks=False,
config_dir=os.path.join(self.tempdir, "config_dir"),
manual_setup_hook=None, noninteractive_mode=False,
validate_hooks=False, config_dir=os.path.join(self.tempdir, "config_dir"),
work_dir=os.path.join(self.tempdir, "work_dir"),
backup_dir=os.path.join(self.tempdir, "backup_dir"),
temp_checkpoint_dir=os.path.join(
@ -44,10 +60,14 @@ class AuthenticatorTest(test_util.TempDirTestCase):
from certbot._internal.plugins.manual import Authenticator
self.auth = Authenticator(self.config, name='manual')
def test_prepare_no_hook_noninteractive(self):
def test_perform_no_hook_noninteractive(self):
self.config.noninteractive_mode = True
with pytest.raises(errors.PluginError):
self.auth.prepare()
_ = self.auth.perform(self.achalls)
dns_persist_achalls = [achall for achall in self.achalls \
if isinstance(achall.chall, challenges.DNSPersist01)]
assert len(dns_persist_achalls) == 3
_ = self.auth.perform(dns_persist_achalls)
def test_prepare_bad_hook(self):
self.config.manual_auth_hook = os.path.abspath(os.sep) # is / on UNIX
@ -60,43 +80,68 @@ class AuthenticatorTest(test_util.TempDirTestCase):
def test_get_chall_pref(self):
assert self.auth.get_chall_pref('example.org') == \
[challenges.HTTP01, challenges.DNS01]
[challenges.HTTP01, challenges.DNS01, challenges.DNSPersist01]
def test_script_perform(self):
self.config.manual_auth_hook = (
'{0} -c "'
'from certbot.compat import os;'
'print(os.environ.get(\'CERTBOT_DOMAIN\'));'
'print(os.environ.get(\'CERTBOT_IDENTIFIER\'));'
'print(os.environ.get(\'CERTBOT_TOKEN\', \'notoken\'));'
'print(os.environ.get(\'CERTBOT_VALIDATION\', \'novalidation\'));'
'print(os.environ.get(\'CERTBOT_ALL_DOMAINS\'));'
'print(os.environ.get(\'CERTBOT_ALL_IDENTIFIERS\'));'
'print(os.environ.get(\'CERTBOT_REMAINING_CHALLENGES\'));"'
.format(sys.executable))
dns_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}'.format(
script_template = textwrap.dedent("""
{0} -c "from certbot.compat import os
print(os.environ.get(\'CERTBOT_DOMAIN\'))
print(os.environ.get(\'CERTBOT_IDENTIFIER\'))
print(os.environ.get(\'CERTBOT_TOKEN\', \'notoken\'))
print(os.environ.get(\'CERTBOT_VALIDATION\', \'novalidation\'))
print(os.environ.get(\'CERTBOT_ALL_DOMAINS\'))
print(os.environ.get(\'CERTBOT_ALL_IDENTIFIERS\'))
print(os.environ.get(\'CERTBOT_REMAINING_CHALLENGES\'))
print('{1}')"
""")
self.config.manual_auth_hook = script_template.format(sys.executable, "auth_hook")
self.config.manual_setup_hook = script_template.format(sys.executable, "setup_hook")
dns_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}\n{7}'.format(
self.dns_achall.identifier.value,
self.dns_achall.identifier.value,
'notoken',
self.dns_achall.validation(self.dns_achall.account_key),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.dns_achall) - 1)
http_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}'.format(
len(self.achalls) - self.achalls.index(self.dns_achall) - 1,
"auth_hook")
http_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}\n{7}'.format(
self.http_achall.identifier.value,
self.http_achall.identifier.value,
self.http_achall.chall.encode('token'),
self.http_achall.validation(self.http_achall.account_key),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.http_achall) - 1)
len(self.achalls) - self.achalls.index(self.http_achall) - 1,
"auth_hook")
dns_persist_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}\n{7}'.format(
self.dns_persist_achall.identifier.value,
self.dns_persist_achall.identifier.value,
'notoken',
self.dns_persist_achall.get_validation_rdata(False),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.dns_persist_achall) - 1,
"setup_hook")
dns_persist_wildcard_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}\n{7}'.format(
self.dns_persist_achall_wildcard.identifier.value,
self.dns_persist_achall_wildcard.identifier.value,
'notoken',
self.dns_persist_achall_wildcard.get_validation_rdata(True),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.dns_persist_achall_wildcard) - 1,
"setup_hook")
assert self.auth.perform(self.achalls) == \
[achall.response(achall.account_key) for achall in self.achalls]
assert self.auth.perform(self.achalls) == self.responses
assert self.auth.env[self.dns_achall]['CERTBOT_AUTH_OUTPUT'] == \
dns_expected
assert self.auth.env[self.http_achall]['CERTBOT_AUTH_OUTPUT'] == \
http_expected
assert self.auth.env[self.dns_persist_achall]['CERTBOT_AUTH_OUTPUT'] == \
dns_persist_expected
assert self.auth.env[self.dns_persist_achall_wildcard]['CERTBOT_AUTH_OUTPUT'] == \
dns_persist_wildcard_expected
# Successful hook output should be sent to notify
assert self.mock_get_display().notification.call_count == len(self.achalls)
@ -105,13 +150,17 @@ class AuthenticatorTest(test_util.TempDirTestCase):
assert needle in args[0]
def test_manual_perform(self):
assert self.auth.perform(self.achalls) == \
[achall.response(achall.account_key) for achall in self.achalls]
assert self.auth.perform(self.achalls) == self.responses
assert self.mock_get_display().notification.call_count == len(self.achalls)
for i, (args, kwargs) in enumerate(self.mock_get_display().notification.call_args_list):
achall = self.achalls[i]
assert achall.validation(achall.account_key) in args[0]
if isinstance(achall.chall, challenges.DNSPersist01):
assert achall.validation_domain_name(achall.identifier.value) in args[0]
else:
assert achall.validation(achall.account_key) in args[0]
if achall == self.dns_persist_achall_long:
assert "WARNING: Because the above DNS record's value is longer than 255 bytes" in args[0]
assert kwargs['wrap'] is False
def test_cleanup(self):
@ -121,6 +170,8 @@ class AuthenticatorTest(test_util.TempDirTestCase):
self.auth.perform(self.achalls)
for achall in self.achalls:
if isinstance(achall.chall, challenges.DNSPersist01):
continue
self.auth.cleanup([achall])
assert os.environ['CERTBOT_AUTH_OUTPUT'] == 'foo'
assert os.environ['CERTBOT_DOMAIN'] == achall.identifier.value
@ -136,21 +187,23 @@ class AuthenticatorTest(test_util.TempDirTestCase):
def test_auth_hint_hook(self):
self.config.manual_auth_hook = '/bin/true'
assert self.auth.auth_hint([acme_util.DNS01_A, acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the DNS TXT records and challenge ' \
'files created by the --manual-auth-hook. Ensure that this hook is functioning ' \
'correctly and that it waits a sufficient duration of time for DNS propagation. ' \
'Refer to "certbot --help manual" and the Certbot User Guide.'
for dns_achall in [acme_util.DNS01_A, acme_util.DNS_PERSIST_01_A]:
assert self.auth.auth_hint([dns_achall, acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the DNS TXT records and challenge ' \
'files created by the --manual-auth-hook. Ensure that this hook is functioning ' \
'correctly and that it waits a sufficient duration of time for DNS propagation. ' \
'Refer to "certbot --help manual" and the Certbot User Guide.'
assert self.auth.auth_hint([acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the challenge files created by the ' \
'--manual-auth-hook. Ensure that this hook is functioning correctly. Refer to ' \
'"certbot --help manual" and the Certbot User Guide.'
def test_auth_hint_no_hook(self):
assert self.auth.auth_hint([acme_util.DNS01_A, acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the manually created DNS TXT records ' \
'and challenge files. Ensure that you created these in the correct location, or ' \
'try waiting longer for DNS propagation on the next attempt.'
for dns_achall in [acme_util.DNS01_A, acme_util.DNS_PERSIST_01_A]:
assert self.auth.auth_hint([dns_achall, acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the manually created DNS TXT records ' \
'and challenge files. Ensure that you created these in the correct location, or ' \
'try waiting longer for DNS propagation on the next attempt.'
assert self.auth.auth_hint([acme_util.HTTP01_A, acme_util.HTTP01_A, acme_util.HTTP01_A]) == \
'The Certificate Authority failed to verify the manually created challenge files. ' \
'Ensure that you created these in the correct location.'

View file

@ -112,6 +112,13 @@ class RelevantValuesTest(unittest.TestCase):
expected_relevant_values = self.values.copy()
assert self._call(self.values) == expected_relevant_values
def test_manual_hooks(self):
self.values["manual_setup_hook"] = '"# setup'
self.values["manual_auth_hook"] = '"# auth'
expected_relevant_values = self.values.copy()
del expected_relevant_values["manual_setup_hook"]
assert self._call(self.values) == expected_relevant_values
class BaseRenewableCertTest(test_util.ConfigTestCase):
"""Base class for setting up Renewable Cert tests.

View file

@ -100,6 +100,13 @@ class DNS(AnnotatedChallenge):
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass
acme_type = challenges.DNS
class DNSPersist(AnnotatedChallenge):
"""Client annotated "dns-persist" ACME challenge"""
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass
acme_type = challenges.DNSPersist01
class Other(AnnotatedChallenge):
"""Client annotated ACME challenge of an unknown type."""
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass

View file

@ -18,6 +18,12 @@ HTTP01 = challenges.HTTP01(
token=b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
DNS01 = challenges.DNS01(token=b"17817c66b60ce2e4012dfad92657527a")
DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac")
DNS_PERSIST_01 = challenges.DNSPersist01(
issuer_domain_names=('ca.example',),
account_uri='https://ca.example/acct/123')
DNS_PERSIST_01_LONG = challenges.DNSPersist01(
issuer_domain_names=('ca.example',),
account_uri=f"https://ca.example/acct/{'a' * 256}")
CHALLENGES = [HTTP01, DNS01]
@ -40,8 +46,8 @@ def chall_to_challb(chall: challenges.Challenge, status: messages.Status) -> mes
HTTP01_P = chall_to_challb(HTTP01, messages.STATUS_PENDING)
DNS01_P = chall_to_challb(DNS01, messages.STATUS_PENDING)
DNS01_P_2 = chall_to_challb(DNS01_2, messages.STATUS_PENDING)
CHALLENGES_P = [HTTP01_P, DNS01_P]
DNS_PERSIST_01_P = chall_to_challb(DNS_PERSIST_01, messages.STATUS_PENDING)
DNS_PERSIST_01_LONG_P = chall_to_challb(DNS_PERSIST_01_LONG, messages.STATUS_PENDING)
# AnnotatedChallenge objects
@ -51,8 +57,12 @@ DNS01_A = auth_handler.challb_to_achall(DNS01_P, JWK, messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value="example.org"))
DNS01_A_2 = auth_handler.challb_to_achall(DNS01_P_2, JWK, messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value="esimerkki.example.org"))
ACHALLENGES = [HTTP01_A, DNS01_A]
DNS_PERSIST_01_A = auth_handler.challb_to_achall(DNS_PERSIST_01_P, JWK,
messages.Identifier(typ=messages.IDENTIFIER_FQDN, value="example.net"))
DNS_PERSIST_01_LONG_A = auth_handler.challb_to_achall(DNS_PERSIST_01_LONG_P, JWK,
messages.Identifier(typ=messages.IDENTIFIER_FQDN, value="example.net"))
DNS_PERSIST_01_A_WILDCARD_A = auth_handler.challb_to_achall(DNS_PERSIST_01_P, JWK,
messages.Identifier(typ=messages.IDENTIFIER_FQDN, value="*.example.net"))
def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[challenges.Challenge],

View file

@ -0,0 +1,2 @@
Add support for [`dns-persist-01`](https://datatracker.ietf.org/doc/draft-ietf-acme-dns-persist/) challenges to certbot's manual plugin. This challenge type allows users to verify control over a domain with a single DNS TXT record which can remain active and unchanged indefinitely, allowing for easy renewals when traditional challenge methods are impractical. `dns-persist-01` also allows for the issuance of wildcard certificates.