From efc79beb2d8bae70c62fa4ca703587bd810590ff Mon Sep 17 00:00:00 2001 From: "W.C.A. Wijngaards" Date: Tue, 21 Jan 2020 17:01:25 +0100 Subject: [PATCH] iothread work. --- daemon/daemon.c | 3 +- daemon/worker.c | 23 +++- dnstap/.dtstream.h.swp | Bin 0 -> 28672 bytes dnstap/dnstap.c | 18 ++- dnstap/dnstap.h | 5 +- dnstap/dtstream.c | 288 +++++++++++++++++++++++++++++++++++++++++ dnstap/dtstream.h | 66 ++++++++++ 7 files changed, 398 insertions(+), 5 deletions(-) create mode 100644 dnstap/.dtstream.h.swp diff --git a/daemon/daemon.c b/daemon/daemon.c index 0b1200a2e..65d51e182 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -452,10 +452,9 @@ daemon_create_workers(struct daemon* daemon) if(daemon->cfg->dnstap) { #ifdef USE_DNSTAP daemon->dtenv = dt_create(daemon->cfg->dnstap_socket_path, - (unsigned int)daemon->num); + (unsigned int)daemon->num, daemon->cfg); if (!daemon->dtenv) fatal_exit("dt_create failed"); - dt_apply_cfg(daemon->dtenv, daemon->cfg); #else fatal_exit("dnstap enabled in config but not built with dnstap support"); #endif diff --git a/daemon/worker.c b/daemon/worker.c index 382bbd384..d14409570 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -78,6 +78,7 @@ #include "sldns/wire2str.h" #include "util/shm_side/shm_main.h" #include "dnscrypt/dnscrypt.h" +#include "dnstap/dtstream.h" #ifdef HAVE_SYS_TYPES_H # include @@ -1876,6 +1877,19 @@ worker_init(struct worker* worker, struct config_file *cfg, ) { auth_xfer_pickup_initial(worker->env.auth_zones, &worker->env); } +#ifdef USE_DNSTAP + if(worker->daemon->cfg->dnstap +#ifndef THREADS_DISABLED + && worker->thread_num == 0 +#endif + ) { + if(!dt_io_thread_start(dtenv->dtio)) { + log_err("could not start dnstap io thread"); + worker_delete(worker); + return 0; + } + } +#endif /* USE_DNSTAP */ if(!worker->env.mesh || !worker->env.scratch_buffer) { worker_delete(worker); return 0; @@ -1925,8 +1939,15 @@ worker_delete(struct worker* worker) } comm_base_delete(worker->base); #ifdef USE_DNSTAP - dt_deinit(&worker->dtenv); + if(worker->daemon->cfg->dnstap +#ifndef THREADS_DISABLED + && worker->thread_num == 0 #endif + ) { + dt_io_thread_stop(worker->dtenv.dtio); + } + dt_deinit(&worker->dtenv); +#endif /* USE_DNSTAP */ ub_randfree(worker->rndstate); alloc_clear(&worker->alloc); regional_destroy(worker->env.scratch); diff --git a/dnstap/.dtstream.h.swp b/dnstap/.dtstream.h.swp new file mode 100644 index 0000000000000000000000000000000000000000..22f3859addf6106a6718d567664f7a22a1b7e0ba GIT binary patch literal 28672 zcmeI5du(LaUB@SnHeH&8s#QhPKj383fbGuOn@vKiU7(q@$Lndmwlg!{2WdLFGjqqj z*_nGMbMLIzDSaeG4KzxVKnZD+mPj;}+KT*99u>4oLR$z>Qc*xWk|Nqvk*E+6grM=Izt_a{Rn5m;3!6|Ec$U z`;zN&mD_T;ji{gaJ$Z5FAFoZaW)x0`?a)u!UOncw9kRUN7C0048Z-T{9`(a!t>eXH zo@s`0;&o@5Nu2b2uQR>AL-y=Z#z4lvxiZjCT6^ApN$%=>SM8M|uGoFKdBf{h&$YBH zU&cVjK*m7EK*m7EK*m7EK*m7E!2d%AlJ2W=AHeSOo$YDheV+TA{`+e8dDm&uUFe>_ z%KaO-f7ATw^BdgraLfA(+~=FzzfWv=?>=V#WDH~sWDH~sWDH~sWDH~sWDH~sWDH~s zWDH~soC5=1Gne}b@;xnqI_Ljs{Qu&Q=5oIWejB_OoB~Hd1zZB2xHOmh68IwMfNQ{B z@ZC#tx%#fzp1vrTyBmBM+z#4c0z8Ab@hR|& z-~(VCybb&$cn0C(L2xIS2X6%zfeXO};AMo5KLn3}&w-DDTfuw4o57{vdBlxp!871H z;2*%J!9(C9paE9EkAW8uV!jE!3_b#G1642&-U==Q(pQf=VP%(@G80{|mjqsW(scYd z_SSr}(F>B?u9>MRldSuu-fy-19>02tY4m$NKTO)2X3bBMAY9Y5i67=o)H6$~i;E^` znJ~)j3c|$rCrMlLnoYlXU|60$H9>6jg_)Yk?K&9+%_*DQCA5jS&e3lq{hn`Y7i*Gr zZg@0JKlWqID)qF|;#)!6H%&io^nz}JL2C0zE@>F~l1Q`m>or?iE!pf+a*!6|>Ij;& zszo8b38J)OG1r=9nn^8)YV9EA-^A}skxDUXAdH~^yv3*z8bzm}M`A3Ae{5(YkU51Z2?*5fcE>74s)1d^D1O)v4% zTzTV##t$129%AN}D%HZuzzN5R?!;>~oT;}tWkUZ{;>wp_O%Q6iLbqBmWMYuX44KeA z=y>gR)bN|rLQX+Upyae5x>YhNE7PVCg^3r0v57)|i;1HQsiTTV4Csq^zGc{p zhf^=xJO{;4JBSlqP=Cm?K32W z&sh3|0UnVcH6Yfay>`rWC@hN2)isEnYw|_|}jOsY6_b5k2Z}I=<4<)0IT00()gIeHGhTl6k!=CF>yV`n^3HQSXG` za|6u{Y(``&SgXlge_(`^(W!X!`CvdE$XyYHjds83o8A2+XwL}E@$~v`t}`|LRv_cM zTB(+cg~PQ4(zZgLjy|F>GtwYw^{TyqxMfBV?^=@yx#cy2c93kQ6HL(Qw*3x%<^TN0r^%B$VMM>0%+A`l$t5p1MX|tV{tts-bvOHfsS|}Iu zrg)^dWJ>dDbAVYc~e~|nw8~BrF5{gSgIbEN0rst1v6JTTsTxz>z0a#7E6bU zOS46@TsF((C?6&Jq$!k3mD187Q(7{GrQ_ykf$Xc*wA9rK>7{~FMXXpX6>JrGb9A97 zg-E{H<)v!5Fk8(VcrDFVN0U)mb-7$M6Q!lu#nm}#l5$qc%k!mb<$&07aFzOtl?o+Q zi{Q8kr? z0^2k3h}q?pnmwN@z_dPdXENeFH;)kx_!Wt6e9jKaA}H@E<8fB=gh(5G5s$p8`1aZtTRQG41WG|_EF{Zt!$8=6&245#ieEGM@4wS*^ z_kxoobDJh+yEtkk8?q?NbjHFiRWAAbuJgZdLK)j`pf4>h6>)>Y!3uk0Ssv_bHWe)v z57r5G0==N#XU`h5AaABkU^bp1y9JUKGLS9Pa8t$RM+-=sW!Dk61Eu(8 zAQKOJZrLwTeJf%Iv>{WKtPE{0=wPcd4FOK@XhSbHp?ktfF+u4{Q zz1Q)R{bEulYKpVy{LCW88+8<1ra9z$5Kj#08*;myDPKg zBPppW8-an3@q5(8d2J~|g`}As@H^3%H2e1M+nYB_i(H~Ni*k$J6xhNk{}Oe?eh&vu zvu?XFZ}yn+CEE zPk~3ko!}0z0*#QvfQ+zq}+|RJZgNTfUt$eG9fx{$Dr$G}N03E5D>& zF*(|vZH<+fJ*|2)p*Yq~KEz7d>xUsb8%-_SF}5SR(5twaZh<1)ftR5wa5lnj>UvG| zi)gbUuVg1<9a&b}vJ>tM(m}lDXvI27H6&J}*NuRcmz`yoU6$Qj#y*K!Hg-!{9|>g5 zRHL=Fqu+SlZhN!F+p~8V67QESB~daO6pE)o}o`Pq$WqCNEUw6 z4#HjH%TBOmAhmiv-irdX;p5K}vUerZx;<4W& zjIf;W(oQVOlgMWMP=qMTR1MLWW>nMzWTJ5=GH&!aORSASR@Jo0F$$$6~;+MqOo35g{nFynY+OSQU6T7qHh#M+HelhebGB9-N8j@x~5m zJVq*N!|7ggtb)$wT*(ETZU7iL`O7mF30Se5trNSF0XhDe5ESC;edd zp4xkrcq){u-U&(K_M^q7;s;6OZ?a`mBfB}5jD?0xolf&jA4=`1lNQhu(KGNx4cBuy zw-k`^bVim7hl`lHRH`h9W!Gv&Vv81~D(gwF-Dfoy;-$9KjFA*BKW3(K@D?hx7Ubc% z<5I~r=TwQ!{t9(ypHs!v=TwE}e#IKZC8%QGIaVRrsYBxbcM#u4K;nNwSN-{CK7Sc} z2HXzh{SDv>a50ec|1W?C!7*?dxB#3F{*t)=QE(S{KR5_P3-}&!zvuy<2e*TpK>8V+G26(P<5epxNywuyY{EG9?tqk_9NqB)7ZRW$G)3csD^oF2FP^NXa_9YX2TQJ zR3s@kXzlzrl0wh0bn?^%YkJxquV^~mY`BurbmGqwcB|kejdhl&ag*e%+%|l2q?Vx_ zjuczOSXOuD)SOiHbF5lB8N6a1nKA1@Gw8A9M8x&lc3u>1Sx$?bnk`$2cEIVbh^r!L zik;ftlyyCswQJV++$cRWmh#%G_OX_o(!(-)K)81gw<&J0hk!)%xqY<1=Pk3)6~}*a?4a+7c~iX;Q8_Pu@A@pc*xj zPJyA3Cc573dYkQt%QF+o8UdEQ!!bE+MQBv&z>49vfNd7JD<6*Y)IRGuZhN8ikH4{6 zd8}{5COzjIbvL&q*{c>iH;a9CTOqo>4Fu_Qq@telB;z(gycS?AVdENQGN7QYn{Fm~+^}Ta#@K_)O~XQn_TMXK;28T}lR`=x&yQ zp#|3Uy|$WL_whJ8Xl(?Rc7WTM+oEh|492c)x6HHRdRmTD+Yf_1qm$5BFURSyA2mrm4(yR8)2 ztuG-SV$&F1%53arPGLaC^iHjRr4q(w0p-%Jh=X36AG>4VLN@Fi?MWu3andEXbVWKp z$nTA)BZq=kKWkxEXT~*Yx}o!09C6^Ov$)(6YWkeQF{mblDM^_6lazIRpD5a1HY1Rh zNzq;Y>dXB@uDHo^%NT8HY%x~Xy7AkQ%ndS7@i+TMoT+Wa_FaR~`q&+AC_KhPvsFSk}sRK%#|7|{;Yq1Kk(UDNVp8xnJaM?@i2-sB1)W3ZQQ-)ar?5O;q^ zFGjgdkvvwxGPc6*bl2}#0aJJyE8&E{c{T-*Y*>IO+6=^+txb)HUGK>3Jz|7C2xjQx zI5KpH)hF$^W*Ml-`M%yjLmksp;^?s)`fEGtd_5V`E3yS8=#*jC{pwh{bSrx2(l;dmMLrf+I>Er@GyWO)(y z*@@M35)xiy?2oimHf=*z?A~tZb;1)+5b1O6nn5R=Wp%SFZt)yeDI;ZjeZrZoP&RLfG@`}R1nEe*g<%zLD@kc{q@~Q1 z(U1Zsm8KMElQs}eYDrY{PX*~cSnc#wmnS-sX&QPYg|cZCW0}D*w5FGbM|#>`CMLa% zTNY-1lLB4xZosZzI1)NXp53tT#seDv$3*-;l(>`lKmGgsmx=Ko1s?_y`%i&4f|rT! z<=+1VN_;9~Ia#PweT z4}&|v&0r0*z;5te;(WOq@ay1R;2q#+z)QsWayK9X6W}G{`yYbe0UrbRf(4KR-z3ie z2DlGg53U5e!QT<*-vdqpx&QwI;{4A7A6yUQzW>*V^M4bhvHzR+JqKRA0G$Ec3yy&| zgV%uPiS7RaJPH03{0X=dc;E`4yi>&g!A~YJuz!^;oZGl%5(7&`CNZP|+gSXaNer39 zkVy<|DN=bblNg)|kx2}yrKPHLCNT`PkW6A2{lmpfVz3w1GKnFR7_34hlE(kM#DJi( zqspQJP&4YtpH^5Mh+8)r|9_O&{To2ye?n*dd74=N>)>H<4>$%6f&w@nNctxwPxg~B zkTH-kkTH-kkTH-kkTH-kkTH-kkTH-k@W0K#F8RJ7Ir-+^&l=aJrp*m|Z_9Z@-K!{mTwLq<3G{m6pZ@E_cxH?kwsBB!u`K YvbswG)((4bVdt-glRE<3mXrGa6Lv6hg8%>k literal 0 HcmV?d00001 diff --git a/dnstap/dnstap.c b/dnstap/dnstap.c index fccf4a721..e942c4194 100644 --- a/dnstap/dnstap.c +++ b/dnstap/dnstap.c @@ -130,7 +130,7 @@ check_socket_file(const char* socket_path) } struct dt_env * -dt_create(const char *socket_path, unsigned num_workers) +dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg) { #ifdef UNBOUND_DEBUG fstrm_res res; @@ -180,6 +180,16 @@ dt_create(const char *socket_path, unsigned num_workers) fstrm_unix_writer_options_destroy(&fuwopt); fstrm_writer_options_destroy(&fwopt); + env->dtio = dt_io_thread_create(); + if(!env->dtio) { + log_err("malloc failure"); + fstrm_writer_destroy(&fw); + fstrm_iothr_destroy(&env->iothr); + free(env); + return NULL; + } + dt_io_thread_apply_cfg(env->dtio, cfg); + dt_apply_cfg(env, cfg); return env; } @@ -275,12 +285,17 @@ dt_init(struct dt_env *env) log_err("malloc failure"); return 0; } + if(!dt_io_thread_register_queue(env->dtio, env->msgqueue)) { + log_err("malloc failure"); + return 0; + } return 1; } void dt_deinit(struct dt_env* env) { + dt_io_thread_unregister_queue(env->dtio, env->msgqueue); dt_msg_queue_delete(env->msgqueue); } @@ -291,6 +306,7 @@ dt_delete(struct dt_env *env) return; verbose(VERB_OPS, "closing dnstap socket"); fstrm_iothr_destroy(&env->iothr); + dt_io_thread_delete(env->dtio); free(env->identity); free(env->version); free(env); diff --git a/dnstap/dnstap.h b/dnstap/dnstap.h index 6183300b6..428691ed9 100644 --- a/dnstap/dnstap.h +++ b/dnstap/dnstap.h @@ -48,6 +48,8 @@ struct dt_msg_queue; struct dt_env { /** dnstap I/O thread */ struct fstrm_iothr *iothr; + /** the io thread (made by the struct daemon) */ + struct dt_io_thread* dtio; /** dnstap I/O thread input queue */ struct fstrm_iothr_queue *ioq; @@ -90,10 +92,11 @@ struct dt_env { * share access to the dnstap I/O socket. * @param socket_path: path to dnstap logging socket, must be non-NULL. * @param num_workers: number of worker threads, must be > 0. + * @param cfg: with config settings. * @return dt_env object, NULL on failure. */ struct dt_env * -dt_create(const char *socket_path, unsigned num_workers); +dt_create(const char *socket_path, unsigned num_workers, struct config_file* cfg); /** * Apply config settings. diff --git a/dnstap/dtstream.c b/dnstap/dtstream.c index 6d28f4c67..318e04bfc 100644 --- a/dnstap/dtstream.c +++ b/dnstap/dtstream.c @@ -43,6 +43,12 @@ #include "config.h" #include "dnstap/dtstream.h" +#include "util/config_file.h" +#include "util/ub_event.h" +#include "util/net_help.h" +#ifdef HAVE_SYS_UN_H +#include +#endif struct dt_msg_queue* dt_msg_queue_create(void) @@ -135,3 +141,285 @@ dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len) lock_basic_unlock(&mq->lock); } +struct dt_io_thread* dt_io_thread_create(void) +{ + struct dt_io_thread* dtio = calloc(1, sizeof(*dtio)); + return dtio; +} + +void dt_io_thread_delete(struct dt_io_thread* dtio) +{ + struct dt_io_list_item* item, *nextitem; + if(!dtio) return; + item=dtio->io_list; + while(item) { + nextitem = item->next; + free(item); + item = nextitem; + } + free(dtio->socket_path); + free(dtio); +} + +void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg) +{ + dtio->upstream_is_unix = 1; + dtio->socket_path = strdup(cfg->dnstap_socket_path); +} + +int dt_io_thread_register_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq) +{ + struct dt_io_list_item* item = malloc(sizeof(*item)); + if(!item) return 0; + item->queue = mq; + item->next = dtio->io_list; + dtio->io_list = item; + return 1; +} + +void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq) +{ + struct dt_io_list_item* item=dtio->io_list, *prev=NULL; + while(item) { + if(item->queue == mq) { + /* found it */ + if(prev) prev->next = item->next; + else dtio->io_list = item->next; + /* the queue itself only registered, not deleted */ + free(item); + return; + } + prev = item; + item = item->next; + } +} + +/** find a new message to write, search message queues, false if none */ +static int dtio_find_msg(struct dt_io_thread* dtio) +{ +} + +/** write more of the current messsage. false if incomplete, true if + * the message is done */ +static int dtio_write_more(struct dt_io_thread* dtio) +{ +} + +/** callback for the dnstap events, to write to the output */ +static void dtio_output_cb(int fd, short ATTR_UNUSED(bits), void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + + /* see if there are messages that need writing */ + if(!dtio->cur_msg) { + if(!dtio_find_msg(dtio)) + return; /* nothing to do */ + } + + /* write it */ + if(dtio->cur_msg_done < dtio->cur_msg_len) { + if(!dtio_write_more(dtio)) + return; + } + + /* done with the current message */ + free(dtio->cur_msg); + dtio->cur_msg = NULL; + dtio->cur_msg_len = 0; + dtio->cur_msg_done = 0; +} + +/** callback for the dnstap commandpipe, to stop the dnstap IO */ +static void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + uint8_t cmd; + ssize_t r; + if(dtio->want_to_exit) + return; + r = read(fd, &cmd, sizeof(cmd)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + return; /* ignore this */ + log_err("dnstap io: failed to read: %s", strerror(errno)); + /* and then fall through to quit the thread */ + } + if(r == 0) { + verbose(VERB_ALGO, "dnstap io: cmd channel closed"); + } else if(r == 1 && cmd == 0) { + verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit"); + } + dtio->want_to_exit = 1; + if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base) + != 0) { + log_err("dnstap io: could not loopexit"); + } +} + +/** setup the event base for the dnstap io thread */ +static void dtio_setup_base(struct dt_io_thread* dtio, time_t* secs, + struct timeval* now) +{ + memset(now, 0, sizeof(*now)); + dtio->event_base = ub_default_event_base(0, secs, now); + if(!dtio->event_base) { + fatal_exit("dnstap io: could not create event_base"); + } +} + +/** setup the cmd event for dnstap io */ +static void dtio_setup_cmd(struct dt_io_thread* dtio) +{ + struct ub_event* cmdev; + fd_set_nonblock(dtio->commandpipe[0]); + cmdev = ub_event_new(dtio->event_base, dtio->commandpipe[0], + UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio); + if(!cmdev) { + fatal_exit("dnstap io: out of memory"); + } + dtio->command_event = cmdev; + if(ub_event_add(cmdev, NULL) != 0) { + fatal_exit("dnstap io: out of memory (adding event)"); + } +} + +/** del the output file descriptor event for listening */ +static void dtio_del_output_event(struct dt_io_thread* dtio) +{ + if(!dtio->event_added) + return; + ub_event_del(dtio->event); + dtio->event_added = 0; +} + +/** close and stop the output file descriptor event */ +static void dtio_close_output(struct dt_io_thread* dtio) +{ + if(!dtio->event) + return; + ub_event_free(dtio->event); + dtio->event = NULL; + close(dtio->fd); + dtio->fd = -1; +} + +/** perform desetup and free stuff when the dnstap io thread exits */ +static void dtio_desetup(struct dt_io_thread* dtio) +{ + dtio_del_output_event(dtio); + dtio_close_output(dtio); + ub_event_del(dtio->command_event); + ub_event_free(dtio->command_event); + close(dtio->commandpipe[0]); + dtio->commandpipe[0] = -1; + ub_event_base_free(dtio->event_base); +} + +/** open the output file descriptor */ +static void dtio_open_output(struct dt_io_thread* dtio) +{ + struct ub_event* ev; + struct sockaddr_un s; + dtio->fd = socket(AF_LOCAL, SOCK_STREAM, SOCK_CLOEXEC); + if(dtio->fd == -1) { + log_err("dnstap io: failed to create socket: %s", + strerror(errno)); + return; + } + memset(&s, 0, sizeof(s)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + /* this member exists on BSDs, not Linux */ + s.sun_len = (unsigned)sizeof(usock); +#endif + s.sun_family = AF_LOCAL; + /* length is 92-108, 104 on FreeBSD */ + (void)strlcpy(s.sun_path, dtio->socket_path, sizeof(s.sun_path)); + if(connect(dtio->fd, (struct sockaddr*)&s, (socklen_t)sizeof(s)) + == -1) { + log_err("dnstap io: failed to connect: %s", strerror(errno)); + return; + } + fd_set_nonblock(dtio->fd); + + /* the EV_READ is to catch channel close, write to write packets */ + ev = ub_event_new(dtio->event_base, dtio->fd, + UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb, + dtio); + if(!ev) { + fatal_exit("dnstap io: out of memory"); + } + dtio->event = ev; + +} + +/** add the output file descriptor event for listening */ +static void dtio_add_output_event(struct dt_io_thread* dtio) +{ + if(ub_event_add(dtio->event, NULL) != 0) { + fatal_exit("dnstap io: out of memory (adding event)"); + } + dtio->event_added = 1; +} + +/** the IO thread function for the DNSTAP IO */ +static void* dnstap_io(void* arg) +{ + struct dt_io_thread* dtio = (struct dt_io_thread*)arg; + time_t secs = 0; + struct timeval now; + + /* setup */ + dtio_setup_base(dtio, &secs, &now); + dtio_setup_cmd(dtio); + dtio_open_output(dtio); + dtio_add_output_event(dtio); + verbose(VERB_ALGO, "start dnstap io thread"); + + /* run */ + if(ub_event_base_dispatch(dtio->event_base) < 0) { + log_err("dnstap io: dispatch failed, errno is %s", + strerror(errno)); + } + + /* cleanup */ + verbose(VERB_ALGO, "stop dnstap io thread"); + dtio_desetup(dtio); + return NULL; +} + +int dt_io_thread_start(struct dt_io_thread* dtio) +{ + /* set up the thread, can fail */ + if(pipe(dtio->commandpipe) == -1) { + log_err("failed to create pipe: %s", strerror(errno)); + return 0; + } + + /* start the thread */ + ub_thread_create(&dtio->tid, dnstap_io, dtio); + return 1; +} + +void dt_io_thread_stop(struct dt_io_thread* dtio) +{ + uint8_t cmd = 0; + if(!dtio) return; + if(!dtio->event_base) return; /* not started */ + + while(1) { + ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd)); + if(r == -1) { + if(errno == EINTR || errno == EAGAIN) + continue; + log_err("dnstap io stop: write: %s", strerror(errno)); + break; + } + break; + } + + close(dtio->commandpipe[1]); + dtio->commandpipe[1] = -1; + ub_thread_join(dtio->tid); +} diff --git a/dnstap/dtstream.h b/dnstap/dtstream.h index 6030b86d6..5d4e57646 100644 --- a/dnstap/dtstream.h +++ b/dnstap/dtstream.h @@ -47,6 +47,7 @@ #include "util/locks.h" struct dt_msg_entry; struct dt_io_list_item; +struct config_file; /** * A message buffer with dnstap messages queued up. It is per-worker. @@ -92,16 +93,29 @@ struct dt_io_thread { void* event_base; /** list of queues that is registered to get written */ struct dt_io_list_item* io_list; + /** thread id, of the io thread */ + ub_thread_type tid; /** file descriptor that the thread writes to */ int fd; /** event structure that the thread uses */ void* event; + /** the event is added */ + int event_added; + /** the buffer that currently getting written, or NULL if no + * (partial) message written now */ + void* cur_msg; + /** length of the current message */ + size_t cur_msg_len; + /** number of bytes written for the current message */ + size_t cur_msg_done; /** command pipe that stops the pipe if closed. Used to quit * the program. [0] is read, [1] is written to. */ int commandpipe[2]; /** the event to listen to the commandpipe */ void* command_event; + /** the io thread wants to exit */ + int want_to_exit; /** If the log server is connected to over unix domain sockets, * eg. a file is named that is created to log onto. */ @@ -236,4 +250,56 @@ void dt_msg_queue_delete(struct dt_msg_queue* mq); */ void dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len); +/** + * Create IO thread. + * @return new io thread object. not yet started. or NULL malloc failure. + */ +struct dt_io_thread* dt_io_thread_create(void); + +/** + * Delete the IO thread structure. + * @param dtio: the io thread that is deleted. It must not be running. + */ +void dt_io_thread_delete(struct dt_io_thread* dtio); + +/** + * Apply config to the dtio thread + * @param dtio: io thread, not yet started. + * @param cfg: config file struct. + */ +void dt_io_thread_apply_cfg(struct dt_io_thread* dtio, + struct config_file *cfg); + +/** + * Register a msg queue to the io thread. It will be polled to see if + * there are messages and those then get removed and sent, when the thread + * is running. + * @param dtio: the io thread. + * @param mq: message queue to register. + * @return false on failure (malloc failure). + */ +int dt_io_thread_register_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq); + +/** + * Unregister queue from io thread. + * @param dtio: the io thread. + * @param mq: message queue. + */ +void dt_io_thread_unregister_queue(struct dt_io_thread* dtio, + struct dt_msg_queue* mq); + +/** + * Start the io thread + * @param dtio: the io thread. + * @return false on failure. + */ +int dt_io_thread_start(struct dt_io_thread* dtio); + +/** + * Stop the io thread + * @param dtio: the io thread. + */ +void dt_io_thread_stop(struct dt_io_thread* dtio); + #endif /* DTSTREAM_H */