dnstap io, windows portability improvements.

This commit is contained in:
W.C.A. Wijngaards 2020-01-28 14:24:14 +01:00
parent bb81684206
commit e13675d6cb
2 changed files with 37 additions and 15 deletions

View file

@ -49,6 +49,7 @@
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#include <fcntl.h>
/** number of messages to process in one output callback */
#define DTIO_MESSAGES_PER_CALLBACK 100
@ -549,7 +550,7 @@ static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
return 0;
}
}
ret = send(dtio->fd, buf, len, 0);
ret = send(dtio->fd, (void*)buf, len, 0);
if(ret == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
@ -719,7 +720,7 @@ static int dtio_check_close(struct dt_io_thread* dtio)
uint8_t buf[1024];
if(dtio->fd == -1) return 0;
while(1) {
r = recv(dtio->fd, buf, sizeof(buf), 0);
r = recv(dtio->fd, (void*)buf, sizeof(buf), 0);
if(r == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
@ -1169,6 +1170,7 @@ static int dtio_control_start_send(struct dt_io_thread* dtio)
/** open the output file descriptor */
static void dtio_open_output(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
struct ub_event* ev;
struct sockaddr_un s;
dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
@ -1243,6 +1245,7 @@ static void dtio_open_output(struct dt_io_thread* dtio)
dtio_reconnect_enable(dtio);
return;
}
#endif /* HAVE_SYS_UN_H */
}
/** the IO thread function for the DNSTAP IO */

View file

@ -80,6 +80,14 @@ static void usage(char* argv[])
/** long format option, for multiline printout per message */
static int longformat = 0;
/** main tap callback data */
struct main_tap_data {
/** the event base (to loopexit) */
struct ub_event_base* base;
/** the event (that is accept()ed) */
struct ub_event* ev;
};
/** tap callback variables */
struct tap_data {
/** the fd */
@ -252,12 +260,14 @@ static char* tv_to_str(protobuf_c_boolean has_time_sec, uint64_t time_sec,
{
char buf[64], buf2[256];
struct timeval tv;
time_t time_t_sec;
memset(&tv, 0, sizeof(tv));
if(has_time_sec) tv.tv_sec = time_sec;
if(has_time_nsec) tv.tv_usec = time_nsec;
buf[0]=0;
(void)ctime_r(&tv.tv_sec, buf);
time_t_sec = tv.tv_sec;
(void)ctime_r(&time_t_sec, buf);
snprintf(buf2, sizeof(buf2), "%u.%9.9u %s",
(unsigned)time_sec, (unsigned)time_nsec, buf);
return strdup(buf2);
@ -364,7 +374,7 @@ static void log_data_frame(uint8_t* pkt, size_t len)
/** receive bytes from fd, prints errors if bad,
* returns 0: closed/error, -1: continue, >0 number of bytes */
static ssize_t receive_bytes(int fd, void* buf, size_t len)
static ssize_t receive_bytes(int fd, void* buf, size_t len, struct ub_event* ev)
{
ssize_t ret = recv(fd, buf, len, 0);
if(ret == 0) {
@ -374,6 +384,7 @@ static ssize_t receive_bytes(int fd, void* buf, size_t len)
} else if(ret == -1) {
/* error */
#ifndef USE_WINSOCK
(void)ev;
if(errno == EINTR || errno == EAGAIN)
return -1;
log_err("could not recv: %s", strerror(errno));
@ -381,8 +392,7 @@ static ssize_t receive_bytes(int fd, void* buf, size_t len)
if(WSAGetLastError() == WSAEINPROGRESS)
return -1;
if(WSAGetLastError() == WSAEWOULDBLOCK) {
ub_winsock_tcp_wouldblock(data->ev,
UB_EV_READ);
ub_winsock_tcp_wouldblock(ev, UB_EV_READ);
return -1;
}
log_err("could not recv: %s",
@ -433,7 +443,7 @@ static int reply_with_accept(int fd)
strlen(DNSTAP_CONTENT_TYPE));
fd_set_block(fd);
if(send(fd, acceptframe, len, 0) == -1) {
if(send(fd, (void*)acceptframe, len, 0) == -1) {
#ifndef USE_WINSOCK
log_err("send failed: %s", strerror(errno));
#else
@ -468,7 +478,7 @@ static int reply_with_finish(int fd)
finishframe[2] = htonl(FSTRM_CONTROL_FRAME_FINISH);
fd_set_block(fd);
if(send(fd, finishframe, len, 0) == -1) {
if(send(fd, (void*)finishframe, len, 0) == -1) {
#ifndef USE_WINSOCK
log_err("send failed: %s", strerror(errno));
#else
@ -491,7 +501,8 @@ static void tap_callback(int fd, short ATTR_UNUSED(bits), void* arg)
while(data->len_done < 4) {
uint32_t l = (uint32_t)data->len;
ssize_t ret = receive_bytes(fd,
((uint8_t*)&l)+data->len_done, 4-data->len_done);
((uint8_t*)&l)+data->len_done, 4-data->len_done,
data->ev);
if(verbosity>=4) log_info("s recv %d", (int)ret);
if(ret == 0) {
/* closed or error */
@ -526,7 +537,7 @@ static void tap_callback(int fd, short ATTR_UNUSED(bits), void* arg)
/* we want to read the full length now */
if(data->data_done < data->len) {
ssize_t r = receive_bytes(fd, data->frame + data->data_done,
data->len - data->data_done);
data->len - data->data_done, data->ev);
if(verbosity>=4) log_info("f recv %d", (int)r);
if(r == 0) {
/* closed or error */
@ -576,10 +587,10 @@ static void tap_callback(int fd, short ATTR_UNUSED(bits), void* arg)
/** callback for main listening file descriptor */
void mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg)
{
struct main_tap_data* maindata = (struct main_tap_data*)arg;
struct tap_data* data;
struct sockaddr_storage addr;
socklen_t addrlen = (socklen_t)sizeof(addr);
struct ub_event_base* base = (struct ub_event_base*)arg;
int s = accept(fd, (struct sockaddr*)&addr, &addrlen);
if(s == -1) {
#ifndef USE_WINSOCK
@ -602,17 +613,18 @@ void mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg)
WSAGetLastError() == WSAECONNRESET)
return;
if(WSAGetLastError() == WSAEWOULDBLOCK) {
ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
ub_winsock_tcp_wouldblock(maindata->ev, UB_EV_READ);
return;
}
log_err_addr("accept failed", wsa_strerror(WSAGetLastError()),
addr, *addrlen);
&addr, addrlen);
#endif
return;
}
fd_set_nonblock(s);
if(verbosity) {
if(addr.ss_family == AF_LOCAL) {
#ifdef HAVE_SYS_UN_H
struct sockaddr_un* usock = calloc(1, sizeof(struct sockaddr_un) + 1);
if(usock) {
socklen_t ulen = sizeof(struct sockaddr_un);
@ -625,6 +637,7 @@ void mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg)
} else {
log_info("accepted new dnstap client");
}
#endif /* HAVE_SYS_UN_H */
} else {
log_info("accepted new dnstap client");
}
@ -633,7 +646,7 @@ void mainfdcallback(int fd, short ATTR_UNUSED(bits), void* arg)
data = calloc(1, sizeof(*data));
if(!data) fatal_exit("out of memory");
data->fd = s;
data->ev = ub_event_new(base, s, UB_EV_READ | UB_EV_PERSIST,
data->ev = ub_event_new(maindata->base, s, UB_EV_READ | UB_EV_PERSIST,
&tap_callback, data);
if(!data->ev) fatal_exit("could not ub_event_new");
if(ub_event_add(data->ev, NULL) != 0) fatal_exit("could not ub_event_add");
@ -653,20 +666,25 @@ setup_and_run(char* socketpath)
int fd;
time_t secs = 0;
struct timeval now;
struct main_tap_data* maindata;
struct ub_event_base* base;
const char *evnm="event", *evsys="", *evmethod="";
struct ub_event *ev;
maindata = calloc(1, sizeof(*maindata));
if(!maindata) fatal_exit("out of memory");
memset(&now, 0, sizeof(now));
base = ub_default_event_base(1, &secs, &now);
if(!base) fatal_exit("could not create ub_event base");
maindata->base = base;
fd = setup_fd(socketpath);
ub_get_event_sys(base, &evnm, &evsys, &evmethod);
if(verbosity) log_info("%s %s uses %s method", evnm, evsys, evmethod);
ev = ub_event_new(base, fd, UB_EV_READ | UB_EV_PERSIST,
&mainfdcallback, base);
&mainfdcallback, maindata);
if(!ev) fatal_exit("could not ub_event_new");
if(ub_event_add(ev, NULL) != 0) fatal_exit("could not ub_event_add");
maindata->ev = ev;
ub_event_base_dispatch(base);
@ -674,6 +692,7 @@ setup_and_run(char* socketpath)
ub_event_free(ev);
ub_event_base_free(base);
close(fd);
free(maindata);
}
/** getopt global, in case header files fail to declare it. */