stream reuse, send queries one by one when upstream refuses multiple queries,

by closing the connection.
This commit is contained in:
W.C.A. Wijngaards 2020-07-27 16:59:46 +02:00
parent 3b7b7adffc
commit 2932d530c1
2 changed files with 74 additions and 21 deletions

View file

@ -759,6 +759,75 @@ use_free_buffer(struct outside_network* outnet)
}
}
/** add waiting_tcp element to the outnet tcp waiting list */
static void
outnet_add_tcp_waiting(struct outside_network* outnet, struct waiting_tcp* w)
{
struct timeval tv;
w->next_waiting = NULL;
if(outnet->tcp_wait_last)
outnet->tcp_wait_last->next_waiting = w;
else outnet->tcp_wait_first = w;
outnet->tcp_wait_last = w;
w->on_tcp_waiting_list = 1;
#ifndef S_SPLINT_S
tv.tv_sec = w->timeout/1000;
tv.tv_usec = (w->timeout%1000)*1000;
#endif
comm_timer_set(w->timer, &tv);
}
/** delete element from tree by id */
static void
reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key != NULL);
rbtree_delete(&reuse->tree_by_id, w);
w->id_node.key = NULL;
}
/** more writewait list to go for another connection. */
static void
reuse_move_writewait_away(struct outside_network* outnet,
struct pending_tcp* pend)
{
/* the writewait list has not been written yet, so if the
* stream was closed, they have not actually been failed, only
* the queries written. Other queries can get written to another
* stream. For upstreams that do not support multiple queries
* and answers, the stream can get closed, and then the queries
* can get written on a new socket */
struct waiting_tcp* w;
if(pend->query && pend->query->error_count == 0 &&
pend->c->tcp_write_pkt == pend->query->pkt &&
pend->c->tcp_write_pkt_len == pend->query->pkt_len) {
/* since the current query is not written, it can also
* move to a free buffer */
verbose(5, "reuse_move_writewait_away current %d done",
(int)pend->c->tcp_write_byte_count);
pend->c->tcp_write_pkt = NULL;
pend->c->tcp_write_pkt_len = 0;
pend->c->tcp_write_and_read = 0;
pend->c->tcp_more_read_again = 0;
pend->c->tcp_more_write_again = 0;
pend->c->tcp_is_reading = 1;
w = pend->query;
pend->query = NULL;
/* increase error count, so that if the next socket fails too
* the server selection is run again with this query failed
* and it can select a different server (if possible), or
* fail the query */
w->error_count ++;
reuse_tree_by_id_delete(&pend->reuse, w);
outnet_add_tcp_waiting(outnet, w);
}
while((w = reuse_write_wait_pop(&pend->reuse)) != NULL) {
verbose(5, "reuse_move_writewait_away item");
reuse_tree_by_id_delete(&pend->reuse, w);
outnet_add_tcp_waiting(outnet, w);
}
}
/** remove reused element from tree and lru list */
static void
reuse_tcp_remove_tree_list(struct outside_network* outnet,
@ -874,15 +943,6 @@ static void reuse_cb_and_decommission(struct outside_network* outnet,
reuse_del_readwait(&store);
}
/** delete element from tree by id */
static void
reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key != NULL);
rbtree_delete(&reuse->tree_by_id, w);
w->id_node.key = NULL;
}
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
@ -958,6 +1018,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
return 0;
} else if(error != NETEVENT_NOERROR) {
verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
reuse_move_writewait_away(outnet, pend);
/* pass error below and exit */
} else {
/* check ID */
@ -2009,6 +2070,7 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
w->write_wait_prev = NULL;
w->write_wait_next = NULL;
w->write_wait_queued = 0;
w->error_count = 0;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
@ -2047,19 +2109,8 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
/* queue up */
/* waiting for a buffer on the outside network buffer wait
* list */
struct timeval tv;
verbose(5, "pending_tcp_query: queue to wait");
w->next_waiting = NULL;
if(sq->outnet->tcp_wait_last)
sq->outnet->tcp_wait_last->next_waiting = w;
else sq->outnet->tcp_wait_first = w;
sq->outnet->tcp_wait_last = w;
w->on_tcp_waiting_list = 1;
#ifndef S_SPLINT_S
tv.tv_sec = timeout/1000;
tv.tv_usec = (timeout%1000)*1000;
#endif
comm_timer_set(w->timer, &tv);
outnet_add_tcp_waiting(sq->outnet, w);
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&

View file

@ -386,6 +386,8 @@ struct waiting_tcp {
int ssl_upstream;
/** ref to the tls_auth_name from the serviced_query */
char* tls_auth_name;
/** the packet was involved in an error, to stop looping errors */
int error_count;
};
/**