stream reuse, write and read again if more data can go over the channel,

this amortizes the event loop mechanism for busy channels, for performance.
This commit is contained in:
W.C.A. Wijngaards 2020-07-13 15:16:59 +02:00
parent 9914b7216b
commit 19a35fb839
3 changed files with 75 additions and 0 deletions

View file

@ -911,12 +911,19 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
/* setup to write next packet or setup read timeout */
if(pend->reuse.write_wait_first) {
verbose(VERB_ALGO, "outnet tcp setup next pkt");
/* we can write it straight away perhaps, set flag
* because this callback called after a tcp write
* succeeded and likely more buffer space is available
* and we can write some more. */
pend->c->tcp_more_write_again = 1;
pend->query = reuse_write_wait_pop(&pend->reuse);
outnet_tcp_take_query_setup(pend->c->fd, pend,
pend->query);
} else {
verbose(VERB_ALGO, "outnet tcp writes done, wait");
pend->c->tcp_write_and_read = 0;
pend->c->tcp_more_read_again = 0;
pend->c->tcp_more_write_again = 0;
pend->c->tcp_is_reading = 1;
reuse_tcp_setup_timeout(pend);
}
@ -964,6 +971,12 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error,
verbose(5, "outnet_tcp_cb reuse after cb: keep it");
/* it is in the reuse_tcp tree, with other queries, or
* on the empty list. do not decommission it */
/* if there are more outstanding queries, we could try to
* read again, to see if it is on the input,
* because this callback called after a successful read
* and there could be more bytes to read on the input */
if(pend->reuse.tree_by_id.count != 0)
pend->c->tcp_more_read_again = 1;
reuse_tcp_setup_read_and_timeout(pend);
return 0;
}

View file

@ -985,6 +985,8 @@ reclaim_tcp_handler(struct comm_point* c)
comm_point_start_listening(c->tcp_parent, -1, -1);
}
}
c->tcp_more_read_again = 0;
c->tcp_more_write_again = 0;
}
/** do the callback when writing is done */
@ -1852,6 +1854,52 @@ tcp_req_info_read_again(int fd, struct comm_point* c)
}
}
/** read again to drain buffers when there could be more to read */
static void
tcp_more_read_again(int fd, struct comm_point* c)
{
/* if the packet is done, but another one could be waiting on
* the connection, the callback signals this, and we try again */
/* this continues until the read routines get EAGAIN or so,
* and thus does not call the callback, and the bool is 0 */
while(c->tcp_more_read_again) {
c->tcp_more_read_again = 0;
if(!comm_point_tcp_handle_read(fd, c, 0)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close) {
fptr_ok(fptr_whitelist_comm_point(
c->callback));
(void)(*c->callback)(c, c->cb_arg,
NETEVENT_CLOSED, NULL);
}
return;
}
}
}
/** write again to fill up when there could be more to write */
static void
tcp_more_write_again(int fd, struct comm_point* c)
{
/* if the packet is done, but another is waiting to be written,
* the callback signals it and we try again. */
/* this continues until the write routines get EAGAIN or so,
* and thus does not call the callback, and the bool is 0 */
while(c->tcp_more_write_again) {
c->tcp_more_write_again = 0;
if(!comm_point_tcp_handle_write(fd, c)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close) {
fptr_ok(fptr_whitelist_comm_point(
c->callback));
(void)(*c->callback)(c, c->cb_arg,
NETEVENT_CLOSED, NULL);
}
return;
}
}
}
void
comm_point_tcp_handle_callback(int fd, short event, void* arg)
{
@ -1903,6 +1951,8 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
}
if(has_tcpq && c->tcp_req_info && c->tcp_req_info->read_again)
tcp_req_info_read_again(fd, c);
if(c->tcp_more_read_again)
tcp_more_read_again(fd, c);
return;
}
if(event&UB_EV_WRITE) {
@ -1918,6 +1968,8 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
}
if(has_tcpq && c->tcp_req_info && c->tcp_req_info->read_again)
tcp_req_info_read_again(fd, c);
if(c->tcp_more_write_again)
tcp_more_write_again(fd, c);
return;
}
log_err("Ignored event %d for tcphdl.", event);

View file

@ -268,6 +268,16 @@ struct comm_point {
/** length of tcp_write_pkt in bytes */
size_t tcp_write_pkt_len;
/** if set try to read another packet again (over connection with
* multiple packets), once set, tries once, then zero again,
* so set it in the packet complete section. */
int tcp_more_read_again;
/** if set try to write another packet (over connection with
* multiple packets), once set, tries once, then zero again,
* so set it in the packet complete section. */
int tcp_more_write_again;
/** if set, read/write completes:
read/write state of tcp is toggled.
buffer reset/bytecount reset.