- outside network more UDP work.

- moved * closer to type.
       - comm_timer object and events.


git-svn-id: file:///svn/unbound/trunk@49 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-02-01 15:06:38 +00:00
parent 57aef52d48
commit 9b718bc539
7 changed files with 307 additions and 78 deletions

View file

@ -1,3 +1,8 @@
1 February 2007: Wouter
- outside network more UDP work.
- moved * closer to type.
- comm_timer object and events.
31 January 2007: Wouter
- Added makedist.sh script to make release tarball.
- Removed listen callback layer, did not add anything.

View file

@ -52,6 +52,9 @@
#include <errno.h>
#include <fcntl.h>
/** number of times to retry making a random ID that is unique. */
#define MAX_ID_RETRY 1000
/** compare function of pending rbtree */
static int pending_cmp(const void* key1, const void* key2)
{
@ -70,33 +73,41 @@ static int pending_cmp(const void* key1, const void* key2)
return memcmp(&p1->addr, &p2->addr, p1->addrlen);
}
/** compare function of pending_timeout rbtree */
static int pending_timeout_cmp(const void* key1, const void* key2)
/** callback for incoming udp answers from the network. */
static int outnet_udp_cb(struct comm_point* c, void* arg, int error,
struct comm_reply *reply_info)
{
struct pending_timeout *p1 = (struct pending_timeout*)key1;
struct pending_timeout *p2 = (struct pending_timeout*)key2;
if(p1->timeout.tv_sec < p2->timeout.tv_sec)
return -1;
if(p1->timeout.tv_sec > p2->timeout.tv_sec)
return 1;
log_assert(p1->timeout.tv_sec == p2->timeout.tv_sec);
if(p1->timeout.tv_usec < p2->timeout.tv_usec)
return -1;
if(p1->timeout.tv_usec > p2->timeout.tv_usec)
return 1;
log_assert(p1->timeout.tv_usec == p2->timeout.tv_usec);
if(p1 < p2)
return -1;
if(p1 > p2)
return 1;
struct outside_network* outnet = (struct outside_network*)arg;
struct pending key;
struct pending* p;
log_info("answer cb");
if(error != 0) {
log_info("outnetudp got udp error %d", error);
return 0;
}
log_assert(reply_info);
/* setup lookup key */
key.id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer));
memcpy(&key.addr, &reply_info->addr, reply_info->addrlen);
key.addrlen = reply_info->addrlen;
/* find it, see if this thing is a valid query */
p = (struct pending*)rbtree_search(outnet->pending, &key);
if(!p) {
verbose(VERB_DETAIL, "received uncalled udp reply. dropped.");
return 0;
}
/** callback for incoming udp answers from the network. */
static int outnet_udp_cb(struct comm_point* c, void* my_arg, int error,
struct comm_reply *reply_info)
{
log_info("answer cb");
verbose(VERB_ALGO, "received udp reply.");
if(p->c != c) {
verbose(VERB_DETAIL, "received answer on wrong port. dropped");
return 0;
}
comm_timer_disable(p->timer);
/* TODO handle it */
return 0;
}
@ -166,6 +177,13 @@ make_udp_range(struct outside_network* outnet, const char* ifname,
return 1;
}
/** callback for udp timeout */
static void pending_udp_timer_cb(void *arg)
{
struct pending* p = (struct pending*)arg;
/* it timed out . TODO handle it. */
}
struct outside_network*
outside_network_create(struct comm_base *base, size_t bufsize,
size_t num_ports, const char** ifs, int num_ifs, int do_ip4,
@ -183,9 +201,7 @@ outside_network_create(struct comm_base *base, size_t bufsize,
if( !(outnet->udp_buff = ldns_buffer_new(bufsize)) ||
!(outnet->udp_ports = (struct comm_point **)calloc(
outnet->num_udp, sizeof(struct comm_point*))) ||
!(outnet->pending = rbtree_create(pending_cmp)) ||
!(outnet->pending_timeout = rbtree_create(
pending_timeout_cmp))) {
!(outnet->pending = rbtree_create(pending_cmp)) ) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
@ -204,7 +220,6 @@ outside_network_create(struct comm_base *base, size_t bufsize,
return NULL;
}
}
return outnet;
}
@ -216,17 +231,13 @@ void outside_network_delete(struct outside_network* outnet)
if(outnet->pending) {
struct pending *p, *np;
p = (struct pending*)rbtree_first(outnet->pending);
while(p) {
while(p && (rbnode_t*)p!=RBTREE_NULL) {
np = (struct pending*)rbtree_next((rbnode_t*)p);
pending_delete(NULL, p);
p = np;
}
free(outnet->pending);
}
if(outnet->pending_timeout) {
log_assert(outnet->pending_timeout->count == 0);
free(outnet->pending_timeout);
}
if(outnet->udp_buff)
ldns_buffer_free(outnet->udp_buff);
if(outnet->udp_ports) {
@ -246,22 +257,91 @@ void pending_delete(struct outside_network* outnet, struct pending* p)
return;
if(outnet) {
(void)rbtree_delete(outnet->pending, p->node.key);
(void)rbtree_delete(outnet->pending_timeout,
p->timeout->node.key);
}
free(p->timeout);
if(p->timer)
comm_timer_delete(p->timer);
free(p);
}
/** create a new pending item with given characteristics, false on failure */
static struct pending*
new_pending(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
struct comm_point* c)
{
/* alloc */
int id_tries = 0;
struct timeval tv;
struct pending* pend = (struct pending*)calloc(1,
sizeof(struct pending));
if(!pend) {
log_err("malloc failure");
return NULL;
}
pend->timer = comm_timer_create(outnet->base, pending_udp_timer_cb,
pend);
if(!pend->timer) {
free(pend);
return NULL;
}
/* set */
pend->id = LDNS_ID_WIRE(ldns_buffer_begin(packet));
memcpy(&pend->addr, addr, addrlen);
pend->addrlen = addrlen;
pend->c = c;
/* insert in tree */
pend->node.key = pend;
while(!rbtree_insert(outnet->pending, &pend->node)) {
/* change ID to avoid collision */
pend->id = (random()>>8) & 0xffff;
LDNS_ID_SET(ldns_buffer_begin(packet), pend->id);
id_tries++;
if(id_tries == MAX_ID_RETRY) {
log_err("failed to generate unique ID, drop msg");
pending_delete(NULL, pend);
return NULL;
}
}
tv.tv_sec = time(NULL) + timeout;
tv.tv_usec = 0;
comm_timer_set(pend->timer, &tv);
return pend;
}
void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout)
{
struct pending* pend;
/* choose a random outgoing port and interface */
/* uses lousy random() function. TODO: entropy source. */
double precho = (double)random() * (double)outnet->num_udp /
((double)RAND_MAX + 1.0);
int chosen = (int)precho;
struct comm_point *c;
log_assert(outnet && outnet->udp_ports);
/* don't trust in perfect double rounding */
if(chosen < 0) chosen = 0;
if(chosen >= (int)outnet->num_udp) chosen = (int)outnet->num_udp-1;
c = outnet->udp_ports[chosen];
log_assert(c);
/* create pending struct (and possibly change ID to be unique) */
if(!(pend=new_pending(outnet, packet, addr, addrlen, timeout, c))) {
(void)(*c->callback)(c, c->cb_arg, 1, NULL);
return;
}
log_info("chose query %x outbound %d of %d",
LDNS_ID_WIRE(ldns_buffer_begin(packet)), chosen,
outnet->num_udp);
/* send it over the commlink */
if(!comm_point_send_udp_msg(c, packet, (struct sockaddr*)addr,
addrlen)) {
/* error, call error callback function */
pending_delete(outnet, pend);
(void)(*c->callback)(c, c->cb_arg, 1, NULL);
return;
}
}

