- Add serviced_query timer to send upstream queries outside of the mesh flow to prevent race conditions.
This commit is contained in:
George Thessalonikefs 2022-01-25 00:01:43 +01:00
parent 8e76eb95a0
commit c3c0186658
4 changed files with 90 additions and 22 deletions

View file

@ -94,6 +94,16 @@ static void waiting_list_remove(struct outside_network* outnet,
static uint16_t tcp_select_id(struct outside_network* outnet,
struct reuse_tcp* reuse);
/** Perform serviced query UDP sending operation */
static int serviced_udp_send(struct serviced_query* sq, sldns_buffer* buff);
/** Send serviced query over TCP; returns false on initial failure */
static int serviced_tcp_send(struct serviced_query* sq, sldns_buffer* buff);
/** call the callbacks for a serviced query */
static void serviced_callbacks(struct serviced_query* sq, int error,
struct comm_point* c, struct comm_reply* rep);
int
pending_cmp(const void* key1, const void* key2)
{
@ -836,6 +846,7 @@ outnet_add_tcp_waiting_first(struct outside_network* outnet,
if(w->on_tcp_waiting_list)
return;
w->next_waiting = outnet->tcp_wait_first;
log_assert(w->next_waiting != w);
if(!outnet->tcp_wait_last)
outnet->tcp_wait_last = w;
outnet->tcp_wait_first = w;
@ -1271,6 +1282,8 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
}
}
if(w) {
log_assert(!w->on_tcp_waiting_list);
log_assert(!w->write_wait_queued);
reuse_tree_by_id_delete(&pend->reuse, w);
verbose(VERB_CLIENT, "outnet tcp callback query err %d buflen %d",
error, (int)sldns_buffer_limit(c->buffer));
@ -1330,7 +1343,7 @@ outnet_send_wait_udp(struct outside_network* outnet)
{
struct pending* pend;
/* process waiting queries */
while(outnet->udp_wait_first && outnet->unused_fds
while(outnet->udp_wait_first && outnet->unused_fds
&& !outnet->want_to_quit) {
pend = outnet->udp_wait_first;
outnet->udp_wait_first = pend->next_waiting;
@ -1339,8 +1352,10 @@ outnet_send_wait_udp(struct outside_network* outnet)
sldns_buffer_write(outnet->udp_buff, pend->pkt, pend->pkt_len);
sldns_buffer_flip(outnet->udp_buff);
free(pend->pkt); /* freeing now makes get_mem correct */
pend->pkt = NULL;
pend->pkt = NULL;
pend->pkt_len = 0;
log_assert(!pend->sq->busy);
pend->sq->busy = 1;
if(!randomize_and_send_udp(pend, outnet->udp_buff,
pend->timeout)) {
/* callback error on pending */
@ -1351,6 +1366,7 @@ outnet_send_wait_udp(struct outside_network* outnet)
}
pending_delete(outnet, pend);
}
pend->sq->busy = 0;
}
}
@ -1460,7 +1476,6 @@ calc_num46(char** ifs, int num_ifs, int do_ip4, int do_ip6,
(*num_ip4)++;
}
}
}
void
@ -1715,6 +1730,8 @@ serviced_node_del(rbnode_type* node, void* ATTR_UNUSED(arg))
{
struct serviced_query* sq = (struct serviced_query*)node;
alloc_reg_release(sq->alloc, sq->region);
if(sq->timer)
comm_timer_delete(sq->timer);
free(sq);
}
@ -2171,10 +2188,13 @@ pending_udp_query(struct serviced_query* sq, struct sldns_buffer* packet,
sq->outnet->udp_wait_last = pend;
return pend;
}
log_assert(!sq->busy);
sq->busy = 1;
if(!randomize_and_send_udp(pend, packet, timeout)) {
pending_delete(sq->outnet, pend);
return NULL;
}
sq->busy = 0;
return pend;
}
@ -2244,7 +2264,7 @@ reuse_tcp_select_id(struct reuse_tcp* reuse, struct outside_network* outnet)
}
/* equally pick a random unused element from the tree that is
* not in use. Pick a the n-th index of an ununused number,
* not in use. Pick the n-th index of an unused number,
* then loop over the empty spaces in the tree and find it */
log_assert(reuse->tree_by_id.count < 0xffff);
select = ub_random_max(outnet->rnd, 0xffff - reuse->tree_by_id.count);
@ -2453,6 +2473,32 @@ lookup_serviced(struct outside_network* outnet, sldns_buffer* buff, int dnssec,
return (struct serviced_query*)rbtree_search(outnet->serviced, &key);
}
/**
 * Timer callback that performs the first network send of a serviced query.
 * The send goes over UDP when the outside network has UDP enabled and the
 * query does not ask for a TCP or SSL upstream; otherwise it goes over TCP.
 * When nobody is registered on the query any more, or when the send fails,
 * NETEVENT_CLOSED is reported through serviced_callbacks().
 * @param arg: the serviced_query that owns the timer.
 */
