Replace netievent lock-free queue with simple locked queue

The current implementation of isc_queue uses Michael-Scott lock-free
queue that in turn uses hazard pointers.  It was discovered that the way
we use the isc_queue, such complicated mechanism isn't really needed,
because most of the time, we either execute the work directly when on
nmthread (in case of UDP) or schedule the work from the matching
nmthreads.

Replace the current implementation of the isc_queue with a simple locked
ISC_LIST.  There's a slight improvement - since copying the whole list
is very lightweight - we move the queue into a new list before we start
the processing and locking just for moving the queue and not for every
single item on the list.

NOTE: There's a room for future improvements - since we don't guarantee
the order in which the netievents are processed, we could have two lists
- one unlocked that would be used when scheduling the work from the
matching thread and one locked that would be used from non-matching
thread.
This commit is contained in:
Ondřej Surý 2022-02-22 23:40:39 +01:00
parent 1bb56bb0fc
commit 6bd025942c
15 changed files with 132 additions and 815 deletions

View file

@ -32,7 +32,6 @@
#include <isc/dir.h>
#include <isc/file.h>
#include <isc/hash.h>
#include <isc/hp.h>
#include <isc/httpd.h>
#include <isc/managers.h>
#include <isc/netmgr.h>

View file

@ -38,7 +38,6 @@ libisc_la_HEADERS = \
include/isc/heap.h \
include/isc/hex.h \
include/isc/hmac.h \
include/isc/hp.h \
include/isc/ht.h \
include/isc/httpd.h \
include/isc/interfaceiter.h \
@ -67,7 +66,6 @@ libisc_la_HEADERS = \
include/isc/pool.h \
include/isc/portset.h \
include/isc/print.h \
include/isc/queue.h \
include/isc/quota.h \
include/isc/radix.h \
include/isc/random.h \
@ -145,7 +143,6 @@ libisc_la_SOURCES = \
heap.c \
hex.c \
hmac.c \
hp.c \
ht.c \
httpd.c \
interfaceiter.c \
@ -173,7 +170,6 @@ libisc_la_SOURCES = \
parseint.c \
pool.c \
portset.c \
queue.c \
quota.c \
radix.c \
random.c \

View file

@ -1,225 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
/*
* Hazard Pointer implementation.
*
* This work is based on C++ code available from:
* https://github.com/pramalhe/ConcurrencyFreaks/
*
* Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Concurrency Freaks nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <inttypes.h>
#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/hp.h>
#include <isc/mem.h>
#include <isc/once.h>
#include <isc/os.h>
#include <isc/string.h>
#include <isc/thread.h>
#include <isc/util.h>
static int isc__hp_max_threads = 1;
typedef struct retirelist {
int size;
uintptr_t *list;
} retirelist_t;
typedef atomic_uintptr_t isc_hp_uintptr_t;
struct isc_hp {
int max_hps;
int max_retired;
isc_mem_t *mctx;
isc_hp_deletefunc_t *deletefunc;
isc_hp_uintptr_t **hp;
retirelist_t **rl;
};
static inline int
tid(void) {
return (isc_tid_v);
}
void
isc_hp_init(int max_threads) {
REQUIRE(max_threads > 0);
if (isc__hp_max_threads > max_threads) {
return;
}
isc__hp_max_threads = max_threads;
}
isc_hp_t *
isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc) {
isc_hp_t *hp = isc_mem_get(mctx, sizeof(*hp));
REQUIRE(isc__hp_max_threads > 0);
REQUIRE(max_hps > 0);
*hp = (isc_hp_t){
.max_hps = max_hps,
.max_retired = isc__hp_max_threads * max_hps,
.deletefunc = deletefunc,
};
isc_mem_attach(mctx, &hp->mctx);
hp->hp = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->hp[0]));
for (int i = 0; i < isc__hp_max_threads; i++) {
isc_hp_uintptr_t *hps;
hps = isc_mem_get_aligned(mctx, hp->max_hps * sizeof(*hps),
isc_os_cacheline());
for (int j = 0; j < hp->max_hps; j++) {
atomic_init(&hps[j], 0);
}
hp->hp[i] = hps;
}
hp->rl = isc_mem_get(mctx, isc__hp_max_threads * sizeof(hp->rl[0]));
for (int i = 0; i < isc__hp_max_threads; i++) {
retirelist_t *rl;
rl = isc_mem_get_aligned(mctx, sizeof(*rl), isc_os_cacheline());
rl->size = 0;
rl->list = isc_mem_get(hp->mctx,
hp->max_retired * sizeof(uintptr_t));
memset(rl->list, 0, hp->max_retired * sizeof(uintptr_t));
hp->rl[i] = rl;
}
return (hp);
}
void
isc_hp_destroy(isc_hp_t *hp) {
for (int i = 0; i < isc__hp_max_threads; i++) {
retirelist_t *rl = hp->rl[i];
for (int j = 0; j < rl->size; j++) {
void *data = (void *)rl->list[j];
hp->deletefunc(data);
}
isc_mem_put(hp->mctx, rl->list,
hp->max_retired * sizeof(uintptr_t));
isc_mem_put_aligned(hp->mctx, rl, sizeof(*rl),
isc_os_cacheline());
}
for (int i = 0; i < isc__hp_max_threads; i++) {
isc_hp_uintptr_t *hps = hp->hp[i];
isc_mem_put_aligned(hp->mctx, hps, hp->max_hps * sizeof(*hps),
isc_os_cacheline());
}
isc_mem_put(hp->mctx, hp->hp, isc__hp_max_threads * sizeof(hp->hp[0]));
isc_mem_put(hp->mctx, hp->rl, isc__hp_max_threads * sizeof(hp->rl[0]));
isc_mem_putanddetach(&hp->mctx, hp, sizeof(*hp));
}
void
isc_hp_clear(isc_hp_t *hp) {
for (int i = 0; i < hp->max_hps; i++) {
atomic_store_release(&hp->hp[tid()][i], 0);
}
}
void
isc_hp_clear_one(isc_hp_t *hp, int ihp) {
atomic_store_release(&hp->hp[tid()][ihp], 0);
}
uintptr_t
isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom) {
uintptr_t n = 0;
uintptr_t ret;
while ((ret = atomic_load(atom)) != n) {
atomic_store(&hp->hp[tid()][ihp], ret);
n = ret;
}
return (ret);
}
uintptr_t
isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
atomic_store(&hp->hp[tid()][ihp], atomic_load(&ptr));
return (atomic_load(&ptr));
}
uintptr_t
isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr) {
atomic_store_release(&hp->hp[tid()][ihp], atomic_load(&ptr));
return (atomic_load(&ptr));
}
void
isc_hp_retire(isc_hp_t *hp, uintptr_t ptr) {
retirelist_t *rl = hp->rl[tid()];
rl->list[rl->size++] = ptr;
INSIST(rl->size < hp->max_retired);
for (int iret = 0; iret < rl->size; iret++) {
uintptr_t obj = rl->list[iret];
bool can_delete = true;
for (int itid = 0; itid < isc__hp_max_threads && can_delete;
itid++) {
for (int ihp = hp->max_hps - 1; ihp >= 0; ihp--) {
if (atomic_load(&hp->hp[itid][ihp]) == obj) {
can_delete = false;
break;
}
}
}
if (can_delete) {
size_t bytes = (rl->size - iret) * sizeof(rl->list[0]);
memmove(&rl->list[iret], &rl->list[iret + 1], bytes);
rl->size--;
hp->deletefunc((void *)obj);
}
}
}

View file

@ -1,142 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
/*
* Hazard Pointer implementation.
*
* This work is based on C++ code available from:
* https://github.com/pramalhe/ConcurrencyFreaks/
*
* Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Concurrency Freaks nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <isc/atomic.h>
#include <isc/mem.h>
#include <isc/string.h>
#include <isc/types.h>
#include <isc/util.h>
/*%
* Hazard pointers are a mechanism for protecting objects in memory
* from being deleted by other threads while in use. This allows
* safe lock-free data structures.
*
* This is an adaptation of the ConcurrencyFreaks implementation in C.
* More details available at https://github.com/pramalhe/ConcurrencyFreaks,
* in the file HazardPointers.hpp.
*/
typedef void(isc_hp_deletefunc_t)(void *);
void
isc_hp_init(int max_threads);
/*%<
* Initialize hazard pointer constants, isc__hp_max_threads and
* isc__hp_max_retired. If more threads try to access hp, it
* will assert. Calling this function repeatedly can be used
* to increase the limits, but cannot reduce them.
*/
isc_hp_t *
isc_hp_new(isc_mem_t *mctx, size_t max_hps, isc_hp_deletefunc_t *deletefunc);
/*%<
* Create a new hazard pointer array of size 'max_hps' (or a reasonable
* default value if 'max_hps' is 0). The function 'deletefunc' will be
* used to delete objects protected by hazard pointers when it becomes
* safe to retire them.
*/
void
isc_hp_destroy(isc_hp_t *hp);
/*%<
* Destroy a hazard pointer array and clean up all objects protected
* by hazard pointers.
*/
void
isc_hp_clear(isc_hp_t *hp);
/*%<
* Clear all hazard pointers in the array for the current thread.
*
* Progress condition: wait-free bounded (by max_hps)
*/
void
isc_hp_clear_one(isc_hp_t *hp, int ihp);
/*%<
* Clear a specified hazard pointer in the array for the current thread.
*
* Progress condition: wait-free population oblivious.
*/
uintptr_t
isc_hp_protect(isc_hp_t *hp, int ihp, atomic_uintptr_t *atom);
/*%<
* Protect an object referenced by 'atom' with a hazard pointer for the
* current thread.
*
* Progress condition: lock-free.
*/
uintptr_t
isc_hp_protect_ptr(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
/*%<
* This returns the same value that is passed as ptr, which is sometimes
* useful.
*
* Progress condition: wait-free population oblivious.
*/
uintptr_t
isc_hp_protect_release(isc_hp_t *hp, int ihp, atomic_uintptr_t ptr);
/*%<
* Same as isc_hp_protect_ptr(), but explicitly uses memory_order_release.
*
* Progress condition: wait-free population oblivious.
*/
void
isc_hp_retire(isc_hp_t *hp, uintptr_t ptr);
/*%<
* Retire an object that is no longer in use by any thread, calling
* the delete function that was specified in isc_hp_new().
*
* Progress condition: wait-free bounded (by the number of threads squared)
*/

View file

@ -198,3 +198,17 @@
__ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, void)
#define __ISC_LIST_DEQUEUEUNSAFE_TYPE(list, elt, link, type) \
__ISC_LIST_UNLINKUNSAFE_TYPE(list, elt, link, type)
#define ISC_LIST_MOVEUNSAFE(dest, src) \
{ \
(dest).head = (src).head; \
(dest).tail = (src).tail; \
(src).head = NULL; \
(src).tail = NULL; \
}
#define ISC_LIST_MOVE(dest, src) \
{ \
INSIST(ISC_LIST_EMPTY(dest)); \
ISC_LIST_MOVEUNSAFE(dest, src); \
}

