diff --git a/CHANGES b/CHANGES index ea2caa9838..809c831347 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +5823. [func] Replace the hazard-pointer-based lock-free queue with a + simpler locked-list-based queue that has little or no + performance impact. [GL #3180] + 5822. [bug] When calling dns_dispatch_send(), attach/detach dns_request_t object as the read callback could be called before send callback dereferencing diff --git a/bin/named/main.c b/bin/named/main.c index e2905e926d..3a2fe05732 100644 --- a/bin/named/main.c +++ b/bin/named/main.c @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 3d37127fe9..eea0bb5f77 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -38,7 +38,6 @@ libisc_la_HEADERS = \ include/isc/heap.h \ include/isc/hex.h \ include/isc/hmac.h \ - include/isc/hp.h \ include/isc/ht.h \ include/isc/httpd.h \ include/isc/interfaceiter.h \ @@ -67,7 +66,6 @@ libisc_la_HEADERS = \ include/isc/pool.h \ include/isc/portset.h \ include/isc/print.h \ - include/isc/queue.h \ include/isc/quota.h \ include/isc/radix.h \ include/isc/random.h \ @@ -145,7 +143,6 @@ libisc_la_SOURCES = \ heap.c \ hex.c \ hmac.c \ - hp.c \ ht.c \ httpd.c \ interfaceiter.c \ @@ -173,7 +170,6 @@ libisc_la_SOURCES = \ parseint.c \ pool.c \ portset.c \ - queue.c \ quota.c \ radix.c \ random.c \ diff --git a/lib/isc/hp.c b/lib/isc/hp.c deleted file mode 100644 index f46398c7ca..0000000000 --- a/lib/isc/hp.c +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. 
- */ - -/* - * Hazard Pointer implementation. - * - * This work is based on C++ code available from: - * https://github.com/pramalhe/ConcurrencyFreaks/ - * - * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Concurrency Freaks nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> - * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF - * THE POSSIBILITY OF SUCH DAMAGE. 
- */ - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static int isc__hp_max_threads = 1; - -typedef struct retirelist { - int size; - uintptr_t *list; -} retirelist_t; - -typedef atomic_uintptr_t isc_hp_uintptr_t; - -struct isc_hp { - int max_hps; - int max_retired; - isc_mem_t *mctx; - isc_hp_deletefunc_t *deletefunc; - isc_hp_uintptr_t **hp; - retirelist_t **rl; -}; - -static inline int -tid(void) { - return (isc_tid_v); -} - -void -isc_hp_init(int max_threads) { - REQUIRE(max_threads > 0); - - if (isc__hp_max_threads > max_threads) { - return; - } - - isc__hp_max_threads = max_threads; -} - -isc_hp_t * -isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc) { - isc_hp_t *hp = isc_mem_get(mctx, sizeof(*hp)); - - REQUIRE(isc__hp_max_threads > 0); - REQUIRE(max_hps > 0); - - *hp = (isc_hp_t){ - .max_hps = max_hps, - .max_retired = isc__hp_max_threads * max_hps, - .deletefunc = deletefunc, - }; - - isc_mem_attach(mctx, &hp->mctx); - - hp->hp = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->hp[0])); - for (int i = 0; i < isc__hp_max_threads; i++) { - isc_hp_uintptr_t *hps; - - hps = isc_mem_get_aligned(mctx, hp->max_hps * sizeof(*hps), - isc_os_cacheline()); - for (int j = 0; j < hp->max_hps; j++) { - atomic_init(&hps[j], 0); - } - - hp->hp[i] = hps; - } - - hp->rl = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->rl[0])); - - for (int i = 0; i < isc__hp_max_threads; i++) { - retirelist_t *rl; - - rl = isc_mem_get_aligned(mctx, sizeof(*rl), isc_os_cacheline()); - rl->size = 0; - rl->list = isc_mem_get(hp->mctx, - hp->max_retired * sizeof(uintptr_t)); - memset(rl->list, 0, hp->max_retired * sizeof(uintptr_t)); - - hp->rl[i] = rl; - } - - return (hp); -} - -void -isc_hp_destroy(isc_hp_t *hp) { - for (int i = 0; i < isc__hp_max_threads; i++) { - retirelist_t *rl = hp->rl[i]; - - for (int j = 0; j < rl->size; j++) { - void *data = (void *)rl->list[j]; - hp->deletefunc(data); - } - 
isc_mem_put(hp->mctx, rl->list, - hp->max_retired * sizeof(uintptr_t)); - isc_mem_put_aligned(hp->mctx, rl, sizeof(*rl), - isc_os_cacheline()); - } - for (int i = 0; i < isc__hp_max_threads; i++) { - isc_hp_uintptr_t *hps = hp->hp[i]; - isc_mem_put_aligned(hp->mctx, hps, hp->max_hps * sizeof(*hps), - isc_os_cacheline()); - } - isc_mem_put(hp->mctx, hp->hp, isc__hp_max_threads * sizeof(hp->hp[0])); - isc_mem_put(hp->mctx, hp->rl, isc__hp_max_threads * sizeof(hp->rl[0])); - - isc_mem_putanddetach(&hp->mctx, hp, sizeof(*hp)); -} - -void -isc_hp_clear(isc_hp_t *hp) { - for (int i = 0; i < hp->max_hps; i++) { - atomic_store_release(&hp->hp[tid()][i], 0); - } -} - -void -isc_hp_clear_one(isc_hp_t *hp, int ihp) { - atomic_store_release(&hp->hp[tid()][ihp], 0); -} - -uintptr_t -isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom) { - uintptr_t n = 0; - uintptr_t ret; - while ((ret = atomic_load(atom)) != n) { - atomic_store(&hp->hp[tid()][ihp], ret); - n = ret; - } - return (ret); -} - -uintptr_t -isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) { - atomic_store(&hp->hp[tid()][ihp], atomic_load(&ptr)); - return (atomic_load(&ptr)); -} - -uintptr_t -isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) { - atomic_store_release(&hp->hp[tid()][ihp], atomic_load(&ptr)); - return (atomic_load(&ptr)); -} - -void -isc_hp_retire(isc_hp_t *hp, uintptr_t ptr) { - retirelist_t *rl = hp->rl[tid()]; - rl->list[rl->size++] = ptr; - INSIST(rl->size < hp->max_retired); - - for (int iret = 0; iret < rl->size; iret++) { - uintptr_t obj = rl->list[iret]; - bool can_delete = true; - for (int itid = 0; itid < isc__hp_max_threads && can_delete; - itid++) { - for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) { - if (atomic_load(&hp->hp[itid][ihp]) == obj) { - can_delete = false; - break; - } - } - } - - if (can_delete) { - size_t bytes = (rl->size - iret) * sizeof(rl->list[0]); - memmove(&rl->list[iret], &rl->list[iret + 1], bytes); - rl->size--; - 
hp->deletefunc((void *)obj); - } - } -} diff --git a/lib/isc/include/isc/hp.h b/lib/isc/include/isc/hp.h deleted file mode 100644 index 6b6ddea8d1..0000000000 --- a/lib/isc/include/isc/hp.h +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. - */ - -/* - * Hazard Pointer implementation. - * - * This work is based on C++ code available from: - * https://github.com/pramalhe/ConcurrencyFreaks/ - * - * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Concurrency Freaks nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - * PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL <COPYRIGHT HOLDER> - * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF - * THE POSSIBILITY OF SUCH DAMAGE. - */ - -#pragma once - -#include -#include -#include -#include -#include - -/*% - * Hazard pointers are a mechanism for protecting objects in memory - * from being deleted by other threads while in use. This allows - * safe lock-free data structures. - * - * This is an adaptation of the ConcurrencyFreaks implementation in C. - * More details available at https://github.com/pramalhe/ConcurrencyFreaks, - * in the file HazardPointers.hpp. - */ - -typedef void(isc_hp_deletefunc_t)(void *); - -void -isc_hp_init(int max_threads); -/*%< - * Initialize hazard pointer constants, isc__hp_max_threads and - * isc__hp_max_retired. If more threads try to access hp, it - * will assert. Calling this function repeatedly can be used - * to increase the limits, but cannot reduce them. - */ - -isc_hp_t * -isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc); -/*%< - * Create a new hazard pointer array of size 'max_hps' (or a reasonable - * default value if 'max_hps' is 0). The function 'deletefunc' will be - * used to delete objects protected by hazard pointers when it becomes - * safe to retire them. - */ - -void -isc_hp_destroy(isc_hp_t *hp); -/*%< - * Destroy a hazard pointer array and clean up all objects protected - * by hazard pointers. - */ - -void -isc_hp_clear(isc_hp_t *hp); -/*%< - * Clear all hazard pointers in the array for the current thread. 
- * - * Progress condition: wait-free bounded (by max_hps) - */ - -void -isc_hp_clear_one(isc_hp_t *hp, int ihp); -/*%< - * Clear a specified hazard pointer in the array for the current thread. - * - * Progress condition: wait-free population oblivious. - */ - -uintptr_t -isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom); -/*%< - * Protect an object referenced by 'atom' with a hazard pointer for the - * current thread. - * - * Progress condition: lock-free. - */ - -uintptr_t -isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr); -/*%< - * This returns the same value that is passed as ptr, which is sometimes - * useful. - * - * Progress condition: wait-free population oblivious. - */ - -uintptr_t -isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr); -/*%< - * Same as isc_hp_protect_ptr(), but explicitly uses memory_order_release. - * - * Progress condition: wait-free population oblivious. - */ - -void -isc_hp_retire(isc_hp_t *hp, uintptr_t ptr); -/*%< - * Retire an object that is no longer in use by any thread, calling - * the delete function that was specified in isc_hp_new(). 
- * - * Progress condition: wait-free bounded (by the number of threads squared) - */ diff --git a/lib/isc/include/isc/list.h b/lib/isc/include/isc/list.h index 0b28615c8f..b7a783c3ad 100644 --- a/lib/isc/include/isc/list.h +++ b/lib/isc/include/isc/list.h @@ -198,3 +198,17 @@ __ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, void) #define __ISC_LIST_DEQUEUEUNSAFE_TYPE(list, elt, link, type) \ __ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, type) + +#define ISC_LIST_MOVEUNSAFE(dest, src) \ + { \ + (dest).head = (src).head; \ + (dest).tail = (src).tail; \ + (src).head = NULL; \ + (src).tail = NULL; \ + } + +#define ISC_LIST_MOVE(dest, src) \ + { \ + INSIST(ISC_LIST_EMPTY(dest)); \ + ISC_LIST_MOVEUNSAFE(dest, src); \ + } diff --git a/lib/isc/include/isc/queue.h b/lib/isc/include/isc/queue.h deleted file mode 100644 index 6cbd5fc4ab..0000000000 --- a/lib/isc/include/isc/queue.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. - */ - -#pragma once -#include - -typedef struct isc_queue isc_queue_t; - -isc_queue_t * -isc_queue_new(isc_mem_t *mctx); -/*%< - * Create a new fetch-and-add array queue. - * - * 'max_threads' is currently unused. In the future it can be used - * to pass a maximum threads parameter when creating hazard pointers, - * but currently `isc_hp_t` uses a hard-coded value. - */ - -void -isc_queue_enqueue(isc_queue_t *queue, uintptr_t item); -/*%< - * Enqueue an object pointer 'item' at the tail of the queue. - * - * Requires: - * \li 'item' is not null. 
- */ - -uintptr_t -isc_queue_dequeue(isc_queue_t *queue); -/*%< - * Remove an object pointer from the head of the queue and return the - * pointer. If the queue is empty, return `nulluintptr` (the uintptr_t - * representation of NULL). - * - * Requires: - * \li 'queue' is not null. - */ - -void -isc_queue_destroy(isc_queue_t *queue); -/*%< - * Destroy a queue. - * - * Requires: - * \li 'queue' is not null. - */ diff --git a/lib/isc/include/isc/types.h b/lib/isc/include/isc/types.h index ce3e75a516..c18b4f170e 100644 --- a/lib/isc/include/isc/types.h +++ b/lib/isc/include/isc/types.h @@ -44,11 +44,9 @@ typedef struct isc_counter isc_counter_t; /*%< Counter */ typedef int16_t isc_dscp_t; /*%< Diffserv code point */ typedef struct isc_event isc_event_t; /*%< Event */ typedef ISC_LIST(isc_event_t) isc_eventlist_t; /*%< Event List */ -typedef unsigned int isc_eventtype_t; /*%< Event Type */ -typedef uint32_t isc_fsaccess_t; /*%< FS Access */ -typedef struct isc_hash isc_hash_t; /*%< Hash */ -typedef struct isc_hp isc_hp_t; /*%< Hazard - * pointer */ +typedef unsigned int isc_eventtype_t; /*%< Event Type */ +typedef uint32_t isc_fsaccess_t; /*%< FS Access */ +typedef struct isc_hash isc_hash_t; /*%< Hash */ typedef struct isc_httpd isc_httpd_t; /*%< HTTP client */ typedef void(isc_httpdfree_t)(isc_buffer_t *, void *); /*%< HTTP free function */ diff --git a/lib/isc/managers.c b/lib/isc/managers.c index 7a1dfe9446..63ef09e5b4 100644 --- a/lib/isc/managers.c +++ b/lib/isc/managers.c @@ -11,7 +11,6 @@ * information regarding copyright ownership. 
*/ -#include #include #include @@ -28,15 +27,6 @@ isc_managers_create(isc_mem_t *mctx, size_t workers, size_t quantum, isc_taskmgr_t *taskmgr = NULL; isc_timermgr_t *timermgr = NULL; - /* - * Currently, there are: - * - 1 main thread - * - 1 timer thread - * - n netmgr threads - * - n threadpool threads - */ - isc_hp_init(2 + 2 * workers); - REQUIRE(netmgrp != NULL && *netmgrp == NULL); isc__netmgr_create(mctx, workers, &netmgr); *netmgrp = netmgr; diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index a81558bbea..a6c8b0a8a6 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -201,6 +200,17 @@ typedef enum { NETIEVENT_MAX = 4, } netievent_type_t; +typedef struct isc__nm_uvreq isc__nm_uvreq_t; +typedef struct isc__netievent isc__netievent_t; + +typedef ISC_LIST(isc__netievent_t) isc__netievent_list_t; + +typedef struct ievent { + isc_mutex_t lock; + isc_condition_t cond; + isc__netievent_list_t list; +} ievent_t; + /* * Single network event loop worker. */ @@ -210,13 +220,10 @@ typedef struct isc__networker { uv_loop_t loop; /* libuv loop structure */ uv_async_t async; /* async channel to send * data to this networker */ - isc_mutex_t lock; bool paused; bool finished; isc_thread_t thread; - isc_queue_t *ievents[NETIEVENT_MAX]; - atomic_uint_fast32_t nievents[NETIEVENT_MAX]; - isc_condition_t cond_prio; + ievent_t ievents[NETIEVENT_MAX]; isc_refcount_t references; atomic_int_fast64_t pktcount; @@ -421,12 +428,13 @@ isc__nm_put_netievent(isc_nm_t *mgr, void *ievent); * either in netmgr.c or matching protocol file (e.g. udp.c, tcp.c, etc.) 
*/ -#define NETIEVENT__SOCKET \ - isc__netievent_type type; \ - isc_nmsocket_t *sock; \ - const char *file; \ - unsigned int line; \ - const char *func +#define NETIEVENT__SOCKET \ + isc__netievent_type type; \ + ISC_LINK(isc__netievent_t) link; \ + isc_nmsocket_t *sock; \ + const char *file; \ + unsigned int line; \ + const char *func; typedef struct isc__netievent__socket { NETIEVENT__SOCKET; @@ -489,8 +497,7 @@ typedef struct isc__netievent__socket_req { } typedef struct isc__netievent__socket_req_result { - isc__netievent_type type; - isc_nmsocket_t *sock; + NETIEVENT__SOCKET; isc__nm_uvreq_t *req; isc_result_t result; } isc__netievent__socket_req_result_t; @@ -589,6 +596,7 @@ typedef struct isc__netievent__socket_quota { typedef struct isc__netievent__task { isc__netievent_type type; + ISC_LINK(isc__netievent_t) link; isc_task_t *task; } isc__netievent__task_t; @@ -624,8 +632,7 @@ typedef struct isc__netievent_udpsend { } isc__netievent_udpsend_t; typedef struct isc__netievent_tlsconnect { - isc__netievent_type type; - isc_nmsocket_t *sock; + NETIEVENT__SOCKET; SSL_CTX *ctx; isc_sockaddr_t local; /* local address */ isc_sockaddr_t peer; /* peer address */ @@ -633,6 +640,7 @@ typedef struct isc__netievent_tlsconnect { typedef struct isc__netievent { isc__netievent_type type; + ISC_LINK(isc__netievent_t) link; } isc__netievent_t; #define NETIEVENT_TYPE(type) typedef isc__netievent_t isc__netievent_##type##_t; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 7ede0ec749..eafce4ec1c 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -145,6 +146,7 @@ static isc_threadresult_t nm_thread(isc_threadarg_t worker0); static void async_cb(uv_async_t *handle); + static bool process_netievent(isc__networker_t *worker, isc__netievent_t *ievent); static isc_result_t @@ -154,51 +156,6 @@ wait_for_priority_queue(isc__networker_t *worker); static void 
drain_queue(isc__networker_t *worker, netievent_type_t type); -#define ENQUEUE_NETIEVENT(worker, queue, event) \ - isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event) -#define DEQUEUE_NETIEVENT(worker, queue) \ - (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue]) - -#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \ - ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event) -#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \ - ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event) -#define ENQUEUE_TASK_NETIEVENT(worker, event) \ - ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event) -#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \ - ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event) - -#define DEQUEUE_PRIORITY_NETIEVENT(worker) \ - DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY) -#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \ - DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED) -#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK) -#define DEQUEUE_NORMAL_NETIEVENT(worker) \ - DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL) - -#define INCREMENT_NETIEVENT(worker, queue) \ - atomic_fetch_add_release(&worker->nievents[queue], 1) -#define DECREMENT_NETIEVENT(worker, queue) \ - atomic_fetch_sub_release(&worker->nievents[queue], 1) - -#define INCREMENT_PRIORITY_NETIEVENT(worker) \ - INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) -#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \ - INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) -#define INCREMENT_TASK_NETIEVENT(worker) \ - INCREMENT_NETIEVENT(worker, NETIEVENT_TASK) -#define INCREMENT_NORMAL_NETIEVENT(worker) \ - INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL) - -#define DECREMENT_PRIORITY_NETIEVENT(worker) \ - DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY) -#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \ - DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED) -#define DECREMENT_TASK_NETIEVENT(worker) \ - DECREMENT_NETIEVENT(worker, NETIEVENT_TASK) -#define 
DECREMENT_NORMAL_NETIEVENT(worker) \ - DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL) - static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); static void @@ -311,12 +268,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { r = uv_async_init(&worker->loop, &worker->async, async_cb); UV_RUNTIME_CHECK(uv_async_init, r); - isc_mutex_init(&worker->lock); - isc_condition_init(&worker->cond_prio); - for (size_t type = 0; type < NETIEVENT_MAX; type++) { - worker->ievents[type] = isc_queue_new(mgr->mctx); - atomic_init(&worker->nievents[type], 0); + isc_mutex_init(&worker->ievents[type].lock); + isc_condition_init(&worker->ievents[type].cond); + ISC_LIST_INIT(worker->ievents[type].list); } worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); @@ -367,28 +322,15 @@ nm_destroy(isc_nm_t **mgr0) { for (int i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - isc__netievent_t *ievent = NULL; int r; - /* Empty the async event queues */ - while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) { - isc__nm_put_netievent(mgr, ievent); - } - - INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL); - INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL); - - while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) { - isc__nm_put_netievent(mgr, ievent); - } - isc_condition_destroy(&worker->cond_prio); - isc_mutex_destroy(&worker->lock); - r = uv_loop_close(&worker->loop); UV_RUNTIME_CHECK(uv_loop_close, r); for (size_t type = 0; type < NETIEVENT_MAX; type++) { - isc_queue_destroy(worker->ievents[type]); + INSIST(ISC_LIST_EMPTY(worker->ievents[type].list)); + isc_condition_destroy(&worker->ievents[type].cond); + isc_mutex_destroy(&worker->ievents[type].lock); } isc_mem_put(mgr->mctx, worker->sendbuf, @@ -737,13 +679,17 @@ nm_thread(isc_threadarg_t worker0) { } /* - * We are shutting down. Process the task queues - * (they may include shutdown events) but do not process - * the netmgr event queue. 
+ * We are shutting down. Drain the queues. */ drain_queue(worker, NETIEVENT_PRIVILEGED); drain_queue(worker, NETIEVENT_TASK); + for (size_t type = 0; type < NETIEVENT_MAX; type++) { + LOCK(&worker->ievents[type].lock); + INSIST(ISC_LIST_EMPTY(worker->ievents[type].list)); + UNLOCK(&worker->ievents[type].lock); + } + LOCK(&mgr->lock); mgr->workers_running--; SIGNAL(&mgr->wkstatecond); @@ -765,7 +711,8 @@ process_all_queues(isc__networker_t *worker) { isc_result_t result = process_queue(worker, type); switch (result) { case ISC_R_SUSPEND: - return (true); + reschedule = true; + break; case ISC_R_EMPTY: /* empty queue */ break; @@ -859,35 +806,29 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { static void wait_for_priority_queue(isc__networker_t *worker) { - isc_condition_t *cond = &worker->cond_prio; - bool wait_for_work = true; + isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond; + isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock; + isc__netievent_list_t *list = + &(worker->ievents[NETIEVENT_PRIORITY].list); - while (true) { - isc__netievent_t *ievent; - LOCK(&worker->lock); - ievent = DEQUEUE_PRIORITY_NETIEVENT(worker); - if (wait_for_work) { - while (ievent == NULL) { - WAIT(cond, &worker->lock); - ievent = DEQUEUE_PRIORITY_NETIEVENT(worker); - } - } - UNLOCK(&worker->lock); - wait_for_work = false; - - if (ievent == NULL) { - return; - } - DECREMENT_PRIORITY_NETIEVENT(worker); - - (void)process_netievent(worker, ievent); + LOCK(lock); + while (ISC_LIST_EMPTY(*list)) { + WAIT(cond, lock); } + UNLOCK(lock); + + drain_queue(worker, NETIEVENT_PRIORITY); } static void drain_queue(isc__networker_t *worker, netievent_type_t type) { - while (process_queue(worker, type) != ISC_R_EMPTY) { - ; + bool empty = false; + while (!empty) { + if (process_queue(worker, type) == ISC_R_EMPTY) { + LOCK(&worker->ievents[type].lock); + empty = ISC_LIST_EMPTY(worker->ievents[type].list); + UNLOCK(&worker->ievents[type].lock); + } } } 
@@ -995,40 +936,41 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { static isc_result_t process_queue(isc__networker_t *worker, netievent_type_t type) { - /* - * The number of items on the queue is only loosely synchronized with - * the items on the queue. But there's a guarantee that if there's an - * item on the queue, it will be accounted for. However there's a - * possibility that the counter might be higher than the items on the - * queue stored. - */ - uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]); - isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type); + isc__netievent_t *ievent = NULL; + isc__netievent_list_t list; - if (ievent == NULL && waiting == 0) { + ISC_LIST_INIT(list); + + LOCK(&worker->ievents[type].lock); + ISC_LIST_MOVE(list, worker->ievents[type].list); + UNLOCK(&worker->ievents[type].lock); + + ievent = ISC_LIST_HEAD(list); + if (ievent == NULL) { /* There's nothing scheduled */ return (ISC_R_EMPTY); - } else if (ievent == NULL) { - /* There's at least one item scheduled, but not on the queue yet - */ - return (ISC_R_SUCCESS); } while (ievent != NULL) { - DECREMENT_NETIEVENT(worker, type); - bool stop = !process_netievent(worker, ievent); + isc__netievent_t *next = ISC_LIST_NEXT(ievent, link); + ISC_LIST_DEQUEUE(list, ievent, link); - if (stop) { - /* Netievent told us to stop */ + if (!process_netievent(worker, ievent)) { + /* The netievent told us to stop */ + if (!ISC_LIST_EMPTY(list)) { + /* + * Reschedule the rest of the unprocessed + * events. 
+ */ + LOCK(&worker->ievents[type].lock); + ISC_LIST_PREPENDLIST(worker->ievents[type].list, + list, link); + UNLOCK(&worker->ievents[type].lock); + } return (ISC_R_SUSPEND); } - if (waiting-- == 0) { - /* We reached this round "quota" */ - break; - } - - ievent = DEQUEUE_NETIEVENT(worker, type); + ievent = next; } /* We processed at least one */ @@ -1041,6 +983,7 @@ isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) { sizeof(*event)); *event = (isc__netievent_storage_t){ .ni.type = type }; + ISC_LINK_INIT(&(event->ni), link); return (event); } @@ -1130,26 +1073,39 @@ isc__nm_maybe_enqueue_ievent(isc__networker_t *worker, void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { + netievent_type_t type; + if (event->type > netievent_prio) { - /* - * We need to make sure this signal will be delivered and - * the queue will be processed. - */ - LOCK(&worker->lock); - INCREMENT_PRIORITY_NETIEVENT(worker); - ENQUEUE_PRIORITY_NETIEVENT(worker, event); - SIGNAL(&worker->cond_prio); - UNLOCK(&worker->lock); - } else if (event->type == netievent_privilegedtask) { - INCREMENT_PRIVILEGED_NETIEVENT(worker); - ENQUEUE_PRIVILEGED_NETIEVENT(worker, event); - } else if (event->type == netievent_task) { - INCREMENT_TASK_NETIEVENT(worker); - ENQUEUE_TASK_NETIEVENT(worker, event); + type = NETIEVENT_PRIORITY; } else { - INCREMENT_NORMAL_NETIEVENT(worker); - ENQUEUE_NORMAL_NETIEVENT(worker, event); + switch (event->type) { + case netievent_prio: + INSIST(0); + ISC_UNREACHABLE(); + break; + case netievent_privilegedtask: + type = NETIEVENT_PRIVILEGED; + break; + case netievent_task: + type = NETIEVENT_TASK; + break; + default: + type = NETIEVENT_NORMAL; + break; + } } + + /* + * We need to make sure this signal will be delivered and + * the queue will be processed. 
+ */ + LOCK(&worker->ievents[type].lock); + ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link); + if (type == NETIEVENT_PRIORITY) { + SIGNAL(&worker->ievents[type].cond); + } + UNLOCK(&worker->ievents[type].lock); + uv_async_send(&worker->async); } @@ -2927,6 +2883,7 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) { void isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); + uv_walk(&worker->loop, shutdown_walk_cb, NULL); } diff --git a/lib/isc/queue.c b/lib/isc/queue.c deleted file mode 100644 index d7856c156d..0000000000 --- a/lib/isc/queue.c +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * SPDX-License-Identifier: MPL-2.0 - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. 
- */ - -#include - -#include -#include -#include -#include -#include -#include -#include - -#define BUFFER_SIZE 1024 - -static uintptr_t nulluintptr = (uintptr_t)NULL; - -typedef struct node { - atomic_uint_fast32_t deqidx; - atomic_uintptr_t items[BUFFER_SIZE]; - atomic_uint_fast32_t enqidx; - atomic_uintptr_t next; - isc_mem_t *mctx; -} node_t; - -/* we just need one Hazard Pointer */ -#define HP_TAIL 0 -#define HP_HEAD 0 - -struct isc_queue { - alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t head; - alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t tail; - isc_mem_t *mctx; - int taken; - isc_hp_t *hp; -}; - -static node_t * -node_new(isc_mem_t *mctx, uintptr_t item) { - node_t *node = isc_mem_get(mctx, sizeof(*node)); - *node = (node_t){ .mctx = NULL }; - - atomic_init(&node->deqidx, 0); - atomic_init(&node->enqidx, 1); - atomic_init(&node->next, 0); - atomic_init(&node->items[0], item); - - for (int i = 1; i < BUFFER_SIZE; i++) { - atomic_init(&node->items[i], 0); - } - - isc_mem_attach(mctx, &node->mctx); - - return (node); -} - -static void -node_destroy(void *node0) { - node_t *node = (node_t *)node0; - - isc_mem_putanddetach(&node->mctx, node, sizeof(*node)); -} - -static bool -node_cas_next(node_t *node, node_t *cmp, const node_t *val) { - return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp, - (uintptr_t)val)); -} - -static bool -queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) { - return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp, - (uintptr_t)val)); -} - -static bool -queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) { - return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp, - (uintptr_t)val)); -} - -isc_queue_t * -isc_queue_new(isc_mem_t *mctx) { - isc_queue_t *queue = NULL; - node_t *sentinel = NULL; - - queue = isc_mem_get_aligned(mctx, sizeof(*queue), isc_os_cacheline()); - - *queue = (isc_queue_t){ 0 }; - - isc_mem_attach(mctx, &queue->mctx); - - queue->hp = 
isc_hp_new(mctx, 1, node_destroy); - - sentinel = node_new(mctx, nulluintptr); - atomic_init(&sentinel->enqidx, 0); - - atomic_init(&queue->head, (uintptr_t)sentinel); - atomic_init(&queue->tail, (uintptr_t)sentinel); - - return (queue); -} - -void -isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) { - REQUIRE(item != nulluintptr); - - while (true) { - node_t *lt = NULL; - uint_fast32_t idx; - uintptr_t n = nulluintptr; - - lt = (node_t *)isc_hp_protect(queue->hp, HP_TAIL, &queue->tail); - idx = atomic_fetch_add(<->enqidx, 1); - if (idx > BUFFER_SIZE - 1) { - node_t *lnext = NULL; - - if (lt != (node_t *)atomic_load(&queue->tail)) { - continue; - } - - lnext = (node_t *)atomic_load(<->next); - if (lnext == NULL) { - node_t *newnode = node_new(queue->mctx, item); - if (node_cas_next(lt, NULL, newnode)) { - queue_cas_tail(queue, lt, newnode); - isc_hp_clear(queue->hp); - return; - } - node_destroy(newnode); - } else { - queue_cas_tail(queue, lt, lnext); - } - - continue; - } - - if (atomic_compare_exchange_strong(<->items[idx], &n, item)) { - isc_hp_clear(queue->hp); - return; - } - } -} - -uintptr_t -isc_queue_dequeue(isc_queue_t *queue) { - REQUIRE(queue != NULL); - - while (true) { - node_t *lh = NULL; - uint_fast32_t idx; - uintptr_t item; - - lh = (node_t *)isc_hp_protect(queue->hp, HP_HEAD, &queue->head); - if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) && - atomic_load(&lh->next) == nulluintptr) - { - break; - } - - idx = atomic_fetch_add(&lh->deqidx, 1); - if (idx > BUFFER_SIZE - 1) { - node_t *lnext = (node_t *)atomic_load(&lh->next); - if (lnext == NULL) { - break; - } - if (queue_cas_head(queue, lh, lnext)) { - isc_hp_retire(queue->hp, (uintptr_t)lh); - } - - continue; - } - - item = atomic_exchange(&(lh->items[idx]), - (uintptr_t)&queue->taken); - if (item == nulluintptr) { - continue; - } - - isc_hp_clear(queue->hp); - return (item); - } - - isc_hp_clear(queue->hp); - return (nulluintptr); -} - -void -isc_queue_destroy(isc_queue_t *queue) { 
- node_t *last = NULL; - - REQUIRE(queue != NULL); - - while (isc_queue_dequeue(queue) != nulluintptr) { - /* do nothing */ - } - - last = (node_t *)atomic_load_relaxed(&queue->head); - node_destroy(last); - isc_hp_destroy(queue->hp); - - isc_mem_putanddetach_aligned(&queue->mctx, queue, sizeof(*queue), - isc_os_cacheline()); -} diff --git a/lib/isc/task.c b/lib/isc/task.c index 666eb13fb2..6dbd018c50 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -200,7 +200,7 @@ isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, isc_result_t isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp, int threadid) { - isc_task_t *task; + isc_task_t *task = NULL; bool exiting; REQUIRE(VALID_MANAGER(manager)); diff --git a/lib/isc/tests/doh_test.c b/lib/isc/tests/doh_test.c index 1fd1283abe..179c5fba97 100644 --- a/lib/isc/tests/doh_test.c +++ b/lib/isc/tests/doh_test.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -259,8 +258,6 @@ _setup(void **state) { return (-1); } - isc_hp_init(4 * workers); - signal(SIGPIPE, SIG_IGN); return (0); diff --git a/lib/isc/tests/isctest.c b/lib/isc/tests/isctest.c index 3f26583a6f..18b95a9c8c 100644 --- a/lib/isc/tests/isctest.c +++ b/lib/isc/tests/isctest.c @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -78,7 +77,6 @@ create_managers(unsigned int workers) { } INSIST(workers != 0); - isc_hp_init(6 * workers); isc_managers_create(test_mctx, workers, 0, &netmgr, &taskmgr, &timermgr); diff --git a/lib/isc/tests/netmgr_test.c b/lib/isc/tests/netmgr_test.c index b27c095b7d..50cf0d9e44 100644 --- a/lib/isc/tests/netmgr_test.c +++ b/lib/isc/tests/netmgr_test.c @@ -23,7 +23,6 @@ #define UNIT_TESTING #include -#include #include #include #include @@ -209,8 +208,6 @@ _setup(void **state __attribute__((unused))) { return (-1); } - isc_hp_init(4 * workers); - signal(SIGPIPE, SIG_IGN); if (getenv("CI") == NULL || getenv("CI_ENABLE_ALL_TESTS") != NULL) {