certbot-dns-rfc2136: add support for dyndns.org

This commit is contained in:
Alexander Houben 2025-10-31 09:42:13 +01:00
parent 4d5d5f7ae8
commit 9e2d8bfcbc
6 changed files with 197 additions and 16 deletions

View file

@ -5,7 +5,7 @@ options {
listen-on { any; };
listen-on-v6 { any; };
// We are allowing BIND to service recursive queries, but only in an extremely limimited sense
// We are allowing BIND to service recursive queries, but only in an extremely limited sense
// where it is entirely disconnected from public DNS:
// - Iterative queries are disabled. Only forwarding to a non-existent forwarder.
// - The only recursive answers we can get (that will not be a SERVFAIL) will come from the
@ -31,6 +31,11 @@ key "default-key." {
secret "91CgOwzihr0nAVEHKFXJPQCbuBBbBI19Ks5VAweUXgbF40NWTD83naeg3c5y2MPdEiFRXnRLJxL6M+AfHCGLNw==";
};
key "dyndns-key." {
algorithm hmac-md5;
secret "PP1k9/WOZmVF0ILNqhKw4g==";
};
zone "mock-recursion" {
type primary;
file "/var/lib/bind/rpz.mock-recursion";
@ -46,6 +51,7 @@ zone "example.com." {
update-policy {
grant default-key zonesub TXT;
grant dyndns-key zonesub TXT;
};
};
@ -56,5 +62,6 @@ zone "sub.example.com." {
update-policy {
grant default-key zonesub TXT;
grant dyndns-key zonesub TXT;
};
};

View file

@ -0,0 +1,16 @@
# Target DNS server
dns_rfc2136_server = {server_address}
# Target DNS port
dns_rfc2136_port = {server_port}
# TSIG key name
dns_rfc2136_name = dyndns-key.
# TSIG key secret
dns_rfc2136_secret = PP1k9/WOZmVF0ILNqhKw4g==
# TSIG key algorithm
dns_rfc2136_algorithm = HMAC-MD5
# Target DNS server protocol preference
dns_rfc2136_server_proto_pref = tcp_only
# Target Update server
dns_rfc2136_update_server = {server_address}
# Target Update server protocol preference
dns_rfc2136_update_server_proto_pref = udp_only

View file

@ -18,10 +18,11 @@ def test_context(request: pytest.FixtureRequest) -> Generator[IntegrationTestsCo
@pytest.mark.parametrize('domain', [('example.com'), ('sub.example.com')])
def test_get_certificate(domain: str, context: IntegrationTestsContext) -> None:
@pytest.mark.parametrize('label', [('default'), ('dyndns')])
def test_get_certificate(domain: str, label: str, context: IntegrationTestsContext) -> None:
context.skip_if_no_bind9_server()
with context.rfc2136_credentials() as creds:
with context.rfc2136_credentials(label) as creds:
context.certbot_test_rfc2136([
'certonly', '--dns-rfc2136-credentials', creds,
'-d', domain, '-d', '*.{}'.format(domain)

View file

@ -29,6 +29,13 @@ server and optional port that supports RFC 2136 Dynamic Updates, the name
of the TSIG key, the TSIG key secret itself, the algorithm used if it's
different to HMAC-MD5, and optionally whether to sign the initial SOA query.
Some providers differentiate between the DNS server that serves DNS queries
and the server that accepts DNS updates, so an optional separate update server
may be specified.
Optional protocol preferences for communication with the DNS server and
update server may be specified, possible values being: ``tcp_first``,
``tcp_only``, ``udp_first`` and ``udp_only``.
.. code-block:: ini
:name: credentials.ini
:caption: Example credentials file:
@ -47,6 +54,15 @@ AmKd7ak51vWKgSl12ib86oQRPkpDjg==
# TSIG sign SOA query (optional, default: false)
dns_rfc2136_sign_query = false
# Target Update server (IPv4 or IPv6 address, not a hostname)
# (optional, default: same as dns_rfc2136_server)
dns_rfc2136_update_server = 192.0.2.1
# Protocol preference for target DNS server communication
# (optional, default: tcp_first)
dns_rfc2136_server_proto_pref = tcp_first
# Protocol preference for update server communication
# (optional, default: tcp_only)
dns_rfc2136_update_server_proto_pref = tcp_only
The path to this file can be provided interactively or using the
``--dns-rfc2136-credentials`` command-line argument. Certbot records the

View file

@ -1,4 +1,5 @@
"""DNS Authenticator using RFC 2136 Dynamic Updates."""
from enum import Enum
import logging
from typing import Any
from typing import Callable
@ -24,6 +25,23 @@ logger = logging.getLogger(__name__)
DEFAULT_NETWORK_TIMEOUT = 45
class ProtoPref(Enum):
"""Enum for protocol preference options."""
TCP_ONLY = 'tcp_only'
TCP_FIRST = 'tcp_first'
UDP_ONLY = 'udp_only'
UDP_FIRST = 'udp_first'
@classmethod
def map_to_func_list(cls, pp: 'ProtoPref') -> list[Callable[..., dns.message.Message]]:
"""Map protocol preference to list of dns.query functions."""
return {
ProtoPref.TCP_ONLY: [dns.query.tcp],
ProtoPref.TCP_FIRST: [dns.query.tcp, dns.query.udp],
ProtoPref.UDP_ONLY: [dns.query.udp],
ProtoPref.UDP_FIRST: [dns.query.udp, dns.query.tcp]
}[pp] # type: ignore
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator using RFC 2136 Dynamic Updates
@ -52,7 +70,7 @@ class Authenticator(dns_common.DNSAuthenticator):
@classmethod
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='RFC 2136 credentials INI file.')
def more_info(self) -> str:
@ -68,6 +86,18 @@ class Authenticator(dns_common.DNSAuthenticator):
if algorithm:
if not self.ALGORITHMS.get(algorithm.upper()):
raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
server_pp = cast(str, credentials.conf('server_proto_pref'))
if server_pp and server_pp.upper() not in ProtoPref.__members__:
raise errors.PluginError("Unknown protocol preference: {0}. Must be one of {1}."
.format(server_pp, str.join(', ', sorted(ProtoPref.__members__))))
update_server = cast(str, credentials.conf('update_server'))
if update_server and not is_ipaddress(update_server):
raise errors.PluginError(f"The configured target update server ({update_server})" + \
"is not a valid IPv4 or IPv6 address. A hostname is not allowed.")
update_server_pp = cast(str, credentials.conf('update_server_proto_pref'))
if update_server_pp and update_server_pp.upper() not in ProtoPref.__members__:
raise errors.PluginError("Unknown update server protocol: {0}. Must be one of {1}."
.format(update_server_pp, str.join(', ',sorted(ProtoPref.__members__))))
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
@ -98,16 +128,26 @@ class Authenticator(dns_common.DNSAuthenticator):
cast(str, self.credentials.conf('name')),
cast(str, self.credentials.conf('secret')),
self.ALGORITHMS.get(algorithm, dns.tsig.HMAC_MD5),
(self.credentials.conf('sign_query') or '').upper() == "TRUE")
(self.credentials.conf('sign_query') or '').upper() == "TRUE",
DEFAULT_NETWORK_TIMEOUT,
self.credentials.conf('server_proto_pref'),
self.credentials.conf('update_server'),
self.credentials.conf('update_server_proto_pref'))
class _RFC2136Client:
domain_to_challenges_map: dict[str, list[str]] = {}
"""
Encapsulates all communication with the target DNS server.
Encapsulates all communication with the target DNS and/or update server.
"""
def __init__(self, server: str, port: int, key_name: str, key_secret: str,
key_algorithm: dns.name.Name, sign_query: bool,
timeout: int = DEFAULT_NETWORK_TIMEOUT) -> None:
timeout: int = DEFAULT_NETWORK_TIMEOUT,
server_proto_pref: str | None = None,
update_server: str | None = None,
update_server_proto_pref: str | None = None) -> None:
self.server = server
self.port = port
self.keyring = dns.tsigkeyring.from_text({
@ -116,6 +156,37 @@ class _RFC2136Client:
self.algorithm = key_algorithm
self.sign_query = sign_query
self._default_timeout = timeout
self.update_server = update_server or server
self.server_proto_pref = server_proto_pref\
and ProtoPref(server_proto_pref) or ProtoPref.TCP_FIRST
self.update_server_proto_pref = update_server_proto_pref\
and ProtoPref(update_server_proto_pref) or ProtoPref.TCP_ONLY
def _try_with_protocols(self, func: Callable[..., dns.message.Message],
proto_pref: ProtoPref) -> dns.message.Message:
"""
Try to execute a function using a list of protocol functions, falling back on failure.
:param func: The function to execute, taking a protocol function as its only argument.
:param protocol_func_list: The list of protocol functions to try.
:returns: The result of the function.
:raises dns.exception.Timeout: if all protocol functions time out.
"""
proto_funcs = ProtoPref.map_to_func_list(proto_pref)
for idx, protof in enumerate(proto_funcs):
try:
return func(protof)
except (OSError, dns.exception.Timeout) as e:
if idx == len(proto_funcs) - 1:
raise e
def prot_to_msg(prot: Callable[..., Any]) -> str:
return 'TCP' if prot is dns.query.tcp else 'UDP'
exception_message = prot_to_msg(protof) + " query failed"
if idx < len(proto_funcs) - 1:
exception_message += ", fallback to " + prot_to_msg(proto_funcs[idx + 1])
logger.debug('%s: %s', exception_message, e)
return dns.message.Message() # pragma: no cover
def add_txt_record(self, record_name: str, record_content: str, record_ttl: int) -> None:
"""
@ -133,14 +204,19 @@ class _RFC2136Client:
o = dns.name.from_text(domain)
rel = n.relativize(o)
_RFC2136Client.domain_to_challenges_map.setdefault(domain, []).insert(0, record_content)
record_content_list = _RFC2136Client.domain_to_challenges_map[domain]
update = dns.update.Update(
domain,
keyring=self.keyring,
keyalgorithm=self.algorithm)
update.add(rel, record_ttl, dns.rdatatype.TXT, record_content)
update.add(rel, record_ttl, dns.rdatatype.TXT, *record_content_list)
try:
response = dns.query.tcp(update, self.server, self._default_timeout, self.port)
response = self._try_with_protocols(lambda prot:
prot(update, self.update_server, self._default_timeout, self.port),
self.update_server_proto_pref)
except Exception as e:
raise errors.PluginError('Encountered error adding TXT record: {0}'
.format(e))
@ -168,6 +244,8 @@ class _RFC2136Client:
o = dns.name.from_text(domain)
rel = n.relativize(o)
_RFC2136Client.domain_to_challenges_map.pop(domain, None)
update = dns.update.Update(
domain,
keyring=self.keyring,
@ -175,7 +253,9 @@ class _RFC2136Client:
update.delete(rel, dns.rdatatype.TXT, record_content)
try:
response = dns.query.tcp(update, self.server, self._default_timeout, self.port)
response = self._try_with_protocols(lambda prot:
prot(update, self.update_server, self._default_timeout, self.port),
self.update_server_proto_pref)
except Exception as e:
raise errors.PluginError('Encountered error deleting TXT record: {0}'
.format(e))
@ -227,11 +307,9 @@ class _RFC2136Client:
request.use_tsig(self.keyring, algorithm=self.algorithm)
try:
try:
response = dns.query.tcp(request, self.server, self._default_timeout, self.port)
except (OSError, dns.exception.Timeout) as e:
logger.debug('TCP query failed, fallback to UDP: %s', e)
response = dns.query.udp(request, self.server, self._default_timeout, self.port)
response = self._try_with_protocols(lambda prot:
prot(request, self.server, self._default_timeout, self.port),
self.server_proto_pref)
rcode = response.rcode()
# Authoritative Answer bit should be set

View file

@ -100,7 +100,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
self.auth.perform([self.achall])
@test_util.patch_display_util()
def test_valid_server_passes(self, unused_mock_get_utility):
def test_valid_server_passes(self, unused_mock_get_utility: mock.MagicMock) -> None:
config = VALID_CONFIG.copy()
dns_test_common.write(config, self.config.rfc2136_credentials)
@ -111,6 +111,58 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
self.auth.perform([self.achall])
def test_invalid_update_server_raises(self):
config = VALID_CONFIG.copy()
config["rfc2136_update_server"] = "example.com"
dns_test_common.write(config, self.config.rfc2136_credentials)
with pytest.raises(errors.PluginError):
self.auth.perform([self.achall])
@test_util.patch_display_util()
def test_valid_update_server_passes(self, unused_mock_get_utility: mock.MagicMock) -> None:
config = VALID_CONFIG.copy()
dns_test_common.write(config, self.config.rfc2136_credentials)
self.auth.perform([self.achall])
config["rfc2136_update_server"] = "2001:db8:3333:4444:cccc:dddd:eeee:ffff"
dns_test_common.write(config, self.config.rfc2136_credentials)
self.auth.perform([self.achall])
def test_invalid_server_proto_pref_raises(self):
config = VALID_CONFIG.copy()
config["rfc2136_server_proto_pref"] = "invalid_proto"
dns_test_common.write(config, self.config.rfc2136_credentials)
with pytest.raises(errors.PluginError):
self.auth.perform([self.achall])
@test_util.patch_display_util()
def test_valid_server_proto_pref_passes(self, unused_mock_get_utility: mock.MagicMock) -> None:
config = VALID_CONFIG.copy()
config["rfc2136_server_proto_pref"] = "tcp_only"
dns_test_common.write(config, self.config.rfc2136_credentials)
self.auth.perform([self.achall])
def test_invalid_update_server_proto_pref_raises(self):
config = VALID_CONFIG.copy()
config["rfc2136_update_server_proto_pref"] = "invalid_proto"
dns_test_common.write(config, self.config.rfc2136_credentials)
with pytest.raises(errors.PluginError):
self.auth.perform([self.achall])
@test_util.patch_display_util()
def test_valid_update_server_proto_pref_passes(self, unused_mock_get_utility: mock.MagicMock) -> None:
config = VALID_CONFIG.copy()
config["rfc2136_update_server_proto_pref"] = "udp_first"
dns_test_common.write(config, self.config.rfc2136_credentials)
self.auth.perform([self.achall])
class RFC2136ClientTest(unittest.TestCase):
@ -267,6 +319,17 @@ class RFC2136ClientTest(unittest.TestCase):
mock_make_query.return_value.use_tsig.assert_called_with(mock.ANY,
algorithm=dns.tsig.HMAC_MD5)
@mock.patch("dns.query.tcp")
def test_timeout_on_last_item_in_try_with_protocols_raises(self, unused_mock_query):
from certbot_dns_rfc2136._internal.dns_rfc2136 import ProtoPref
from dns.exception import Timeout
def dns_timeout(*args, **kwargs):
raise Timeout() # type: ignore
with pytest.raises(dns.exception.Timeout):
self.rfc2136_client._try_with_protocols(dns_timeout, ProtoPref.TCP_ONLY)
if __name__ == "__main__":
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover