mirror of
https://github.com/isc-projects/bind9.git
synced 2026-03-09 17:50:52 -04:00
gnutls-cli is tricky to script around as it immediately closes the server connection when its standard input is closed. This prevents simple shell-based I/O redirection from being used for capturing the DNS response sent over a TLS connection and the workarounds for this issue employ non-standard utilities like "timeout". Instead of resorting to clever shell hacks, reimplement the relevant check in Python. Exit immediately upon receiving a valid DNS response or when gnutls-cli exits in order to decrease the test's run time. Employ dnspython to avoid the need for storing DNS queries in binary files and to improve test readability. Capture more diagnostic output to facilitate troubleshooting. Use a pytest fixture instead of an Autoconf macro to keep test requirements localized.
93 lines
3.8 KiB
Python
93 lines
3.8 KiB
Python
#!/usr/bin/python3
|
|
|
|
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
import selectors
|
|
import struct
|
|
import subprocess
|
|
import time
|
|
|
|
import pytest
|
|
|
|
pytest.importorskip('dns')
|
|
import dns.exception
|
|
import dns.message
|
|
import dns.rdatatype
|
|
|
|
|
|
def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
    """Send an example/SOA query over TLS via gnutls-cli and check the reply.

    gnutls-cli is started as a child process connecting to 10.53.0.1 on
    ``named_tlsport``.  A length-prefixed DNS query is written to its
    standard input and its standard output is read until a complete DNS
    message can be parsed, gnutls-cli closes its standard output, or a
    10-second deadline passes.

    Diagnostic files written to the current working directory:

    - ``gnutls-cli.log``: log file produced by gnutls-cli itself,
    - ``gnutls-cli.err``: standard error of gnutls-cli,
    - ``gnutls-cli.out.bin``: raw bytes read from gnutls-cli's stdout,
    - ``gnutls-cli.out.txt``: text form of the parsed response (only
      written when a response was successfully parsed).

    :param gnutls_cli_executable: path of the gnutls-cli binary to run
        (used as the first element of the child's argument vector).
    :param named_tlsport: integer port number passed to gnutls-cli via
        ``--port``.
    """
    # Prepare the example/SOA query which will be sent over TLS.  DNS
    # messages sent over a stream transport are prefixed with a two-byte,
    # big-endian length field.
    query = dns.message.make_query('example.', dns.rdatatype.SOA)
    query_wire = query.to_wire()
    query_with_length = struct.pack('>H', len(query_wire)) + query_wire

    # Run gnutls-cli.
    gnutls_cli_args = [gnutls_cli_executable, '--no-ca-verification', '-V',
                       '--no-ocsp', '--alpn=dot', '--logfile=gnutls-cli.log',
                       '--port=%d' % named_tlsport, '10.53.0.1']
    with open('gnutls-cli.err', 'wb') as gnutls_cli_stderr, \
            subprocess.Popen(gnutls_cli_args, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=gnutls_cli_stderr,
                             bufsize=0) as gnutls_cli:
        # Send the example/SOA query to the standard input of gnutls-cli.  Do
        # not close standard input yet because that causes gnutls-cli to close
        # the TLS connection immediately, preventing the response from being
        # read.
        gnutls_cli.stdin.write(query_with_length)
        gnutls_cli.stdin.flush()

        # Keep reading data from the standard output of gnutls-cli until a full
        # DNS message is received or a timeout is exceeded or gnutls-cli exits.
        # Popen.communicate() cannot be used here because: a) it closes
        # standard input after sending data to the process (see above why this
        # is a problem), b) gnutls-cli is not DNS-aware, so it does not exit
        # upon receiving a DNS response.
        deadline = time.monotonic() + 10
        gnutls_cli_output = b''
        response = None
        with selectors.DefaultSelector() as selector:
            selector.register(gnutls_cli.stdout, selectors.EVENT_READ)
            while response is None:
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not selector.select(timeout=remaining):
                    # Timed out waiting for (more) data.
                    break
                read_data = gnutls_cli.stdout.read(512)
                if not read_data:
                    # End of file: gnutls-cli closed its standard output
                    # (typically because it exited).  Checking for EOF rather
                    # than Popen.poll() matters for two reasons: poll()
                    # returns 0 (falsy) on a clean exit, which is easy to
                    # confuse with "still running", and data may still be
                    # buffered in the pipe after the process is gone.
                    break
                gnutls_cli_output += read_data
                try:
                    # Ignore TCP length, just try to parse a DNS message from
                    # the rest of the data received.
                    response = dns.message.from_wire(gnutls_cli_output[2:])
                except dns.exception.FormError:
                    # Incomplete (or malformed) message so far; keep reading.
                    continue

        # At this point either a DNS response was received or a timeout fired
        # or gnutls-cli exited prematurely.  Close the standard input of
        # gnutls-cli.  Terminate it if that does not cause it to shut down
        # gracefully.
        gnutls_cli.stdin.close()
        try:
            gnutls_cli.wait(5)
        except subprocess.TimeoutExpired:
            gnutls_cli.kill()

    # Store the response received for diagnostic purposes.
    with open('gnutls-cli.out.bin', 'wb') as response_bin:
        response_bin.write(gnutls_cli_output)
    if response is not None:
        with open('gnutls-cli.out.txt', 'w', encoding='utf-8') as response_txt:
            response_txt.write(response.to_text())

    # Check whether a response was received and whether it is sane.
    assert response is not None
    assert query.id == response.id
    assert len(response.answer) == 1
    assert response.answer[0].match(dns.name.from_text('example.'),
                                    dns.rdataclass.IN, dns.rdatatype.SOA,
                                    dns.rdatatype.NONE)
|