knot-dns/tests-extra/tools/dnstest/module.py
Bron Gondwana 2946fe16f2 mod-alias: synthesise ALIAS records from locally-served targets
Add the `mod-alias` query module which synthesises answers for ALIAS
records (type 65401) at query time by looking up the ALIAS target in
the server's zone database and copying the target's records into the
response with the original query name as the owner.

The module hooks at KNOTD_STAGE_PREANSWER and is attached at zone
scope (typically via a template).  Behaviour:

  * Fires for A/AAAA queries, any others passed through to the
    standard resolver.
  * ALIAS is additive: direct rrsets on the alias node are merged
    with the synthesised target rrsets.
  * Multiple ALIAS rdata on a node are followed and their results
    merged.
  * TTL = min(alias_ttl, all contributing source TTLs).
  * Targets not served by a zone in this server are ignored;
    external resolution is out of scope.
  * Synthesised records are not signed; pair with mod-onlinesign
    if signed answers are required.

The integration test runs in two random modes per invocation
(plain and DNSSEC-via-mod-onlinesign) so both code paths are
exercised over time.

Co-authored-by: Daniel Salzman <daniel.salzman@nic.cz>
2026-05-10 23:36:27 -04:00

482 lines
14 KiB
Python

#!/usr/bin/env python3
import os
import re
from subprocess import Popen, PIPE, check_call
from dnstest.utils import *
import dnstest.config
import dnstest.params as params
class KnotModule(object):
'''Query module configuration'''
MOD_CONF_PREFIX = "mod-"
MOD_API_PREFIX = "knotd_mod_api_"
# Instance counter.
count = 1
# Module API name suffix.
mod_name = None
# Empty configuration
empty = False
def __init__(self):
self.conf_id = "id%s" % type(self).count
type(self).count += 1
@property
def conf_name(self):
return self.MOD_CONF_PREFIX + self.mod_name
@classmethod
def _check_cmd(cls):
if params.libtool_bin:
prefix = [params.libtool_bin, "exec"]
else:
prefix = []
return prefix + ["objdump", "-t", params.knot_bin]
@classmethod
def check(cls):
'''Checks the server binary for the module code'''
try:
proc = Popen(cls._check_cmd(), stdout=PIPE, stderr=PIPE,
universal_newlines=True)
(out, err) = proc.communicate()
if re.search(cls.MOD_API_PREFIX + cls.mod_name, out):
return
raise Skip()
except:
raise Skip("Module '%s' not detected" % cls.mod_name)
def get_conf_ref(self):
if self.empty:
return str(self.conf_name)
else:
return "%s/%s" % (self.conf_name, self.conf_id)
def get_conf(self, conf=None): pass
class ModSynthRecord(KnotModule):
'''Automatic forward/reverse records module'''
mod_name = "synthrecord"
def __init__(self, mtype, prefix, ttl, network, origin=None):
super().__init__()
self.mtype = mtype
self.prefix = prefix
self.ttl = ttl
self.network = network
self.origin = origin
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("type", self.mtype)
if (self.prefix):
conf.item_str("prefix", self.prefix)
if (self.ttl):
conf.item_str("ttl", self.ttl)
conf.item("network", self.network)
if (self.origin):
conf.item_str("origin", self.origin)
conf.end()
return conf
class ModDnstap(KnotModule):
'''Dnstap module'''
mod_name = "dnstap"
def __init__(self, sink):
super().__init__()
self.sink = sink
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("sink", self.sink)
conf.end()
return conf
class ModRRL(KnotModule):
'''RRL module'''
mod_name = "rrl"
def __init__(self, rate_limit, slip=None, table_size=None, whitelist=None,
instant_limit=None, log_period=0):
super().__init__()
self.rate_limit = rate_limit
self.instant_limit = instant_limit if instant_limit else rate_limit
self.slip = slip
self.table_size = table_size
self.whitelist = whitelist
self.log_period = log_period
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("rate-limit", self.rate_limit)
conf.item_str("instant-limit", self.instant_limit)
if self.slip or self.slip == 0:
conf.item_str("slip", self.slip)
if self.table_size:
conf.item_str("table-size", self.table_size)
if self.whitelist:
conf.item_str("whitelist", self.whitelist)
if self.log_period > 0:
conf.item_str("log-period", self.log_period)
conf.end()
return conf
class ModDnsproxy(KnotModule):
'''Dnsproxy module'''
mod_name = "dnsproxy"
def __init__(self, addr, port=53, nxdomain=False, fallback=True):
super().__init__()
self.addr = addr
self.port = port
self.fallback = fallback
self.nxdomain = nxdomain
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin("remote")
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
conf.item_str("address", "%s@%s" % (self.addr, self.port))
conf.end()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("remote", "%s_%s" % (self.conf_name, self.conf_id))
conf.item_str("fallback", "on" if self.fallback else "off")
conf.item_str("catch-nxdomain", "on" if self.nxdomain else "off")
conf.end()
return conf
class ModWhoami(KnotModule):
'''Whoami module'''
mod_name = "whoami"
empty = True
def __init__(self):
super().__init__()
class ModOnlineSign(KnotModule):
'''Online-sign module'''
mod_name = "onlinesign"
def __init__(self, algorithm=None, key_size=None, prop_delay=3600, ksc=[ ],
ksci=999, ksk_life=9999, ksk_shared=False, cds_publish="rollover",
cds_digesttype="sha256", single_type_signing=True):
super().__init__()
self.algorithm = algorithm
self.key_size = key_size
if not algorithm:
self.empty = True
self.prop_delay = prop_delay
self.ksc = ksc
self.ksci = ksci
self.ksk_life = ksk_life
self.ksk_shared = ksk_shared
self.cds_publish = cds_publish
self.cds_digesttype = cds_digesttype
self.single_type_signing = single_type_signing
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
if self.algorithm and len(self.ksc) > 0:
conf.begin("submission")
conf.id_item("id", "blahblah")
parents = ""
for parent in self.ksc:
if parents:
parents += ", "
parents += parent.name
conf.item("parent", "[%s]" % parents)
conf.item_str("check-interval", self.ksci)
if self.algorithm:
conf.begin("policy")
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
conf.item_str("algorithm", self.algorithm)
if self.key_size:
conf.item_str("zsk-size", self.key_size)
conf.item_str("propagation-delay", self.prop_delay)
if len(self.ksc) > 0:
conf.item("ksk-submission", "blahblah")
conf.item("ksk-lifetime", self.ksk_life)
conf.item("ksk-shared", self.ksk_shared)
conf.item("cds-cdnskey-publish", self.cds_publish)
conf.item("cds-digest-type", self.cds_digesttype)
conf.item("single-type-signing", self.single_type_signing)
conf.end()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("policy", "%s_%s" % (self.conf_name, self.conf_id))
conf.end()
return conf
class ModStats(KnotModule):
'''Stats module'''
mod_name = "stats"
def __init__(self):
super().__init__()
def _bool(self, conf, name, value=True):
conf.item_str(name, "on" if value else "off")
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
self._bool(conf, "request-protocol", True)
self._bool(conf, "server-operation", True)
self._bool(conf, "request-bytes", True)
self._bool(conf, "response-bytes", True)
self._bool(conf, "edns-presence", True)
self._bool(conf, "flag-presence", True)
self._bool(conf, "response-code", True)
self._bool(conf, "request-edns-option", True)
self._bool(conf, "response-edns-option", True)
self._bool(conf, "reply-nodata", True)
self._bool(conf, "query-type", True)
self._bool(conf, "query-size", True)
self._bool(conf, "reply-size", True)
conf.end()
return conf
class ModCookies(KnotModule):
'''Cookies module'''
mod_name = "cookies"
def __init__(self,
secret_lifetime : int | None = None,
badcookie_slip : int | None = None,
secret : list[bytearray] | None = None):
super().__init__()
self.secret_lifetime = secret_lifetime
self.badcookie_slip = badcookie_slip
self.secret = ['0x'+s.hex() for s in secret] if secret else None
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
if self.badcookie_slip:
conf.item_str("badcookie-slip", self.badcookie_slip)
if self.secret_lifetime:
conf.item_str("secret-lifetime", self.secret_lifetime)
if self.secret:
conf.item_list("secret", self.secret)
conf.end()
return conf
class ModQueryacl(KnotModule):
'''Query ACL module'''
mod_name = "queryacl"
def __init__(self, address=None, interface=None):
super().__init__()
self.address = address
self.interface = interface
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
if self.address:
if isinstance(self.address, list):
conf.item_list("address", self.address)
else:
conf.item("address", self.address)
if self.interface:
if isinstance(self.interface, list):
conf.item_list("interface", self.interface)
else:
conf.item("interface", self.interface)
conf.end()
return conf
class ModGeoip(KnotModule):
'''GeoIP module'''
mod_name = "geoip"
def __init__(self, config_file="net.conf", mode="subnet", geodb_file=None, geodb_key=None):
super().__init__()
self.config_file = config_file
self.mode = mode
self.geodb_file = geodb_file
self.geodb_key = geodb_key
@classmethod
def check(self):
'''Extended module check by libmaxminddb dependency check'''
super().check()
try:
proc = Popen(self._check_cmd(), stdout=PIPE, stderr=PIPE,
universal_newlines=True)
(out, err) = proc.communicate()
if re.search("MMDB_open", out):
return
raise Skip()
except:
raise Skip("Library 'maxminddb' not detected")
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
conf.item_str("config-file", self.config_file)
conf.item("mode", self.mode)
if self.geodb_file:
conf.item_str("geodb-file", self.geodb_file)
if self.geodb_key:
if isinstance(self.geodb_key, list):
conf.item_list("geodb-key", self.geodb_key)
else:
conf.item("geodb-key", self.geodb_key)
conf.item("ttl", 1234)
conf.end()
return conf
class ModNoudp(KnotModule):
'''No UDP module'''
mod_name = "noudp"
def __init__(self, allow_rate=None, trunc_rate=None):
super().__init__()
self.allow_rate = allow_rate
self.trunc_rate = trunc_rate
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
if self.allow_rate:
conf.item_str("udp-allow-rate", self.allow_rate)
if self.trunc_rate:
conf.item_str("udp-truncate-rate", self.trunc_rate)
conf.end()
return conf
class ModProbe(KnotModule):
'''Probe module'''
mod_name = "probe"
def __init__(self, path=None, channels=1, max_rate=1000):
super().__init__()
self.path = path
self.channels = channels
self.max_rate = max_rate
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
if self.path:
conf.item_str("path", self.path)
if self.channels:
conf.item("channels", self.channels)
if self.max_rate:
conf.item("max-rate", self.max_rate)
conf.end()
return conf
class ModAuthSignal(KnotModule):
'''AuthSignal module'''
mod_name = "authsignal"
empty = True
class ModAlias(KnotModule):
'''ALIAS synthesis from locally-served target zones'''
mod_name = "alias"
empty = True
class ModDnsErr(KnotModule):
'''DNS Error module'''
mod_name = "dnserr"
def __init__(self, report_channel=None, agent=False, cache_size=None, cache_lifetime=None):
super().__init__()
self.report_channel = report_channel
self.agent = agent
self.cache_size = cache_size
self.cache_lifetime = cache_lifetime
def get_conf(self, conf=None):
if not conf:
conf = dnstest.config.KnotConf()
conf.begin(self.conf_name)
conf.id_item("id", self.conf_id)
if self.report_channel:
conf.item_str("report-channel", self.report_channel)
if self.agent:
conf.item_str("agent", "on")
if self.cache_size:
conf.item_str("cache-size", self.cache_size)
if self.cache_lifetime:
conf.item_str("cache-lifetime", self.cache_lifetime)
conf.end()
return conf