iothread work.

This commit is contained in:
W.C.A. Wijngaards 2020-01-21 17:01:25 +01:00
parent 57ad169605
commit efc79beb2d
7 changed files with 398 additions and 5 deletions

View file

@ -452,10 +452,9 @@ daemon_create_workers(struct daemon* daemon)
if(daemon->cfg->dnstap) {
#ifdef USE_DNSTAP
daemon->dtenv = dt_create(daemon->cfg->dnstap_socket_path,
(unsigned int)daemon->num);
(unsigned int)daemon->num, daemon->cfg);
if (!daemon->dtenv)
fatal_exit("dt_create failed");
dt_apply_cfg(daemon->dtenv, daemon->cfg);
#else
fatal_exit("dnstap enabled in config but not built with dnstap support");
#endif

View file

@ -78,6 +78,7 @@
#include "sldns/wire2str.h"
#include "util/shm_side/shm_main.h"
#include "dnscrypt/dnscrypt.h"
#include "dnstap/dtstream.h"
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
@ -1876,6 +1877,19 @@ worker_init(struct worker* worker, struct config_file *cfg,
) {
auth_xfer_pickup_initial(worker->env.auth_zones, &worker->env);
}
#ifdef USE_DNSTAP
if(worker->daemon->cfg->dnstap
#ifndef THREADS_DISABLED
&& worker->thread_num == 0
#endif
) {
if(!dt_io_thread_start(dtenv->dtio)) {
log_err("could not start dnstap io thread");
worker_delete(worker);
return 0;
}
}
#endif /* USE_DNSTAP */
if(!worker->env.mesh || !worker->env.scratch_buffer) {
worker_delete(worker);
return 0;
@ -1925,8 +1939,15 @@ worker_delete(struct worker* worker)
}
comm_base_delete(worker->base);
#ifdef USE_DNSTAP
dt_deinit(&worker->dtenv);
if(worker->daemon->cfg->dnstap
#ifndef THREADS_DISABLED
&& worker->thread_num == 0
#endif
) {
dt_io_thread_stop(worker->dtenv.dtio);
}
dt_deinit(&worker->dtenv);
#endif /* USE_DNSTAP */
ub_randfree(worker->rndstate);
alloc_clear(&worker->alloc);
regional_destroy(worker->env.scratch);

BIN
dnstap/.dtstream.h.swp Normal file

Binary file not shown.

View file

@ -130,7 +130,7 @@ check_socket_file(const char* socket_path)
}
struct dt_env *
dt_create(const char *socket_path, unsigned num_workers)
dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg)
{
#ifdef UNBOUND_DEBUG
fstrm_res res;
@ -180,6 +180,16 @@ dt_create(const char *socket_path, unsigned num_workers)
fstrm_unix_writer_options_destroy(&fuwopt);
fstrm_writer_options_destroy(&fwopt);
env->dtio = dt_io_thread_create();
if(!env->dtio) {
log_err("malloc failure");
fstrm_writer_destroy(&fw);
fstrm_iothr_destroy(&env->iothr);
free(env);
return NULL;
}
dt_io_thread_apply_cfg(env->dtio, cfg);
dt_apply_cfg(env, cfg);
return env;
}
@ -275,12 +285,17 @@ dt_init(struct dt_env *env)
log_err("malloc failure");
return 0;
}
if(!dt_io_thread_register_queue(env->dtio, env->msgqueue)) {
log_err("malloc failure");
return 0;
}
return 1;
}
void
dt_deinit(struct dt_env* env)
{
dt_io_thread_unregister_queue(env->dtio, env->msgqueue);
dt_msg_queue_delete(env->msgqueue);
}
@ -291,6 +306,7 @@ dt_delete(struct dt_env *env)
return;
verbose(VERB_OPS, "closing dnstap socket");
fstrm_iothr_destroy(&env->iothr);
dt_io_thread_delete(env->dtio);
free(env->identity);
free(env->version);
free(env);

View file