View file

@ -1,56 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#pragma once
#include <isc/mem.h>
typedef struct isc_queue isc_queue_t;
isc_queue_t *
isc_queue_new(isc_mem_t *mctx);
/*%<
* Create a new fetch-and-add array queue.
*
* 'max_threads' is currently unused. In the future it can be used
* to pass a maximum threads parameter when creating hazard pointers,
* but currently `isc_hp_t` uses a hard-coded value.
*/
void
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item);
/*%<
* Enqueue an object pointer 'item' at the tail of the queue.
*
* Requires:
* \li 'item' is not null.
*/
uintptr_t
isc_queue_dequeue(isc_queue_t *queue);
/*%<
* Remove an object pointer from the head of the queue and return the
* pointer. If the queue is empty, return `nulluintptr` (the uintptr_t
* representation of NULL).
*
* Requires:
* \li 'queue' is not null.
*/
void
isc_queue_destroy(isc_queue_t *queue);
/*%<
* Destroy a queue.
*
* Requires:
* \li 'queue' is not null.
*/

View file

@ -44,11 +44,9 @@ typedef struct isc_counter isc_counter_t; /*%< Counter */
typedef int16_t isc_dscp_t; /*%< Diffserv code point */
typedef struct isc_event isc_event_t; /*%< Event */
typedef ISC_LIST(isc_event_t) isc_eventlist_t; /*%< Event List */
typedef unsigned int isc_eventtype_t; /*%< Event Type */
typedef uint32_t isc_fsaccess_t; /*%< FS Access */
typedef struct isc_hash isc_hash_t; /*%< Hash */
typedef struct isc_hp isc_hp_t; /*%< Hazard
* pointer */
typedef unsigned int isc_eventtype_t; /*%< Event Type */
typedef uint32_t isc_fsaccess_t; /*%< FS Access */
typedef struct isc_hash isc_hash_t; /*%< Hash */
typedef struct isc_httpd isc_httpd_t; /*%< HTTP client */
typedef void(isc_httpdfree_t)(isc_buffer_t *, void *); /*%< HTTP free function
*/

View file

@ -11,7 +11,6 @@
* information regarding copyright ownership.
*/
#include <isc/hp.h>
#include <isc/managers.h>
#include <isc/util.h>
@ -28,15 +27,6 @@ isc_managers_create(isc_mem_t *mctx, size_t workers, size_t quantum,
isc_taskmgr_t *taskmgr = NULL;
isc_timermgr_t *timermgr = NULL;
/*
* Currently, there are:
* - 1 main thread
* - 1 timer thread
* - n netmgr threads
* - n threadpool threads
*/
isc_hp_init(2 + 2 * workers);
REQUIRE(netmgrp != NULL && *netmgrp == NULL);
isc__netmgr_create(mctx, workers, &netmgr);
*netmgrp = netmgr;

View file

@ -27,7 +27,6 @@
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/queue.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
@ -201,6 +200,17 @@ typedef enum {
NETIEVENT_MAX = 4,
} netievent_type_t;
typedef struct isc__nm_uvreq isc__nm_uvreq_t;
typedef struct isc__netievent isc__netievent_t;
typedef ISC_LIST(isc__netievent_t) isc__netievent_list_t;
typedef struct ievent {
isc_mutex_t lock;
isc_condition_t cond;
isc__netievent_list_t list;
} ievent_t;
/*
* Single network event loop worker.
*/
@ -210,13 +220,10 @@ typedef struct isc__networker {
uv_loop_t loop; /* libuv loop structure */
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
bool paused;
bool finished;
isc_thread_t thread;
isc_queue_t *ievents[NETIEVENT_MAX];
atomic_uint_fast32_t nievents[NETIEVENT_MAX];
isc_condition_t cond_prio;
ievent_t ievents[NETIEVENT_MAX];
isc_refcount_t references;
atomic_int_fast64_t pktcount;
@ -421,12 +428,13 @@ isc__nm_put_netievent(isc_nm_t *mgr, void *ievent);
* either in netmgr.c or matching protocol file (e.g. udp.c, tcp.c, etc.)
*/
#define NETIEVENT__SOCKET \
isc__netievent_type type; \
isc_nmsocket_t *sock; \
const char *file; \
unsigned int line; \
const char *func
#define NETIEVENT__SOCKET \
isc__netievent_type type; \
ISC_LINK(isc__netievent_t) link; \
isc_nmsocket_t *sock; \
const char *file; \
unsigned int line; \
const char *func;
typedef struct isc__netievent__socket {
NETIEVENT__SOCKET;
@ -489,8 +497,7 @@ typedef struct isc__netievent__socket_req {
}
typedef struct isc__netievent__socket_req_result {
isc__netievent_type type;
isc_nmsocket_t *sock;
NETIEVENT__SOCKET;
isc__nm_uvreq_t *req;
isc_result_t result;
} isc__netievent__socket_req_result_t;
@ -589,6 +596,7 @@ typedef struct isc__netievent__socket_quota {
typedef struct isc__netievent__task {
isc__netievent_type type;
ISC_LINK(isc__netievent_t) link;
isc_task_t *task;
} isc__netievent__task_t;
@ -624,8 +632,7 @@ typedef struct isc__netievent_udpsend {
} isc__netievent_udpsend_t;
typedef struct isc__netievent_tlsconnect {
isc__netievent_type type;
isc_nmsocket_t *sock;
NETIEVENT__SOCKET;
SSL_CTX *ctx;
isc_sockaddr_t local; /* local address */
isc_sockaddr_t peer; /* peer address */
@ -633,6 +640,7 @@ typedef struct isc__netievent_tlsconnect {
typedef struct isc__netievent {
isc__netievent_type type;
ISC_LINK(isc__netievent_t) link;
} isc__netievent_t;
#define NETIEVENT_TYPE(type) typedef isc__netievent_t isc__netievent_##type##_t;

View file

@ -21,6 +21,7 @@
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/list.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
@ -145,6 +146,7 @@ static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static isc_result_t
@ -154,51 +156,6 @@ wait_for_priority_queue(isc__networker_t *worker);
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);
#define ENQUEUE_NETIEVENT(worker, queue, event) \
isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)
#define DEQUEUE_NETIEVENT(worker, queue) \
(isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])
#define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)
#define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)
#define ENQUEUE_TASK_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)
#define ENQUEUE_NORMAL_NETIEVENT(worker, event) \
ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)
#define DEQUEUE_PRIORITY_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)
#define DEQUEUE_NORMAL_NETIEVENT(worker) \
DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)
#define INCREMENT_NETIEVENT(worker, queue) \
atomic_fetch_add_release(&worker->nievents[queue], 1)
#define DECREMENT_NETIEVENT(worker, queue) \
atomic_fetch_sub_release(&worker->nievents[queue], 1)
#define INCREMENT_PRIORITY_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define INCREMENT_PRIVILEGED_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define INCREMENT_TASK_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define INCREMENT_NORMAL_NETIEVENT(worker) \
INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
#define DECREMENT_PRIORITY_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)
#define DECREMENT_PRIVILEGED_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)
#define DECREMENT_TASK_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)
#define DECREMENT_NORMAL_NETIEVENT(worker) \
DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
@ -311,12 +268,10 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
r = uv_async_init(&worker->loop, &worker->async, async_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
isc_mutex_init(&worker->lock);
isc_condition_init(&worker->cond_prio);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
worker->ievents[type] = isc_queue_new(mgr->mctx);
atomic_init(&worker->nievents[type], 0);
isc_mutex_init(&worker->ievents[type].lock);
isc_condition_init(&worker->ievents[type].cond);
ISC_LIST_INIT(worker->ievents[type].list);
}
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
@ -367,28 +322,15 @@ nm_destroy(isc_nm_t **mgr0) {
for (int i = 0; i < mgr->nworkers; i++) {
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *ievent = NULL;
int r;
/* Empty the async event queues */
while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {
isc__nm_put_netievent(mgr, ievent);
}
INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);
INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);
while ((ievent = DEQUEUE_NORMAL_NETIEVENT(worker)) != NULL) {
isc__nm_put_netievent(mgr, ievent);
}
isc_condition_destroy(&worker->cond_prio);
isc_mutex_destroy(&worker->lock);
r = uv_loop_close(&worker->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
isc_queue_destroy(worker->ievents[type]);
INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
isc_condition_destroy(&worker->ievents[type].cond);
isc_mutex_destroy(&worker->ievents[type].lock);
}
isc_mem_put(mgr->mctx, worker->sendbuf,
@ -737,13 +679,17 @@ nm_thread(isc_threadarg_t worker0) {
}
/*
* We are shutting down. Process the task queues
* (they may include shutdown events) but do not process
* the netmgr event queue.
* We are shutting down. Drain the queues.
*/
drain_queue(worker, NETIEVENT_PRIVILEGED);
drain_queue(worker, NETIEVENT_TASK);
for (size_t type = 0; type < NETIEVENT_MAX; type++) {
LOCK(&worker->ievents[type].lock);
INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
UNLOCK(&worker->ievents[type].lock);
}
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
@ -765,7 +711,8 @@ process_all_queues(isc__networker_t *worker) {
isc_result_t result = process_queue(worker, type);
switch (result) {
case ISC_R_SUSPEND:
return (true);
reschedule = true;
break;
case ISC_R_EMPTY:
/* empty queue */
break;
@ -859,35 +806,29 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
static void
wait_for_priority_queue(isc__networker_t *worker) {
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
isc__netievent_list_t *list =
&(worker->ievents[NETIEVENT_PRIORITY].list);
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);
}
}
UNLOCK(&worker->lock);
wait_for_work = false;
if (ievent == NULL) {
return;
}
DECREMENT_PRIORITY_NETIEVENT(worker);
(void)process_netievent(worker, ievent);
LOCK(lock);
while (ISC_LIST_EMPTY(*list)) {
WAIT(cond, lock);
}
UNLOCK(lock);
drain_queue(worker, NETIEVENT_PRIORITY);
}
static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
while (process_queue(worker, type) != ISC_R_EMPTY) {
;
bool empty = false;
while (!empty) {
if (process_queue(worker, type) == ISC_R_EMPTY) {
LOCK(&worker->ievents[type].lock);
empty = ISC_LIST_EMPTY(worker->ievents[type].list);
UNLOCK(&worker->ievents[type].lock);
}
}
}
@ -995,40 +936,41 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
/*
* The number of items on the queue is only loosely synchronized with
* the items on the queue. But there's a guarantee that if there's an
* item on the queue, it will be accounted for. However there's a
* possibility that the counter might be higher than the items on the
* queue stored.
*/
uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);
isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);
isc__netievent_t *ievent = NULL;
isc__netievent_list_t list;
if (ievent == NULL && waiting == 0) {
ISC_LIST_INIT(list);
LOCK(&worker->ievents[type].lock);
ISC_LIST_MOVE(list, worker->ievents[type].list);
UNLOCK(&worker->ievents[type].lock);
ievent = ISC_LIST_HEAD(list);
if (ievent == NULL) {
/* There's nothing scheduled */
return (ISC_R_EMPTY);
} else if (ievent == NULL) {
/* There's at least one item scheduled, but not on the queue yet
*/
return (ISC_R_SUCCESS);
}
while (ievent != NULL) {
DECREMENT_NETIEVENT(worker, type);
bool stop = !process_netievent(worker, ievent);
isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
ISC_LIST_DEQUEUE(list, ievent, link);
if (stop) {
/* Netievent told us to stop */
if (!process_netievent(worker, ievent)) {
/* The netievent told us to stop */
if (!ISC_LIST_EMPTY(list)) {
/*
* Reschedule the rest of the unprocessed
* events.
*/
LOCK(&worker->ievents[type].lock);
ISC_LIST_PREPENDLIST(worker->ievents[type].list,
list, link);
UNLOCK(&worker->ievents[type].lock);
}
return (ISC_R_SUSPEND);
}
if (waiting-- == 0) {
/* We reached this round "quota" */
break;
}
ievent = DEQUEUE_NETIEVENT(worker, type);
ievent = next;
}
/* We processed at least one */
@ -1041,6 +983,7 @@ isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
sizeof(*event));
*event = (isc__netievent_storage_t){ .ni.type = type };
ISC_LINK_INIT(&(event->ni), link);
return (event);
}
@ -1130,26 +1073,39 @@ isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
netievent_type_t type;
if (event->type > netievent_prio) {
/*
* We need to make sure this signal will be delivered and
* the queue will be processed.
*/
LOCK(&worker->lock);
INCREMENT_PRIORITY_NETIEVENT(worker);
ENQUEUE_PRIORITY_NETIEVENT(worker, event);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
INCREMENT_PRIVILEGED_NETIEVENT(worker);
ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);
} else if (event->type == netievent_task) {
INCREMENT_TASK_NETIEVENT(worker);
ENQUEUE_TASK_NETIEVENT(worker, event);
type = NETIEVENT_PRIORITY;
} else {
INCREMENT_NORMAL_NETIEVENT(worker);
ENQUEUE_NORMAL_NETIEVENT(worker, event);
switch (event->type) {
case netievent_prio:
INSIST(0);
ISC_UNREACHABLE();
break;
case netievent_privilegedtask:
type = NETIEVENT_PRIVILEGED;
break;
case netievent_task:
type = NETIEVENT_TASK;
break;
default:
type = NETIEVENT_NORMAL;
break;
}
}
/*
* We need to make sure this signal will be delivered and
* the queue will be processed.
*/
LOCK(&worker->ievents[type].lock);
ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
if (type == NETIEVENT_PRIORITY) {
SIGNAL(&worker->ievents[type].cond);
}
UNLOCK(&worker->ievents[type].lock);
uv_async_send(&worker->async);
}
@ -2927,6 +2883,7 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) {
void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
uv_walk(&worker->loop, shutdown_walk_cb, NULL);
}

