[9.20] chg: ci: Rework linting of Python code

With the Python version bumped to 3.10 and the dependency situation cleared with !11415 it is now time to run linters and formatters on more parts of the Python code that was previously skipped or ignored.

Switch configuration of the various Python-adjacent tools to `pyproject.toml` to ensure that the same configuration is used in CI and locally.

See the individual commits for details on settings changed and linters added. 

Tweaks to type checking and enabling more `ruff` lints will come in a subsequent MRs.

Prerequisites:
- bind9-qa!160.
- images!442

Backport of MR !11499

Merge branch 'backport-stepan/python-tooling-9.20' into 'bind-9.20'

See merge request isc-projects/bind9!11574
This commit is contained in:
Štěpán Balážik 2026-02-25 13:49:25 +00:00
commit 191026a4cc
169 changed files with 1558 additions and 1721 deletions

View file

@ -397,7 +397,7 @@ stages:
|| (test -s config.log && cat config.log; exit 1)
.git-clone-bind9-qa: &git_clone_bind9-qa
- git clone --depth 1 https://gitlab.isc.org/isc-projects/bind9-qa.git
- git clone --depth 1 https://gitlab.isc.org/isc-projects/bind9-qa.git "$CI_PROJECT_DIR"/bind9-qa
# change directory to the workspace before including this
.find_python: &find_python
@ -410,7 +410,7 @@ stages:
.parse_tsan: &parse_tsan
- *find_python
- find -name 'tsan.*' -exec "$PYTHON" util/parse_tsan.py {} \;
- find -name 'tsan.*' -exec "$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/parse_tsan.py {} \;
.check_readline_setup: &check_readline_setup
- if [[ -n "${WITHOUT_READLINE}" ]]; then
@ -535,7 +535,9 @@ stages:
-F "variables[SHOTGUN_EVAL_THRESHOLD_LATENCY_PCTL_DRIFT_MIN]=$SHOTGUN_EVAL_THRESHOLD_LATENCY_PCTL_DRIFT_MIN"
-F "variables[SHOTGUN_EVAL_THRESHOLD_LATENCY_PCTL_DRIFT_MAX]=$SHOTGUN_EVAL_THRESHOLD_LATENCY_PCTL_DRIFT_MAX"
https://gitlab.isc.org/api/v4/projects/188/trigger/pipeline | jq .id)
- util/ci-wait-shotgun.py $PIPELINE_ID
- *git_clone_bind9-qa
- >
"$CI_PROJECT_DIR"/bind9-qa/ci/wait_shotgun.py $PIPELINE_ID
needs:
- job: ci-variables
artifacts: true
@ -557,14 +559,14 @@ stages:
- *fips_feature_test
- *find_pytest
- *find_python
- ( if [ "${CI_DISPOSABLE_ENVIRONMENT}" = "true" ]; then sleep 3000; "$PYTHON" "${CI_PROJECT_DIR}/util/get-running-system-tests.py"; fi ) &
- *git_clone_bind9-qa
- ( if [ "${CI_DISPOSABLE_ENVIRONMENT}" = "true" ]; then sleep 3000; "$PYTHON" "${CI_PROJECT_DIR}/bind9-qa/ci/get_running_system_tests.py"; fi ) &
- cd bin/tests/system
- RET=0
- >
("$PYTEST" --junit-xml="$CI_PROJECT_DIR"/junit_pytest.xml -n "$TEST_PARALLEL_JOBS" | tee pytest.out.txt) || RET=1
- *git_clone_bind9-qa
- >
"$PYTHON" bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_pytest.xml --output "$CI_PROJECT_DIR"/junit.xml
"$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_pytest.xml --output "$CI_PROJECT_DIR"/junit.xml
- (exit $RET)
- '( ! grep -F "grep: warning:" pytest.out.txt )'
- test "$CLEAN_BUILD_ARTIFACTS_ON_SUCCESS" -eq 0 || ( cd ../../.. && make clean >/dev/null 2>&1 )
@ -598,10 +600,10 @@ stages:
- cd "$CI_PROJECT_DIR"
- *find_python
- >
"$PYTHON" bin/tests/convert-trs-to-junit.py . > "$CI_PROJECT_DIR"/junit_system.xml
"$PYTHON" bin/tests/convert_trs_to_junit.py . > "$CI_PROJECT_DIR"/junit_system.xml
- *git_clone_bind9-qa
- >
"$PYTHON" bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_system.xml --output "$CI_PROJECT_DIR"/junit.xml
"$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_system.xml --output "$CI_PROJECT_DIR"/junit.xml
- (exit $RET)
after_script:
- cat bin/tests/system/test-suite.log || true
@ -630,9 +632,9 @@ stages:
- *find_python
- *git_clone_bind9-qa
- >
"$PYTHON" "$CI_PROJECT_DIR"/bin/tests/convert-trs-to-junit.py . > "$CI_PROJECT_DIR"/junit_unit.xml
"$PYTHON" "$CI_PROJECT_DIR"/bin/tests/convert_trs_to_junit.py . > "$CI_PROJECT_DIR"/junit_unit.xml
- >
"$PYTHON" bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_unit.xml --output "$CI_PROJECT_DIR"/junit.xml
"$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_unit.xml --output "$CI_PROJECT_DIR"/junit.xml
- (exit $RET)
- test "$CLEAN_BUILD_ARTIFACTS_ON_SUCCESS" -eq 0 || make clean >/dev/null 2>&1
artifacts:
@ -664,7 +666,7 @@ stages:
- make -j${BUILD_PARALLEL_JOBS:-1} V=1
- *setup_interfaces
- *git_clone_bind9-qa
- cd bind9-qa/respdiff
- cd "$CI_PROJECT_DIR"/bind9-qa/respdiff
needs: []
artifacts:
paths:
@ -714,7 +716,7 @@ vulture:
<<: *quick_checks_job
<<: *python_triggering_rules
script:
- vulture --exclude "*ans.py,conftest.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark,autouse_*" bin/tests/system/
- vulture
ci-variables:
<<: *quick_checks_job
@ -736,7 +738,8 @@ ci-orphaned-anchors:
<<: *quick_checks_job
script:
- *git_clone_bind9-qa
- bind9-qa/ci-orphaned-anchors/check-orphaned-anchors-ci.py .gitlab-ci.yml
- >
"$CI_PROJECT_DIR"/bind9-qa/ci-orphaned-anchors/check_orphaned_anchors_ci.py .gitlab-ci.yml
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
changes:
@ -783,7 +786,7 @@ doctest:
"$PYTEST" --noconftest --doctest-modules --junit-xml="$CI_PROJECT_DIR/junit_doctest.xml" || RET=1
- *git_clone_bind9-qa
- >
"$PYTHON" bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR/junit_doctest.xml" --output "$CI_PROJECT_DIR/junit.xml"
"$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR/junit_doctest.xml" --output "$CI_PROJECT_DIR/junit.xml"
- (exit $RET)
needs:
- job: autoreconf
@ -796,12 +799,14 @@ doctest:
pylint:
<<: *quick_checks_job
<<: *python_triggering_rules
variables:
PYTHONPATH: "${CI_PROJECT_DIR}/bin/tests/system"
script:
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc $(git ls-files '*.py' | grep -vE '(ans\.py|dangerfile\.py|^bin/tests/system/|^contrib/)')
# Ignore Pylint wrong-import-position error in system test to enable use of pytest.importorskip
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc --load-plugins re_compile_checker --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)')
- pylint $(git ls-files '*.py')
ruff:
<<: *quick_checks_job
<<: *python_triggering_rules
script:
- ruff check
reuse:
<<: *quick_checks_job
@ -856,7 +861,8 @@ generate-stress-test-configs:
<<: *quick_checks_job
script:
- *git_clone_bind9-qa
- bind9-qa/stress/generate-stress-test-configs.py > stress-test-configs.yml
- >
"$CI_PROJECT_DIR"/bind9-qa/stress/generate_stress_test_configs.py > stress-test-configs.yml
artifacts:
paths:
- stress-test-configs.yml
@ -864,7 +870,9 @@ generate-stress-test-configs:
generate-tsan-stress-test-configs:
<<: *quick_checks_job
script:
- util/generate-tsan-stress-jobs.py > tsan-stress-test-configs.yml
- *git_clone_bind9-qa
- >
"$CI_PROJECT_DIR"/bind9-qa/ci/generate_tsan_stress_jobs.py > tsan-stress-test-configs.yml
artifacts:
paths:
- tsan-stress-test-configs.yml
@ -1101,7 +1109,7 @@ cross-version-config-tests:
"$PYTEST" --setup-only --junit-xml="$CI_PROJECT_DIR"/junit_pytest.xml -n "${TEST_PARALLEL_JOBS:-1}" || RET=1
- *git_clone_bind9-qa
- >
"$PYTHON" bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_pytest.xml --output "$CI_PROJECT_DIR"/junit.xml
"$PYTHON" "$CI_PROJECT_DIR"/bind9-qa/ci/postprocess_junit_files.py "$CI_PROJECT_DIR"/junit_pytest.xml --output "$CI_PROJECT_DIR"/junit.xml
- (exit $RET)
needs:
- job: autoreconf
@ -2056,7 +2064,8 @@ publish:
variables:
GIT_DEPTH: 1
script:
- bind9-qa/releng/printing_press_mr.py --document "${DOCUMENT}" --metadata bind9-qa/releng/metadata.json ${FORCE_CVE_IDS:+--force-cve-ids ${FORCE_CVE_IDS}} ${FORCE_SECURITY_RELEASES:+--force-security-releases ${FORCE_SECURITY_RELEASES}}
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/printing_press_mr.py --document "${DOCUMENT}" --metadata "$CI_PROJECT_DIR"/bind9-qa/releng/metadata.json ${FORCE_CVE_IDS:+--force-cve-ids ${FORCE_CVE_IDS}} ${FORCE_SECURITY_RELEASES:+--force-security-releases ${FORCE_SECURITY_RELEASES}}
artifacts:
paths:
- printing-press/
@ -2114,7 +2123,8 @@ merge-tag:
variables:
GIT_DEPTH: 100
script:
- bind9-qa/releng/merge_tag.py --tag "$CI_COMMIT_TAG"
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/merge_tag.py --tag "$CI_COMMIT_TAG"
rules:
- *rule_tag_open_source
artifacts:
@ -2127,11 +2137,13 @@ update-stable-tag:
variables:
GIT_DEPTH: 1
script:
- bind9-qa/releng/update_stable_tag.py --tag "$CI_COMMIT_TAG"
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/update_stable_tag.py --tag "$CI_COMMIT_TAG"
# Work around https://github.com/readthedocs/readthedocs.org/issues/10838
- sleep 300
- rm -rf ./bind9 # remove already cloned project
- bind9-qa/releng/update_stable_tag.py --tag "$CI_COMMIT_TAG"
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/update_stable_tag.py --tag "$CI_COMMIT_TAG"
rules:
- *rule_tag_open_source
artifacts:
@ -2165,7 +2177,8 @@ customer-git:branch:
- *git_clone_bind9-qa
script:
- git checkout -b "$BRANCH" # ensure refs/heads/$BRANCH exists; GitLab clones with detached HEAD
- bind9-qa/releng/push_to_customer_repository.py --branch "$BRANCH" --customer "$CUSTOMER" --force
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/push_to_customer_repository.py --branch "$BRANCH" --customer "$CUSTOMER" --force
customer-git:tag:
<<: *customer_git
@ -2178,7 +2191,8 @@ customer-git:tag:
- *git_clone_bind9-qa
- git clone --depth 1 "https://token:${ISC_CUSTOMERS_WRITE_TOKEN}@gitlab.isc.org/isc-customers/isc-customer-settings.git"
script:
- bind9-qa/releng/push_to_customer_repository.py --tag "$CI_COMMIT_TAG" --entitlements isc-customer-settings/entitlements.yaml --force
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/push_to_customer_repository.py --tag "$CI_COMMIT_TAG" --entitlements isc-customer-settings/entitlements.yaml --force
# Respdiff tests
@ -2250,7 +2264,7 @@ respdiff-third-party:
- autoreconf -fi
- *configure
- make -j${BUILD_PARALLEL_JOBS:-1} V=1
- cd bind9-qa/respdiff
- cd "$CI_PROJECT_DIR"/bind9-qa/respdiff
- bash respdiff.sh -s named -q "${PWD}/100k_mixed.txt" -c 3 -w "${PWD}/rspworkdir" "${CI_PROJECT_DIR}/version-under-test" "${CI_PROJECT_DIR}/bin/named/named"
respdiff:recent-named:
@ -2402,14 +2416,16 @@ backports:
script:
# CI job token is not sufficient for push operations
- git remote get-url origin | sed -e "s/gitlab-ci-token:$CI_JOB_TOKEN/oauth2:$BIND_TEAM_WRITE_TOKEN/" | xargs git remote set-url --push origin
- bind9-qa/releng/backport_mr.py $CI_PROJECT_ID "$MERGE_REQUEST_ID"
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/backport_mr.py $CI_PROJECT_ID "$MERGE_REQUEST_ID"
merged-metadata:
<<: *post_merge
rules:
- if: '$CI_PIPELINE_SOURCE == "push" && ($CI_COMMIT_REF_NAME =~ /^bind-9.[0-9]+(-sub)?$/ || $CI_COMMIT_REF_NAME =~ /^v9.[0-9]+.[0-9]+-release$/ || $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH)'
script:
- bind9-qa/releng/after_merge.py "$CI_PROJECT_ID" "$MERGE_REQUEST_ID"
- >
"$CI_PROJECT_DIR"/bind9-qa/releng/after_merge.py "$CI_PROJECT_ID" "$MERGE_REQUEST_ID"
auto-rebase-trigger:
stage: postmerge

View file

@ -1,28 +0,0 @@
[IMPORTS]
deprecated-modules=
dns.resolver,
[MESSAGES CONTROL]
disable=
C0103, # invalid-name
C0114, # missing-module-docstring
C0115, # missing-class-docstring
C0116, # missing-function-docstring
C0209, # consider-using-f-string
C0301, # line-too-long, handled better by black
C0302, # too-many-lines
C0415, # import-outside-toplevel
R0801, # duplicate-code
R0901, # too-many-ancestors
R0902, # too-many-instance-attributes
R0903, # too-few-public-methods
R0904, # too-many-public-methods
R0911, # too-many-return-statements
R0912, # too-many-branches
R0913, # too-many-arguments
R0914, # too-many-locals
R0915, # too-many-statements
R0916, # too-many-boolean-expressions
R0917, # too-many-positional-arguments

View file

@ -1,6 +1,6 @@
include $(top_srcdir)/Makefile.top
EXTRA_DIST = convert-trs-to-junit.py
EXTRA_DIST = convert_trs_to_junit.py
SUBDIRS = system

View file

@ -6,24 +6,22 @@
#
# Convert automake .trs files into JUnit format suitable for Gitlab
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, SubElement
import argparse
import os
import sys
from xml.etree import ElementTree
from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
# getting explicit encoding specification right for Python 2/3 would be messy,
# so let's hope for the best
def read_whole_text(filename):
with open(filename) as inf: # pylint: disable-msg=unspecified-encoding
with open(filename, encoding="utf-8") as inf:
return inf.read().strip()
def read_trs_result(filename):
result = None
with open(filename, "r") as trs: # pylint: disable-msg=unspecified-encoding
with open(filename, "r", encoding="utf-8") as trs:
for line in trs:
items = line.split()
if len(items) < 2:
@ -125,7 +123,7 @@ def check_directory(path):
os.listdir(path)
return path
except OSError as ex:
msg = "Path {} cannot be listed as a directory: {}".format(path, ex)
msg = f"Path {path} cannot be listed as a directory: {ex}"
raise argparse.ArgumentTypeError(msg)

View file

@ -68,7 +68,7 @@ def test_rndc_deadlock(ns3):
# Create 4 worker threads running "rndc" commands in a loop.
with concurrent.futures.ThreadPoolExecutor() as executor:
for i in range(1, 5):
domain = "example%d" % i
domain = f"example{i}"
executor.submit(rndc_loop, test_state, domain, ns3)
# Run "rndc status" 10 times, with 1-second pauses between attempts.

View file

@ -9,9 +9,7 @@ file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
"""
This is a bare-bones DNS server that only serves data from zone files. It is
meant to be used as a replacement for full-blown named instances in system
tests when a given server is only required to return zone-based data.
@ -34,9 +32,7 @@ useful in other system tests, please consider opening a merge request extending
isctest/asyncserver.py.
"""
from isctest.asyncserver import (
AsyncDnsServer,
)
from isctest.asyncserver import AsyncDnsServer
def main() -> None:

View file

@ -11,18 +11,14 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
DnsResponseSend,
QueryContext,
ResponseAction,
)
from isctest.asyncserver import DnsResponseSend, QueryContext, ResponseAction
from bailiwick_ans import ResponseSpoofer, spoofing_server
from ..bailiwick_ans import ResponseSpoofer, spoofing_server
ATTACKER_IP = "10.53.0.3"
TTL = 3600

View file

@ -11,18 +11,14 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
DnsResponseSend,
QueryContext,
ResponseAction,
)
from isctest.asyncserver import DnsResponseSend, QueryContext, ResponseAction
from bailiwick_ans import ResponseSpoofer, spoofing_server
from ..bailiwick_ans import ResponseSpoofer, spoofing_server
ATTACKER_IP = "10.53.0.3"
TTL = 3600

View file

@ -11,8 +11,6 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import Dict, List, Optional, Type
import abc
import dns.name
@ -30,14 +28,14 @@ from isctest.asyncserver import (
class ResponseSpoofer(ResponseHandler, abc.ABC):
spoofers: Dict[str, Type["ResponseSpoofer"]] = {}
spoofers: dict[str, type["ResponseSpoofer"]] = {}
def __init_subclass__(cls, mode: str) -> None:
assert mode not in cls.spoofers
cls.spoofers[mode] = cls
@classmethod
def get_spoofer(cls, mode: str) -> Optional["ResponseSpoofer"]:
def get_spoofer(cls, mode: str) -> "ResponseSpoofer | None":
try:
return cls.spoofers[mode]()
except KeyError:
@ -66,11 +64,11 @@ class SetSpoofingModeCommand(ControlCommand):
control_subdomain = "set-spoofing-mode"
def __init__(self) -> None:
self._current_handler: Optional[ResponseSpoofer] = None
self._current_handler: ResponseSpoofer | None = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str | None:
if len(args) != 1:
qctx.response.set_rcode(dns.rcode.SERVFAIL)
return "invalid control command"

View file

@ -9,20 +9,20 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Dict
import time
import dns.message
import dns.rrset
import pytest
import isctest
from isctest.instance import NamedInstance
import isctest
@pytest.fixture(autouse=True)
def autouse_flush_resolver_cache(servers: Dict[str, NamedInstance]) -> None:
def autouse_flush_resolver_cache(servers: dict[str, NamedInstance]) -> None:
servers["ns4"].rndc("flush")
@ -78,7 +78,7 @@ def check_domain_hijack(ns4: NamedInstance) -> None:
)
def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_sibling_ns_referral(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="sibling-ns", ans2="none")
ns4 = servers["ns4"]
@ -86,7 +86,7 @@ def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> Non
check_domain_hijack(ns4)
def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_unsolicited_authority(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="unsolicited-ns")
ns4 = servers["ns4"]
@ -95,7 +95,7 @@ def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> N
check_domain_hijack(ns4)
def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_parent_glue(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="parent-glue")
ns4 = servers["ns4"]
@ -108,7 +108,7 @@ def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
check_domain_hijack(ns4)
def test_bailiwick_spoofed_dname(servers: Dict[str, NamedInstance]) -> None:
def test_bailiwick_spoofed_dname(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="dname")
ns4 = servers["ns4"]

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.name
import dns.rcode

View file

