Stream reuse branch, for TCP and TLS stream reuse.

This is for upstream pipes and using them again for the next query.

Signposted code for reuse_tcp structure in outside_network.h
This commit is contained in:
W.C.A. Wijngaards 2020-01-16 17:12:32 +01:00
parent 1e0c957dcd
commit 57aefd102e
4 changed files with 356 additions and 21 deletions

View file

@ -131,6 +131,34 @@ serviced_cmp(const void* key1, const void* key2)
return sockaddr_cmp(&q1->addr, q1->addrlen, &q2->addr, q2->addrlen);
}
int
reuse_cmp(const void* key1, const void* key2)
{
struct reuse_tcp* r1 = (struct reuse_tcp*)key1;
struct reuse_tcp* r2 = (struct reuse_tcp*)key2;
int r;
/* make sure the entries are in use (have a waiting_tcp entry) */
if(!r1->pending->query && !r2->pending->query)
return 0;
if(r1->pending->query && !r2->pending->query)
return 1;
if(!r1->pending->query && r2->pending->query)
return -1;
/* compare address and port */
r = sockaddr_cmp(&r1->pending->query->addr, r1->pending->query->addrlen,
&r2->pending->query->addr, r2->pending->query->addrlen);
if(r != 0)
return r;
/* compare if SSL-enabled */
if(r1->pending->c->ssl && !r2->pending->c->ssl)
return 1;
if(!r1->pending->c->ssl && r2->pending->c->ssl)
return -1;
return 0;
}
/** delete waiting_tcp entry. Does not unlink from waiting list.
* @param w: to delete.
*/
@ -281,6 +309,20 @@ outnet_tcp_connect(int s, struct sockaddr_storage* addr, socklen_t addrlen)
return 1;
}
/** use the buffer to setup writing the query */
static void
outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
size_t pkt_len)
{
pend->id = LDNS_ID_WIRE(pkt);
sldns_buffer_clear(pend->c->buffer);
sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
sldns_buffer_flip(pend->c->buffer);
pend->c->tcp_is_reading = 0;
pend->c->tcp_byte_count = 0;
comm_point_start_listening(pend->c, s, -1);
}
/** use next free buffer to service a tcp query */
static int
outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
@ -412,19 +454,13 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
}
w->pkt = NULL;
w->next_waiting = (void*)pend;
pend->id = LDNS_ID_WIRE(pkt);
w->outnet->num_tcp_outgoing++;
w->outnet->tcp_free = pend->next_free;
pend->next_free = NULL;
pend->query = w;
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
sldns_buffer_clear(pend->c->buffer);
sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
sldns_buffer_flip(pend->c->buffer);
pend->c->tcp_is_reading = 0;
pend->c->tcp_byte_count = 0;
comm_point_start_listening(pend->c, s, -1);
outnet_tcp_take_query_setup(s, pend, pkt, pkt_len);
return 1;
}
@ -449,6 +485,42 @@ use_free_buffer(struct outside_network* outnet)
}
}
/**
 * Unlink a reuse element from the reuse tree and from the lru list.
 * @param outnet: outside network with the tcp_reuse tree and the
 *	tcp_reuse_first and tcp_reuse_last list ends.
 * @param reuse: element to unlink. Afterwards its node.key and its
 *	pending pointer are NULL.
 */
static void
reuse_tcp_remove_tree_list(struct outside_network* outnet,
	struct reuse_tcp* reuse)
{
	struct reuse_tcp* newer;
	struct reuse_tcp* older;

	/* a nonNULL key means the node is linked in the reuse tree */
	if(reuse->node.key != NULL) {
		(void)rbtree_delete(&outnet->tcp_reuse, &reuse->node);
		reuse->node.key = NULL;
	}

	/* without a pending pointer it is not on the lru list */
	if(reuse->pending == NULL)
		return;

	newer = reuse->prev;
	older = reuse->next;

