mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-09 10:32:13 -04:00
Merge branch '3961-add-fast-isc_loop_run' into 'main'
Refactor isc_job_run to not-make any allocations Closes #3961 See merge request isc-projects/bind9!7722
This commit is contained in:
commit
ea8e00e7a5
32 changed files with 363 additions and 337 deletions
9
CHANGES
9
CHANGES
|
|
@ -1,3 +1,12 @@
|
|||
6133. [cleanup] Refactor the isc_job_run() to not make any allocations
|
||||
by embedding isc_job_t into callback argument, and
|
||||
running it directly. As a side-effect, isc_async_run
|
||||
and isc_job_run now executes jobs in the natural order.
|
||||
|
||||
Use the new improved API to execute connect, read and
|
||||
send callbacks from netmgr in more straightforward
|
||||
manner, speeding up the networking. [GL #3961]
|
||||
|
||||
6132. [doc] Remove a dead link in the DNSSEC guide. [GL #3967]
|
||||
|
||||
6131. [test] Add a minimal test-only library to allow testing
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@
|
|||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <isc/async.h>
|
||||
#include <isc/attributes.h>
|
||||
#include <isc/base64.h>
|
||||
#include <isc/buffer.h>
|
||||
|
|
@ -2168,7 +2169,7 @@ run_server(void *arg) {
|
|||
ns_client_request, ifp, accept_cb, ifp, 10,
|
||||
NULL, NULL, &ifp->tcplistensocket));
|
||||
ifp->flags |= NS_INTERFACEFLAG_LISTENING;
|
||||
isc_job_run(loopmgr, sendquery, ifp->tcplistensocket);
|
||||
isc_async_current(loopmgr, sendquery, ifp->tcplistensocket);
|
||||
|
||||
return;
|
||||
|
||||
|
|
|
|||
|
|
@ -16,11 +16,11 @@
|
|||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <isc/async.h>
|
||||
#include <isc/attributes.h>
|
||||
#include <isc/buffer.h>
|
||||
#include <isc/commandline.h>
|
||||
#include <isc/condition.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/netaddr.h>
|
||||
#include <isc/parseint.h>
|
||||
|
|
@ -889,12 +889,13 @@ start_next_command(void);
|
|||
|
||||
static void
|
||||
process_next_command(void *arg __attribute__((__unused__))) {
|
||||
isc_loop_t *loop = isc_loop_main(loopmgr);
|
||||
if (cmdline == NULL) {
|
||||
in_use = false;
|
||||
} else {
|
||||
do_next_command(cmdline);
|
||||
if (ISC_LIST_HEAD(lookup_list) != NULL) {
|
||||
isc_job_run(loopmgr, run_loop, NULL);
|
||||
isc_async_run(loop, run_loop, NULL);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@
|
|||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <isc/async.h>
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/attributes.h>
|
||||
#include <isc/base32.h>
|
||||
|
|
@ -41,7 +42,6 @@
|
|||
#include <isc/file.h>
|
||||
#include <isc/hash.h>
|
||||
#include <isc/hex.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/managers.h>
|
||||
#include <isc/md.h>
|
||||
|
|
@ -1667,7 +1667,7 @@ assignwork(void *arg) {
|
|||
lock_and_dumpnode(dns_fixedname_name(&fname), node);
|
||||
dns_db_detachnode(gdb, &node);
|
||||
|
||||
isc_job_run(loopmgr, assignwork, NULL);
|
||||
isc_async_current(loopmgr, assignwork, NULL);
|
||||
}
|
||||
|
||||
/*%
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@
|
|||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <isc/async.h>
|
||||
#include <isc/attributes.h>
|
||||
#include <isc/base64.h>
|
||||
#include <isc/buffer.h>
|
||||
|
|
@ -28,7 +29,6 @@
|
|||
#include <isc/file.h>
|
||||
#include <isc/getaddresses.h>
|
||||
#include <isc/hash.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/lex.h>
|
||||
#include <isc/log.h>
|
||||
#include <isc/loop.h>
|
||||
|
|
@ -2424,7 +2424,7 @@ static void
|
|||
done_update(void) {
|
||||
ddebug("done_update()");
|
||||
|
||||
isc_job_run(loopmgr, getinput, NULL);
|
||||
isc_async_current(loopmgr, getinput, NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@
|
|||
struct dns_validator {
|
||||
unsigned int magic;
|
||||
dns_view_t *view;
|
||||
isc_loopmgr_t *loopmgr;
|
||||
isc_loop_t *loop;
|
||||
uint32_t tid;
|
||||
isc_refcount_t references;
|
||||
|
||||
|
|
@ -160,7 +160,7 @@ isc_result_t
|
|||
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
|
||||
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
|
||||
dns_message_t *message, unsigned int options,
|
||||
isc_loopmgr_t *loop, isc_job_cb cb, void *arg,
|
||||
isc_loop_t *loop, isc_job_cb cb, void *arg,
|
||||
dns_validator_t **validatorp);
|
||||
/*%<
|
||||
* Start a DNSSEC validation.
|
||||
|
|
|
|||
|
|
@ -977,7 +977,7 @@ valcreate(fetchctx_t *fctx, dns_message_t *message, dns_adbaddrinfo_t *addrinfo,
|
|||
|
||||
result = dns_validator_create(
|
||||
fctx->res->view, name, type, rdataset, sigrdataset, message,
|
||||
valoptions, fctx->res->loopmgr, validated, valarg, &validator);
|
||||
valoptions, fctx->loop, validated, valarg, &validator);
|
||||
RUNTIME_CHECK(result == ISC_R_SUCCESS);
|
||||
inc_stats(fctx->res, dns_resstatscounter_val);
|
||||
if ((valoptions & DNS_VALIDATOR_DEFER) == 0) {
|
||||
|
|
@ -10420,7 +10420,7 @@ dns_resolver_createfetch(dns_resolver_t *res, const dns_name_t *name,
|
|||
|
||||
if (new_fctx) {
|
||||
fetchctx_ref(fctx);
|
||||
isc_job_run(res->loopmgr, (isc_job_cb)fctx_start, fctx);
|
||||
isc_async_run(fctx->loop, (isc_job_cb)fctx_start, fctx);
|
||||
}
|
||||
|
||||
unlock:
|
||||
|
|
|
|||
|
|
@ -227,7 +227,7 @@ validator_done(dns_validator_t *val, isc_result_t result) {
|
|||
val->result = result;
|
||||
|
||||
dns_validator_ref(val);
|
||||
isc_job_run(val->loopmgr, validator_done_cb, val);
|
||||
isc_async_run(val->loop, validator_done_cb, val);
|
||||
}
|
||||
|
||||
/*%
|
||||
|
|
@ -950,8 +950,8 @@ create_fetch(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
|
|||
dns_validator_ref(val);
|
||||
return (dns_resolver_createfetch(
|
||||
val->view->resolver, name, type, NULL, NULL, NULL, NULL, 0,
|
||||
fopts, 0, NULL, isc_loop_current(val->loopmgr), callback, val,
|
||||
&val->frdataset, &val->fsigrdataset, &val->fetch));
|
||||
fopts, 0, NULL, val->loop, callback, val, &val->frdataset,
|
||||
&val->fsigrdataset, &val->fetch));
|
||||
}
|
||||
|
||||
/*%
|
||||
|
|
@ -981,7 +981,7 @@ create_validator(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
|
|||
|
||||
validator_logcreate(val, name, type, caller, "validator");
|
||||
result = dns_validator_create(val->view, name, type, rdataset, sig,
|
||||
NULL, vopts, val->loopmgr, cb, val,
|
||||
NULL, vopts, val->loop, cb, val,
|
||||
&val->subvalidator);
|
||||
if (result == ISC_R_SUCCESS) {
|
||||
dns_validator_attach(val, &val->subvalidator->parent);
|
||||
|
|
@ -2978,7 +2978,7 @@ isc_result_t
|
|||
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
|
||||
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
|
||||
dns_message_t *message, unsigned int options,
|
||||
isc_loopmgr_t *loopmgr, isc_job_cb cb, void *arg,
|
||||
isc_loop_t *loop, isc_job_cb cb, void *arg,
|
||||
dns_validator_t **validatorp) {
|
||||
isc_result_t result = ISC_R_FAILURE;
|
||||
dns_validator_t *val = NULL;
|
||||
|
|
@ -2997,7 +2997,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
|
|||
.type = type,
|
||||
.options = options,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
.loopmgr = loopmgr,
|
||||
.loop = loop,
|
||||
.cb = cb,
|
||||
.arg = arg };
|
||||
|
||||
|
|
@ -3024,7 +3024,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
|
|||
|
||||
if ((options & DNS_VALIDATOR_DEFER) == 0) {
|
||||
dns_validator_ref(val);
|
||||
isc_job_run(val->loopmgr, validator_start, val);
|
||||
isc_async_run(val->loop, validator_start, val);
|
||||
}
|
||||
|
||||
*validatorp = val;
|
||||
|
|
@ -3050,7 +3050,7 @@ dns_validator_send(dns_validator_t *validator) {
|
|||
validator->options &= ~DNS_VALIDATOR_DEFER;
|
||||
|
||||
dns_validator_ref(validator);
|
||||
isc_job_run(validator->loopmgr, validator_start, validator);
|
||||
isc_async_run(validator->loop, validator_start, validator);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@ libisc_la_SOURCES = \
|
|||
ascii.c \
|
||||
assertions.c \
|
||||
async.c \
|
||||
async_p.h \
|
||||
backtrace.c \
|
||||
base32.c \
|
||||
base64.c \
|
||||
|
|
|
|||
|
|
@ -34,28 +34,62 @@
|
|||
#include <isc/uv.h>
|
||||
#include <isc/work.h>
|
||||
|
||||
#include "async_p.h"
|
||||
#include "job_p.h"
|
||||
#include "loop_p.h"
|
||||
|
||||
void
|
||||
isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
||||
int r;
|
||||
isc_job_t *job = NULL;
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
REQUIRE(cb != NULL);
|
||||
|
||||
job = isc__job_new(loop, cb, cbarg);
|
||||
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
|
||||
*job = (isc_job_t){
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
};
|
||||
|
||||
/*
|
||||
* Now send the half-initialized job to the loop queue.
|
||||
*
|
||||
* The ISC_ASTACK_PUSH is counterintuitive here, but uv_idle
|
||||
* drains its queue backwards, so if there's more than one event
|
||||
* to be processed then they need to be in reverse order.
|
||||
*/
|
||||
ISC_ASTACK_PUSH(loop->queue_jobs, job, link);
|
||||
ISC_ASTACK_PUSH(loop->async_jobs, job, link);
|
||||
|
||||
r = uv_async_send(&loop->queue_trigger);
|
||||
int r = uv_async_send(&loop->async_trigger);
|
||||
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||
}
|
||||
|
||||
void
|
||||
isc__async_cb(uv_async_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
|
||||
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
|
||||
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
|
||||
|
||||
isc_job_t *job = ISC_STACK_POP(drain, link);
|
||||
isc_job_t *next = NULL;
|
||||
while (job != NULL) {
|
||||
ISC_LIST_PREPEND(jobs, job, link);
|
||||
|
||||
job = ISC_STACK_POP(drain, link);
|
||||
}
|
||||
|
||||
for (job = ISC_LIST_HEAD(jobs),
|
||||
next = (job ? ISC_LIST_NEXT(job, link) : NULL);
|
||||
job != NULL;
|
||||
job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
|
||||
{
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
isc__async_close(uv_handle_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
|
||||
isc__async_cb(&loop->async_trigger);
|
||||
}
|
||||
|
|
|
|||
29
lib/isc/async_p.h
Normal file
29
lib/isc/async_p.h
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
*
|
||||
* SPDX-License-Identifier: MPL-2.0
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*
|
||||
* See the COPYRIGHT file distributed with this work for additional
|
||||
* information regarding copyright ownership.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <isc/async.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/mem.h>
|
||||
#include <isc/stack.h>
|
||||
#include <isc/uv.h>
|
||||
|
||||
typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
|
||||
|
||||
void
|
||||
isc__async_cb(uv_async_t *handle);
|
||||
|
||||
void
|
||||
isc__async_close(uv_handle_t *handle);
|
||||
|
|
@ -15,7 +15,7 @@
|
|||
* \brief The isc_async unit provides a way to schedule jobs on any isc
|
||||
* event loop (isc_loop unit)
|
||||
*
|
||||
* The unit is built around the uv_async_t primitive and locked list with
|
||||
* The unit is built around the uv_async_t primitive and lock-free stack with
|
||||
* isc_job_cb. Jobs are first scheduled onto the locked list, then the
|
||||
* uv_async_send() is called and the uv_async_t callback processes the enqueued
|
||||
* jobs are scheduled to be run on the isc event loop.
|
||||
|
|
@ -45,4 +45,10 @@ isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
|
|||
*\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
|
||||
*/
|
||||
|
||||
#define isc_async_current(loopmgr, cb, cbarg) \
|
||||
isc_async_run(isc_loop_current(loopmgr), cb, cbarg)
|
||||
/*%<
|
||||
* Helper macro to run the job on the current loop
|
||||
*/
|
||||
|
||||
ISC_LANG_ENDDECLS
|
||||
|
|
|
|||
|
|
@ -30,23 +30,31 @@
|
|||
#include <isc/types.h>
|
||||
|
||||
typedef void (*isc_job_cb)(void *);
|
||||
typedef struct isc_job isc_job_t;
|
||||
|
||||
struct isc_job {
|
||||
isc_job_cb cb;
|
||||
void *cbarg;
|
||||
ISC_LINK(isc_job_t) link;
|
||||
};
|
||||
|
||||
#define ISC_JOB_INITIALIZER \
|
||||
{ \
|
||||
.link = ISC_LINK_INITIALIZER \
|
||||
}
|
||||
|
||||
ISC_LANG_BEGINDECLS
|
||||
|
||||
void
|
||||
isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
|
||||
isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg);
|
||||
/*%<
|
||||
* Schedule the job callback 'cb' to be run on the currently
|
||||
* running event loop.
|
||||
*
|
||||
* Note: Because of the design of uv_idle_start(), if more than one
|
||||
* job is posted at once, the jobs will be pushed onto a stack and
|
||||
* executed in last-in-first-out order. To post events that are
|
||||
* executed in order posted, use isc_async_run() instead.
|
||||
*
|
||||
* Requires:
|
||||
*
|
||||
*\li 'loopmgr' is the active loop manager.
|
||||
*\li 'loop' is the current loop
|
||||
*\li 'job' is initialized
|
||||
*\li 'cb' is a callback function, must be non-NULL
|
||||
*\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <isc/job.h>
|
||||
#include <isc/lang.h>
|
||||
#include <isc/mem.h>
|
||||
#include <isc/refcount.h>
|
||||
|
|
|
|||
|
|
@ -48,19 +48,18 @@ typedef struct isc_httpdurl isc_httpdurl_t; /*%< HTTP URL */
|
|||
typedef void(isc_httpdondestroy_t)(void *); /*%< Callback on destroying httpd */
|
||||
typedef struct isc_interface isc_interface_t; /*%< Interface */
|
||||
typedef struct isc_interfaceiter isc_interfaceiter_t; /*%< Interface Iterator */
|
||||
typedef struct isc_job isc_job_t;
|
||||
typedef struct isc_lex isc_lex_t; /*%< Lex */
|
||||
typedef struct isc_log isc_log_t; /*%< Log */
|
||||
typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */
|
||||
typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */
|
||||
typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */
|
||||
typedef struct isc_loop isc_loop_t; /*%< Event loop */
|
||||
typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */
|
||||
typedef struct isc_mem isc_mem_t; /*%< Memory */
|
||||
typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */
|
||||
typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */
|
||||
typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */
|
||||
typedef struct isc_nm isc_nm_t; /*%< Network manager */
|
||||
typedef struct isc_lex isc_lex_t; /*%< Lex */
|
||||
typedef struct isc_log isc_log_t; /*%< Log */
|
||||
typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */
|
||||
typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */
|
||||
typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */
|
||||
typedef struct isc_loop isc_loop_t; /*%< Event loop */
|
||||
typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */
|
||||
typedef struct isc_mem isc_mem_t; /*%< Memory */
|
||||
typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */
|
||||
typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */
|
||||
typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */
|
||||
typedef struct isc_nm isc_nm_t; /*%< Network manager */
|
||||
typedef struct isc_nmsocket isc_nmsocket_t; /*%< Network manager socket */
|
||||
typedef struct isc_nmhandle isc_nmhandle_t; /*%< Network manager handle */
|
||||
typedef struct isc_portset isc_portset_t; /*%< Port Set */
|
||||
|
|
|
|||
109
lib/isc/job.c
109
lib/isc/job.c
|
|
@ -36,100 +36,51 @@
|
|||
#include "job_p.h"
|
||||
#include "loop_p.h"
|
||||
|
||||
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
|
||||
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
|
||||
|
||||
/*
|
||||
* Private: static
|
||||
*/
|
||||
|
||||
static void
|
||||
isc__job_close_cb(uv_handle_t *handle) {
|
||||
isc_job_t *job = uv_handle_get_data(handle);
|
||||
isc_loop_t *loop = job->loop;
|
||||
|
||||
REQUIRE(loop == isc_loop_current(job->loop->loopmgr));
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
||||
isc_loop_detach(&loop);
|
||||
}
|
||||
|
||||
static void
|
||||
isc__job_destroy(isc_job_t *job) {
|
||||
REQUIRE(VALID_JOB(job));
|
||||
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
|
||||
|
||||
job->magic = 0;
|
||||
|
||||
uv_close(&job->idle, isc__job_close_cb);
|
||||
}
|
||||
|
||||
static void
|
||||
isc__job_cb(uv_idle_t *idle) {
|
||||
isc_job_t *job = uv_handle_get_data(idle);
|
||||
int r;
|
||||
|
||||
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
|
||||
|
||||
job->cb(job->cbarg);
|
||||
|
||||
r = uv_idle_stop(idle);
|
||||
UV_RUNTIME_CHECK(uv_idle_stop, r);
|
||||
|
||||
isc__job_destroy(job);
|
||||
}
|
||||
|
||||
/*
|
||||
* Public: #include <isc/job.h>
|
||||
*/
|
||||
|
||||
void
|
||||
isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
|
||||
isc_loop_t *loop = isc_loop_current(loopmgr);
|
||||
isc_job_t *job = isc__job_new(loop, cb, cbarg);
|
||||
isc__job_init(loop, job);
|
||||
isc__job_run(job);
|
||||
isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
|
||||
if (ISC_LIST_EMPTY(loop->run_jobs)) {
|
||||
uv_idle_start(&loop->run_trigger, isc__job_cb);
|
||||
}
|
||||
|
||||
job->cb = cb;
|
||||
job->cbarg = cbarg;
|
||||
|
||||
ISC_LIST_APPEND(loop->run_jobs, job, link);
|
||||
}
|
||||
|
||||
/*
|
||||
* Protected: #include <job_p.h>
|
||||
*/
|
||||
|
||||
isc_job_t *
|
||||
isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
||||
isc_job_t *job = NULL;
|
||||
void
|
||||
isc__job_cb(uv_idle_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
REQUIRE(cb != NULL);
|
||||
ISC_LIST_MOVE(jobs, loop->run_jobs);
|
||||
|
||||
job = isc_mem_get(loop->mctx, sizeof(*job));
|
||||
*job = (isc_job_t){
|
||||
.magic = JOB_MAGIC,
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
};
|
||||
isc_job_t *job, *next;
|
||||
for (job = ISC_LIST_HEAD(jobs),
|
||||
next = (job != NULL) ? ISC_LIST_NEXT(job, link) : NULL;
|
||||
job != NULL;
|
||||
job = next, next = job ? ISC_LIST_NEXT(job, link) : NULL)
|
||||
{
|
||||
ISC_LIST_UNLINK(jobs, job, link);
|
||||
job->cb(job->cbarg);
|
||||
}
|
||||
|
||||
isc_loop_attach(loop, &job->loop);
|
||||
|
||||
return (job);
|
||||
if (ISC_LIST_EMPTY(loop->run_jobs)) {
|
||||
uv_idle_stop(&loop->run_trigger);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
isc__job_init(isc_loop_t *loop, isc_job_t *job) {
|
||||
int r = uv_idle_init(&loop->loop, &job->idle);
|
||||
UV_RUNTIME_CHECK(uv_idle_init, r);
|
||||
uv_handle_set_data(&job->idle, job);
|
||||
}
|
||||
|
||||
void
|
||||
isc__job_run(isc_job_t *job) {
|
||||
int r;
|
||||
|
||||
REQUIRE(VALID_JOB(job));
|
||||
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
|
||||
|
||||
r = uv_idle_start(&job->idle, isc__job_cb);
|
||||
UV_RUNTIME_CHECK(uv_idle_start, r);
|
||||
isc__job_close(uv_handle_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
|
||||
isc__job_cb(&loop->run_trigger);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,12 +15,12 @@
|
|||
|
||||
#include <isc/job.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/uv.h>
|
||||
|
||||
isc_job_t *
|
||||
isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
|
||||
typedef ISC_LIST(isc_job_t) isc_joblist_t;
|
||||
|
||||
void
|
||||
isc__job_init(isc_loop_t *loop, isc_job_t *job);
|
||||
isc__job_cb(uv_idle_t *handle);
|
||||
|
||||
void
|
||||
isc__job_run(isc_job_t *job);
|
||||
isc__job_close(uv_handle_t *handle);
|
||||
|
|
|
|||
102
lib/isc/loop.c
102
lib/isc/loop.c
|
|
@ -38,6 +38,7 @@
|
|||
#include <isc/uv.h>
|
||||
#include <isc/work.h>
|
||||
|
||||
#include "async_p.h"
|
||||
#include "job_p.h"
|
||||
#include "loop_p.h"
|
||||
|
||||
|
|
@ -140,8 +141,10 @@ static void
|
|||
destroy_cb(uv_async_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
|
||||
/* Again, the first close callback here is called last */
|
||||
uv_close(&loop->async_trigger, isc__async_close);
|
||||
uv_close(&loop->run_trigger, isc__job_close);
|
||||
uv_close(&loop->destroy_trigger, NULL);
|
||||
uv_close(&loop->queue_trigger, NULL);
|
||||
uv_close(&loop->pause_trigger, NULL);
|
||||
uv_close(&loop->wakeup_trigger, NULL);
|
||||
uv_close(&loop->quiescent, NULL);
|
||||
|
|
@ -175,29 +178,14 @@ shutdown_cb(uv_async_t *handle) {
|
|||
isc_job_t *prev = ISC_LIST_PREV(job, link);
|
||||
ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
|
||||
|
||||
isc__job_run(job);
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
||||
job = prev;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
queue_cb(uv_async_t *handle) {
|
||||
isc_loop_t *loop = uv_handle_get_data(handle);
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
|
||||
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->queue_jobs);
|
||||
isc_job_t *job = ISC_STACK_POP(drain, link);
|
||||
|
||||
while (job != NULL) {
|
||||
isc__job_init(loop, job);
|
||||
isc__job_run(job);
|
||||
|
||||
job = ISC_STACK_POP(drain, link);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
wakeup_cb(uv_async_t *handle) {
|
||||
/* we only woke up to make the loop take a spin */
|
||||
|
|
@ -209,7 +197,8 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||
*loop = (isc_loop_t){
|
||||
.tid = tid,
|
||||
.loopmgr = loopmgr,
|
||||
.queue_jobs = ISC_ASTACK_INITIALIZER,
|
||||
.async_jobs = ISC_ASTACK_INITIALIZER,
|
||||
.run_jobs = ISC_LIST_INITIALIZER,
|
||||
.setup_jobs = ISC_LIST_INITIALIZER,
|
||||
.teardown_jobs = ISC_LIST_INITIALIZER,
|
||||
};
|
||||
|
|
@ -225,9 +214,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||
UV_RUNTIME_CHECK(uv_async_init, r);
|
||||
uv_handle_set_data(&loop->shutdown_trigger, loop);
|
||||
|
||||
r = uv_async_init(&loop->loop, &loop->queue_trigger, queue_cb);
|
||||
r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb);
|
||||
UV_RUNTIME_CHECK(uv_async_init, r);
|
||||
uv_handle_set_data(&loop->queue_trigger, loop);
|
||||
uv_handle_set_data(&loop->async_trigger, loop);
|
||||
|
||||
r = uv_idle_init(&loop->loop, &loop->run_trigger);
|
||||
UV_RUNTIME_CHECK(uv_idle_init, r);
|
||||
uv_handle_set_data(&loop->run_trigger, loop);
|
||||
|
||||
r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb);
|
||||
UV_RUNTIME_CHECK(uv_async_init, r);
|
||||
|
|
@ -251,25 +244,31 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||
}
|
||||
|
||||
static void
|
||||
loop_run(isc_loop_t *loop) {
|
||||
int r;
|
||||
isc_job_t *job;
|
||||
setup_jobs_cb(void *arg) {
|
||||
isc_loop_t *loop = arg;
|
||||
isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
|
||||
|
||||
job = ISC_LIST_HEAD(loop->setup_jobs);
|
||||
while (job != NULL) {
|
||||
isc_job_t *next = ISC_LIST_NEXT(job, link);
|
||||
ISC_LIST_UNLINK(loop->setup_jobs, job, link);
|
||||
|
||||
isc__job_run(job);
|
||||
job->cb(job->cbarg);
|
||||
|
||||
isc_mem_put(loop->mctx, job, sizeof(*job));
|
||||
|
||||
job = next;
|
||||
}
|
||||
}
|
||||
|
||||
r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
|
||||
static void
|
||||
loop_run(isc_loop_t *loop) {
|
||||
int r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
|
||||
UV_RUNTIME_CHECK(uv_prepare_start, r);
|
||||
|
||||
isc_barrier_wait(&loop->loopmgr->starting);
|
||||
|
||||
isc_async_run(loop, setup_jobs_cb, loop);
|
||||
|
||||
r = uv_run(&loop->loop, UV_RUN_DEFAULT);
|
||||
UV_RUNTIME_CHECK(uv_run, r);
|
||||
|
||||
|
|
@ -281,7 +280,8 @@ loop_close(isc_loop_t *loop) {
|
|||
int r = uv_loop_close(&loop->loop);
|
||||
UV_RUNTIME_CHECK(uv_loop_close, r);
|
||||
|
||||
INSIST(ISC_ASTACK_EMPTY(loop->queue_jobs));
|
||||
INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
|
||||
INSIST(ISC_LIST_EMPTY(loop->run_jobs));
|
||||
|
||||
loop->magic = 0;
|
||||
|
||||
|
|
@ -382,53 +382,41 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
|
|||
|
||||
isc_job_t *
|
||||
isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
||||
isc_loopmgr_t *loopmgr = NULL;
|
||||
isc_job_t *job = NULL;
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
REQUIRE(cb != NULL);
|
||||
|
||||
loopmgr = loop->loopmgr;
|
||||
isc_loopmgr_t *loopmgr = loop->loopmgr;
|
||||
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
|
||||
*job = (isc_job_t){
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
};
|
||||
|
||||
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
|
||||
atomic_load(&loopmgr->paused));
|
||||
|
||||
job = isc__job_new(loop, cb, cbarg);
|
||||
isc__job_init(loop, job);
|
||||
|
||||
/*
|
||||
* The ISC_LIST_PREPEND is counterintuitive here, but actually, the
|
||||
* uv_idle_start() puts the item on the HEAD of the internal list, so we
|
||||
* want to store items here in reverse order, so on the uv_loop, they
|
||||
* are scheduled in the correct order
|
||||
*/
|
||||
ISC_LIST_PREPEND(loop->setup_jobs, job, link);
|
||||
ISC_LIST_APPEND(loop->setup_jobs, job, link);
|
||||
|
||||
return (job);
|
||||
}
|
||||
|
||||
isc_job_t *
|
||||
isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
||||
isc_loopmgr_t *loopmgr = NULL;
|
||||
isc_job_t *job = NULL;
|
||||
|
||||
REQUIRE(VALID_LOOP(loop));
|
||||
|
||||
loopmgr = loop->loopmgr;
|
||||
isc_loopmgr_t *loopmgr = loop->loopmgr;
|
||||
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
|
||||
*job = (isc_job_t){
|
||||
.cb = cb,
|
||||
.cbarg = cbarg,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
};
|
||||
|
||||
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
|
||||
atomic_load(&loopmgr->paused));
|
||||
|
||||
job = isc__job_new(loop, cb, cbarg);
|
||||
isc__job_init(loop, job);
|
||||
|
||||
/*
|
||||
* The ISC_LIST_PREPEND is counterintuitive here, but actually, the
|
||||
* uv_idle_start() puts the item on the HEAD of the internal list, so we
|
||||
* want to store items here in reverse order, so on the uv_loop, they
|
||||
* are scheduled in the correct order
|
||||
*/
|
||||
ISC_LIST_PREPEND(loop->teardown_jobs, job, link);
|
||||
ISC_LIST_APPEND(loop->teardown_jobs, job, link);
|
||||
|
||||
return (job);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
#include <inttypes.h>
|
||||
|
||||
#include <isc/barrier.h>
|
||||
#include <isc/job.h>
|
||||
#include <isc/lang.h>
|
||||
#include <isc/loop.h>
|
||||
#include <isc/magic.h>
|
||||
|
|
@ -30,15 +31,15 @@
|
|||
#include <isc/uv.h>
|
||||
#include <isc/work.h>
|
||||
|
||||
#include "async_p.h"
|
||||
#include "job_p.h"
|
||||
|
||||
/*
|
||||
* Per-thread loop
|
||||
*/
|
||||
#define LOOP_MAGIC ISC_MAGIC('L', 'O', 'O', 'P')
|
||||
#define VALID_LOOP(t) ISC_MAGIC_VALID(t, LOOP_MAGIC)
|
||||
|
||||
typedef ISC_LIST(isc_job_t) isc_joblist_t;
|
||||
typedef ISC_ASTACK(isc_job_t) isc_jobstack_t;
|
||||
|
||||
struct isc_loop {
|
||||
int magic;
|
||||
isc_refcount_t references;
|
||||
|
|
@ -56,8 +57,12 @@ struct isc_loop {
|
|||
bool shuttingdown;
|
||||
|
||||
/* Async queue */
|
||||
uv_async_t queue_trigger;
|
||||
isc_jobstack_t queue_jobs;
|
||||
uv_async_t async_trigger;
|
||||
isc_asyncstack_t async_jobs;
|
||||
|
||||
/* Jobs queue */
|
||||
uv_idle_t run_trigger;
|
||||
isc_joblist_t run_jobs;
|
||||
|
||||
/* Pause */
|
||||
uv_async_t pause_trigger;
|
||||
|
|
@ -130,15 +135,6 @@ struct isc_signal {
|
|||
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
|
||||
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
|
||||
|
||||
struct isc_job {
|
||||
int magic;
|
||||
uv_idle_t idle;
|
||||
isc_loop_t *loop;
|
||||
isc_job_cb cb;
|
||||
void *cbarg;
|
||||
ISC_LINK(isc_job_t) link;
|
||||
};
|
||||
|
||||
/*
|
||||
* Work to be offloaded to an external thread.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1156,8 +1156,7 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle,
|
|||
|
||||
INSIST(VALID_NMHANDLE(httphandle));
|
||||
|
||||
newcb = isc__nm_uvreq_get(httphandle->sock->worker,
|
||||
httphandle->sock);
|
||||
newcb = isc__nm_uvreq_get(httphandle->sock);
|
||||
newcb->cb.send = cb;
|
||||
newcb->cbarg = cbarg;
|
||||
isc_nmhandle_attach(httphandle, &newcb->handle);
|
||||
|
|
@ -1483,7 +1482,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
|
|||
atomic_init(&sock->client, true);
|
||||
|
||||
if (isc__nm_closing(worker)) {
|
||||
isc__nm_uvreq_t *req = isc__nm_uvreq_get(worker, sock);
|
||||
isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock);
|
||||
|
||||
req->cb.connect = cb;
|
||||
req->cbarg = cbarg;
|
||||
|
|
@ -2178,7 +2177,7 @@ isc__nm_http_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
|||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
uvreq = isc__nm_uvreq_get(sock);
|
||||
isc_nmhandle_attach(handle, &uvreq->handle);
|
||||
uvreq->cb.send = cb;
|
||||
uvreq->cbarg = cbarg;
|
||||
|
|
@ -2198,7 +2197,7 @@ failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
|
|||
if (req->cb.send != NULL) {
|
||||
isc__nm_sendcb(sock, req, eresult, true);
|
||||
} else {
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2218,7 +2217,7 @@ client_httpsend(isc_nmhandle_t *handle, isc_nmsocket_t *sock,
|
|||
}
|
||||
|
||||
http_do_bio(sock->h2.session, handle, cb, cbarg);
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -2273,7 +2272,7 @@ server_httpsend(isc_nmhandle_t *handle, isc_nmsocket_t *sock,
|
|||
} else {
|
||||
cb(handle, result, cbarg);
|
||||
}
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -142,10 +142,10 @@ STATIC_ASSERT(ISC_NETMGR_TCP_RECVBUF_SIZE <= ISC_NETMGR_RECVBUF_SIZE,
|
|||
ievent->file = file; \
|
||||
ievent->line = line; \
|
||||
ievent->func = func;
|
||||
#define isc__nm_uvreq_get(req, sock) \
|
||||
isc___nm_uvreq_get(req, sock, __FILE__, __LINE__, __func__)
|
||||
#define isc__nm_uvreq_put(req, sock) \
|
||||
isc___nm_uvreq_put(req, sock, __FILE__, __LINE__, __func__)
|
||||
#define isc__nm_uvreq_get(sock) \
|
||||
isc___nm_uvreq_get(sock, __FILE__, __LINE__, __func__)
|
||||
#define isc__nm_uvreq_put(req) \
|
||||
isc___nm_uvreq_put(req, __FILE__, __LINE__, __func__)
|
||||
#define isc__nmsocket_init(sock, mgr, type, iface, parent) \
|
||||
isc___nmsocket_init(sock, mgr, type, iface, parent, __FILE__, \
|
||||
__LINE__, __func__)
|
||||
|
|
@ -168,8 +168,8 @@ STATIC_ASSERT(ISC_NETMGR_TCP_RECVBUF_SIZE <= ISC_NETMGR_RECVBUF_SIZE,
|
|||
#define FLARG_PASS
|
||||
#define FLARG_IEVENT(ievent)
|
||||
#define FLARG_IEVENT_PASS(ievent)
|
||||
#define isc__nm_uvreq_get(req, sock) isc___nm_uvreq_get(req, sock)
|
||||
#define isc__nm_uvreq_put(req, sock) isc___nm_uvreq_put(req, sock)
|
||||
#define isc__nm_uvreq_get(sock) isc___nm_uvreq_get(sock)
|
||||
#define isc__nm_uvreq_put(req) isc___nm_uvreq_put(req)
|
||||
#define isc__nmsocket_init(sock, mgr, type, iface, parent) \
|
||||
isc___nmsocket_init(sock, mgr, type, iface, parent)
|
||||
#define isc__nmsocket_put(sockp) isc___nmsocket_put(sockp)
|
||||
|
|
@ -252,7 +252,6 @@ typedef union {
|
|||
isc_nm_recv_cb_t recv;
|
||||
isc_nm_cb_t send;
|
||||
isc_nm_cb_t connect;
|
||||
isc_nm_accept_cb_t accept;
|
||||
} isc__nm_cb_t;
|
||||
|
||||
/*
|
||||
|
|
@ -291,7 +290,9 @@ struct isc__nm_uvreq {
|
|||
uv_fs_t fs;
|
||||
} uv_req;
|
||||
ISC_LINK(isc__nm_uvreq_t) link;
|
||||
ISC_LINK(isc__nm_uvreq_t) inactive_link;
|
||||
ISC_LINK(isc__nm_uvreq_t) active_link;
|
||||
|
||||
isc_job_t job;
|
||||
};
|
||||
|
||||
/*
|
||||
|
|
@ -627,11 +628,16 @@ struct isc_nmsocket {
|
|||
atomic_bool keepalive;
|
||||
|
||||
/*%
|
||||
* 'spare' handles for that can be reused to avoid allocations,
|
||||
* for UDP.
|
||||
* 'spare' handles for that can be reused to avoid allocations, for UDP.
|
||||
*/
|
||||
ISC_LIST(isc_nmhandle_t) inactive_handles;
|
||||
|
||||
/*%
|
||||
* 'active' handles and uvreqs, mostly for debugging purposes.
|
||||
*/
|
||||
ISC_LIST(isc_nmhandle_t) active_handles;
|
||||
ISC_LIST(isc__nm_uvreq_t) active_uvreqs;
|
||||
|
||||
/*%
|
||||
* Used to pass a result back from listen or connect events.
|
||||
*/
|
||||
|
|
@ -664,7 +670,6 @@ struct isc_nmsocket {
|
|||
int backtrace_size;
|
||||
#endif
|
||||
LINK(isc_nmsocket_t) active_link;
|
||||
ISC_LIST(isc_nmhandle_t) active_handles;
|
||||
};
|
||||
|
||||
void
|
||||
|
|
@ -695,14 +700,14 @@ isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t const *peer,
|
|||
*/
|
||||
|
||||
isc__nm_uvreq_t *
|
||||
isc___nm_uvreq_get(isc__networker_t *worker, isc_nmsocket_t *sock FLARG);
|
||||
isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG);
|
||||
/*%<
|
||||
* Get a UV request structure for the socket 'sock', allocating a
|
||||
* new one if there isn't one available in 'sock->inactivereqs'.
|
||||
*/
|
||||
|
||||
void
|
||||
isc___nm_uvreq_put(isc__nm_uvreq_t **req, isc_nmsocket_t *sock FLARG);
|
||||
isc___nm_uvreq_put(isc__nm_uvreq_t **req FLARG);
|
||||
/*%<
|
||||
* Completes the use of a UV request structure, setting '*req' to NULL.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -48,13 +48,6 @@
|
|||
#include "openssl_shim.h"
|
||||
#include "trampoline_p.h"
|
||||
|
||||
/*%
|
||||
* How many isc_nmhandles and isc_nm_uvreqs will we be
|
||||
* caching for reuse in a socket.
|
||||
*/
|
||||
#define ISC_NM_HANDLES_STACK_SIZE 600
|
||||
#define ISC_NM_REQS_STACK_SIZE 600
|
||||
|
||||
/*%
|
||||
* Shortcut index arrays to get access to statistics counters.
|
||||
*/
|
||||
|
|
@ -249,8 +242,6 @@ isc_netmgr_create(isc_mem_t *mctx, isc_loopmgr_t *loopmgr, isc_nm_t **netmgrp) {
|
|||
|
||||
isc_mempool_create(worker->mctx, sizeof(isc__nm_uvreq_t),
|
||||
&worker->uvreq_pool);
|
||||
isc_mempool_setfreemax(worker->uvreq_pool,
|
||||
ISC_NM_REQS_STACK_SIZE);
|
||||
|
||||
isc_loop_attach(loop, &worker->loop);
|
||||
isc_loop_teardown(loop, networker_teardown, worker);
|
||||
|
|
@ -872,7 +863,7 @@ dequeue_handle(isc_nmsocket_t *sock) {
|
|||
return (handle);
|
||||
}
|
||||
#else
|
||||
UNUSED(sock);
|
||||
INSIST(ISC_LIST_EMPTY(sock->inactive_handles));
|
||||
#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
|
||||
return (NULL);
|
||||
}
|
||||
|
|
@ -1027,8 +1018,8 @@ nmhandle_destroy(isc_nmhandle_t *handle) {
|
|||
* call it now asynchronously.
|
||||
*/
|
||||
if (sock->closehandle_cb != NULL) {
|
||||
isc_job_run(sock->worker->netmgr->loopmgr,
|
||||
isc__nm_closehandle_job, sock);
|
||||
isc_async_run(sock->worker->loop, isc__nm_closehandle_job,
|
||||
sock);
|
||||
} else {
|
||||
isc___nmsocket_detach(&sock FLARG_PASS);
|
||||
}
|
||||
|
|
@ -1081,7 +1072,7 @@ isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
|
|||
if (req->cb.send != NULL) {
|
||||
isc__nm_sendcb(sock, req, eresult, async);
|
||||
} else {
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1348,7 +1339,7 @@ isc__nm_uvreq_t *
|
|||
isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
|
||||
isc__nm_uvreq_t *req = NULL;
|
||||
|
||||
req = isc__nm_uvreq_get(sock->worker, sock);
|
||||
req = isc__nm_uvreq_get(sock);
|
||||
req->cb.recv = sock->recv_cb;
|
||||
req->cbarg = sock->recv_cbarg;
|
||||
|
||||
|
|
@ -1594,58 +1585,51 @@ isc_nmhandle_netmgr(isc_nmhandle_t *handle) {
|
|||
return (handle->sock->worker->netmgr);
|
||||
}
|
||||
|
||||
/* FIXME: Use per-worker mempool */
|
||||
isc__nm_uvreq_t *
|
||||
isc___nm_uvreq_get(isc__networker_t *worker, isc_nmsocket_t *sock FLARG) {
|
||||
isc__nm_uvreq_t *req = NULL;
|
||||
|
||||
REQUIRE(worker != NULL);
|
||||
isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
req = isc_mempool_get(worker->uvreq_pool);
|
||||
isc__networker_t *worker = sock->worker;
|
||||
|
||||
isc__nm_uvreq_t *req = isc_mempool_get(worker->uvreq_pool);
|
||||
*req = (isc__nm_uvreq_t){
|
||||
.connect_tries = 3,
|
||||
.link = ISC_LINK_INITIALIZER,
|
||||
.inactive_link = ISC_LINK_INITIALIZER,
|
||||
.uv_req.req.data = req,
|
||||
.active_link = ISC_LINK_INITIALIZER,
|
||||
.job = ISC_JOB_INITIALIZER,
|
||||
.magic = UVREQ_MAGIC,
|
||||
};
|
||||
uv_handle_set_data(&req->uv_req.handle, req);
|
||||
|
||||
isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
|
||||
|
||||
ISC_LIST_APPEND(sock->active_uvreqs, req, active_link);
|
||||
|
||||
return (req);
|
||||
}
|
||||
|
||||
void
|
||||
isc___nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock FLARG) {
|
||||
REQUIRE(req0 != NULL);
|
||||
REQUIRE(VALID_UVREQ(*req0));
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
|
||||
REQUIRE(reqp != NULL && VALID_UVREQ(*reqp));
|
||||
|
||||
isc__nm_uvreq_t *req = NULL;
|
||||
isc_nmhandle_t *handle = NULL;
|
||||
isc__networker_t *worker = sock->worker;
|
||||
isc__nm_uvreq_t *req = *reqp;
|
||||
isc_nmhandle_t *handle = req->handle;
|
||||
isc_nmsocket_t *sock = req->sock;
|
||||
|
||||
req = *req0;
|
||||
*req0 = NULL;
|
||||
*reqp = NULL;
|
||||
req->handle = NULL;
|
||||
|
||||
INSIST(sock == req->sock);
|
||||
REQUIRE(VALID_UVREQ(req));
|
||||
|
||||
req->magic = 0;
|
||||
|
||||
/*
|
||||
* We need to save this first to make sure that handle,
|
||||
* sock, and the netmgr won't all disappear.
|
||||
*/
|
||||
ISC_SWAP(handle, req->handle);
|
||||
|
||||
isc_mempool_put(worker->uvreq_pool, req);
|
||||
ISC_LIST_UNLINK(sock->active_uvreqs, req, active_link);
|
||||
|
||||
if (handle != NULL) {
|
||||
isc__nmhandle_detach(&handle FLARG_PASS);
|
||||
}
|
||||
|
||||
isc_mempool_put(sock->worker->uvreq_pool, req);
|
||||
|
||||
isc___nmsocket_detach(&sock FLARG_PASS);
|
||||
}
|
||||
|
||||
|
|
@ -1840,16 +1824,11 @@ isc__nmsocket_barrier_init(isc_nmsocket_t *listener) {
|
|||
}
|
||||
|
||||
static void
|
||||
isc__nm_connectcb_job(void *arg) {
|
||||
isc___nm_connectcb(void *arg) {
|
||||
isc__nm_uvreq_t *uvreq = arg;
|
||||
isc_result_t eresult = uvreq->result;
|
||||
|
||||
REQUIRE(VALID_UVREQ(uvreq));
|
||||
REQUIRE(VALID_NMHANDLE(uvreq->handle));
|
||||
REQUIRE(uvreq->cb.connect != NULL);
|
||||
|
||||
uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
|
||||
isc__nm_uvreq_put(&uvreq, uvreq->handle->sock);
|
||||
uvreq->cb.connect(uvreq->handle, uvreq->result, uvreq->cbarg);
|
||||
isc__nm_uvreq_put(&uvreq);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
@ -1858,31 +1837,28 @@ isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
|
|||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(VALID_UVREQ(uvreq));
|
||||
REQUIRE(VALID_NMHANDLE(uvreq->handle));
|
||||
REQUIRE(uvreq->cb.connect != NULL);
|
||||
|
||||
uvreq->result = eresult;
|
||||
|
||||
if (!async) {
|
||||
isc__nm_connectcb_job(uvreq);
|
||||
isc___nm_connectcb(uvreq);
|
||||
return;
|
||||
}
|
||||
|
||||
isc_job_run(sock->worker->netmgr->loopmgr, isc__nm_connectcb_job,
|
||||
uvreq);
|
||||
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_connectcb, uvreq);
|
||||
}
|
||||
|
||||
static void
|
||||
isc__nm_readcb_job(void *arg) {
|
||||
isc___nm_readcb(void *arg) {
|
||||
isc__nm_uvreq_t *uvreq = arg;
|
||||
isc_result_t eresult = uvreq->result;
|
||||
isc_nmsocket_t *sock = uvreq->handle->sock;
|
||||
isc_region_t region;
|
||||
|
||||
region.base = (unsigned char *)uvreq->uvbuf.base;
|
||||
region.length = uvreq->uvbuf.len;
|
||||
uvreq->cb.recv(uvreq->handle, uvreq->result, ®ion, uvreq->cbarg);
|
||||
|
||||
uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg);
|
||||
|
||||
isc__nm_uvreq_put(&uvreq, sock);
|
||||
isc__nm_uvreq_put(&uvreq);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
@ -1891,25 +1867,23 @@ isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
|
|||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(VALID_UVREQ(uvreq));
|
||||
REQUIRE(VALID_NMHANDLE(uvreq->handle));
|
||||
|
||||
uvreq->result = eresult;
|
||||
|
||||
if (!async) {
|
||||
isc__nm_readcb_job(uvreq);
|
||||
isc___nm_readcb(uvreq);
|
||||
return;
|
||||
}
|
||||
|
||||
isc_job_run(sock->worker->netmgr->loopmgr, isc__nm_readcb_job, uvreq);
|
||||
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_readcb, uvreq);
|
||||
}
|
||||
|
||||
static void
|
||||
isc__nm_sendcb_job(void *arg) {
|
||||
isc___nm_sendcb(void *arg) {
|
||||
isc__nm_uvreq_t *uvreq = arg;
|
||||
isc_result_t eresult = uvreq->result;
|
||||
isc_nmsocket_t *sock = uvreq->handle->sock;
|
||||
|
||||
uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);
|
||||
|
||||
isc__nm_uvreq_put(&uvreq, sock);
|
||||
uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
|
||||
isc__nm_uvreq_put(&uvreq);
|
||||
}
|
||||
|
||||
void
|
||||
|
|
@ -1918,14 +1892,15 @@ isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
|
|||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(VALID_UVREQ(uvreq));
|
||||
REQUIRE(VALID_NMHANDLE(uvreq->handle));
|
||||
|
||||
uvreq->result = eresult;
|
||||
|
||||
if (!async) {
|
||||
isc__nm_sendcb_job(uvreq);
|
||||
isc___nm_sendcb(uvreq);
|
||||
return;
|
||||
}
|
||||
|
||||
isc_job_run(sock->worker->netmgr->loopmgr, isc__nm_sendcb_job, uvreq);
|
||||
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_sendcb, uvreq);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -103,8 +103,10 @@ static void
|
|||
streamdns_readmore(isc_nmsocket_t *sock, isc_nmhandle_t *transphandle) {
|
||||
streamdns_resumeread(sock, transphandle);
|
||||
|
||||
/* Restart the timer only if there's a last single active handle */
|
||||
isc_nmhandle_t *handle = ISC_LIST_HEAD(sock->active_handles);
|
||||
if (handle != NULL && ISC_LIST_NEXT(handle, active_link) == NULL) {
|
||||
INSIST(handle != NULL);
|
||||
if (ISC_LIST_NEXT(handle, active_link) == NULL) {
|
||||
isc__nmsocket_timer_start(sock);
|
||||
}
|
||||
}
|
||||
|
|
@ -891,7 +893,7 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
|||
REQUIRE(sock->type == isc_nm_streamdnssocket);
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
uvreq = isc__nm_uvreq_get(sock);
|
||||
isc_nmhandle_attach(handle, &uvreq->handle);
|
||||
uvreq->cb.send = cb;
|
||||
uvreq->cbarg = cbarg;
|
||||
|
|
@ -915,7 +917,7 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
|||
isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb,
|
||||
(void *)send_req);
|
||||
|
||||
isc__nm_uvreq_put(&uvreq, sock);
|
||||
isc__nm_uvreq_put(&uvreq);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
|
|||
* The connect was cancelled from timeout; just clean up
|
||||
* the req.
|
||||
*/
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
return;
|
||||
} else if (isc__nm_closing(worker)) {
|
||||
/* Network manager shutting down */
|
||||
|
|
@ -291,7 +291,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
|
|||
sock->fd = fd;
|
||||
atomic_init(&sock->client, true);
|
||||
|
||||
req = isc__nm_uvreq_get(worker, sock);
|
||||
req = isc__nm_uvreq_get(sock);
|
||||
req->cb.connect = cb;
|
||||
req->cbarg = cbarg;
|
||||
req->peer = *peer;
|
||||
|
|
@ -1009,7 +1009,7 @@ tcp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb,
|
|||
REQUIRE(sock->type == isc_nm_tcpsocket);
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
uvreq = isc__nm_uvreq_get(sock);
|
||||
if (dnsmsg) {
|
||||
*(uint16_t *)uvreq->tcplen = htons(region->length);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -981,7 +981,7 @@ tls_send_direct(void *arg) {
|
|||
|
||||
tls_do_bio(sock, NULL, req, false);
|
||||
done:
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
isc__nm_uvreq_put(&req);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -998,7 +998,7 @@ tls_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb,
|
|||
|
||||
REQUIRE(sock->type == isc_nm_tlssocket);
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
uvreq = isc__nm_uvreq_get(sock);
|
||||
isc_nmhandle_attach(handle, &uvreq->handle);
|
||||
uvreq->cb.send = cb;
|
||||
uvreq->cbarg = cbarg;
|
||||
|
|
|
|||
|
|
@ -388,7 +388,7 @@ isc_nm_routeconnect(isc_nm_t *mgr, isc_nm_cb_t cb, void *cbarg) {
|
|||
sock->route_sock = true;
|
||||
sock->fd = fd;
|
||||
|
||||
req = isc__nm_uvreq_get(worker, sock);
|
||||
req = isc__nm_uvreq_get(sock);
|
||||
req->cb.connect = cb;
|
||||
req->cbarg = cbarg;
|
||||
req->handle = isc__nmhandle_get(sock, NULL, NULL);
|
||||
|
|
@ -681,7 +681,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
|||
return;
|
||||
}
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
uvreq = isc__nm_uvreq_get(sock);
|
||||
uvreq->uvbuf.base = (char *)region->base;
|
||||
uvreq->uvbuf.len = region->length;
|
||||
|
||||
|
|
@ -827,7 +827,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
|
|||
(void)isc__nm_socket_min_mtu(sock->fd, sa_family);
|
||||
|
||||
/* Initialize the request */
|
||||
req = isc__nm_uvreq_get(worker, sock);
|
||||
req = isc__nm_uvreq_get(sock);
|
||||
req->cb.connect = cb;
|
||||
req->cbarg = cbarg;
|
||||
req->peer = *peer;
|
||||
|
|
|
|||
|
|
@ -702,7 +702,7 @@ doh_receive_send_reply_cb(isc_nmhandle_t *handle, isc_result_t eresult,
|
|||
assert_true(eresult == ISC_R_SUCCESS);
|
||||
}
|
||||
|
||||
isc_job_run(loopmgr, doh_connect_thread, connect_nm);
|
||||
isc_async_current(loopmgr, doh_connect_thread, connect_nm);
|
||||
}
|
||||
if (sends <= 0) {
|
||||
isc_loopmgr_shutdown(loopmgr);
|
||||
|
|
|
|||
|
|
@ -39,37 +39,55 @@ static atomic_uint executed;
|
|||
|
||||
#define MAX_EXECUTED 1000000
|
||||
|
||||
struct test_arg {
|
||||
isc_job_t job;
|
||||
union {
|
||||
int n;
|
||||
void *ptr;
|
||||
} arg;
|
||||
};
|
||||
|
||||
static void
|
||||
shutdown_cb(void *arg) {
|
||||
UNUSED(arg);
|
||||
struct test_arg *ta = arg;
|
||||
|
||||
isc_mem_put(mctx, ta, sizeof(*ta));
|
||||
|
||||
isc_loopmgr_shutdown(loopmgr);
|
||||
}
|
||||
|
||||
static void
|
||||
job_cb(void *arg __attribute__((__unused__))) {
|
||||
job_cb(void *arg) {
|
||||
struct test_arg *ta = arg;
|
||||
unsigned int n = atomic_fetch_add(&executed, 1);
|
||||
|
||||
if (n <= MAX_EXECUTED) {
|
||||
atomic_fetch_add(&scheduled, 1);
|
||||
isc_job_run(loopmgr, job_cb, loopmgr);
|
||||
isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
|
||||
} else {
|
||||
isc_job_run(loopmgr, shutdown_cb, loopmgr);
|
||||
isc_job_run(isc_loop_current(loopmgr), &ta->job, shutdown_cb,
|
||||
ta);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
job_run_cb(void *arg __attribute__((__unused__))) {
|
||||
job_run_cb(void *arg) {
|
||||
struct test_arg *ta = arg;
|
||||
atomic_fetch_add(&scheduled, 1);
|
||||
|
||||
isc_job_run(loopmgr, job_cb, loopmgr);
|
||||
if (arg == NULL) {
|
||||
ta = isc_mem_get(mctx, sizeof(*ta));
|
||||
*ta = (struct test_arg){ .job = ISC_JOB_INITIALIZER };
|
||||
}
|
||||
|
||||
isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
|
||||
}
|
||||
|
||||
ISC_RUN_TEST_IMPL(isc_job_run) {
|
||||
atomic_init(&scheduled, 0);
|
||||
atomic_init(&executed, 0);
|
||||
|
||||
isc_loopmgr_setup(loopmgr, job_run_cb, loopmgr);
|
||||
isc_loopmgr_setup(loopmgr, job_run_cb, NULL);
|
||||
|
||||
isc_loopmgr_run(loopmgr);
|
||||
|
||||
|
|
@ -77,12 +95,18 @@ ISC_RUN_TEST_IMPL(isc_job_run) {
|
|||
}
|
||||
|
||||
static char string[32] = "";
|
||||
int n1 = 1, n2 = 2, n3 = 3, n4 = 4, n5 = 5;
|
||||
struct test_arg n1 = { .job = ISC_JOB_INITIALIZER, .arg.n = 1 };
|
||||
struct test_arg n2 = { .job = ISC_JOB_INITIALIZER, .arg.n = 2 };
|
||||
struct test_arg n3 = { .job = ISC_JOB_INITIALIZER, .arg.n = 3 };
|
||||
struct test_arg n4 = { .job = ISC_JOB_INITIALIZER, .arg.n = 4 };
|
||||
struct test_arg n5 = { .job = ISC_JOB_INITIALIZER, .arg.n = 5 };
|
||||
|
||||
static void
|
||||
append(void *arg) {
|
||||
struct test_arg *ta = arg;
|
||||
|
||||
char value[32];
|
||||
sprintf(value, "%d", *(int *)arg);
|
||||
sprintf(value, "%d", ta->arg.n);
|
||||
strlcat(string, value, 10);
|
||||
}
|
||||
|
||||
|
|
@ -90,12 +114,12 @@ static void
|
|||
job_multiple(void *arg) {
|
||||
UNUSED(arg);
|
||||
|
||||
/* These will be processed in reverse order */
|
||||
isc_job_run(loopmgr, append, &n1);
|
||||
isc_job_run(loopmgr, append, &n2);
|
||||
isc_job_run(loopmgr, append, &n3);
|
||||
isc_job_run(loopmgr, append, &n4);
|
||||
isc_job_run(loopmgr, append, &n5);
|
||||
/* These will be processed in normal order */
|
||||
isc_job_run(mainloop, &n1.job, append, &n1);
|
||||
isc_job_run(mainloop, &n2.job, append, &n2);
|
||||
isc_job_run(mainloop, &n3.job, append, &n3);
|
||||
isc_job_run(mainloop, &n4.job, append, &n4);
|
||||
isc_job_run(mainloop, &n5.job, append, &n5);
|
||||
isc_loopmgr_shutdown(loopmgr);
|
||||
}
|
||||
|
||||
|
|
@ -103,7 +127,7 @@ ISC_RUN_TEST_IMPL(isc_job_multiple) {
|
|||
string[0] = '\0';
|
||||
isc_loop_setup(isc_loop_main(loopmgr), job_multiple, loopmgr);
|
||||
isc_loopmgr_run(loopmgr);
|
||||
assert_string_equal(string, "54321");
|
||||
assert_string_equal(string, "12345");
|
||||
}
|
||||
|
||||
ISC_TEST_LIST_START
|
||||
|
|
|
|||
|
|
@ -66,11 +66,11 @@ ISC_RUN_TEST_IMPL(isc_loopmgr) {
|
|||
|
||||
static void
|
||||
runjob(void *arg __attribute__((__unused__))) {
|
||||
if (isc_tid() == 0) {
|
||||
isc_job_run(loopmgr, shutdown_loopmgr, loopmgr);
|
||||
}
|
||||
isc_async_current(loopmgr, count, loopmgr);
|
||||
|
||||
isc_job_run(loopmgr, count, loopmgr);
|
||||
if (isc_tid() == 0) {
|
||||
isc_async_current(loopmgr, shutdown_loopmgr, loopmgr);
|
||||
}
|
||||
}
|
||||
|
||||
ISC_RUN_TEST_IMPL(isc_loopmgr_runjob) {
|
||||
|
|
|
|||
|
|
@ -368,8 +368,9 @@ connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
|
|||
if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
|
||||
do_cconnects_shutdown(loopmgr);
|
||||
} else if (do_send) {
|
||||
isc_job_run(loopmgr, stream_recv_send_connect,
|
||||
(cbarg == NULL ? get_stream_connect_function()
|
||||
isc_async_current(loopmgr, stream_recv_send_connect,
|
||||
(cbarg == NULL
|
||||
? get_stream_connect_function()
|
||||
: (stream_connect_function)cbarg));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -100,25 +100,21 @@ ISC_LOOP_TEST_IMPL(tcpdns_timeout_recovery) {
|
|||
connect_readcb = timeout_retry_cb;
|
||||
isc_nm_settimeouts(connect_nm, T_SOFT, T_SOFT, T_SOFT, T_SOFT);
|
||||
|
||||
isc_async_run(isc_loop_current(loopmgr), stream_recv_send_connect,
|
||||
tcpdns_connect);
|
||||
isc_async_current(loopmgr, stream_recv_send_connect, tcpdns_connect);
|
||||
}
|
||||
|
||||
ISC_LOOP_TEST_IMPL(tcpdns_recv_one) {
|
||||
start_listening(ISC_NM_LISTEN_ONE, listen_accept_cb, listen_read_cb);
|
||||
|
||||
isc_async_run(isc_loop_current(loopmgr), stream_recv_send_connect,
|
||||
tcpdns_connect);
|
||||
isc_async_current(loopmgr, stream_recv_send_connect, tcpdns_connect);
|
||||
}
|
||||
|
||||
ISC_LOOP_TEST_IMPL(tcpdns_recv_two) {
|
||||
start_listening(ISC_NM_LISTEN_ONE, listen_accept_cb, listen_read_cb);
|
||||
|
||||
isc_async_run(isc_loop_current(loopmgr), stream_recv_send_connect,
|
||||
tcpdns_connect);
|
||||
isc_async_current(loopmgr, stream_recv_send_connect, tcpdns_connect);
|
||||
|
||||
isc_async_run(isc_loop_current(loopmgr), stream_recv_send_connect,
|
||||
tcpdns_connect);
|
||||
isc_async_current(loopmgr, stream_recv_send_connect, tcpdns_connect);
|
||||
}
|
||||
|
||||
ISC_LOOP_TEST_IMPL(tcpdns_recv_send) {
|
||||
|
|
|
|||
|
|
@ -540,7 +540,7 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) {
|
|||
ISC_LOOP_TEST_IMPL(udp_shutdown_connect) {
|
||||
isc_loopmgr_shutdown(loopmgr);
|
||||
isc_refcount_increment0(&active_cconnects);
|
||||
isc_job_run(loopmgr, udp_connect_udpconnect, netmgr);
|
||||
isc_async_current(loopmgr, udp_connect_udpconnect, netmgr);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -864,7 +864,7 @@ udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
|
|||
{
|
||||
do_cconnects_shutdown(loopmgr);
|
||||
} else if (do_send) {
|
||||
isc_job_run(loopmgr, udp__connect, cbarg);
|
||||
isc_async_current(loopmgr, udp__connect, cbarg);
|
||||
}
|
||||
|
||||
isc_refcount_increment0(&active_creads);
|
||||
|
|
|
|||
Loading…
Reference in a new issue