From fffd444440b5e005d969ade182a638ec6c69d9f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Mon, 29 Aug 2022 10:55:10 +0200 Subject: [PATCH] Cleanup the asychronous code in the stream implementations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the loopmgr work has been merged, we can now cleanup the TCP and TLS protocols a little bit, because there are stronger guarantees that the sockets will be kept on the respective loops/threads. We only need asynchronous call for listening sockets (start, stop) and reading from the TCP (because the isc_nm_read() might be called from read callback again. This commit does the following changes (they are intertwined together): 1. Cleanup most of the asynchronous events in the TCP code, and add comments for the events that needs to be kept asynchronous. 2. Remove isc_nm_resumeread() from the netmgr API, and replace isc_nm_resumeread() calls with existing isc_nm_read() calls. 3. Remove isc_nm_pauseread() from the netmgr API, and replace isc_nm_pauseread() calls with a new isc_nm_read_stop() call. 4. Disable the isc_nm_cancelread() for the streaming protocols, only the datagram-like protocols can use isc_nm_cancelread(). 5. Add isc_nmhandle_close() that can be used to shutdown the socket earlier than after the last detach. Formerly, the socket would be closed only after all reading and sending would be finished and the last reference would be detached. The new isc_nmhandle_close() can be used to close the underlying socket earlier, so all the other asynchronous calls would call their respective callbacks immediately. Co-authored-by: Ondřej Surý Co-authored-by: Artem Boldariev --- lib/isc/httpd.c | 13 +- lib/isc/include/isc/netmgr.h | 15 +- lib/isc/netmgr/http.c | 30 ++- lib/isc/netmgr/netmgr-int.h | 92 ++------- lib/isc/netmgr/netmgr.c | 79 +++----- lib/isc/netmgr/tcp.c | 359 ++++++++--------------------------- lib/isc/netmgr/tcpdns.c | 10 +- lib/isc/netmgr/tlsdns.c | 4 +- lib/isc/netmgr/tlsstream.c | 157 +++++---------- lib/isc/netmgr/udp.c | 4 +- lib/isccc/ccmsg.c | 12 +- tests/isc/netmgr_common.c | 20 +- tests/isc/netmgr_common.h | 1 + tests/isc/tcp_test.c | 10 +- tests/isc/tls_test.c | 1 + 15 files changed, 220 insertions(+), 587 deletions(-) diff --git a/lib/isc/httpd.c b/lib/isc/httpd.c index 5f9324bdf9..2a0352d198 100644 --- a/lib/isc/httpd.c +++ b/lib/isc/httpd.c @@ -413,7 +413,6 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) { const char *content_length = NULL; size_t limit = sizeof(httpd->recvbuf) - httpd->recvlen - 1; size_t len = region->length; - size_t clen = 0; int delim; bool truncated = false; @@ -579,7 +578,7 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) { } else { INSIST(content_length != NULL); - clen = (size_t)strtoul(content_length, NULL, 10); + size_t clen = (size_t)strtoul(content_length, NULL, 10); if (clen == ULONG_MAX) { /* Invalid number in the header value. */ return (ISC_R_BADNUMBER); @@ -931,7 +930,8 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult, * ISC_R_NOTFOUND is not returned from netmgr) and we * need to resume reading. */ - isc_nm_resumeread(httpd->readhandle); + isc_nm_read(httpd->readhandle, httpd_request, + httpd->mgr); return; } goto cleanup_readhandle; @@ -1043,7 +1043,7 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult, */ isc_buffer_usedregion(httpd->sendbuffer, &r); - isc_nm_pauseread(httpd->handle); + isc_nm_read_stop(httpd->handle); httpd->state = SEND; isc_nmhandle_attach(httpd->handle, &httpd->sendhandle); @@ -1072,7 +1072,8 @@ isc_httpdmgr_shutdown(isc_httpdmgr_t **httpdmgrp) { httpd = ISC_LIST_HEAD(httpdmgr->running); while (httpd != NULL) { - isc_nm_cancelread(httpd->readhandle); + isc_nm_read_stop(httpd->readhandle); + isc_nmhandle_detach(&httpd->readhandle); httpd = ISC_LIST_NEXT(httpd, link); } UNLOCK(&httpdmgr->lock); @@ -1232,7 +1233,7 @@ httpd_senddone(isc_nmhandle_t *handle, isc_result_t result, void *arg) { */ httpd_request(httpd->handle, ISC_R_SUCCESS, NULL, httpd->mgr); } else if (!httpd->truncated) { - isc_nm_resumeread(httpd->readhandle); + isc_nm_read(httpd->readhandle, httpd_request, httpd->mgr); } else { /* Truncated request, don't resume */ goto cleanup_readhandle; diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index b282bf6d4d..c9ce9907fe 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -308,9 +308,9 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); */ void -isc_nm_pauseread(isc_nmhandle_t *handle); +isc_nm_read_stop(isc_nmhandle_t *handle); /*%< - * Pause reading on this handle's socket, but remember the callback. + * Stop reading on this handle's socket. * * Requires: * \li 'handle' is a valid netmgr handle. @@ -323,19 +323,12 @@ isc_nm_cancelread(isc_nmhandle_t *handle); * active handles with a result code of ISC_R_CANCELED. * * Requires: - * \li 'sock' is a valid netmgr socket + * \li 'sock' is a valid datagram-like netmgr socket * \li ...for which a read/recv callback has been defined. */ void -isc_nm_resumeread(isc_nmhandle_t *handle); -/*%< - * Resume reading on the handle's socket. - * - * Requires: - * \li 'handle' is a valid netmgr handle. - * \li ...for a socket with a defined read/recv callback. - */ +isc_nmhandle_close(isc_nmhandle_t *handle); void isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 319e7e7005..2a8677126e 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -490,7 +490,8 @@ finish_http_session(isc_nm_http_session_t *session) { if (session->handle != NULL) { if (!session->closed) { session->closed = true; - isc_nm_cancelread(session->handle); + session->reading = false; + isc_nmhandle_close(session->handle); } if (session->client) { @@ -1007,7 +1008,7 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region, } isc_buffer_putmem(session->buf, region->base + readlen, unread_size); - isc_nm_pauseread(session->handle); + isc_nm_read_stop(session->handle); } /* We might have something to receive or send, do IO */ @@ -1287,11 +1288,11 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle, return; } else { /* Resume reading, it's idempotent, wait for more */ - isc_nm_resumeread(session->handle); + isc_nm_read(session->handle, http_readcb, session); } } else { /* We don't want more data, stop reading for now */ - isc_nm_pauseread(session->handle); + isc_nm_read_stop(session->handle); } if (send_cb != NULL) { @@ -1332,31 +1333,22 @@ get_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) { static void http_call_connect_cb(isc_nmsocket_t *sock, isc_nm_http_session_t *session, isc_result_t result) { - isc__nm_uvreq_t *req = NULL; isc_nmhandle_t *httphandle = isc__nmhandle_get(sock, &sock->peer, &sock->iface); + void *cbarg; + isc_nm_cb_t connect_cb; REQUIRE(sock->connect_cb != NULL); + cbarg = sock->connect_cbarg; + connect_cb = sock->connect_cb; + isc__nmsocket_clearcb(sock); if (result == ISC_R_SUCCESS) { - req = isc__nm_uvreq_get(sock->worker, sock); - req->cb.connect = sock->connect_cb; - req->cbarg = sock->connect_cbarg; if (session != NULL) { session->client_httphandle = httphandle; - req->handle = NULL; - isc_nmhandle_attach(httphandle, &req->handle); - } else { - req->handle = httphandle; } - - isc__nmsocket_clearcb(sock); - isc__nm_connectcb(sock, req, result, true); + connect_cb(httphandle, result, cbarg); } else { - void *cbarg = sock->connect_cbarg; - isc_nm_cb_t connect_cb = sock->connect_cb; - - isc__nmsocket_clearcb(sock); connect_cb(httphandle, result, cbarg); isc_nmhandle_detach(&httphandle); } diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4805223767..56d4792c2e 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -254,13 +254,7 @@ struct isc_nmhandle { typedef enum isc__netievent_type { netievent_udpcancel, - netievent_tcpconnect, - netievent_tcpclose, - netievent_tcpsend, - netievent_tcpstartread, - netievent_tcppauseread, netievent_tcpaccept, - netievent_tcpcancel, netievent_tcpdnsaccept, netievent_tcpdnsconnect, @@ -271,10 +265,8 @@ typedef enum isc__netievent_type { netievent_tlsclose, netievent_tlssend, - netievent_tlsstartread, netievent_tlsconnect, netievent_tlsdobio, - netievent_tlscancel, netievent_tlsdnsaccept, netievent_tlsdnsconnect, @@ -924,7 +916,6 @@ struct isc_nmsocket { TLS_CLOSED } state; /*%< The order of these is significant */ size_t nsending; - bool reading; } tlsstream; isc_nmsocket_h2_t h2; @@ -1005,7 +996,7 @@ struct isc_nmsocket { atomic_bool connecting; atomic_bool connected; atomic_bool accepting; - atomic_bool reading; + bool reading; atomic_bool timedout; isc_refcount_t references; @@ -1020,11 +1011,6 @@ struct isc_nmsocket { */ bool processing; - /*% - * A TCP socket has had isc_nm_pauseread() called. - */ - atomic_bool readpaused; - /*% * A TCP or TCPDNS socket has been set to use the keepalive * timeout instead of the default idle timeout. @@ -1326,7 +1312,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, void isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); /* - * Back-end implementation of isc_nm_read() for TCP handles. + * Start reading on this handle. */ void @@ -1335,16 +1321,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock); * Close a TCP socket. */ void -isc__nm_tcp_pauseread(isc_nmhandle_t *handle); +isc__nm_tcp_read_stop(isc_nmhandle_t *handle); /*%< - * Pause reading on this handle, while still remembering the callback. - */ - -void -isc__nm_tcp_resumeread(isc_nmhandle_t *handle); -/*%< - * Resume reading from socket. - * + * Stop reading on this handle. */ void @@ -1366,40 +1345,18 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock); * Stop listening on 'sock'. */ -int_fast32_t -isc__nm_tcp_listener_nactive(isc_nmsocket_t *sock); -/*%< - * Returns the number of active connections for the TCP listener socket. - */ - void isc__nm_tcp_settimeout(isc_nmhandle_t *handle, uint32_t timeout); /*%< * Set the read timeout for the TCP socket associated with 'handle'. */ -void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous TCP events (connect, listen, * stoplisten, send, read, pause, close). @@ -1411,14 +1368,9 @@ isc__nm_async_tlsclose(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tlssend(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0); - void isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0); -void -isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous TLS events. */ @@ -1570,15 +1522,15 @@ void isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg); -void -isc__nm_tls_cancelread(isc_nmhandle_t *handle); - /*%< * Back-end implementation of isc_nm_send() for TLSDNS handles. */ void isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); +/*%< + * Start reading on the TLS handle. + */ void isc__nm_tls_close(isc_nmsocket_t *sock); @@ -1587,16 +1539,9 @@ isc__nm_tls_close(isc_nmsocket_t *sock); */ void -isc__nm_tls_pauseread(isc_nmhandle_t *handle); +isc__nm_tls_read_stop(isc_nmhandle_t *handle); /*%< - * Pause reading on this handle, while still remembering the callback. - */ - -void -isc__nm_tls_resumeread(isc_nmhandle_t *handle); -/*%< - * Resume reading from the handle. - * + * Stop reading on the TLS handle. */ void @@ -1632,6 +1577,9 @@ void isc__nmhandle_tls_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout); +void +isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); + void isc__nm_http_stoplistening(isc_nmsocket_t *sock); @@ -1819,16 +1767,12 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle); * typedef all the netievent types */ -NETIEVENT_SOCKET_TYPE(tcpclose); NETIEVENT_SOCKET_TYPE(tcplisten); -NETIEVENT_SOCKET_TYPE(tcppauseread); NETIEVENT_SOCKET_TYPE(tcpstop); NETIEVENT_SOCKET_TYPE(tlsclose); /* NETIEVENT_SOCKET_TYPE(tlsconnect); */ /* unique type, defined independently */ NETIEVENT_SOCKET_TYPE(tlsdobio); -NETIEVENT_SOCKET_TYPE(tlsstartread); -NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel); NETIEVENT_SOCKET_TYPE(udplisten); NETIEVENT_SOCKET_TYPE(udpstop); @@ -1858,9 +1802,6 @@ NETIEVENT_SOCKET_TYPE(httpclose); NETIEVENT_SOCKET_HTTP_EPS_TYPE(httpendpoints); #endif /* HAVE_LIBNGHTTP2 */ -NETIEVENT_SOCKET_REQ_TYPE(tcpconnect); -NETIEVENT_SOCKET_REQ_TYPE(tcpsend); -NETIEVENT_SOCKET_TYPE(tcpstartread); NETIEVENT_SOCKET_REQ_TYPE(tlssend); NETIEVENT_SOCKET_REQ_RESULT_TYPE(connectcb); @@ -1868,7 +1809,6 @@ NETIEVENT_SOCKET_REQ_RESULT_TYPE(readcb); NETIEVENT_SOCKET_REQ_RESULT_TYPE(sendcb); NETIEVENT_SOCKET_HANDLE_TYPE(detach); -NETIEVENT_SOCKET_HANDLE_TYPE(tcpcancel); NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel); NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept); @@ -1877,16 +1817,11 @@ NETIEVENT_SOCKET_TLSCTX_TYPE(settlsctx); /* Now declared the helper functions */ -NETIEVENT_SOCKET_DECL(tcpclose); NETIEVENT_SOCKET_DECL(tcplisten); -NETIEVENT_SOCKET_DECL(tcppauseread); -NETIEVENT_SOCKET_DECL(tcpstartread); NETIEVENT_SOCKET_DECL(tcpstop); NETIEVENT_SOCKET_DECL(tlsclose); NETIEVENT_SOCKET_DECL(tlsconnect); NETIEVENT_SOCKET_DECL(tlsdobio); -NETIEVENT_SOCKET_DECL(tlsstartread); -NETIEVENT_SOCKET_HANDLE_DECL(tlscancel); NETIEVENT_SOCKET_DECL(udplisten); NETIEVENT_SOCKET_DECL(udpstop); @@ -1916,15 +1851,12 @@ NETIEVENT_SOCKET_DECL(httpclose); NETIEVENT_SOCKET_HTTP_EPS_DECL(httpendpoints); #endif /* HAVE_LIBNGHTTP2 */ -NETIEVENT_SOCKET_REQ_DECL(tcpconnect); -NETIEVENT_SOCKET_REQ_DECL(tcpsend); NETIEVENT_SOCKET_REQ_DECL(tlssend); NETIEVENT_SOCKET_REQ_RESULT_DECL(connectcb); NETIEVENT_SOCKET_REQ_RESULT_DECL(readcb); NETIEVENT_SOCKET_REQ_RESULT_DECL(sendcb); -NETIEVENT_SOCKET_HANDLE_DECL(tcpcancel); NETIEVENT_SOCKET_HANDLE_DECL(udpcancel); NETIEVENT_SOCKET_DECL(detach); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 14f5d2e832..d0b1d27f28 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -451,14 +451,8 @@ process_netievent(void *arg) { NETIEVENT_CASE(udpcancel); NETIEVENT_CASE(tcpaccept); - NETIEVENT_CASE(tcpconnect); NETIEVENT_CASE(tcplisten); - NETIEVENT_CASE(tcpstartread); - NETIEVENT_CASE(tcppauseread); - NETIEVENT_CASE(tcpsend); NETIEVENT_CASE(tcpstop); - NETIEVENT_CASE(tcpcancel); - NETIEVENT_CASE(tcpclose); NETIEVENT_CASE(tcpdnsaccept); NETIEVENT_CASE(tcpdnslisten); @@ -481,11 +475,9 @@ process_netievent(void *arg) { NETIEVENT_CASE(tlsdnsshutdown); #if HAVE_LIBNGHTTP2 - NETIEVENT_CASE(tlsstartread); NETIEVENT_CASE(tlssend); NETIEVENT_CASE(tlsclose); NETIEVENT_CASE(tlsdobio); - NETIEVENT_CASE(tlscancel); NETIEVENT_CASE(httpsend); NETIEVENT_CASE(httpclose); @@ -522,16 +514,11 @@ isc__nm_put_netievent(isc__networker_t *worker, void *ievent) { isc__networker_unref(worker); } -NETIEVENT_SOCKET_DEF(tcpclose); NETIEVENT_SOCKET_DEF(tcplisten); -NETIEVENT_SOCKET_DEF(tcppauseread); -NETIEVENT_SOCKET_DEF(tcpstartread); NETIEVENT_SOCKET_DEF(tcpstop); NETIEVENT_SOCKET_DEF(tlsclose); NETIEVENT_SOCKET_DEF(tlsconnect); NETIEVENT_SOCKET_DEF(tlsdobio); -NETIEVENT_SOCKET_DEF(tlsstartread); -NETIEVENT_SOCKET_HANDLE_DEF(tlscancel); NETIEVENT_SOCKET_DEF(udplisten); NETIEVENT_SOCKET_DEF(udpstop); NETIEVENT_SOCKET_HANDLE_DEF(udpcancel); @@ -562,15 +549,12 @@ NETIEVENT_SOCKET_DEF(httpclose); NETIEVENT_SOCKET_HTTP_EPS_DEF(httpendpoints); #endif /* HAVE_LIBNGHTTP2 */ -NETIEVENT_SOCKET_REQ_DEF(tcpconnect); -NETIEVENT_SOCKET_REQ_DEF(tcpsend); NETIEVENT_SOCKET_REQ_DEF(tlssend); NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb); NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb); NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb); NETIEVENT_SOCKET_DEF(detach); -NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel); NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept); @@ -802,6 +786,15 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) { } } +void +isc_nmhandle_close(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + + isc__nmsocket_clearcb(handle->sock); + isc__nm_failed_read_cb(handle->sock, ISC_R_EOF, false); +} + void isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { REQUIRE(sock->parent == NULL); @@ -833,7 +826,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { * * If it's a regular socket we may need to close it. */ - if (!atomic_load(&sock->closed)) { + if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) { switch (sock->type) { case isc_nm_udpsocket: isc__nm_udp_close(sock); @@ -850,7 +843,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: isc__nm_tls_close(sock); - break; + return; case isc_nm_httpsocket: isc__nm_http_close(sock); return; @@ -1003,7 +996,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, sock, isc_refcount_current(&sock->references)); atomic_init(&sock->active, true); - atomic_init(&sock->readpaused, false); atomic_init(&sock->closing, false); atomic_init(&sock->listening, 0); atomic_init(&sock->closed, 0); @@ -1406,6 +1398,11 @@ isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) { case isc_nm_tlsdnssocket: isc__nm_tlsdns_failed_read_cb(sock, result, async); return; +#ifdef HAVE_LIBNGHTTP2 + case isc_nm_tlssocket: + isc__nm_tls_failed_read_cb(sock, result); + return; +#endif default: UNREACHABLE(); } @@ -1486,7 +1483,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->reading)); + REQUIRE(sock->reading); if (atomic_load(&sock->client)) { uv_timer_stop(timer); @@ -1643,7 +1640,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) { isc_result_t result = ISC_R_SUCCESS; int r; - if (atomic_load(&sock->reading)) { + if (sock->reading) { return (ISC_R_SUCCESS); } @@ -1670,7 +1667,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) { if (r != 0) { result = isc_uverr2result(r); } else { - atomic_store(&sock->reading, true); + sock->reading = true; } return (result); @@ -1680,7 +1677,7 @@ void isc__nm_stop_reading(isc_nmsocket_t *sock) { int r; - if (!atomic_load(&sock->reading)) { + if (!sock->reading) { return; } @@ -1698,7 +1695,7 @@ isc__nm_stop_reading(isc_nmsocket_t *sock) { default: UNREACHABLE(); } - atomic_store(&sock->reading, false); + sock->reading = false; } bool @@ -2046,58 +2043,30 @@ isc_nm_cancelread(isc_nmhandle_t *handle) { case isc_nm_udpsocket: isc__nm_udp_cancelread(handle); break; - case isc_nm_tcpsocket: - isc__nm_tcp_cancelread(handle); - break; case isc_nm_tcpdnssocket: isc__nm_tcpdns_cancelread(handle); break; case isc_nm_tlsdnssocket: isc__nm_tlsdns_cancelread(handle); break; -#if HAVE_LIBNGHTTP2 - case isc_nm_tlssocket: - isc__nm_tls_cancelread(handle); - break; -#endif default: UNREACHABLE(); } } void -isc_nm_pauseread(isc_nmhandle_t *handle) { +isc_nm_read_stop(isc_nmhandle_t *handle) { REQUIRE(VALID_NMHANDLE(handle)); isc_nmsocket_t *sock = handle->sock; switch (sock->type) { case isc_nm_tcpsocket: - isc__nm_tcp_pauseread(handle); + isc__nm_tcp_read_stop(handle); break; #if HAVE_LIBNGHTTP2 case isc_nm_tlssocket: - isc__nm_tls_pauseread(handle); - break; -#endif - default: - UNREACHABLE(); - } -} - -void -isc_nm_resumeread(isc_nmhandle_t *handle) { - REQUIRE(VALID_NMHANDLE(handle)); - - isc_nmsocket_t *sock = handle->sock; - - switch (sock->type) { - case isc_nm_tcpsocket: - isc__nm_tcp_resumeread(handle); - break; -#if HAVE_LIBNGHTTP2 - case isc_nm_tlssocket: - isc__nm_tls_resumeread(handle); + isc__nm_tls_read_stop(handle); break; #endif default: diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 1fd4bc3e6e..19a49f884f 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -55,9 +55,6 @@ can_log_tcp_quota(void) { static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); -static void -tcp_close_direct(isc_nmsocket_t *sock); - static isc_result_t tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); static void @@ -173,36 +170,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (ISC_R_SUCCESS); } -void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpconnect_t *ievent = - (isc__netievent_tcpconnect_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - isc__nm_uvreq_t *req = ievent->req; - isc_result_t result = ISC_R_SUCCESS; - - UNUSED(worker); - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_tcpsocket); - REQUIRE(sock->parent == NULL); - REQUIRE(sock->tid == isc_tid()); - - result = tcp_connect_direct(sock, req); - if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); - if (sock->fd != (uv_os_sock_t)(-1)) { - isc__nm_tcp_close(sock); - } - isc__nm_connectcb(sock, req, result, true); - } - - /* - * The sock is now attached to the handle. - */ - isc__nmsocket_detach(&sock); -} - static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc_result_t result = ISC_R_UNSET; @@ -295,7 +262,6 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; - isc__netievent_tcpconnect_t *ievent = NULL; isc__nm_uvreq_t *req = NULL; sa_family_t sa_family; isc__networker_t *worker = &mgr->workers[isc_tid()]; @@ -335,14 +301,21 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, (void)isc__nm_socket_min_mtu(sock->fd, sa_family); (void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG); - ievent = isc__nm_get_netievent_tcpconnect(worker, sock, req); - atomic_store(&sock->active, true); - isc__nm_async_tcpconnect(&mgr->workers[sock->tid], - (isc__netievent_t *)ievent); - isc__nm_put_netievent_tcpconnect(worker, ievent); - atomic_store(&sock->active, true); + result = tcp_connect_direct(sock, req); + if (result != ISC_R_SUCCESS) { + atomic_store(&sock->active, false); + if (sock->fd != (uv_os_sock_t)(-1)) { + isc__nm_tcp_close(sock); + } + isc__nm_connectcb(sock, req, result, true); + } + + /* + * The sock is now attached to the handle. + */ + isc__nmsocket_detach(&sock); } static uv_os_sock_t @@ -670,6 +643,26 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { stop_tcp_parent(sock); } +static void +tcp_stop_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = uv_handle_get_data(handle); + uv_handle_set_data(handle, NULL); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_tid()); + REQUIRE(atomic_load(&sock->closing)); + REQUIRE(sock->type == isc_nm_tcpsocket); + + RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed, + &(bool){ false }, true)); + + isc__nm_incstats(sock, STATID_CLOSE); + + atomic_store(&sock->listening, false); + + isc__nmsocket_detach(&sock); +} + void isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0; @@ -739,12 +732,15 @@ destroy: void isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { + isc_nmsocket_t *sock; + isc_nm_t *netmgr; + isc_result_t result; + REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - isc_nmsocket_t *sock = handle->sock; - isc__netievent_tcpstartread_t *ievent = NULL; - isc_nm_t *netmgr = sock->worker->netmgr; + sock = handle->sock; + netmgr = sock->worker->netmgr; REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->statichandle == handle); @@ -752,54 +748,34 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { sock->recv_cb = cb; sock->recv_cbarg = cbarg; sock->recv_read = true; + + /* Initialize the timer */ if (sock->read_timeout == 0) { sock->read_timeout = (atomic_load(&sock->keepalive) ? atomic_load(&netmgr->keepalive) : atomic_load(&netmgr->idle)); } - ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock); - - /* - * This MUST be done asynchronously, no matter which thread we're - * in. The callback function for isc_nm_read() often calls - * isc_nm_read() again; if we tried to do that synchronously - * we'd clash in processbuffer() and grow the stack indefinitely. - */ - isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); - - return; -} - -void -isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpstartread_t *ievent = - (isc__netievent_tcpstartread_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - isc_result_t result; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - UNUSED(worker); - if (isc__nmsocket_closing(sock)) { result = ISC_R_CANCELED; - } else { - result = isc__nm_start_reading(sock); + goto failure; } + result = isc__nm_start_reading(sock); if (result != ISC_R_SUCCESS) { - atomic_store(&sock->reading, true); - isc__nm_tcp_failed_read_cb(sock, result); - return; + goto failure; } isc__nmsocket_timer_start(sock); + + return; +failure: + sock->reading = true; + isc__nm_tcp_failed_read_cb(sock, result); } void -isc__nm_tcp_pauseread(isc_nmhandle_t *handle) { - isc__netievent_tcppauseread_t *ievent = NULL; +isc__nm_tcp_read_stop(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; REQUIRE(VALID_NMHANDLE(handle)); @@ -808,61 +784,10 @@ isc__nm_tcp_pauseread(isc_nmhandle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); - if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ false }, - true)) { - return; - } - - ievent = isc__nm_get_netievent_tcppauseread(sock->worker, sock); - - isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); - - return; -} - -void -isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcppauseread_t *ievent = - (isc__netievent_tcppauseread_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - UNUSED(worker); - isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); -} -void -isc__nm_tcp_resumeread(isc_nmhandle_t *handle) { - REQUIRE(VALID_NMHANDLE(handle)); - REQUIRE(VALID_NMSOCK(handle->sock)); - - isc__netievent_tcpstartread_t *ievent = NULL; - isc_nmsocket_t *sock = handle->sock; - - REQUIRE(sock->tid == isc_tid()); - - if (sock->recv_cb == NULL) { - /* We are no longer reading */ - return; - } - - if (!isc__nmsocket_active(sock)) { - atomic_store(&sock->reading, true); - isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ true }, - false)) { - return; - } - - ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock); - - isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); + return; } void @@ -873,7 +798,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->reading)); + REQUIRE(sock->reading); REQUIRE(buf != NULL); netmgr = sock->worker->netmgr; @@ -912,7 +837,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc__nm_readcb(sock, req, ISC_R_SUCCESS); /* The readcb could have paused the reading */ - if (atomic_load(&sock->reading)) { + if (sock->reading) { /* The timer will be updated */ isc__nmsocket_timer_restart(sock); } @@ -938,10 +863,12 @@ quota_accept_cb(isc_quota_t *quota, void *sock0) { REQUIRE(VALID_NMSOCK(sock)); /* - * Create a tcpaccept event and pass it using the async channel. + * Create a tcpaccept event and pass it using the async channel. This + * needs to be asynchronous, because the quota might have been released + * by a different child socket. */ ievent = isc__nm_get_netievent_tcpaccept(sock->worker, sock, quota); - isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); + isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); } /* @@ -1048,8 +975,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { csock->read_timeout = atomic_load(&csock->worker->netmgr->init); - atomic_fetch_add(&ssock->parent->active_child_connections, 1); - /* * The acceptcb needs to attach to the handle if it wants to keep the * connection alive @@ -1082,10 +1007,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, REQUIRE(VALID_NMSOCK(handle->sock)); isc_nmsocket_t *sock = handle->sock; - isc__netievent_tcpsend_t *ievent = NULL; + isc_result_t result; isc__nm_uvreq_t *uvreq = NULL; + isc_nm_t *netmgr = sock->worker->netmgr; REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(sock->tid == isc_tid()); uvreq = isc__nm_uvreq_get(sock->worker, sock); uvreq->uvbuf.base = (char *)region->base; @@ -1096,8 +1023,17 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, uvreq->cb.send = cb; uvreq->cbarg = cbarg; - ievent = isc__nm_get_netievent_tcpsend(sock->worker, sock, uvreq); - isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); + if (sock->write_timeout == 0) { + sock->write_timeout = (atomic_load(&sock->keepalive) + ? atomic_load(&netmgr->keepalive) + : atomic_load(&netmgr->idle)); + } + + result = tcp_send_direct(sock, uvreq); + if (result != ISC_R_SUCCESS) { + isc__nm_incstats(sock, STATID_SENDFAIL); + isc__nm_failed_send_cb(sock, uvreq, result); + } return; } @@ -1124,34 +1060,6 @@ tcp_send_cb(uv_write_t *req, int status) { isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false); } -/* - * Handle 'tcpsend' async event - send a packet on the socket - */ -void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) { - isc_result_t result; - isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - isc__nm_uvreq_t *uvreq = ievent->req; - isc_nm_t *netmgr = sock->worker->netmgr; - - REQUIRE(sock->type == isc_nm_tcpsocket); - REQUIRE(sock->tid == isc_tid()); - UNUSED(worker); - - if (sock->write_timeout == 0) { - sock->write_timeout = (atomic_load(&sock->keepalive) - ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle)); - } - - result = tcp_send_direct(sock, uvreq); - if (result != ISC_R_SUCCESS) { - isc__nm_incstats(sock, STATID_SENDFAIL); - isc__nm_failed_send_cb(sock, uvreq, result); - } -} - static isc_result_t tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { REQUIRE(VALID_NMSOCK(sock)); @@ -1180,38 +1088,14 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (ISC_R_SUCCESS); } -static void -tcp_stop_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = uv_handle_get_data(handle); - uv_handle_set_data(handle, NULL); - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->closing)); - REQUIRE(sock->type == isc_nm_tcpsocket); - - if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false }, - true)) { - UNREACHABLE(); - } - - isc__nm_incstats(sock, STATID_CLOSE); - - atomic_store(&sock->listening, false); - - isc__nmsocket_detach(&sock); -} - static void tcp_close_sock(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); REQUIRE(atomic_load(&sock->closing)); - if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false }, - true)) { - UNREACHABLE(); - } + RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed, + &(bool){ false }, true)); isc__nm_incstats(sock, STATID_CLOSE); @@ -1232,22 +1116,16 @@ tcp_close_cb(uv_handle_t *handle) { tcp_close_sock(sock); } -static void -tcp_close_direct(isc_nmsocket_t *sock) { +void +isc__nm_tcp_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(!isc__nmsocket_active(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->closing)); REQUIRE(sock->parent == NULL); - if (sock->server != NULL) { - REQUIRE(VALID_NMSOCK(sock->server)); - REQUIRE(VALID_NMSOCK(sock->server->parent)); - if (sock->server->parent != NULL) { - atomic_fetch_sub( - &sock->server->parent->active_child_connections, - 1); - } - } + RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing, + &(bool){ false }, true)); if (sock->quota != NULL) { isc_quota_detach(&sock->quota); @@ -1280,44 +1158,6 @@ tcp_close_direct(isc_nmsocket_t *sock) { } } -void -isc__nm_tcp_close(isc_nmsocket_t *sock) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_tcpsocket); - REQUIRE(!isc__nmsocket_active(sock)); - - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) { - return; - } - - if (sock->tid == isc_tid()) { - tcp_close_direct(sock); - } else { - /* - * We need to create an event and pass it using async channel - */ - isc__netievent_tcpclose_t *ievent = - isc__nm_get_netievent_tcpclose(sock->worker, sock); - - isc__nm_enqueue_ievent(sock->worker, - (isc__netievent_t *)ievent); - } -} - -void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - - UNUSED(worker); - - tcp_close_direct(sock); -} - static void tcp_close_connect_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); @@ -1375,44 +1215,3 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { isc__nmsocket_prep_destroy(sock); } } - -void -isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { - isc_nmsocket_t *sock = NULL; - isc__netievent_tcpcancel_t *ievent = NULL; - - REQUIRE(VALID_NMHANDLE(handle)); - - sock = handle->sock; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_tcpsocket); - - ievent = isc__nm_get_netievent_tcpcancel(sock->worker, sock, handle); - isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); -} - -void -isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - UNUSED(worker); - - uv_timer_stop(&sock->read_timer); - - isc__nm_tcp_failed_read_cb(sock, ISC_R_EOF); -} - -int_fast32_t -isc__nm_tcp_listener_nactive(isc_nmsocket_t *listener) { - int_fast32_t nactive; - - REQUIRE(VALID_NMSOCK(listener)); - - nactive = atomic_load(&listener->active_child_connections); - INSIST(nactive >= 0); - return (nactive); -} diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 47695e1528..a63eba001e 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -725,8 +725,10 @@ isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { ievent = isc__nm_get_netievent_tcpdnsread(sock->worker, sock); /* - * This MUST be done asynchronously, no matter which thread we're - * in. The callback function for isc_nm_read() often calls + * FIXME: This MUST be done asynchronously, ~~no matter which thread + * we're in.~~ ,only when there's existing data on the socket. + + * The callback function for isc_nm_read() often calls * isc_nm_read() again; if we tried to do that synchronously * we'd clash in processbuffer() and grow the stack indefinitely. */ @@ -754,7 +756,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { } if (result != ISC_R_SUCCESS) { - atomic_store(&sock->reading, true); + sock->reading = true; isc__nm_failed_read_cb(sock, result, false); } } @@ -870,7 +872,7 @@ isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->reading)); + REQUIRE(sock->reading); REQUIRE(buf != NULL); if (isc__nmsocket_closing(sock)) { diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index 73826ca9c5..960bcefac3 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -949,7 +949,7 @@ isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(sock->tid == isc_tid()); if (isc__nmsocket_closing(sock)) { - atomic_store(&sock->reading, true); + sock->reading = true; isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false); return; } @@ -1468,7 +1468,7 @@ isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->reading)); + REQUIRE(sock->reading); REQUIRE(buf != NULL); if (isc__nmsocket_closing(sock)) { diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index f63bd479fe..a90485a066 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -160,9 +160,8 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { isc_mem_put(handle->sock->worker->mctx, send_req, sizeof(*send_req)); tlssock->tlsstream.nsending--; - if (finish && eresult == ISC_R_SUCCESS) { - tlssock->tlsstream.reading = false; - isc_nm_cancelread(handle); + if (finish && eresult == ISC_R_SUCCESS && tlssock->reading) { + tls_failed_read_cb(tlssock, ISC_R_EOF); } else if (eresult == ISC_R_SUCCESS) { tls_do_bio(tlssock, NULL, NULL, false); } else if (eresult != ISC_R_SUCCESS && @@ -203,16 +202,8 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) { isc__nmsocket_clearcb(sock); isc_nmhandle_detach(&handle); } else if (sock->recv_cb != NULL && sock->statichandle != NULL) { - isc__nm_uvreq_t *req = NULL; - INSIST(VALID_NMHANDLE(sock->statichandle)); - req = isc__nm_uvreq_get(sock->worker, sock); - req->cb.recv = sock->recv_cb; - req->cbarg = sock->recv_cbarg; - isc_nmhandle_attach(sock->statichandle, &req->handle); - if (result != ISC_R_TIMEDOUT) { - isc__nmsocket_clearcb(sock); - } - isc__nm_readcb(sock, req, result); + sock->recv_cb(sock->statichandle, result, NULL, + sock->recv_cbarg); if (result == ISC_R_TIMEDOUT && (sock->outerhandle == NULL || isc__nmsocket_timer_running(sock->outerhandle->sock))) @@ -226,6 +217,16 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) { } } +void +isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { + if (!inactive(sock) && sock->tlsstream.state == TLS_IO) { + tls_do_bio(sock, NULL, NULL, true); + } else if (sock->reading) { + sock->reading = false; + tls_failed_read_cb(sock, result); + } +} + static void async_tls_do_bio(isc_nmsocket_t *sock) { isc__netievent_tlsdobio_t *ievent = @@ -386,14 +387,16 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data, int rv = 0; size_t len = 0; int saved_errno = 0; + bool was_reading; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); + was_reading = sock->reading; /* We will resume read if TLS layer wants us to */ - if (sock->tlsstream.reading && sock->outerhandle) { + if (sock->reading && sock->outerhandle) { REQUIRE(VALID_NMHANDLE(sock->outerhandle)); - isc_nm_pauseread(sock->outerhandle); + isc_nm_read_stop(sock->outerhandle); } if (sock->tlsstream.state == TLS_INIT) { @@ -468,8 +471,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data, /* Decrypt and pass data from network to client */ if (sock->tlsstream.state >= TLS_IO && sock->recv_cb != NULL && - !atomic_load(&sock->readpaused) && - sock->statichandle != NULL && !finish) + was_reading && sock->statichandle != NULL && !finish) { uint8_t recv_buf[TLS_BUF_SIZE]; INSIST(sock->tlsstream.state > TLS_HANDSHAKE); @@ -495,7 +497,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data, if (sock->statichandle == NULL) { finish = true; break; - } else if (atomic_load(&sock->readpaused)) { + } else if (!sock->reading) { /* * Reading has been paused from withing * the context of read callback - stop @@ -553,19 +555,14 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data, return; case SSL_ERROR_WANT_READ: if (tls_try_to_close_unused_socket(sock) || - sock->outerhandle == NULL || atomic_load(&sock->readpaused)) - { + sock->outerhandle == NULL) { return; } INSIST(VALID_NMHANDLE(sock->outerhandle)); - if (sock->tlsstream.reading) { - isc_nm_resumeread(sock->outerhandle); - } else if (sock->tlsstream.state == TLS_HANDSHAKE) { - sock->tlsstream.reading = true; - isc_nm_read(sock->outerhandle, tls_readcb, sock); - } + sock->reading = true; + isc_nm_read(sock->outerhandle, tls_readcb, sock); return; default: result = tls_error_to_result(tls_status, sock->tlsstream.state, @@ -811,22 +808,8 @@ isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region, isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); } -void -isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tlsstartread_t *ievent = - (isc__netievent_tlsstartread_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - REQUIRE(sock->tid == isc_tid()); - - UNUSED(worker); - - tls_do_bio(sock, NULL, NULL, false); -} - void isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { - isc__netievent_tlsstartread_t *ievent = NULL; isc_nmsocket_t *sock = NULL; REQUIRE(VALID_NMHANDLE(handle)); @@ -835,7 +818,6 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle == handle); REQUIRE(sock->tid == isc_tid()); - REQUIRE(sock->recv_cb == NULL); if (inactive(sock)) { cb(handle, ISC_R_NOTCONNECTED, NULL, cbarg); @@ -845,37 +827,22 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { sock->recv_cb = cb; sock->recv_cbarg = cbarg; - ievent = isc__nm_get_netievent_tlsstartread(sock->worker, sock); - isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); -} - -void -isc__nm_tls_pauseread(isc_nmhandle_t *handle) { - REQUIRE(VALID_NMHANDLE(handle)); - REQUIRE(VALID_NMSOCK(handle->sock)); - - if (atomic_compare_exchange_strong(&handle->sock->readpaused, - &(bool){ false }, true)) - { - if (handle->sock->outerhandle != NULL) { - isc_nm_pauseread(handle->sock->outerhandle); - } + if (sock->reading) { + return; } + + tls_do_bio(sock, NULL, NULL, false); } void -isc__nm_tls_resumeread(isc_nmhandle_t *handle) { +isc__nm_tls_read_stop(isc_nmhandle_t *handle) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - if (!atomic_compare_exchange_strong(&handle->sock->readpaused, - &(bool){ true }, false)) - { - if (inactive(handle->sock)) { - return; - } + handle->sock->reading = false; - async_tls_do_bio(handle->sock); + if (handle->sock->outerhandle != NULL) { + isc_nm_read_stop(handle->sock->outerhandle); } } @@ -888,8 +855,10 @@ tls_close_direct(isc_nmsocket_t *sock) { * external references, we can close everything. */ if (sock->outerhandle != NULL) { - isc_nm_pauseread(sock->outerhandle); - isc__nmsocket_clearcb(sock->outerhandle->sock); + sock->reading = false; + isc_nm_read_stop(sock->outerhandle); + + isc_nmhandle_close(sock->outerhandle); isc_nmhandle_detach(&sock->outerhandle); } @@ -905,8 +874,6 @@ tls_close_direct(isc_nmsocket_t *sock) { void isc__nm_tls_close(isc_nmsocket_t *sock) { - isc__netievent_tlsclose_t *ievent = NULL; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tlssocket); @@ -915,8 +882,15 @@ isc__nm_tls_close(isc_nmsocket_t *sock) { return; } - ievent = isc__nm_get_netievent_tlsclose(sock->worker, sock); - isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); + if (sock->tid == isc_tid()) { + /* no point in attempting to make the call asynchronous */ + tls_close_direct(sock); + } else { + isc__netievent_tlsclose_t *ievent = + isc__nm_get_netievent_tlsclose(sock->worker, sock); + isc__nm_enqueue_ievent(sock->worker, + (isc__netievent_t *)ievent); + } } void @@ -1052,49 +1026,6 @@ error: isc__nmsocket_detach(&tlssock); } -static void -tls_cancelread(isc_nmsocket_t *sock) { - if (!inactive(sock) && sock->tlsstream.state == TLS_IO) { - tls_do_bio(sock, NULL, NULL, true); - } else if (sock->outerhandle != NULL) { - sock->tlsstream.reading = false; - isc_nm_cancelread(sock->outerhandle); - } -} - -void -isc__nm_tls_cancelread(isc_nmhandle_t *handle) { - isc_nmsocket_t *sock = NULL; - isc__netievent_tlscancel_t *ievent = NULL; - - REQUIRE(VALID_NMHANDLE(handle)); - - sock = handle->sock; - - REQUIRE(sock->type == isc_nm_tlssocket); - - if (sock->tid == isc_tid()) { - tls_cancelread(sock); - } else { - ievent = isc__nm_get_netievent_tlscancel(sock->worker, sock, - handle); - isc__nm_enqueue_ievent(sock->worker, - (isc__netievent_t *)ievent); - } -} - -void -isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tlscancel_t *ievent = (isc__netievent_tlscancel_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_tid()); - - UNUSED(worker); - tls_cancelread(sock); -} - void isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tlsdobio_t *ievent = (isc__netievent_tlsdobio_t *)ev0; diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index c87c87e0df..4706c74a38 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -102,7 +102,7 @@ start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock, csock->recv_cb = sock->recv_cb; csock->recv_cbarg = sock->recv_cbarg; - atomic_init(&csock->reading, true); + csock->reading = true; if (mgr->load_balance_sockets) { UNUSED(fd); @@ -968,7 +968,7 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { return; fail: - atomic_store(&sock->reading, true); /* required by the next call */ + sock->reading = true; /* required by the next call */ isc__nm_failed_read_cb(sock, result, false); } diff --git a/lib/isccc/ccmsg.c b/lib/isccc/ccmsg.c index 9ee48ab9c0..1ef5eb6ea2 100644 --- a/lib/isccc/ccmsg.c +++ b/lib/isccc/ccmsg.c @@ -115,7 +115,7 @@ recv_data(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region, return; done: - isc_nm_pauseread(handle); + isc_nm_read_stop(handle); ccmsg->cb(handle, ccmsg->result, ccmsg->cbarg); } @@ -155,12 +155,8 @@ isccc_ccmsg_readmessage(isccc_ccmsg_t *ccmsg, isc_nm_cb_t cb, void *cbarg) { ccmsg->result = ISC_R_UNEXPECTED; /* unknown right now */ ccmsg->length_received = false; - if (ccmsg->reading) { - isc_nm_resumeread(ccmsg->handle); - } else { - isc_nm_read(ccmsg->handle, recv_data, ccmsg); - ccmsg->reading = true; - } + isc_nm_read(ccmsg->handle, recv_data, ccmsg); + ccmsg->reading = true; } void @@ -168,7 +164,7 @@ isccc_ccmsg_cancelread(isccc_ccmsg_t *ccmsg) { REQUIRE(VALID_CCMSG(ccmsg)); if (ccmsg->reading) { - isc_nm_cancelread(ccmsg->handle); + isc_nm_read_stop(ccmsg->handle); ccmsg->reading = false; } } diff --git a/tests/isc/netmgr_common.c b/tests/isc/netmgr_common.c index 7079c41861..f77ed5d761 100644 --- a/tests/isc/netmgr_common.c +++ b/tests/isc/netmgr_common.c @@ -94,6 +94,7 @@ atomic_bool check_listener_quota = false; bool allow_send_back = false; bool noanswer = false; bool stream_use_TLS = false; +bool stream = false; isc_nm_recv_cb_t connect_readcb = NULL; @@ -314,7 +315,11 @@ connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { if (eresult != ISC_R_SUCCESS) { /* Send failed, we need to stop reading too */ - isc_nm_cancelread(handle); + if (stream) { + isc_nm_read_stop(handle); + } else { + isc_nm_cancelread(handle); + } goto unref; } @@ -369,6 +374,7 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, case ISC_R_SHUTTINGDOWN: case ISC_R_CANCELED: case ISC_R_CONNECTIONRESET: + case ISC_R_CONNREFUSED: break; default: fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, @@ -378,6 +384,9 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, isc_refcount_decrement(&active_creads); + if (stream) { + isc_nm_read_stop(handle); + } isc_nmhandle_detach(&handle); } @@ -648,6 +657,8 @@ connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { UNUSED(handle); UNUSED(cbarg); + F(); + isc_refcount_decrement(&active_cconnects); assert_int_equal(eresult, ISC_R_SUCCESS); @@ -693,11 +704,8 @@ noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult, UNUSED(region); UNUSED(cbarg); - if (eresult == ISC_R_EOF) { - eresult = ISC_R_CONNECTIONRESET; - } - - assert_int_equal(eresult, ISC_R_CONNECTIONRESET); + assert_true(eresult == ISC_R_CANCELED || + eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF); isc_refcount_decrement(&active_creads); diff --git a/tests/isc/netmgr_common.h b/tests/isc/netmgr_common.h index 0d85a791f8..17eb27b041 100644 --- a/tests/isc/netmgr_common.h +++ b/tests/isc/netmgr_common.h @@ -118,6 +118,7 @@ extern atomic_bool check_listener_quota; extern bool allow_send_back; extern bool noanswer; extern bool stream_use_TLS; +extern bool stream; extern isc_nm_recv_cb_t connect_readcb; diff --git a/tests/isc/tcp_test.c b/tests/isc/tcp_test.c index 31a0b04163..de25a6be8e 100644 --- a/tests/isc/tcp_test.c +++ b/tests/isc/tcp_test.c @@ -130,4 +130,12 @@ ISC_TEST_ENTRY_CUSTOM(tcp_recv_send_quota_sendback, stream_recv_send_setup, ISC_TEST_LIST_END -ISC_TEST_MAIN +static int +tcp_setup(void **state __attribute__((__unused__))) { + stream_use_TLS = false; + stream = true; + + return (0); +} + +ISC_TEST_MAIN_CUSTOM(tcp_setup, NULL) diff --git a/tests/isc/tls_test.c b/tests/isc/tls_test.c index fcb5187ce3..211dd6bae4 100644 --- a/tests/isc/tls_test.c +++ b/tests/isc/tls_test.c @@ -132,6 +132,7 @@ ISC_TEST_LIST_END static int tls_setup(void **state __attribute__((__unused__))) { stream_use_TLS = true; + stream = true; return (0); }