Merge branch 'ondrej-loopmgr-cleanup-tcp-implementation' into 'main'

Cleanup the asychronous code in the TCP implementation

See merge request isc-projects/bind9!6703
This commit is contained in:
Ondřej Surý 2022-09-22 12:58:32 +00:00
commit e5ff78dfbd
15 changed files with 220 additions and 587 deletions

View file

@ -413,7 +413,6 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) {
const char *content_length = NULL;
size_t limit = sizeof(httpd->recvbuf) - httpd->recvlen - 1;
size_t len = region->length;
size_t clen = 0;
int delim;
bool truncated = false;
@ -579,7 +578,7 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) {
} else {
INSIST(content_length != NULL);
clen = (size_t)strtoul(content_length, NULL, 10);
size_t clen = (size_t)strtoul(content_length, NULL, 10);
if (clen == ULONG_MAX) {
/* Invalid number in the header value. */
return (ISC_R_BADNUMBER);
@ -931,7 +930,8 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult,
* ISC_R_NOTFOUND is not returned from netmgr) and we
* need to resume reading.
*/
isc_nm_resumeread(httpd->readhandle);
isc_nm_read(httpd->readhandle, httpd_request,
httpd->mgr);
return;
}
goto cleanup_readhandle;
@ -1043,7 +1043,7 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult,
*/
isc_buffer_usedregion(httpd->sendbuffer, &r);
isc_nm_pauseread(httpd->handle);
isc_nm_read_stop(httpd->handle);
httpd->state = SEND;
isc_nmhandle_attach(httpd->handle, &httpd->sendhandle);
@ -1072,7 +1072,8 @@ isc_httpdmgr_shutdown(isc_httpdmgr_t **httpdmgrp) {
httpd = ISC_LIST_HEAD(httpdmgr->running);
while (httpd != NULL) {
isc_nm_cancelread(httpd->readhandle);
isc_nm_read_stop(httpd->readhandle);
isc_nmhandle_detach(&httpd->readhandle);
httpd = ISC_LIST_NEXT(httpd, link);
}
UNLOCK(&httpdmgr->lock);
@ -1232,7 +1233,7 @@ httpd_senddone(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
*/
httpd_request(httpd->handle, ISC_R_SUCCESS, NULL, httpd->mgr);
} else if (!httpd->truncated) {
isc_nm_resumeread(httpd->readhandle);
isc_nm_read(httpd->readhandle, httpd_request, httpd->mgr);
} else {
/* Truncated request, don't resume */
goto cleanup_readhandle;

View file

@ -308,9 +308,9 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
*/
void
isc_nm_pauseread(isc_nmhandle_t *handle);
isc_nm_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle's socket, but remember the callback.
* Stop reading on this handle's socket.
*
* Requires:
* \li 'handle' is a valid netmgr handle.
@ -323,19 +323,12 @@ isc_nm_cancelread(isc_nmhandle_t *handle);
* active handles with a result code of ISC_R_CANCELED.
*
* Requires:
* \li 'sock' is a valid netmgr socket
* \li 'sock' is a valid datagram-like netmgr socket
* \li ...for which a read/recv callback has been defined.
*/
void
isc_nm_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading on the handle's socket.
*
* Requires:
* \li 'handle' is a valid netmgr handle.
* \li ...for a socket with a defined read/recv callback.
*/
isc_nmhandle_close(isc_nmhandle_t *handle);
void
isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,

View file

@ -490,7 +490,8 @@ finish_http_session(isc_nm_http_session_t *session) {
if (session->handle != NULL) {
if (!session->closed) {
session->closed = true;
isc_nm_cancelread(session->handle);
session->reading = false;
isc_nmhandle_close(session->handle);
}
if (session->client) {
@ -1007,7 +1008,7 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
}
isc_buffer_putmem(session->buf, region->base + readlen,
unread_size);
isc_nm_pauseread(session->handle);
isc_nm_read_stop(session->handle);
}
/* We might have something to receive or send, do IO */
@ -1287,11 +1288,11 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
return;
} else {
/* Resume reading, it's idempotent, wait for more */
isc_nm_resumeread(session->handle);
isc_nm_read(session->handle, http_readcb, session);
}
} else {
/* We don't want more data, stop reading for now */
isc_nm_pauseread(session->handle);
isc_nm_read_stop(session->handle);
}
if (send_cb != NULL) {
@ -1332,31 +1333,22 @@ get_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) {
static void
http_call_connect_cb(isc_nmsocket_t *sock, isc_nm_http_session_t *session,
isc_result_t result) {
isc__nm_uvreq_t *req = NULL;
isc_nmhandle_t *httphandle = isc__nmhandle_get(sock, &sock->peer,
&sock->iface);
void *cbarg;
isc_nm_cb_t connect_cb;
REQUIRE(sock->connect_cb != NULL);
cbarg = sock->connect_cbarg;
connect_cb = sock->connect_cb;
isc__nmsocket_clearcb(sock);
if (result == ISC_R_SUCCESS) {
req = isc__nm_uvreq_get(sock->worker, sock);
req->cb.connect = sock->connect_cb;
req->cbarg = sock->connect_cbarg;
if (session != NULL) {
session->client_httphandle = httphandle;
req->handle = NULL;
isc_nmhandle_attach(httphandle, &req->handle);
} else {
req->handle = httphandle;
}
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, result, true);
connect_cb(httphandle, result, cbarg);
} else {
void *cbarg = sock->connect_cbarg;
isc_nm_cb_t connect_cb = sock->connect_cb;
isc__nmsocket_clearcb(sock);
connect_cb(httphandle, result, cbarg);
isc_nmhandle_detach(&httphandle);
}

View file

@ -254,13 +254,7 @@ struct isc_nmhandle {
typedef enum isc__netievent_type {
netievent_udpcancel,
netievent_tcpconnect,
netievent_tcpclose,
netievent_tcpsend,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpaccept,
netievent_tcpcancel,
netievent_tcpdnsaccept,
netievent_tcpdnsconnect,
@ -271,10 +265,8 @@ typedef enum isc__netievent_type {
netievent_tlsclose,
netievent_tlssend,
netievent_tlsstartread,
netievent_tlsconnect,
netievent_tlsdobio,
netievent_tlscancel,
netievent_tlsdnsaccept,
netievent_tlsdnsconnect,
@ -924,7 +916,6 @@ struct isc_nmsocket {
TLS_CLOSED
} state; /*%< The order of these is significant */
size_t nsending;
bool reading;
} tlsstream;
isc_nmsocket_h2_t h2;
@ -1005,7 +996,7 @@ struct isc_nmsocket {
atomic_bool connecting;
atomic_bool connected;
atomic_bool accepting;
atomic_bool reading;
bool reading;
atomic_bool timedout;
isc_refcount_t references;
@ -1020,11 +1011,6 @@ struct isc_nmsocket {
*/
bool processing;
/*%
* A TCP socket has had isc_nm_pauseread() called.
*/
atomic_bool readpaused;
/*%
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
@ -1326,7 +1312,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*
* Back-end implementation of isc_nm_read() for TCP handles.
* Start reading on this handle.
*/
void
@ -1335,16 +1321,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock);
* Close a TCP socket.
*/
void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle);
isc__nm_tcp_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle, while still remembering the callback.
*/
void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading from socket.
*
* Stop reading on this handle.
*/
void
@ -1366,40 +1345,18 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock);
* Stop listening on 'sock'.
*/
int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *sock);
/*%<
* Returns the number of active connections for the TCP listener socket.
*/
void
isc__nm_tcp_settimeout(isc_nmhandle_t *handle, uint32_t timeout);
/*%<
* Set the read timeout for the TCP socket associated with 'handle'.
*/
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Callback handlers for asynchronous TCP events (connect, listen,
* stoplisten, send, read, pause, close).
@ -1411,14 +1368,9 @@ isc__nm_async_tlsclose(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlssend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Callback handlers for asynchronous TLS events.
*/
@ -1570,15 +1522,15 @@ void
isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
void
isc__nm_tls_cancelread(isc_nmhandle_t *handle);
/*%<
* Back-end implementation of isc_nm_send() for TLSDNS handles.
*/
void
isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*%<
* Start reading on the TLS handle.
*/
void
isc__nm_tls_close(isc_nmsocket_t *sock);
@ -1587,16 +1539,9 @@ isc__nm_tls_close(isc_nmsocket_t *sock);
*/
void
isc__nm_tls_pauseread(isc_nmhandle_t *handle);
isc__nm_tls_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle, while still remembering the callback.
*/
void
isc__nm_tls_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading from the handle.
*
* Stop reading on the TLS handle.
*/
void
@ -1632,6 +1577,9 @@ void
isc__nmhandle_tls_setwritetimeout(isc_nmhandle_t *handle,
uint64_t write_timeout);
void
isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
void
isc__nm_http_stoplistening(isc_nmsocket_t *sock);
@ -1819,16 +1767,12 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle);
* typedef all the netievent types
*/
NETIEVENT_SOCKET_TYPE(tcpclose);
NETIEVENT_SOCKET_TYPE(tcplisten);
NETIEVENT_SOCKET_TYPE(tcppauseread);
NETIEVENT_SOCKET_TYPE(tcpstop);
NETIEVENT_SOCKET_TYPE(tlsclose);
/* NETIEVENT_SOCKET_TYPE(tlsconnect); */ /* unique type, defined independently
*/
NETIEVENT_SOCKET_TYPE(tlsdobio);
NETIEVENT_SOCKET_TYPE(tlsstartread);
NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel);
NETIEVENT_SOCKET_TYPE(udplisten);
NETIEVENT_SOCKET_TYPE(udpstop);
@ -1858,9 +1802,6 @@ NETIEVENT_SOCKET_TYPE(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_TYPE(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_TYPE(tcpconnect);
NETIEVENT_SOCKET_REQ_TYPE(tcpsend);
NETIEVENT_SOCKET_TYPE(tcpstartread);
NETIEVENT_SOCKET_REQ_TYPE(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_TYPE(connectcb);
@ -1868,7 +1809,6 @@ NETIEVENT_SOCKET_REQ_RESULT_TYPE(readcb);
NETIEVENT_SOCKET_REQ_RESULT_TYPE(sendcb);
NETIEVENT_SOCKET_HANDLE_TYPE(detach);
NETIEVENT_SOCKET_HANDLE_TYPE(tcpcancel);
NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel);
NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept);
@ -1877,16 +1817,11 @@ NETIEVENT_SOCKET_TLSCTX_TYPE(settlsctx);
/* Now declared the helper functions */
NETIEVENT_SOCKET_DECL(tcpclose);
NETIEVENT_SOCKET_DECL(tcplisten);
NETIEVENT_SOCKET_DECL(tcppauseread);
NETIEVENT_SOCKET_DECL(tcpstartread);
NETIEVENT_SOCKET_DECL(tcpstop);
NETIEVENT_SOCKET_DECL(tlsclose);
NETIEVENT_SOCKET_DECL(tlsconnect);
NETIEVENT_SOCKET_DECL(tlsdobio);
NETIEVENT_SOCKET_DECL(tlsstartread);
NETIEVENT_SOCKET_HANDLE_DECL(tlscancel);
NETIEVENT_SOCKET_DECL(udplisten);
NETIEVENT_SOCKET_DECL(udpstop);
@ -1916,15 +1851,12 @@ NETIEVENT_SOCKET_DECL(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_DECL(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_DECL(tcpconnect);
NETIEVENT_SOCKET_REQ_DECL(tcpsend);
NETIEVENT_SOCKET_REQ_DECL(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_DECL(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DECL(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DECL(sendcb);
NETIEVENT_SOCKET_HANDLE_DECL(tcpcancel);
NETIEVENT_SOCKET_HANDLE_DECL(udpcancel);
NETIEVENT_SOCKET_DECL(detach);

View file

@ -451,14 +451,8 @@ process_netievent(void *arg) {
NETIEVENT_CASE(udpcancel);
NETIEVENT_CASE(tcpaccept);
NETIEVENT_CASE(tcpconnect);
NETIEVENT_CASE(tcplisten);
NETIEVENT_CASE(tcpstartread);
NETIEVENT_CASE(tcppauseread);
NETIEVENT_CASE(tcpsend);
NETIEVENT_CASE(tcpstop);
NETIEVENT_CASE(tcpcancel);
NETIEVENT_CASE(tcpclose);
NETIEVENT_CASE(tcpdnsaccept);
NETIEVENT_CASE(tcpdnslisten);
@ -481,11 +475,9 @@ process_netievent(void *arg) {
NETIEVENT_CASE(tlsdnsshutdown);
#if HAVE_LIBNGHTTP2
NETIEVENT_CASE(tlsstartread);
NETIEVENT_CASE(tlssend);
NETIEVENT_CASE(tlsclose);
NETIEVENT_CASE(tlsdobio);
NETIEVENT_CASE(tlscancel);
NETIEVENT_CASE(httpsend);
NETIEVENT_CASE(httpclose);
@ -522,16 +514,11 @@ isc__nm_put_netievent(isc__networker_t *worker, void *ievent) {
isc__networker_unref(worker);
}
NETIEVENT_SOCKET_DEF(tcpclose);
NETIEVENT_SOCKET_DEF(tcplisten);
NETIEVENT_SOCKET_DEF(tcppauseread);
NETIEVENT_SOCKET_DEF(tcpstartread);
NETIEVENT_SOCKET_DEF(tcpstop);
NETIEVENT_SOCKET_DEF(tlsclose);
NETIEVENT_SOCKET_DEF(tlsconnect);
NETIEVENT_SOCKET_DEF(tlsdobio);
NETIEVENT_SOCKET_DEF(tlsstartread);
NETIEVENT_SOCKET_HANDLE_DEF(tlscancel);
NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpstop);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);
@ -562,15 +549,12 @@ NETIEVENT_SOCKET_DEF(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_DEF(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_DEF(tcpconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpsend);
NETIEVENT_SOCKET_REQ_DEF(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb);
NETIEVENT_SOCKET_DEF(detach);
NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel);
NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);
@ -802,6 +786,15 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
}
}
void
isc_nmhandle_close(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc__nmsocket_clearcb(handle->sock);
isc__nm_failed_read_cb(handle->sock, ISC_R_EOF, false);
}
void
isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
REQUIRE(sock->parent == NULL);
@ -833,7 +826,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
*
* If it's a regular socket we may need to close it.
*/
if (!atomic_load(&sock->closed)) {
if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) {
switch (sock->type) {
case isc_nm_udpsocket:
isc__nm_udp_close(sock);
@ -850,7 +843,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_close(sock);
break;
return;
case isc_nm_httpsocket:
isc__nm_http_close(sock);
return;
@ -1003,7 +996,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
sock, isc_refcount_current(&sock->references));
atomic_init(&sock->active, true);
atomic_init(&sock->readpaused, false);
atomic_init(&sock->closing, false);
atomic_init(&sock->listening, 0);
atomic_init(&sock->closed, 0);
@ -1406,6 +1398,11 @@ isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) {
case isc_nm_tlsdnssocket:
isc__nm_tlsdns_failed_read_cb(sock, result, async);
return;
#ifdef HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_failed_read_cb(sock, result);
return;
#endif
default:
UNREACHABLE();
}
@ -1486,7 +1483,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
if (atomic_load(&sock->client)) {
uv_timer_stop(timer);
@ -1643,7 +1640,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) {
isc_result_t result = ISC_R_SUCCESS;
int r;
if (atomic_load(&sock->reading)) {
if (sock->reading) {
return (ISC_R_SUCCESS);
}
@ -1670,7 +1667,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) {
if (r != 0) {
result = isc_uverr2result(r);
} else {
atomic_store(&sock->reading, true);
sock->reading = true;
}
return (result);
@ -1680,7 +1677,7 @@ void
isc__nm_stop_reading(isc_nmsocket_t *sock) {
int r;
if (!atomic_load(&sock->reading)) {
if (!sock->reading) {
return;
}
@ -1698,7 +1695,7 @@ isc__nm_stop_reading(isc_nmsocket_t *sock) {
default:
UNREACHABLE();
}
atomic_store(&sock->reading, false);
sock->reading = false;
}
bool
@ -2046,58 +2043,30 @@ isc_nm_cancelread(isc_nmhandle_t *handle) {
case isc_nm_udpsocket:
isc__nm_udp_cancelread(handle);
break;
case isc_nm_tcpsocket:
isc__nm_tcp_cancelread(handle);
break;
case isc_nm_tcpdnssocket:
isc__nm_tcpdns_cancelread(handle);
break;
case isc_nm_tlsdnssocket:
isc__nm_tlsdns_cancelread(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_cancelread(handle);
break;
#endif
default:
UNREACHABLE();
}
}
void
isc_nm_pauseread(isc_nmhandle_t *handle) {
isc_nm_read_stop(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
isc_nmsocket_t *sock = handle->sock;
switch (sock->type) {
case isc_nm_tcpsocket:
isc__nm_tcp_pauseread(handle);
isc__nm_tcp_read_stop(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_pauseread(handle);
break;
#endif
default:
UNREACHABLE();
}
}
void
isc_nm_resumeread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
isc_nmsocket_t *sock = handle->sock;
switch (sock->type) {
case isc_nm_tcpsocket:
isc__nm_tcp_resumeread(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_resumeread(handle);
isc__nm_tls_read_stop(handle);
break;
#endif
default:

View file

@ -55,9 +55,6 @@ can_log_tcp_quota(void) {
static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
tcp_close_direct(isc_nmsocket_t *sock);
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
@ -173,36 +170,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (ISC_R_SUCCESS);
}
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpconnect_t *ievent =
(isc__netievent_tcpconnect_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *req = ievent->req;
isc_result_t result = ISC_R_SUCCESS;
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->parent == NULL);
REQUIRE(sock->tid == isc_tid());
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
if (sock->fd != (uv_os_sock_t)(-1)) {
isc__nm_tcp_close(sock);
}
isc__nm_connectcb(sock, req, result, true);
}
/*
* The sock is now attached to the handle.
*/
isc__nmsocket_detach(&sock);
}
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc_result_t result = ISC_R_UNSET;
@ -295,7 +262,6 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
isc__netievent_tcpconnect_t *ievent = NULL;
isc__nm_uvreq_t *req = NULL;
sa_family_t sa_family;
isc__networker_t *worker = &mgr->workers[isc_tid()];
@ -335,14 +301,21 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
(void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG);
ievent = isc__nm_get_netievent_tcpconnect(worker, sock, req);
atomic_store(&sock->active, true);
isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_netievent_tcpconnect(worker, ievent);
atomic_store(&sock->active, true);
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
if (sock->fd != (uv_os_sock_t)(-1)) {
isc__nm_tcp_close(sock);
}
isc__nm_connectcb(sock, req, result, true);
}
/*
* The sock is now attached to the handle.
*/
isc__nmsocket_detach(&sock);
}
static uv_os_sock_t
@ -670,6 +643,26 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
stop_tcp_parent(sock);
}
static void
tcp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
uv_handle_set_data(handle, NULL);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->type == isc_nm_tcpsocket);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
@ -739,12 +732,15 @@ destroy:
void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock;
isc_nm_t *netmgr;
isc_result_t result;
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc_nmsocket_t *sock = handle->sock;
isc__netievent_tcpstartread_t *ievent = NULL;
isc_nm_t *netmgr = sock->worker->netmgr;
sock = handle->sock;
netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->statichandle == handle);
@ -752,54 +748,34 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->recv_read = true;
/* Initialize the timer */
if (sock->read_timeout == 0) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock);
/*
* This MUST be done asynchronously, no matter which thread we're
* in. The callback function for isc_nm_read() often calls
* isc_nm_read() again; if we tried to do that synchronously
* we'd clash in processbuffer() and grow the stack indefinitely.
*/
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpstartread_t *ievent =
(isc__netievent_tcpstartread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc_result_t result;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
if (isc__nmsocket_closing(sock)) {
result = ISC_R_CANCELED;
} else {
result = isc__nm_start_reading(sock);
goto failure;
}
result = isc__nm_start_reading(sock);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->reading, true);
isc__nm_tcp_failed_read_cb(sock, result);
return;
goto failure;
}
isc__nmsocket_timer_start(sock);
return;
failure:
sock->reading = true;
isc__nm_tcp_failed_read_cb(sock, result);
}
void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
isc__netievent_tcppauseread_t *ievent = NULL;
isc__nm_tcp_read_stop(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle));
@ -808,61 +784,10 @@ isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ false },
true)) {
return;
}
ievent = isc__nm_get_netievent_tcppauseread(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcppauseread_t *ievent =
(isc__netievent_tcppauseread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc__netievent_tcpstartread_t *ievent = NULL;
isc_nmsocket_t *sock = handle->sock;
REQUIRE(sock->tid == isc_tid());
if (sock->recv_cb == NULL) {
/* We are no longer reading */
return;
}
if (!isc__nmsocket_active(sock)) {
atomic_store(&sock->reading, true);
isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ true },
false)) {
return;
}
ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
@ -873,7 +798,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
netmgr = sock->worker->netmgr;
@ -912,7 +837,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
isc__nm_readcb(sock, req, ISC_R_SUCCESS);
/* The readcb could have paused the reading */
if (atomic_load(&sock->reading)) {
if (sock->reading) {
/* The timer will be updated */
isc__nmsocket_timer_restart(sock);
}
@ -938,10 +863,12 @@ quota_accept_cb(isc_quota_t *quota, void *sock0) {
REQUIRE(VALID_NMSOCK(sock));
/*
* Create a tcpaccept event and pass it using the async channel.
* Create a tcpaccept event and pass it using the async channel. This
* needs to be asynchronous, because the quota might have been released
* by a different child socket.
*/
ievent = isc__nm_get_netievent_tcpaccept(sock->worker, sock, quota);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
/*
@ -1048,8 +975,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
csock->read_timeout = atomic_load(&csock->worker->netmgr->init);
atomic_fetch_add(&ssock->parent->active_child_connections, 1);
/*
* The acceptcb needs to attach to the handle if it wants to keep the
* connection alive
@ -1082,10 +1007,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
REQUIRE(VALID_NMSOCK(handle->sock));
isc_nmsocket_t *sock = handle->sock;
isc__netievent_tcpsend_t *ievent = NULL;
isc_result_t result;
isc__nm_uvreq_t *uvreq = NULL;
isc_nm_t *netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_tid());
uvreq = isc__nm_uvreq_get(sock->worker, sock);
uvreq->uvbuf.base = (char *)region->base;
@ -1096,8 +1023,17 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
ievent = isc__nm_get_netievent_tcpsend(sock->worker, sock, uvreq);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
result = tcp_send_direct(sock, uvreq);
if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock, STATID_SENDFAIL);
isc__nm_failed_send_cb(sock, uvreq, result);
}
return;
}
@ -1124,34 +1060,6 @@ tcp_send_cb(uv_write_t *req, int status) {
isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
}
/*
* Handle 'tcpsend' async event - send a packet on the socket
*/
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_result_t result;
isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_nm_t *netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
result = tcp_send_direct(sock, uvreq);
if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock, STATID_SENDFAIL);
isc__nm_failed_send_cb(sock, uvreq, result);
}
}
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
REQUIRE(VALID_NMSOCK(sock));
@ -1180,38 +1088,14 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (ISC_R_SUCCESS);
}
static void
tcp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
uv_handle_set_data(handle, NULL);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->type == isc_nm_tcpsocket);
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true)) {
UNREACHABLE();
}
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
static void
tcp_close_sock(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true)) {
UNREACHABLE();
}
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
isc__nm_incstats(sock, STATID_CLOSE);
@ -1232,22 +1116,16 @@ tcp_close_cb(uv_handle_t *handle) {
tcp_close_sock(sock);
}
static void
tcp_close_direct(isc_nmsocket_t *sock) {
void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->parent == NULL);
if (sock->server != NULL) {
REQUIRE(VALID_NMSOCK(sock->server));
REQUIRE(VALID_NMSOCK(sock->server->parent));
if (sock->server->parent != NULL) {
atomic_fetch_sub(
&sock->server->parent->active_child_connections,
1);
}
}
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
@ -1280,44 +1158,6 @@ tcp_close_direct(isc_nmsocket_t *sock) {
}
}
void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!isc__nmsocket_active(sock));
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true)) {
return;
}
if (sock->tid == isc_tid()) {
tcp_close_direct(sock);
} else {
/*
* We need to create an event and pass it using async channel
*/
isc__netievent_tcpclose_t *ievent =
isc__nm_get_netievent_tcpclose(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tcp_close_direct(sock);
}
static void
tcp_close_connect_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
@ -1375,44 +1215,3 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
isc__nmsocket_prep_destroy(sock);
}
}
void
isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
isc__netievent_tcpcancel_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
ievent = isc__nm_get_netievent_tcpcancel(sock->worker, sock, handle);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
uv_timer_stop(&sock->read_timer);
isc__nm_tcp_failed_read_cb(sock, ISC_R_EOF);
}
int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *listener) {
int_fast32_t nactive;
REQUIRE(VALID_NMSOCK(listener));
nactive = atomic_load(&listener->active_child_connections);
INSIST(nactive >= 0);
return (nactive);
}

