Merge branch 'ondrej-use-locked-queue' into 'main'

Replace netievent lock-free queue with simple locked queue

Closes #3180

See merge request isc-projects/bind9!5914
This commit is contained in:
Ondřej Surý 2022-03-04 13:12:53 +00:00
commit d811cca3c6
16 changed files with 136 additions and 815 deletions

View file

@ -1,3 +1,7 @@
5823. [func] Replace hazard pointers based lock-free list with
locked-list based queue that's simpler and has no or
little performance impact. [GL #3180]
5822. [bug] When calling dns_dispatch_send(), attach/detach
dns_request_t object as the read callback could
be called before send callback dereferencing

View file

@ -32,7 +32,6 @@
#include <isc/dir.h>
#include <isc/file.h>
#include <isc/hash.h>
#include <isc/hp.h>
#include <isc/httpd.h>
#include <isc/managers.h>
#include <isc/netmgr.h>

View file

@ -38,7 +38,6 @@ libisc_la_HEADERS = \
include/isc/heap.h \
include/isc/hex.h \
include/isc/hmac.h \
include/isc/hp.h \
include/isc/ht.h \
include/isc/httpd.h \
include/isc/interfaceiter.h \
@ -67,7 +66,6 @@ libisc_la_HEADERS = \
include/isc/pool.h \
include/isc/portset.h \
include/isc/print.h \
include/isc/queue.h \
include/isc/quota.h \
include/isc/radix.h \
include/isc/random.h \
@ -145,7 +143,6 @@ libisc_la_SOURCES = \
heap.c \
hex.c \
hmac.c \
hp.c \
ht.c \
httpd.c \
interfaceiter.c \
@ -173,7 +170,6 @@ libisc_la_SOURCES = \
parseint.c \
pool.c \
portset.c \
queue.c \
quota.c \
radix.c \
random.c \

View file

@ -1,225 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
/*
* Hazard Pointer implementation.
*
* This work is based on C++ code available from:
* https://github.com/pramalhe/ConcurrencyFreaks/
*
* Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Concurrency Freaks nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <inttypes.h>
#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/hp.h>
#include <isc/mem.h>
#include <isc/once.h>
#include <isc/os.h>
#include <isc/string.h>
#include <isc/thread.h>
#include <isc/util.h>
static int isc__hp_max_threads = 1;
typedef struct retirelist {
int size;
uintptr_t *list;
} retirelist_t;
typedef atomic_uintptr_t isc_hp_uintptr_t;
struct isc_hp {
int max_hps;
int max_retired;
isc_mem_t *mctx;
isc_hp_deletefunc_t *deletefunc;
isc_hp_uintptr_t **hp;
retirelist_t **rl;
};
static inline int
tid(void) {
return (isc_tid_v);
}
void
isc_hp_init(int max_threads) {
REQUIRE(max_threads > 0);
if (isc__hp_max_threads > max_threads) {
return;
}
isc__hp_max_threads = max_threads;
}
isc_hp_t *
isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc) {
isc_hp_t *hp = isc_mem_get(mctx, sizeof(*hp));
REQUIRE(isc__hp_max_threads > 0);
REQUIRE(max_hps > 0);
*hp = (isc_hp_t){
.max_hps = max_hps,
.max_retired = isc__hp_max_threads * max_hps,
.deletefunc = deletefunc,
};
isc_mem_attach(mctx, &hp->mctx);
hp->hp = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->hp[0]));
for (int i = 0; i < isc__hp_max_threads; i++) {
isc_hp_uintptr_t *hps;
hps = isc_mem_get_aligned(mctx, hp->max_hps * sizeof(*hps),
isc_os_cacheline());
for (int j = 0; j < hp->max_hps; j++) {
atomic_init(&hps[j], 0);
}
hp->hp[i] = hps;
}
hp->rl = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->rl[0]));
for (int i = 0; i < isc__hp_max_threads; i++) {
retirelist_t *rl;
rl = isc_mem_get_aligned(mctx, sizeof(*rl), isc_os_cacheline());
rl->size = 0;
rl->list = isc_mem_get(hp->mctx,
hp->max_retired * sizeof(uintptr_t));
memset(rl->list, 0, hp->max_retired * sizeof(uintptr_t));
hp->rl[i] = rl;
}
return (hp);
}
void
isc_hp_destroy(isc_hp_t *hp) {
for (int i = 0; i < isc__hp_max_threads; i++) {
retirelist_t *rl = hp->rl[i];
for (int j = 0; j < rl->size; j++) {
void *data = (void *)rl->list[j];
hp->deletefunc(data);
}
isc_mem_put(hp->mctx, rl->list,
hp->max_retired * sizeof(uintptr_t));
isc_mem_put_aligned(hp->mctx, rl, sizeof(*rl),
isc_os_cacheline());
}
for (int i = 0; i < isc__hp_max_threads; i++) {
isc_hp_uintptr_t *hps = hp->hp[i];
isc_mem_put_aligned(hp->mctx, hps, hp->max_hps * sizeof(*hps),
isc_os_cacheline());
}
isc_mem_put(hp->mctx, hp->hp, isc__hp_max_threads * sizeof(hp->hp[0]));
isc_mem_put(hp->mctx, hp->rl, isc__hp_max_threads * sizeof(hp->rl[0]));
isc_mem_putanddetach(&hp->mctx, hp, sizeof(*hp));
}
void
isc_hp_clear(isc_hp_t *hp) {
for (int i = 0; i < hp->max_hps; i++) {
atomic_store_release(&hp->hp[tid()][i], 0);
}
}
void
isc_hp_clear_one(isc_hp_t *hp, int ihp) {
atomic_store_release(&hp->hp[tid()][ihp], 0);
}
uintptr_t
isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom) {
uintptr_t n = 0;
uintptr_t ret;
while ((ret = atomic_load(atom)) != n) {
atomic_store(&hp->hp[tid()][ihp], ret);
n = ret;
}
return (ret);
}
uintptr_t
isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
atomic_store(&hp->hp[tid()][ihp], atomic_load(&ptr));
return (atomic_load(&ptr));
}
uintptr_t
isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
atomic_store_release(&hp->hp[tid()][ihp], atomic_load(&ptr));
return (atomic_load(&ptr));
}
void
isc_hp_retire(isc_hp_t *hp, uintptr_t ptr) {
retirelist_t *rl = hp->rl[tid()];
rl->list[rl->size++] = ptr;
INSIST(rl->size < hp->max_retired);
for (int iret = 0; iret < rl->size; iret++) {
uintptr_t obj = rl->list[iret];
bool can_delete = true;
for (int itid = 0; itid < isc__hp_max_threads && can_delete;
itid++) {
for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
if (atomic_load(&hp->hp[itid][ihp]) == obj) {
can_delete = false;
break;
}
}
}
if (can_delete) {
size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
rl->size--;
hp->deletefunc((void *)obj);
}
}
}

View file

@ -1,142 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
/*
* Hazard Pointer implementation.
*
* This work is based on C++ code available from:
* https://github.com/pramalhe/ConcurrencyFreaks/
*
* Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Concurrency Freaks nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <isc/atomic.h>
#include <isc/mem.h>
#include <isc/string.h>
#include <isc/types.h>
#include <isc/util.h>
/*%
* Hazard pointers are a mechanism for protecting objects in memory
* from being deleted by other threads while in use. This allows
* safe lock-free data structures.
*
* This is an adaptation of the ConcurrencyFreaks implementation in C.
* More details available at https://github.com/pramalhe/ConcurrencyFreaks,
* in the file HazardPointers.hpp.
*/
typedef void(isc_hp_deletefunc_t)(void *);
void
isc_hp_init(int max_threads);
/*%<
* Initialize hazard pointer constants, isc__hp_max_threads and
* isc__hp_max_retired. If more threads try to access hp, it
* will assert. Calling this function repeatedly can be used
* to increase the limits, but cannot reduce them.
*/
isc_hp_t *
isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc);
/*%<
* Create a new hazard pointer array of size 'max_hps' (or a reasonable
* default value if 'max_hps' is 0). The function 'deletefunc' will be
* used to delete objects protected by hazard pointers when it becomes
* safe to retire them.
*/
void
isc_hp_destroy(isc_hp_t *hp);
/*%<
* Destroy a hazard pointer array and clean up all objects protected
* by hazard pointers.
*/
void
isc_hp_clear(isc_hp_t *hp);
/*%<
* Clear all hazard pointers in the array for the current thread.
*
* Progress condition: wait-free bounded (by max_hps)
*/
void
isc_hp_clear_one(isc_hp_t *hp, int ihp);
/*%<
* Clear a specified hazard pointer in the array for the current thread.
*
* Progress condition: wait-free population oblivious.
*/
uintptr_t
isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom);
/*%<
* Protect an object referenced by 'atom' with a hazard pointer for the
* current thread.
*
* Progress condition: lock-free.
*/
uintptr_t
isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
/*%<
* This returns the same value that is passed as ptr, which is sometimes
* useful.
*
* Progress condition: wait-free population oblivious.
*/
uintptr_t
isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
/*%<
* Same as isc_hp_protect_ptr(), but explicitly uses memory_order_release.
*
* Progress condition: wait-free population oblivious.
*/
void
isc_hp_retire(isc_hp_t *hp, uintptr_t ptr);
/*%<
* Retire an object that is no longer in use by any thread, calling
* the delete function that was specified in isc_hp_new().
*
* Progress condition: wait-free bounded (by the number of threads squared)
*/

