in outside_network.c: also log messages that end up on the waiting list.

with dnstap.
for tcp use_free_buffer reuse existing entry if second wait entry on the
same addr as the other waiting.
This commit is contained in:
W.C.A. Wijngaards 2020-06-25 16:05:25 +02:00
parent 39a50f30a3
commit 34c063701e

View file

@ -344,6 +344,127 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
return 1;
}
/** log reuse item addr and ptr with message */
static void
log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
{
uint16_t port;
char addrbuf[128];
if(verbosity < v) return;
addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
(unsigned long long)reuse, reuse->pending->c->fd);
}
/** pop the first element from the writewait list */
static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
{
struct waiting_tcp* w = reuse->write_wait_first;
if(!w)
return NULL;
log_assert(w->write_wait_queued);
if(w->write_wait_prev)
w->write_wait_prev->write_wait_next = w->write_wait_next;
else reuse->write_wait_first = w->write_wait_next;
if(w->write_wait_next)
w->write_wait_next->write_wait_prev = w->write_wait_prev;
else reuse->write_wait_last = w->write_wait_prev;
w->write_wait_queued = 0;
return w;
}
/** push the element after the last on the writewait list */
static void reuse_write_wait_push_back(struct reuse_tcp* reuse,
struct waiting_tcp* w)
{
if(!w) return;
log_assert(!w->write_wait_queued);
if(reuse->write_wait_last) {
reuse->write_wait_last->write_wait_next = w;
w->write_wait_prev = reuse->write_wait_last;
} else {
reuse->write_wait_first = w;
}
reuse->write_wait_last = w;
w->write_wait_queued = 1;
}
/** insert element in tree by id */
static void
reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key == NULL);
w->id_node.key = w;
rbtree_insert(&reuse->tree_by_id, &w->id_node);
}
/** find reuse tcp stream to destination for query, or NULL if none */
static struct reuse_tcp*
reuse_tcp_find(struct outside_network* outnet, struct sockaddr_storage* addr,
socklen_t addrlen, int use_ssl)
{
struct waiting_tcp key_w;
struct pending_tcp key_p;
struct comm_point c;
rbnode_type* result = NULL, *prev;
verbose(5, "reuse_tcp_find");
memset(&key_w, 0, sizeof(key_w));
memset(&key_p, 0, sizeof(key_p));
memset(&c, 0, sizeof(c));
key_p.query = &key_w;
key_p.c = &c;
key_p.reuse.pending = &key_p;
key_p.reuse.node.key = &key_p.reuse;
if(use_ssl) /* something nonNULL for comparisons in tree */
key_p.c->ssl = (void*)1;
if(addrlen > sizeof(key_p.reuse.addr))
return NULL;
memmove(&key_p.reuse.addr, addr, addrlen);
key_p.reuse.addrlen = addrlen;
verbose(5, "reuse_tcp_find: num reuse streams %u",
(unsigned)outnet->tcp_reuse.count);
if(outnet->tcp_reuse.root == NULL ||
outnet->tcp_reuse.root == RBTREE_NULL)
return NULL;
if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
&result)) {
/* exact match */
/* but the key is on stack, and ptr is compared, impossible */
log_assert(&key_p.reuse != (struct reuse_tcp*)result);
log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
}
/* not found, return null */
if(!result || result == RBTREE_NULL)
return NULL;
verbose(5, "reuse_tcp_find check inexact match");
/* inexact match, find one of possibly several connections to the
* same destination address, with the correct port, ssl, and
* also less than max number of open queries, or else, fail to open
* a new one */
/* rewind to start of sequence of same address,port,ssl */
prev = rbtree_previous(result);
while(prev && prev != RBTREE_NULL &&
reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
result = prev;
prev = rbtree_previous(result);
}
/* loop to find first one that has correct characteristics */
while(result && result != RBTREE_NULL &&
reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
if(((struct reuse_tcp*)result)->tree_by_id.count <
MAX_REUSE_TCP_QUERIES) {
/* same address, port, ssl-yes-or-no, and has
* space for another query */
return (struct reuse_tcp*)result;
}
result = rbtree_next(result);
}
return NULL;
}
/** use the buffer to setup writing the query */
static void
outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
@ -482,6 +603,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w)
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
pend->reuse.pending = pend;
reuse_tree_by_id_insert(&pend->reuse, w);
outnet_tcp_take_query_setup(s, pend, w);
return 1;
}
@ -493,17 +615,32 @@ use_free_buffer(struct outside_network* outnet)
struct waiting_tcp* w;
while(outnet->tcp_free && outnet->tcp_wait_first
&& !outnet->want_to_quit) {
struct reuse_tcp* reuse = NULL;
w = outnet->tcp_wait_first;
outnet->tcp_wait_first = w->next_waiting;
if(outnet->tcp_wait_last == w)
outnet->tcp_wait_last = NULL;
w->on_tcp_waiting_list = 0;
if(!outnet_tcp_take_into_use(w)) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
waiting_tcp_delete(w);
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
reuse = reuse_tcp_find(outnet, &w->addr, w->addrlen,
w->ssl_upstream);
if(reuse) {
log_reuse_tcp(5, "use free buffer for waiting tcp: "
"found reuse", reuse);
if(reuse->pending->query) {
/* on the write wait list */
comm_timer_disable(w->timer);
w->next_waiting = (void*)reuse->pending;
reuse_tree_by_id_insert(reuse, w);
reuse_write_wait_push_back(reuse, w);
}
} else {
if(!outnet_tcp_take_into_use(w)) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
waiting_tcp_delete(w);
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
}
}
}
}
@ -602,19 +739,6 @@ decommission_pending_tcp(struct outside_network* outnet,
reuse_del_writewait(pend);
}
/** log reuse item addr and ptr with message */
static void
log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
{
uint16_t port;
char addrbuf[128];
if(verbosity < v) return;
addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
(unsigned long long)reuse, reuse->pending->c->fd);
}
/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
static int
reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
@ -697,22 +821,6 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
w->id_node.key = NULL;
}
/** pop the first element from the writewait list */
static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
{
struct waiting_tcp* w = pend->reuse.write_wait_first;
if(!w)
return NULL;
if(w->write_wait_prev)
w->write_wait_prev->write_wait_next = w->write_wait_next;
else pend->reuse.write_wait_first = w->write_wait_next;
if(w->write_wait_next)
w->write_wait_next->write_wait_prev = w->write_wait_prev;
else pend->reuse.write_wait_last = w->write_wait_prev;
w->write_wait_queued = 0;
return w;
}
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
@ -756,7 +864,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
pend->query = NULL;
/* setup to write next packet or setup read timeout */
if(pend->reuse.write_wait_first) {
pend->query = reuse_write_wait_pop(pend);
pend->query = reuse_write_wait_pop(&pend->reuse);
outnet_tcp_take_query_setup(pend->c->fd, pend,
pend->query);
} else {
@ -1691,80 +1799,6 @@ reuse_tcp_close_oldest(struct outside_network* outnet)
decommission_pending_tcp(outnet, pend);
}
/** find reuse tcp stream to destination for query, or NULL if none */
static struct reuse_tcp*
reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
{
struct waiting_tcp key_w;
struct pending_tcp key_p;
struct comm_point c;
rbnode_type* result = NULL, *prev;
verbose(5, "reuse_tcp_find");
memset(&key_w, 0, sizeof(key_w));
memset(&key_p, 0, sizeof(key_p));
memset(&c, 0, sizeof(c));
key_p.query = &key_w;
key_p.c = &c;
key_p.reuse.pending = &key_p;
key_p.reuse.node.key = &key_p.reuse;
if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
key_p.c->ssl = (void*)1;
if(sq->addrlen > sizeof(key_p.reuse.addr))
return NULL;
memmove(&key_p.reuse.addr, &sq->addr, sq->addrlen);
key_p.reuse.addrlen = sq->addrlen;
verbose(5, "reuse_tcp_find: num reuse streams %u",
(unsigned)outnet->tcp_reuse.count);
if(outnet->tcp_reuse.root == NULL ||
outnet->tcp_reuse.root == RBTREE_NULL)
return NULL;
if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
&result)) {
/* exact match */
/* but the key is on stack, and ptr is compared, impossible */
log_assert(&key_p.reuse != (struct reuse_tcp*)result);
log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
}
/* not found, return null */
if(!result || result == RBTREE_NULL)
return NULL;
verbose(5, "reuse_tcp_find check inexact match");
/* inexact match, find one of possibly several connections to the
* same destination address, with the correct port, ssl, and
* also less than max number of open queries, or else, fail to open
* a new one */
/* rewind to start of sequence of same address,port,ssl */
prev = rbtree_previous(result);
while(prev && prev != RBTREE_NULL &&
reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
result = prev;
prev = rbtree_previous(result);
}
/* loop to find first one that has correct characteristics */
while(result && result != RBTREE_NULL &&
reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
if(((struct reuse_tcp*)result)->tree_by_id.count <
MAX_REUSE_TCP_QUERIES) {
/* same address, port, ssl-yes-or-no, and has
* space for another query */
return (struct reuse_tcp*)result;
}
result = rbtree_next(result);
}
return NULL;
}
/** insert element in tree by id */
static void
reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
{
log_assert(w->id_node.key == NULL);
w->id_node.key = w;
rbtree_insert(&reuse->tree_by_id, &w->id_node);
}
/** find element in tree by id */
static struct waiting_tcp*
reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
@ -1866,7 +1900,8 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
verbose(5, "pending_tcp_query");
/* find out if a reused stream to the target exists */
/* if so, take it into use */
reuse = reuse_tcp_find(sq->outnet, sq);
reuse = reuse_tcp_find(sq->outnet, &sq->addr, sq->addrlen,
sq->ssl_upstream);
if(reuse) {
log_reuse_tcp(5, "pending_tcp_query: found reuse", reuse);
log_assert(reuse->pending);
@ -1909,25 +1944,28 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
w->ssl_upstream = sq->ssl_upstream;
w->tls_auth_name = sq->tls_auth_name;
w->timeout = timeout;
w->id_node.key = NULL;
w->write_wait_prev = NULL;
w->write_wait_next = NULL;
w->write_wait_queued = 0;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
verbose(5, "pending_tcp_query: reuse, store");
/* if cannot write now, store query and put it
* in the waiting list for this stream TODO */
/* and insert in tree_by_id */
reuse_tree_by_id_insert(&pend->reuse, w);
/* and also delete it from waitlst if query gone,
* eg. sq is deleted TODO */
/* and also servfail all waiting queries if
* stream closes TODO */
/* reuse existing fd, write query and continue */
/* store query in tree by id */
w->next_waiting = (void*)pend;
reuse_tree_by_id_insert(&pend->reuse, w);
/* can we write right now? */
if(pend->query == NULL) {
/* write straight away */
pend->query = w;
outnet_tcp_take_query_setup(pend->c->fd, pend,
w);
} else {
/* put it in the waiting list for
* this stream */
reuse_write_wait_push_back(&pend->reuse, w);
}
} else {
verbose(5, "pending_tcp_query: new fd, connect");
@ -1942,13 +1980,6 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
return NULL;
}
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&
(sq->outnet->dtenv->log_resolver_query_messages ||
sq->outnet->dtenv->log_forwarder_query_messages))
dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
comm_tcp, sq->zone, sq->zonelen, packet);
#endif
} else {
struct timeval tv;
/* queue up */
@ -1965,6 +1996,13 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
#endif
comm_timer_set(w->timer, &tv);
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&
(sq->outnet->dtenv->log_resolver_query_messages ||
sq->outnet->dtenv->log_forwarder_query_messages))
dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
comm_tcp, sq->zone, sq->zonelen, packet);
#endif
return w;
}