@ -11,9 +11,9 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from enum import Enum
from typing import AsyncGenerator, List, Optional, Tuple
import abc
import logging
@ -123,7 +123,7 @@ class RecordGenerator(abc.ABC):
def __init__(self, name_generator: ChainNameGenerator) -> None:
self._name_generator = name_generator
def get_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def get_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
@ -155,7 +155,7 @@ class RecordGenerator(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
@ -170,7 +170,7 @@ class CnameRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
@ -182,7 +182,7 @@ class DnameRecordGenerator(RecordGenerator):
response_count = 2
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
dname_owner = self._name_generator.current_domain
cname_owner = self._name_generator.current_name
dname_target = self._name_generator.generate_next_sld().to_text()
@ -206,7 +206,7 @@ class XnameRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name_in_next_sld().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
@ -218,7 +218,7 @@ class FinalRecordGenerator(RecordGenerator):
response_count = 1
def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
response = self.create_rrset(owner, dns.rdatatype.A, "10.53.0.4")
signature = self.create_rrset_signature(owner, response.rdtype)
@ -297,11 +297,11 @@ class ChainSetupCommand(ControlCommand):
control_subdomain = "setup-chain"
def __init__(self) -> None:
self._current_handler: Optional[ChainResponseHandler] = None
self._current_handler: ChainResponseHandler | None = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str | None:
try:
actions, selectors = self._parse_args(args)
except ValueError as exc:
@ -320,8 +320,8 @@ class ChainSetupCommand(ControlCommand):
return "chain response setup successful"
def _parse_args(
self, args: List[str]
) -> Tuple[List[ChainAction], List[ChainSelector]]:
self, args: list[str]
) -> tuple[list[ChainAction], list[ChainSelector]]:
try:
delimiter = args.index("_")
except ValueError as exc:
@ -335,7 +335,7 @@ class ChainSetupCommand(ControlCommand):
return actions, selectors
def _parse_args_actions(self, args_actions: List[str]) -> List[ChainAction]:
def _parse_args_actions(self, args_actions: list[str]) -> list[ChainAction]:
actions = []
for action in args_actions + ["FINAL"]:
@ -347,8 +347,8 @@ class ChainSetupCommand(ControlCommand):
return actions
def _parse_args_selectors(
self, args_selectors: List[str], actions: List[ChainAction]
) -> List[ChainSelector]:
self, args_selectors: list[str], actions: list[ChainAction]
) -> list[ChainSelector]:
max_response_index = self._get_max_response_index(actions)
selectors = []
@ -366,19 +366,19 @@ class ChainSetupCommand(ControlCommand):
return selectors
def _get_max_response_index(self, actions: List[ChainAction]) -> int:
def _get_max_response_index(self, actions: list[ChainAction]) -> int:
rrset_generator_classes = [a.value for a in actions]
return sum(g.response_count for g in rrset_generator_classes)
def _prepare_answer(
self, actions: List[ChainAction], selectors: List[ChainSelector]
) -> List[dns.rrset.RRset]:
self, actions: list[ChainAction], selectors: list[ChainSelector]
) -> list[dns.rrset.RRset]:
all_responses, all_signatures = self._generate_rrsets(actions)
return self._select_rrsets(all_responses, all_signatures, selectors)
def _generate_rrsets(
self, actions: List[ChainAction]
) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
self, actions: list[ChainAction]
) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
all_responses = []
all_signatures = []
name_generator = ChainNameGenerator()
@ -394,10 +394,10 @@ class ChainSetupCommand(ControlCommand):
def _select_rrsets(
self,
all_responses: List[dns.rrset.RRset],
all_signatures: List[dns.rrset.RRset],
selectors: List[ChainSelector],
) -> List[dns.rrset.RRset]:
all_responses: list[dns.rrset.RRset],
all_signatures: list[dns.rrset.RRset],
selectors: list[ChainSelector],
) -> list[dns.rrset.RRset]:
rrsets = []
for selector in selectors:
@ -418,7 +418,7 @@ class ChainResponseHandler(DomainHandler):
domains = ["domain.nil."]
def __init__(self, answer_rrsets: List[dns.rrset.RRset]):
def __init__(self, answer_rrsets: list[dns.rrset.RRset]):
super().__init__()
self._answer_rrsets = answer_rrsets
@ -441,7 +441,7 @@ class ChainResponseHandler(DomainHandler):
qctx.response.use_edns()
yield DnsResponseSend(qctx.response)
def _non_chain_answer(self, qctx: QueryContext) -> List[dns.rrset.RRset]:
def _non_chain_answer(self, qctx: QueryContext) -> list[dns.rrset.RRset]:
owner = qctx.qname
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
@ -449,14 +449,14 @@ class ChainResponseHandler(DomainHandler):
]
@property
def _authority_rrsets(self) -> List[dns.rrset.RRset]:
def _authority_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.NS, "ns1.domain.nil."),
]
@property
def _additional_rrsets(self) -> List[dns.rrset.RRset]:
def _additional_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("ns1.domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),

View file

@ -12,21 +12,19 @@
# information regarding copyright ownership.
from typing import NamedTuple, Tuple
from typing import NamedTuple
import os
import sys
import time
import isctest
import pytest
import dns.exception
import dns.message
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import pytest
import isctest
pytestmark = [
pytest.mark.skipif(
@ -189,7 +187,7 @@ def keystate_check(server, zone, key):
class CheckDSTest(NamedTuple):
zone: str
logs_to_wait_for: Tuple[str]
logs_to_wait_for: tuple[str]
expected_parent_state: str

View file

@ -11,10 +11,9 @@
from re import compile as Re
import dns.rcode
import pytest
import dns.message
import isctest
import isctest.mark

View file

@ -9,23 +9,28 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import filecmp
import os
from pathlib import Path
from re import compile as Re
import filecmp
import os
import shutil
import subprocess
import sys
import tempfile
import time
import sys
import pytest
pytest.register_assert_rewrite("isctest")
import isctest
# pylint: disable=wrong-import-position
from isctest.vars.dirs import SYSTEM_TEST_DIR_GIT_PATH
import isctest
# pylint: enable=wrong-import-position
# Silence warnings caused by passing a pytest fixture to another fixture.
# pylint: disable=redefined-outer-name
@ -38,9 +43,7 @@ isctest.vars.init_vars()
# ----------------------- Globals definition -----------------------------
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent)
ENV_RE = Re(b"([^=]+)=(.*)")
PRIORITY_TESTS = [
# Tests that are scheduled first. Speeds up parallel execution.
"rpz/",
@ -70,25 +73,6 @@ def pytest_addoption(parser):
)
def pytest_configure(config):
# Ensure this hook only runs on the main pytest instance if xdist is
# used to spawn other workers.
if not XDIST_WORKER:
if config.pluginmanager.has_plugin("xdist") and config.option.numprocesses:
# system tests depend on module scope for setup & teardown
# enforce use "loadscope" scheduler or disable paralelism
try:
import xdist.scheduler.loadscope # pylint: disable=unused-import
except ImportError:
isctest.log.debug(
"xdist is too old and does not have "
"scheduler.loadscope, disabling parallelism"
)
config.option.dist = "no"
else:
config.option.dist = "loadscope"
def pytest_ignore_collect(collection_path):
# System tests are executed in temporary directories inside
# bin/tests/system. These temporary directories contain all files
@ -245,6 +229,11 @@ def control_port():
return int(os.environ["CONTROLPORT"])
@pytest.fixture(scope="module")
def default_algorithm():
return isctest.vars.algorithms.Algorithm.default()
@pytest.fixture(scope="module")
def system_test_name(request):
"""Name of the system test directory."""
@ -272,7 +261,7 @@ def configure_algorithm_set(request):
name = None
else:
name = mark.args[0]
isctest.vars.set_algorithm_set(name)
isctest.vars.algorithms.set_algorithm_set(name)
@pytest.fixture(autouse=True)

View file

@ -6,9 +6,11 @@
#
# Convert JUnit pytest output to automake .trs files
from pathlib import Path
from xml.etree import ElementTree
import argparse
import sys
from xml.etree import ElementTree
def junit_to_trs(junit_xml):
@ -16,7 +18,7 @@ def junit_to_trs(junit_xml):
testcases = root.findall(".//testcase")
if len(testcases) < 1:
print(":test-result: ERROR convert-junit-to-trs.py")
print(":test-result: ERROR convert_junit_to_trs.py")
return 99
has_fail = False
@ -57,12 +59,13 @@ def main():
)
parser.add_argument(
"junit_file",
type=argparse.FileType("r", encoding="utf-8"),
type=Path,
help="junit xml result file",
)
args = parser.parse_args()
junit_xml = args.junit_file.read()
with args.junit_file.open(encoding="utf-8") as junit_file:
junit_xml = junit_file.read()
sys.exit(junit_to_trs(junit_xml))

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from cookie_ans import cookie_server
from ..cookie_ans import cookie_server
def main() -> None:

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from cookie_ans import cookie_server
from ..cookie_ans import cookie_server
def main() -> None:

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.edns
import dns.name
@ -20,12 +20,11 @@ import dns.tsigkeyring
from isctest.asyncserver import (
AsyncDnsServer,
ResponseHandler,
DnsResponseSend,
DnsProtocol,
DnsResponseSend,
QueryContext,
ResponseHandler,
)
from isctest.name import prepend_label
from isctest.vars.algorithms import ALG_VARS

View file

@ -146,7 +146,7 @@ else
fi
# Run junit to trs converter script.
./convert-junit-to-trs.py $junit_file >$trs_file
./convert_junit_to_trs.py $junit_file >$trs_file
estatus=$?
if test $enable_hard_errors = no && test $estatus -eq 99; then

View file

@ -9,7 +9,8 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns
import dns.rrset
import isctest

View file

@ -9,7 +9,9 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns.flags
import dns.message
import dns.rcode
import pytest
import isctest

View file

@ -9,10 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from isctest.asyncserver import (
AsyncDnsServer,
IgnoreAllQueries,
)
from isctest.asyncserver import AsyncDnsServer, IgnoreAllQueries
def main() -> None:

View file

@ -9,8 +9,9 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from collections.abc import AsyncGenerator
import logging
from typing import AsyncGenerator, List, Optional
import dns.rcode
import dns.rdatatype
@ -30,7 +31,7 @@ from isctest.asyncserver import (
class ErraticAxfrHandler(ResponseHandler):
allowed_actions = ["no-response", "partial-axfr", "complete-axfr"]
def __init__(self, actions: List[str]) -> None:
def __init__(self, actions: list[str]) -> None:
self.actions = actions
self.counter = 0
for action in actions:
@ -68,10 +69,10 @@ class ResponseSequenceCommand(ControlCommand):
control_subdomain = "response-sequence"
def __init__(self) -> None:
self._current_handler: Optional[ResponseHandler] = None
self._current_handler: ResponseHandler | None = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str:
for action in args:
if action not in ErraticAxfrHandler.allowed_actions:

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.opcode
import dns.rcode
@ -17,8 +17,8 @@ import dns.rcode
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
ResponseHandler,
QueryContext,
ResponseHandler,
)

View file

@ -9,9 +9,8 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns
import dns.rcode
from isctest.asyncserver import (

View file

@ -13,8 +13,7 @@ import sys
try:
import yaml
# pylint: disable=bare-except
except:
except ImportError:
print("No python yaml module, skipping")
sys.exit(1)

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.flags
import dns.rcode

View file

@ -11,10 +11,10 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import pytest
import isctest
import dns.message
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
[

View file

@ -0,0 +1,177 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from pylint.checkers import BaseChecker
import astroid
class DnsExplicitImportsChecker(BaseChecker):
name = "dns-explicit-imports"
msgs = {
"W9001": (
"Bare 'import dns' is discouraged; import required submodules explicitly",
"dns-bare-import",
"Emitted when the package root 'dns' is imported directly.",
),
"W9002": (
"Missing explicit import for '%s' (add `import %s`)",
"dns-missing-submodule-import",
"Emitted when code references dns.<...> but the corresponding module prefix "
"was not imported with `import dns.<...>`.",
),
"W9003": (
"Unused explicit import for '%s' (remove `import %s`)",
"dns-unused-submodule-import",
"Emitted when a dns.<...> module is imported explicitly but not used.",
),
}
def __init__(self, linter=None):
super().__init__(linter)
self._imported = {}
self._imported_aliases = set()
self._required = {}
def visit_module(self, node): # pylint: disable=unused-argument
self._imported = {}
self._imported_aliases = set()
self._required = {}
def leave_module(self, node): # pylint: disable=unused-argument
for mod, use_node in sorted(self._required.items()):
if mod in self._imported:
continue
prefix = mod + "."
if any(name.startswith(prefix) for name in self._imported):
continue
self.add_message(
"dns-missing-submodule-import",
node=use_node,
args=(mod, mod),
)
for mod, import_node in sorted(self._imported.items()):
if mod in self._imported_aliases:
continue
if any(
name == mod or name.startswith(mod + ".") for name in self._required
):
continue
self.add_message(
"dns-unused-submodule-import",
node=import_node,
args=(mod, mod),
)
def visit_import(self, node):
for name, _asname in node.names:
if name == "dns":
self.add_message("dns-bare-import", node=node)
continue
if name.startswith("dns."):
self._imported.setdefault(name, node)
if _asname:
self._imported_aliases.add(name)
def visit_importfrom(self, node): # pylint: disable=unused-argument
return
def visit_attribute(self, node):
parent = node.parent
# For `dns.a.b.c`, astroid visits intermediate attributes too.
# Process only the rightmost node to avoid duplicate bookkeeping.
if isinstance(parent, astroid.nodes.Attribute) and parent.expr is node:
return
mod = self._dns_module_for_attribute(node)
if mod is None:
return
self._required.setdefault(mod, node)
@staticmethod
def _dns_attribute_nodes(node):
"""
Return the chain of Attribute nodes as a list.
For `dns.a.b.c`, return the list of Attribute nodes for `dns.a`, `dns.a.b`, and `dns.a.b.c`.
Return None if the chain is not rooted in `dns`.
"""
if not isinstance(node, astroid.nodes.Attribute):
return None
nodes = []
expr = node
while isinstance(expr, astroid.nodes.Attribute):
nodes.append(expr)
expr = expr.expr
if not isinstance(expr, astroid.nodes.Name) or expr.name != "dns":
return None
return list(reversed(nodes))
@classmethod
def _dns_module_for_attribute(cls, node):
"""
For dns.a.b.c, return the longest dns.a.b... prefix that is likely to be a module,
or None if the chain is not rooted in dns.
"""
last_module = None
chain_nodes = cls._dns_attribute_nodes(node)
if chain_nodes is None:
return None
full = "dns." + ".".join(part.attrname for part in chain_nodes)
# Prefer inferred module names to avoid treating classes/constants as
# modules (e.g. `dns.name.NameRelation` should resolve to `dns.name`).
for chain_node in chain_nodes:
inferred = cls._infer_module_name(chain_node)
if inferred is not None and full.startswith(inferred):
last_module = inferred
if last_module is not None:
return last_module
# Fallback when inference is unavailable: assume the terminal segment
# is not a module symbol and require the parent path.
parts = full.split(".")
if len(parts) <= 2:
return full
return ".".join(parts[:-1])
@staticmethod
def _infer_module_name(node):
"""Infer `dns.<module>` for a node; return None if inference is unsure."""
try:
for inferred in node.infer():
if inferred is astroid.util.Uninferable:
continue
# Inference can return either a Module node directly or another
# symbol rooted in a module; normalize both to module name.
module = (
inferred
if isinstance(inferred, astroid.nodes.Module)
else inferred.root()
)
name = module.name
if name.startswith("dns."):
return name
# Inference can fail for dynamic/partial code; fall back gracefully.
except astroid.AstroidError:
pass
return None
def register(linter):
linter.register_checker(DnsExplicitImportsChecker(linter))

View file

@ -9,21 +9,22 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import base64
from re import compile as Re
import base64
import os
import pytest
pytest.importorskip("cryptography")
from cryptography.hazmat.primitives.asymmetric import ec
import dns
import dns.dnssec
import dns.zone
from dns.rdtypes.dnskeybase import Flag
import dns.dnssec
import dns.name
import dns.rdataclass
import dns.rdatatype
import dns.rdtypes.ANY.RRSIG
import dns.zone
import pytest
import isctest

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rdatatype
import dns.rrset

View file

@ -14,11 +14,12 @@
import os
import re
import isctest
import isctest.mark
import dns.rcode
import dns.rrset
import pytest
import dns.rrset
import isctest
import isctest.mark
pytestmark = [
isctest.mark.with_dnstap,

View file

@ -17,8 +17,8 @@ except (ModuleNotFoundError, ImportError):
print("No python yaml module, skipping")
sys.exit(1)
import subprocess
import pprint
import subprocess
DNSTAP_READ = sys.argv[1]
DATAFILE = sys.argv[2]

View file

@ -14,6 +14,7 @@
import shutil
import pytest
import isctest

View file

@ -11,18 +11,16 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from functools import reduce
from resource import RLIMIT_NOFILE, getrlimit, setrlimit
import os
import sys
import random
import socket
import subprocess
import random
import sys
import time
from functools import reduce
from resource import getrlimit
from resource import setrlimit
from resource import RLIMIT_NOFILE
MULTIDIG_INSTANCES = 10
CONNECT_TRIES = 5
@ -137,9 +135,8 @@ class SubDIG:
return command
def run(self):
# pylint: disable=consider-using-with
with open(os.devnull, "w", encoding="utf-8") as devnull:
self.sub_process = subprocess.Popen(
self.sub_process = subprocess.Popen( # pylint: disable=consider-using-with
self.get_command(), shell=True, stdout=devnull
)
@ -228,8 +225,8 @@ def run_test(http_secure=True):
assert subdig.alive(), "The single DIG instance is expected to be alive"
assert multidig.alive(), (
"The DIG instances from the set are all expected to "
"be alive, but {} of them have completed"
).format(multidig.completed())
f"be alive, but {multidig.completed()} of them have completed"
)
# Let's close opened connections (in random order) to let all dig
# processes to complete
connector.disconnect_all()

View file

@ -16,13 +16,12 @@ import struct
import subprocess
import time
import pytest
import dns
import dns.exception
import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype
import pytest
import isctest
@ -48,7 +47,7 @@ def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
"--no-ocsp",
"--alpn=dot",
"--logfile=gnutls-cli.log",
"--port=%d" % named_tlsport,
f"--port={named_tlsport}",
"10.53.0.1",
]
with open("gnutls-cli.err", "wb") as gnutls_cli_stderr, subprocess.Popen(

View file

@ -10,6 +10,7 @@
# information regarding copyright ownership.
import dns.flags
import dns.message
import pytest
import isctest

View file

@ -10,9 +10,9 @@
# information regarding copyright ownership.
import os
import pytest
import dns.flags
import pytest
import isctest

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rcode
import dns.rdatatype

View file

@ -9,9 +9,10 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns
from dns import rdataclass, rdatatype
import dns.name
import isctest
ARTIFACTS = [

View file

@ -11,8 +11,6 @@
import pytest
import isctest.mark
from filters.common import (
ARTIFACTS,
check_filter,
@ -20,6 +18,8 @@ from filters.common import (
prime_cache,
)
import isctest.mark
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -11,8 +11,6 @@
import pytest
import isctest.mark
from filters.common import (
ARTIFACTS,
check_filter,
@ -20,6 +18,8 @@ from filters.common import (
prime_cache,
)
import isctest.mark
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -11,9 +11,6 @@
import pytest
import isctest
import isctest.mark
from filters.common import (
ARTIFACTS,
check_filter,
@ -21,6 +18,8 @@ from filters.common import (
prime_cache,
)
import isctest.mark
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -11,8 +11,6 @@
import pytest
import isctest.mark
from filters.common import (
ARTIFACTS,
check_filter,
@ -20,6 +18,8 @@ from filters.common import (
prime_cache,
)
import isctest.mark
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -15,10 +15,10 @@ import subprocess
import pytest
import isctest
from filters.common import ARTIFACTS
import isctest
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -11,10 +11,10 @@
import pytest
import isctest
from filters.common import ARTIFACTS
import isctest
pytestmark = pytest.mark.extra_artifacts(ARTIFACTS)

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rdatatype
import dns.rrset

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.name
import dns.rcode

View file

@ -10,6 +10,7 @@
# information regarding copyright ownership.
import dns.edns
import dns.flags
import dns.message
import pytest

View file

@ -9,17 +9,31 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from . import check
from . import instance
from . import hypothesis
from . import query
from . import kasp
from . import run
from . import template
from . import log
from . import vars # pylint: disable=redefined-builtin
from . import ( # pylint: disable=redefined-builtin
check,
hypothesis,
instance,
kasp,
log,
query,
run,
template,
vars,
)
# isctest.mark module is intentionally NOT imported, because it relies on
# environment variables which might not be set at the time of import of the
# `isctest` package. To use the marks, manual `import isctest.mark` is needed
# instead.
__all__ = [
"check",
"hypothesis",
"instance",
"kasp",
"log",
"query",
"run",
"template",
"vars",
]

View file

@ -11,21 +11,9 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
from dataclasses import dataclass, field
from typing import (
Any,
AsyncGenerator,
Callable,
Coroutine,
Dict,
List,
Optional,
Set,
Sequence,
Tuple,
Union,
cast,
)
from typing import Any, cast
import abc
import asyncio
@ -53,11 +41,10 @@ import dns.rdataset
import dns.rdatatype
import dns.rrset
import dns.tsig
import dns.version
import dns.zone
_UdpHandler = Callable[
[bytes, Tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
[bytes, tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
]
@ -75,7 +62,7 @@ class _AsyncUdpHandler(asyncio.DatagramProtocol):
self,
handler: _UdpHandler,
) -> None:
self._transport: Optional[asyncio.DatagramTransport] = None
self._transport: asyncio.DatagramTransport | None = None
self._handler: _UdpHandler = handler
def connection_made(self, transport: asyncio.BaseTransport) -> None:
@ -84,7 +71,7 @@ class _AsyncUdpHandler(asyncio.DatagramProtocol):
"""
self._transport = cast(asyncio.DatagramTransport, transport)
def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
"""
Called by asyncio when a datagram is received.
"""
@ -109,9 +96,9 @@ class AsyncServer:
def __init__(
self,
udp_handler: Optional[_UdpHandler],
tcp_handler: Optional[_TcpHandler],
pidfile: Optional[str] = None,
udp_handler: _UdpHandler | None,
tcp_handler: _TcpHandler | None,
pidfile: str | None = None,
) -> None:
logging.basicConfig(
format="%(asctime)s %(levelname)8s %(message)s",
@ -133,12 +120,12 @@ class AsyncServer:
logging.info("Setting up IPv4 listener at %s:%d", ipv4_address, port)
logging.info("Setting up IPv6 listener at [%s]:%d", ipv6_address, port)
self._ip_addresses: Tuple[str, str] = (ipv4_address, ipv6_address)
self._ip_addresses: tuple[str, str] = (ipv4_address, ipv6_address)
self._port: int = port
self._udp_handler: Optional[_UdpHandler] = udp_handler
self._tcp_handler: Optional[_TcpHandler] = tcp_handler
self._pidfile: Optional[str] = pidfile
self._work_done: Optional[asyncio.Future] = None
self._udp_handler: _UdpHandler | None = udp_handler
self._tcp_handler: _TcpHandler | None = tcp_handler
self._pidfile: str | None = pidfile
self._work_done: asyncio.Future | None = None
def _get_ipv4_address_from_directory_name(self) -> str:
containing_directory = pathlib.Path().absolute().stem
@ -186,7 +173,7 @@ class AsyncServer:
loop.set_exception_handler(self._handle_exception)
def _handle_exception(
self, _: asyncio.AbstractEventLoop, context: Dict[str, Any]
self, _: asyncio.AbstractEventLoop, context: dict[str, Any]
) -> None:
assert self._work_done
exception = context.get("exception", RuntimeError(context["message"]))
@ -269,15 +256,13 @@ class QueryContext:
socket: Peer
peer: Peer
protocol: DnsProtocol
zone: Optional[dns.zone.Zone] = field(default=None, init=False)
soa: Optional[dns.rrset.RRset] = field(default=None, init=False)
node: Optional[dns.node.Node] = field(default=None, init=False)
answer: Optional[dns.rdataset.Rdataset] = field(default=None, init=False)
alias: Optional[dns.name.Name] = field(default=None, init=False)
_initialized_response: Optional[dns.message.Message] = field(
default=None, init=False
)
_initialized_response_with_zone_data: Optional[dns.message.Message] = field(
zone: dns.zone.Zone | None = field(default=None, init=False)
soa: dns.rrset.RRset | None = field(default=None, init=False)
node: dns.node.Node | None = field(default=None, init=False)
answer: dns.rdataset.Rdataset | None = field(default=None, init=False)
alias: dns.name.Name | None = field(default=None, init=False)
_initialized_response: dns.message.Message | None = field(default=None, init=False)
_initialized_response_with_zone_data: dns.message.Message | None = field(
default=None, init=False
)
@ -322,7 +307,7 @@ class ResponseAction(abc.ABC):
"""
@abc.abstractmethod
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> dns.message.Message | bytes | None:
"""
This method is expected to carry out arbitrary actions (e.g. wait for a
specific amount of time, modify the answer, etc.) and then return the
@ -345,11 +330,11 @@ class DnsResponseSend(ResponseAction):
"""
response: dns.message.Message
authoritative: Optional[bool] = None
authoritative: bool | None = None
delay: float = 0.0
acknowledge_hand_rolled_response: bool = False
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> dns.message.Message | bytes | None:
"""
Yield a potentially delayed response that is a dns.message.Message.
"""
@ -395,7 +380,7 @@ class BytesResponseSend(ResponseAction):
response: bytes
delay: float = 0.0
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> dns.message.Message | bytes | None:
"""
Yield a potentially delayed response that is a sequence of bytes.
"""
@ -412,7 +397,7 @@ class ResponseDrop(ResponseAction):
Action which does nothing - as if a packet was dropped.
"""
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> dns.message.Message | bytes | None:
return None
@ -430,7 +415,7 @@ class CloseConnection(ResponseAction):
delay: float = 0.0
async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
async def perform(self) -> dns.message.Message | bytes | None:
if self.delay > 0:
logging.info("Waiting %.1fs before closing TCP connection", self.delay)
await asyncio.sleep(self.delay)
@ -512,7 +497,7 @@ class IgnoreAllConnections(ConnectionHandler):
client socket, effectively ignoring all incoming connections.
"""
_connections: Set[asyncio.StreamWriter] = field(default_factory=set)
_connections: set[asyncio.StreamWriter] = field(default_factory=set)
async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer: Peer
@ -623,14 +608,14 @@ class QnameHandler(ResponseHandler):
@property
@abc.abstractmethod
def qnames(self) -> List[str]:
def qnames(self) -> list[str]:
"""
A list of QNAMEs handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
self._qnames: List[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
self._qnames: list[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)})"
@ -653,7 +638,7 @@ class QnameQtypeHandler(QnameHandler):
@property
@abc.abstractmethod
def qtypes(self) -> List[dns.rdatatype.RdataType]:
def qtypes(self) -> list[dns.rdatatype.RdataType]:
"""
A list of QTYPEs handled by this class.
"""
@ -661,7 +646,7 @@ class QnameQtypeHandler(QnameHandler):
def __init__(self) -> None:
super().__init__()
self._qtypes: List[dns.rdatatype.RdataType] = self.qtypes
self._qtypes: list[dns.rdatatype.RdataType] = self.qtypes
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)}; QTYPEs: {', '.join(map(str, self.qtypes))})"
@ -687,7 +672,7 @@ class StaticResponseHandler(ResponseHandler):
"""
@property
def rcode(self) -> Optional[dns.rcode.Rcode]:
def rcode(self) -> dns.rcode.Rcode | None:
"""
Optional RCODE to be set in the response.
"""
@ -715,7 +700,7 @@ class StaticResponseHandler(ResponseHandler):
return []
@property
def authoritative(self) -> Optional[bool]:
def authoritative(self) -> bool | None:
"""
Whether to set the AA bit in the response.
"""
@ -755,17 +740,17 @@ class DomainHandler(ResponseHandler):
@property
@abc.abstractmethod
def domains(self) -> List[str]:
def domains(self) -> list[str]:
"""
A list of domain names handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
self._domains: List[dns.name.Name] = sorted(
self._domains: list[dns.name.Name] = sorted(
[dns.name.from_text(d) for d in self.domains], reverse=True
)
self._matched_domain: Optional[dns.name.Name] = None
self._matched_domain: dns.name.Name | None = None
@property
def matched_domain(self) -> dns.name.Name:
@ -833,7 +818,7 @@ class ForwarderHandler(ResponseHandler):
logging.debug("[OUT] %s", self._query.hex())
cast(asyncio.DatagramTransport, transport).sendto(self._query)
def datagram_received(self, data: bytes, _: Tuple[str, int]) -> None:
def datagram_received(self, data: bytes, _: tuple[str, int]) -> None:
logging.debug("[IN] %s", data.hex())
self._response.set_result(data)
@ -896,8 +881,8 @@ class _ZoneTreeNode:
A node representing a zone with one origin.
"""
zone: Optional[dns.zone.Zone]
children: List["_ZoneTreeNode"] = field(default_factory=list)
zone: dns.zone.Zone | None
children: list["_ZoneTreeNode"] = field(default_factory=list)
class _ZoneTree:
@ -947,7 +932,7 @@ class _ZoneTree:
node_from.children.remove(child)
node_to.children.append(child)
def find_best_zone(self, name: dns.name.Name) -> Optional[dns.zone.Zone]:
def find_best_zone(self, name: dns.name.Name) -> dns.zone.Zone | None:
"""
Return the closest matching zone (if any) for the domain name.
"""
@ -965,7 +950,7 @@ class _DnsMessageWithTsigDisabled(dns.message.Message):
"""
class _DisableTsigHandling(contextlib.ContextDecorator):
def __init__(self, message: Optional[dns.message.Message] = None) -> None:
def __init__(self, message: dns.message.Message | None = None) -> None:
self.original_tsig_sign = dns.tsig.sign
self.original_tsig_validate = dns.tsig.validate
if message:
@ -977,7 +962,7 @@ class _DnsMessageWithTsigDisabled(dns.message.Message):
from failing on messages initialized with `dns.message.from_wire(keyring=False)`.
"""
def sign(*_: Any, **__: Any) -> Tuple[dns.rdata.Rdata, None]:
def sign(*_: Any, **__: Any) -> tuple[dns.rdata.Rdata, None]:
assert self.tsig
return self.tsig[0], None
@ -1054,16 +1039,16 @@ class AsyncDnsServer(AsyncServer):
/,
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = False,
keyring: Union[
Dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
] = _NoKeyringType(),
keyring: (
dict[dns.name.Name, dns.tsig.Key] | None | _NoKeyringType
) = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
) -> None:
super().__init__(self._handle_udp, self._handle_tcp, "ans.pid")
self._zone_tree: _ZoneTree = _ZoneTree()
self._connection_handler: Optional[ConnectionHandler] = None
self._response_handlers: List[ResponseHandler] = []
self._connection_handler: ConnectionHandler | None = None
self._response_handlers: list[ResponseHandler] = []
self._default_rcode = default_rcode
self._default_aa = default_aa
self._keyring = keyring
@ -1172,7 +1157,7 @@ class AsyncDnsServer(AsyncServer):
raise ValueError(error)
async def _handle_udp(
self, wire: bytes, addr: Tuple[str, int], transport: asyncio.DatagramTransport
self, wire: bytes, addr: tuple[str, int], transport: asyncio.DatagramTransport
) -> None:
logging.debug("Received UDP message: %s", wire.hex())
socket_info = transport.get_extra_info("sockname")
@ -1215,7 +1200,7 @@ class AsyncDnsServer(AsyncServer):
async def _read_tcp_query(
self, reader: asyncio.StreamReader, peer: Peer
) -> Optional[bytes]:
) -> bytes | None:
wire_length = await self._read_tcp_query_wire_length(reader, peer)
if not wire_length:
return None
@ -1224,7 +1209,7 @@ class AsyncDnsServer(AsyncServer):
async def _read_tcp_query_wire_length(
self, reader: asyncio.StreamReader, peer: Peer
) -> Optional[int]:
) -> int | None:
logging.debug("Receiving TCP message length from %s...", peer)
wire_length_bytes = await self._read_tcp_octets(reader, peer, 2)
@ -1237,7 +1222,7 @@ class AsyncDnsServer(AsyncServer):
async def _read_tcp_query_wire(
self, reader: asyncio.StreamReader, peer: Peer, wire_length: int
) -> Optional[bytes]:
) -> bytes | None:
logging.debug("Receiving TCP message (%d octets) from %s...", wire_length, peer)
wire = await self._read_tcp_octets(reader, peer, wire_length)
@ -1250,7 +1235,7 @@ class AsyncDnsServer(AsyncServer):
async def _read_tcp_octets(
self, reader: asyncio.StreamReader, peer: Peer, expected: int
) -> Optional[bytes]:
) -> bytes | None:
buffer = b""
while len(buffer) < expected:
@ -1299,7 +1284,7 @@ class AsyncDnsServer(AsyncServer):
)
def _log_response(
self, qctx: QueryContext, response: Optional[Union[dns.message.Message, bytes]]
self, qctx: QueryContext, response: dns.message.Message | bytes | None
) -> None:
if not response:
logging.info(
@ -1399,7 +1384,7 @@ class AsyncDnsServer(AsyncServer):
async def _prepare_responses(
self, qctx: QueryContext
) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
) -> AsyncGenerator[dns.message.Message | bytes | None, None]:
"""
Yield response(s) either from response handlers or zone data.
"""
@ -1592,7 +1577,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
return dns.name.from_text(self._CONTROL_DOMAIN)
@functools.cached_property
def _commands(self) -> Dict[dns.name.Name, "ControlCommand"]:
def _commands(self) -> dict[dns.name.Name, "ControlCommand"]:
return {}
def install_control_commands(self, *commands: "ControlCommand") -> None:
@ -1613,7 +1598,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
async def _prepare_responses(
self, qctx: QueryContext
) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
) -> AsyncGenerator[dns.message.Message | bytes | None, None]:
"""
Detect and handle control queries, falling back to normal processing
for non-control queries.
@ -1626,9 +1611,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
async for response in super()._prepare_responses(qctx):
yield response
def _handle_control_command(
self, qctx: QueryContext
) -> Optional[dns.message.Message]:
def _handle_control_command(self, qctx: QueryContext) -> dns.message.Message | None:
"""
Detect and handle control queries.
@ -1703,8 +1686,8 @@ class ControlCommand(abc.ABC):
@abc.abstractmethod
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str | None:
"""
This method is expected to carry out arbitrary actions in response to a
control query. Note that it is invoked synchronously (it is not a
@ -1742,11 +1725,11 @@ class ToggleResponsesCommand(ControlCommand):
control_subdomain = "send-responses"
def __init__(self) -> None:
self._current_handler: Optional[IgnoreAllQueries] = None
self._current_handler: IgnoreAllQueries | None = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str | None:
if len(args) != 1:
logging.error("Invalid %s query %s", self, qctx.qname)
qctx.response.set_rcode(dns.rcode.SERVFAIL)
@ -1785,12 +1768,12 @@ class SwitchControlCommand(ControlCommand):
control_subdomain = "switch"
def __init__(self, handler_mapping: Dict[str, Sequence[ResponseHandler]]):
def __init__(self, handler_mapping: dict[str, Sequence[ResponseHandler]]):
self._handler_mapping = handler_mapping
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str | None:
if len(args) != 1 or args[0] not in self._handler_mapping:
logging.error("Invalid %s query %s", self, qctx.qname)
qctx.response.set_rcode(dns.rcode.SERVFAIL)

View file

@ -9,15 +9,18 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import cast
import difflib
import shutil
from typing import cast, List, Optional
from dns.edns import EDECode, EDEOption
import dns.edns
from dns.edns import EDECode, EDEOption
import dns.flags
import dns.message
import dns.rcode
import dns.rrset
import dns.zone
import isctest.log
@ -69,10 +72,10 @@ def noraflag(message: dns.message.Message) -> None:
def _extract_ede_options(
message: dns.message.Message,
) -> List[EDEOption]:
) -> list[EDEOption]:
"""Extract EDE options from the DNS message."""
return cast(
List[EDEOption],
list[EDEOption],
[
option
for option in message.options
@ -87,9 +90,7 @@ def noede(message: dns.message.Message) -> None:
assert not ede_options, f"unexpected EDE options {ede_options} in {message}"
def ede(
message: dns.message.Message, code: EDECode, text: Optional[str] = None
) -> None:
def ede(message: dns.message.Message, code: EDECode, text: str | None = None) -> None:
"""Check if message contains expected EDE code (and its text)."""
msg_opts = _extract_ede_options(message)
matching_opts = [opt for opt in msg_opts if opt.code == code]
@ -135,7 +136,7 @@ def same_answer(res1: dns.message.Message, res2: dns.message.Message):
def rrsets_equal(
first_rrset: dns.rrset.RRset,
second_rrset: dns.rrset.RRset,
compare_ttl: Optional[bool] = False,
compare_ttl: bool | None = False,
) -> None:
"""Compare two RRset (optionally including TTL)"""
@ -164,7 +165,7 @@ def rrsets_equal(
def zones_equal(
first_zone: dns.zone.Zone,
second_zone: dns.zone.Zone,
compare_ttl: Optional[bool] = False,
compare_ttl: bool | None = False,
) -> None:
"""Compare two zones (optionally including TTL)"""

View file

@ -9,8 +9,9 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# This ensures we're using a suitable hypothesis version. A newer version is
# required for FIPS-enabled platforms.
from . import settings, strategies
from . import settings
from . import strategies
__all__ = [
"settings",
"strategies",
]

View file

@ -11,10 +11,10 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import collections.abc
from typing import List, Union
from warnings import warn
import collections.abc
from hypothesis.strategies import (
binary,
builds,
@ -27,7 +27,6 @@ from hypothesis.strategies import (
)
import dns.name
import dns.message
import dns.rdataclass
import dns.rdatatype
@ -39,9 +38,7 @@ def dns_names(
draw,
*,
prefix: dns.name.Name = dns.name.empty,
suffix: Union[
dns.name.Name, collections.abc.Iterable[dns.name.Name]
] = dns.name.root,
suffix: dns.name.Name | collections.abc.Iterable[dns.name.Name] = dns.name.root,
min_labels: int = 1,
max_labels: int = 128,
) -> dns.name.Name:
@ -155,7 +152,7 @@ dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns.
@composite
def _partition_bytes_to_labels(
draw, remaining_bytes: int, number_of_labels: int
) -> List[int]:
) -> list[int]:
two_bytes_reserved_for_label = 2
# Reserve two bytes for each label

View file

@ -11,18 +11,19 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import List, NamedTuple, Optional
from pathlib import Path
from typing import NamedTuple
import os
from pathlib import Path
import re
import dns.update
import dns.exception
import dns.rcode
import dns.update
from .log import debug, WatchLogFromStart, WatchLogFromHere
from .run import CmdResult, EnvCmd, perl
from .log import WatchLogFromHere, WatchLogFromStart, debug
from .query import udp
from .run import CmdResult, EnvCmd, perl
from .text import TextFile
@ -53,8 +54,8 @@ class NamedInstance:
def __init__(
self,
identifier: str,
num: Optional[int] = None,
ports: Optional[NamedPorts] = None,
num: int | None = None,
ports: NamedPorts | None = None,
) -> None:
"""
`identifier` is the name of the instance's directory
@ -94,7 +95,7 @@ class NamedInstance:
return f"10.53.0.{self.num}"
@staticmethod
def _identifier_to_num(identifier: str, num: Optional[int] = None) -> int:
def _identifier_to_num(identifier: str, num: int | None = None) -> int:
regex_match = re.match(r"^ns(?P<index>[0-9]{1,2})$", identifier)
if not regex_match:
if num is None:
@ -175,7 +176,7 @@ class NamedInstance:
watcher.wait_for_line("all zones loaded")
return cmd
def stop(self, args: Optional[List[str]] = None) -> None:
def stop(self, args: list[str] | None = None) -> None:
"""Stop the instance."""
args = args or []
perl(
@ -183,7 +184,7 @@ class NamedInstance:
[self.system_test_name, self.identifier] + args,
)
def start(self, args: Optional[List[str]] = None) -> None:
def start(self, args: list[str] | None = None) -> None:
"""Start the instance."""
args = args or []
perl(

View file

@ -11,26 +11,33 @@
from datetime import datetime, timedelta, timezone
from functools import total_ordering
from pathlib import Path
from re import compile as Re
import glob
import os
from pathlib import Path
import re
from re import compile as Re
import time
from typing import Dict, List, Optional, Tuple, Union
import dns
import dns.dnssec
import dns.exception
import dns.message
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
import dns.tsig
import dns.zone
import dns.zonefile
from isctest.instance import NamedInstance
from isctest.template import TrustAnchor
from isctest.vars.algorithms import ALL_ALGORITHMS_BY_NUM, Algorithm
import isctest.log
import isctest.query
import isctest.util
from isctest.instance import NamedInstance
from isctest.template import TrustAnchor
from isctest.vars.algorithms import Algorithm, ALL_ALGORITHMS_BY_NUM
DEFAULT_TTL = 300
@ -84,24 +91,24 @@ def Iret(config, zsk=True, ksk=False, rollover=True, smooth=True):
sign_delay = config["signatures-validity"] - config["signatures-refresh"]
safety_interval = config["retire-safety"]
iretKSK = timedelta(0)
iret_ksk = timedelta(0)
if ksk:
# KSK: Double-KSK Method: Iret = DprpP + TTLds
iretKSK = (
iret_ksk = (
config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval
)
iretZSK = timedelta(0)
iret_zsk = timedelta(0)
if zsk:
# ZSK: Pre-Publication Method: Iret = Dsgn + Dprp + TTLsig
iretZSK = (
iret_zsk = (
sign_delay
+ config["zone-propagation-delay"]
+ config["max-zone-ttl"]
+ safety_interval
)
return max(iretKSK, iretZSK)
return max(iret_ksk, iret_zsk)
@total_ordering
@ -130,26 +137,26 @@ class KeyTimingMetadata:
def __str__(self) -> str:
return self.value.strftime(self.FORMAT)
def __add__(self, other: Union[timedelta, int]):
def __add__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value + other
return result
def __sub__(self, other: Union[timedelta, int]):
def __sub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value - other
return result
def __iadd__(self, other: Union[timedelta, int]):
def __iadd__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value += other
def __isub__(self, other: Union[timedelta, int]):
def __isub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value -= other
@ -176,7 +183,7 @@ class KeyProperties:
self,
name: str,
metadata: dict,
timing: Dict[str, KeyTimingMetadata],
timing: dict[str, KeyTimingMetadata],
private: bool = True,
legacy: bool = False,
role: str = "csk",
@ -184,7 +191,7 @@ class KeyProperties:
flags: int = 257,
keytag_min: int = 0,
keytag_max: int = 65535,
offset: Union[timedelta, int] = 0,
offset: timedelta | int = 0,
):
self.name = name
self.key = None
@ -215,7 +222,7 @@ class KeyProperties:
"KSK": "yes",
"ZSK": "yes",
}
timing: Dict[str, KeyTimingMetadata] = {}
timing: dict[str, KeyTimingMetadata] = {}
result = KeyProperties(name="DEFAULT", metadata=metadata, timing=timing)
result.name = "DEFAULT"
@ -322,7 +329,7 @@ class Key:
operations for KASP tests.
"""
def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
def __init__(self, name: str, keydir: str | Path | None = None):
self.name = name
if keydir is None:
self.keydir = Path()
@ -337,7 +344,7 @@ class Key:
def get_timing(
self, metadata: str, must_exist: bool = True
) -> Optional[KeyTimingMetadata]:
) -> KeyTimingMetadata | None:
regex = rf";\s+{metadata}:\s+(\d+).*"
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
@ -377,7 +384,7 @@ class Key:
def get_signing_state(
self, offline_ksk=False, zsk_missing=False, smooth=False
) -> Tuple[bool, bool]:
) -> tuple[bool, bool]:
"""
This returns the signing state derived from the key states, KRRSIGState
and ZRRSIGState.
@ -575,14 +582,10 @@ class Key:
isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}")
return value == "undefined"
def match_properties(self, zone, properties):
def _check_public_key_file(self, zone, properties):
"""
Check the key with given properties.
Check the public key file.
"""
# Check file existence.
# Noop. If file is missing then the get_metadata calls will fail.
# Check the public key file.
role = properties.role_full()
comment = f"This is a {role} key, keyid {self.tag}, for {zone}."
if not isctest.util.file_contents_contain(self.keyfile, comment):
@ -597,31 +600,45 @@ class Key:
isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'")
return False
# Now check the private key file.
if properties.private:
# Retrieve creation date.
created = self.get_metadata("Generated")
return True
pval = self.get_metadata("Created", file=self.privatefile)
if pval != created:
isctest.log.debug(
f"{self.name} Created METADATA MISMATCH: {pval} - {created}"
)
return False
pval = self.get_metadata("Private-key-format", file=self.privatefile)
if pval != "v1.3":
isctest.log.debug(
f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3"
)
return False
pval = self.get_metadata("Algorithm", file=self.privatefile)
if pval != f"{alg}":
isctest.log.debug(
f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}"
)
return False
def _check_private_key_file(self, properties):
"""
Check the private key file.
"""
if not properties.private:
return True
# Now check the key state file.
alg = properties.metadata["Algorithm"]
# Retrieve creation date.
created = self.get_metadata("Generated")
pval = self.get_metadata("Created", file=self.privatefile)
if pval != created:
isctest.log.debug(
f"{self.name} Created METADATA MISMATCH: {pval} - {created}"
)
return False
pval = self.get_metadata("Private-key-format", file=self.privatefile)
if pval != "v1.3":
isctest.log.debug(
f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3"
)
return False
pval = self.get_metadata("Algorithm", file=self.privatefile)
if pval != f"{alg}":
isctest.log.debug(
f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}"
)
return False
return True
def _check_key_state_file(self, zone, properties):
"""
Check the key state file.
"""
if properties.legacy:
return True
@ -652,7 +669,24 @@ class Key:
if self.tag > properties.keytag_max:
return False
# A match is found.
return True
def match_properties(self, zone, properties):
"""
Check the key with given properties.
"""
# Check file existence.
# Noop. If file is missing then the get_metadata calls will fail.
if not self._check_public_key_file(zone, properties):
return False
if not self._check_private_key_file(properties):
return False
if not self._check_key_state_file(zone, properties):
return False
return True
def match_timingmetadata(self, timings, file=None, comment=False):
@ -1474,8 +1508,8 @@ def next_key_event_equals(server, zone, next_event):
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
) -> List[Key]:
zone: str | None, keydir: str | None = None, in_use: bool = False
) -> list[Key]:
"""
Retrieve all keys from the key files in a directory. If 'zone' is None,
retrieve all keys in the directory, otherwise only those matching the
@ -1515,11 +1549,11 @@ def keydir_to_keylist(
return [k for k in all_keys if used(k)]
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
def keystr_to_keylist(keystr: str, keydir: str | None = None) -> list[Key]:
return [Key(name, keydir) for name in keystr.split()]
def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]:
"""
Get the policies from a list of specially formatted strings.
The splitted line should result in the following items:
@ -1541,8 +1575,8 @@ def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
line = key.split()
# defaults
metadata: Dict[str, Union[str, int]] = {}
timing: Dict[str, KeyTimingMetadata] = {}
metadata: dict[str, str | int] = {}
timing: dict[str, KeyTimingMetadata] = {}
private = True
legacy = False
keytag_min = 0

View file

@ -11,16 +11,31 @@
from .basic import (
avoid_duplicated_logs,
critical,
debug,
deinit_module_logger,
deinit_test_logger,
error,
info,
init_conftest_logger,
init_module_logger,
init_test_logger,
debug,
info,
warning,
error,
critical,
)
from .watchlog import WatchLogFromHere, WatchLogFromStart
from .watchlog import WatchLogFromStart, WatchLogFromHere
__all__ = [
"WatchLogFromHere",
"WatchLogFromStart",
"avoid_duplicated_logs",
"critical",
"debug",
"deinit_module_logger",
"deinit_test_logger",
"error",
"info",
"init_conftest_logger",
"init_module_logger",
"init_test_logger",
"warning",
]

View file

@ -9,19 +9,19 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import logging
from pathlib import Path
import logging
import textwrap
from typing import Dict, Optional
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
LOG_INDENT = 4
LOGGERS = {
LOGGERS: dict[str, logging.Logger | None] = {
"conftest": None,
"module": None,
"test": None,
} # type: Dict[str, Optional[logging.Logger]]
}
def init_conftest_logger():

View file

@ -9,16 +9,17 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union
from re import Match, Pattern
from typing import Any, TextIO, TypeAlias, TypeVar
import abc
import os
import time
from isctest.text import compile_pattern, FlexPattern, LineReader
from isctest.text import FlexPattern, LineReader, compile_pattern
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
OneOrMore: TypeAlias = T | list[T]
class WatchLogException(Exception):
@ -62,8 +63,8 @@ class WatchLog(abc.ABC):
...
isctest.log.watchlog.WatchLogException: timeout must be greater than 0
"""
self._fd: Optional[TextIO] = None
self._reader: Optional[LineReader] = None
self._fd: TextIO | None = None
self._reader: LineReader | None = None
self._path = path
self._wait_function_called = False
if timeout <= 0.0:
@ -71,12 +72,12 @@ class WatchLog(abc.ABC):
self._timeout = timeout
self._deadline = 0.0
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]:
def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> list[Pattern]:
self._wait_function_called = True
self._deadline = time.monotonic() + self._timeout
return self._prepare_patterns(patterns)
def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> List[Pattern]:
def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> list[Pattern]:
"""
Convert a mix of string(s) and/or pattern(s) into a list of patterns.
@ -90,7 +91,7 @@ class WatchLog(abc.ABC):
patterns.append(compile_pattern(string))
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
def _wait_for_match(self, regexes: list[Pattern]) -> Match:
if not self._reader:
raise WatchLogException(
"use WatchLog as context manager before calling wait_for_*() functions"
@ -209,7 +210,7 @@ class WatchLog(abc.ABC):
return self._wait_for_match(regexes)
def wait_for_sequence(self, patterns: List[FlexPattern]) -> List[Match]:
def wait_for_sequence(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until the specified pattern sequence is found in the
log file.
@ -285,7 +286,7 @@ class WatchLog(abc.ABC):
return matches
def wait_for_all(self, patterns: List[FlexPattern]) -> List[Match]:
def wait_for_all(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until all the specified patterns are found in the
log file in any order.

View file

@ -11,11 +11,12 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from pathlib import Path
import os
import platform
import socket
import shutil
import socket
import subprocess
import pytest

View file

@ -9,14 +9,14 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Iterable, FrozenSet
import dns.name
import dns.zone
import dns.rdatatype
from collections.abc import Iterable
from dns.name import Name
import dns.name
import dns.rdatatype
import dns.zone
def prepend_label(label: str, name: Name) -> Name:
return Name((label,) + name.labels)
@ -26,7 +26,7 @@ def len_wire_uncompressed(name: Name) -> int:
return len(name) + sum(map(len, name.labels))
def get_wildcard_names(names: Iterable[Name]) -> FrozenSet[Name]:
def get_wildcard_names(names: Iterable[Name]) -> frozenset[Name]:
return frozenset(name for name in names if name.is_wild())
@ -84,7 +84,7 @@ class ZoneAnalyzer:
.union(self.reachable_dnames)
)
def get_names_with_type(self, rdtype) -> FrozenSet[Name]:
def get_names_with_type(self, rdtype) -> frozenset[Name]:
return frozenset(
name for name in self.zone if self.zone.get_rdataset(name, rdtype)
)
@ -148,7 +148,7 @@ class ZoneAnalyzer:
self.reachable_delegations = frozenset(reachable_delegations)
self.occluded = frozenset(occluded)
def generate_ents(self) -> FrozenSet[Name]:
def generate_ents(self) -> frozenset[Name]:
"""
Generate reachable names of empty nodes "between" all reachable
names with a RR and the origin.

View file

@ -9,12 +9,18 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from collections.abc import Callable
from typing import Any
import os
import time
from typing import Any, Callable, Optional
import dns.query
import dns.exception
import dns.flags
import dns.message
import dns.query
import dns.rcode
import dns.rdataclass
import isctest.log
@ -25,11 +31,11 @@ def generic_query(
query_func: Callable[..., Any],
message: dns.message.Message,
ip: str,
port: Optional[int] = None,
source: Optional[str] = None,
port: int | None = None,
source: str | None = None,
timeout: int = QUERY_TIMEOUT,
attempts: int = 10,
expected_rcode: Optional[dns.rcode.Rcode] = None,
expected_rcode: dns.rcode.Rcode | None = None,
verify: bool = False,
log_query: bool = True,
log_response: bool = True,

View file

@ -9,11 +9,11 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from pathlib import Path
import os
import subprocess
import time
from typing import List, Optional
import isctest.log
import isctest.text
@ -39,9 +39,9 @@ def cmd(
stderr=subprocess.PIPE,
log_stdout=True,
log_stderr=True,
input_text: Optional[bytes] = None,
input_text: bytes | None = None,
raise_on_exception=True,
env: Optional[dict] = None,
env: dict | None = None,
) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
@ -97,7 +97,7 @@ class EnvCmd:
def _run_script(
interpreter: str,
script: str,
args: Optional[List[str]] = None,
args: list[str] | None = None,
):
if args is None:
args = []
@ -129,12 +129,12 @@ def _run_script(
isctest.log.debug(" exited with %d", returncode)
def shell(script: str, args: Optional[List[str]] = None) -> None:
def shell(script: str, args: list[str] | None = None) -> None:
"""Run a given script with system's shell interpreter."""
_run_script(os.environ["SHELL"], script, args)
def perl(script: str, args: Optional[List[str]] = None) -> None:
def perl(script: str, args: list[str] | None = None) -> None:
"""Run a given script with system's perl interpreter."""
_run_script(os.environ["PERL"], script, args)

View file

@ -13,7 +13,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any
import jinja2
@ -26,7 +26,7 @@ class TemplateEngine:
Engine for rendering jinja2 templates in system test directories.
"""
def __init__(self, directory: Union[str, Path], env_vars=ALL):
def __init__(self, directory: str | Path, env_vars=ALL):
"""
Initialize the template engine for `directory`, optionally overriding
the `env_vars` that will be used when rendering the templates (defaults
@ -44,8 +44,8 @@ class TemplateEngine:
def render(
self,
output: str,
data: Optional[Dict[str, Any]] = None,
template: Optional[str] = None,
data: dict[str, Any] | None = None,
template: str | None = None,
) -> None:
"""
Render `output` file from jinja `template` and fill in the `data`. The
@ -69,7 +69,7 @@ class TemplateEngine:
stream = self.j2env.get_template(template).stream(data)
stream.dump(output, encoding="utf-8")
def render_auto(self, data: Optional[Dict[str, Any]] = None):
def render_auto(self, data: dict[str, Any] | None = None):
"""
Render all *.j2 templates with default (and optionally the provided)
values and write the output to files without the .j2 extensions.

View file

@ -11,12 +11,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from collections.abc import Iterator
from re import Match, Pattern
from re import compile as Re
from typing import TextIO
import abc
import re
from re import compile as Re
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
FlexPattern = str | Pattern
def compile_pattern(string: FlexPattern) -> Pattern:
@ -47,7 +50,7 @@ class Grep(abc.ABC):
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
def grep(self, pattern: FlexPattern) -> list[Match]:
"""
Get list of lines matching the pattern.
"""
@ -149,7 +152,7 @@ class LineReader(Grep):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
def readline(self) -> str | None:
"""
Wrapper around io.readline() function to handle unfinished lines.

View file

@ -9,6 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import dns.rrset
import dns.zone
import pytest

View file

@ -11,19 +11,29 @@
import os
from .all import ALL
from .algorithms import init_crypto_supported, set_algorithm_set
from .features import init_features
from .openssl import parse_openssl_config
from .. import log
from . import algorithms, autoconf, basic, dirs, features, openssl, ports
from .all import ALL
__all__ = [
"ALL",
"algorithms",
"autoconf",
"basic",
"dirs",
"features",
"init_vars",
"openssl",
"ports",
]
def init_vars():
"""Initializes the environment variables."""
init_features()
init_crypto_supported()
set_algorithm_set(os.getenv("ALGORITHM_SET"))
parse_openssl_config(ALL["OPENSSL_CONF"])
features.init_features()
algorithms.init_crypto_supported()
algorithms.set_algorithm_set(os.getenv("ALGORITHM_SET"))
openssl.parse_openssl_config(ALL["OPENSSL_CONF"])
os.environ.update(ALL)
log.debug("setting following env vars: %s", ", ".join([str(key) for key in ALL]))

View file

@ -9,16 +9,17 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import NamedTuple
import os
import platform
import random
import subprocess
import tempfile
import time
from typing import Dict, List, NamedTuple, Optional, Union
from .basic import BASIC_VARS
from .. import log
from .basic import BASIC_VARS
# Algorithms are selected randomly at runtime from a list of supported
# algorithms. The randomization is deterministic and remains stable for a
@ -56,18 +57,26 @@ class Algorithm(NamedTuple):
number: int
bits: int
@classmethod
def default(cls):
return cls(
os.environ["DEFAULT_ALGORITHM"],
int(os.environ["DEFAULT_ALGORITHM_NUMBER"]),
int(os.environ["DEFAULT_BITS"]),
)
class AlgorithmSet(NamedTuple):
"""Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
default: Union[Algorithm, List[Algorithm]]
default: Algorithm | list[Algorithm]
"""DEFAULT is the algorithm for testing."""
alternative: Union[Algorithm, List[Algorithm]]
alternative: Algorithm | list[Algorithm]
"""ALTERNATIVE is an alternative algorithm for test cases that require more
than one algorithm (for example algorithm rollover)."""
disabled: Union[Algorithm, List[Algorithm]]
disabled: Algorithm | list[Algorithm]
"""DISABLED is an algorithm that is used for tests against the
"disable-algorithms" configuration option."""
@ -151,7 +160,7 @@ CRYPTO_SUPPORTED_VARS = {
"ED448_SUPPORTED": "0",
}
SUPPORTED_ALGORITHMS: List[Algorithm] = []
SUPPORTED_ALGORITHMS: list[Algorithm] = []
def init_crypto_supported():
@ -241,7 +250,7 @@ def _select_random(algs: AlgorithmSet, stable_period=STABLE_PERIOD) -> Algorithm
return AlgorithmSet(default, alternative, disabled)
def _algorithms_env(algs: AlgorithmSet, name: str) -> Dict[str, str]:
def _algorithms_env(algs: AlgorithmSet, name: str) -> dict[str, str]:
"""Return environment variables with selected algorithms as a dict."""
algs_env = {
"ALGORITHM_SET": name,
@ -264,7 +273,7 @@ def _algorithms_env(algs: AlgorithmSet, name: str) -> Dict[str, str]:
return algs_env
def set_algorithm_set(name: Optional[str]):
def set_algorithm_set(name: str | None):
if name is None:
name = "stable"
assert name in ALGORITHM_SETS, f'ALGORITHM_SET "{name}" unknown'

View file

@ -11,11 +11,8 @@
from collections import ChainMap
# pylint: disable=import-error
from .autoconf import AC_VARS # type: ignore
# pylint: enable=import-error
from .algorithms import ALG_VARS, CRYPTO_SUPPORTED_VARS
from .autoconf import AC_VARS # type: ignore
from .basic import BASIC_VARS
from .dirs import DIR_VARS
from .features import FEATURE_VARS

View file

@ -10,10 +10,9 @@
# information regarding copyright ownership.
from pathlib import Path
from typing import Dict
def load_ac_vars_from_files() -> Dict[str, str]:
def load_ac_vars_from_files() -> dict[str, str]:
ac_vars = {}
ac_vars_dir = Path(__file__).resolve().parent / ".ac_vars"
var_paths = [

View file

@ -11,12 +11,8 @@
import os
# pylint: disable=import-error
from .autoconf import AC_VARS # type: ignore
# pylint: enable=import-error
BASIC_VARS = {
"ARPANAME": f"{AC_VARS['TOP_BUILDDIR']}/bin/tools/arpaname",
"CDS": f"{AC_VARS['TOP_BUILDDIR']}/bin/dnssec/dnssec-cds",

View file

@ -11,12 +11,8 @@
import os
# pylint: disable=import-error
from .autoconf import AC_VARS # type: ignore
# pylint: enable=import-error
SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system"
DIR_VARS = {

View file

@ -9,9 +9,9 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from re import compile as Re
from typing import Optional
import os
from .. import log
@ -23,7 +23,7 @@ OPENSSL_VARS = {
}
def parse_openssl_config(path: Optional[str]):
def parse_openssl_config(path: str | None):
if path is None or not os.path.exists(path):
OPENSSL_VARS["ENGINE_ARG"] = None
OPENSSL_VARS["SOFTHSM2_MODULE"] = None

View file

@ -11,6 +11,8 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator, Collection, Iterable
import abc
import dns.rcode
@ -18,8 +20,6 @@ import dns.rdataclass
import dns.rdatatype
import dns.rrset
from typing import AsyncGenerator, Collection, Iterable
from isctest.asyncserver import (
ControllableAsyncDnsServer,
DnsResponseSend,

View file

@ -11,6 +11,8 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator, Collection, Iterable
import abc
import dns.rcode
@ -18,8 +20,6 @@ import dns.rdataclass
import dns.rdatatype
import dns.rrset
from typing import AsyncGenerator, Collection, Iterable
from isctest.asyncserver import (
ControllableAsyncDnsServer,
DnsResponseSend,

View file

@ -11,6 +11,8 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator, Collection, Iterable
import abc
import dns.rcode
@ -18,8 +20,6 @@ import dns.rdataclass
import dns.rdatatype
import dns.rrset
from typing import AsyncGenerator, Collection, Iterable
from isctest.asyncserver import (
ControllableAsyncDnsServer,
DnsResponseSend,

View file

@ -9,25 +9,28 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from datetime import timedelta
import os
import shutil
import subprocess
import time
from datetime import timedelta
import dns
import dns.exception
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.tsig
import dns.update
import pytest
from isctest.kasp import KeyProperties, KeyTimingMetadata
from isctest.util import param
from isctest.vars.algorithms import ECDSAP256SHA256, ECDSAP384SHA384, Algorithm
import isctest
import isctest.mark
from isctest.kasp import (
KeyProperties,
KeyTimingMetadata,
)
from isctest.util import param
from isctest.vars.algorithms import ECDSAP256SHA256, ECDSAP384SHA384
pytestmark = pytest.mark.extra_artifacts(
[
@ -131,10 +134,10 @@ KASP_INHERIT_TSIG_SECRET = {
}
def autosign_properties(alg, size):
def autosign_properties(algorithm: Algorithm):
return [
f"ksk {lifetime['P2Y']} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P1Y']} {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk {lifetime['P2Y']} {algorithm.number} {algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P1Y']} {algorithm.number} {algorithm.bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
@ -355,9 +358,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "autosign",
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": autosign_properties(
os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
),
"key-properties": autosign_properties(Algorithm.default()),
},
id="dnskey-ttl-mismatch.autosign",
),
@ -367,9 +368,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "autosign",
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": autosign_properties(
os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
),
"key-properties": autosign_properties(Algorithm.default()),
"additional-tests": [
{
"callback": cb_rrsig_refresh,
@ -385,9 +384,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "autosign",
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": autosign_properties(
os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
),
"key-properties": autosign_properties(Algorithm.default()),
"additional-tests": [
{
"callback": cb_rrsig_reuse,
@ -403,9 +400,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "autosign",
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": autosign_properties(
os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
),
"key-properties": autosign_properties(Algorithm.default()),
"additional-tests": [
{
"callback": cb_rrsig_refresh,
@ -421,9 +416,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "autosign",
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": autosign_properties(
os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
),
"key-properties": autosign_properties(Algorithm.default()),
"additional-tests": [
{
"callback": cb_remove_keyfiles,
@ -440,8 +433,8 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": [
f"ksk 63072000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent missing",
f"zsk 31536000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk 63072000 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent missing",
f"zsk 31536000 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
],
},
id="ksk-missing.autosign",
@ -453,8 +446,8 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"config": autosign_config,
"offset": -timedelta(days=30 * 6),
"key-properties": [
f"ksk 63072000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 31536000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent missing",
f"ksk 63072000 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 31536000 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent missing",
],
},
id="zsk-missing.autosign",
@ -513,8 +506,8 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
},
"key-directories": ["{keydir}/ksk", "{keydir}/zsk"],
"key-properties": [
f"ksk unlimited {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk unlimited {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
],
},
id="keystore.kasp",
@ -615,7 +608,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
"policy": "unlimited",
"config": kasp_config,
"key-properties": [
f"csk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="unlimited.kasp",
@ -912,7 +905,7 @@ def test_kasp_default(ns3):
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"zone {zone}/IN (signed): {expectmsg}")
# Nothing has changed.
expected[0].private = False # noqa
expected[0].private = False
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1061,18 +1054,16 @@ def test_kasp_dynamic(ns3):
assert f"zone_resigninc: zone {zone}/IN (unsigned): enter" not in "ns3/named.run"
def test_kasp_checkds(ns3):
def test_kasp_checkds(ns3, default_algorithm):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
# Zone: checkds-ksk.kasp.
zone = "checkds-ksk.kasp"
policy = "checkds-ksk"
alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
size = os.environ["DEFAULT_BITS"]
policy_keys = [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
@ -1105,19 +1096,17 @@ def test_kasp_checkds(ns3):
isctest.kasp.check_keys(zone, keys, expected)
def test_kasp_checkds_doubleksk(ns3):
def test_kasp_checkds_doubleksk(ns3, default_algorithm):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
# Zone: checkds-doubleksk.kasp.
zone = "checkds-doubleksk.kasp"
policy = "checkds-doubleksk"
alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
size = os.environ["DEFAULT_BITS"]
policy_keys = [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"ksk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
@ -1179,17 +1168,15 @@ def test_kasp_checkds_doubleksk(ns3):
isctest.kasp.check_keys(zone, keys, expected)
def test_kasp_checkds_csk(ns3):
def test_kasp_checkds_csk(ns3, default_algorithm):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
# Zone: checkds-csk.kasp.
zone = "checkds-csk.kasp"
policy = "checkds-csk"
alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
size = os.environ["DEFAULT_BITS"]
policy_keys = [
f"csk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk unlimited {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
@ -1467,7 +1454,7 @@ def test_kasp_dnssec_keygen():
isctest.kasp.check_keytimes(keys, expected)
def test_kasp_zsk_retired(ns3):
def test_kasp_zsk_retired(ns3, default_algorithm):
config = {
"dnskey-ttl": timedelta(seconds=300),
"ds-ttl": timedelta(days=1),
@ -1482,14 +1469,12 @@ def test_kasp_zsk_retired(ns3):
zone = "zsk-retired.autosign"
policy = "autosign"
alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
size = os.environ["DEFAULT_BITS"]
key_properties = [
f"ksk 63072000 {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"ksk 63072000 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
# zsk predecessor
f"zsk 31536000 {alg} {size} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
f"zsk 31536000 {default_algorithm.number} {default_algorithm.bits} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
# zsk successor
f"zsk 31536000 {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:hidden",
f"zsk 31536000 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured zrrsig:hidden",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
@ -1688,18 +1673,16 @@ def test_kasp_reload_restart(ns6):
isctest.run.retry_with_timeout(check_soa_ttl, timeout=10)
def test_kasp_manual_mode(ns3):
def test_kasp_manual_mode(ns3, default_algorithm):
keydir = ns3.identifier
zone = "keyfiles-missing.manual"
policy = "manual"
ttl = int(autosign_config["dnskey-ttl"].total_seconds())
offset = -timedelta(days=30 * 6)
alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
size = os.environ["DEFAULT_BITS"]
keyprops = [
f"ksk {lifetime['P2Y']} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P2M']} {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk {lifetime['P2Y']} {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P2M']} {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
@ -1774,9 +1757,9 @@ def test_kasp_manual_mode(ns3):
# Check keys again, make sure the rollover has started.
keyprops = [
f"ksk {lifetime['P2Y']} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P2M']} {alg} {size} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
f"zsk {lifetime['P2M']} {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:hidden",
f"ksk {lifetime['P2Y']} {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P2M']} {default_algorithm.number} {default_algorithm.bits} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
f"zsk {lifetime['P2M']} {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured zrrsig:hidden",
]
expected = isctest.kasp.policy_to_properties(ttl=ttl, keys=keyprops)
keys = isctest.kasp.keydir_to_keylist(zone, keydir)

View file

@ -9,9 +9,10 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import isctest
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
["ns2/named.stats"],
)

View file

@ -9,9 +9,10 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from re import compile as Re
import hashlib
import os
from re import compile as Re
import shutil
import pytest
@ -85,7 +86,6 @@ def token_init_and_cleanup():
assert Re("Found token (.*) with matching token label") in cmd.out
# pylint: disable-msg=too-many-locals
@pytest.mark.parametrize(
"alg_name,alg_type,alg_bits",
[

View file

@ -10,6 +10,7 @@
# information regarding copyright ownership.
from datetime import timedelta
import os
import re
import shutil
@ -17,8 +18,10 @@ import time
import pytest
import isctest
from isctest.kasp import KeyTimingMetadata
from isctest.vars.algorithms import Algorithm
import isctest
pytestmark = pytest.mark.extra_artifacts(
[
@ -116,12 +119,17 @@ def ksr(zone, policy, action, options="", raise_on_exception=True, to_file=""):
def check_keys(
keys,
lifetime,
alg=os.environ["DEFAULT_ALGORITHM_NUMBER"],
size=os.environ["DEFAULT_BITS"],
alg=None,
size=None,
offset=0,
with_state=False,
):
# Check keys that were created.
if alg is None:
alg = Algorithm.default().number
if size is None:
size = Algorithm.default().bits
num = 0
now = KeyTimingMetadata.now()
@ -507,7 +515,6 @@ def check_signedkeyresponse(
# collect keys that should be in this bundle
# collect lines that should be in this bundle
bundle_keys.append(key)
# pylint: disable=unused-variable
for _arg in expected_cds:
bundle_lines.append(lines[line_no])
line_no += 1

View file

@ -11,10 +11,11 @@
import itertools
import isctest
import dns.flags
import dns.rrset
import pytest
import dns.rrset
import isctest
@pytest.mark.parametrize(

View file

@ -9,13 +9,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from datetime import timedelta
import os
import pytest
from isctest.vars.algorithms import Algorithm
import isctest
import isctest.mark
pytestmark = pytest.mark.extra_artifacts(
[
@ -133,8 +135,8 @@ lifetime = {
"config": standard_config,
"offset": 0,
"key-properties": [
f"ksk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:rumoured",
f"zsk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:rumoured",
f"zsk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
],
},
id="migrate.kasp",
@ -148,7 +150,7 @@ lifetime = {
"config": default_config,
"offset": 0,
"key-properties": [
f"csk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:rumoured",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:rumoured",
],
},
id="csk.kasp",
@ -162,7 +164,7 @@ lifetime = {
"config": default_config,
"offset": 0,
"key-properties": [
f"csk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:rumoured",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:rumoured",
],
},
id="csk-nosep.kasp",
@ -176,8 +178,8 @@ lifetime = {
"config": timing_config,
"offset": -timedelta(seconds=300),
"key-properties": [
f"ksk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:rumoured",
f"zsk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:rumoured",
f"zsk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
],
},
id="rumoured.kasp",
@ -191,8 +193,8 @@ lifetime = {
"config": timing_config,
"offset": -timedelta(seconds=3900),
"key-properties": [
f"ksk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
],
},
id="omnipresent.kasp",
@ -206,8 +208,8 @@ lifetime = {
"config": timing_config,
"offset": -timedelta(hours=12),
"key-properties": [
f"ksk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:rumoured",
f"zsk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:rumoured",
f"zsk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
],
},
id="no-syncpublish.kasp",
@ -223,8 +225,8 @@ lifetime = {
"key-properties": [
"ksk - 8 2048 goal:hidden dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
"zsk - 8 2048 goal:hidden dnskey:omnipresent zrrsig:omnipresent",
f"ksk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk {lifetime['P60D']} {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
f"ksk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk {lifetime['P60D']} {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
],
},
id="migrate-nomatch-algnum.kasp",
@ -256,10 +258,10 @@ lifetime = {
"config": migrate_config,
"offset": -timedelta(seconds=3900),
"key-properties": [
f"ksk - {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:hidden dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk - {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
f"ksk - {Algorithm.default().number} {Algorithm.default().bits} goal:hidden dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk - {Algorithm.default().number} {Algorithm.default().bits} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
# This key is considered to be prepublished, so it is not yet signing, nor is the DS introduced.
f"csk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:hidden ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:hidden ds:hidden",
],
},
id="migrate-nomatch-kzc.kasp",

View file

@ -10,13 +10,16 @@
# information regarding copyright ownership.
from datetime import timedelta
import os
from re import compile as Re
import pytest
import os
import dns
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.update
import pytest
import isctest
@ -45,8 +48,6 @@ pytestmark = pytest.mark.extra_artifacts(
]
)
ALGORITHM = os.environ["DEFAULT_ALGORITHM_NUMBER"]
SIZE = os.environ["DEFAULT_BITS"]
CONFIG = {
"dnskey-ttl": timedelta(hours=1),
"ds-ttl": timedelta(days=1),
@ -502,11 +503,11 @@ def check_remove_cds(
check_dnssec(server, zone, keys, expected)
def test_multisigner(ns3, ns4):
def test_multisigner(ns3, ns4, default_algorithm):
zone = "model2.multisigner"
keyprops = [
f"ksk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
# First make sure the zone is properly signed.
@ -529,10 +530,10 @@ def test_multisigner(ns3, ns4):
check_dnssec(ns4, zone, keys4, expected4)
# Add DNSKEY to RRset.
newprops = [f"zsk unlimited {ALGORITHM} {SIZE}"]
newprops = [f"zsk unlimited {default_algorithm.number} {default_algorithm.bits}"]
extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
extra[0].private = False # noqa
extra[0].legacy = True # noqa
extra[0].private = False
extra[0].legacy = True
check_add_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra)
check_add_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra)
@ -544,10 +545,10 @@ def test_multisigner(ns3, ns4):
check_no_dnssec_in_journal(ns4, zone)
# Add CDNSKEY RRset.
newprops = [f"ksk unlimited {ALGORITHM} {SIZE}"]
newprops = [f"ksk unlimited {default_algorithm.number} {default_algorithm.bits}"]
extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
extra[0].private = False # noqa
extra[0].legacy = True # noqa
extra[0].private = False
extra[0].legacy = True
check_add_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra)
check_add_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra)
@ -573,11 +574,11 @@ def test_multisigner(ns3, ns4):
check_no_dnssec_in_journal(ns4, zone)
def test_multisigner_secondary(ns3, ns4, ns5):
def test_multisigner_secondary(ns3, ns4, ns5, default_algorithm):
zone = "model2.secondary"
keyprops = [
f"ksk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
f"ksk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
# First make sure the zone is properly signed.
@ -600,10 +601,10 @@ def test_multisigner_secondary(ns3, ns4, ns5):
check_dnssec(ns4, zone, keys4, expected4)
# Add DNSKEY to RRset.
newprops = [f"zsk unlimited {ALGORITHM} {SIZE}"]
newprops = [f"zsk unlimited {default_algorithm.number} {default_algorithm.bits}"]
extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
extra[0].private = False # noqa
extra[0].legacy = True # noqa
extra[0].private = False
extra[0].legacy = True
check_add_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra, primary=ns5)
check_add_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra, primary=ns5)
@ -617,10 +618,10 @@ def test_multisigner_secondary(ns3, ns4, ns5):
check_no_dnssec_in_journal(ns4, zone)
# Add CDNSKEY RRset.
newprops = [f"ksk unlimited {ALGORITHM} {SIZE}"]
newprops = [f"ksk unlimited {default_algorithm.number} {default_algorithm.bits}"]
extra = isctest.kasp.policy_to_properties(ttl=TTL, keys=newprops)
extra[0].private = False # noqa
extra[0].legacy = True # noqa
extra[0].private = False
extra[0].legacy = True
check_add_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra, primary=ns5)
check_add_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra, primary=ns5)

View file

@ -15,30 +15,30 @@
# https://github.com/pylint-dev/pylint/issues/10785#issuecomment-3677224217
# pylint: disable=unreachable
from collections.abc import Container, Iterable
from dataclasses import dataclass
import os
from pathlib import Path
from typing import Container, Iterable, Optional, Set, Tuple
import pytest
import os
from hypothesis import assume, given
import dns.dnssec
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rdtypes.ANY.RRSIG
import dns.rdtypes.ANY.NSEC3
import dns.rdtypes.ANY.RRSIG
import dns.rrset
import pytest
from isctest.hypothesis.strategies import dns_names, sampled_from
import isctest
import isctest.name
from hypothesis import assume, given
SUFFIX = dns.name.from_text(".")
AUTH = "10.53.0.1"
RESOLVER = "10.53.0.2"
@ -62,7 +62,7 @@ def is_related_to_any(
def do_test_query(
qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
) -> tuple[dns.message.QueryMessage, "NSEC3Checker"]:
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
response = isctest.query.tcp(query, server, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response, query)
@ -290,7 +290,7 @@ class NSEC3Params:
algorithm: int
flags: int
iterations: int
salt: Optional[bytes]
salt: bytes | None
class NSEC3Checker:
@ -348,8 +348,8 @@ class NSEC3Checker:
assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}"
self.params: NSEC3Params = NSEC3Params(**attrs_seen)
self.response: dns.message.Message = response
self.owners_present: Set[dns.name.Name] = owners_seen
self.owners_used: Set[dns.name.Name] = set()
self.owners_present: set[dns.name.Name] = owners_seen
self.owners_used: set[dns.name.Name] = set()
@staticmethod
def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:

View file

@ -9,16 +9,16 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
from datetime import timedelta
import dns
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
NSEC3_MARK = pytest.mark.extra_artifacts(
[
"*.axfr",
"*.created",
@ -40,9 +40,6 @@ pytestmark = pytest.mark.extra_artifacts(
]
)
ALGORITHM = os.environ["DEFAULT_ALGORITHM_NUMBER"]
SIZE = os.environ["DEFAULT_BITS"]
default_config = {
"dnskey-ttl": timedelta(hours=1),
"ds-ttl": timedelta(days=1),
@ -124,8 +121,8 @@ def check_nsec3_case(server, params, nsec3=True):
if "external-keys" in params:
expected2 = isctest.kasp.policy_to_properties(ttl, keys=params["external-keys"])
for ek in expected2:
ek.private = False # noqa
ek.legacy = True # noqa
ek.private = False
ek.legacy = True
expected = expected + expected2
assert "external-keydir" in params
extkeys = isctest.kasp.keydir_to_keylist(zone, params["external-keydir"])

View file

@ -9,26 +9,20 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import
import os
import shutil
import time
import dns
import dns.update
import dns.name
import dns.rdataclass
import dns.rdatatype
import pytest
from isctest.vars.algorithms import Algorithm
from nsec3.common import NSEC3_MARK, check_nsec3_case
import isctest
import isctest.mark
from isctest.vars.algorithms import RSASHA1
from nsec3.common import (
ALGORITHM,
SIZE,
default_config,
pytestmark,
check_nsec3_case,
)
pytestmark = NSEC3_MARK
# include the following zones when rendering named configs
ZONES = {
@ -100,7 +94,7 @@ def test_nsec3_case(ns3):
"salt-length": 8,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
}
zone = params["zone"]

View file

@ -9,24 +9,19 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import,unspecified-encoding,multiple-statements,use-maxsplit-arg,broad-exception-caught,f-string-without-interpolation
import os
import dns
import dns.rcode
import dns.update
import pytest
from isctest.vars.algorithms import RSASHA1, Algorithm
from nsec3.common import NSEC3_MARK, check_nsec3_case
import isctest
import isctest.mark
from isctest.vars.algorithms import RSASHA1
from nsec3.common import (
ALGORITHM,
SIZE,
default_config,
pytestmark,
check_nsec3_case,
)
pytestmark = NSEC3_MARK
# include the following zones when rendering named configs
ZONES = {
@ -70,7 +65,7 @@ def bootstrap():
"zone": "nsec-to-nsec3.kasp",
"policy": "nsec",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec-to-nsec3.kasp",
@ -104,10 +99,10 @@ def bootstrap():
"zone": "nsec3-xfr-inline.kasp",
"policy": "nsec",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
"external-keys": [
f"csk 0 {ALGORITHM} {SIZE}",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits}",
],
"external-keydir": "ns2",
},
@ -118,7 +113,7 @@ def bootstrap():
"zone": "nsec3-dynamic-update-inline.kasp",
"policy": "nsec",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic-update-inline.kasp",
@ -161,7 +156,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-to-rsasha1.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
],
},
id="nsec3-to-rsasha1.kasp",
@ -172,7 +167,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-to-rsasha1-ds.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
],
},
id="nsec3-to-rsasha1-ds.kasp",
@ -183,7 +178,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3.kasp",
@ -193,7 +188,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-dynamic.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic.kasp",
@ -203,7 +198,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-change.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-change.kasp",
@ -213,7 +208,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-dynamic-change.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic-change.kasp",
@ -223,7 +218,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-dynamic-to-inline.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic-to-inline.kasp",
@ -233,7 +228,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-inline-to-dynamic.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-inline-to-dynamic.kasp",
@ -243,7 +238,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-to-nsec.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-to-nsec.kasp",
@ -253,7 +248,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-to-optout.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-to-optout.kasp",
@ -267,7 +262,7 @@ def test_nsec_case(ns3, params):
"salt-length": 0,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-from-optout.kasp",
@ -281,7 +276,7 @@ def test_nsec_case(ns3, params):
"salt-length": 8,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-other.kasp",

View file

@ -9,26 +9,22 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import
import os
import shutil
import time
import dns
import dns.update
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import pytest
from isctest.vars.algorithms import RSASHA1, Algorithm
from nsec3.common import NSEC3_MARK, check_nsec3_case
import isctest
import isctest.mark
from isctest.vars.algorithms import RSASHA1
from nsec3.common import (
ALGORITHM,
SIZE,
default_config,
pytestmark,
check_nsec3_case,
)
pytestmark = NSEC3_MARK
# include the following zones when rendering named configs
ZONES = {
@ -97,7 +93,7 @@ def after_servers_start(ns3, templates):
"policy": "nsec3",
"key-properties": [
f"csk 0 {RSASHA1.number} 2048 goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="rsasha1-to-nsec3.kasp",
@ -109,7 +105,7 @@ def after_servers_start(ns3, templates):
"policy": "nsec3",
"key-properties": [
f"csk 0 {RSASHA1.number} 2048 goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="rsasha1-to-nsec3-wait.kasp",
@ -120,7 +116,7 @@ def after_servers_start(ns3, templates):
"zone": "nsec3-to-rsasha1.kasp",
"policy": "rsasha1",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {RSASHA1.number} 2048 goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
@ -132,7 +128,7 @@ def after_servers_start(ns3, templates):
"zone": "nsec3-to-rsasha1-ds.kasp",
"policy": "rsasha1",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent",
f"csk 0 {RSASHA1.number} 2048 goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
@ -144,7 +140,7 @@ def after_servers_start(ns3, templates):
"zone": "nsec3-to-nsec.kasp",
"policy": "nsec",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-to-nsec.kasp",
@ -169,7 +165,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec-to-nsec3.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec-to-nsec3.kasp",
@ -179,7 +175,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3.kasp",
@ -189,7 +185,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-dynamic.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic.kasp",
@ -203,7 +199,7 @@ def test_nsec_case(ns3, params):
"salt-length": 8,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic-change.kasp",
@ -213,7 +209,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-dynamic-to-inline.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-dynamic-to-inline.kasp",
@ -223,7 +219,7 @@ def test_nsec_case(ns3, params):
"zone": "nsec3-inline-to-dynamic.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-inline-to-dynamic.kasp",
@ -240,7 +236,7 @@ def test_nsec_case(ns3, params):
# "salt-length": 0,
# },
# "key-properties": [
# f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
# f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
# ],
# },
# id="nsec3-to-optout.kasp",
@ -253,7 +249,7 @@ def test_nsec_case(ns3, params):
# "zone": "nsec3-from-optout.kasp",
# "policy": "optout",
# "key-properties": [
# f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
# f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
# ],
# },
# id="nsec3-from-optout.kasp",
@ -267,7 +263,7 @@ def test_nsec_case(ns3, params):
"salt-length": 8,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-other.kasp",
@ -291,7 +287,7 @@ def test_nsec3_ent(ns3, templates):
"zone": "nsec3-ent.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
}

View file

@ -9,21 +9,13 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import
import os
import shutil
import time
import dns
import pytest
from nsec3.common import NSEC3_MARK, check_nsec3_case
import isctest
from nsec3.common import (
ALGORITHM,
SIZE,
check_nsec3_case,
)
pytestmark = NSEC3_MARK
# include the following zones when rendering named configs
ZONES = {
@ -37,13 +29,13 @@ def bootstrap():
}
def test_nsec3_case(ns3):
def test_nsec3_case(ns3, default_algorithm):
# Get test parameters.
params = {
"zone": "nsec3-fails-to-load.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {default_algorithm.number} {default_algorithm.bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
}
zone = params["zone"]

View file

@ -9,24 +9,17 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import
import os
import dns
import dns.update
import dns.rdatatype
import pytest
from isctest.vars.algorithms import Algorithm
from nsec3.common import NSEC3_MARK, check_nsec3_case, check_nsec3param
import isctest
import isctest.mark
from nsec3.common import (
ALGORITHM,
SIZE,
default_config,
pytestmark,
check_nsec3_case,
check_nsec3param,
)
pytestmark = NSEC3_MARK
# include the following zones when rendering named configs
ZONES = {
@ -74,7 +67,7 @@ def perform_nsec3_tests(server, params):
"zone": "nsec3.kasp",
"policy": "nsec3",
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3.kasp",
@ -88,7 +81,7 @@ def perform_nsec3_tests(server, params):
"salt-length": 8,
},
"key-properties": [
f"csk 0 {ALGORITHM} {SIZE} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
f"csk 0 {Algorithm.default().number} {Algorithm.default().bits} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
],
},
id="nsec3-other.kasp",

View file

@ -9,24 +9,19 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=redefined-outer-name,unused-import
from datetime import timedelta
import os
import shutil
import dns
import dns.update
import pytest
import os
import dns.rcode
import dns.rdatatype
from isctest.vars.algorithms import RSASHA256
from nsec3.common import NSEC3_MARK, check_auth_nsec3, check_nsec3param
import isctest
import isctest.mark
from isctest.vars.algorithms import RSASHA256
from nsec3.common import (
pytestmark,
check_auth_nsec3,
check_nsec3param,
)
pytestmark = NSEC3_MARK
DNSKEY_TTL = int(timedelta(hours=1).total_seconds())
ZSK_LIFETIME = int(timedelta(days=90).total_seconds())

View file

@ -9,11 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from isctest.asyncserver import (
AsyncDnsServer,
IgnoreAllConnections,
IgnoreAllQueries,
)
from isctest.asyncserver import AsyncDnsServer, IgnoreAllConnections, IgnoreAllQueries
def main() -> None:

View file

@ -11,6 +11,7 @@
import os
import pytest
import isctest

View file

@ -16,17 +16,12 @@ import os
import re
import sys
import isctest
import pytest
import dns
import dns.exception
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.zone
import pytest
import isctest
pytestmark = [
pytest.mark.skipif(

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
[pytest]
addopts = --tb=short -rA -vv
addopts = --tb=short -rA -vv --dist=loadscope
log_format = %(asctime)s %(levelname)s:%(name)s %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
log_cli = 1

View file

@ -11,9 +11,8 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.message
import dns.name
import dns.rcode
import dns.rdatatype
@ -27,7 +26,7 @@ from isctest.asyncserver import (
ResponseAction,
)
from qmin_ans import (
from ..qmin_ans import (
DelayedResponseHandler,
EntRcodeChanger,
QueryLogHandler,

View file

@ -15,7 +15,7 @@ import dns.rcode
from isctest.asyncserver import AsyncDnsServer
from qmin_ans import DelayedResponseHandler, EntRcodeChanger, QueryLogHandler
from ..qmin_ans import DelayedResponseHandler, EntRcodeChanger, QueryLogHandler
class QueryLogger(QueryLogHandler):

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import dns.rcode
import dns.rdatatype
@ -24,7 +24,12 @@ from isctest.asyncserver import (
ResponseAction,
)
from qmin_ans import DelayedResponseHandler, EntRcodeChanger, QueryLogHandler, log_query
from ..qmin_ans import (
DelayedResponseHandler,
EntRcodeChanger,
QueryLogHandler,
log_query,
)
class QueryLogger(QueryLogHandler):

View file

@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
from collections.abc import AsyncGenerator
import abc

Some files were not shown because too many files have changed in this diff Show more