diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4ceb182e7b..cc635e3f5f 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -378,9 +378,10 @@ struct isc__nm_uvreq { int magic; isc_nmsocket_t *sock; isc_nmhandle_t *handle; - char tcplen[2]; /* The TCP DNS message length */ - uv_buf_t uvbuf; /* translated isc_region_t, to be - * sent or received */ + char tcplen[2]; /* The TCP DNS message length */ + uv_buf_t uvbuf; /* translated isc_region_t, to be + * sent or received */ + isc_region_t userbuf; isc_sockaddr_t local; /* local address */ isc_sockaddr_t peer; /* peer address */ isc__nm_cb_t cb; /* callback */ @@ -995,7 +996,6 @@ struct isc_nmsocket { TLS_STATE_ERROR, TLS_STATE_CLOSING } state; - isc_region_t senddata; ISC_LIST(isc__nm_uvreq_t) sendreqs; bool cycle; isc_result_t pending_error; @@ -1060,6 +1060,12 @@ struct isc_nmsocket { */ uint64_t write_timeout; + /* + * Reading was throttled over TCP as the peer does not read the + * data we are sending back. + */ + bool reading_throttled; + /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */ isc_nmsocket_t *outer; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 336cad4e23..01c770d56a 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -49,8 +49,15 @@ * How many isc_nmhandles and isc_nm_uvreqs will we be * caching for reuse in a socket. */ -#define ISC_NM_HANDLES_STACK_SIZE 600 -#define ISC_NM_REQS_STACK_SIZE 600 +#define ISC_NM_HANDLES_STACK_SIZE 16 +#define ISC_NM_REQS_STACK_SIZE 16 + +/*% + * Same, but for UDP sockets which tend to need larger values as they + * process many requests per socket. + */ +#define ISC_NM_HANDLES_STACK_SIZE_UDP 64 +#define ISC_NM_REQS_STACK_SIZE_UDP 64 /*% * Shortcut index arrays to get access to statistics counters. @@ -1508,16 +1515,25 @@ void isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, isc_sockaddr_t *iface FLARG) { uint16_t family; + size_t inactive_handles_stack_size = ISC_NM_HANDLES_STACK_SIZE; + size_t inactive_reqs_stack_size = ISC_NM_REQS_STACK_SIZE; REQUIRE(sock != NULL); REQUIRE(mgr != NULL); - *sock = (isc_nmsocket_t){ .type = type, - .fd = -1, - .inactivehandles = isc_astack_new( - mgr->mctx, ISC_NM_HANDLES_STACK_SIZE), - .inactivereqs = isc_astack_new( - mgr->mctx, ISC_NM_REQS_STACK_SIZE) }; + if (type == isc_nm_udpsocket) { + inactive_handles_stack_size = ISC_NM_HANDLES_STACK_SIZE_UDP; + inactive_reqs_stack_size = ISC_NM_REQS_STACK_SIZE_UDP; + } + + *sock = (isc_nmsocket_t){ + .type = type, + .fd = -1, + .inactivehandles = isc_astack_new(mgr->mctx, + inactive_handles_stack_size), + .inactivereqs = isc_astack_new(mgr->mctx, + inactive_reqs_stack_size) + }; ISC_LIST_INIT(sock->tls.sendreqs); @@ -2096,7 +2112,6 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - REQUIRE(atomic_load(&sock->reading)); if (atomic_load(&sock->client)) { uv_timer_stop(timer); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 016a9e9059..37d44bd9c8 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -766,7 +766,7 @@ isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpstartread_t *ievent = (isc__netievent_tcpstartread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - isc_result_t result; + isc_result_t result = ISC_R_SUCCESS; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -774,7 +774,7 @@ isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) { if (isc__nmsocket_closing(sock)) { result = ISC_R_CANCELED; - } else { + } else if (!sock->reading_throttled) { result = isc__nm_start_reading(sock); } @@ -926,6 +926,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { "the other side is " "not reading the data (%zu)", write_queue_size); + sock->reading_throttled = true; isc__nm_stop_reading(sock); } } @@ -1122,7 +1123,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, static void tcp_maybe_restart_reading(isc_nmsocket_t *sock) { - if (!sock->client && sock->reading && + if (!sock->client && sock->reading_throttled && !uv_is_active(&sock->uv_handle.handle)) { /* @@ -1142,6 +1143,7 @@ tcp_maybe_restart_reading(isc_nmsocket_t *sock) { "resuming TCP connection, the other side " "is reading the data again (%zu)", write_queue_size); + sock->reading_throttled = false; isc__nm_start_reading(sock); } } @@ -1165,7 +1167,14 @@ tcp_send_cb(uv_write_t *req, int status) { isc__nm_failed_send_cb(sock, uvreq, isc__nm_uverr2result(status)); - if (!sock->client && sock->reading) { + if (!sock->client && + (atomic_load(&sock->reading) || sock->reading_throttled)) + { + /* + * As we are resuming reading, it is not throttled + * anymore (technically). + */ + sock->reading_throttled = false; isc__nm_start_reading(sock); isc__nmsocket_reset(sock); } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 00ecb0f3d2..d7ec755bec 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -734,7 +734,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpdnsread_t *ievent = (isc__netievent_tcpdnsread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - isc_result_t result; + isc_result_t result = ISC_R_SUCCESS; UNUSED(worker); @@ -743,7 +743,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { if (isc__nmsocket_closing(sock)) { result = ISC_R_CANCELED; - } else { + } else if (!sock->reading_throttled) { result = isc__nm_process_sock_buffer(sock); } @@ -932,6 +932,7 @@ isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread, "the other side is " "not reading the data (%zu)", write_queue_size); + sock->reading_throttled = true; isc__nm_stop_reading(sock); } } @@ -1158,7 +1159,7 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, static void tcpdns_maybe_restart_reading(isc_nmsocket_t *sock) { - if (!sock->client && sock->reading && + if (!sock->client && sock->reading_throttled && !uv_is_active(&sock->uv_handle.handle)) { /* @@ -1178,6 +1179,7 @@ tcpdns_maybe_restart_reading(isc_nmsocket_t *sock) { "resuming TCP connection, the other side " "is reading the data again (%zu)", write_queue_size); + sock->reading_throttled = false; isc__nm_start_reading(sock); } } @@ -1201,7 +1203,14 @@ tcpdns_send_cb(uv_write_t *req, int status) { isc__nm_failed_send_cb(sock, uvreq, isc__nm_uverr2result(status)); - if (!sock->client && sock->reading) { + if (!sock->client && + (atomic_load(&sock->reading) || sock->reading_throttled)) + { + /* + * As we are resuming reading, it is not throttled + * anymore (technically). + */ + sock->reading_throttled = false; isc__nm_start_reading(sock); isc__nmsocket_reset(sock); } diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index fa416e2fef..b41c353842 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -88,6 +88,9 @@ tlsdns_set_tls_shutdown(isc_tls_t *tls) { (void)SSL_set_shutdown(tls, SSL_SENT_SHUTDOWN); } +static void +tlsdns_maybe_restart_reading(isc_nmsocket_t *sock); + static bool peer_verification_has_failed(isc_nmsocket_t *sock) { if (sock->tls.tls != NULL && sock->tls.state == TLS_STATE_HANDSHAKE && @@ -1084,6 +1087,19 @@ tls_cycle_input(isc_nmsocket_t *sock) { size_t len; for (;;) { + /* + * There is a similar branch in + * isc__nm_process_sock_buffer() which is sufficient to + * stop excessive processing in TCP. However, as we wrap + * this call in a loop, we need to have it here in order + * to limit the number of loop iterations (and, + * consequently, the number of messages processed). + */ + if (atomic_load(&sock->ah) >= STREAM_CLIENTS_PER_CONN) { + isc__nm_stop_reading(sock); + break; + } + (void)SSL_peek(sock->tls.tls, &(char){ '\0' }, 0); int pending = SSL_pending(sock->tls.tls); @@ -1261,17 +1277,17 @@ call_pending_send_callbacks(isc_nmsocket_t *sock, const isc_result_t result) { } static void -free_senddata(isc_nmsocket_t *sock, const isc_result_t result) { +free_senddata(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + const isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tls.senddata.base != NULL); - REQUIRE(sock->tls.senddata.length > 0); + REQUIRE(req != NULL && req->userbuf.base != NULL && + req->userbuf.length > 0); - isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base, - sock->tls.senddata.length); - sock->tls.senddata.base = NULL; - sock->tls.senddata.length = 0; + isc_mem_put(sock->mgr->mctx, req->userbuf.base, req->userbuf.length); call_pending_send_callbacks(sock, result); + + isc__nm_uvreq_put(&req, sock); } static void @@ -1284,11 +1300,19 @@ tls_write_cb(uv_write_t *req, int status) { isc_nm_timer_stop(uvreq->timer); isc_nm_timer_detach(&uvreq->timer); - free_senddata(sock, result); - - isc__nm_uvreq_put(&uvreq, sock); + free_senddata(sock, uvreq, result); if (status != 0) { + if (!sock->client && + (atomic_load(&sock->reading) || sock->reading_throttled)) + { + /* + * As we are resuming reading, it is not throttled + * anymore (technically). + */ + sock->reading_throttled = false; + isc__nm_start_reading(sock); + } tls_error(sock, result); return; } @@ -1298,6 +1322,8 @@ tls_write_cb(uv_write_t *req, int status) { tls_error(sock, result); return; } + + tlsdns_maybe_restart_reading(sock); } static isc_result_t @@ -1311,23 +1337,18 @@ tls_cycle_output(isc_nmsocket_t *sock) { int rv; int r; - if (sock->tls.senddata.base != NULL || - sock->tls.senddata.length > 0) - { - break; - } - if (pending > (int)ISC_NETMGR_TCP_RECVBUF_SIZE) { pending = (int)ISC_NETMGR_TCP_RECVBUF_SIZE; } - sock->tls.senddata.base = isc_mem_get(sock->mgr->mctx, pending); - sock->tls.senddata.length = pending; - /* It's a bit misnomer here, but it does the right thing */ req = isc__nm_get_read_req(sock, NULL); - req->uvbuf.base = (char *)sock->tls.senddata.base; - req->uvbuf.len = sock->tls.senddata.length; + + req->userbuf.base = isc_mem_get(sock->mgr->mctx, pending); + req->userbuf.length = (size_t)pending; + + req->uvbuf.base = (char *)req->userbuf.base; + req->uvbuf.len = (size_t)req->userbuf.length; rv = BIO_read_ex(sock->tls.app_rbio, req->uvbuf.base, req->uvbuf.len, &bytes); @@ -1339,23 +1360,20 @@ tls_cycle_output(isc_nmsocket_t *sock) { if (r == pending) { /* Wrote everything, restart */ - isc__nm_uvreq_put(&req, sock); - free_senddata(sock, ISC_R_SUCCESS); + free_senddata(sock, req, ISC_R_SUCCESS); continue; } if (r > 0) { /* Partial write, send rest asynchronously */ - memmove(req->uvbuf.base, req->uvbuf.base + r, - req->uvbuf.len - r); - req->uvbuf.len = req->uvbuf.len - r; + req->uvbuf.base += r; + req->uvbuf.len -= r; } else if (r == UV_ENOSYS || r == UV_EAGAIN) { /* uv_try_write is not supported, send * asynchronously */ } else { result = isc__nm_uverr2result(r); - isc__nm_uvreq_put(&req, sock); - free_senddata(sock, result); + free_senddata(sock, req, result); break; } @@ -1363,8 +1381,7 @@ tls_cycle_output(isc_nmsocket_t *sock) { &req->uvbuf, 1, tls_write_cb); if (r < 0) { result = isc__nm_uverr2result(r); - isc__nm_uvreq_put(&req, sock); - free_senddata(sock, result); + free_senddata(sock, req, result); break; } @@ -1533,6 +1550,28 @@ isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread, result = tls_cycle(sock); if (result != ISC_R_SUCCESS) { isc__nm_failed_read_cb(sock, result, true); + } else if (!sock->client) { + /* + * Stop reading if we have accumulated enough bytes in + * the send queue; this means that the TCP client is not + * reading back the data we sending to it, and there's + * no reason to continue processing more incoming DNS + * messages, if the client is not reading back the + * responses. + */ + size_t write_queue_size = + uv_stream_get_write_queue_size(&sock->uv_handle.stream); + + if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) { + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_DEBUG(3), + "throttling TCP connection, " + "the other side is " + "not reading the data (%zu)", + write_queue_size); + sock->reading_throttled = true; + isc__nm_stop_reading(sock); + } } free: async_tlsdns_cycle(sock); @@ -1776,6 +1815,34 @@ isc__nm_tlsdns_send(isc_nmhandle_t *handle, isc_region_t *region, return; } +static void +tlsdns_maybe_restart_reading(isc_nmsocket_t *sock) { + if (!sock->client && sock->reading_throttled && + !uv_is_active(&sock->uv_handle.handle)) + { + /* + * Restart reading if we have less data in the send queue than + * the send buffer size, this means that the TCP client has + * started reading some data again. Starting reading when we go + * under the limit instead of waiting for all data has been + * flushed allows faster recovery (in case there was a + * congestion and now there isn't). + */ + size_t write_queue_size = + uv_stream_get_write_queue_size(&sock->uv_handle.stream); + if (write_queue_size < ISC_NETMGR_TCP_SENDBUF_SIZE) { + isc_log_write( + isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_DEBUG(3), + "resuming TCP connection, the other side " + "is reading the data again (%zu)", + write_queue_size); + sock->reading_throttled = false; + isc__nm_start_reading(sock); + } + } +} + /* * Handle 'tcpsend' async event - send a packet on the socket */