View file

@ -1,214 +0,0 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#include <inttypes.h>
#include <isc/align.h>
#include <isc/atomic.h>
#include <isc/hp.h>
#include <isc/mem.h>
#include <isc/os.h>
#include <isc/queue.h>
#include <isc/string.h>
#define BUFFER_SIZE 1024
static uintptr_t nulluintptr = (uintptr_t)NULL;
typedef struct node {
atomic_uint_fast32_t deqidx;
atomic_uintptr_t items[BUFFER_SIZE];
atomic_uint_fast32_t enqidx;
atomic_uintptr_t next;
isc_mem_t *mctx;
} node_t;
/* we just need one Hazard Pointer */
#define HP_TAIL 0
#define HP_HEAD 0
struct isc_queue {
alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t head;
alignas(ISC_OS_CACHELINE_SIZE) atomic_uintptr_t tail;
isc_mem_t *mctx;
int taken;
isc_hp_t *hp;
};
static node_t *
node_new(isc_mem_t *mctx, uintptr_t item) {
node_t *node = isc_mem_get(mctx, sizeof(*node));
*node = (node_t){ .mctx = NULL };
atomic_init(&node->deqidx, 0);
atomic_init(&node->enqidx, 1);
atomic_init(&node->next, 0);
atomic_init(&node->items[0], item);
for (int i = 1; i < BUFFER_SIZE; i++) {
atomic_init(&node->items[i], 0);
}
isc_mem_attach(mctx, &node->mctx);
return (node);
}
static void
node_destroy(void *node0) {
node_t *node = (node_t *)node0;
isc_mem_putanddetach(&node->mctx, node, sizeof(*node));
}
static bool
node_cas_next(node_t *node, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&node->next, (uintptr_t *)&cmp,
(uintptr_t)val));
}
static bool
queue_cas_tail(isc_queue_t *queue, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&queue->tail, (uintptr_t *)&cmp,
(uintptr_t)val));
}
static bool
queue_cas_head(isc_queue_t *queue, node_t *cmp, const node_t *val) {
return (atomic_compare_exchange_strong(&queue->head, (uintptr_t *)&cmp,
(uintptr_t)val));
}
isc_queue_t *
isc_queue_new(isc_mem_t *mctx) {
isc_queue_t *queue = NULL;
node_t *sentinel = NULL;
queue = isc_mem_get_aligned(mctx, sizeof(*queue), isc_os_cacheline());
*queue = (isc_queue_t){ 0 };
isc_mem_attach(mctx, &queue->mctx);
queue->hp = isc_hp_new(mctx, 1, node_destroy);
sentinel = node_new(mctx, nulluintptr);
atomic_init(&sentinel->enqidx, 0);
atomic_init(&queue->head, (uintptr_t)sentinel);
atomic_init(&queue->tail, (uintptr_t)sentinel);
return (queue);
}
void
isc_queue_enqueue(isc_queue_t *queue, uintptr_t item) {
REQUIRE(item != nulluintptr);
while (true) {
node_t *lt = NULL;
uint_fast32_t idx;
uintptr_t n = nulluintptr;
lt = (node_t *)isc_hp_protect(queue->hp, HP_TAIL, &queue->tail);
idx = atomic_fetch_add(&lt->enqidx, 1);
if (idx > BUFFER_SIZE - 1) {
node_t *lnext = NULL;
if (lt != (node_t *)atomic_load(&queue->tail)) {
continue;
}
lnext = (node_t *)atomic_load(&lt->next);
if (lnext == NULL) {
node_t *newnode = node_new(queue->mctx, item);
if (node_cas_next(lt, NULL, newnode)) {
queue_cas_tail(queue, lt, newnode);
isc_hp_clear(queue->hp);
return;
}
node_destroy(newnode);
} else {
queue_cas_tail(queue, lt, lnext);
}
continue;
}
if (atomic_compare_exchange_strong(&lt->items[idx], &n, item)) {
isc_hp_clear(queue->hp);
return;
}
}
}
uintptr_t
isc_queue_dequeue(isc_queue_t *queue) {
REQUIRE(queue != NULL);
while (true) {
node_t *lh = NULL;
uint_fast32_t idx;
uintptr_t item;
lh = (node_t *)isc_hp_protect(queue->hp, HP_HEAD, &queue->head);
if (atomic_load(&lh->deqidx) >= atomic_load(&lh->enqidx) &&
atomic_load(&lh->next) == nulluintptr)
{
break;
}
idx = atomic_fetch_add(&lh->deqidx, 1);
if (idx > BUFFER_SIZE - 1) {
node_t *lnext = (node_t *)atomic_load(&lh->next);
if (lnext == NULL) {
break;
}
if (queue_cas_head(queue, lh, lnext)) {
isc_hp_retire(queue->hp, (uintptr_t)lh);
}
continue;
}
item = atomic_exchange(&(lh->items[idx]),
(uintptr_t)&queue->taken);
if (item == nulluintptr) {
continue;
}
isc_hp_clear(queue->hp);
return (item);
}
isc_hp_clear(queue->hp);
return (nulluintptr);
}
void
isc_queue_destroy(isc_queue_t *queue) {
node_t *last = NULL;
REQUIRE(queue != NULL);
while (isc_queue_dequeue(queue) != nulluintptr) {
/* do nothing */
}
last = (node_t *)atomic_load_relaxed(&queue->head);
node_destroy(last);
isc_hp_destroy(queue->hp);
isc_mem_putanddetach_aligned(&queue->mctx, queue, sizeof(*queue),
isc_os_cacheline());
}

