diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 3a62bcce3a..e2ac08fe04 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -266,8 +266,6 @@ typedef enum isc__netievent_type { netievent_httpsend, netievent_httpendpoints, - netievent_streamdnsclose, - netievent_streamdnssend, netievent_streamdnsread, netievent_streamdnscancel, @@ -1633,16 +1631,10 @@ void isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); -void -isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0); - void isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg); -void -isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); - void isc__nm_streamdns_close(isc_nmsocket_t *sock); @@ -1854,8 +1846,6 @@ NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel); NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept); -NETIEVENT_SOCKET_TYPE(streamdnsclose); -NETIEVENT_SOCKET_REQ_TYPE(streamdnssend); NETIEVENT_SOCKET_TYPE(streamdnsread); NETIEVENT_SOCKET_HANDLE_TYPE(streamdnscancel); @@ -1900,8 +1890,6 @@ NETIEVENT_SOCKET_DECL(detach); NETIEVENT_SOCKET_QUOTA_DECL(tcpaccept); -NETIEVENT_SOCKET_DECL(streamdnsclose); -NETIEVENT_SOCKET_REQ_DECL(streamdnssend); NETIEVENT_SOCKET_DECL(streamdnsread); NETIEVENT_SOCKET_HANDLE_DECL(streamdnscancel); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 24caf453da..ab48c3cdee 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -459,8 +459,6 @@ process_netievent(void *arg) { NETIEVENT_CASE(httpendpoints); #endif NETIEVENT_CASE(streamdnsread); - NETIEVENT_CASE(streamdnssend); - NETIEVENT_CASE(streamdnsclose); NETIEVENT_CASE(streamdnscancel); NETIEVENT_CASE(settlsctx); @@ -519,8 +517,6 @@ NETIEVENT_SOCKET_DEF(detach); NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept); -NETIEVENT_SOCKET_DEF(streamdnsclose); -NETIEVENT_SOCKET_REQ_DEF(streamdnssend); NETIEVENT_SOCKET_DEF(streamdnsread); NETIEVENT_SOCKET_HANDLE_DEF(streamdnscancel); diff --git a/lib/isc/netmgr/streamdns.c b/lib/isc/netmgr/streamdns.c index 2024f5e362..2fbee5cbf9 100644 --- a/lib/isc/netmgr/streamdns.c +++ b/lib/isc/netmgr/streamdns.c @@ -53,7 +53,7 @@ * 'streamdns_send_req_t' object. * * To understand how sending is done, start by looking at - * 'isc__nm_async_streamdnssend()'. Additionally also take a look at + * 'isc__nm_streamdns_send()'. Additionally also take a look at * 'streamdns_get_send_req()' and 'streamdns_put_send_req()' which are * responsible for send requests allocation/reuse and initialisation. * @@ -822,16 +822,15 @@ isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = NULL; bool closing = false; - bool worker_thread; REQUIRE(VALID_NMHANDLE(handle)); sock = handle->sock; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_streamdnssocket); REQUIRE(sock->recv_handle == NULL); + REQUIRE(sock->tid == isc_tid()); closing = streamdns_closing(sock); - worker_thread = sock->tid == isc_tid(); sock->recv_cb = cb; sock->recv_cbarg = cbarg; @@ -843,64 +842,28 @@ isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, * asynchronous as we just want to start reading from the * underlying transport. */ - if (worker_thread && !closing && - isc_dnsstream_assembler_result(sock->streamdns.input) == - ISC_R_UNSET) + if (!closing && isc_dnsstream_assembler_result(sock->streamdns.input) == + ISC_R_UNSET) { isc__netievent_streamdnsread_t event = { .sock = sock }; isc__nm_async_streamdnsread(sock->worker, (isc__netievent_t *)&event); - } else { - isc__netievent_streamdnsread_t *ievent = NULL; - /* - * We want the read operation to be asynchronous in most cases - * because: - * - * 1. A read operation might be initiated from within the read - * callback itself. - * - * 2. Due to the above, we need to make the operation - * asynchronous to keep the socket state consistent. - */ - ievent = isc__nm_get_netievent_streamdnsread(sock->worker, - sock); - isc__nm_enqueue_ievent(sock->worker, - (isc__netievent_t *)ievent); - } -} - -void -isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_streamdnssend_t *ievent = - (isc__netievent_streamdnssend_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - isc__nm_uvreq_t *req = ievent->req; - streamdns_send_req_t *send_req; - isc_mem_t *mctx; - isc_region_t data = { 0 }; - - REQUIRE(VALID_UVREQ(req)); - REQUIRE(sock->tid == isc_tid()); - - UNUSED(worker); - - ievent->req = NULL; - - if (streamdns_closing(sock)) { - isc__nm_failed_send_cb(sock, req, ISC_R_CANCELED, true); return; } - mctx = sock->worker->mctx; - - send_req = streamdns_get_send_req(sock, mctx, req); - data.base = (unsigned char *)req->uvbuf.base; - data.length = req->uvbuf.len; - isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb, - (void *)send_req); - - isc__nm_uvreq_put(&req, sock); - return; + /* + * We want the read operation to be asynchronous in most cases + * because: + * + * 1. A read operation might be initiated from within the read + * callback itself. + * + * 2. Due to the above, we need to make the operation + * asynchronous to keep the socket state consistent. + */ + isc__netievent_streamdnsread_t *ievent = + isc__nm_get_netievent_streamdnsread(sock->worker, sock); + isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent); } void @@ -908,6 +871,9 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { isc__nm_uvreq_t *uvreq = NULL; isc_nmsocket_t *sock = NULL; + streamdns_send_req_t *send_req; + isc_mem_t *mctx; + isc_region_t data = { 0 }; REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); @@ -916,6 +882,7 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, sock = handle->sock; REQUIRE(sock->type == isc_nm_streamdnssocket); + REQUIRE(sock->tid == isc_tid()); uvreq = isc__nm_uvreq_get(sock->worker, sock); isc_nmhandle_attach(handle, &uvreq->handle); @@ -924,23 +891,24 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region, uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.len = region->length; + if (streamdns_closing(sock)) { + isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED, true); + return; + } + /* * As when sending, we, basically, handing data to the underlying * transport, we can treat the operation synchronously, as the * transport code will take care of the asynchronicity if required. */ - if (sock->tid == isc_tid()) { - isc__netievent_streamdnssend_t event = { .sock = sock, - .req = uvreq }; - isc__nm_async_streamdnssend(sock->worker, - (isc__netievent_t *)&event); - } else { - isc__netievent_streamdnssend_t *ievent = - isc__nm_get_netievent_streamdnssend(sock->worker, sock, - uvreq); - isc__nm_enqueue_ievent(sock->worker, - (isc__netievent_t *)ievent); - } + mctx = sock->worker->mctx; + send_req = streamdns_get_send_req(sock, mctx, uvreq); + data.base = (unsigned char *)uvreq->uvbuf.base; + data.length = uvreq->uvbuf.len; + isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb, + (void *)send_req); + + isc__nm_uvreq_put(&uvreq, sock); } static void @@ -969,21 +937,11 @@ streamdns_close_direct(isc_nmsocket_t *sock) { atomic_store(&sock->active, false); } -void -isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_streamdnsclose_t *ievent = - (isc__netievent_streamdnsclose_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - UNUSED(worker); - - streamdns_close_direct(sock); -} - void isc__nm_streamdns_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_streamdnssocket); + REQUIRE(sock->tid == isc_tid()); if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) @@ -991,15 +949,7 @@ isc__nm_streamdns_close(isc_nmsocket_t *sock) { return; } - if (sock->tid == isc_tid()) { - streamdns_close_direct(sock); - } else { - isc__netievent_streamdnsclose_t *ievent = - isc__nm_get_netievent_streamdnsclose(sock->worker, - sock); - isc__nm_enqueue_ievent(sock->worker, - (isc__netievent_t *)ievent); - } + streamdns_close_direct(sock); } void