View file

@ -198,3 +198,17 @@
__ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, void)
#define __ISC_LIST_DEQUEUEUNSAFE_TYPE(list, elt, link, type) \
__ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, type)
#define ISC_LIST_MOVEUNSAFE(dest, src) \
{ \
(dest).head = (src).head; \
(dest).tail = (src).tail; \
(src).head = NULL; \
(src).tail = NULL; \
}
#define ISC_LIST_MOVE(dest, src) \
{ \
INSIST(ISC_LIST_EMPTY(dest)); \
ISC_LIST_MOVEUNSAFE(dest, src); \
}

View file

@ -1,56 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#pragma once
#include <isc/mem.h>
typedef struct isc_queue isc_queue_t;
isc_queue_t *
isc_queue_new(isc_mem_t *mctx);
/*%<
* Create a new fetch-and-add array queue.
*
* 'max_threads' is currently unused. In the future it can be used
* to pass a maximum threads parameter when creating hazard pointers,
* but currently `isc_hp_t` uses a hard-coded value.
*/
void
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item);
/*%<
* Enqueue an object pointer 'item' at the tail of the queue.
*
* Requires:
* \li 'item' is not null.
*/
uintptr_t
isc_queue_dequeue(isc_queue_t *queue);
/*%<
* Remove an object pointer from the head of the queue and return the
* pointer. If the queue is empty, return `nulluintptr` (the uintptr_t
* representation of NULL).
*
* Requires:
* \li 'queue' is not null.
*/
void
isc_queue_destroy(isc_queue_t *queue);
/*%<
* Destroy a queue.
*
* Requires:
* \li 'queue' is not null.
*/

