- Initial commit for out-of-order processing for TCP and TLS.

git-svn-id: file:///svn/unbound/trunk@5032 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2019-01-11 14:12:27 +00:00
parent 42d2c04ae1
commit dd19026e91
11 changed files with 611 additions and 44 deletions

View file

@ -744,7 +744,10 @@ listen_dnsport.lo listen_dnsport.o: $(srcdir)/services/listen_dnsport.c config.h
$(srcdir)/services/listen_dnsport.h $(srcdir)/util/netevent.h $(srcdir)/dnscrypt/dnscrypt.h \ $(srcdir)/services/listen_dnsport.h $(srcdir)/util/netevent.h $(srcdir)/dnscrypt/dnscrypt.h \
$(srcdir)/services/outside_network.h $(srcdir)/util/rbtree.h \ $(srcdir)/services/outside_network.h $(srcdir)/util/rbtree.h \
$(srcdir)/util/log.h $(srcdir)/util/config_file.h $(srcdir)/util/net_help.h \ $(srcdir)/util/log.h $(srcdir)/util/config_file.h $(srcdir)/util/net_help.h \
$(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/sbuffer.h $(srcdir)/services/mesh.h $(srcdir)/util/data/msgparse.h \
$(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h \
$(srcdir)/util/module.h $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h \
$(srcdir)/services/modstack.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h
localzone.lo localzone.o: $(srcdir)/services/localzone.c config.h $(srcdir)/services/localzone.h \ localzone.lo localzone.o: $(srcdir)/services/localzone.c config.h $(srcdir)/services/localzone.h \
$(srcdir)/util/rbtree.h $(srcdir)/util/locks.h $(srcdir)/util/log.h $(srcdir)/util/storage/dnstree.h \ $(srcdir)/util/rbtree.h $(srcdir)/util/locks.h $(srcdir)/util/log.h $(srcdir)/util/storage/dnstree.h \
$(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/data/msgreply.h \ $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/data/msgreply.h \
@ -762,7 +765,8 @@ mesh.lo mesh.o: $(srcdir)/services/mesh.c config.h $(srcdir)/services/mesh.h $(s
$(srcdir)/util/data/msgencode.h $(srcdir)/util/timehist.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h \ $(srcdir)/util/data/msgencode.h $(srcdir)/util/timehist.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h \
$(srcdir)/util/alloc.h $(srcdir)/util/config_file.h $(srcdir)/util/edns.h $(srcdir)/sldns/sbuffer.h \ $(srcdir)/util/alloc.h $(srcdir)/util/config_file.h $(srcdir)/util/edns.h $(srcdir)/sldns/sbuffer.h \
$(srcdir)/sldns/wire2str.h $(srcdir)/services/localzone.h $(srcdir)/util/storage/dnstree.h \ $(srcdir)/sldns/wire2str.h $(srcdir)/services/localzone.h $(srcdir)/util/storage/dnstree.h \
$(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h \
$(srcdir)/services/listen_dnsport.h
modstack.lo modstack.o: $(srcdir)/services/modstack.c config.h $(srcdir)/services/modstack.h \ modstack.lo modstack.o: $(srcdir)/services/modstack.c config.h $(srcdir)/services/modstack.h \
$(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/log.h \ $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/log.h \
$(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \ $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
@ -870,7 +874,7 @@ netevent.lo netevent.o: $(srcdir)/util/netevent.c config.h $(srcdir)/util/neteve
$(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \ $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
$(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h $(srcdir)/util/tube.h $(srcdir)/services/mesh.h \ $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h $(srcdir)/util/tube.h $(srcdir)/services/mesh.h \
$(srcdir)/services/modstack.h $(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/str2wire.h $(srcdir)/dnstap/dnstap.h \ $(srcdir)/services/modstack.h $(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/str2wire.h $(srcdir)/dnstap/dnstap.h \
\ $(srcdir)/services/listen_dnsport.h \
net_help.lo net_help.o: $(srcdir)/util/net_help.c config.h $(srcdir)/util/net_help.h $(srcdir)/util/log.h \ net_help.lo net_help.o: $(srcdir)/util/net_help.c config.h $(srcdir)/util/net_help.h $(srcdir)/util/log.h \
$(srcdir)/util/data/dname.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/module.h \ $(srcdir)/util/data/dname.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/module.h \

View file

@ -1,3 +1,6 @@
11 January 2018: Wouter
- Initial commit for out-of-order processing for TCP and TLS.
9 January 2018: Wouter 9 January 2018: Wouter
- Log query name for looping module errors. - Log query name for looping module errors.

View file

@ -53,6 +53,8 @@
#include "util/config_file.h" #include "util/config_file.h"
#include "util/net_help.h" #include "util/net_help.h"
#include "sldns/sbuffer.h" #include "sldns/sbuffer.h"
#include "services/mesh.h"
#include "util/fptr_wlist.h"
#ifdef HAVE_NETDB_H #ifdef HAVE_NETDB_H
#include <netdb.h> #include <netdb.h>
@ -1276,11 +1278,13 @@ listen_create(struct comm_base* base, struct listen_port* ports,
ports->ftype == listen_type_tcp_dnscrypt) ports->ftype == listen_type_tcp_dnscrypt)
cp = comm_point_create_tcp(base, ports->fd, cp = comm_point_create_tcp(base, ports->fd,
tcp_accept_count, tcp_idle_timeout, tcp_accept_count, tcp_idle_timeout,
tcp_conn_limit, bufsize, cb, cb_arg); tcp_conn_limit, bufsize, front->udp_buff,
cb, cb_arg);
else if(ports->ftype == listen_type_ssl) { else if(ports->ftype == listen_type_ssl) {
cp = comm_point_create_tcp(base, ports->fd, cp = comm_point_create_tcp(base, ports->fd,
tcp_accept_count, tcp_idle_timeout, tcp_accept_count, tcp_idle_timeout,
tcp_conn_limit, bufsize, cb, cb_arg); tcp_conn_limit, bufsize, front->udp_buff,
cb, cb_arg);
cp->ssl = sslctx; cp->ssl = sslctx;
} else if(ports->ftype == listen_type_udpancil || } else if(ports->ftype == listen_type_udpancil ||
ports->ftype == listen_type_udpancil_dnscrypt) ports->ftype == listen_type_udpancil_dnscrypt)
@ -1508,3 +1512,311 @@ void listen_start_accept(struct listen_dnsport* listen)
} }
} }
struct tcp_req_info*
tcp_req_info_create(struct sldns_buffer* spoolbuf)
{
struct tcp_req_info* req = (struct tcp_req_info*)malloc(sizeof(*req));
if(!req) {
log_err("malloc failure for new stream outoforder processing structure");
return NULL;
}
memset(req, 0, sizeof(*req));
req->spool_buffer = spoolbuf;
return req;
}
void
tcp_req_info_delete(struct tcp_req_info* req)
{
if(!req) return;
tcp_req_info_clear(req);
/* cp is pointer back to commpoint that owns this struct and
* called delete on us */
/* spool_buffer is shared udp buffer, not deleted here */
free(req);
}
void tcp_req_info_clear(struct tcp_req_info* req)
{
struct tcp_req_open_item* open, *nopen;
struct tcp_req_done_item* item, *nitem;
if(!req) return;
/* free outstanding request mesh reply entries */
open = req->open_req_list;
while(open) {
nopen = open->next;
mesh_state_remove_reply(open->mesh, open->mesh_state, req->cp);
free(open);
open = nopen;
}
req->open_req_list = NULL;
req->num_open_req = 0;
/* free pending writable result packets */
item = req->done_req_list;
while(item) {
nitem = item->next;
free(item->buf);
free(item);
item = nitem;
}
req->done_req_list = NULL;
req->num_done_req = 0;
req->read_is_closed = 0;
}
void
tcp_req_info_remove_mesh_state(struct tcp_req_info* req, struct mesh_state* m)
{
struct tcp_req_open_item* open, *prev = NULL;
if(!req || !m) return;
open = req->open_req_list;
while(open) {
if(open->mesh_state == m) {
struct tcp_req_open_item* next;
if(prev) prev->next = open->next;
else req->open_req_list = open->next;
/* caller has to manage the mesh state reply entry */
next = open->next;
free(open);
req->num_open_req --;
/* prev = prev; */
open = next;
continue;
}
prev = open;
open = open->next;
}
}
/** number of simultaneous requests a client can have */
#define TCP_MAX_REQ_SIMULTANEOUS 10
/** setup listening for read or write */
static void
tcp_req_info_setup_listen(struct tcp_req_info* req)
{
int wr = 0;
int rd = 0;
if(req->cp->tcp_byte_count != 0) {
/* cannot change, halfway through */
return;
}
if(!req->cp->tcp_is_reading)
wr = 1;
if(req->num_open_req + req->num_done_req < TCP_MAX_REQ_SIMULTANEOUS &&
!req->read_is_closed)
rd = 1;
if(wr) {
req->cp->tcp_is_reading = 0;
comm_point_start_listening(req->cp, -1,
req->cp->tcp_timeout_msec);
} else if(rd) {
req->cp->tcp_is_reading = 1;
comm_point_start_listening(req->cp, -1,
req->cp->tcp_timeout_msec);
} else {
comm_point_start_listening(req->cp, -1,
req->cp->tcp_timeout_msec);
comm_point_listen_for_rw(req->cp, 0, 0);
}
}
/** remove first item from list of pending results */
static struct tcp_req_done_item*
tcp_req_info_pop_done(struct tcp_req_info* req)
{
struct tcp_req_done_item* item;
log_assert(req->num_done_req > 0 && req->done_req_list);
item = req->done_req_list;
req->done_req_list = req->done_req_list->next;
req->num_done_req --;
return item;
}
/** Send given buffer and setup to write */
static void
tcp_req_info_start_write_buf(struct tcp_req_info* req, uint8_t* buf,
size_t len)
{
sldns_buffer_clear(req->cp->buffer);
sldns_buffer_write(req->cp->buffer, buf, len);
sldns_buffer_flip(req->cp->buffer);
req->cp->tcp_is_reading = 0; /* we are now writing */
}
/** pick up the next result and start writing it to the channel */
static void
tcp_req_pickup_next_result(struct tcp_req_info* req)
{
if(req->num_done_req > 0) {
/* unlist the done item from the list of pending results */
struct tcp_req_done_item* item = tcp_req_info_pop_done(req);
tcp_req_info_start_write_buf(req, item->buf, item->len);
free(item->buf);
free(item);
}
}
/** the read channel has closed */
int
tcp_req_info_handle_read_close(struct tcp_req_info* req)
{
verbose(VERB_ALGO, "tcp channel read side closed %d", req->cp->fd);
if(req->num_done_req != 0) {
tcp_req_pickup_next_result(req);
tcp_req_info_setup_listen(req);
return 1;
}
if(req->num_open_req == 0 && req->num_done_req == 0)
return 0;
req->read_is_closed = 1;
tcp_req_info_setup_listen(req);
return 1;
}
void
tcp_req_info_handle_writedone(struct tcp_req_info* req)
{
/* back to reading state, we finished this write event */
sldns_buffer_clear(req->cp->buffer);
req->cp->tcp_is_reading = 1;
/* see if another result needs writing */
tcp_req_pickup_next_result(req);
/* see if there is more to write, if not stop_listening for writing */
/* see if new requests are allowed, if so, start_listening
* for reading */
tcp_req_info_setup_listen(req);
}
void
tcp_req_info_handle_readdone(struct tcp_req_info* req)
{
struct comm_point* c = req->cp;
/* we want to read up several requests, unless there are
* pending answers */
req->is_drop = 0;
req->is_reply = 0;
req->in_worker_handle = 1;
/* handle the current request, */
fptr_ok(fptr_whitelist_comm_point(c->callback));
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
req->in_worker_handle = 0;
/* there is an answer, put it up. It is already in the
* c->buffer, just send it. */
/* since we were just reading a query, the channel is
* clear to write to */
send_it:
c->tcp_is_reading = 0;
comm_point_start_listening(c, -1, c->tcp_timeout_msec);
return;
}
req->in_worker_handle = 0;
/* it should be waiting in the mesh for recursion.
* If mesh failed(formerr) and called commpoint_drop_reply. Then the
* mesh state has been cleared. */
if(req->is_drop) {
return;
}
/* If mesh failed(mallocfail) and called commpoint_send_reply with
* something like servfail then we pick up that reply below. */
if(req->is_reply) {
goto send_it;
}
sldns_buffer_clear(req->cp->buffer);
/* if pending answers, pick up an answer and start sending it */
tcp_req_pickup_next_result(req);
/* if answers pending, start sending answers */
/* read more requests if we can have more requests */
tcp_req_info_setup_listen(req);
}
int
tcp_req_info_add_meshstate(struct tcp_req_info* req,
struct mesh_area* mesh, struct mesh_state* m)
{
struct tcp_req_open_item* item;
log_assert(req && mesh && m);
item = (struct tcp_req_open_item*)malloc(sizeof(*item));
if(!item) return 0;
item->next = req->open_req_list;
item->mesh = mesh;
item->mesh_state = m;
req->open_req_list = item;
req->num_open_req++;
return 1;
}
/** Add a result to the result list. At the end. */
static int
tcp_req_info_add_result(struct tcp_req_info* req, uint8_t* buf, size_t len)
{
struct tcp_req_done_item* last = NULL;
struct tcp_req_done_item* item;
/* find last element */
last = req->done_req_list;
while(last && last->next)
last = last->next;
/* create new element */
item = (struct tcp_req_done_item*)malloc(sizeof(*item));
if(!item) {
return 0;
}
item->next = NULL;
item->len = len;
item->buf = memdup(buf, len);
if(!item->buf) {
free(item);
return 0;
}
/* link in */
if(last) last->next = item;
else req->done_req_list = item;
req->num_done_req++;
return 1;
}
void
tcp_req_info_send_reply(struct tcp_req_info* req)
{
if(req->in_worker_handle) {
/* It is in the right buffer to answer straight away */
req->is_reply = 1;
return;
}
/* now that the query has been handled, that mesh_reply entry
* should be removed, from the tcp_req_info list */
/* TODO: find it, need mstate ptr */
/* see if we can send it straight away (we are not doing
* anything else). If so, copy to buffer and start */
if(req->cp->tcp_is_reading && req->cp->tcp_byte_count == 0) {
/* buffer is free, and was ready to read new query into,
* but we are now going to use it to send this answer */
tcp_req_info_start_write_buf(req,
sldns_buffer_begin(req->spool_buffer),
sldns_buffer_limit(req->spool_buffer));
/* switch to listen to write events */
comm_point_stop_listening(req->cp);
comm_point_start_listening(req->cp, -1,
req->cp->tcp_timeout_msec);
return;
}
/* queue up the answer behind the others already pending */
if(!tcp_req_info_add_result(req, sldns_buffer_begin(req->spool_buffer),
sldns_buffer_limit(req->spool_buffer))) {
log_err("malloc failure adding reply to stream result list");
}
}

View file

@ -237,4 +237,127 @@ int create_tcp_accept_sock(struct addrinfo *addr, int v6only, int* noproto,
*/ */
int create_local_accept_sock(const char* path, int* noproto, int use_systemd); int create_local_accept_sock(const char* path, int* noproto, int use_systemd);
/**
* TCP request info. List of requests outstanding on the channel, that
* are asked for but not yet answered back.
*/
struct tcp_req_info {
/** the TCP comm point for this. Its buffer is used for read/write */
struct comm_point* cp;
/** the buffer to use to spool reply from mesh into,
* it can then be copied to the result list and written.
* it is a pointer to the shared udp buffer. */
struct sldns_buffer* spool_buffer;
/** are we in worker_handle function call (for recursion callback)*/
int in_worker_handle;
/** is the comm point dropped (by worker handle).
* That means we have to disconnect the channel. */
int is_drop;
/** is the comm point set to send_reply (by mesh new client in worker
* handle), if so answer is available in c.buffer */
int is_reply;
/** read channel has closed, just write pending results */
int read_is_closed;
/** number of outstanding requests */
int num_open_req;
/** list of outstanding requests */
struct tcp_req_open_item* open_req_list;
/** number of pending writeable results */
int num_done_req;
/** list of pending writable result packets, malloced one at a time */
struct tcp_req_done_item* done_req_list;
};
/**
* List of open items in TCP channel
*/
struct tcp_req_open_item {
/** next in list */
struct tcp_req_open_item* next;
/** the mesh area of the mesh_state */
struct mesh_area* mesh;
/** the mesh state */
struct mesh_state* mesh_state;
};
/**
* List of done items in TCP channel
*/
struct tcp_req_done_item {
/** next in list */
struct tcp_req_done_item* next;
/** the buffer with packet contents */
uint8_t* buf;
/** length of the buffer */
size_t len;
};
/**
* Create tcp request info structure that keeps track of open
* requests on the TCP channel that are resolved at the same time,
* and the pending results that have to get written back to that client.
* @param spoolbuf: shared buffer
* @return new structure or NULL on alloc failure.
*/
struct tcp_req_info* tcp_req_info_create(struct sldns_buffer* spoolbuf);
/**
* Delete tcp request structure. Called by owning commpoint.
* Removes mesh entry references and stored results from the lists.
* @param req: the tcp request info
*/
void tcp_req_info_delete(struct tcp_req_info* req);
/**
* Clear tcp request structure. Removes list entries, sets it up ready
* for the next connection.
* @param req: tcp request info structure.
*/
void tcp_req_info_clear(struct tcp_req_info* req);
/**
* Remove mesh state entry from list in tcp_req_info.
* caller has to manage the mesh state reply entry in the mesh state.
* @param req: the tcp req info that has the entry removed from the list.
* @param m: the state removed from the list.
*/
void tcp_req_info_remove_mesh_state(struct tcp_req_info* req,
struct mesh_state* m);
/**
* Handle write done of the last result packet
*/
void tcp_req_info_handle_writedone(struct tcp_req_info* req);
/**
* Handle read done of a new request from the client
*/
void tcp_req_info_handle_readdone(struct tcp_req_info* req);
/**
* Add mesh state to the tcp req list of open requests.
* So the comm_reply can be removed off the mesh reply list when
* the tcp channel has to be closed (for other reasons then that that
* request was done, eg. channel closed by client or some format error.
* @param req: tcp req info structure. It keeps track of the simultaneous
* requests and results on a tcp (or TLS) channel.
* @param mesh: mesh area for the state.
* @param m: mesh state to add.
* @return 0 on failure (malloc failure).
*/
int tcp_req_info_add_meshstate(struct tcp_req_info* req,
struct mesh_area* mesh, struct mesh_state* m);
/**
* Send reply on tcp simultaneous answer channel. May queue it up.
* @param req: request info structure.
*/
void tcp_req_info_send_reply(struct tcp_req_info* req);
/** the read channel has closed
* @param req: request. remaining queries are looked up and answered.
* @return zero if nothing to do, just close the tcp.
*/
int tcp_req_info_handle_read_close(struct tcp_req_info* req);
#endif /* LISTEN_DNSPORT_H */ #endif /* LISTEN_DNSPORT_H */

View file

@ -61,6 +61,7 @@
#include "services/localzone.h" #include "services/localzone.h"
#include "util/data/dname.h" #include "util/data/dname.h"
#include "respip/respip.h" #include "respip/respip.h"
#include "services/listen_dnsport.h"
/** subtract timers and the values do not overflow or become negative */ /** subtract timers and the values do not overflow or become negative */
static void static void
@ -429,6 +430,7 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
/* add reply to s */ /* add reply to s */
if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) { if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) {
log_err("mesh_new_client: out of memory; SERVFAIL"); log_err("mesh_new_client: out of memory; SERVFAIL");
servfail_mem:
if(!inplace_cb_reply_servfail_call(mesh->env, qinfo, &s->s, if(!inplace_cb_reply_servfail_call(mesh->env, qinfo, &s->s,
NULL, LDNS_RCODE_SERVFAIL, edns, rep, mesh->env->scratch)) NULL, LDNS_RCODE_SERVFAIL, edns, rep, mesh->env->scratch))
edns->opt_list = NULL; edns->opt_list = NULL;
@ -439,6 +441,12 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
mesh_state_delete(&s->s); mesh_state_delete(&s->s);
return; return;
} }
if(rep->c->tcp_req_info) {
if(!tcp_req_info_add_meshstate(rep->c->tcp_req_info, mesh, s)) {
log_err("mesh_new_client: out of memory add tcpreqinfo");
goto servfail_mem;
}
}
/* update statistics */ /* update statistics */
if(was_detached) { if(was_detached) {
log_assert(mesh->num_detached_states > 0); log_assert(mesh->num_detached_states > 0);
@ -1031,11 +1039,14 @@ mesh_do_callback(struct mesh_state* m, int rcode, struct reply_info* rep,
* @param rcode: if not 0, error code. * @param rcode: if not 0, error code.
* @param rep: reply to send (or NULL if rcode is set). * @param rep: reply to send (or NULL if rcode is set).
* @param r: reply entry * @param r: reply entry
* @param r_buffer: buffer to use for reply entry.
* @param prev: previous reply, already has its answer encoded in buffer. * @param prev: previous reply, already has its answer encoded in buffer.
* @param prev_buffer: buffer for previous reply.
*/ */
static void static void
mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
struct mesh_reply* r, struct mesh_reply* prev) struct mesh_reply* r, struct sldns_buffer* r_buffer,
struct mesh_reply* prev, struct sldns_buffer* prev_buffer)
{ {
struct timeval end_time; struct timeval end_time;
struct timeval duration; struct timeval duration;
@ -1063,7 +1074,7 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
* and still reuse the previous answer if they are the same, but that * and still reuse the previous answer if they are the same, but that
* would be complicated and error prone for the relatively minor case. * would be complicated and error prone for the relatively minor case.
* So we err on the side of safety. */ * So we err on the side of safety. */
if(prev && prev->qflags == r->qflags && if(prev && prev_buffer && prev->qflags == r->qflags &&
!prev->local_alias && !r->local_alias && !prev->local_alias && !r->local_alias &&
prev->edns.edns_present == r->edns.edns_present && prev->edns.edns_present == r->edns.edns_present &&
prev->edns.bits == r->edns.bits && prev->edns.bits == r->edns.bits &&
@ -1071,13 +1082,11 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
edns_opt_list_compare(prev->edns.opt_list, r->edns.opt_list) edns_opt_list_compare(prev->edns.opt_list, r->edns.opt_list)
== 0) { == 0) {
/* if the previous reply is identical to this one, fix ID */ /* if the previous reply is identical to this one, fix ID */
if(prev->query_reply.c->buffer != r->query_reply.c->buffer) if(prev_buffer != r_buffer)
sldns_buffer_copy(r->query_reply.c->buffer, sldns_buffer_copy(r_buffer, prev_buffer);
prev->query_reply.c->buffer); sldns_buffer_write_at(r_buffer, 0, &r->qid, sizeof(uint16_t));
sldns_buffer_write_at(r->query_reply.c->buffer, 0, sldns_buffer_write_at(r_buffer, 12, r->qname,
&r->qid, sizeof(uint16_t)); m->s.qinfo.qname_len);
sldns_buffer_write_at(r->query_reply.c->buffer, 12,
r->qname, m->s.qinfo.qname_len);
comm_point_send_reply(&r->query_reply); comm_point_send_reply(&r->query_reply);
} else if(rcode) { } else if(rcode) {
m->s.qinfo.qname = r->qname; m->s.qinfo.qname = r->qname;
@ -1091,8 +1100,8 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
&r->edns, NULL, m->s.region)) &r->edns, NULL, m->s.region))
r->edns.opt_list = NULL; r->edns.opt_list = NULL;
} }
error_encode(r->query_reply.c->buffer, rcode, &m->s.qinfo, error_encode(r_buffer, rcode, &m->s.qinfo, r->qid,
r->qid, r->qflags, &r->edns); r->qflags, &r->edns);
comm_point_send_reply(&r->query_reply); comm_point_send_reply(&r->query_reply);
} else { } else {
size_t udp_size = r->edns.udp_size; size_t udp_size = r->edns.udp_size;
@ -1108,16 +1117,15 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
m->s.env->cfg, r->query_reply.c, m->s.env->cfg, r->query_reply.c,
m->s.region) || m->s.region) ||
!reply_info_answer_encode(&m->s.qinfo, rep, r->qid, !reply_info_answer_encode(&m->s.qinfo, rep, r->qid,
r->qflags, r->query_reply.c->buffer, 0, 1, r->qflags, r_buffer, 0, 1, m->s.env->scratch,
m->s.env->scratch, udp_size, &r->edns, udp_size, &r->edns, (int)(r->edns.bits & EDNS_DO),
(int)(r->edns.bits & EDNS_DO), secure)) secure))
{ {
if(!inplace_cb_reply_servfail_call(m->s.env, &m->s.qinfo, &m->s, if(!inplace_cb_reply_servfail_call(m->s.env, &m->s.qinfo, &m->s,
rep, LDNS_RCODE_SERVFAIL, &r->edns, NULL, m->s.region)) rep, LDNS_RCODE_SERVFAIL, &r->edns, NULL, m->s.region))
r->edns.opt_list = NULL; r->edns.opt_list = NULL;
error_encode(r->query_reply.c->buffer, error_encode(r_buffer, LDNS_RCODE_SERVFAIL,
LDNS_RCODE_SERVFAIL, &m->s.qinfo, r->qid, &m->s.qinfo, r->qid, r->qflags, &r->edns);
r->qflags, &r->edns);
} }
r->edns = edns_bak; r->edns = edns_bak;
comm_point_send_reply(&r->query_reply); comm_point_send_reply(&r->query_reply);
@ -1132,19 +1140,17 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
timeval_add(&m->s.env->mesh->replies_sum_wait, &duration); timeval_add(&m->s.env->mesh->replies_sum_wait, &duration);
timehist_insert(m->s.env->mesh->histogram, &duration); timehist_insert(m->s.env->mesh->histogram, &duration);
if(m->s.env->cfg->stat_extended) { if(m->s.env->cfg->stat_extended) {
uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(r-> uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(
query_reply.c->buffer, 2)); r_buffer, 2));
if(secure) m->s.env->mesh->ans_secure++; if(secure) m->s.env->mesh->ans_secure++;
m->s.env->mesh->ans_rcode[ rc ] ++; m->s.env->mesh->ans_rcode[ rc ] ++;
if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r-> if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r_buffer)) == 0)
query_reply.c->buffer)) == 0)
m->s.env->mesh->ans_nodata++; m->s.env->mesh->ans_nodata++;
} }
/* Log reply sent */ /* Log reply sent */
if(m->s.env->cfg->log_replies) { if(m->s.env->cfg->log_replies) {
log_reply_info(0, &m->s.qinfo, &r->query_reply.addr, log_reply_info(0, &m->s.qinfo, &r->query_reply.addr,
r->query_reply.addrlen, duration, 0, r->query_reply.addrlen, duration, 0, r_buffer);
r->query_reply.c->buffer);
} }
} }
@ -1152,6 +1158,7 @@ void mesh_query_done(struct mesh_state* mstate)
{ {
struct mesh_reply* r; struct mesh_reply* r;
struct mesh_reply* prev = NULL; struct mesh_reply* prev = NULL;
struct sldns_buffer* prev_buffer = NULL;
struct mesh_cb* c; struct mesh_cb* c;
struct reply_info* rep = (mstate->s.return_msg? struct reply_info* rep = (mstate->s.return_msg?
mstate->s.return_msg->rep:NULL); mstate->s.return_msg->rep:NULL);
@ -1180,9 +1187,15 @@ void mesh_query_done(struct mesh_state* mstate)
if(mstate->s.is_drop) if(mstate->s.is_drop)
comm_point_drop_reply(&r->query_reply); comm_point_drop_reply(&r->query_reply);
else { else {
struct sldns_buffer* r_buffer = r->query_reply.c->buffer;
if(r->query_reply.c->tcp_req_info)
r_buffer = r->query_reply.c->tcp_req_info->spool_buffer;
mesh_send_reply(mstate, mstate->s.return_rcode, rep, mesh_send_reply(mstate, mstate->s.return_rcode, rep,
r, prev); r, r_buffer, prev, prev_buffer);
if(r->query_reply.c->tcp_req_info)
tcp_req_info_remove_mesh_state(r->query_reply.c->tcp_req_info, mstate);
prev = r; prev = r;
prev_buffer = r_buffer;
} }
} }
mstate->replies_sent = 1; mstate->replies_sent = 1;
@ -1613,3 +1626,36 @@ void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
m->prev->next = m->next; m->prev->next = m->next;
else *fp = m->next; else *fp = m->next;
} }
void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
struct comm_point* cp)
{
struct mesh_reply* n, *prev = NULL;
n = m->reply_list;
if(!n) return; /* nothing to remove, also no accounting needed */
while(n) {
if(n->query_reply.c == cp) {
/* unlink it */
if(prev) prev->next = n->next;
else m->reply_list = n->next;
/* delete it, but allocated in m region */
mesh->num_reply_addrs--;
/* prev = prev; */
n = n->next;
continue;
}
prev = n;
n = n->next;
}
/* it was not detached (because it had a reply list), could be now */
if(!m->reply_list && !m->cb_list
&& m->super_set.count == 0) {
mesh->num_detached_states++;
}
/* if not replies any more in mstate, it is no longer a reply_state */
if(!m->reply_list && !m->cb_list) {
log_assert(mesh->num_reply_states > 0);
mesh->num_reply_states--;
}
}