	/* members of the lru list are waiting and thus have a pending
	 * pointer to the struct, that is what the asserts check */
	if(newer != NULL) {
		log_assert(newer->pending);
		newer->next = older;
	} else {
		/* this was the first, the next one becomes the first */
		log_assert(!older || older->pending);
		outnet->tcp_reuse_first = (older ? older->pending : NULL);
	}
	if(older != NULL) {
		log_assert(older->pending);
		older->prev = newer;
	} else {
		/* this was the last, the previous one becomes the last */
		log_assert(!newer || newer->pending);
		outnet->tcp_reuse_last = (newer ? newer->pending : NULL);
	}
	reuse->pending = NULL;
}
/** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
static void
decommission_pending_tcp(struct outside_network* outnet,
@ -464,9 +536,37 @@ decommission_pending_tcp(struct outside_network* outnet,
comm_point_close(pend->c);
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
if(pend->reuse.pending) {
/* needs unlink from the reuse tree to get deleted */
reuse_tcp_remove_tree_list(outnet, &pend->reuse);
}
waiting_tcp_delete(pend->query);
pend->query = NULL;
use_free_buffer(outnet);
}
/**
 * Insert a pending_tcp into the reuse tcp tree and at the front of the LRU.
 * @param outnet: outside network with the tcp_reuse tree and the
 *	tcp_reuse_first and tcp_reuse_last list ends.
 * @param pend_tcp: the connection to register, its embedded reuse
 *	structure gets linked in.
 * @return 1 if the connection is in the tree on return (it was inserted
 *	now, or it already was in the tree). 0 if another entry with an
 *	equal key exists (duplicate connection); the reuse structure is then
 *	left marked as unused.
 */
