Refactor the use of atomics in netmgr

Now that everything runs on their own loop and we don't cross the thread
boundaries (with few exceptions), most of the atomic_bool variables used
to track the socket state have been unatomicized because they are always
accessed from the matching thread.

The remaining few have been relaxed: a) the sock->active is now using
acquire/release memory ordering; b) the various global limits are now
using relaxed memory ordering - we don't really care about the
synchronization for those.
This commit is contained in:
Ondřej Surý 2023-03-24 13:37:19 +01:00
parent ea8e00e7a5
commit e1a4572fd6
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41
7 changed files with 243 additions and 281 deletions

View file

@ -1479,7 +1479,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_timeout = timeout;
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
sock->client = true;
if (isc__nm_closing(worker)) {
isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock);
@ -1609,7 +1609,7 @@ isc__nm_http_request(isc_nmhandle_t *handle, isc_region_t *region,
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
REQUIRE(handle->sock->tid == isc_tid());
REQUIRE(atomic_load(&handle->sock->client));
REQUIRE(handle->sock->client);
REQUIRE(cb != NULL);
@ -1688,7 +1688,7 @@ find_server_request_handler(const char *request_path,
REQUIRE(VALID_NMSOCK(serversocket));
if (atomic_load(&serversocket->listening)) {
if (serversocket->listening) {
handler = http_endpoints_find(
request_path,
http_get_listener_endpoints(serversocket, tid));
@ -2069,7 +2069,7 @@ isc__nm_http_bad_request(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMSOCK(handle->sock));
sock = handle->sock;
REQUIRE(sock->type == isc_nm_httpsocket);
REQUIRE(!atomic_load(&sock->client));
REQUIRE(!sock->client);
REQUIRE(VALID_HTTP2_SESSION(sock->h2.session));
(void)server_send_error_response(ISC_HTTP_ERROR_BAD_REQUEST,
@ -2467,7 +2467,7 @@ httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
REQUIRE(VALID_NMSOCK(httplistensock));
INSIST(httplistensock == httpserver);
if (atomic_load(&httplistensock->closing)) {
if (httplistensock->closing) {
return (ISC_R_CANCELED);
}
@ -2475,7 +2475,7 @@ httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
new_session(handle->sock->worker->mctx, NULL, &session);
session->max_concurrent_streams =
atomic_load(&httplistensock->h2.max_concurrent_streams);
atomic_load_relaxed(&httplistensock->h2.max_concurrent_streams);
initialize_nghttp2_server_session(session);
handle->sock->h2.session = session;
@ -2523,7 +2523,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->closed, true);
sock->closed = true;
isc__nmsocket_detach(&sock);
return (result);
}
@ -2536,7 +2536,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
isc__nmsocket_barrier_init(sock);
atomic_init(&sock->rchildren, sock->nchildren);
atomic_store(&sock->listening, true);
sock->listening = true;
*sockp = sock;
return (ISC_R_SUCCESS);
}
@ -2711,8 +2711,8 @@ http_close_direct(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
atomic_store_release(&sock->active, false);
session = sock->h2.session;
if (session != NULL && session->sending == 0 && !session->reading) {
@ -2742,12 +2742,9 @@ isc__nm_http_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_httpsocket);
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
if (sock->h2.session != NULL && sock->h2.session->closed &&
sock->tid == isc_tid())
@ -2843,8 +2840,8 @@ server_call_failed_read_cb(isc_result_t result,
isc_nmsocket_h2_t *next = ISC_LIST_NEXT(h2data, link);
ISC_LIST_DEQUEUE(session->sstreams, h2data, link);
/* Cleanup socket in place */
atomic_store(&h2data->psock->active, false);
atomic_store(&h2data->psock->closed, true);
atomic_store_release(&h2data->psock->active, false);
h2data->psock->closed = true;
isc__nmsocket_detach(&h2data->psock);
h2data = next;
@ -2957,7 +2954,7 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener,
max_streams = max_concurrent_streams;
}
atomic_store(&listener->h2.max_concurrent_streams, max_streams);
atomic_store_relaxed(&listener->h2.max_concurrent_streams, max_streams);
}
typedef struct http_endpoints_data {

View file

@ -588,9 +588,12 @@ struct isc_nmsocket {
* Socket is active if it's listening, working, etc. If it's
* closing, then it doesn't make a sense, for example, to
* push handles or reqs for reuse.
*
* We might be accessing sock->parent->active from a different
* thread, so .active has to be atomic.
*/
atomic_bool active;
atomic_bool destroying;
bool destroying;
bool route_sock;
@ -600,20 +603,20 @@ struct isc_nmsocket {
* If active==false but closed==false, that means the socket
* is closing.
*/
atomic_bool closing;
atomic_bool closed;
atomic_bool listening;
atomic_bool connecting;
atomic_bool connected;
atomic_bool accepting;
bool closing;
bool closed;
bool listening;
bool connecting;
bool connected;
bool accepting;
bool reading;
atomic_bool timedout;
bool timedout;
isc_refcount_t references;
/*%
* Established an outgoing connection, as client not server.
*/
atomic_bool client;
bool client;
/*%
* The socket is processing read callback, this is guard to not read
@ -625,7 +628,7 @@ struct isc_nmsocket {
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
*/
atomic_bool keepalive;
bool keepalive;
/*%
* 'spare' handles for that can be reused to avoid allocations, for UDP.

View file

@ -147,8 +147,8 @@ static void
netmgr_teardown(void *arg) {
isc_nm_t *netmgr = (void *)arg;
if (atomic_compare_exchange_strong(&netmgr->shuttingdown,
&(bool){ false }, true))
if (atomic_compare_exchange_strong_acq_rel(&netmgr->shuttingdown,
&(bool){ false }, true))
{
isc__netmgr_log(netmgr, ISC_LOG_DEBUG(1),
"Shutting down network manager");
@ -318,7 +318,7 @@ void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->maxudp, maxudp);
atomic_store_relaxed(&mgr->maxudp, maxudp);
}
void
@ -349,10 +349,10 @@ isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
uint32_t keepalive, uint32_t advertised) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->init, init);
atomic_store(&mgr->idle, idle);
atomic_store(&mgr->keepalive, keepalive);
atomic_store(&mgr->advertised, advertised);
atomic_store_relaxed(&mgr->init, init);
atomic_store_relaxed(&mgr->idle, idle);
atomic_store_relaxed(&mgr->keepalive, keepalive);
atomic_store_relaxed(&mgr->advertised, advertised);
}
void
@ -360,10 +360,10 @@ isc_nm_setnetbuffers(isc_nm_t *mgr, int32_t recv_tcp, int32_t send_tcp,
int32_t recv_udp, int32_t send_udp) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->recv_tcp_buffer_size, recv_tcp);
atomic_store(&mgr->send_tcp_buffer_size, send_tcp);
atomic_store(&mgr->recv_udp_buffer_size, recv_udp);
atomic_store(&mgr->send_udp_buffer_size, send_udp);
atomic_store_relaxed(&mgr->recv_tcp_buffer_size, recv_tcp);
atomic_store_relaxed(&mgr->send_tcp_buffer_size, send_tcp);
atomic_store_relaxed(&mgr->recv_udp_buffer_size, recv_udp);
atomic_store_relaxed(&mgr->send_udp_buffer_size, send_udp);
}
bool
@ -390,19 +390,19 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
REQUIRE(VALID_NM(mgr));
if (initial != NULL) {
*initial = atomic_load(&mgr->init);
*initial = atomic_load_relaxed(&mgr->init);
}
if (idle != NULL) {
*idle = atomic_load(&mgr->idle);
*idle = atomic_load_relaxed(&mgr->idle);
}
if (keepalive != NULL) {
*keepalive = atomic_load(&mgr->keepalive);
*keepalive = atomic_load_relaxed(&mgr->keepalive);
}
if (advertised != NULL) {
*advertised = atomic_load(&mgr->advertised);
*advertised = atomic_load_relaxed(&mgr->advertised);
}
}
@ -410,10 +410,10 @@ bool
isc__nmsocket_active(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
if (sock->parent != NULL) {
return (atomic_load(&sock->parent->active));
return (atomic_load_acquire(&sock->parent->active));
}
return (atomic_load(&sock->active));
return (atomic_load_acquire(&sock->active));
}
bool
@ -421,12 +421,12 @@ isc__nmsocket_deactivate(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
if (sock->parent != NULL) {
return (atomic_compare_exchange_strong(&sock->parent->active,
&(bool){ true }, false));
return (atomic_compare_exchange_strong_acq_rel(
&sock->parent->active, &(bool){ true }, false));
}
return (atomic_compare_exchange_strong(&sock->active, &(bool){ true },
false));
return (atomic_compare_exchange_strong_acq_rel(&sock->active,
&(bool){ true }, false));
}
void
@ -469,7 +469,8 @@ nmsocket_cleanup(void *arg) {
isc__nm_decstats(sock, STATID_ACTIVE);
atomic_store(&sock->destroying, true);
REQUIRE(!sock->destroying);
sock->destroying = true;
if (sock->parent == NULL && sock->children != NULL) {
/*
@ -477,7 +478,6 @@ nmsocket_cleanup(void *arg) {
* so we can clean up and free the children.
*/
for (size_t i = 0; i < sock->nchildren; i++) {
REQUIRE(!atomic_load(&sock->children[i].destroying));
isc_refcount_decrementz(&sock->children[i].references);
nmsocket_cleanup(&sock->children[i]);
}
@ -570,9 +570,18 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
return;
}
if (atomic_load(&sock->active) || atomic_load(&sock->destroying) ||
!atomic_load(&sock->closed) || atomic_load(&sock->references) != 0)
{
REQUIRE(!sock->destroying);
REQUIRE(!atomic_load_acquire(&sock->active));
if (!sock->closed) {
return;
}
if (isc_refcount_current(&sock->references) != 0) {
/*
* Using such check is valid only if we don't use
* isc_refcount_increment0() on the same variable.
*/
return;
}
@ -588,7 +597,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
return;
}
atomic_store(&sock->destroying, true);
if (sock->tid == isc_tid()) {
nmsocket_cleanup(sock);
} else {
@ -618,7 +626,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
* destroying the socket, but we have to wait for all the inflight
* handles to finish first.
*/
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
/*
* If the socket has children, they'll need to be marked inactive
@ -626,7 +634,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
*/
if (sock->children != NULL) {
for (size_t i = 0; i < sock->nchildren; i++) {
atomic_store(&sock->children[i].active, false);
atomic_store_relaxed(&sock->children[i].active, false);
}
}
@ -636,7 +644,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
*
* If it's a regular socket we may need to close it.
*/
if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) {
if (!sock->closing && !sock->closed) {
switch (sock->type) {
case isc_nm_udpsocket:
isc__nm_udp_close(sock);
@ -793,15 +801,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
sock, isc_refcount_current(&sock->references));
atomic_init(&sock->active, true);
atomic_init(&sock->closing, false);
atomic_init(&sock->listening, 0);
atomic_init(&sock->closed, 0);
atomic_init(&sock->destroying, 0);
atomic_init(&sock->client, 0);
atomic_init(&sock->connecting, false);
atomic_init(&sock->keepalive, false);
atomic_init(&sock->connected, false);
atomic_init(&sock->timedout, false);
atomic_init(&sock->active_child_connections, 0);
@ -904,7 +903,7 @@ isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t const *peer,
switch (sock->type) {
case isc_nm_udpsocket:
if (!atomic_load(&sock->client)) {
if (!sock->client) {
break;
}
FALLTHROUGH;
@ -1005,7 +1004,7 @@ nmhandle_destroy(isc_nmhandle_t *handle) {
#if defined(__SANITIZE_ADDRESS__) || defined(__SANITIZE_THREAD__)
nmhandle_free(sock, handle);
#else
if (atomic_load(&sock->active)) {
if (atomic_load_acquire(&sock->active)) {
ISC_LIST_APPEND(sock->inactive_handles, handle, inactive_link);
} else {
nmhandle_free(sock, handle);
@ -1078,7 +1077,7 @@ isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(atomic_load(&sock->accepting));
REQUIRE(sock->accepting);
REQUIRE(sock->server);
/*
@ -1092,7 +1091,7 @@ isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
isc__nmsocket_detach(&sock->server);
atomic_store(&sock->accepting, false);
sock->accepting = false;
switch (eresult) {
case ISC_R_NOTCONNECTED:
@ -1112,15 +1111,15 @@ isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
REQUIRE(VALID_UVREQ(req));
REQUIRE(sock->tid == isc_tid());
REQUIRE(req->cb.connect != NULL);
REQUIRE(sock->connecting);
sock->connecting = false;
isc__nm_incstats(sock, STATID_CONNECTFAIL);
isc__nmsocket_timer_stop(sock);
uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
atomic_compare_exchange_enforced(&sock->connecting, &(bool){ true },
false);
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, eresult, async);
@ -1157,17 +1156,17 @@ isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->connecting));
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
REQUIRE(sock->connecting);
isc__nmsocket_timer_stop(sock);
/*
* Mark the connection as timed out and shutdown the socket.
*/
atomic_compare_exchange_enforced(&sock->timedout, &(bool){ false },
true);
REQUIRE(!sock->timedout);
sock->timedout = true;
isc__nmsocket_clearcb(sock);
isc__nmsocket_shutdown(sock);
}
@ -1221,7 +1220,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->reading);
if (atomic_load(&sock->client)) {
if (sock->client) {
uv_timer_stop(timer);
sock->recv_read = false;
@ -1259,7 +1258,7 @@ isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {
return;
}
if (atomic_load(&sock->connecting)) {
if (sock->connecting) {
int r;
if (sock->connect_timeout == 0) {
@ -1352,7 +1351,7 @@ isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
isc_nmhandle_attach(sock->recv_handle, &req->handle);
break;
default:
if (atomic_load(&sock->client) && sock->statichandle != NULL) {
if (sock->client && sock->statichandle != NULL) {
isc_nmhandle_attach(sock->statichandle, &req->handle);
} else {
req->handle = isc__nmhandle_get(sock, sockaddr, NULL);
@ -1463,7 +1462,7 @@ isc__nm_closing(isc__networker_t *worker) {
bool
isc__nmsocket_closing(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
return (!isc__nmsocket_active(sock) || sock->closing ||
isc__nm_closing(sock->worker) ||
(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}
@ -1528,13 +1527,17 @@ isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
sock = handle->sock;
netmgr = sock->worker->netmgr;
REQUIRE(sock->tid == isc_tid());
switch (sock->type) {
case isc_nm_tcpsocket:
atomic_store(&sock->keepalive, value);
sock->read_timeout = value ? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle);
sock->write_timeout = value ? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle);
sock->keepalive = value;
sock->read_timeout =
value ? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
sock->write_timeout =
value ? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
break;
case isc_nm_streamdnssocket:
isc__nmhandle_streamdns_keepalive(handle, value);
@ -1780,12 +1783,10 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) {
REQUIRE(VALID_NMSOCK(listener));
REQUIRE(listener->tid == isc_tid());
REQUIRE(listener->tid == 0);
REQUIRE(listener->listening);
REQUIRE(!listener->closing);
if (!atomic_compare_exchange_strong(&listener->closing,
&(bool){ false }, true))
{
UNREACHABLE();
}
listener->closing = true;
for (size_t i = 1; i < listener->nchildren; i++) {
isc__networker_t *worker =
@ -1796,11 +1797,7 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) {
nmsocket_stop_cb(listener);
INSIST(atomic_load(&listener->rchildren) == 0);
if (!atomic_compare_exchange_strong(&listener->listening,
&(bool){ true }, false))
{
UNREACHABLE();
}
listener->listening = false;
listener->accept_cb = NULL;
listener->accept_cbarg = NULL;
@ -1812,7 +1809,7 @@ isc__nmsocket_stop(isc_nmsocket_t *listener) {
isc__nmsocket_detach(&listener->outer);
}
atomic_store(&listener->closed, true);
listener->closed = true;
}
void
@ -2186,7 +2183,7 @@ isc_nm_set_maxage(isc_nmhandle_t *handle, const uint32_t ttl) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
REQUIRE(!atomic_load(&handle->sock->client));
REQUIRE(!handle->sock->client);
#if !HAVE_LIBNGHTTP2
UNUSED(ttl);
@ -2569,7 +2566,7 @@ nmsocket_dump(isc_nmsocket_t *sock) {
fprintf(stderr, "\n=================\n");
fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n",
atomic_load(&sock->client) ? "client" : "server", sock,
sock->client ? "client" : "server", sock,
nmsocket_type_totext(sock->type),
isc_refcount_current(&sock->references));
fprintf(stderr,
@ -2577,11 +2574,11 @@ nmsocket_dump(isc_nmsocket_t *sock) {
"%p\n",
sock->parent, sock->listener, sock->server, sock->statichandle);
fprintf(stderr, "Flags:%s%s%s%s%s\n",
atomic_load(&sock->active) ? " active" : "",
atomic_load(&sock->closing) ? " closing" : "",
atomic_load(&sock->destroying) ? " destroying" : "",
atomic_load(&sock->connecting) ? " connecting" : "",
atomic_load(&sock->accepting) ? " accepting" : "");
atomic_load_acquire(&sock->active) ? " active" : "",
sock->closing ? " closing" : "",
sock->destroying ? " destroying" : "",
sock->connecting ? " connecting" : "",
sock->accepting ? " accepting" : "");
fprintf(stderr, "Created by:\n");
isc_backtrace_symbols_fd(sock->backtrace, sock->backtrace_size,
STDERR_FILENO);

View file

@ -124,18 +124,16 @@ streamdns_on_complete_dnsmessage(isc_dnsstream_assembler_t *dnsasm,
isc_region_t *restrict region,
isc_nmsocket_t *sock,
isc_nmhandle_t *transphandle) {
const bool client = atomic_load(&sock->client);
const bool last_datum = isc_dnsstream_assembler_remaininglength(
dnsasm) == region->length;
/*
* Stop after one message if a client
* connection.
* Stop after one message if a client connection.
*/
bool stop = client;
bool stop = sock->client;
sock->recv_read = false;
if (sock->recv_cb != NULL) {
if (!client) {
if (!sock->client) {
/*
* We must allocate a new handle object, as we
* need to ensure that after processing of this
@ -259,8 +257,8 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type,
uint32_t initial = 0;
isc_nm_gettimeouts(worker->netmgr, &initial, NULL, NULL, NULL);
sock->read_timeout = initial;
atomic_init(&sock->client, !is_server);
atomic_init(&sock->connecting, !is_server);
sock->client = !is_server;
sock->connecting = !is_server;
sock->streamdns.input = isc_dnsstream_assembler_new(
sock->worker->mctx, streamdns_on_dnsmessage_data_cb,
sock);
@ -272,7 +270,7 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type,
static void
streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle,
const isc_result_t result) {
atomic_store(&sock->connecting, false);
sock->connecting = false;
if (sock->connect_cb == NULL) {
return;
}
@ -280,7 +278,7 @@ streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle,
if (result != ISC_R_SUCCESS) {
isc__nmsocket_clearcb(handle->sock);
} else {
atomic_store(&sock->connected, true);
sock->connected = true;
}
streamdns_try_close_unused(sock);
}
@ -341,7 +339,7 @@ streamdns_transport_connected(isc_nmhandle_t *handle, isc_result_t result,
}
isc_nmhandle_attach(handle, &sock->outerhandle);
atomic_store(&sock->active, true);
atomic_store_release(&sock->active, true);
handle->sock->streamdns.sock = sock;
@ -364,7 +362,7 @@ error:
isc_nm_verify_tls_peer_result_string(handle);
}
streamhandle = isc__nmhandle_get(sock, NULL, NULL);
atomic_store(&sock->closed, true);
sock->closed = true;
streamdns_call_connect_cb(sock, streamhandle, result);
isc_nmhandle_detach(&streamhandle);
isc__nmsocket_detach(&sock);
@ -482,7 +480,7 @@ streamdns_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result,
sock->recv_read = false;
isc_dnsstream_assembler_clear(sock->streamdns.input);
isc__nmsocket_clearcb(sock);
} else if (atomic_load(&sock->client)) {
} else if (sock->client) {
sock->recv_read = false;
}
isc__nm_readcb(sock, req, result, async);
@ -630,7 +628,7 @@ streamdns_resume_processing(void *arg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(!atomic_load(&sock->client));
REQUIRE(!sock->client);
if (streamdns_closing(sock)) {
return;
@ -658,9 +656,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
if (isc__nm_closing(handle->sock->worker)) {
return (ISC_R_SHUTTINGDOWN);
} else if (isc__nmsocket_closing(handle->sock) ||
atomic_load(&listensock->closing))
{
} else if (isc__nmsocket_closing(handle->sock) || listensock->closing) {
return (ISC_R_CANCELED);
}
@ -676,8 +672,8 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nm_gettimeouts(handle->sock->worker->netmgr, &initial, NULL, NULL,
NULL);
nsock->read_timeout = initial;
atomic_init(&nsock->accepting, true);
atomic_store(&nsock->active, true);
nsock->accepting = true;
atomic_store_release(&nsock->active, true);
isc__nmsocket_attach(listensock, &nsock->listener);
isc_nmhandle_attach(handle, &nsock->outerhandle);
@ -693,7 +689,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmhandle_detach(&nsock->recv_handle);
isc__nmsocket_detach(&nsock->listener);
isc_nmhandle_detach(&nsock->outerhandle);
atomic_store(&nsock->closed, true);
nsock->closed = true;
goto exit;
}
@ -706,7 +702,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
streamdns_handle_incoming_data(nsock, nsock->outerhandle, NULL, 0);
exit:
atomic_store(&nsock->accepting, false);
nsock->accepting = false;
return (result);
}
@ -745,7 +741,7 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
backlog, quota, tlsctx, &listener->outer);
}
if (result != ISC_R_SUCCESS) {
atomic_store(&listener->closed, true);
listener->closed = true;
isc__nmsocket_detach(&listener);
return (result);
}
@ -756,8 +752,8 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
listener->result = result;
atomic_store(&listener->active, true);
atomic_store(&listener->listening, true);
atomic_store_release(&listener->active, true);
listener->listening = true;
INSIST(listener->outer->streamdns.listener == NULL);
listener->nchildren = listener->outer->nchildren;
isc__nmsocket_barrier_init(listener);
@ -942,8 +938,8 @@ streamdns_close_direct(isc_nmsocket_t *sock) {
/* Further cleanup performed in isc__nm_streamdns_cleanup_data() */
isc_dnsstream_assembler_clear(sock->streamdns.input);
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
atomic_store_release(&sock->active, false);
}
void
@ -951,12 +947,9 @@ isc__nm_streamdns_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_streamdnssocket);
REQUIRE(sock->tid == isc_tid());
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
streamdns_close_direct(sock);
}

View file

@ -80,8 +80,10 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(atomic_load(&sock->accepting));
REQUIRE(sock->server);
REQUIRE(sock->accepting);
sock->accepting = false;
/*
* Detach the quota early to make room for other connections;
@ -94,8 +96,6 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
isc__nmsocket_detach(&sock->server);
atomic_store(&sock->accepting, false);
switch (eresult) {
case ISC_R_NOTCONNECTED:
/* IGNORE: The client disconnected before we could accept */
@ -120,7 +120,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
worker = sock->worker;
atomic_store(&sock->connecting, true);
sock->connecting = true;
/* 2 minute timeout */
result = isc__nm_socket_connectiontimeout(sock->fd, 120 * 1000);
@ -165,8 +165,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
&req->uv_req.connect);
isc__nmsocket_timer_start(sock);
atomic_store(&sock->connected, true);
return (ISC_R_SUCCESS);
}
@ -189,10 +187,10 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
if (atomic_load(&sock->timedout)) {
if (sock->timedout) {
result = ISC_R_TIMEDOUT;
goto error;
} else if (!atomic_load(&sock->connecting)) {
} else if (!sock->connecting) {
/*
* The connect was cancelled from timeout; just clean up
* the req.
@ -245,7 +243,8 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
goto error;
}
atomic_store(&sock->connecting, false);
sock->connecting = false;
sock->connected = true;
result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
@ -289,7 +288,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_timeout = timeout;
sock->fd = fd;
atomic_init(&sock->client, true);
sock->client = true;
req = isc__nm_uvreq_get(sock);
req->cb.connect = cb;
@ -301,11 +300,11 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
(void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG);
atomic_store(&sock->active, true);
atomic_store_release(&sock->active, true);
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
isc__nm_tcp_close(sock);
isc__nm_connectcb(sock, req, result, true);
}
@ -416,7 +415,7 @@ start_tcp_child_job(void *arg) {
goto done;
}
atomic_store(&sock->listening, true);
sock->listening = true;
if (sock->tid == 0) {
r = uv_tcp_getsockname(&sock->uv_handle.tcp,
@ -550,16 +549,15 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
}
atomic_store(&sock->active, true);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
isc__nm_tcp_stoplistening(sock);
isc_nmsocket_close(&sock);
return (result);
}
atomic_store_release(&sock->active, true);
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
return (ISC_R_SUCCESS);
@ -607,9 +605,9 @@ stop_tcp_child_job(void *arg) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent != NULL);
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
/*
* The order of the close operation is important here, the uv_close()
@ -633,45 +631,41 @@ stop_tcp_child_job(void *arg) {
}
static void
stop_tcp_child(isc_nmsocket_t *sock, uint32_t tid) {
isc_nmsocket_t *csock = NULL;
stop_tcp_child(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
csock = &sock->children[tid];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if (tid == 0) {
stop_tcp_child_job(csock);
atomic_store_release(&sock->active, false);
if (sock->tid == 0) {
stop_tcp_child_job(sock);
} else {
isc_async_run(csock->worker->loop, stop_tcp_child_job, csock);
isc_async_run(sock->worker->loop, stop_tcp_child_job, sock);
}
}
static void
stop_tcp_parent(isc_nmsocket_t *sock) {
/* Stop the parent */
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->tid == 0);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
/* Mark the parent socket inactive */
atomic_store_release(&sock->active, false);
/* Stop all the other threads' children */
for (size_t i = 1; i < sock->nchildren; i++) {
stop_tcp_child(sock, i);
stop_tcp_child(&sock->children[i]);
}
stop_tcp_child(sock, 0);
/* Stop the child for the main thread */
stop_tcp_child(&sock->children[0]);
stop_tcp_parent(sock);
/* Stop the parent */
sock->closed = true;
isc__nmsocket_prep_destroy(sock);
}
static void
@ -681,16 +675,15 @@ tcp_stop_cb(uv_handle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!sock->closed);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
sock->closed = true;
sock->listening = false;
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
@ -747,9 +740,10 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
/* Initialize the timer */
if (sock->read_timeout == 0) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
sock->read_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
if (isc__nmsocket_closing(sock)) {
@ -824,10 +818,11 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
req->uvbuf.base = buf->base;
req->uvbuf.len = nread;
if (!atomic_load(&sock->client)) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
if (!sock->client) {
sock->read_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
isc__nm_readcb(sock, req, ISC_R_SUCCESS, false);
@ -912,7 +907,7 @@ accept_connection(isc_nmsocket_t *ssock) {
isc__nmsocket_attach(ssock, &csock->server);
csock->recv_cb = ssock->recv_cb;
csock->recv_cbarg = ssock->recv_cbarg;
atomic_init(&csock->accepting, true);
csock->accepting = true;
csock->quota = ssock->pquota;
worker = csock->worker;
@ -964,11 +959,11 @@ accept_connection(isc_nmsocket_t *ssock) {
goto failure;
}
atomic_store(&csock->accepting, false);
csock->accepting = false;
isc__nm_incstats(csock, STATID_ACCEPT);
csock->read_timeout = atomic_load(&csock->worker->netmgr->init);
csock->read_timeout = atomic_load_relaxed(&csock->worker->netmgr->init);
/*
* The acceptcb needs to attach to the handle if it wants to keep the
@ -984,7 +979,7 @@ accept_connection(isc_nmsocket_t *ssock) {
return (ISC_R_SUCCESS);
failure:
atomic_store(&csock->active, false);
atomic_store_release(&csock->active, false);
failed_accept_cb(csock, result);
@ -1022,9 +1017,10 @@ tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb,
uvreq->cbarg = cbarg;
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
sock->write_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
result = tcp_send_direct(sock, uvreq);
@ -1150,10 +1146,11 @@ static void
tcp_close_sock(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(!sock->closed);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
sock->closed = true;
sock->connected = false;
isc__nm_incstats(sock, STATID_CLOSE);
@ -1161,8 +1158,6 @@ tcp_close_sock(isc_nmsocket_t *sock) {
isc__nmsocket_detach(&sock->server);
}
atomic_store(&sock->connected, false);
isc__nmsocket_prep_destroy(sock);
}
@ -1181,9 +1176,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent == NULL);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
@ -1246,11 +1241,11 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
return;
}
if (atomic_load(&sock->accepting)) {
if (sock->accepting) {
return;
}
if (atomic_load(&sock->connecting)) {
if (sock->connecting) {
isc_nmsocket_t *tsock = NULL;
isc__nmsocket_attach(sock, &tsock);
uv_close(&sock->uv_handle.handle, tcp_close_connect_cb);

View file

@ -107,10 +107,10 @@ tls_try_to_enable_tcp_nodelay(isc_nmsocket_t *tlssock);
*/
static bool
inactive(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
return (!isc__nmsocket_active(sock) || sock->closing ||
sock->outerhandle == NULL ||
!isc__nmsocket_active(sock->outerhandle->sock) ||
atomic_load(&sock->outerhandle->sock->closing) ||
sock->outerhandle->sock->closing ||
(sock->listener != NULL &&
!isc__nmsocket_active(sock->listener)) ||
isc__nm_closing(sock->worker));
@ -424,7 +424,7 @@ tls_try_handshake(isc_nmsocket_t *sock, isc_result_t *presult) {
if (isc__nm_closing(sock->worker)) {
result = ISC_R_SHUTTINGDOWN;
} else if (isc__nmsocket_closing(sock) ||
atomic_load(&sock->listener->closing))
sock->listener->closing)
{
result = ISC_R_CANCELED;
} else {
@ -544,13 +544,13 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
INSIST(SSL_is_init_finished(
sock->tlsstream.tls) ==
1);
INSIST(!atomic_load(&sock->client));
INSIST(!sock->client);
finish = true;
} else if (sock->tlsstream.state == TLS_IO &&
hs_result == ISC_R_SUCCESS &&
!sock->tlsstream.server)
{
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
}
}
} else if (send_data != NULL) {
@ -867,7 +867,7 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_tlsctx_attach(tlsctx, &tlssock->tlsstream.ctx);
tlssock->tlsstream.tls = isc_tls_create(tlssock->tlsstream.ctx);
if (tlssock->tlsstream.tls == NULL) {
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
isc_tlsctx_free(&tlssock->tlsstream.ctx);
isc__nmsocket_detach(&tlssock);
return (ISC_R_TLSERROR);
@ -877,7 +877,7 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmhandle_attach(handle, &tlssock->outerhandle);
tlssock->peer = handle->sock->peer;
tlssock->read_timeout =
atomic_load(&handle->sock->worker->netmgr->init);
atomic_load_relaxed(&handle->sock->worker->netmgr->init);
/*
* Hold a reference to tlssock in the TCP socket: it will
@ -932,7 +932,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
result = isc_nm_listentcp(mgr, workers, iface, tlslisten_acceptcb,
tlssock, backlog, quota, &tlssock->outer);
if (result != ISC_R_SUCCESS) {
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
isc__nmsocket_detach(&tlssock);
return (result);
}
@ -945,7 +945,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
/* wait for listen result */
isc__nmsocket_attach(tlssock->outer, &tsock);
tlssock->result = result;
atomic_store(&tlssock->active, true);
atomic_store_release(&tlssock->active, true);
INSIST(tlssock->outer->tlsstream.tlslistener == NULL);
isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener);
isc__nmsocket_detach(&tsock);
@ -956,7 +956,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
atomic_init(&tlssock->rchildren, tlssock->nchildren);
if (result == ISC_R_SUCCESS) {
atomic_store(&tlssock->listening, true);
tlssock->listening = true;
*sockp = tlssock;
}
@ -1086,8 +1086,8 @@ tls_close_direct(void *arg) {
}
/* Further cleanup performed in isc__nm_tls_cleanup_data() */
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
atomic_store_release(&sock->active, false);
sock->tlsstream.state = TLS_CLOSED;
}
@ -1095,12 +1095,9 @@ void
isc__nm_tls_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlssocket);
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
if (sock->tid == isc_tid()) {
/* no point in attempting to make the call asynchronous */
@ -1144,7 +1141,7 @@ isc_nm_tlsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
nsock->connect_cbarg = cbarg;
nsock->connect_timeout = timeout;
isc_tlsctx_attach(ctx, &nsock->tlsstream.ctx);
atomic_init(&nsock->client, true);
nsock->client = true;
if (client_sess_cache != NULL) {
INSIST(isc_tlsctx_client_session_cache_getctx(
client_sess_cache) == ctx);
@ -1194,7 +1191,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
}
tlssock->peer = isc_nmhandle_peeraddr(handle);
isc_nmhandle_attach(handle, &tlssock->outerhandle);
atomic_store(&tlssock->active, true);
atomic_store_release(&tlssock->active, true);
if (tlssock->tlsstream.client_sess_cache != NULL) {
isc_tlsctx_client_session_cache_reuse_sockaddr(
@ -1215,7 +1212,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
return;
error:
tlshandle = isc__nmhandle_get(tlssock, NULL, NULL);
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
tls_call_connect_cb(tlssock, tlshandle, result);
isc_nmhandle_detach(&tlshandle);
isc__nmsocket_detach(&tlssock);
@ -1246,7 +1243,7 @@ isc__nm_tls_cleanup_data(isc_nmsocket_t *sock) {
isc_tlsctx_free(&sock->tlsstream.ctx);
}
if (sock->tlsstream.client_sess_cache != NULL) {
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
isc_tlsctx_client_session_cache_detach(
&sock->tlsstream.client_sess_cache);
}
@ -1468,7 +1465,7 @@ tls_keep_client_tls_session(isc_nmsocket_t *sock) {
if (sock->tlsstream.client_sess_cache != NULL &&
sock->tlsstream.client_session_saved == false)
{
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
isc_tlsctx_client_session_cache_keep_sockaddr(
sock->tlsstream.client_sess_cache, &sock->peer,
sock->tlsstream.tls);

View file

@ -168,8 +168,7 @@ start_udp_child_job(void *arg) {
}
sock->reading = true;
atomic_store(&sock->listening, true);
sock->listening = true;
done:
result = isc_uverr2result(r);
@ -275,15 +274,15 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
}
atomic_store(&sock->active, true);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
isc__nm_udp_stoplistening(sock);
isc_nmsocket_close(&sock);
return (result);
}
atomic_store_release(&sock->active, true);
INSIST(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
return (ISC_R_SUCCESS);
@ -328,7 +327,7 @@ route_connect_direct(isc_nmsocket_t *sock) {
worker = sock->worker;
atomic_store(&sock->connecting, true);
sock->connecting = true;
r = uv_udp_init(&worker->loop->loop, &sock->uv_handle.udp);
UV_RUNTIME_CHECK(uv_udp_init, r);
@ -350,8 +349,8 @@ route_connect_direct(isc_nmsocket_t *sock) {
isc__nm_set_network_buffers(sock->worker->netmgr,
&sock->uv_handle.handle);
atomic_store(&sock->connecting, false);
atomic_store(&sock->connected, true);
sock->connecting = false;
sock->connected = true;
return (ISC_R_SUCCESS);
}
@ -384,7 +383,7 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
sock->client = true;
sock->route_sock = true;
sock->fd = fd;
@ -393,11 +392,11 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
req->cbarg = cbarg;
req->handle = isc__nmhandle_get(sock, NULL, NULL);
atomic_store(&sock->active, true);
atomic_store_release(&sock->active, true);
result = route_connect_direct(sock);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
isc__nm_udp_close(sock);
}
@ -437,7 +436,7 @@ static void
stop_udp_child(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
if (sock->tid == 0) {
stop_udp_child_job(sock);
} else {
@ -451,12 +450,12 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(sock->type == isc_nm_udplistener);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->tid == 0);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
/* Mark the parent socket inactive */
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
/* Stop all the other threads' children */
for (size_t i = 1; i < sock->nchildren; i++) {
@ -467,7 +466,7 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
stop_udp_child(&sock->children[0]);
/* Stop the parent */
atomic_store(&sock->closed, true);
sock->closed = true;
isc__nmsocket_prep_destroy(sock);
}
@ -509,7 +508,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
* - If we're simulating a firewall blocking UDP packets
* bigger than 'maxudp' bytes for testing purposes.
*/
maxudp = atomic_load(&sock->worker->netmgr->maxudp);
maxudp = atomic_load_relaxed(&sock->worker->netmgr->maxudp);
if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) {
/*
* We need to keep the read_cb intact in case, so the
@ -582,7 +581,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
* stop reading now. The reading could be restarted in the read
* callback with another isc_nm_read() call.
*/
if (atomic_load(&sock->client)) {
if (sock->client) {
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
@ -654,7 +653,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = handle->sock;
const isc_sockaddr_t *peer = &handle->peer;
const struct sockaddr *sa = &peer->type.sa;
const struct sockaddr *sa = sock->connected ? NULL : &peer->type.sa;
isc__nm_uvreq_t *uvreq = NULL;
isc__networker_t *worker = NULL;
uint32_t maxudp;
@ -700,15 +699,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
goto fail;
}
/*
* We used uv_udp_connect(), so the peer address has to be
* set to NULL or else uv_udp_send() could fail or assert,
* depending on the libuv version.
*/
if (atomic_load(&sock->connected)) {
sa = NULL;
}
r = uv_udp_send(&uvreq->uv_req.udp_send, &sock->uv_handle.udp,
&uvreq->uvbuf, 1, sa, udp_send_cb);
if (r < 0) {
@ -813,7 +803,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_cbarg = cbarg;
sock->read_timeout = timeout;
sock->peer = *peer;
atomic_init(&sock->client, true);
sock->client = true;
sock->fd = fd;
result = isc__nm_socket_reuse(sock->fd);
@ -834,19 +824,19 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
req->local = *local;
req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);
atomic_store(&sock->active, true);
atomic_store(&sock->connecting, true);
atomic_store_release(&sock->active, true);
sock->connecting = true;
result = udp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
atomic_store_release(&sock->active, false);
isc__nm_failed_connect_cb(sock, req, result, true);
isc__nmsocket_detach(&sock);
return;
}
atomic_store(&sock->connecting, false);
atomic_store(&sock->connected, true);
sock->connecting = false;
sock->connected = true;
isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true);
isc__nmsocket_detach(&sock);
@ -859,7 +849,7 @@ isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result,
REQUIRE(result != ISC_R_SUCCESS);
REQUIRE(sock->tid == isc_tid());
if (atomic_load(&sock->client)) {
if (sock->client) {
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
@ -952,13 +942,10 @@ udp_close_cb(uv_handle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(!sock->closed);
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true))
{
UNREACHABLE();
}
sock->closed = true;
isc__nm_incstats(sock, STATID_CLOSE);
@ -967,14 +954,15 @@ udp_close_cb(uv_handle_t *handle) {
isc__nmsocket_detach(&sock->server);
}
/* All sockets */
sock->listening = false;
if (sock->parent != NULL) {
/* listening socket (listen) */
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
} else {
/* client and server sockets */
atomic_store(&sock->connected, false);
atomic_store(&sock->listening, false);
sock->connected = false;
isc__nmsocket_prep_destroy(sock);
}
}
@ -984,12 +972,9 @@ isc__nm_udp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->tid == isc_tid());
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
isc__nmsocket_clearcb(sock);
isc__nmsocket_timer_stop(sock);
@ -1029,13 +1014,8 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
return;
}
/*
* If the socket is connecting, the cancel will happen in the
* async_udpconnect() due socket being inactive now.
*/
if (atomic_load(&sock->connecting)) {
return;
}
/* uv_udp_connect is synchronous, we can't be in connected state */
REQUIRE(!sock->connecting);
/*
* When the client detaches the last handle, the
@ -1070,7 +1050,7 @@ udp_cancelread_cb(void *arg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->client));
REQUIRE(sock->client);
isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
isc__nmsocket_detach(&sock);