@ -48,6 +48,8 @@ struct dt_msg_queue;
struct dt_env {
/** dnstap I/O thread */
struct fstrm_iothr *iothr;
/** the io thread (made by the struct daemon) */
struct dt_io_thread* dtio;
/** dnstap I/O thread input queue */
struct fstrm_iothr_queue *ioq;
@ -90,10 +92,11 @@ struct dt_env {
* share access to the dnstap I/O socket.
* @param socket_path: path to dnstap logging socket, must be non-NULL.
* @param num_workers: number of worker threads, must be > 0.
* @param cfg: with config settings.
* @return dt_env object, NULL on failure.
*/
struct dt_env *
dt_create(const char *socket_path, unsigned num_workers);
dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg);
/**
* Apply config settings.

View file

@ -43,6 +43,12 @@
#include "config.h"
#include "dnstap/dtstream.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
struct dt_msg_queue*
dt_msg_queue_create(void)
@ -135,3 +141,285 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
lock_basic_unlock(&mq->lock);
}
struct dt_io_thread* dt_io_thread_create(void)
{
struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
return dtio;
}
void dt_io_thread_delete(struct dt_io_thread* dtio)
{
struct dt_io_list_item* item, *nextitem;
if(!dtio) return;
item=dtio->io_list;
while(item) {
nextitem = item->next;
free(item);
item = nextitem;
}
free(dtio->socket_path);
free(dtio);
}
void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
{
dtio->upstream_is_unix = 1;
dtio->socket_path = strdup(cfg->dnstap_socket_path);
}
int dt_io_thread_register_queue(struct dt_io_thread* dtio,
struct dt_msg_queue* mq)
{
struct dt_io_list_item* item = malloc(sizeof(*item));
if(!item) return 0;
item->queue = mq;
item->next = dtio->io_list;
dtio->io_list = item;
return 1;
}
void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
struct dt_msg_queue* mq)
{
struct dt_io_list_item* item=dtio->io_list, *prev=NULL;
while(item) {
if(item->queue == mq) {
/* found it */
if(prev) prev->next = item->next;
else dtio->io_list = item->next;
/* the queue itself only registered, not deleted */
free(item);
return;
}
prev = item;
item = item->next;
}
}
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
}
/** write more of the current messsage. false if incomplete, true if
* the message is done */
static int dtio_write_more(struct dt_io_thread* dtio)
{
}
/** callback for the dnstap events, to write to the output */
static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
/* see if there are messages that need writing */
if(!dtio->cur_msg) {
if(!dtio_find_msg(dtio))
return; /* nothing to do */
}
/* write it */
if(dtio->cur_msg_done < dtio->cur_msg_len) {
if(!dtio_write_more(dtio))
return;
}
/* done with the current message */
free(dtio->cur_msg);
dtio->cur_msg = NULL;
dtio->cur_msg_len = 0;
dtio->cur_msg_done = 0;
}
/** callback for the dnstap commandpipe, to stop the dnstap IO */
static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
uint8_t cmd;
ssize_t r;
if(dtio->want_to_exit)
return;
r = read(fd, &cmd, sizeof(cmd));
if(r == -1) {
if(errno == EINTR || errno == EAGAIN)
return; /* ignore this */
log_err("dnstap io: failed to read: %s", strerror(errno));
/* and then fall through to quit the thread */
}
if(r == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel closed");
} else if(r == 1 && cmd == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
}
dtio->want_to_exit = 1;
if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
!= 0) {
log_err("dnstap io: could not loopexit");
}
}
/** setup the event base for the dnstap io thread */
static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
struct timeval* now)
{
memset(now, 0, sizeof(*now));
dtio->event_base = ub_default_event_base(0, secs, now);
if(!dtio->event_base) {
fatal_exit("dnstap io: could not create event_base");
}
}
/** setup the cmd event for dnstap io */
static void dtio_setup_cmd(struct dt_io_thread* dtio)
{
struct ub_event* cmdev;
fd_set_nonblock(dtio->commandpipe[0]);
cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
if(!cmdev) {
fatal_exit("dnstap io: out of memory");
}
dtio->command_event = cmdev;
if(ub_event_add(cmdev, NULL) != 0) {
fatal_exit("dnstap io: out of memory (adding event)");
}
}
/** del the output file descriptor event for listening */
static void dtio_del_output_event(struct dt_io_thread* dtio)
{
if(!dtio->event_added)
return;
ub_event_del(dtio->event);
dtio->event_added = 0;
}
/** close and stop the output file descriptor event */
static void dtio_close_output(struct dt_io_thread* dtio)
{
if(!dtio->event)
return;
ub_event_free(dtio->event);
dtio->event = NULL;
close(dtio->fd);
dtio->fd = -1;
}
/** perform desetup and free stuff when the dnstap io thread exits */
static void dtio_desetup(struct dt_io_thread* dtio)
{
dtio_del_output_event(dtio);
dtio_close_output(dtio);
ub_event_del(dtio->command_event);
ub_event_free(dtio->command_event);
close(dtio->commandpipe[0]);
dtio->commandpipe[0] = -1;
ub_event_base_free(dtio->event_base);
}
/** open the output file descriptor */
static void dtio_open_output(struct dt_io_thread* dtio)
{
struct ub_event* ev;
struct sockaddr_un s;
dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC);
if(dtio->fd == -1) {
log_err("dnstap io: failed to create socket: %s",
strerror(errno));
return;
}
memset(&s, 0, sizeof(s));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
/* this member exists on BSDs, not Linux */
s.sun_len = (unsigned)sizeof(usock);
#endif
s.sun_family = AF_LOCAL;
/* length is 92-108, 104 on FreeBSD */
(void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path));
if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s))
== -1) {
log_err("dnstap io: failed to connect: %s", strerror(errno));
return;
}
fd_set_nonblock(dtio->fd);
/* the EV_READ is to catch channel close, write to write packets */
ev = ub_event_new(dtio->event_base, dtio->fd,
UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
dtio);
if(!ev) {
fatal_exit("dnstap io: out of memory");
}
dtio->event = ev;
}
/** add the output file descriptor event for listening */
static void dtio_add_output_event(struct dt_io_thread* dtio)
{
if(ub_event_add(dtio->event, NULL) != 0) {
fatal_exit("dnstap io: out of memory (adding event)");
}
dtio->event_added = 1;
}
/** the IO thread function for the DNSTAP IO */
static void* dnstap_io(void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
time_t secs = 0;
struct timeval now;
/* setup */
dtio_setup_base(dtio, &secs, &now);
dtio_setup_cmd(dtio);
dtio_open_output(dtio);
dtio_add_output_event(dtio);
verbose(VERB_ALGO, "start dnstap io thread");
/* run */
if(ub_event_base_dispatch(dtio->event_base) < 0) {
log_err("dnstap io: dispatch failed, errno is %s",
strerror(errno));
}
/* cleanup */
verbose(VERB_ALGO, "stop dnstap io thread");
dtio_desetup(dtio);
return NULL;
}
int dt_io_thread_start(struct dt_io_thread* dtio)
{
/* set up the thread, can fail */
if(pipe(dtio->commandpipe) == -1) {
log_err("failed to create pipe: %s", strerror(errno));
return 0;
}
/* start the thread */
ub_thread_create(&dtio->tid, dnstap_io, dtio);
return 1;
}
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
uint8_t cmd = 0;
if(!dtio) return;
if(!dtio->event_base) return; /* not started */
while(1) {
ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
if(r == -1) {
if(errno == EINTR || errno == EAGAIN)
continue;
log_err("dnstap io stop: write: %s", strerror(errno));
break;
}
break;
}
close(dtio->commandpipe[1]);
dtio->commandpipe[1] = -1;
ub_thread_join(dtio->tid);
}

