certbot: add dns-persist-01 challenge to manual plugin

In terms of UX, dns-persist-01 functions nearly identically to dns-01,
except once the TXT record has been made, no further action is required
from the user.
This commit is contained in:
Will Greenberg 2026-04-24 12:29:31 -07:00
parent 8af86a179e
commit 431ee89d3a
6 changed files with 106 additions and 31 deletions

View file

@ -383,6 +383,8 @@ def challb_to_achall(challb: messages.ChallengeBody, account_key: josepy.JWK,
challb=challb, account_key=account_key, identifier=identifier)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, identifier=identifier)
elif isinstance(chall, challenges.DNSPersist01):
return achallenges.DNSPersist(challb=challb, identifier=identifier)
else:
return achallenges.Other(challb=challb, identifier=identifier)

View file

@ -194,7 +194,11 @@ def parse_preferred_challenges(pref_challs: Iterable[str]) -> list[str]:
:raises errors.Error: if pref_challs is invalid
"""
aliases = {"dns": "dns-01", "http": "http-01"}
aliases = {
"dns": "dns-01",
"dns-persist": "dns-persist-01",
"http": "http-01",
}
challs = [c.strip() for c in pref_challs]
challs = [aliases.get(c, c) for c in challs]

View file

@ -37,15 +37,16 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
'When using shell scripts, an authenticator script must be provided. '
'The environment variables available to this script depend on the '
'type of challenge. $CERTBOT_IDENTIFIER will always contain the domain or IP address '
'being authenticated. For HTTP-01 and DNS-01, $CERTBOT_VALIDATION '
'being authenticated. For HTTP-01, DNS-01, and DNS-PERSIST-01, $CERTBOT_VALIDATION '
'is the validation string, and $CERTBOT_TOKEN is the filename of the '
'resource requested when performing an HTTP-01 challenge. An additional '
'cleanup script can also be provided and can use the additional variable '
'$CERTBOT_AUTH_OUTPUT which contains the stdout output from the auth script. '
'For both authenticator and cleanup script, on HTTP-01 and DNS-01 challenges, '
'$CERTBOT_REMAINING_CHALLENGES will be equal to the number of challenges that '
'remain after the current one, and $CERTBOT_ALL_IDENTIFIERS contains a comma-separated '
'list of all identifiers that are challenged for the current certificate.')
'For both authenticator and cleanup script, on HTTP-01, DNS-01, and DNS-PERSIST-01 '
'challenges, $CERTBOT_REMAINING_CHALLENGES will be equal to the number of challenges '
'that remain after the current one, and $CERTBOT_ALL_IDENTIFIERS contains a '
'comma-separated list of all identifiers that are challenged for the current '
'certificate.')
# Include the full stop at the end of the FQDN in the instructions below for the null
# label of the DNS root, as stated in section 3.1 of RFC 1035. While not necessary
# for most day to day usage of hostnames, when adding FQDNs to a DNS zone editor, this
@ -61,6 +62,11 @@ Please deploy a DNS TXT record under the name:
with the following value:
{validation}
"""
_DNS_RDATA_TOO_LONG = """
WARNING: Because the above DNS record's value is longer than 255 bytes, you will
need to either split it into multiple substrings, or verify that your DNS provider
does so automatically.
"""
_DNS_VERIFY_INSTRUCTIONS = """
Before continuing, verify the TXT record has been deployed. Depending on the DNS
@ -128,13 +134,13 @@ permitted by DNS standards.)
'the user or by performing the setup manually.')
def auth_hint(self, failed_achalls: Iterable[achallenges.AnnotatedChallenge]) -> str:
def has_chall(cls: type[challenges.Challenge]) -> bool:
def has_chall(cls: tuple[type[challenges.Challenge], ...]) -> bool:
return any(isinstance(achall.chall, cls) for achall in failed_achalls)
has_dns = has_chall(challenges.DNS01)
has_dns = has_chall((challenges.DNS01, challenges.DNSPersist01))
resource_names = {
challenges.DNS01: 'DNS TXT records',
challenges.HTTP01: 'challenge files',
(challenges.DNS01, challenges.DNSPersist01): 'DNS TXT records',
(challenges.HTTP01,): 'challenge files',
}
resources = ' and '.join(sorted([v for k, v in resource_names.items() if has_chall(k)]))
@ -165,30 +171,45 @@ permitted by DNS standards.)
def get_chall_pref(self, identifier: str) -> Iterable[type[challenges.Challenge]]:
# pylint: disable=unused-argument,missing-function-docstring
return [challenges.HTTP01, challenges.DNS01]
return [challenges.DNSPersist01, challenges.HTTP01, challenges.DNS01]
def perform(self, achalls: list[achallenges.AnnotatedChallenge]
) -> list[challenges.ChallengeResponse]: # pylint: disable=missing-function-docstring
responses = []
last_dns_achall = 0
for i, achall in enumerate(achalls):
if isinstance(achall.chall, challenges.DNS01):
if isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01)):
last_dns_achall = i
for i, achall in enumerate(achalls):
if self.conf('auth-hook'):
self._perform_achall_with_script(achall, achalls)
else:
self._perform_achall_manually(achall, i == last_dns_achall)
responses.append(achall.response(achall.account_key))
responses.append(self._get_response(achall))
return responses
def _get_response(self, achall: achallenges.AnnotatedChallenge) -> challenges.ChallengeResponse:
if isinstance(achall.chall, (challenges.HTTP01, challenges.DNS01)):
return achall.response(achall.account_key)
else:
assert isinstance(achall.chall, challenges.DNSPersist01)
return achall.response()
def _get_validation(self, achall: achallenges.AnnotatedChallenge) -> str:
if isinstance(achall.chall, (challenges.HTTP01, challenges.DNS01)):
return achall.validation(achall.account_key)
else:
assert isinstance(achall.chall, challenges.DNSPersist01)
is_wildcard = util.is_wildcard_domain(achall.identifier.value)
return achall.get_validation_rdata(is_wildcard)
def _perform_achall_with_script(self, achall: achallenges.AnnotatedChallenge,
achalls: list[achallenges.AnnotatedChallenge]) -> None:
identifier_value = achall.identifier.value
env = {
"CERTBOT_DOMAIN": identifier_value,
"CERTBOT_IDENTIFIER": identifier_value,
"CERTBOT_VALIDATION": achall.validation(achall.account_key),
"CERTBOT_VALIDATION": self._get_validation(achall),
"CERTBOT_ALL_DOMAINS": ','.join(one_achall.identifier.value for one_achall in achalls),
"CERTBOT_ALL_IDENTIFIERS":
','.join(one_achall.identifier.value for one_achall in achalls),
@ -206,29 +227,31 @@ permitted by DNS standards.)
def _perform_achall_manually(self, achall: achallenges.AnnotatedChallenge,
last_dns_achall: bool = False) -> None:
identifier_value = achall.identifier.value
validation = achall.validation(achall.account_key)
validation = self._get_validation(achall)
if isinstance(achall.chall, challenges.HTTP01):
msg = self._HTTP_INSTRUCTIONS.format(
achall=achall, encoded_token=achall.chall.encode('token'),
port=self.config.http01_port,
uri=achall.chall.uri(identifier_value), validation=validation)
else:
assert isinstance(achall.chall, challenges.DNS01)
assert isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01))
assert achall.identifier.typ == messages.IDENTIFIER_FQDN
msg = self._DNS_INSTRUCTIONS.format(
domain=achall.validation_domain_name(identifier_value),
validation=validation)
if isinstance(achall.chall, challenges.DNS01):
if len(validation) > 255:
msg += self._DNS_RDATA_TOO_LONG
if isinstance(achall.chall, (challenges.DNS01, challenges.DNSPersist01)):
if self.subsequent_dns_challenge:
# 2nd or later dns-01 challenge
# 2nd or later dns challenge
msg += self._SUBSEQUENT_DNS_CHALLENGE_INSTRUCTIONS
elif self.subsequent_any_challenge:
# 1st dns-01 challenge, but 2nd or later *any* challenge, so
# 1st dns challenge, but 2nd or later *any* challenge, so
# instruct user not to remove any previous http-01 challenge
msg += self._SUBSEQUENT_CHALLENGE_INSTRUCTIONS
self.subsequent_dns_challenge = True
if last_dns_achall:
# last dns-01 challenge
# last dns challenge
msg += self._DNS_VERIFY_INSTRUCTIONS.format(
domain=achall.validation_domain_name(identifier_value))
elif self.subsequent_any_challenge:

View file

@ -25,7 +25,21 @@ class AuthenticatorTest(test_util.TempDirTestCase):
self.http_achall = acme_util.HTTP01_A
self.dns_achall = acme_util.DNS01_A
self.dns_achall_2 = acme_util.DNS01_A_2
self.achalls = [self.http_achall, self.dns_achall, self.dns_achall_2]
self.dns_persist_achall = acme_util.DNS_PERSIST_01_A
self.dns_persist_achall_wildcard = acme_util.DNS_PERSIST_01_A_WILDCARD
self.achalls = [
self.http_achall,
self.dns_achall,
self.dns_achall_2,
self.dns_persist_achall,
self.dns_persist_achall_wildcard,
]
self.responses: list[challenges.ChallengeResponse] = []
for achall in self.achalls:
if isinstance(achall.chall, challenges.DNSPersist01):
self.responses.append(achall.response())
else:
self.responses.append(achall.response(achall.account_key))
for d in ["config_dir", "work_dir", "in_progress"]:
filesystem.mkdir(os.path.join(self.tempdir, d))
# "backup_dir" and "temp_checkpoint_dir" get created in
@ -60,7 +74,7 @@ class AuthenticatorTest(test_util.TempDirTestCase):
def test_get_chall_pref(self):
assert self.auth.get_chall_pref('example.org') == \
[challenges.HTTP01, challenges.DNS01]
[challenges.DNSPersist01, challenges.HTTP01, challenges.DNS01]
def test_script_perform(self):
self.config.manual_auth_hook = (
@ -90,13 +104,32 @@ class AuthenticatorTest(test_util.TempDirTestCase):
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.http_achall) - 1)
dns_persist_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}'.format(
self.dns_persist_achall.identifier.value,
self.dns_persist_achall.identifier.value,
'notoken',
self.dns_persist_achall.get_validation_rdata(False),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.dns_persist_achall) - 1)
dns_persist_wildcard_expected = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}\n{6}'.format(
self.dns_persist_achall_wildcard.identifier.value,
self.dns_persist_achall_wildcard.identifier.value,
'notoken',
self.dns_persist_achall_wildcard.get_validation_rdata(True),
','.join(achall.identifier.value for achall in self.achalls),
','.join(achall.identifier.value for achall in self.achalls),
len(self.achalls) - self.achalls.index(self.dns_persist_achall_wildcard) - 1)
assert self.auth.perform(self.achalls) == \
[achall.response(achall.account_key) for achall in self.achalls]
assert self.auth.perform(self.achalls) == self.responses
assert self.auth.env[self.dns_achall]['CERTBOT_AUTH_OUTPUT'] == \
dns_expected
assert self.auth.env[self.http_achall]['CERTBOT_AUTH_OUTPUT'] == \
http_expected
assert self.auth.env[self.dns_persist_achall]['CERTBOT_AUTH_OUTPUT'] == \
dns_persist_expected
assert self.auth.env[self.dns_persist_achall_wildcard]['CERTBOT_AUTH_OUTPUT'] == \
dns_persist_wildcard_expected
# Successful hook output should be sent to notify
assert self.mock_get_display().notification.call_count == len(self.achalls)
@ -105,13 +138,15 @@ class AuthenticatorTest(test_util.TempDirTestCase):
assert needle in args[0]
def test_manual_perform(self):
assert self.auth.perform(self.achalls) == \
[achall.response(achall.account_key) for achall in self.achalls]
assert self.auth.perform(self.achalls) == self.responses
assert self.mock_get_display().notification.call_count == len(self.achalls)
for i, (args, kwargs) in enumerate(self.mock_get_display().notification.call_args_list):
achall = self.achalls[i]
assert achall.validation(achall.account_key) in args[0]
if isinstance(achall.chall, challenges.DNSPersist01):
assert achall.validation_domain_name(achall.identifier.value) in args[0]
else:
assert achall.validation(achall.account_key) in args[0]
assert kwargs['wrap'] is False
def test_cleanup(self):

View file

@ -100,6 +100,13 @@ class DNS(AnnotatedChallenge):
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass
acme_type = challenges.DNS
class DNSPersist(AnnotatedChallenge):
"""Client annotated "dns-persist" ACME challenge"""
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass
acme_type = challenges.DNSPersist01
class Other(AnnotatedChallenge):
"""Client annotated ACME challenge of an unknown type."""
__slots__ = ('challb', 'domain', 'identifier') # pylint: disable=redefined-slots-in-subclass

View file

@ -18,6 +18,9 @@ HTTP01 = challenges.HTTP01(
token=b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
DNS01 = challenges.DNS01(token=b"17817c66b60ce2e4012dfad92657527a")
DNS01_2 = challenges.DNS01(token=b"cafecafecafecafecafecafe0feedbac")
DNS_PERSIST_01 = challenges.DNSPersist01(
issuer_domain_names=('ca.example',),
account_uri='https://ca.example/acct/123')
CHALLENGES = [HTTP01, DNS01]
@ -40,8 +43,7 @@ def chall_to_challb(chall: challenges.Challenge, status: messages.Status) -> mes
HTTP01_P = chall_to_challb(HTTP01, messages.STATUS_PENDING)
DNS01_P = chall_to_challb(DNS01, messages.STATUS_PENDING)
DNS01_P_2 = chall_to_challb(DNS01_2, messages.STATUS_PENDING)
CHALLENGES_P = [HTTP01_P, DNS01_P]
DNS_PERSIST_01_P = chall_to_challb(DNS_PERSIST_01, messages.STATUS_PENDING)
# AnnotatedChallenge objects
@ -51,8 +53,10 @@ DNS01_A = auth_handler.challb_to_achall(DNS01_P, JWK, messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value="example.org"))
DNS01_A_2 = auth_handler.challb_to_achall(DNS01_P_2, JWK, messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value="esimerkki.example.org"))
ACHALLENGES = [HTTP01_A, DNS01_A]
DNS_PERSIST_01_A = auth_handler.challb_to_achall(DNS_PERSIST_01_P, JWK, messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value="example.net"))
DNS_PERSIST_01_A_WILDCARD = auth_handler.challb_to_achall(DNS_PERSIST_01_P, JWK,
messages.Identifier(typ=messages.IDENTIFIER_FQDN, value="*.example.net"))
def gen_authzr(authz_status: messages.Status, domain: str, challs: Iterable[challenges.Challenge],