Extend isctest package with more utility functions

Check for more rcodes and various properties needed in the wildcard
test. Add a `name` module for various dns.name.Name operations (with
`prepend_label` function only now).

Expose `timeout` as a parameter of `query.tcp`/`query.udp`.

(cherry picked from commit e7d46ad8ba)
This commit is contained in:
Štěpán Balážik 2023-12-21 20:25:20 +01:00 committed by Petr Špaček
parent 6bd6777fb7
commit 3aac716a46
4 changed files with 45 additions and 2 deletions

View file

@ -12,6 +12,7 @@
from . import check
from . import instance
from . import query
from . import name
from . import rndc
from . import run
from . import log

View file

@ -9,6 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import shutil
from typing import Any, Optional
import dns.rcode
@ -95,3 +96,26 @@ def zones_equal(
)
assert found_rdataset
assert found_rdataset.ttl == rdataset.ttl
def is_executable(cmd: str, errmsg: str) -> None:
executable = shutil.which(cmd)
assert executable is not None, errmsg
def nxdomain(message: dns.message.Message) -> None:
rcode(message, dns.rcode.NXDOMAIN)
def single_question(message: dns.message.Message) -> None:
assert len(message.question) == 1, str(message)
def empty_answer(message: dns.message.Message) -> None:
assert not message.answer, str(message)
def is_response_to(response: dns.message.Message, query: dns.message.Message) -> None:
single_question(response)
single_question(query)
assert query.is_response(response), str(response)

View file

@ -0,0 +1,16 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns.name
def prepend_label(label: str, name: dns.name.Name) -> dns.name.Name:
return dns.name.Name((label,) + name.labels)

View file

@ -24,10 +24,11 @@ def udp(
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
return dns.query.udp(message, ip, QUERY_TIMEOUT, port=port, source=source)
return dns.query.udp(message, ip, timeout, port=port, source=source)
def tcp(
@ -35,7 +36,8 @@ def tcp(
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
timeout: int = QUERY_TIMEOUT,
) -> dns.message.Message:
if port is None:
port = int(os.environ["PORT"])
return dns.query.tcp(message, ip, QUERY_TIMEOUT, port=port, source=source)
return dns.query.tcp(message, ip, timeout, port=port, source=source)