stream reuse, fix double callback and double delete, items are in the tree.

This commit is contained in:
W.C.A. Wijngaards 2020-07-09 15:47:24 +02:00
parent 8201d1422b
commit 79f315f480

View file

@ -753,20 +753,6 @@ static void reuse_del_readwait(struct pending_tcp* pend)
rbtree_init(&pend->reuse.tree_by_id, reuse_id_cmp);
}
/** delete writewait waiting_tcp elements, deletes the elements in the list */
static void reuse_del_writewait(struct pending_tcp* pend)
{
struct waiting_tcp* w, *n;
w = pend->reuse.write_wait_first;
while(w) {
n = w->write_wait_next;
waiting_tcp_delete(w);
w = n;
}
pend->reuse.write_wait_first = NULL;
pend->reuse.write_wait_last = NULL;
}
/** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
static void
decommission_pending_tcp(struct outside_network* outnet,
@ -787,11 +773,12 @@ decommission_pending_tcp(struct outside_network* outnet,
/* needs unlink from the reuse tree to get deleted */
reuse_tcp_remove_tree_list(outnet, &pend->reuse);
}
if(pend->query && !rbtree_search(&pend->reuse.tree_by_id, pend->query))
waiting_tcp_delete(pend->query);
/* unlink the query and writewait list, it is part of the tree
* nodes and is deleted */
pend->query = NULL;
pend->reuse.write_wait_first = NULL;
pend->reuse.write_wait_last = NULL;
reuse_del_readwait(pend);
reuse_del_writewait(pend);
}
/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
@ -823,17 +810,6 @@ reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
return 1;
}
/** perform failure callbacks for waiting queries in reuse write list */
static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w;
w = pend->reuse.write_wait_first;
while(w) {
waiting_tcp_callback(w, NULL, err, NULL);
w = w->write_wait_next;
}
}
/** perform failure callbacks for waiting queries in reuse read rbtree */
static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
{
@ -849,15 +825,6 @@ static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
}
}
/** perform failure callbacks for current written query in reuse struct */
static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
{
struct waiting_tcp* w = pend->query;
if(w) {
waiting_tcp_callback(w, NULL, err, NULL);
}
}
/** delete element from tree by id */
static void
reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
@ -984,8 +951,6 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
* to error. Close it */
reuse_cb_readwait_for_failure(pend, (error==NETEVENT_TIMEOUT?
NETEVENT_TIMEOUT:NETEVENT_CLOSED));
reuse_cb_writewait_for_failure(pend, (error==NETEVENT_TIMEOUT?
NETEVENT_TIMEOUT:NETEVENT_CLOSED));
decommission_pending_tcp(outnet, pend);
use_free_buffer(outnet);
return 0;
@ -1843,18 +1808,10 @@ outnet_tcptimer(void* arg)
outnet->tcp_free = pend;
/* do failure callbacks for all the queries in the
* wait for write list and in the id-tree */
/* callback for 'w' arg already in list of curquery,
* readwait list, writewait list */
reuse_cb_curquery_for_failure(&pickup, NETEVENT_TIMEOUT);
* id-tree, that includes the pend.query and write list */
reuse_cb_readwait_for_failure(&pickup, NETEVENT_TIMEOUT);
reuse_cb_writewait_for_failure(&pickup, NETEVENT_TIMEOUT);
/* delete the stored callback structures */
if(pickup.query &&
!rbtree_search(&pickup.reuse.tree_by_id, pickup.query))
waiting_tcp_delete(pickup.query);
reuse_del_readwait(&pickup);
reuse_del_writewait(&pickup);
}
use_free_buffer(outnet);
}
@ -1880,9 +1837,7 @@ reuse_tcp_close_oldest(struct outside_network* outnet)
}
/* free up */
reuse_cb_curquery_for_failure(pend, NETEVENT_CLOSED);
reuse_cb_readwait_for_failure(pend, NETEVENT_CLOSED);
reuse_cb_writewait_for_failure(pend, NETEVENT_CLOSED);
decommission_pending_tcp(outnet, pend);
}
@ -2288,12 +2243,8 @@ serviced_delete(struct serviced_query* sq)
(struct pending_tcp*)w->next_waiting;
verbose(5, "serviced_delete: tcpreusekeep");
if(!reuse_tcp_remove_serviced_keep(w, sq)) {
reuse_cb_curquery_for_failure(
pend, NETEVENT_CLOSED);
reuse_cb_readwait_for_failure(
pend, NETEVENT_CLOSED);
reuse_cb_writewait_for_failure(
pend, NETEVENT_CLOSED);
decommission_pending_tcp(sq->outnet,
pend);
use_free_buffer(sq->outnet);