Cleanup the asychronous code in the stream implementations

After the loopmgr work has been merged, we can now cleanup the TCP and
TLS protocols a little bit, because there are stronger guarantees that
the sockets will be kept on the respective loops/threads.  We only need
asynchronous call for listening sockets (start, stop) and reading from
the TCP (because the isc_nm_read() might be called from read callback
again.

This commit does the following changes (they are intertwined together):

1. Cleanup most of the asynchronous events in the TCP code, and add
   comments for the events that needs to be kept asynchronous.

2. Remove isc_nm_resumeread() from the netmgr API, and replace
   isc_nm_resumeread() calls with existing isc_nm_read() calls.

3. Remove isc_nm_pauseread() from the netmgr API, and replace
   isc_nm_pauseread() calls with a new isc_nm_read_stop() call.

4. Disable the isc_nm_cancelread() for the streaming protocols, only the
   datagram-like protocols can use isc_nm_cancelread().

5. Add isc_nmhandle_close() that can be used to shutdown the socket
  earlier than after the last detach.  Formerly, the socket would be
  closed only after all reading and sending would be finished and the
  last reference would be detached.  The new isc_nmhandle_close() can
  be used to close the underlying socket earlier, so all the other
  asynchronous calls would call their respective callbacks immediately.

Co-authored-by: Ondřej Surý <ondrej@isc.org>
Co-authored-by: Artem Boldariev <artem@isc.org>
This commit is contained in:
Ondřej Surý 2022-08-29 10:55:10 +02:00
parent 62813df44b
commit fffd444440
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41
15 changed files with 220 additions and 587 deletions

View file

@ -413,7 +413,6 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) {
const char *content_length = NULL;
size_t limit = sizeof(httpd->recvbuf) - httpd->recvlen - 1;
size_t len = region->length;
size_t clen = 0;
int delim;
bool truncated = false;
@ -579,7 +578,7 @@ process_request(isc_httpd_t *httpd, isc_region_t *region, size_t *buflen) {
} else {
INSIST(content_length != NULL);
clen = (size_t)strtoul(content_length, NULL, 10);
size_t clen = (size_t)strtoul(content_length, NULL, 10);
if (clen == ULONG_MAX) {
/* Invalid number in the header value. */
return (ISC_R_BADNUMBER);
@ -931,7 +930,8 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult,
* ISC_R_NOTFOUND is not returned from netmgr) and we
* need to resume reading.
*/
isc_nm_resumeread(httpd->readhandle);
isc_nm_read(httpd->readhandle, httpd_request,
httpd->mgr);
return;
}
goto cleanup_readhandle;
@ -1043,7 +1043,7 @@ httpd_request(isc_nmhandle_t *handle, isc_result_t eresult,
*/
isc_buffer_usedregion(httpd->sendbuffer, &r);
isc_nm_pauseread(httpd->handle);
isc_nm_read_stop(httpd->handle);
httpd->state = SEND;
isc_nmhandle_attach(httpd->handle, &httpd->sendhandle);
@ -1072,7 +1072,8 @@ isc_httpdmgr_shutdown(isc_httpdmgr_t **httpdmgrp) {
httpd = ISC_LIST_HEAD(httpdmgr->running);
while (httpd != NULL) {
isc_nm_cancelread(httpd->readhandle);
isc_nm_read_stop(httpd->readhandle);
isc_nmhandle_detach(&httpd->readhandle);
httpd = ISC_LIST_NEXT(httpd, link);
}
UNLOCK(&httpdmgr->lock);
@ -1232,7 +1233,7 @@ httpd_senddone(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
*/
httpd_request(httpd->handle, ISC_R_SUCCESS, NULL, httpd->mgr);
} else if (!httpd->truncated) {
isc_nm_resumeread(httpd->readhandle);
isc_nm_read(httpd->readhandle, httpd_request, httpd->mgr);
} else {
/* Truncated request, don't resume */
goto cleanup_readhandle;

View file

@ -308,9 +308,9 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
*/
void
isc_nm_pauseread(isc_nmhandle_t *handle);
isc_nm_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle's socket, but remember the callback.
* Stop reading on this handle's socket.
*
* Requires:
* \li 'handle' is a valid netmgr handle.
@ -323,19 +323,12 @@ isc_nm_cancelread(isc_nmhandle_t *handle);
* active handles with a result code of ISC_R_CANCELED.
*
* Requires:
* \li 'sock' is a valid netmgr socket
* \li 'sock' is a valid datagram-like netmgr socket
* \li ...for which a read/recv callback has been defined.
*/
void
isc_nm_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading on the handle's socket.
*
* Requires:
* \li 'handle' is a valid netmgr handle.
* \li ...for a socket with a defined read/recv callback.
*/
isc_nmhandle_close(isc_nmhandle_t *handle);
void
isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,

View file

@ -490,7 +490,8 @@ finish_http_session(isc_nm_http_session_t *session) {
if (session->handle != NULL) {
if (!session->closed) {
session->closed = true;
isc_nm_cancelread(session->handle);
session->reading = false;
isc_nmhandle_close(session->handle);
}
if (session->client) {
@ -1007,7 +1008,7 @@ http_readcb(isc_nmhandle_t *handle, isc_result_t result, isc_region_t *region,
}
isc_buffer_putmem(session->buf, region->base + readlen,
unread_size);
isc_nm_pauseread(session->handle);
isc_nm_read_stop(session->handle);
}
/* We might have something to receive or send, do IO */
@ -1287,11 +1288,11 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle,
return;
} else {
/* Resume reading, it's idempotent, wait for more */
isc_nm_resumeread(session->handle);
isc_nm_read(session->handle, http_readcb, session);
}
} else {
/* We don't want more data, stop reading for now */
isc_nm_pauseread(session->handle);
isc_nm_read_stop(session->handle);
}
if (send_cb != NULL) {
@ -1332,31 +1333,22 @@ get_http_cstream(isc_nmsocket_t *sock, http_cstream_t **streamp) {
static void
http_call_connect_cb(isc_nmsocket_t *sock, isc_nm_http_session_t *session,
isc_result_t result) {
isc__nm_uvreq_t *req = NULL;
isc_nmhandle_t *httphandle = isc__nmhandle_get(sock, &sock->peer,
&sock->iface);
void *cbarg;
isc_nm_cb_t connect_cb;
REQUIRE(sock->connect_cb != NULL);
cbarg = sock->connect_cbarg;
connect_cb = sock->connect_cb;
isc__nmsocket_clearcb(sock);
if (result == ISC_R_SUCCESS) {
req = isc__nm_uvreq_get(sock->worker, sock);
req->cb.connect = sock->connect_cb;
req->cbarg = sock->connect_cbarg;
if (session != NULL) {
session->client_httphandle = httphandle;
req->handle = NULL;
isc_nmhandle_attach(httphandle, &req->handle);
} else {
req->handle = httphandle;
}
isc__nmsocket_clearcb(sock);
isc__nm_connectcb(sock, req, result, true);
connect_cb(httphandle, result, cbarg);
} else {
void *cbarg = sock->connect_cbarg;
isc_nm_cb_t connect_cb = sock->connect_cb;
isc__nmsocket_clearcb(sock);
connect_cb(httphandle, result, cbarg);
isc_nmhandle_detach(&httphandle);
}

View file

@ -254,13 +254,7 @@ struct isc_nmhandle {
typedef enum isc__netievent_type {
netievent_udpcancel,
netievent_tcpconnect,
netievent_tcpclose,
netievent_tcpsend,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpaccept,
netievent_tcpcancel,
netievent_tcpdnsaccept,
netievent_tcpdnsconnect,
@ -271,10 +265,8 @@ typedef enum isc__netievent_type {
netievent_tlsclose,
netievent_tlssend,
netievent_tlsstartread,
netievent_tlsconnect,
netievent_tlsdobio,
netievent_tlscancel,
netievent_tlsdnsaccept,
netievent_tlsdnsconnect,
@ -924,7 +916,6 @@ struct isc_nmsocket {
TLS_CLOSED
} state; /*%< The order of these is significant */
size_t nsending;
bool reading;
} tlsstream;
isc_nmsocket_h2_t h2;
@ -1005,7 +996,7 @@ struct isc_nmsocket {
atomic_bool connecting;
atomic_bool connected;
atomic_bool accepting;
atomic_bool reading;
bool reading;
atomic_bool timedout;
isc_refcount_t references;
@ -1020,11 +1011,6 @@ struct isc_nmsocket {
*/
bool processing;
/*%
* A TCP socket has had isc_nm_pauseread() called.
*/
atomic_bool readpaused;
/*%
* A TCP or TCPDNS socket has been set to use the keepalive
* timeout instead of the default idle timeout.
@ -1326,7 +1312,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*
* Back-end implementation of isc_nm_read() for TCP handles.
* Start reading on this handle.
*/
void
@ -1335,16 +1321,9 @@ isc__nm_tcp_close(isc_nmsocket_t *sock);
* Close a TCP socket.
*/
void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle);
isc__nm_tcp_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle, while still remembering the callback.
*/
void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading from socket.
*
* Stop reading on this handle.
*/
void
@ -1366,40 +1345,18 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock);
* Stop listening on 'sock'.
*/
int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *sock);
/*%<
* Returns the number of active connections for the TCP listener socket.
*/
void
isc__nm_tcp_settimeout(isc_nmhandle_t *handle, uint32_t timeout);
/*%<
* Set the read timeout for the TCP socket associated with 'handle'.
*/
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Callback handlers for asynchronous TCP events (connect, listen,
* stoplisten, send, read, pause, close).
@ -1411,14 +1368,9 @@ isc__nm_async_tlsclose(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlssend(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Callback handlers for asynchronous TLS events.
*/
@ -1570,15 +1522,15 @@ void
isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc_nm_cb_t cb, void *cbarg);
void
isc__nm_tls_cancelread(isc_nmhandle_t *handle);
/*%<
* Back-end implementation of isc_nm_send() for TLSDNS handles.
*/
void
isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
/*%<
* Start reading on the TLS handle.
*/
void
isc__nm_tls_close(isc_nmsocket_t *sock);
@ -1587,16 +1539,9 @@ isc__nm_tls_close(isc_nmsocket_t *sock);
*/
void
isc__nm_tls_pauseread(isc_nmhandle_t *handle);
isc__nm_tls_read_stop(isc_nmhandle_t *handle);
/*%<
* Pause reading on this handle, while still remembering the callback.
*/
void
isc__nm_tls_resumeread(isc_nmhandle_t *handle);
/*%<
* Resume reading from the handle.
*
* Stop reading on the TLS handle.
*/
void
@ -1632,6 +1577,9 @@ void
isc__nmhandle_tls_setwritetimeout(isc_nmhandle_t *handle,
uint64_t write_timeout);
void
isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
void
isc__nm_http_stoplistening(isc_nmsocket_t *sock);
@ -1819,16 +1767,12 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle);
* typedef all the netievent types
*/
NETIEVENT_SOCKET_TYPE(tcpclose);
NETIEVENT_SOCKET_TYPE(tcplisten);
NETIEVENT_SOCKET_TYPE(tcppauseread);
NETIEVENT_SOCKET_TYPE(tcpstop);
NETIEVENT_SOCKET_TYPE(tlsclose);
/* NETIEVENT_SOCKET_TYPE(tlsconnect); */ /* unique type, defined independently
*/
NETIEVENT_SOCKET_TYPE(tlsdobio);
NETIEVENT_SOCKET_TYPE(tlsstartread);
NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel);
NETIEVENT_SOCKET_TYPE(udplisten);
NETIEVENT_SOCKET_TYPE(udpstop);
@ -1858,9 +1802,6 @@ NETIEVENT_SOCKET_TYPE(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_TYPE(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_TYPE(tcpconnect);
NETIEVENT_SOCKET_REQ_TYPE(tcpsend);
NETIEVENT_SOCKET_TYPE(tcpstartread);
NETIEVENT_SOCKET_REQ_TYPE(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_TYPE(connectcb);
@ -1868,7 +1809,6 @@ NETIEVENT_SOCKET_REQ_RESULT_TYPE(readcb);
NETIEVENT_SOCKET_REQ_RESULT_TYPE(sendcb);
NETIEVENT_SOCKET_HANDLE_TYPE(detach);
NETIEVENT_SOCKET_HANDLE_TYPE(tcpcancel);
NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel);
NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept);
@ -1877,16 +1817,11 @@ NETIEVENT_SOCKET_TLSCTX_TYPE(settlsctx);
/* Now declared the helper functions */
NETIEVENT_SOCKET_DECL(tcpclose);
NETIEVENT_SOCKET_DECL(tcplisten);
NETIEVENT_SOCKET_DECL(tcppauseread);
NETIEVENT_SOCKET_DECL(tcpstartread);
NETIEVENT_SOCKET_DECL(tcpstop);
NETIEVENT_SOCKET_DECL(tlsclose);
NETIEVENT_SOCKET_DECL(tlsconnect);
NETIEVENT_SOCKET_DECL(tlsdobio);
NETIEVENT_SOCKET_DECL(tlsstartread);
NETIEVENT_SOCKET_HANDLE_DECL(tlscancel);
NETIEVENT_SOCKET_DECL(udplisten);
NETIEVENT_SOCKET_DECL(udpstop);
@ -1916,15 +1851,12 @@ NETIEVENT_SOCKET_DECL(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_DECL(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_DECL(tcpconnect);
NETIEVENT_SOCKET_REQ_DECL(tcpsend);
NETIEVENT_SOCKET_REQ_DECL(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_DECL(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DECL(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DECL(sendcb);
NETIEVENT_SOCKET_HANDLE_DECL(tcpcancel);
NETIEVENT_SOCKET_HANDLE_DECL(udpcancel);
NETIEVENT_SOCKET_DECL(detach);

View file

@ -451,14 +451,8 @@ process_netievent(void *arg) {
NETIEVENT_CASE(udpcancel);
NETIEVENT_CASE(tcpaccept);
NETIEVENT_CASE(tcpconnect);
NETIEVENT_CASE(tcplisten);
NETIEVENT_CASE(tcpstartread);
NETIEVENT_CASE(tcppauseread);
NETIEVENT_CASE(tcpsend);
NETIEVENT_CASE(tcpstop);
NETIEVENT_CASE(tcpcancel);
NETIEVENT_CASE(tcpclose);
NETIEVENT_CASE(tcpdnsaccept);
NETIEVENT_CASE(tcpdnslisten);
@ -481,11 +475,9 @@ process_netievent(void *arg) {
NETIEVENT_CASE(tlsdnsshutdown);
#if HAVE_LIBNGHTTP2
NETIEVENT_CASE(tlsstartread);
NETIEVENT_CASE(tlssend);
NETIEVENT_CASE(tlsclose);
NETIEVENT_CASE(tlsdobio);
NETIEVENT_CASE(tlscancel);
NETIEVENT_CASE(httpsend);
NETIEVENT_CASE(httpclose);
@ -522,16 +514,11 @@ isc__nm_put_netievent(isc__networker_t *worker, void *ievent) {
isc__networker_unref(worker);
}
NETIEVENT_SOCKET_DEF(tcpclose);
NETIEVENT_SOCKET_DEF(tcplisten);
NETIEVENT_SOCKET_DEF(tcppauseread);
NETIEVENT_SOCKET_DEF(tcpstartread);
NETIEVENT_SOCKET_DEF(tcpstop);
NETIEVENT_SOCKET_DEF(tlsclose);
NETIEVENT_SOCKET_DEF(tlsconnect);
NETIEVENT_SOCKET_DEF(tlsdobio);
NETIEVENT_SOCKET_DEF(tlsstartread);
NETIEVENT_SOCKET_HANDLE_DEF(tlscancel);
NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpstop);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);
@ -562,15 +549,12 @@ NETIEVENT_SOCKET_DEF(httpclose);
NETIEVENT_SOCKET_HTTP_EPS_DEF(httpendpoints);
#endif /* HAVE_LIBNGHTTP2 */
NETIEVENT_SOCKET_REQ_DEF(tcpconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpsend);
NETIEVENT_SOCKET_REQ_DEF(tlssend);
NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb);
NETIEVENT_SOCKET_DEF(detach);
NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel);
NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);
@ -802,6 +786,15 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
}
}
void
isc_nmhandle_close(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc__nmsocket_clearcb(handle->sock);
isc__nm_failed_read_cb(handle->sock, ISC_R_EOF, false);
}
void
isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
REQUIRE(sock->parent == NULL);
@ -833,7 +826,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
*
* If it's a regular socket we may need to close it.
*/
if (!atomic_load(&sock->closed)) {
if (!atomic_load(&sock->closing) && !atomic_load(&sock->closed)) {
switch (sock->type) {
case isc_nm_udpsocket:
isc__nm_udp_close(sock);
@ -850,7 +843,7 @@ isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_close(sock);
break;
return;
case isc_nm_httpsocket:
isc__nm_http_close(sock);
return;
@ -1003,7 +996,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc__networker_t *worker,
sock, isc_refcount_current(&sock->references));
atomic_init(&sock->active, true);
atomic_init(&sock->readpaused, false);
atomic_init(&sock->closing, false);
atomic_init(&sock->listening, 0);
atomic_init(&sock->closed, 0);
@ -1406,6 +1398,11 @@ isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) {
case isc_nm_tlsdnssocket:
isc__nm_tlsdns_failed_read_cb(sock, result, async);
return;
#ifdef HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_failed_read_cb(sock, result);
return;
#endif
default:
UNREACHABLE();
}
@ -1486,7 +1483,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
if (atomic_load(&sock->client)) {
uv_timer_stop(timer);
@ -1643,7 +1640,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) {
isc_result_t result = ISC_R_SUCCESS;
int r;
if (atomic_load(&sock->reading)) {
if (sock->reading) {
return (ISC_R_SUCCESS);
}
@ -1670,7 +1667,7 @@ isc__nm_start_reading(isc_nmsocket_t *sock) {
if (r != 0) {
result = isc_uverr2result(r);
} else {
atomic_store(&sock->reading, true);
sock->reading = true;
}
return (result);
@ -1680,7 +1677,7 @@ void
isc__nm_stop_reading(isc_nmsocket_t *sock) {
int r;
if (!atomic_load(&sock->reading)) {
if (!sock->reading) {
return;
}
@ -1698,7 +1695,7 @@ isc__nm_stop_reading(isc_nmsocket_t *sock) {
default:
UNREACHABLE();
}
atomic_store(&sock->reading, false);
sock->reading = false;
}
bool
@ -2046,58 +2043,30 @@ isc_nm_cancelread(isc_nmhandle_t *handle) {
case isc_nm_udpsocket:
isc__nm_udp_cancelread(handle);
break;
case isc_nm_tcpsocket:
isc__nm_tcp_cancelread(handle);
break;
case isc_nm_tcpdnssocket:
isc__nm_tcpdns_cancelread(handle);
break;
case isc_nm_tlsdnssocket:
isc__nm_tlsdns_cancelread(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_cancelread(handle);
break;
#endif
default:
UNREACHABLE();
}
}
void
isc_nm_pauseread(isc_nmhandle_t *handle) {
isc_nm_read_stop(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
isc_nmsocket_t *sock = handle->sock;
switch (sock->type) {
case isc_nm_tcpsocket:
isc__nm_tcp_pauseread(handle);
isc__nm_tcp_read_stop(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_pauseread(handle);
break;
#endif
default:
UNREACHABLE();
}
}
void
isc_nm_resumeread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
isc_nmsocket_t *sock = handle->sock;
switch (sock->type) {
case isc_nm_tcpsocket:
isc__nm_tcp_resumeread(handle);
break;
#if HAVE_LIBNGHTTP2
case isc_nm_tlssocket:
isc__nm_tls_resumeread(handle);
isc__nm_tls_read_stop(handle);
break;
#endif
default:

View file

@ -55,9 +55,6 @@ can_log_tcp_quota(void) {
static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
tcp_close_direct(isc_nmsocket_t *sock);
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void
@ -173,36 +170,6 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (ISC_R_SUCCESS);
}
void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpconnect_t *ievent =
(isc__netievent_tcpconnect_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *req = ievent->req;
isc_result_t result = ISC_R_SUCCESS;
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->parent == NULL);
REQUIRE(sock->tid == isc_tid());
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
if (sock->fd != (uv_os_sock_t)(-1)) {
isc__nm_tcp_close(sock);
}
isc__nm_connectcb(sock, req, result, true);
}
/*
* The sock is now attached to the handle.
*/
isc__nmsocket_detach(&sock);
}
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc_result_t result = ISC_R_UNSET;
@ -295,7 +262,6 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
isc__netievent_tcpconnect_t *ievent = NULL;
isc__nm_uvreq_t *req = NULL;
sa_family_t sa_family;
isc__networker_t *worker = &mgr->workers[isc_tid()];
@ -335,14 +301,21 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
(void)isc__nm_socket_tcp_maxseg(sock->fd, NM_MAXSEG);
ievent = isc__nm_get_netievent_tcpconnect(worker, sock, req);
atomic_store(&sock->active, true);
isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_netievent_tcpconnect(worker, ievent);
atomic_store(&sock->active, true);
result = tcp_connect_direct(sock, req);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->active, false);
if (sock->fd != (uv_os_sock_t)(-1)) {
isc__nm_tcp_close(sock);
}
isc__nm_connectcb(sock, req, result, true);
}
/*
* The sock is now attached to the handle.
*/
isc__nmsocket_detach(&sock);
}
static uv_os_sock_t
@ -670,6 +643,26 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
stop_tcp_parent(sock);
}
static void
tcp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
uv_handle_set_data(handle, NULL);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->type == isc_nm_tcpsocket);
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
@ -739,12 +732,15 @@ destroy:
void
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock;
isc_nm_t *netmgr;
isc_result_t result;
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc_nmsocket_t *sock = handle->sock;
isc__netievent_tcpstartread_t *ievent = NULL;
isc_nm_t *netmgr = sock->worker->netmgr;
sock = handle->sock;
netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->statichandle == handle);
@ -752,54 +748,34 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->recv_read = true;
/* Initialize the timer */
if (sock->read_timeout == 0) {
sock->read_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock);
/*
* This MUST be done asynchronously, no matter which thread we're
* in. The callback function for isc_nm_read() often calls
* isc_nm_read() again; if we tried to do that synchronously
* we'd clash in processbuffer() and grow the stack indefinitely.
*/
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpstartread_t *ievent =
(isc__netievent_tcpstartread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc_result_t result;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
if (isc__nmsocket_closing(sock)) {
result = ISC_R_CANCELED;
} else {
result = isc__nm_start_reading(sock);
goto failure;
}
result = isc__nm_start_reading(sock);
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->reading, true);
isc__nm_tcp_failed_read_cb(sock, result);
return;
goto failure;
}
isc__nmsocket_timer_start(sock);
return;
failure:
sock->reading = true;
isc__nm_tcp_failed_read_cb(sock, result);
}
void
isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
isc__netievent_tcppauseread_t *ievent = NULL;
isc__nm_tcp_read_stop(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle));
@ -808,61 +784,10 @@ isc__nm_tcp_pauseread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMSOCK(sock));
if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ false },
true)) {
return;
}
ievent = isc__nm_get_netievent_tcppauseread(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
isc__nm_async_tcppauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcppauseread_t *ievent =
(isc__netievent_tcppauseread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
void
isc__nm_tcp_resumeread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
isc__netievent_tcpstartread_t *ievent = NULL;
isc_nmsocket_t *sock = handle->sock;
REQUIRE(sock->tid == isc_tid());
if (sock->recv_cb == NULL) {
/* We are no longer reading */
return;
}
if (!isc__nmsocket_active(sock)) {
atomic_store(&sock->reading, true);
isc__nm_tcp_failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (!atomic_compare_exchange_strong(&sock->readpaused, &(bool){ true },
false)) {
return;
}
ievent = isc__nm_get_netievent_tcpstartread(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
return;
}
void
@ -873,7 +798,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
netmgr = sock->worker->netmgr;
@ -912,7 +837,7 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
isc__nm_readcb(sock, req, ISC_R_SUCCESS);
/* The readcb could have paused the reading */
if (atomic_load(&sock->reading)) {
if (sock->reading) {
/* The timer will be updated */
isc__nmsocket_timer_restart(sock);
}
@ -938,10 +863,12 @@ quota_accept_cb(isc_quota_t *quota, void *sock0) {
REQUIRE(VALID_NMSOCK(sock));
/*
* Create a tcpaccept event and pass it using the async channel.
* Create a tcpaccept event and pass it using the async channel. This
* needs to be asynchronous, because the quota might have been released
* by a different child socket.
*/
ievent = isc__nm_get_netievent_tcpaccept(sock->worker, sock, quota);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
/*
@ -1048,8 +975,6 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
csock->read_timeout = atomic_load(&csock->worker->netmgr->init);
atomic_fetch_add(&ssock->parent->active_child_connections, 1);
/*
* The acceptcb needs to attach to the handle if it wants to keep the
* connection alive
@ -1082,10 +1007,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
REQUIRE(VALID_NMSOCK(handle->sock));
isc_nmsocket_t *sock = handle->sock;
isc__netievent_tcpsend_t *ievent = NULL;
isc_result_t result;
isc__nm_uvreq_t *uvreq = NULL;
isc_nm_t *netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_tid());
uvreq = isc__nm_uvreq_get(sock->worker, sock);
uvreq->uvbuf.base = (char *)region->base;
@ -1096,8 +1023,17 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
ievent = isc__nm_get_netievent_tcpsend(sock->worker, sock, uvreq);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
result = tcp_send_direct(sock, uvreq);
if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock, STATID_SENDFAIL);
isc__nm_failed_send_cb(sock, uvreq, result);
}
return;
}
@ -1124,34 +1060,6 @@ tcp_send_cb(uv_write_t *req, int status) {
isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
}
/*
* Handle 'tcpsend' async event - send a packet on the socket
*/
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
isc_result_t result;
isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_nm_t *netmgr = sock->worker->netmgr;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
if (sock->write_timeout == 0) {
sock->write_timeout = (atomic_load(&sock->keepalive)
? atomic_load(&netmgr->keepalive)
: atomic_load(&netmgr->idle));
}
result = tcp_send_direct(sock, uvreq);
if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock, STATID_SENDFAIL);
isc__nm_failed_send_cb(sock, uvreq, result);
}
}
static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
REQUIRE(VALID_NMSOCK(sock));
@ -1180,38 +1088,14 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
return (ISC_R_SUCCESS);
}
static void
tcp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
uv_handle_set_data(handle, NULL);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->type == isc_nm_tcpsocket);
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true)) {
UNREACHABLE();
}
isc__nm_incstats(sock, STATID_CLOSE);
atomic_store(&sock->listening, false);
isc__nmsocket_detach(&sock);
}
static void
tcp_close_sock(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
true)) {
UNREACHABLE();
}
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closed,
&(bool){ false }, true));
isc__nm_incstats(sock, STATID_CLOSE);
@ -1232,22 +1116,16 @@ tcp_close_cb(uv_handle_t *handle) {
tcp_close_sock(sock);
}
static void
tcp_close_direct(isc_nmsocket_t *sock) {
void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!isc__nmsocket_active(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->closing));
REQUIRE(sock->parent == NULL);
if (sock->server != NULL) {
REQUIRE(VALID_NMSOCK(sock->server));
REQUIRE(VALID_NMSOCK(sock->server->parent));
if (sock->server->parent != NULL) {
atomic_fetch_sub(
&sock->server->parent->active_child_connections,
1);
}
}
RUNTIME_CHECK(atomic_compare_exchange_strong(&sock->closing,
&(bool){ false }, true));
if (sock->quota != NULL) {
isc_quota_detach(&sock->quota);
@ -1280,44 +1158,6 @@ tcp_close_direct(isc_nmsocket_t *sock) {
}
}
void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(!isc__nmsocket_active(sock));
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true)) {
return;
}
if (sock->tid == isc_tid()) {
tcp_close_direct(sock);
} else {
/*
* We need to create an event and pass it using async channel
*/
isc__netievent_tcpclose_t *ievent =
isc__nm_get_netievent_tcpclose(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tcp_close_direct(sock);
}
static void
tcp_close_connect_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
@ -1375,44 +1215,3 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
isc__nmsocket_prep_destroy(sock);
}
}
void
isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
isc__netievent_tcpcancel_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpsocket);
ievent = isc__nm_get_netievent_tcpcancel(sock->worker, sock, handle);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_async_tcpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpcancel_t *ievent = (isc__netievent_tcpcancel_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
uv_timer_stop(&sock->read_timer);
isc__nm_tcp_failed_read_cb(sock, ISC_R_EOF);
}
int_fast32_t
isc__nm_tcp_listener_nactive(isc_nmsocket_t *listener) {
int_fast32_t nactive;
REQUIRE(VALID_NMSOCK(listener));
nactive = atomic_load(&listener->active_child_connections);
INSIST(nactive >= 0);
return (nactive);
}