View file

@ -633,4 +633,14 @@ void mesh_list_insert(struct mesh_state* m, struct mesh_state** fp,
void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp, void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
struct mesh_state** lp); struct mesh_state** lp);
/**
* Remove mesh reply entry from the reply entry list. Searches for
* the repinfo pointer.
* @param mesh: to update the counters.
* @param m: the mesh state.
* @param cp: the commpoint to remove from the list.
*/
void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
struct comm_point* cp);
#endif /* SERVICES_MESH_H */ #endif /* SERVICES_MESH_H */

View file

@ -1802,4 +1802,18 @@ int outnet_tcp_connect(int ATTR_UNUSED(s), struct sockaddr_storage* ATTR_UNUSED(
return 0; return 0;
} }
int tcp_req_info_add_meshstate(struct tcp_req_info* ATTR_UNUSED(req),
struct mesh_area* ATTR_UNUSED(mesh), struct mesh_state* ATTR_UNUSED(m))
{
log_assert(0);
return 0;
}
void
tcp_req_info_remove_mesh_state(struct tcp_req_info* ATTR_UNUSED(req),
struct mesh_state* ATTR_UNUSED(m))
{
log_assert(0);
}
/*********** End of Dummy routines ***********/ /*********** End of Dummy routines ***********/

View file