View file

@ -200,7 +200,7 @@ isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
isc_result_t
isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
isc_task_t **taskp, int threadid) {
isc_task_t *task;
isc_task_t *task = NULL;
bool exiting;
REQUIRE(VALID_MANAGER(manager));

View file

@ -29,7 +29,6 @@
#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/hp.h>
#include <isc/mutex.h>
#include <isc/netmgr.h>
#include <isc/nonce.h>
@ -259,8 +258,6 @@ _setup(void **state) {
return (-1);
}
isc_hp_init(4 * workers);
signal(SIGPIPE, SIG_IGN);
return (0);

View file

@ -21,7 +21,6 @@
#include <isc/buffer.h>
#include <isc/hash.h>
#include <isc/hp.h>
#include <isc/managers.h>
#include <isc/mem.h>
#include <isc/os.h>
@ -78,7 +77,6 @@ create_managers(unsigned int workers) {
}
INSIST(workers != 0);
isc_hp_init(6 * workers);
isc_managers_create(test_mctx, workers, 0, &netmgr, &taskmgr,
&timermgr);

View file

@ -23,7 +23,6 @@
#define UNIT_TESTING
#include <cmocka.h>
#include <isc/hp.h>
#include <isc/nonce.h>
#include <isc/os.h>
#include <isc/quota.h>
@ -209,8 +208,6 @@ _setup(void **state __attribute__((unused))) {
return (-1);
}
isc_hp_init(4 * workers);
signal(SIGPIPE, SIG_IGN);
if (getenv("CI") == NULL || getenv("CI_ENABLE_ALL_TESTS") != NULL) {