From e1a4572fd68f0b039594fbf8cfbdc957df07e198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 24 Mar 2023 13:37:19 +0100 Subject: [PATCH 1/3] Refactor the use of atomics in netmgr Now that everything runs on their own loop and we don't cross the thread boundaries (with few exceptions), most of the atomic_bool variables used to track the socket state have been unatomicized because they are always accessed from the matching thread. The remaining few have been relaxed: a) the sock->active is now using acquire/release memory ordering; b) the various global limits are now using relaxed memory ordering - we don't really care about the synchronization for those. --- lib/isc/netmgr/http.c | 33 ++++---- lib/isc/netmgr/netmgr-int.h | 23 +++--- lib/isc/netmgr/netmgr.c | 149 ++++++++++++++++++------------------ lib/isc/netmgr/streamdns.c | 53 ++++++------- lib/isc/netmgr/tcp.c | 131 +++++++++++++++---------------- lib/isc/netmgr/tlsstream.c | 41 +++++----- lib/isc/netmgr/udp.c | 94 +++++++++-------------- 7 files changed, 243 insertions(+), 281 deletions(-) diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 5aed9dfa36..0f7bd8f2d0 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -1479,7 +1479,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, sock->connect_timeout = timeout; sock->connect_cb = cb; sock->connect_cbarg = cbarg; - atomic_init(&sock->client, true); + sock->client = true; if (isc__nm_closing(worker)) { isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock); @@ -1609,7 +1609,7 @@ isc__nm_http_request(isc_nmhandle_t *handle, isc_region_t *region, REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(handle->sock->tid == isc_tid()); - REQUIRE(atomic_load(&handle->sock->client)); + REQUIRE(handle->sock->client); REQUIRE(cb != NULL); @@ -1688,7 +1688,7 @@ find_server_request_handler(const char *request_path, REQUIRE(VALID_NMSOCK(serversocket)); - if (atomic_load(&serversocket->listening)) { + if (serversocket->listening) { handler = http_endpoints_find( request_path, http_get_listener_endpoints(serversocket, tid)); @@ -2069,7 +2069,7 @@ isc__nm_http_bad_request(isc_nmhandle_t *handle) { REQUIRE(VALID_NMSOCK(handle->sock)); sock = handle->sock; REQUIRE(sock->type == isc_nm_httpsocket); - REQUIRE(!atomic_load(&sock->client)); + REQUIRE(!sock->client); REQUIRE(VALID_HTTP2_SESSION(sock->h2.session)); (void)server_send_error_response(ISC_HTTP_ERROR_BAD_REQUEST, @@ -2467,7 +2467,7 @@ httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { REQUIRE(VALID_NMSOCK(httplistensock)); INSIST(httplistensock == httpserver); - if (atomic_load(&httplistensock->closing)) { + if (httplistensock->closing) { return (ISC_R_CANCELED); } @@ -2475,7 +2475,7 @@ httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { new_session(handle->sock->worker->mctx, NULL, &session); session->max_concurrent_streams = - atomic_load(&httplistensock->h2.max_concurrent_streams); + atomic_load_relaxed(&httplistensock->h2.max_concurrent_streams); initialize_nghttp2_server_session(session); handle->sock->h2.session = session; @@ -2523,7 +2523,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } if (result != ISC_R_SUCCESS) { - atomic_store(&sock->closed, true); + sock->closed = true; isc__nmsocket_detach(&sock); return (result); } @@ -2536,7 +2536,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, isc__nmsocket_barrier_init(sock); atomic_init(&sock->rchildren, sock->nchildren); - atomic_store(&sock->listening, true); + sock->listening = true; *sockp = sock; return (ISC_R_SUCCESS); } @@ -2711,8 +2711,8 @@ http_close_direct(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - atomic_store(&sock->closed, true); - atomic_store(&sock->active, false); + sock->closed = true; + atomic_store_release(&sock->active, false); session = sock->h2.session; if (session != NULL && session->sending == 0 && !session->reading) { @@ -2742,12 +2742,9 @@ isc__nm_http_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_httpsocket); REQUIRE(!isc__nmsocket_active(sock)); + REQUIRE(!sock->closing); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) - { - return; - } + sock->closing = true; if (sock->h2.session != NULL && sock->h2.session->closed && sock->tid == isc_tid()) @@ -2843,8 +2840,8 @@ server_call_failed_read_cb(isc_result_t result, isc_nmsocket_h2_t *next = ISC_LIST_NEXT(h2data, link); ISC_LIST_DEQUEUE(session->sstreams, h2data, link); /* Cleanup socket in place */ - atomic_store(&h2data->psock->active, false); - atomic_store(&h2data->psock->closed, true); + atomic_store_release(&h2data->psock->active, false); + h2data->psock->closed = true; isc__nmsocket_detach(&h2data->psock); h2data = next; @@ -2957,7 +2954,7 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener, max_streams = max_concurrent_streams; } - atomic_store(&listener->h2.max_concurrent_streams, max_streams); + atomic_store_relaxed(&listener->h2.max_concurrent_streams, max_streams); } typedef struct http_endpoints_data { diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 1fdcec7528..fd7e38a20e 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -588,9 +588,12 @@ struct isc_nmsocket { * Socket is active if it's listening, working, etc. If it's * closing, then it doesn't make a sense, for example, to * push handles or reqs for reuse. + * + * We might be accessing sock->parent->active from a different + * thread, so .active has to be atomic. */ atomic_bool active; - atomic_bool destroying; + bool destroying; bool route_sock; @@ -600,20 +603,20 @@ struct isc_nmsocket { * If active==false but closed==false, that means the socket * is closing. */ - atomic_bool closing; - atomic_bool closed; - atomic_bool listening; - atomic_bool connecting; - atomic_bool connected; - atomic_bool accepting; + bool closing; + bool closed; + bool listening; + bool connecting; + bool connected; + bool accepting; bool reading; - atomic_bool timedout; + bool timedout; isc_refcount_t references; /*% * Established an outgoing connection, as client not server. */ - atomic_bool client; + bool client; /*% * The socket is processing read callback, this is guard to not read @@ -625,7 +628,7 @@ struct isc_nmsocket { * A TCP or TCPDNS socket has been set to use the keepalive * timeout instead of the default idle timeout. */ - atomic_bool keepalive; + bool keepalive; /*% * 'spare' handles for that can be reused to avoid allocations, for UDP. diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 9ba0874675..4912316058 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -147,8 +147,8 @@ static void netmgr_teardown(void *arg) { isc_nm_t *netmgr = (void *)arg; - if (atomic_compare_exchange_strong(&netmgr->shuttingdown, - &(bool){ false }, true)) + if (atomic_compare_exchange_strong_acq_rel(&netmgr->shuttingdown, + &(bool){ false }, true)) { isc__netmgr_log(netmgr, ISC_LOG_DEBUG(1), "Shutting down network manager"); @@ -318,7 +318,7 @@ void isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) { REQUIRE(VALID_NM(mgr)); - atomic_store(&mgr->maxudp, maxudp); + atomic_store_relaxed(&mgr->maxudp, maxudp); } void @@ -349,10 +349,10 @@ isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle, uint32_t keepalive, uint32_t advertised) { REQUIRE(VALID_NM(mgr)); - atomic_store(&mgr->init, init); - atomic_store(&mgr->idle, idle); - atomic_store(&mgr->keepalive, keepalive); - atomic_store(&mgr->advertised, advertised); + atomic_store_relaxed(&mgr->init, init); + atomic_store_relaxed(&mgr->idle, idle); + atomic_store_relaxed(&mgr->keepalive, keepalive); + atomic_store_relaxed(&mgr->advertised, advertised); } void @@ -360,10 +360,10 @@ isc_nm_setnetbuffers(isc_nm_t *mgr, int32_t recv_tcp, int32_t send_tcp, int32_t recv_udp, int32_t send_udp) { REQUIRE(VALID_NM(mgr)); - atomic_store(&mgr->recv_tcp_buffer_size, recv_tcp); - atomic_store(&mgr->send_tcp_buffer_size, send_tcp); - atomic_store(&mgr->recv_udp_buffer_size, recv_udp); - atomic_store(&mgr->send_udp_buffer_size, send_udp); + atomic_store_relaxed(&mgr->recv_tcp_buffer_size, recv_tcp); + atomic_store_relaxed(&mgr->send_tcp_buffer_size, send_tcp); + atomic_store_relaxed(&mgr->recv_udp_buffer_size, recv_udp); + atomic_store_relaxed(&mgr->send_udp_buffer_size, send_udp); } bool @@ -390,19 +390,19 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, REQUIRE(VALID_NM(mgr)); if (initial != NULL) { - *initial = atomic_load(&mgr->init); + *initial = atomic_load_relaxed(&mgr->init); } if (idle != NULL) { - *idle = atomic_load(&mgr->idle); + *idle = atomic_load_relaxed(&mgr->idle); } if (keepalive != NULL) { - *keepalive = atomic_load(&mgr->keepalive); + *keepalive = atomic_load_relaxed(&mgr->keepalive); } if (advertised != NULL) { - *advertised = atomic_load(&mgr->advertised); + *advertised = atomic_load_relaxed(&mgr->advertised); } } @@ -410,10 +410,10 @@ bool isc__nmsocket_active(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); if (sock->parent != NULL) { - return (atomic_load(&sock->parent->active)); + return (atomic_load_acquire(&sock->parent->active)); } - return (atomic_load(&sock->active)); + return (atomic_load_acquire(&sock->active)); } bool @@ -421,12 +421,12 @@ isc__nmsocket_deactivate(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); if (sock->parent != NULL) { - return (atomic_compare_exchange_strong(&sock->parent->active, - &(bool){ true }, false)); + return (atomic_compare_exchange_strong_acq_rel( + &sock->parent->active, &(bool){ true }, false)); } - return (atomic_compare_exchange_strong(&sock->active, &(bool){ true }, - false)); + return (atomic_compare_exchange_strong_acq_rel(&sock->active, + &(bool){ true }, false)); } void @@ -469,7 +469,8 @@ nmsocket_cleanup(void *arg) { isc__nm_decstats(sock, STATID_ACTIVE); - atomic_store(&sock->destroying, true); + REQUIRE(!sock->destroying); + sock->destroying = true; if (sock->parent == NULL && sock->children != NULL) { /* @@ -477,7 +478,6 @@ nmsocket_cleanup(void *arg) { * so we can clean up and free the children. */ for (size_t i = 0; i < sock->nchildren; i++) { - REQUIRE(!atomic_load(&sock->children[i].destroying)); isc_refcount_decrementz(&sock->children[i].references); nmsocket_cleanup(&sock->children[i]); } @@ -570,9 +570,18 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) { return; } - if (atomic_load(&sock->active) || atomic_load(&sock->destroying) || - !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) - { + REQUIRE(!sock->destroying); + REQUIRE(!atomic_load_acquire(&sock->active)); + + if (!sock->closed) { + return; + } + + if (isc_refcount_current(&sock->references) != 0) { + /* + * Using such check is valid only if we don't use + * isc_refcount_increment0() on the same variable. + */ return; } @@ -588,7 +597,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) { return; } - atomic_store(&sock->destroying, true); if (sock->tid == isc_tid()) { nmsocket_cleanup(sock); } else { @@ -618,7 +626,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { * destroying the socket, but we have to wait for all the inflight * handles to finish first. */ - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); /* * If the socket has children, they'll need to be marked inactive @@ -626,7 +634,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { */ if (sock->children != NULL) { for (size_t i = 0; i < sock->nchildren; i++) { - atomic_store(&sock->children[i].active, false); + atomic_store_relaxed(&sock->children[i].active, false); } } @@ -636,7 +644,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { * * If it's a regular socket we may need to close it. */ - if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) { + if (!sock->closing && !sock->closed) { switch (sock->type) { case isc_nm_udpsocket: isc__nm_udp_close(sock); @@ -793,15 +801,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, sock, isc_refcount_current(&sock->references)); atomic_init(&sock->active, true); - atomic_init(&sock->closing, false); - atomic_init(&sock->listening, 0); - atomic_init(&sock->closed, 0); - atomic_init(&sock->destroying, 0); - atomic_init(&sock->client, 0); - atomic_init(&sock->connecting, false); - atomic_init(&sock->keepalive, false); - atomic_init(&sock->connected, false); - atomic_init(&sock->timedout, false); atomic_init(&sock->active_child_connections, 0); @@ -904,7 +903,7 @@ isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t const *peer, switch (sock->type) { case isc_nm_udpsocket: - if (!atomic_load(&sock->client)) { + if (!sock->client) { break; } FALLTHROUGH; @@ -1005,7 +1004,7 @@ nmhandle_destroy(isc_nmhandle_t *handle) { #if defined(__SANITIZE_ADDRESS__) || defined(__SANITIZE_THREAD__) nmhandle_free(sock, handle); #else - if (atomic_load(&sock->active)) { + if (atomic_load_acquire(&sock->active)) { ISC_LIST_APPEND(sock->inactive_handles, handle, inactive_link); } else { nmhandle_free(sock, handle); @@ -1078,7 +1077,7 @@ isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, void isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { - REQUIRE(atomic_load(&sock->accepting)); + REQUIRE(sock->accepting); REQUIRE(sock->server); /* @@ -1092,7 +1091,7 @@ isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { isc__nmsocket_detach(&sock->server); - atomic_store(&sock->accepting, false); + sock->accepting = false; switch (eresult) { case ISC_R_NOTCONNECTED: @@ -1112,15 +1111,15 @@ isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, REQUIRE(VALID_UVREQ(req)); REQUIRE(sock->tid == isc_tid()); REQUIRE(req->cb.connect != NULL); + REQUIRE(sock->connecting); + + sock->connecting = false; isc__nm_incstats(sock, STATID_CONNECTFAIL); isc__nmsocket_timer_stop(sock); uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock); - atomic_compare_exchange_enforced(&sock->connecting, &(bool){ true }, - false); - isc__nmsocket_clearcb(sock); isc__nm_connectcb(sock, req, eresult, async); @@ -1157,17 +1156,17 @@ isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->connecting)); REQUIRE(VALID_UVREQ(req)); REQUIRE(VALID_NMHANDLE(req->handle)); + REQUIRE(sock->connecting); isc__nmsocket_timer_stop(sock); /* * Mark the connection as timed out and shutdown the socket. */ - atomic_compare_exchange_enforced(&sock->timedout, &(bool){ false }, - true); + REQUIRE(!sock->timedout); + sock->timedout = true; isc__nmsocket_clearcb(sock); isc__nmsocket_shutdown(sock); } @@ -1221,7 +1220,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) { REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->reading); - if (atomic_load(&sock->client)) { + if (sock->client) { uv_timer_stop(timer); sock->recv_read = false; @@ -1259,7 +1258,7 @@ isc__nmsocket_timer_restart(isc_nmsocket_t *sock) { return; } - if (atomic_load(&sock->connecting)) { + if (sock->connecting) { int r; if (sock->connect_timeout == 0) { @@ -1352,7 +1351,7 @@ isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) { isc_nmhandle_attach(sock->recv_handle, &req->handle); break; default: - if (atomic_load(&sock->client) && sock->statichandle != NULL) { + if (sock->client && sock->statichandle != NULL) { isc_nmhandle_attach(sock->statichandle, &req->handle); } else { req->handle = isc__nmhandle_get(sock, sockaddr, NULL); @@ -1463,7 +1462,7 @@ isc__nm_closing(isc__networker_t *worker) { bool isc__nmsocket_closing(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) || + return (!isc__nmsocket_active(sock) || sock->closing || isc__nm_closing(sock->worker) || (sock->server != NULL && !isc__nmsocket_active(sock->server))); } @@ -1528,13 +1527,17 @@ isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) { sock = handle->sock; netmgr = sock->worker->netmgr; + REQUIRE(sock->tid == isc_tid()); + switch (sock->type) { case isc_nm_tcpsocket: - atomic_store(&sock->keepalive, value); - sock->read_timeout = value ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle); - sock->write_timeout = value ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle); + sock->keepalive = value; + sock->read_timeout = + value ? atomic_load_relaxed(&netmgr->keepalive) + : atomic_load_relaxed(&netmgr->idle); + sock->write_timeout = + value ? atomic_load_relaxed(&netmgr->keepalive) + : atomic_load_relaxed(&netmgr->idle); break; case isc_nm_streamdnssocket: isc__nmhandle_streamdns_keepalive(handle, value); @@ -1780,12 +1783,10 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) { REQUIRE(VALID_NMSOCK(listener)); REQUIRE(listener->tid == isc_tid()); REQUIRE(listener->tid == 0); + REQUIRE(listener->listening); + REQUIRE(!listener->closing); - if (!atomic_compare_exchange_strong(&listener->closing, - &(bool){ false }, true)) - { - UNREACHABLE(); - } + listener->closing = true; for (size_t i = 1; i < listener->nchildren; i++) { isc__networker_t *worker = @@ -1796,11 +1797,7 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) { nmsocket_stop_cb(listener); INSIST(atomic_load(&listener->rchildren) == 0); - if (!atomic_compare_exchange_strong(&listener->listening, - &(bool){ true }, false)) - { - UNREACHABLE(); - } + listener->listening = false; listener->accept_cb = NULL; listener->accept_cbarg = NULL; @@ -1812,7 +1809,7 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) { isc__nmsocket_detach(&listener->outer); } - atomic_store(&listener->closed, true); + listener->closed = true; } void @@ -2186,7 +2183,7 @@ isc_nm_set_maxage(isc_nmhandle_t *handle, const uint32_t ttl) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - REQUIRE(!atomic_load(&handle->sock->client)); + REQUIRE(!handle->sock->client); #if !HAVE_LIBNGHTTP2 UNUSED(ttl); @@ -2569,7 +2566,7 @@ nmsocket_dump(isc_nmsocket_t *sock) { fprintf(stderr, "\n=================\n"); fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n", - atomic_load(&sock->client) ? "client" : "server", sock, + sock->client ? "client" : "server", sock, nmsocket_type_totext(sock->type), isc_refcount_current(&sock->references)); fprintf(stderr, @@ -2577,11 +2574,11 @@ nmsocket_dump(isc_nmsocket_t *sock) { "%p\n", sock->parent, sock->listener, sock->server, sock->statichandle); fprintf(stderr, "Flags:%s%s%s%s%s\n", - atomic_load(&sock->active) ? " active" : "", - atomic_load(&sock->closing) ? " closing" : "", - atomic_load(&sock->destroying) ? " destroying" : "", - atomic_load(&sock->connecting) ? " connecting" : "", - atomic_load(&sock->accepting) ? " accepting" : ""); + atomic_load_acquire(&sock->active) ? " active" : "", + sock->closing ? " closing" : "", + sock->destroying ? " destroying" : "", + sock->connecting ? " connecting" : "", + sock->accepting ? " accepting" : ""); fprintf(stderr, "Created by:\n"); isc_backtrace_symbols_fd(sock->backtrace, sock->backtrace_size, STDERR_FILENO); diff --git a/lib/isc/netmgr/streamdns.c b/lib/isc/netmgr/streamdns.c index 5a3a16ab74..8aeb1ce639 100644 --- a/lib/isc/netmgr/streamdns.c +++ b/lib/isc/netmgr/streamdns.c @@ -124,18 +124,16 @@ streamdns_on_complete_dnsmessage(isc_dnsstream_assembler_t *dnsasm, isc_region_t *restrict region, isc_nmsocket_t *sock, isc_nmhandle_t *transphandle) { - const bool client = atomic_load(&sock->client); const bool last_datum = isc_dnsstream_assembler_remaininglength( dnsasm) == region->length; /* - * Stop after one message if a client - * connection. + * Stop after one message if a client connection. */ - bool stop = client; + bool stop = sock->client; sock->recv_read = false; if (sock->recv_cb != NULL) { - if (!client) { + if (!sock->client) { /* * We must allocate a new handle object, as we * need to ensure that after processing of this @@ -259,8 +257,8 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type, uint32_t initial = 0; isc_nm_gettimeouts(worker->netmgr, &initial, NULL, NULL, NULL); sock->read_timeout = initial; - atomic_init(&sock->client, !is_server); - atomic_init(&sock->connecting, !is_server); + sock->client = !is_server; + sock->connecting = !is_server; sock->streamdns.input = isc_dnsstream_assembler_new( sock->worker->mctx, streamdns_on_dnsmessage_data_cb, sock); @@ -272,7 +270,7 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type, static void streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle, const isc_result_t result) { - atomic_store(&sock->connecting, false); + sock->connecting = false; if (sock->connect_cb == NULL) { return; } @@ -280,7 +278,7 @@ streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle, if (result != ISC_R_SUCCESS) { isc__nmsocket_clearcb(handle->sock); } else { - atomic_store(&sock->connected, true); + sock->connected = true; } streamdns_try_close_unused(sock); } @@ -341,7 +339,7 @@ streamdns_transport_connected(isc_nmhandle_t *handle, isc_result_t result, } isc_nmhandle_attach(handle, &sock->outerhandle); - atomic_store(&sock->active, true); + atomic_store_release(&sock->active, true); handle->sock->streamdns.sock = sock; @@ -364,7 +362,7 @@ error: isc_nm_verify_tls_peer_result_string(handle); } streamhandle = isc__nmhandle_get(sock, NULL, NULL); - atomic_store(&sock->closed, true); + sock->closed = true; streamdns_call_connect_cb(sock, streamhandle, result); isc_nmhandle_detach(&streamhandle); isc__nmsocket_detach(&sock); @@ -482,7 +480,7 @@ streamdns_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result, sock->recv_read = false; isc_dnsstream_assembler_clear(sock->streamdns.input); isc__nmsocket_clearcb(sock); - } else if (atomic_load(&sock->client)) { + } else if (sock->client) { sock->recv_read = false; } isc__nm_readcb(sock, req, result, async); @@ -630,7 +628,7 @@ streamdns_resume_processing(void *arg) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(!atomic_load(&sock->client)); + REQUIRE(!sock->client); if (streamdns_closing(sock)) { return; @@ -658,9 +656,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { if (isc__nm_closing(handle->sock->worker)) { return (ISC_R_SHUTTINGDOWN); - } else if (isc__nmsocket_closing(handle->sock) || - atomic_load(&listensock->closing)) - { + } else if (isc__nmsocket_closing(handle->sock) || listensock->closing) { return (ISC_R_CANCELED); } @@ -676,8 +672,8 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nm_gettimeouts(handle->sock->worker->netmgr, &initial, NULL, NULL, NULL); nsock->read_timeout = initial; - atomic_init(&nsock->accepting, true); - atomic_store(&nsock->active, true); + nsock->accepting = true; + atomic_store_release(&nsock->active, true); isc__nmsocket_attach(listensock, &nsock->listener); isc_nmhandle_attach(handle, &nsock->outerhandle); @@ -693,7 +689,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmhandle_detach(&nsock->recv_handle); isc__nmsocket_detach(&nsock->listener); isc_nmhandle_detach(&nsock->outerhandle); - atomic_store(&nsock->closed, true); + nsock->closed = true; goto exit; } @@ -706,7 +702,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { streamdns_handle_incoming_data(nsock, nsock->outerhandle, NULL, 0); exit: - atomic_store(&nsock->accepting, false); + nsock->accepting = false; return (result); } @@ -745,7 +741,7 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, backlog, quota, tlsctx, &listener->outer); } if (result != ISC_R_SUCCESS) { - atomic_store(&listener->closed, true); + listener->closed = true; isc__nmsocket_detach(&listener); return (result); } @@ -756,8 +752,8 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } listener->result = result; - atomic_store(&listener->active, true); - atomic_store(&listener->listening, true); + atomic_store_release(&listener->active, true); + listener->listening = true; INSIST(listener->outer->streamdns.listener == NULL); listener->nchildren = listener->outer->nchildren; isc__nmsocket_barrier_init(listener); @@ -942,8 +938,8 @@ streamdns_close_direct(isc_nmsocket_t *sock) { /* Further cleanup performed in isc__nm_streamdns_cleanup_data() */ isc_dnsstream_assembler_clear(sock->streamdns.input); - atomic_store(&sock->closed, true); - atomic_store(&sock->active, false); + sock->closed = true; + atomic_store_release(&sock->active, false); } void @@ -951,12 +947,9 @@ isc__nm_streamdns_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_streamdnssocket); REQUIRE(sock->tid == isc_tid()); + REQUIRE(!sock->closing); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) - { - return; - } + sock->closing = true; streamdns_close_direct(sock); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 0be807f2b7..a5bd12b44f 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -80,8 +80,10 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); static void failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { - REQUIRE(atomic_load(&sock->accepting)); REQUIRE(sock->server); + REQUIRE(sock->accepting); + + sock->accepting = false; /* * Detach the quota early to make room for other connections; @@ -94,8 +96,6 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { isc__nmsocket_detach(&sock->server); - atomic_store(&sock->accepting, false); - switch (eresult) { case ISC_R_NOTCONNECTED: /* IGNORE: The client disconnected before we could accept */ @@ -120,7 +120,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { worker = sock->worker; - atomic_store(&sock->connecting, true); + sock->connecting = true; /* 2 minute timeout */ result = isc__nm_socket_connectiontimeout(sock->fd, 120 * 1000); @@ -165,8 +165,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { &req->uv_req.connect); isc__nmsocket_timer_start(sock); - atomic_store(&sock->connected, true); - return (ISC_R_SUCCESS); } @@ -189,10 +187,10 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { REQUIRE(VALID_UVREQ(req)); REQUIRE(VALID_NMHANDLE(req->handle)); - if (atomic_load(&sock->timedout)) { + if (sock->timedout) { result = ISC_R_TIMEDOUT; goto error; - } else if (!atomic_load(&sock->connecting)) { + } else if (!sock->connecting) { /* * The connect was cancelled from timeout; just clean up * the req. @@ -245,7 +243,8 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { goto error; } - atomic_store(&sock->connecting, false); + sock->connecting = false; + sock->connected = true; result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); RUNTIME_CHECK(result == ISC_R_SUCCESS); @@ -289,7 +288,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, sock->connect_timeout = timeout; sock->fd = fd; - atomic_init(&sock->client, true); + sock->client = true; req = isc__nm_uvreq_get(sock); req->cb.connect = cb; @@ -301,11 +300,11 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, (void)isc__nm_socket_min_mtu(sock->fd, sa_family); (void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG); - atomic_store(&sock->active, true); + atomic_store_release(&sock->active, true); result = tcp_connect_direct(sock, req); if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); isc__nm_tcp_close(sock); isc__nm_connectcb(sock, req, result, true); } @@ -416,7 +415,7 @@ start_tcp_child_job(void *arg) { goto done; } - atomic_store(&sock->listening, true); + sock->listening = true; if (sock->tid == 0) { r = uv_tcp_getsockname(&sock->uv_handle.tcp, @@ -550,16 +549,15 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } } - atomic_store(&sock->active, true); - if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); isc__nm_tcp_stoplistening(sock); isc_nmsocket_close(&sock); return (result); } + atomic_store_release(&sock->active, true); REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; return (ISC_R_SUCCESS); @@ -607,9 +605,9 @@ stop_tcp_child_job(void *arg) { REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->parent != NULL); REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(!sock->closing); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing, - &(bool){ false }, true)); + sock->closing = true; /* * The order of the close operation is important here, the uv_close() @@ -633,45 +631,41 @@ stop_tcp_child_job(void *arg) { } static void -stop_tcp_child(isc_nmsocket_t *sock, uint32_t tid) { - isc_nmsocket_t *csock = NULL; +stop_tcp_child(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); - csock = &sock->children[tid]; - REQUIRE(VALID_NMSOCK(csock)); - - atomic_store(&csock->active, false); - - if (tid == 0) { - stop_tcp_child_job(csock); + atomic_store_release(&sock->active, false); + if (sock->tid == 0) { + stop_tcp_child_job(sock); } else { - isc_async_run(csock->worker->loop, stop_tcp_child_job, csock); + isc_async_run(sock->worker->loop, stop_tcp_child_job, sock); } } -static void -stop_tcp_parent(isc_nmsocket_t *sock) { - /* Stop the parent */ - atomic_store(&sock->closed, true); - isc__nmsocket_prep_destroy(sock); -} - void isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcplistener); REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->tid == 0); + REQUIRE(!sock->closing); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing, - &(bool){ false }, true)); + sock->closing = true; + /* Mark the parent socket inactive */ + atomic_store_release(&sock->active, false); + + /* Stop all the other threads' children */ for (size_t i = 1; i < sock->nchildren; i++) { - stop_tcp_child(sock, i); + stop_tcp_child(&sock->children[i]); } - stop_tcp_child(sock, 0); + /* Stop the child for the main thread */ + stop_tcp_child(&sock->children[0]); - stop_tcp_parent(sock); + /* Stop the parent */ + sock->closed = true; + isc__nmsocket_prep_destroy(sock); } static void @@ -681,16 +675,15 @@ tcp_stop_cb(uv_handle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->closing)); + REQUIRE(sock->closing); REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(!sock->closed); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed, - &(bool){ false }, true)); + sock->closed = true; + sock->listening = false; isc__nm_incstats(sock, STATID_CLOSE); - atomic_store(&sock->listening, false); - isc__nmsocket_detach(&sock); } @@ -747,9 +740,10 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { /* Initialize the timer */ if (sock->read_timeout == 0) { - sock->read_timeout = (atomic_load(&sock->keepalive) - ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle)); + sock->read_timeout = + sock->keepalive + ? atomic_load_relaxed(&netmgr->keepalive) + : atomic_load_relaxed(&netmgr->idle); } if (isc__nmsocket_closing(sock)) { @@ -824,10 +818,11 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { req->uvbuf.base = buf->base; req->uvbuf.len = nread; - if (!atomic_load(&sock->client)) { - sock->read_timeout = (atomic_load(&sock->keepalive) - ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle)); + if (!sock->client) { + sock->read_timeout = + sock->keepalive + ? atomic_load_relaxed(&netmgr->keepalive) + : atomic_load_relaxed(&netmgr->idle); } isc__nm_readcb(sock, req, ISC_R_SUCCESS, false); @@ -912,7 +907,7 @@ accept_connection(isc_nmsocket_t *ssock) { isc__nmsocket_attach(ssock, &csock->server); csock->recv_cb = ssock->recv_cb; csock->recv_cbarg = ssock->recv_cbarg; - atomic_init(&csock->accepting, true); + csock->accepting = true; csock->quota = ssock->pquota; worker = csock->worker; @@ -964,11 +959,11 @@ accept_connection(isc_nmsocket_t *ssock) { goto failure; } - atomic_store(&csock->accepting, false); + csock->accepting = false; isc__nm_incstats(csock, STATID_ACCEPT); - csock->read_timeout = atomic_load(&csock->worker->netmgr->init); + csock->read_timeout = atomic_load_relaxed(&csock->worker->netmgr->init); /* * The acceptcb needs to attach to the handle if it wants to keep the @@ -984,7 +979,7 @@ accept_connection(isc_nmsocket_t *ssock) { return (ISC_R_SUCCESS); failure: - atomic_store(&csock->active, false); + atomic_store_release(&csock->active, false); failed_accept_cb(csock, result); @@ -1022,9 +1017,10 @@ tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, uvreq->cbarg = cbarg; if (sock->write_timeout == 0) { - sock->write_timeout = (atomic_load(&sock->keepalive) - ? atomic_load(&netmgr->keepalive) - : atomic_load(&netmgr->idle)); + sock->write_timeout = + sock->keepalive + ? atomic_load_relaxed(&netmgr->keepalive) + : atomic_load_relaxed(&netmgr->idle); } result = tcp_send_direct(sock, uvreq); @@ -1150,10 +1146,11 @@ static void tcp_close_sock(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->closing)); + REQUIRE(sock->closing); + REQUIRE(!sock->closed); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed, - &(bool){ false }, true)); + sock->closed = true; + sock->connected = false; isc__nm_incstats(sock, STATID_CLOSE); @@ -1161,8 +1158,6 @@ tcp_close_sock(isc_nmsocket_t *sock) { isc__nmsocket_detach(&sock->server); } - atomic_store(&sock->connected, false); - isc__nmsocket_prep_destroy(sock); } @@ -1181,9 +1176,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) { REQUIRE(!isc__nmsocket_active(sock)); REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->parent == NULL); + REQUIRE(!sock->closing); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing, - &(bool){ false }, true)); + sock->closing = true; if (sock->quota != NULL) { isc_quota_detach(&sock->quota); @@ -1246,11 +1241,11 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { return; } - if (atomic_load(&sock->accepting)) { + if (sock->accepting) { return; } - if (atomic_load(&sock->connecting)) { + if (sock->connecting) { isc_nmsocket_t *tsock = NULL; isc__nmsocket_attach(sock, &tsock); uv_close(&sock->uv_handle.handle, tcp_close_connect_cb); diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index c1293ecaac..949c27766d 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -107,10 +107,10 @@ tls_try_to_enable_tcp_nodelay(isc_nmsocket_t *tlssock); */ static bool inactive(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) || + return (!isc__nmsocket_active(sock) || sock->closing || sock->outerhandle == NULL || !isc__nmsocket_active(sock->outerhandle->sock) || - atomic_load(&sock->outerhandle->sock->closing) || + sock->outerhandle->sock->closing || (sock->listener != NULL && !isc__nmsocket_active(sock->listener)) || isc__nm_closing(sock->worker)); @@ -424,7 +424,7 @@ tls_try_handshake(isc_nmsocket_t *sock, isc_result_t *presult) { if (isc__nm_closing(sock->worker)) { result = ISC_R_SHUTTINGDOWN; } else if (isc__nmsocket_closing(sock) || - atomic_load(&sock->listener->closing)) + sock->listener->closing) { result = ISC_R_CANCELED; } else { @@ -544,13 +544,13 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data, INSIST(SSL_is_init_finished( sock->tlsstream.tls) == 1); - INSIST(!atomic_load(&sock->client)); + INSIST(!sock->client); finish = true; } else if (sock->tlsstream.state == TLS_IO && hs_result == ISC_R_SUCCESS && !sock->tlsstream.server) { - INSIST(atomic_load(&sock->client)); + INSIST(sock->client); } } } else if (send_data != NULL) { @@ -867,7 +867,7 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_tlsctx_attach(tlsctx, &tlssock->tlsstream.ctx); tlssock->tlsstream.tls = isc_tls_create(tlssock->tlsstream.ctx); if (tlssock->tlsstream.tls == NULL) { - atomic_store(&tlssock->closed, true); + tlssock->closed = true; isc_tlsctx_free(&tlssock->tlsstream.ctx); isc__nmsocket_detach(&tlssock); return (ISC_R_TLSERROR); @@ -877,7 +877,7 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmhandle_attach(handle, &tlssock->outerhandle); tlssock->peer = handle->sock->peer; tlssock->read_timeout = - atomic_load(&handle->sock->worker->netmgr->init); + atomic_load_relaxed(&handle->sock->worker->netmgr->init); /* * Hold a reference to tlssock in the TCP socket: it will @@ -932,7 +932,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, result = isc_nm_listentcp(mgr, workers, iface, tlslisten_acceptcb, tlssock, backlog, quota, &tlssock->outer); if (result != ISC_R_SUCCESS) { - atomic_store(&tlssock->closed, true); + tlssock->closed = true; isc__nmsocket_detach(&tlssock); return (result); } @@ -945,7 +945,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, /* wait for listen result */ isc__nmsocket_attach(tlssock->outer, &tsock); tlssock->result = result; - atomic_store(&tlssock->active, true); + atomic_store_release(&tlssock->active, true); INSIST(tlssock->outer->tlsstream.tlslistener == NULL); isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener); isc__nmsocket_detach(&tsock); @@ -956,7 +956,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, atomic_init(&tlssock->rchildren, tlssock->nchildren); if (result == ISC_R_SUCCESS) { - atomic_store(&tlssock->listening, true); + tlssock->listening = true; *sockp = tlssock; } @@ -1086,8 +1086,8 @@ tls_close_direct(void *arg) { } /* Further cleanup performed in isc__nm_tls_cleanup_data() */ - atomic_store(&sock->closed, true); - atomic_store(&sock->active, false); + sock->closed = true; + atomic_store_release(&sock->active, false); sock->tlsstream.state = TLS_CLOSED; } @@ -1095,12 +1095,9 @@ void isc__nm_tls_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tlssocket); + REQUIRE(!sock->closing); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) - { - return; - } + sock->closing = true; if (sock->tid == isc_tid()) { /* no point in attempting to make the call asynchronous */ @@ -1144,7 +1141,7 @@ isc_nm_tlsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, nsock->connect_cbarg = cbarg; nsock->connect_timeout = timeout; isc_tlsctx_attach(ctx, &nsock->tlsstream.ctx); - atomic_init(&nsock->client, true); + nsock->client = true; if (client_sess_cache != NULL) { INSIST(isc_tlsctx_client_session_cache_getctx( client_sess_cache) == ctx); @@ -1194,7 +1191,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { } tlssock->peer = isc_nmhandle_peeraddr(handle); isc_nmhandle_attach(handle, &tlssock->outerhandle); - atomic_store(&tlssock->active, true); + atomic_store_release(&tlssock->active, true); if (tlssock->tlsstream.client_sess_cache != NULL) { isc_tlsctx_client_session_cache_reuse_sockaddr( @@ -1215,7 +1212,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { return; error: tlshandle = isc__nmhandle_get(tlssock, NULL, NULL); - atomic_store(&tlssock->closed, true); + tlssock->closed = true; tls_call_connect_cb(tlssock, tlshandle, result); isc_nmhandle_detach(&tlshandle); isc__nmsocket_detach(&tlssock); @@ -1246,7 +1243,7 @@ isc__nm_tls_cleanup_data(isc_nmsocket_t *sock) { isc_tlsctx_free(&sock->tlsstream.ctx); } if (sock->tlsstream.client_sess_cache != NULL) { - INSIST(atomic_load(&sock->client)); + INSIST(sock->client); isc_tlsctx_client_session_cache_detach( &sock->tlsstream.client_sess_cache); } @@ -1468,7 +1465,7 @@ tls_keep_client_tls_session(isc_nmsocket_t *sock) { if (sock->tlsstream.client_sess_cache != NULL && sock->tlsstream.client_session_saved == false) { - INSIST(atomic_load(&sock->client)); + INSIST(sock->client); isc_tlsctx_client_session_cache_keep_sockaddr( sock->tlsstream.client_sess_cache, &sock->peer, sock->tlsstream.tls); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index f7fa9a6317..efa1947aad 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -168,8 +168,7 @@ start_udp_child_job(void *arg) { } sock->reading = true; - - atomic_store(&sock->listening, true); + sock->listening = true; done: result = isc_uverr2result(r); @@ -275,15 +274,15 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } } - atomic_store(&sock->active, true); - if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); isc__nm_udp_stoplistening(sock); isc_nmsocket_close(&sock); return (result); } + + atomic_store_release(&sock->active, true); INSIST(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; return (ISC_R_SUCCESS); @@ -328,7 +327,7 @@ route_connect_direct(isc_nmsocket_t *sock) { worker = sock->worker; - atomic_store(&sock->connecting, true); + sock->connecting = true; r = uv_udp_init(&worker->loop->loop, &sock->uv_handle.udp); UV_RUNTIME_CHECK(uv_udp_init, r); @@ -350,8 +349,8 @@ route_connect_direct(isc_nmsocket_t *sock) { isc__nm_set_network_buffers(sock->worker->netmgr, &sock->uv_handle.handle); - atomic_store(&sock->connecting, false); - atomic_store(&sock->connected, true); + sock->connecting = false; + sock->connected = true; return (ISC_R_SUCCESS); } @@ -384,7 +383,7 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) { sock->connect_cb = cb; sock->connect_cbarg = cbarg; - atomic_init(&sock->client, true); + sock->client = true; sock->route_sock = true; sock->fd = fd; @@ -393,11 +392,11 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) { req->cbarg = cbarg; req->handle = isc__nmhandle_get(sock, NULL, NULL); - atomic_store(&sock->active, true); + atomic_store_release(&sock->active, true); result = route_connect_direct(sock); if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); isc__nm_udp_close(sock); } @@ -437,7 +436,7 @@ static void stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); if (sock->tid == 0) { stop_udp_child_job(sock); } else { @@ -451,12 +450,12 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udplistener); REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->tid == 0); + REQUIRE(!sock->closing); - RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing, - &(bool){ false }, true)); + sock->closing = true; /* Mark the parent socket inactive */ - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); /* Stop all the other threads' children */ for (size_t i = 1; i < sock->nchildren; i++) { @@ -467,7 +466,7 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { stop_udp_child(&sock->children[0]); /* Stop the parent */ - atomic_store(&sock->closed, true); + sock->closed = true; isc__nmsocket_prep_destroy(sock); } @@ -509,7 +508,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * - If we're simulating a firewall blocking UDP packets * bigger than 'maxudp' bytes for testing purposes. */ - maxudp = atomic_load(&sock->worker->netmgr->maxudp); + maxudp = atomic_load_relaxed(&sock->worker->netmgr->maxudp); if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) { /* * We need to keep the read_cb intact in case, so the @@ -582,7 +581,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * stop reading now. The reading could be restarted in the read * callback with another isc_nm_read() call. */ - if (atomic_load(&sock->client)) { + if (sock->client) { isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); } @@ -654,7 +653,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = handle->sock; const isc_sockaddr_t *peer = &handle->peer; - const struct sockaddr *sa = &peer->type.sa; + const struct sockaddr *sa = sock->connected ? NULL : &peer->type.sa; isc__nm_uvreq_t *uvreq = NULL; isc__networker_t *worker = NULL; uint32_t maxudp; @@ -700,15 +699,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, goto fail; } - /* - * We used uv_udp_connect(), so the peer address has to be - * set to NULL or else uv_udp_send() could fail or assert, - * depending on the libuv version. - */ - if (atomic_load(&sock->connected)) { - sa = NULL; - } - r = uv_udp_send(&uvreq->uv_req.udp_send, &sock->uv_handle.udp, &uvreq->uvbuf, 1, sa, udp_send_cb); if (r < 0) { @@ -813,7 +803,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, sock->connect_cbarg = cbarg; sock->read_timeout = timeout; sock->peer = *peer; - atomic_init(&sock->client, true); + sock->client = true; sock->fd = fd; result = isc__nm_socket_reuse(sock->fd); @@ -834,19 +824,19 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, req->local = *local; req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface); - atomic_store(&sock->active, true); - atomic_store(&sock->connecting, true); + atomic_store_release(&sock->active, true); + sock->connecting = true; result = udp_connect_direct(sock, req); if (result != ISC_R_SUCCESS) { - atomic_store(&sock->active, false); + atomic_store_release(&sock->active, false); isc__nm_failed_connect_cb(sock, req, result, true); isc__nmsocket_detach(&sock); return; } - atomic_store(&sock->connecting, false); - atomic_store(&sock->connected, true); + sock->connecting = false; + sock->connected = true; isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true); isc__nmsocket_detach(&sock); @@ -859,7 +849,7 @@ isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, REQUIRE(result != ISC_R_SUCCESS); REQUIRE(sock->tid == isc_tid()); - if (atomic_load(&sock->client)) { + if (sock->client) { isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); @@ -952,13 +942,10 @@ udp_close_cb(uv_handle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->closing)); + REQUIRE(sock->closing); + REQUIRE(!sock->closed); - if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false }, - true)) - { - UNREACHABLE(); - } + sock->closed = true; isc__nm_incstats(sock, STATID_CLOSE); @@ -967,14 +954,15 @@ udp_close_cb(uv_handle_t *handle) { isc__nmsocket_detach(&sock->server); } + /* All sockets */ + sock->listening = false; + if (sock->parent != NULL) { /* listening socket (listen) */ - atomic_store(&sock->listening, false); isc__nmsocket_detach(&sock); } else { /* client and server sockets */ - atomic_store(&sock->connected, false); - atomic_store(&sock->listening, false); + sock->connected = false; isc__nmsocket_prep_destroy(sock); } } @@ -984,12 +972,9 @@ isc__nm_udp_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_tid()); + REQUIRE(!sock->closing); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) - { - return; - } + sock->closing = true; isc__nmsocket_clearcb(sock); isc__nmsocket_timer_stop(sock); @@ -1029,13 +1014,8 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { return; } - /* - * If the socket is connecting, the cancel will happen in the - * async_udpconnect() due socket being inactive now. - */ - if (atomic_load(&sock->connecting)) { - return; - } + /* uv_udp_connect is synchronous, we can't be in connected state */ + REQUIRE(!sock->connecting); /* * When the client detaches the last handle, the @@ -1070,7 +1050,7 @@ udp_cancelread_cb(void *arg) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_tid()); - REQUIRE(atomic_load(&sock->client)); + REQUIRE(sock->client); isc__nm_failed_read_cb(sock, ISC_R_EOF, false); isc__nmsocket_detach(&sock); From 45365adb324f8c602dfd28e53d4c6db4a7432a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Tue, 28 Mar 2023 17:03:56 +0200 Subject: [PATCH 2/3] Convert sock->active to non-atomic variable, cleanup rchildren The last atomic_bool variable sock->active was converted to non-atomic bool by properly handling the listening socket case where we were checking parent socket instead of children sockets. This is no longer necessary as we properly set the .active to false on the children sockets. Additionally, cleanup the .rchildren - the atomic variable was used for mutex+condition to block until all children were listening, but that's now being handled by a barrier. Finally, just remove dead .self and .active_child_connections members of the netmgr socket. --- lib/isc/netmgr/http.c | 5 ++--- lib/isc/netmgr/netmgr-int.h | 27 ++---------------------- lib/isc/netmgr/netmgr.c | 40 +++++++----------------------------- lib/isc/netmgr/streamdns.c | 9 ++++---- lib/isc/netmgr/tcp.c | 34 +++++++++++++++--------------- lib/isc/netmgr/tlsstream.c | 7 +++---- lib/isc/netmgr/udp.c | 41 +++++++++++++++++-------------------- 7 files changed, 54 insertions(+), 109 deletions(-) diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 0f7bd8f2d0..51ef77dbe4 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -2534,7 +2534,6 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->fd = (uv_os_sock_t)-1; isc__nmsocket_barrier_init(sock); - atomic_init(&sock->rchildren, sock->nchildren); sock->listening = true; *sockp = sock; @@ -2712,7 +2711,7 @@ http_close_direct(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); sock->closed = true; - atomic_store_release(&sock->active, false); + sock->active = false; session = sock->h2.session; if (session != NULL && session->sending == 0 && !session->reading) { @@ -2840,7 +2839,7 @@ server_call_failed_read_cb(isc_result_t result, isc_nmsocket_h2_t *next = ISC_LIST_NEXT(h2data, link); ISC_LIST_DEQUEUE(session->sstreams, h2data, link); /* Cleanup socket in place */ - atomic_store_release(&h2data->psock->active, false); + h2data->psock->active = false; h2data->psock->closed = true; isc__nmsocket_detach(&h2data->psock); diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index fd7e38a20e..d4028090db 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -477,6 +477,7 @@ struct isc_nmsocket { /*% Unlocked, RO */ int magic; uint32_t tid; + isc_refcount_t references; isc_nmsocket_type type; isc__networker_t *worker; @@ -487,8 +488,6 @@ struct isc_nmsocket { isc_nmsocket_t *parent; /*% Listener socket this connection was accepted on */ isc_nmsocket_t *listener; - /*% Self socket */ - isc_nmsocket_t *self; /*% TLS stuff */ struct tlsstream { @@ -580,19 +579,12 @@ struct isc_nmsocket { /*% Peer address */ isc_sockaddr_t peer; - /* Atomic */ - /*% Number of running (e.g. listening) child sockets */ - atomic_uint_fast32_t rchildren; - /*% * Socket is active if it's listening, working, etc. If it's * closing, then it doesn't make a sense, for example, to * push handles or reqs for reuse. - * - * We might be accessing sock->parent->active from a different - * thread, so .active has to be atomic. */ - atomic_bool active; + bool active; bool destroying; bool route_sock; @@ -611,7 +603,6 @@ struct isc_nmsocket { bool accepting; bool reading; bool timedout; - isc_refcount_t references; /*% * Established an outgoing connection, as client not server. @@ -664,8 +655,6 @@ struct isc_nmsocket { isc_nm_accept_cb_t accept_cb; void *accept_cbarg; - atomic_int_fast32_t active_child_connections; - bool barriers_initialised; bool manual_read_timer; #if ISC_NETMGR_TRACE @@ -767,18 +756,6 @@ isc__nmsocket_active(isc_nmsocket_t *sock); * or, for child sockets, 'sock->parent->active'. */ -bool -isc__nmsocket_deactivate(isc_nmsocket_t *sock); -/*%< - * @brief Deactivate active socket - * - * Atomically deactive the socket by setting @p sock->active or, for child - * sockets, @p sock->parent->active to @c false - * - * @param[in] sock - valid nmsocket - * @return @c false if the socket was already inactive, @c true otherwise - */ - void isc__nmsocket_clearcb(isc_nmsocket_t *sock); /*%< diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 4912316058..36972df7e6 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -409,24 +409,8 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, bool isc__nmsocket_active(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - if (sock->parent != NULL) { - return (atomic_load_acquire(&sock->parent->active)); - } - return (atomic_load_acquire(&sock->active)); -} - -bool -isc__nmsocket_deactivate(isc_nmsocket_t *sock) { - REQUIRE(VALID_NMSOCK(sock)); - - if (sock->parent != NULL) { - return (atomic_compare_exchange_strong_acq_rel( - &sock->parent->active, &(bool){ true }, false)); - } - - return (atomic_compare_exchange_strong_acq_rel(&sock->active, - &(bool){ true }, false)); + return (sock->active); } void @@ -571,7 +555,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) { } REQUIRE(!sock->destroying); - REQUIRE(!atomic_load_acquire(&sock->active)); + REQUIRE(!sock->active); if (!sock->closed) { return; @@ -626,17 +610,12 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) { * destroying the socket, but we have to wait for all the inflight * handles to finish first. */ - atomic_store_release(&sock->active, false); + sock->active = false; /* - * If the socket has children, they'll need to be marked inactive - * so they can be cleaned up too. + * If the socket has children, they have been marked inactive by the + * shutdown uv_walk */ - if (sock->children != NULL) { - for (size_t i = 0; i < sock->nchildren; i++) { - atomic_store_relaxed(&sock->children[i].active, false); - } - } /* * If we're here then we already stopped listening; otherwise @@ -729,6 +708,7 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, .result = ISC_R_UNSET, .active_handles = ISC_LIST_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, + .active = true, }; if (iface != NULL) { @@ -800,10 +780,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker, "\n", sock, isc_refcount_current(&sock->references)); - atomic_init(&sock->active, true); - - atomic_init(&sock->active_child_connections, 0); - #if HAVE_LIBNGHTTP2 isc__nm_http_initsocket(sock); #endif @@ -1004,7 +980,7 @@ nmhandle_destroy(isc_nmhandle_t *handle) { #if defined(__SANITIZE_ADDRESS__) || defined(__SANITIZE_THREAD__) nmhandle_free(sock, handle); #else - if (atomic_load_acquire(&sock->active)) { + if (sock->active) { ISC_LIST_APPEND(sock->inactive_handles, handle, inactive_link); } else { nmhandle_free(sock, handle); @@ -1774,7 +1750,6 @@ static void nmsocket_stop_cb(void *arg) { isc_nmsocket_t *listener = arg; - (void)atomic_fetch_sub(&listener->rchildren, 1); isc_barrier_wait(&listener->stop_barrier); } @@ -1795,7 +1770,6 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) { } nmsocket_stop_cb(listener); - INSIST(atomic_load(&listener->rchildren) == 0); listener->listening = false; diff --git a/lib/isc/netmgr/streamdns.c b/lib/isc/netmgr/streamdns.c index 8aeb1ce639..374d9712de 100644 --- a/lib/isc/netmgr/streamdns.c +++ b/lib/isc/netmgr/streamdns.c @@ -339,7 +339,7 @@ streamdns_transport_connected(isc_nmhandle_t *handle, isc_result_t result, } isc_nmhandle_attach(handle, &sock->outerhandle); - atomic_store_release(&sock->active, true); + sock->active = true; handle->sock->streamdns.sock = sock; @@ -673,7 +673,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { NULL); nsock->read_timeout = initial; nsock->accepting = true; - atomic_store_release(&nsock->active, true); + nsock->active = true; isc__nmsocket_attach(listensock, &nsock->listener); isc_nmhandle_attach(handle, &nsock->outerhandle); @@ -752,12 +752,11 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } listener->result = result; - atomic_store_release(&listener->active, true); + listener->active = true; listener->listening = true; INSIST(listener->outer->streamdns.listener == NULL); listener->nchildren = listener->outer->nchildren; isc__nmsocket_barrier_init(listener); - atomic_init(&listener->rchildren, listener->outer->nchildren); isc__nmsocket_attach(listener, &listener->outer->streamdns.listener); *sockp = listener; @@ -939,7 +938,7 @@ streamdns_close_direct(isc_nmsocket_t *sock) { /* Further cleanup performed in isc__nm_streamdns_cleanup_data() */ isc_dnsstream_assembler_clear(sock->streamdns.input); sock->closed = true; - atomic_store_release(&sock->active, false); + sock->active = false; } void diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index a5bd12b44f..87949791b5 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -300,11 +300,11 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, (void)isc__nm_socket_min_mtu(sock->fd, sa_family); (void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG); - atomic_store_release(&sock->active, true); + sock->active = true; result = tcp_connect_direct(sock, req); if (result != ISC_R_SUCCESS) { - atomic_store_release(&sock->active, false); + sock->active = false; isc__nm_tcp_close(sock); isc__nm_connectcb(sock, req, result, true); } @@ -436,8 +436,6 @@ done: result = isc_uverr2result(r); done_result: - atomic_fetch_add(&sock->parent->rchildren, 1); - if (result != ISC_R_SUCCESS) { sock->pquota = NULL; } @@ -505,7 +503,6 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock = isc_mem_get(worker->mctx, sizeof(*sock)); isc__nmsocket_init(sock, worker, isc_nm_tcplistener, iface, NULL); - atomic_init(&sock->rchildren, 0); sock->nchildren = (workers == ISC_NM_LISTEN_ALL) ? (uint32_t)mgr->nloops : workers; children_size = sock->nchildren * sizeof(sock->children[0]); @@ -550,15 +547,15 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } if (result != ISC_R_SUCCESS) { - atomic_store_release(&sock->active, false); + sock->active = false; isc__nm_tcp_stoplistening(sock); isc_nmsocket_close(&sock); return (result); } - atomic_store_release(&sock->active, true); - REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); + sock->active = true; + *sockp = sock; return (ISC_R_SUCCESS); } @@ -607,6 +604,7 @@ stop_tcp_child_job(void *arg) { REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(!sock->closing); + sock->active = false; sock->closing = true; /* @@ -624,8 +622,6 @@ stop_tcp_child_job(void *arg) { isc__nmsocket_timer_stop(sock); uv_close(&sock->read_timer, NULL); - (void)atomic_fetch_sub(&sock->parent->rchildren, 1); - REQUIRE(!sock->worker->loop->paused); isc_barrier_wait(&sock->parent->stop_barrier); } @@ -634,7 +630,6 @@ static void stop_tcp_child(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - atomic_store_release(&sock->active, false); if (sock->tid == 0) { stop_tcp_child_job(sock); } else { @@ -653,7 +648,7 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { sock->closing = true; /* Mark the parent socket inactive */ - atomic_store_release(&sock->active, false); + sock->active = false; /* Stop all the other threads' children */ for (size_t i = 1; i < sock->nchildren; i++) { @@ -979,7 +974,7 @@ accept_connection(isc_nmsocket_t *ssock) { return (ISC_R_SUCCESS); failure: - atomic_store_release(&csock->active, false); + csock->active = false; failed_accept_cb(csock, result); @@ -1237,9 +1232,10 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { * If the socket is active, mark it inactive and * continue. If it isn't active, stop now. */ - if (!isc__nmsocket_deactivate(sock)) { + if (!sock->active) { return; } + sock->active = false; if (sock->accepting) { return; @@ -1261,11 +1257,15 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { return; } - /* - * Otherwise, we just send the socket to abyss... - */ + /* Destroy the non-listening socket */ if (sock->parent == NULL) { isc__nmsocket_prep_destroy(sock); + return; + } + + /* Destroy the listening socket if on the same loop */ + if (sock->tid == sock->parent->tid) { + isc__nmsocket_prep_destroy(sock->parent); } } diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index 949c27766d..75481e9d89 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -945,7 +945,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, /* wait for listen result */ isc__nmsocket_attach(tlssock->outer, &tsock); tlssock->result = result; - atomic_store_release(&tlssock->active, true); + tlssock->active = true; INSIST(tlssock->outer->tlsstream.tlslistener == NULL); isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener); isc__nmsocket_detach(&tsock); @@ -953,7 +953,6 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, tlssock->nchildren = tlssock->outer->nchildren; isc__nmsocket_barrier_init(tlssock); - atomic_init(&tlssock->rchildren, tlssock->nchildren); if (result == ISC_R_SUCCESS) { tlssock->listening = true; @@ -1087,7 +1086,7 @@ tls_close_direct(void *arg) { /* Further cleanup performed in isc__nm_tls_cleanup_data() */ sock->closed = true; - atomic_store_release(&sock->active, false); + sock->active = false; sock->tlsstream.state = TLS_CLOSED; } @@ -1191,7 +1190,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { } tlssock->peer = isc_nmhandle_peeraddr(handle); isc_nmhandle_attach(handle, &tlssock->outerhandle); - atomic_store_release(&tlssock->active, true); + tlssock->active = true; if (tlssock->tlsstream.client_sess_cache != NULL) { isc_tlsctx_client_session_cache_reuse_sockaddr( diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index efa1947aad..dc985dd187 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -172,7 +172,6 @@ start_udp_child_job(void *arg) { done: result = isc_uverr2result(r); - atomic_fetch_add(&sock->parent->rchildren, 1); sock->result = result; @@ -232,7 +231,6 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock = isc_mem_get(worker->mctx, sizeof(isc_nmsocket_t)); isc__nmsocket_init(sock, worker, isc_nm_udplistener, iface, NULL); - atomic_init(&sock->rchildren, 0); sock->nchildren = (workers == ISC_NM_LISTEN_ALL) ? (uint32_t)mgr->nloops : workers; children_size = sock->nchildren * sizeof(sock->children[0]); @@ -275,15 +273,15 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, } if (result != ISC_R_SUCCESS) { - atomic_store_release(&sock->active, false); + sock->active = false; isc__nm_udp_stoplistening(sock); isc_nmsocket_close(&sock); return (result); } - atomic_store_release(&sock->active, true); - INSIST(atomic_load(&sock->rchildren) == sock->nchildren); + sock->active = true; + *sockp = sock; return (ISC_R_SUCCESS); } @@ -392,11 +390,11 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) { req->cbarg = cbarg; req->handle = isc__nmhandle_get(sock, NULL, NULL); - atomic_store_release(&sock->active, true); + sock->active = true; result = route_connect_direct(sock); if (result != ISC_R_SUCCESS) { - atomic_store_release(&sock->active, false); + sock->active = false; isc__nm_udp_close(sock); } @@ -424,9 +422,9 @@ stop_udp_child_job(void *arg) { REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->parent != NULL); - isc__nm_udp_close(sock); + sock->active = false; - (void)atomic_fetch_sub(&sock->parent->rchildren, 1); + isc__nm_udp_close(sock); REQUIRE(!sock->worker->loop->paused); isc_barrier_wait(&sock->parent->stop_barrier); @@ -436,7 +434,6 @@ static void stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - atomic_store_release(&sock->active, false); if (sock->tid == 0) { stop_udp_child_job(sock); } else { @@ -455,7 +452,7 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { sock->closing = true; /* Mark the parent socket inactive */ - atomic_store_release(&sock->active, false); + sock->active = false; /* Stop all the other threads' children */ for (size_t i = 1; i < sock->nchildren; i++) { @@ -824,12 +821,12 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, req->local = *local; req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface); - atomic_store_release(&sock->active, true); + sock->active = true; sock->connecting = true; result = udp_connect_direct(sock, req); if (result != ISC_R_SUCCESS) { - atomic_store_release(&sock->active, false); + sock->active = false; isc__nm_failed_connect_cb(sock, req, result, true); isc__nmsocket_detach(&sock); return; @@ -1010,9 +1007,10 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { * If the socket is active, mark it inactive and * continue. If it isn't active, stop now. */ - if (!isc__nmsocket_deactivate(sock)) { + if (!sock->active) { return; } + sock->active = false; /* uv_udp_connect is synchronous, we can't be in connected state */ REQUIRE(!sock->connecting); @@ -1031,17 +1029,16 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { return; } - /* - * Ignore the listening sockets - */ - if (sock->parent != NULL) { + /* Destroy the non-listening socket */ + if (sock->parent == NULL) { + isc__nmsocket_prep_destroy(sock); return; } - /* - * Otherwise, we just send the socket to abyss... - */ - isc__nmsocket_prep_destroy(sock); + /* Destroy the listening socket if on the same loop */ + if (sock->tid == sock->parent->tid) { + isc__nmsocket_prep_destroy(sock->parent); + } } static void From 2846888c573fcc610cdf71bcdd5bb6f92ffaf499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 24 Mar 2023 15:32:02 +0100 Subject: [PATCH 3/3] Attach the accept "client" socket to .listener member of the socket When accepting a TCP connection in the higher layers (tlsstream, streamdns, and http) attach to the socket the connection was accepted on, and use this socket instead of the parent listening socket. This has an advantage - accessing the sock->listener now doesn't break the thread boundaries, so we can properly check whether the socket is being closed without requiring .closing member to be atomic_bool. --- lib/isc/netmgr/http.c | 58 ++++++------------------------------- lib/isc/netmgr/netmgr-int.h | 8 ++--- lib/isc/netmgr/netmgr.c | 31 +++++--------------- lib/isc/netmgr/streamdns.c | 24 ++++++--------- lib/isc/netmgr/tcp.c | 3 -- lib/isc/netmgr/tlsstream.c | 54 +++++++++++++--------------------- lib/isc/netmgr/udp.c | 5 ---- 7 files changed, 49 insertions(+), 134 deletions(-) diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 51ef77dbe4..10e07c8b7a 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -1688,11 +1688,9 @@ find_server_request_handler(const char *request_path, REQUIRE(VALID_NMSOCK(serversocket)); - if (serversocket->listening) { - handler = http_endpoints_find( - request_path, - http_get_listener_endpoints(serversocket, tid)); - } + handler = http_endpoints_find( + request_path, http_get_listener_endpoints(serversocket, tid)); + return (handler); } @@ -2430,57 +2428,31 @@ http_transpost_tcp_nodelay(isc_nmhandle_t *transphandle) { static isc_result_t httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { - isc_nmsocket_t *httplistensock = (isc_nmsocket_t *)cbarg; + isc_nmsocket_t *httpserver = (isc_nmsocket_t *)cbarg; isc_nm_http_session_t *session = NULL; - isc_nmsocket_t *listener = NULL, *httpserver = NULL; REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - if (handle->sock->type == isc_nm_tlssocket) { - REQUIRE(VALID_NMSOCK(handle->sock->listener)); - listener = handle->sock->listener; - httpserver = listener->h2.httpserver; - } else { - REQUIRE(VALID_NMSOCK(handle->sock->server)); - listener = handle->sock->server; - REQUIRE(VALID_NMSOCK(listener->parent)); - httpserver = listener->parent->h2.httpserver; - } - - /* - * NOTE: HTTP listener socket might be destroyed by the time this - * function gets invoked, so we need to do extra sanity checks to - * detect this case. - */ if (isc__nm_closing(handle->sock->worker)) { return (ISC_R_SHUTTINGDOWN); - } else if (isc__nmsocket_closing(handle->sock) || httpserver == NULL) { - return (ISC_R_CANCELED); - } - - if (result != ISC_R_SUCCESS) { - /* XXXWPK do nothing? */ + } else if (result != ISC_R_SUCCESS) { return (result); } - REQUIRE(VALID_NMSOCK(httplistensock)); - INSIST(httplistensock == httpserver); - - if (httplistensock->closing) { - return (ISC_R_CANCELED); - } + REQUIRE(VALID_NMSOCK(httpserver)); + REQUIRE(httpserver->type == isc_nm_httplistener); http_transpost_tcp_nodelay(handle); new_session(handle->sock->worker->mctx, NULL, &session); session->max_concurrent_streams = - atomic_load_relaxed(&httplistensock->h2.max_concurrent_streams); + atomic_load_relaxed(&httpserver->h2.max_concurrent_streams); initialize_nghttp2_server_session(session); handle->sock->h2.session = session; isc_nmhandle_attach(handle, &session->handle); - isc__nmsocket_attach(httplistensock, &session->serversocket); + isc__nmsocket_attach(httpserver, &session->serversocket); server_send_connection_header(session); /* TODO H2 */ @@ -2528,14 +2500,9 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, return (result); } - isc__nmsocket_attach(sock, &sock->outer->h2.httpserver); - sock->nchildren = sock->outer->nchildren; sock->fd = (uv_os_sock_t)-1; - isc__nmsocket_barrier_init(sock); - - sock->listening = true; *sockp = sock; return (ISC_R_SUCCESS); } @@ -3204,13 +3171,6 @@ isc__nm_http_initsocket(isc_nmsocket_t *sock) { void isc__nm_http_cleanup_data(isc_nmsocket_t *sock) { - if ((sock->type == isc_nm_tcplistener || - sock->type == isc_nm_tlslistener) && - sock->h2.httpserver != NULL) - { - isc__nmsocket_detach(&sock->h2.httpserver); - } - if (sock->type == isc_nm_httplistener || sock->type == isc_nm_httpsocket) { diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index d4028090db..a852fefb42 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -430,8 +430,6 @@ typedef struct isc_nmsocket_h2 { int32_t stream_id; isc_nm_http_session_t *session; - isc_nmsocket_t *httpserver; - /* maximum concurrent streams (server-side) */ atomic_uint_fast32_t max_concurrent_streams; @@ -486,8 +484,6 @@ struct isc_nmsocket { /*% Parent socket for multithreaded listeners */ isc_nmsocket_t *parent; - /*% Listener socket this connection was accepted on */ - isc_nmsocket_t *listener; /*% TLS stuff */ struct tlsstream { @@ -562,6 +558,9 @@ struct isc_nmsocket { /*% server socket for connections */ isc_nmsocket_t *server; + /*% client socket for connections */ + isc_nmsocket_t *listener; + /*% Child sockets for multi-socket setups */ isc_nmsocket_t *children; uint_fast32_t nchildren; @@ -597,7 +596,6 @@ struct isc_nmsocket { */ bool closing; bool closed; - bool listening; bool connecting; bool connected; bool accepting; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 36972df7e6..162d5261dd 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -942,7 +942,7 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { handle->dofree(handle->opaque); } - isc_mem_put(sock->worker->mctx, handle, sizeof(isc_nmhandle_t)); + isc_mem_put(sock->worker->mctx, handle, sizeof(*handle)); } static void @@ -1438,7 +1438,7 @@ isc__nm_closing(isc__networker_t *worker) { bool isc__nmsocket_closing(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || sock->closing || + return (!sock->active || sock->closing || isc__nm_closing(sock->worker) || (sock->server != NULL && !isc__nmsocket_active(sock->server))); } @@ -1746,42 +1746,27 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) { } } -static void -nmsocket_stop_cb(void *arg) { - isc_nmsocket_t *listener = arg; - - isc_barrier_wait(&listener->stop_barrier); -} - void isc__nmsocket_stop(isc_nmsocket_t *listener) { REQUIRE(VALID_NMSOCK(listener)); REQUIRE(listener->tid == isc_tid()); REQUIRE(listener->tid == 0); - REQUIRE(listener->listening); + REQUIRE(listener->type == isc_nm_httplistener || + listener->type == isc_nm_tlslistener || + listener->type == isc_nm_streamdnslistener); REQUIRE(!listener->closing); listener->closing = true; - for (size_t i = 1; i < listener->nchildren; i++) { - isc__networker_t *worker = - &listener->worker->netmgr->workers[i]; - isc_async_run(worker->loop, nmsocket_stop_cb, listener); - } - - nmsocket_stop_cb(listener); - - listener->listening = false; + REQUIRE(listener->outer != NULL); + isc_nm_stoplistening(listener->outer); listener->accept_cb = NULL; listener->accept_cbarg = NULL; listener->recv_cb = NULL; listener->recv_cbarg = NULL; - if (listener->outer != NULL) { - isc_nm_stoplistening(listener->outer); - isc__nmsocket_detach(&listener->outer); - } + isc__nmsocket_detach(&listener->outer); listener->closed = true; } diff --git a/lib/isc/netmgr/streamdns.c b/lib/isc/netmgr/streamdns.c index 374d9712de..fc996bd7a6 100644 --- a/lib/isc/netmgr/streamdns.c +++ b/lib/isc/netmgr/streamdns.c @@ -642,25 +642,21 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmsocket_t *listensock = (isc_nmsocket_t *)cbarg; isc_nmsocket_t *nsock; isc_sockaddr_t iface; - int tid; + int tid = isc_tid(); uint32_t initial = 0; - if (result != ISC_R_SUCCESS) { - return (result); - } - - INSIST(VALID_NMHANDLE(handle)); - INSIST(VALID_NMSOCK(handle->sock)); - INSIST(VALID_NMSOCK(listensock)); - INSIST(listensock->type == isc_nm_streamdnslistener); + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); if (isc__nm_closing(handle->sock->worker)) { return (ISC_R_SHUTTINGDOWN); - } else if (isc__nmsocket_closing(handle->sock) || listensock->closing) { - return (ISC_R_CANCELED); + } else if (result != ISC_R_SUCCESS) { + return (result); } - tid = isc_tid(); + REQUIRE(VALID_NMSOCK(listensock)); + REQUIRE(listensock->type == isc_nm_streamdnslistener); + iface = isc_nmhandle_localaddr(handle); nsock = streamdns_sock_new(handle->sock->worker, isc_nm_streamdnssocket, &iface, true); @@ -675,7 +671,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { nsock->accepting = true; nsock->active = true; - isc__nmsocket_attach(listensock, &nsock->listener); + isc__nmsocket_attach(handle->sock, &nsock->listener); isc_nmhandle_attach(handle, &nsock->outerhandle); handle->sock->streamdns.sock = nsock; @@ -753,10 +749,8 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, listener->result = result; listener->active = true; - listener->listening = true; INSIST(listener->outer->streamdns.listener == NULL); listener->nchildren = listener->outer->nchildren; - isc__nmsocket_barrier_init(listener); isc__nmsocket_attach(listener, &listener->outer->streamdns.listener); *sockp = listener; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 87949791b5..7163595399 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -415,8 +415,6 @@ start_tcp_child_job(void *arg) { goto done; } - sock->listening = true; - if (sock->tid == 0) { r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&ss, @@ -675,7 +673,6 @@ tcp_stop_cb(uv_handle_t *handle) { REQUIRE(!sock->closed); sock->closed = true; - sock->listening = false; isc__nm_incstats(sock, STATID_CLOSE); diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index 75481e9d89..6432e493ab 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -111,8 +111,6 @@ inactive(isc_nmsocket_t *sock) { sock->outerhandle == NULL || !isc__nmsocket_active(sock->outerhandle->sock) || sock->outerhandle->sock->closing || - (sock->listener != NULL && - !isc__nmsocket_active(sock->listener)) || isc__nm_closing(sock->worker)); } @@ -384,53 +382,42 @@ tls_process_outgoing(isc_nmsocket_t *sock, bool finish, static int tls_try_handshake(isc_nmsocket_t *sock, isc_result_t *presult) { - int rv = 0; - isc_nmhandle_t *tlshandle = NULL; - REQUIRE(sock->tlsstream.state == TLS_HANDSHAKE); if (SSL_is_init_finished(sock->tlsstream.tls) == 1) { return (0); } - rv = SSL_do_handshake(sock->tlsstream.tls); + int rv = SSL_do_handshake(sock->tlsstream.tls); if (rv == 1) { + isc_nmhandle_t *tlshandle = NULL; isc_result_t result = ISC_R_SUCCESS; + + REQUIRE(sock->statichandle == NULL); INSIST(SSL_is_init_finished(sock->tlsstream.tls) == 1); - INSIST(sock->statichandle == NULL); + isc__nmsocket_log_tls_session_reuse(sock, sock->tlsstream.tls); tlshandle = isc__nmhandle_get(sock, &sock->peer, &sock->iface); tls_read_stop(sock); + + if (isc__nm_closing(sock->worker)) { + result = ISC_R_SHUTTINGDOWN; + } + if (sock->tlsstream.server) { /* - * We need to check for 'sock->listener->closing' to - * make sure that we are not breaking the contract by - * calling an accept callback after the listener socket - * was shot down. Also, in this case the accept callback - * can be 'NULL'. That can happen as calling the accept - * callback in TLS is deferred until handshake is done. - * There is a possibility for that to happen *after* the - * underlying TCP connection was accepted. That is, a - * situation possible when the underlying TCP connection - * was accepted, handshake related data transmission - * took place, but in the middle of that the socket is - * being shot down before the TLS accept callback could - * have been called. + * The listening sockets are now closed from outer + * to inner order, which means that this function + * will never be called when the outer socket has + * stopped listening. * * Also see 'isc__nmsocket_stop()' - the function used * to shut down the listening TLS socket - for more * details. */ - if (isc__nm_closing(sock->worker)) { - result = ISC_R_SHUTTINGDOWN; - } else if (isc__nmsocket_closing(sock) || - sock->listener->closing) - { - result = ISC_R_CANCELED; - } else { - result = sock->listener->accept_cb( - tlshandle, result, - sock->listener->accept_cbarg); + if (result == ISC_R_SUCCESS) { + result = sock->accept_cb(tlshandle, result, + sock->accept_cbarg); } } else { tls_call_connect_cb(sock, tlshandle, result); @@ -873,7 +860,9 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { return (ISC_R_TLSERROR); } - isc__nmsocket_attach(tlslistensock, &tlssock->listener); + tlssock->accept_cb = tlslistensock->accept_cb; + tlssock->accept_cbarg = tlslistensock->accept_cbarg; + isc__nmsocket_attach(handle->sock, &tlssock->listener); isc_nmhandle_attach(handle, &tlssock->outerhandle); tlssock->peer = handle->sock->peer; tlssock->read_timeout = @@ -952,10 +941,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, INSIST(result != ISC_R_UNSET); tlssock->nchildren = tlssock->outer->nchildren; - isc__nmsocket_barrier_init(tlssock); - if (result == ISC_R_SUCCESS) { - tlssock->listening = true; *sockp = tlssock; } diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index dc985dd187..5fea2e8f33 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -168,8 +168,6 @@ start_udp_child_job(void *arg) { } sock->reading = true; - sock->listening = true; - done: result = isc_uverr2result(r); @@ -951,9 +949,6 @@ udp_close_cb(uv_handle_t *handle) { isc__nmsocket_detach(&sock->server); } - /* All sockets */ - sock->listening = false; - if (sock->parent != NULL) { /* listening socket (listen) */ isc__nmsocket_detach(&sock);