From 34d75b1994bb7fb67a88da495a57ff87ecf8889a Mon Sep 17 00:00:00 2001 From: Wouter Wijngaards Date: Wed, 23 Jul 2008 09:23:03 +0000 Subject: [PATCH] - moved pipe actions to util/tube.c. easier porting and shared code. - check _raw() commpoint callbacks with fptr_wlist. - iana port update. git-svn-id: file:///svn/unbound/trunk@1163 be551aaa-1e26-0410-a405-d3ace91eadb9 --- daemon/daemon.c | 3 +- daemon/worker.c | 36 ++-- daemon/worker.h | 9 +- doc/Changelog | 5 + libunbound/context.h | 9 +- libunbound/libunbound.c | 86 +++------ libunbound/libworker.c | 282 +++--------------------------- libunbound/libworker.h | 75 +------- smallapp/worker_cb.c | 11 +- testcode/fake_event.c | 20 +++ util/fptr_wlist.c | 9 + util/fptr_wlist.h | 8 + util/iana_ports.inc | 1 + util/netevent.c | 1 + util/tube.c | 374 +++++++++++++++++++++++++++++++++++++--- util/tube.h | 167 ++++++++++++++++-- 16 files changed, 639 insertions(+), 457 deletions(-) diff --git a/daemon/daemon.c b/daemon/daemon.c index f34a1c0d7..00a32d3c5 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -357,8 +357,7 @@ daemon_stop_others(struct daemon* daemon) /* skip i=0, is this thread */ /* use i=0 buffer for sending cmds; because we are #0 */ for(i=1; inum; i++) { - worker_send_cmd(daemon->workers[i], - daemon->workers[0]->front->udp_buff, worker_cmd_quit); + worker_send_cmd(daemon->workers[i], worker_cmd_quit); } /* wait for them to quit */ for(i=1; inum; i++) { diff --git a/daemon/worker.c b/daemon/worker.c index d0e276c57..910ca1ecc 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -191,15 +191,10 @@ worker_mem_report(struct worker* ATTR_UNUSED(worker), } void -worker_send_cmd(struct worker* worker, ldns_buffer* buffer, - enum worker_commands cmd) +worker_send_cmd(struct worker* worker, enum worker_commands cmd) { - ldns_buffer_clear(buffer); - /* like DNS message, length data */ - ldns_buffer_write_u16(buffer, sizeof(uint32_t)); - ldns_buffer_write_u32(buffer, (uint32_t)cmd); - ldns_buffer_flip(buffer); - if(!tube_send_cmd(worker->cmd, buffer)) { + uint32_t c = (uint32_t)cmd; + if(!tube_write_msg(worker->cmd, (uint8_t*)&c, sizeof(c), 0)) { log_err("worker send cmd %d failed", (int)cmd); } } @@ -320,22 +315,23 @@ worker_check_request(ldns_buffer* pkt, struct worker* worker) } void -worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), ldns_buffer* buffer, - int error, void* arg) +worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg, + size_t len, int error, void* arg) { struct worker* worker = (struct worker*)arg; enum worker_commands cmd; if(error != NETEVENT_NOERROR) { + free(msg); if(error == NETEVENT_CLOSED) comm_base_exit(worker->base); else log_info("control event: %d", error); return; } - if(ldns_buffer_limit(buffer) != sizeof(uint32_t)) { - fatal_exit("bad control msg length %d", - (int)ldns_buffer_limit(buffer)); + if(len != sizeof(uint32_t)) { + fatal_exit("bad control msg length %d", (int)len); } - cmd = ldns_buffer_read_u32(buffer); + cmd = ldns_read_uint32(msg); + free(msg); switch(cmd) { case worker_cmd_quit: verbose(VERB_ALGO, "got control cmd quit"); @@ -998,9 +994,8 @@ worker_init(struct worker* worker, struct config_file *cfg, } if(worker->thread_num != 0) { /* start listening to commands */ - if(!tube_listen_cmd(worker->cmd, worker->base, - cfg->msg_buffer_size, &worker_handle_control_cmd, - worker)) { + if(!tube_setup_bg_listen(worker->cmd, worker->base, + &worker_handle_control_cmd, worker)) { log_err("could not create control compt."); worker_delete(worker); return 0; @@ -1179,6 +1174,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c), return 0; } +void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), + uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len), + int ATTR_UNUSED(error), void* ATTR_UNUSED(arg)) +{ + log_assert(0); +} + int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b)) { log_assert(0); diff --git a/daemon/worker.h b/daemon/worker.h index 2823d4003..20e11002f 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -150,11 +150,9 @@ void worker_delete(struct worker* worker); /** * Send a command to a worker. Uses blocking writes. * @param worker: worker to send command to. - * @param buffer: an empty buffer to use. * @param cmd: command to send. */ -void worker_send_cmd(struct worker* worker, ldns_buffer* buffer, - enum worker_commands cmd); +void worker_send_cmd(struct worker* worker, enum worker_commands cmd); /** * Worker signal handler function. User argument is the worker itself. @@ -199,11 +197,12 @@ struct outbound_entry* worker_send_query(uint8_t* qname, size_t qnamelen, /** * process control messages from the main thread. * @param tube: tube control message came on. - * @param buffer: buffer with message in it. + * @param msg: message contents. + * @param len: length of message. * @param error: if error (NETEVENT_*) happened. * @param arg: user argument */ -void worker_handle_control_cmd(struct tube* tube, ldns_buffer* buffer, +void worker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len, int error, void* arg); /** handles callbacks from listening event interface */ diff --git a/doc/Changelog b/doc/Changelog index 5a277aa7f..054d390b8 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,8 @@ +22 July 2008: Wouter + - moved pipe actions to util/tube.c. easier porting and shared code. + - check _raw() commpoint callbacks with fptr_wlist. + - iana port update. + 21 July 2008: Wouter - #198: nicer entropy warning message. manpage OS hints. diff --git a/libunbound/context.h b/libunbound/context.h index 13a6c4f36..e1542a6ea 100644 --- a/libunbound/context.h +++ b/libunbound/context.h @@ -47,6 +47,7 @@ #include "libunbound/unbound.h" #include "util/data/packed_rrset.h" struct libworker; +struct tube; /** * The context structure @@ -59,12 +60,12 @@ struct ub_ctx { /* --- pipes --- */ /** mutex on query write pipe */ lock_basic_t qqpipe_lock; - /** the query write pipe, [0] read from, [1] write on */ - int qqpipe[2]; + /** the query write pipe */ + struct tube* qq_pipe; /** mutex on result read pipe */ lock_basic_t rrpipe_lock; - /** the result read pipe, [0] read from, [1] write on */ - int rrpipe[2]; + /** the result read pipe */ + struct tube* rr_pipe; /* --- shared data --- */ /** mutex for access to env.cfg, finalized and dothread */ diff --git a/libunbound/libunbound.c b/libunbound/libunbound.c index 901e87033..380fc9e3d 100644 --- a/libunbound/libunbound.c +++ b/libunbound/libunbound.c @@ -54,6 +54,7 @@ #include "util/log.h" #include "util/random.h" #include "util/net_help.h" +#include "util/tube.h" #include "services/modstack.h" #include "services/localzone.h" #include "services/cache/infra.h" @@ -95,45 +96,28 @@ ub_ctx_create() return NULL; } seed = 0; - if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->qqpipe) == -1) { - ub_randfree(ctx->seed_rnd); - free(ctx); - return NULL; - } - if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->rrpipe) == -1) { + if((ctx->qq_pipe = tube_create()) == NULL) { int e = errno; - close(ctx->qqpipe[0]); - close(ctx->qqpipe[1]); ub_randfree(ctx->seed_rnd); free(ctx); errno = e; return NULL; } -#ifndef USE_WINSOCK - if(!fd_set_nonblock(ctx->rrpipe[0]) || - !fd_set_nonblock(ctx->rrpipe[1]) || - !fd_set_nonblock(ctx->qqpipe[0]) || - !fd_set_nonblock(ctx->qqpipe[1])) { + if((ctx->rr_pipe = tube_create()) == NULL) { int e = errno; - close(ctx->rrpipe[0]); - close(ctx->rrpipe[1]); - close(ctx->qqpipe[0]); - close(ctx->qqpipe[1]); + tube_delete(ctx->qq_pipe); ub_randfree(ctx->seed_rnd); free(ctx); errno = e; return NULL; } -#endif /* !USE_WINSOCK - it is a pipe(nonsocket) on windows) */ lock_basic_init(&ctx->qqpipe_lock); lock_basic_init(&ctx->rrpipe_lock); lock_basic_init(&ctx->cfglock); ctx->env = (struct module_env*)calloc(1, sizeof(*ctx->env)); if(!ctx->env) { - close(ctx->rrpipe[0]); - close(ctx->rrpipe[1]); - close(ctx->qqpipe[0]); - close(ctx->qqpipe[1]); + tube_delete(ctx->qq_pipe); + tube_delete(ctx->rr_pipe); ub_randfree(ctx->seed_rnd); free(ctx); errno = ENOMEM; @@ -141,10 +125,8 @@ ub_ctx_create() } ctx->env->cfg = config_create_forlib(); if(!ctx->env->cfg) { - close(ctx->rrpipe[0]); - close(ctx->rrpipe[1]); - close(ctx->qqpipe[0]); - close(ctx->qqpipe[1]); + tube_delete(ctx->qq_pipe); + tube_delete(ctx->rr_pipe); free(ctx->env); ub_randfree(ctx->seed_rnd); free(ctx); @@ -180,11 +162,11 @@ ub_ctx_delete(struct ub_ctx* ctx) uint32_t cmd = UB_LIBCMD_QUIT; lock_basic_unlock(&ctx->cfglock); lock_basic_lock(&ctx->qqpipe_lock); - (void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd, + (void)tube_write_msg(ctx->qq_pipe, (uint8_t*)&cmd, (uint32_t)sizeof(cmd), 0); lock_basic_unlock(&ctx->qqpipe_lock); lock_basic_lock(&ctx->rrpipe_lock); - while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) { + while(tube_read_msg(ctx->rr_pipe, &msg, &len, 0)) { /* discard all results except a quit confirm */ if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) { free(msg); @@ -222,18 +204,8 @@ ub_ctx_delete(struct ub_ctx* ctx) lock_basic_destroy(&ctx->qqpipe_lock); lock_basic_destroy(&ctx->rrpipe_lock); lock_basic_destroy(&ctx->cfglock); - if(ctx->qqpipe[0] != -1) - close(ctx->qqpipe[0]); - if(ctx->qqpipe[1] != -1) - close(ctx->qqpipe[1]); - if(ctx->rrpipe[0] != -1) - close(ctx->rrpipe[0]); - if(ctx->rrpipe[1] != -1) - close(ctx->rrpipe[1]); - ctx->qqpipe[0] = -1; - ctx->qqpipe[1] = -1; - ctx->rrpipe[0] = -1; - ctx->rrpipe[1] = -1; + tube_delete(ctx->qq_pipe); + tube_delete(ctx->rr_pipe); if(ctx->env) { slabhash_delete(ctx->env->msg_cache); rrset_cache_delete(ctx->env->rrset_cache); @@ -376,35 +348,17 @@ ub_ctx_async(struct ub_ctx* ctx, int dothread) return UB_NOERROR; } -/** perform a select() on the result read pipe */ -static int -pollit(struct ub_ctx* ctx, struct timeval* t) -{ - fd_set r; -#ifndef S_SPLINT_S - FD_ZERO(&r); - FD_SET(FD_SET_T ctx->rrpipe[0], &r); -#endif - if(select(ctx->rrpipe[0]+1, &r, NULL, NULL, t) == -1) { - return 0; - } - errno = 0; - return FD_ISSET(ctx->rrpipe[0], &r); -} - int ub_poll(struct ub_ctx* ctx) { - struct timeval t; - memset(&t, 0, sizeof(t)); /* no need to hold lock while testing for readability. */ - return pollit(ctx, &t); + return tube_poll(ctx->rr_pipe); } int ub_fd(struct ub_ctx* ctx) { - return ctx->rrpipe[0]; + return tube_read_fd(ctx->rr_pipe); } /** process answer from bg worker */ @@ -501,7 +455,7 @@ ub_process(struct ub_ctx* ctx) while(1) { msg = NULL; lock_basic_lock(&ctx->rrpipe_lock); - r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); + r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1); lock_basic_unlock(&ctx->rrpipe_lock); if(r == 0) return UB_PIPE; @@ -527,7 +481,7 @@ ub_wait(struct ub_ctx* ctx) uint8_t* msg; uint32_t len; /* this is basically the same loop as _process(), but with changes. - * holds the rrpipe lock and waits with pollit */ + * holds the rrpipe lock and waits with tube_wait */ while(1) { lock_basic_lock(&ctx->rrpipe_lock); lock_basic_lock(&ctx->cfglock); @@ -544,9 +498,9 @@ ub_wait(struct ub_ctx* ctx) * o possibly decrementing num_async * do callback without lock */ - r = pollit(ctx, NULL); + r = tube_wait(ctx->rr_pipe); if(r) { - r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); + r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1); if(r == 0) { lock_basic_unlock(&ctx->rrpipe_lock); return UB_PIPE; @@ -667,7 +621,7 @@ ub_resolve_async(struct ub_ctx* ctx, char* name, int rrtype, lock_basic_unlock(&ctx->cfglock); lock_basic_lock(&ctx->qqpipe_lock); - if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) { + if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) { lock_basic_unlock(&ctx->qqpipe_lock); free(msg); return UB_PIPE; @@ -705,7 +659,7 @@ ub_cancel(struct ub_ctx* ctx, int async_id) } /* send cancel to background worker */ lock_basic_lock(&ctx->qqpipe_lock); - if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) { + if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) { lock_basic_unlock(&ctx->qqpipe_lock); free(msg); return UB_PIPE; diff --git a/libunbound/libworker.c b/libunbound/libworker.c index 629eb3f6b..a4fe7a74b 100644 --- a/libunbound/libworker.c +++ b/libunbound/libworker.c @@ -79,10 +79,7 @@ libworker_delete(struct libworker* w) ub_randfree(w->env->rnd); free(w->env); } - free(w->cmd_msg); outside_network_delete(w->back); - comm_point_delete(w->cmd_com); - comm_point_delete(w->res_com); comm_base_delete(w->base); free(w); } @@ -231,130 +228,19 @@ libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len) } /** handle control command coming into server */ -int -libworker_handle_control_cmd(struct comm_point* c, void* arg, - int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) +void +libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), + uint8_t* msg, size_t len, int err, void* arg) { struct libworker* w = (struct libworker*)arg; - ssize_t r; - if(w->cmd_read < sizeof(w->cmd_len)) { - /* complete reading the length of control msg */ - r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read, - sizeof(w->cmd_len) - w->cmd_read); - if(r==0) { - /* error has happened or */ - /* parent closed pipe, must have exited somehow */ - /* it is of no use to go on, exit */ - comm_base_exit(w->base); - return 0; - } - if(r==-1) { - if(errno != EAGAIN && errno != EINTR) { - log_err("rpipe error: %s", strerror(errno)); - } - /* nothing to read now, try later */ - return 0; - } - w->cmd_read += r; - if(w->cmd_read < sizeof(w->cmd_len)) { - /* not complete, try later */ - return 0; - } - w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len); - if(!w->cmd_msg) { - log_err("malloc failure"); - w->cmd_read = 0; - return 0; - } - } - /* cmd_len has been read, read remainder */ - r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len), - w->cmd_len - (w->cmd_read - sizeof(w->cmd_len))); - if(r==0) { - /* error has happened or */ - /* parent closed pipe, must have exited somehow */ + if(err != 0) { + free(msg); /* it is of no use to go on, exit */ comm_base_exit(w->base); - return 0; + return; } - if(r==-1) { - /* nothing to read now, try later */ - if(errno != EAGAIN && errno != EINTR) { - log_err("rpipe error: %s", strerror(errno)); - } - return 0; - } - w->cmd_read += r; - if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) { - /* not complete, try later */ - return 0; - } - w->cmd_read = 0; - libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */ - w->cmd_msg = NULL; - return 0; -} - -/** handle opportunity to write result back */ -int -libworker_handle_result_write(struct comm_point* c, void* arg, - int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) -{ - struct libworker* w = (struct libworker*)arg; - struct libworker_res_list* item = w->res_list; - ssize_t r; - if(!item) { - comm_point_stop_listening(c); - return 0; - } - if(w->res_write < sizeof(item->len)) { - r = write(c->fd, ((uint8_t*)&item->len) + w->res_write, - sizeof(item->len) - w->res_write); - if(r == -1) { - if(errno != EAGAIN && errno != EINTR) { - log_err("wpipe error: %s", strerror(errno)); - } - return 0; /* try again later */ - } - if(r == 0) { - /* error on pipe, must have exited somehow */ - /* it is of no use to go on, exit */ - comm_base_exit(w->base); - return 0; - } - w->res_write += r; - if(w->res_write < sizeof(item->len)) - return 0; - } - r = write(c->fd, item->buf + w->res_write - sizeof(item->len), - item->len - (w->res_write - sizeof(item->len))); - if(r == -1) { - if(errno != EAGAIN && errno != EINTR) { - log_err("wpipe error: %s", strerror(errno)); - } - return 0; /* try again later */ - } - if(r == 0) { - /* error on pipe, must have exited somehow */ - /* it is of no use to go on, exit */ - comm_base_exit(w->base); - return 0; - } - w->res_write += r; - if(w->res_write < sizeof(item->len) + item->len) - return 0; - /* done this result, remove it */ - free(item->buf); - item->buf = NULL; - w->res_list = w->res_list->next; - free(item); - if(!w->res_list) { - w->res_last = NULL; - comm_point_stop_listening(c); - } - w->res_write = 0; - return 0; + libworker_do_cmd(w, msg, len); /* also frees the buf */ } /** the background thread func */ @@ -363,7 +249,6 @@ libworker_dobg(void* arg) { /* setup */ uint32_t m; - int fd; struct libworker* w = (struct libworker*)arg; struct ub_ctx* ctx = w->ctx; log_thread_set(&w->thread_num); @@ -371,27 +256,20 @@ libworker_dobg(void* arg) /* we are forked */ w->is_bg_thread = 0; /* close non-used parts of the pipes */ - if(ctx->qqpipe[1] != -1) { - close(ctx->qqpipe[1]); - ctx->qqpipe[1] = -1; - } - if(ctx->rrpipe[0] != -1) { - close(ctx->rrpipe[0]); - ctx->rrpipe[0] = -1; - } + tube_close_write(ctx->qq_pipe); + tube_close_read(ctx->rr_pipe); #endif if(!w) { log_err("libunbound bg worker init failed, nomem"); return NULL; } - if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0, - libworker_handle_control_cmd, w))) { - log_err("libunbound bg worker init failed, no cmdcom"); + if(!tube_setup_bg_listen(ctx->qq_pipe, w->base, + libworker_handle_control_cmd, w)) { + log_err("libunbound bg worker init failed, no bglisten"); return NULL; } - if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1, - libworker_handle_result_write, w))) { - log_err("libunbound bg worker init failed, no rescom"); + if(!tube_setup_bg_write(ctx->rr_pipe, w->base)) { + log_err("libunbound bg worker init failed, no bgwrite"); return NULL; } @@ -399,14 +277,17 @@ libworker_dobg(void* arg) comm_base_dispatch(w->base); /* cleanup */ - fd = ctx->rrpipe[1]; - ctx->rrpipe[1] = -1; m = UB_LIBCMD_QUIT; + tube_remove_bg_listen(w->ctx->qq_pipe); + tube_remove_bg_write(w->ctx->rr_pipe); libworker_delete(w); - close(ctx->qqpipe[0]); - ctx->qqpipe[0] = -1; - (void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0); - close(fd); + (void)tube_write_msg(ctx->rr_pipe, (uint8_t*)&m, + (uint32_t)sizeof(m), 0); +#ifdef THREADS_DISABLED + /* close pipes from forked process before exit */ + tube_close_read(ctx->qq_pipe); + tube_close_write(ctx->rr_pipe); +#endif return NULL; } @@ -435,10 +316,8 @@ int libworker_bg(struct ub_ctx* ctx) w = libworker_setup(ctx, 1); if(!w) fatal_exit("out of memory"); /* close non-used parts of the pipes */ - close(ctx->qqpipe[1]); - close(ctx->rrpipe[0]); - ctx->qqpipe[1] = -1; - ctx->rrpipe[0] = -1; + tube_close_write(ctx->qq_pipe); + tube_close_read(ctx->rr_pipe); (void)libworker_dobg(w); exit(0); break; @@ -655,7 +534,6 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt, { uint8_t* msg = NULL; uint32_t len = 0; - struct libworker_res_list* item; /* serialize and delete unneeded q */ if(w->is_bg_thread) { @@ -677,24 +555,10 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt, log_err("out of memory for async answer"); return; } - item = (struct libworker_res_list*)malloc(sizeof(*item)); - if(!item) { - free(msg); + if(!tube_queue_item(w->ctx->rr_pipe, msg, len)) { log_err("out of memory for async answer"); return; } - item->buf = msg; - item->len = len; - item->next = NULL; - /* add at back of list, since the first one may be partially written */ - if(w->res_last) - w->res_last->next = item; - else w->res_list = item; - w->res_last = item; - if(w->res_list == w->res_last) { - /* first added item, start the write process */ - comm_point_start_listening(w->res_com, -1, -1); - } } /** callback with bg results */ @@ -873,100 +737,10 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error, return 0; } -int -libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock) -{ - ssize_t r; - /* test */ - if(nonblock) { - r = write(fd, &len, sizeof(len)); - if(r == -1) { - if(errno==EINTR || errno==EAGAIN) - return -1; - log_err("msg write failed: %s", strerror(errno)); - return -1; /* can still continue, perhaps */ - } - } else r = 0; - if(!fd_set_block(fd)) - return 0; - /* write remainder */ - if(r != (ssize_t)sizeof(len)) { - if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) { - log_err("msg write failed: %s", strerror(errno)); - (void)fd_set_nonblock(fd); - return 0; - } - } - if(write(fd, buf, len) == -1) { - log_err("msg write failed: %s", strerror(errno)); - (void)fd_set_nonblock(fd); - return 0; - } - if(!fd_set_nonblock(fd)) - return 0; - return 1; -} - -int -libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock) -{ - ssize_t r; - - /* test */ - *len = 0; - if(nonblock) { - r = read(fd, len, sizeof(*len)); - if(r == -1) { - if(errno==EINTR || errno==EAGAIN) - return -1; - log_err("msg read failed: %s", strerror(errno)); - return -1; /* we can still continue, perhaps */ - } - if(r == 0) /* EOF */ - return 0; - } else r = 0; - if(!fd_set_block(fd)) - return 0; - /* read remainder */ - if(r != (ssize_t)sizeof(*len)) { - if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) { - log_err("msg read failed: %s", strerror(errno)); - (void)fd_set_nonblock(fd); - return 0; - } - if(r == 0) /* EOF */ { - (void)fd_set_nonblock(fd); - return 0; - } - } - *buf = (uint8_t*)malloc(*len); - if(!*buf) { - log_err("out of memory"); - (void)fd_set_nonblock(fd); - return 0; - } - if((r=read(fd, *buf, *len)) == -1) { - log_err("msg read failed: %s", strerror(errno)); - (void)fd_set_nonblock(fd); - free(*buf); - return 0; - } - if(r == 0) { /* EOF */ - (void)fd_set_nonblock(fd); - free(*buf); - return 0; - } - if(!fd_set_nonblock(fd)) { - free(*buf); - return 0; - } - return 1; -} - /* --- fake callbacks for fptr_wlist to work --- */ void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), - ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error), - void* ATTR_UNUSED(arg)) + uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len), + int ATTR_UNUSED(error), void* ATTR_UNUSED(arg)) { log_assert(0); } diff --git a/libunbound/libworker.h b/libunbound/libworker.h index 6b2543717..224f26775 100644 --- a/libunbound/libworker.h +++ b/libunbound/libworker.h @@ -55,8 +55,8 @@ struct outbound_entry; struct module_qstate; struct comm_point; struct comm_reply; -struct libworker_res_list; struct regional; +struct tube; /** * The library-worker status structure @@ -81,37 +81,6 @@ struct libworker { struct outside_network* back; /** random() table for this worker. */ struct ub_randstate* rndstate; - - /** commpoint to listen to commands */ - struct comm_point* cmd_com; - /** are we currently reading a command, 0 if not, else bytecount */ - size_t cmd_read; - /** size of current read command, may be partially read */ - uint32_t cmd_len; - /** the current read command content, malloced, can be partially read*/ - uint8_t* cmd_msg; - - /** commpoint to write results back */ - struct comm_point* res_com; - /** are we curently writing a result, 0 if not, else bytecount into - * the res_list first entry. */ - size_t res_write; - /** list of outstanding results to be written back */ - struct libworker_res_list* res_list; - /** last in list */ - struct libworker_res_list* res_last; -}; - -/** - * List of results (arbitrary command serializations) to write back - */ -struct libworker_res_list { - /** next in list */ - struct libworker_res_list* next; - /** serialized buffer to write */ - uint8_t* buf; - /** length to write */ - uint32_t len; }; /** @@ -180,46 +149,12 @@ int libworker_handle_service_reply(struct comm_point* c, void* arg, int error, struct comm_reply* reply_info); /** handle control command coming into server */ -int libworker_handle_control_cmd(struct comm_point* c, void* arg, - int err, struct comm_reply* rep); +void libworker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len, + int err, void* arg); /** handle opportunity to write result back */ -int libworker_handle_result_write(struct comm_point* c, void* arg, - int err, struct comm_reply* rep); - -/** - * Write length bytes followed by message. - * @param fd: the socket to write on. Is nonblocking. - * Set to blocking by the function, - * and back to non-blocking at exit of function. - * @param buf: the message. - * @param len: length of message. - * @param nonblock: if set to true, the first write is nonblocking. - * If the first write fails the function returns -1. - * If set false, the first write is blocking. - * @return: all remainder writes are nonblocking. - * return 0 on error, in that case blocking/nonblocking of socket is - * unknown. - * return 1 if all OK. - */ -int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock); - -/** - * Read length bytes followed by message. - * @param fd: the socket to write on. Is nonblocking. - * Set to blocking by the function, - * and back to non-blocking at exit of function. - * @param buf: the message, malloced. - * @param len: length of message, returned. - * @param nonblock: if set to true, the first read is nonblocking. - * If the first read fails the function returns -1. - * If set false, the first read is blocking. - * @return: all remainder reads are nonblocking. - * return 0 on error, in that case blocking/nonblocking of socket is - * unknown. On EOF 0 is returned. - * return 1 if all OK. - */ -int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock); +void libworker_handle_result_write(struct tube* tube, uint8_t* msg, size_t len, + int err, void* arg); /** * fill result from parsed message, on error fills servfail diff --git a/smallapp/worker_cb.c b/smallapp/worker_cb.c index 54d1f1fc8..fa3714666 100644 --- a/smallapp/worker_cb.c +++ b/smallapp/worker_cb.c @@ -48,8 +48,8 @@ struct module_qstate; struct tube; void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), - ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error), - void* ATTR_UNUSED(arg)) + uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len), + int ATTR_UNUSED(error), void* ATTR_UNUSED(arg)) { log_assert(0); } @@ -150,6 +150,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c), return 0; } +void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), + uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len), + int ATTR_UNUSED(error), void* ATTR_UNUSED(arg)) +{ + log_assert(0); +} + int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b)) { log_assert(0); diff --git a/testcode/fake_event.c b/testcode/fake_event.c index f68772918..dfc8cc8a2 100644 --- a/testcode/fake_event.c +++ b/testcode/fake_event.c @@ -945,6 +945,26 @@ struct comm_point* comm_point_create_local(struct comm_base* ATTR_UNUSED(base), return calloc(1, 1); } +struct comm_point* comm_point_create_raw(struct comm_base* ATTR_UNUSED(base), + int ATTR_UNUSED(fd), int ATTR_UNUSED(writing), + comm_point_callback_t* ATTR_UNUSED(callback), + void* ATTR_UNUSED(callback_arg)) +{ + /* no pipe comm possible */ + return calloc(1, 1); +} + +void comm_point_start_listening(struct comm_point* ATTR_UNUSED(c), + int ATTR_UNUSED(newfd), int ATTR_UNUSED(sec)) +{ + /* no bg write pipe comm possible */ +} + +void comm_point_stop_listening(struct comm_point* ATTR_UNUSED(c)) +{ + /* no bg write pipe comm possible */ +} + /* only cmd com _local gets deleted */ void comm_point_delete(struct comm_point* c) { diff --git a/util/fptr_wlist.c b/util/fptr_wlist.c index 98caef554..23013386a 100644 --- a/util/fptr_wlist.c +++ b/util/fptr_wlist.c @@ -80,6 +80,14 @@ fptr_whitelist_comm_point(comm_point_callback_t *fptr) return 0; } +int +fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr) +{ + if(fptr == &tube_handle_listen) return 1; + else if(fptr == &tube_handle_write) return 1; + return 0; +} + int fptr_whitelist_comm_timer(void (*fptr)(void*)) { @@ -330,5 +338,6 @@ fptr_whitelist_alloc_cleanup(void (*fptr)(void*)) int fptr_whitelist_tube_listen(tube_callback_t* fptr) { if(fptr == &worker_handle_control_cmd) return 1; + else if(fptr == &libworker_handle_control_cmd) return 1; return 0; } diff --git a/util/fptr_wlist.h b/util/fptr_wlist.h index b8451260e..28507b5a2 100644 --- a/util/fptr_wlist.h +++ b/util/fptr_wlist.h @@ -77,6 +77,14 @@ */ int fptr_whitelist_comm_point(comm_point_callback_t *fptr); +/** + * Check function pointer whitelist for raw comm_point callback values. + * + * @param fptr: function pointer to check. + * @return false if not in whitelist. + */ +int fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr); + /** * Check function pointer whitelist for comm_timer callback values. * diff --git a/util/iana_ports.inc b/util/iana_ports.inc index ae9c8a967..3c41790f8 100644 --- a/util/iana_ports.inc +++ b/util/iana_ports.inc @@ -5044,6 +5044,7 @@ 24249, 24321, 24386, +24465, 24554, 24677, 24678, diff --git a/util/netevent.c b/util/netevent.c index 47bec1863..8ca541109 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -943,6 +943,7 @@ void comm_point_raw_handle_callback(int ATTR_UNUSED(fd), log_assert(c->type == comm_raw); comm_base_now(c->ev->base); + fptr_ok(fptr_whitelist_comm_point_raw(c->callback)); (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL); } diff --git a/util/tube.c b/util/tube.c index 4f58054f5..5022ee65f 100644 --- a/util/tube.c +++ b/util/tube.c @@ -49,19 +49,28 @@ struct tube* tube_create(void) { struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); int sv[2]; - if(!tube) return 0; + if(!tube) { + int err = errno; + log_err("tube_create: out of memory"); + errno = err; + return NULL; + } tube->sr = -1; tube->sw = -1; if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { + int err = errno; log_err("socketpair: %s", strerror(errno)); free(tube); + errno = err; return NULL; } tube->sr = sv[0]; tube->sw = sv[1]; if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { + int err = errno; log_err("tube: cannot set nonblocking"); tube_delete(tube); + errno = err; return NULL; } return tube; @@ -70,52 +79,375 @@ struct tube* tube_create(void) void tube_delete(struct tube* tube) { if(!tube) return; - if(tube->listen_com) { - comm_point_delete(tube->listen_com); - } + tube_remove_bg_listen(tube); + tube_remove_bg_write(tube); /* close fds after deleting commpoints, to be sure. * Also epoll does not like closing fd before event_del */ - if(tube->sr != -1) close(tube->sr); - if(tube->sw != -1) close(tube->sw); - tube->sr = -1; - tube->sw = -1; + tube_close_read(tube); + tube_close_write(tube); free(tube); } +void tube_close_read(struct tube* tube) +{ + if(tube->sr != -1) { + close(tube->sr); + tube->sr = -1; + } +} + +void tube_close_write(struct tube* tube) +{ + if(tube->sw != -1) { + close(tube->sw); + tube->sw = -1; + } +} + +void tube_remove_bg_listen(struct tube* tube) +{ + if(tube->listen_com) { + comm_point_delete(tube->listen_com); + tube->listen_com = NULL; + } + if(tube->cmd_msg) { + free(tube->cmd_msg); + tube->cmd_msg = NULL; + } +} + +void tube_remove_bg_write(struct tube* tube) +{ + if(tube->res_com) { + comm_point_delete(tube->res_com); + tube->res_com = NULL; + } + if(tube->res_list) { + struct tube_res_list* np, *p = tube->res_list; + tube->res_list = NULL; + tube->res_last = NULL; + while(p) { + np = p->next; + free(p->buf); + free(p); + p = np; + } + } +} + int tube_handle_listen(struct comm_point* c, void* arg, int error, struct comm_reply* ATTR_UNUSED(reply_info)) { struct tube* tube = (struct tube*)arg; + ssize_t r; if(error != NETEVENT_NOERROR) { fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); - (*tube->listen_cb)(tube, NULL, error, tube->listen_arg); + (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); return 0; } + + if(tube->cmd_read < sizeof(tube->cmd_len)) { + /* complete reading the length of control msg */ + r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, + sizeof(tube->cmd_len) - tube->cmd_read); + if(r==0) { + /* error has happened or */ + /* parent closed pipe, must have exited somehow */ + fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); + (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, + tube->listen_arg); + return 0; + } + if(r==-1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("rpipe error: %s", strerror(errno)); + } + /* nothing to read now, try later */ + return 0; + } + tube->cmd_read += r; + if(tube->cmd_read < sizeof(tube->cmd_len)) { + /* not complete, try later */ + return 0; + } + tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); + if(!tube->cmd_msg) { + log_err("malloc failure"); + tube->cmd_read = 0; + return 0; + } + } + /* cmd_len has been read, read remainder */ + r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), + tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); + if(r==0) { + /* error has happened or */ + /* parent closed pipe, must have exited somehow */ + fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); + (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, + tube->listen_arg); + return 0; + } + if(r==-1) { + /* nothing to read now, try later */ + if(errno != EAGAIN && errno != EINTR) { + log_err("rpipe error: %s", strerror(errno)); + } + return 0; + } + tube->cmd_read += r; + if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { + /* not complete, try later */ + return 0; + } + tube->cmd_read = 0; + fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); - (*tube->listen_cb)(tube, c->buffer, error, tube->listen_arg); + (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, + NETEVENT_NOERROR, tube->listen_arg); + /* also frees the buf */ + tube->cmd_msg = NULL; return 0; } -int tube_listen_cmd(struct tube* tube, struct comm_base* base, - size_t msg_buffer_sz, tube_callback_t* cb, void* arg) +int +tube_handle_write(struct comm_point* c, void* arg, int error, + struct comm_reply* ATTR_UNUSED(reply_info)) { - tube->listen_cb = cb; - tube->listen_arg = arg; - if(!(tube->listen_com = comm_point_create_local(base, tube->sr, - msg_buffer_sz, tube_handle_listen, tube))) { - log_err("tube_listen_cmd: commpoint creation failed"); + struct tube* tube = (struct tube*)arg; + struct tube_res_list* item = tube->res_list; + ssize_t r; + if(error != NETEVENT_NOERROR) { + log_err("tube_handle_write net error %d", error); + return 0; + } + + if(!item) { + comm_point_stop_listening(c); + return 0; + } + + if(tube->res_write < sizeof(item->len)) { + r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, + sizeof(item->len) - tube->res_write); + if(r == -1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("wpipe error: %s", strerror(errno)); + } + return 0; /* try again later */ + } + if(r == 0) { + /* error on pipe, must have exited somehow */ + /* cannot signal this to pipe user */ + return 0; + } + tube->res_write += r; + if(tube->res_write < sizeof(item->len)) + return 0; + } + r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), + item->len - (tube->res_write - sizeof(item->len))); + if(r == -1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("wpipe error: %s", strerror(errno)); + } + return 0; /* try again later */ + } + if(r == 0) { + /* error on pipe, must have exited somehow */ + /* cannot signal this to pipe user */ + return 0; + } + tube->res_write += r; + if(tube->res_write < sizeof(item->len) + item->len) + return 0; + /* done this result, remove it */ + free(item->buf); + item->buf = NULL; + tube->res_list = tube->res_list->next; + free(item); + if(!tube->res_list) { + tube->res_last = NULL; + comm_point_stop_listening(c); + } + tube->res_write = 0; + return 0; +} + +int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, + int nonblock) +{ + ssize_t r; + int fd = tube->sw; + + /* test */ + if(nonblock) { + r = write(fd, &len, sizeof(len)); + if(r == -1) { + if(errno==EINTR || errno==EAGAIN) + return -1; + log_err("tube msg write failed: %s", strerror(errno)); + return -1; /* can still continue, perhaps */ + } + } else r = 0; + if(!fd_set_block(fd)) + return 0; + /* write remainder */ + if(r != (ssize_t)sizeof(len)) { + if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) { + log_err("tube msg write failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); + return 0; + } + } + if(write(fd, buf, len) == -1) { + log_err("tube msg write failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); + return 0; + } + if(!fd_set_nonblock(fd)) + return 0; + return 1; +} + +int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, + int nonblock) +{ + ssize_t r; + int fd = tube->sr; + + /* test */ + *len = 0; + if(nonblock) { + r = read(fd, len, sizeof(*len)); + if(r == -1) { + if(errno==EINTR || errno==EAGAIN) + return -1; + log_err("tube msg read failed: %s", strerror(errno)); + return -1; /* we can still continue, perhaps */ + } + if(r == 0) /* EOF */ + return 0; + } else r = 0; + if(!fd_set_block(fd)) + return 0; + /* read remainder */ + if(r != (ssize_t)sizeof(*len)) { + if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) { + log_err("tube msg read failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); + return 0; + } + if(r == 0) /* EOF */ { + (void)fd_set_nonblock(fd); + return 0; + } + } + *buf = (uint8_t*)malloc(*len); + if(!*buf) { + log_err("tube read out of memory"); + (void)fd_set_nonblock(fd); + return 0; + } + if((r=read(fd, *buf, *len)) == -1) { + log_err("tube msg read failed: %s", strerror(errno)); + (void)fd_set_nonblock(fd); + free(*buf); + return 0; + } + if(r == 0) { /* EOF */ + (void)fd_set_nonblock(fd); + free(*buf); + return 0; + } + if(!fd_set_nonblock(fd)) { + free(*buf); return 0; } return 1; } -int tube_send_cmd(struct tube* tube, ldns_buffer* buffer) +/** perform a select() on the fd */ +static int +pollit(int fd, struct timeval* t) { - if(!write_socket(tube->sw, ldns_buffer_begin(buffer), - ldns_buffer_limit(buffer))) { - log_err("write socket: %s", strerror(errno)); + fd_set r; +#ifndef S_SPLINT_S + FD_ZERO(&r); + FD_SET(FD_SET_T fd, &r); +#endif + if(select(fd+1, &r, NULL, NULL, t) == -1) { + return 0; + } + errno = 0; + return FD_ISSET(fd, &r); +} + +int tube_poll(struct tube* tube) +{ + struct timeval t; + memset(&t, 0, sizeof(t)); + return pollit(tube->sr, &t); +} + +int tube_wait(struct tube* tube) +{ + return pollit(tube->sr, NULL); +} + +int tube_read_fd(struct tube* tube) +{ + return tube->sr; +} + +int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, + tube_callback_t* cb, void* arg) +{ + tube->listen_cb = cb; + tube->listen_arg = arg; + if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, + 0, tube_handle_listen, tube))) { + int err = errno; + log_err("tube_setup_bg_l: commpoint creation failed"); + errno = err; return 0; } return 1; } + +int tube_setup_bg_write(struct tube* tube, struct comm_base* base) +{ + if(!(tube->res_com = comm_point_create_raw(base, tube->sw, + 1, tube_handle_write, tube))) { + int err = errno; + log_err("tube_setup_bg_w: commpoint creation failed"); + errno = err; + return 0; + } + return 1; +} + +int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) +{ + struct tube_res_list* item = + (struct tube_res_list*)malloc(sizeof(*item)); + if(!item) { + free(msg); + log_err("out of memory for async answer"); + return 0; + } + item->buf = msg; + item->len = len; + item->next = NULL; + /* add at back of list, since the first one may be partially written */ + if(tube->res_last) + tube->res_last->next = item; + else tube->res_list = item; + tube->res_last = item; + if(tube->res_list == tube->res_last) { + /* first added item, start the write process */ + comm_point_start_listening(tube->res_com, -1, -1); + } + return 1; +} diff --git a/util/tube.h b/util/tube.h index fecd1cf33..e3ddc850c 100644 --- a/util/tube.h +++ b/util/tube.h @@ -44,13 +44,14 @@ struct comm_reply; struct comm_base; struct tube; +struct tube_res_list; /** * Callback from pipe listen function - * void mycallback(tube, buffer, error, argument); - * if error is true (NETEVENT_*), buffer is probably NULL. + * void mycallback(tube, msg, len, error, user_argument); + * if error is true (NETEVENT_*), msg is probably NULL. */ -typedef void tube_callback_t(struct tube*, ldns_buffer*, int, void*); +typedef void tube_callback_t(struct tube*, uint8_t*, size_t, int, void*); /** * A pipe @@ -67,6 +68,34 @@ struct tube { tube_callback_t* listen_cb; /** listen callback user arg */ void* listen_arg; + /** are we currently reading a command, 0 if not, else bytecount */ + size_t cmd_read; + /** size of current read command, may be partially read */ + uint32_t cmd_len; + /** the current read command content, malloced, can be partially read*/ + uint8_t* cmd_msg; + + /** background write queue, commpoint to write results back */ + struct comm_point* res_com; + /** are we curently writing a result, 0 if not, else bytecount into + * the res_list first entry. */ + size_t res_write; + /** list of outstanding results to be written back */ + struct tube_res_list* res_list; + /** last in list */ + struct tube_res_list* res_last; +}; + +/** + * List of results (arbitrary command serializations) to write back + */ +struct tube_res_list { + /** next in list */ + struct tube_res_list* next; + /** serialized buffer to write */ + uint8_t* buf; + /** length to write */ + uint32_t len; }; /** @@ -82,30 +111,136 @@ struct tube* tube_create(void); void tube_delete(struct tube* tube); /** - * Start listening for information over the pipe + * Write length bytes followed by message. + * @param tube: the tube to write on. + * If that tube is a pipe, its write fd is used as + * the socket to write on. Is nonblocking. + * Set to blocking by the function, + * and back to non-blocking at exit of function. + * @param buf: the message. + * @param len: length of message. + * @param nonblock: if set to true, the first write is nonblocking. + * If the first write fails the function returns -1. + * If set false, the first write is blocking. + * @return: all remainder writes are nonblocking. + * return 0 on error, in that case blocking/nonblocking of socket is + * unknown. + * return 1 if all OK. + */ +int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, + int nonblock); + +/** + * Read length bytes followed by message. + * @param tube: The tube to read on. + * If that tube is a pipe, its read fd is used as + * the socket to read on. Is nonblocking. + * Set to blocking by the function, + * and back to non-blocking at exit of function. + * @param buf: the message, malloced. + * @param len: length of message, returned. + * @param nonblock: if set to true, the first read is nonblocking. + * If the first read fails the function returns -1. + * If set false, the first read is blocking. + * @return: all remainder reads are nonblocking. + * return 0 on error, in that case blocking/nonblocking of socket is + * unknown. On EOF 0 is returned. + * return 1 if all OK. + */ +int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, + int nonblock); + +/** + * Close read part of the pipe. + * The tube can no longer be read from. + * @param tube: tube to operate on. + */ +void tube_close_read(struct tube* tube); + +/** + * Close write part of the pipe. + * The tube can no longer be written to. + * @param tube: tube to operate on. + */ +void tube_close_write(struct tube* tube); + +/** + * See if data is ready for reading on the tube without blocking. + * @param tube: tube to check for readable items + * @return true if readable items are present. False if not (or error). + * true on pipe_closed. + */ +int tube_poll(struct tube* tube); + +/** + * Wait for data to be ready for reading on the tube. is blocking. + * No timeout. + * @param tube: the tube to wait on. + * @return: if there was something to read (false on error). + * true on pipe_closed. + */ +int tube_wait(struct tube* tube); + +/** + * Get FD that is readable when new information arrives. + * @param tube + * @return file descriptor. + */ +int tube_read_fd(struct tube* tube); + +/** + * Start listening for information over the pipe. + * Background registration of a read listener, callback when read completed. + * Do not mix with tube_read_msg style direct reads from the pipe. * @param tube: tube to listen on * @param base: what base to register event callback. - * @param msg_buffer_sz: what message buffer size to use. * @param cb: callback routine. * @param arg: user argument for callback routine. * @return true if successful, false on error. */ -int tube_listen_cmd(struct tube* tube, struct comm_base* base, - size_t msg_buffer_sz, tube_callback_t* cb, void* arg); +int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, + tube_callback_t* cb, void* arg); /** - * Send a command over a pipe, blocking operation. - * @param tube: tube to send the info on. - * @param buffer: buffer to send. starts with network order uint16 with - * length of remainder of buffer. - * The receiver does not receive the length uint16 in the buffer - * (the buffer is sized appropriately). - * @return 0 on error, true on success. + * Remove bg listen setup from event base. + * @param tube: what tube to cleanup */ -int tube_send_cmd(struct tube* tube, ldns_buffer* buffer); +void tube_remove_bg_listen(struct tube* tube); -/** decl for fptr_wlist of tube pipe listen handler */ +/** + * Start background write handler for the pipe. + * Do not mix with tube_write_msg style direct writes to the pipe. + * @param tube: tube to write on + * @param base: what base to register event handler on. + * @return true if successful, false on error. + */ +int tube_setup_bg_write(struct tube* tube, struct comm_base* base); + +/** + * Remove bg write setup from event base. + * @param tube: what tube to cleanup + */ +void tube_remove_bg_write(struct tube* tube); + + +/** + * Append data item to background list of writes. + * Mallocs a list entry behind the scenes. + * Not locked behind the scenes, call from one thread or lock on outside. + * @param tube: what tube to queue on. + * @param msg: memory message to send. Is free()d after use. + * Put at the end of the to-send queue. + * @param len: length of item. + * @return 0 on failure (msg freed). + */ +int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len); + +/** for fptr wlist, callback function */ int tube_handle_listen(struct comm_point* c, void* arg, int error, struct comm_reply* reply_info); +/** for fptr wlist, callback function */ +int tube_handle_write(struct comm_point* c, void* arg, int error, + struct comm_reply* reply_info); + #endif /* UTIL_TUBE_H */