void
serviced_timer_cb(void* arg)
{
	struct serviced_query* sq = (struct serviced_query*)arg;
	struct outside_network* outnet = sq->outnet;
	int use_udp;
	int sent;

	verbose(VERB_ALGO, "serviced send timer");

	/* All callbacks may have been removed between creation of the
	 * serviced_query and this timer firing; in that case nothing is
	 * sent and the query is wrapped up right away. */
	if(!sq->cblist) {
		serviced_callbacks(sq, NETEVENT_CLOSED, NULL, NULL);
		return;
	}

	/* first network action: pick the transport and send */
	use_udp = outnet->do_udp && !(sq->tcp_upstream || sq->ssl_upstream);
	if(use_udp)
		sent = serviced_udp_send(sq, outnet->udp_buff);
	else
		sent = serviced_tcp_send(sq, outnet->udp_buff);

	if(!sent) {
		serviced_callbacks(sq, NETEVENT_CLOSED, NULL, NULL);
		return;
	}

	/* The send is under way. Even if the callback list became empty in
	 * the meantime, no deletion is attempted here: the query keeps
	 * running and another callback may attach before the answer comes. */
}
/** Create new serviced entry */
static struct serviced_query*
serviced_create(struct outside_network* outnet, sldns_buffer* buff, int dnssec,
@ -2463,6 +2509,7 @@ serviced_create(struct outside_network* outnet, sldns_buffer* buff, int dnssec,
struct regional* region)
{
struct serviced_query* sq = (struct serviced_query*)malloc(sizeof(*sq));
struct timeval t;
#ifdef UNBOUND_DEBUG
rbnode_type* ins;
#endif
@ -2505,6 +2552,15 @@ serviced_create(struct outside_network* outnet, sldns_buffer* buff, int dnssec,
memcpy(&sq->addr, addr, addrlen);
sq->addrlen = addrlen;
sq->opt_list = opt_list;
sq->busy = 0;
sq->timer = comm_timer_create(outnet->base, serviced_timer_cb, sq);
if(!sq->timer) {
alloc_reg_release(alloc, region);
free(sq);
return NULL;
}
memset(&t, 0, sizeof(t));
comm_timer_set(sq->timer, &t);
sq->outnet = outnet;
sq->cblist = NULL;
sq->pending = NULL;
@ -2611,6 +2667,7 @@ serviced_delete(struct serviced_query* sq)
struct waiting_tcp* w = (struct waiting_tcp*)
sq->pending;
verbose(VERB_CLIENT, "serviced_delete: TCP");
log_assert(!(w->write_wait_queued && w->on_tcp_waiting_list));
/* if on stream-write-waiting list then
* remove from waiting list and waiting_tcp_delete */
if(w->write_wait_queued) {
@ -2624,6 +2681,10 @@ serviced_delete(struct serviced_query* sq)
struct pending_tcp* pend =
(struct pending_tcp*)w->next_waiting;
verbose(VERB_CLIENT, "serviced_delete: tcpreusekeep");
/* w needs to stay on tree_by_id to not assign
* the same ID; remove the callback since its
* serviced_query will be gone. */
w->cb = NULL;
if(!reuse_tcp_remove_serviced_keep(w, sq)) {
reuse_cb_and_decommission(sq->outnet,
pend, NETEVENT_CLOSED);
@ -2921,7 +2982,7 @@ serviced_tcp_callback(struct comm_point* c, void* arg, int error,
struct waiting_tcp* w = (struct waiting_tcp*)sq->pending;
struct pending_tcp* pend_tcp = NULL;
struct port_if* pi = NULL;
if(!w->on_tcp_waiting_list && w->next_waiting) {
if(w && !w->on_tcp_waiting_list && w->next_waiting) {
pend_tcp = (struct pending_tcp*)w->next_waiting;
pi = pend_tcp->pi;
}
@ -3017,8 +3078,11 @@ serviced_tcp_initiate(struct serviced_query* sq, sldns_buffer* buff)
sq->status==serviced_query_TCP_EDNS?"EDNS":"");
serviced_encode(sq, buff, sq->status == serviced_query_TCP_EDNS);
sq->last_sent_time = *sq->outnet->now_tv;
log_assert(!sq->busy);
sq->busy = 1;
sq->pending = pending_tcp_query(sq, buff, sq->outnet->tcp_auth_query_timeout,
serviced_tcp_callback, sq);
sq->busy = 0;
if(!sq->pending) {
/* delete from tree so that a retry by above layer does not
* clash with this entry */
@ -3050,8 +3114,11 @@ serviced_tcp_send(struct serviced_query* sq, sldns_buffer* buff)
} else {
timeout = sq->outnet->tcp_auth_query_timeout;
}
log_assert(!sq->busy);
sq->busy = 1;
sq->pending = pending_tcp_query(sq, buff, timeout,
serviced_tcp_callback, sq);
sq->busy = 0;
return sq->pending != NULL;
}
@ -3312,20 +3379,8 @@ outnet_serviced_query(struct outside_network* outnet,
serviced_node_del(&sq->node, NULL);
return NULL;
}
/* perform first network action */
if(outnet->do_udp && !(tcp_upstream || ssl_upstream)) {
if(!serviced_udp_send(sq, buff)) {
(void)rbtree_delete(outnet->serviced, sq);
serviced_node_del(&sq->node, NULL);
return NULL;
}
} else {
if(!serviced_tcp_send(sq, buff)) {
(void)rbtree_delete(outnet->serviced, sq);
serviced_node_del(&sq->node, NULL);
return NULL;
}
}
/* No network action at this point; it will be invoked with the
* serviced_query timer instead to run outside of the mesh. */
} else {
/* We don't need this region anymore. */
alloc_reg_release(env->alloc, region);
@ -3362,13 +3417,13 @@ callback_list_remove(struct serviced_query* sq, void* cb_arg)
void outnet_serviced_query_stop(struct serviced_query* sq, void* cb_arg)
{
if(!sq)
if(!sq)
return;
callback_list_remove(sq, cb_arg);
/* if callbacks() routine scheduled deletion, let it do that */
if(!sq->cblist && !sq->to_be_deleted) {
if(!sq->cblist && !sq->busy && !sq->to_be_deleted) {
(void)rbtree_delete(sq->outnet->serviced, sq);
serviced_delete(sq);
serviced_delete(sq);
}
}

View file

@ -519,6 +519,10 @@ struct serviced_query {
struct regional* region;
/** allocation service for the region */
struct alloc_cache* alloc;
/** flash timer to start the net I/O as a separate event */
struct comm_timer* timer;
/** true if serviced_query is currently doing net I/O and may block */
int busy;
};
/**
@ -792,6 +796,9 @@ void pending_udp_timer_delay_cb(void *arg);
/** callback for outgoing TCP timer event */
void outnet_tcptimer(void* arg);
/** callback to send serviced queries */
void serviced_timer_cb(void *arg);
/** callback for serviced query UDP answers */
int serviced_udp_callback(struct comm_point* c, void* arg, int error,
struct comm_reply* rep);

View file

@ -1442,6 +1442,11 @@ void pending_udp_timer_cb(void *ATTR_UNUSED(arg))
log_assert(0);
}
/* Stub for the serviced query send timer callback: the argument is unused
 * and the body asserts unconditionally, so it is not meant to be invoked.
 * NOTE(review): presumably a link-time placeholder like the neighbouring
 * timer callback stubs in this file - confirm against the build setup. */
void serviced_timer_cb(void *ATTR_UNUSED(arg))
{
log_assert(0);
}
void pending_udp_timer_delay_cb(void *ATTR_UNUSED(arg))
{
log_assert(0);

View file

@ -138,6 +138,7 @@ fptr_whitelist_comm_timer(void (*fptr)(void*))
else if(fptr == &auth_xfer_probe_timer_callback) return 1;
else if(fptr == &auth_xfer_transfer_timer_callback) return 1;
else if(fptr == &mesh_serve_expired_callback) return 1;
else if(fptr == &serviced_timer_cb) return 1;
#ifdef USE_DNSTAP
else if(fptr == &mq_wakeup_cb) return 1;
#endif