diff --git a/CHANGES b/CHANGES index ebbf5222f4..e76cf6da5d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,8 @@ +5096. [func] Use multiple event loops in socket code, and + make network threads CPU-affinitive. This + significantly improves performance on large + systems. [GL #666] + 5095. [test] Converted all unit tests from ATF to CMocka; removed the source code for the ATF libraries. Build with "configure --with-cmocka" to enable diff --git a/bin/named/main.c b/bin/named/main.c index b9fadb0991..7c1d1fa6f1 100644 --- a/bin/named/main.c +++ b/bin/named/main.c @@ -793,10 +793,7 @@ create_managers(void) { named_g_udpdisp = 1; #else if (named_g_udpdisp == 0) { - if (named_g_cpus_detected == 1) - named_g_udpdisp = 1; - else - named_g_udpdisp = named_g_cpus_detected - 1; + named_g_udpdisp = named_g_cpus_detected; } if (named_g_udpdisp > named_g_cpus) named_g_udpdisp = named_g_cpus; @@ -824,7 +821,7 @@ create_managers(void) { } result = isc_socketmgr_create2(named_g_mctx, &named_g_socketmgr, - maxsocks); + maxsocks, named_g_cpus); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_socketmgr_create() failed: %s", diff --git a/bin/tests/system/fetchlimit/tests.sh b/bin/tests/system/fetchlimit/tests.sh index f62825a25b..55309d7b3c 100644 --- a/bin/tests/system/fetchlimit/tests.sh +++ b/bin/tests/system/fetchlimit/tests.sh @@ -157,7 +157,7 @@ status=`expr $status + $ret` copy_setports ns3/named3.conf.in ns3/named.conf $RNDCCMD reconfig 2>&1 | sed 's/^/ns3 /' | cat_i -echo_i "checking lame server clients are dropped at the soft limit" +echo_i "checking lame server clients are dropped near the soft limit" ret=0 fail=0 exceeded=0 diff --git a/bin/tests/system/runtime/tests.sh b/bin/tests/system/runtime/tests.sh index 9dc43acd1d..615a6ae967 100644 --- a/bin/tests/system/runtime/tests.sh +++ b/bin/tests/system/runtime/tests.sh @@ -24,28 +24,6 @@ grep "another named process" ns2/named.run > /dev/null && ret=1 if [ $ret != 0 ]; then echo_i "failed"; fi status=`expr $status + $ret` -if [ ! "$CYGWIN" ]; then - n=`expr $n + 1` - echo_i "verifying that named checks for conflicting listeners ($n)" - ret=0 - (cd ns2; $NAMED -c named-alt1.conf -D ns2-extra-1 -X other.lock -m record,size,mctx -d 99 -g -U 4 >> named2.run 2>&1 & ) - for i in 1 2 3 4 5 6 7 8 9 - do - grep "unable to listen on any configured interface" ns2/named2.run > /dev/null && break - sleep 1 - done - grep "unable to listen on any configured interface" ns2/named2.run > /dev/null || ret=1 - for i in 1 2 3 4 5 6 7 8 9 - do - grep "exiting (due to fatal error)" ns2/named2.run > /dev/null && break - sleep 1 - done - pid=`cat ns2/named2.pid 2>/dev/null` - test "${pid:+set}" = set && $KILL -15 ${pid} >/dev/null 2>&1 - if [ $ret != 0 ]; then echo_i "failed"; fi - status=`expr $status + $ret` -fi - n=`expr $n + 1` echo_i "verifying that named checks for conflicting named processes ($n)" ret=0 diff --git a/doc/arm/notes.xml b/doc/arm/notes.xml index 04e226141a..d3a0750246 100644 --- a/doc/arm/notes.xml +++ b/doc/arm/notes.xml @@ -131,6 +131,15 @@
New Features + + + Task manager and socket code have been substantially modified. + The manager uses per-cpu queues for tasks and network stack runs + multiple event loops in CPU-affinitive threads. This greatly + improves performance on large systems, especially when using + multi-queue NICs. + + A new secondary zone option, mirror, diff --git a/lib/dns/dispatch.c b/lib/dns/dispatch.c index e46cd055cd..57c94ba403 100644 --- a/lib/dns/dispatch.c +++ b/lib/dns/dispatch.c @@ -299,7 +299,8 @@ static isc_result_t get_udpsocket(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, const isc_sockaddr_t *localaddr, isc_socket_t **sockp, - isc_socket_t *dup_socket); + isc_socket_t *dup_socket, + bool duponly); static isc_result_t dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, isc_taskmgr_t *taskmgr, @@ -317,7 +318,7 @@ static void qid_destroy(isc_mem_t *mctx, dns_qid_t **qidp); static isc_result_t open_socket(isc_socketmgr_t *mgr, const isc_sockaddr_t *local, unsigned int options, isc_socket_t **sockp, - isc_socket_t *dup_socket); + isc_socket_t *dup_socket, bool duponly); static bool portavailable(dns_dispatchmgr_t *mgr, isc_socket_t *sock, isc_sockaddr_t *sockaddrp); @@ -728,7 +729,7 @@ get_dispsocket(dns_dispatch_t *disp, const isc_sockaddr_t *dest, if (portentry != NULL) bindoptions |= ISC_SOCKET_REUSEADDRESS; result = open_socket(sockmgr, &localaddr, bindoptions, &sock, - NULL); + NULL, false); if (result == ISC_R_SUCCESS) { if (portentry == NULL) { portentry = new_portentry(disp, port); @@ -1667,7 +1668,7 @@ destroy_mgr(dns_dispatchmgr_t **mgrp) { static isc_result_t open_socket(isc_socketmgr_t *mgr, const isc_sockaddr_t *local, unsigned int options, isc_socket_t **sockp, - isc_socket_t *dup_socket) + isc_socket_t *dup_socket, bool duponly) { isc_socket_t *sock; isc_result_t result; @@ -1675,12 +1676,16 @@ open_socket(isc_socketmgr_t *mgr, const isc_sockaddr_t *local, sock = *sockp; if (sock != NULL) { result = isc_socket_open(sock); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { return (result); - } else if (dup_socket != NULL) { + } + } else if (dup_socket != NULL && + (!isc_socket_hasreuseport() || duponly)) + { result = isc_socket_dup(dup_socket, &sock); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { return (result); + } isc_socket_setname(sock, "dispatcher", NULL); *sockp = sock; @@ -1688,8 +1693,9 @@ open_socket(isc_socketmgr_t *mgr, const isc_sockaddr_t *local, } else { result = isc_socket_create(mgr, isc_sockaddr_pf(local), isc_sockettype_udp, &sock); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { return (result); + } } isc_socket_setname(sock, "dispatcher", NULL); @@ -1699,9 +1705,9 @@ open_socket(isc_socketmgr_t *mgr, const isc_sockaddr_t *local, #endif result = isc_socket_bind(sock, local, options); if (result != ISC_R_SUCCESS) { - if (*sockp == NULL) + if (*sockp == NULL) { isc_socket_detach(&sock); - else { + } else { isc_socket_close(sock); } return (result); @@ -2757,7 +2763,7 @@ dns_dispatch_getudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, static isc_result_t get_udpsocket(dns_dispatchmgr_t *mgr, dns_dispatch_t *disp, isc_socketmgr_t *sockmgr, const isc_sockaddr_t *localaddr, - isc_socket_t **sockp, isc_socket_t *dup_socket) + isc_socket_t **sockp, isc_socket_t *dup_socket, bool duponly) { unsigned int i, j; isc_socket_t *held[DNS_DISPATCH_HELD]; @@ -2795,7 +2801,7 @@ get_udpsocket(dns_dispatchmgr_t *mgr, dns_dispatch_t *disp, prt = ports[isc_random_uniform(nports)]; isc_sockaddr_setport(&localaddr_bound, prt); result = open_socket(sockmgr, &localaddr_bound, - 0, &sock, NULL); + 0, &sock, NULL, false); /* * Continue if the port choosen is already in use * or the OS has reserved it. @@ -2816,7 +2822,7 @@ get_udpsocket(dns_dispatchmgr_t *mgr, dns_dispatch_t *disp, /* Allow to reuse address for non-random ports. */ result = open_socket(sockmgr, localaddr, ISC_SOCKET_REUSEADDRESS, &sock, - dup_socket); + dup_socket, duponly); if (result == ISC_R_SUCCESS) *sockp = sock; @@ -2828,7 +2834,7 @@ get_udpsocket(dns_dispatchmgr_t *mgr, dns_dispatch_t *disp, i = 0; for (j = 0; j < 0xffffU; j++) { - result = open_socket(sockmgr, localaddr, 0, &sock, NULL); + result = open_socket(sockmgr, localaddr, 0, &sock, NULL, false); if (result != ISC_R_SUCCESS) goto end; else if (portavailable(mgr, sock, NULL)) @@ -2872,22 +2878,27 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, dns_dispatch_t *disp; isc_socket_t *sock = NULL; int i = 0; + bool duponly = ((attributes & DNS_DISPATCHATTR_CANREUSE) == 0); + /* This is an attribute needed only at creation time */ + attributes &= ~DNS_DISPATCHATTR_CANREUSE; /* * dispatch_allocate() checks mgr for us. */ disp = NULL; result = dispatch_allocate(mgr, maxrequests, &disp); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { return (result); + } disp->socktype = isc_sockettype_udp; if ((attributes & DNS_DISPATCHATTR_EXCLUSIVE) == 0) { result = get_udpsocket(mgr, disp, sockmgr, localaddr, &sock, - dup_socket); - if (result != ISC_R_SUCCESS) + dup_socket, duponly); + if (result != ISC_R_SUCCESS) { goto deallocate_dispatch; + } if (isc_log_wouldlog(dns_lctx, 90)) { char addrbuf[ISC_SOCKADDR_FORMATSIZE]; @@ -2910,35 +2921,42 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, */ isc_sockaddr_anyofpf(&sa_any, isc_sockaddr_pf(localaddr)); if (!isc_sockaddr_eqaddr(&sa_any, localaddr)) { - result = open_socket(sockmgr, localaddr, 0, &sock, NULL); - if (sock != NULL) + result = open_socket(sockmgr, localaddr, 0, + &sock, NULL, false); + if (sock != NULL) { isc_socket_detach(&sock); - if (result != ISC_R_SUCCESS) + } + if (result != ISC_R_SUCCESS) { goto deallocate_dispatch; + } } disp->port_table = isc_mem_get(mgr->mctx, sizeof(disp->port_table[0]) * DNS_DISPATCH_PORTTABLESIZE); - if (disp->port_table == NULL) + if (disp->port_table == NULL) { goto deallocate_dispatch; - for (i = 0; i < DNS_DISPATCH_PORTTABLESIZE; i++) + } + for (i = 0; i < DNS_DISPATCH_PORTTABLESIZE; i++) { ISC_LIST_INIT(disp->port_table[i]); + } result = isc_mempool_create(mgr->mctx, sizeof(dispportentry_t), &disp->portpool); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { goto deallocate_dispatch; + } isc_mempool_setname(disp->portpool, "disp_portpool"); isc_mempool_setfreemax(disp->portpool, 128); } disp->socket = sock; disp->local = *localaddr; - if ((attributes & DNS_DISPATCHATTR_EXCLUSIVE) != 0) + if ((attributes & DNS_DISPATCHATTR_EXCLUSIVE) != 0) { disp->ntasks = MAX_INTERNAL_TASKS; - else + } else { disp->ntasks = 1; + } for (i = 0; i < disp->ntasks; i++) { disp->task[i] = NULL; result = isc_task_create(taskmgr, 50, &disp->task[i]); @@ -2970,8 +2988,9 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, } result = isc_mutex_init(&disp->sepool_lock); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { goto kill_sepool; + } isc_mempool_setname(disp->sepool, "disp_sepool"); isc_mempool_setmaxalloc(disp->sepool, 32768); @@ -2990,8 +3009,9 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, mgr_log(mgr, LVL(90), "created UDP dispatcher %p", disp); dispatch_log(disp, LVL(90), "created task %p", disp->task[0]); /* XXX */ - if (disp->socket != NULL) + if (disp->socket != NULL) { dispatch_log(disp, LVL(90), "created socket %p", disp->socket); + } *dispp = disp; @@ -3005,11 +3025,13 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, kill_ctlevent: isc_event_free(&disp->ctlevent); kill_task: - for (i = 0; i < disp->ntasks; i++) + for (i = 0; i < disp->ntasks; i++) { isc_task_detach(&disp->task[i]); + } kill_socket: - if (disp->socket != NULL) + if (disp->socket != NULL) { isc_socket_detach(&disp->socket); + } deallocate_dispatch: dispatch_free(&disp); diff --git a/lib/dns/include/dns/dispatch.h b/lib/dns/include/dns/dispatch.h index c6293121cd..56e4ba76a6 100644 --- a/lib/dns/include/dns/dispatch.h +++ b/lib/dns/include/dns/dispatch.h @@ -140,6 +140,7 @@ struct dns_dispatchset { #define DNS_DISPATCHATTR_CONNECTED 0x00000080U #define DNS_DISPATCHATTR_FIXEDID 0x00000100U #define DNS_DISPATCHATTR_EXCLUSIVE 0x00000200U +#define DNS_DISPATCHATTR_CANREUSE 0x00000400U /*@}*/ /* diff --git a/lib/isc/include/isc/socket.h b/lib/isc/include/isc/socket.h index 95d70b13f1..98fa51bfd6 100644 --- a/lib/isc/include/isc/socket.h +++ b/lib/isc/include/isc/socket.h @@ -243,8 +243,7 @@ typedef enum { isc_sockettype_udp = 1, isc_sockettype_tcp = 2, isc_sockettype_unix = 3, - isc_sockettype_fdwatch = 4, - isc_sockettype_raw = 5 + isc_sockettype_raw = 4 } isc_sockettype_t; /*@{*/ @@ -275,14 +274,6 @@ typedef enum { #define ISC_SOCKFLAG_NORETRY 0x00000002 /*%< drop failed UDP sends */ /*@}*/ -/*@{*/ -/*! - * Flags for fdwatchcreate. - */ -#define ISC_SOCKFDWATCH_READ 0x00000001 /*%< watch for readable */ -#define ISC_SOCKFDWATCH_WRITE 0x00000002 /*%< watch for writable */ -/*@}*/ - /*% * This structure is actually just the common prefix of a socket manager * object implementation's version of an isc_socketmgr_t. @@ -329,76 +320,6 @@ struct isc_socket { *** those functions which return an isc_result. ***/ -isc_result_t -isc_socket_fdwatchcreate(isc_socketmgr_t *manager, - int fd, - int flags, - isc_sockfdwatch_t callback, - void *cbarg, - isc_task_t *task, - isc_socket_t **socketp); -/*%< - * Create a new file descriptor watch socket managed by 'manager'. - * - * Note: - * - *\li 'fd' is the already-opened file descriptor (must be less - * than maxsockets). - *\li This function is not available on Windows. - *\li The callback function is called "in-line" - this means the function - * needs to return as fast as possible, as all other I/O will be suspended - * until the callback completes. - * - * Requires: - * - *\li 'manager' is a valid manager - * - *\li 'socketp' is a valid pointer, and *socketp == NULL - * - *\li 'fd' be opened. - * - * Ensures: - * - * '*socketp' is attached to the newly created fdwatch socket - * - * Returns: - * - *\li #ISC_R_SUCCESS - *\li #ISC_R_NOMEMORY - *\li #ISC_R_NORESOURCES - *\li #ISC_R_UNEXPECTED - *\li #ISC_R_RANGE - */ - -isc_result_t -isc_socket_fdwatchpoke(isc_socket_t *sock, - int flags); -/*%< - * Poke a file descriptor watch socket informing the manager that it - * should restart watching the socket - * - * Note: - * - *\li 'sock' is the socket returned by isc_socket_fdwatchcreate - * - *\li 'flags' indicates what the manager should watch for on the socket - * in addition to what it may already be watching. It can be one or - * both of ISC_SOCKFDWATCH_READ and ISC_SOCKFDWATCH_WRITE. To - * temporarily disable watching on a socket the value indicating - * no more data should be returned from the call back routine. - * - *\li This function is not available on Windows. - * - * Requires: - * - *\li 'sock' is a valid isc socket - * - * - * Returns: - * - *\li #ISC_R_SUCCESS - */ - isc_result_t isc_socket_create(isc_socketmgr_t *manager, int pf, @@ -407,9 +328,6 @@ isc_socket_create(isc_socketmgr_t *manager, /*%< * Create a new 'type' socket managed by 'manager'. * - * For isc_sockettype_fdwatch sockets you should use isc_socket_fdwatchcreate() - * rather than isc_socket_create(). - * * Note: * *\li 'pf' is the desired protocol family, e.g. PF_INET or PF_INET6. @@ -420,8 +338,6 @@ isc_socket_create(isc_socketmgr_t *manager, * *\li 'socketp' is a valid pointer, and *socketp == NULL * - *\li 'type' is not isc_sockettype_fdwatch - * * Ensures: * * '*socketp' is attached to the newly created socket @@ -551,17 +467,12 @@ isc_socket_open(isc_socket_t *sock); * one. This optimization may not be available for some systems, in which * case this function will return ISC_R_NOTIMPLEMENTED and must not be used. * - * isc_socket_open() should not be called on sockets created by - * isc_socket_fdwatchcreate(). - * * Requires: * * \li there must be no other reference to this socket. * * \li 'socket' is a valid and previously closed by isc_socket_close() * - * \li 'sock->type' is not isc_sockettype_fdwatch - * * Returns: * Same as isc_socket_create(). * \li ISC_R_NOTIMPLEMENTED @@ -577,9 +488,6 @@ isc_socket_close(isc_socket_t *sock); * systems, in which case this function will return ISC_R_NOTIMPLEMENTED and * must not be used. * - * isc_socket_close() should not be called on sockets created by - * isc_socket_fdwatchcreate(). - * * Requires: * * \li The socket must have a valid descriptor. @@ -588,8 +496,6 @@ isc_socket_close(isc_socket_t *sock); * * \li There must be no pending I/O requests. * - * \li 'sock->type' is not isc_sockettype_fdwatch - * * Returns: * \li #ISC_R_NOTIMPLEMENTED */ @@ -907,7 +813,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp); isc_result_t isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, - unsigned int maxsocks); + unsigned int maxsocks, int nthreads); /*%< * Create a socket manager. If "maxsocks" is non-zero, it specifies the * maximum number of sockets that the created manager should handle. @@ -1104,6 +1010,12 @@ isc_socketmgr_maxudp(isc_socketmgr_t *mgr, int maxudp); * Test interface. Drop UDP packet > 'maxudp'. */ +bool +isc_socket_hasreuseport(void); +/*%< + * Return true if there is SO_REUSEPORT support + */ + #ifdef HAVE_LIBXML2 int isc_socketmgr_renderxml(isc_socketmgr_t *mgr, xmlTextWriterPtr writer); diff --git a/lib/isc/pthreads/thread.c b/lib/isc/pthreads/thread.c index 94801d5085..b882838f8c 100644 --- a/lib/isc/pthreads/thread.c +++ b/lib/isc/pthreads/thread.c @@ -110,7 +110,8 @@ isc_thread_setaffinity(int cpu) { CPU_ZERO(&cpuset); CPU_SET(cpu, &cpuset); if (cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, - &cpuset, sizeof(cpuset)) != 0) { + sizeof(cpuset), &cpuset) != 0) + { return (ISC_R_FAILURE); } #elif defined(HAVE_PTHREAD_SETAFFINITY_NP) @@ -118,7 +119,8 @@ isc_thread_setaffinity(int cpu) { CPU_ZERO(&set); CPU_SET(cpu, &set); if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), - &set) != 0) { + &set) != 0) + { return (ISC_R_FAILURE); } #elif defined(HAVE_PROCESSOR_BIND) diff --git a/lib/isc/task.c b/lib/isc/task.c index 3ab6792aee..c2d00feffd 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -205,7 +205,7 @@ wake_all_queues(isc__taskmgr_t *manager); static inline void wake_all_queues(isc__taskmgr_t *manager) { - for (unsigned i=0; i < manager->workers; i++) { + for (unsigned int i = 0; i < manager->workers; i++) { LOCK(&manager->queues[i].lock); BROADCAST(&manager->queues[i].work_available); UNLOCK(&manager->queues[i].lock); @@ -379,8 +379,9 @@ task_ready(isc__task_t *task) { XTRACE("task_ready"); LOCK(&manager->queues[task->threadid].lock); push_readyq(manager, task, task->threadid); - if (manager->mode == isc_taskmgrmode_normal || has_privilege) + if (manager->mode == isc_taskmgrmode_normal || has_privilege) { SIGNAL(&manager->queues[task->threadid].work_available); + } UNLOCK(&manager->queues[task->threadid].lock); } @@ -898,16 +899,18 @@ static inline isc__task_t * pop_readyq(isc__taskmgr_t *manager, int c) { isc__task_t *task; - if (manager->mode == isc_taskmgrmode_normal) + if (manager->mode == isc_taskmgrmode_normal) { task = HEAD(manager->queues[c].ready_tasks); - else + } else { task = HEAD(manager->queues[c].ready_priority_tasks); + } if (task != NULL) { DEQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if (ISC_LINK_LINKED(task, ready_priority_link)) + if (ISC_LINK_LINKED(task, ready_priority_link)) { DEQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); + } } return (task); @@ -922,9 +925,10 @@ pop_readyq(isc__taskmgr_t *manager, int c) { static inline void push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if ((task->flags & TASK_F_PRIVILEGED) != 0) + if ((task->flags & TASK_F_PRIVILEGED) != 0) { ENQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); + } atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_acquire); } @@ -1001,19 +1005,26 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * If a pause has been requested, don't do any work * until it's been released. */ - while ((empty_readyq(manager, threadid) && !manager->pause_requested && - !manager->exclusive_requested) && !FINISHED(manager)) + while ((empty_readyq(manager, threadid) && + !manager->pause_requested && + !manager->exclusive_requested) && + !FINISHED(manager)) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_WAIT, "wait")); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused")); + ISC_MSG_WAIT, + manager->pause_requested + ? "paused" : "notpaused")); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq")); - WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock); + ISC_MSG_WAIT, + manager->exclusive_requested + ? "excreq" : "notexcreq")); + WAIT(&manager->queues[threadid].work_available, + &manager->queues[threadid].lock); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_AWAKE, "awake")); @@ -1027,19 +1038,24 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { ISC_MSG_WORKING, "halting")); /* - * Switching to exclusive mode is done as a 2-phase-lock, - * checking if we have to switch is done without any locks - * on pause_requested and exclusive_requested to save time - - * the worst thing that can happen is that we'll launch one task - * more and exclusive task will be postponed a bit. + * Switching to exclusive mode is done as a + * 2-phase-lock, checking if we have to switch is + * done without any locks on pause_requested and + * exclusive_requested to save time - the worst + * thing that can happen is that we'll launch one + * task more and exclusive task will be postponed a + * bit. * - * Broadcasting on halt_cond seems suboptimal, but exclusive tasks - * are rare enought that we don't care. + * Broadcasting on halt_cond seems suboptimal, but + * exclusive tasks are rare enought that we don't + * care. */ LOCK(&manager->halt_lock); manager->halted++; BROADCAST(&manager->halt_cond); - while (manager->pause_requested || manager->exclusive_requested) { + while (manager->pause_requested || + manager->exclusive_requested) + { WAIT(&manager->halt_cond, &manager->halt_lock); } manager->halted--; @@ -1067,8 +1083,9 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * lock before exiting the 'if (task != NULL)' block. */ UNLOCK(&manager->queues[threadid].lock); - RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_ready, - 1, memory_order_release) > 0); + RUNTIME_CHECK( + atomic_fetch_sub_explicit(&manager->tasks_ready, + 1, memory_order_release) > 0); atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_acquire); @@ -1184,7 +1201,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { if (finished) task_finished(task); - RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_running, + RUNTIME_CHECK( + atomic_fetch_sub_explicit(&manager->tasks_running, 1, memory_order_release) > 0); LOCK(&manager->queues[threadid].lock); if (requeue) { @@ -1236,7 +1254,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { { bool empty = true; unsigned int i; - for (i=0; iworkers && empty; i++) + for (i = 0; i < manager->workers && empty; i++) { LOCK(&manager->queues[i].lock); empty &= empty_readyq(manager, i); @@ -1286,7 +1304,7 @@ run(void *queuep) { static void manager_free(isc__taskmgr_t *manager) { - for (unsigned int i=0; i < manager->workers; i++) { + for (unsigned int i = 0; i < manager->workers; i++) { DESTROYLOCK(&manager->queues[i].lock); } DESTROYLOCK(&manager->lock); @@ -1321,19 +1339,17 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, RUNTIME_CHECK(isc_mutex_init(&manager->lock) == ISC_R_SUCCESS); RUNTIME_CHECK(isc_mutex_init(&manager->excl_lock) == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_mutex_init(&manager->halt_lock) - == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) - == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->halt_lock) == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS); manager->workers = workers; - if (default_quantum == 0) + if (default_quantum == 0) { default_quantum = DEFAULT_DEFAULT_QUANTUM; + } manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - manager->queues = isc_mem_get(mctx, workers * - sizeof(isc__taskqueue_t)); + manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); RUNTIME_CHECK(manager->queues != NULL); manager->tasks_running = 0; @@ -1357,7 +1373,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock) == ISC_R_SUCCESS); RUNTIME_CHECK(isc_condition_init( - &manager->queues[i].work_available) + &manager->queues[i].work_available) == ISC_R_SUCCESS); manager->queues[i].manager = manager; manager->queues[i].threadid = i; diff --git a/lib/isc/unix/socket.c b/lib/isc/unix/socket.c index 364531e125..2fb6d4fd66 100644 --- a/lib/isc/unix/socket.c +++ b/lib/isc/unix/socket.c @@ -51,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -184,10 +185,12 @@ typedef enum { poll_idle, poll_active, poll_checking } pollstate_t; #endif /* ISC_SOCKET_USE_POLLWATCH */ /*% - * Size of per-FD lock buckets. + * Per-FD lock buckets, we shuffle them around a bit as FDs come in herds. */ -#define FDLOCK_COUNT 1024 -#define FDLOCK_ID(fd) ((fd) % FDLOCK_COUNT) +#define FDLOCK_BITS 10 +#define FDLOCK_COUNT (1<>(FDLOCK_BITS/2)) |\ + (((fd)<<(FDLOCK_BITS/2))%(FDLOCK_COUNT))) /*% * Maximum number of events communicated with the kernel. There should normally @@ -324,6 +327,7 @@ typedef isc_event_t intev_t; typedef struct isc__socket isc__socket_t; typedef struct isc__socketmgr isc__socketmgr_t; +typedef struct isc__socketthread isc__socketthread_t; #define NEWCONNSOCK(ev) ((isc__socket_t *)(ev)->newsocket) @@ -334,12 +338,13 @@ struct isc__socket { isc_mutex_t lock; isc_sockettype_t type; const isc_statscounter_t *statsindex; + isc_refcount_t references; /* Locked by socket lock. */ ISC_LINK(isc__socket_t) link; - unsigned int references; int fd; int pf; + int threadid; char name[16]; void * tag; @@ -348,20 +353,9 @@ struct isc__socket { ISC_LIST(isc_socket_newconnev_t) accept_list; ISC_LIST(isc_socket_connev_t) connect_list; - /* - * Internal events. Posted when a descriptor is readable or - * writable. These are statically allocated and never freed. - * They will be set to non-purgable before use. - */ - intev_t readable_ev; - intev_t writable_ev; - isc_sockaddr_t peer_address; /* remote address */ - unsigned int pending_recv : 1, - pending_send : 1, - pending_accept : 1, - listener : 1, /* listener socket */ + unsigned int listener : 1, /* listener socket */ connected : 1, connecting : 1, /* connect pending */ bound : 1, /* bound to local addr */ @@ -373,10 +367,6 @@ struct isc__socket { unsigned char overflow; /* used for MSG_TRUNC fake */ #endif - void *fdwatcharg; - isc_sockfdwatch_t fdwatchcb; - int fdwatchflags; - isc_task_t *fdwatchtask; unsigned int dscp; }; @@ -388,8 +378,26 @@ struct isc__socketmgr { isc_socketmgr_t common; isc_mem_t *mctx; isc_mutex_t lock; - isc_mutex_t *fdlock; isc_stats_t *stats; + int nthreads; + isc__socketthread_t *threads; + unsigned int maxsocks; + /* Locked by manager lock. */ + ISC_LIST(isc__socket_t) socklist; + int reserved; /* unlocked */ + isc_condition_t shutdown_ok; + int maxudp; +}; + +struct isc__socketthread { + isc__socketmgr_t * manager; + int threadid; + isc_thread_t thread; + int pipe_fds[2]; + isc_mutex_t *fdlock; + /* Locked by fdlock. */ + isc__socket_t **fds; + int *fdstate; #ifdef USE_KQUEUE int kqueue_fd; int nevents; @@ -399,6 +407,7 @@ struct isc__socketmgr { int epoll_fd; int nevents; struct epoll_event *events; + uint32_t *epoll_events; #endif /* USE_EPOLL */ #ifdef USE_DEVPOLL int devpoll_fd; @@ -406,38 +415,19 @@ struct isc__socketmgr { unsigned int calls; int nevents; struct pollfd *events; + pollinfo_t *fdpollinfo; #endif /* USE_DEVPOLL */ #ifdef USE_SELECT int fd_bufsize; -#endif /* USE_SELECT */ - unsigned int maxsocks; - int pipe_fds[2]; - - /* Locked by fdlock. */ - isc__socket_t **fds; - int *fdstate; -#if defined(USE_EPOLL) - uint32_t *epoll_events; -#endif -#ifdef USE_DEVPOLL - pollinfo_t *fdpollinfo; -#endif - - /* Locked by manager lock. */ - ISC_LIST(isc__socket_t) socklist; -#ifdef USE_SELECT fd_set *read_fds; fd_set *read_fds_copy; fd_set *write_fds; fd_set *write_fds_copy; int maxfd; #endif /* USE_SELECT */ - int reserved; /* unlocked */ - isc_thread_t watcher; - isc_condition_t shutdown_ok; - int maxudp; }; + #define CLOSED 0 /* this one must be zero */ #define MANAGED 1 #define CLOSE_PENDING 2 @@ -463,18 +453,16 @@ static void free_socket(isc__socket_t **); static isc_result_t allocate_socket(isc__socketmgr_t *, isc_sockettype_t, isc__socket_t **); static void destroy(isc__socket_t **); -static void internal_accept(isc_task_t *, isc_event_t *); -static void internal_connect(isc_task_t *, isc_event_t *); -static void internal_recv(isc_task_t *, isc_event_t *); -static void internal_send(isc_task_t *, isc_event_t *); -static void internal_fdwatch_write(isc_task_t *, isc_event_t *); -static void internal_fdwatch_read(isc_task_t *, isc_event_t *); +static void internal_accept(isc__socket_t *); +static void internal_connect(isc__socket_t *); +static void internal_recv(isc__socket_t *); +static void internal_send(isc__socket_t *); static void process_cmsg(isc__socket_t *, struct msghdr *, isc_socketevent_t *); static void build_msghdr_send(isc__socket_t *, char *, isc_socketevent_t *, struct msghdr *, struct iovec *, size_t *); static void build_msghdr_recv(isc__socket_t *, char *, isc_socketevent_t *, struct msghdr *, struct iovec *, size_t *); -static bool process_ctlfd(isc__socketmgr_t *manager); +static bool process_ctlfd(isc__socketthread_t *thread); static void setdscp(isc__socket_t *sock, isc_dscp_t dscp); #define SELECT_POKE_SHUTDOWN (-1) @@ -485,7 +473,7 @@ static void setdscp(isc__socket_t *sock, isc_dscp_t dscp); #define SELECT_POKE_CONNECT (-4) /*%< Same as _WRITE */ #define SELECT_POKE_CLOSE (-5) -#define SOCK_DEAD(s) ((s)->references == 0) +#define SOCK_DEAD(s) (isc_refcount_current(&((s)->references)) == 0) /*% * Shortcut index arrays to get access to statistics counters. @@ -568,19 +556,6 @@ static const isc_statscounter_t unixstatsindex[] = { isc_sockstatscounter_unixrecvfail, isc_sockstatscounter_unixactive }; -static const isc_statscounter_t fdwatchstatsindex[] = { - -1, - -1, - isc_sockstatscounter_fdwatchclose, - isc_sockstatscounter_fdwatchbindfail, - isc_sockstatscounter_fdwatchconnectfail, - isc_sockstatscounter_fdwatchconnect, - -1, - -1, - isc_sockstatscounter_fdwatchsendfail, - isc_sockstatscounter_fdwatchrecvfail, - -1 -}; static const isc_statscounter_t rawstatsindex[] = { isc_sockstatscounter_rawopen, isc_sockstatscounter_rawopenfail, @@ -595,6 +570,14 @@ static const isc_statscounter_t rawstatsindex[] = { isc_sockstatscounter_rawactive }; +static int +gen_threadid(isc__socket_t *sock); + +static int +gen_threadid(isc__socket_t *sock) { + return sock->fd % sock->manager->nthreads; +} + static void manager_log(isc__socketmgr_t *sockmgr, isc_logcategory_t *category, isc_logmodule_t *module, int level, @@ -618,6 +601,31 @@ manager_log(isc__socketmgr_t *sockmgr, "sockmgr %p: %s", sockmgr, msgbuf); } +static void +thread_log(isc__socketthread_t *thread, + isc_logcategory_t *category, isc_logmodule_t *module, int level, + const char *fmt, ...) ISC_FORMAT_PRINTF(5, 6); +static void +thread_log(isc__socketthread_t *thread, + isc_logcategory_t *category, isc_logmodule_t *module, int level, + const char *fmt, ...) +{ + char msgbuf[2048]; + va_list ap; + + if (! isc_log_wouldlog(isc_lctx, level)) { + return; + } + + va_start(ap, fmt); + vsnprintf(msgbuf, sizeof(msgbuf), fmt, ap); + va_end(ap); + + isc_log_write(isc_lctx, category, module, level, + "sockmgr %p thread %d: %s", + thread->manager, thread->threadid, msgbuf); +} + static void socket_log(isc__socket_t *sock, const isc_sockaddr_t *address, isc_logcategory_t *category, isc_logmodule_t *module, int level, @@ -675,21 +683,23 @@ dec_stats(isc_stats_t *stats, isc_statscounter_t counterid) { } static inline isc_result_t -watch_fd(isc__socketmgr_t *manager, int fd, int msg) { +watch_fd(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result = ISC_R_SUCCESS; #ifdef USE_KQUEUE struct kevent evchange; memset(&evchange, 0, sizeof(evchange)); - if (msg == SELECT_POKE_READ) + if (msg == SELECT_POKE_READ) { evchange.filter = EVFILT_READ; - else + } else { evchange.filter = EVFILT_WRITE; + } evchange.flags = EV_ADD; evchange.ident = fd; - if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) + if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) { result = isc__errno2result(errno); + } return (result); #elif defined(USE_EPOLL) @@ -698,23 +708,25 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { int ret; int op; - oldevents = manager->epoll_events[fd]; - if (msg == SELECT_POKE_READ) - manager->epoll_events[fd] |= EPOLLIN; - else - manager->epoll_events[fd] |= EPOLLOUT; + oldevents = thread->epoll_events[fd]; + if (msg == SELECT_POKE_READ) { + thread->epoll_events[fd] |= EPOLLIN; + } else { + thread->epoll_events[fd] |= EPOLLOUT; + } - event.events = manager->epoll_events[fd]; + event.events = thread->epoll_events[fd]; memset(&event.data, 0, sizeof(event.data)); event.data.fd = fd; op = (oldevents == 0U) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - ret = epoll_ctl(manager->epoll_fd, op, fd, &event); + ret = epoll_ctl(thread->epoll_fd, op, fd, &event); if (ret == -1) { - if (errno == EEXIST) + if (errno == EEXIST) { UNEXPECTED_ERROR(__FILE__, __LINE__, "epoll_ctl(ADD/MOD) returned " "EEXIST for fd %d", fd); + } result = isc__errno2result(errno); } @@ -724,52 +736,56 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { int lockid = FDLOCK_ID(fd); memset(&pfd, 0, sizeof(pfd)); - if (msg == SELECT_POKE_READ) + if (msg == SELECT_POKE_READ) { pfd.events = POLLIN; - else + } else { pfd.events = POLLOUT; + } pfd.fd = fd; pfd.revents = 0; - LOCK(&manager->fdlock[lockid]); - if (write(manager->devpoll_fd, &pfd, sizeof(pfd)) == -1) + if (write(thread->devpoll_fd, &pfd, sizeof(pfd)) == -1) { result = isc__errno2result(errno); - else { - if (msg == SELECT_POKE_READ) - manager->fdpollinfo[fd].want_read = 1; - else - manager->fdpollinfo[fd].want_write = 1; + } else { + if (msg == SELECT_POKE_READ) { + thread->fdpollinfo[fd].want_read = 1; + } else { + thread->fdpollinfo[fd].want_write = 1; + } } - UNLOCK(&manager->fdlock[lockid]); return (result); #elif defined(USE_SELECT) - LOCK(&manager->lock); - if (msg == SELECT_POKE_READ) - FD_SET(fd, manager->read_fds); - if (msg == SELECT_POKE_WRITE) - FD_SET(fd, manager->write_fds); - UNLOCK(&manager->lock); + LOCK(&thread->manager->lock); + if (msg == SELECT_POKE_READ) { + FD_SET(fd, thread->read_fds); + } + if (msg == SELECT_POKE_WRITE) { + FD_SET(fd, thread->write_fds); + } + UNLOCK(&thread->manager->lock); return (result); #endif } static inline isc_result_t -unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { +unwatch_fd(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result = ISC_R_SUCCESS; #ifdef USE_KQUEUE struct kevent evchange; memset(&evchange, 0, sizeof(evchange)); - if (msg == SELECT_POKE_READ) + if (msg == SELECT_POKE_READ) { evchange.filter = EVFILT_READ; - else + } else { evchange.filter = EVFILT_WRITE; + } evchange.flags = EV_DELETE; evchange.ident = fd; - if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) + if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) { result = isc__errno2result(errno); + } return (result); #elif defined(USE_EPOLL) @@ -777,17 +793,18 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { int ret; int op; - if (msg == SELECT_POKE_READ) - manager->epoll_events[fd] &= ~(EPOLLIN); - else - manager->epoll_events[fd] &= ~(EPOLLOUT); + if (msg == SELECT_POKE_READ) { + thread->epoll_events[fd] &= ~(EPOLLIN); + } else { + thread->epoll_events[fd] &= ~(EPOLLOUT); + } - event.events = manager->epoll_events[fd]; + event.events = thread->epoll_events[fd]; memset(&event.data, 0, sizeof(event.data)); event.data.fd = fd; op = (event.events == 0U) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; - ret = epoll_ctl(manager->epoll_fd, op, fd, &event); + ret = epoll_ctl(thread->epoll_fd, op, fd, &event); if (ret == -1 && errno != ENOENT) { char strbuf[ISC_STRERRORSIZE]; strerror_r(errno, strbuf, sizeof(strbuf)); @@ -810,45 +827,51 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { * only provides a way of canceling per FD, we may need to re-poll the * socket for the other operation. */ - LOCK(&manager->fdlock[lockid]); if (msg == SELECT_POKE_READ && - manager->fdpollinfo[fd].want_write == 1) { + thread->fdpollinfo[fd].want_write == 1) + { pfds[1].events = POLLOUT; pfds[1].fd = fd; writelen += sizeof(pfds[1]); } if (msg == SELECT_POKE_WRITE && - manager->fdpollinfo[fd].want_read == 1) { + thread->fdpollinfo[fd].want_read == 1) + { pfds[1].events = POLLIN; pfds[1].fd = fd; writelen += sizeof(pfds[1]); } - if (write(manager->devpoll_fd, pfds, writelen) == -1) + if (write(thread->devpoll_fd, pfds, writelen) == -1) { result = isc__errno2result(errno); - else { - if (msg == SELECT_POKE_READ) - manager->fdpollinfo[fd].want_read = 0; - else - manager->fdpollinfo[fd].want_write = 0; + } else { + if (msg == SELECT_POKE_READ) { + thread->fdpollinfo[fd].want_read = 0; + } else { + thread->fdpollinfo[fd].want_write = 0; + } } - UNLOCK(&manager->fdlock[lockid]); return (result); #elif defined(USE_SELECT) - LOCK(&manager->lock); - if (msg == SELECT_POKE_READ) - FD_CLR(fd, manager->read_fds); - else if (msg == SELECT_POKE_WRITE) - FD_CLR(fd, manager->write_fds); - UNLOCK(&manager->lock); + LOCK(&thread->manager->lock); + if (msg == SELECT_POKE_READ) { + FD_CLR(fd, thread->read_fds); + } else if (msg == SELECT_POKE_WRITE) { + FD_CLR(fd, thread->write_fds); + } + UNLOCK(&thread->manager->lock); return (result); #endif } +/* + * A poke message was received, perform a proper watch/unwatch + * on a fd provided + */ static void -wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { +wakeup_socket(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result; int lockid = FDLOCK_ID(fd); @@ -858,21 +881,21 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * or writes. */ - INSIST(fd >= 0 && fd < (int)manager->maxsocks); + INSIST(fd >= 0 && fd < (int)thread->manager->maxsocks); if (msg == SELECT_POKE_CLOSE) { /* No one should be updating fdstate, so no need to lock it */ - INSIST(manager->fdstate[fd] == CLOSE_PENDING); - manager->fdstate[fd] = CLOSED; - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + INSIST(thread->fdstate[fd] == CLOSE_PENDING); + thread->fdstate[fd] = CLOSED; + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); (void)close(fd); return; } - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[fd] == CLOSE_PENDING) { - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[fd] == CLOSE_PENDING) { + UNLOCK(&thread->fdlock[lockid]); /* * We accept (and ignore) any error from unwatch_fd() as we are @@ -882,20 +905,20 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * fdlock; otherwise it could cause deadlock due to a lock order * reversal. */ - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); return; } - if (manager->fdstate[fd] != MANAGED) { - UNLOCK(&manager->fdlock[lockid]); + if (thread->fdstate[fd] != MANAGED) { + UNLOCK(&thread->fdlock[lockid]); return; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); /* * Set requested bit. */ - result = watch_fd(manager, fd, msg); + result = watch_fd(thread, fd, msg); if (result != ISC_R_SUCCESS) { /* * XXXJT: what should we do? Ignoring the failure of watching @@ -915,7 +938,7 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * will not get partial writes. */ static void -select_poke(isc__socketmgr_t *mgr, int fd, int msg) { +select_poke(isc__socketmgr_t *mgr, int threadid, int fd, int msg) { int cc; int buf[2]; char strbuf[ISC_STRERRORSIZE]; @@ -924,7 +947,8 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) { buf[1] = msg; do { - cc = write(mgr->pipe_fds[1], buf, sizeof(buf)); + cc = write(mgr->threads[threadid].pipe_fds[1], + buf, sizeof(buf)); #ifdef ENOSR /* * Treat ENOSR as EAGAIN but loop slowly as it is @@ -954,12 +978,12 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) { * Read a message on the internal fd. */ static void -select_readmsg(isc__socketmgr_t *mgr, int *fd, int *msg) { +select_readmsg(isc__socketthread_t *thread, int *fd, int *msg) { int buf[2]; int cc; char strbuf[ISC_STRERRORSIZE]; - cc = read(mgr->pipe_fds[0], buf, sizeof(buf)); + cc = read(thread->pipe_fds[0], buf, sizeof(buf)); if (cc < 0) { *msg = SELECT_POKE_NOTHING; *fd = -1; /* Silence compiler. */ @@ -1584,7 +1608,6 @@ doio_recv(isc__socket_t *sock, isc_socketevent_t *dev) { case isc_sockettype_udp: case isc_sockettype_raw: break; - case isc_sockettype_fdwatch: default: INSIST(0); ISC_UNREACHABLE(); @@ -1784,39 +1807,22 @@ doio_send(isc__socket_t *sock, isc_socketevent_t *dev) { * references exist. */ static void -socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) { - isc_sockettype_t type = sock->type; +socketclose(isc__socketthread_t *thread, isc__socket_t *sock, int fd) { int lockid = FDLOCK_ID(fd); - /* * No one has this socket open, so the watcher doesn't have to be * poked, and the socket doesn't have to be locked. */ - LOCK(&manager->fdlock[lockid]); - manager->fds[fd] = NULL; - if (type == isc_sockettype_fdwatch) - manager->fdstate[fd] = CLOSED; - else - manager->fdstate[fd] = CLOSE_PENDING; - UNLOCK(&manager->fdlock[lockid]); - if (type == isc_sockettype_fdwatch) { - /* - * The caller may close the socket once this function returns, - * and `fd' may be reassigned for a new socket. So we do - * unwatch_fd() here, rather than defer it via select_poke(). - * Note: this may complicate data protection among threads and - * may reduce performance due to additional locks. One way to - * solve this would be to dup() the watched descriptor, but we - * take a simpler approach at this moment. - */ - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); - } else - select_poke(manager, fd, SELECT_POKE_CLOSE); + LOCK(&thread->fdlock[lockid]); + thread->fds[fd] = NULL; + thread->fdstate[fd] = CLOSE_PENDING; + UNLOCK(&thread->fdlock[lockid]); + select_poke(thread->manager, thread->threadid, fd, SELECT_POKE_CLOSE); - inc_stats(manager->stats, sock->statsindex[STATID_CLOSE]); + inc_stats(thread->manager->stats, sock->statsindex[STATID_CLOSE]); if (sock->active == 1) { - dec_stats(manager->stats, sock->statsindex[STATID_ACTIVE]); + dec_stats(thread->manager->stats, + sock->statsindex[STATID_ACTIVE]); sock->active = 0; } @@ -1825,27 +1831,28 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) { * efficiently) */ #ifdef USE_SELECT - LOCK(&manager->lock); - if (manager->maxfd == fd) { + LOCK(&thread->manager->lock); + if (thread->maxfd == fd) { int i; - manager->maxfd = 0; + thread->maxfd = 0; for (i = fd - 1; i >= 0; i--) { lockid = FDLOCK_ID(i); - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[i] == MANAGED) { - manager->maxfd = i; - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[i] == MANAGED) { + thread->maxfd = i; + UNLOCK(&thread->fdlock[lockid]); break; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); + } + if (thread->maxfd < thread->pipe_fds[0]) { + thread->maxfd = thread->pipe_fds[0]; } - if (manager->maxfd < manager->pipe_fds[0]) - manager->maxfd = manager->pipe_fds[0]; } - UNLOCK(&manager->lock); + UNLOCK(&thread->manager->lock); #endif /* USE_SELECT */ } @@ -1854,6 +1861,7 @@ destroy(isc__socket_t **sockp) { int fd; isc__socket_t *sock = *sockp; isc__socketmgr_t *manager = sock->manager; + isc__socketthread_t *thread; socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_DESTROYING, "destroying"); @@ -1866,8 +1874,10 @@ destroy(isc__socket_t **sockp) { if (sock->fd >= 0) { fd = sock->fd; + thread = &manager->threads[sock->threadid]; sock->fd = -1; - socketclose(manager, sock, fd); + sock->threadid = -1; + socketclose(thread, sock, fd); } LOCK(&manager->lock); @@ -1888,20 +1898,17 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, isc__socket_t **socketp) { isc__socket_t *sock; - isc_result_t result; sock = isc_mem_get(manager->mctx, sizeof(*sock)); - if (sock == NULL) - return (ISC_R_NOMEMORY); - sock->common.magic = 0; sock->common.impmagic = 0; - sock->references = 0; + isc_refcount_init(&sock->references, 0); sock->manager = manager; sock->type = type; sock->fd = -1; + sock->threadid = -1; sock->dscp = 0; /* TOS/TCLASS is zero until set. */ sock->dupped = 0; sock->statsindex = NULL; @@ -1909,7 +1916,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, ISC_LINK_INIT(sock, link); - memset(sock->name, 0, sizeof(sock->name)); sock->tag = NULL; @@ -1920,9 +1926,6 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, ISC_LIST_INIT(sock->send_list); ISC_LIST_INIT(sock->accept_list); ISC_LIST_INIT(sock->connect_list); - sock->pending_recv = 0; - sock->pending_send = 0; - sock->pending_accept = 0; sock->listener = 0; sock->connected = 0; sock->connecting = 0; @@ -1932,33 +1935,13 @@ allocate_socket(isc__socketmgr_t *manager, isc_sockettype_t type, /* * Initialize the lock. */ - result = isc_mutex_init(&sock->lock); - if (result != ISC_R_SUCCESS) { - sock->common.magic = 0; - sock->common.impmagic = 0; - goto error; - } - - /* - * Initialize readable and writable events. - */ - ISC_EVENT_INIT(&sock->readable_ev, sizeof(intev_t), - ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTR, - NULL, sock, sock, NULL, NULL); - ISC_EVENT_INIT(&sock->writable_ev, sizeof(intev_t), - ISC_EVENTATTR_NOPURGE, NULL, ISC_SOCKEVENT_INTW, - NULL, sock, sock, NULL, NULL); + RUNTIME_CHECK(isc_mutex_init(&sock->lock) == ISC_R_SUCCESS); sock->common.magic = ISCAPI_SOCKET_MAGIC; sock->common.impmagic = SOCKET_MAGIC; *socketp = sock; return (ISC_R_SUCCESS); - - error: - isc_mem_put(manager->mctx, sock, sizeof(*sock)); - - return (result); } /* @@ -1973,11 +1956,8 @@ free_socket(isc__socket_t **socketp) { isc__socket_t *sock = *socketp; INSIST(VALID_SOCKET(sock)); - INSIST(sock->references == 0); + INSIST(isc_refcount_current(&sock->references) == 0); INSIST(!sock->connecting); - INSIST(!sock->pending_recv); - INSIST(!sock->pending_send); - INSIST(!sock->pending_accept); INSIST(ISC_LIST_EMPTY(sock->recv_list)); INSIST(ISC_LIST_EMPTY(sock->send_list)); INSIST(ISC_LIST_EMPTY(sock->accept_list)); @@ -2245,13 +2225,6 @@ opensocket(isc__socketmgr_t *manager, isc__socket_t *sock, } #endif break; - case isc_sockettype_fdwatch: - /* - * We should not be called for isc_sockettype_fdwatch - * sockets. - */ - INSIST(0); - ISC_UNREACHABLE(); } } else { sock->fd = dup(dup_socket->fd); @@ -2580,12 +2553,12 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type, { isc__socket_t *sock = NULL; isc__socketmgr_t *manager = (isc__socketmgr_t *)manager0; + isc__socketthread_t *thread; isc_result_t result; int lockid; REQUIRE(VALID_MANAGER(manager)); REQUIRE(socketp != NULL && *socketp == NULL); - REQUIRE(type != isc_sockettype_fdwatch); result = allocate_socket(manager, type, &sock); if (result != ISC_R_SUCCESS) @@ -2621,7 +2594,12 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type, return (result); } - sock->references = 1; + if (sock->fd == -1) { + abort(); + } + sock->threadid = gen_threadid(sock); + isc_refcount_init(&sock->references, 1); + thread = &manager->threads[sock->threadid]; *socketp = (isc_socket_t *)sock; /* @@ -2630,23 +2608,24 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type, */ lockid = FDLOCK_ID(sock->fd); - LOCK(&manager->fdlock[lockid]); - manager->fds[sock->fd] = sock; - manager->fdstate[sock->fd] = MANAGED; + LOCK(&thread->fdlock[lockid]); + thread->fds[sock->fd] = sock; + thread->fdstate[sock->fd] = MANAGED; #if defined(USE_EPOLL) - manager->epoll_events[sock->fd] = 0; + thread->epoll_events[sock->fd] = 0; #endif #ifdef USE_DEVPOLL - INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 && - sock->manager->fdpollinfo[sock->fd].want_write == 0); + INSIST(thread->fdpollinfo[sock->fd].want_read == 0 && + thread->fdpollinfo[sock->fd].want_write == 0); #endif - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); LOCK(&manager->lock); ISC_LIST_APPEND(manager->socklist, sock, link); #ifdef USE_SELECT - if (manager->maxfd < sock->fd) - manager->maxfd = sock->fd; + if (thread->maxfd < sock->fd) { + thread->maxfd = sock->fd; + } #endif UNLOCK(&manager->lock); @@ -2689,42 +2668,42 @@ isc_result_t isc_socket_open(isc_socket_t *sock0) { isc_result_t result; isc__socket_t *sock = (isc__socket_t *)sock0; + isc__socketthread_t *thread; REQUIRE(VALID_SOCKET(sock)); - LOCK(&sock->lock); - REQUIRE(sock->references == 1); - REQUIRE(sock->type != isc_sockettype_fdwatch); - UNLOCK(&sock->lock); + REQUIRE(isc_refcount_current(&sock->references) == 1); /* * We don't need to retain the lock hereafter, since no one else has * this socket. */ REQUIRE(sock->fd == -1); + REQUIRE(sock->threadid == -1); result = opensocket(sock->manager, sock, NULL); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { sock->fd = -1; - - if (result == ISC_R_SUCCESS) { + } else { + sock->threadid = gen_threadid(sock); + thread = &sock->manager->threads[sock->threadid]; int lockid = FDLOCK_ID(sock->fd); - LOCK(&sock->manager->fdlock[lockid]); - sock->manager->fds[sock->fd] = sock; - sock->manager->fdstate[sock->fd] = MANAGED; + LOCK(&thread->fdlock[lockid]); + thread->fds[sock->fd] = sock; + thread->fdstate[sock->fd] = MANAGED; #if defined(USE_EPOLL) - sock->manager->epoll_events[sock->fd] = 0; + thread->epoll_events[sock->fd] = 0; #endif #ifdef USE_DEVPOLL - INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 && - sock->manager->fdpollinfo[sock->fd].want_write == 0); + INSIST(thread->fdpollinfo[sock->fd].want_read == 0 && + thread->fdpollinfo[sock->fd].want_write == 0); #endif - UNLOCK(&sock->manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); #ifdef USE_SELECT LOCK(&sock->manager->lock); - if (sock->manager->maxfd < sock->fd) - sock->manager->maxfd = sock->fd; + if (thread->maxfd < sock->fd) + thread->maxfd = sock->fd; UNLOCK(&sock->manager->lock); #endif } @@ -2732,114 +2711,6 @@ isc_socket_open(isc_socket_t *sock0) { return (result); } -/* - * Create a new 'type' socket managed by 'manager'. Events - * will be posted to 'task' and when dispatched 'action' will be - * called with 'arg' as the arg value. The new socket is returned - * in 'socketp'. - */ -isc_result_t -isc_socket_fdwatchcreate(isc_socketmgr_t *manager0, int fd, int flags, - isc_sockfdwatch_t callback, void *cbarg, - isc_task_t *task, isc_socket_t **socketp) -{ - isc__socketmgr_t *manager = (isc__socketmgr_t *)manager0; - isc__socket_t *sock = NULL; - isc_result_t result; - int lockid; - - REQUIRE(VALID_MANAGER(manager)); - REQUIRE(socketp != NULL && *socketp == NULL); - - if (fd < 0 || (unsigned int)fd >= manager->maxsocks) - return (ISC_R_RANGE); - - result = allocate_socket(manager, isc_sockettype_fdwatch, &sock); - if (result != ISC_R_SUCCESS) - return (result); - - sock->fd = fd; - sock->fdwatcharg = cbarg; - sock->fdwatchcb = callback; - sock->fdwatchflags = flags; - sock->fdwatchtask = task; - sock->statsindex = fdwatchstatsindex; - - sock->references = 1; - *socketp = (isc_socket_t *)sock; - - /* - * Note we don't have to lock the socket like we normally would because - * there are no external references to it yet. - */ - - lockid = FDLOCK_ID(sock->fd); - LOCK(&manager->fdlock[lockid]); - manager->fds[sock->fd] = sock; - manager->fdstate[sock->fd] = MANAGED; -#if defined(USE_EPOLL) - manager->epoll_events[sock->fd] = 0; -#endif - UNLOCK(&manager->fdlock[lockid]); - - LOCK(&manager->lock); - ISC_LIST_APPEND(manager->socklist, sock, link); -#ifdef USE_SELECT - if (manager->maxfd < sock->fd) - manager->maxfd = sock->fd; -#endif - UNLOCK(&manager->lock); - - if ((flags & ISC_SOCKFDWATCH_READ) != 0) { - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); - } - if ((flags & ISC_SOCKFDWATCH_WRITE) != 0) { - select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE); - } - - socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET, - ISC_MSG_CREATED, "fdwatch-created"); - - return (ISC_R_SUCCESS); -} - -/* - * Indicate to the manager that it should watch the socket again. - * This can be used to restart watching if the previous event handler - * didn't indicate there was more data to be processed. Primarily - * it is for writing but could be used for reading if desired - */ - -isc_result_t -isc_socket_fdwatchpoke(isc_socket_t *sock0, int flags) -{ - isc__socket_t *sock = (isc__socket_t *)sock0; - - REQUIRE(VALID_SOCKET(sock)); - - /* - * We check both flags first to allow us to get the lock - * once but only if we need it. - */ - - if ((flags & (ISC_SOCKFDWATCH_READ | ISC_SOCKFDWATCH_WRITE)) != 0) { - LOCK(&sock->lock); - if (((flags & ISC_SOCKFDWATCH_READ) != 0) && - !sock->pending_recv) - select_poke(sock->manager, sock->fd, - SELECT_POKE_READ); - if (((flags & ISC_SOCKFDWATCH_WRITE) != 0) && - !sock->pending_send) - select_poke(sock->manager, sock->fd, - SELECT_POKE_WRITE); - UNLOCK(&sock->lock); - } - - socket_log(sock, NULL, TRACE, isc_msgcat, ISC_MSGSET_SOCKET, - ISC_MSG_POKED, "fdwatch-poked flags: %d", flags); - - return (ISC_R_SUCCESS); -} /* * Attach to a socket. Caller must explicitly detach when it is done. @@ -2851,9 +2722,8 @@ isc_socket_attach(isc_socket_t *sock0, isc_socket_t **socketp) { REQUIRE(VALID_SOCKET(sock)); REQUIRE(socketp != NULL && *socketp == NULL); - LOCK(&sock->lock); - sock->references++; - UNLOCK(&sock->lock); + int old_refs = isc_refcount_increment(&sock->references); + REQUIRE(old_refs > 0); *socketp = (isc_socket_t *)sock; } @@ -2865,21 +2735,14 @@ isc_socket_attach(isc_socket_t *sock0, isc_socket_t **socketp) { void isc_socket_detach(isc_socket_t **socketp) { isc__socket_t *sock; - bool kill_socket = false; REQUIRE(socketp != NULL); sock = (isc__socket_t *)*socketp; REQUIRE(VALID_SOCKET(sock)); - LOCK(&sock->lock); - REQUIRE(sock->references > 0); - sock->references--; - if (sock->references == 0) - kill_socket = true; - UNLOCK(&sock->lock); - - if (kill_socket) + if (isc_refcount_decrement(&sock->references) == 1) { destroy(&sock); + } *socketp = NULL; } @@ -2889,28 +2752,26 @@ isc_socket_close(isc_socket_t *sock0) { isc__socket_t *sock = (isc__socket_t *)sock0; int fd; isc__socketmgr_t *manager; - + isc__socketthread_t *thread; fflush(stdout); REQUIRE(VALID_SOCKET(sock)); LOCK(&sock->lock); - REQUIRE(sock->references == 1); - REQUIRE(sock->type != isc_sockettype_fdwatch); REQUIRE(sock->fd >= 0 && sock->fd < (int)sock->manager->maxsocks); INSIST(!sock->connecting); - INSIST(!sock->pending_recv); - INSIST(!sock->pending_send); - INSIST(!sock->pending_accept); INSIST(ISC_LIST_EMPTY(sock->recv_list)); INSIST(ISC_LIST_EMPTY(sock->send_list)); INSIST(ISC_LIST_EMPTY(sock->accept_list)); INSIST(ISC_LIST_EMPTY(sock->connect_list)); manager = sock->manager; + thread = &manager->threads[sock->threadid]; fd = sock->fd; sock->fd = -1; + sock->threadid = -1; + sock->dupped = 0; memset(sock->name, 0, sizeof(sock->name)); sock->tag = NULL; @@ -2922,136 +2783,11 @@ isc_socket_close(isc_socket_t *sock0) { UNLOCK(&sock->lock); - socketclose(manager, sock, fd); + socketclose(thread, sock, fd); return (ISC_R_SUCCESS); } -/* - * I/O is possible on a given socket. Schedule an event to this task that - * will call an internal function to do the I/O. This will charge the - * task with the I/O operation and let our select loop handler get back - * to doing something real as fast as possible. - * - * The socket and manager must be locked before calling this function. - */ -static void -dispatch_recv(isc__socket_t *sock) { - intev_t *iev; - isc_socketevent_t *ev; - isc_task_t *sender; - - INSIST(!sock->pending_recv); - - if (sock->type != isc_sockettype_fdwatch) { - ev = ISC_LIST_HEAD(sock->recv_list); - if (ev == NULL) - return; - socket_log(sock, NULL, EVENT, NULL, 0, 0, - "dispatch_recv: event %p -> task %p", - ev, ev->ev_sender); - sender = ev->ev_sender; - } else { - sender = sock->fdwatchtask; - } - - sock->pending_recv = 1; - iev = &sock->readable_ev; - - sock->references++; - iev->ev_sender = sock; - if (sock->type == isc_sockettype_fdwatch) - iev->ev_action = internal_fdwatch_read; - else - iev->ev_action = internal_recv; - iev->ev_arg = sock; - - isc_task_send(sender, (isc_event_t **)&iev); -} - -static void -dispatch_send(isc__socket_t *sock) { - intev_t *iev; - isc_socketevent_t *ev; - isc_task_t *sender; - - INSIST(!sock->pending_send); - - if (sock->type != isc_sockettype_fdwatch) { - ev = ISC_LIST_HEAD(sock->send_list); - if (ev == NULL) - return; - socket_log(sock, NULL, EVENT, NULL, 0, 0, - "dispatch_send: event %p -> task %p", - ev, ev->ev_sender); - sender = ev->ev_sender; - } else { - sender = sock->fdwatchtask; - } - - sock->pending_send = 1; - iev = &sock->writable_ev; - - sock->references++; - iev->ev_sender = sock; - if (sock->type == isc_sockettype_fdwatch) - iev->ev_action = internal_fdwatch_write; - else - iev->ev_action = internal_send; - iev->ev_arg = sock; - - isc_task_send(sender, (isc_event_t **)&iev); -} - -/* - * Dispatch an internal accept event. - */ -static void -dispatch_accept(isc__socket_t *sock) { - intev_t *iev; - isc_socket_newconnev_t *ev; - - INSIST(!sock->pending_accept); - - /* - * Are there any done events left, or were they all canceled - * before the manager got the socket lock? - */ - ev = ISC_LIST_HEAD(sock->accept_list); - if (ev == NULL) - return; - - sock->pending_accept = 1; - iev = &sock->readable_ev; - - sock->references++; /* keep socket around for this internal event */ - iev->ev_sender = sock; - iev->ev_action = internal_accept; - iev->ev_arg = sock; - - isc_task_send(ev->ev_sender, (isc_event_t **)&iev); -} - -static void -dispatch_connect(isc__socket_t *sock) { - intev_t *iev; - isc_socket_connev_t *ev; - - iev = &sock->writable_ev; - - ev = ISC_LIST_HEAD(sock->connect_list); - INSIST(ev != NULL); /* XXX */ - - INSIST(sock->connecting); - - sock->references++; /* keep socket around for this internal event */ - iev->ev_sender = sock; - iev->ev_action = internal_connect; - iev->ev_arg = sock; - - isc_task_send(ev->ev_sender, (isc_event_t **)&iev); -} - /* * Dequeue an item off the given socket's read queue, set the result code * in the done event to the one provided, and send it to the task it was @@ -3070,13 +2806,15 @@ send_recvdone_event(isc__socket_t *sock, isc_socketevent_t **dev) { (*dev)->ev_sender = sock; - if (ISC_LINK_LINKED(*dev, ev_link)) + if (ISC_LINK_LINKED(*dev, ev_link)) { ISC_LIST_DEQUEUE(sock->recv_list, *dev, ev_link); + } if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) { - isc_task_sendanddetach(&task, (isc_event_t **)dev); + isc_task_sendtoanddetach(&task, (isc_event_t **)dev, + sock->threadid); } else { - isc_task_send(task, (isc_event_t **)dev); + isc_task_sendto(task, (isc_event_t **)dev, sock->threadid); } } @@ -3098,9 +2836,10 @@ send_senddone_event(isc__socket_t *sock, isc_socketevent_t **dev) { ISC_LIST_DEQUEUE(sock->send_list, *dev, ev_link); if (((*dev)->attributes & ISC_SOCKEVENTATTR_ATTACHED) != 0) { - isc_task_sendanddetach(&task, (isc_event_t **)dev); + isc_task_sendtoanddetach(&task, (isc_event_t **)dev, + sock->threadid); } else { - isc_task_send(task, (isc_event_t **)dev); + isc_task_sendto(task, (isc_event_t **)dev, sock->threadid); } } @@ -3118,10 +2857,11 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) { task = (*dev)->ev_sender; (*dev)->ev_sender = sock; - if (ISC_LINK_LINKED(*dev, ev_link)) + if (ISC_LINK_LINKED(*dev, ev_link)) { ISC_LIST_DEQUEUE(sock->connect_list, *dev, ev_link); + } - isc_task_sendanddetach(&task, (isc_event_t **)dev); + isc_task_sendtoanddetach(&task, (isc_event_t **)dev, sock->threadid); } /* @@ -3136,9 +2876,9 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) { * so just unlock and return. */ static void -internal_accept(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; +internal_accept(isc__socket_t *sock) { isc__socketmgr_t *manager; + isc__socketthread_t *thread, *nthread; isc_socket_newconnev_t *dev; isc_task_t *task; socklen_t addrlen; @@ -3147,9 +2887,6 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { char strbuf[ISC_STRERRORSIZE]; const char *err = "accept"; - UNUSED(me); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); @@ -3159,18 +2896,9 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { manager = sock->manager; INSIST(VALID_MANAGER(manager)); + thread = &manager->threads[sock->threadid]; INSIST(sock->listener); - INSIST(sock->pending_accept == 1); - sock->pending_accept = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } /* * Get the first item off the accept list. @@ -3178,6 +2906,7 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { */ dev = ISC_LIST_HEAD(sock->accept_list); if (dev == NULL) { + unwatch_fd(thread, sock->fd, SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); return; } @@ -3303,8 +3032,9 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { /* * Poke watcher if there are more pending accepts. */ - if (!ISC_LIST_EMPTY(sock->accept_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + if (ISC_LIST_EMPTY(sock->accept_list)) + unwatch_fd(thread, sock->fd, + SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); @@ -3323,8 +3053,10 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { int lockid = FDLOCK_ID(fd); NEWCONNSOCK(dev)->fd = fd; + NEWCONNSOCK(dev)->threadid = gen_threadid(NEWCONNSOCK(dev)); NEWCONNSOCK(dev)->bound = 1; NEWCONNSOCK(dev)->connected = 1; + nthread = &manager->threads[NEWCONNSOCK(dev)->threadid]; /* * Use minimum mtu if possible. @@ -3348,19 +3080,19 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { NEWCONNSOCK(dev)->active = 1; } - LOCK(&manager->fdlock[lockid]); - manager->fds[fd] = NEWCONNSOCK(dev); - manager->fdstate[fd] = MANAGED; + LOCK(&nthread->fdlock[lockid]); + nthread->fds[fd] = NEWCONNSOCK(dev); + nthread->fdstate[fd] = MANAGED; #if defined(USE_EPOLL) - manager->epoll_events[fd] = 0; + nthread->epoll_events[fd] = 0; #endif - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&nthread->fdlock[lockid]); LOCK(&manager->lock); #ifdef USE_SELECT - if (manager->maxfd < fd) - manager->maxfd = fd; + if (nthread->maxfd < fd) + nthread->maxfd = fd; #endif socket_log(sock, &NEWCONNSOCK(dev)->peer_address, CREATION, @@ -3386,11 +3118,11 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { task = dev->ev_sender; dev->ev_sender = sock; - isc_task_sendanddetach(&task, ISC_EVENT_PTR(&dev)); + isc_task_sendtoanddetach(&task, ISC_EVENT_PTR(&dev), sock->threadid); return; soft_error: - select_poke(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + watch_fd(thread, sock->fd, SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); inc_stats(manager->stats, sock->statsindex[STATID_ACCEPTFAIL]); @@ -3398,40 +3130,29 @@ internal_accept(isc_task_t *me, isc_event_t *ev) { } static void -internal_recv(isc_task_t *me, isc_event_t *ev) { +internal_recv(isc__socket_t *sock) { isc_socketevent_t *dev; - isc__socket_t *sock; - INSIST(ev->ev_type == ISC_SOCKEVENT_INTR); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); + dev = ISC_LIST_HEAD(sock->recv_list); + if (dev == NULL) { + goto finish; + } + socket_log(sock, NULL, IOEVENT, isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV, - "internal_recv: task %p got event %p", me, ev); - - INSIST(sock->pending_recv == 1); - sock->pending_recv = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } + "internal_recv: event %p -> task %p", dev, dev->ev_sender); /* * Try to do as much I/O as possible on this socket. There are no * limits here, currently. */ - dev = ISC_LIST_HEAD(sock->recv_list); while (dev != NULL) { switch (doio_recv(sock, dev)) { case DOIO_SOFT: - goto poke; + goto finish; case DOIO_EOF: /* @@ -3444,7 +3165,7 @@ internal_recv(isc_task_t *me, isc_event_t *ev) { send_recvdone_event(sock, &dev); dev = ISC_LIST_HEAD(sock->recv_list); } while (dev != NULL); - goto poke; + goto finish; case DOIO_SUCCESS: case DOIO_HARD: @@ -3455,51 +3176,37 @@ internal_recv(isc_task_t *me, isc_event_t *ev) { dev = ISC_LIST_HEAD(sock->recv_list); } - poke: - if (!ISC_LIST_EMPTY(sock->recv_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); - + finish: + if (ISC_LIST_EMPTY(sock->recv_list)) { + unwatch_fd(&sock->manager->threads[sock->threadid], sock->fd, + SELECT_POKE_READ); + } UNLOCK(&sock->lock); } static void -internal_send(isc_task_t *me, isc_event_t *ev) { +internal_send(isc__socket_t *sock) { isc_socketevent_t *dev; - isc__socket_t *sock; - INSIST(ev->ev_type == ISC_SOCKEVENT_INTW); - - /* - * Find out what socket this is and lock it. - */ - sock = (isc__socket_t *)ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); - socket_log(sock, NULL, IOEVENT, - isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALSEND, - "internal_send: task %p got event %p", me, ev); - - INSIST(sock->pending_send == 1); - sock->pending_send = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; + dev = ISC_LIST_HEAD(sock->send_list); + if (dev == NULL) { + goto finish; } + socket_log(sock, NULL, EVENT, NULL, 0, 0, + "internal_send: event %p -> task %p", + dev, dev->ev_sender); /* * Try to do as much I/O as possible on this socket. There are no * limits here, currently. */ - dev = ISC_LIST_HEAD(sock->send_list); while (dev != NULL) { switch (doio_send(sock, dev)) { case DOIO_SOFT: - goto poke; + goto finish; case DOIO_HARD: case DOIO_SUCCESS: @@ -3510,92 +3217,11 @@ internal_send(isc_task_t *me, isc_event_t *ev) { dev = ISC_LIST_HEAD(sock->send_list); } - poke: - if (!ISC_LIST_EMPTY(sock->send_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE); - - UNLOCK(&sock->lock); -} - -static void -internal_fdwatch_write(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; - int more_data; - - INSIST(ev->ev_type == ISC_SOCKEVENT_INTW); - - /* - * Find out what socket this is and lock it. - */ - sock = (isc__socket_t *)ev->ev_sender; - INSIST(VALID_SOCKET(sock)); - - LOCK(&sock->lock); - socket_log(sock, NULL, IOEVENT, - isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALSEND, - "internal_fdwatch_write: task %p got event %p", me, ev); - - INSIST(sock->pending_send == 1); - - UNLOCK(&sock->lock); - more_data = (sock->fdwatchcb)(me, (isc_socket_t *)sock, - sock->fdwatcharg, ISC_SOCKFDWATCH_WRITE); - LOCK(&sock->lock); - - sock->pending_send = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; + finish: + if (ISC_LIST_EMPTY(sock->send_list)) { + unwatch_fd(&sock->manager->threads[sock->threadid], + sock->fd, SELECT_POKE_WRITE); } - - if (more_data) - select_poke(sock->manager, sock->fd, SELECT_POKE_WRITE); - - UNLOCK(&sock->lock); -} - -static void -internal_fdwatch_read(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; - int more_data; - - INSIST(ev->ev_type == ISC_SOCKEVENT_INTR); - - /* - * Find out what socket this is and lock it. - */ - sock = (isc__socket_t *)ev->ev_sender; - INSIST(VALID_SOCKET(sock)); - - LOCK(&sock->lock); - socket_log(sock, NULL, IOEVENT, - isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_INTERNALRECV, - "internal_fdwatch_read: task %p got event %p", me, ev); - - INSIST(sock->pending_recv == 1); - - UNLOCK(&sock->lock); - more_data = (sock->fdwatchcb)(me, (isc_socket_t *)sock, - sock->fdwatcharg, ISC_SOCKFDWATCH_READ); - LOCK(&sock->lock); - - sock->pending_recv = 0; - - INSIST(sock->references > 0); - sock->references--; /* the internal event is done with this socket */ - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } - - if (more_data) - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); - UNLOCK(&sock->lock); } @@ -3604,127 +3230,121 @@ internal_fdwatch_read(isc_task_t *me, isc_event_t *ev) { * and unlocking twice if both reads and writes are possible. */ static void -process_fd(isc__socketmgr_t *manager, int fd, bool readable, +process_fd(isc__socketthread_t *thread, int fd, bool readable, bool writeable) { isc__socket_t *sock; - bool unlock_sock; - bool unwatch_read = false, unwatch_write = false; int lockid = FDLOCK_ID(fd); /* * If the socket is going to be closed, don't do more I/O. */ - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[fd] == CLOSE_PENDING) { - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[fd] == CLOSE_PENDING) { + UNLOCK(&thread->fdlock[lockid]); - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); return; } - sock = manager->fds[fd]; - unlock_sock = false; + sock = thread->fds[fd]; + if (sock == NULL) { + UNLOCK(&thread->fdlock[lockid]); + return; + } + if (SOCK_DEAD(sock)) { /* Sock is being closed, bail */ + goto unlock_fd; + } + + isc_refcount_increment(&sock->references); + if (readable) { - if (sock == NULL) { - unwatch_read = true; - goto check_write; + if (sock->listener) { + internal_accept(sock); + } else { + internal_recv(sock); } - unlock_sock = true; - LOCK(&sock->lock); - if (!SOCK_DEAD(sock)) { - if (sock->listener) - dispatch_accept(sock); - else - dispatch_recv(sock); - } - unwatch_read = true; } -check_write: + if (writeable) { - if (sock == NULL) { - unwatch_write = true; - goto unlock_fd; + if (sock->connecting) { + internal_connect(sock); + } else { + internal_send(sock); } - if (!unlock_sock) { - unlock_sock = true; - LOCK(&sock->lock); - } - if (!SOCK_DEAD(sock)) { - if (sock->connecting) - dispatch_connect(sock); - else - dispatch_send(sock); - } - unwatch_write = true; } - if (unlock_sock) - UNLOCK(&sock->lock); unlock_fd: - UNLOCK(&manager->fdlock[lockid]); - if (unwatch_read) - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - if (unwatch_write) - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); - + UNLOCK(&thread->fdlock[lockid]); + if (sock != NULL) { + if (isc_refcount_decrement(&sock->references) == 1) { + destroy(&sock); + } + } } +/* + * process_fds is different for different event loops + * it takes the events from event loops and for each FD + * launches process_fd + */ #ifdef USE_KQUEUE static bool -process_fds(isc__socketmgr_t *manager, struct kevent *events, int nevents) { +process_fds(isc__socketthread_t *thread, struct kevent *events, + int nevents) +{ int i; bool readable, writable; bool done = false; bool have_ctlevent = false; - - if (nevents == manager->nevents) { + if (nevents == thread->nevents) { /* * This is not an error, but something unexpected. If this * happens, it may indicate the need for increasing * ISC_SOCKET_MAXEVENTS. */ - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); } for (i = 0; i < nevents; i++) { - REQUIRE(events[i].ident < manager->maxsocks); - if (events[i].ident == (uintptr_t)manager->pipe_fds[0]) { + REQUIRE(events[i].ident < thread->manager->maxsocks); + if (events[i].ident == (uintptr_t)thread->pipe_fds[0]) { have_ctlevent = true; continue; } readable = (events[i].filter == EVFILT_READ); writable = (events[i].filter == EVFILT_WRITE); - process_fd(manager, events[i].ident, readable, writable); + process_fd(thread, events[i].ident, readable, writable); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_EPOLL) static bool -process_fds(isc__socketmgr_t *manager, struct epoll_event *events, int nevents) +process_fds(isc__socketthread_t *thread, struct epoll_event *events, + int nevents) { int i; bool done = false; bool have_ctlevent = false; - if (nevents == manager->nevents) { - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + if (nevents == thread->nevents) { + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); } for (i = 0; i < nevents; i++) { - REQUIRE(events[i].data.fd < (int)manager->maxsocks); - if (events[i].data.fd == manager->pipe_fds[0]) { + REQUIRE(events[i].data.fd < (int)thread->manager->maxsocks); + if (events[i].data.fd == thread->pipe_fds[0]) { have_ctlevent = true; continue; } @@ -3738,78 +3358,80 @@ process_fds(isc__socketmgr_t *manager, struct epoll_event *events, int nevents) * won't block because we use non-blocking sockets. */ int fd = events[i].data.fd; - events[i].events |= manager->epoll_events[fd]; + events[i].events |= thread->epoll_events[fd]; } - process_fd(manager, events[i].data.fd, + process_fd(thread, events[i].data.fd, (events[i].events & EPOLLIN) != 0, (events[i].events & EPOLLOUT) != 0); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_DEVPOLL) static bool -process_fds(isc__socketmgr_t *manager, struct pollfd *events, int nevents) { +process_fds(isc__socketthread_t *thread, struct pollfd *events, + int nevents) +{ int i; bool done = false; bool have_ctlevent = false; - if (nevents == manager->nevents) { - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + if (nevents == thread->nevents) { + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); } for (i = 0; i < nevents; i++) { - REQUIRE(events[i].fd < (int)manager->maxsocks); - if (events[i].fd == manager->pipe_fds[0]) { + REQUIRE(events[i].fd < (int)thread->manager->maxsocks); + if (events[i].fd == thread->pipe_fds[0]) { have_ctlevent = true; continue; } - process_fd(manager, events[i].fd, + process_fd(thread, events[i].fd, (events[i].events & POLLIN) != 0, (events[i].events & POLLOUT) != 0); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_SELECT) static void -process_fds(isc__socketmgr_t *manager, int maxfd, fd_set *readfds, +process_fds(isc__socketthread_t *thread, int maxfd, fd_set *readfds, fd_set *writefds) { int i; - REQUIRE(maxfd <= (int)manager->maxsocks); + REQUIRE(maxfd <= (int)thread->manager->maxsocks); for (i = 0; i < maxfd; i++) { - if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1]) + if (i == thread->pipe_fds[0] || i == thread->pipe_fds[1]) continue; - process_fd(manager, i, FD_ISSET(i, readfds), + process_fd(thread, i, FD_ISSET(i, readfds), FD_ISSET(i, writefds)); } } #endif static bool -process_ctlfd(isc__socketmgr_t *manager) { +process_ctlfd(isc__socketthread_t *thread) { int msg, fd; for (;;) { - select_readmsg(manager, &fd, &msg); + select_readmsg(thread, &fd, &msg); - manager_log(manager, IOEVENT, - isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, - ISC_MSG_WATCHERMSG, - "watcher got message %d " - "for socket %d"), msg, fd); + thread_log(thread, IOEVENT, + isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, + ISC_MSG_WATCHERMSG, + "watcher got message %d " + "for socket %d"), msg, fd); /* * Nothing to read? @@ -3832,7 +3454,7 @@ process_ctlfd(isc__socketmgr_t *manager) { * and decide if we need to watch on it now * or not. */ - wakeup_socket(manager, fd, msg); + wakeup_socket(thread, fd, msg); } return (false); @@ -3842,14 +3464,19 @@ process_ctlfd(isc__socketmgr_t *manager) { * This is the thread that will loop forever, always in a select or poll * call. * - * When select returns something to do, track down what thread gets to do - * this I/O and post the event to it. + * When select returns something to do, do whatever's necessary and post + * an event to the task that was requesting the action. */ static isc_threadresult_t -watcher(void *uap) { - isc__socketmgr_t *manager = uap; +netthread(void *uap) { + isc__socketthread_t *thread = uap; + isc__socketmgr_t *manager = thread->manager; + (void)manager; bool done; int cc; + if (manager->nthreads > 1) { + isc_thread_setaffinity(thread->threadid); + } #ifdef USE_KQUEUE const char *fnname = "kevent()"; #elif defined (USE_EPOLL) @@ -3873,44 +3500,48 @@ watcher(void *uap) { /* * Get the control fd here. This will never change. */ - ctlfd = manager->pipe_fds[0]; + ctlfd = thread->pipe_fds[0]; #endif done = false; while (!done) { do { #ifdef USE_KQUEUE - cc = kevent(manager->kqueue_fd, NULL, 0, - manager->events, manager->nevents, NULL); + cc = kevent(thread->kqueue_fd, NULL, 0, + thread->events, thread->nevents, NULL); #elif defined(USE_EPOLL) - cc = epoll_wait(manager->epoll_fd, manager->events, - manager->nevents, -1); + cc = epoll_wait(thread->epoll_fd, + thread->events, + thread->nevents, -1); #elif defined(USE_DEVPOLL) /* * Re-probe every thousand calls. */ - if (manager->calls++ > 1000U) { + if (thread->calls++ > 1000U) { result = isc_resource_getcurlimit( isc_resource_openfiles, - &manager->open_max); - if (result != ISC_R_SUCCESS) - manager->open_max = 64; - manager->calls = 0; + &thread->open_max); + if (result != ISC_R_SUCCESS) { + thread->open_max = 64; + } + thread->calls = 0; } for (pass = 0; pass < 2; pass++) { - dvp.dp_fds = manager->events; - dvp.dp_nfds = manager->nevents; - if (dvp.dp_nfds >= manager->open_max) - dvp.dp_nfds = manager->open_max - 1; + dvp.dp_fds = thread->events; + dvp.dp_nfds = thread->nevents; + if (dvp.dp_nfds >= thread->open_max) { + dvp.dp_nfds = thread->open_max - 1; + } #ifndef ISC_SOCKET_USE_POLLWATCH dvp.dp_timeout = -1; #else - if (pollstate == poll_idle) + if (pollstate == poll_idle) { dvp.dp_timeout = -1; - else + } else { dvp.dp_timeout = ISC_SOCKET_POLLWATCH_TIMEOUT; + } #endif /* ISC_SOCKET_USE_POLLWATCH */ - cc = ioctl(manager->devpoll_fd, DP_POLL, &dvp); + cc = ioctl(thread->devpoll_fd, DP_POLL, &dvp); if (cc == -1 && errno == EINVAL) { /* * {OPEN_MAX} may have dropped. Look @@ -3918,23 +3549,29 @@ watcher(void *uap) { */ result = isc_resource_getcurlimit( isc_resource_openfiles, - &manager->open_max); - if (result != ISC_R_SUCCESS) - manager->open_max = 64; - } else + &thread->open_max); + if (result != ISC_R_SUCCESS) { + thread->open_max = 64; + } + } else { break; + } } #elif defined(USE_SELECT) + /* + * We will have only one thread anyway, we can lock + * manager lock and don't care + */ LOCK(&manager->lock); - memmove(manager->read_fds_copy, manager->read_fds, - manager->fd_bufsize); - memmove(manager->write_fds_copy, manager->write_fds, - manager->fd_bufsize); - maxfd = manager->maxfd + 1; + memmove(thread->read_fds_copy, thread->read_fds, + thread->fd_bufsize); + memmove(thread->write_fds_copy, thread->write_fds, + thread->fd_bufsize); + maxfd = thread->maxfd + 1; UNLOCK(&manager->lock); - cc = select(maxfd, manager->read_fds_copy, - manager->write_fds_copy, NULL, NULL); + cc = select(maxfd, thread->read_fds_copy, + thread->write_fds_copy, NULL, NULL); #endif /* USE_KQUEUE */ if (cc < 0 && !SOFT_ERROR(errno)) { @@ -3949,10 +3586,11 @@ watcher(void *uap) { #if defined(USE_DEVPOLL) && defined(ISC_SOCKET_USE_POLLWATCH) if (cc == 0) { - if (pollstate == poll_active) + if (pollstate == poll_active) { pollstate = poll_checking; - else if (pollstate == poll_checking) + } else if (pollstate == poll_checking) { pollstate = poll_idle; + } } else if (cc > 0) { if (pollstate == poll_checking) { /* @@ -3963,11 +3601,11 @@ watcher(void *uap) { * (and it can also be a false positive) * so it would be just too noisy. */ - manager_log(manager, - ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, - ISC_LOG_DEBUG(1), - "unexpected POLL timeout"); + thread_log(thread, + ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, + ISC_LOG_DEBUG(1), + "unexpected POLL timeout"); } pollstate = poll_active; } @@ -3975,23 +3613,23 @@ watcher(void *uap) { } while (cc < 0); #if defined(USE_KQUEUE) || defined (USE_EPOLL) || defined (USE_DEVPOLL) - done = process_fds(manager, manager->events, cc); + done = process_fds(thread, thread->events, cc); #elif defined(USE_SELECT) - process_fds(manager, maxfd, manager->read_fds_copy, - manager->write_fds_copy); + process_fds(thread, maxfd, thread->read_fds_copy, + thread->write_fds_copy); /* * Process reads on internal, control fd. */ - if (FD_ISSET(ctlfd, manager->read_fds_copy)) - done = process_ctlfd(manager); + if (FD_ISSET(ctlfd, thread->read_fds_copy)) { + done = process_ctlfd(thread); + } #endif } - manager_log(manager, TRACE, "%s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_EXITING, "watcher exiting")); - + thread_log(thread, TRACE, "%s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_EXITING, "watcher exiting")); return ((isc_threadresult_t)0); } @@ -4014,24 +3652,60 @@ isc_socketmgr_maxudp(isc_socketmgr_t *manager0, int maxudp) { } /* - * Create a new socket manager. + * Setup socket thread, thread->manager and thread->threadid must be filled. */ static isc_result_t -setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { - isc_result_t result; -#if defined(USE_KQUEUE) || defined(USE_EPOLL) || defined(USE_DEVPOLL) +setup_thread(isc__socketthread_t *thread) { + isc_result_t result = ISC_R_SUCCESS; + int i; char strbuf[ISC_STRERRORSIZE]; -#endif + + REQUIRE(thread != NULL); + REQUIRE(VALID_MANAGER(thread->manager)); + REQUIRE(thread->threadid >= 0 && + thread->threadid < thread->manager->nthreads); + + thread->fds = isc_mem_get(thread->manager->mctx, + thread->manager->maxsocks * + sizeof(isc__socket_t *)); + + memset(thread->fds, 0, + thread->manager->maxsocks * sizeof(isc_socket_t *)); + + thread->fdstate = isc_mem_get(thread->manager->mctx, + thread->manager->maxsocks * sizeof(int)); + + memset(thread->fdstate, 0, thread->manager->maxsocks * sizeof(int)); + + thread->fdlock = isc_mem_get(thread->manager->mctx, + FDLOCK_COUNT * sizeof(isc_mutex_t)); + + for (i = 0; i < FDLOCK_COUNT; i++) { + result = isc_mutex_init(&thread->fdlock[i]); + if (result != ISC_R_SUCCESS) { + return (result); + } + } + + if (pipe(thread->pipe_fds) != 0) { + strerror_r(errno, strbuf, sizeof(strbuf)); + UNEXPECTED_ERROR(__FILE__, __LINE__, + "pipe() %s: %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed"), + strbuf); + return (ISC_R_UNEXPECTED); + } + RUNTIME_CHECK(make_nonblock(thread->pipe_fds[0]) == ISC_R_SUCCESS); #ifdef USE_KQUEUE - manager->nevents = ISC_SOCKET_MAXEVENTS; - manager->events = isc_mem_get(mctx, sizeof(struct kevent) * - manager->nevents); - if (manager->events == NULL) - return (ISC_R_NOMEMORY); - manager->kqueue_fd = kqueue(); - if (manager->kqueue_fd == -1) { + thread->nevents = ISC_SOCKET_MAXEVENTS; + thread->events = isc_mem_get(thread->manager->mctx, + sizeof(struct kevent) * thread->nevents); + + thread->kqueue_fd = kqueue(); + if (thread->kqueue_fd == -1) { result = isc__errno2result(errno); strerror_r(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -4039,69 +3713,69 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), strbuf); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct kevent) * thread->nevents); return (result); } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { - close(manager->kqueue_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); - return (result); + close(thread->kqueue_fd); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct kevent) * thread->nevents); } + return (result); + #elif defined(USE_EPOLL) - manager->nevents = ISC_SOCKET_MAXEVENTS; - manager->events = isc_mem_get(mctx, sizeof(struct epoll_event) * - manager->nevents); - if (manager->events == NULL) - return (ISC_R_NOMEMORY); - manager->epoll_fd = epoll_create(manager->nevents); - if (manager->epoll_fd == -1) { + thread->nevents = ISC_SOCKET_MAXEVENTS; + thread->epoll_events = isc_mem_get(thread->manager->mctx, + (thread->manager->maxsocks * + sizeof(uint32_t))); + + memset(thread->epoll_events, 0, + thread->manager->maxsocks * sizeof(uint32_t)); + + thread->events = isc_mem_get(thread->manager->mctx, + sizeof(struct epoll_event) * + thread->nevents); + + thread->epoll_fd = epoll_create(thread->nevents); + if (thread->epoll_fd == -1) { result = isc__errno2result(errno); strerror_r(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, "epoll_create %s: %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), - strbuf); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); - return (result); - } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); - if (result != ISC_R_SUCCESS) { - close(manager->epoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); + strbuf); return (result); + } + + result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); + return (result); + #elif defined(USE_DEVPOLL) - manager->nevents = ISC_SOCKET_MAXEVENTS; + thread->nevents = ISC_SOCKET_MAXEVENTS; result = isc_resource_getcurlimit(isc_resource_openfiles, - &manager->open_max); + &thread->open_max); if (result != ISC_R_SUCCESS) - manager->open_max = 64; - manager->calls = 0; - manager->events = isc_mem_get(mctx, sizeof(struct pollfd) * - manager->nevents); - if (manager->events == NULL) - return (ISC_R_NOMEMORY); + thread->open_max = 64; + thread->calls = 0; + thread->events = isc_mem_get(thread->manager->mctx, + sizeof(struct pollfd) * thread->nevents); + /* * Note: fdpollinfo should be able to support all possible FDs, so * it must have maxsocks entries (not nevents). */ - manager->fdpollinfo = isc_mem_get(mctx, sizeof(pollinfo_t) * - manager->maxsocks); - if (manager->fdpollinfo == NULL) { - isc_mem_put(mctx, manager->events, - sizeof(struct pollfd) * manager->nevents); - return (ISC_R_NOMEMORY); - } - memset(manager->fdpollinfo, 0, sizeof(pollinfo_t) * manager->maxsocks); - manager->devpoll_fd = open("/dev/poll", O_RDWR); - if (manager->devpoll_fd == -1) { + thread->fdpollinfo = isc_mem_get(thread->manager->mctx, + sizeof(pollinfo_t) * + thread->manager->maxsocks); + memset(thread->fdpollinfo, 0, sizeof(pollinfo_t) * + thread->manager->maxsocks); + thread->devpoll_fd = open("/dev/poll", O_RDWR); + if (thread->devpoll_fd == -1) { result = isc__errno2result(errno); strerror_r(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -4109,21 +3783,23 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), strbuf); - isc_mem_put(mctx, manager->events, - sizeof(struct pollfd) * manager->nevents); - isc_mem_put(mctx, manager->fdpollinfo, - sizeof(pollinfo_t) * manager->maxsocks); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct pollfd) * thread->nevents); + isc_mem_put(thread->manager->mctx, thread->fdpollinfo, + sizeof(pollinfo_t) * thread->manager->maxsocks); return (result); } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { - close(manager->devpoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct pollfd) * manager->nevents); - isc_mem_put(mctx, manager->fdpollinfo, - sizeof(pollinfo_t) * manager->maxsocks); + close(thread->devpoll_fd); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct pollfd) * thread->nevents); + isc_mem_put(thread->manager->mctx, thread->fdpollinfo, + sizeof(pollinfo_t) * thread->manager->maxsocks); return (result); } + + return (ISC_R_SUCCESS); #elif defined(USE_SELECT) UNUSED(result); @@ -4133,102 +3809,109 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { * FD_SETSIZE, but we separate the cases to avoid possible portability * issues regarding howmany() and the actual representation of fd_set. */ - manager->fd_bufsize = howmany(manager->maxsocks, NFDBITS) * - sizeof(fd_mask); + thread->fd_bufsize = + howmany(manager->maxsocks, NFDBITS) * sizeof(fd_mask); #else - manager->fd_bufsize = sizeof(fd_set); + thread->fd_bufsize = sizeof(fd_set); #endif - manager->read_fds = NULL; - manager->read_fds_copy = NULL; - manager->write_fds = NULL; - manager->write_fds_copy = NULL; + thread->read_fds = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + thread->read_fds_copy = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + thread->write_fds = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + thread->write_fds_copy = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + memset(thread->read_fds, 0, thread->fd_bufsize); + memset(thread->write_fds, 0, thread->fd_bufsize); - manager->read_fds = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->read_fds != NULL) - manager->read_fds_copy = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->read_fds_copy != NULL) - manager->write_fds = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->write_fds != NULL) { - manager->write_fds_copy = isc_mem_get(mctx, - manager->fd_bufsize); - } - if (manager->write_fds_copy == NULL) { - if (manager->write_fds != NULL) { - isc_mem_put(mctx, manager->write_fds, - manager->fd_bufsize); - } - if (manager->read_fds_copy != NULL) { - isc_mem_put(mctx, manager->read_fds_copy, - manager->fd_bufsize); - } - if (manager->read_fds != NULL) { - isc_mem_put(mctx, manager->read_fds, - manager->fd_bufsize); - } - return (ISC_R_NOMEMORY); - } - memset(manager->read_fds, 0, manager->fd_bufsize); - memset(manager->write_fds, 0, manager->fd_bufsize); - - (void)watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); - manager->maxfd = manager->pipe_fds[0]; -#endif /* USE_KQUEUE */ + (void)watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); + thread->maxfd = thread->pipe_fds[0]; return (ISC_R_SUCCESS); +#endif /* USE_KQUEUE */ } static void -cleanup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { +cleanup_thread(isc_mem_t *mctx, isc__socketthread_t *thread) { isc_result_t result; + int i; - result = unwatch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = unwatch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "epoll_ctl(DEL) %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); } - #ifdef USE_KQUEUE - close(manager->kqueue_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); + close(thread->kqueue_fd); + isc_mem_put(mctx, thread->events, + sizeof(struct kevent) * thread->nevents); #elif defined(USE_EPOLL) - close(manager->epoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); + close(thread->epoll_fd); + + isc_mem_put(mctx, thread->events, + sizeof(struct epoll_event) * thread->nevents); #elif defined(USE_DEVPOLL) - close(manager->devpoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct pollfd) * manager->nevents); - isc_mem_put(mctx, manager->fdpollinfo, - sizeof(pollinfo_t) * manager->maxsocks); + close(thread->devpoll_fd); + isc_mem_put(mctx, thread->events, + sizeof(struct pollfd) * thread->nevents); + isc_mem_put(mctx, thread->fdpollinfo, + sizeof(pollinfo_t) * thread->manager->maxsocks); #elif defined(USE_SELECT) - if (manager->read_fds != NULL) - isc_mem_put(mctx, manager->read_fds, manager->fd_bufsize); - if (manager->read_fds_copy != NULL) - isc_mem_put(mctx, manager->read_fds_copy, manager->fd_bufsize); - if (manager->write_fds != NULL) - isc_mem_put(mctx, manager->write_fds, manager->fd_bufsize); - if (manager->write_fds_copy != NULL) - isc_mem_put(mctx, manager->write_fds_copy, manager->fd_bufsize); + if (thread->read_fds != NULL) { + isc_mem_put(mctx, thread->read_fds, thread->fd_bufsize); + } + if (thread->read_fds_copy != NULL) { + isc_mem_put(mctx, thread->read_fds_copy, thread->fd_bufsize); + } + if (thread->write_fds != NULL) { + isc_mem_put(mctx, thread->write_fds, thread->fd_bufsize); + } + if (thread->write_fds_copy != NULL) { + isc_mem_put(mctx, thread->write_fds_copy, thread->fd_bufsize); + } #endif /* USE_KQUEUE */ + for (i = 0; i < (int)thread->manager->maxsocks; i++) { + if (thread->fdstate[i] == CLOSE_PENDING) { + /* no need to lock */ + (void)close(i); + } + } + +#if defined(USE_EPOLL) + isc_mem_put(thread->manager->mctx, thread->epoll_events, + thread->manager->maxsocks * sizeof(uint32_t)); +#endif + isc_mem_put(thread->manager->mctx, thread->fds, + thread->manager->maxsocks * sizeof(isc__socket_t *)); + isc_mem_put(thread->manager->mctx, thread->fdstate, + thread->manager->maxsocks * sizeof(int)); + + + if (thread->fdlock != NULL) { + for (i = 0; i < FDLOCK_COUNT; i++) { + DESTROYLOCK(&thread->fdlock[i]); + } + isc_mem_put(thread->manager->mctx, thread->fdlock, + FDLOCK_COUNT * sizeof(isc_mutex_t)); + } + } isc_result_t isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { - return (isc_socketmgr_create2(mctx, managerp, 0)); + return (isc_socketmgr_create2(mctx, managerp, 0, 1)); } isc_result_t isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, - unsigned int maxsocks) + unsigned int maxsocks, int nthreads) { int i; isc__socketmgr_t *manager; - char strbuf[ISC_STRERRORSIZE]; - isc_result_t result; REQUIRE(managerp != NULL && *managerp == NULL); @@ -4236,153 +3919,48 @@ isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, maxsocks = ISC_SOCKET_MAXSOCKETS; manager = isc_mem_get(mctx, sizeof(*manager)); - if (manager == NULL) - return (ISC_R_NOMEMORY); /* zero-clear so that necessary cleanup on failure will be easy */ memset(manager, 0, sizeof(*manager)); manager->maxsocks = maxsocks; manager->reserved = 0; manager->maxudp = 0; - manager->fds = isc_mem_get(mctx, - manager->maxsocks * sizeof(isc__socket_t *)); - if (manager->fds == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } - manager->fdstate = isc_mem_get(mctx, manager->maxsocks * sizeof(int)); - if (manager->fdstate == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } -#if defined(USE_EPOLL) - manager->epoll_events = isc_mem_get(mctx, (manager->maxsocks * - sizeof(uint32_t))); - if (manager->epoll_events == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } - memset(manager->epoll_events, 0, manager->maxsocks * sizeof(uint32_t)); -#endif + manager->nthreads = nthreads; manager->stats = NULL; manager->common.magic = ISCAPI_SOCKETMGR_MAGIC; manager->common.impmagic = SOCKET_MANAGER_MAGIC; manager->mctx = NULL; - memset(manager->fds, 0, manager->maxsocks * sizeof(isc_socket_t *)); ISC_LIST_INIT(manager->socklist); - result = isc_mutex_init(&manager->lock); - if (result != ISC_R_SUCCESS) - goto free_manager; - manager->fdlock = isc_mem_get(mctx, FDLOCK_COUNT * sizeof(isc_mutex_t)); - if (manager->fdlock == NULL) { - result = ISC_R_NOMEMORY; - goto cleanup_lock; - } - for (i = 0; i < FDLOCK_COUNT; i++) { - result = isc_mutex_init(&manager->fdlock[i]); - if (result != ISC_R_SUCCESS) { - while (--i >= 0) - DESTROYLOCK(&manager->fdlock[i]); - isc_mem_put(mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); - manager->fdlock = NULL; - goto cleanup_lock; - } - } + RUNTIME_CHECK(isc_mutex_init(&manager->lock) == ISC_R_SUCCESS); - if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_lock; - } - - /* - * Create the special fds that will be used to wake up the - * select/poll loop when something internal needs to be done. - */ - if (pipe(manager->pipe_fds) != 0) { - strerror_r(errno, strbuf, sizeof(strbuf)); - UNEXPECTED_ERROR(__FILE__, __LINE__, - "pipe() %s: %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed"), - strbuf); - result = ISC_R_UNEXPECTED; - goto cleanup_condition; - } - - RUNTIME_CHECK(make_nonblock(manager->pipe_fds[0]) == ISC_R_SUCCESS); - - /* - * Set up initial state for the select loop - */ - result = setup_watcher(mctx, manager); - if (result != ISC_R_SUCCESS) - goto cleanup; - - memset(manager->fdstate, 0, manager->maxsocks * sizeof(int)); + RUNTIME_CHECK(isc_condition_init(&manager->shutdown_ok) + == ISC_R_SUCCESS); /* * Start up the select/poll thread. */ - if (isc_thread_create(watcher, manager, &manager->watcher) != - ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_create() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - cleanup_watcher(mctx, manager); - result = ISC_R_UNEXPECTED; - goto cleanup; - } - isc_thread_setname(manager->watcher, "isc-socket"); + manager->threads = isc_mem_get(mctx, sizeof(isc__socketthread_t) + * manager->nthreads); isc_mem_attach(mctx, &manager->mctx); + for (i=0; i < manager->nthreads; i++) { + manager->threads[i].manager = manager; + manager->threads[i].threadid = i; + setup_thread(&manager->threads[i]); + RUNTIME_CHECK(isc_thread_create(netthread, + &manager->threads[i], + &manager->threads[i].thread) + == ISC_R_SUCCESS); + char tname[1024]; + sprintf(tname, "isc-socket-%d", i); + isc_thread_setname(manager->threads[i].thread, tname); + } + *managerp = (isc_socketmgr_t *)manager; return (ISC_R_SUCCESS); -cleanup: - (void)close(manager->pipe_fds[0]); - (void)close(manager->pipe_fds[1]); - -cleanup_condition: - (void)isc_condition_destroy(&manager->shutdown_ok); - - -cleanup_lock: - if (manager->fdlock != NULL) { - for (i = 0; i < FDLOCK_COUNT; i++) - DESTROYLOCK(&manager->fdlock[i]); - } - DESTROYLOCK(&manager->lock); - -free_manager: - if (manager->fdlock != NULL) { - isc_mem_put(mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); - } -#if defined(USE_EPOLL) - if (manager->epoll_events != NULL) { - isc_mem_put(mctx, manager->epoll_events, - manager->maxsocks * sizeof(uint32_t)); - } -#endif - if (manager->fdstate != NULL) { - isc_mem_put(mctx, manager->fdstate, - manager->maxsocks * sizeof(int)); - } - if (manager->fds != NULL) { - isc_mem_put(mctx, manager->fds, - manager->maxsocks * sizeof(isc_socket_t *)); - } - isc_mem_put(mctx, manager, sizeof(*manager)); - - return (result); } isc_result_t @@ -4411,8 +3989,8 @@ isc_socketmgr_setstats(isc_socketmgr_t *manager0, isc_stats_t *stats) { void isc_socketmgr_destroy(isc_socketmgr_t **managerp) { isc__socketmgr_t *manager; - int i; isc_mem_t *mctx; + int i; /* * Destroy a socket manager. @@ -4442,47 +4020,35 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { * half of the pipe, which will send EOF to the read half. * This is currently a no-op in the non-threaded case. */ - select_poke(manager, 0, SELECT_POKE_SHUTDOWN); + for (i = 0; i < manager->nthreads; i++) { + select_poke(manager, i, 0, SELECT_POKE_SHUTDOWN); + } /* * Wait for thread to exit. */ - if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS) - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_join() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - + for (i = 0; i < manager->nthreads; i++) { + isc_result_t result; + result = isc_thread_join(manager->threads[i].thread, NULL); + if (result != ISC_R_SUCCESS) { + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_thread_join() %s", + isc_msgcat_get(isc_msgcat, + ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, + "failed")); + } + cleanup_thread(manager->mctx, &manager->threads[i]); + } /* * Clean up. */ - cleanup_watcher(manager->mctx, manager); - - (void)close(manager->pipe_fds[0]); - (void)close(manager->pipe_fds[1]); + isc_mem_put(manager->mctx, manager->threads, + sizeof(isc__socketthread_t) * manager->nthreads); (void)isc_condition_destroy(&manager->shutdown_ok); - for (i = 0; i < (int)manager->maxsocks; i++) - if (manager->fdstate[i] == CLOSE_PENDING) /* no need to lock */ - (void)close(i); - -#if defined(USE_EPOLL) - isc_mem_put(manager->mctx, manager->epoll_events, - manager->maxsocks * sizeof(uint32_t)); -#endif - isc_mem_put(manager->mctx, manager->fds, - manager->maxsocks * sizeof(isc__socket_t *)); - isc_mem_put(manager->mctx, manager->fdstate, - manager->maxsocks * sizeof(int)); - - if (manager->stats != NULL) + if (manager->stats != NULL) { isc_stats_detach(&manager->stats); - - if (manager->fdlock != NULL) { - for (i = 0; i < FDLOCK_COUNT; i++) - DESTROYLOCK(&manager->fdlock[i]); - isc_mem_put(manager->mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); } DESTROYLOCK(&manager->lock); manager->common.magic = 0; @@ -4513,10 +4079,11 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, LOCK(&sock->lock); have_lock = true; - if (ISC_LIST_EMPTY(sock->recv_list)) + if (ISC_LIST_EMPTY(sock->recv_list)) { io_state = doio_recv(sock, dev); - else + } else { io_state = DOIO_SOFT; + } } switch (io_state) { @@ -4539,16 +4106,20 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * Enqueue the request. If the socket was previously not being * watched, poke the watcher to start paying attention to it. */ - if (ISC_LIST_EMPTY(sock->recv_list) && !sock->pending_recv) - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); + bool do_poke = ISC_LIST_EMPTY(sock->recv_list); ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link); + if (do_poke) { + select_poke(sock->manager, sock->threadid, sock->fd, + SELECT_POKE_READ); + } socket_log(sock, NULL, EVENT, NULL, 0, 0, "socket_recv: event %p -> task %p", dev, ntask); - if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) + if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) { result = ISC_R_INPROGRESS; + } break; case DOIO_EOF: @@ -4557,13 +4128,15 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, case DOIO_HARD: case DOIO_SUCCESS: - if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) + if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) { send_recvdone_event(sock, &dev); + } break; } - if (have_lock) + if (have_lock) { UNLOCK(&sock->lock); + } return (result); } @@ -4654,16 +4227,17 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, } } - if (sock->type == isc_sockettype_udp) + if (sock->type == isc_sockettype_udp) { io_state = doio_send(sock, dev); - else { + } else { LOCK(&sock->lock); have_lock = true; - if (ISC_LIST_EMPTY(sock->send_list)) + if (ISC_LIST_EMPTY(sock->send_list)) { io_state = doio_send(sock, dev); - else + } else { io_state = DOIO_SOFT; + } } switch (io_state) { @@ -4686,18 +4260,20 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * not being watched, poke the watcher to start * paying attention to it. */ - if (ISC_LIST_EMPTY(sock->send_list) && - !sock->pending_send) - select_poke(sock->manager, sock->fd, - SELECT_POKE_WRITE); + bool do_poke = ISC_LIST_EMPTY(sock->send_list); ISC_LIST_ENQUEUE(sock->send_list, dev, ev_link); - + if (do_poke) { + select_poke(sock->manager, sock->threadid, + sock->fd, + SELECT_POKE_WRITE); + } socket_log(sock, NULL, EVENT, NULL, 0, 0, "socket_send: event %p -> task %p", dev, ntask); - if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) + if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) { result = ISC_R_INPROGRESS; + } break; } @@ -4705,13 +4281,15 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, case DOIO_HARD: case DOIO_SUCCESS: - if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) + if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) { send_senddone_event(sock, &dev); + } break; } - if (have_lock) + if (have_lock) { UNLOCK(&sock->lock); + } return (result); } @@ -4988,17 +4566,29 @@ isc_socket_bind(isc_socket_t *sock0, const isc_sockaddr_t *sockaddr, * Only set SO_REUSEADDR when we want a specific port. */ #ifdef AF_UNIX - if (sock->pf == AF_UNIX) + if (sock->pf == AF_UNIX) { goto bind_socket; + } #endif if ((options & ISC_SOCKET_REUSEADDRESS) != 0 && - isc_sockaddr_getport(sockaddr) != (in_port_t)0 && - setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, - sizeof(on)) < 0) { - UNEXPECTED_ERROR(__FILE__, __LINE__, + isc_sockaddr_getport(sockaddr) != (in_port_t) 0) + { + if (setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, + (void *)&on, sizeof(on)) < 0) + { + UNEXPECTED_ERROR(__FILE__, __LINE__, "setsockopt(%d) %s", sock->fd, isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); + } + if (setsockopt(sock->fd, SOL_SOCKET, SO_REUSEPORT, + (void *)&on, sizeof(on)) < 0) + { + UNEXPECTED_ERROR(__FILE__, __LINE__, + "setsockopt(%d) %s", sock->fd, + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + } /* Press on... */ } #ifdef AF_UNIX @@ -5233,7 +4823,7 @@ isc_socket_accept(isc_socket_t *sock0, UNLOCK(&sock->lock); return (ISC_R_SHUTTINGDOWN); } - nsock->references++; + isc_refcount_increment(&nsock->references); nsock->statsindex = sock->statsindex; dev->ev_sender = ntask; @@ -5244,14 +4834,12 @@ isc_socket_accept(isc_socket_t *sock0, * is no race condition. We will keep the lock for such a short * bit of time waking it up now or later won't matter all that much. */ - if (ISC_LIST_EMPTY(sock->accept_list)) - do_poke = true; - + do_poke = ISC_LIST_EMPTY(sock->accept_list); ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link); - - if (do_poke) - select_poke(manager, sock->fd, SELECT_POKE_ACCEPT); - + if (do_poke) { + select_poke(manager, sock->threadid, sock->fd, + SELECT_POKE_ACCEPT); + } UNLOCK(&sock->lock); return (ISC_R_SUCCESS); } @@ -5300,7 +4888,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, if (sock->connected) { INSIST(isc_sockaddr_equal(&sock->peer_address, addr)); dev->result = ISC_R_SUCCESS; - isc_task_send(task, ISC_EVENT_PTR(&dev)); + isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid); UNLOCK(&sock->lock); @@ -5364,7 +4952,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, err_exit: sock->connected = 0; - isc_task_send(task, ISC_EVENT_PTR(&dev)); + isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid); UNLOCK(&sock->lock); inc_stats(sock->manager->stats, @@ -5380,7 +4968,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, sock->connected = 1; sock->bound = 1; dev->result = ISC_R_SUCCESS; - isc_task_send(task, ISC_EVENT_PTR(&dev)); + isc_task_sendto(task, ISC_EVENT_PTR(&dev), sock->threadid); UNLOCK(&sock->lock); @@ -5404,12 +4992,13 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, * is no race condition. We will keep the lock for such a short * bit of time waking it up now or later won't matter all that much. */ - if (ISC_LIST_EMPTY(sock->connect_list) && !sock->connecting) - select_poke(manager, sock->fd, SELECT_POKE_CONNECT); - - sock->connecting = 1; - + bool do_poke = ISC_LIST_EMPTY(sock->connect_list); ISC_LIST_ENQUEUE(sock->connect_list, dev, ev_link); + if (do_poke && !sock->connecting) { + sock->connecting = 1; + select_poke(manager, sock->threadid, sock->fd, + SELECT_POKE_CONNECT); + } UNLOCK(&sock->lock); return (ISC_R_SUCCESS); @@ -5419,8 +5008,7 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, * Called when a socket with a pending connect() finishes. */ static void -internal_connect(isc_task_t *me, isc_event_t *ev) { - isc__socket_t *sock; +internal_connect(isc__socket_t *sock) { isc_socket_connev_t *dev; int cc; isc_result_t result; @@ -5428,26 +5016,10 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { char strbuf[ISC_STRERRORSIZE]; char peerbuf[ISC_SOCKADDR_FORMATSIZE]; - UNUSED(me); - INSIST(ev->ev_type == ISC_SOCKEVENT_INTW); - - sock = ev->ev_sender; INSIST(VALID_SOCKET(sock)); LOCK(&sock->lock); - /* - * When the internal event was sent the reference count was bumped - * to keep the socket around for us. Decrement the count here. - */ - INSIST(sock->references > 0); - sock->references--; - if (sock->references == 0) { - UNLOCK(&sock->lock); - destroy(&sock); - return; - } - /* * Get the first item off the connect list. * If it is empty, unlock the socket and return. @@ -5455,8 +5027,7 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { dev = ISC_LIST_HEAD(sock->connect_list); if (dev == NULL) { INSIST(!sock->connecting); - UNLOCK(&sock->lock); - return; + goto finish; } INSIST(sock->connecting); @@ -5479,10 +5050,7 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { */ if (SOFT_ERROR(errno) || errno == EINPROGRESS) { sock->connecting = 1; - select_poke(sock->manager, sock->fd, - SELECT_POKE_CONNECT); UNLOCK(&sock->lock); - return; } @@ -5532,6 +5100,10 @@ internal_connect(isc_task_t *me, isc_event_t *ev) { dev = ISC_LIST_HEAD(sock->connect_list); } while (dev != NULL); + finish: + unwatch_fd(&sock->manager->threads[sock->threadid], sock->fd, + SELECT_POKE_CONNECT); + UNLOCK(&sock->lock); } @@ -5682,8 +5254,8 @@ isc_socket_cancel(isc_socket_t *sock0, isc_task_t *task, unsigned int how) { dev->result = ISC_R_CANCELED; dev->ev_sender = sock; - isc_task_sendanddetach(¤t_task, - ISC_EVENT_PTR(&dev)); + isc_task_sendtoanddetach(¤t_task, + ISC_EVENT_PTR(&dev), sock->threadid); } dev = next; @@ -5872,20 +5444,59 @@ isc_socket_getfd(isc_socket_t *socket0) { return ((short) sock->fd); } +static isc_once_t hasreuseport_once = ISC_ONCE_INIT; +static bool hasreuseport = false; + +static void +init_hasreuseport() { +/* + * SO_REUSEPORT works very differently on *BSD and on Linux (because why not). + * We only want to use it on Linux, if it's available. On BSD we want to dup() + * sockets instead of re-binding them. + */ +#if defined(SO_REUSEPORT) && defined(__linux__) + int sock, yes = 1; + sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + close(sock); + return; + } else if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (void *)&yes, sizeof(yes)) < 0) + { + close(sock); + return; + } else if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, + (void *)&yes, sizeof(yes)) < 0) + { + close(sock); + return; + } + hasreuseport = true; +#endif +} + +bool +isc_socket_hasreuseport() { + RUNTIME_CHECK(isc_once_do(&hasreuseport_once, init_hasreuseport) + == ISC_R_SUCCESS); + return (hasreuseport); +} + + #if defined(HAVE_LIBXML2) || defined(HAVE_JSON) static const char * _socktype(isc_sockettype_t type) { - if (type == isc_sockettype_udp) + switch (type) { + case isc_sockettype_udp: return ("udp"); - else if (type == isc_sockettype_tcp) + case isc_sockettype_tcp: return ("tcp"); - else if (type == isc_sockettype_unix) + case isc_sockettype_unix: return ("unix"); - else if (type == isc_sockettype_fdwatch) - return ("fdwatch"); - else + default: return ("not-initialized"); + } } #endif @@ -5923,7 +5534,7 @@ isc_socketmgr_renderxml(isc_socketmgr_t *mgr0, xmlTextWriterPtr writer) { TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "references")); TRY0(xmlTextWriterWriteFormatString(writer, "%d", - sock->references)); + (int)isc_refcount_current(&sock->references))); TRY0(xmlTextWriterEndElement(writer)); TRY0(xmlTextWriterWriteElement(writer, ISC_XMLCHAR "type", @@ -5946,18 +5557,6 @@ isc_socketmgr_renderxml(isc_socketmgr_t *mgr0, xmlTextWriterPtr writer) { } TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "states")); - if (sock->pending_recv) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending-receive")); - if (sock->pending_send) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending-send")); - if (sock->pending_accept) - TRY0(xmlTextWriterWriteElement(writer, - ISC_XMLCHAR "state", - ISC_XMLCHAR "pending_accept")); if (sock->listener) TRY0(xmlTextWriterWriteElement(writer, ISC_XMLCHAR "state", @@ -6037,7 +5636,8 @@ isc_socketmgr_renderjson(isc_socketmgr_t *mgr0, json_object *stats) { json_object_object_add(entry, "name", obj); } - obj = json_object_new_int(sock->references); + obj = json_object_new_int( + (int)isc_refcount_current(&sock->references)); CHECKMEM(obj); json_object_object_add(entry, "references", obj); @@ -6065,24 +5665,6 @@ isc_socketmgr_renderjson(isc_socketmgr_t *mgr0, json_object *stats) { CHECKMEM(states); json_object_object_add(entry, "states", states); - if (sock->pending_recv) { - obj = json_object_new_string("pending-receive"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - - if (sock->pending_send) { - obj = json_object_new_string("pending-send"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - - if (sock->pending_accept) { - obj = json_object_new_string("pending-accept"); - CHECKMEM(obj); - json_object_array_add(states, obj); - } - if (sock->listener) { obj = json_object_new_string("listener"); CHECKMEM(obj); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index e323f54d04..13d4ba58fa 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -78,6 +78,7 @@ isc_socket_getpeername isc_socket_getsockname isc_socket_gettag isc_socket_gettype +isc_socket_hasreuseport isc_socket_ipv6only isc_socket_listen isc_socket_open diff --git a/lib/isc/win32/socket.c b/lib/isc/win32/socket.c index d96133e9ec..46ffcacfde 100644 --- a/lib/isc/win32/socket.c +++ b/lib/isc/win32/socket.c @@ -508,8 +508,6 @@ iocompletionport_init(isc_socketmgr_t *manager) { strbuf); } - manager->maxIOCPThreads = min(isc_os_ncpus() + 1, MAX_IOCPTHREADS); - /* Now Create the Completion Port */ manager->hIoCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, @@ -1551,7 +1549,6 @@ socket_create(isc_socketmgr_t *manager, int pf, isc_sockettype_t type, REQUIRE(VALID_MANAGER(manager)); REQUIRE(socketp != NULL && *socketp == NULL); - REQUIRE(type != isc_sockettype_fdwatch); #ifndef SOCK_RAW if (type == isc_sockettype_raw) @@ -1757,7 +1754,6 @@ isc_socket_dup(isc_socket_t *sock, isc_socket_t **socketp) { isc_result_t isc_socket_open(isc_socket_t *sock) { REQUIRE(VALID_SOCKET(sock)); - REQUIRE(sock->type != isc_sockettype_fdwatch); return (ISC_R_NOTIMPLEMENTED); } @@ -1789,7 +1785,6 @@ isc_socket_detach(isc_socket_t **socketp) { REQUIRE(socketp != NULL); sock = *socketp; REQUIRE(VALID_SOCKET(sock)); - REQUIRE(sock->type != isc_sockettype_fdwatch); LOCK(&sock->lock); CONSISTENT(sock); @@ -1815,7 +1810,6 @@ isc_socket_detach(isc_socket_t **socketp) { isc_result_t isc_socket_close(isc_socket_t *sock) { REQUIRE(VALID_SOCKET(sock)); - REQUIRE(sock->type != isc_sockettype_fdwatch); return (ISC_R_NOTIMPLEMENTED); } @@ -2542,7 +2536,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { isc_result_t isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, - unsigned int maxsocks) + unsigned int maxsocks, int nthreads) { isc_socketmgr_t *manager; isc_result_t result; @@ -2578,6 +2572,10 @@ isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, } isc_mem_attach(mctx, &manager->mctx); + if (nthreads == 0) { + nthreads = isc_os_ncpus() + 1; + } + manager->maxIOCPThreads = min(nthreads, MAX_IOCPTHREADS); iocompletionport_init(manager); /* Create the Completion Ports */ @@ -2903,7 +2901,6 @@ isc_socket_sendto(isc_socket_t *sock, isc_region_t *region, isc_result_t ret; REQUIRE(VALID_SOCKET(sock)); - REQUIRE(sock->type != isc_sockettype_fdwatch); LOCK(&sock->lock); CONSISTENT(sock); @@ -3696,20 +3693,25 @@ isc_socket_socketevent(isc_mem_t *mctx, void *sender, return (allocate_socketevent(mctx, sender, eventtype, action, arg)); } +bool +isc_socket_hasreuseport() { + return (false); +} + #ifdef HAVE_LIBXML2 static const char * _socktype(isc_sockettype_t type) { - if (type == isc_sockettype_udp) + switch (type) { + case isc_sockettype_udp: return ("udp"); - else if (type == isc_sockettype_tcp) + case isc_sockettype_tcp: return ("tcp"); - else if (type == isc_sockettype_unix) + case isc_sockettype_unix: return ("unix"); - else if (type == isc_sockettype_fdwatch) - return ("fdwatch"); - else + default: return ("not-initialized"); + } } #define TRY0(a) do { xmlrc = (a); if (xmlrc < 0) goto error; } while(0) diff --git a/lib/ns/interfacemgr.c b/lib/ns/interfacemgr.c index 6f1b0d591a..adba622d09 100644 --- a/lib/ns/interfacemgr.c +++ b/lib/ns/interfacemgr.c @@ -470,6 +470,7 @@ ns_interface_listenudp(ns_interface_t *ifp) { else attrs |= DNS_DISPATCHATTR_IPV6; attrs |= DNS_DISPATCHATTR_NOLISTEN; + attrs |= DNS_DISPATCHATTR_CANREUSE; attrmask = 0; attrmask |= DNS_DISPATCHATTR_UDP | DNS_DISPATCHATTR_TCP; attrmask |= DNS_DISPATCHATTR_IPV4 | DNS_DISPATCHATTR_IPV6;