diff --git a/daemon/worker.c b/daemon/worker.c index 759fd0457..85d5b7de0 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1884,7 +1884,7 @@ worker_init(struct worker* worker, struct config_file *cfg, #endif ) { if(!dt_io_thread_start(dtenv->dtio, comm_base_internal( - worker->base))) { + worker->base), worker->daemon->num)) { log_err("could not start dnstap io thread"); worker_delete(worker); return 0; diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 8d98c559d..bd4d39acb 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -277,7 +277,9 @@ int dt_io_thread_register_queue(struct dt_io_thread* dtio, { struct dt_io_list_item* item = malloc(sizeof(*item)); if(!item) return 0; + lock_basic_lock(&mq->lock); mq->dtio = dtio; + lock_basic_unlock(&mq->lock); item->queue = mq; item->next = dtio->io_list; dtio->io_list = item; @@ -297,7 +299,9 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, if(prev) prev->next = item->next; else dtio->io_list = item->next; /* the queue itself only registered, not deleted */ + lock_basic_lock(&item->queue->lock); item->queue->dtio = NULL; + lock_basic_unlock(&item->queue->lock); free(item); dtio->io_list_iter = NULL; return; @@ -1269,6 +1273,7 @@ static void* dnstap_io(void* arg) struct dt_io_thread* dtio = (struct dt_io_thread*)arg; time_t secs = 0; struct timeval now; + log_thread_set(&dtio->threadnum); /* setup */ verbose(VERB_ALGO, "start dnstap io thread"); @@ -1288,7 +1293,8 @@ static void* dnstap_io(void* arg) } #endif /* THREADS_DISABLED */ -int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr) +int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, + int numworkers) { /* set up the thread, can fail */ #ifndef USE_WINSOCK @@ -1305,6 +1311,7 @@ int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr) #endif /* start the thread */ + dtio->threadnum = numworkers+1; dtio->started = 1; #ifndef THREADS_DISABLED ub_thread_create(&dtio->tid, dnstap_io, dtio); diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index c2b898b86..9309f5c0b 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -92,6 +92,9 @@ struct dt_msg_entry { * IO thread that reads from the queues and writes them. */ struct dt_io_thread { + /** the thread number for the dtio thread, + * must be first to cast thread arg to int* in checklock code. */ + int threadnum; /** event base, for event handling */ void* event_base; /** list of queues that is registered to get written */ @@ -348,9 +351,12 @@ void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, * @param event_base_nothr: the event base to attach the events to, in case * we are running without threads. With threads, this is ignored * and a thread is started to process the dnstap log messages. + * @param numworkers: number of worker threads. The dnstap io thread is + * that number +1 as the threadnumber (in logs). * @return false on failure. */ -int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr); +int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr, + int numworkers); /** * Stop the io thread