diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index a30c017923..5be0c8d232 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -41,6 +41,8 @@ #define ISC_NETMGR_TID_UNKNOWN -1 +#define ISC_NETMGR_TLSBUF_SIZE 65536 + #if !defined(WIN32) /* * New versions of libuv support recvmmsg on unices. @@ -1850,3 +1852,49 @@ void isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); void isc__nm_tlsdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); + +isc_result_t +isc__nm_tcpdns_processbuffer(isc_nmsocket_t *sock); +isc_result_t +isc__nm_tlsdns_processbuffer(isc_nmsocket_t *sock); + +isc__nm_uvreq_t * +isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr); + +void +isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf); + +void +isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, + const struct sockaddr *addr, unsigned flags); +void +isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); +void +isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); + +void +isc__nm_start_reading(isc_nmsocket_t *sock); +void +isc__nm_stop_reading(isc_nmsocket_t *sock); +void +isc__nm_process_sock_buffer(isc_nmsocket_t *sock); +void +isc__nm_resume_processing(void *arg); +bool +isc__nm_inactive(isc_nmsocket_t *sock); + +void +isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len); + +void +isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); +void +isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); +void +isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); +void +isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); + +#define STREAM_CLIENTS_PER_CONN 23 diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 47add06aa1..fa81a8b35f 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1583,8 +1583,86 @@ isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg, handle->dofree = dofree; } -static void -isc__nmsocket_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { +void +isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { + REQUIRE(len <= NM_BIG_BUF); + + if (sock->buf == NULL) { + /* We don't have the buffer at all */ + size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF; + sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len); + sock->buf_size = alloc_len; + } else { + /* We have the buffer but it's too small */ + sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf, + NM_BIG_BUF); + sock->buf_size = NM_BIG_BUF; + } +} + +void +isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + + if (req->cb.send != NULL) { + isc__nm_sendcb(sock, req, eresult, true); + } else { + isc__nm_uvreq_put(&req, sock); + } +} + +void +isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { + REQUIRE(sock->accepting); + REQUIRE(sock->server); + + /* + * Detach the quota early to make room for other connections; + * otherwise it'd be detached later asynchronously, and clog + * the quota unnecessarily. + */ + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); + } + + isc__nmsocket_detach(&sock->server); + + sock->accepting = false; + + switch (eresult) { + case ISC_R_NOTCONNECTED: + /* IGNORE: The client disconnected before we could accept */ + break; + default: + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "Accepting TCP connection failed: %s", + isc_result_totext(eresult)); + } +} + +void +isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(atomic_load(&sock->connecting)); + REQUIRE(req->cb.connect != NULL); + + atomic_store(&sock->connecting, false); + + isc__nmsocket_clearcb(sock); + + isc__nm_connectcb(sock, req, eresult); + + isc__nmsocket_prep_destroy(sock); +} + +void +isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); switch (sock->type) { case isc_nm_udpsocket: @@ -1613,7 +1691,7 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) { REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->reading); - isc__nmsocket_failed_read_cb(sock, ISC_R_TIMEDOUT); + isc__nm_failed_read_cb(sock, ISC_R_TIMEDOUT); } void diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 378f828186..276e5e8c6c 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -34,7 +34,6 @@ #include "netmgr-int.h" #include "uv-compat.h" -#define TCPDNS_CLIENTS_PER_CONN 23 /*%< * * Maximum number of simultaneous handles in flight supported for a single @@ -57,12 +56,6 @@ can_log_tcpdns_quota(void) { return (false); } -static void -tcpdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf); - -static void -resume_processing(void *arg); - static isc_result_t tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); @@ -75,9 +68,6 @@ tcpdns_connect_cb(uv_connect_t *uvreq, int status); static void tcpdns_connection_cb(uv_stream_t *server, int status); -static void -read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); - static void tcpdns_close_cb(uv_handle_t *uvhandle); @@ -87,98 +77,11 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota); static void quota_accept_cb(isc_quota_t *quota, void *sock0); -static void -failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult); - static void stop_tcpdns_parent(isc_nmsocket_t *sock); static void stop_tcpdns_child(isc_nmsocket_t *sock); -static void -process_sock_buffer(isc_nmsocket_t *sock); - -static void -stop_reading(isc_nmsocket_t *sock); - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock); - -static inline void -alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { - REQUIRE(len <= NM_BIG_BUF); - - if (sock->buf == NULL) { - /* We don't have the buffer at all */ - size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF; - sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len); - sock->buf_size = alloc_len; - } else { - /* We have the buffer but it's too small */ - sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf, - NM_BIG_BUF); - sock->buf_size = NM_BIG_BUF; - } -} - -static bool -inactive(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) || - atomic_load(&sock->mgr->closing) || - (sock->server != NULL && !isc__nmsocket_active(sock->server))); -} - -static void -failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { - REQUIRE(sock->accepting); - REQUIRE(sock->server); - - /* - * Detach the quota early to make room for other connections; - * otherwise it'd be detached later asynchronously, and clog - * the quota unnecessarily. - */ - if (sock->quota != NULL) { - isc_quota_detach(&sock->quota); - } - - isc__nmsocket_detach(&sock->server); - - sock->accepting = false; - - switch (eresult) { - case ISC_R_NOTCONNECTED: - /* IGNORE: The client disconnected before we could accept */ - break; - default: - isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, - "Accepting TCP connection failed: %s", - isc_result_totext(eresult)); - } -} - -static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(VALID_UVREQ(req)); - REQUIRE(sock->tid == isc_nm_tid()); - REQUIRE(atomic_load(&sock->connecting)); - REQUIRE(req->cb.connect != NULL); - - atomic_store(&sock->connecting, false); - - isc__nmsocket_clearcb(sock); - isc__nm_connectcb(sock, req, eresult); - - isc__nmsocket_prep_destroy(sock); -} - static isc_result_t tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; @@ -326,7 +229,7 @@ tcpdns_connect_cb(uv_connect_t *uvreq, int status) { return; error: - failed_connect_cb(sock, req, result); + isc__nm_failed_connect_cb(sock, req, result); } isc_result_t @@ -643,7 +546,7 @@ tcpdns_connection_cb(uv_stream_t *server, int status) { REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); - if (inactive(ssock)) { + if (isc__nm_inactive(ssock)) { result = ISC_R_CANCELED; goto done; } @@ -720,13 +623,13 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { } } -static void -failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { +void +isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(result != ISC_R_SUCCESS); isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); if (!sock->recv_read) { goto destroy; @@ -734,7 +637,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { sock->recv_read = false; if (sock->recv_cb != NULL) { - isc__nm_uvreq_t *req = get_read_req(sock); + isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nmsocket_clearcb(sock); isc__nm_readcb(sock, req, result); } @@ -749,67 +652,6 @@ destroy: } } -void -isc__nm_tcpdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - failed_read_cb(sock, result); -} - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(VALID_UVREQ(req)); - - if (req->cb.send != NULL) { - isc__nm_sendcb(sock, req, eresult, true); - } else { - isc__nm_uvreq_put(&req, sock); - } -} - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock) { - isc__nm_uvreq_t *req = NULL; - - req = isc__nm_uvreq_get(sock->mgr, sock); - req->cb.recv = sock->recv_cb; - req->cbarg = sock->recv_cbarg; - - if (atomic_load(&sock->client)) { - isc_nmhandle_attach(sock->statichandle, &req->handle); - } else { - req->handle = isc__nmhandle_get(sock, NULL, NULL); - } - - return (req); -} - -static void -start_reading(isc_nmsocket_t *sock) { - int r; - - if (sock->reading) { - return; - } - - r = uv_read_start(&sock->uv_handle.stream, tcpdns_alloc_cb, read_cb); - RUNTIME_CHECK(r == 0); - sock->reading = true; -} - -static void -stop_reading(isc_nmsocket_t *sock) { - int r; - - if (!sock->reading) { - return; - } - - r = uv_read_stop(&sock->uv_handle.stream); - RUNTIME_CHECK(r == 0); - sock->reading = false; -} - void isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); @@ -847,31 +689,6 @@ isc__nm_tcpdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { return; } -/*%< - * Allocator for TCP read operations. Limited to size 2^16. - * - * Note this doesn't actually allocate anything, it just assigns the - * worker's receive buffer to a socket, and marks it as "in use". - */ -static void -tcpdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { - isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc__networker_t *worker = NULL; - - UNUSED(size); - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_tcpdnssocket); - REQUIRE(isc__nm_in_netthread()); - - worker = &sock->mgr->workers[sock->tid]; - INSIST(!worker->recvbuf_inuse); - - buf->base = worker->recvbuf; - buf->len = size; - worker->recvbuf_inuse = true; -} - void isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpdnsread_t *ievent = @@ -883,13 +700,13 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { sock->reading = true; - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); } /* @@ -901,8 +718,8 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { * * The caller will need to unreference the handle. */ -static isc_result_t -processbuffer(isc_nmsocket_t *sock) { +isc_result_t +isc__nm_tcpdns_processbuffer(isc_nmsocket_t *sock) { size_t len; isc__nm_uvreq_t *req = NULL; isc_nmhandle_t *handle = NULL; @@ -910,7 +727,7 @@ processbuffer(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { return (ISC_R_CANCELED); } @@ -931,7 +748,7 @@ processbuffer(isc_nmsocket_t *sock) { return (ISC_R_NOMORE); } - req = get_read_req(sock); + req = isc__nm_get_read_req(sock, NULL); REQUIRE(VALID_UVREQ(req)); /* @@ -975,8 +792,9 @@ processbuffer(isc_nmsocket_t *sock) { return (ISC_R_SUCCESS); } -static void -read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { +void +isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread, + const uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream); uint8_t *base = NULL; size_t len; @@ -986,8 +804,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { REQUIRE(sock->reading); REQUIRE(buf != NULL); - if (inactive(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); + if (isc__nm_inactive(sock)) { + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); goto free; } @@ -997,8 +815,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { sock->statsindex[STATID_RECVFAIL]); } - failed_read_cb(sock, isc__nm_uverr2result(nread)); - + isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nread)); goto free; } @@ -1015,7 +832,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { */ if (sock->buf_len + len > sock->buf_size) { - alloc_dnsbuf(sock, sock->buf_len + len); + isc__nm_alloc_dnsbuf(sock, sock->buf_len + len); } memmove(sock->buf + sock->buf_len, base, len); sock->buf_len += len; @@ -1024,7 +841,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { sock->read_timeout = atomic_load(&sock->mgr->idle); } - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); free: isc__nm_free_uvbuf(sock, buf); } @@ -1086,7 +903,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); - if (inactive(ssock)) { + if (isc__nm_inactive(ssock)) { if (quota != NULL) { isc_quota_detach("a); } @@ -1168,7 +985,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { csock->read_timeout = atomic_load(&csock->mgr->init); - csock->closehandle_cb = resume_processing; + csock->closehandle_cb = isc__nm_resume_processing; /* * We need to keep the handle alive until we fail to read or connection @@ -1176,7 +993,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { * prep_destroy()->tcpdns_close_direct(). */ isc_nmhandle_attach(handle, &csock->recv_handle); - process_sock_buffer(csock); + isc__nm_process_sock_buffer(csock); /* * The initial timer has been set, update the read timeout for the next @@ -1199,7 +1016,7 @@ failure: atomic_store(&csock->active, false); - failed_accept_cb(csock, result); + isc__nm_failed_accept_cb(csock, result); isc__nmsocket_prep_destroy(csock); @@ -1247,7 +1064,8 @@ tcpdns_send_cb(uv_write_t *req, int status) { if (status < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - failed_send_cb(sock, uvreq, isc__nm_uverr2result(status)); + isc__nm_failed_send_cb(sock, uvreq, + isc__nm_uverr2result(status)); return; } @@ -1279,7 +1097,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(worker); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { result = ISC_R_CANCELED; goto fail; } @@ -1321,7 +1139,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { fail: if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - failed_send_cb(sock, uvreq, result); + isc__nm_failed_send_cb(sock, uvreq, result); } } @@ -1452,7 +1270,7 @@ tcpdns_close_direct(isc_nmsocket_t *sock) { } isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); uv_close((uv_handle_t *)&sock->timer, timer_close_cb); } @@ -1514,7 +1332,7 @@ isc__nm_tcpdns_shutdown(isc_nmsocket_t *sock) { } if (sock->statichandle) { - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -1554,7 +1372,7 @@ isc__nm_async_tcpdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - failed_read_cb(sock, ISC_R_EOF); + isc__nm_failed_read_cb(sock, ISC_R_EOF); } void @@ -1577,7 +1395,7 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) { */ isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); atomic_store(&sock->sequential, true); } @@ -1593,74 +1411,3 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle, bool value) { atomic_store(&sock->keepalive, value); } - -/* - * Process a DNS message. - * - * If we only have an incomplete DNS message, we don't touch any - * timers. If we do have a full message, reset the timer. - * - * Stop reading if this is a client socket, or if the server socket - * has been set to sequential mode, or the number of queries we are - * processing simultaneously has reached the clients-per-connection - * limit. In this event we'll be called again by resume_processing() - * later. - */ -static void -process_sock_buffer(isc_nmsocket_t *sock) { - for (;;) { - int_fast32_t ah = atomic_load(&sock->ah); - isc_result_t result = processbuffer(sock); - switch (result) { - case ISC_R_NOMORE: - /* - * Don't reset the timer until we have a - * full DNS message. - */ - start_reading(sock); - /* Start the timer if there are no active handles */ - if (ah == 1) { - isc__nmsocket_timer_start(sock); - } - return; - case ISC_R_CANCELED: - isc__nmsocket_timer_stop(sock); - stop_reading(sock); - return; - case ISC_R_SUCCESS: - /* - * Stop the timer on the successful message read, this - * also allows to restart the timer when we have no more - * data. - */ - isc__nmsocket_timer_stop(sock); - - if (atomic_load(&sock->client) || - atomic_load(&sock->sequential) || - atomic_load(&sock->ah) >= TCPDNS_CLIENTS_PER_CONN) - { - stop_reading(sock); - return; - } - break; - default: - INSIST(0); - } - } -} - -static void -resume_processing(void *arg) { - isc_nmsocket_t *sock = (isc_nmsocket_t *)arg; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_nm_tid()); - REQUIRE(sock->type == isc_nm_tcpdnssocket); - REQUIRE(!atomic_load(&sock->client)); - - if (inactive(sock)) { - return; - } - - process_sock_buffer(sock); -} diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index 7e0d8fb539..0e79a110d1 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -35,9 +35,6 @@ #include "openssl_shim.h" #include "uv-compat.h" -#define TLS_BUF_SIZE 65536 - -#define TLSDNS_CLIENTS_PER_CONN 23 /*%< * * Maximum number of simultaneous handles in flight supported for a single @@ -50,12 +47,6 @@ static atomic_uint_fast32_t last_tlsdnsquota_log = ATOMIC_VAR_INIT(0); static void tls_error(isc_nmsocket_t *sock, isc_result_t result); -static void -tlsdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf); - -static void -resume_processing(void *arg); - static isc_result_t tlsdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); @@ -70,9 +61,6 @@ tlsdns_connect_cb(uv_connect_t *uvreq, int status); static void tlsdns_connection_cb(uv_stream_t *server, int status); -static void -read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); - static void tlsdns_close_cb(uv_handle_t *uvhandle); @@ -82,30 +70,11 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota); static void quota_accept_cb(isc_quota_t *quota, void *sock0); -static void -failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult); - static void stop_tlsdns_parent(isc_nmsocket_t *sock); static void stop_tlsdns_child(isc_nmsocket_t *sock); -static void -start_reading(isc_nmsocket_t *sock); - -static void -stop_reading(isc_nmsocket_t *sock); - -static void -process_sock_buffer(isc_nmsocket_t *sock); - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock); - static void async_tlsdns_cycle(isc_nmsocket_t *sock) __attribute__((unused)); @@ -125,78 +94,6 @@ can_log_tlsdns_quota(void) { return (false); } -static inline void -alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { - REQUIRE(len <= NM_BIG_BUF); - - if (sock->buf == NULL) { - /* We don't have the buffer at all */ - size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF; - sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len); - sock->buf_size = alloc_len; - } else { - /* We have the buffer but it's too small */ - sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf, - NM_BIG_BUF); - sock->buf_size = NM_BIG_BUF; - } -} - -static bool -inactive(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) || - atomic_load(&sock->mgr->closing) || - (sock->server != NULL && !isc__nmsocket_active(sock->server))); -} - -static void -failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { - REQUIRE(sock->accepting); - REQUIRE(sock->server); - - /* - * Detach the quota early to make room for other connections; - * otherwise it'd be detached later asynchronously, and clog - * the quota unnecessarily. - */ - if (sock->quota != NULL) { - isc_quota_detach(&sock->quota); - } - - isc__nmsocket_detach(&sock->server); - - sock->accepting = false; - - switch (eresult) { - case ISC_R_NOTCONNECTED: - /* IGNORE: The client disconnected before we could accept */ - break; - default: - isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, - "Accepting TCP connection failed: %s", - isc_result_totext(eresult)); - } -} - -static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(VALID_UVREQ(req)); - REQUIRE(sock->tid == isc_nm_tid()); - REQUIRE(atomic_load(&sock->connecting)); - REQUIRE(req->cb.connect != NULL); - - atomic_store(&sock->connecting, false); - - isc__nmsocket_clearcb(sock); - - isc__nm_connectcb(sock, req, eresult); - - isc__nmsocket_prep_destroy(sock); -} - static isc_result_t tlsdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; @@ -341,12 +238,12 @@ tlsdns_connect_cb(uv_connect_t *uvreq, int status) { /* * */ - r = BIO_new_bio_pair(&sock->tls.ssl_wbio, TLS_BUF_SIZE, - &sock->tls.app_rbio, TLS_BUF_SIZE); + r = BIO_new_bio_pair(&sock->tls.ssl_wbio, ISC_NETMGR_TLSBUF_SIZE, + &sock->tls.app_rbio, ISC_NETMGR_TLSBUF_SIZE); RUNTIME_CHECK(r == 1); - r = BIO_new_bio_pair(&sock->tls.ssl_rbio, TLS_BUF_SIZE, - &sock->tls.app_wbio, TLS_BUF_SIZE); + r = BIO_new_bio_pair(&sock->tls.ssl_rbio, ISC_NETMGR_TLSBUF_SIZE, + &sock->tls.app_wbio, ISC_NETMGR_TLSBUF_SIZE); RUNTIME_CHECK(r == 1); #if HAVE_SSL_SET0_RBIO && HAVE_SSL_SET0_WBIO @@ -369,7 +266,7 @@ tlsdns_connect_cb(uv_connect_t *uvreq, int status) { sock->tls.pending_req = req; - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); result = tls_cycle(sock); if (result != ISC_R_SUCCESS) { @@ -379,7 +276,7 @@ tlsdns_connect_cb(uv_connect_t *uvreq, int status) { return; error: - failed_connect_cb(sock, req, result); + isc__nm_failed_connect_cb(sock, req, result); } isc_result_t @@ -702,7 +599,7 @@ tlsdns_connection_cb(uv_stream_t *server, int status) { REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); - if (inactive(ssock)) { + if (isc__nm_inactive(ssock)) { result = ISC_R_CANCELED; goto done; } @@ -854,18 +751,18 @@ isc__nm_async_tlsdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { } } -static void -failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { +void +isc__nm_tlsdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(result != ISC_R_SUCCESS); isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); if (sock->tls.pending_req) { isc__nm_uvreq_t *req = sock->tls.pending_req; sock->tls.pending_req = NULL; - failed_connect_cb(sock, req, ISC_R_CANCELED); + isc__nm_failed_connect_cb(sock, req, ISC_R_CANCELED); } if (!sock->recv_read) { @@ -874,7 +771,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { sock->recv_read = false; if (sock->recv_cb != NULL) { - isc__nm_uvreq_t *req = get_read_req(sock); + isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nmsocket_clearcb(sock); isc__nm_readcb(sock, req, result); } @@ -889,67 +786,6 @@ destroy: } } -void -isc__nm_tlsdns_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - failed_read_cb(sock, result); -} - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(VALID_UVREQ(req)); - - if (req->cb.send != NULL) { - isc__nm_sendcb(sock, req, eresult, true); - } else { - isc__nm_uvreq_put(&req, sock); - } -} - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock) { - isc__nm_uvreq_t *req = NULL; - - req = isc__nm_uvreq_get(sock->mgr, sock); - req->cb.recv = sock->recv_cb; - req->cbarg = sock->recv_cbarg; - - if (atomic_load(&sock->client)) { - isc_nmhandle_attach(sock->statichandle, &req->handle); - } else { - req->handle = isc__nmhandle_get(sock, NULL, NULL); - } - - return (req); -} - -static void -start_reading(isc_nmsocket_t *sock) { - int r; - - if (sock->reading) { - return; - } - - r = uv_read_start(&sock->uv_handle.stream, tlsdns_alloc_cb, read_cb); - RUNTIME_CHECK(r == 0); - sock->reading = true; -} - -static void -stop_reading(isc_nmsocket_t *sock) { - int r; - - if (!sock->reading) { - return; - } - - r = uv_read_stop(&sock->uv_handle.stream); - RUNTIME_CHECK(r == 0); - sock->reading = false; -} - void isc__nm_tlsdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); @@ -987,38 +823,6 @@ isc__nm_tlsdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { return; } -/*%< - * Allocator for TCP read operations. Limited to size 2^16. - * - * Note this doesn't actually allocate anything, it just assigns the - * worker's receive buffer to a socket, and marks it as "in use". - */ -static void -tlsdns_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { - isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc__networker_t *worker = NULL; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_tlsdnssocket); - REQUIRE(isc__nm_in_netthread()); - - /* - * We need to limit the individual chunks to be read, so the BIO_write() - * will always succeed and the consumed before the next readcb is - * called. - */ - if (size >= TLS_BUF_SIZE) { - size = TLS_BUF_SIZE; - } - - worker = &sock->mgr->workers[sock->tid]; - INSIST(!worker->recvbuf_inuse); - - buf->base = worker->recvbuf; - buf->len = size; - worker->recvbuf_inuse = true; -} - void isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tlsdnsread_t *ievent = @@ -1031,15 +835,15 @@ isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { sock->reading = true; - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } result = tls_cycle(sock); if (result != ISC_R_SUCCESS) { - failed_read_cb(sock, result); + isc__nm_failed_read_cb(sock, result); } } @@ -1052,8 +856,8 @@ isc__nm_async_tlsdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { * * The caller will need to unreference the handle. */ -static isc_result_t -processbuffer(isc_nmsocket_t *sock) { +isc_result_t +isc__nm_tlsdns_processbuffer(isc_nmsocket_t *sock) { size_t len; isc__nm_uvreq_t *req = NULL; isc_nmhandle_t *handle = NULL; @@ -1061,7 +865,7 @@ processbuffer(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { return (ISC_R_CANCELED); } @@ -1082,7 +886,7 @@ processbuffer(isc_nmsocket_t *sock) { return (ISC_R_NOMORE); } - req = get_read_req(sock); + req = isc__nm_get_read_req(sock, NULL); REQUIRE(VALID_UVREQ(req)); /* @@ -1139,12 +943,13 @@ tls_cycle_input(isc_nmsocket_t *sock) { (void)SSL_peek(sock->tls.tls, &(char){ '\0' }, 0); int pending = SSL_pending(sock->tls.tls); - if (pending > TLS_BUF_SIZE) { - pending = TLS_BUF_SIZE; + if (pending > ISC_NETMGR_TLSBUF_SIZE) { + pending = ISC_NETMGR_TLSBUF_SIZE; } if ((sock->buf_len + pending) > sock->buf_size) { - alloc_dnsbuf(sock, sock->buf_len + pending); + isc__nm_alloc_dnsbuf(sock, + sock->buf_len + pending); } len = 0; @@ -1153,7 +958,7 @@ tls_cycle_input(isc_nmsocket_t *sock) { sock->buf_size - sock->buf_len, &len); if (rv != 1) { /* Process what's in the buffer so far */ - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); /* FIXME: Should we call failed_read_cb()? */ break; @@ -1163,7 +968,7 @@ tls_cycle_input(isc_nmsocket_t *sock) { sock->buf_len += len; - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); } } else if (!SSL_is_init_finished(sock->tls.tls)) { if (SSL_is_server(sock->tls.tls)) { @@ -1185,7 +990,7 @@ tls_cycle_input(isc_nmsocket_t *sock) { if (sock->tls.state == TLS_STATE_NONE && !SSL_is_init_finished(sock->tls.tls)) { sock->tls.state = TLS_STATE_HANDSHAKE; - process_sock_buffer(sock); + isc__nm_process_sock_buffer(sock); } /* else continue reading */ break; @@ -1240,7 +1045,7 @@ tls_error(isc_nmsocket_t *sock, isc_result_t result) { case TLS_STATE_HANDSHAKE: case TLS_STATE_IO: isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); break; case TLS_STATE_ERROR: return; @@ -1304,8 +1109,8 @@ tls_cycle_output(isc_nmsocket_t *sock) { break; } - if (pending > TLS_BUF_SIZE) { - pending = TLS_BUF_SIZE; + if (pending > ISC_NETMGR_TLSBUF_SIZE) { + pending = ISC_NETMGR_TLSBUF_SIZE; } sock->tls.senddata.base = isc_mem_get(sock->mgr->mctx, pending); @@ -1436,8 +1241,9 @@ isc__nm_async_tlsdnscycle(isc__networker_t *worker, isc__netievent_t *ev0) { } } -static void -read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { +void +isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread, + const uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream); size_t len; isc_result_t result; @@ -1448,8 +1254,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { REQUIRE(sock->reading); REQUIRE(buf != NULL); - if (inactive(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); + if (isc__nm_inactive(sock)) { + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); goto free; } @@ -1459,7 +1265,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { sock->statsindex[STATID_RECVFAIL]); } - failed_read_cb(sock, isc__nm_uverr2result(nread)); + isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nread)); goto free; } @@ -1474,13 +1280,13 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { rv = BIO_write_ex(sock->tls.app_wbio, buf->base, (size_t)nread, &len); if (rv <= 0 || (size_t)nread != len) { - failed_read_cb(sock, ISC_R_TLSERROR); + isc__nm_failed_read_cb(sock, ISC_R_TLSERROR); goto free; } result = tls_cycle(sock); if (result != ISC_R_SUCCESS) { - failed_read_cb(sock, result); + isc__nm_failed_read_cb(sock, result); } free: async_tlsdns_cycle(sock); @@ -1544,7 +1350,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); - if (inactive(ssock)) { + if (isc__nm_inactive(ssock)) { if (quota != NULL) { isc_quota_detach("a); } @@ -1627,12 +1433,12 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { csock->tls.tls = isc_tls_create(ssock->tls.ctx); RUNTIME_CHECK(csock->tls.tls != NULL); - r = BIO_new_bio_pair(&csock->tls.ssl_wbio, TLS_BUF_SIZE, - &csock->tls.app_rbio, TLS_BUF_SIZE); + r = BIO_new_bio_pair(&csock->tls.ssl_wbio, ISC_NETMGR_TLSBUF_SIZE, + &csock->tls.app_rbio, ISC_NETMGR_TLSBUF_SIZE); RUNTIME_CHECK(r == 1); - r = BIO_new_bio_pair(&csock->tls.ssl_rbio, TLS_BUF_SIZE, - &csock->tls.app_wbio, TLS_BUF_SIZE); + r = BIO_new_bio_pair(&csock->tls.ssl_rbio, ISC_NETMGR_TLSBUF_SIZE, + &csock->tls.app_wbio, ISC_NETMGR_TLSBUF_SIZE); RUNTIME_CHECK(r == 1); #if HAVE_SSL_SET0_RBIO && HAVE_SSL_SET0_WBIO @@ -1658,7 +1464,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { csock->read_timeout = atomic_load(&csock->mgr->init); - csock->closehandle_cb = resume_processing; + csock->closehandle_cb = isc__nm_resume_processing; /* * We need to keep the handle alive until we fail to read or connection @@ -1677,7 +1483,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { isc_nmhandle_detach(&handle); - process_sock_buffer(csock); + isc__nm_process_sock_buffer(csock); /* * sock is now attached to the handle. @@ -1689,7 +1495,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { failure: atomic_store(&csock->active, false); - failed_accept_cb(csock, result); + isc__nm_failed_accept_cb(csock, result); isc__nmsocket_prep_destroy(csock); @@ -1745,7 +1551,7 @@ isc__nm_async_tlsdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { result = tlsdns_send_direct(sock, uvreq); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - failed_send_cb(sock, uvreq, result); + isc__nm_failed_send_cb(sock, uvreq, result); } } @@ -1776,7 +1582,7 @@ tlsdns_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (result); } - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { return (ISC_R_CANCELED); } @@ -1970,7 +1776,7 @@ tlsdns_close_direct(isc_nmsocket_t *sock) { } isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); uv_close((uv_handle_t *)&sock->timer, timer_close_cb); } @@ -2034,11 +1840,11 @@ isc__nm_tlsdns_shutdown(isc_nmsocket_t *sock) { if (sock->tls.pending_req) { isc__nm_uvreq_t *req = sock->tls.pending_req; sock->tls.pending_req = NULL; - failed_connect_cb(sock, req, ISC_R_CANCELED); + isc__nm_failed_connect_cb(sock, req, ISC_R_CANCELED); } if (sock->statichandle) { - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -2078,7 +1884,7 @@ isc__nm_async_tlsdnscancel(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - failed_read_cb(sock, ISC_R_EOF); + isc__nm_failed_read_cb(sock, ISC_R_EOF); } void @@ -2101,7 +1907,7 @@ isc_nm_tlsdns_sequential(isc_nmhandle_t *handle) { */ isc__nmsocket_timer_stop(sock); - stop_reading(sock); + isc__nm_stop_reading(sock); atomic_store(&sock->sequential, true); } @@ -2117,74 +1923,3 @@ isc_nm_tlsdns_keepalive(isc_nmhandle_t *handle, bool value) { atomic_store(&sock->keepalive, value); } - -/* - * Process a DNS message. - * - * If we only have an incomplete DNS message, we don't touch any - * timers. If we do have a full message, reset the timer. - * - * Stop reading if this is a client socket, or if the server socket - * has been set to sequential mode, or the number of queries we are - * processing simultaneously has reached the clients-per-connection - * limit. In this event we'll be called again by resume_processing() - * later. - */ -static void -process_sock_buffer(isc_nmsocket_t *sock) { - for (;;) { - int_fast32_t ah = atomic_load(&sock->ah); - isc_result_t result = processbuffer(sock); - switch (result) { - case ISC_R_NOMORE: - /* - * Don't reset the timer until we have a - * full DNS message. - */ - start_reading(sock); - /* Start the timer if there are no active handles */ - if (ah == 1) { - isc__nmsocket_timer_start(sock); - } - return; - case ISC_R_CANCELED: - isc__nmsocket_timer_stop(sock); - stop_reading(sock); - return; - case ISC_R_SUCCESS: - /* - * Stop the timer on the successful message read, this - * also allows to restart the timer when we have no more - * data. - */ - isc__nmsocket_timer_stop(sock); - - if (atomic_load(&sock->client) || - atomic_load(&sock->sequential) || - atomic_load(&sock->ah) >= TLSDNS_CLIENTS_PER_CONN) - { - stop_reading(sock); - return; - } - break; - default: - INSIST(0); - } - } -} - -static void -resume_processing(void *arg) { - isc_nmsocket_t *sock = (isc_nmsocket_t *)arg; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_nm_tid()); - REQUIRE(sock->type == isc_nm_tlsdnssocket); - REQUIRE(!atomic_load(&sock->client)); - - if (inactive(sock)) { - return; - } - - process_sock_buffer(sock); -} diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index dea110dd20..869d7d301b 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -50,33 +50,11 @@ timer_close_cb(uv_handle_t *handle); static void udp_close_direct(isc_nmsocket_t *sock); -static void -failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult); - static void stop_udp_parent(isc_nmsocket_t *sock); static void stop_udp_child(isc_nmsocket_t *sock); -static void -start_reading(isc_nmsocket_t *sock); -static void -stop_reading(isc_nmsocket_t *sock); - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr); - -static bool -inactive(isc_nmsocket_t *sock) { - return (!isc__nmsocket_active(sock) || - atomic_load(&sock->mgr->closing) || - (sock->server != NULL && !isc__nmsocket_active(sock->server))); -} - static uv_os_sock_t isc__nm_udp_lb_socket(sa_family_t sa_family) { isc_result_t result; @@ -192,32 +170,6 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, return (result); } -/*%< - * Allocator for UDP recv operations. Limited to size 20 * (2^16 + 2), - * which allows enough space for recvmmsg() to get multiple messages at - * a time. - * - * Note this doesn't actually allocate anything, it just assigns the - * worker's receive buffer to a socket, and marks it as "in use". - */ -static void -udp_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { - isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc__networker_t *worker = NULL; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->type == isc_nm_udpsocket); - REQUIRE(isc__nm_in_netthread()); - REQUIRE(size <= ISC_NETMGR_RECVBUF_SIZE); - - worker = &sock->mgr->workers[sock->tid]; - INSIST(!worker->recvbuf_inuse); - - buf->base = worker->recvbuf; - buf->len = ISC_NETMGR_RECVBUF_SIZE; - worker->recvbuf_inuse = true; -} - /* * Asynchronous 'udplisten' call handler: start listening on a UDP socket. */ @@ -306,7 +258,8 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { uv_send_buffer_size(&sock->uv_handle.handle, &(int){ ISC_SEND_BUFFER_SIZE }); #endif - r = uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb); + r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb, + udp_recv_cb); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto done; @@ -430,7 +383,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * we can free the buffer and bail. */ if (addr == NULL) { - failed_read_cb(sock, ISC_R_EOF); + isc__nm_failed_read_cb(sock, ISC_R_EOF); goto free; } @@ -438,19 +391,19 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * - If the socket is no longer active. */ if (!isc__nmsocket_active(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); goto free; } if (nrecv < 0) { - failed_read_cb(sock, isc__nm_uverr2result(nrecv)); + isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nrecv)); goto free; } result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); - req = get_read_req(sock, &sockaddr); + req = isc__nm_get_read_req(sock, &sockaddr); /* * The callback will be called synchronously, because result is @@ -570,15 +523,15 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(sock->tid == isc_nm_tid()); UNUSED(worker); - if (inactive(sock)) { - failed_send_cb(sock, uvreq, ISC_R_CANCELED); + if (isc__nm_inactive(sock)) { + isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } result = udp_send_direct(sock, uvreq, &ievent->peer); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - failed_send_cb(sock, uvreq, result); + isc__nm_failed_send_cb(sock, uvreq, result); } } @@ -614,7 +567,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { return (ISC_R_CANCELED); } @@ -839,9 +792,9 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, return (result); } -static void -udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, - const struct sockaddr *addr, unsigned flags) { +void +isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, + const struct sockaddr *addr, unsigned flags) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); REQUIRE(VALID_NMSOCK(sock)); @@ -854,17 +807,17 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * does not. */ if (!sock->parent) { - stop_reading(sock); + isc__nm_stop_reading(sock); } } -static void -failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { +void +isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(result != ISC_R_SUCCESS); if (atomic_load(&sock->client)) { - stop_reading(sock); + isc__nm_stop_reading(sock); if (!sock->recv_read) { goto destroy; @@ -872,7 +825,7 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { sock->recv_read = false; if (sock->recv_cb != NULL) { - isc__nm_uvreq_t *req = get_read_req(sock, NULL); + isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nmsocket_clearcb(sock); isc__nm_readcb(sock, req, result); } @@ -895,46 +848,11 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { sock->recv_read = false; if (sock->recv_cb != NULL) { - isc__nm_uvreq_t *req = get_read_req(sock, NULL); + isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nm_readcb(sock, req, result); } } -void -isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - failed_read_cb(sock, result); -} - -static void -failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(VALID_UVREQ(req)); - - if (req->cb.send != NULL) { - isc__nm_sendcb(sock, req, eresult, true); - } else { - isc__nm_uvreq_put(&req, sock); - } -} - -static isc__nm_uvreq_t * -get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) { - isc__nm_uvreq_t *req = NULL; - - req = isc__nm_uvreq_get(sock->mgr, sock); - req->cb.recv = sock->recv_cb; - req->cbarg = sock->recv_cbarg; - - if (atomic_load(&sock->client)) { - isc_nmhandle_attach(sock->statichandle, &req->handle); - } else { - req->handle = isc__nmhandle_get(sock, sockaddr, NULL); - } - - return req; -} - /* * Asynchronous 'udpread' call handler: start or resume reading on a * socket; pause reading and call the 'recv' callback after each @@ -950,41 +868,16 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - if (inactive(sock)) { + if (isc__nm_inactive(sock)) { sock->reading = true; - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } - start_reading(sock); + isc__nm_start_reading(sock); isc__nmsocket_timer_start(sock); } -static void -start_reading(isc_nmsocket_t *sock) { - if (sock->reading) { - return; - } - - int r = uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, - udp_read_cb); - REQUIRE(r == 0); - sock->reading = true; -} - -static void -stop_reading(isc_nmsocket_t *sock) { - if (!sock->reading) { - return; - } - - int r = uv_udp_recv_stop(&sock->uv_handle.udp); - REQUIRE(r == 0); - sock->reading = false; - - isc__nmsocket_timer_stop(sock); -} - void isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); @@ -1192,7 +1085,7 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { * interested in the callback. */ if (sock->statichandle) { - failed_read_cb(sock, ISC_R_CANCELED); + isc__nm_failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -1236,5 +1129,5 @@ isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(atomic_load(&sock->client)); - failed_read_cb(sock, ISC_R_EOF); + isc__nm_failed_read_cb(sock, ISC_R_EOF); }