diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index b0b4833e4b..44938e7c56 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -98,14 +98,6 @@ struct isc_nmhandle { isc_nmsocket_t *sock; size_t ah_pos; /* Position in the socket's 'active handles' array */ - /* - * The handle is 'inflight' if netmgr is not currently processing - * it in any way - it might mean that e.g. a recursive resolution - * is happening. For an inflight handle we must wait for the - * calling code to finish before we can free it. - */ - atomic_bool inflight; - isc_sockaddr_t peer; isc_sockaddr_t local; isc_nm_opaquecb_t doreset; /* reset extra callback, external */ @@ -135,7 +127,9 @@ typedef enum isc__netievent_type { netievent_tcpaccept, netievent_tcpstop, netievent_tcpclose, + netievent_tcpdnsclose, + netievent_tcpdnssend, netievent_closecb, netievent_shutdown, @@ -227,6 +221,7 @@ typedef struct isc__netievent__socket_req { typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; +typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t; typedef struct isc__netievent__socket_streaminfo_quota { isc__netievent_type type; @@ -650,6 +645,12 @@ isc__nmsocket_active(isc_nmsocket_t *sock); * or, for child sockets, 'sock->parent->active'. */ +void +isc__nmsocket_clearcb(isc_nmsocket_t *sock); +/*%< + * Clear the recv and accept callbacks in 'sock'. + */ + void isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); /*%< @@ -774,6 +775,9 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock); void isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0); + #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) isc_result_t diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 7fd029178c..da5df23abd 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -621,6 +621,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpsend: isc__nm_async_tcpsend(worker, ievent); break; + case netievent_tcpdnssend: + isc__nm_async_tcpdnssend(worker, ievent); + break; case netievent_tcpstop: isc__nm_async_tcpstop(worker, ievent); break; @@ -833,10 +836,13 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { if (active_handles == 0 || sock->tcphandle != NULL) { destroy = true; } - UNLOCK(&sock->lock); if (destroy) { + atomic_store(&sock->destroying, true); + UNLOCK(&sock->lock); nmsocket_cleanup(sock, true); + } else { + UNLOCK(&sock->lock); } } @@ -987,6 +993,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, sock->magic = NMSOCK_MAGIC; } +void +isc__nmsocket_clearcb(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + + sock->rcb.recv = NULL; + sock->rcbarg = NULL; + sock->accept_cb.accept = NULL; + sock->accept_cbarg = NULL; +} + void isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data(handle); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index f38f31070b..e3e222dead 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -584,6 +584,7 @@ readtimeout_cb(uv_timer_t *handle) { if (sock->rcb.recv != NULL) { sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL, sock->rcbarg); + isc__nmsocket_clearcb(sock); } } @@ -682,7 +683,9 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) { isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->rcb.recv != NULL); + if (sock->rcb.recv == NULL) { + return (ISC_R_CANCELED); + } if (!atomic_load(&sock->readpaused)) { return (ISC_R_SUCCESS); @@ -744,6 +747,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { if (sock->rcb.recv != NULL) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg); + isc__nmsocket_clearcb(sock); } /* @@ -1080,6 +1084,7 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { { sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL, sock->rcbarg); + isc__nmsocket_clearcb(sock); } } @@ -1092,5 +1097,6 @@ isc__nm_tcp_cancelread(isc_nmsocket_t *sock) { { sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL, sock->rcbarg); + isc__nmsocket_clearcb(sock); } } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 19a31870f2..4e49e47c34 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -231,6 +231,11 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, if (dnssock->self != NULL) { isc__nmsocket_detach(&dnssock->self); } + isc__nmsocket_clearcb(dnssock); + if (dnssock->outerhandle != NULL) { + isc_nmhandle_unref(dnssock->outerhandle); + dnssock->outerhandle = NULL; + } return; } @@ -340,8 +345,7 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { atomic_store(&sock->listening, false); atomic_store(&sock->closed, true); - sock->rcb.recv = NULL; - sock->rcbarg = NULL; + isc__nmsocket_clearcb(sock); if (sock->outer != NULL) { isc__nm_tcp_stoplistening(sock->outer); @@ -385,15 +389,6 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) { atomic_store(&handle->sock->outerhandle->sock->keepalive, true); } -typedef struct tcpsend { - isc_mem_t *mctx; - isc_nmhandle_t *handle; - isc_region_t region; - isc_nmhandle_t *orighandle; - isc_nm_cb_t cb; - void *cbarg; -} tcpsend_t; - static void resume_processing(void *arg) { isc_nmsocket_t *sock = (isc_nmsocket_t *)arg; @@ -431,7 +426,11 @@ resume_processing(void *arg) { } isc_nmhandle_unref(handle); } else if (sock->outerhandle != NULL) { - isc_nm_resumeread(sock->outerhandle->sock); + result = isc_nm_resumeread(sock->outerhandle->sock); + if (result != ISC_R_SUCCESS) { + isc_nmhandle_unref(sock->outerhandle); + sock->outerhandle = NULL; + } } return; @@ -466,15 +465,40 @@ resume_processing(void *arg) { static void tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { - tcpsend_t *ts = (tcpsend_t *)cbarg; + isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)cbarg; - ts->cb(ts->orighandle, result, ts->cbarg); - isc_mem_put(ts->mctx, ts->region.base, ts->region.length); + UNUSED(handle); - isc_nmhandle_unref(ts->orighandle); - isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts)); + req->cb.send(req->handle, result, req->cbarg); + isc_mem_put(req->sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len); + isc__nm_uvreq_put(&req, req->handle->sock); +} - isc_nmhandle_unref(handle); +void +isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { + isc_result_t result; + isc__netievent_tcpdnssend_t *ievent = + (isc__netievent_tcpdnssend_t *)ev0; + isc__nm_uvreq_t *req = ievent->req; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(worker->id == sock->tid); + + result = ISC_R_NOTCONNECTED; + if (atomic_load(&sock->active) && sock->outerhandle != NULL) { + isc_region_t r; + + r.base = (unsigned char *)req->uvbuf.base; + r.length = req->uvbuf.len; + result = isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb, + req); + } + + if (result != ISC_R_SUCCESS) { + req->cb.send(req->handle, result, req->cbarg); + isc_mem_put(sock->mgr->mctx, req->uvbuf.base, req->uvbuf.len); + isc__nm_uvreq_put(&req, sock); + } } /* @@ -483,7 +507,7 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_result_t isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { - tcpsend_t *t = NULL; + isc__nm_uvreq_t *uvreq = NULL; REQUIRE(VALID_NMHANDLE(handle)); @@ -492,31 +516,39 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpdnssocket); - if (sock->outerhandle == NULL) { - /* The socket is closed */ - return (ISC_R_NOTCONNECTED); + uvreq = isc__nm_uvreq_get(sock->mgr, sock); + uvreq->handle = handle; + isc_nmhandle_ref(uvreq->handle); + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + + uvreq->uvbuf.base = isc_mem_get(sock->mgr->mctx, region->length + 2); + uvreq->uvbuf.len = region->length + 2; + *(uint16_t *)uvreq->uvbuf.base = htons(region->length); + memmove(uvreq->uvbuf.base + 2, region->base, region->length); + + if (sock->tid == isc_nm_tid()) { + isc_region_t r; + + r.base = (unsigned char *)uvreq->uvbuf.base; + r.length = uvreq->uvbuf.len; + + return (isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb, + uvreq)); + } else { + isc__netievent_tcpdnssend_t *ievent = NULL; + + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpdnssend); + ievent->req = uvreq; + ievent->sock = sock; + + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + + return (ISC_R_SUCCESS); } - t = isc_mem_get(sock->mgr->mctx, sizeof(*t)); - *t = (tcpsend_t){ - .cb = cb, - .cbarg = cbarg, - .handle = handle->sock->outerhandle, - }; - - isc_mem_attach(sock->mgr->mctx, &t->mctx); - t->orighandle = handle; - isc_nmhandle_ref(t->orighandle); - isc_nmhandle_ref(t->handle); - - t->region = (isc_region_t){ .base = isc_mem_get(t->mctx, - region->length + 2), - .length = region->length + 2 }; - - *(uint16_t *)t->region.base = htons(region->length); - memmove(t->region.base + 2, region->base, region->length); - - return (isc_nm_send(t->handle, &t->region, tcpdnssend_cb, t)); + return (ISC_R_UNEXPECTED); } static void @@ -524,7 +556,9 @@ tcpdns_close_direct(isc_nmsocket_t *sock) { REQUIRE(sock->tid == isc_nm_tid()); /* We don't need atomics here, it's all in single network thread */ - if (sock->timer_initialized) { + if (sock->self != NULL) { + isc__nmsocket_detach(&sock->self); + } else if (sock->timer_initialized) { /* * We need to fire the timer callback to clean it up, * it will then call us again (via detach) so that we @@ -533,15 +567,13 @@ tcpdns_close_direct(isc_nmsocket_t *sock) { sock->timer_initialized = false; uv_timer_stop(&sock->timer); uv_close((uv_handle_t *)&sock->timer, timer_close_cb); - } else if (sock->self != NULL) { - isc__nmsocket_detach(&sock->self); } else { /* * At this point we're certain that there are no external * references, we can close everything. */ if (sock->outerhandle != NULL) { - sock->outerhandle->sock->rcb.recv = NULL; + isc__nmsocket_clearcb(sock->outerhandle->sock); isc_nmhandle_unref(sock->outerhandle); sock->outerhandle = NULL; } diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index b2b8166afb..0e782f5cde 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -210,19 +210,6 @@ static void stoplistening(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udplistener); - /* - * Socket is already closing; there's nothing to do. - */ - if (!isc__nmsocket_active(sock)) { - return; - } - - /* - * Mark it inactive now so that all sends will be ignored - * and we won't try to stop listening again. - */ - atomic_store(&sock->active, false); - for (int i = 0; i < sock->nchildren; i++) { isc__netievent_udpstop_t *event = NULL; @@ -256,6 +243,18 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udplistener); + /* + * Socket is already closing; there's nothing to do. + */ + if (!isc__nmsocket_active(sock)) { + return; + } + /* + * Mark it inactive now so that all sends will be ignored + * and we won't try to stop listening again. + */ + atomic_store(&sock->active, false); + /* * If the manager is interlocked, re-enqueue this as an asynchronous * event. Otherwise, go ahead and stop listening right away. @@ -336,10 +335,17 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, #endif /* - * If addr == NULL that's the end of stream - we can - * free the buffer and bail. + * Three reasons to return now without processing: + * - If addr == NULL that's the end of stream - we can + * free the buffer and bail. + * - If we're simulating a firewall blocking UDP packets + * bigger than 'maxudp' bytes for testing purposes. + * - If the socket is no longer active. */ - if (addr == NULL) { + maxudp = atomic_load(&sock->mgr->maxudp); + if ((addr == NULL) || (maxudp != 0 && (uint32_t)nrecv > maxudp) || + (!isc__nmsocket_active(sock))) + { if (free_buf) { isc__nm_free_uvbuf(sock, buf); } @@ -347,16 +353,6 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, return; } - /* - * Simulate a firewall blocking UDP packets bigger than - * 'maxudp' bytes. - */ - maxudp = atomic_load(&sock->mgr->maxudp); - if (maxudp != 0 && (uint32_t)nrecv > maxudp) { - isc__nmsocket_detach(&sock); - return; - } - result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); @@ -398,7 +394,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, uint32_t maxudp = atomic_load(&sock->mgr->maxudp); /* - * Simulate a firewall blocking UDP packets bigger than + * We're simulating a firewall blocking UDP packets bigger than * 'maxudp' bytes, for testing purposes. * * The client would ordinarily have unreferenced the handle @@ -522,6 +518,9 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); + if (!isc__nmsocket_active(sock)) { + return (ISC_R_CANCELED); + } isc_nmhandle_ref(req->handle); rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp, &req->uvbuf, 1, &peer->type.sa, udp_send_cb);