dnstap io, sleeps thread when there is no traffic.

This commit is contained in:
W.C.A. Wijngaards 2020-01-23 13:27:21 +01:00
parent c0f410f721
commit 8c47d16e29
2 changed files with 126 additions and 22 deletions

View file

@ -52,6 +52,13 @@
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** DTIO command channel commands */
enum {
/** DTIO command channel stop */
DTIO_COMMAND_STOP = 0,
/** DTIO command channel wakeup */
DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
{
@ -137,9 +144,38 @@ dt_msg_queue_delete(struct dt_msg_queue* mq)
free(mq);
}
/** make the dtio wake up by sending a wakeup command */
static void dtio_wakeup(struct dt_io_thread* dtio)
{
uint8_t cmd = DTIO_COMMAND_WAKEUP;
if(!dtio) return;
if(!dtio->event_base) return; /* not started */
while(1) {
ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
if(r == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
continue;
log_err("dnstap io wakeup: write: %s", strerror(errno));
#else
if(WSAGetLastError() == WSAEINPROGRESS)
continue;
if(WSAGetLastError() == WSAEWOULDBLOCK)
continue;
log_err("dnstap io stop: write: %s",
wsa_strerror(WSAGetLastError()));
#endif
break;
}
break;
}
}
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
int wakeup = 0;
struct dt_msg_entry* entry;
/* check conditions */
@ -170,6 +206,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
/* aqcuire lock */
lock_basic_lock(&mq->lock);
/* list was empty, wakeup dtio */
if(mq->first == NULL)
wakeup = 1;
/* see if it is going to fit */
if(mq->cursize + len > mq->maxsize) {
/* buffer full, or congested. */
@ -188,6 +227,9 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
mq->last = entry;
/* release lock */
lock_basic_unlock(&mq->lock);
if(wakeup)
dtio_wakeup(mq->dtio);
}
struct dt_io_thread* dt_io_thread_create(void)
@ -221,6 +263,7 @@ int dt_io_thread_register_queue(struct dt_io_thread* dtio,
{
struct dt_io_list_item* item = malloc(sizeof(*item));
if(!item) return 0;
mq->dtio = dtio;
item->queue = mq;
item->next = dtio->io_list;
dtio->io_list = item;
@ -238,6 +281,7 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
if(prev) prev->next = item->next;
else dtio->io_list = item->next;
/* the queue itself only registered, not deleted */
item->queue->dtio = NULL;
free(item);
dtio->io_list_iter = NULL;
return;
@ -288,16 +332,25 @@ static int dtio_find_in_queue(struct dt_io_thread* dtio,
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
struct dt_io_list_item* item;
struct dt_io_list_item *spot, *item;
if(dtio->io_list_iter)
item = dtio->io_list_iter;
else item = dtio->io_list;
spot = dtio->io_list_iter;
/* use the next queue for the next message lookup,
* if we hit the end(NULL) the NULL restarts the iter at start. */
if(item)
dtio->io_list_iter = item->next;
if(spot)
dtio->io_list_iter = spot->next;
else if(dtio->io_list)
dtio->io_list_iter = dtio->io_list->next;
/* scan from spot to end-of-io_list */
item = spot;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
item = item->next;
}
/* scan starting at the start-of-list (to wrap around the end) */
item = dtio->io_list;
while(item) {
if(dtio_find_in_queue(dtio, item->queue))
return 1;
@ -323,6 +376,7 @@ static void dtio_del_output_event(struct dt_io_thread* dtio)
return;
ub_event_del(dtio->event);
dtio->event_added = 0;
dtio->event_added_is_write = 0;
}
/** close and stop the output file descriptor event */
@ -607,6 +661,52 @@ static int dtio_check_close(struct dt_io_thread* dtio)
return 0;
}
/** add the output file descriptor event for listening, read only */
static void dtio_add_output_event_read(struct dt_io_thread* dtio)
{
if(!dtio->event)
return;
if(dtio->event_added && !dtio->event_added_is_write)
return;
/* we have to (re-)register the event */
if(dtio->event_added)
ub_event_del(dtio->event);
ub_event_del_bits(dtio->event, UB_EV_WRITE);
if(ub_event_add(dtio->event, NULL) != 0) {
log_err("dnstap io: out of memory (adding event)");
return;
}
dtio->event_added = 1;
dtio->event_added_is_write = 0;
}
/** add the output file descriptor event for listening, read and write */
static void dtio_add_output_event_write(struct dt_io_thread* dtio)
{
if(!dtio->event)
return;
if(dtio->event_added && dtio->event_added_is_write)
return;
/* we have to (re-)register the event */
if(dtio->event_added)
ub_event_del(dtio->event);
ub_event_add_bits(dtio->event, UB_EV_WRITE);
if(ub_event_add(dtio->event, NULL) != 0) {
log_err("dnstap io: out of memory (adding event)");
return;
}
dtio->event_added = 1;
dtio->event_added_is_write = 1;
}
/** put the dtio thread to sleep */
static void dtio_sleep(struct dt_io_thread* dtio)
{
/* unregister the event polling for write, because there is
* nothing to be written */
dtio_add_output_event_read(dtio);
}
/** callback for the dnstap events, to write to the output */
static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
@ -628,8 +728,14 @@ static void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
/* see if there are messages that need writing */
if(!dtio->cur_msg) {
if(!dtio_find_msg(dtio))
if(!dtio_find_msg(dtio)) {
if(i == 0) {
/* no messages on the first iteration,
* the queues are all empty */
dtio_sleep(dtio);
}
return; /* nothing to do */
}
}
/* write it */
@ -668,8 +774,13 @@ static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
/* and then fall through to quit the thread */
} else if(r == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel closed");
} else if(r == 1 && cmd == 0) {
} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
/* reregister event */
dtio_add_output_event_write(dtio);
return;
} else if(r == 1) {
verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
}
@ -1016,18 +1127,6 @@ static void dtio_open_output(struct dt_io_thread* dtio)
}
}
/** add the output file descriptor event for listening */
static void dtio_add_output_event(struct dt_io_thread* dtio)
{
if(!dtio->event)
return;
if(ub_event_add(dtio->event, NULL) != 0) {
log_err("dnstap io: out of memory (adding event)");
return;
}
dtio->event_added = 1;
}
/** the IO thread function for the DNSTAP IO */
static void* dnstap_io(void* arg)
{
@ -1039,7 +1138,7 @@ static void* dnstap_io(void* arg)
dtio_setup_base(dtio, &secs, &now);
dtio_setup_cmd(dtio);
dtio_open_output(dtio);
dtio_add_output_event(dtio);
dtio_add_output_event_write(dtio);
verbose(VERB_ALGO, "start dnstap io thread");
/* run */
@ -1077,7 +1176,7 @@ int dt_io_thread_start(struct dt_io_thread* dtio)
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
uint8_t cmd = 0;
uint8_t cmd = DTIO_COMMAND_STOP;
if(!dtio) return;
if(!dtio->event_base) return; /* not started */

View file

@ -47,6 +47,7 @@
#include "util/locks.h"
struct dt_msg_entry;
struct dt_io_list_item;
struct dt_io_thread;
struct config_file;
/**
@ -70,6 +71,8 @@ struct dt_msg_queue {
/** list of messages. The messages are added to the back and taken
* out from the front. */
struct dt_msg_entry* first, *last;
/** reference to the io thread to wakeup */
struct dt_io_thread* dtio;
};
/**
@ -105,6 +108,8 @@ struct dt_io_thread {
void* event;
/** the event is added */
int event_added;
/** event added is a write event */
int event_added_is_write;
/** check for nonblocking connect errors on fd */
int check_nb_connect;
/** the buffer that currently getting written, or NULL if no