static int
reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
{
	/* Already linked in; node.key is only nonNULL while in the tree.
	 * Inserting it again would compare equal to itself and be refused
	 * as a duplicate, and the failure path below would then wipe key
	 * and pending while the node is still linked in the tree and in
	 * the LRU list, so that it can no longer be unlinked. */
	if(pend_tcp->reuse.node.key)
		return 1;

	pend_tcp->reuse.node.key = &pend_tcp->reuse;
	pend_tcp->reuse.pending = pend_tcp;
	if(!rbtree_insert(&outnet->tcp_reuse, &pend_tcp->reuse.node)) {
		/* this is a duplicate connection, close this one */
		pend_tcp->reuse.node.key = NULL;
		pend_tcp->reuse.pending = NULL;
		return 0;
	}
	/* insert into LRU, first is newest */
	pend_tcp->reuse.prev = NULL;
	if(outnet->tcp_reuse_first) {
		pend_tcp->reuse.next = &outnet->tcp_reuse_first->reuse;
		outnet->tcp_reuse_first->reuse.prev = &pend_tcp->reuse;
	} else {
		/* list was empty, so this is also the oldest */
		pend_tcp->reuse.next = NULL;
		outnet->tcp_reuse_last = pend_tcp;
	}
	outnet->tcp_reuse_first = pend_tcp;
	return 1;
}
int
@ -489,9 +589,21 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
error = NETEVENT_CLOSED;
}
}
if(error == NETEVENT_NOERROR) {
/* add to reuse tree so it can be reused, if not a failure.
* This is possible if the state machine wants to make a tcp
* query again to the same destination. */
(void)reuse_tcp_insert(outnet, pend);
}
fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
/* if reused, it should not be decommissioned, TODO */
/* or if another query wants to write, write that and read for more
* or more outstanding queries on the stream. TODO */
/* TODO also write multiple queries over the stream, even if no
* replies have returned yet */
decommission_pending_tcp(outnet, pend);
use_free_buffer(outnet);
return 0;
}
@ -816,6 +928,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outside_network_delete(outnet);
return NULL;
}
rbtree_init(&outnet->tcp_reuse, reuse_cmp);
outnet->tcp_reuse_max = num_tcp;
/* allocate commpoints */
for(k=0; k<num_ports; k++) {
@ -977,6 +1091,8 @@ outside_network_delete(struct outside_network* outnet)
if(outnet->tcp_conns[i]) {
comm_point_delete(outnet->tcp_conns[i]->c);
waiting_tcp_delete(outnet->tcp_conns[i]->query);
/* TODO: loop over tcpwrite wait list and
* delete waiting_tcp_delete them */
free(outnet->tcp_conns[i]);
}
free(outnet->tcp_conns);
@ -989,6 +1105,10 @@ outside_network_delete(struct outside_network* outnet)
p = np;
}
}
/* was allocated in struct pending that was deleted above */
rbtree_init(&outnet->tcp_reuse, reuse_cmp);
outnet->tcp_reuse_first = NULL;
outnet->tcp_reuse_last = NULL;
if(outnet->udp_wait_first) {
struct pending* p = outnet->udp_wait_first, *np;
while(p) {
@ -1283,14 +1403,20 @@ outnet_tcptimer(void* arg)
{
struct waiting_tcp* w = (struct waiting_tcp*)arg;
struct outside_network* outnet = w->outnet;
comm_point_callback_type* cb;
void* cb_arg;
int do_callback = 1;
if(w->pkt) {
/* it is on the waiting list */
waiting_list_remove(outnet, w);
} else {
/* it was in use */
struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
/* see if it needs unlink from reuse tree */
if(pend->reuse.pending) {
reuse_tcp_remove_tree_list(outnet, &pend->reuse);
do_callback = 0;
}
/* do failure callbacks for all the queries in the
* wait for write list and in the id-tree TODO */
if(pend->c->ssl) {
#ifdef HAVE_SSL
SSL_shutdown(pend->c->ssl);
@ -1303,23 +1429,100 @@ outnet_tcptimer(void* arg)
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
}
cb = w->cb;
cb_arg = w->cb_arg;
waiting_tcp_delete(w);
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
if(do_callback) {
comm_point_callback_type* cb = w->cb;
void* cb_arg = w->cb_arg;
waiting_tcp_delete(w);
fptr_ok(fptr_whitelist_pending_tcp(cb));
(void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
} else {
waiting_tcp_delete(w);
}
use_free_buffer(outnet);
}
/** close the oldest reuse_tcp connection to make a fd and struct pend
* available for a new stream connection */
static void
reuse_tcp_close_oldest(struct outside_network* outnet)
{
struct pending_tcp* pend;
if(!outnet->tcp_reuse_last) return;
pend = outnet->tcp_reuse_last;
/* snip off of LRU */
log_assert(pend->reuse.next == NULL);
if(pend->reuse.prev) {
log_assert(pend->reuse.prev->pending);
outnet->tcp_reuse_last = pend->reuse.prev->pending;
pend->reuse.prev->next = NULL;
} else {
outnet->tcp_reuse_last = NULL;
outnet->tcp_reuse_first = NULL;
}
/* TODO should only close unused in tree, not ones that are in use,
* for which we need also a tree to find in-use streams for multiple
* queries on them */
/* free up */
decommission_pending_tcp(outnet, pend);
}
/** find reuse tcp stream to destination for query, or NULL if none */
static struct reuse_tcp*
reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
{
struct waiting_tcp key_w;
struct pending_tcp key_p;
struct comm_point c;
memset(&key_w, 0, sizeof(key_w));
memset(&key_p, 0, sizeof(key_p));
memset(&c, 0, sizeof(c));
key_p.query = &key_w;
key_p.c = &c;
key_p.reuse.pending = &key_p;
key_p.reuse.node.key = &key_p.reuse;
if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
key_p.c->ssl = (void*)1;
if(sq->addrlen > sizeof(key_w.addr))
return NULL;
memmove(&key_w.addr, &sq->addr, sq->addrlen);
key_w.addrlen = sq->addrlen;
return (struct reuse_tcp*)rbtree_search(&outnet->tcp_reuse,
&key_p.reuse.node);
}
struct waiting_tcp*
pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
int timeout, comm_point_callback_type* callback, void* callback_arg)
{
struct pending_tcp* pend = sq->outnet->tcp_free;
struct reuse_tcp* reuse = NULL;
struct waiting_tcp* w;
struct timeval tv;
uint16_t id;
/* find out if a reused stream to the target exists */
/* if so, take it into use */
reuse = reuse_tcp_find(sq->outnet, sq);
if(reuse) {
log_assert(reuse->pending);
pend = reuse->pending;
}
/* if !pend but we have reuse streams, close a reuse stream
* to be able to open a new one to this target, no use waiting
* to reuse a file descriptor while another query needs to use
* that buffer and file descriptor now. */
if(!pend) {
reuse_tcp_close_oldest(sq->outnet);
pend = sq->outnet->tcp_free;
}
/* if no buffer is free allocate space to store query */
/* TODO: if reuse cannot write right now, store query even though
* pend is nonNULL */
w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
+ (pend?0:sldns_buffer_limit(packet)));
if(!w) {
@ -1347,10 +1550,26 @@ pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
comm_timer_set(w->timer, &tv);
if(pend) {
/* we have a buffer available right now */
if(!outnet_tcp_take_into_use(w, sldns_buffer_begin(packet),
sldns_buffer_limit(packet))) {
waiting_tcp_delete(w);
return NULL;
if(reuse) {
/* if cannot write now, store query and put it
* in the waiting list for this stream TODO */
/* and also delete it from waitlst if query gone,
* eg. sq is deleted TODO */
/* and also servfail all waiting queries if
* stream closes TODO */
/* reuse existing fd, write query and continue */
outnet_tcp_take_query_setup(pend->c->fd, pend,
sldns_buffer_begin(packet),
sldns_buffer_limit(packet));
} else {
/* create new fd and connect to addr, setup to
* write query */
if(!outnet_tcp_take_into_use(w,
sldns_buffer_begin(packet),
sldns_buffer_limit(packet))) {
waiting_tcp_delete(w);
return NULL;
}
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&
@ -1502,6 +1721,42 @@ waiting_list_remove(struct outside_network* outnet, struct waiting_tcp* w)
}
}
/** reuse tcp stream, remove serviced query from stream,
* return true if the stream is kept, false if it is to be closed */
static int
reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
struct serviced_query* sq)
{
struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
/* see if can be entered in reuse tree
* for that the FD has to be non-1 */
if(pend_tcp->c->fd == -1) {
return 0;
}
/* if in tree and used by other queries */
if(pend_tcp->reuse.node.key) {
/* note less use of stream */
/* remove id value used by this svcd. */
/* do not reset the keepalive timer, for that
* we'd need traffic, and this is where the servicedq is
* removed due to state machine internal reasons,
* eg. iterator no longer interested in this query */
return 1;
}
/* if still open and want to keep it open */
if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
sq->outnet->tcp_reuse_max) {
/* note less use of stream */
/* remove id value used by this svcd. */
/* set a keepalive timer on it */
if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
return 0;
}
return 1;
}
return 0;
}
/** cleanup serviced query entry */
static void
serviced_delete(struct serviced_query* sq)
@ -1522,9 +1777,14 @@ serviced_delete(struct serviced_query* sq)
} else {
struct waiting_tcp* p = (struct waiting_tcp*)
sq->pending;
/* TODO: if on stream-write-waiting list then
* remove from waiting list and waiting_tcp_delete */
if(p->pkt == NULL) {
decommission_pending_tcp(sq->outnet,
(struct pending_tcp*)p->next_waiting);
if(!reuse_tcp_remove_serviced_keep(p, sq)) {
decommission_pending_tcp(sq->outnet,
(struct pending_tcp*)p->next_waiting);
use_free_buffer(sq->outnet);
}
} else {
waiting_list_remove(sq->outnet, p);
waiting_tcp_delete(p);

View file

@ -52,6 +52,7 @@ struct ub_randstate;
struct pending_tcp;
struct waiting_tcp;
struct waiting_udp;
struct reuse_tcp;
struct infra_cache;
struct port_comm;
struct port_if;
@ -150,6 +151,21 @@ struct outside_network {
size_t num_tcp;
/** number of tcp communication points in use. */
size_t num_tcp_outgoing;
/**
* tree of still-open and waiting tcp connections for reuse.
* can be closed and reopened to get a new tcp connection.
* or reused to the same destination again. with timeout to close.
* Entries are of type struct reuse_tcp.
* The entries are both active and empty connections.
*/
rbtree_type tcp_reuse;
/** max number of tcp_reuse entries we want to keep open */
size_t tcp_reuse_max;
/** first and last(oldest) in lru list of reuse connections.
* the oldest can be closed to get a new free pending_tcp if needed
* The list contains empty connections, that wait for timeout or
* a new query that can use the existing connection. */
struct pending_tcp* tcp_reuse_first, *tcp_reuse_last;
/** list of tcp comm points that are free for use */
struct pending_tcp* tcp_free;
/** list of tcp queries waiting for a buffer */
@ -205,6 +221,43 @@ struct port_comm {
struct comm_point* cp;
};
/**
 * Reuse TCP connection: a stream that is still open and can be used again.
 * This structure is embedded in struct pending_tcp (member reuse).
 */
struct reuse_tcp {
/** rbtree node with links in the tcp_reuse tree. key is NULL when not
 * in the tree. Both active and empty connections are in the tree.
 * Keep this the first member: reuse_tcp_find casts the rbnode that
 * the tree search returns to a struct reuse_tcp pointer. */
rbnode_type node;
/** lru chain, so that the oldest can be removed to get a new
 * connection when all are in (re)use. The oldest is last in the list.
 * The lru is meant to contain only empty connections waiting for
 * reuse. The ones with active queries are not meant to be on the list,
 * because they do not need to be closed to make space for others:
 * they already service a query, so closing one for another query does
 * not help to service a larger number of queries.
 * TODO
 */
struct reuse_tcp* next, *prev;
/** the connection to reuse, the fd is not -1 and is open.
 * The addr and port determine where the connection is going,
 * and are key to the rbtree. The SSL ptr determines if it is
 * a TLS connection or a plain TCP connection. TLS or not is also
 * part of the key to the rbtree.
 * There is a timeout and read event on the fd, to close it.
 * NULL when this structure is not in use; removal from the lru list
 * is only done when this pointer is set.
 */
struct pending_tcp* pending;
/** rbtree with other queries waiting on the connection, by ID number,
 * of type struct waiting_tcp. It is for looking up received
 * answers to the structure for callback. And also to see if ID
 * numbers are unused and can be used for a new query. TODO */
rbtree_type tree_by_id;
/** list of queries waiting to be written on the channel.
 * If NULL, no queries are waiting to be written and pending->query
 * is the query currently serviced. The first is the next in line.
 * Once written, a query moves to the tree_by_id. TODO */
struct waiting_tcp* write_wait_first, *write_wait_last;
};
/**
* A query that has an answer pending for it.
*/
@ -255,6 +308,11 @@ struct pending_tcp {
struct comm_point* c;
/** the query being serviced, NULL if the pending_tcp is unused. */
struct waiting_tcp* query;
/** the pre-allocated reuse tcp structure. if ->pending is nonNULL
* it is in use and the connection is waiting for reuse.
* It is here for memory pre-allocation, and used to make this
* pending_tcp wait for reuse. */
struct reuse_tcp reuse;
};
/**
@ -266,6 +324,13 @@ struct waiting_tcp {
* if pkt==0, this points to the pending_tcp structure.
*/
struct waiting_tcp* next_waiting;
/** next and prev in query waiting list for stream connection */
struct waiting_tcp* write_wait_prev, *write_wait_next;
/** true if the waiting_tcp structure is on the write_wait queue */
int write_wait_queued;
/** entry in reuse.tree_by_id, if key is NULL, not in tree, otherwise,
* this struct is key and sorted by ID from pending_tcp->id. */
rbnode_type id_node;
/** timeout event; timer keeps running whether the query is
* waiting for a buffer or the tcp reply is pending */
struct comm_timer* timer;
@ -635,4 +700,7 @@ int pending_cmp(const void* key1, const void* key2);
/** compare function of serviced query rbtree */
int serviced_cmp(const void* key1, const void* key2);
/** compare function of reuse_tcp rbtree */
int reuse_cmp(const void* key1, const void* key2);
#endif /* OUTSIDE_NETWORK_H */

View file

@ -1488,6 +1488,12 @@ int serviced_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
return 0;
}
/** stub for the compare function of the reuse_tcp rbtree; both keys are
 * unused, it calls log_assert(0) and returns 0 */
int
reuse_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
{
	log_assert(0);
	return 0;
}
/* timers in testbound for autotrust. statistics tested in tdir. */
struct comm_timer* comm_timer_create(struct comm_base* base,
void (*cb)(void*), void* cb_arg)

View file

@ -210,6 +210,7 @@ fptr_whitelist_rbtree_cmp(int (*fptr) (const void *, const void *))
else if(fptr == &fwd_cmp) return 1;
else if(fptr == &pending_cmp) return 1;
else if(fptr == &serviced_cmp) return 1;
else if(fptr == &reuse_cmp) return 1;
else if(fptr == &name_tree_compare) return 1;
else if(fptr == &order_lock_cmp) return 1;
else if(fptr == &codeline_cmp) return 1;