View file

@ -44,11 +44,9 @@ typedef struct isc_counter isc_counter_t; /*%< Counter */
typedef int16_t isc_dscp_t; /*%< Diffserv code point */
typedef struct isc_event isc_event_t; /*%< Event */
typedef ISC_LIST(isc_event_t) isc_eventlist_t; /*%< Event List */
typedef unsigned int isc_eventtype_t; /*%< Event Type */
typedef uint32_t isc_fsaccess_t; /*%< FS Access */
typedef struct isc_hash isc_hash_t; /*%< Hash */
typedef struct isc_hp isc_hp_t; /*%< Hazard
* pointer */
typedef unsigned int isc_eventtype_t; /*%< Event Type */
typedef uint32_t isc_fsaccess_t; /*%< FS Access */
typedef struct isc_hash isc_hash_t; /*%< Hash */
typedef struct isc_httpd isc_httpd_t; /*%< HTTP client */
typedef void(isc_httpdfree_t)(isc_buffer_t *, void *); /*%< HTTP free function
*/

View file

@ -11,7 +11,6 @@
* information regarding copyright ownership.
*/
#include <isc/hp.h>
#include <isc/managers.h>
#include <isc/util.h>
@ -28,15 +27,6 @@ isc_managers_create(isc_mem_t *mctx, size_t workers, size_t quantum,
isc_taskmgr_t *taskmgr = NULL;
isc_timermgr_t *timermgr = NULL;
/*
* Currently, there are:
* - 1 main thread
* - 1 timer thread
* - n netmgr threads
* - n threadpool threads
*/
isc_hp_init(2 + 2 * workers);
REQUIRE(netmgrp != NULL && *netmgrp == NULL);
isc__netmgr_create(mctx, workers, &netmgr);
*netmgrp = netmgr;

View file

@ -27,7 +27,6 @@
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/queue.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
@ -201,6 +200,17 @@ typedef enum {
NETIEVENT_MAX = 4,
} netievent_type_t;
typedef struct isc__nm_uvreq isc__nm_uvreq_t;
typedef struct isc__netievent isc__netievent_t;
typedef ISC_LIST(isc__netievent_t) isc__netievent_list_t;
typedef struct ievent {
isc_mutex_t lock;
isc_condition_t cond;
isc__netievent_list_t list;
} ievent_t;
/*
* Single network event loop worker.
*/
@ -210,13 +220,10 @@ typedef struct isc__networker {
uv_loop_t loop; /* libuv loop structure */
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
bool paused;
bool finished;
isc_thread_t thread;
isc_queue_t *ievents[NETIEVENT_MAX];
atomic_uint_fast32_t nievents[NETIEVENT_MAX];
isc_condition_t cond_prio;
ievent_t ievents[NETIEVENT_MAX];
isc_refcount_t references;
atomic_int_fast64_t pktcount;
@ -421,12 +428,13 @@ isc__nm_put_netievent(isc_nm_t *mgr, void *ievent);
* either in netmgr.c or matching protocol file (e.g. udp.c, tcp.c, etc.)
*/
#define NETIEVENT__SOCKET \
isc__netievent_type type; \
isc_nmsocket_t *sock; \
const char *file; \
unsigned int line; \
const char *func
#define NETIEVENT__SOCKET \
isc__netievent_type type; \
ISC_LINK(isc__netievent_t) link; \
isc_nmsocket_t *sock; \
const char *file; \
unsigned int line; \
const char *func;
typedef struct isc__netievent__socket {
NETIEVENT__SOCKET;
@ -489,8 +497,7 @@ typedef struct isc__netievent__socket_req {
}
typedef struct isc__netievent__socket_req_result {
isc__netievent_type type;
isc_nmsocket_t *sock;
NETIEVENT__SOCKET;
isc__nm_uvreq_t *req;
isc_result_t result;
} isc__netievent__socket_req_result_t;
@ -589,6 +596,7 @@ typedef struct isc__netievent__socket_quota {
typedef struct isc__netievent__task {
isc__netievent_type type;
ISC_LINK(isc__netievent_t) link;
isc_task_t *task;
} isc__netievent__task_t;
@ -624,8 +632,7 @@ typedef struct isc__netievent_udpsend {
} isc__netievent_udpsend_t;
typedef struct isc__netievent_tlsconnect {
isc__netievent_type type;
isc_nmsocket_t *sock;
NETIEVENT__SOCKET;
SSL_CTX *ctx;
isc_sockaddr_t local; /* local address */
isc_sockaddr_t peer; /* peer address */
@ -633,6 +640,7 @@ typedef struct isc__netievent_tlsconnect {
typedef struct isc__netievent {
isc__netievent_type type;
ISC_LINK(isc__netievent_t) link;
} isc__netievent_t;
#define NETIEVENT_TYPE(type) typedef isc__netievent_t isc__netievent_##type##_t;

View file

@ -21,6 +21,7 @@
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/list.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
@ -145,6 +146,7 @@ static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static isc_result_t
@ -154,51 +156,6 @@ wait_for_priority_queue(isc__networker_t *worker);
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);
#define ENQUEUE_NETIEVENT(worker, queue, event) \
isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)
#define DEQUEUE_NETIEVENT(worker, queue) \
(isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])
#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
#define ENQUEUE_TASK_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
#define DEQUEUE_NORMAL_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
#define INCREMENT_NETIEVENT(worker, queue) \
atomic_fetch_add_release(&worker->nievents[queue], 1)
#define DECREMENT_NETIEVENT(worker, queue) \
atomic_fetch_sub_release(&worker->nievents[queue], 1)
#define INCREMENT_PRIORITY_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define INCREMENT_TASK_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define INCREMENT_NORMAL_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
#define DECREMENT_PRIORITY_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DECREMENT_TASK_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define DECREMENT_NORMAL_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
@ -311,12 +268,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
r = uv_async_init(&worker->loop, &worker->async, async_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
isc_mutex_init(&worker->lock);
isc_condition_init(&worker->cond_prio);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
worker->ievents[type] = isc_queue_new(mgr->mctx);
atomic_init(&worker->nievents[type], 0);
isc_mutex_init(&worker->ievents[type].lock);
isc_condition_init(&worker->ievents[type].cond);
ISC_LIST_INIT(worker->ievents[type].list);
}
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
@ -367,28 +322,15 @@ nm_destroy(isc_nm_t **mgr0) {
for (int i = 0; i < mgr->nworkers; i++) {
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *ievent = NULL;
int r;
/* Empty the async event queues */
while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc__nm_put_netievent(mgr, ievent);
}
INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) {
isc__nm_put_netievent(mgr, ievent);
}
isc_condition_destroy(&worker->cond_prio);
isc_mutex_destroy(&worker->lock);
r = uv_loop_close(&worker->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_queue_destroy(worker->ievents[type]);
INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
isc_condition_destroy(&worker->ievents[type].cond);
isc_mutex_destroy(&worker->ievents[type].lock);
}
isc_mem_put(mgr->mctx, worker->sendbuf,
@ -737,13 +679,17 @@ nm_thread(isc_threadarg_t worker0) {
}
/*
* We are shutting down. Process the task queues
* (they may include shutdown events) but do not process
* the netmgr event queue.
* We are shutting down. Drain the queues.
*/
drain_queue(worker, NETIEVENT_PRIVILEGED);
drain_queue(worker, NETIEVENT_TASK);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
LOCK(&worker->ievents[type].lock);
INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
UNLOCK(&worker->ievents[type].lock);
}
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
@ -765,7 +711,8 @@ process_all_queues(isc__networker_t *worker) {
isc_result_t result = process_queue(worker, type);
switch (result) {
case ISC_R_SUSPEND:
return (true);
reschedule = true;
break;
case ISC_R_EMPTY:
/* empty queue */
break;
@ -859,35 +806,29 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
static void
wait_for_priority_queue(isc__networker_t *worker) {
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
isc__netievent_list_t *list =
&(worker->ievents[NETIEVENT_PRIORITY].list);
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
}
}
UNLOCK(&worker->lock);
wait_for_work = false;
if (ievent == NULL) {
return;
}
DECREMENT_PRIORITY_NETIEVENT(worker);
(void)process_netievent(worker, ievent);
LOCK(lock);
while (ISC_LIST_EMPTY(*list)) {
WAIT(cond, lock);
}
UNLOCK(lock);
drain_queue(worker, NETIEVENT_PRIORITY);
}
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
while (process_queue(worker, type) != ISC_R_EMPTY) {
;
bool empty = false;
while (!empty) {
if (process_queue(worker, type) == ISC_R_EMPTY) {
LOCK(&worker->ievents[type].lock);
empty = ISC_LIST_EMPTY(worker->ievents[type].list);
UNLOCK(&worker->ievents[type].lock);
}
}
}
@ -995,40 +936,41 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
/*
* The number of items on the queue is only loosely synchronized with
* the items on the queue. But there's a guarantee that if there's an
* item on the queue, it will be accounted for. However there's a
* possibility that the counter might be higher than the items on the
* queue stored.
*/
uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
isc__netievent_t *ievent = NULL;
isc__netievent_list_t list;
if (ievent == NULL && waiting == 0) {
ISC_LIST_INIT(list);
LOCK(&worker->ievents[type].lock);
ISC_LIST_MOVE(list, worker->ievents[type].list);
UNLOCK(&worker->ievents[type].lock);
ievent = ISC_LIST_HEAD(list);
if (ievent == NULL) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
} else if (ievent == NULL) {
/* There's at least one item scheduled, but not on the queue yet
*/
return (ISC_R_SUCCESS);
}
while (ievent != NULL) {
DECREMENT_NETIEVENT(worker, type);
bool stop = !process_netievent(worker, ievent);
isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
ISC_LIST_DEQUEUE(list, ievent, link);
if (stop) {
/* Netievent told us to stop */
if (!process_netievent(worker, ievent)) {
/* The netievent told us to stop */
if (!ISC_LIST_EMPTY(list)) {
/*
* Reschedule the rest of the unprocessed
* events.
*/
LOCK(&worker->ievents[type].lock);
ISC_LIST_PREPENDLIST(worker->ievents[type].list,
list, link);
UNLOCK(&worker->ievents[type].lock);
}
return (ISC_R_SUSPEND);
}
if (waiting-- == 0) {
/* We reached this round "quota" */
break;
}
ievent = DEQUEUE_NETIEVENT(worker, type);
ievent = next;
}
/* We processed at least one */
@ -1041,6 +983,7 @@ isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
sizeof(*event));
*event = (isc__netievent_storage_t){ .ni.type = type };
ISC_LINK_INIT(&(event->ni), link);
return (event);
}
@ -1130,26 +1073,39 @@ isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
netievent_type_t type;
if (event->type > netievent_prio) {
/*
* We need to make sure this signal will be delivered and
* the queue will be processed.
*/
LOCK(&worker->lock);
INCREMENT_PRIORITY_NETIEVENT(worker);
ENQUEUE_PRIORITY_NETIEVENT(worker, event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
INCREMENT_PRIVILEGED_NETIEVENT(worker);
ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
} else if (event->type == netievent_task) {
INCREMENT_TASK_NETIEVENT(worker);
ENQUEUE_TASK_NETIEVENT(worker, event);
type = NETIEVENT_PRIORITY;
} else {
INCREMENT_NORMAL_NETIEVENT(worker);
ENQUEUE_NORMAL_NETIEVENT(worker, event);
switch (event->type) {
case netievent_prio:
INSIST(0);
ISC_UNREACHABLE();
break;
case netievent_privilegedtask:
type = NETIEVENT_PRIVILEGED;
break;
case netievent_task:
type = NETIEVENT_TASK;
break;
default:
type = NETIEVENT_NORMAL;
break;
}
}
/*
* We need to make sure this signal will be delivered and
* the queue will be processed.
*/
LOCK(&worker->ievents[type].lock);
ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
if (type == NETIEVENT_PRIORITY) {
SIGNAL(&worker->ievents[type].cond);
}
UNLOCK(&worker->ievents[type].lock);
uv_async_send(&worker->async);
}
@ -2927,6 +2883,7 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) {
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
uv_walk(&worker->loop, shutdown_walk_cb, NULL);
}

View file

@ -1,214 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#include <inttypes.h>
#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/hp.h>
#include <isc/mem.h>
#include <isc/os.h>
#include <isc/queue.h>
#include <isc/string.h>
#define BUFFER_SIZE 1024
static uintptr_t nulluintptr = (uintptr_t)NULL;
typedef struct node {
atomic_uint_fast32_t deqidx;
atomic_uintptr_t items[BUFFER_SIZE];
atomic_uint_fast32_t enqidx;
atomic_uintptr_t next;
isc_mem_t *mctx;
} node_t;
/* we just need one Hazard Pointer */
#define HP_TAIL 0
#define HP_HEAD 0
struct isc_queue {
alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t head;
alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t tail;
isc_mem_t *mctx;
int taken;
isc_hp_t *hp;
};
static node_t *
node_new(isc_mem_t *mctx, uintptr_t item) {
node_t *node = isc_mem_get(mctx, sizeof(*node));
*node = (node_t){ .mctx = NULL };
atomic_init(&node->deqidx, 0);
atomic_init(&node->enqidx, 1);
atomic_init(&node->next, 0);
atomic_init(&node->items[0], item);
for (int i = 1; i < BUFFER_SIZE; i++) {
atomic_init(&node->items[i], 0);
}
isc_mem_attach(mctx, &node->mctx);
return (node);
}
static void
node_destroy(void *node0) {
node_t *node = (node_t *)node0;
isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
}
static bool
node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp,
(uintptr_t)val));
}
static bool
queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp,
(uintptr_t)val));
}
static bool
queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp,
(uintptr_t)val));
}
isc_queue_t *
isc_queue_new(isc_mem_t *mctx) {
isc_queue_t *queue = NULL;
node_t *sentinel = NULL;
queue = isc_mem_get_aligned(mctx, sizeof(*queue), isc_os_cacheline());
*queue = (isc_queue_t){ 0 };
isc_mem_attach(mctx, &queue->mctx);
queue->hp = isc_hp_new(mctx, 1, node_destroy);
sentinel = node_new(mctx, nulluintptr);
atomic_init(&sentinel->enqidx, 0);
atomic_init(&queue->head, (uintptr_t)sentinel);
atomic_init(&queue->tail, (uintptr_t)sentinel);
return (queue);
}
void
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
REQUIRE(item != nulluintptr);
while (true) {
node_t *lt = NULL;
uint_fast32_t idx;
uintptr_t n = nulluintptr;
lt = (node_t *)isc_hp_protect(queue->hp, HP_TAIL, &queue->tail);
idx = atomic_fetch_add(&lt->enqidx, 1);
if (idx > BUFFER_SIZE - 1) {
node_t *lnext = NULL;
if (lt != (node_t *)atomic_load(&queue->tail)) {
continue;
}
lnext = (node_t *)atomic_load(&lt->next);
if (lnext == NULL) {
node_t *newnode = node_new(queue->mctx, item);
if (node_cas_next(lt, NULL, newnode)) {
queue_cas_tail(queue, lt, newnode);
isc_hp_clear(queue->hp);
return;
}
node_destroy(newnode);
} else {
queue_cas_tail(queue, lt, lnext);
}
continue;
}
if (atomic_compare_exchange_strong(&lt->items[idx], &n, item)) {
isc_hp_clear(queue->hp);
return;
}
}
}
uintptr_t
isc_queue_dequeue(isc_queue_t *queue) {
REQUIRE(queue != NULL);
while (true) {
node_t *lh = NULL;
uint_fast32_t idx;
uintptr_t item;
lh = (node_t *)isc_hp_protect(queue->hp, HP_HEAD, &queue->head);
if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
atomic_load(&lh->next) == nulluintptr)
{
break;
}
idx = atomic_fetch_add(&lh->deqidx, 1);
if (idx > BUFFER_SIZE - 1) {
node_t *lnext = (node_t *)atomic_load(&lh->next);
if (lnext == NULL) {
break;
}
if (queue_cas_head(queue, lh, lnext)) {
isc_hp_retire(queue->hp, (uintptr_t)lh);
}
continue;
}
item = atomic_exchange(&(lh->items[idx]),
(uintptr_t)&queue->taken);
if (item == nulluintptr) {
continue;
}
isc_hp_clear(queue->hp);
return (item);
}
isc_hp_clear(queue->hp);
return (nulluintptr);
}
void
isc_queue_destroy(isc_queue_t *queue) {
node_t *last = NULL;
REQUIRE(queue != NULL);
while (isc_queue_dequeue(queue) != nulluintptr) {
/* do nothing */
}
last = (node_t *)atomic_load_relaxed(&queue->head);
node_destroy(last);
isc_hp_destroy(queue->hp);
isc_mem_putanddetach_aligned(&queue->mctx, queue, sizeof(*queue),
isc_os_cacheline());
}

