mirror of
https://github.com/NLnetLabs/unbound.git
synced 2025-12-22 07:41:16 -05:00
delayer in TCP.
git-svn-id: file:///svn/unbound/trunk@990 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
parent
1dc9abfb79
commit
5979bc061f
2 changed files with 408 additions and 22 deletions
|
|
@ -1,4 +1,9 @@
|
||||||
23 February 2008: Wouter
|
26 February 2008: Wouter
|
||||||
|
- delay utility delays TCP as well. If the server that is forwarded
|
||||||
|
to has a TCP error, the delay utility closes the connection.
|
||||||
|
- delay does REUSE_ADDR, and can handle a server that closes its end.
|
||||||
|
|
||||||
|
25 February 2008: Wouter
|
||||||
- delay utility works. Gets decent thoughput too (>20000).
|
- delay utility works. Gets decent thoughput too (>20000).
|
||||||
|
|
||||||
22 February 2008: Wouter
|
22 February 2008: Wouter
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,51 @@ struct proxy {
|
||||||
struct proxy* next;
|
struct proxy* next;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An item that has to be TCP relayed
|
||||||
|
*/
|
||||||
|
struct tcp_send_list {
|
||||||
|
/** the data item */
|
||||||
|
uint8_t* item;
|
||||||
|
/** size of item */
|
||||||
|
size_t len;
|
||||||
|
/** time when the item can be transmitted on */
|
||||||
|
struct timeval wait;
|
||||||
|
/** how much of the item has already been transmitted */
|
||||||
|
size_t done;
|
||||||
|
/** next in list */
|
||||||
|
struct tcp_send_list* next;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of TCP proxy fd pairs to TCP connect client to server
|
||||||
|
*/
|
||||||
|
struct tcp_proxy {
|
||||||
|
/** the fd to listen for client query */
|
||||||
|
int client_s;
|
||||||
|
/** the fd to listen for server answer */
|
||||||
|
int server_s;
|
||||||
|
|
||||||
|
/** remote client address */
|
||||||
|
struct sockaddr_storage addr;
|
||||||
|
/** length of address */
|
||||||
|
socklen_t addr_len;
|
||||||
|
/** timeout on this entry */
|
||||||
|
struct timeval timeout;
|
||||||
|
|
||||||
|
/** list of query items to send to server */
|
||||||
|
struct tcp_send_list* querylist;
|
||||||
|
/** last in query list */
|
||||||
|
struct tcp_send_list* querylast;
|
||||||
|
/** list of answer items to send to client */
|
||||||
|
struct tcp_send_list* answerlist;
|
||||||
|
/** last in answerlist */
|
||||||
|
struct tcp_send_list* answerlast;
|
||||||
|
|
||||||
|
/** next in list */
|
||||||
|
struct tcp_proxy* next;
|
||||||
|
};
|
||||||
|
|
||||||
/** usage information for delayer */
|
/** usage information for delayer */
|
||||||
void usage(char* argv[])
|
void usage(char* argv[])
|
||||||
{
|
{
|
||||||
|
|
@ -94,7 +139,8 @@ void usage(char* argv[])
|
||||||
printf(" -b addr : bind to this address to listen.\n");
|
printf(" -b addr : bind to this address to listen.\n");
|
||||||
printf(" -p port : bind to this port (use 0 for random).\n");
|
printf(" -p port : bind to this port (use 0 for random).\n");
|
||||||
printf(" -m mem : use this much memory for waiting queries.\n");
|
printf(" -m mem : use this much memory for waiting queries.\n");
|
||||||
printf(" -d delay: queries are delayed n milliseconds.\n");
|
printf(" -d delay: UDP queries are delayed n milliseconds.\n");
|
||||||
|
printf(" TCP is delayed twice (on send, on recv).\n");
|
||||||
printf(" -h : this help message\n");
|
printf(" -h : this help message\n");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
@ -372,24 +418,21 @@ service_proxy(fd_set* rset, int retsock, struct proxy* proxies,
|
||||||
static struct proxy*
|
static struct proxy*
|
||||||
find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
|
find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
|
||||||
fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
|
fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
|
||||||
struct timeval* now)
|
struct timeval* now, struct timeval* reuse_timeout)
|
||||||
{
|
{
|
||||||
struct proxy* p;
|
struct proxy* p;
|
||||||
struct timeval t;
|
struct timeval t;
|
||||||
struct timeval reuse_timeout;
|
|
||||||
for(p = *proxies; p; p = p->next) {
|
for(p = *proxies; p; p = p->next) {
|
||||||
if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
|
if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
/* possibly: reuse lapsed entries */
|
/* possibly: reuse lapsed entries */
|
||||||
reuse_timeout.tv_sec = 1;
|
|
||||||
reuse_timeout.tv_usec = 0;
|
|
||||||
for(p = *proxies; p; p = p->next) {
|
for(p = *proxies; p; p = p->next) {
|
||||||
if(p->numwait > p->numsent || p->numsent > p->numreturn)
|
if(p->numwait > p->numsent || p->numsent > p->numreturn)
|
||||||
continue;
|
continue;
|
||||||
t = *now;
|
t = *now;
|
||||||
dl_tv_subtract(&t, &p->lastuse);
|
dl_tv_subtract(&t, &p->lastuse);
|
||||||
if(dl_tv_smaller(&t, &reuse_timeout))
|
if(dl_tv_smaller(&t, reuse_timeout))
|
||||||
continue;
|
continue;
|
||||||
/* yes! */
|
/* yes! */
|
||||||
verbose(1, "reuse existing entry");
|
verbose(1, "reuse existing entry");
|
||||||
|
|
@ -399,7 +442,7 @@ find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
/* create new */
|
/* create new */
|
||||||
p = calloc(1, sizeof(*p));
|
p = (struct proxy*)calloc(1, sizeof(*p));
|
||||||
if(!p) fatal_exit("out of memory");
|
if(!p) fatal_exit("out of memory");
|
||||||
p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
|
p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
|
||||||
if(p->s == -1) fatal_exit("socket: %s", strerror(errno));
|
if(p->s == -1) fatal_exit("socket: %s", strerror(errno));
|
||||||
|
|
@ -419,7 +462,7 @@ static void
|
||||||
service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
|
service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
|
||||||
fd_set* rorig, int* max, struct proxy** proxies,
|
fd_set* rorig, int* max, struct proxy** proxies,
|
||||||
struct sockaddr_storage* srv_addr, socklen_t srv_len,
|
struct sockaddr_storage* srv_addr, socklen_t srv_len,
|
||||||
struct timeval* now, struct timeval* delay)
|
struct timeval* now, struct timeval* delay, struct timeval* reuse)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
struct sockaddr_storage from;
|
struct sockaddr_storage from;
|
||||||
|
|
@ -439,7 +482,7 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
|
||||||
ldns_buffer_set_limit(pkt, (size_t)len);
|
ldns_buffer_set_limit(pkt, (size_t)len);
|
||||||
/* find its proxy element */
|
/* find its proxy element */
|
||||||
p = find_create_proxy(&from, from_len, rorig, max, proxies,
|
p = find_create_proxy(&from, from_len, rorig, max, proxies,
|
||||||
addr_is_ip6(srv_addr, srv_len), now);
|
addr_is_ip6(srv_addr, srv_len), now, reuse);
|
||||||
if(!p) fatal_exit("error: cannot find or create proxy");
|
if(!p) fatal_exit("error: cannot find or create proxy");
|
||||||
p->lastuse = *now;
|
p->lastuse = *now;
|
||||||
ring_add(ring, pkt, now, delay, p);
|
ring_add(ring, pkt, now, delay, p);
|
||||||
|
|
@ -448,15 +491,299 @@ service_recv(int s, struct ringbuf* ring, ldns_buffer* pkt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** delete tcp proxy */
|
||||||
|
static void
|
||||||
|
tcp_proxy_delete(struct tcp_proxy* p)
|
||||||
|
{
|
||||||
|
struct tcp_send_list* s, *sn;
|
||||||
|
if(!p)
|
||||||
|
return;
|
||||||
|
log_addr(1, "delete tcp proxy", &p->addr, p->addr_len);
|
||||||
|
s = p->querylist;
|
||||||
|
while(s) {
|
||||||
|
sn = s->next;
|
||||||
|
free(s->item);
|
||||||
|
free(s);
|
||||||
|
s = sn;
|
||||||
|
}
|
||||||
|
s = p->answerlist;
|
||||||
|
while(s) {
|
||||||
|
sn = s->next;
|
||||||
|
free(s->item);
|
||||||
|
free(s);
|
||||||
|
s = sn;
|
||||||
|
}
|
||||||
|
close(p->client_s);
|
||||||
|
if(p->server_s != -1)
|
||||||
|
close(p->server_s);
|
||||||
|
free(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** accept new TCP connections, and set them up */
|
||||||
|
static void
|
||||||
|
service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies,
|
||||||
|
struct sockaddr_storage* srv_addr, socklen_t srv_len,
|
||||||
|
struct timeval* now, struct timeval* tcp_timeout)
|
||||||
|
{
|
||||||
|
int newfd;
|
||||||
|
struct sockaddr_storage addr;
|
||||||
|
struct tcp_proxy* p;
|
||||||
|
socklen_t addr_len;
|
||||||
|
newfd = accept(s, (struct sockaddr*)&addr, &addr_len);
|
||||||
|
if(newfd == -1) {
|
||||||
|
if(errno == EAGAIN || errno == EINTR)
|
||||||
|
return;
|
||||||
|
fatal_exit("accept: %s", strerror(errno));
|
||||||
|
}
|
||||||
|
p = (struct tcp_proxy*)calloc(1, sizeof(*p));
|
||||||
|
if(!p) fatal_exit("out of memory");
|
||||||
|
memmove(&p->addr, &addr, addr_len);
|
||||||
|
p->addr_len = addr_len;
|
||||||
|
log_addr(1, "new tcp proxy", &p->addr, p->addr_len);
|
||||||
|
p->client_s = newfd;
|
||||||
|
p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET,
|
||||||
|
SOCK_STREAM, 0);
|
||||||
|
if(p->server_s == -1)
|
||||||
|
fatal_exit("tcp socket: %s", strerror(errno));
|
||||||
|
fd_set_nonblock(p->client_s);
|
||||||
|
fd_set_nonblock(p->server_s);
|
||||||
|
if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) {
|
||||||
|
if(errno != EINPROGRESS) {
|
||||||
|
log_err("tcp connect: %s", strerror(errno));
|
||||||
|
close(p->server_s);
|
||||||
|
close(p->client_s);
|
||||||
|
free(p);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p->timeout = *now;
|
||||||
|
dl_tv_add(&p->timeout, tcp_timeout);
|
||||||
|
|
||||||
|
/* listen to client and server */
|
||||||
|
FD_SET(p->client_s, rorig);
|
||||||
|
FD_SET(p->server_s, rorig);
|
||||||
|
if(p->client_s+1 > *max)
|
||||||
|
*max = p->client_s+1;
|
||||||
|
if(p->server_s+1 > *max)
|
||||||
|
*max = p->server_s+1;
|
||||||
|
|
||||||
|
/* add into proxy list */
|
||||||
|
p->next = *proxies;
|
||||||
|
*proxies = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** relay TCP, read a part */
|
||||||
|
static int
|
||||||
|
tcp_relay_read(int s, struct tcp_send_list** first,
|
||||||
|
struct tcp_send_list** last, struct timeval* now,
|
||||||
|
struct timeval* delay, ldns_buffer* pkt)
|
||||||
|
{
|
||||||
|
struct tcp_send_list* item;
|
||||||
|
ssize_t r = read(s, ldns_buffer_begin(pkt), ldns_buffer_capacity(pkt));
|
||||||
|
if(r == -1) {
|
||||||
|
if(errno == EINTR || errno == EAGAIN)
|
||||||
|
return 1;
|
||||||
|
log_err("tcp read: %s", strerror(errno));
|
||||||
|
return 0;
|
||||||
|
} else if(r == 0) {
|
||||||
|
/* connection closed */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
item = (struct tcp_send_list*)malloc(sizeof(*item));
|
||||||
|
if(!item) {
|
||||||
|
log_err("out of memory");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
verbose(1, "read item len %d", (int)r);
|
||||||
|
item->len = (size_t)r;
|
||||||
|
item->item = memdup(ldns_buffer_begin(pkt), item->len);
|
||||||
|
if(!item->item) {
|
||||||
|
free(item);
|
||||||
|
log_err("out of memory");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
item->done = 0;
|
||||||
|
item->wait = *now;
|
||||||
|
dl_tv_add(&item->wait, delay);
|
||||||
|
item->next = NULL;
|
||||||
|
|
||||||
|
/* link in */
|
||||||
|
if(*first) {
|
||||||
|
(*last)->next = item;
|
||||||
|
} else {
|
||||||
|
*first = item;
|
||||||
|
}
|
||||||
|
*last = item;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** relay TCP, write a part */
|
||||||
|
static int
|
||||||
|
tcp_relay_write(int s, struct tcp_send_list** first,
|
||||||
|
struct tcp_send_list** last, struct timeval* now)
|
||||||
|
{
|
||||||
|
ssize_t r;
|
||||||
|
struct tcp_send_list* p;
|
||||||
|
while(*first) {
|
||||||
|
p = *first;
|
||||||
|
/* is the item ready? */
|
||||||
|
if(!dl_tv_smaller(&p->wait, now))
|
||||||
|
return 1;
|
||||||
|
/* write it */
|
||||||
|
r = write(s, p->item + p->done, p->len - p->done);
|
||||||
|
if(r == -1) {
|
||||||
|
if(errno == EAGAIN || errno == EINTR)
|
||||||
|
return 1;
|
||||||
|
log_err("tcp write: %s", strerror(errno));
|
||||||
|
return 0;
|
||||||
|
} else if(r == 0) {
|
||||||
|
/* closed */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
/* account it */
|
||||||
|
p->done += (size_t)r;
|
||||||
|
verbose(1, "write item %d of %d", (int)p->done, (int)p->len);
|
||||||
|
if(p->done >= p->len) {
|
||||||
|
free(p->item);
|
||||||
|
*first = p->next;
|
||||||
|
if(!*first)
|
||||||
|
*last = NULL;
|
||||||
|
free(p);
|
||||||
|
} else {
|
||||||
|
/* partial write */
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** perform TCP relaying */
|
||||||
|
static void
|
||||||
|
service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now,
|
||||||
|
struct timeval* delay, struct timeval* tcp_timeout, ldns_buffer* pkt,
|
||||||
|
fd_set* rset, fd_set* rorig, fd_set* worig)
|
||||||
|
{
|
||||||
|
struct tcp_proxy* p, **prev;
|
||||||
|
struct timeval tout;
|
||||||
|
int delete_it;
|
||||||
|
p = *tcp_proxies;
|
||||||
|
prev = tcp_proxies;
|
||||||
|
tout = *now;
|
||||||
|
dl_tv_add(&tout, tcp_timeout);
|
||||||
|
|
||||||
|
while(p) {
|
||||||
|
delete_it = 0;
|
||||||
|
/* can we receive further queries? */
|
||||||
|
if(!delete_it && FD_ISSET(p->client_s, rset)) {
|
||||||
|
p->timeout = tout;
|
||||||
|
log_addr(1, "read tcp query", &p->addr, p->addr_len);
|
||||||
|
if(!tcp_relay_read(p->client_s, &p->querylist,
|
||||||
|
&p->querylast, now, delay, pkt))
|
||||||
|
delete_it = 1;
|
||||||
|
}
|
||||||
|
/* can we receive further answers? */
|
||||||
|
if(!delete_it && p->server_s != -1 &&
|
||||||
|
FD_ISSET(p->server_s, rset)) {
|
||||||
|
p->timeout = tout;
|
||||||
|
log_addr(1, "read tcp answer", &p->addr, p->addr_len);
|
||||||
|
if(!tcp_relay_read(p->server_s, &p->answerlist,
|
||||||
|
&p->answerlast, now, delay, pkt)) {
|
||||||
|
close(p->server_s);
|
||||||
|
FD_CLR(p->server_s, worig);
|
||||||
|
FD_CLR(p->server_s, rorig);
|
||||||
|
p->server_s = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* can we send on further queries */
|
||||||
|
if(!delete_it && p->querylist && p->server_s != -1) {
|
||||||
|
p->timeout = tout;
|
||||||
|
if(dl_tv_smaller(&p->querylist->wait, now))
|
||||||
|
log_addr(1, "write tcp query",
|
||||||
|
&p->addr, p->addr_len);
|
||||||
|
if(!tcp_relay_write(p->server_s, &p->querylist,
|
||||||
|
&p->querylast, now))
|
||||||
|
delete_it = 1;
|
||||||
|
if(p->querylist && p->server_s != -1 &&
|
||||||
|
dl_tv_smaller(&p->querylist->wait, now))
|
||||||
|
FD_SET(p->server_s, worig);
|
||||||
|
else FD_CLR(p->server_s, worig);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* can we send on further answers */
|
||||||
|
if(!delete_it && p->answerlist) {
|
||||||
|
p->timeout = tout;
|
||||||
|
if(dl_tv_smaller(&p->answerlist->wait, now))
|
||||||
|
log_addr(1, "write tcp answer",
|
||||||
|
&p->addr, p->addr_len);
|
||||||
|
if(!tcp_relay_write(p->client_s, &p->answerlist,
|
||||||
|
&p->answerlast, now))
|
||||||
|
delete_it = 1;
|
||||||
|
if(p->answerlist && dl_tv_smaller(&p->answerlist->wait,
|
||||||
|
now))
|
||||||
|
FD_SET(p->client_s, worig);
|
||||||
|
else FD_CLR(p->client_s, worig);
|
||||||
|
if(!p->answerlist && p->server_s == -1)
|
||||||
|
delete_it = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* does this entry timeout? (unused too long) */
|
||||||
|
if(dl_tv_smaller(&p->timeout, now)) {
|
||||||
|
delete_it = 1;
|
||||||
|
}
|
||||||
|
if(delete_it) {
|
||||||
|
struct tcp_proxy* np = p->next;
|
||||||
|
*prev = np;
|
||||||
|
FD_CLR(p->client_s, rorig);
|
||||||
|
FD_CLR(p->client_s, worig);
|
||||||
|
if(p->server_s != -1) {
|
||||||
|
FD_CLR(p->server_s, rorig);
|
||||||
|
FD_CLR(p->server_s, worig);
|
||||||
|
}
|
||||||
|
tcp_proxy_delete(p);
|
||||||
|
p = np;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
prev = &p->next;
|
||||||
|
p = p->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** find waiting time */
|
/** find waiting time */
|
||||||
static int
|
static int
|
||||||
service_findwait(struct timeval* now, struct timeval* wait,
|
service_findwait(struct timeval* now, struct timeval* wait,
|
||||||
struct ringbuf* ring)
|
struct ringbuf* ring, struct tcp_proxy* tcplist)
|
||||||
{
|
{
|
||||||
/* first item is the time to wait */
|
/* first item is the time to wait */
|
||||||
struct timeval* peek = ring_peek_time(ring);
|
struct timeval* peek = ring_peek_time(ring);
|
||||||
|
struct timeval tcv;
|
||||||
|
int have_tcpval = 0;
|
||||||
|
struct tcp_proxy* p;
|
||||||
|
|
||||||
|
/* also for TCP list the first in sendlists is the time to wait */
|
||||||
|
for(p=tcplist; p; p=p->next) {
|
||||||
|
if(!have_tcpval)
|
||||||
|
tcv = p->timeout;
|
||||||
|
have_tcpval = 1;
|
||||||
|
if(dl_tv_smaller(&p->timeout, &tcv))
|
||||||
|
tcv = p->timeout;
|
||||||
|
if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv))
|
||||||
|
tcv = p->querylist->wait;
|
||||||
|
if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv))
|
||||||
|
tcv = p->answerlist->wait;
|
||||||
|
}
|
||||||
if(peek) {
|
if(peek) {
|
||||||
|
/* peek can be unaligned */
|
||||||
|
/* use wait as a temp variable */
|
||||||
memmove(wait, peek, sizeof(*wait));
|
memmove(wait, peek, sizeof(*wait));
|
||||||
|
if(!have_tcpval)
|
||||||
|
tcv = *wait;
|
||||||
|
else if(dl_tv_smaller(wait, &tcv))
|
||||||
|
tcv = *wait;
|
||||||
|
have_tcpval = 1;
|
||||||
|
}
|
||||||
|
if(have_tcpval) {
|
||||||
|
*wait = tcv;
|
||||||
dl_tv_subtract(wait, now);
|
dl_tv_subtract(wait, now);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
@ -495,29 +822,51 @@ proxy_list_clear(struct proxy* p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** clear TCP proxy list */
|
||||||
|
static void
|
||||||
|
tcp_proxy_list_clear(struct tcp_proxy* p)
|
||||||
|
{
|
||||||
|
struct tcp_proxy* np;
|
||||||
|
while(p) {
|
||||||
|
np = p->next;
|
||||||
|
tcp_proxy_delete(p);
|
||||||
|
p = np;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** delayer service loop */
|
/** delayer service loop */
|
||||||
static void
|
static void
|
||||||
service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
|
service_loop(int udp_s, int listen_s, struct ringbuf* ring,
|
||||||
|
struct timeval* delay, struct timeval* reuse,
|
||||||
struct sockaddr_storage* srv_addr, socklen_t srv_len,
|
struct sockaddr_storage* srv_addr, socklen_t srv_len,
|
||||||
ldns_buffer* pkt)
|
ldns_buffer* pkt)
|
||||||
{
|
{
|
||||||
fd_set rset, rorig;
|
fd_set rset, rorig;
|
||||||
|
fd_set wset, worig;
|
||||||
struct timeval now, wait;
|
struct timeval now, wait;
|
||||||
int max, have_wait = 0;
|
int max, have_wait = 0;
|
||||||
struct proxy* proxies = NULL;
|
struct proxy* proxies = NULL;
|
||||||
|
struct tcp_proxy* tcp_proxies = NULL;
|
||||||
|
struct timeval tcp_timeout;
|
||||||
|
tcp_timeout.tv_sec = 120;
|
||||||
|
tcp_timeout.tv_usec = 0;
|
||||||
#ifndef S_SPLINT_S
|
#ifndef S_SPLINT_S
|
||||||
FD_ZERO(&rorig);
|
FD_ZERO(&rorig);
|
||||||
|
FD_ZERO(&worig);
|
||||||
FD_SET(udp_s, &rorig);
|
FD_SET(udp_s, &rorig);
|
||||||
|
FD_SET(listen_s, &rorig);
|
||||||
#endif
|
#endif
|
||||||
max = udp_s + 1;
|
max = udp_s + 1;
|
||||||
|
if(listen_s + 1 > max) max = listen_s + 1;
|
||||||
while(!do_quit) {
|
while(!do_quit) {
|
||||||
/* wait for events */
|
/* wait for events */
|
||||||
rset = rorig;
|
rset = rorig;
|
||||||
|
wset = worig;
|
||||||
if(have_wait)
|
if(have_wait)
|
||||||
verbose(1, "wait for %d.%6.6d",
|
verbose(1, "wait for %d.%6.6d",
|
||||||
(unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
|
(unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
|
||||||
else verbose(1, "wait");
|
else verbose(1, "wait");
|
||||||
if(select(max, &rset, NULL, NULL, have_wait?&wait:NULL) < 0) {
|
if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) {
|
||||||
if(errno == EAGAIN || errno == EINTR)
|
if(errno == EAGAIN || errno == EINTR)
|
||||||
continue;
|
continue;
|
||||||
fatal_exit("select: %s", strerror(errno));
|
fatal_exit("select: %s", strerror(errno));
|
||||||
|
|
@ -528,8 +877,7 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
|
||||||
continue;
|
continue;
|
||||||
fatal_exit("gettimeofday: %s", strerror(errno));
|
fatal_exit("gettimeofday: %s", strerror(errno));
|
||||||
}
|
}
|
||||||
verbose(1, " ");
|
verbose(1, "process at %u.%6.6u\n",
|
||||||
verbose(1, "process at %u.%6.6u",
|
|
||||||
(unsigned)now.tv_sec, (unsigned)now.tv_usec);
|
(unsigned)now.tv_sec, (unsigned)now.tv_usec);
|
||||||
/* sendout delayed queries to master server (frees up buffer)*/
|
/* sendout delayed queries to master server (frees up buffer)*/
|
||||||
service_send(ring, &now, pkt, srv_addr, srv_len);
|
service_send(ring, &now, pkt, srv_addr, srv_len);
|
||||||
|
|
@ -537,11 +885,18 @@ service_loop(int udp_s, struct ringbuf* ring, struct timeval* delay,
|
||||||
service_proxy(&rset, udp_s, proxies, pkt, &now);
|
service_proxy(&rset, udp_s, proxies, pkt, &now);
|
||||||
/* see what can be received to start waiting */
|
/* see what can be received to start waiting */
|
||||||
service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
|
service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
|
||||||
srv_addr, srv_len, &now, delay);
|
srv_addr, srv_len, &now, delay, reuse);
|
||||||
|
/* see if there are new tcp connections */
|
||||||
|
service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies,
|
||||||
|
srv_addr, srv_len, &now, &tcp_timeout);
|
||||||
|
/* service tcp connections */
|
||||||
|
service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout,
|
||||||
|
pkt, &rset, &rorig, &worig);
|
||||||
/* see what next timeout is (if any) */
|
/* see what next timeout is (if any) */
|
||||||
have_wait = service_findwait(&now, &wait, ring);
|
have_wait = service_findwait(&now, &wait, ring, tcp_proxies);
|
||||||
}
|
}
|
||||||
proxy_list_clear(proxies);
|
proxy_list_clear(proxies);
|
||||||
|
tcp_proxy_list_clear(tcp_proxies);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** delayer main service routine */
|
/** delayer main service routine */
|
||||||
|
|
@ -552,11 +907,17 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
|
||||||
struct sockaddr_storage bind_addr, srv_addr;
|
struct sockaddr_storage bind_addr, srv_addr;
|
||||||
socklen_t bind_len, srv_len;
|
socklen_t bind_len, srv_len;
|
||||||
struct ringbuf* ring = ring_create(memsize);
|
struct ringbuf* ring = ring_create(memsize);
|
||||||
struct timeval delay;
|
struct timeval delay, reuse;
|
||||||
ldns_buffer* pkt;
|
ldns_buffer* pkt;
|
||||||
int i, s;
|
int i, s, listen_s;
|
||||||
delay.tv_sec = delay_msec / 1000;
|
delay.tv_sec = delay_msec / 1000;
|
||||||
delay.tv_usec = (delay_msec % 1000)*1000;
|
delay.tv_usec = (delay_msec % 1000)*1000;
|
||||||
|
reuse = delay; /* reuse is max(4*delay, 1 second) */
|
||||||
|
dl_tv_add(&reuse, &delay);
|
||||||
|
dl_tv_add(&reuse, &delay);
|
||||||
|
dl_tv_add(&reuse, &delay);
|
||||||
|
if(reuse.tv_sec == 0)
|
||||||
|
reuse.tv_sec = 1;
|
||||||
if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
|
if(!extstrtoaddr(serv_str, &srv_addr, &srv_len)) {
|
||||||
printf("cannot parse forward address: %s\n", serv_str);
|
printf("cannot parse forward address: %s\n", serv_str);
|
||||||
exit(1);
|
exit(1);
|
||||||
|
|
@ -592,15 +953,35 @@ service(char* bind_str, int bindport, char* serv_str, size_t memsize,
|
||||||
} else break;
|
} else break;
|
||||||
}
|
}
|
||||||
fd_set_nonblock(s);
|
fd_set_nonblock(s);
|
||||||
|
/* and TCP port */
|
||||||
|
if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
|
||||||
|
SOCK_STREAM, 0)) == -1)
|
||||||
|
fatal_exit("tcp socket: %s", strerror(errno));
|
||||||
|
#ifdef SO_REUSEADDR
|
||||||
|
if(1) {
|
||||||
|
int on = 1;
|
||||||
|
if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, &on,
|
||||||
|
(socklen_t)sizeof(on)) < 0)
|
||||||
|
fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s",
|
||||||
|
strerror(errno));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1)
|
||||||
|
fatal_exit("tcp bind: %s", strerror(errno));
|
||||||
|
if(listen(listen_s, 5) == -1)
|
||||||
|
fatal_exit("tcp listen: %s", strerror(errno));
|
||||||
|
fd_set_nonblock(listen_s);
|
||||||
printf("listening on port: %d\n", bindport);
|
printf("listening on port: %d\n", bindport);
|
||||||
|
|
||||||
/* process loop */
|
/* process loop */
|
||||||
do_quit = 0;
|
do_quit = 0;
|
||||||
service_loop(s, ring, &delay, &srv_addr, srv_len, pkt);
|
service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len,
|
||||||
|
pkt);
|
||||||
|
|
||||||
/* cleanup */
|
/* cleanup */
|
||||||
verbose(1, "cleanup");
|
verbose(1, "cleanup");
|
||||||
close(s);
|
close(s);
|
||||||
|
close(listen_s);
|
||||||
ldns_buffer_free(pkt);
|
ldns_buffer_free(pkt);
|
||||||
ring_delete(ring);
|
ring_delete(ring);
|
||||||
}
|
}
|
||||||
|
|
@ -620,7 +1001,7 @@ int main(int argc, char** argv)
|
||||||
size_t memsize = 10*1024*1024;
|
size_t memsize = 10*1024*1024;
|
||||||
int delay = 100;
|
int delay = 100;
|
||||||
|
|
||||||
verbosity = 0;
|
verbosity = 1;
|
||||||
log_init(0, 0, 0);
|
log_init(0, 0, 0);
|
||||||
log_ident_set("delayer");
|
log_ident_set("delayer");
|
||||||
srandom(time(NULL) ^ getpid());
|
srandom(time(NULL) ^ getpid());
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue