tcp callback handle timeout event for read and reuse keepalive.

W.C.A. Wijngaards 2020-06-25 14:26:29 +02:00
parent 5f5cdd3be1
commit 39a50f30a3
3 changed files with 155 additions and 64 deletions
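
For orientation before the first hunk: the callback changes below drive a small write-state kept on struct comm_point (in Unbound's netevent header). The sketch below paraphrases those fields so the diff is easier to follow; it is not the verbatim header, and the struct name is invented for illustration.

#include <stddef.h>
#include <stdint.h>

/* Paraphrased sketch of the comm_point write-state used by this commit;
 * the real members live in struct comm_point (util/netevent.h). */
struct sketch_tcp_write_state {
	int tcp_write_and_read;        /* 1: write the query and also read replies */
	size_t tcp_write_byte_count;   /* bytes of the query written so far */
	uint8_t* tcp_write_pkt;        /* the query packet being written */
	size_t tcp_write_pkt_len;      /* its length */
};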


@@ -346,16 +346,27 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
/** use the buffer to setup writing the query */
static void
outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
size_t pkt_len)
outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
struct waiting_tcp* w)
{
pend->id = LDNS_ID_WIRE(pkt);
sldns_buffer_clear(pend->c->buffer);
sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
sldns_buffer_flip(pend->c->buffer);
struct timeval tv;
pend->id = LDNS_ID_WIRE(w->pkt);
pend->c->tcp_write_pkt = w->pkt;
pend->c->tcp_write_pkt_len = w->pkt_len;
pend->c->tcp_write_and_read = 1;
pend->c->tcp_write_byte_count = 0;
comm_point_start_listening(pend->c, s, -1);
/* set timer on the waiting_tcp entry, this is the write timeout
* for the written packet. The timer on pend->c is the timer
* for when there is no written packet and we have readtimeouts */
#ifndef S_SPLINT_S
tv.tv_sec = w->timeout/1000;
tv.tv_usec = (w->timeout%1000)*1000;
#endif
/* if the waiting_tcp was previously waiting for a buffer in the
* outside_network.tcpwaitlist, then the timer is reset now that
* we start writing it */
comm_timer_set(w->timer, &tv);
}
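
The write timeout set just above, and the wait-list timeout set later in this commit, both turn a millisecond value into a struct timeval before comm_timer_set(). A minimal standalone sketch of that conversion; the helper name is hypothetical and does not exist in Unbound.

#include <sys/time.h>

/* Hypothetical helper showing the msec -> struct timeval conversion done
 * inline before comm_timer_set() in this commit. */
static void
sketch_msec_to_timeval(int msec, struct timeval* tv)
{
	tv->tv_sec = msec/1000;            /* whole seconds */
	tv->tv_usec = (msec%1000)*1000;    /* remaining milliseconds as usec */
}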
/** use next free buffer to service a tcp query */
@@ -471,7 +482,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w)
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
pend->reuse.pending = pend;
outnet_tcp_take_query_setup(s, pend, w->pkt, w->pkt_len);
outnet_tcp_take_query_setup(s, pend, w);
return 1;
}
@@ -570,7 +581,7 @@ static void
decommission_pending_tcp(struct outside_network* outnet,
struct pending_tcp* pend)
{
verbose(5, "decommision_pending_tcp");
verbose(5, "decommission_pending_tcp");
if(pend->c->ssl) {
#ifdef HAVE_SSL
SSL_shutdown(pend->c->ssl);
@@ -633,6 +644,75 @@ reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
return 1;
}
/** perform failure callbacks for waiting queries in reuse write list */
static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w;
w = pend->reuse.write_wait_first;
while(w) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
w = w->write_wait_next;
}
}
/** perform failure callbacks for waiting queries in reuse read rbtree */
static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
{
rbnode_type* node;
if(pend->reuse.tree_by_id.root == NULL ||
pend->reuse.tree_by_id.root == RBTREE_NULL)
return;
node = rbtree_first(&pend->reuse.tree_by_id);
while(node && node != RBTREE_NULL) {
struct waiting_tcp* w = (struct waiting_tcp*)node->key;
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
node = rbtree_next(node);
}
}
/** perform failure callbacks for current written query in reuse struct */
static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w = pend->query;
if(w) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
}
}
/** delete element from tree by id */
static void
reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key != NULL);
rbtree_delete(&reuse->tree_by_id, w);
w->id_node.key = NULL;
}
/** pop the first element from the writewait list */
static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
{
struct waiting_tcp* w = pend->reuse.write_wait_first;
if(!w)
return NULL;
if(w->write_wait_prev)
w->write_wait_prev->write_wait_next = w->write_wait_next;
else pend->reuse.write_wait_first = w->write_wait_next;
if(w->write_wait_next)
w->write_wait_next->write_wait_prev = w->write_wait_prev;
else pend->reuse.write_wait_last = w->write_wait_prev;
w->write_wait_queued = 0;
return w;
}
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
@@ -651,7 +731,38 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
struct pending_tcp* pend = (struct pending_tcp*)arg;
struct outside_network* outnet = pend->reuse.outnet;
verbose(VERB_ALGO, "outnettcp cb");
if(error != NETEVENT_NOERROR) {
if(error == NETEVENT_TIMEOUT) {
if(pend->c->tcp_write_and_read)
verbose(VERB_QUERY, "outnettcp got tcp timeout "
"for read, ignored because write underway");
else verbose(VERB_QUERY, "outnettcp got tcp timeout %s",
(pend->reuse.tree_by_id.count?"for reading pkt":
"for keepalive for reuse"));
/* if we are writing, ignore readtimer, wait for write timer
* or write is done */
if(pend->c->tcp_write_and_read)
return 0;
/* must be timeout for reading or keepalive reuse,
* close it. */
reuse_tcp_remove_tree_list(outnet, &pend->reuse);
} else if(error == NETEVENT_PKT_WRITTEN) {
/* the packet we want to write has been written. */
log_assert(c == pend->c);
log_assert(pend->query->pkt == pend->c->tcp_write_pkt);
log_assert(pend->query->pkt_len == pend->c->tcp_write_pkt_len);
pend->c->tcp_write_pkt = NULL;
pend->c->tcp_write_pkt_len = 0;
/* the pend.query is already in tree_by_id */
pend->query = NULL;
/* setup to write next packet or setup read timeout */
if(pend->reuse.write_wait_first) {
pend->query = reuse_write_wait_pop(pend);
outnet_tcp_take_query_setup(pend->c->fd, pend,
pend->query);
} else {
reuse_tcp_setup_readtimeout(pend);
}
} else if(error != NETEVENT_NOERROR) {
verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
/* pass error below and exit */
} else {
@@ -674,6 +785,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
}
}
if(pend->query) {
reuse_tree_by_id_delete(&pend->reuse, pend->query);
fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
waiting_tcp_delete(pend->query);
@@ -687,7 +799,12 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
return 0;
}
verbose(5, "outnet_tcp_cb reuse after cb: decommission it");
/* no queries on it, no space to keep it. Close it */
/* no queries on it, no space to keep it, or timeout or closed due
* to error. Close it */
reuse_cb_readwait_for_failure(pend, (error==NETEVENT_TIMEOUT?
NETEVENT_TIMEOUT:NETEVENT_CLOSED));
reuse_cb_writewait_for_failure(pend, (error==NETEVENT_TIMEOUT?
NETEVENT_TIMEOUT:NETEVENT_CLOSED));
decommission_pending_tcp(outnet, pend);
use_free_buffer(outnet);
return 0;
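
With the failure fan-out above, a query's callback can now fire with NETEVENT_TIMEOUT or NETEVENT_CLOSED while the query still sits in the reuse read tree or write-wait list, and in that case it receives a NULL comm_point and reply. A hypothetical callback sketch of that contract, assuming the comm_point_callback_type signature and NETEVENT_* values from util/netevent.h; it is an illustration, not the real consumer.

#include "util/netevent.h"

/* Hypothetical comm_point_callback_type implementation; only shows the
 * error values the reuse failure paths above can deliver. */
static int
sketch_pending_tcp_cb(struct comm_point* c, void* arg, int error,
	struct comm_reply* reply_info)
{
	(void)arg;
	if(error == NETEVENT_TIMEOUT) {
		/* write, read or keepalive timeout; c and reply_info are
		 * NULL for queries that were still waiting on the stream */
		return 0;
	}
	if(error == NETEVENT_CLOSED) {
		/* the stream was closed or errored while this query waited */
		return 0;
	}
	/* NETEVENT_NOERROR: the reply is in c->buffer and reply_info */
	(void)c; (void)reply_info;
	return 0;
}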
@@ -1501,50 +1618,6 @@ pending_udp_query(struct serviced_query* sq, struct sldns_buffer* packet,
return pend;
}
/** perform failure callbacks for waiting queries in reuse write list */
static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w;
w = pend->reuse.write_wait_first;
while(w) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
w = w->write_wait_next;
}
}
/** perform failure callbacks for waiting queries in reuse read rbtree */
static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
{
rbnode_type* node;
if(pend->reuse.tree_by_id.root == NULL ||
pend->reuse.tree_by_id.root == RBTREE_NULL)
return;
node = rbtree_first(&pend->reuse.tree_by_id);
while(node && node != RBTREE_NULL) {
struct waiting_tcp* w = (struct waiting_tcp*)node->key;
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
node = rbtree_next(node);
}
}
/** perform failure callbacks for current written query in reuse struct */
static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w = pend->query;
if(w) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, err, NULL);
}
}
void
outnet_tcptimer(void* arg)
{
@@ -1683,6 +1756,15 @@ reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
return NULL;
}
/** insert element in tree by id */
static void
reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key == NULL);
w->id_node.key = w;
rbtree_insert(&reuse->tree_by_id, &w->id_node);
}
/** find element in tree by id */
static struct waiting_tcp*
reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
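
reuse_tree_by_id_insert() above files each in-flight query under its 16-bit DNS message ID, the same LDNS_ID_WIRE(w->pkt) value that is stored into pend->id at write setup, so a reply read from the shared stream can be matched back to its waiting_tcp. A small sketch of that lookup, written as if inside outside_network.c with its usual includes; the wrapper name is hypothetical and the actual read-side handling is outside this hunk.

/* Hypothetical wrapper: match a reply sitting in the stream buffer back to
 * the waiting_tcp that owns its query ID (illustration only). */
static struct waiting_tcp*
sketch_find_reply_owner(struct pending_tcp* pend)
{
	/* the first two wire octets of a DNS message are the ID */
	uint16_t id = LDNS_ID_WIRE(sldns_buffer_begin(pend->c->buffer));
	return reuse_tcp_by_id_find(&pend->reuse, id);
}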
@@ -1779,7 +1861,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
struct pending_tcp* pend = sq->outnet->tcp_free;
struct reuse_tcp* reuse = NULL;
struct waiting_tcp* w;
struct timeval tv;
uint16_t id;
verbose(5, "pending_tcp_query");
@@ -1827,11 +1908,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
w->cb_arg = callback_arg;
w->ssl_upstream = sq->ssl_upstream;
w->tls_auth_name = sq->tls_auth_name;
#ifndef S_SPLINT_S
tv.tv_sec = timeout/1000;
tv.tv_usec = (timeout%1000)*1000;
#endif
comm_timer_set(w->timer, &tv);
w->timeout = timeout;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
@@ -1839,15 +1916,19 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
/* if cannot write now, store query and put it
* in the waiting list for this stream TODO */
/* and insert in tree_by_id */
reuse_tree_by_id_insert(&pend->reuse, w);
/* and also delete it from the waitlist if the query is gone,
* eg. sq is deleted TODO */
/* and also servfail all waiting queries if
* stream closes TODO */
/* reuse existing fd, write query and continue */
w->next_waiting = (void*)pend;
pend->query = w;
outnet_tcp_take_query_setup(pend->c->fd, pend,
w->pkt, w->pkt_len);
if(pend->query == NULL) {
/* write straight away */
pend->query = w;
outnet_tcp_take_query_setup(pend->c->fd, pend,
w);
}
} else {
verbose(5, "pending_tcp_query: new fd, connect");
/* create new fd and connect to addr, setup to
@@ -1869,6 +1950,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
comm_tcp, sq->zone, sq->zonelen, packet);
#endif
} else {
struct timeval tv;
/* queue up */
verbose(5, "pending_tcp_query: queue to wait");
w->next_waiting = NULL;
@@ -1877,6 +1959,11 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
else sq->outnet->tcp_wait_first = w;
sq->outnet->tcp_wait_last = w;
w->on_tcp_waiting_list = 1;
#ifndef S_SPLINT_S
tv.tv_sec = timeout/1000;
tv.tv_usec = (timeout%1000)*1000;
#endif
comm_timer_set(w->timer, &tv);
}
return w;
}
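
Taking the pending_tcp_query() hunks together: the timeout is now stored in w->timeout and only becomes a running timer immediately when the query has to wait for a buffer; otherwise outnet_tcp_take_query_setup() arms the timer when the write starts. A condensed editorial paraphrase of that dispatch, with the new-connection branch and error handling elided; it restates the diff rather than adding code.

/* Condensed paraphrase of the dispatch above (illustration only). */
w->timeout = timeout;                     /* msec, consumed at write setup */
if(pend) {
	if(reuse) {
		/* existing stream: always track the query by its ID */
		reuse_tree_by_id_insert(&pend->reuse, w);
		if(pend->query == NULL) {
			/* stream idle: write straight away */
			pend->query = w;
			outnet_tcp_take_query_setup(pend->c->fd, pend, w);
		}
		/* else: stream busy writing; write-wait list handling is
		 * still marked TODO in this commit */
	} else {
		/* fresh pending_tcp buffer: connect a new fd and write */
	}
} else {
	/* no buffer free: queue w on outnet->tcp_wait list and arm
	 * w->timer with the timeout right away */
}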


@@ -360,6 +360,8 @@ struct waiting_tcp {
/** timeout event; timer keeps running whether the query is
* waiting for a buffer or the tcp reply is pending */
struct comm_timer* timer;
/** timeout in msec */
int timeout;
/** the outside network it is part of */
struct outside_network* outnet;
/** remote address. */


@@ -3221,7 +3221,9 @@ comm_point_start_listening(struct comm_point* c, int newfd, int msec)
}
if(c->type == comm_tcp || c->type == comm_http) {
ub_event_del_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE);
if(c->tcp_is_reading)
if(c->tcp_write_and_read)
ub_event_add_bits(c->ev->ev, UB_EV_READ|UB_EV_WRITE);
else if(c->tcp_is_reading)
ub_event_add_bits(c->ev->ev, UB_EV_READ);
else ub_event_add_bits(c->ev->ev, UB_EV_WRITE);
}