From 5764a9d66069f9351f9acc811796cd67d65d62c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 1/7] Simplify peer address formatting Add a helper class, Peer, which holds the tuple of a connection endpoint and gets pretty-printed when formatted as a string. This enables passing instances of this new class directly to logging functions, eliminating the need for the AsyncDnsServer._format_peer() helper method. --- bin/tests/system/isctest/asyncserver.py | 52 ++++++++++++++----------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index ab508b404a..211d140221 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -224,6 +224,20 @@ class DnsProtocol(enum.Enum): TCP = enum.auto() +@dataclass(frozen=True) +class Peer: + """ + Pretty-printed connection endpoint. + """ + + host: str + port: int + + def __str__(self) -> str: + host = f"[{self.host}]" if ":" in self.host else self.host + return f"{host}:{self.port}" + + @dataclass class QueryContext: """ @@ -232,7 +246,7 @@ class QueryContext: query: dns.message.Message response: dns.message.Message - peer: Tuple[str, int] + peer: Peer protocol: DnsProtocol zone: Optional[dns.zone.Zone] = None soa: Optional[dns.rrset.RRset] = None @@ -513,16 +527,20 @@ class AsyncDnsServer(AsyncServer): self._zone_tree.add(zone) async def _handle_udp( - self, wire: bytes, peer: Tuple[str, int], transport: asyncio.DatagramTransport + self, wire: bytes, addr: Tuple[str, int], transport: asyncio.DatagramTransport ) -> None: logging.debug("Received UDP message: %s", wire.hex()) + peer = Peer(addr[0], addr[1]) responses = self._handle_query(wire, peer, DnsProtocol.UDP) async for response in responses: - transport.sendto(response, peer) + transport.sendto(response, addr) async def _handle_tcp( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: + peer_info = writer.get_extra_info("peername") + peer = Peer(peer_info[0], peer_info[1]) + wire_length_bytes = await reader.read(2) (wire_length,) = struct.unpack("!H", wire_length_bytes) logging.debug("Receiving TCP message (%d octets)...", wire_length) @@ -531,38 +549,26 @@ class AsyncDnsServer(AsyncServer): full_message = wire_length_bytes + wire logging.debug("Received complete TCP message: %s", full_message.hex()) - peer = writer.get_extra_info("peername") responses = self._handle_query(wire, peer, DnsProtocol.TCP) async for response in responses: writer.write(response) try: await writer.drain() except ConnectionResetError: - logging.error( - "TCP connection from %s reset by peer", self._format_peer(peer) - ) + logging.error("TCP connection from %s reset by peer", peer) return writer.close() await writer.wait_closed() - def _format_peer(self, peer: Tuple[str, int]) -> str: - host = peer[0] - port = peer[1] - if "::" in host: - host = f"[{host}]" - return f"{host}:{port}" - - def _log_query( - self, qctx: QueryContext, peer: Tuple[str, int], protocol: DnsProtocol - ) -> None: + def _log_query(self, qctx: QueryContext, peer: Peer, protocol: DnsProtocol) -> None: logging.info( "Received %s/%s/%s (ID=%d) query from %s (%s)", qctx.qname.to_text(omit_final_dot=True), dns.rdataclass.to_text(qctx.qclass), dns.rdatatype.to_text(qctx.qtype), qctx.query.id, - self._format_peer(peer), + peer, protocol.name, ) logging.debug( @@ -573,14 +579,14 @@ class AsyncDnsServer(AsyncServer): self, qctx: QueryContext, response: Optional[Union[dns.message.Message, bytes]], - peer: Tuple[str, int], + peer: Peer, protocol: DnsProtocol, ) -> None: if not response: logging.info( "Not sending a response to query (ID=%d) from %s (%s)", qctx.query.id, - self._format_peer(peer), + peer, protocol.name, ) return @@ -606,7 +612,7 @@ class AsyncDnsServer(AsyncServer): len(response.authority), len(response.additional), qctx.query.id, - self._format_peer(peer), + peer, protocol.name, ) logging.debug( @@ -618,13 +624,13 @@ class AsyncDnsServer(AsyncServer): "Sending response (%d bytes) to a query (ID=%d) from %s (%s)", len(response), qctx.query.id, - self._format_peer(peer), + peer, protocol.name, ) logging.debug("[OUT] %s", response.hex()) async def _handle_query( - self, wire: bytes, peer: Tuple[str, int], protocol: DnsProtocol + self, wire: bytes, peer: Peer, protocol: DnsProtocol ) -> AsyncGenerator[bytes, None]: """ Yield wire data to send as a response over the established transport. From e4c3186a7ccce317a3319406dfe85c3722983a11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 2/7] Gracefully handle TCP client disconnections Prevent premature client disconnections during reading from triggering unhandled exceptions in TCP connection handling code. --- bin/tests/system/isctest/asyncserver.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 211d140221..952cb79756 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -542,10 +542,14 @@ class AsyncDnsServer(AsyncServer): peer = Peer(peer_info[0], peer_info[1]) wire_length_bytes = await reader.read(2) + if len(wire_length_bytes) < 2: + return (wire_length,) = struct.unpack("!H", wire_length_bytes) logging.debug("Receiving TCP message (%d octets)...", wire_length) wire = await reader.read(wire_length) + if len(wire) < wire_length: + return full_message = wire_length_bytes + wire logging.debug("Received complete TCP message: %s", full_message.hex()) From a956947fbab189670db276ee352bc5d77f0a80b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 3/7] Refactor AsyncDnsServer._handle_tcp() Split up AsyncDnsServer._handle_tcp() into a set of smaller methods to improve code readability. --- bin/tests/system/isctest/asyncserver.py | 42 ++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 952cb79756..7a3285402c 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -541,18 +541,49 @@ class AsyncDnsServer(AsyncServer): peer_info = writer.get_extra_info("peername") peer = Peer(peer_info[0], peer_info[1]) + for _ in range(0, 1): + wire = await self._read_tcp_query(reader) + if not wire: + break + await self._send_tcp_response(writer, peer, wire) + + writer.close() + await writer.wait_closed() + + async def _read_tcp_query(self, reader: asyncio.StreamReader) -> Optional[bytes]: + wire_length = await self._read_tcp_query_wire_length(reader) + if not wire_length: + return None + + return await self._read_tcp_query_wire(reader, wire_length) + + async def _read_tcp_query_wire_length( + self, reader: asyncio.StreamReader + ) -> Optional[int]: wire_length_bytes = await reader.read(2) if len(wire_length_bytes) < 2: - return + return None + (wire_length,) = struct.unpack("!H", wire_length_bytes) + + return wire_length + + async def _read_tcp_query_wire( + self, reader: asyncio.StreamReader, wire_length: int + ) -> Optional[bytes]: logging.debug("Receiving TCP message (%d octets)...", wire_length) wire = await reader.read(wire_length) if len(wire) < wire_length: - return - full_message = wire_length_bytes + wire - logging.debug("Received complete TCP message: %s", full_message.hex()) + return None + logging.debug("Received complete TCP message: %s", wire.hex()) + + return wire + + async def _send_tcp_response( + self, writer: asyncio.StreamWriter, peer: Peer, wire: bytes + ) -> None: responses = self._handle_query(wire, peer, DnsProtocol.TCP) async for response in responses: writer.write(response) @@ -562,9 +593,6 @@ class AsyncDnsServer(AsyncServer): logging.error("TCP connection from %s reset by peer", peer) return - writer.close() - await writer.wait_closed() - def _log_query(self, qctx: QueryContext, peer: Peer, protocol: DnsProtocol) -> None: logging.info( "Received %s/%s/%s (ID=%d) query from %s (%s)", From 748ed4259b66e4b33acf1d2584dc92da00d31aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 4/7] Handle connection resets during reading A TCP peer may reset the connection at any point, but asyncserver.py currently only handles connection resets when it is sending data to the client. Handle connection resets during reading in the same way. --- bin/tests/system/isctest/asyncserver.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 7a3285402c..6e62f483a6 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -542,10 +542,14 @@ class AsyncDnsServer(AsyncServer): peer = Peer(peer_info[0], peer_info[1]) for _ in range(0, 1): - wire = await self._read_tcp_query(reader) - if not wire: - break - await self._send_tcp_response(writer, peer, wire) + try: + wire = await self._read_tcp_query(reader) + if not wire: + break + await self._send_tcp_response(writer, peer, wire) + except ConnectionResetError: + logging.error("TCP connection from %s reset by peer", peer) + return writer.close() await writer.wait_closed() @@ -587,11 +591,7 @@ class AsyncDnsServer(AsyncServer): responses = self._handle_query(wire, peer, DnsProtocol.TCP) async for response in responses: writer.write(response) - try: - await writer.drain() - except ConnectionResetError: - logging.error("TCP connection from %s reset by peer", peer) - return + await writer.drain() def _log_query(self, qctx: QueryContext, peer: Peer, protocol: DnsProtocol) -> None: logging.info( From 8c3f673f3777046e3d0afef8ffef6c86548ba8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 5/7] Extend TCP logging Emit more log messages from TCP connection handling code and extend existing ones to improve debuggability of servers using asyncserver.py. --- bin/tests/system/isctest/asyncserver.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 6e62f483a6..902f436ccc 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -540,10 +540,11 @@ class AsyncDnsServer(AsyncServer): ) -> None: peer_info = writer.get_extra_info("peername") peer = Peer(peer_info[0], peer_info[1]) + logging.debug("Accepted TCP connection from %s", peer) for _ in range(0, 1): try: - wire = await self._read_tcp_query(reader) + wire = await self._read_tcp_query(reader, peer) if not wire: break await self._send_tcp_response(writer, peer, wire) @@ -551,19 +552,24 @@ class AsyncDnsServer(AsyncServer): logging.error("TCP connection from %s reset by peer", peer) return + logging.debug("Closing TCP connection from %s", peer) writer.close() await writer.wait_closed() - async def _read_tcp_query(self, reader: asyncio.StreamReader) -> Optional[bytes]: - wire_length = await self._read_tcp_query_wire_length(reader) + async def _read_tcp_query( + self, reader: asyncio.StreamReader, peer: Peer + ) -> Optional[bytes]: + wire_length = await self._read_tcp_query_wire_length(reader, peer) if not wire_length: return None - return await self._read_tcp_query_wire(reader, wire_length) + return await self._read_tcp_query_wire(reader, peer, wire_length) async def _read_tcp_query_wire_length( - self, reader: asyncio.StreamReader + self, reader: asyncio.StreamReader, peer: Peer ) -> Optional[int]: + logging.debug("Receiving TCP message length from %s...", peer) + wire_length_bytes = await reader.read(2) if len(wire_length_bytes) < 2: return None @@ -573,15 +579,15 @@ class AsyncDnsServer(AsyncServer): return wire_length async def _read_tcp_query_wire( - self, reader: asyncio.StreamReader, wire_length: int + self, reader: asyncio.StreamReader, peer: Peer, wire_length: int ) -> Optional[bytes]: - logging.debug("Receiving TCP message (%d octets)...", wire_length) + logging.debug("Receiving TCP message (%d octets) from %s...", wire_length, peer) wire = await reader.read(wire_length) if len(wire) < wire_length: return None - logging.debug("Received complete TCP message: %s", wire.hex()) + logging.debug("Received complete TCP message from %s: %s", peer, wire.hex()) return wire From 68fe9a5df5c5298413449771c062f85e4b1b9ef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 6/7] Enable receiving chunked TCP DNS messages A TCP DNS client may send its queries in chunks, causing StreamReader.read() to return less data than previously declared by the client as the DNS message length; even the two-octet DNS message length itself may be split up into two single-octet transmissions. Sending data in chunks is valid client behavior that should not be treated as an error. Add a new helper method for reading TCP data in a loop, properly distinguishing between chunked queries and client disconnections. Use the new method for reading all TCP data from clients. --- bin/tests/system/isctest/asyncserver.py | 32 +++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 902f436ccc..0d8996e8e2 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -570,8 +570,8 @@ class AsyncDnsServer(AsyncServer): ) -> Optional[int]: logging.debug("Receiving TCP message length from %s...", peer) - wire_length_bytes = await reader.read(2) - if len(wire_length_bytes) < 2: + wire_length_bytes = await self._read_tcp_octets(reader, peer, 2) + if not wire_length_bytes: return None (wire_length,) = struct.unpack("!H", wire_length_bytes) @@ -583,14 +583,38 @@ class AsyncDnsServer(AsyncServer): ) -> Optional[bytes]: logging.debug("Receiving TCP message (%d octets) from %s...", wire_length, peer) - wire = await reader.read(wire_length) - if len(wire) < wire_length: + wire = await self._read_tcp_octets(reader, peer, wire_length) + if not wire: return None logging.debug("Received complete TCP message from %s: %s", peer, wire.hex()) return wire + async def _read_tcp_octets( + self, reader: asyncio.StreamReader, peer: Peer, expected: int + ) -> Optional[bytes]: + buffer = b"" + + while len(buffer) < expected: + chunk = await reader.read(expected - len(buffer)) + if not chunk: + if buffer: + logging.debug( + "Received short TCP message (%d octets) from %s: %s", + len(buffer), + peer, + buffer.hex(), + ) + else: + logging.debug("Received disconnect from %s", peer) + return None + + logging.debug("Received %d TCP octets from %s", len(chunk), peer) + buffer += chunk + + return buffer + async def _send_tcp_response( self, writer: asyncio.StreamWriter, peer: Peer, wire: bytes ) -> None: From 575a8745822ea4da706e8c7a93ad234d04b3cd03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C4=99pie=C5=84?= Date: Tue, 18 Mar 2025 16:28:18 +0100 Subject: [PATCH 7/7] Handle queries indefinitely on each TCP connection Instead of closing every incoming TCP connection after handling a single query, continue receiving queries on each TCP connection until the client disconnects itself. When coupled with response dropping, this enables silently receiving all incoming data, simulating an unresponsive server. --- bin/tests/system/isctest/asyncserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 0d8996e8e2..b4270fa899 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -542,7 +542,7 @@ class AsyncDnsServer(AsyncServer): peer = Peer(peer_info[0], peer_info[1]) logging.debug("Accepted TCP connection from %s", peer) - for _ in range(0, 1): + while True: try: wire = await self._read_tcp_query(reader, peer) if not wire: