diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index bbf202c57..f3d50a027 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -52,6 +52,11 @@ /** number of messages to process in one output callback */ #define DTIO_MESSAGES_PER_CALLBACK 100 +/** the msec to wait for reconnect (if not immediate, the first attempt) */ +#define DTIO_RECONNECT_TIMEOUT_MIN 10 +/** the msec to wait for reconnect max after backoff */ +#define DTIO_RECONNECT_TIMEOUT_MAX 1000 + /** DTIO command channel commands */ enum { /** DTIO command channel stop */ @@ -60,6 +65,13 @@ enum { DTIO_COMMAND_WAKEUP = 1 } dtio_channel_command; +/** open the output channel */ +static void dtio_open_output(struct dt_io_thread* dtio); +/** add output event for read and write */ +static void dtio_add_output_event_write(struct dt_io_thread* dtio); +/** start reconnection attempts */ +static void dtio_reconnect_enable(struct dt_io_thread* dtio); + void* fstrm_create_control_frame_start(char* contenttype, size_t* len) { uint32_t* control; @@ -359,6 +371,73 @@ static int dtio_find_msg(struct dt_io_thread* dtio) return 0; } +/** callback for the dnstap reconnect, to start reconnecting to output */ +static void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd), + short ATTR_UNUSED(bits), void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + dtio->reconnect_is_added = 0; + verbose(VERB_ALGO, "dnstap io: reconnect timer"); + + dtio_open_output(dtio); + if(dtio->event) { + dtio_add_output_event_write(dtio); + /* nothing wrong so far, wait on the output event */ + return; + } + /* exponential backoff and retry on timer */ + dtio_reconnect_enable(dtio); +} + +/** attempt to reconnect to the output, after a timeout */ +static void dtio_reconnect_enable(struct dt_io_thread* dtio) +{ + struct timeval tv; + int msec; + if(dtio->reconnect_is_added) + return; /* already done */ + + /* exponential backoff, store the value for next timeout */ + msec = dtio->reconnect_timeout; + if(msec == 0) { + dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN; + } else { + dtio->reconnect_timeout = msec*2; + if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX) + dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX; + } + verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec", + msec); + + /* setup wait timer */ + memset(&tv, 0, sizeof(tv)); + tv.tv_sec = msec/1000; + tv.tv_usec = (msec%1000)*1000; + if(ub_timer_add(dtio->reconnect_timer, dtio->event_base, + &dtio_reconnect_timeout_cb, dtio, &tv) != 0) { + log_err("dnstap io: could not reconnect ev timer add"); + return; + } + dtio->reconnect_is_added = 1; +} + +/** remove dtio reconnect timer */ +static void dtio_reconnect_del(struct dt_io_thread* dtio) +{ + if(!dtio->reconnect_is_added) + return; + ub_timer_del(dtio->reconnect_timer); + dtio->reconnect_is_added = 0; +} + +/** clear the reconnect exponential backoff timer. + * We have successfully connected so we can try again with short timeouts. */ +static void dtio_reconnect_clear(struct dt_io_thread* dtio) +{ + dtio->reconnect_timeout = 0; + dtio_reconnect_del(dtio); +} + /** delete the current message in the dtio, and reset counters */ static void dtio_cur_msg_free(struct dt_io_thread* dtio) { @@ -399,6 +478,7 @@ static void dtio_close_output(struct dt_io_thread* dtio) if(dtio->cur_msg) { dtio_cur_msg_free(dtio); } + dtio_reconnect_enable(dtio); } /** check for pending nonblocking connect errors, @@ -443,6 +523,7 @@ static int dtio_check_nb_connect(struct dt_io_thread* dtio) } verbose(VERB_ALGO, "dnstap io: connected to \"%s\"", dtio->socket_path); + dtio_reconnect_clear(dtio); dtio->check_nb_connect = 0; return 1; /* everything okay */ } @@ -822,6 +903,17 @@ static void dtio_setup_cmd(struct dt_io_thread* dtio) } } +/** setup the reconnect event for dnstap io */ +static void dtio_setup_reconnect(struct dt_io_thread* dtio) +{ + dtio_reconnect_clear(dtio); + dtio->reconnect_timer = ub_event_new(dtio->event_base, -1, + UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio); + if(!dtio->reconnect_timer) { + fatal_exit("dnstap io: out of memory"); + } +} + /** * structure to keep track of information during stop flush */ @@ -1032,6 +1124,8 @@ static void dtio_desetup(struct dt_io_thread* dtio) _close(dtio->commandpipe[0]); #endif dtio->commandpipe[0] = -1; + dtio_reconnect_del(dtio); + ub_event_free(dtio->reconnect_timer); dtio_cur_msg_free(dtio); ub_event_base_free(dtio->event_base); } @@ -1096,6 +1190,7 @@ static void dtio_open_output(struct dt_io_thread* dtio) closesocket(dtio->fd); #endif dtio->fd = -1; + dtio_reconnect_enable(dtio); return; } dtio->check_nb_connect = 1; @@ -1105,19 +1200,21 @@ static void dtio_open_output(struct dt_io_thread* dtio) UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, dtio); if(!ev) { + log_err("dnstap io: out of memory"); #ifndef USE_WINSOCK close(dtio->fd); #else closesocket(dtio->fd); #endif dtio->fd = -1; - log_err("dnstap io: out of memory"); + dtio_reconnect_enable(dtio); return; } dtio->event = ev; /* setup protocol control message to start */ if(!dtio_control_start_send(dtio)) { + log_err("dnstap io: out of memory"); ub_event_free(dtio->event); dtio->event = NULL; #ifndef USE_WINSOCK @@ -1126,7 +1223,7 @@ static void dtio_open_output(struct dt_io_thread* dtio) closesocket(dtio->fd); #endif dtio->fd = -1; - log_err("dnstap io: out of memory"); + dtio_reconnect_enable(dtio); return; } } @@ -1139,11 +1236,12 @@ static void* dnstap_io(void* arg) struct timeval now; /* setup */ + verbose(VERB_ALGO, "start dnstap io thread"); dtio_setup_base(dtio, &secs, &now); dtio_setup_cmd(dtio); + dtio_setup_reconnect(dtio); dtio_open_output(dtio); dtio_add_output_event_write(dtio); - verbose(VERB_ALGO, "start dnstap io thread"); /* run */ if(ub_event_base_dispatch(dtio->event_base) < 0) { diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index 77bda6ae0..2f1546527 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -102,6 +102,7 @@ struct dt_io_thread { struct dt_io_list_item* io_list_iter; /** thread id, of the io thread */ ub_thread_type tid; + /** file descriptor that the thread writes to */ int fd; /** event structure that the thread uses */ @@ -112,6 +113,7 @@ struct dt_io_thread { int event_added_is_write; /** check for nonblocking connect errors on fd */ int check_nb_connect; + /** the buffer that currently getting written, or NULL if no * (partial) message written now */ void* cur_msg; @@ -131,6 +133,14 @@ struct dt_io_thread { /** the io thread wants to exit */ int want_to_exit; + /** the timer event for connection retries */ + void* reconnect_timer; + /** if the reconnect timer is added to the event base */ + int reconnect_is_added; + /** the current reconnection timeout, it is increased with + * exponential backoff, in msec */ + int reconnect_timeout; + /** If the log server is connected to over unix domain sockets, * eg. a file is named that is created to log onto. */ int upstream_is_unix;