Merge branch 'ondrej/reduce-atomic-use-in-netmgr' into 'main'

Refactor the use of atomics in netmgr

See merge request isc-projects/bind9!7724
This commit is contained in:
Ondřej Surý 2023-03-30 14:26:38 +00:00
commit 696b2c8c89
7 changed files with 294 additions and 472 deletions

View file

@ -1479,7 +1479,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_timeout = timeout;
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
sock->client = true;
if (isc__nm_closing(worker)) {
isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock);
@ -1609,7 +1609,7 @@ isc__nm_http_request(isc_nmhandle_t *handle, isc_region_t *region,
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
REQUIRE(handle->sock->tid == isc_tid());
REQUIRE(atomic_load(&handle->sock->client));
REQUIRE(handle->sock->client);
REQUIRE(cb != NULL);
@ -1688,11 +1688,9 @@ find_server_request_handler(const char *request_path,
REQUIRE(VALID_NMSOCK(serversocket));
if (atomic_load(&serversocket->listening)) {
handler = http_endpoints_find(
request_path,
http_get_listener_endpoints(serversocket, tid));
}
handler = http_endpoints_find(
request_path, http_get_listener_endpoints(serversocket, tid));
return (handler);
}
@ -2069,7 +2067,7 @@ isc__nm_http_bad_request(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMSOCK(handle->sock));
sock = handle->sock;
REQUIRE(sock->type == isc_nm_httpsocket);
REQUIRE(!atomic_load(&sock->client));
REQUIRE(!sock->client);
REQUIRE(VALID_HTTP2_SESSION(sock->h2.session));
(void)server_send_error_response(ISC_HTTP_ERROR_BAD_REQUEST,
@ -2430,57 +2428,31 @@ http_transpost_tcp_nodelay(isc_nmhandle_t *transphandle) {
static isc_result_t
httplisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmsocket_t *httplistensock = (isc_nmsocket_t *)cbarg;
isc_nmsocket_t *httpserver = (isc_nmsocket_t *)cbarg;
isc_nm_http_session_t *session = NULL;
isc_nmsocket_t *listener = NULL, *httpserver = NULL;
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (handle->sock->type == isc_nm_tlssocket) {
REQUIRE(VALID_NMSOCK(handle->sock->listener));
listener = handle->sock->listener;
httpserver = listener->h2.httpserver;
} else {
REQUIRE(VALID_NMSOCK(handle->sock->server));
listener = handle->sock->server;
REQUIRE(VALID_NMSOCK(listener->parent));
httpserver = listener->parent->h2.httpserver;
}
/*
* NOTE: HTTP listener socket might be destroyed by the time this
* function gets invoked, so we need to do extra sanity checks to
* detect this case.
*/
if (isc__nm_closing(handle->sock->worker)) {
return (ISC_R_SHUTTINGDOWN);
} else if (isc__nmsocket_closing(handle->sock) || httpserver == NULL) {
return (ISC_R_CANCELED);
}
if (result != ISC_R_SUCCESS) {
/* XXXWPK do nothing? */
} else if (result != ISC_R_SUCCESS) {
return (result);
}
REQUIRE(VALID_NMSOCK(httplistensock));
INSIST(httplistensock == httpserver);
if (atomic_load(&httplistensock->closing)) {
return (ISC_R_CANCELED);
}
REQUIRE(VALID_NMSOCK(httpserver));
REQUIRE(httpserver->type == isc_nm_httplistener);
http_transpost_tcp_nodelay(handle);
new_session(handle->sock->worker->mctx, NULL, &session);
session->max_concurrent_streams =
atomic_load(&httplistensock->h2.max_concurrent_streams);
atomic_load_relaxed(&httpserver->h2.max_concurrent_streams);
initialize_nghttp2_server_session(session);
handle->sock->h2.session = session;
isc_nmhandle_attach(handle, &session->handle);
isc__nmsocket_attach(httplistensock, &session->serversocket);
isc__nmsocket_attach(httpserver, &session->serversocket);
server_send_connection_header(session);
/* TODO H2 */
@ -2523,20 +2495,14 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->closed, true);
sock->closed = true;
isc__nmsocket_detach(&sock);
return (result);
}
isc__nmsocket_attach(sock, &sock->outer->h2.httpserver);
sock->nchildren = sock->outer->nchildren;
sock->fd = (uv_os_sock_t)-1;
isc__nmsocket_barrier_init(sock);
atomic_init(&sock->rchildren, sock->nchildren);
atomic_store(&sock->listening, true);
*sockp = sock;
return (ISC_R_SUCCESS);
}
@ -2711,8 +2677,8 @@ http_close_direct(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
sock->active = false;
session = sock->h2.session;
if (session != NULL && session->sending == 0 && !session->reading) {
@ -2742,12 +2708,9 @@ isc__nm_http_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_httpsocket);
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
if (sock->h2.session != NULL && sock->h2.session->closed &&
sock->tid == isc_tid())
@ -2843,8 +2806,8 @@ server_call_failed_read_cb(isc_result_t result,
isc_nmsocket_h2_t *next = ISC_LIST_NEXT(h2data, link);
ISC_LIST_DEQUEUE(session->sstreams, h2data, link);
/* Cleanup socket in place */
atomic_store(&h2data->psock->active, false);
atomic_store(&h2data->psock->closed, true);
h2data->psock->active = false;
h2data->psock->closed = true;
isc__nmsocket_detach(&h2data->psock);
h2data = next;
@ -2957,7 +2920,7 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener,
max_streams = max_concurrent_streams;
}
atomic_store(&listener->h2.max_concurrent_streams, max_streams);
atomic_store_relaxed(&listener->h2.max_concurrent_streams, max_streams);
}
typedef struct http_endpoints_data {
@ -3208,13 +3171,6 @@ isc__nm_http_initsocket(isc_nmsocket_t *sock) {
void
isc__nm_http_cleanup_data(isc_nmsocket_t *sock) {
if ((sock->type == isc_nm_tcplistener ||
sock->type == isc_nm_tlslistener) &&
sock->h2.httpserver != NULL)
{
isc__nmsocket_detach(&sock->h2.httpserver);
}
if (sock->type == isc_nm_httplistener ||
sock->type == isc_nm_httpsocket)
{

View file

@ -430,8 +430,6 @@ typedef struct isc_nmsocket_h2 {
int32_t stream_id;
isc_nm_http_session_t *session;
isc_nmsocket_t *httpserver;
/* maximum concurrent streams (server-side) */
atomic_uint_fast32_t max_concurrent_streams;
@ -477,6 +475,7 @@ struct isc_nmsocket {
/*% Unlocked, RO */
int magic;
uint32_t tid;
isc_refcount_t references;
isc_nmsocket_type type;
isc__networker_t *worker;
@ -485,10 +484,6 @@ struct isc_nmsocket {
/*% Parent socket for multithreaded listeners */
isc_nmsocket_t *parent;
/*% Listener socket this connection was accepted on */
isc_nmsocket_t *listener;
/*% Self socket */
isc_nmsocket_t *self;
/*% TLS stuff */
struct tlsstream {
@ -563,6 +558,9 @@ struct isc_nmsocket {
/*% server socket for connections */
isc_nmsocket_t *server;
/*% client socket for connections */
isc_nmsocket_t *listener;
/*% Child sockets for multi-socket setups */
isc_nmsocket_t *children;
uint_fast32_t nchildren;
@ -580,17 +578,13 @@ struct isc_nmsocket {
/*% Peer address */
isc_sockaddr_t peer;
/* Atomic */
/*% Number of running (e.g. listening) child sockets */
atomic_uint_fast32_t rchildren;
/*%
* Socket is active if it's listening, working, etc. If it's
* closing, then it doesn't make a sense, for example, to
* push handles or reqs for reuse.
*/
atomic_bool active;
atomic_bool destroying;
bool active;
bool destroying;
bool route_sock;
@ -600,20 +594,18 @@ struct isc_nmsocket {
* If active==false but closed==false, that means the socket
* is closing.
*/
atomic_bool closing;
atomic_bool closed;
atomic_bool listening;
atomic_bool connecting;
atomic_bool connected;
atomic_bool accepting;
bool closing;
bool closed;
bool connecting;
bool connected;
bool accepting;
bool reading;
atomic_bool timedout;
isc_refcount_t references;
bool timedout;
/*%
* Established an outgoing connection, as client not server.
*/
atomic_bool client;
bool client;
/*%
* The socket is processing read callback, this is guard to not read
@ -625,7 +617,7 @@ struct isc_nmsocket {
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
*/
atomic_bool keepalive;
bool keepalive;
/*%
* 'spare' handles for that can be reused to avoid allocations, for UDP.
@ -661,8 +653,6 @@ struct isc_nmsocket {
isc_nm_accept_cb_t accept_cb;
void *accept_cbarg;
atomic_int_fast32_t active_child_connections;
bool barriers_initialised;
bool manual_read_timer;
#if ISC_NETMGR_TRACE
@ -764,18 +754,6 @@ isc__nmsocket_active(isc_nmsocket_t *sock);
* or, for child sockets, 'sock->parent->active'.
*/
bool
isc__nmsocket_deactivate(isc_nmsocket_t *sock);
/*%<
* @brief Deactivate active socket
*
* Atomically deactive the socket by setting @p sock->active or, for child
* sockets, @p sock->parent->active to @c false
*
* @param[in] sock - valid nmsocket
* @return @c false if the socket was already inactive, @c true otherwise
*/
void
isc__nmsocket_clearcb(isc_nmsocket_t *sock);
/*%<

View file

@ -147,8 +147,8 @@ static void
netmgr_teardown(void *arg) {
isc_nm_t *netmgr = (void *)arg;
if (atomic_compare_exchange_strong(&netmgr->shuttingdown,
&(bool){ false }, true))
if (atomic_compare_exchange_strong_acq_rel(&netmgr->shuttingdown,
&(bool){ false }, true))
{
isc__netmgr_log(netmgr, ISC_LOG_DEBUG(1),
"Shutting down network manager");
@ -318,7 +318,7 @@ void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->maxudp, maxudp);
atomic_store_relaxed(&mgr->maxudp, maxudp);
}
void
@ -349,10 +349,10 @@ isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
uint32_t keepalive, uint32_t advertised) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->init, init);
atomic_store(&mgr->idle, idle);
atomic_store(&mgr->keepalive, keepalive);
atomic_store(&mgr->advertised, advertised);
atomic_store_relaxed(&mgr->init, init);
atomic_store_relaxed(&mgr->idle, idle);
atomic_store_relaxed(&mgr->keepalive, keepalive);
atomic_store_relaxed(&mgr->advertised, advertised);
}
void
@ -360,10 +360,10 @@ isc_nm_setnetbuffers(isc_nm_t *mgr, int32_t recv_tcp, int32_t send_tcp,
int32_t recv_udp, int32_t send_udp) {
REQUIRE(VALID_NM(mgr));
atomic_store(&mgr->recv_tcp_buffer_size, recv_tcp);
atomic_store(&mgr->send_tcp_buffer_size, send_tcp);
atomic_store(&mgr->recv_udp_buffer_size, recv_udp);
atomic_store(&mgr->send_udp_buffer_size, send_udp);
atomic_store_relaxed(&mgr->recv_tcp_buffer_size, recv_tcp);
atomic_store_relaxed(&mgr->send_tcp_buffer_size, send_tcp);
atomic_store_relaxed(&mgr->recv_udp_buffer_size, recv_udp);
atomic_store_relaxed(&mgr->send_udp_buffer_size, send_udp);
}
bool
@ -390,43 +390,27 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
REQUIRE(VALID_NM(mgr));
if (initial != NULL) {
*initial = atomic_load(&mgr->init);
*initial = atomic_load_relaxed(&mgr->init);
}
if (idle != NULL) {
*idle = atomic_load(&mgr->idle);
*idle = atomic_load_relaxed(&mgr->idle);
}
if (keepalive != NULL) {
*keepalive = atomic_load(&mgr->keepalive);
*keepalive = atomic_load_relaxed(&mgr->keepalive);
}
if (advertised != NULL) {
*advertised = atomic_load(&mgr->advertised);
*advertised = atomic_load_relaxed(&mgr->advertised);
}
}
bool
isc__nmsocket_active(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
if (sock->parent != NULL) {
return (atomic_load(&sock->parent->active));
}
return (atomic_load(&sock->active));
}
bool
isc__nmsocket_deactivate(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
if (sock->parent != NULL) {
return (atomic_compare_exchange_strong(&sock->parent->active,
&(bool){ true }, false));
}
return (atomic_compare_exchange_strong(&sock->active, &(bool){ true },
false));
return (sock->active);
}
void
@ -469,7 +453,8 @@ nmsocket_cleanup(void *arg) {
isc__nm_decstats(sock, STATID_ACTIVE);
atomic_store(&sock->destroying, true);
REQUIRE(!sock->destroying);
sock->destroying = true;
if (sock->parent == NULL && sock->children != NULL) {
/*
@ -477,7 +462,6 @@ nmsocket_cleanup(void *arg) {
* so we can clean up and free the children.
*/
for (size_t i = 0; i < sock->nchildren; i++) {
REQUIRE(!atomic_load(&sock->children[i].destroying));
isc_refcount_decrementz(&sock->children[i].references);
nmsocket_cleanup(&sock->children[i]);
}
@ -570,9 +554,18 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
return;
}
if (atomic_load(&sock->active) || atomic_load(&sock->destroying) ||
!atomic_load(&sock->closed) || atomic_load(&sock->references) != 0)
{
REQUIRE(!sock->destroying);
REQUIRE(!sock->active);
if (!sock->closed) {
return;
}
if (isc_refcount_current(&sock->references) != 0) {
/*
* Using such check is valid only if we don't use
* isc_refcount_increment0() on the same variable.
*/
return;
}
@ -588,7 +581,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
return;
}
atomic_store(&sock->destroying, true);
if (sock->tid == isc_tid()) {
nmsocket_cleanup(sock);
} else {
@ -618,17 +610,12 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
* destroying the socket, but we have to wait for all the inflight
* handles to finish first.
*/
atomic_store(&sock->active, false);
sock->active = false;
/*
* If the socket has children, they'll need to be marked inactive
* so they can be cleaned up too.
* If the socket has children, they have been marked inactive by the
* shutdown uv_walk
*/
if (sock->children != NULL) {
for (size_t i = 0; i < sock->nchildren; i++) {
atomic_store(&sock->children[i].active, false);
}
}
/*
* If we're here then we already stopped listening; otherwise
@ -636,7 +623,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
*
* If it's a regular socket we may need to close it.
*/
if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) {
if (!sock->closing && !sock->closed) {
switch (sock->type) {
case isc_nm_udpsocket:
isc__nm_udp_close(sock);
@ -721,6 +708,7 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
.result = ISC_R_UNSET,
.active_handles = ISC_LIST_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
.active = true,
};
if (iface != NULL) {
@ -792,19 +780,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
"\n",
sock, isc_refcount_current(&sock->references));
atomic_init(&sock->active, true);
atomic_init(&sock->closing, false);
atomic_init(&sock->listening, 0);
atomic_init(&sock->closed, 0);
atomic_init(&sock->destroying, 0);
atomic_init(&sock->client, 0);
atomic_init(&sock->connecting, false);
atomic_init(&sock->keepalive, false);
atomic_init(&sock->connected, false);
atomic_init(&sock->timedout, false);
atomic_init(&sock->active_child_connections, 0);
#if HAVE_LIBNGHTTP2
isc__nm_http_initsocket(sock);
#endif
@ -904,7 +879,7 @@ isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t const *peer,
switch (sock->type) {
case isc_nm_udpsocket:
if (!atomic_load(&sock->client)) {
if (!sock->client) {
break;
}
FALLTHROUGH;
@ -967,7 +942,7 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
handle->dofree(handle->opaque);
}
isc_mem_put(sock->worker->mctx, handle, sizeof(isc_nmhandle_t));
isc_mem_put(sock->worker->mctx, handle, sizeof(*handle));
}
static void
@ -1005,7 +980,7 @@ nmhandle_destroy(isc_nmhandle_t *handle) {
#if defined(__SANITIZE_ADDRESS__) || defined(__SANITIZE_THREAD__)
nmhandle_free(sock, handle);
#else
if (atomic_load(&sock->active)) {
if (sock->active) {
ISC_LIST_APPEND(sock->inactive_handles, handle, inactive_link);
} else {
nmhandle_free(sock, handle);
@ -1078,7 +1053,7 @@ isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(atomic_load(&sock->accepting));
REQUIRE(sock->accepting);
REQUIRE(sock->server);
/*
@ -1092,7 +1067,7 @@ isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
isc__nmsocket_detach(&sock->server);
atomic_store(&sock->accepting, false);
sock->accepting = false;
switch (eresult) {
case ISC_R_NOTCONNECTED:
@ -1112,15 +1087,15 @@ isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
REQUIRE(VALID_UVREQ(req));
REQUIRE(sock->tid == isc_tid());
REQUIRE(req->cb.connect != NULL);
REQUIRE(sock->connecting);
sock->connecting = false;
isc__nm_incstats(sock, STATID_CONNECTFAIL);
isc__nmsocket_timer_stop(sock);
uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
atomic_compare_exchange_enforced(&sock->connecting, &(bool){ true },
false);
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, eresult, async);
@ -1157,17 +1132,17 @@ isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->connecting));
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
REQUIRE(sock->connecting);
isc__nmsocket_timer_stop(sock);
/*
* Mark the connection as timed out and shutdown the socket.
*/
atomic_compare_exchange_enforced(&sock->timedout, &(bool){ false },
true);
REQUIRE(!sock->timedout);
sock->timedout = true;
isc__nmsocket_clearcb(sock);
isc__nmsocket_shutdown(sock);
}
@ -1221,7 +1196,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->reading);
if (atomic_load(&sock->client)) {
if (sock->client) {
uv_timer_stop(timer);
sock->recv_read = false;
@ -1259,7 +1234,7 @@ isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {
return;
}
if (atomic_load(&sock->connecting)) {
if (sock->connecting) {
int r;
if (sock->connect_timeout == 0) {
@ -1352,7 +1327,7 @@ isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
isc_nmhandle_attach(sock->recv_handle, &req->handle);
break;
default:
if (atomic_load(&sock->client) && sock->statichandle != NULL) {
if (sock->client && sock->statichandle != NULL) {
isc_nmhandle_attach(sock->statichandle, &req->handle);
} else {
req->handle = isc__nmhandle_get(sock, sockaddr, NULL);
@ -1463,7 +1438,7 @@ isc__nm_closing(isc__networker_t *worker) {
bool
isc__nmsocket_closing(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
return (!sock->active || sock->closing ||
isc__nm_closing(sock->worker) ||
(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}
@ -1528,13 +1503,17 @@ isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
sock = handle->sock;
netmgr = sock->worker->netmgr;
REQUIRE(sock->tid == isc_tid());
switch (sock->type) {
case isc_nm_tcpsocket:
atomic_store(&sock->keepalive, value);
sock->read_timeout = value ? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle);
sock->write_timeout = value ? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle);
sock->keepalive = value;
sock->read_timeout =
value ? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
sock->write_timeout =
value ? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
break;
case isc_nm_streamdnssocket:
isc__nmhandle_streamdns_keepalive(handle, value);
@ -1767,52 +1746,29 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) {
}
}
static void
nmsocket_stop_cb(void *arg) {
isc_nmsocket_t *listener = arg;
(void)atomic_fetch_sub(&listener->rchildren, 1);
isc_barrier_wait(&listener->stop_barrier);
}
void
isc__nmsocket_stop(isc_nmsocket_t *listener) {
REQUIRE(VALID_NMSOCK(listener));
REQUIRE(listener->tid == isc_tid());
REQUIRE(listener->tid == 0);
REQUIRE(listener->type == isc_nm_httplistener ||
listener->type == isc_nm_tlslistener ||
listener->type == isc_nm_streamdnslistener);
REQUIRE(!listener->closing);
if (!atomic_compare_exchange_strong(&listener->closing,
&(bool){ false }, true))
{
UNREACHABLE();
}
listener->closing = true;
for (size_t i = 1; i < listener->nchildren; i++) {
isc__networker_t *worker =
&listener->worker->netmgr->workers[i];
isc_async_run(worker->loop, nmsocket_stop_cb, listener);
}
nmsocket_stop_cb(listener);
INSIST(atomic_load(&listener->rchildren) == 0);
if (!atomic_compare_exchange_strong(&listener->listening,
&(bool){ true }, false))
{
UNREACHABLE();
}
REQUIRE(listener->outer != NULL);
isc_nm_stoplistening(listener->outer);
listener->accept_cb = NULL;
listener->accept_cbarg = NULL;
listener->recv_cb = NULL;
listener->recv_cbarg = NULL;
if (listener->outer != NULL) {
isc_nm_stoplistening(listener->outer);
isc__nmsocket_detach(&listener->outer);
}
isc__nmsocket_detach(&listener->outer);
atomic_store(&listener->closed, true);
listener->closed = true;
}
void
@ -2186,7 +2142,7 @@ isc_nm_set_maxage(isc_nmhandle_t *handle, const uint32_t ttl) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
REQUIRE(!atomic_load(&handle->sock->client));
REQUIRE(!handle->sock->client);
#if !HAVE_LIBNGHTTP2
UNUSED(ttl);
@ -2569,7 +2525,7 @@ nmsocket_dump(isc_nmsocket_t *sock) {
fprintf(stderr, "\n=================\n");
fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n",
atomic_load(&sock->client) ? "client" : "server", sock,
sock->client ? "client" : "server", sock,
nmsocket_type_totext(sock->type),
isc_refcount_current(&sock->references));
fprintf(stderr,
@ -2577,11 +2533,11 @@ nmsocket_dump(isc_nmsocket_t *sock) {
"%p\n",
sock->parent, sock->listener, sock->server, sock->statichandle);
fprintf(stderr, "Flags:%s%s%s%s%s\n",
atomic_load(&sock->active) ? " active" : "",
atomic_load(&sock->closing) ? " closing" : "",
atomic_load(&sock->destroying) ? " destroying" : "",
atomic_load(&sock->connecting) ? " connecting" : "",
atomic_load(&sock->accepting) ? " accepting" : "");
atomic_load_acquire(&sock->active) ? " active" : "",
sock->closing ? " closing" : "",
sock->destroying ? " destroying" : "",
sock->connecting ? " connecting" : "",
sock->accepting ? " accepting" : "");
fprintf(stderr, "Created by:\n");
isc_backtrace_symbols_fd(sock->backtrace, sock->backtrace_size,
STDERR_FILENO);

View file

@ -124,18 +124,16 @@ streamdns_on_complete_dnsmessage(isc_dnsstream_assembler_t *dnsasm,
isc_region_t *restrict region,
isc_nmsocket_t *sock,
isc_nmhandle_t *transphandle) {
const bool client = atomic_load(&sock->client);
const bool last_datum = isc_dnsstream_assembler_remaininglength(
dnsasm) == region->length;
/*
* Stop after one message if a client
* connection.
* Stop after one message if a client connection.
*/
bool stop = client;
bool stop = sock->client;
sock->recv_read = false;
if (sock->recv_cb != NULL) {
if (!client) {
if (!sock->client) {
/*
* We must allocate a new handle object, as we
* need to ensure that after processing of this
@ -259,8 +257,8 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type,
uint32_t initial = 0;
isc_nm_gettimeouts(worker->netmgr, &initial, NULL, NULL, NULL);
sock->read_timeout = initial;
atomic_init(&sock->client, !is_server);
atomic_init(&sock->connecting, !is_server);
sock->client = !is_server;
sock->connecting = !is_server;
sock->streamdns.input = isc_dnsstream_assembler_new(
sock->worker->mctx, streamdns_on_dnsmessage_data_cb,
sock);
@ -272,7 +270,7 @@ streamdns_sock_new(isc__networker_t *worker, const isc_nmsocket_type_t type,
static void
streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle,
const isc_result_t result) {
atomic_store(&sock->connecting, false);
sock->connecting = false;
if (sock->connect_cb == NULL) {
return;
}
@ -280,7 +278,7 @@ streamdns_call_connect_cb(isc_nmsocket_t *sock, isc_nmhandle_t *handle,
if (result != ISC_R_SUCCESS) {
isc__nmsocket_clearcb(handle->sock);
} else {
atomic_store(&sock->connected, true);
sock->connected = true;
}
streamdns_try_close_unused(sock);
}
@ -341,7 +339,7 @@ streamdns_transport_connected(isc_nmhandle_t *handle, isc_result_t result,
}
isc_nmhandle_attach(handle, &sock->outerhandle);
atomic_store(&sock->active, true);
sock->active = true;
handle->sock->streamdns.sock = sock;
@ -364,7 +362,7 @@ error:
isc_nm_verify_tls_peer_result_string(handle);
}
streamhandle = isc__nmhandle_get(sock, NULL, NULL);
atomic_store(&sock->closed, true);
sock->closed = true;
streamdns_call_connect_cb(sock, streamhandle, result);
isc_nmhandle_detach(&streamhandle);
isc__nmsocket_detach(&sock);
@ -482,7 +480,7 @@ streamdns_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result,
sock->recv_read = false;
isc_dnsstream_assembler_clear(sock->streamdns.input);
isc__nmsocket_clearcb(sock);
} else if (atomic_load(&sock->client)) {
} else if (sock->client) {
sock->recv_read = false;
}
isc__nm_readcb(sock, req, result, async);
@ -630,7 +628,7 @@ streamdns_resume_processing(void *arg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(!atomic_load(&sock->client));
REQUIRE(!sock->client);
if (streamdns_closing(sock)) {
return;
@ -644,27 +642,21 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmsocket_t *listensock = (isc_nmsocket_t *)cbarg;
isc_nmsocket_t *nsock;
isc_sockaddr_t iface;
int tid;
int tid = isc_tid();
uint32_t initial = 0;
if (result != ISC_R_SUCCESS) {
return (result);
}
INSIST(VALID_NMHANDLE(handle));
INSIST(VALID_NMSOCK(handle->sock));
INSIST(VALID_NMSOCK(listensock));
INSIST(listensock->type == isc_nm_streamdnslistener);
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (isc__nm_closing(handle->sock->worker)) {
return (ISC_R_SHUTTINGDOWN);
} else if (isc__nmsocket_closing(handle->sock) ||
atomic_load(&listensock->closing))
{
return (ISC_R_CANCELED);
} else if (result != ISC_R_SUCCESS) {
return (result);
}
tid = isc_tid();
REQUIRE(VALID_NMSOCK(listensock));
REQUIRE(listensock->type == isc_nm_streamdnslistener);
iface = isc_nmhandle_localaddr(handle);
nsock = streamdns_sock_new(handle->sock->worker, isc_nm_streamdnssocket,
&iface, true);
@ -676,10 +668,10 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nm_gettimeouts(handle->sock->worker->netmgr, &initial, NULL, NULL,
NULL);
nsock->read_timeout = initial;
atomic_init(&nsock->accepting, true);
atomic_store(&nsock->active, true);
nsock->accepting = true;
nsock->active = true;
isc__nmsocket_attach(listensock, &nsock->listener);
isc__nmsocket_attach(handle->sock, &nsock->listener);
isc_nmhandle_attach(handle, &nsock->outerhandle);
handle->sock->streamdns.sock = nsock;
@ -693,7 +685,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmhandle_detach(&nsock->recv_handle);
isc__nmsocket_detach(&nsock->listener);
isc_nmhandle_detach(&nsock->outerhandle);
atomic_store(&nsock->closed, true);
nsock->closed = true;
goto exit;
}
@ -706,7 +698,7 @@ streamdns_accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
streamdns_handle_incoming_data(nsock, nsock->outerhandle, NULL, 0);
exit:
atomic_store(&nsock->accepting, false);
nsock->accepting = false;
return (result);
}
@ -745,7 +737,7 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
backlog, quota, tlsctx, &listener->outer);
}
if (result != ISC_R_SUCCESS) {
atomic_store(&listener->closed, true);
listener->closed = true;
isc__nmsocket_detach(&listener);
return (result);
}
@ -756,12 +748,9 @@ isc_nm_listenstreamdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
listener->result = result;
atomic_store(&listener->active, true);
atomic_store(&listener->listening, true);
listener->active = true;
INSIST(listener->outer->streamdns.listener == NULL);
listener->nchildren = listener->outer->nchildren;
isc__nmsocket_barrier_init(listener);
atomic_init(&listener->rchildren, listener->outer->nchildren);
isc__nmsocket_attach(listener, &listener->outer->streamdns.listener);
*sockp = listener;
@ -942,8 +931,8 @@ streamdns_close_direct(isc_nmsocket_t *sock) {
/* Further cleanup performed in isc__nm_streamdns_cleanup_data() */
isc_dnsstream_assembler_clear(sock->streamdns.input);
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
sock->active = false;
}
void
@ -951,12 +940,9 @@ isc__nm_streamdns_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_streamdnssocket);
REQUIRE(sock->tid == isc_tid());
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
streamdns_close_direct(sock);
}

View file

@ -80,8 +80,10 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
REQUIRE(atomic_load(&sock->accepting));
REQUIRE(sock->server);
REQUIRE(sock->accepting);
sock->accepting = false;
/*
* Detach the quota early to make room for other connections;
@ -94,8 +96,6 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
isc__nmsocket_detach(&sock->server);
atomic_store(&sock->accepting, false);
switch (eresult) {
case ISC_R_NOTCONNECTED:
/* IGNORE: The client disconnected before we could accept */
@ -120,7 +120,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
worker = sock->worker;
atomic_store(&sock->connecting, true);
sock->connecting = true;
/* 2 minute timeout */
result = isc__nm_socket_connectiontimeout(sock->fd, 120 * 1000);
@ -165,8 +165,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
&req->uv_req.connect);
isc__nmsocket_timer_start(sock);
atomic_store(&sock->connected, true);
return (ISC_R_SUCCESS);
}
@ -189,10 +187,10 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
if (atomic_load(&sock->timedout)) {
if (sock->timedout) {
result = ISC_R_TIMEDOUT;
goto error;
} else if (!atomic_load(&sock->connecting)) {
} else if (!sock->connecting) {
/*
* The connect was cancelled from timeout; just clean up
* the req.
@ -245,7 +243,8 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
goto error;
}
atomic_store(&sock->connecting, false);
sock->connecting = false;
sock->connected = true;
result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
@ -289,7 +288,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_timeout = timeout;
sock->fd = fd;
atomic_init(&sock->client, true);
sock->client = true;
req = isc__nm_uvreq_get(sock);
req->cb.connect = cb;
@ -301,11 +300,11 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
(void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG);
atomic_store(&sock->active, true);
sock->active = true;
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
sock->active = false;
isc__nm_tcp_close(sock);
isc__nm_connectcb(sock, req, result, true);
}
@ -416,8 +415,6 @@ start_tcp_child_job(void *arg) {
goto done;
}
atomic_store(&sock->listening, true);
if (sock->tid == 0) {
r = uv_tcp_getsockname(&sock->uv_handle.tcp,
(struct sockaddr *)&ss,
@ -437,8 +434,6 @@ done:
result = isc_uverr2result(r);
done_result:
atomic_fetch_add(&sock->parent->rchildren, 1);
if (result != ISC_R_SUCCESS) {
sock->pquota = NULL;
}
@ -506,7 +501,6 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
sock = isc_mem_get(worker->mctx, sizeof(*sock));
isc__nmsocket_init(sock, worker, isc_nm_tcplistener, iface, NULL);
atomic_init(&sock->rchildren, 0);
sock->nchildren = (workers == ISC_NM_LISTEN_ALL) ? (uint32_t)mgr->nloops
: workers;
children_size = sock->nchildren * sizeof(sock->children[0]);
@ -550,17 +544,16 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
}
atomic_store(&sock->active, true);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
sock->active = false;
isc__nm_tcp_stoplistening(sock);
isc_nmsocket_close(&sock);
return (result);
}
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
sock->active = true;
*sockp = sock;
return (ISC_R_SUCCESS);
}
@ -607,9 +600,10 @@ stop_tcp_child_job(void *arg) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent != NULL);
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->active = false;
sock->closing = true;
/*
* The order of the close operation is important here, the uv_close()
@ -626,52 +620,45 @@ stop_tcp_child_job(void *arg) {
isc__nmsocket_timer_stop(sock);
uv_close(&sock->read_timer, NULL);
(void)atomic_fetch_sub(&sock->parent->rchildren, 1);
REQUIRE(!sock->worker->loop->paused);
isc_barrier_wait(&sock->parent->stop_barrier);
}
static void
stop_tcp_child(isc_nmsocket_t *sock, uint32_t tid) {
isc_nmsocket_t *csock = NULL;
stop_tcp_child(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
csock = &sock->children[tid];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if (tid == 0) {
stop_tcp_child_job(csock);
if (sock->tid == 0) {
stop_tcp_child_job(sock);
} else {
isc_async_run(csock->worker->loop, stop_tcp_child_job, csock);
isc_async_run(sock->worker->loop, stop_tcp_child_job, sock);
}
}
static void
stop_tcp_parent(isc_nmsocket_t *sock) {
/* Stop the parent */
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->tid == 0);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
/* Mark the parent socket inactive */
sock->active = false;
/* Stop all the other threads' children */
for (size_t i = 1; i < sock->nchildren; i++) {
stop_tcp_child(sock, i);
stop_tcp_child(&sock->children[i]);
}
stop_tcp_child(sock, 0);
/* Stop the child for the main thread */
stop_tcp_child(&sock->children[0]);
stop_tcp_parent(sock);
/* Stop the parent */
sock->closed = true;
isc__nmsocket_prep_destroy(sock);
}
static void
@ -681,16 +668,14 @@ tcp_stop_cb(uv_handle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!sock->closed);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
sock->closed = true;
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
@ -747,9 +732,10 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
/* Initialize the timer */
if (sock->read_timeout == 0) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
sock->read_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
if (isc__nmsocket_closing(sock)) {
@ -824,10 +810,11 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
req->uvbuf.base = buf->base;
req->uvbuf.len = nread;
if (!atomic_load(&sock->client)) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
if (!sock->client) {
sock->read_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
isc__nm_readcb(sock, req, ISC_R_SUCCESS, false);
@ -912,7 +899,7 @@ accept_connection(isc_nmsocket_t *ssock) {
isc__nmsocket_attach(ssock, &csock->server);
csock->recv_cb = ssock->recv_cb;
csock->recv_cbarg = ssock->recv_cbarg;
atomic_init(&csock->accepting, true);
csock->accepting = true;
csock->quota = ssock->pquota;
worker = csock->worker;
@ -964,11 +951,11 @@ accept_connection(isc_nmsocket_t *ssock) {
goto failure;
}
atomic_store(&csock->accepting, false);
csock->accepting = false;
isc__nm_incstats(csock, STATID_ACCEPT);
csock->read_timeout = atomic_load(&csock->worker->netmgr->init);
csock->read_timeout = atomic_load_relaxed(&csock->worker->netmgr->init);
/*
* The acceptcb needs to attach to the handle if it wants to keep the
@ -984,7 +971,7 @@ accept_connection(isc_nmsocket_t *ssock) {
return (ISC_R_SUCCESS);
failure:
atomic_store(&csock->active, false);
csock->active = false;
failed_accept_cb(csock, result);
@ -1022,9 +1009,10 @@ tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb,
uvreq->cbarg = cbarg;
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
sock->write_timeout =
sock->keepalive
? atomic_load_relaxed(&netmgr->keepalive)
: atomic_load_relaxed(&netmgr->idle);
}
result = tcp_send_direct(sock, uvreq);
@ -1150,10 +1138,11 @@ static void
tcp_close_sock(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(!sock->closed);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
sock->closed = true;
sock->connected = false;
isc__nm_incstats(sock, STATID_CLOSE);
@ -1161,8 +1150,6 @@ tcp_close_sock(isc_nmsocket_t *sock) {
isc__nmsocket_detach(&sock->server);
}
atomic_store(&sock->connected, false);
isc__nmsocket_prep_destroy(sock);
}
@ -1181,9 +1168,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent == NULL);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
@ -1242,15 +1229,16 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
* If the socket is active, mark it inactive and
* continue. If it isn't active, stop now.
*/
if (!isc__nmsocket_deactivate(sock)) {
if (!sock->active) {
return;
}
sock->active = false;
if (sock->accepting) {
return;
}
if (atomic_load(&sock->accepting)) {
return;
}
if (atomic_load(&sock->connecting)) {
if (sock->connecting) {
isc_nmsocket_t *tsock = NULL;
isc__nmsocket_attach(sock, &tsock);
uv_close(&sock->uv_handle.handle, tcp_close_connect_cb);
@ -1266,11 +1254,15 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
return;
}
/*
* Otherwise, we just send the socket to abyss...
*/
/* Destroy the non-listening socket */
if (sock->parent == NULL) {
isc__nmsocket_prep_destroy(sock);
return;
}
/* Destroy the listening socket if on the same loop */
if (sock->tid == sock->parent->tid) {
isc__nmsocket_prep_destroy(sock->parent);
}
}

View file

@ -107,12 +107,10 @@ tls_try_to_enable_tcp_nodelay(isc_nmsocket_t *tlssock);
*/
static bool
inactive(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
return (!isc__nmsocket_active(sock) || sock->closing ||
sock->outerhandle == NULL ||
!isc__nmsocket_active(sock->outerhandle->sock) ||
atomic_load(&sock->outerhandle->sock->closing) ||
(sock->listener != NULL &&
!isc__nmsocket_active(sock->listener)) ||
sock->outerhandle->sock->closing ||
isc__nm_closing(sock->worker));
}
@ -384,53 +382,42 @@ tls_process_outgoing(isc_nmsocket_t *sock, bool finish,
static int
tls_try_handshake(isc_nmsocket_t *sock, isc_result_t *presult) {
int rv = 0;
isc_nmhandle_t *tlshandle = NULL;
REQUIRE(sock->tlsstream.state == TLS_HANDSHAKE);
if (SSL_is_init_finished(sock->tlsstream.tls) == 1) {
return (0);
}
rv = SSL_do_handshake(sock->tlsstream.tls);
int rv = SSL_do_handshake(sock->tlsstream.tls);
if (rv == 1) {
isc_nmhandle_t *tlshandle = NULL;
isc_result_t result = ISC_R_SUCCESS;
REQUIRE(sock->statichandle == NULL);
INSIST(SSL_is_init_finished(sock->tlsstream.tls) == 1);
INSIST(sock->statichandle == NULL);
isc__nmsocket_log_tls_session_reuse(sock, sock->tlsstream.tls);
tlshandle = isc__nmhandle_get(sock, &sock->peer, &sock->iface);
tls_read_stop(sock);
if (isc__nm_closing(sock->worker)) {
result = ISC_R_SHUTTINGDOWN;
}
if (sock->tlsstream.server) {
/*
* We need to check for 'sock->listener->closing' to
* make sure that we are not breaking the contract by
* calling an accept callback after the listener socket
* was shot down. Also, in this case the accept callback
* can be 'NULL'. That can happen as calling the accept
* callback in TLS is deferred until handshake is done.
* There is a possibility for that to happen *after* the
* underlying TCP connection was accepted. That is, a
* situation possible when the underlying TCP connection
* was accepted, handshake related data transmission
* took place, but in the middle of that the socket is
* being shot down before the TLS accept callback could
* have been called.
* The listening sockets are now closed from outer
* to inner order, which means that this function
* will never be called when the outer socket has
* stopped listening.
*
* Also see 'isc__nmsocket_stop()' - the function used
* to shut down the listening TLS socket - for more
* details.
*/
if (isc__nm_closing(sock->worker)) {
result = ISC_R_SHUTTINGDOWN;
} else if (isc__nmsocket_closing(sock) ||
atomic_load(&sock->listener->closing))
{
result = ISC_R_CANCELED;
} else {
result = sock->listener->accept_cb(
tlshandle, result,
sock->listener->accept_cbarg);
if (result == ISC_R_SUCCESS) {
result = sock->accept_cb(tlshandle, result,
sock->accept_cbarg);
}
} else {
tls_call_connect_cb(sock, tlshandle, result);
@ -544,13 +531,13 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
INSIST(SSL_is_init_finished(
sock->tlsstream.tls) ==
1);
INSIST(!atomic_load(&sock->client));
INSIST(!sock->client);
finish = true;
} else if (sock->tlsstream.state == TLS_IO &&
hs_result == ISC_R_SUCCESS &&
!sock->tlsstream.server)
{
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
}
}
} else if (send_data != NULL) {
@ -867,17 +854,19 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_tlsctx_attach(tlsctx, &tlssock->tlsstream.ctx);
tlssock->tlsstream.tls = isc_tls_create(tlssock->tlsstream.ctx);
if (tlssock->tlsstream.tls == NULL) {
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
isc_tlsctx_free(&tlssock->tlsstream.ctx);
isc__nmsocket_detach(&tlssock);
return (ISC_R_TLSERROR);
}
isc__nmsocket_attach(tlslistensock, &tlssock->listener);
tlssock->accept_cb = tlslistensock->accept_cb;
tlssock->accept_cbarg = tlslistensock->accept_cbarg;
isc__nmsocket_attach(handle->sock, &tlssock->listener);
isc_nmhandle_attach(handle, &tlssock->outerhandle);
tlssock->peer = handle->sock->peer;
tlssock->read_timeout =
atomic_load(&handle->sock->worker->netmgr->init);
atomic_load_relaxed(&handle->sock->worker->netmgr->init);
/*
* Hold a reference to tlssock in the TCP socket: it will
@ -932,7 +921,7 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
result = isc_nm_listentcp(mgr, workers, iface, tlslisten_acceptcb,
tlssock, backlog, quota, &tlssock->outer);
if (result != ISC_R_SUCCESS) {
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
isc__nmsocket_detach(&tlssock);
return (result);
}
@ -945,18 +934,14 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
/* wait for listen result */
isc__nmsocket_attach(tlssock->outer, &tsock);
tlssock->result = result;
atomic_store(&tlssock->active, true);
tlssock->active = true;
INSIST(tlssock->outer->tlsstream.tlslistener == NULL);
isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener);
isc__nmsocket_detach(&tsock);
INSIST(result != ISC_R_UNSET);
tlssock->nchildren = tlssock->outer->nchildren;
isc__nmsocket_barrier_init(tlssock);
atomic_init(&tlssock->rchildren, tlssock->nchildren);
if (result == ISC_R_SUCCESS) {
atomic_store(&tlssock->listening, true);
*sockp = tlssock;
}
@ -1086,8 +1071,8 @@ tls_close_direct(void *arg) {
}
/* Further cleanup performed in isc__nm_tls_cleanup_data() */
atomic_store(&sock->closed, true);
atomic_store(&sock->active, false);
sock->closed = true;
sock->active = false;
sock->tlsstream.state = TLS_CLOSED;
}
@ -1095,12 +1080,9 @@ void
isc__nm_tls_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlssocket);
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
if (sock->tid == isc_tid()) {
/* no point in attempting to make the call asynchronous */
@ -1144,7 +1126,7 @@ isc_nm_tlsconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
nsock->connect_cbarg = cbarg;
nsock->connect_timeout = timeout;
isc_tlsctx_attach(ctx, &nsock->tlsstream.ctx);
atomic_init(&nsock->client, true);
nsock->client = true;
if (client_sess_cache != NULL) {
INSIST(isc_tlsctx_client_session_cache_getctx(
client_sess_cache) == ctx);
@ -1194,7 +1176,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
}
tlssock->peer = isc_nmhandle_peeraddr(handle);
isc_nmhandle_attach(handle, &tlssock->outerhandle);
atomic_store(&tlssock->active, true);
tlssock->active = true;
if (tlssock->tlsstream.client_sess_cache != NULL) {
isc_tlsctx_client_session_cache_reuse_sockaddr(
@ -1215,7 +1197,7 @@ tcp_connected(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
return;
error:
tlshandle = isc__nmhandle_get(tlssock, NULL, NULL);
atomic_store(&tlssock->closed, true);
tlssock->closed = true;
tls_call_connect_cb(tlssock, tlshandle, result);
isc_nmhandle_detach(&tlshandle);
isc__nmsocket_detach(&tlssock);
@ -1246,7 +1228,7 @@ isc__nm_tls_cleanup_data(isc_nmsocket_t *sock) {
isc_tlsctx_free(&sock->tlsstream.ctx);
}
if (sock->tlsstream.client_sess_cache != NULL) {
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
isc_tlsctx_client_session_cache_detach(
&sock->tlsstream.client_sess_cache);
}
@ -1468,7 +1450,7 @@ tls_keep_client_tls_session(isc_nmsocket_t *sock) {
if (sock->tlsstream.client_sess_cache != NULL &&
sock->tlsstream.client_session_saved == false)
{
INSIST(atomic_load(&sock->client));
INSIST(sock->client);
isc_tlsctx_client_session_cache_keep_sockaddr(
sock->tlsstream.client_sess_cache, &sock->peer,
sock->tlsstream.tls);

View file

@ -168,12 +168,8 @@ start_udp_child_job(void *arg) {
}
sock->reading = true;
atomic_store(&sock->listening, true);
done:
result = isc_uverr2result(r);
atomic_fetch_add(&sock->parent->rchildren, 1);
sock->result = result;
@ -233,7 +229,6 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
sock = isc_mem_get(worker->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(sock, worker, isc_nm_udplistener, iface, NULL);
atomic_init(&sock->rchildren, 0);
sock->nchildren = (workers == ISC_NM_LISTEN_ALL) ? (uint32_t)mgr->nloops
: workers;
children_size = sock->nchildren * sizeof(sock->children[0]);
@ -275,16 +270,16 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface,
}
}
atomic_store(&sock->active, true);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
sock->active = false;
isc__nm_udp_stoplistening(sock);
isc_nmsocket_close(&sock);
return (result);
}
INSIST(atomic_load(&sock->rchildren) == sock->nchildren);
sock->active = true;
*sockp = sock;
return (ISC_R_SUCCESS);
}
@ -328,7 +323,7 @@ route_connect_direct(isc_nmsocket_t *sock) {
worker = sock->worker;
atomic_store(&sock->connecting, true);
sock->connecting = true;
r = uv_udp_init(&worker->loop->loop, &sock->uv_handle.udp);
UV_RUNTIME_CHECK(uv_udp_init, r);
@ -350,8 +345,8 @@ route_connect_direct(isc_nmsocket_t *sock) {
isc__nm_set_network_buffers(sock->worker->netmgr,
&sock->uv_handle.handle);
atomic_store(&sock->connecting, false);
atomic_store(&sock->connected, true);
sock->connecting = false;
sock->connected = true;
return (ISC_R_SUCCESS);
}
@ -384,7 +379,7 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
sock->client = true;
sock->route_sock = true;
sock->fd = fd;
@ -393,11 +388,11 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
req->cbarg = cbarg;
req->handle = isc__nmhandle_get(sock, NULL, NULL);
atomic_store(&sock->active, true);
sock->active = true;
result = route_connect_direct(sock);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
sock->active = false;
isc__nm_udp_close(sock);
}
@ -425,9 +420,9 @@ stop_udp_child_job(void *arg) {
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->parent != NULL);
isc__nm_udp_close(sock);
sock->active = false;
(void)atomic_fetch_sub(&sock->parent->rchildren, 1);
isc__nm_udp_close(sock);
REQUIRE(!sock->worker->loop->paused);
isc_barrier_wait(&sock->parent->stop_barrier);
@ -437,7 +432,6 @@ static void
stop_udp_child(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
atomic_store(&sock->active, false);
if (sock->tid == 0) {
stop_udp_child_job(sock);
} else {
@ -451,12 +445,12 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(sock->type == isc_nm_udplistener);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->tid == 0);
REQUIRE(!sock->closing);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
sock->closing = true;
/* Mark the parent socket inactive */
atomic_store(&sock->active, false);
sock->active = false;
/* Stop all the other threads' children */
for (size_t i = 1; i < sock->nchildren; i++) {
@ -467,7 +461,7 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
stop_udp_child(&sock->children[0]);
/* Stop the parent */
atomic_store(&sock->closed, true);
sock->closed = true;
isc__nmsocket_prep_destroy(sock);
}
@ -509,7 +503,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
* - If we're simulating a firewall blocking UDP packets
* bigger than 'maxudp' bytes for testing purposes.
*/
maxudp = atomic_load(&sock->worker->netmgr->maxudp);
maxudp = atomic_load_relaxed(&sock->worker->netmgr->maxudp);
if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) {
/*
* We need to keep the read_cb intact in case, so the
@ -582,7 +576,7 @@ isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
* stop reading now. The reading could be restarted in the read
* callback with another isc_nm_read() call.
*/
if (atomic_load(&sock->client)) {
if (sock->client) {
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
@ -654,7 +648,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = handle->sock;
const isc_sockaddr_t *peer = &handle->peer;
const struct sockaddr *sa = &peer->type.sa;
const struct sockaddr *sa = sock->connected ? NULL : &peer->type.sa;
isc__nm_uvreq_t *uvreq = NULL;
isc__networker_t *worker = NULL;
uint32_t maxudp;
@ -700,15 +694,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
goto fail;
}
/*
* We used uv_udp_connect(), so the peer address has to be
* set to NULL or else uv_udp_send() could fail or assert,
* depending on the libuv version.
*/
if (atomic_load(&sock->connected)) {
sa = NULL;
}
r = uv_udp_send(&uvreq->uv_req.udp_send, &sock->uv_handle.udp,
&uvreq->uvbuf, 1, sa, udp_send_cb);
if (r < 0) {
@ -813,7 +798,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
sock->connect_cbarg = cbarg;
sock->read_timeout = timeout;
sock->peer = *peer;
atomic_init(&sock->client, true);
sock->client = true;
sock->fd = fd;
result = isc__nm_socket_reuse(sock->fd);
@ -834,19 +819,19 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
req->local = *local;
req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);
atomic_store(&sock->active, true);
atomic_store(&sock->connecting, true);
sock->active = true;
sock->connecting = true;
result = udp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
sock->active = false;
isc__nm_failed_connect_cb(sock, req, result, true);
isc__nmsocket_detach(&sock);
return;
}
atomic_store(&sock->connecting, false);
atomic_store(&sock->connected, true);
sock->connecting = false;
sock->connected = true;
isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true);
isc__nmsocket_detach(&sock);
@ -859,7 +844,7 @@ isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result,
REQUIRE(result != ISC_R_SUCCESS);
REQUIRE(sock->tid == isc_tid());
if (atomic_load(&sock->client)) {
if (sock->client) {
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
@ -952,13 +937,10 @@ udp_close_cb(uv_handle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->closing);
REQUIRE(!sock->closed);
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true))
{
UNREACHABLE();
}
sock->closed = true;
isc__nm_incstats(sock, STATID_CLOSE);
@ -969,12 +951,10 @@ udp_close_cb(uv_handle_t *handle) {
if (sock->parent != NULL) {
/* listening socket (listen) */
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
} else {
/* client and server sockets */
atomic_store(&sock->connected, false);
atomic_store(&sock->listening, false);
sock->connected = false;
isc__nmsocket_prep_destroy(sock);
}
}
@ -984,12 +964,9 @@ isc__nm_udp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->tid == isc_tid());
REQUIRE(!sock->closing);
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true))
{
return;
}
sock->closing = true;
isc__nmsocket_clearcb(sock);
isc__nmsocket_timer_stop(sock);
@ -1025,17 +1002,13 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
* If the socket is active, mark it inactive and
* continue. If it isn't active, stop now.
*/
if (!isc__nmsocket_deactivate(sock)) {
if (!sock->active) {
return;
}
sock->active = false;
/*
* If the socket is connecting, the cancel will happen in the
* async_udpconnect() due socket being inactive now.
*/
if (atomic_load(&sock->connecting)) {
return;
}
/* uv_udp_connect is synchronous, we can't be in connected state */
REQUIRE(!sock->connecting);
/*
* When the client detaches the last handle, the
@ -1051,17 +1024,16 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
return;
}
/*
* Ignore the listening sockets
*/
if (sock->parent != NULL) {
/* Destroy the non-listening socket */
if (sock->parent == NULL) {
isc__nmsocket_prep_destroy(sock);
return;
}
/*
* Otherwise, we just send the socket to abyss...
*/
isc__nmsocket_prep_destroy(sock);
/* Destroy the listening socket if on the same loop */
if (sock->tid == sock->parent->tid) {
isc__nmsocket_prep_destroy(sock->parent);
}
}
static void
@ -1070,7 +1042,7 @@ udp_cancelread_cb(void *arg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->client));
REQUIRE(sock->client);
isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
isc__nmsocket_detach(&sock);