Use isctest.asyncserver in the "tsig" test

Replace the custom DNS server used in the "tsig" system test with
new code based on the isctest.asyncserver module.

(cherry picked from commit e34e831cab)
This commit is contained in:
Štěpán Balážik 2025-06-11 00:33:39 +02:00
parent 58571d588f
commit cbee7c6c52
3 changed files with 52 additions and 52 deletions

View file

@ -1,52 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
#
# An adhoc server that returns a TC=1 response with the final byte
# removed to generate UNEXPECTEDEND form dns_message_parse.
#
use IO::File;
use IO::Socket;
my $localport = int($ENV{'PORT'});
if (!$localport) { $localport = 5300; }
printf "localport %u\n", $localport;
my $sock = IO::Socket::INET->new(LocalAddr => "10.53.0.2",
LocalPort => $localport, Proto => "udp") or die "$!";
my $pidf = new IO::File "ans.pid", "w" or die "cannot open pid file: $!";
print $pidf "$$\n" or die "cannot write pid file: $!";
$pidf->close or die "cannot close pid file: $!";
sub rmpid { unlink "ans.pid"; exit 1; };
$SIG{INT} = \&rmpid;
$SIG{TERM} = \&rmpid;
sub arraystring {
my $string = join("", @_);
return $string;
}
for (;;) {
$from = $sock->recv($buf, 512);
($port, $ip_address) = unpack_sockaddr_in($from);
$l = length($buf);
printf "received %u bytes from %s#%u\n", $l, inet_ntoa($ip_address), $port;
@up = unpack("C[$l]", $buf);
$up[2] |= 0x80; # QR
$up[2] |= 0x02; # TC
$up[3] |= 0x80; # RA
$l -= 1; # truncate the response 1 byte
$replydata = pack("C[$l]", @up);
printf "sent %u bytes\n", $sock->send($replydata);
}

View file

@ -0,0 +1,49 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
import dns.flags
from isctest.asyncserver import (
AsyncDnsServer,
BytesResponseSend,
QueryContext,
ResponseHandler,
)
class TruncatedWithLastByteDroppedHandler(ResponseHandler):
"""
Return a TC=1 response with the final byte removed to make
dns_message_parse() return ISC_R_UNEXPECTEDEND.
"""
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[BytesResponseSend, None]:
tc_response = qctx.query
tc_response.flags |= dns.flags.QR
tc_response.flags |= dns.flags.TC
tc_response.flags |= dns.flags.RA
yield BytesResponseSend(tc_response.to_wire()[:-1])
def main() -> None:
server = AsyncDnsServer(acknowledge_tsig_dnspython_hacks=True)
server.install_response_handler(TruncatedWithLastByteDroppedHandler())
server.run()
if __name__ == "__main__":
main()

View file

@ -21,6 +21,9 @@ pytestmark = pytest.mark.extra_artifacts(
]
)
# TSIG queries not supported by isctest.asyncserver with old dnspython
pytest.importorskip("dns", minversion="2.0.0")
def test_tsig(run_tests_sh):
run_tests_sh()