View file

@ -47,6 +47,7 @@
#include "util/locks.h"
struct dt_msg_entry;
struct dt_io_list_item;
struct config_file;
/**
* A message buffer with dnstap messages queued up. It is per-worker.
@ -92,16 +93,29 @@ struct dt_io_thread {
void* event_base;
/** list of queues that is registered to get written */
struct dt_io_list_item* io_list;
/** thread id, of the io thread */
ub_thread_type tid;
/** file descriptor that the thread writes to */
int fd;
/** event structure that the thread uses */
void* event;
/** the event is added */
int event_added;
/** the buffer that currently getting written, or NULL if no
* (partial) message written now */
void* cur_msg;
/** length of the current message */
size_t cur_msg_len;
/** number of bytes written for the current message */
size_t cur_msg_done;
/** command pipe that stops the pipe if closed. Used to quit
* the program. [0] is read, [1] is written to. */
int commandpipe[2];
/** the event to listen to the commandpipe */
void* command_event;
/** the io thread wants to exit */
int want_to_exit;
/** If the log server is connected to over unix domain sockets,
* eg. a file is named that is created to log onto. */
@ -236,4 +250,56 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq);
*/
void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
/**
* Create IO thread.
* @return new io thread object. not yet started. or NULL malloc failure.
*/
struct dt_io_thread* dt_io_thread_create(void);
/**
* Delete the IO thread structure.
* @param dtio: the io thread that is deleted. It must not be running.
*/
void dt_io_thread_delete(struct dt_io_thread* dtio);
/**
* Apply config to the dtio thread
* @param dtio: io thread, not yet started.
* @param cfg: config file struct.
*/
void dt_io_thread_apply_cfg(struct dt_io_thread* dtio,
struct config_file *cfg);
/**
* Register a msg queue to the io thread. It will be polled to see if
* there are messages and those then get removed and sent, when the thread
* is running.
* @param dtio: the io thread.
* @param mq: message queue to register.
* @return false on failure (malloc failure).
*/
int dt_io_thread_register_queue(struct dt_io_thread* dtio,
struct dt_msg_queue* mq);
/**
* Unregister queue from io thread.
* @param dtio: the io thread.
* @param mq: message queue.
*/
void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
struct dt_msg_queue* mq);
/**
* Start the io thread
* @param dtio: the io thread.
* @return false on failure.
*/
int dt_io_thread_start(struct dt_io_thread* dtio);
/**
* Stop the io thread
* @param dtio: the io thread.
*/
void dt_io_thread_stop(struct dt_io_thread* dtio);
#endif /* DTSTREAM_H */