tcp input.

git-svn-id: file:///svn/unbound/trunk@75 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-02-07 14:18:42 +00:00
parent 076c61d4e8
commit fa56f5ece1
4 changed files with 232 additions and 42 deletions

View file

@ -6,6 +6,7 @@
- set addrlen value when calling recvfrom.
- comparison of addrs more portable.
- LIBEVENT option for testbed to set libevent directory.
- work on tcp input.
6 February 2007: Wouter
- reviewed code and improved in places.

View file

@ -59,31 +59,6 @@
/** byte size of ip6 address */
#define INET6_SIZE 16
/** send addr to logfile */
static void
log_addr(struct sockaddr_storage* addr, socklen_t addrlen)
{
uint16_t port;
const char* family = "unknown";
char dest[100];
int af = (int)((struct sockaddr_in*)addr)->sin_family;
void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr;
switch(af) {
case AF_INET: family="ip4"; break;
case AF_INET6: family="ip6";
sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr;
break;
case AF_UNIX: family="unix"; break;
default: break;
}
if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) {
strncpy(dest, "(inet_ntop error)", sizeof(dest));
}
port = ntohs(((struct sockaddr_in*)addr)->sin_port);
log_info("addr fam=%s port=%d dest=%s len=%d",
family, (int)port, dest, (int)addrlen);
}
/** compare function of pending rbtree */
static int
pending_cmp(const void* key1, const void* key2)

View file

@ -44,6 +44,9 @@
#include <errno.h>
/* -------- Start of local definitions -------- */
/** The TCP reading query timeout in seconds */
#define TCP_READ_TIMEOUT 120
/* We define libevent structures here to hide the libevent stuff. */
/* we use libevent */
@ -243,24 +246,150 @@ comm_point_udp_callback(int fd, short event, void* arg)
}
}
static void
comm_point_tcp_accept_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event),
void* arg)
/** Use a new tcp handler for new query fd, set to read query. */
static void
setup_tcp_handler(struct comm_point* c, int fd)
{
struct comm_point* c = (struct comm_point*)arg;
log_info("callback tcpaccept for %x", (int)c);
log_assert(c->type == comm_tcp_accept);
/* TODO */
log_assert(c->type == comm_tcp);
log_assert(c->fd == -1);
ldns_buffer_clear(c->buffer);
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_start_listening(c, fd, TCP_READ_TIMEOUT);
}
static void
comm_point_tcp_handle_callback(int ATTR_UNUSED(fd), short ATTR_UNUSED(event),
void* arg)
comm_point_tcp_accept_callback(int fd, short event, void* arg)
{
struct comm_point* c = (struct comm_point*)arg, *c_hdl;
struct comm_reply rep;
int new_fd;
log_info("callback tcpaccept for %x", (int)c);
log_assert(c->type == comm_tcp_accept);
if(!(event & EV_READ)) {
log_info("ignoring tcp accept event %d", (int)event);
return;
}
/* accept incoming connection. */
rep.c = NULL;
rep.addrlen = (socklen_t)sizeof(rep.addr);
new_fd = accept(fd, (struct sockaddr*)&rep.addr, &rep.addrlen);
if(new_fd == -1) {
/* EINTR is signal interrupt. others are closed connection. */
if(errno != EINTR && errno != EWOULDBLOCK &&
errno != ECONNABORTED && errno != EPROTO)
log_err("accept failed: %s", strerror(errno));
return;
}
/* find free tcp handler. */
if(!c->tcp_free) {
log_err("accepted too many tcp, connections full, from:");
log_addr(&rep.addr, rep.addrlen);
close(new_fd);
return;
}
/* grab it */
c_hdl = c->tcp_free;
c->tcp_free = c_hdl->tcp_free;
if(!c->tcp_free) {
/* stop accepting incoming queries for now. */
comm_point_stop_listening(c);
}
/* addr is dropped. Not needed for tcp reply. */
setup_tcp_handler(c_hdl, new_fd);
}
/** Make tcp handler free for next assignment. */
static void
reclaim_tcp_handler(struct comm_point* c)
{
log_assert(c->type == comm_tcp);
comm_point_close(c);
c->tcp_free = c->tcp_parent->tcp_free;
c->tcp_parent->tcp_free = c;
if(!c->tcp_free) {
/* re-enable listening on accept socket */
comm_point_start_listening(c->tcp_parent, -1, -1);
}
}
/** do the callback when reading is done */
static void
tcp_callback_reader(struct comm_point* c)
{
log_assert(c->type == comm_tcp);
ldns_buffer_flip(c->buffer);
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) {
/* setup to send reply */
}
}
static void
comm_point_tcp_handle_callback(int fd, short event, void* arg)
{
struct comm_point* c = (struct comm_point*)arg;
log_info("callback tcpaccept for %x", (int)c);
ssize_t r;
log_info("callback tcphandle for %x", (int)c);
log_assert(c->type == comm_tcp);
/* TODO */
if(!(event&EV_READ)) {
if(event&EV_TIMEOUT) {
verbose(VERB_DETAIL, "tcp read took to long drop");
reclaim_tcp_handler(c);
return;
}
log_err("Ignored event %d for tcphdl.", event);
return;
}
if(!c->tcp_is_reading) {
reclaim_tcp_handler(c);
return;
}
if(c->tcp_byte_count < sizeof(uint16_t)) {
/* read length bytes */
r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count),
sizeof(uint16_t)-c->tcp_byte_count);
if(r == 0) {
reclaim_tcp_handler(c);
return;
} else if(r == -1) {
if(errno != EINTR && errno != EAGAIN)
log_err("read (in tcp s): %s", strerror(errno));
return;
}
c->tcp_byte_count += r;
if(c->tcp_byte_count != sizeof(uint16_t))
return;
ldns_buffer_set_limit(c->buffer,
ldns_buffer_read_u16_at(c->buffer, 0));
if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
reclaim_tcp_handler(c);
return;
}
log_info("Reading tcp query of length %d",
ldns_buffer_limit(c->buffer));
}
r = read(fd, ldns_buffer_current(c->buffer),
ldns_buffer_remaining(c->buffer));
if(r == 0) {
reclaim_tcp_handler(c);
return;
} else if(r == -1) {
if(errno != EINTR && errno != EAGAIN)
log_err("read (in tcp r): %s", strerror(errno));
return;
}
ldns_buffer_skip(c->buffer, r);
if(ldns_buffer_remaining(c->buffer) <= 0) {
tcp_callback_reader(c);
}
}
struct comm_point*
@ -284,7 +413,6 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
c->cur_tcp_count = 0;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
@ -323,11 +451,21 @@ comm_point_create_tcp_handler(struct comm_base *base,
}
c->fd = -1;
c->buffer = ldns_buffer_new(bufsize);
c->timeout = NULL;
if(!c->buffer) {
free(c->ev);
free(c);
return NULL;
}
c->timeout = (struct timeval*)malloc(sizeof(struct timeval));
if(!c->timeout) {
ldns_buffer_free(c->buffer);
free(c->ev);
free(c);
return NULL;
}
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = parent;
c->cur_tcp_count = 0;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
@ -376,7 +514,6 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
c->cur_tcp_count = 0;
c->max_tcp_count = num;
c->tcp_handlers = (struct comm_point**)calloc((size_t)num,
sizeof(struct comm_point*));
@ -467,6 +604,40 @@ comm_point_send_reply(struct comm_reply *repinfo)
}
}
void comm_point_stop_listening(struct comm_point* c)
{
if(event_del(&c->ev->ev) != 0) {
log_err("event_del error to stoplisten");
}
}
void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
{
if(c->type == comm_tcp_accept && !c->tcp_free) {
/* no use to start listening no free slots. */
return;
}
if(sec != -1 && sec != 0) {
if(!c->timeout) {
c->timeout = (struct timeval*)malloc(sizeof(
struct timeval));
if(!c->timeout) {
log_err("cpsl: malloc failed. No net read.");
return;
}
}
c->timeout->tv_sec = sec;
c->timeout->tv_usec = 0;
}
if(c->fd != -1)
close(c->fd);
c->fd = newfd;
c->ev->ev.ev_fd = c->fd;
if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) {
log_err("event_add failed. in cpsl.");
}
}
struct comm_timer*
comm_timer_create(struct comm_base* base, void (*cb)(void*), void* cb_arg)
{
@ -607,3 +778,27 @@ comm_signal_delete(struct comm_signal* comsig)
}
free(comsig);
}
void
log_addr(struct sockaddr_storage* addr, socklen_t addrlen)
{
uint16_t port;
const char* family = "unknown";
char dest[100];
int af = (int)((struct sockaddr_in*)addr)->sin_family;
void* sinaddr = &((struct sockaddr_in*)addr)->sin_addr;
switch(af) {
case AF_INET: family="ip4"; break;
case AF_INET6: family="ip6";
sinaddr = &((struct sockaddr_in6*)addr)->sin6_addr;
break;
case AF_UNIX: family="unix"; break;
default: break;
}
if(inet_ntop(af, sinaddr, dest, (socklen_t)sizeof(dest)) == 0) {
strncpy(dest, "(inet_ntop error)", sizeof(dest));
}
port = ntohs(((struct sockaddr_in*)addr)->sin_port);
log_info("addr fam=%s port=%d dest=%s len=%d",
family, (int)port, dest, (int)addrlen);
}

