diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 89cd470ac6..6ebccb4852 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -901,6 +901,7 @@ msvc:windows:amd64: "with-vcredist=C:/Program Files (x86)/Microsoft Visual Studio/2017/BuildTools/VC/Redist/MSVC/14.16.27012/vcredist_x64.exe" "with-openssl=C:/OpenSSL" "with-libxml2=C:/libxml2" + "with-libuv=C:/libuv" "without-python" "with-system-tests" x64' diff --git a/configure b/configure index fbbc4b6ec8..1794f5113a 100755 --- a/configure +++ b/configure @@ -747,6 +747,8 @@ OPENSSL_LIBS OPENSSL_CFLAGS INSTALL_LIBRARY ALWAYS_DEFINES +LIBUV_LIBS +LIBUV_CFLAGS PTHREAD_CFLAGS PTHREAD_LIBS PTHREAD_CC @@ -968,6 +970,8 @@ PKG_CONFIG_LIBDIR MAXMINDDB_CFLAGS MAXMINDDB_LIBS MAXMINDDB_PREFIX +LIBUV_CFLAGS +LIBUV_LIBS OPENSSL_CFLAGS OPENSSL_LIBS LIBXML2_CFLAGS @@ -1738,6 +1742,9 @@ Some influential environment variables: linker flags for MAXMINDDB, overriding pkg-config MAXMINDDB_PREFIX value of prefix for libmaxminddb, overriding pkg-config + LIBUV_CFLAGS + C compiler flags for LIBUV, overriding pkg-config + LIBUV_LIBS linker flags for LIBUV, overriding pkg-config OPENSSL_CFLAGS C compiler flags for OPENSSL, overriding pkg-config OPENSSL_LIBS @@ -15780,6 +15787,154 @@ fi done +# libuv +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libuv" >&5 +$as_echo_n "checking for libuv... " >&6; } + +pkg_failed=no +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libuv >= 1.0.0" >&5 +$as_echo_n "checking for libuv >= 1.0.0... " >&6; } + +if test -n "$LIBUV_CFLAGS"; then + pkg_cv_LIBUV_CFLAGS="$LIBUV_CFLAGS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libuv >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libuv >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBUV_CFLAGS=`$PKG_CONFIG --cflags "libuv >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi +if test -n "$LIBUV_LIBS"; then + pkg_cv_LIBUV_LIBS="$LIBUV_LIBS" + elif test -n "$PKG_CONFIG"; then + if test -n "$PKG_CONFIG" && \ + { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libuv >= 1.0.0\""; } >&5 + ($PKG_CONFIG --exists --print-errors "libuv >= 1.0.0") 2>&5 + ac_status=$? + $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5 + test $ac_status = 0; }; then + pkg_cv_LIBUV_LIBS=`$PKG_CONFIG --libs "libuv >= 1.0.0" 2>/dev/null` + test "x$?" != "x0" && pkg_failed=yes +else + pkg_failed=yes +fi + else + pkg_failed=untried +fi + + + +if test $pkg_failed = yes; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + +if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then + _pkg_short_errors_supported=yes +else + _pkg_short_errors_supported=no +fi + if test $_pkg_short_errors_supported = yes; then + LIBUV_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libuv >= 1.0.0" 2>&1` + else + LIBUV_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libuv >= 1.0.0" 2>&1` + fi + # Put the nasty error message in config.log where it belongs + echo "$LIBUV_PKG_ERRORS" >&5 + + as_fn_error $? "libuv not found" "$LINENO" 5 +elif test $pkg_failed = untried; then + { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5 +$as_echo "no" >&6; } + as_fn_error $? "libuv not found" "$LINENO" 5 +else + LIBUV_CFLAGS=$pkg_cv_LIBUV_CFLAGS + LIBUV_LIBS=$pkg_cv_LIBUV_LIBS + { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 +$as_echo "yes" >&6; } + +fi + + + CCASFLAGS_libuv_ax_save_flags=$CCASFLAGS + + + + CFLAGS_libuv_ax_save_flags=$CFLAGS + + + + CPPFLAGS_libuv_ax_save_flags=$CPPFLAGS + + + + CXXFLAGS_libuv_ax_save_flags=$CXXFLAGS + + + + ERLCFLAGS_libuv_ax_save_flags=$ERLCFLAGS + + + + FCFLAGS_libuv_ax_save_flags=$FCFLAGS + + + + FCLIBS_libuv_ax_save_flags=$FCLIBS + + + + FFLAGS_libuv_ax_save_flags=$FFLAGS + + + + FLIBS_libuv_ax_save_flags=$FLIBS + + + + GCJFLAGS_libuv_ax_save_flags=$GCJFLAGS + + + + JAVACFLAGS_libuv_ax_save_flags=$JAVACFLAGS + + + + LDFLAGS_libuv_ax_save_flags=$LDFLAGS + + + + LIBS_libuv_ax_save_flags=$LIBS + + + + OBJCFLAGS_libuv_ax_save_flags=$OBJCFLAGS + + + + OBJCXXFLAGS_libuv_ax_save_flags=$OBJCXXFLAGS + + + + UPCFLAGS_libuv_ax_save_flags=$UPCFLAGS + + + + VALAFLAGS_libuv_ax_save_flags=$VALAFLAGS + + + + +CFLAGS="$CFLAGS $LIBUV_CFLAGS" +LIBS="$LIBS $LIBUV_LIBS" + # # flockfile is usually provided by pthreads # @@ -18161,7 +18316,7 @@ fi ;; #( esac if test "$GCC" = "yes"; then : - STD_CWARNINGS="$STD_CWARNINGS -W -Wall -Wmissing-prototypes -Wcast-qual -Wwrite-strings -Wformat -Wpointer-arith" + STD_CWARNINGS="$STD_CWARNINGS -W -Wall -Wmissing-prototypes -Wcast-qual -Wwrite-strings -Wformat -Wpointer-arith -Wno-missing-field-initializers" fi @@ -23331,7 +23486,7 @@ ac_config_commands="$ac_config_commands chmod" # elsewhere if there's a good reason for doing so. # -ac_config_files="$ac_config_files make/Makefile make/mkdep Makefile bin/Makefile bin/check/Makefile bin/confgen/Makefile bin/confgen/unix/Makefile bin/delv/Makefile bin/dig/Makefile bin/dnssec/Makefile bin/named/Makefile bin/named/unix/Makefile bin/nsupdate/Makefile bin/pkcs11/Makefile bin/plugins/Makefile bin/python/Makefile bin/python/isc/Makefile bin/python/isc/utils.py bin/python/isc/tests/Makefile bin/python/dnssec-checkds.py bin/python/dnssec-coverage.py bin/python/dnssec-keymgr.py bin/python/isc/__init__.py bin/python/isc/checkds.py bin/python/isc/coverage.py bin/python/isc/dnskey.py bin/python/isc/eventlist.py bin/python/isc/keydict.py bin/python/isc/keyevent.py bin/python/isc/keymgr.py bin/python/isc/keyseries.py bin/python/isc/keyzone.py bin/python/isc/policy.py bin/python/isc/rndc.py bin/python/isc/tests/dnskey_test.py bin/python/isc/tests/policy_test.py bin/rndc/Makefile bin/tests/Makefile bin/tests/headerdep_test.sh bin/tests/optional/Makefile bin/tests/pkcs11/Makefile bin/tests/pkcs11/benchmarks/Makefile bin/tests/system/Makefile bin/tests/system/conf.sh bin/tests/system/dlzexternal/Makefile bin/tests/system/dlzexternal/ns1/dlzs.conf bin/tests/system/dyndb/Makefile bin/tests/system/dyndb/driver/Makefile bin/tests/system/pipelined/Makefile bin/tests/system/rndc/Makefile bin/tests/system/rpz/Makefile bin/tests/system/rsabigexponent/Makefile bin/tests/system/tkey/Makefile bin/tools/Makefile contrib/scripts/check-secure-delegation.pl contrib/scripts/zone-edit.sh doc/Makefile doc/arm/Makefile doc/arm/noteversion.xml doc/arm/pkgversion.xml doc/arm/releaseinfo.xml doc/doxygen/Doxyfile doc/doxygen/Makefile doc/doxygen/doxygen-input-filter doc/misc/Makefile doc/tex/Makefile doc/tex/armstyle.sty doc/xsl/Makefile doc/xsl/isc-docbook-chunk.xsl doc/xsl/isc-docbook-html.xsl doc/xsl/isc-manpage.xsl doc/xsl/isc-notes-html.xsl lib/Makefile lib/bind9/Makefile lib/bind9/include/Makefile lib/bind9/include/bind9/Makefile lib/dns/Makefile lib/dns/include/Makefile lib/dns/include/dns/Makefile lib/dns/include/dst/Makefile lib/dns/tests/Makefile lib/irs/Makefile lib/irs/include/Makefile lib/irs/include/irs/Makefile lib/irs/include/irs/netdb.h lib/irs/include/irs/platform.h lib/irs/tests/Makefile lib/isc/pthreads/Makefile lib/isc/pthreads/include/Makefile lib/isc/pthreads/include/isc/Makefile lib/isc/Makefile lib/isc/include/Makefile lib/isc/include/isc/Makefile lib/isc/include/isc/platform.h lib/isc/include/pk11/Makefile lib/isc/include/pkcs11/Makefile lib/isc/tests/Makefile lib/isc/unix/Makefile lib/isc/unix/include/Makefile lib/isc/unix/include/isc/Makefile lib/isccc/Makefile lib/isccc/include/Makefile lib/isccc/include/isccc/Makefile lib/isccc/tests/Makefile lib/isccfg/Makefile lib/isccfg/include/Makefile lib/isccfg/include/isccfg/Makefile lib/isccfg/tests/Makefile lib/ns/Makefile lib/ns/include/Makefile lib/ns/include/ns/Makefile lib/ns/tests/Makefile lib/samples/Makefile lib/samples/Makefile-postinstall unit/unittest.sh fuzz/Makefile" +ac_config_files="$ac_config_files make/Makefile make/mkdep Makefile bin/Makefile bin/check/Makefile bin/confgen/Makefile bin/confgen/unix/Makefile bin/delv/Makefile bin/dig/Makefile bin/dnssec/Makefile bin/named/Makefile bin/named/unix/Makefile bin/nsupdate/Makefile bin/pkcs11/Makefile bin/plugins/Makefile bin/python/Makefile bin/python/isc/Makefile bin/python/isc/utils.py bin/python/isc/tests/Makefile bin/python/dnssec-checkds.py bin/python/dnssec-coverage.py bin/python/dnssec-keymgr.py bin/python/isc/__init__.py bin/python/isc/checkds.py bin/python/isc/coverage.py bin/python/isc/dnskey.py bin/python/isc/eventlist.py bin/python/isc/keydict.py bin/python/isc/keyevent.py bin/python/isc/keymgr.py bin/python/isc/keyseries.py bin/python/isc/keyzone.py bin/python/isc/policy.py bin/python/isc/rndc.py bin/python/isc/tests/dnskey_test.py bin/python/isc/tests/policy_test.py bin/rndc/Makefile bin/tests/Makefile bin/tests/headerdep_test.sh bin/tests/optional/Makefile bin/tests/pkcs11/Makefile bin/tests/pkcs11/benchmarks/Makefile bin/tests/system/Makefile bin/tests/system/conf.sh bin/tests/system/dlzexternal/Makefile bin/tests/system/dlzexternal/ns1/dlzs.conf bin/tests/system/dyndb/Makefile bin/tests/system/dyndb/driver/Makefile bin/tests/system/pipelined/Makefile bin/tests/system/rndc/Makefile bin/tests/system/rpz/Makefile bin/tests/system/rsabigexponent/Makefile bin/tests/system/tkey/Makefile bin/tools/Makefile contrib/scripts/check-secure-delegation.pl contrib/scripts/zone-edit.sh doc/Makefile doc/arm/Makefile doc/arm/noteversion.xml doc/arm/pkgversion.xml doc/arm/releaseinfo.xml doc/doxygen/Doxyfile doc/doxygen/Makefile doc/doxygen/doxygen-input-filter doc/misc/Makefile doc/tex/Makefile doc/tex/armstyle.sty doc/xsl/Makefile doc/xsl/isc-docbook-chunk.xsl doc/xsl/isc-docbook-html.xsl doc/xsl/isc-manpage.xsl doc/xsl/isc-notes-html.xsl lib/Makefile lib/bind9/Makefile lib/bind9/include/Makefile lib/bind9/include/bind9/Makefile lib/dns/Makefile lib/dns/include/Makefile lib/dns/include/dns/Makefile lib/dns/include/dst/Makefile lib/dns/tests/Makefile lib/irs/Makefile lib/irs/include/Makefile lib/irs/include/irs/Makefile lib/irs/include/irs/netdb.h lib/irs/include/irs/platform.h lib/irs/tests/Makefile lib/isc/pthreads/Makefile lib/isc/pthreads/include/Makefile lib/isc/pthreads/include/isc/Makefile lib/isc/Makefile lib/isc/include/Makefile lib/isc/include/isc/Makefile lib/isc/include/isc/platform.h lib/isc/include/pk11/Makefile lib/isc/include/pkcs11/Makefile lib/isc/netmgr/Makefile lib/isc/tests/Makefile lib/isc/unix/Makefile lib/isc/unix/include/Makefile lib/isc/unix/include/isc/Makefile lib/isccc/Makefile lib/isccc/include/Makefile lib/isccc/include/isccc/Makefile lib/isccc/tests/Makefile lib/isccfg/Makefile lib/isccfg/include/Makefile lib/isccfg/include/isccfg/Makefile lib/isccfg/tests/Makefile lib/ns/Makefile lib/ns/include/Makefile lib/ns/include/ns/Makefile lib/ns/tests/Makefile lib/samples/Makefile lib/samples/Makefile-postinstall unit/unittest.sh fuzz/Makefile" # @@ -24431,6 +24586,7 @@ do "lib/isc/include/isc/platform.h") CONFIG_FILES="$CONFIG_FILES lib/isc/include/isc/platform.h" ;; "lib/isc/include/pk11/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/include/pk11/Makefile" ;; "lib/isc/include/pkcs11/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/include/pkcs11/Makefile" ;; + "lib/isc/netmgr/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/netmgr/Makefile" ;; "lib/isc/tests/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/tests/Makefile" ;; "lib/isc/unix/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/unix/Makefile" ;; "lib/isc/unix/include/Makefile") CONFIG_FILES="$CONFIG_FILES lib/isc/unix/include/Makefile" ;; diff --git a/configure.ac b/configure.ac index dfdecb192a..f657d81562 100644 --- a/configure.ac +++ b/configure.ac @@ -641,6 +641,15 @@ AC_CHECK_FUNCS([pthread_setaffinity_np cpuset_setaffinity processor_bind sched_s AC_CHECK_FUNCS([pthread_setname_np pthread_set_name_np]) AC_CHECK_HEADERS([pthread_np.h], [], [], [#include ]) +# libuv +AC_MSG_CHECKING(for libuv) +PKG_CHECK_MODULES([LIBUV], [libuv >= 1.0.0], [], + [AC_MSG_ERROR([libuv not found])]) +AX_SAVE_FLAGS([libuv]) + +CFLAGS="$CFLAGS $LIBUV_CFLAGS" +LIBS="$LIBS $LIBUV_LIBS" + # # flockfile is usually provided by pthreads # @@ -1321,7 +1330,7 @@ AS_CASE([$host], [MKDEPCFLAGS="-xM"])]) AS_IF([test "$GCC" = "yes"], - [STD_CWARNINGS="$STD_CWARNINGS -W -Wall -Wmissing-prototypes -Wcast-qual -Wwrite-strings -Wformat -Wpointer-arith"] + [STD_CWARNINGS="$STD_CWARNINGS -W -Wall -Wmissing-prototypes -Wcast-qual -Wwrite-strings -Wformat -Wpointer-arith -Wno-missing-field-initializers"] ) AX_CHECK_COMPILE_FLAG([-fno-strict-aliasing], @@ -2819,6 +2828,7 @@ AC_CONFIG_FILES([ lib/isc/include/isc/platform.h lib/isc/include/pk11/Makefile lib/isc/include/pkcs11/Makefile + lib/isc/netmgr/Makefile lib/isc/tests/Makefile lib/isc/unix/Makefile lib/isc/unix/include/Makefile diff --git a/lib/isc/Makefile.in b/lib/isc/Makefile.in index 1f0066a60d..906ebd3e69 100644 --- a/lib/isc/Makefile.in +++ b/lib/isc/Makefile.in @@ -54,6 +54,8 @@ OBJS = pk11.@O@ pk11_result.@O@ \ hmac.@O@ hp.@O@ httpd.@O@ iterated_hash.@O@ \ lex.@O@ lfsr.@O@ lib.@O@ log.@O@ \ md.@O@ mem.@O@ mutexblock.@O@ \ + netmgr/netmgr.@O@ netmgr/tcp.@O@ netmgr/udp.@O@ \ + netmgr/tcpdns.@O@ netmgr/uverr2result.@O@ \ netaddr.@O@ netscope.@O@ nonce.@O@ openssl_shim.@O@ pool.@O@ \ parseint.@O@ portset.@O@ queue.@O@ quota.@O@ \ radix.@O@ random.@O@ ratelimiter.@O@ \ @@ -86,7 +88,7 @@ LIBS = ${OPENSSL_LIBS} @LIBS@ # Attempt to disable parallel processing. .NOTPARALLEL: .NO_PARALLEL: -SUBDIRS = include unix pthreads +SUBDIRS = include netmgr unix pthreads TARGETS = timestamp TESTDIRS = @UNITTESTS@ diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h new file mode 100644 index 0000000000..e68bed759c --- /dev/null +++ b/lib/isc/include/isc/netmgr.h @@ -0,0 +1,284 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#pragma once + +#include + +#include +#include +#include + +typedef enum { + NMEV_READ, + NMEV_WRITE, + NMEV_ACCEPT, + NMEV_CONNECTED, + NMEV_CANCELLED, + NMEV_SHUTDOWN +} isc_nm_eventtype; + +isc_nm_t * +isc_nm_start(isc_mem_t *mctx, uint32_t workers); +/*%< + * Creates a new network manager with 'workers' worker threads, + * and starts it running. + */ + +void +isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst); +void +isc_nm_detach(isc_nm_t **mgr0); +void +isc_nm_destroy(isc_nm_t **mgr0); +/*%< + * Attach/detach a network manager. When all references have been + * released, the network manager is shut down, freeing all resources. + * Destroy is working the same way as detach, but it actively waits + * for all other references to be gone. + */ + +/* Return thread id of current thread, or ISC_NETMGR_TID_UNKNOWN */ +int +isc_nm_tid(void); + +/* + * isc_nm_freehandle frees a handle, releasing resources + */ +void +isc_nm_freehandle(isc_nmhandle_t *handle); + +void +isc_nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target); +/*%< + * isc_nmsocket_attach attaches to a socket, increasing refcount + */ + +void +isc_nmsocket_close(isc_nmsocket_t *sock); + +void +isc_nmsocket_detach(isc_nmsocket_t **socketp); +/*%< + * isc_nmsocket_detach detaches from socket, decreasing refcount + * and possibly destroying the socket if it's no longer referenced. + */ + +void +isc_nmhandle_ref(isc_nmhandle_t *handle); +void +isc_nmhandle_unref(isc_nmhandle_t *handle); +/*%< + * Increment/decrement the reference counter in a netmgr handle, + * but (unlike the attach/detach functions) do not change the pointer + * value. If reference counters drop to zero, the handle can be + * marked inactive, possibly triggering deletion of its associated + * socket. + * + * (This will be used to prevent a client from being cleaned up when + * it's passed to an isc_task event handler. The libuv code would not + * otherwise know that the handle was in use and might free it, along + * with the client.) + */ + +void * +isc_nmhandle_getdata(isc_nmhandle_t *handle); + +void * +isc_nmhandle_getextra(isc_nmhandle_t *handle); + +typedef void (*isc_nm_opaquecb)(void *arg); + +bool +isc_nmhandle_is_stream(isc_nmhandle_t *handle); + +/* + * isc_nmhandle_t has a void * opaque field (usually - ns_client_t). + * We reuse handle and `opaque` can also be reused between calls. + * This function sets this field and two callbacks: + * - doreset resets the `opaque` to initial state + * - dofree frees everything associated with `opaque` + */ +void +isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg, + isc_nm_opaquecb doreset, isc_nm_opaquecb dofree); + +isc_sockaddr_t +isc_nmhandle_peeraddr(isc_nmhandle_t *handle); +isc_sockaddr_t +isc_nmhandle_localaddr(isc_nmhandle_t *handle); + +typedef void (*isc_nm_recv_cb_t)(isc_nmhandle_t *handle, isc_region_t *region, + void *cbarg); +/*%< + * Callback function to be used when receiving a packet. + * + * 'handle' the handle that can be used to send back the answer. + * 'region' contains the received data. It will be freed after + * return by caller. + * 'cbarg' the callback argument passed to isc_nm_listenudp(), + * isc_nm_listentcpdns(), or isc_nm_read(). + */ + +typedef void (*isc_nm_cb_t)(isc_nmhandle_t *handle, isc_result_t result, + void *cbarg); +/*%< + * Callback function for other network completion events (send, connect, + * accept). + * + * 'handle' the handle on which the event took place. + * 'result' the result of the event. + * 'cbarg' the callback argument passed to isc_nm_send(), + * isc_nm_tcp_connect(), or isc_nm_listentcp() + */ + +isc_result_t +isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_recv_cb_t cb, void *cbarg, + size_t extrasize, isc_nmsocket_t **sockp); +/*%< + * Start listening for UDP packets on interface 'iface' using net manager + * 'mgr'. + * + * On success, 'sockp' will be updated to contain a new listening UDP socket. + * + * When a packet is received on the socket, 'cb' will be called with 'cbarg' + * as its argument. + * + * When handles are allocated for the socket, 'extrasize' additional bytes + * will be allocated along with the handle for an associated object + * (typically ns_client). + */ + +void +isc_nm_udp_stoplistening(isc_nmsocket_t *sock); +/*%< + * Stop listening for UDP packets on socket 'sock'. + */ + +void +isc_nm_pause(isc_nm_t *mgr); +/*%< + * Pause all processing, equivalent to taskmgr exclusive tasks. + * It won't return until all workers have been paused. + */ + +void +isc_nm_resume(isc_nm_t *mgr); +/*%< + * Resume paused processing. It will return immediately + * after signalling workers to resume. + */ + +isc_result_t +isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); + +isc_result_t +isc_nm_pauseread(isc_nmsocket_t *sock); +/*%< + * Pause reading on this socket, while still remembering the callback. + */ + +isc_result_t +isc_nm_resumeread(isc_nmsocket_t *sock); +/*%< + * Resume reading from socket. + * + * Requires: + * \li 'sock' is a valid netmgr socket + * \li ...for which a read/recv callback has been defined. + */ + +isc_result_t +isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg); +/*%< + * Send the data in 'region' via 'handle'. Afterward, the callback 'cb' is + * called with the argument 'cbarg'. + * + * 'region' is not copied; it has to be allocated beforehand and freed + * in 'cb'. + */ + +isc_result_t +isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_cb_t cb, void *cbarg, + size_t extrahandlesize, isc_quota_t *quota, + isc_nmsocket_t **rv); +/*%< + * Start listening for raw messages over the TCP interface 'iface', using + * net manager 'mgr'. + * + * On success, 'sockp' will be updated to contain a new listening TCP + * socket. + * + * When a message is received on the socket, 'cb' will be called with 'cbarg' + * as its argument. + * + * When handles are allocated for the socket, 'extrasize' additional bytes + * will be allocated along with the handle for an associated object. + * + * If 'quota' is not NULL, then the socket is attached to the specified + * quota. This allows us to enforce TCP client quota limits. + * + * NOTE: This is currently only called inside isc_nm_listentcpdns(), which + * creates a 'wrapper' socket that sends and receives DNS messages - + * prepended with a two-byte length field - and handles buffering. + */ + +void +isc_nm_tcp_stoplistening(isc_nmsocket_t *sock); +/*%< + * Stop listening on TCP socket 'sock'. + */ + +isc_result_t +isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_recv_cb_t cb, void *arg, + size_t extrahandlesize, isc_quota_t *quota, + isc_nmsocket_t **sockp); +/*%< + * Start listening for DNS messages over the TCP interface 'iface', using + * net manager 'mgr'. + * + * On success, 'sockp' will be updated to contain a new listening TCPDNS + * socket. This is a wrapper around a TCP socket, and handles DNS length + * processing. + * + * When a complete DNS message is received on the socket, 'cb' will be + * called with 'cbarg' as its argument. + * + * When handles are allocated for the socket, 'extrasize' additional bytes + * will be allocated along with the handle for an associated object + * (typically ns_client). + */ + +void +isc_nm_tcpdns_stoplistening(isc_nmsocket_t *sock); +/*%< + * Stop listening on TCPDNS socket 'sock'. + */ + +void +isc_nm_tcpdns_sequential(isc_nmhandle_t *handle); +/*%< + * Disable pipelining on this connection. Each DNS packet + * will be only processed after the previous completes. + * + * This cannot be reversed once set for a given connection + */ + +void +isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp); +/*%< + * Simulate a broken firewall that blocks UDP messages larger + * than a given size. + */ diff --git a/lib/isc/include/isc/types.h b/lib/isc/include/isc/types.h index 0c11eadc62..5548d9989d 100644 --- a/lib/isc/include/isc/types.h +++ b/lib/isc/include/isc/types.h @@ -63,6 +63,10 @@ typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */ typedef struct isc_mem isc_mem_t; /*%< Memory */ typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */ typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */ +typedef struct isc_nm isc_nm_t; /*%< Network manager */ +typedef struct isc_nmsocket isc_nmsocket_t; /*%< Network manager socket */ +typedef struct isc_nmiface isc_nmiface_t; /*%< Network manager interface. */ +typedef struct isc_nmhandle isc_nmhandle_t; /*%< Network manager handle */ typedef struct isc_portset isc_portset_t; /*%< Port Set */ typedef struct isc_quota isc_quota_t; /*%< Quota */ typedef struct isc_ratelimiter isc_ratelimiter_t; /*%< Rate Limiter */ diff --git a/lib/isc/netmgr/Makefile.in b/lib/isc/netmgr/Makefile.in new file mode 100644 index 0000000000..d931b7c26d --- /dev/null +++ b/lib/isc/netmgr/Makefile.in @@ -0,0 +1,33 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +srcdir = @srcdir@ +VPATH = @srcdir@ +top_srcdir = @top_srcdir@ + +CINCLUDES = -I${srcdir}/../include \ + -I${srcdir}/../unix/include \ + -I${srcdir}/../pthreads/include \ + -I${srcdir}/.. \ + ${OPENSSL_CFLAGS} \ + ${JSON_C_CFLAGS} \ + ${LIBXML2_CFLAGS} + +CDEFINES = +CWARNINGS = + +# Alphabetically +OBJS = netmgr.@O@ tcp.@O@ udp.@O@ tcpdns.@O@ uverr2result.@O@ + +# Alphabetically +SRCS = netmgr.c tcp.c udp.c tcpdns.c uverr2result.c + +TARGETS = ${OBJS} + +@BIND9_MAKE_RULES@ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h new file mode 100644 index 0000000000..b95be9873b --- /dev/null +++ b/lib/isc/netmgr/netmgr-int.h @@ -0,0 +1,547 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ISC_NETMGR_TID_UNKNOWN -1 +#define ISC_NETMGR_TID_NOTLS -2 + +/* + * Single network event loop worker. + */ +typedef struct isc__networker { + isc_nm_t * mgr; + int id; /* thread id */ + uv_loop_t loop; /* libuv loop structure */ + uv_async_t async; /* async channel to send + * data to this networker */ + isc_mutex_t lock; + isc_mempool_t *mpool_bufs; + isc_condition_t cond; + bool paused; + bool finished; + isc_thread_t thread; + isc_queue_t *ievents; /* incoming async events */ + isc_refcount_t references; + atomic_int_fast64_t pktcount; + char udprecvbuf[65536]; + bool udprecvbuf_inuse; +} isc__networker_t; + +/* + * A general handle for a connection bound to a networker. For UDP + * connections we have peer address here, so both TCP and UDP can be + * handled with a simple send-like function + */ +#define NMHANDLE_MAGIC ISC_MAGIC('N', 'M', 'H', 'D') +#define VALID_NMHANDLE(t) ISC_MAGIC_VALID(t, \ + NMHANDLE_MAGIC) + +typedef void (*isc__nm_closecb)(isc_nmhandle_t *); + +struct isc_nmhandle { + int magic; + isc_refcount_t references; + + /* + * The socket is not 'attached' in the traditional + * reference-counting sense. Instead, we keep all handles in an + * array in the socket object. This way, we don't have circular + * dependencies and we can close all handles when we're destroying + * the socket. + */ + isc_nmsocket_t *sock; + size_t ah_pos; /* Position in the socket's + * 'active handles' array */ + + /* + * The handle is 'inflight' if netmgr is not currently processing + * it in any way - it might mean that e.g. a recursive resolution + * is happening. For an inflight handle we must wait for the + * calling code to finish before we can free it. + */ + atomic_bool inflight; + + isc_sockaddr_t peer; + isc_sockaddr_t local; + isc_nm_opaquecb doreset; /* reset extra callback, external */ + isc_nm_opaquecb dofree; /* free extra callback, external */ + void * opaque; + char extra[]; +}; + +/* + * An interface - an address we can listen on. + */ +struct isc_nmiface { + isc_sockaddr_t addr; +}; + +typedef enum isc__netievent_type { + netievent_stop, + netievent_udplisten, + netievent_udpstoplisten, + netievent_udpsend, + netievent_udprecv, + netievent_tcpconnect, + netievent_tcpsend, + netievent_tcprecv, + netievent_tcpstartread, + netievent_tcppauseread, + netievent_tcplisten, + netievent_tcpstoplisten, + netievent_tcpclose, +} isc__netievent_type; + +typedef struct isc__netievent_stop { + isc__netievent_type type; +} isc__netievent_stop_t; + +/* + * We have to split it because we can read and write on a socket + * simultaneously. + */ +typedef union { + isc_nm_recv_cb_t recv; + isc_nm_cb_t accept; +} isc__nm_readcb_t; + +typedef union { + isc_nm_cb_t send; + isc_nm_cb_t connect; +} isc__nm_writecb_t; + +typedef union { + isc_nm_recv_cb_t recv; + isc_nm_cb_t accept; + isc_nm_cb_t send; + isc_nm_cb_t connect; +} isc__nm_cb_t; + +/* + * Wrapper around uv_req_t with 'our' fields in it. req->data should + * always point to its parent. Note that we always allocate more than + * sizeof(struct) because we make room for different req types; + */ +#define UVREQ_MAGIC ISC_MAGIC('N', 'M', 'U', 'R') +#define VALID_UVREQ(t) ISC_MAGIC_VALID(t, UVREQ_MAGIC) + +typedef struct isc__nm_uvreq { + int magic; + isc_nmsocket_t * sock; + isc_nmhandle_t * handle; + uv_buf_t uvbuf; /* translated isc_region_t, to be + sent or received */ + isc_sockaddr_t local; /* local address */ + isc_sockaddr_t peer; /* peer address */ + isc__nm_cb_t cb; /* callback */ + void * cbarg; /* callback argument */ + union { + uv_req_t req; + uv_getaddrinfo_t getaddrinfo; + uv_getnameinfo_t getnameinfo; + uv_shutdown_t shutdown; + uv_write_t write; + uv_connect_t connect; + uv_udp_send_t udp_send; + uv_fs_t fs; + uv_work_t work; + } uv_req; +} isc__nm_uvreq_t; + +typedef struct isc__netievent__socket { + isc__netievent_type type; + isc_nmsocket_t *sock; +} isc__netievent__socket_t; + +typedef isc__netievent__socket_t isc__netievent_udplisten_t; +typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; +typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; +typedef isc__netievent__socket_t isc__netievent_tcpclose_t; +typedef isc__netievent__socket_t isc__netievent_startread_t; +typedef isc__netievent__socket_t isc__netievent_pauseread_t; +typedef isc__netievent__socket_t isc__netievent_resumeread_t; + +typedef struct isc__netievent__socket_req { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc__nm_uvreq_t *req; +} isc__netievent__socket_req_t; + +typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; +typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; +typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; + +typedef struct isc__netievent_udpsend { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc_sockaddr_t peer; + isc__nm_uvreq_t *req; +} isc__netievent_udpsend_t; + +typedef struct isc__netievent { + isc__netievent_type type; +} isc__netievent_t; + +typedef union { + isc__netievent_t ni; + isc__netievent_stop_t nis; + isc__netievent_udplisten_t niul; + isc__netievent_udpsend_t nius; +} isc__netievent_storage_t; + +/* + * Network manager + */ +#define NM_MAGIC ISC_MAGIC('N', 'E', 'T', 'M') +#define VALID_NM(t) ISC_MAGIC_VALID(t, NM_MAGIC) + +struct isc_nm { + int magic; + isc_refcount_t references; + isc_mem_t *mctx; + uint32_t nworkers; + isc_mutex_t lock; + isc_condition_t wkstatecond; + isc__networker_t *workers; + atomic_uint_fast32_t workers_running; + atomic_uint_fast32_t workers_paused; + atomic_uint_fast32_t maxudp; + atomic_bool paused; + + /* + * A worker is actively waiting for other workers, for example to + * stop listening; that means no other thread can do the same thing + * or pause, or we'll deadlock. We have to either re-enqueue our + * event or wait for the other one to finish if we want to pause. + */ + atomic_bool interlocked; +}; + +typedef enum isc_nmsocket_type { + isc_nm_udpsocket, + isc_nm_udplistener, /* Aggregate of nm_udpsocks */ + isc_nm_tcpsocket, + isc_nm_tcplistener, + isc_nm_tcpdnslistener, + isc_nm_tcpdnssocket +} isc_nmsocket_type; + +/*% + * A universal structure for either a single socket or a group of + * dup'd/SO_REUSE_PORT-using sockets listening on the same interface. + */ +#define NMSOCK_MAGIC ISC_MAGIC('N', 'M', 'S', 'K') +#define VALID_NMSOCK(t) ISC_MAGIC_VALID(t, NMSOCK_MAGIC) + +struct isc_nmsocket { + /*% Unlocked, RO */ + int magic; + int tid; + isc_nmsocket_type type; + isc_nm_t *mgr; + isc_nmsocket_t *parent; + isc_quota_t *quota; + bool overquota; + + /*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */ + isc_nmsocket_t *outer; + + /*% server socket for connections */ + isc_nmsocket_t *server; + + /*% children sockets for multi-socket setups */ + isc_nmsocket_t *children; + int nchildren; + isc_nmiface_t *iface; + isc_nmhandle_t *tcphandle; + + /*% extra data allocated at the end of each isc_nmhandle_t */ + size_t extrahandlesize; + + /*% libuv data */ + uv_os_sock_t fd; + union uv_any_handle uv_handle; + + isc_sockaddr_t peer; + + /* Atomic */ + /*% Number of running (e.g. listening) children sockets */ + atomic_int_fast32_t rchildren; + + /*% + * Socket if active if it's listening, working, etc., if we're + * closing a socket it doesn't make any sense to e.g. still + * push handles or reqs for reuse + */ + atomic_bool active; + atomic_bool destroying; + + /*% + * Socket is closed if it's not active and all the possible + * callbacks were fired, there are no active handles, etc. + * active==false, closed==false means the socket is closing. + */ + atomic_bool closed; + atomic_bool listening; + isc_refcount_t references; + + /*% + * TCPDNS socket is not pipelining. + */ + atomic_bool sequential; + /*% + * TCPDNS socket in sequential mode is currently processing a packet, + * we need to wait until it finishes. + */ + atomic_bool processing; + + /*% + * 'spare' handles for that can be reused to avoid allocations, + * for UDP. + */ + isc_astack_t *inactivehandles; + isc_astack_t *inactivereqs; + + /* Used for active/rchildren during shutdown */ + isc_mutex_t lock; + isc_condition_t cond; + + /*% + * List of active handles. + * ah_size - size of ah_frees and ah_handles + * ah_cpos - current position in ah_frees; + * ah_handles - array of *handles. + * Adding a handle + * - if ah_cpos == ah_size, realloc + * - x = ah_frees[ah_cpos] + * - ah_frees[ah_cpos++] = 0; + * - ah_handles[x] = handle + * - x must be stored with the handle! + * Removing a handle: + * - ah_frees[--ah_cpos] = x + * - ah_handles[x] = NULL; + * + * XXXWPK for now this is locked with socket->lock, but we might want + * to change it to something lockless + */ + size_t ah_size; + size_t ah_cpos; + size_t *ah_frees; + isc_nmhandle_t **ah_handles; + + /* Buffer for TCPDNS processing, optional */ + size_t buf_size; + size_t buf_len; + unsigned char *buf; + + isc__nm_readcb_t rcb; + void *rcbarg; +}; + +bool +isc__nm_in_netthread(void); +/*% + * Returns 'true' if we're in the network thread. + */ + +void * +isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type); +/*%< + * Allocate an ievent and set the type. + */ + +void +isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event); +/*%< + * Enqueue an ievent onto a specific worker queue. (This the only safe + * way to use an isc__networker_t from another thread.) + */ + +void +isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf); +/*%< + * Allocator for recv operations. + * + * Note that as currently implemented, this doesn't actually + * allocate anything, it just assigns the the isc__networker's UDP + * receive buffer to a socket, and marks it as "in use". + */ + +void +isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf); +/*%< + * Free a buffer allocated for a receive operation. + * + * Note that as currently implemented, this doesn't actually + * free anything, marks the isc__networker's UDP receive buffer + * as "not in use". + */ + + +isc_nmhandle_t * +isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, + isc_sockaddr_t *local); +/*%< + * Get a handle for the socket 'sock', allocating a new one + * if there isn't one availbale in 'sock->inactivehandles'. + * + * If 'peer' is not NULL, set the handle's peer address to 'peer', + * otherwise set it to 'sock->peer'. + * + * If 'local' is not NULL, set the handle's local address to 'local', + * otherwise set it to 'sock->iface->addr'. + */ + +isc__nm_uvreq_t * +isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock); +/*%< + * Get a UV request structure for the socket 'sock', allocating a + * new one if there isn't one availbale in 'sock->inactivereqs'. + */ + +void +isc__nm_uvreq_put(isc__nm_uvreq_t **req, isc_nmsocket_t *sock); +/*%< + * Completes the use of a UV request structure, setting '*req' to NULL. + * + * The UV request is pushed onto the 'sock->inactivereqs' stack or, + * if that doesn't work, freed. + */ + +void +isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, + isc_nmsocket_type type); +/*%< + * Initialize socket 'sock', attach it to 'mgr', and set it to type 'type'. + */ + +void +isc__nmsocket_prep_destroy(isc_nmsocket_t *sock); +/*%< + * Market 'sock' as inactive, close it if necessary, and destroy it + * if there are no remaining references or active handles. + */ + +isc_result_t +isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg); +/*%< + * Back-end implemenation of isc_nm_send() for UDP handles. + */ + +void +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0); + +void +isc__nm_async_udpstoplisten(isc__networker_t *worker, + isc__netievent_t *ievent0); +void +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +/*%< + * Callback handlers for asynchronous UDP events (listen, stoplisten, send). + */ + +isc_result_t +isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg); +/*%< + * Back-end implemenation of isc_nm_send() for TCP handles. + */ + +void +isc__nm_tcp_close(isc_nmsocket_t *sock); +/*%< + * Close a TCP socket. + */ + +void +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_tcpstoplisten(isc__networker_t *worker, + isc__netievent_t *ievent0); +void +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_resumeread(isc__networker_t *worker, isc__netievent_t *ievent0); +void +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0); +/*%< + * Callback handlers for asynchronous TCP events (connect, listen, + * stoplisten, send, read, pauseread, resumeread, close). + */ + + +isc_result_t +isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg); +/*%< + * Back-end implemenation of isc_nm_send() for TCPDNS handles. + */ + +void +isc__nm_tcpdns_close(isc_nmsocket_t *sock); +/*%< + * Close a TCPDNS socket. + */ + +#define isc__nm_uverr2result(x) \ + isc___nm_uverr2result(x, true, __FILE__, __LINE__) +isc_result_t +isc___nm_uverr2result(int uverr, bool dolog, + const char *file, unsigned int line); +/*%< + * Convert a libuv error value into an isc_result_t. The + * list of supported error values is not complete; new users + * of this function should add any expected errors that are + * not already there. + */ + +bool +isc__nm_acquire_interlocked(isc_nm_t *mgr); +/*%< + * Try to acquire interlocked state; return true if successful. + */ + +void +isc__nm_drop_interlocked(isc_nm_t *mgr); +/*%< + * Drop interlocked state; signal waiters. + */ + +void +isc__nm_acquire_interlocked_force(isc_nm_t *mgr); +/*%< + * Actively wait for interlocked state. + */ diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c new file mode 100644 index 0000000000..962bc1af03 --- /dev/null +++ b/lib/isc/netmgr/netmgr.c @@ -0,0 +1,1056 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "netmgr-int.h" + +/* + * libuv is not thread safe, but has mechanisms to pass messages + * between threads. Each socket is owned by a thread. For UDP + * sockets we have a set of sockets for each interface and we can + * choose a sibling and send the message directly. For TCP, or if + * we're calling from a non-networking thread, we need to pass the + * request using async_cb. + */ + +#if defined(HAVE_TLS) +#if defined(HAVE_THREAD_LOCAL) +#include +static thread_local int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#elif defined(HAVE___THREAD) +static __thread int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#elif defined(HAVE___DECLSPEC_THREAD) +static __declspec( thread ) int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#else /* if defined(HAVE_THREAD_LOCAL) */ +#error "Unknown method for defining a TLS variable!" +#endif /* if defined(HAVE_THREAD_LOCAL) */ +#else /* if defined(HAVE_TLS) */ +static int isc__nm_tid_v = ISC_NETMGR_TID_NOTLS; +#endif /* if defined(HAVE_TLS) */ + +static void +nmsocket_maybe_destroy(isc_nmsocket_t *sock); +static void +nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle); +static void * +nm_thread(void *worker0); +static void +async_cb(uv_async_t *handle); + +int +isc_nm_tid() { + return (isc__nm_tid_v); +} + +bool +isc__nm_in_netthread() { + return (isc__nm_tid_v >= 0); +} + +isc_nm_t * +isc_nm_start(isc_mem_t *mctx, uint32_t workers) { + isc_nm_t *mgr = NULL; + char name[32]; + + mgr = isc_mem_get(mctx, sizeof(*mgr)); + *mgr = (isc_nm_t) { + .nworkers = workers + }; + + isc_mem_attach(mctx, &mgr->mctx); + isc_mutex_init(&mgr->lock); + isc_condition_init(&mgr->wkstatecond); + isc_refcount_init(&mgr->references, 1); + atomic_init(&mgr->workers_running, 0); + atomic_init(&mgr->workers_paused, 0); + atomic_init(&mgr->maxudp, 0); + atomic_init(&mgr->paused, false); + atomic_init(&mgr->interlocked, false); + + mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t)); + for (size_t i = 0; i < workers; i++) { + int r; + isc__networker_t *worker = &mgr->workers[i]; + *worker = (isc__networker_t) { + .mgr = mgr, + .id = i, + }; + + r = uv_loop_init(&worker->loop); + RUNTIME_CHECK(r == 0); + + worker->loop.data = &mgr->workers[i]; + + r = uv_async_init(&worker->loop, &worker->async, async_cb); + RUNTIME_CHECK(r == 0); + + isc_mutex_init(&worker->lock); + isc_condition_init(&worker->cond); + + isc_mempool_create(mgr->mctx, 65536, &worker->mpool_bufs); + worker->ievents = isc_queue_new(mgr->mctx, 128); + + /* + * We need to do this here and not in nm_thread to avoid a + * race - we could exit isc_nm_start, launch nm_destroy, + * and nm_thread would still not be up. + */ + atomic_fetch_add_explicit(&mgr->workers_running, 1, + memory_order_relaxed); + isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread); + + snprintf(name, sizeof(name), "isc-net-%04zu", i); + isc_thread_setname(worker->thread, name); + } + + mgr->magic = NM_MAGIC; + return (mgr); +} + +/* + * Free the resources of the network manager. + * + * TODO we need to clean up properly - launch all missing callbacks, + * destroy all listeners, etc. + */ +static void +nm_destroy(isc_nm_t **mgr0) { + REQUIRE(VALID_NM(*mgr0)); + REQUIRE(!isc__nm_in_netthread()); + + isc_nm_t *mgr = *mgr0; + + LOCK(&mgr->lock); + mgr->magic = 0; + + for (size_t i = 0; i < mgr->nworkers; i++) { + isc__netievent_t *event = NULL; + + LOCK(&mgr->workers[i].lock); + mgr->workers[i].finished = true; + UNLOCK(&mgr->workers[i].lock); + event = isc__nm_get_ievent(mgr, netievent_stop); + isc__nm_enqueue_ievent(&mgr->workers[i], event); + } + + while (atomic_load(&mgr->workers_running) > 0) { + WAIT(&mgr->wkstatecond, &mgr->lock); + } + UNLOCK(&mgr->lock); + + for (size_t i = 0; i < mgr->nworkers; i++) { + /* Empty the async event queue */ + isc__netievent_t *ievent; + while ((ievent = (isc__netievent_t *) + isc_queue_dequeue(mgr->workers[i].ievents)) != NULL) + { + isc_mem_put(mgr->mctx, ievent, + sizeof(isc__netievent_storage_t)); + } + isc_queue_destroy(mgr->workers[i].ievents); + isc_mempool_destroy(&mgr->workers[i].mpool_bufs); + } + + isc_condition_destroy(&mgr->wkstatecond); + isc_mutex_destroy(&mgr->lock); + isc_mem_put(mgr->mctx, mgr->workers, + mgr->nworkers * sizeof(isc__networker_t)); + isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr)); + *mgr0 = NULL; +} + +void +isc_nm_pause(isc_nm_t *mgr) { + REQUIRE(VALID_NM(mgr)); + REQUIRE(!isc__nm_in_netthread()); + + atomic_store(&mgr->paused, true); + isc__nm_acquire_interlocked_force(mgr); + + for (size_t i = 0; i < mgr->nworkers; i++) { + isc__netievent_t *event = NULL; + + LOCK(&mgr->workers[i].lock); + mgr->workers[i].paused = true; + UNLOCK(&mgr->workers[i].lock); + + /* + * We have to issue a stop, otherwise the uv_run loop will + * run indefinitely! + */ + event = isc__nm_get_ievent(mgr, netievent_stop); + isc__nm_enqueue_ievent(&mgr->workers[i], event); + } + + LOCK(&mgr->lock); + while (atomic_load_relaxed(&mgr->workers_paused) != + atomic_load_relaxed(&mgr->workers_running)) + { + WAIT(&mgr->wkstatecond, &mgr->lock); + } + UNLOCK(&mgr->lock); +} + +void +isc_nm_resume(isc_nm_t *mgr) { + REQUIRE(VALID_NM(mgr)); + REQUIRE(!isc__nm_in_netthread()); + + for (size_t i = 0; i < mgr->nworkers; i++) { + LOCK(&mgr->workers[i].lock); + mgr->workers[i].paused = false; + SIGNAL(&mgr->workers[i].cond); + UNLOCK(&mgr->workers[i].lock); + } + isc__nm_drop_interlocked(mgr); + + /* + * We're not waiting for all the workers to come back to life; + * they eventually will, we don't care. + */ +} + +void +isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst) { + int refs; + + REQUIRE(VALID_NM(mgr)); + REQUIRE(dst != NULL && *dst == NULL); + + refs = isc_refcount_increment(&mgr->references); + INSIST(refs > 0); + + *dst = mgr; +} + +void +isc_nm_detach(isc_nm_t **mgr0) { + isc_nm_t *mgr = NULL; + int references; + + REQUIRE(mgr0 != NULL); + REQUIRE(VALID_NM(*mgr0)); + + mgr = *mgr0; + *mgr0 = NULL; + + references = isc_refcount_decrement(&mgr->references); + INSIST(references > 0); + if (references == 1) { + nm_destroy(&mgr); + } +} + + +void +isc_nm_destroy(isc_nm_t **mgr0) { + isc_nm_t *mgr = NULL; + int references; + + REQUIRE(mgr0 != NULL); + REQUIRE(VALID_NM(*mgr0)); + + mgr = *mgr0; + *mgr0 = NULL; + + /* + * Wait for the manager to be dereferenced elsehwere. + */ + while (isc_refcount_current(&mgr->references) > 1) { +#ifdef WIN32 + _sleep(1000); +#else + usleep(1000000); +#endif + } + references = isc_refcount_decrement(&mgr->references); + INSIST(references > 0); + if (references == 1) { + nm_destroy(&mgr); + } +} + +void +isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) { + REQUIRE(VALID_NM(mgr)); + + atomic_store(&mgr->maxudp, maxudp); +} + +/* + * nm_thread is a single worker thread, that runs uv_run event loop + * until asked to stop. + */ +static void * +nm_thread(void *worker0) { + isc__networker_t *worker = (isc__networker_t *) worker0; + + isc__nm_tid_v = worker->id; + isc_thread_setaffinity(isc__nm_tid_v); + + while (true) { + int r = uv_run(&worker->loop, UV_RUN_DEFAULT); + bool pausing = false; + + /* + * or there's nothing to do. In the first case - wait + * for condition. In the latter - timedwait + */ + LOCK(&worker->lock); + while (worker->paused) { + LOCK(&worker->mgr->lock); + if (!pausing) { + atomic_fetch_add_explicit( + &worker->mgr->workers_paused, + 1, memory_order_acquire); + pausing = true; + } + + SIGNAL(&worker->mgr->wkstatecond); + UNLOCK(&worker->mgr->lock); + + WAIT(&worker->cond, &worker->lock); + } + if (pausing) { + uint32_t wp = atomic_fetch_sub_explicit( + &worker->mgr->workers_paused, + 1, memory_order_release); + if (wp == 1) { + atomic_store(&worker->mgr->paused, false); + } + } + UNLOCK(&worker->lock); + + if (worker->finished) { + /* TODO walk the handles and free them! */ + break; + } + + if (r == 0) { + /* + * TODO it should never happen - we don't have + * any sockets we're listening on? + */ +#ifdef WIN32 + _sleep(100); +#else + usleep(100000); +#endif + } + + /* + * Empty the async queue. + */ + async_cb(&worker->async); + } + + LOCK(&worker->mgr->lock); + atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1, + memory_order_relaxed); + SIGNAL(&worker->mgr->wkstatecond); + UNLOCK(&worker->mgr->lock); + return (NULL); +} + +/* + * async_cb is an universal callback for 'async' events sent to event loop. + * It's the only way to safely pass data to libuv event loop. We use a single + * async event and a lockless queue of 'isc__netievent_t' structures passed + * from other threads. + */ +static void +async_cb(uv_async_t *handle) { + isc__networker_t *worker = (isc__networker_t *) handle->loop->data; + isc__netievent_t *ievent; + + /* + * We only try dequeue to not waste time, libuv guarantees + * that if someone calls uv_async_send -after- async_cb was called + * then async_cb will be called again, we won't loose any signals. + */ + while ((ievent = (isc__netievent_t *) + isc_queue_dequeue(worker->ievents)) != NULL) + { + switch (ievent->type) { + case netievent_stop: + uv_stop(handle->loop); + isc_mem_put(worker->mgr->mctx, ievent, + sizeof(isc__netievent_storage_t)); + return; + case netievent_udplisten: + isc__nm_async_udplisten(worker, ievent); + break; + case netievent_udpstoplisten: + isc__nm_async_udpstoplisten(worker, ievent); + break; + case netievent_udpsend: + isc__nm_async_udpsend(worker, ievent); + break; + case netievent_tcpconnect: + isc__nm_async_tcpconnect(worker, ievent); + break; + case netievent_tcplisten: + isc__nm_async_tcplisten(worker, ievent); + break; + case netievent_tcpstartread: + isc__nm_async_startread(worker, ievent); + break; + case netievent_tcppauseread: + isc__nm_async_pauseread(worker, ievent); + break; + case netievent_tcpsend: + isc__nm_async_tcpsend(worker, ievent); + break; + case netievent_tcpstoplisten: + isc__nm_async_tcpstoplisten(worker, ievent); + break; + case netievent_tcpclose: + isc__nm_async_tcpclose(worker, ievent); + break; + default: + INSIST(0); + ISC_UNREACHABLE(); + } + isc_mem_put(worker->mgr->mctx, ievent, + sizeof(isc__netievent_storage_t)); + } +} + +void * +isc__nm_get_ievent(isc_nm_t *mgr, isc__netievent_type type) { + isc__netievent_storage_t *event = + isc_mem_get(mgr->mctx, sizeof(isc__netievent_storage_t)); + + /* XXX: use a memory pool? */ + *event = (isc__netievent_storage_t) { + .ni.type = type + }; + return (event); +} + +void +isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { + isc_queue_enqueue(worker->ievents, (uintptr_t)event); + uv_async_send(&worker->async); +} + +static bool +isc__nmsocket_active(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + if (sock->parent != NULL) { + return (atomic_load(&sock->parent->active)); + } + + return (atomic_load(&sock->active)); +} + +void +isc_nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(target != NULL && *target == NULL); + + if (sock->parent != NULL) { + INSIST(sock->parent->parent == NULL); /* sanity check */ + isc_refcount_increment(&sock->parent->references); + } else { + isc_refcount_increment(&sock->references); + } + + *target = sock; +} + +/* + * Free all resources inside a socket (including its children if any). + */ +static void +nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { + isc_nmhandle_t *handle = NULL; + isc__nm_uvreq_t *uvreq = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(!isc__nmsocket_active(sock)); + + atomic_store(&sock->destroying, true); + + if (sock->parent == NULL && sock->children != NULL) { + /* + * We shouldn't be here unless there are no active handles, + * so we can clean up and free the children. + */ + for (int i = 0; i < sock->nchildren; i++) { + if (!atomic_load(&sock->children[i].destroying)) { + nmsocket_cleanup(&sock->children[i], false); + } + } + + /* + * This was a parent socket; free the children. + */ + isc_mem_put(sock->mgr->mctx, sock->children, + sock->nchildren * sizeof(*sock)); + sock->children = NULL; + sock->nchildren = 0; + } + + if (sock->tcphandle != NULL) { + isc_nmhandle_unref(sock->tcphandle); + sock->tcphandle = NULL; + } + + while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) { + nmhandle_free(sock, handle); + } + + if (sock->buf != NULL) { + isc_mem_put(sock->mgr->mctx, sock->buf, sock->buf_size); + } + + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); + } + + isc_astack_destroy(sock->inactivehandles); + + while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) { + isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq)); + } + + isc_astack_destroy(sock->inactivereqs); + + isc_mem_free(sock->mgr->mctx, sock->ah_frees); + isc_mem_free(sock->mgr->mctx, sock->ah_handles); + + if (dofree) { + isc_nm_t *mgr = sock->mgr; + isc_mem_put(mgr->mctx, sock, sizeof(*sock)); + isc_nm_detach(&mgr); + } else { + isc_nm_detach(&sock->mgr); + } + +} + +static void +nmsocket_maybe_destroy(isc_nmsocket_t *sock) { + int active_handles = 0; + bool destroy = false; + + REQUIRE(!isc__nmsocket_active(sock)); + + if (sock->parent != NULL) { + /* + * This is a child socket and cannot be destroyed except + * as a side effect of destroying the parent, so let's go + * see if the parent is ready to be destroyed. + */ + nmsocket_maybe_destroy(sock->parent); + return; + } + + /* + * This is a parent socket (or a standalone). See whether the + * children have active handles before deciding whether to + * accept destruction. + */ + LOCK(&sock->lock); + active_handles += sock->ah_cpos; + if (sock->children != NULL) { + for (int i = 0; i < sock->nchildren; i++) { + LOCK(&sock->children[i].lock); + active_handles += sock->children[i].ah_cpos; + UNLOCK(&sock->children[i].lock); + } + } + + if (atomic_load(&sock->closed) && + atomic_load(&sock->references) == 0 && + (active_handles == 0 || sock->tcphandle != NULL)) + { + destroy = true; + } + UNLOCK(&sock->lock); + + if (destroy) { + nmsocket_cleanup(sock, true); + } +} + +void +isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) { + REQUIRE(sock->parent == NULL); + + /* + * The final external reference to the socket is gone. We can try + * destroying the socket, but we have to wait for all the inflight + * handles to finish first. + */ + atomic_store(&sock->active, false); + + /* + * If the socket has children, they'll need to be marked inactive + * so they can be cleaned up too. + */ + if (sock->children != NULL) { + for (int i = 0; i < sock->nchildren; i++) { + atomic_store(&sock->children[i].active, false); + } + } + + /* + * If we're here then we already stopped listening; otherwise + * we'd have a hanging reference from the listening process. + * + * If it's a regular socket we may need to close it. + */ + if (!atomic_load(&sock->closed)) { + switch (sock->type) { + case isc_nm_tcpsocket: + isc__nm_tcp_close(sock); + break; + case isc_nm_tcpdnssocket: + isc__nm_tcpdns_close(sock); + break; + default: + break; + } + } + + nmsocket_maybe_destroy(sock); +} + +void +isc_nmsocket_detach(isc_nmsocket_t **sockp) { + REQUIRE(sockp != NULL && *sockp != NULL); + REQUIRE(VALID_NMSOCK(*sockp)); + + isc_nmsocket_t *sock = *sockp, *rsock = NULL; + int references; + *sockp = NULL; + + /* + * If the socket is a part of a set (a child socket) we are + * counting references for the whole set at the parent. + */ + if (sock->parent != NULL) { + rsock = sock->parent; + INSIST(rsock->parent == NULL); /* Sanity check */ + } else { + rsock = sock; + } + + references = isc_refcount_decrement(&rsock->references); + INSIST(references > 0); + if (references == 1) { + isc__nmsocket_prep_destroy(rsock); + } + +} + +void +isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, + isc_nmsocket_type type) +{ + *sock = (isc_nmsocket_t) { + .type = type, + .fd = -1, + .ah_size = 32, + .inactivehandles = isc_astack_new(mgr->mctx, 60), + .inactivereqs = isc_astack_new(mgr->mctx, 60) + }; + + isc_nm_attach(mgr, &sock->mgr); + sock->uv_handle.handle.data = sock; + + sock->ah_frees = isc_mem_allocate(mgr->mctx, + sock->ah_size * sizeof(size_t)); + sock->ah_handles = isc_mem_allocate(mgr->mctx, + sock->ah_size * + sizeof(isc_nmhandle_t *)); + for (size_t i = 0; i < 32; i++) { + sock->ah_frees[i] = i; + sock->ah_handles[i] = NULL; + } + + isc_mutex_init(&sock->lock); + isc_condition_init(&sock->cond); + isc_refcount_init(&sock->references, 1); + atomic_init(&sock->active, true); + + sock->magic = NMSOCK_MAGIC; +} + +void +isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { + isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc__networker_t *worker = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(isc__nm_in_netthread()); + REQUIRE(size <= 65536); + + /* TODO that's for UDP only! */ + worker = &sock->mgr->workers[sock->tid]; + INSIST(!worker->udprecvbuf_inuse); + + buf->base = worker->udprecvbuf; + worker->udprecvbuf_inuse = true; + buf->len = size; +} + +void +isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) { + isc__networker_t *worker = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + + worker = &sock->mgr->workers[sock->tid]; + + REQUIRE(worker->udprecvbuf_inuse); + REQUIRE(buf->base == worker->udprecvbuf); + + UNUSED(buf); + + worker->udprecvbuf_inuse = false; +} + +static isc_nmhandle_t * +alloc_handle(isc_nmsocket_t *sock) { + isc_nmhandle_t *handle = + isc_mem_get(sock->mgr->mctx, + sizeof(isc_nmhandle_t) + sock->extrahandlesize); + + *handle = (isc_nmhandle_t) { + .magic = NMHANDLE_MAGIC + }; + isc_refcount_init(&handle->references, 1); + + return (handle); +} + +isc_nmhandle_t * +isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, + isc_sockaddr_t *local) +{ + isc_nmhandle_t *handle = NULL; + int pos; + + REQUIRE(VALID_NMSOCK(sock)); + + handle = isc_astack_pop(sock->inactivehandles); + + if (handle == NULL) { + handle = alloc_handle(sock); + } else { + INSIST(VALID_NMHANDLE(handle)); + isc_refcount_increment(&handle->references); + } + + handle->sock = sock; + if (peer != NULL) { + memcpy(&handle->peer, peer, sizeof(isc_sockaddr_t)); + } else { + memcpy(&handle->peer, &sock->peer, sizeof(isc_sockaddr_t)); + } + + if (local != NULL) { + memcpy(&handle->local, local, sizeof(isc_sockaddr_t)); + } else if (sock->iface != NULL) { + memcpy(&handle->local, &sock->iface->addr, + sizeof(isc_sockaddr_t)); + } else { + INSIST(0); + ISC_UNREACHABLE(); + } + + LOCK(&sock->lock); + /* We need to add this handle to the list of active handles */ + if (sock->ah_cpos == sock->ah_size) { + sock->ah_frees = + isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees, + sock->ah_size * 2 * + sizeof(size_t)); + sock->ah_handles = + isc_mem_reallocate(sock->mgr->mctx, + sock->ah_handles, + sock->ah_size * 2 * + sizeof(isc_nmhandle_t *)); + + for (size_t i = sock->ah_size; i < sock->ah_size * 2; i++) { + sock->ah_frees[i] = i; + sock->ah_handles[i] = NULL; + } + + sock->ah_size *= 2; + } + + pos = sock->ah_frees[sock->ah_cpos++]; + INSIST(sock->ah_handles[pos] == NULL); + sock->ah_handles[pos] = handle; + handle->ah_pos = pos; + UNLOCK(&sock->lock); + + if (sock->type == isc_nm_tcpsocket) { + INSIST(sock->tcphandle == NULL); + sock->tcphandle = handle; + } + + return (handle); +} + +void +isc_nmhandle_ref(isc_nmhandle_t *handle) { + int refs; + + REQUIRE(VALID_NMHANDLE(handle)); + + refs = isc_refcount_increment(&handle->references); + INSIST(refs > 0); + +} + +bool +isc_nmhandle_is_stream(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + return (handle->sock->type == isc_nm_tcpsocket || + handle->sock->type == isc_nm_tcpdnssocket); +} + +static void +nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { + size_t extra = sock->extrahandlesize; + + if (handle->dofree) { + handle->dofree(handle->opaque); + } + + *handle = (isc_nmhandle_t) { + .magic = 0 + }; + isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra); +} + +void +isc_nmhandle_unref(isc_nmhandle_t *handle) { + int refs; + + REQUIRE(VALID_NMHANDLE(handle)); + + refs = isc_refcount_decrement(&handle->references); + INSIST(refs > 0); + if (refs == 1) { + isc_nmsocket_t *sock = handle->sock; + bool reuse = false; + + handle->sock = NULL; + if (handle->doreset != NULL) { + handle->doreset(handle->opaque); + } + + /* + * We do it all under lock to avoid races with socket + * destruction. + */ + LOCK(&sock->lock); + INSIST(sock->ah_handles[handle->ah_pos] == handle); + INSIST(sock->ah_size > handle->ah_pos); + INSIST(sock->ah_cpos > 0); + sock->ah_handles[handle->ah_pos] = NULL; + sock->ah_frees[--sock->ah_cpos] = handle->ah_pos; + handle->ah_pos = 0; + + if (atomic_load(&sock->active)) { + reuse = isc_astack_trypush(sock->inactivehandles, + handle); + } + UNLOCK(&sock->lock); + + if (!reuse) { + nmhandle_free(sock, handle); + } + + if (sock->ah_cpos == 0 && + !atomic_load(&sock->active) && + !atomic_load(&sock->destroying)) + { + nmsocket_maybe_destroy(sock); + } + } +} + +void * +isc_nmhandle_getdata(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + return (handle->opaque); +} + +void +isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg, + isc_nm_opaquecb doreset, isc_nm_opaquecb dofree) +{ + REQUIRE(VALID_NMHANDLE(handle)); + + handle->opaque = arg; + handle->doreset = doreset; + handle->dofree = dofree; +} + +void * +isc_nmhandle_getextra(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + return (handle->extra); +} + +isc_sockaddr_t +isc_nmhandle_peeraddr(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + return (handle->peer); +} + +isc_sockaddr_t +isc_nmhandle_localaddr(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + return (handle->local); +} + +isc__nm_uvreq_t * +isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) { + isc__nm_uvreq_t *req = NULL; + + REQUIRE(VALID_NM(mgr)); + REQUIRE(VALID_NMSOCK(sock)); + + if (sock != NULL && atomic_load(&sock->active)) { + /* Try to reuse one */ + req = isc_astack_pop(sock->inactivereqs); + } + + if (req == NULL) { + req = isc_mem_get(mgr->mctx, sizeof(isc__nm_uvreq_t)); + } + + *req = (isc__nm_uvreq_t) { + .magic = 0 + }; + req->uv_req.req.data = req; + isc_nmsocket_attach(sock, &req->sock); + req->magic = UVREQ_MAGIC; + + return (req); +} + +void +isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) { + isc__nm_uvreq_t *req = NULL; + isc_nmhandle_t *handle = NULL; + + REQUIRE(req0 != NULL); + REQUIRE(VALID_UVREQ(*req0)); + + req = *req0; + *req0 = NULL; + + INSIST(sock == req->sock); + + req->magic = 0; + + /* + * We need to save this first to make sure that handle, + * sock, and the netmgr won't all disappear. + */ + handle = req->handle; + req->handle = NULL; + + if (!atomic_load(&sock->active) || + !isc_astack_trypush(sock->inactivereqs, req)) + { + isc_mem_put(sock->mgr->mctx, req, sizeof(isc__nm_uvreq_t)); + } + + if (handle != NULL) { + isc_nmhandle_unref(handle); + } + + isc_nmsocket_detach(&sock); +} + +isc_result_t +isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg) +{ + REQUIRE(VALID_NMHANDLE(handle)); + + switch (handle->sock->type) { + case isc_nm_udpsocket: + case isc_nm_udplistener: + return (isc__nm_udp_send(handle, region, cb, cbarg)); + case isc_nm_tcpsocket: + return (isc__nm_tcp_send(handle, region, cb, cbarg)); + case isc_nm_tcpdnssocket: + return (isc__nm_tcpdns_send(handle, region, cb, cbarg)); + default: + INSIST(0); + ISC_UNREACHABLE(); + } +} + +bool +isc__nm_acquire_interlocked(isc_nm_t *mgr) { + LOCK(&mgr->lock); + bool success = atomic_compare_exchange_strong(&mgr->interlocked, + &(bool){false}, true); + UNLOCK(&mgr->lock); + return (success); +} + +void +isc__nm_drop_interlocked(isc_nm_t *mgr) { + LOCK(&mgr->lock); + bool success = atomic_compare_exchange_strong(&mgr->interlocked, + &(bool){true}, false); + INSIST(success == true); + BROADCAST(&mgr->wkstatecond); + UNLOCK(&mgr->lock); +} + +void +isc__nm_acquire_interlocked_force(isc_nm_t *mgr) { + LOCK(&mgr->lock); + while (!atomic_compare_exchange_strong(&mgr->interlocked, + &(bool){false}, true)) + { + WAIT(&mgr->wkstatecond, &mgr->lock); + } + UNLOCK(&mgr->lock); +} diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c new file mode 100644 index 0000000000..f0aabd28f8 --- /dev/null +++ b/lib/isc/netmgr/tcp.c @@ -0,0 +1,611 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "netmgr-int.h" + +static int +tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); + +static void +tcp_close_direct(isc_nmsocket_t *sock); + +static isc_result_t +tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); +static void +tcp_connect_cb(uv_connect_t *uvreq, int status); + +static void +tcp_connection_cb(uv_stream_t *server, int status); + +static void +read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); + +static void +tcp_close_cb(uv_handle_t *uvhandle); + +static int +tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { + isc__networker_t *worker; + int r; + + REQUIRE(isc__nm_in_netthread()); + + worker = &sock->mgr->workers[isc_nm_tid()]; + + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); + if (r != 0) { + return (r); + } + + if (req->local.length != 0) { + r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0); + if (r != 0) { + tcp_close_direct(sock); + return (r); + } + } + + r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, + &req->peer.type.sa, tcp_connect_cb); + return (r); +} + +void +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcpconnect_t *ievent = + (isc__netievent_tcpconnect_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = ievent->req; + int r; + + REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id); + + r = tcp_connect_direct(sock, req); + if (r != 0) { + /* We need to issue callbacks ourselves */ + tcp_connect_cb(&req->uv_req.connect, r); + } +} + +static void +tcp_connect_cb(uv_connect_t *uvreq, int status) { + isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) uvreq->data; + isc_nmsocket_t *sock = uvreq->handle->data; + + REQUIRE(VALID_UVREQ(req)); + + if (status == 0) { + isc_result_t result; + isc_nmhandle_t *handle = NULL; + struct sockaddr_storage ss; + + uv_tcp_getpeername(&sock->uv_handle.tcp, + (struct sockaddr *) &ss, + &(int){sizeof(ss)}); + result = isc_sockaddr_fromsockaddr(&sock->peer, + (struct sockaddr *) &ss); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + + handle = isc__nmhandle_get(sock, NULL, NULL); + req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); + } else { + /* TODO handle it properly, free sock, translate code */ + req->cb.connect(NULL, ISC_R_FAILURE, req->cbarg); + } + + isc__nm_uvreq_put(&req, sock); +} + +isc_result_t +isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_cb_t cb, void *cbarg, + size_t extrahandlesize, isc_quota_t *quota, + isc_nmsocket_t **rv) +{ + isc__netievent_tcplisten_t *ievent = NULL; + isc_nmsocket_t *nsock = NULL; + + REQUIRE(VALID_NM(mgr)); + + nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); + isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener); + nsock->iface = iface; + nsock->rcb.accept = cb; + nsock->rcbarg = cbarg; + nsock->extrahandlesize = extrahandlesize; + if (quota != NULL) { + /* + * We need to force it to make sure we get it attached. + * An example failure mode would be server under attack + * reconfiguring interfaces - that might cause weak attach + * to fail and leave this listening socket without limits. + * We can ignore the result. + */ + isc_quota_force(quota, &nsock->quota); + } + nsock->tid = isc_random_uniform(mgr->nworkers); + + /* + * Listening to TCP is rare enough not to care about the + * added overhead from passing this to another thread. + */ + ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); + ievent->sock = nsock; + isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + *rv = nsock; + + return (ISC_R_SUCCESS); +} + +void +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcplisten_t *ievent = + (isc__netievent_tcplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + int r; + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcplistener); + + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); + if (r != 0) { + return; + } + + uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); + r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, 10, + tcp_connection_cb); + if (r != 0) { + return; + } + + atomic_store(&sock->listening, true); + + return; +} + +void +isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { + isc__netievent_tcpstoplisten_t *ievent = NULL; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(!isc__nm_in_netthread()); + + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstoplisten); + isc_nmsocket_attach(sock, &ievent->sock); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); +} + +static void +stoplistening_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = handle->data; + + LOCK(&sock->lock); + atomic_store(&sock->listening, false); + atomic_store(&sock->closed, true); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); + + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); + } + + isc_nmsocket_detach(&sock); +} + +void +isc__nm_async_tcpstoplisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ + isc__netievent_tcpstoplisten_t *ievent = + (isc__netievent_tcpstoplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcplistener); + + uv_close(&sock->uv_handle.handle, stoplistening_cb); +} + +isc_result_t +isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { + isc_nmsocket_t *sock = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(VALID_NMSOCK(handle->sock)); + + sock = handle->sock; + sock->rcb.recv = cb; + sock->rcbarg = cbarg; /* That's obviously broken... */ + if (sock->tid == isc_nm_tid()) { + int r = uv_read_start(&sock->uv_handle.stream, + isc__nm_alloc_cb, read_cb); + INSIST(r == 0); + } else { + isc__netievent_startread_t *ievent = + isc__nm_get_ievent(sock->mgr, + netievent_tcpstartread); + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } + + return (ISC_R_SUCCESS); +} + +void +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_startread_t *ievent = + (isc__netievent_startread_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(worker->id == isc_nm_tid()); + + uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb); +} + +isc_result_t +isc_nm_pauseread(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + + if (sock->tid == isc_nm_tid()) { + int r = uv_read_stop(&sock->uv_handle.stream); + INSIST(r == 0); + } else { + isc__netievent_pauseread_t *ievent = + isc__nm_get_ievent(sock->mgr, + netievent_tcppauseread); + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } + + return (ISC_R_SUCCESS); +} + +void +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_pauseread_t *ievent = + (isc__netievent_pauseread_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + REQUIRE(VALID_NMSOCK(sock)); + + REQUIRE(worker->id == isc_nm_tid()); + + uv_read_stop(&sock->uv_handle.stream); +} + +isc_result_t +isc_nm_resumeread(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->rcb.recv != NULL); + + if (sock->tid == isc_nm_tid()) { + int r = uv_read_start(&sock->uv_handle.stream, + isc__nm_alloc_cb, read_cb); + INSIST(r == 0); + } else { + /* It's the same as startread */ + isc__netievent_startread_t *ievent = + isc__nm_get_ievent(sock->mgr, + netievent_tcpstartread); + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } + + return (ISC_R_SUCCESS); +} + +static void +read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { + isc_nmsocket_t *sock = stream->data; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(buf != NULL); + + if (nread >= 0) { + isc_region_t region = { + .base = (unsigned char *) buf->base, + .length = nread + }; + + INSIST(sock->rcb.recv != NULL); + sock->rcb.recv(sock->tcphandle, ®ion, sock->rcbarg); + isc__nm_free_uvbuf(sock, buf); + return; + } + + isc__nm_free_uvbuf(sock, buf); + if (sock->quota) { + isc_quota_detach(&sock->quota); + } + sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); + + /* + * XXXWPK TODO clean up handles, close the connection, + * reclaim quota + */ +} + +static isc_result_t +accept_connection(isc_nmsocket_t *ssock) { + isc_result_t result; + isc_quota_t *quota = NULL; + isc_nmsocket_t *csock = NULL; + isc__networker_t *worker = NULL; + isc_nmhandle_t *handle = NULL; + struct sockaddr_storage ss; + isc_sockaddr_t local; + int r; + + REQUIRE(VALID_NMSOCK(ssock)); + REQUIRE(ssock->tid == isc_nm_tid()); + + if (!atomic_load_relaxed(&ssock->active)) { + /* We're closing, bail */ + return (ISC_R_CANCELED); + } + + if (ssock->quota != NULL) { + result = isc_quota_attach(ssock->quota, "a); + if (result != ISC_R_SUCCESS) { + return (result); + } + } + + csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); + isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket); + csock->tid = isc_nm_tid(); + csock->extrahandlesize = ssock->extrahandlesize; + csock->iface = ssock->iface; + csock->quota = quota; + quota = NULL; + + worker = &ssock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, &csock->uv_handle.tcp); + + r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream); + if (r != 0) { + if (csock->quota != NULL) { + isc_quota_detach(&csock->quota); + } + isc_mem_put(ssock->mgr->mctx, csock, sizeof(isc_nmsocket_t)); + + return (isc__nm_uverr2result(r)); + } + + isc_nmsocket_attach(ssock, &csock->server); + + uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *) &ss, + &(int){sizeof(ss)}); + + result = isc_sockaddr_fromsockaddr(&csock->peer, + (struct sockaddr *) &ss); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *) &ss, + &(int){sizeof(ss)}); + result = isc_sockaddr_fromsockaddr(&local, + (struct sockaddr *) &ss); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + + handle = isc__nmhandle_get(csock, NULL, &local); + + INSIST(ssock->rcb.accept != NULL); + ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg); + isc_nmsocket_detach(&csock); + + return (ISC_R_SUCCESS); +} + +static void +tcp_connection_cb(uv_stream_t *server, int status) { + isc_nmsocket_t *ssock = server->data; + isc_result_t result = accept_connection(ssock); + + UNUSED(status); + + if (result != ISC_R_SUCCESS) { + if (result == ISC_R_QUOTA || result == ISC_R_SOFTQUOTA) { + ssock->overquota = true; + } + /* XXXWPK TODO LOG */ + } +} + +/* + * isc__nm_tcp_send sends buf to a peer on a socket. + */ +isc_result_t +isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg) +{ + isc_nmsocket_t *sock = handle->sock; + isc__netievent_tcpsend_t *ievent = NULL; + isc__nm_uvreq_t *uvreq = NULL; + + REQUIRE(sock->type == isc_nm_tcpsocket); + + uvreq = isc__nm_uvreq_get(sock->mgr, sock); + uvreq->uvbuf.base = (char *) region->base; + uvreq->uvbuf.len = region->length; + uvreq->handle = handle; + isc_nmhandle_ref(uvreq->handle); + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + + if (sock->tid == isc_nm_tid()) { + /* + * If we're in the same thread as the socket we can send the + * data directly + */ + return (tcp_send_direct(sock, uvreq)); + } else { + /* + * We need to create an event and pass it using async channel + */ + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpsend); + ievent->sock = sock; + ievent->req = uvreq; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + return (ISC_R_SUCCESS); + } + + return (ISC_R_UNEXPECTED); +} + +static void +tcp_send_cb(uv_write_t *req, int status) { + isc_result_t result = ISC_R_SUCCESS; + isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *) req->data; + + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + if (status < 0) { + result = isc__nm_uverr2result(status); + } + + uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); + isc_nmhandle_unref(uvreq->handle); + isc__nm_uvreq_put(&uvreq, uvreq->handle->sock); +} + +/* + * Handle 'tcpsend' async event - send a packet on the socket + */ +void +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc_result_t result; + isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *) ievent0; + + REQUIRE(worker->id == ievent->sock->tid); + + if (!atomic_load(&ievent->sock->active)) { + return; + } + + result = tcp_send_direct(ievent->sock, ievent->req); + if (result != ISC_R_SUCCESS) { + ievent->req->cb.send(ievent->req->handle, + result, ievent->req->cbarg); + isc__nm_uvreq_put(&ievent->req, ievent->req->handle->sock); + } +} + +static isc_result_t +tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { + int r; + + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(sock->type == isc_nm_tcpsocket); + + isc_nmhandle_ref(req->handle); + r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, + &req->uvbuf, 1, tcp_send_cb); + if (r < 0) { + req->cb.send(NULL, isc__nm_uverr2result(r), req->cbarg); + isc__nm_uvreq_put(&req, sock); + return (isc__nm_uverr2result(r)); + } + + return (ISC_R_SUCCESS); +} + +static void +tcp_close_cb(uv_handle_t *uvhandle) { + isc_nmsocket_t *sock = uvhandle->data; + + REQUIRE(VALID_NMSOCK(sock)); + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); +} + +static void +tcp_close_direct(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(sock->type == isc_nm_tcpsocket); + + if (sock->quota != NULL) { + isc_nmsocket_t *ssock = sock->server; + + isc_quota_detach(&sock->quota); + + if (ssock->overquota) { + /* XXXWPK TODO we should loop here */ + isc_result_t result = accept_connection(ssock); + if (result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) + { + ssock->overquota = false; + } + } + } + + isc_nmsocket_detach(&sock->server); + uv_close(&sock->uv_handle.handle, tcp_close_cb); +} + +void +isc__nm_tcp_close(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpsocket); + + if (sock->tid == isc_nm_tid()) { + tcp_close_direct(sock); + } else { + /* + * We need to create an event and pass it using async channel + */ + isc__netievent_tcpclose_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpclose); + + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } +} + +void +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcpclose_t *ievent = + (isc__netievent_tcpclose_t *) ievent0; + + REQUIRE(worker->id == ievent->sock->tid); + + tcp_close_direct(ievent->sock); +} diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c new file mode 100644 index 0000000000..0c38ecf494 --- /dev/null +++ b/lib/isc/netmgr/tcpdns.c @@ -0,0 +1,405 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "netmgr-int.h" + +static void +dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg); + +static inline size_t +dnslen(unsigned char* base) { + return ((base[0] << 8) + (base[1])); +} + +#define NM_REG_BUF 4096 +#define NM_BIG_BUF 65536 +static inline void +alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { + REQUIRE(len <= NM_BIG_BUF); + + if (sock->buf == NULL) { + /* We don't have the buffer at all */ + size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF; + sock->buf = isc_mem_get(sock->mgr->mctx, alloc_len); + sock->buf_size = alloc_len; + } else { + /* We have the buffer but it's too small */ + sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf, + NM_BIG_BUF); + sock->buf_size = NM_BIG_BUF; + } +} + + +/* + * Accept callback for TCP-DNS connection + */ +static void +dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { + isc_nmsocket_t *dnslistensock = (isc_nmsocket_t *) cbarg; + isc_nmsocket_t *dnssock = NULL; + + REQUIRE(VALID_NMSOCK(dnslistensock)); + REQUIRE(dnslistensock->type == isc_nm_tcpdnslistener); + + /* If accept() was unnsuccessful we can't do anything */ + if (result != ISC_R_SUCCESS) { + return; + } + + /* We need to create a 'wrapper' dnssocket for this connection */ + dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock)); + isc__nmsocket_init(dnssock, handle->sock->mgr, + isc_nm_tcpdnssocket); + + /* We need to copy read callbacks from outer socket */ + dnssock->rcb.recv = dnslistensock->rcb.recv; + dnssock->rcbarg = dnslistensock->rcbarg; + dnssock->extrahandlesize = dnslistensock->extrahandlesize; + isc_nmsocket_attach(handle->sock, &dnssock->outer); + dnssock->peer = handle->sock->peer; + dnssock->iface = handle->sock->iface; + + isc_nm_read(handle, dnslisten_readcb, dnssock); +} + +/* + * We've got a read on our underlying socket, need to check if we have + * a complete DNS packet and, if so - call the callback + */ +static void +dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { + isc_nmsocket_t *dnssock = (isc_nmsocket_t *) arg; + isc_sockaddr_t local; + unsigned char *base = NULL; + size_t len; + + REQUIRE(VALID_NMSOCK(dnssock)); + REQUIRE(VALID_NMHANDLE(handle)); + + if (region == NULL) { + /* Connection closed */ + atomic_store(&dnssock->closed, true); + isc_nmsocket_detach(&dnssock->outer); + isc_nmsocket_detach(&dnssock); + return; + } + + local = isc_nmhandle_localaddr(handle); + + base = region->base; + len = region->length; + + /* + * We have something in the buffer, we need to glue it. + */ + if (dnssock->buf_len > 0) { + size_t plen; + + if (dnssock->buf_len == 1) { + /* Make sure we have the length */ + dnssock->buf[1] = base[0]; + dnssock->buf_len = 2; + base++; + len--; + } + + /* At this point we definitely have 2 bytes there. */ + plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 - + dnssock->buf_len)); + if (plen > dnssock->buf_size) { + alloc_dnsbuf(dnssock, plen); + } + + memmove(dnssock->buf + dnssock->buf_len, base, plen); + dnssock->buf_len += plen; + base += plen; + len -= plen; + + /* Do we have a complete packet in the buffer? */ + if (dnslen(dnssock->buf) == dnssock->buf_len - 2) { + isc_nmhandle_t *dnshandle = NULL; + isc_region_t r2 = { + .base = dnssock->buf + 2, + .length = dnslen(dnssock->buf) + }; + dnshandle = isc__nmhandle_get(dnssock, NULL, &local); + atomic_store(&dnssock->processing, true); + dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); + dnssock->buf_len = 0; + + /* + * If the recv callback wants to hold on to the + * handle, it needs to attach to it. + */ + isc_nmhandle_unref(dnshandle); + } + } + + /* + * At this point we've processed whatever was previously in the + * socket buffer. If there are more messages to be found in what + * we've read, and if we're either pipelining or not processing + * anything else, then we can process those messages now. + */ + while (len >= 2 && dnslen(base) <= len - 2 && + !(atomic_load(&dnssock->sequential) && + atomic_load(&dnssock->processing))) + { + isc_nmhandle_t *dnshandle = NULL; + isc_region_t r2 = { + .base = base + 2, + .length = dnslen(base) + }; + + len -= dnslen(base) + 2; + base += dnslen(base) + 2; + + dnshandle = isc__nmhandle_get(dnssock, NULL, &local); + atomic_store(&dnssock->processing, true); + dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); + + /* + * If the recv callback wants to hold on to the + * handle, it needs to attach to it. + */ + isc_nmhandle_unref(dnshandle); + } + + /* + * We have less than a full message remaining; it can be + * stored in the socket buffer for next time. + */ + if (len > 0) { + if (len > dnssock->buf_size) { + alloc_dnsbuf(dnssock, len); + } + + INSIST(len <= dnssock->buf_size); + memmove(dnssock->buf, base, len); + dnssock->buf_len = len; + } +} + +/* Process all complete packets out of incoming buffer */ +static void +processbuffer(isc_nmsocket_t *dnssock) { + REQUIRE(VALID_NMSOCK(dnssock)); + + /* While we have a complete packet in the buffer */ + while (dnssock->buf_len > 2 && + dnslen(dnssock->buf) <= dnssock->buf_len - 2) + { + isc_nmhandle_t *dnshandle = NULL; + isc_region_t r2 = { + .base = dnssock->buf + 2, + .length = dnslen(dnssock->buf) + }; + size_t len; + + dnshandle = isc__nmhandle_get(dnssock, NULL, NULL); + atomic_store(&dnssock->processing, true); + dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); + + /* + * If the recv callback wants to hold on to the + * handle, it needs to attach to it. + */ + isc_nmhandle_unref(dnshandle); + + len = dnslen(dnssock->buf) + 2; + dnssock->buf_len -= len; + if (len > 0) { + memmove(dnssock->buf, dnssock->buf + len, + dnssock->buf_len); + } + + /* Check here to make sure we do the processing at least once */ + if (atomic_load(&dnssock->processing)) { + return; + } + } +} + +/* + * isc_nm_listentcpdns listens for connections and accepts + * them immediately, then calls the cb for each incoming DNS packet + * (with 2-byte length stripped) - just like for UDP packet. + */ +isc_result_t +isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_recv_cb_t cb, void *cbarg, + size_t extrahandlesize, isc_quota_t *quota, + isc_nmsocket_t **rv) +{ + /* A 'wrapper' socket object with outer set to true TCP socket */ + isc_nmsocket_t *dnslistensock = + isc_mem_get(mgr->mctx, sizeof(*dnslistensock)); + isc_result_t result; + + REQUIRE(VALID_NM(mgr)); + + isc__nmsocket_init(dnslistensock, mgr, isc_nm_tcpdnslistener); + dnslistensock->iface = iface; + dnslistensock->rcb.recv = cb; + dnslistensock->rcbarg = cbarg; + dnslistensock->extrahandlesize = extrahandlesize; + + /* We set dnslistensock->outer to a true listening socket */ + result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, + dnslistensock, extrahandlesize, + quota, &dnslistensock->outer); + + atomic_store(&dnslistensock->listening, true); + *rv = dnslistensock; + return (result); +} + +void +isc_nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnslistener); + + atomic_store(&sock->listening, false); + atomic_store(&sock->closed, true); + + if (sock->outer != NULL) { + isc_nm_tcp_stoplistening(sock->outer); + isc_nmsocket_detach(&sock->outer); + } +} + +void +isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) { + REQUIRE(VALID_NMHANDLE(handle)); + + if (handle->sock->type != isc_nm_tcpdnssocket || + handle->sock->outer == NULL) + { + return; + } + + /* + * We don't want pipelining on this connection. That means + * that we can launch query processing only when the previous + * one returned. + * + * The socket MUST be unpaused after the query is processed. + * This is done by isc_nm_resumeread() in tcpdnssend_cb() below. + * + * XXX: The callback is not currently executed in failure cases! + */ + isc_nm_pauseread(handle->sock->outer); + atomic_store(&handle->sock->sequential, true); +} + +typedef struct tcpsend { + isc_mem_t *mctx; + isc_nmhandle_t *handle; + isc_region_t region; + isc_nmhandle_t *orighandle; + isc_nm_cb_t cb; + void *cbarg; +} tcpsend_t; + +static void +tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { + tcpsend_t *ts = (tcpsend_t *) cbarg; + + UNUSED(handle); + + ts->cb(ts->orighandle, result, ts->cbarg); + isc_mem_put(ts->mctx, ts->region.base, ts->region.length); + + /* + * The response was sent, if we're in sequential mode resume + * processing. + */ + if (atomic_load(&ts->orighandle->sock->sequential)) { + atomic_store(&ts->orighandle->sock->processing, false); + processbuffer(ts->orighandle->sock); + isc_nm_resumeread(handle->sock); + } + + isc_nmhandle_unref(ts->orighandle); + isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts)); +} + +/* + * isc__nm_tcp_send sends buf to a peer on a socket. + */ +isc_result_t +isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg) +{ + tcpsend_t *t = NULL; + + REQUIRE(VALID_NMHANDLE(handle)); + + isc_nmsocket_t *sock = handle->sock; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnssocket); + + if (sock->outer == NULL) { + /* The socket is closed, just issue the callback */ + cb(handle, ISC_R_FAILURE, cbarg); + return (ISC_R_NOTCONNECTED); + } + + t = isc_mem_get(sock->mgr->mctx, sizeof(*t)); + *t = (tcpsend_t) { + .cb = cb, + .cbarg = cbarg, + .handle = handle->sock->outer->tcphandle, + }; + + isc_mem_attach(sock->mgr->mctx, &t->mctx); + t->orighandle = handle; + isc_nmhandle_ref(t->orighandle); + + t->region = (isc_region_t) { + .base = isc_mem_get(t->mctx, region->length + 2), + .length = region->length + 2 + }; + + *(uint16_t *) t->region.base = htons(region->length); + memmove(t->region.base + 2, region->base, region->length); + + return (isc__nm_tcp_send(t->handle, &t->region, tcpdnssend_cb, t)); +} + +void +isc__nm_tcpdns_close(isc_nmsocket_t *sock) { + if (sock->outer != NULL) { + isc_nmsocket_detach(&sock->outer); + } + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); +} diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c new file mode 100644 index 0000000000..e14dc2dd93 --- /dev/null +++ b/lib/isc/netmgr/udp.c @@ -0,0 +1,461 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "netmgr-int.h" + +static isc_result_t +udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_sockaddr_t *peer); + +static void +udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, + const struct sockaddr *addr, unsigned flags); + +static void +udp_send_cb(uv_udp_send_t *req, int status); + +isc_result_t +isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, + isc_nm_recv_cb_t cb, void *cbarg, + size_t extrahandlesize, isc_nmsocket_t **sockp) +{ + isc_nmsocket_t *nsock = NULL; + + REQUIRE(VALID_NM(mgr)); + + /* + * We are creating mgr->nworkers duplicated sockets, one + * socket for each worker thread. + */ + nsock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t)); + isc__nmsocket_init(nsock, mgr, isc_nm_udplistener); + nsock->iface = iface; + nsock->nchildren = mgr->nworkers; + atomic_init(&nsock->rchildren, mgr->nworkers); + nsock->children = isc_mem_get(mgr->mctx, + mgr->nworkers * sizeof(*nsock)); + memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); + + INSIST(nsock->rcb.recv == NULL && nsock->rcbarg == NULL); + nsock->rcb.recv = cb; + nsock->rcbarg = cbarg; + nsock->extrahandlesize = extrahandlesize; + + for (size_t i = 0; i < mgr->nworkers; i++) { + uint16_t family = iface->addr.type.sa.sa_family; + int res; + + isc__netievent_udplisten_t *ievent = NULL; + isc_nmsocket_t *csock = &nsock->children[i]; + + isc__nmsocket_init(csock, mgr, isc_nm_udpsocket); + csock->parent = nsock; + csock->iface = iface; + csock->tid = i; + csock->extrahandlesize = extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.recv = cb; + csock->rcbarg = cbarg; + csock->fd = socket(family, SOCK_DGRAM, 0); + INSIST(csock->fd >= 0); + + /* + * This is SO_REUSE**** hell: + * On Linux SO_REUSEPORT allows multiple sockets to bind to + * the same host:port pair. + * On Windows the same thing is achieved with SO_REUSEADDR + */ +#ifdef WIN32 + res = setsockopt(csock->fd, SOL_SOCKET, SO_REUSEADDR, + &(int){1}, sizeof(int)); +#else + res = setsockopt(csock->fd, SOL_SOCKET, SO_REUSEPORT, + &(int){1}, sizeof(int)); +#endif + RUNTIME_CHECK(res == 0); + + ievent = isc__nm_get_ievent(mgr, netievent_udplisten); + ievent->sock = csock; + isc__nm_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *) ievent); + } + + *sockp = nsock; + return (ISC_R_SUCCESS); +} + +/* + * handle 'udplisten' async call - start listening on a socket. + */ +void +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_udplisten_t *ievent = + (isc__netievent_udplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(sock->type == isc_nm_udpsocket); + REQUIRE(sock->iface != NULL); + REQUIRE(sock->parent != NULL); + + uv_udp_init(&worker->loop, &sock->uv_handle.udp); + sock->uv_handle.udp.data = NULL; + isc_nmsocket_attach(sock, + (isc_nmsocket_t **)&sock->uv_handle.udp.data); + + uv_udp_open(&sock->uv_handle.udp, sock->fd); + uv_udp_bind(&sock->uv_handle.udp, + &sock->parent->iface->addr.type.sa, 0); + uv_recv_buffer_size(&sock->uv_handle.handle, + &(int){16 * 1024 * 1024}); + uv_send_buffer_size(&sock->uv_handle.handle, + &(int){16 * 1024 * 1024}); + uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb, + udp_recv_cb); +} + +static void +udp_close_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = handle->data; + atomic_store(&sock->closed, true); + + isc_nmsocket_detach((isc_nmsocket_t **)&sock->uv_handle.udp.data); +} + +static void +stop_udp_child(isc_nmsocket_t *sock) { + INSIST(sock->type == isc_nm_udpsocket); + + uv_udp_recv_stop(&sock->uv_handle.udp); + uv_close((uv_handle_t *) &sock->uv_handle.udp, udp_close_cb); + + LOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); + UNLOCK(&sock->parent->lock); + BROADCAST(&sock->parent->cond); +} + +static void +stoplistening(isc_nmsocket_t *sock) { + /* + * Socket is already closing; there's nothing to do. + */ + if (uv_is_closing((uv_handle_t *) &sock->uv_handle.udp)) { + return; + } + + INSIST(sock->type == isc_nm_udplistener); + + for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_udplisten_t *event = NULL; + + if (i == sock->tid) { + stop_udp_child(&sock->children[i]); + continue; + } + + event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event->sock = &sock->children[i]; + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } + + LOCK(&sock->lock); + while (atomic_load_relaxed(&sock->rchildren) > 0) { + WAIT(&sock->cond, &sock->lock); + } + atomic_store(&sock->closed, true); + UNLOCK(&sock->lock); + + isc__nmsocket_prep_destroy(sock); +} + +void +isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { + isc__netievent_udpstoplisten_t *ievent = NULL; + + /* We can't be launched from network thread, we'd deadlock */ + REQUIRE(!isc__nm_in_netthread()); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_udplistener); + + /* + * If the manager is interlocked, re-enqueue this as an asynchronous + * event. Otherwise, go ahead and stop listening right away. + */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } else { + stoplistening(sock); + isc__nm_drop_interlocked(sock->mgr); + } +} + +/* + * handle 'udpstoplisten' async call - stop listening on a socket. + */ +void +isc__nm_async_udpstoplisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ + isc__netievent_udplisten_t *ievent = + (isc__netievent_udplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(sock->iface != NULL); + UNUSED(worker); + + /* + * If this is a child socket, stop listening and return. + */ + if (sock->parent != NULL) { + stop_udp_child(sock); + return; + } + + /* + * If network manager is paused, re-enqueue the event for later. + */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + isc__netievent_udplisten_t *event = NULL; + + event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); + } else { + stoplistening(sock); + isc__nm_drop_interlocked(sock->mgr); + } +} + +/* + * udp_recv_cb handles incoming UDP packet from uv. + * The buffer here is reused for a series of packets, + * so we need to allocate a new one. This new one can + * be reused to send the response then. + */ +static void +udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, + const struct sockaddr *addr, unsigned flags) +{ + isc_result_t result; + isc_nmhandle_t *nmhandle = NULL; + isc_sockaddr_t sockaddr; + isc_sockaddr_t localaddr; + struct sockaddr_storage laddr; + isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_region_t region; + uint32_t maxudp; + + REQUIRE(VALID_NMSOCK(sock)); + + /* XXXWPK TODO handle it! */ + UNUSED(flags); + + /* + * If addr == NULL that's the end of stream - we can + * free the buffer and bail. + */ + if (addr == NULL) { + isc__nm_free_uvbuf(sock, buf); + return; + } + + /* + * Simulate a firewall blocking UDP packets bigger than + * 'maxudp' bytes. + */ + maxudp = atomic_load(&sock->mgr->maxudp); + if (maxudp != 0 && (uint32_t)nrecv > maxudp) { + return; + } + + result = isc_sockaddr_fromsockaddr(&sockaddr, addr); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + uv_udp_getsockname(handle, (struct sockaddr *) &laddr, + &(int){sizeof(struct sockaddr_storage)}); + result = isc_sockaddr_fromsockaddr(&localaddr, + (struct sockaddr *) &laddr); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + + nmhandle = isc__nmhandle_get(sock, &sockaddr, &localaddr); + region.base = (unsigned char *) buf->base; + region.length = nrecv; + + INSIST(sock->rcb.recv != NULL); + sock->rcb.recv(nmhandle, ®ion, sock->rcbarg); + isc__nm_free_uvbuf(sock, buf); + + /* + * If the recv callback wants to hold on to the handle, + * it needs to attach to it. + */ + isc_nmhandle_unref(nmhandle); +} + +/* + * isc__nm_udp_send sends buf to a peer on a socket. + * It tries to find a proper sibling/child socket so that we won't have + * to jump to other thread. + */ +isc_result_t +isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, + isc_nm_cb_t cb, void *cbarg) +{ + isc_nmsocket_t *psock = NULL, *rsock = NULL; + isc_nmsocket_t *sock = handle->sock; + isc_sockaddr_t *peer = &handle->peer; + isc__netievent_udpsend_t *ievent; + isc__nm_uvreq_t *uvreq = NULL; + int ntid; + uint32_t maxudp = atomic_load(&sock->mgr->maxudp); + + /* + * Simulate a firewall blocking UDP packets bigger than + * 'maxudp' bytes. + */ + if (maxudp != 0 && region->length > maxudp) { + isc_nmhandle_unref(handle); + return (ISC_R_SUCCESS); + } + + if (sock->type == isc_nm_udpsocket) { + INSIST(sock->parent != NULL); + psock = sock->parent; + } else if (sock->type == isc_nm_udplistener) { + psock = sock; + } else { + isc_nmhandle_unref(handle); + return (ISC_R_UNEXPECTED); + } + + if (isc__nm_in_netthread()) { + ntid = isc_nm_tid(); + } else { + ntid = (int) isc_random_uniform(sock->nchildren); + } + + rsock = &psock->children[ntid]; + + uvreq = isc__nm_uvreq_get(sock->mgr, sock); + uvreq->uvbuf.base = (char *) region->base; + uvreq->uvbuf.len = region->length; + + uvreq->handle = handle; + isc_nmhandle_ref(uvreq->handle); + + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + + if (isc_nm_tid() == rsock->tid) { + /* + * If we're in the same thread as the socket we can send the + * data directly + */ + return (udp_send_direct(rsock, uvreq, peer)); + } else { + /* + * We need to create an event and pass it using async channel + */ + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpsend); + ievent->sock = rsock; + ievent->peer = *peer; + ievent->req = uvreq; + + isc__nm_enqueue_ievent(&sock->mgr->workers[rsock->tid], + (isc__netievent_t *) ievent); + return (ISC_R_SUCCESS); + } +} + + +/* + * handle 'udpsend' async event - send a packet on the socket + */ +void +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_udpsend_t *ievent = + (isc__netievent_udpsend_t *) ievent0; + + REQUIRE(worker->id == ievent->sock->tid); + + if (atomic_load(&ievent->sock->active)) { + udp_send_direct(ievent->sock, ievent->req, &ievent->peer); + } else { + ievent->req->cb.send(ievent->req->handle, + ISC_R_CANCELED, ievent->req->cbarg); + isc__nm_uvreq_put(&ievent->req, ievent->req->sock); + } +} + +/* + * udp_send_cb - callback + */ +static void +udp_send_cb(uv_udp_send_t *req, int status) { + isc_result_t result = ISC_R_SUCCESS; + isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; + + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + if (status < 0) { + result = isc__nm_uverr2result(status); + } + + uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); + isc_nmhandle_unref(uvreq->handle); + isc__nm_uvreq_put(&uvreq, uvreq->sock); +} + +/* + * udp_send_direct sends buf to a peer on a socket. Sock has to be in + * the same thread as the callee. + */ +static isc_result_t +udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_sockaddr_t *peer) +{ + int rv; + + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(sock->type == isc_nm_udpsocket); + + isc_nmhandle_ref(req->handle); + rv = uv_udp_send(&req->uv_req.udp_send, + &sock->uv_handle.udp, &req->uvbuf, 1, + &peer->type.sa, udp_send_cb); + if (rv < 0) { + return (isc__nm_uverr2result(rv)); + } + + return (ISC_R_SUCCESS); +} diff --git a/lib/isc/netmgr/uverr2result.c b/lib/isc/netmgr/uverr2result.c new file mode 100644 index 0000000000..09a3d6ecd0 --- /dev/null +++ b/lib/isc/netmgr/uverr2result.c @@ -0,0 +1,91 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include + +#include +#include +#include +#include +#include + +#include "netmgr-int.h" + +/*% + * Convert a libuv error value into an isc_result_t. The + * list of supported error values is not complete; new users + * of this function should add any expected errors that are + * not already there. + */ +isc_result_t +isc___nm_uverr2result(int uverr, bool dolog, + const char *file, unsigned int line) +{ + switch (uverr) { + case UV_ENOTDIR: + case UV_ELOOP: + case UV_EINVAL: /* XXX sometimes this is not for files */ + case UV_ENAMETOOLONG: + case UV_EBADF: + return (ISC_R_INVALIDFILE); + case UV_ENOENT: + return (ISC_R_FILENOTFOUND); + case UV_EACCES: + case UV_EPERM: + return (ISC_R_NOPERM); + case UV_EEXIST: + return (ISC_R_FILEEXISTS); + case UV_EIO: + return (ISC_R_IOERROR); + case UV_ENOMEM: + return (ISC_R_NOMEMORY); + case UV_ENFILE: + case UV_EMFILE: + return (ISC_R_TOOMANYOPENFILES); + case UV_ENOSPC: + return (ISC_R_DISCFULL); + case UV_EPIPE: + case UV_ECONNRESET: + case UV_ECONNABORTED: + return (ISC_R_CONNECTIONRESET); + case UV_ENOTCONN: + return (ISC_R_NOTCONNECTED); + case UV_ETIMEDOUT: + return (ISC_R_TIMEDOUT); + case UV_ENOBUFS: + return (ISC_R_NORESOURCES); + case UV_EAFNOSUPPORT: + return (ISC_R_FAMILYNOSUPPORT); + case UV_ENETDOWN: + return (ISC_R_NETDOWN); + case UV_EHOSTDOWN: + return (ISC_R_HOSTDOWN); + case UV_ENETUNREACH: + return (ISC_R_NETUNREACH); + case UV_EHOSTUNREACH: + return (ISC_R_HOSTUNREACH); + case UV_EADDRINUSE: + return (ISC_R_ADDRINUSE); + case UV_EADDRNOTAVAIL: + return (ISC_R_ADDRNOTAVAIL); + case UV_ECONNREFUSED: + return (ISC_R_CONNREFUSED); + default: + if (dolog) { + UNEXPECTED_ERROR(file, line, + "unable to convert libuv " + "error code to isc_result: %d: %s", + uverr, uv_strerror(uverr)); + } + return (ISC_R_UNEXPECTED); + } +} diff --git a/lib/isc/task.c b/lib/isc/task.c index 88e5fb1d83..f921b5952e 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1336,11 +1336,12 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, REQUIRE(managerp != NULL && *managerp == NULL); manager = isc_mem_get(mctx, sizeof(*manager)); - RUNTIME_CHECK(manager != NULL); - manager->common.impmagic = TASK_MANAGER_MAGIC; - manager->common.magic = ISCAPI_TASKMGR_MAGIC; + *manager = (isc__taskmgr_t) { + .common.impmagic = TASK_MANAGER_MAGIC, + .common.magic = ISCAPI_TASKMGR_MAGIC + }; + atomic_store(&manager->mode, isc_taskmgrmode_normal); - manager->mctx = NULL; isc_mutex_init(&manager->lock); isc_mutex_init(&manager->excl_lock); @@ -1363,8 +1364,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, atomic_init(&manager->tasks_ready, 0); atomic_init(&manager->curq, 0); atomic_init(&manager->exiting, false); - manager->excl = NULL; - manager->halted = 0; atomic_store_relaxed(&manager->exclusive_req, false); atomic_store_relaxed(&manager->pause_req, false); diff --git a/lib/isc/win32/include/isc/stdatomic.h b/lib/isc/win32/include/isc/stdatomic.h index b14d935d5b..62c2de14be 100644 --- a/lib/isc/win32/include/isc/stdatomic.h +++ b/lib/isc/win32/include/isc/stdatomic.h @@ -70,6 +70,7 @@ typedef int_fast32_t volatile atomic_int_fast32_t; typedef uint_fast32_t volatile atomic_uint_fast32_t; typedef int_fast64_t volatile atomic_int_fast64_t; typedef uint_fast64_t volatile atomic_uint_fast64_t; +typedef uintptr_t volatile atomic_uintptr_t; #define atomic_init(obj, desired) \ (*(obj) = (desired)) @@ -419,8 +420,7 @@ atomic_compare_exchange_abort() { succ, fail) \ : atomic_compare_exchange_abort()))) -#define atomic_compare_exchange_strong(obj, expected, desired, \ - succ, fail) \ +#define atomic_compare_exchange_strong(obj, expected, desired) \ atomic_compare_exchange_strong_explicit(obj, expected, desired, \ memory_order_seq_cst, \ memory_order_seq_cst) diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 6df89c29ff..ba50ba1e75 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -435,6 +435,29 @@ isc_netaddr_setzone isc_netaddr_totext isc_netaddr_unspec isc_netscope_pton +isc_nmhandle_getdata +isc_nmhandle_getextra +isc_nmhandle_is_stream +isc_nmhandle_localaddr +isc_nmhandle_peeraddr +isc_nmhandle_ref +isc_nmhandle_setdata +isc_nmhandle_unref +isc_nm_destroy +isc_nm_detach +isc_nm_listentcpdns +isc_nm_listenudp +isc_nm_maxudp +isc_nm_send +isc_nm_start +isc_nmsocket_detach +isc_nm_tcpdns_sequential +isc_nm_tcpdns_stoplistening +isc_nm_tid +isc_nm_udp_stoplistening +isc__nm_acquire_interlocked +isc__nm_drop_interlocked +isc__nm_acquire_interlocked_force isc_nonce_buf isc_ntpaths_get isc_ntpaths_init diff --git a/lib/isc/win32/libisc.vcxproj.in b/lib/isc/win32/libisc.vcxproj.in index eed5338b85..798129677d 100644 --- a/lib/isc/win32/libisc.vcxproj.in +++ b/lib/isc/win32/libisc.vcxproj.in @@ -59,11 +59,11 @@ @IF PKCS11 BIND9;@PK11_LIB_LOCATION@WIN32;_DEBUG;_WINDOWS;_USRDLL;LIBISC_EXPORTS;%(PreprocessorDefinitions);%(PreprocessorDefinitions) ..\..\..\config.h - .\;..\..\..\;@LIBXML2_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;..\..\dns\win32\include;..\..\dns\include;%(AdditionalIncludeDirectories) + .\;..\..\..\;@LIBXML2_INC@@LIBUV_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;..\..\dns\win32\include;..\..\dns\include;%(AdditionalIncludeDirectories) @ELSE PKCS11 BIND9;WIN32;_DEBUG;_WINDOWS;_USRDLL;LIBISC_EXPORTS;%(PreprocessorDefinitions);%(PreprocessorDefinitions) ..\..\..\config.h - .\;..\..\..\;@LIBXML2_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;%(AdditionalIncludeDirectories) + .\;..\..\..\;@LIBXML2_INC@@LIBUV_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;%(AdditionalIncludeDirectories) @END PKCS11 true .\$(Configuration)\$(TargetName).pch @@ -77,7 +77,7 @@ Console true ..\..\..\Build\$(Configuration)\$(TargetName)$(TargetExt) - @OPENSSL_LIB@@LIBXML2_LIB@@ZLIB_LIB@ws2_32.lib;%(AdditionalDependencies) + @OPENSSL_LIB@@LIBUV_LIB@@LIBXML2_LIB@@ZLIB_LIB@ws2_32.lib;%(AdditionalDependencies) $(ProjectName).def .\$(Configuration)\$(ProjectName).lib @@ -96,6 +96,9 @@ echo Copying the OpenSSL DLL and LICENSE. copy @OPENSSL_DLL@ ..\Build\Debug\ copy @OPENSSL_PATH@\LICENSE ..\Build\Debug\OpenSSL-LICENSE +echo Copying libuv DLL. +copy @LIBUV_DLL@ ..\Build\Debug\ + @IF LIBXML2 echo Copying the libxml DLL. @@ -148,11 +151,11 @@ copy InstallFiles ..\Build\Debug\ @IF PKCS11 BIND9;@PK11_LIB_LOCATION@WIN32;NDEBUG;_WINDOWS;_USRDLL;LIBISC_EXPORTS;%(PreprocessorDefinitions);%(PreprocessorDefinitions) ..\..\..\config.h - .\;..\..\..\;@LIBXML2_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;..\..\dns\win32\include;..\..\dns\include;%(AdditionalIncludeDirectories) + .\;..\..\..\;@LIBXML2_INC@@LIBUV_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;..\..\dns\win32\include;..\..\dns\include;%(AdditionalIncludeDirectories) @ELSE PKCS11 BIND9;WIN32;_DEBUG;_WINDOWS;_USRDLL;LIBISC_EXPORTS;%(PreprocessorDefinitions);%(PreprocessorDefinitions) ..\..\..\config.h - .\;..\..\..\;@LIBXML2_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;%(AdditionalIncludeDirectories) + .\;..\..\..\;@LIBXML2_INC@@LIBUV_INC@@OPENSSL_INC@@ZLIB_INC@include;..\include;win32;..\..\isccfg\include;%(AdditionalIncludeDirectories) @END PKCS11 OnlyExplicitInline false @@ -169,7 +172,7 @@ copy InstallFiles ..\Build\Debug\ true true ..\..\..\Build\$(Configuration)\$(TargetName)$(TargetExt) - @OPENSSL_LIB@@LIBXML2_LIB@@ZLIB_LIB@ws2_32.lib;%(AdditionalDependencies) + @OPENSSL_LIB@@LIBUV_LIB@@LIBXML2_LIB@@ZLIB_LIB@ws2_32.lib;%(AdditionalDependencies) $(ProjectName).def .\$(Configuration)\$(ProjectName).lib Default @@ -239,6 +242,9 @@ echo Copying the OpenSSL DLL and LICENSE. copy @OPENSSL_DLL@ ..\Build\Release\ copy @OPENSSL_PATH@\LICENSE ..\Build\Release\OpenSSL-LICENSE +echo Copying libuv DLL. +copy @LIBUV_DLL@ ..\Build\Debug\ + @IF LIBXML2 echo Copying the libxml DLL. @@ -436,6 +442,11 @@ copy InstallFiles ..\Build\Release\ + + + + + diff --git a/util/copyrights b/util/copyrights index b2be20d76f..29b3c2efcd 100644 --- a/util/copyrights +++ b/util/copyrights @@ -2201,6 +2201,7 @@ ./lib/isc/include/isc/mutexatomic.h C 2019 ./lib/isc/include/isc/mutexblock.h C 1999,2000,2001,2004,2005,2006,2007,2016,2018,2019 ./lib/isc/include/isc/netaddr.h C 1998,1999,2000,2001,2002,2004,2005,2006,2007,2009,2015,2016,2017,2018,2019 +./lib/isc/include/isc/netmgr.h C 2019 ./lib/isc/include/isc/netscope.h C 2002,2004,2005,2006,2007,2009,2016,2018,2019 ./lib/isc/include/isc/nonce.h C 2018,2019 ./lib/isc/include/isc/os.h C 2000,2001,2004,2005,2006,2007,2016,2018,2019 @@ -2255,6 +2256,12 @@ ./lib/isc/mem_p.h C 2018,2019 ./lib/isc/mutexblock.c C 1999,2000,2001,2004,2005,2007,2011,2012,2016,2018,2019 ./lib/isc/netaddr.c C 1999,2000,2001,2002,2004,2005,2007,2010,2011,2012,2014,2015,2016,2017,2018,2019 +./lib/isc/netmgr/netmgr-int.h C 2019 +./lib/isc/netmgr/netmgr.c C 2019 +./lib/isc/netmgr/tcp.c C 2019 +./lib/isc/netmgr/tcpdns.c C 2019 +./lib/isc/netmgr/udp.c C 2019 +./lib/isc/netmgr/uverr2result.c C 2019 ./lib/isc/netscope.c C 2002,2004,2005,2006,2007,2016,2018,2019 ./lib/isc/nonce.c C 2018,2019 ./lib/isc/openssl_shim.c C 2018,2019 diff --git a/win32utils/Configure b/win32utils/Configure index cd97fd760e..df52ef0fc9 100644 --- a/win32utils/Configure +++ b/win32utils/Configure @@ -253,6 +253,7 @@ my @substinc = ("GSSAPI_INC", "GEOIP_INC", "IDN_INC", "LIBXML2_INC", + "LIBUV_INC", "OPENSSL_INC", "READLINE_INC", "ZLIB_INC"); @@ -266,6 +267,7 @@ my @substlib = ("GSSAPI_LIB", "IDN_LIB", "KRB5_LIB", "LIBXML2_LIB", + "LIBUV_LIB", "OPENSSL_LIB", "READLINE_LIB", "READLINE_LIBD", @@ -282,6 +284,7 @@ my @substdll = ("COMERR_DLL", "KRB5_DLL", "K5SPRT_DLL", "LIBXML2_DLL", + "LIBUV_DLL", "OPENSSL_DLL", "WSHELP_DLL", "ZLIB_DLL"); @@ -379,6 +382,7 @@ my @withlist = ("aes", "system-tests", "tests", "tuning", + "libuv", "vcredist", "zlib"); @@ -418,6 +422,7 @@ my @help = ( " with-system-tests build with system test suite\n", " with-samples build with sample programs\n", " with-openssl[=PATH] build with OpenSSL yes|path (mandatory)\n", +" with-libuv[=PATH] build with libuv yes|path (mandatory)\n", " with-pkcs11[=PATH] build with PKCS#11 support yes|no|provider-path\n", " with-eddsa crypto EDDSA yes|all|no\n", " with-gssapi[=PATH] build with MIT KfW GSSAPI yes|no|path\n", @@ -461,6 +466,8 @@ my $use_tests = "no"; my $use_xtests = "no"; my $use_stests = "no"; my $use_samples = "no"; +my $use_libuv = "auto"; +my $libuv_path = "..\\..\\"; my $use_openssl = "auto"; my $openssl_path = "..\\..\\"; my $use_pkcs11 = "no"; @@ -736,6 +743,13 @@ sub mywith { $use_openssl = "yes"; $openssl_path = $val; } + } elsif ($key =~ /^libuv$/i) { + if ($val =~ /^no$/i) { + die "libuv is required\n"; + } elsif ($val !~ /^yes$/i) { + $use_libuv = "yes"; + $libuv_path = $val; + } } elsif ($key =~ /^pkcs11$/i) { if ($val =~ /^yes$/i) { $use_pkcs11 = "yes"; @@ -937,6 +951,7 @@ if ($verbose) { } else { print "querytrace: disabled\n"; } + print "libuv-path: $libuv_path\n"; print "openssl-path: $openssl_path\n"; if ($use_tests eq "yes") { print "tests: enabled\n"; @@ -1280,6 +1295,65 @@ if ($use_samples eq "yes") { $configcond{"SAMPLES"} = 1; } +# with-libuv +if ($use_libuv eq "auto") { + if ($verbose) { + print "checking for an libuv built directory at sibling root\n"; + } + opendir DIR, $libuv_path || die "No Directory: $!\n"; + my @dirlist = grep (/^libuv-v[0-9]+\.[0-9]+\.[0-9]+(-rc[0-9]+){0,1}$/i, + readdir(DIR)); + closedir(DIR); + + # Make sure we have something + if (scalar(@dirlist) == 0) { + die "can't find an libuv at sibling root\n"; + } + # Now see if we have a directory or just a file. + # Make sure we are case insensitive + my $file; + foreach $file (sort {uc($b) cmp uc($a)} @dirlist) { + if (-f File::Spec->catfile($libuv_path, + $file, + "include\\uv.h")) { + $libuv_path = File::Spec->catdir($libuv_path, $file); + $use_libuv = "yes"; + last; + } + } + + # If we have one use it otherwise report the error + if ($use_libuv eq "auto") { + die "can't find an libuv built directory at sibling root\n"; + } +} +# falls into (so no else) +if ($use_libuv eq "yes") { + $libuv_path = File::Spec->rel2abs($libuv_path); + if ($verbose) { + print "checking for libuv built directory at \"$libuv_path\"\n"; + } + my $libuv_new = 0; + if (!-f File::Spec->catfile($libuv_path, + "include\\uv.h")) { + die "can't find libuv uv.h include\n"; + } + my $libuv_inc = File::Spec->catdir($libuv_path, "include"); + my $libuv_libdir = File::Spec->catdir($libuv_path, "Release"); + my $libuv_lib = File::Spec->catfile($libuv_libdir, "libuv.lib"); + my $libuv_dll = File::Spec->catfile($libuv_libdir, "libuv.dll"); + if (!-f $libuv_lib) { + die "can't find libuv.lib library\n"; + } + if (!-f $libuv_dll) { + die "can't find libuv.dll library\n"; + } + $configvar{"LIBUV_PATH"} = "$libuv_path"; + $configinc{"LIBUV_INC"} = "$libuv_inc"; + $configlib{"LIBUV_LIB"} = "$libuv_lib"; + $configdll{"LIBUV_DLL"} = "$libuv_dll"; +} + # with-openssl if ($use_openssl eq "auto") { if ($verbose) {