the framestream queue.

This commit is contained in:
W.C.A. Wijngaards 2020-01-21 14:50:37 +01:00
parent bb55cc1685
commit 57ad169605
5 changed files with 142 additions and 7 deletions

View file

@ -1924,6 +1924,9 @@ worker_delete(struct worker* worker)
#endif /* UB_ON_WINDOWS */
}
comm_base_delete(worker->base);
#ifdef USE_DNSTAP
dt_deinit(&worker->dtenv);
#endif
ub_randfree(worker->rndstate);
alloc_clear(&worker->alloc);
regional_destroy(worker->env.scratch);

View file

@ -53,6 +53,7 @@
#include <protobuf-c/protobuf-c.h>
#include "dnstap/dnstap.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap.pb-c.h"
#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
@ -90,13 +91,7 @@ dt_pack(const Dnstap__Dnstap *d, void **buf, size_t *sz)
static void
dt_send(const struct dt_env *env, void *buf, size_t len_buf)
{
fstrm_res res;
if (!buf)
return;
res = fstrm_iothr_submit(env->iothr, env->ioq, buf, len_buf,
fstrm_free_wrapper, NULL);
if (res != fstrm_res_success)
free(buf);
dt_msg_queue_submit(env->msgqueue, buf, len_buf);
}
static void
@ -275,9 +270,20 @@ dt_init(struct dt_env *env)
env->ioq = fstrm_iothr_get_input_queue(env->iothr);
if (env->ioq == NULL)
return 0;
env->msgqueue = dt_msg_queue_create();
if(!env->msgqueue) {
log_err("malloc failure");
return 0;
}
return 1;
}
void
dt_deinit(struct dt_env* env)
{
dt_msg_queue_delete(env->msgqueue);
}
void
dt_delete(struct dt_env *env)
{

View file

@ -43,6 +43,7 @@ struct config_file;
struct fstrm_io;
struct fstrm_queue;
struct sldns_buffer;
struct dt_msg_queue;
struct dt_env {
/** dnstap I/O thread */
@ -50,6 +51,9 @@ struct dt_env {
/** dnstap I/O thread input queue */
struct fstrm_iothr_queue *ioq;
/** valid in worker struct, not in daemon struct, the per-worker
* message list */
struct dt_msg_queue* msgqueue;
/** dnstap "identity" field, NULL if disabled */
char *identity;
@ -107,6 +111,11 @@ dt_apply_cfg(struct dt_env *env, struct config_file *cfg);
int
dt_init(struct dt_env *env);
/**
* Deletes the per-worker state created by dt_init
*/
void dt_deinit(struct dt_env *env);
/**
* Delete dnstap environment object. Closes dnstap I/O socket and deletes all
* per-worker I/O queues.

View file

@ -44,3 +44,94 @@
#include "config.h"
#include "dnstap/dtstream.h"
struct dt_msg_queue*
dt_msg_queue_create(void)
{
struct dt_msg_queue* mq = calloc(1, sizeof(*mq));
if(!mq) return NULL;
mq->maxsize = 1*1024*1024; /* set max size of buffer, per worker,
about 1 M should contain 64K messages with some overhead,
or a whole bunch smaller ones */
lock_basic_init(&mq->lock);
lock_protect(&mq->lock, mq, sizeof(*mq));
return mq;
}
/** clear the message list, caller must hold the lock */
static void
dt_msg_queue_clear(struct dt_msg_queue* mq)
{
struct dt_msg_entry* e = mq->first, *next=NULL;
while(e) {
next = e->next;
free(e->buf);
free(e);
e = next;
}
mq->first = NULL;
mq->last = NULL;
mq->cursize = 0;
}
void
dt_msg_queue_delete(struct dt_msg_queue* mq)
{
if(!mq) return;
lock_basic_destroy(&mq->lock);
dt_msg_queue_clear(mq);
free(mq);
}
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
struct dt_msg_entry* entry;
/* check conditions */
if(!buf) return;
if(len == 0) {
/* it is not possible to log entries with zero length,
* because the framestream protocol does not carry it.
* However the protobuf serialization does not create zero
* length datagrams for dnstap, so this should not happen. */
free(buf);
return;
}
if(!mq) {
free(buf);
return;
}
/* allocate memory for queue entry */
entry = malloc(sizeof(*entry));
if(!entry) {
log_err("out of memory logging dnstap");
free(buf);
return;
}
entry->next = NULL;
entry->buf = buf;
entry->len = len;
/* aqcuire lock */
lock_basic_lock(&mq->lock);
/* see if it is going to fit */
if(mq->cursize + len > mq->maxsize) {
/* buffer full, or congested. */
/* drop */
lock_basic_unlock(&mq->lock);
free(buf);
return;
}
mq->cursize += len;
/* append to list */
if(mq->last) {
mq->last->next = entry;
} else {
mq->first = entry;
}
mq->last = entry;
/* release lock */
lock_basic_unlock(&mq->lock);
}

View file

@ -209,5 +209,31 @@ struct dt_io_list_item {
/* routine to send a frame. */
/* routine to send STOP message. */
/**
* Create new (empty) worker message queue. Limit set to default on max.
* @return NULL on malloc failure or a new queue (not locked).
*/
struct dt_msg_queue* dt_msg_queue_create(void);
/**
* Delete a worker message queue. It has to be unlinked from access,
* so it can be deleted without lock worries. The queue is emptied (deleted).
* @param mq: message queue.
*/
void dt_msg_queue_delete(struct dt_msg_queue* mq);
/**
* Submit a message to the queue. The queue is locked by the routine,
* the message is inserted, and then the queue is unlocked so the
* message can be picked up by the writer thread.
* @param mq: message queue.
* @param buf: buffer with message (dnstap contents).
* The buffer must have been malloced by caller. It is linked in
* the queue, and is free()d after use. If the routine fails
* the buffer is freed as well (and nothing happens, the item
* could not be logged).
* @param len: length of buffer.
*/
void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len);
#endif /* DTSTREAM_H */