@ -8,3 +8,4 @@
. ../common.sh . ../common.sh
kill_pid $FWD_PID kill_pid $FWD_PID
kill_pid $UNBOUND_PID kill_pid $UNBOUND_PID
cat unbound.log

View file

@ -6,9 +6,9 @@
# check what sort of netcat we have # check what sort of netcat we have
if nc -h 2>&1 | grep "q secs"; then if nc -h 2>&1 | grep "q secs"; then
ncopt="-q 3 -w 2" ncopt="-q 3 -i 2"
else else
ncopt="-w 2" ncopt="-i 2"
fi fi
PRE="../.." PRE="../.."

View file

@ -50,6 +50,7 @@
#include "sldns/str2wire.h" #include "sldns/str2wire.h"
#include "dnstap/dnstap.h" #include "dnstap/dnstap.h"
#include "dnscrypt/dnscrypt.h" #include "dnscrypt/dnscrypt.h"
#include "services/listen_dnsport.h"
#ifdef HAVE_OPENSSL_SSL_H #ifdef HAVE_OPENSSL_SSL_H
#include <openssl/ssl.h> #include <openssl/ssl.h>
#endif #endif
@ -150,7 +151,8 @@ struct internal_signal {
/** create a tcp handler with a parent */ /** create a tcp handler with a parent */
static struct comm_point* comm_point_create_tcp_handler( static struct comm_point* comm_point_create_tcp_handler(
struct comm_base *base, struct comm_point* parent, size_t bufsize, struct comm_base *base, struct comm_point* parent, size_t bufsize,
comm_point_callback_type* callback, void* callback_arg); struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
void* callback_arg);
/* -------- End of local definitions -------- */ /* -------- End of local definitions -------- */
@ -988,7 +990,11 @@ tcp_callback_writer(struct comm_point* c)
c->tcp_byte_count = 0; c->tcp_byte_count = 0;
/* switch from listening(write) to listening(read) */ /* switch from listening(write) to listening(read) */
comm_point_stop_listening(c); comm_point_stop_listening(c);
comm_point_start_listening(c, -1, -1); if(c->tcp_req_info) {
tcp_req_info_handle_writedone(c->tcp_req_info);
} else {
comm_point_start_listening(c, -1, -1);
}
} }
/** do the callback when reading is done */ /** do the callback when reading is done */
@ -1002,9 +1008,13 @@ tcp_callback_reader(struct comm_point* c)
c->tcp_byte_count = 0; c->tcp_byte_count = 0;
if(c->type == comm_tcp) if(c->type == comm_tcp)
comm_point_stop_listening(c); comm_point_stop_listening(c);
fptr_ok(fptr_whitelist_comm_point(c->callback)); if(c->tcp_req_info) {
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) { tcp_req_info_handle_readdone(c->tcp_req_info);
comm_point_start_listening(c, -1, c->tcp_timeout_msec); } else {
fptr_ok(fptr_whitelist_comm_point(c->callback));
if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
comm_point_start_listening(c, -1, c->tcp_timeout_msec);
}
} }
} }
@ -1163,6 +1173,8 @@ ssl_handle_read(struct comm_point* c)
c->tcp_byte_count))) <= 0) { c->tcp_byte_count))) <= 0) {
int want = SSL_get_error(c->ssl, r); int want = SSL_get_error(c->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) { if(want == SSL_ERROR_ZERO_RETURN) {
if(c->tcp_req_info)
return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; /* shutdown, closed */ return 0; /* shutdown, closed */
} else if(want == SSL_ERROR_WANT_READ) { } else if(want == SSL_ERROR_WANT_READ) {
ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ); ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
@ -1205,6 +1217,8 @@ ssl_handle_read(struct comm_point* c)
if(r <= 0) { if(r <= 0) {
int want = SSL_get_error(c->ssl, r); int want = SSL_get_error(c->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) { if(want == SSL_ERROR_ZERO_RETURN) {
if(c->tcp_req_info)
return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; /* shutdown, closed */ return 0; /* shutdown, closed */
} else if(want == SSL_ERROR_WANT_READ) { } else if(want == SSL_ERROR_WANT_READ) {
ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ); ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
@ -1365,9 +1379,11 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
/* read length bytes */ /* read length bytes */
r = recv(fd,(void*)sldns_buffer_at(c->buffer,c->tcp_byte_count), r = recv(fd,(void*)sldns_buffer_at(c->buffer,c->tcp_byte_count),
sizeof(uint16_t)-c->tcp_byte_count, 0); sizeof(uint16_t)-c->tcp_byte_count, 0);
if(r == 0) if(r == 0) {
if(c->tcp_req_info)
return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; return 0;
else if(r == -1) { } else if(r == -1) {
#ifndef USE_WINSOCK #ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN) if(errno == EINTR || errno == EAGAIN)
return 1; return 1;
@ -1416,6 +1432,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
r = recv(fd, (void*)sldns_buffer_current(c->buffer), r = recv(fd, (void*)sldns_buffer_current(c->buffer),
sldns_buffer_remaining(c->buffer), 0); sldns_buffer_remaining(c->buffer), 0);
if(r == 0) { if(r == 0) {
if(c->tcp_req_info)
return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; return 0;
} else if(r == -1) { } else if(r == -1) {
#ifndef USE_WINSOCK #ifndef USE_WINSOCK
@ -2523,7 +2541,8 @@ comm_point_create_udp_ancil(struct comm_base *base, int fd,
static struct comm_point* static struct comm_point*
comm_point_create_tcp_handler(struct comm_base *base, comm_point_create_tcp_handler(struct comm_base *base,
struct comm_point* parent, size_t bufsize, struct comm_point* parent, size_t bufsize,
comm_point_callback_type* callback, void* callback_arg) struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
void* callback_arg)
{ {
struct comm_point* c = (struct comm_point*)calloc(1, struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point)); sizeof(struct comm_point));
@ -2579,6 +2598,20 @@ comm_point_create_tcp_handler(struct comm_base *base,
c->repinfo.c = c; c->repinfo.c = c;
c->callback = callback; c->callback = callback;
c->cb_arg = callback_arg; c->cb_arg = callback_arg;
if(spoolbuf) {
c->tcp_req_info = tcp_req_info_create(spoolbuf);
if(!c->tcp_req_info) {
log_err("could not create tcp commpoint");
sldns_buffer_free(c->buffer);
free(c->timeout);
free(c->ev);
free(c);
return NULL;
}
c->tcp_req_info->cp = c;
c->tcp_do_close = 1;
c->tcp_do_toggle_rw = 0;
}
/* add to parent free list */ /* add to parent free list */
c->tcp_free = parent->tcp_free; c->tcp_free = parent->tcp_free;
parent->tcp_free = c; parent->tcp_free = c;
@ -2590,6 +2623,9 @@ comm_point_create_tcp_handler(struct comm_base *base,
{ {
log_err("could not basetset tcphdl event"); log_err("could not basetset tcphdl event");
parent->tcp_free = c->tcp_free; parent->tcp_free = c->tcp_free;
tcp_req_info_delete(c->tcp_req_info);
sldns_buffer_free(c->buffer);
free(c->timeout);
free(c->ev); free(c->ev);
free(c); free(c);
return NULL; return NULL;
@ -2600,7 +2636,8 @@ comm_point_create_tcp_handler(struct comm_base *base,
struct comm_point* struct comm_point*
comm_point_create_tcp(struct comm_base *base, int fd, int num, comm_point_create_tcp(struct comm_base *base, int fd, int num,
int idle_timeout, struct tcl_list* tcp_conn_limit, size_t bufsize, int idle_timeout, struct tcl_list* tcp_conn_limit, size_t bufsize,
comm_point_callback_type* callback, void* callback_arg) struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
void* callback_arg)
{ {
struct comm_point* c = (struct comm_point*)calloc(1, struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point)); sizeof(struct comm_point));
@ -2667,7 +2704,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num,
/* now prealloc the tcp handlers */ /* now prealloc the tcp handlers */
for(i=0; i<num; i++) { for(i=0; i<num; i++) {
c->tcp_handlers[i] = comm_point_create_tcp_handler(base, c->tcp_handlers[i] = comm_point_create_tcp_handler(base,
c, bufsize, callback, callback_arg); c, bufsize, spoolbuf, callback, callback_arg);
if(!c->tcp_handlers[i]) { if(!c->tcp_handlers[i]) {
comm_point_delete(c); comm_point_delete(c);
return NULL; return NULL;
@ -2949,6 +2986,8 @@ comm_point_close(struct comm_point* c)
} }
} }
tcl_close_connection(c->tcl_addr); tcl_close_connection(c->tcl_addr);
if(c->tcp_req_info)
tcp_req_info_clear(c->tcp_req_info);
/* close fd after removing from event lists, or epoll.. is messed up */ /* close fd after removing from event lists, or epoll.. is messed up */
if(c->fd != -1 && !c->do_not_close) { if(c->fd != -1 && !c->do_not_close) {
if(c->type == comm_tcp || c->type == comm_http) { if(c->type == comm_tcp || c->type == comm_http) {
@ -2992,6 +3031,9 @@ comm_point_delete(struct comm_point* c)
sldns_buffer_free(c->dnscrypt_buffer); sldns_buffer_free(c->dnscrypt_buffer);
} }
#endif #endif
if(c->tcp_req_info) {
tcp_req_info_delete(c->tcp_req_info);
}
} }
ub_event_free(c->ev->ev); ub_event_free(c->ev->ev);
free(c->ev); free(c->ev);
@ -3032,8 +3074,12 @@ comm_point_send_reply(struct comm_reply *repinfo)
dt_msg_send_client_response(repinfo->c->tcp_parent->dtenv, dt_msg_send_client_response(repinfo->c->tcp_parent->dtenv,
&repinfo->addr, repinfo->c->type, repinfo->c->buffer); &repinfo->addr, repinfo->c->type, repinfo->c->buffer);
#endif #endif
comm_point_start_listening(repinfo->c, -1, if(repinfo->c->tcp_req_info) {
repinfo->c->tcp_timeout_msec); tcp_req_info_send_reply(repinfo->c->tcp_req_info);
} else {
comm_point_start_listening(repinfo->c, -1,
repinfo->c->tcp_timeout_msec);
}
} }
} }
@ -3046,6 +3092,8 @@ comm_point_drop_reply(struct comm_reply* repinfo)
log_assert(repinfo->c->type != comm_tcp_accept); log_assert(repinfo->c->type != comm_tcp_accept);
if(repinfo->c->type == comm_udp) if(repinfo->c->type == comm_udp)
return; return;
if(repinfo->c->tcp_req_info)
repinfo->c->tcp_req_info->is_drop = 1;
reclaim_tcp_handler(repinfo->c); reclaim_tcp_handler(repinfo->c);
} }

View file

@ -268,6 +268,9 @@ struct comm_point {
/** the entry for the connection. */ /** the entry for the connection. */
struct tcl_addr* tcl_addr; struct tcl_addr* tcl_addr;
/** the structure to keep track of open requests on this channel */
struct tcp_req_info* tcp_req_info;
#ifdef USE_MSG_FASTOPEN #ifdef USE_MSG_FASTOPEN
/** used to track if the sendto() call should be done when using TFO. */ /** used to track if the sendto() call should be done when using TFO. */
int tcp_do_fastopen; int tcp_do_fastopen;
@ -455,6 +458,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base,
* @param idle_timeout: TCP idle timeout in ms. * @param idle_timeout: TCP idle timeout in ms.
* @param tcp_conn_limit: TCP connection limit info. * @param tcp_conn_limit: TCP connection limit info.
* @param bufsize: size of buffer to create for handlers. * @param bufsize: size of buffer to create for handlers.
* @param spoolbuf: shared spool buffer for tcp_req_info structures.
* or NULL to not create those structures in the tcp handlers.
* @param callback: callback function pointer for TCP handlers. * @param callback: callback function pointer for TCP handlers.
* @param callback_arg: will be passed to your callback function. * @param callback_arg: will be passed to your callback function.
* @return: returns the TCP listener commpoint. You can find the * @return: returns the TCP listener commpoint. You can find the
@ -464,7 +469,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base,
*/ */
struct comm_point* comm_point_create_tcp(struct comm_base* base, struct comm_point* comm_point_create_tcp(struct comm_base* base,
int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit, int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit,
size_t bufsize, comm_point_callback_type* callback, void* callback_arg); size_t bufsize, struct sldns_buffer* spoolbuf,
comm_point_callback_type* callback, void* callback_arg);
/** /**
* Create an outgoing TCP commpoint. No file descriptor is opened, left at -1. * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.