View file

@ -725,8 +725,10 @@ isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
ievent = isc__nm_get_netievent_tcpdnsread(sock->worker, sock);
/*
* This MUST be done asynchronously, no matter which thread we're
* in. The callback function for isc_nm_read() often calls
* FIXME: This MUST be done asynchronously, ~~no matter which thread
* we're in.~~ ,only when there's existing data on the socket.
* The callback function for isc_nm_read() often calls
* isc_nm_read() again; if we tried to do that synchronously
* we'd clash in processbuffer() and grow the stack indefinitely.
*/
@ -754,7 +756,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
}
if (result != ISC_R_SUCCESS) {
atomic_store(&sock->reading, true);
sock->reading = true;
isc__nm_failed_read_cb(sock, result, false);
}
}
@ -870,7 +872,7 @@ isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
if (isc__nmsocket_closing(sock)) {

View file

@ -949,7 +949,7 @@ isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(sock->tid == isc_tid());
if (isc__nmsocket_closing(sock)) {
atomic_store(&sock->reading, true);
sock->reading = true;
isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
return;
}
@ -1468,7 +1468,7 @@ isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
REQUIRE(atomic_load(&sock->reading));
REQUIRE(sock->reading);
REQUIRE(buf != NULL);
if (isc__nmsocket_closing(sock)) {

View file

@ -160,9 +160,8 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
isc_mem_put(handle->sock->worker->mctx, send_req, sizeof(*send_req));
tlssock->tlsstream.nsending--;
if (finish && eresult == ISC_R_SUCCESS) {
tlssock->tlsstream.reading = false;
isc_nm_cancelread(handle);
if (finish && eresult == ISC_R_SUCCESS && tlssock->reading) {
tls_failed_read_cb(tlssock, ISC_R_EOF);
} else if (eresult == ISC_R_SUCCESS) {
tls_do_bio(tlssock, NULL, NULL, false);
} else if (eresult != ISC_R_SUCCESS &&
@ -203,16 +202,8 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) {
isc__nmsocket_clearcb(sock);
isc_nmhandle_detach(&handle);
} else if (sock->recv_cb != NULL && sock->statichandle != NULL) {
isc__nm_uvreq_t *req = NULL;
INSIST(VALID_NMHANDLE(sock->statichandle));
req = isc__nm_uvreq_get(sock->worker, sock);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
isc_nmhandle_attach(sock->statichandle, &req->handle);
if (result != ISC_R_TIMEDOUT) {
isc__nmsocket_clearcb(sock);
}
isc__nm_readcb(sock, req, result);
sock->recv_cb(sock->statichandle, result, NULL,
sock->recv_cbarg);
if (result == ISC_R_TIMEDOUT &&
(sock->outerhandle == NULL ||
isc__nmsocket_timer_running(sock->outerhandle->sock)))
@ -226,6 +217,16 @@ tls_failed_read_cb(isc_nmsocket_t *sock, const isc_result_t result) {
}
}
void
isc__nm_tls_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
if (!inactive(sock) && sock->tlsstream.state == TLS_IO) {
tls_do_bio(sock, NULL, NULL, true);
} else if (sock->reading) {
sock->reading = false;
tls_failed_read_cb(sock, result);
}
}
static void
async_tls_do_bio(isc_nmsocket_t *sock) {
isc__netievent_tlsdobio_t *ievent =
@ -386,14 +387,16 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
int rv = 0;
size_t len = 0;
int saved_errno = 0;
bool was_reading;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
was_reading = sock->reading;
/* We will resume read if TLS layer wants us to */
if (sock->tlsstream.reading && sock->outerhandle) {
if (sock->reading && sock->outerhandle) {
REQUIRE(VALID_NMHANDLE(sock->outerhandle));
isc_nm_pauseread(sock->outerhandle);
isc_nm_read_stop(sock->outerhandle);
}
if (sock->tlsstream.state == TLS_INIT) {
@ -468,8 +471,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
/* Decrypt and pass data from network to client */
if (sock->tlsstream.state >= TLS_IO && sock->recv_cb != NULL &&
!atomic_load(&sock->readpaused) &&
sock->statichandle != NULL && !finish)
was_reading && sock->statichandle != NULL && !finish)
{
uint8_t recv_buf[TLS_BUF_SIZE];
INSIST(sock->tlsstream.state > TLS_HANDSHAKE);
@ -495,7 +497,7 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
if (sock->statichandle == NULL) {
finish = true;
break;
} else if (atomic_load(&sock->readpaused)) {
} else if (!sock->reading) {
/*
* Reading has been paused from withing
* the context of read callback - stop
@ -553,19 +555,14 @@ tls_do_bio(isc_nmsocket_t *sock, isc_region_t *received_data,
return;
case SSL_ERROR_WANT_READ:
if (tls_try_to_close_unused_socket(sock) ||
sock->outerhandle == NULL || atomic_load(&sock->readpaused))
{
sock->outerhandle == NULL) {
return;
}
INSIST(VALID_NMHANDLE(sock->outerhandle));
if (sock->tlsstream.reading) {
isc_nm_resumeread(sock->outerhandle);
} else if (sock->tlsstream.state == TLS_HANDSHAKE) {
sock->tlsstream.reading = true;
isc_nm_read(sock->outerhandle, tls_readcb, sock);
}
sock->reading = true;
isc_nm_read(sock->outerhandle, tls_readcb, sock);
return;
default:
result = tls_error_to_result(tls_status, sock->tlsstream.state,
@ -811,22 +808,8 @@ isc__nm_tls_send(isc_nmhandle_t *handle, const isc_region_t *region,
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_async_tlsstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlsstartread_t *ievent =
(isc__netievent_tlsstartread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tls_do_bio(sock, NULL, NULL, false);
}
void
isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc__netievent_tlsstartread_t *ievent = NULL;
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle));
@ -835,7 +818,6 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->statichandle == handle);
REQUIRE(sock->tid == isc_tid());
REQUIRE(sock->recv_cb == NULL);
if (inactive(sock)) {
cb(handle, ISC_R_NOTCONNECTED, NULL, cbarg);
@ -845,37 +827,22 @@ isc__nm_tls_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
ievent = isc__nm_get_netievent_tlsstartread(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
}
void
isc__nm_tls_pauseread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (atomic_compare_exchange_strong(&handle->sock->readpaused,
&(bool){ false }, true))
{
if (handle->sock->outerhandle != NULL) {
isc_nm_pauseread(handle->sock->outerhandle);
}
if (sock->reading) {
return;
}
tls_do_bio(sock, NULL, NULL, false);
}
void
isc__nm_tls_resumeread(isc_nmhandle_t *handle) {
isc__nm_tls_read_stop(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock));
if (!atomic_compare_exchange_strong(&handle->sock->readpaused,
&(bool){ true }, false))
{
if (inactive(handle->sock)) {
return;
}
handle->sock->reading = false;
async_tls_do_bio(handle->sock);
if (handle->sock->outerhandle != NULL) {
isc_nm_read_stop(handle->sock->outerhandle);
}
}
@ -888,8 +855,10 @@ tls_close_direct(isc_nmsocket_t *sock) {
* external references, we can close everything.
*/
if (sock->outerhandle != NULL) {
isc_nm_pauseread(sock->outerhandle);
isc__nmsocket_clearcb(sock->outerhandle->sock);
sock->reading = false;
isc_nm_read_stop(sock->outerhandle);
isc_nmhandle_close(sock->outerhandle);
isc_nmhandle_detach(&sock->outerhandle);
}
@ -905,8 +874,6 @@ tls_close_direct(isc_nmsocket_t *sock) {
void
isc__nm_tls_close(isc_nmsocket_t *sock) {
isc__netievent_tlsclose_t *ievent = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlssocket);
@ -915,8 +882,15 @@ isc__nm_tls_close(isc_nmsocket_t *sock) {
return;
}
ievent = isc__nm_get_netievent_tlsclose(sock->worker, sock);
isc__nm_maybe_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
if (sock->tid == isc_tid()) {
/* no point in attempting to make the call asynchronous */
tls_close_direct(sock);
} else {
isc__netievent_tlsclose_t *ievent =
isc__nm_get_netievent_tlsclose(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
@ -1052,49 +1026,6 @@ error:
isc__nmsocket_detach(&tlssock);
}
static void
tls_cancelread(isc_nmsocket_t *sock) {
if (!inactive(sock) && sock->tlsstream.state == TLS_IO) {
tls_do_bio(sock, NULL, NULL, true);
} else if (sock->outerhandle != NULL) {
sock->tlsstream.reading = false;
isc_nm_cancelread(sock->outerhandle);
}
}
void
isc__nm_tls_cancelread(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL;
isc__netievent_tlscancel_t *ievent = NULL;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(sock->type == isc_nm_tlssocket);
if (sock->tid == isc_tid()) {
tls_cancelread(sock);
} else {
ievent = isc__nm_get_netievent_tlscancel(sock->worker, sock,
handle);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_tlscancel(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlscancel_t *ievent = (isc__netievent_tlscancel_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_tid());
UNUSED(worker);
tls_cancelread(sock);
}
void
isc__nm_async_tlsdobio(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tlsdobio_t *ievent = (isc__netievent_tlsdobio_t *)ev0;

View file

@ -102,7 +102,7 @@ start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
csock->recv_cb = sock->recv_cb;
csock->recv_cbarg = sock->recv_cbarg;
atomic_init(&csock->reading, true);
csock->reading = true;
if (mgr->load_balance_sockets) {
UNUSED(fd);
@ -968,7 +968,7 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
return;
fail:
atomic_store(&sock->reading, true); /* required by the next call */
sock->reading = true; /* required by the next call */
isc__nm_failed_read_cb(sock, result, false);
}

View file

@ -115,7 +115,7 @@ recv_data(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
return;
done:
isc_nm_pauseread(handle);
isc_nm_read_stop(handle);
ccmsg->cb(handle, ccmsg->result, ccmsg->cbarg);
}
@ -155,12 +155,8 @@ isccc_ccmsg_readmessage(isccc_ccmsg_t *ccmsg, isc_nm_cb_t cb, void *cbarg) {
ccmsg->result = ISC_R_UNEXPECTED; /* unknown right now */
ccmsg->length_received = false;
if (ccmsg->reading) {
isc_nm_resumeread(ccmsg->handle);
} else {
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
ccmsg->reading = true;
}
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
ccmsg->reading = true;
}
void
@ -168,7 +164,7 @@ isccc_ccmsg_cancelread(isccc_ccmsg_t *ccmsg) {
REQUIRE(VALID_CCMSG(ccmsg));
if (ccmsg->reading) {
isc_nm_cancelread(ccmsg->handle);
isc_nm_read_stop(ccmsg->handle);
ccmsg->reading = false;
}
}

View file

@ -94,6 +94,7 @@ atomic_bool check_listener_quota = false;
bool allow_send_back = false;
bool noanswer = false;
bool stream_use_TLS = false;
bool stream = false;
isc_nm_recv_cb_t connect_readcb = NULL;
@ -314,7 +315,11 @@ connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
if (eresult != ISC_R_SUCCESS) {
/* Send failed, we need to stop reading too */
isc_nm_cancelread(handle);
if (stream) {
isc_nm_read_stop(handle);
} else {
isc_nm_cancelread(handle);
}
goto unref;
}
@ -369,6 +374,7 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
case ISC_R_SHUTTINGDOWN:
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
case ISC_R_CONNREFUSED:
break;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
@ -378,6 +384,9 @@ connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_refcount_decrement(&active_creads);
if (stream) {
isc_nm_read_stop(handle);
}
isc_nmhandle_detach(&handle);
}
@ -648,6 +657,8 @@ connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(handle);
UNUSED(cbarg);
F();
isc_refcount_decrement(&active_cconnects);
assert_int_equal(eresult, ISC_R_SUCCESS);
@ -693,11 +704,8 @@ noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
UNUSED(region);
UNUSED(cbarg);
if (eresult == ISC_R_EOF) {
eresult = ISC_R_CONNECTIONRESET;
}
assert_int_equal(eresult, ISC_R_CONNECTIONRESET);
assert_true(eresult == ISC_R_CANCELED ||
eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
isc_refcount_decrement(&active_creads);

View file

@ -118,6 +118,7 @@ extern atomic_bool check_listener_quota;
extern bool allow_send_back;
extern bool noanswer;
extern bool stream_use_TLS;
extern bool stream;
extern isc_nm_recv_cb_t connect_readcb;

View file

@ -130,4 +130,12 @@ ISC_TEST_ENTRY_CUSTOM(tcp_recv_send_quota_sendback, stream_recv_send_setup,
ISC_TEST_LIST_END
ISC_TEST_MAIN
static int
tcp_setup(void **state __attribute__((__unused__))) {
stream_use_TLS = false;
stream = true;
return (0);
}
ISC_TEST_MAIN_CUSTOM(tcp_setup, NULL)

View file

@ -132,6 +132,7 @@ ISC_TEST_LIST_END
static int
tls_setup(void **state __attribute__((__unused__))) {
stream_use_TLS = true;
stream = true;
return (0);
}