View file

@ -725,8 +725,10 @@ isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
ievent = isc__nm_get_netievent_tcpdnsread(sock->worker, sock);
/*
* This MUST be done asynchronously, no matter which thread we're
* in. The callback function for isc_nm_read() often calls
* FIXME: This MUST be done asynchronously, ~~no matter which thread
* we're in.~~ ,only when there's existing data on the socket.
* The callback function for isc_nm_read() often calls
* isc_nm_read() again; if we tried to do that synchronously
* we'd clash in processbuffer() and grow the stack indefinitely.
*/
@ -754,7 +756,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
}
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->reading, true);
sock->reading = true;
isc__nm_failed_read_cb(sock, result, false);
}
}
@ -870,7 +872,7 @@ isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
if (isc__nmsocket_closing(sock)) {

View file

@ -949,7 +949,7 @@ isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(sock->tid == isc_tid());
if (isc__nmsocket_closing(sock)) {
atomic_store(&sock->reading, true);
sock->reading = true;
isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
return;
}
@ -1468,7 +1468,7 @@ isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
if (isc__nmsocket_closing(sock)) {

View file

@ -160,9 +160,8 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
isc_mem_put(handle->sock->worker->mctx, send_req, sizeof(*send_req));
tlssock->tlsstream.nsending--;
if (finish && eresult == ISC_R_SUCCESS) {
tlssock->tlsstream.reading = false;
isc_nm_cancelread(handle);
if (finish && eresult == ISC_R_SUCCESS && tlssock->reading) {
tls_failed_read_cb(tlssock, ISC_R_EOF);
} else if (eresult == ISC_R_SUCCESS) {
tls_do_bio(tlssock, NULL, NULL, false);
} else if (eresult != ISC_R_SUCCESS &&
@ -203,16 +202,8 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) {
isc__nmsocket_clearcb(sock);
isc_nmhandle_detach(&handle);
} else if (sock->recv_cb != NULL && sock->statichandle != NULL) {
isc__nm_uvreq_t *req = NULL;
INSIST(VALID_NMHANDLE(sock->statichandle));
req = isc__nm_uvreq_get(sock->worker, sock);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
isc_nmhandle_attach(sock->statichandle, &req->handle);
if (result != ISC_R_TIMEDOUT) {
isc__nmsocket_clearcb(sock);
}
isc__nm_readcb(sock, req, result);
sock->recv_cb(sock->statichandle, result, NULL,
sock->recv_cbarg);
if (result == ISC_R_TIMEDOUT &&
(sock->outerhandle == NULL ||
isc__nmsocket_timer_running(sock->outerhandle->sock)))
@ -226,6 +217,16 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) {
}
}
void
isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
if (!inactive(sock) && sock->tlsstream.state == TLS_IO) {
tls_do_bio(sock, NULL, NULL, true);
} else if (sock->reading) {
sock->reading = false;
tls_failed_read_cb(sock, result);
}
}
static void
async_tls_do_bio(isc_nmsocket_t *sock) {
isc__netievent_tlsdobio_t *ievent =
@ -386,14 +387,16 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
int rv = 0;
size_t len = 0;
int saved_errno = 0;
bool was_reading;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
was_reading = sock->reading;
/* We will resume read if TLS layer wants us to */
if (sock->tlsstream.reading && sock->outerhandle) {
if (sock->reading && sock->outerhandle) {
REQUIRE(VALID_NMHANDLE(sock->outerhandle));
isc_nm_pauseread(sock->outerhandle);
isc_nm_read_stop(sock->outerhandle);
}
if (sock->tlsstream.state == TLS_INIT) {
@ -468,8 +471,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
/* Decrypt and pass data from network to client */
if (sock->tlsstream.state >= TLS_IO && sock->recv_cb != NULL &&
!atomic_load(&sock->readpaused) &&
sock->statichandle != NULL && !finish)
was_reading && sock->statichandle != NULL && !finish)
{
uint8_t recv_buf[TLS_BUF_SIZE];
INSIST(sock->tlsstream.state > TLS_HANDSHAKE);
@ -495,7 +497,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
if (sock->statichandle == NULL) {
finish = true;
break;
} else if (atomic_load(&sock->readpaused)) {
} else if (!sock->reading) {
/*
* Reading has been paused from withing
* the context of read callback - stop
@ -553,19 +555,14 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
return;
case SSL_ERROR_WANT_READ:
if (tls_try_to_close_unused_socket(sock) ||
sock->outerhandle == NULL || atomic_load(&sock->readpaused))
{
sock->outerhandle == NULL) {
return;
}
INSIST(VALID_NMHANDLE(sock->outerhandle));
if (sock->tlsstream.reading) {
isc_nm_resumeread(sock->outerhandle);
} else if (sock->tlsstream.state == TLS_HANDSHAKE) {
sock->tlsstream.reading = true;
isc_nm_read(sock->outerhandle, tls_readcb, sock);
}
sock->reading = true;
isc_nm_read(sock->outerhandle, tls_readcb, sock);
return;
default:
result = tls_error_to_result(tls_status, sock->tlsstream.state,
@ -811,22 +808,8 @@ isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlsstartread_t *ievent =
(isc__netievent_tlsstartread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tls_do_bio(sock, NULL, NULL, false);
}
void
isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc__netievent_tlsstartread_t *ievent = NULL;
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle));
@ -835,7 +818,6 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->statichandle == handle);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->recv_cb == NULL);
if (inactive(sock)) {
cb(handle, ISC_R_NOTCONNECTED, NULL, cbarg);
@ -845,37 +827,22 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
ievent = isc__nm_get_netievent_tlsstartread(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_tls_pauseread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (atomic_compare_exchange_strong(&handle->sock->readpaused,
&(bool){ false }, true))
{
if (handle->sock->outerhandle != NULL) {
isc_nm_pauseread(handle->sock->outerhandle);
}
if (sock->reading) {
return;
}
tls_do_bio(sock, NULL, NULL, false);
}
void
isc__nm_tls_resumeread(isc_nmhandle_t *handle) {
isc__nm_tls_read_stop(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (!atomic_compare_exchange_strong(&handle->sock->readpaused,
&(bool){ true }, false))
{
if (inactive(handle->sock)) {
return;
}
handle->sock->reading = false;
async_tls_do_bio(handle->sock);
if (handle->sock->outerhandle != NULL) {
isc_nm_read_stop(handle->sock->outerhandle);
}
}
@ -888,8 +855,10 @@ tls_close_direct(isc_nmsocket_t *sock) {
* external references, we can close everything.
*/
if (sock->outerhandle != NULL) {
isc_nm_pauseread(sock->outerhandle);
isc__nmsocket_clearcb(sock->outerhandle->sock);
sock->reading = false;
isc_nm_read_stop(sock->outerhandle);
isc_nmhandle_close(sock->outerhandle);
isc_nmhandle_detach(&sock->outerhandle);
}
@ -905,8 +874,6 @@ tls_close_direct(isc_nmsocket_t *sock) {
void
isc__nm_tls_close(isc_nmsocket_t *sock) {
isc__netievent_tlsclose_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlssocket);
@ -915,8 +882,15 @@ isc__nm_tls_close(isc_nmsocket_t *sock) {
return;
}
ievent = isc__nm_get_netievent_tlsclose(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
if (sock->tid == isc_tid()) {
/* no point in attempting to make the call asynchronous */
tls_close_direct(sock);
} else {
isc__netievent_tlsclose_t *ievent =
isc__nm_get_netievent_tlsclose(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
@ -1052,49 +1026,6 @@ error:
isc__nmsocket_detach(&tlssock);
}
static void
tls_cancelread(isc_nmsocket_t *sock) {
if (!inactive(sock) && sock->tlsstream.state == TLS_IO) {
tls_do_bio(sock, NULL, NULL, true);
} else if (sock->outerhandle != NULL) {
sock->tlsstream.reading = false;
isc_nm_cancelread(sock->outerhandle);
}
}
void
isc__nm_tls_cancelread(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
isc__netievent_tlscancel_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(sock->type == isc_nm_tlssocket);
if (sock->tid == isc_tid()) {
tls_cancelread(sock);
} else {
ievent = isc__nm_get_netievent_tlscancel(sock->worker, sock,
handle);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlscancel_t *ievent = (isc__netievent_tlscancel_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tls_cancelread(sock);
}
void
isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlsdobio_t *ievent = (isc__netievent_tlsdobio_t *)ev0;

View file

@ -102,7 +102,7 @@ start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
csock->recv_cb = sock->recv_cb;
csock->recv_cbarg = sock->recv_cbarg;
atomic_init(&csock->reading, true);
csock->reading = true;
if (mgr->load_balance_sockets) {
UNUSED(fd);
@ -968,7 +968,7 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
return;
fail:
atomic_store(&sock->reading, true); /* required by the next call */
sock->reading = true; /* required by the next call */
isc__nm_failed_read_cb(sock, result, false);
}

View file

@ -115,7 +115,7 @@ recv_data(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
return;
done:
isc_nm_pauseread(handle);
isc_nm_read_stop(handle);
ccmsg->cb(handle, ccmsg->result, ccmsg->cbarg);
}
@ -155,12 +155,8 @@ isccc_ccmsg_readmessage(isccc_ccmsg_t *ccmsg, isc_nm_cb_t cb, void *cbarg) {
ccmsg->result = ISC_R_UNEXPECTED; /* unknown right now */
ccmsg->length_received = false;
if (ccmsg->reading) {
isc_nm_resumeread(ccmsg->handle);
} else {
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
ccmsg->reading = true;
}
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
ccmsg->reading = true;
}
void
@ -168,7 +164,7 @@ isccc_ccmsg_cancelread(isccc_ccmsg_t *ccmsg) {
REQUIRE(VALID_CCMSG(ccmsg));
if (ccmsg->reading) {
isc_nm_cancelread(ccmsg->handle);
isc_nm_read_stop(ccmsg->handle);
ccmsg->reading = false;
}
}

View file

@ -94,6 +94,7 @@ atomic_bool check_listener_quota = false;
bool allow_send_back = false;
bool noanswer = false;
bool stream_use_TLS = false;
bool stream = false;
isc_nm_recv_cb_t connect_readcb = NULL;
@ -314,7 +315,11 @@ connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
if (eresult != ISC_R_SUCCESS) {
/* Send failed, we need to stop reading too */
isc_nm_cancelread(handle);
if (stream) {
isc_nm_read_stop(handle);
} else {
isc_nm_cancelread(handle);
}
goto unref;
}
@ -369,6 +374,7 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
case ISC_R_SHUTTINGDOWN:
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
case ISC_R_CONNREFUSED:
break;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
@ -378,6 +384,9 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_refcount_decrement(&active_creads);
if (stream) {
isc_nm_read_stop(handle);
}
isc_nmhandle_detach(&handle);
}
@ -648,6 +657,8 @@ connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(handle);
UNUSED(cbarg);
F();
isc_refcount_decrement(&active_cconnects);
assert_int_equal(eresult, ISC_R_SUCCESS);
@ -693,11 +704,8 @@ noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
UNUSED(region);
UNUSED(cbarg);
if (eresult == ISC_R_EOF) {
eresult = ISC_R_CONNECTIONRESET;
}
assert_int_equal(eresult, ISC_R_CONNECTIONRESET);
assert_true(eresult == ISC_R_CANCELED ||
eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
isc_refcount_decrement(&active_creads);

View file

@ -118,6 +118,7 @@ extern atomic_bool check_listener_quota;
extern bool allow_send_back;
extern bool noanswer;
extern bool stream_use_TLS;
extern bool stream;
extern isc_nm_recv_cb_t connect_readcb;

View file

@ -130,4 +130,12 @@ ISC_TEST_ENTRY_CUSTOM(tcp_recv_send_quota_sendback, stream_recv_send_setup,
ISC_TEST_LIST_END
ISC_TEST_MAIN
static int
tcp_setup(void **state __attribute__((__unused__))) {
stream_use_TLS = false;
stream = true;
return (0);
}
ISC_TEST_MAIN_CUSTOM(tcp_setup, NULL)

View file

@ -132,6 +132,7 @@ ISC_TEST_LIST_END
static int
tls_setup(void **state __attribute__((__unused__))) {
stream_use_TLS = true;
stream = true;
return (0);
}