- random port selection out of the configged ports.

- fixup threadsafety for libevent-1.4.3+ (event_base_get_method).



git-svn-id: file:///svn/unbound/trunk@1029 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-04-11 13:24:49 +00:00
parent 234defa371
commit a8bf62f962
18 changed files with 772 additions and 336 deletions

View file

@ -27,6 +27,9 @@
/* Define to 1 if you have the `event_base_free' function. */ /* Define to 1 if you have the `event_base_free' function. */
#undef HAVE_EVENT_BASE_FREE #undef HAVE_EVENT_BASE_FREE
/* Define to 1 if you have the `event_base_get_method' function. */
#undef HAVE_EVENT_BASE_GET_METHOD
/* Define to 1 if you have the `event_base_once' function. */ /* Define to 1 if you have the `event_base_once' function. */
#undef HAVE_EVENT_BASE_ONCE #undef HAVE_EVENT_BASE_ONCE

94
configure vendored
View file

@ -22338,6 +22338,100 @@ _ACEOF
fi fi
done done
# only in libevent 1.4? and later # only in libevent 1.4? and later
for ac_func in event_base_get_method
do
as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh`
{ echo "$as_me:$LINENO: checking for $ac_func" >&5
echo $ECHO_N "checking for $ac_func... $ECHO_C" >&6; }
if { as_var=$as_ac_var; eval "test \"\${$as_var+set}\" = set"; }; then
echo $ECHO_N "(cached) $ECHO_C" >&6
else
cat >conftest.$ac_ext <<_ACEOF
/* confdefs.h. */
_ACEOF
cat confdefs.h >>conftest.$ac_ext
cat >>conftest.$ac_ext <<_ACEOF
/* end confdefs.h. */
/* Define $ac_func to an innocuous variant, in case <limits.h> declares $ac_func.
For example, HP-UX 11i <limits.h> declares gettimeofday. */
#define $ac_func innocuous_$ac_func
/* System header to define __stub macros and hopefully few prototypes,
which can conflict with char $ac_func (); below.
Prefer <limits.h> to <assert.h> if __STDC__ is defined, since
<limits.h> exists even on freestanding compilers. */
#ifdef __STDC__
# include <limits.h>
#else
# include <assert.h>
#endif
#undef $ac_func
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char $ac_func ();
/* The GNU C library defines this for functions which it implements
to always fail with ENOSYS. Some functions are actually named
something starting with __ and the normal name is an alias. */
#if defined __stub_$ac_func || defined __stub___$ac_func
choke me
#endif
int
main ()
{
return $ac_func ();
;
return 0;
}
_ACEOF
rm -f conftest.$ac_objext conftest$ac_exeext
if { (ac_try="$ac_link"
case "(($ac_try" in
*\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
*) ac_try_echo=$ac_try;;
esac
eval "echo \"\$as_me:$LINENO: $ac_try_echo\"") >&5
(eval "$ac_link") 2>conftest.er1
ac_status=$?
grep -v '^ *+' conftest.er1 >conftest.err
rm -f conftest.er1
cat conftest.err >&5
echo "$as_me:$LINENO: \$? = $ac_status" >&5
(exit $ac_status); } && {
test -z "$ac_c_werror_flag" ||
test ! -s conftest.err
} && test -s conftest$ac_exeext &&
$as_test_x conftest$ac_exeext; then
eval "$as_ac_var=yes"
else
echo "$as_me: failed program was:" >&5
sed 's/^/| /' conftest.$ac_ext >&5
eval "$as_ac_var=no"
fi
rm -f core conftest.err conftest.$ac_objext conftest_ipa8_conftest.oo \
conftest$ac_exeext conftest.$ac_ext
fi
ac_res=`eval echo '${'$as_ac_var'}'`
{ echo "$as_me:$LINENO: result: $ac_res" >&5
echo "${ECHO_T}$ac_res" >&6; }
if test `eval echo '${'$as_ac_var'}'` = yes; then
cat >>confdefs.h <<_ACEOF
#define `echo "HAVE_$ac_func" | $as_tr_cpp` 1
_ACEOF
fi
done
# only in libevent 1.4.3 and later
if test -n "$BAK_LDFLAGS"; then if test -n "$BAK_LDFLAGS"; then
LDFLAGS="$BAK_LDFLAGS" LDFLAGS="$BAK_LDFLAGS"
fi fi

View file

@ -555,6 +555,7 @@ large outgoing port ranges. ])
AC_CHECK_HEADERS([event.h],,, [AC_INCLUDES_DEFAULT]) AC_CHECK_HEADERS([event.h],,, [AC_INCLUDES_DEFAULT])
AC_CHECK_FUNCS([event_base_free]) # only in libevent 1.2 and later AC_CHECK_FUNCS([event_base_free]) # only in libevent 1.2 and later
AC_CHECK_FUNCS([event_base_once]) # only in libevent 1.4? and later AC_CHECK_FUNCS([event_base_once]) # only in libevent 1.4? and later
AC_CHECK_FUNCS([event_base_get_method]) # only in libevent 1.4.3 and later
if test -n "$BAK_LDFLAGS"; then if test -n "$BAK_LDFLAGS"; then
LDFLAGS="$BAK_LDFLAGS" LDFLAGS="$BAK_LDFLAGS"
fi fi

View file

@ -53,6 +53,7 @@
#include "services/localzone.h" #include "services/localzone.h"
#include "services/modstack.h" #include "services/modstack.h"
#include "util/module.h" #include "util/module.h"
#include "util/random.h"
#include <signal.h> #include <signal.h>
/** How many quit requests happened. */ /** How many quit requests happened. */
@ -172,23 +173,73 @@ static void daemon_setup_modules(struct daemon* daemon)
} }
} }
/**
* Obtain allowed port numbers, concatenate the list, and shuffle them
* (ready to be handed out to threads).
* @param daemon: the daemon. Uses rand and cfg.
* @param shufport: the portlist output.
* @return number of ports available.
*/
int daemon_get_shufport(struct daemon* daemon, int* shufport)
{
int i, n, k, temp;
int avail = 0;
for(i=0; i<65536; i++) {
if(daemon->cfg->outgoing_avail_ports[i]) {
shufport[avail++] = daemon->cfg->
outgoing_avail_ports[i];
}
}
if(avail == 0)
fatal_exit("no ports are permitted for UDP, add "
"with outgoing-port-permit");
/* Knuth shuffle */
n = avail;
while(--n > 0) {
k = ub_random(daemon->rand) % (n+1); /* 0<= k<= n */
temp = shufport[k];
shufport[k] = shufport[n];
shufport[n] = temp;
}
return avail;
}
/** /**
* Allocate empty worker structures. With backptr and thread-number, * Allocate empty worker structures. With backptr and thread-number,
* from 0..numthread initialised. Used as user arguments to new threads. * from 0..numthread initialised. Used as user arguments to new threads.
* Creates the daemon random generator if it does not exist yet.
* The random generator stays existing between reloads with a unique state.
* @param daemon: the daemon with (new) config settings. * @param daemon: the daemon with (new) config settings.
*/ */
static void static void
daemon_create_workers(struct daemon* daemon) daemon_create_workers(struct daemon* daemon)
{ {
int i; int i, numport;
int* shufport;
log_assert(daemon && daemon->cfg); log_assert(daemon && daemon->cfg);
if(!daemon->rand) {
unsigned int seed = (unsigned int)time(NULL) ^
(unsigned int)getpid() ^ 0x438;
daemon->rand = ub_initstate(seed, NULL);
if(!daemon->rand)
fatal_exit("could not init random generator");
}
shufport = (int*)calloc(65536, sizeof(int));
if(!shufport)
fatal_exit("out of memory during daemon init");
numport = daemon_get_shufport(daemon, shufport);
daemon->num = daemon->cfg->num_threads; daemon->num = daemon->cfg->num_threads;
daemon->workers = (struct worker**)calloc((size_t)daemon->num, daemon->workers = (struct worker**)calloc((size_t)daemon->num,
sizeof(struct worker*)); sizeof(struct worker*));
for(i=0; i<daemon->num; i++) { for(i=0; i<daemon->num; i++) {
if(!(daemon->workers[i] = worker_create(daemon, i))) if(!(daemon->workers[i] = worker_create(daemon, i,
shufport+numport*i/daemon->num,
numport*(i+1)/daemon->num - numport*i/daemon->num)))
/* the above is not ports/numthr, due to rounding */
fatal_exit("could not create worker"); fatal_exit("could not create worker");
} }
free(shufport);
} }
/** /**
@ -363,6 +414,7 @@ daemon_delete(struct daemon* daemon)
rrset_cache_delete(daemon->env->rrset_cache); rrset_cache_delete(daemon->env->rrset_cache);
infra_delete(daemon->env->infra_cache); infra_delete(daemon->env->infra_cache);
} }
ub_randfree(daemon->rand);
alloc_clear(&daemon->superalloc); alloc_clear(&daemon->superalloc);
acl_list_delete(daemon->acl); acl_list_delete(daemon->acl);
free(daemon->pidfile); free(daemon->pidfile);

View file

@ -53,6 +53,7 @@ struct module_env;
struct rrset_cache; struct rrset_cache;
struct acl_list; struct acl_list;
struct local_zones; struct local_zones;
struct ub_randstate;
/** /**
* Structure holding worker list. * Structure holding worker list.
@ -73,6 +74,8 @@ struct daemon {
struct worker** workers; struct worker** workers;
/** do we need to exit unbound (or is it only a reload?) */ /** do we need to exit unbound (or is it only a reload?) */
int need_to_exit; int need_to_exit;
/** master random table ; used for port div between threads on reload*/
struct ub_randstate* rand;
/** master allocation cache */ /** master allocation cache */
struct alloc_cache superalloc; struct alloc_cache superalloc;
/** the module environment master value, copied and changed by threads*/ /** the module environment master value, copied and changed by threads*/

View file

@ -892,12 +892,18 @@ void worker_stat_timer_cb(void* arg)
} }
struct worker* struct worker*
worker_create(struct daemon* daemon, int id) worker_create(struct daemon* daemon, int id, int* ports, int n)
{ {
struct worker* worker = (struct worker*)calloc(1, struct worker* worker = (struct worker*)calloc(1,
sizeof(struct worker)); sizeof(struct worker));
if(!worker) if(!worker)
return NULL; return NULL;
worker->numports = n;
worker->ports = (int*)memdup(ports, sizeof(int)*n);
if(!worker->ports) {
free(worker);
return NULL;
}
worker->daemon = daemon; worker->daemon = daemon;
worker->thread_num = id; worker->thread_num = id;
worker->cmd_send_fd = -1; worker->cmd_send_fd = -1;
@ -980,7 +986,7 @@ worker_init(struct worker* worker, struct config_file *cfg,
cfg->out_ifs, cfg->num_out_ifs, cfg->do_ip4, cfg->do_ip6, cfg->out_ifs, cfg->num_out_ifs, cfg->do_ip4, cfg->do_ip6,
startport, cfg->do_tcp?cfg->outgoing_num_tcp:0, startport, cfg->do_tcp?cfg->outgoing_num_tcp:0,
worker->daemon->env->infra_cache, worker->rndstate, worker->daemon->env->infra_cache, worker->rndstate,
cfg->use_caps_bits_for_id); cfg->use_caps_bits_for_id, worker->ports, worker->numports);
if(!worker->back) { if(!worker->back) {
log_err("could not create outgoing sockets"); log_err("could not create outgoing sockets");
worker_delete(worker); worker_delete(worker);
@ -1069,6 +1075,7 @@ worker_delete(struct worker* worker)
comm_signal_delete(worker->comsig); comm_signal_delete(worker->comsig);
comm_point_delete(worker->cmd_com); comm_point_delete(worker->cmd_com);
comm_timer_delete(worker->stat_timer); comm_timer_delete(worker->stat_timer);
free(worker->ports);
if(worker->thread_num == 0) if(worker->thread_num == 0)
log_set_time(NULL); log_set_time(NULL);
comm_base_delete(worker->base); comm_base_delete(worker->base);
@ -1093,12 +1100,10 @@ worker_send_packet(ldns_buffer* pkt, struct sockaddr_storage* addr,
struct worker* worker = q->env->worker; struct worker* worker = q->env->worker;
if(use_tcp) { if(use_tcp) {
return pending_tcp_query(worker->back, pkt, addr, addrlen, return pending_tcp_query(worker->back, pkt, addr, addrlen,
timeout, worker_handle_reply, q, timeout, worker_handle_reply, q) != 0;
worker->rndstate) != 0;
} }
return pending_udp_query(worker->back, pkt, addr, addrlen, return pending_udp_query(worker->back, pkt, addr, addrlen,
timeout*1000, worker_handle_reply, q, timeout*1000, worker_handle_reply, q) != 0;
worker->rndstate) != 0;
} }
/** compare outbound entry qstates */ /** compare outbound entry qstates */

View file

@ -86,6 +86,10 @@ struct worker {
struct listen_dnsport* front; struct listen_dnsport* front;
/** the backside outside network interface to the auth servers */ /** the backside outside network interface to the auth servers */
struct outside_network* back; struct outside_network* back;
/** ports to be used by this worker. */
int* ports;
/** number of ports for this worker */
int numports;
/** the signal handler */ /** the signal handler */
struct comm_signal* comsig; struct comm_signal* comsig;
/** commpoint to listen to commands. */ /** commpoint to listen to commands. */
@ -116,9 +120,11 @@ struct worker {
* with backpointers only. Use worker_init on it later. * with backpointers only. Use worker_init on it later.
* @param daemon: the daemon that this worker thread is part of. * @param daemon: the daemon that this worker thread is part of.
* @param id: the thread number from 0.. numthreads-1. * @param id: the thread number from 0.. numthreads-1.
* @param ports: the ports it is allowed to use, array.
* @param n: the number of ports.
* @return: the new worker or NULL on alloc failure. * @return: the new worker or NULL on alloc failure.
*/ */
struct worker* worker_create(struct daemon* daemon, int id); struct worker* worker_create(struct daemon* daemon, int id, int* ports, int n);
/** /**
* Initialize worker. * Initialize worker.

View file

@ -1,7 +1,11 @@
11 April 2008: Wouter
- random port selection out of the configged ports.
- fixup threadsafety for libevent-1.4.3+ (event_base_get_method).
10 April 2008: Wouter 10 April 2008: Wouter
- --with-libevent works with latest libevent 1.4.99-trunk. - --with-libevent works with latest libevent 1.4.99-trunk.
- added log file statistics perl script to contrib. - added log file statistics perl script to contrib.
- automatic iana ports update from makefile. - automatic iana ports update from makefile. 60058 available.
9 April 2008: Wouter 9 April 2008: Wouter
- configure can detect libev(from its build directory) when passed - configure can detect libev(from its build directory) when passed

View file

@ -65,6 +65,9 @@ server:
# deny unbound the use this of port number or port range for # deny unbound the use this of port number or port range for
# making outgoing queries, using an outgoing interface. # making outgoing queries, using an outgoing interface.
# Use this to make sure unbound does not grab a UDP port that some
# other server on this computer needs. The default is to avoid
# IANA-assigned port numbers.
# outgoing-port-avoid: "3200-3208" # outgoing-port-avoid: "3200-3208"
# number of outgoing simultaneous tcp buffers to hold per thread. # number of outgoing simultaneous tcp buffers to hold per thread.

View file

@ -93,6 +93,8 @@ libworker_setup(struct ub_ctx* ctx, int is_bg)
unsigned int seed; unsigned int seed;
struct libworker* w = (struct libworker*)calloc(1, sizeof(*w)); struct libworker* w = (struct libworker*)calloc(1, sizeof(*w));
struct config_file* cfg = ctx->env->cfg; struct config_file* cfg = ctx->env->cfg;
int* ports;
int numports;
if(!w) return NULL; if(!w) return NULL;
w->is_bg = is_bg; w->is_bg = is_bg;
w->ctx = ctx; w->ctx = ctx;
@ -149,14 +151,21 @@ libworker_setup(struct ub_ctx* ctx, int is_bg)
if(!w->is_bg || w->is_bg_thread) { if(!w->is_bg || w->is_bg_thread) {
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
} }
numports = cfg_condense_ports(cfg, &ports);
if(numports == 0) {
libworker_delete(w);
return NULL;
}
w->back = outside_network_create(w->base, cfg->msg_buffer_size, w->back = outside_network_create(w->base, cfg->msg_buffer_size,
(size_t)cfg->outgoing_num_ports, cfg->out_ifs, (size_t)cfg->outgoing_num_ports, cfg->out_ifs,
cfg->num_out_ifs, cfg->do_ip4, cfg->do_ip6, -1, cfg->num_out_ifs, cfg->do_ip4, cfg->do_ip6, -1,
cfg->do_tcp?cfg->outgoing_num_tcp:0, cfg->do_tcp?cfg->outgoing_num_tcp:0,
w->env->infra_cache, w->env->rnd, cfg->use_caps_bits_for_id); w->env->infra_cache, w->env->rnd, cfg->use_caps_bits_for_id,
ports, numports);
if(!w->is_bg || w->is_bg_thread) { if(!w->is_bg || w->is_bg_thread) {
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
} }
free(ports);
if(!w->back) { if(!w->back) {
libworker_delete(w); libworker_delete(w);
return NULL; return NULL;
@ -767,12 +776,10 @@ int libworker_send_packet(ldns_buffer* pkt, struct sockaddr_storage* addr,
struct libworker* w = (struct libworker*)q->env->worker; struct libworker* w = (struct libworker*)q->env->worker;
if(use_tcp) { if(use_tcp) {
return pending_tcp_query(w->back, pkt, addr, addrlen, return pending_tcp_query(w->back, pkt, addr, addrlen,
timeout, libworker_handle_reply, q, timeout, libworker_handle_reply, q) != 0;
q->env->rnd) != 0;
} }
return pending_udp_query(w->back, pkt, addr, addrlen, return pending_udp_query(w->back, pkt, addr, addrlen,
timeout*1000, libworker_handle_reply, q, timeout*1000, libworker_handle_reply, q) != 0;
q->env->rnd) != 0;
} }
/** compare outbound entry qstates */ /** compare outbound entry qstates */

View file

@ -86,7 +86,8 @@ verbose_print_addr(struct addrinfo *addr)
} }
int int
create_udp_sock(struct addrinfo *addr, int v6only, int* inuse) create_udp_sock(int family, int socktype, struct sockaddr* addr,
socklen_t addrlen, int v6only, int* inuse)
{ {
int s; int s;
# if defined(IPV6_USE_MIN_MTU) # if defined(IPV6_USE_MIN_MTU)
@ -94,13 +95,12 @@ create_udp_sock(struct addrinfo *addr, int v6only, int* inuse)
# else # else
(void)v6only; (void)v6only;
# endif # endif
verbose_print_addr(addr); if((s = socket(family, socktype, 0)) == -1) {
if((s = socket(addr->ai_family, addr->ai_socktype, 0)) == -1) {
log_err("can't create socket: %s", strerror(errno)); log_err("can't create socket: %s", strerror(errno));
*inuse = 0; *inuse = 0;
return -1; return -1;
} }
if(addr->ai_family == AF_INET6) { if(family == AF_INET6) {
# if defined(IPV6_V6ONLY) # if defined(IPV6_V6ONLY)
if(v6only) { if(v6only) {
int val=(v6only==2)?0:1; int val=(v6only==2)?0:1;
@ -131,7 +131,7 @@ create_udp_sock(struct addrinfo *addr, int v6only, int* inuse)
} }
# endif # endif
} }
if(bind(s, (struct sockaddr*)addr->ai_addr, addr->ai_addrlen) != 0) { if(bind(s, (struct sockaddr*)addr, addrlen) != 0) {
#ifdef EADDRINUSE #ifdef EADDRINUSE
*inuse = (errno == EADDRINUSE); *inuse = (errno == EADDRINUSE);
if(errno != EADDRINUSE) if(errno != EADDRINUSE)
@ -184,7 +184,7 @@ create_tcp_accept_sock(struct addrinfo *addr, int v6only)
#else #else
(void)v6only; (void)v6only;
#endif /* IPV6_V6ONLY */ #endif /* IPV6_V6ONLY */
if(bind(s, (struct sockaddr*)addr->ai_addr, addr->ai_addrlen) != 0) { if(bind(s, addr->ai_addr, addr->ai_addrlen) != 0) {
log_err("can't bind socket: %s", strerror(errno)); log_err("can't bind socket: %s", strerror(errno));
return -1; return -1;
} }
@ -221,7 +221,10 @@ make_sock(int stype, const char* ifname, const char* port,
return -1; return -1;
} }
if(stype == SOCK_DGRAM) { if(stype == SOCK_DGRAM) {
s = create_udp_sock(res, v6only, &inuse); verbose_print_addr(res);
s = create_udp_sock(res->ai_family, res->ai_socktype,
(struct sockaddr*)res->ai_addr,
res->ai_addrlen, v6only, &inuse);
if(s == -1 && inuse) { if(s == -1 && inuse) {
log_err("bind: address already in use"); log_err("bind: address already in use");
} }

View file

@ -45,7 +45,6 @@
#include "config.h" #include "config.h"
#include "util/netevent.h" #include "util/netevent.h"
struct listen_list; struct listen_list;
struct addrinfo;
struct config_file; struct config_file;
/** /**
@ -165,12 +164,16 @@ size_t listen_get_mem(struct listen_dnsport* listen);
/** /**
* Create and bind nonblocking UDP socket * Create and bind nonblocking UDP socket
* @param addr: address info ready to make socket. * @param family: for socket call.
* @param socktype: for socket call.
* @param addr: for bind call.
* @param addrlen: for bind call.
* @param v6only: if enabled, IP6 sockets get IP6ONLY option set. * @param v6only: if enabled, IP6 sockets get IP6ONLY option set.
* if enabled with value 2 IP6ONLY option is disabled. * if enabled with value 2 IP6ONLY option is disabled.
* @param inuse: on error, this is set true if the port was in use. * @param inuse: on error, this is set true if the port was in use.
* @return: the socket. -1 on error. * @return: the socket. -1 on error.
*/ */
int create_udp_sock(struct addrinfo* addr, int v6only, int* inuse); int create_udp_sock(int family, int socktype, struct sockaddr* addr,
socklen_t addrlen, int v6only, int* inuse);
#endif /* LISTEN_DNSPORT_H */ #endif /* LISTEN_DNSPORT_H */

View file

@ -61,12 +61,17 @@
/** number of times to retry making a random ID that is unique. */ /** number of times to retry making a random ID that is unique. */
#define MAX_ID_RETRY 1000 #define MAX_ID_RETRY 1000
/** number of times to retry finding interface, port that can be opened. */
#define MAX_PORT_RETRY 1000
/** number of retries on outgoing UDP queries */ /** number of retries on outgoing UDP queries */
#define OUTBOUND_UDP_RETRY 1 #define OUTBOUND_UDP_RETRY 1
/** initiate TCP transaction for serviced query */ /** initiate TCP transaction for serviced query */
static void serviced_tcp_initiate(struct outside_network* outnet, static void serviced_tcp_initiate(struct outside_network* outnet,
struct serviced_query* sq, ldns_buffer* buff); struct serviced_query* sq, ldns_buffer* buff);
/** with a fd available, randomize and send UDP */
static int randomize_and_send_udp(struct outside_network* outnet,
struct pending* pend, ldns_buffer* packet, int timeout);
int int
pending_cmp(const void* key1, const void* key2) pending_cmp(const void* key1, const void* key2)
@ -226,6 +231,53 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
return 0; return 0;
} }
/** lower use count on pc, see if it can be closed */
static void
portcomm_loweruse(struct outside_network* outnet, struct port_comm* pc)
{
struct port_if* pif;
pc->num_outstanding--;
if(pc->num_outstanding > 0) {
return;
}
/* close it and replace in unused list */
comm_point_close(pc->cp);
pif = pc->pif;
log_assert(pif->inuse > 0);
pif->avail_ports[pif->avail_total - pif->inuse] = pc->number;
pif->inuse--;
pif->out[pc->index] = pif->out[pif->inuse];
pc->next = outnet->unused_fds;
outnet->unused_fds = pc;
}
/** try to send waiting UDP queries */
static void
outnet_send_wait_udp(struct outside_network* outnet)
{
struct pending* pend;
/* process waiting queries */
while(outnet->udp_wait_first && outnet->unused_fds) {
pend = outnet->udp_wait_first;
outnet->udp_wait_first = pend->next_waiting;
if(!pend->next_waiting) outnet->udp_wait_last = NULL;
ldns_buffer_clear(outnet->udp_buff);
ldns_buffer_write(outnet->udp_buff, pend->pkt, pend->pkt_len);
ldns_buffer_flip(outnet->udp_buff);
free(pend->pkt); /* freeing now makes get_mem correct */
pend->pkt = NULL;
pend->pkt_len = 0;
if(!randomize_and_send_udp(outnet, pend, outnet->udp_buff,
pend->timeout)) {
/* callback error on pending */
fptr_ok(fptr_whitelist_pending_udp(pend->cb));
(void)(*pend->cb)(pend->pc->cp, pend->cb_arg,
NETEVENT_CLOSED, NULL);
pending_delete(outnet, pend);
}
}
}
int int
outnet_udp_cb(struct comm_point* c, void* arg, int error, outnet_udp_cb(struct comm_point* c, void* arg, int error,
struct comm_reply *reply_info) struct comm_reply *reply_info)
@ -246,7 +298,7 @@ outnet_udp_cb(struct comm_point* c, void* arg, int error,
log_assert(reply_info); log_assert(reply_info);
/* setup lookup key */ /* setup lookup key */
key.id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer)); key.id = (unsigned)LDNS_ID_WIRE(ldns_buffer_begin(c->buffer));
memcpy(&key.addr, &reply_info->addr, reply_info->addrlen); memcpy(&key.addr, &reply_info->addr, reply_info->addrlen);
key.addrlen = reply_info->addrlen; key.addrlen = reply_info->addrlen;
verbose(VERB_ALGO, "Incoming reply id = %4.4x", key.id); verbose(VERB_ALGO, "Incoming reply id = %4.4x", key.id);
@ -264,7 +316,7 @@ outnet_udp_cb(struct comm_point* c, void* arg, int error,
verbose(VERB_ALGO, "received udp reply."); verbose(VERB_ALGO, "received udp reply.");
log_buf(VERB_ALGO, "udp message", c->buffer); log_buf(VERB_ALGO, "udp message", c->buffer);
if(p->c != c) { if(p->pc->cp != c) {
verbose(VERB_QUERY, "received reply id,addr on wrong port. " verbose(VERB_QUERY, "received reply id,addr on wrong port. "
"dropped."); "dropped.");
return 0; return 0;
@ -274,134 +326,36 @@ outnet_udp_cb(struct comm_point* c, void* arg, int error,
/* delete from tree first in case callback creates a retry */ /* delete from tree first in case callback creates a retry */
(void)rbtree_delete(outnet->pending, p->node.key); (void)rbtree_delete(outnet->pending, p->node.key);
fptr_ok(fptr_whitelist_pending_udp(p->cb)); fptr_ok(fptr_whitelist_pending_udp(p->cb));
(void)(*p->cb)(p->c, p->cb_arg, NETEVENT_NOERROR, reply_info); (void)(*p->cb)(p->pc->cp, p->cb_arg, NETEVENT_NOERROR, reply_info);
p->c->inuse--; portcomm_loweruse(outnet, p->pc);
if(p->c->inuse == 0)
comm_point_stop_listening(p->c);
pending_delete(NULL, p); pending_delete(NULL, p);
outnet_send_wait_udp(outnet);
return 0; return 0;
} }
/** open another udp port to listen to, every thread has its own range /** calculate number of ip4 and ip6 interfaces*/
* of open ports.
* @param ifname: on which interface to open the port.
* @param hints: hints on family and passiveness preset.
* @param porthint: if not -1, it gives the port to base range on.
* @param inuse: on error, true if the port was in use.
* @return: file descriptor
*/
static int
open_udp_port_range(const char* ifname, struct addrinfo* hints, int porthint,
int* inuse)
{
struct addrinfo *res = NULL;
int r, s;
char portstr[32];
if(porthint != -1)
snprintf(portstr, sizeof(portstr), "%d", porthint);
else if(!ifname) {
if(hints->ai_family == AF_INET)
ifname = "0.0.0.0";
else ifname="::";
}
if((r=getaddrinfo(ifname, ((porthint==-1)?NULL:portstr), hints,
&res)) != 0 || !res) {
log_err("node %s %s getaddrinfo: %s %s",
ifname?ifname:"default", (porthint!=-1)?portstr:"eph",
gai_strerror(r),
r==EAI_SYSTEM?(char*)strerror(errno):"");
return -1;
}
s = create_udp_sock(res, 1, inuse);
freeaddrinfo(res);
return s;
}
/**
* Create range of UDP ports on the given interface.
* Returns number of ports bound.
* @param coms: communication point array start position. Filled with entries.
* @param ifname: name of interface to make port on.
* @param num_ports: number of ports opened.
* @param do_ip4: if true make ip4 ports.
* @param do_ip6: if true make ip6 ports.
* @param porthint: -1 for system chosen port, or a base of port range.
* @param outnet: network structure with comm base, shared udp buffer.
* @return: the number of ports successfully opened, entries filled in coms.
*/
static size_t
make_udp_range(struct comm_point** coms, const char* ifname,
size_t num_ports, int do_ip4, int do_ip6, int porthint,
struct outside_network* outnet)
{
size_t i;
size_t done = 0;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_PASSIVE;
if(ifname)
hints.ai_flags |= AI_NUMERICHOST;
hints.ai_family = AF_UNSPEC;
if(do_ip4 && do_ip6)
hints.ai_family = AF_UNSPEC;
else if(do_ip4)
hints.ai_family = AF_INET;
else if(do_ip6)
hints.ai_family = AF_INET6;
hints.ai_socktype = SOCK_DGRAM;
for(i=0; i<num_ports; i++) {
int fd = -1;
int inuse = 1;
while(fd == -1 && inuse) {
inuse = 0;
fd = open_udp_port_range(ifname, &hints,
porthint, &inuse);
if(fd == -1 && porthint != -1 && inuse)
verbose(VERB_DETAIL, "%sport %d already in use, skipped",
(do_ip6?"IP6 ":""), porthint);
if(porthint != -1) {
porthint++;
if(porthint > 65535) {
log_err("ports maxed. cannot open ports");
return done;
}
}
}
coms[done] = comm_point_create_udp(outnet->base, fd,
outnet->udp_buff, outnet_udp_cb, outnet);
if(coms[done]) {
log_assert(coms[done]->inuse == 0);
comm_point_stop_listening(coms[done]);
done++;
}
}
return done;
}
/** calculate number of ip4 and ip6 interfaces, times multiplier */
static void static void
calc_num46(char** ifs, int num_ifs, int do_ip4, int do_ip6, calc_num46(char** ifs, int num_ifs, int do_ip4, int do_ip6,
size_t multiplier, size_t* num_ip4, size_t* num_ip6) int* num_ip4, int* num_ip6)
{ {
int i; int i;
*num_ip4 = 0; *num_ip4 = 0;
*num_ip6 = 0; *num_ip6 = 0;
if(num_ifs <= 0) { if(num_ifs <= 0) {
if(do_ip4) if(do_ip4)
*num_ip4 = multiplier; *num_ip4 = 1;
if(do_ip6) if(do_ip6)
*num_ip6 = multiplier; *num_ip6 = 1;
return; return;
} }
for(i=0; i<num_ifs; i++) for(i=0; i<num_ifs; i++)
{ {
if(str_is_ip6(ifs[i])) { if(str_is_ip6(ifs[i])) {
if(do_ip6) if(do_ip6)
*num_ip6 += multiplier; (*num_ip6)++;
} else { } else {
if(do_ip4) if(do_ip4)
*num_ip4 += multiplier; (*num_ip4)++;
} }
} }
@ -411,14 +365,14 @@ void
pending_udp_timer_cb(void *arg) pending_udp_timer_cb(void *arg)
{ {
struct pending* p = (struct pending*)arg; struct pending* p = (struct pending*)arg;
struct outside_network* outnet = p->outnet;
/* it timed out */ /* it timed out */
verbose(VERB_ALGO, "timeout udp"); verbose(VERB_ALGO, "timeout udp");
fptr_ok(fptr_whitelist_pending_udp(p->cb)); fptr_ok(fptr_whitelist_pending_udp(p->cb));
(void)(*p->cb)(p->c, p->cb_arg, NETEVENT_TIMEOUT, NULL); (void)(*p->cb)(p->pc->cp, p->cb_arg, NETEVENT_TIMEOUT, NULL);
p->c->inuse--; portcomm_loweruse(outnet, p->pc);
if(p->c->inuse == 0) pending_delete(outnet, p);
comm_point_stop_listening(p->c); outnet_send_wait_udp(outnet);
pending_delete(p->outnet, p);
} }
/** create pending_tcp buffers */ /** create pending_tcp buffers */
@ -446,15 +400,35 @@ create_pending_tcp(struct outside_network* outnet, size_t bufsize)
return 1; return 1;
} }
/** setup an outgoing interface, ready address */
static int setup_if(struct port_if* pif, const char* addrstr,
int* avail, int numavail, size_t numfd)
{
pif->avail_total = numavail;
pif->avail_ports = (int*)memdup(avail, (size_t)numavail*sizeof(int));
if(!pif->avail_ports)
return 0;
if(!ipstrtoaddr(addrstr, UNBOUND_DNS_PORT, &pif->addr, &pif->addrlen))
return 0;
pif->maxout = (int)numfd;
pif->inuse = 0;
pif->out = (struct port_comm**)calloc(numfd,
sizeof(struct port_comm*));
if(!pif->out)
return 0;
return 1;
}
struct outside_network* struct outside_network*
outside_network_create(struct comm_base *base, size_t bufsize, outside_network_create(struct comm_base *base, size_t bufsize,
size_t num_ports, char** ifs, int num_ifs, int do_ip4, size_t num_ports, char** ifs, int num_ifs, int do_ip4,
int do_ip6, int port_base, size_t num_tcp, struct infra_cache* infra, int do_ip6, int port_base, size_t num_tcp, struct infra_cache* infra,
struct ub_randstate* rnd, int use_caps_for_id) struct ub_randstate* rnd, int use_caps_for_id, int* availports,
int numavailports)
{ {
struct outside_network* outnet = (struct outside_network*) struct outside_network* outnet = (struct outside_network*)
calloc(1, sizeof(struct outside_network)); calloc(1, sizeof(struct outside_network));
int k; size_t k;
if(!outnet) { if(!outnet) {
log_err("malloc failed"); log_err("malloc failed");
return NULL; return NULL;
@ -466,17 +440,33 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outnet->rnd = rnd; outnet->rnd = rnd;
outnet->svcd_overhead = 0; outnet->svcd_overhead = 0;
outnet->use_caps_for_id = use_caps_for_id; outnet->use_caps_for_id = use_caps_for_id;
if(numavailports == 0) {
log_err("no outgoing ports available");
outside_network_delete(outnet);
return NULL;
}
#ifndef INET6 #ifndef INET6
do_ip6 = 0; do_ip6 = 0;
#endif #endif
calc_num46(ifs, num_ifs, do_ip4, do_ip6, num_ports, calc_num46(ifs, num_ifs, do_ip4, do_ip6,
&outnet->num_udp4, &outnet->num_udp6); &outnet->num_ip4, &outnet->num_ip6);
/* adds +1 to portnums so we do not allocate zero bytes. */ if(outnet->num_ip4 != 0) {
if(!(outnet->ip4_ifs = (struct port_if*)calloc(
(size_t)outnet->num_ip4, sizeof(struct port_if)))) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
}
}
if(outnet->num_ip6 != 0) {
if(!(outnet->ip6_ifs = (struct port_if*)calloc(
(size_t)outnet->num_ip6, sizeof(struct port_if)))) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
}
}
if( !(outnet->udp_buff = ldns_buffer_new(bufsize)) || if( !(outnet->udp_buff = ldns_buffer_new(bufsize)) ||
!(outnet->udp4_ports = (struct comm_point **)calloc(
outnet->num_udp4+1, sizeof(struct comm_point*))) ||
!(outnet->udp6_ports = (struct comm_point **)calloc(
outnet->num_udp6+1, sizeof(struct comm_point*))) ||
!(outnet->pending = rbtree_create(pending_cmp)) || !(outnet->pending = rbtree_create(pending_cmp)) ||
!(outnet->serviced = rbtree_create(serviced_cmp)) || !(outnet->serviced = rbtree_create(serviced_cmp)) ||
!create_pending_tcp(outnet, bufsize)) { !create_pending_tcp(outnet, bufsize)) {
@ -484,50 +474,66 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outside_network_delete(outnet); outside_network_delete(outnet);
return NULL; return NULL;
} }
/* Try to get ip6 and ip4 ports. Ip6 first, in case second fails. */
/* allocate commpoints */
for(k=0; k<num_ports; k++) {
struct port_comm* pc;
pc = (struct port_comm*)calloc(1, sizeof(*pc));
if(!pc) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
}
pc->cp = comm_point_create_udp(outnet->base, -1,
outnet->udp_buff, outnet_udp_cb, outnet);
if(!pc->cp) {
log_err("malloc failed");
free(pc);
outside_network_delete(outnet);
return NULL;
}
pc->next = outnet->unused_fds;
outnet->unused_fds = pc;
}
/* allocate interfaces */
if(num_ifs == 0) { if(num_ifs == 0) {
if(do_ip6) { if(do_ip4 && !setup_if(&outnet->ip4_ifs[0], "0.0.0.0",
outnet->num_udp6 = make_udp_range(outnet->udp6_ports, availports, numavailports, num_ports)) {
NULL, num_ports, 0, 1, port_base, outnet); log_err("malloc failed");
}
if(do_ip4) {
outnet->num_udp4 = make_udp_range(outnet->udp4_ports,
NULL, num_ports, 1, 0, port_base, outnet);
}
if( (do_ip4 && outnet->num_udp4 != num_ports) ||
(do_ip6 && outnet->num_udp6 != num_ports)) {
log_err("Could not open all networkside ports");
outside_network_delete(outnet); outside_network_delete(outnet);
return NULL; return NULL;
} }
if(do_ip6 && !setup_if(&outnet->ip6_ifs[0], "::",
availports, numavailports, num_ports)) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
} }
else { } else {
size_t done_4 = 0, done_6 = 0; size_t done_4 = 0, done_6 = 0;
for(k=0; k<num_ifs; k++) { int i;
if(str_is_ip6(ifs[k]) && do_ip6) { for(i=0; i<num_ifs; i++) {
done_6 += make_udp_range( if(str_is_ip6(ifs[i]) && do_ip6) {
outnet->udp6_ports+done_6, ifs[k], if(!setup_if(&outnet->ip6_ifs[done_6], ifs[i],
num_ports, 0, 1, port_base, outnet); availports, numavailports, num_ports)){
} log_err("malloc failed");
if(!str_is_ip6(ifs[k]) && do_ip4) {
done_4 += make_udp_range(
outnet->udp4_ports+done_4, ifs[k],
num_ports, 1, 0, port_base, outnet);
}
}
if(done_6 != outnet->num_udp6 || done_4 != outnet->num_udp4) {
log_err("Could not open all ports on all interfaces");
outside_network_delete(outnet); outside_network_delete(outnet);
return NULL; return NULL;
} }
outnet->num_udp6 = done_6; done_6++;
outnet->num_udp4 = done_4;
} }
if(outnet->num_udp4 + outnet->num_udp6 == 0) { if(!str_is_ip6(ifs[i]) && do_ip4) {
log_err("Could not open any ports on outgoing interfaces"); if(!setup_if(&outnet->ip4_ifs[done_4], ifs[i],
availports, numavailports, num_ports)){
log_err("malloc failed");
outside_network_delete(outnet); outside_network_delete(outnet);
return NULL; return NULL;
} }
done_4++;
}
}
}
return outnet; return outnet;
} }
@ -537,9 +543,6 @@ pending_node_del(rbnode_t* node, void* arg)
{ {
struct pending* pend = (struct pending*)node; struct pending* pend = (struct pending*)node;
struct outside_network* outnet = (struct outside_network*)arg; struct outside_network* outnet = (struct outside_network*)arg;
pend->c->inuse--;
if(pend->c->inuse == 0)
comm_point_stop_listening(pend->c);
pending_delete(outnet, pend); pending_delete(outnet, pend);
} }
@ -575,17 +578,43 @@ outside_network_delete(struct outside_network* outnet)
} }
if(outnet->udp_buff) if(outnet->udp_buff)
ldns_buffer_free(outnet->udp_buff); ldns_buffer_free(outnet->udp_buff);
if(outnet->udp4_ports) { if(outnet->unused_fds) {
size_t i; struct port_comm* p = outnet->unused_fds, *np;
for(i=0; i<outnet->num_udp4; i++) while(p) {
comm_point_delete(outnet->udp4_ports[i]); np = p->next;
free(outnet->udp4_ports); comm_point_delete(p->cp);
free(p);
p = np;
} }
if(outnet->udp6_ports) { outnet->unused_fds = NULL;
size_t i; }
for(i=0; i<outnet->num_udp6; i++) if(outnet->ip4_ifs) {
comm_point_delete(outnet->udp6_ports[i]); int i, k;
free(outnet->udp6_ports); for(i=0; i<outnet->num_ip4; i++) {
for(k=0; k<outnet->ip4_ifs[i].inuse; k++) {
struct port_comm* pc = outnet->ip4_ifs[i].
out[k];
comm_point_delete(pc->cp);
free(pc);
}
free(outnet->ip4_ifs[i].avail_ports);
free(outnet->ip4_ifs[i].out);
}
free(outnet->ip4_ifs);
}
if(outnet->ip6_ifs) {
int i, k;
for(i=0; i<outnet->num_ip6; i++) {
for(k=0; k<outnet->ip6_ifs[i].inuse; k++) {
struct port_comm* pc = outnet->ip6_ifs[i].
out[k];
comm_point_delete(pc->cp);
free(pc);
}
free(outnet->ip6_ifs[i].avail_ports);
free(outnet->ip6_ifs[i].out);
}
free(outnet->ip6_ifs);
} }
if(outnet->tcp_conns) { if(outnet->tcp_conns) {
size_t i; size_t i;
@ -605,7 +634,14 @@ outside_network_delete(struct outside_network* outnet)
p = np; p = np;
} }
} }
if(outnet->udp_wait_first) {
struct pending* p = outnet->udp_wait_first, *np;
while(p) {
np = p->next_waiting;
pending_delete(NULL, p);
p = np;
}
}
free(outnet); free(outnet);
} }
@ -619,133 +655,216 @@ pending_delete(struct outside_network* outnet, struct pending* p)
} }
if(p->timer) if(p->timer)
comm_timer_delete(p->timer); comm_timer_delete(p->timer);
free(p->pkt);
free(p); free(p);
} }
/** create a new pending item with given characteristics, false on failure */ /**
static struct pending* * Try to open a UDP socket for outgoing communication.
new_pending(struct outside_network* outnet, ldns_buffer* packet, * Sets sockets options as needed.
struct sockaddr_storage* addr, socklen_t addrlen, * @param addr: socket address.
comm_point_callback_t* callback, void* callback_arg, * @param addrlen: length of address.
struct ub_randstate* rnd) * @param port: port override for addr.
* @param inuse: if -1 is returned, this bool means the port was in use.
* @return fd or -1
*/
static int
udp_sockport(struct sockaddr_storage* addr, socklen_t addrlen, int port,
int* inuse)
{
int fd;
if(addr_is_ip6(addr, addrlen)) {
struct sockaddr_in6* sa = (struct sockaddr_in6*)addr;
sa->sin6_port = (in_port_t)htons((uint16_t)port);
fd = create_udp_sock(AF_INET6, SOCK_DGRAM,
(struct sockaddr*)addr, addrlen, 1, inuse);
} else {
struct sockaddr_in* sa = (struct sockaddr_in*)addr;
sa->sin_port = (in_port_t)htons((uint16_t)port);
fd = create_udp_sock(AF_INET, SOCK_DGRAM,
(struct sockaddr*)addr, addrlen, 1, inuse);
}
return fd;
}
/** Select random ID */
static int
select_id(struct outside_network* outnet, struct pending* pend,
ldns_buffer* packet)
{ {
/* alloc */
int id_tries = 0; int id_tries = 0;
struct pending* pend = (struct pending*)calloc(1, pend->id = ((unsigned)ub_random(outnet->rnd)>>8) & 0xffff;
sizeof(struct pending));
if(!pend) {
log_err("malloc failure");
return NULL;
}
pend->timer = comm_timer_create(outnet->base, pending_udp_timer_cb,
pend);
if(!pend->timer) {
free(pend);
return NULL;
}
/* set */
pend->id = ((unsigned)ub_random(rnd)>>8) & 0xffff;
LDNS_ID_SET(ldns_buffer_begin(packet), pend->id); LDNS_ID_SET(ldns_buffer_begin(packet), pend->id);
memcpy(&pend->addr, addr, addrlen);
pend->addrlen = addrlen;
pend->cb = callback;
pend->cb_arg = callback_arg;
pend->outnet = outnet;
/* insert in tree */ /* insert in tree */
pend->node.key = pend; pend->node.key = pend;
while(!rbtree_insert(outnet->pending, &pend->node)) { while(!rbtree_insert(outnet->pending, &pend->node)) {
/* change ID to avoid collision */ /* change ID to avoid collision */
pend->id = ((unsigned)ub_random(rnd)>>8) & 0xffff; pend->id = ((unsigned)ub_random(outnet->rnd)>>8) & 0xffff;
LDNS_ID_SET(ldns_buffer_begin(packet), pend->id); LDNS_ID_SET(ldns_buffer_begin(packet), pend->id);
id_tries++; id_tries++;
if(id_tries == MAX_ID_RETRY) { if(id_tries == MAX_ID_RETRY) {
pend->id=99999; /* non existant ID */
log_err("failed to generate unique ID, drop msg"); log_err("failed to generate unique ID, drop msg");
pending_delete(NULL, pend); return 0;
return NULL;
} }
} }
verbose(VERB_ALGO, "inserted new pending reply id=%4.4x", pend->id); verbose(VERB_ALGO, "inserted new pending reply id=%4.4x", pend->id);
return pend; return 1;
} }
/** /** Select random interface and port */
* Select outgoing comm point for a query. Fills in c. static int
* @param outnet: network structure that has arrays of ports to choose from. select_ifport(struct outside_network* outnet, struct pending* pend,
* @param pend: the message to send. c is filled in, randomly chosen. int num_if, struct port_if* ifs)
* @param rnd: random state for generating ID and port.
*/
static void
select_port(struct outside_network* outnet, struct pending* pend,
struct ub_randstate* rnd)
{ {
double precho; int my_if, my_port, fd, portno, inuse, tries=0;
int chosen, nummax; struct port_if* pif;
/* randomly select interface and port */
log_assert(outnet && pend); if(num_if == 0) {
/* first select ip4 or ip6. */ verbose(VERB_QUERY, "Need to send query but have no "
if(addr_is_ip6(&pend->addr, pend->addrlen)) "outgoing interfaces of that family");
nummax = (int)outnet->num_udp6; return 0;
else nummax = (int)outnet->num_udp4;
if(nummax == 0) {
/* could try ip4to6 mapping if no ip4 ports available */
log_err("Need to send query but have no ports of that family");
return;
} }
log_assert(outnet->unused_fds);
tries = 0;
while(1) {
my_if = ub_random(outnet->rnd) % num_if;
pif = &ifs[my_if];
my_port = ub_random(outnet->rnd) % pif->avail_total;
if(my_port < pif->inuse) {
/* port already open */
pend->pc = pif->out[my_port];
verbose(VERB_ALGO, "using UDP if=%d port=%d",
my_if, pend->pc->number);
break;
}
/* try to open new port, if fails, loop to try again */
log_assert(pif->inuse < pif->maxout);
portno = pif->avail_ports[my_port - pif->inuse];
fd = udp_sockport(&pif->addr, pif->addrlen, portno, &inuse);
if(fd == -1 && !inuse) {
/* nonrecoverable error making socket */
return 0;
}
if(fd != -1) {
verbose(VERB_ALGO, "opened UDP if=%d port=%d",
my_if, portno);
/* grab fd */
pend->pc = outnet->unused_fds;
outnet->unused_fds = pend->pc->next;
/* choose a random outgoing port and interface */ /* setup portcomm */
precho = (double)ub_random(rnd) * (double)nummax / pend->pc->next = NULL;
((double)RAND_MAX + 1.0); pend->pc->number = portno;
chosen = (int)precho; pend->pc->pif = pif;
pend->pc->index = pif->inuse;
pend->pc->num_outstanding = 0;
comm_point_start_listening(pend->pc->cp, fd, -1);
/* don't trust in perfect double rounding */ /* grab port in interface */
if(chosen < 0) chosen = 0; pif->out[pif->inuse] = pend->pc;
if(chosen >= nummax) chosen = nummax-1; pif->avail_ports[my_port - pif->inuse] =
pif->avail_ports[pif->avail_total-pif->inuse-1];
pif->inuse++;
break;
}
/* failed, already in use */
verbose(VERB_QUERY, "port %d in use, trying another", portno);
tries++;
if(tries == MAX_PORT_RETRY) {
log_err("failed to find an open port, drop msg");
return 0;
}
}
log_assert(pend->pc);
pend->pc->num_outstanding++;
if(addr_is_ip6(&pend->addr, pend->addrlen)) return 1;
pend->c = outnet->udp6_ports[chosen];
else pend->c = outnet->udp4_ports[chosen];
log_assert(pend->c);
verbose(VERB_ALGO, "query %x outbound on port %d of %d", pend->id, chosen, nummax);
} }
static int
struct pending* randomize_and_send_udp(struct outside_network* outnet, struct pending* pend,
pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, ldns_buffer* packet, int timeout)
struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* cb, void* cb_arg, struct ub_randstate* rnd)
{ {
struct pending* pend;
struct timeval tv; struct timeval tv;
/* create pending struct and change ID to be unique */ /* select id */
if(!(pend=new_pending(outnet, packet, addr, addrlen, cb, cb_arg, if(!select_id(outnet, pend, packet)) {
rnd))) { return 0;
return NULL;
}
select_port(outnet, pend, rnd);
if(!pend->c) {
pending_delete(outnet, pend);
return NULL;
} }
/* send it over the commlink */ /* select src_if, port */
if(!comm_point_send_udp_msg(pend->c, packet, (struct sockaddr*)addr, if(addr_is_ip6(&pend->addr, pend->addrlen)) {
addrlen)) { if(!select_ifport(outnet, pend,
pending_delete(outnet, pend); outnet->num_ip6, outnet->ip6_ifs))
return NULL; return 0;
} else {
if(!select_ifport(outnet, pend,
outnet->num_ip4, outnet->ip4_ifs))
return 0;
}
log_assert(pend->pc && pend->pc->cp);
/* send it over the commlink */
if(!comm_point_send_udp_msg(pend->pc->cp, packet,
(struct sockaddr*)&pend->addr, pend->addrlen)) {
portcomm_loweruse(outnet, pend->pc);
return 0;
} }
if(pend->c->inuse == 0)
comm_point_start_listening(pend->c, -1, -1);
pend->c->inuse++;
/* system calls to set timeout after sending UDP to make roundtrip /* system calls to set timeout after sending UDP to make roundtrip
smaller. */ smaller. */
tv.tv_sec = timeout/1000; tv.tv_sec = timeout/1000;
tv.tv_usec = (timeout%1000)*1000; tv.tv_usec = (timeout%1000)*1000;
comm_timer_set(pend->timer, &tv); comm_timer_set(pend->timer, &tv);
return 1;
}
struct pending*
pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* cb, void* cb_arg)
{
struct pending* pend = (struct pending*)calloc(1, sizeof(*pend));
if(!pend) return NULL;
pend->outnet = outnet;
pend->addrlen = addrlen;
memmove(&pend->addr, addr, addrlen);
pend->cb = cb;
pend->cb_arg = cb_arg;
pend->node.key = pend;
pend->timer = comm_timer_create(outnet->base, pending_udp_timer_cb,
pend);
if(!pend->timer) {
free(pend);
return NULL;
}
if(outnet->unused_fds == NULL) {
/* no unused fd, cannot create a new port (randomly) */
verbose(VERB_ALGO, "no fds available, udp query waiting");
pend->timeout = timeout;
pend->pkt_len = ldns_buffer_limit(packet);
pend->pkt = (uint8_t*)memdup(ldns_buffer_begin(packet),
pend->pkt_len);
if(!pend->pkt) {
comm_timer_delete(pend->timer);
free(pend);
return NULL;
}
/* put at end of waiting list */
if(outnet->udp_wait_last)
outnet->udp_wait_last->next_waiting = pend;
else
outnet->udp_wait_first = pend;
outnet->udp_wait_last = pend;
return pend;
}
if(!randomize_and_send_udp(outnet, pend, packet, timeout)) {
pending_delete(outnet, pend);
return NULL;
}
return pend; return pend;
} }
@ -788,8 +907,7 @@ outnet_tcptimer(void* arg)
struct waiting_tcp* struct waiting_tcp*
pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg)
struct ub_randstate* rnd)
{ {
struct pending_tcp* pend = outnet->tcp_free; struct pending_tcp* pend = outnet->tcp_free;
struct waiting_tcp* w; struct waiting_tcp* w;
@ -807,7 +925,7 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
} }
w->pkt = NULL; w->pkt = NULL;
w->pkt_len = 0; w->pkt_len = 0;
id = ((unsigned)ub_random(rnd)>>8) & 0xffff; id = ((unsigned)ub_random(outnet->rnd)>>8) & 0xffff;
LDNS_ID_SET(ldns_buffer_begin(packet), id); LDNS_ID_SET(ldns_buffer_begin(packet), id);
memcpy(&w->addr, addr, addrlen); memcpy(&w->addr, addr, addrlen);
w->addrlen = addrlen; w->addrlen = addrlen;
@ -931,10 +1049,9 @@ serviced_delete(struct serviced_query* sq)
if(sq->status == serviced_query_UDP_EDNS || if(sq->status == serviced_query_UDP_EDNS ||
sq->status == serviced_query_UDP) { sq->status == serviced_query_UDP) {
struct pending* p = (struct pending*)sq->pending; struct pending* p = (struct pending*)sq->pending;
p->c->inuse--; portcomm_loweruse(sq->outnet, p->pc);
if(p->c->inuse == 0)
comm_point_stop_listening(p->c);
pending_delete(sq->outnet, p); pending_delete(sq->outnet, p);
outnet_send_wait_udp(sq->outnet);
} else { } else {
struct waiting_tcp* p = (struct waiting_tcp*) struct waiting_tcp* p = (struct waiting_tcp*)
sq->pending; sq->pending;
@ -1043,7 +1160,7 @@ serviced_udp_send(struct serviced_query* sq, ldns_buffer* buff)
sq->last_sent_time = *sq->outnet->now_tv; sq->last_sent_time = *sq->outnet->now_tv;
verbose(VERB_ALGO, "serviced query UDP timeout=%d msec", rtt); verbose(VERB_ALGO, "serviced query UDP timeout=%d msec", rtt);
sq->pending = pending_udp_query(sq->outnet, buff, &sq->addr, sq->pending = pending_udp_query(sq->outnet, buff, &sq->addr,
sq->addrlen, rtt, serviced_udp_callback, sq, sq->outnet->rnd); sq->addrlen, rtt, serviced_udp_callback, sq);
if(!sq->pending) if(!sq->pending)
return 0; return 0;
return 1; return 1;
@ -1216,7 +1333,7 @@ serviced_tcp_initiate(struct outside_network* outnet,
serviced_encode(sq, buff, sq->status == serviced_query_TCP_EDNS); serviced_encode(sq, buff, sq->status == serviced_query_TCP_EDNS);
sq->pending = pending_tcp_query(outnet, buff, &sq->addr, sq->pending = pending_tcp_query(outnet, buff, &sq->addr,
sq->addrlen, TCP_AUTH_QUERY_TIMEOUT, serviced_tcp_callback, sq->addrlen, TCP_AUTH_QUERY_TIMEOUT, serviced_tcp_callback,
sq, outnet->rnd); sq);
if(!sq->pending) { if(!sq->pending) {
/* delete from tree so that a retry by above layer does not /* delete from tree so that a retry by above layer does not
* clash with this entry */ * clash with this entry */
@ -1399,22 +1516,52 @@ waiting_tcp_get_mem(struct waiting_tcp* w)
return s; return s;
} }
/** get memory used by port if */
static size_t
if_get_mem(struct port_if* pif)
{
size_t s;
int i;
s = sizeof(*pif) + sizeof(int)*pif->avail_total +
sizeof(struct port_comm*)*pif->maxout;
for(i=0; i<pif->inuse; i++)
s += sizeof(*pif->out[i]) +
comm_point_get_mem(pif->out[i]->cp);
return s;
}
/** get memory used by waiting udp */
static size_t
waiting_udp_get_mem(struct pending* w)
{
size_t s;
s = sizeof(*w) + comm_timer_get_mem(w->timer) + w->pkt_len;
return s;
}
size_t outnet_get_mem(struct outside_network* outnet) size_t outnet_get_mem(struct outside_network* outnet)
{ {
size_t i; size_t i;
int k;
struct waiting_tcp* w; struct waiting_tcp* w;
struct pending* u;
struct serviced_query* sq; struct serviced_query* sq;
struct service_callback* sb; struct service_callback* sb;
struct port_comm* pc;
size_t s = sizeof(*outnet) + sizeof(*outnet->base) + size_t s = sizeof(*outnet) + sizeof(*outnet->base) +
sizeof(*outnet->udp_buff) + sizeof(*outnet->udp_buff) +
ldns_buffer_capacity(outnet->udp_buff); ldns_buffer_capacity(outnet->udp_buff);
/* second buffer is not ours */ /* second buffer is not ours */
s += sizeof(struct comm_point*)*outnet->num_udp4; for(pc = outnet->unused_fds; pc; pc = pc->next) {
for(i=0; i<outnet->num_udp4; i++) s += sizeof(*pc) + comm_point_get_mem(pc->cp);
s += comm_point_get_mem(outnet->udp4_ports[i]); }
s += sizeof(struct comm_point*)*outnet->num_udp6; for(k=0; k<outnet->num_ip4; k++)
for(i=0; i<outnet->num_udp6; i++) s += if_get_mem(&outnet->ip4_ifs[k]);
s += comm_point_get_mem(outnet->udp6_ports[i]); for(k=0; k<outnet->num_ip6; k++)
s += if_get_mem(&outnet->ip6_ifs[k]);
for(u=outnet->udp_wait_first; u; u=u->next_waiting)
s += waiting_udp_get_mem(u);
s += sizeof(struct pending_tcp*)*outnet->num_tcp; s += sizeof(struct pending_tcp*)*outnet->num_tcp;
for(i=0; i<outnet->num_tcp; i++) { for(i=0; i<outnet->num_tcp; i++) {
s += sizeof(struct pending_tcp); s += sizeof(struct pending_tcp);

View file

@ -51,7 +51,10 @@ struct pending_timeout;
struct ub_randstate; struct ub_randstate;
struct pending_tcp; struct pending_tcp;
struct waiting_tcp; struct waiting_tcp;
struct waiting_udp;
struct infra_cache; struct infra_cache;
struct port_comm;
struct port_if;
/** /**
* Send queries to outside servers and wait for answers from servers. * Send queries to outside servers and wait for answers from servers.
@ -74,24 +77,24 @@ struct outside_network {
/** use x20 bits to encode additional ID random bits */ /** use x20 bits to encode additional ID random bits */
int use_caps_for_id; int use_caps_for_id;
/** /** linked list of available commpoints, unused file descriptors,
* Array of udp comm point* that are used to listen to pending events. * for use as outgoing UDP ports. cp.fd=-1 in them. */
* Each is on a different port. This is for ip4 ports. struct port_comm* unused_fds;
*/
struct comm_point** udp4_ports;
/** number of queries open on each port */
int* udp4_inuse;
/** number of udp4 ports */
size_t num_udp4;
/** /** array of outgoing IP4 interfaces */
* The opened ip6 ports. struct port_if* ip4_ifs;
*/ /** number of outgoing IP4 interfaces */
struct comm_point** udp6_ports; int num_ip4;
/** number of queries open on each port */
int* udp6_inuse; /** array of outgoing IP6 interfaces */
/** number of udp6 ports */ struct port_if* ip6_ifs;
size_t num_udp6; /** number of outgoing IP6 interfaces */
int num_ip6;
/** pending udp queries waiting to be sent out, waiting for fd */
struct pending* udp_wait_first;
/** last pending udp query in list */
struct pending* udp_wait_last;
/** pending udp answers. sorted by id, addr */ /** pending udp answers. sorted by id, addr */
rbtree_t* pending; rbtree_t* pending;
@ -119,20 +122,65 @@ struct outside_network {
struct waiting_tcp* tcp_wait_last; struct waiting_tcp* tcp_wait_last;
}; };
/**
* Outgoing interface. Ports available and currently used are tracked
* per interface
*/
struct port_if {
/** address ready to allocate new socket (except port no). */
struct sockaddr_storage addr;
/** length of addr field */
socklen_t addrlen;
/** the available ports array. These are unused.
* Only the first total-inuse part is filled. */
int* avail_ports;
/** the total number of available ports (size of the array) */
int avail_total;
/** array of the commpoints currently in use.
* allocated for max number of fds, first part in use. */
struct port_comm** out;
/** max number of fds, size of out array */
int maxout;
/** number of commpoints (and thus also ports) in use */
int inuse;
};
/**
* Outgoing commpoint for UDP port.
*/
struct port_comm {
/** next in free list */
struct port_comm* next;
/** which port number (when in use) */
int number;
/** interface it is used in */
struct port_if* pif;
/** index in the out array of the interface */
int index;
/** number of outstanding queries on this port */
int num_outstanding;
/** UDP commpoint, fd=-1 if not in use */
struct comm_point* cp;
};
/** /**
* A query that has an answer pending for it. * A query that has an answer pending for it.
*/ */
struct pending { struct pending {
/** redblacktree entry, key is the pending struct(id, addr). */ /** redblacktree entry, key is the pending struct(id, addr). */
rbnode_t node; rbnode_t node;
/** the ID for the query */ /** the ID for the query. int so that a value out of range can
uint16_t id; * be used to signify a pending that is for certain not present in
* the rbtree. (and for which deletion is safe). */
unsigned int id;
/** remote address. */ /** remote address. */
struct sockaddr_storage addr; struct sockaddr_storage addr;
/** length of addr field in use. */ /** length of addr field in use. */
socklen_t addrlen; socklen_t addrlen;
/** comm point it was sent on (and reply must come back on). */ /** comm point it was sent on (and reply must come back on). */
struct comm_point* c; struct port_comm* pc;
/** timeout event */ /** timeout event */
struct comm_timer* timer; struct comm_timer* timer;
/** callback for the timeout, error or reply to the message */ /** callback for the timeout, error or reply to the message */
@ -141,6 +189,16 @@ struct pending {
void* cb_arg; void* cb_arg;
/** the outside network it is part of */ /** the outside network it is part of */
struct outside_network* outnet; struct outside_network* outnet;
/*---- filled if udp pending is waiting -----*/
/** next in waiting list. */
struct pending* next_waiting;
/** timeout in msec */
int timeout;
/** The query itself, the query packet to send. */
uint8_t* pkt;
/** length of query packet. */
size_t pkt_len;
}; };
/** /**
@ -268,13 +326,15 @@ struct serviced_query {
* @param infra: pointer to infra cached used for serviced queries. * @param infra: pointer to infra cached used for serviced queries.
* @param rnd: stored to create random numbers for serviced queries. * @param rnd: stored to create random numbers for serviced queries.
* @param use_caps_for_id: enable to use 0x20 bits to encode id randomness. * @param use_caps_for_id: enable to use 0x20 bits to encode id randomness.
* @param availports: array of available ports.
* @param numavailports: number of available ports in array.
* @return: the new structure (with no pending answers) or NULL on error. * @return: the new structure (with no pending answers) or NULL on error.
*/ */
struct outside_network* outside_network_create(struct comm_base* base, struct outside_network* outside_network_create(struct comm_base* base,
size_t bufsize, size_t num_ports, char** ifs, int num_ifs, size_t bufsize, size_t num_ports, char** ifs, int num_ifs,
int do_ip4, int do_ip6, int port_base, size_t num_tcp, int do_ip4, int do_ip6, int port_base, size_t num_tcp,
struct infra_cache* infra, struct ub_randstate* rnd, struct infra_cache* infra, struct ub_randstate* rnd,
int use_caps_for_id); int use_caps_for_id, int* availports, int numavailports);
/** /**
* Delete outside_network structure. * Delete outside_network structure.
@ -292,13 +352,12 @@ void outside_network_delete(struct outside_network* outnet);
* @param timeout: in milliseconds from now. * @param timeout: in milliseconds from now.
* @param callback: function to call on error, timeout or reply. * @param callback: function to call on error, timeout or reply.
* @param callback_arg: user argument for callback function. * @param callback_arg: user argument for callback function.
* @param rnd: random state for generating ID and port.
* @return: NULL on error for malloc or socket. Else the pending query object. * @return: NULL on error for malloc or socket. Else the pending query object.
*/ */
struct pending* pending_udp_query(struct outside_network* outnet, struct pending* pending_udp_query(struct outside_network* outnet,
ldns_buffer* packet, struct sockaddr_storage* addr, ldns_buffer* packet, struct sockaddr_storage* addr,
socklen_t addrlen, int timeout, comm_point_callback_t* callback, socklen_t addrlen, int timeout, comm_point_callback_t* callback,
void* callback_arg, struct ub_randstate* rnd); void* callback_arg);
/** /**
* Send TCP query. May wait for TCP buffer. Selects ID to be random, and * Send TCP query. May wait for TCP buffer. Selects ID to be random, and
@ -312,13 +371,12 @@ struct pending* pending_udp_query(struct outside_network* outnet,
* without any query been sent to the server yet. * without any query been sent to the server yet.
* @param callback: function to call on error, timeout or reply. * @param callback: function to call on error, timeout or reply.
* @param callback_arg: user argument for callback function. * @param callback_arg: user argument for callback function.
* @param rnd: random state for generating ID.
* @return: false on error for malloc or socket. Else the pending TCP object. * @return: false on error for malloc or socket. Else the pending TCP object.
*/ */
struct waiting_tcp* pending_tcp_query(struct outside_network* outnet, struct waiting_tcp* pending_tcp_query(struct outside_network* outnet,
ldns_buffer* packet, struct sockaddr_storage* addr, ldns_buffer* packet, struct sockaddr_storage* addr,
socklen_t addrlen, int timeout, comm_point_callback_t* callback, socklen_t addrlen, int timeout, comm_point_callback_t* callback,
void* callback_arg, struct ub_randstate* rnd); void* callback_arg);
/** /**
* Delete pending answer. * Delete pending answer.

View file

@ -686,7 +686,9 @@ outside_network_create(struct comm_base* base, size_t bufsize,
int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4), int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4),
int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base), int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base),
size_t ATTR_UNUSED(num_tcp), struct infra_cache* ATTR_UNUSED(infra), size_t ATTR_UNUSED(num_tcp), struct infra_cache* ATTR_UNUSED(infra),
struct ub_randstate* ATTR_UNUSED(rnd), int ATTR_UNUSED(use_caps_for_id)) struct ub_randstate* ATTR_UNUSED(rnd),
int ATTR_UNUSED(use_caps_for_id), int* ATTR_UNUSED(availports),
int ATTR_UNUSED(numavailports))
{ {
struct outside_network* outnet = calloc(1, struct outside_network* outnet = calloc(1,
sizeof(struct outside_network)); sizeof(struct outside_network));
@ -711,8 +713,7 @@ outside_network_delete(struct outside_network* outnet)
struct pending* struct pending*
pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg)
struct ub_randstate* ATTR_UNUSED(rnd))
{ {
struct replay_runtime* runtime = (struct replay_runtime*)outnet->base; struct replay_runtime* runtime = (struct replay_runtime*)outnet->base;
struct fake_pending* pend = (struct fake_pending*)calloc(1, struct fake_pending* pend = (struct fake_pending*)calloc(1,
@ -764,8 +765,7 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct waiting_tcp* struct waiting_tcp*
pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg)
struct ub_randstate* ATTR_UNUSED(rnd))
{ {
struct replay_runtime* runtime = (struct replay_runtime*)outnet->base; struct replay_runtime* runtime = (struct replay_runtime*)outnet->base;
struct fake_pending* pend = (struct fake_pending*)calloc(1, struct fake_pending* pend = (struct fake_pending*)calloc(1,

View file

@ -524,6 +524,24 @@ cfg_scan_ports(int* avail, int num)
return count; return count;
} }
int cfg_condense_ports(struct config_file* cfg, int** avail)
{
int num = cfg_scan_ports(cfg->outgoing_avail_ports, 65536);
int i, at = 0;
*avail = NULL;
if(num == 0)
return 0;
*avail = (int*)malloc(sizeof(int)*num);
if(!*avail)
return 0;
for(i=0; i<65536; i++) {
if(cfg->outgoing_avail_ports[i])
(*avail)[at++] = cfg->outgoing_avail_ports[i];
}
log_assert(at == num);
return num;
}
/** print error with file and line number */ /** print error with file and line number */
void ub_c_error_va_list(const char *fmt, va_list args) void ub_c_error_va_list(const char *fmt, va_list args)
{ {

View file

@ -366,6 +366,14 @@ int cfg_parse_memsize(const char* str, size_t* res);
*/ */
int cfg_mark_ports(const char* str, int allow, int* avail, int num); int cfg_mark_ports(const char* str, int allow, int* avail, int num);
/**
* Get a condensed list of ports returned. allocated.
* @param cfg: config file.
* @param avail: the available ports array is returned here.
* @return: number of ports in array or 0 on error.
*/
int cfg_condense_ports(struct config_file* cfg, int** avail);
/** /**
* Scan ports available * Scan ports available
* @param avail: the array from cfg. * @param avail: the array from cfg.

View file

@ -152,7 +152,13 @@ comm_base_create()
} }
comm_base_now(b); comm_base_now(b);
verbose(VERB_ALGO, "libevent %s uses %s method.", verbose(VERB_ALGO, "libevent %s uses %s method.",
event_get_version(), event_get_method()); event_get_version(),
#ifdef HAVE_EVENT_BASE_GET_METHOD
event_base_get_method(b->eb->base)
#else
event_get_method()
#endif
);
return b; return b;
} }
@ -438,6 +444,8 @@ comm_point_udp_ancil_callback(int fd, short event, void* arg)
(void)comm_point_send_udp_msg_if(rep.c, rep.c->buffer, (void)comm_point_send_udp_msg_if(rep.c, rep.c->buffer,
(struct sockaddr*)&rep.addr, rep.addrlen, &rep); (struct sockaddr*)&rep.addr, rep.addrlen, &rep);
} }
if(rep.c->fd == -1) /* commpoint closed */
break;
} }
#else #else
fatal_exit("recvmsg: No support for IPV6_PKTINFO. " fatal_exit("recvmsg: No support for IPV6_PKTINFO. "
@ -482,6 +490,8 @@ comm_point_udp_callback(int fd, short event, void* arg)
(void)comm_point_send_udp_msg(rep.c, rep.c->buffer, (void)comm_point_send_udp_msg(rep.c, rep.c->buffer,
(struct sockaddr*)&rep.addr, rep.addrlen); (struct sockaddr*)&rep.addr, rep.addrlen);
} }
if(rep.c->fd == -1) /* commpoint closed */
break;
} }
} }
@ -856,8 +866,12 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
evbits = EV_READ | EV_PERSIST; evbits = EV_READ | EV_PERSIST;
/* libevent stuff */ /* libevent stuff */
event_set(&c->ev->ev, c->fd, evbits, comm_point_udp_callback, c); event_set(&c->ev->ev, c->fd, evbits, comm_point_udp_callback, c);
if(event_base_set(base->eb->base, &c->ev->ev) != 0 || if(event_base_set(base->eb->base, &c->ev->ev) != 0) {
event_add(&c->ev->ev, c->timeout) != 0 ) { log_err("could not baseset udp event");
comm_point_delete(c);
return NULL;
}
if(fd!=-1 && event_add(&c->ev->ev, c->timeout) != 0 ) {
log_err("could not add udp event"); log_err("could not add udp event");
comm_point_delete(c); comm_point_delete(c);
return NULL; return NULL;
@ -902,8 +916,12 @@ comm_point_create_udp_ancil(struct comm_base *base, int fd,
evbits = EV_READ | EV_PERSIST; evbits = EV_READ | EV_PERSIST;
/* libevent stuff */ /* libevent stuff */
event_set(&c->ev->ev, c->fd, evbits, comm_point_udp_ancil_callback, c); event_set(&c->ev->ev, c->fd, evbits, comm_point_udp_ancil_callback, c);
if(event_base_set(base->eb->base, &c->ev->ev) != 0 || if(event_base_set(base->eb->base, &c->ev->ev) != 0) {
event_add(&c->ev->ev, c->timeout) != 0 ) { log_err("could not baseset udp event");
comm_point_delete(c);
return NULL;
}
if(fd!=-1 && event_add(&c->ev->ev, c->timeout) != 0 ) {
log_err("could not add udp event"); log_err("could not add udp event");
comm_point_delete(c); comm_point_delete(c);
return NULL; return NULL;
@ -1200,8 +1218,10 @@ comm_point_close(struct comm_point* c)
log_err("could not event_del on close"); log_err("could not event_del on close");
} }
/* close fd after removing from event lists, or epoll.. is messed up */ /* close fd after removing from event lists, or epoll.. is messed up */
if(c->fd != -1 && !c->do_not_close) if(c->fd != -1 && !c->do_not_close) {
verbose(VERB_ALGO, "close fd %d", c->fd);
close(c->fd); close(c->fd);
}
c->fd = -1; c->fd = -1;
} }
@ -1272,7 +1292,8 @@ comm_point_stop_listening(struct comm_point* c)
void void
comm_point_start_listening(struct comm_point* c, int newfd, int sec) comm_point_start_listening(struct comm_point* c, int newfd, int sec)
{ {
verbose(VERB_ALGO, "comm point start listening %d", c->fd); verbose(VERB_ALGO, "comm point start listening %d",
c->fd==-1?newfd:c->fd);
if(c->type == comm_tcp_accept && !c->tcp_free) { if(c->type == comm_tcp_accept && !c->tcp_free) {
/* no use to start listening no free slots. */ /* no use to start listening no free slots. */
return; return;