diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 56e5bef4a9..99a4fdb2e8 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -61,9 +61,10 @@ #endif /* - * The TCP receive buffer can fit one maximum sized DNS message plus its size, - * the receive buffer here affects TCP, DoT and DoH. + * The TCP send and receive buffers can fit one maximum sized DNS message plus + * its size, the receive buffer here affects TCP, DoT and DoH. */ +#define ISC_NETMGR_TCP_SENDBUF_SIZE (sizeof(uint16_t) + UINT16_MAX) #define ISC_NETMGR_TCP_RECVBUF_SIZE (sizeof(uint16_t) + UINT16_MAX) /* Pick the larger buffer */ diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 7ee0502ae1..9fa5c2b9c8 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1128,6 +1128,7 @@ isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) { sock = req->sock; + isc__nm_start_reading(sock); isc__nmsocket_reset(sock); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 63612bf130..45e1c79c73 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -655,6 +655,7 @@ isc__nm_tcp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); + sock->reading = false; if (sock->recv_cb != NULL) { isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); @@ -701,13 +702,14 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { goto failure; } + sock->reading = true; + if (!sock->manual_read_timer) { isc__nmsocket_timer_start(sock); } return; failure: - sock->reading = true; isc__nm_tcp_failed_read_cb(sock, result, true); } @@ -720,6 +722,7 @@ isc__nm_tcp_read_stop(isc_nmhandle_t *handle) { isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); + sock->reading = false; return; } @@ -771,8 +774,29 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc__nm_readcb(sock, req, ISC_R_SUCCESS, false); - /* 
The readcb could have paused the reading */ - if (sock->reading && !sock->manual_read_timer) { + if (!sock->client && sock->reading) { + /* + * Stop reading if we have accumulated enough bytes in the send + * queue; this means that the TCP client is not reading back the + * data we are sending to it, and there's no reason to continue + * processing more incoming DNS messages, if the client is not + * reading back the responses. + */ + size_t write_queue_size = + uv_stream_get_write_queue_size(&sock->uv_handle.stream); + + if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) { + isc__nmsocket_log( + sock, ISC_LOG_DEBUG(3), + "throttling TCP connection, the other side is " + "not reading the data (%zu)", + write_queue_size); + isc__nm_stop_reading(sock); + } + } else if (uv_is_active(&sock->uv_handle.handle) && + !sock->manual_read_timer) + { + /* The readcb could have paused the reading */ /* The timer will be updated */ isc__nmsocket_timer_restart(sock); } @@ -996,6 +1020,32 @@ isc__nm_tcp_senddns(isc_nmhandle_t *handle, const isc_region_t *region, tcp_send(handle, region, cb, cbarg, true); } +static void +tcp_maybe_restart_reading(isc_nmsocket_t *sock) { + if (!sock->client && sock->reading && + !uv_is_active(&sock->uv_handle.handle)) + { + /* + * Restart reading if we have less data in the send queue than + * the send buffer size; this means that the TCP client has + * started reading some data again. Starting reading when we go + * under the limit instead of waiting until all data has been + * flushed allows faster recovery (in case there was a + * congestion and now there isn't). 
+ */ + size_t write_queue_size = + uv_stream_get_write_queue_size(&sock->uv_handle.stream); + if (write_queue_size < ISC_NETMGR_TCP_SENDBUF_SIZE) { + isc__nmsocket_log( + sock, ISC_LOG_DEBUG(3), + "resuming TCP connection, the other side " + "is reading the data again (%zu)", + write_queue_size); + isc__nm_start_reading(sock); + } + } +} + static void tcp_send_cb(uv_write_t *req, int status) { isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; @@ -1013,10 +1063,15 @@ tcp_send_cb(uv_write_t *req, int status) { isc__nm_incstats(sock, STATID_SENDFAIL); isc__nm_failed_send_cb(sock, uvreq, isc_uverr2result(status), false); + if (!sock->client && sock->reading) { + isc__nm_start_reading(sock); + isc__nmsocket_reset(sock); + } return; } isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false); + tcp_maybe_restart_reading(sock); } static isc_result_t @@ -1045,6 +1100,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r == (int)(bufs[0].len)) { /* Wrote everything */ isc__nm_sendcb(sock, req, ISC_R_SUCCESS, true); + tcp_maybe_restart_reading(sock); return (ISC_R_SUCCESS); } else if (r > 0) { bufs[0].base += (size_t)r; @@ -1064,6 +1120,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r == (int)(bufs[0].len + bufs[1].len)) { /* Wrote everything */ isc__nm_sendcb(sock, req, ISC_R_SUCCESS, true); + tcp_maybe_restart_reading(sock); return (ISC_R_SUCCESS); } else if (r == 1) { /* Partial write of DNSMSG length */ @@ -1148,6 +1205,7 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) { /* 2. close the socket + destroy the socket in callback */ isc__nmsocket_clearcb(sock); isc__nm_stop_reading(sock); + sock->reading = false; uv_close(&sock->uv_handle.handle, tcp_close_cb); /* 1. 
close the timer */ @@ -1226,7 +1284,7 @@ isc__nmhandle_tcp_set_manual_timer(isc_nmhandle_t *handle, const bool manual) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->tid == isc_tid()); - REQUIRE(!sock->reading); + REQUIRE(!uv_is_active(&sock->uv_handle.handle)); sock->manual_read_timer = manual; }