tcp queries get answers.

git-svn-id: file:///svn/unbound/trunk@76 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-02-07 15:44:19 +00:00
parent fa56f5ece1
commit b9f0fb076f

View file

@ -44,8 +44,8 @@
#include <errno.h>
/* -------- Start of local definitions -------- */
/** The TCP reading query timeout in seconds */
#define TCP_READ_TIMEOUT 120
/** The TCP reading or writing query timeout in seconds */
#define TCP_QUERY_TIMEOUT 120
/* We define libevent structures here to hide the libevent stuff. */
@ -255,7 +255,7 @@ setup_tcp_handler(struct comm_point* c, int fd)
ldns_buffer_clear(c->buffer);
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_start_listening(c, fd, TCP_READ_TIMEOUT);
comm_point_start_listening(c, fd, TCP_QUERY_TIMEOUT);
}
static void
@ -313,64 +313,74 @@ reclaim_tcp_handler(struct comm_point* c)
}
}
/** do the callback when writing is done */
static void
tcp_callback_writer(struct comm_point* c)
{
log_assert(c->type == comm_tcp);
ldns_buffer_clear(c->buffer);
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
/* for listening socket */
reclaim_tcp_handler(c);
}
/** do the callback when reading is done */
static void
tcp_callback_reader(struct comm_point* c)
{
struct comm_reply rep;
log_assert(c->type == comm_tcp);
ldns_buffer_flip(c->buffer);
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL) ) {
/* setup to send reply */
rep.c = c;
rep.addrlen = 0;
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &rep) ) {
comm_point_start_listening(c, -1, TCP_QUERY_TIMEOUT);
}
}
static void
comm_point_tcp_handle_callback(int fd, short event, void* arg)
/** Handle tcp reading callback.
* @param fd: file descriptor of socket.
* @param c: comm point to read from into buffer.
* @return: 0 on error
*/
static int
comm_point_tcp_handle_read(int fd, struct comm_point* c)
{
struct comm_point* c = (struct comm_point*)arg;
ssize_t r;
log_info("callback tcphandle for %x", (int)c);
log_assert(c->type == comm_tcp);
if(!(event&EV_READ)) {
if(event&EV_TIMEOUT) {
verbose(VERB_DETAIL, "tcp read took to long drop");
reclaim_tcp_handler(c);
return;
}
log_err("Ignored event %d for tcphdl.", event);
return;
}
if(!c->tcp_is_reading) {
reclaim_tcp_handler(c);
return;
}
if(!c->tcp_is_reading)
return 0;
if(c->tcp_byte_count < sizeof(uint16_t)) {
/* read length bytes */
r = read(fd, ldns_buffer_at(c->buffer, c->tcp_byte_count),
sizeof(uint16_t)-c->tcp_byte_count);
if(r == 0) {
reclaim_tcp_handler(c);
return;
} else if(r == -1) {
if(errno != EINTR && errno != EAGAIN)
log_err("read (in tcp s): %s", strerror(errno));
return;
if(r == 0)
return 0;
else if(r == -1) {
if(errno == EINTR || errno == EAGAIN)
return 1;
log_err("read (in tcp s): %s", strerror(errno));
return 0;
}
c->tcp_byte_count += r;
if(c->tcp_byte_count != sizeof(uint16_t))
return;
return 1;
if(ldns_buffer_read_u16_at(c->buffer, 0) >
ldns_buffer_capacity(c->buffer)) {
verbose(VERB_DETAIL, "tcp: dropped larger than buffer");
return 0;
}
ldns_buffer_set_limit(c->buffer,
ldns_buffer_read_u16_at(c->buffer, 0));
if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
reclaim_tcp_handler(c);
return;
return 0;
}
log_info("Reading tcp query of length %d",
ldns_buffer_limit(c->buffer));
@ -379,17 +389,97 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
r = read(fd, ldns_buffer_current(c->buffer),
ldns_buffer_remaining(c->buffer));
if(r == 0) {
reclaim_tcp_handler(c);
return;
return 0;
} else if(r == -1) {
if(errno != EINTR && errno != EAGAIN)
log_err("read (in tcp r): %s", strerror(errno));
return;
if(errno == EINTR || errno == EAGAIN)
return 1;
log_err("read (in tcp r): %s", strerror(errno));
return 0;
}
ldns_buffer_skip(c->buffer, r);
if(ldns_buffer_remaining(c->buffer) <= 0) {
tcp_callback_reader(c);
}
return 1;
}
/** Handle tcp writing callback.
* @param fd: file descriptor of socket.
* @param c: comm point to write buffer out of.
* @return: 0 on error
*/
static int
comm_point_tcp_handle_write(int fd, struct comm_point* c)
{
ssize_t r;
log_assert(c->type == comm_tcp);
if(c->tcp_is_reading)
return 0;
if(c->tcp_byte_count < sizeof(uint16_t)) {
uint16_t len = htons(ldns_buffer_limit(c->buffer));
r = write(fd, &len, sizeof(uint16_t)-c->tcp_byte_count);
if(r == -1) {
if(errno == EINTR || errno == EAGAIN)
return 1;
log_err("tcp write(s): %s", strerror(errno));
return 0;
}
c->tcp_byte_count += r;
if(c->tcp_byte_count != sizeof(uint16_t))
return 1;
ldns_buffer_set_position(c->buffer, 0);
}
r = write(fd, ldns_buffer_current(c->buffer),
ldns_buffer_remaining(c->buffer));
if(r == -1) {
if(errno == EINTR || errno == EAGAIN)
return 1;
log_err("tcp write(w): %s", strerror(errno));
return 0;
}
ldns_buffer_skip(c->buffer, r);
if(ldns_buffer_remaining(c->buffer) == 0) {
tcp_callback_writer(c);
}
return 1;
}
static void
comm_point_tcp_handle_callback(int fd, short event, void* arg)
{
struct comm_point* c = (struct comm_point*)arg;
log_assert(c->type == comm_tcp);
if(event&EV_READ) {
if(!comm_point_tcp_handle_read(fd, c)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close)
(void)(*c->callback)(c, c->cb_arg,
NETEVENT_CLOSED, NULL);
}
return;
}
if(event&EV_WRITE) {
if(!comm_point_tcp_handle_write(fd, c)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close)
(void)(*c->callback)(c, c->cb_arg,
NETEVENT_CLOSED, NULL);
}
return;
}
if(event&EV_TIMEOUT) {
verbose(VERB_DETAIL, "tcp took too long, dropped");
reclaim_tcp_handler(c);
if(!c->tcp_do_close)
(void)(*c->callback)(c, c->cb_arg,
NETEVENT_TIMEOUT, NULL);
return;
}
log_err("Ignored event %d for tcphdl.", event);
}
struct comm_point*
@ -599,13 +689,13 @@ comm_point_send_reply(struct comm_reply *repinfo)
comm_point_send_udp_msg(repinfo->c, repinfo->c->buffer,
(struct sockaddr*)&repinfo->addr, repinfo->addrlen);
} else {
log_info("tcp reply");
/* TODO */
comm_point_start_listening(repinfo->c, -1, TCP_QUERY_TIMEOUT);
}
}
void comm_point_stop_listening(struct comm_point* c)
{
log_info("comm point stop listening %x", (int)c);
if(event_del(&c->ev->ev) != 0) {
log_err("event_del error to stoplisten");
}
@ -613,6 +703,7 @@ void comm_point_stop_listening(struct comm_point* c)
void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
{
log_info("comm point start listening %x", (int)c);
if(c->type == comm_tcp_accept && !c->tcp_free) {
/* no use to start listening no free slots. */
return;
@ -626,13 +717,23 @@ void comm_point_start_listening(struct comm_point* c, int newfd, int sec)
return;
}
}
#ifndef S_SPLINT_S /* splint fails on struct timeval. */
c->timeout->tv_sec = sec;
c->timeout->tv_usec = 0;
#endif /* S_SPLINT_S */
}
if(c->type == comm_tcp) {
c->ev->ev.ev_events &= ~(EV_READ|EV_WRITE);
if(c->tcp_is_reading)
c->ev->ev.ev_events |= EV_READ;
else c->ev->ev.ev_events |= EV_WRITE;
}
if(newfd != -1) {
if(c->fd != -1)
close(c->fd);
c->fd = newfd;
c->ev->ev.ev_fd = c->fd;
}
if(c->fd != -1)
close(c->fd);
c->fd = newfd;
c->ev->ev.ev_fd = c->fd;
if(event_add(&c->ev->ev, sec==0?NULL:c->timeout) != 0) {
log_err("event_add failed. in cpsl.");
}