View file

@ -72,8 +72,6 @@ struct outside_network {
/** pending answers. sorted by id, addr */
rbtree_t *pending;
/** Each pending answer has a timeout, sorted by timeout. */
rbtree_t *pending_timeout;
};
/**
@ -90,20 +88,8 @@ struct pending {
socklen_t addrlen;
/** comm point it was sent on (and reply must come back on). */
struct comm_point* c;
/** the timeout of the query */
struct pending_timeout *timeout;
};
/**
* Timeout structure for pending queries
*/
struct pending_timeout {
/** entry in rbtree. key is timeout value, then pending ptr(uniq). */
rbnode_t node;
/** timeout, an absolute time value. */
struct timeval timeout;
/** pending query */
struct pending* pending;
/** timeout event */
struct comm_timer* timer;
};
/**

View file

@ -65,6 +65,16 @@ struct internal_base {
struct event_base* base;
};
/**
* Internal timer structure, to store timer event in.
*/
struct internal_timer {
/** libevent event type, alloced here */
struct event ev;
/** is timer enabled, yes or no */
uint8_t enabled;
};
/**
* handle libevent callback for udp comm point.
* @param fd: file descriptor.
@ -92,6 +102,15 @@ static void comm_point_tcp_accept_callback(int fd, short event, void* arg);
*/
static void comm_point_tcp_handle_callback(int fd, short event, void* arg);
/**
* handle libevent callback for timer comm.
* @param fd: file descriptor (always -1).
* @param event: event bits from libevent:
* EV_READ, EV_WRITE, EV_SIGNAL, EV_TIMEOUT.
* @param arg: the comm_point structure.
*/
static void comm_timer_callback(int fd, short event, void* arg);
/** create a tcp handler with a parent */
static struct comm_point* comm_point_create_tcp_handler(
struct comm_base *base, struct comm_point* parent, size_t bufsize,
@ -147,19 +166,23 @@ comm_base_dispatch(struct comm_base* b)
}
/** send a UDP reply */
static void
comm_point_send_udp_msg(struct comm_point *c, struct sockaddr* addr,
socklen_t addrlen) {
int
comm_point_send_udp_msg(struct comm_point *c, ldns_buffer* packet,
struct sockaddr* addr, socklen_t addrlen)
{
ssize_t sent;
sent = sendto(c->fd, ldns_buffer_begin(c->buffer),
ldns_buffer_remaining(c->buffer), 0,
sent = sendto(c->fd, ldns_buffer_begin(packet),
ldns_buffer_remaining(packet), 0,
addr, addrlen);
if(sent == -1) {
log_err("sendto failed: %s", strerror(errno));
} else if((size_t)sent != ldns_buffer_remaining(c->buffer)) {
return 0;
} else if((size_t)sent != ldns_buffer_remaining(packet)) {
log_err("sent %d in place of %d bytes",
sent, (int)ldns_buffer_remaining(c->buffer));
sent, (int)ldns_buffer_remaining(packet));
return 0;
}
return 1;
}
static void
@ -188,8 +211,8 @@ comm_point_udp_callback(int fd, short event, void* arg)
ldns_buffer_flip(rep.c->buffer);
if((*rep.c->callback)(rep.c, rep.c->cb_arg, 0, &rep)) {
/* send back immediate reply */
comm_point_send_udp_msg(rep.c, (struct sockaddr*)&rep.addr,
rep.addrlen);
(void)comm_point_send_udp_msg(rep.c, rep.c->buffer,
(struct sockaddr*)&rep.addr, rep.addrlen);
}
}
@ -402,10 +425,77 @@ void comm_point_send_reply(struct comm_reply *repinfo)
{
log_assert(repinfo && repinfo->c);
if(repinfo->c->type == comm_udp) {
comm_point_send_udp_msg(repinfo->c,
comm_point_send_udp_msg(repinfo->c, repinfo->c->buffer,
(struct sockaddr*)&repinfo->addr, repinfo->addrlen);
} else {
log_info("tcp reply");
}
}
struct comm_timer* comm_timer_create(struct comm_base* base,
void (*cb)(void*), void* cb_arg)
{
struct comm_timer *tm = (struct comm_timer*)calloc(1,
sizeof(struct comm_timer));
if(!tm)
return NULL;
tm->ev_timer = (struct internal_timer*)calloc(1,
sizeof(struct internal_timer));
if(!tm->ev_timer) {
log_err("malloc failed");
free(tm);
return NULL;
}
tm->callback = cb;
tm->cb_arg = cb_arg;
evtimer_set(&tm->ev_timer->ev, comm_timer_callback, tm);
if(event_base_set(base->eb->base, &tm->ev_timer->ev) != 0) {
log_err("timer_create: event_base_set failed.");
free(tm->ev_timer);
free(tm);
return NULL;
}
return tm;
}
void comm_timer_disable(struct comm_timer* timer)
{
if(!timer)
return;
evtimer_del(&timer->ev_timer->ev);
timer->ev_timer->enabled = 0;
}
void comm_timer_set(struct comm_timer* timer, struct timeval* tv)
{
if(timer->ev_timer->enabled)
comm_timer_disable(timer);
memcpy((struct timeval*)&timer->timeout, tv, sizeof(struct timeval));
evtimer_add(&timer->ev_timer->ev, (struct timeval*)&timer->timeout);
timer->ev_timer->enabled = 1;
}
void comm_timer_delete(struct comm_timer* timer)
{
if(!timer)
return;
comm_timer_disable(timer);
free(timer->ev_timer);
free(timer);
}
static void
comm_timer_callback(int ATTR_UNUSED(fd), short event, void* arg)
{
struct comm_timer* tm = (struct comm_timer*)arg;
if(!(event&EV_TIMEOUT))
return;
tm->ev_timer->enabled = 0;
(*tm->callback)(tm->cb_arg);
}
int
comm_timer_is_set(struct comm_timer* timer)
{
return (int)timer->ev_timer->enabled;
}

View file

@ -60,6 +60,7 @@ struct comm_reply;
/* internal event notification data storage structure. */
struct internal_event;
struct internal_base;
struct internal_timer;
/** callback from communication point function type */
typedef int comm_point_callback_t(struct comm_point*, void*, int,
@ -176,6 +177,26 @@ struct comm_reply {
socklen_t addrlen;
};
/**
* Structure only for making timeout events.
*/
struct comm_timer {
/** the internal event stuff */
struct internal_timer* ev_timer;
/**
* the timeout, absolute value seconds.
* Do not write to this, call comm_timer_set instead.
*/
const struct timeval timeout;
/** callback function, takes user arg only */
void (*callback)(void*);
/** callback user argument */
void* cb_arg;
};
/**
* Create a new comm base.
* @return: the new comm base. NULL on error.
@ -257,4 +278,51 @@ void comm_point_set_cb_arg(struct comm_point* c, void *arg);
*/
void comm_point_send_reply(struct comm_reply* repinfo);
/**
* Send an udp message over a commpoint.
* @param c: commpoint to send it from.
* @param packet: what to send.
* @param addr: where to send it to.
* @param addrlen: length of addr.
* @return: false on a failure.
*/
int comm_point_send_udp_msg(struct comm_point* c, ldns_buffer* packet,
struct sockaddr* addr, socklen_t addrlen);
/**
* create timer. Not active upon creation.
* @param base: event handling base.
* @param cb: callback function: void myfunc(void* myarg);
* @param cb_arg: user callback argument.
* @return: the new timer or NULL on error.
*/
struct comm_timer* comm_timer_create(struct comm_base* base,
void (*cb)(void*), void* cb_arg);
/**
* disable timer. Stops callbacks from happening.
* @param timer: to disable.
*/
void comm_timer_disable(struct comm_timer* timer);
/**
* reset timevalue for timer.
* @param timer: timer to (re)set.
* @param tv: when the timer should activate. if NULL timer is disabled.
*/
void comm_timer_set(struct comm_timer* timer, struct timeval* tv);
/**
* delete timer.
* @param timer: to delete.
*/
void comm_timer_delete(struct comm_timer* timer);
/**
* see if timeout has been set to a value.
* @param timer: the timer to examine.
* @return: false if disabled or not set.
*/
int comm_timer_is_set(struct comm_timer* timer);
#endif /* NET_EVENT_H */