View file

@ -200,7 +200,7 @@ isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
isc_result_t
isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp, int threadid) {
isc_task_t *task;
isc_task_t *task = NULL;
bool exiting;
REQUIRE(VALID_MANAGER(manager));

View file

@ -29,7 +29,6 @@
#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/hp.h>
#include <isc/mutex.h>
#include <isc/netmgr.h>
#include <isc/nonce.h>
@ -259,8 +258,6 @@ _setup(void **state) {
return (-1);
}
isc_hp_init(4 * workers);
signal(SIGPIPE, SIG_IGN);
return (0);

View file

@ -21,7 +21,6 @@
#include <isc/buffer.h>
#include <isc/hash.h>
#include <isc/hp.h>
#include <isc/managers.h>
#include <isc/mem.h>
#include <isc/os.h>
@ -78,7 +77,6 @@ create_managers(unsigned int workers) {
}
INSIST(workers != 0);
isc_hp_init(6 * workers);
isc_managers_create(test_mctx, workers, 0, &netmgr, &taskmgr,
&timermgr);

View file

@ -23,7 +23,6 @@
#define UNIT_TESTING
#include <cmocka.h>
#include <isc/hp.h>
#include <isc/nonce.h>
#include <isc/os.h>
#include <isc/quota.h>
@ -209,8 +208,6 @@ _setup(void **state __attribute__((unused))) {
return (-1);
}
isc_hp_init(4 * workers);
signal(SIGPIPE, SIG_IGN);
if (getenv("CI") == NULL || getenv("CI_ENABLE_ALL_TESTS") != NULL) {