mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-05-28 04:02:31 -04:00
Add the `mod-alias` query module which synthesises answers for ALIAS
records (type 65401) at query time by looking up the ALIAS target in
the server's zone database and copying the target's records into the
response with the original query name as the owner.
The module hooks at KNOTD_STAGE_PREANSWER and is attached at zone
scope (typically via a template). Behaviour:
* Fires for A/AAAA queries, any others passed through to the
standard resolver.
* ALIAS is additive: direct rrsets on the alias node are merged
with the synthesised target rrsets.
* Multiple ALIAS rdata on a node are followed and their results
merged.
* TTL = min(alias_ttl, all contributing source TTLs).
* Targets not served by a zone in this server are ignored;
external resolution is out of scope.
* Synthesised records are not signed; pair with mod-onlinesign
if signed answers are required.
The integration test runs in two random modes per invocation
(plain and DNSSEC-via-mod-onlinesign) so both code paths are
exercised over time.
Co-authored-by: Daniel Salzman <daniel.salzman@nic.cz>
482 lines
14 KiB
Python
482 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import re
|
|
from subprocess import Popen, PIPE, check_call
|
|
from dnstest.utils import *
|
|
import dnstest.config
|
|
import dnstest.params as params
|
|
|
|
class KnotModule(object):
|
|
'''Query module configuration'''
|
|
|
|
MOD_CONF_PREFIX = "mod-"
|
|
MOD_API_PREFIX = "knotd_mod_api_"
|
|
|
|
# Instance counter.
|
|
count = 1
|
|
# Module API name suffix.
|
|
mod_name = None
|
|
# Empty configuration
|
|
empty = False
|
|
|
|
def __init__(self):
|
|
self.conf_id = "id%s" % type(self).count
|
|
type(self).count += 1
|
|
|
|
@property
|
|
def conf_name(self):
|
|
return self.MOD_CONF_PREFIX + self.mod_name
|
|
|
|
@classmethod
|
|
def _check_cmd(cls):
|
|
if params.libtool_bin:
|
|
prefix = [params.libtool_bin, "exec"]
|
|
else:
|
|
prefix = []
|
|
|
|
return prefix + ["objdump", "-t", params.knot_bin]
|
|
|
|
@classmethod
|
|
def check(cls):
|
|
'''Checks the server binary for the module code'''
|
|
|
|
try:
|
|
proc = Popen(cls._check_cmd(), stdout=PIPE, stderr=PIPE,
|
|
universal_newlines=True)
|
|
(out, err) = proc.communicate()
|
|
|
|
if re.search(cls.MOD_API_PREFIX + cls.mod_name, out):
|
|
return
|
|
|
|
raise Skip()
|
|
except:
|
|
raise Skip("Module '%s' not detected" % cls.mod_name)
|
|
|
|
def get_conf_ref(self):
|
|
if self.empty:
|
|
return str(self.conf_name)
|
|
else:
|
|
return "%s/%s" % (self.conf_name, self.conf_id)
|
|
|
|
def get_conf(self, conf=None): pass
|
|
|
|
class ModSynthRecord(KnotModule):
|
|
'''Automatic forward/reverse records module'''
|
|
|
|
mod_name = "synthrecord"
|
|
|
|
def __init__(self, mtype, prefix, ttl, network, origin=None):
|
|
super().__init__()
|
|
self.mtype = mtype
|
|
self.prefix = prefix
|
|
self.ttl = ttl
|
|
self.network = network
|
|
self.origin = origin
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("type", self.mtype)
|
|
if (self.prefix):
|
|
conf.item_str("prefix", self.prefix)
|
|
if (self.ttl):
|
|
conf.item_str("ttl", self.ttl)
|
|
conf.item("network", self.network)
|
|
if (self.origin):
|
|
conf.item_str("origin", self.origin)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModDnstap(KnotModule):
|
|
'''Dnstap module'''
|
|
|
|
mod_name = "dnstap"
|
|
|
|
def __init__(self, sink):
|
|
super().__init__()
|
|
self.sink = sink
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("sink", self.sink)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModRRL(KnotModule):
|
|
'''RRL module'''
|
|
|
|
mod_name = "rrl"
|
|
|
|
def __init__(self, rate_limit, slip=None, table_size=None, whitelist=None,
|
|
instant_limit=None, log_period=0):
|
|
super().__init__()
|
|
self.rate_limit = rate_limit
|
|
self.instant_limit = instant_limit if instant_limit else rate_limit
|
|
self.slip = slip
|
|
self.table_size = table_size
|
|
self.whitelist = whitelist
|
|
self.log_period = log_period
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("rate-limit", self.rate_limit)
|
|
conf.item_str("instant-limit", self.instant_limit)
|
|
if self.slip or self.slip == 0:
|
|
conf.item_str("slip", self.slip)
|
|
if self.table_size:
|
|
conf.item_str("table-size", self.table_size)
|
|
if self.whitelist:
|
|
conf.item_str("whitelist", self.whitelist)
|
|
if self.log_period > 0:
|
|
conf.item_str("log-period", self.log_period)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModDnsproxy(KnotModule):
|
|
'''Dnsproxy module'''
|
|
|
|
mod_name = "dnsproxy"
|
|
|
|
def __init__(self, addr, port=53, nxdomain=False, fallback=True):
|
|
super().__init__()
|
|
self.addr = addr
|
|
self.port = port
|
|
self.fallback = fallback
|
|
self.nxdomain = nxdomain
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin("remote")
|
|
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("address", "%s@%s" % (self.addr, self.port))
|
|
conf.end()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("remote", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("fallback", "on" if self.fallback else "off")
|
|
conf.item_str("catch-nxdomain", "on" if self.nxdomain else "off")
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModWhoami(KnotModule):
|
|
'''Whoami module'''
|
|
|
|
mod_name = "whoami"
|
|
empty = True
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
class ModOnlineSign(KnotModule):
|
|
'''Online-sign module'''
|
|
|
|
mod_name = "onlinesign"
|
|
|
|
def __init__(self, algorithm=None, key_size=None, prop_delay=3600, ksc=[ ],
|
|
ksci=999, ksk_life=9999, ksk_shared=False, cds_publish="rollover",
|
|
cds_digesttype="sha256", single_type_signing=True):
|
|
super().__init__()
|
|
self.algorithm = algorithm
|
|
self.key_size = key_size
|
|
if not algorithm:
|
|
self.empty = True
|
|
self.prop_delay = prop_delay
|
|
self.ksc = ksc
|
|
self.ksci = ksci
|
|
self.ksk_life = ksk_life
|
|
self.ksk_shared = ksk_shared
|
|
self.cds_publish = cds_publish
|
|
self.cds_digesttype = cds_digesttype
|
|
self.single_type_signing = single_type_signing
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
if self.algorithm and len(self.ksc) > 0:
|
|
conf.begin("submission")
|
|
conf.id_item("id", "blahblah")
|
|
parents = ""
|
|
for parent in self.ksc:
|
|
if parents:
|
|
parents += ", "
|
|
parents += parent.name
|
|
conf.item("parent", "[%s]" % parents)
|
|
conf.item_str("check-interval", self.ksci)
|
|
|
|
if self.algorithm:
|
|
conf.begin("policy")
|
|
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("algorithm", self.algorithm)
|
|
if self.key_size:
|
|
conf.item_str("zsk-size", self.key_size)
|
|
conf.item_str("propagation-delay", self.prop_delay)
|
|
if len(self.ksc) > 0:
|
|
conf.item("ksk-submission", "blahblah")
|
|
conf.item("ksk-lifetime", self.ksk_life)
|
|
conf.item("ksk-shared", self.ksk_shared)
|
|
conf.item("cds-cdnskey-publish", self.cds_publish)
|
|
conf.item("cds-digest-type", self.cds_digesttype)
|
|
conf.item("single-type-signing", self.single_type_signing)
|
|
conf.end()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("policy", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModStats(KnotModule):
|
|
'''Stats module'''
|
|
|
|
mod_name = "stats"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def _bool(self, conf, name, value=True):
|
|
conf.item_str(name, "on" if value else "off")
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
self._bool(conf, "request-protocol", True)
|
|
self._bool(conf, "server-operation", True)
|
|
self._bool(conf, "request-bytes", True)
|
|
self._bool(conf, "response-bytes", True)
|
|
self._bool(conf, "edns-presence", True)
|
|
self._bool(conf, "flag-presence", True)
|
|
self._bool(conf, "response-code", True)
|
|
self._bool(conf, "request-edns-option", True)
|
|
self._bool(conf, "response-edns-option", True)
|
|
self._bool(conf, "reply-nodata", True)
|
|
self._bool(conf, "query-type", True)
|
|
self._bool(conf, "query-size", True)
|
|
self._bool(conf, "reply-size", True)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModCookies(KnotModule):
|
|
'''Cookies module'''
|
|
|
|
mod_name = "cookies"
|
|
|
|
def __init__(self,
|
|
secret_lifetime : int | None = None,
|
|
badcookie_slip : int | None = None,
|
|
secret : list[bytearray] | None = None):
|
|
super().__init__()
|
|
self.secret_lifetime = secret_lifetime
|
|
self.badcookie_slip = badcookie_slip
|
|
self.secret = ['0x'+s.hex() for s in secret] if secret else None
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.badcookie_slip:
|
|
conf.item_str("badcookie-slip", self.badcookie_slip)
|
|
if self.secret_lifetime:
|
|
conf.item_str("secret-lifetime", self.secret_lifetime)
|
|
if self.secret:
|
|
conf.item_list("secret", self.secret)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModQueryacl(KnotModule):
|
|
'''Query ACL module'''
|
|
|
|
mod_name = "queryacl"
|
|
|
|
def __init__(self, address=None, interface=None):
|
|
super().__init__()
|
|
self.address = address
|
|
self.interface = interface
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.address:
|
|
if isinstance(self.address, list):
|
|
conf.item_list("address", self.address)
|
|
else:
|
|
conf.item("address", self.address)
|
|
if self.interface:
|
|
if isinstance(self.interface, list):
|
|
conf.item_list("interface", self.interface)
|
|
else:
|
|
conf.item("interface", self.interface)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModGeoip(KnotModule):
|
|
'''GeoIP module'''
|
|
|
|
mod_name = "geoip"
|
|
|
|
def __init__(self, config_file="net.conf", mode="subnet", geodb_file=None, geodb_key=None):
|
|
super().__init__()
|
|
self.config_file = config_file
|
|
self.mode = mode
|
|
self.geodb_file = geodb_file
|
|
self.geodb_key = geodb_key
|
|
|
|
@classmethod
|
|
def check(self):
|
|
'''Extended module check by libmaxminddb dependency check'''
|
|
super().check()
|
|
|
|
try:
|
|
proc = Popen(self._check_cmd(), stdout=PIPE, stderr=PIPE,
|
|
universal_newlines=True)
|
|
(out, err) = proc.communicate()
|
|
if re.search("MMDB_open", out):
|
|
return
|
|
raise Skip()
|
|
except:
|
|
raise Skip("Library 'maxminddb' not detected")
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("config-file", self.config_file)
|
|
conf.item("mode", self.mode)
|
|
if self.geodb_file:
|
|
conf.item_str("geodb-file", self.geodb_file)
|
|
if self.geodb_key:
|
|
if isinstance(self.geodb_key, list):
|
|
conf.item_list("geodb-key", self.geodb_key)
|
|
else:
|
|
conf.item("geodb-key", self.geodb_key)
|
|
conf.item("ttl", 1234)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModNoudp(KnotModule):
|
|
'''No UDP module'''
|
|
|
|
mod_name = "noudp"
|
|
|
|
def __init__(self, allow_rate=None, trunc_rate=None):
|
|
super().__init__()
|
|
self.allow_rate = allow_rate
|
|
self.trunc_rate = trunc_rate
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.allow_rate:
|
|
conf.item_str("udp-allow-rate", self.allow_rate)
|
|
if self.trunc_rate:
|
|
conf.item_str("udp-truncate-rate", self.trunc_rate)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModProbe(KnotModule):
|
|
'''Probe module'''
|
|
|
|
mod_name = "probe"
|
|
|
|
def __init__(self, path=None, channels=1, max_rate=1000):
|
|
super().__init__()
|
|
self.path = path
|
|
self.channels = channels
|
|
self.max_rate = max_rate
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.path:
|
|
conf.item_str("path", self.path)
|
|
if self.channels:
|
|
conf.item("channels", self.channels)
|
|
if self.max_rate:
|
|
conf.item("max-rate", self.max_rate)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModAuthSignal(KnotModule):
|
|
'''AuthSignal module'''
|
|
|
|
mod_name = "authsignal"
|
|
empty = True
|
|
|
|
class ModAlias(KnotModule):
|
|
'''ALIAS synthesis from locally-served target zones'''
|
|
|
|
mod_name = "alias"
|
|
empty = True
|
|
|
|
class ModDnsErr(KnotModule):
|
|
'''DNS Error module'''
|
|
|
|
mod_name = "dnserr"
|
|
|
|
def __init__(self, report_channel=None, agent=False, cache_size=None, cache_lifetime=None):
|
|
super().__init__()
|
|
self.report_channel = report_channel
|
|
self.agent = agent
|
|
self.cache_size = cache_size
|
|
self.cache_lifetime = cache_lifetime
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.report_channel:
|
|
conf.item_str("report-channel", self.report_channel)
|
|
if self.agent:
|
|
conf.item_str("agent", "on")
|
|
if self.cache_size:
|
|
conf.item_str("cache-size", self.cache_size)
|
|
if self.cache_lifetime:
|
|
conf.item_str("cache-lifetime", self.cache_lifetime)
|
|
conf.end()
|
|
|
|
return conf
|
|
|