Properly handle CNAMEs when preparing responses

dnspython does not treat CNAME records in zone files in any special way;
they are just RRsets belonging to zone nodes.  Process CNAMEs when
preparing zone-based responses just like a normal authoritative DNS
server would.
This commit is contained in:
Michał Kępień 2025-05-30 18:08:54 +02:00
parent 2a9c74546d
commit 1b8ceec580
No known key found for this signature in database

View file

@ -266,11 +266,16 @@ class QueryContext:
soa: Optional[dns.rrset.RRset] = None
node: Optional[dns.node.Node] = None
answer: Optional[dns.rdataset.Rdataset] = None
alias: Optional[dns.name.Name] = None
@property
def qname(self) -> dns.name.Name:
return self.query.question[0].name
@property
def current_qname(self) -> dns.name.Name:
return self.alias or self.qname
@property
def qclass(self) -> RdataClass:
return self.query.question[0].rdclass
@ -809,23 +814,28 @@ class AsyncDnsServer(AsyncServer):
if self._nxdomain_response(qctx):
return
if self._cname_response(qctx):
return
if self._nodata_response(qctx):
return
self._noerror_response(qctx)
def _refused_response(self, qctx: QueryContext) -> bool:
qctx.zone = self._zone_tree.find_best_zone(qctx.qname)
if qctx.zone:
zone = self._zone_tree.find_best_zone(qctx.current_qname)
if zone:
qctx.zone = zone
return False
qctx.response.set_rcode(dns.rcode.REFUSED)
if not qctx.response.answer:
qctx.response.set_rcode(dns.rcode.REFUSED)
return True
def _delegation_response(self, qctx: QueryContext) -> bool:
assert qctx.zone
name = qctx.qname
name = qctx.current_qname
delegation = None
while name != qctx.zone.origin:
@ -870,9 +880,9 @@ class AsyncDnsServer(AsyncServer):
qctx.soa = qctx.zone.find_rrset(qctx.zone.origin, dns.rdatatype.SOA)
assert qctx.soa
qctx.node = qctx.zone.get_node(qctx.qname)
qctx.node = qctx.zone.get_node(qctx.current_qname)
if qctx.node or not any(
n for n in qctx.zone.nodes if n.is_subdomain(qctx.qname)
n for n in qctx.zone.nodes if n.is_subdomain(qctx.current_qname)
):
return False
@ -890,6 +900,21 @@ class AsyncDnsServer(AsyncServer):
qctx.response.authority.append(qctx.soa)
return True
def _cname_response(self, qctx: QueryContext) -> bool:
assert qctx.node
cname = qctx.node.get_rdataset(qctx.qclass, dns.rdatatype.CNAME)
if not cname:
return False
cname_rrset = dns.rrset.RRset(qctx.current_qname, qctx.qclass, cname.rdtype)
cname_rrset.update(cname)
qctx.response.answer.append(cname_rrset)
qctx.alias = cname[0].target
self._prepare_response_from_zone_data(qctx)
return True
def _nodata_response(self, qctx: QueryContext) -> bool:
assert qctx.node
assert qctx.soa
@ -899,13 +924,14 @@ class AsyncDnsServer(AsyncServer):
return False
qctx.response.set_rcode(dns.rcode.NOERROR)
qctx.response.authority.append(qctx.soa)
if not qctx.response.answer:
qctx.response.authority.append(qctx.soa)
return True
def _noerror_response(self, qctx: QueryContext) -> None:
assert qctx.answer
answer_rrset = dns.rrset.RRset(qctx.qname, qctx.qclass, qctx.qtype)
answer_rrset = dns.rrset.RRset(qctx.current_qname, qctx.qclass, qctx.qtype)
answer_rrset.update(qctx.answer)
qctx.response.set_rcode(dns.rcode.NOERROR)