View file

@ -119,8 +119,6 @@ struct comm_point {
struct comm_point* tcp_parent;
/* -------- TCP Accept -------- */
/** current number of TCP connections on this socket */
int cur_tcp_count;
/** the number of TCP handlers for this tcp-accept socket */
int max_tcp_count;
/** malloced array of tcp handlers for a tcp-accept,
@ -322,6 +320,20 @@ void comm_point_send_reply(struct comm_reply* repinfo);
int comm_point_send_udp_msg(struct comm_point* c, ldns_buffer* packet,
struct sockaddr* addr, socklen_t addrlen);
/**
* Stop listening for input on the commpoint. No callbacks will happen.
* @param c: commpoint to disable. The fd is not closed.
*/
void comm_point_stop_listening(struct comm_point* c);
/**
* Start listening again for input on the comm point.
* @param c: commpoint to enable again.
* @param newfd: new fd, or -1 to leave fd be.
* @param sec: timeout in seconds, or -1 for no (change to the) timeout.
*/
void comm_point_start_listening(struct comm_point* c, int newfd, int sec);
/**
* create timer. Not active upon creation.
* @param base: event handling base.
@ -383,4 +395,11 @@ int comm_signal_bind(struct comm_signal* comsig, int sig);
*/
void comm_signal_delete(struct comm_signal* comsig);
/**
* Prints the sockaddr in readable format with log_info. Debug helper.
* @param addr: the sockaddr to print. Can be ip4 or ip6.
* @param addrlen: length of addr.
*/
void log_addr(struct sockaddr_storage* addr, socklen_t addrlen);
#endif /* NET_EVENT_H */