diff --git a/services/outside_network.c b/services/outside_network.c index 71a7adebd..ee8275c7f 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -364,6 +364,23 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse) if(!w) return NULL; log_assert(w->write_wait_queued); + log_assert(!w->write_wait_prev); + reuse->write_wait_first = w->write_wait_next; + if(w->write_wait_next) + w->write_wait_next->write_wait_prev = NULL; + else reuse->write_wait_last = NULL; + w->write_wait_queued = 0; + return w; +} + +/** remove the element from the writewait list */ +static void reuse_write_wait_remove(struct reuse_tcp* reuse, + struct waiting_tcp* w) +{ + if(!w) + return; + if(!w->write_wait_queued) + return; if(w->write_wait_prev) w->write_wait_prev->write_wait_next = w->write_wait_next; else reuse->write_wait_first = w->write_wait_next; @@ -371,7 +388,6 @@ static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse) w->write_wait_next->write_wait_prev = w->write_wait_prev; else reuse->write_wait_last = w->write_wait_prev; w->write_wait_queued = 0; - return w; } /** push the element after the last on the writewait list */ @@ -608,6 +624,17 @@ outnet_tcp_take_into_use(struct waiting_tcp* w) return 1; } +/** call callback on waiting_tcp, if not NULL */ +static void +waiting_tcp_callback(struct waiting_tcp* w, struct comm_point* c, int error, + struct comm_reply* reply_info) +{ + if(w->cb) { + fptr_ok(fptr_whitelist_pending_tcp(w->cb)); + (void)(*w->cb)(c, w->cb_arg, error, reply_info); + } +} + /** see if buffers can be used to service TCP queries */ static void use_free_buffer(struct outside_network* outnet) @@ -641,11 +668,9 @@ use_free_buffer(struct outside_network* outnet) } } else { if(!outnet_tcp_take_into_use(w)) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; + waiting_tcp_callback(w, NULL, NETEVENT_CLOSED, + NULL); waiting_tcp_delete(w); - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL); } } } @@ -780,10 +805,7 @@ static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err) struct waiting_tcp* w; w = pend->reuse.write_wait_first; while(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); w = w->write_wait_next; } } @@ -798,10 +820,7 @@ static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err) node = rbtree_first(&pend->reuse.tree_by_id); while(node && node != RBTREE_NULL) { struct waiting_tcp* w = (struct waiting_tcp*)node->key; - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); node = rbtree_next(node); } } @@ -811,10 +830,7 @@ static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err) { struct waiting_tcp* w = pend->query; if(w) { - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, err, NULL); + waiting_tcp_callback(w, NULL, err, NULL); } } @@ -829,7 +845,15 @@ reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w) /** set timeout on tcp fd and setup read event to catch incoming dns msgs */ static void -reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp) +reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp) +{ + log_reuse_tcp(5, "reuse_tcp_setup_timeout", &pend_tcp->reuse); + comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT); +} + +/** set timeout on tcp fd and setup read event to catch incoming dns msgs */ +static void +reuse_tcp_setup_read_and_timeout(struct pending_tcp* pend_tcp) { log_reuse_tcp(5, "reuse_tcp_setup_readtimeout", &pend_tcp->reuse); sldns_buffer_clear(pend_tcp->c->buffer); @@ -875,7 +899,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, outnet_tcp_take_query_setup(pend->c->fd, pend, pend->query); } else { - reuse_tcp_setup_readtimeout(pend); + reuse_tcp_setup_timeout(pend); } return 0; } else if(error != NETEVENT_NOERROR) { @@ -897,13 +921,11 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, * query again to the same destination. */ if(outnet->tcp_reuse.count < outnet->tcp_reuse_max) { (void)reuse_tcp_insert(outnet, pend); - reuse_tcp_setup_readtimeout(pend); } } if(pend->query) { reuse_tree_by_id_delete(&pend->reuse, pend->query); - fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb)); - (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info); + waiting_tcp_callback(pend->query, c, error, reply_info); waiting_tcp_delete(pend->query); pend->query = NULL; } @@ -912,6 +934,7 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, verbose(5, "outnet_tcp_cb reuse after cb: keep it"); /* it is in the reuse_tcp tree, with other queries, or * on the empty list. do not decommission it */ + reuse_tcp_setup_read_and_timeout(pend); return 0; } verbose(5, "outnet_tcp_cb reuse after cb: decommission it"); @@ -1742,12 +1765,9 @@ outnet_tcptimer(void* arg) verbose(5, "outnet_tcptimer"); if(w->on_tcp_waiting_list) { /* it is on the waiting list */ - comm_point_callback_type* cb = w->cb; - void* cb_arg = w->cb_arg; waiting_list_remove(outnet, w); + waiting_tcp_callback(w, NULL, NETEVENT_TIMEOUT, NULL); waiting_tcp_delete(w); - fptr_ok(fptr_whitelist_pending_tcp(cb)); - (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL); } else { /* it was in use */ struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting; @@ -2154,6 +2174,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, { struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting; verbose(5, "reuse_tcp_remove_serviced_keep"); + /* remove the callback. let query continue to write to not cancel + * the stream itself. also keep it as an entry in the tree_by_id, + * in case the answer returns (that we no longer want), but we cannot + * pick the same ID number meanwhile */ + pend_tcp->query->cb = NULL; /* see if can be entered in reuse tree * for that the FD has to be non-1 */ if(pend_tcp->c->fd == -1) { @@ -2163,10 +2188,8 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, /* if in tree and used by other queries */ if(pend_tcp->reuse.node.key) { verbose(5, "reuse_tcp_remove_serviced_keep: in use by other queries"); - /* note less use of stream */ - /* remove id value used by this svcd. */ /* do not reset the keepalive timer, for that - * we'd need traffic, and this is where the servicedq is + * we'd need traffic, and this is where the serviced is * removed due to state machine internal reasons, * eg. iterator no longer interested in this query */ return 1; @@ -2175,13 +2198,11 @@ reuse_tcp_remove_serviced_keep(struct waiting_tcp* w, if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count < sq->outnet->tcp_reuse_max) { verbose(5, "reuse_tcp_remove_serviced_keep: keep open"); - /* note less use of stream */ - /* remove id value used by this svcd. */ /* set a keepalive timer on it */ if(!reuse_tcp_insert(sq->outnet, pend_tcp)) { return 0; } - reuse_tcp_setup_readtimeout(pend_tcp); + reuse_tcp_setup_timeout(pend_tcp); return 1; } return 0; @@ -2207,31 +2228,38 @@ serviced_delete(struct serviced_query* sq) * mesh */ outnet_send_wait_udp(sq->outnet); } else { - struct waiting_tcp* p = (struct waiting_tcp*) + struct waiting_tcp* w = (struct waiting_tcp*) sq->pending; verbose(5, "serviced_delete: TCP"); - /* TODO: if on stream-write-waiting list then + /* if on stream-write-waiting list then * remove from waiting list and waiting_tcp_delete */ - if(!p->on_tcp_waiting_list) { + if(w->write_wait_queued) { + struct pending_tcp* pend = + (struct pending_tcp*)w->next_waiting; + verbose(5, "serviced_delete: writewait"); + reuse_tree_by_id_delete(&pend->reuse, w); + reuse_write_wait_remove(&pend->reuse, w); + waiting_tcp_delete(w); + } else if(!w->on_tcp_waiting_list) { + struct pending_tcp* pend = + (struct pending_tcp*)w->next_waiting; verbose(5, "serviced_delete: tcpreusekeep"); - if(!reuse_tcp_remove_serviced_keep(p, sq)) { + if(!reuse_tcp_remove_serviced_keep(w, sq)) { reuse_cb_curquery_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); reuse_cb_readwait_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); reuse_cb_writewait_for_failure( - (struct pending_tcp*)p-> - next_waiting, NETEVENT_CLOSED); + pend, NETEVENT_CLOSED); decommission_pending_tcp(sq->outnet, - (struct pending_tcp*)p->next_waiting); + pend); use_free_buffer(sq->outnet); } + sq->pending = NULL; } else { verbose(5, "serviced_delete: tcpwait"); - waiting_list_remove(sq->outnet, p); - waiting_tcp_delete(p); + waiting_list_remove(sq->outnet, w); + waiting_tcp_delete(w); } } } diff --git a/services/outside_network.h b/services/outside_network.h index 7c061938a..0dcf1b2e1 100644 --- a/services/outside_network.h +++ b/services/outside_network.h @@ -375,7 +375,10 @@ struct waiting_tcp { uint8_t* pkt; /** length of query packet. */ size_t pkt_len; - /** callback for the timeout, error or reply to the message */ + /** callback for the timeout, error or reply to the message, + * or NULL if no user is waiting. the entry uses an ID number. + * a query that was written is no longer needed, but the ID number + * and a reply will come back and can be ignored if NULL */ comm_point_callback_type* cb; /** callback user argument */ void* cb_arg;