- moved pipe actions to util/tube.c. easier porting and shared code.

- check _raw() commpoint callbacks with fptr_wlist.
- iana port update.


git-svn-id: file:///svn/unbound/trunk@1163 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-07-23 09:23:03 +00:00
parent b61f24e8d3
commit 34d75b1994
16 changed files with 639 additions and 457 deletions

View file

@ -357,8 +357,7 @@ daemon_stop_others(struct daemon* daemon)
/* skip i=0, is this thread */ /* skip i=0, is this thread */
/* use i=0 buffer for sending cmds; because we are #0 */ /* use i=0 buffer for sending cmds; because we are #0 */
for(i=1; i<daemon->num; i++) { for(i=1; i<daemon->num; i++) {
worker_send_cmd(daemon->workers[i], worker_send_cmd(daemon->workers[i], worker_cmd_quit);
daemon->workers[0]->front->udp_buff, worker_cmd_quit);
} }
/* wait for them to quit */ /* wait for them to quit */
for(i=1; i<daemon->num; i++) { for(i=1; i<daemon->num; i++) {

View file

@ -191,15 +191,10 @@ worker_mem_report(struct worker* ATTR_UNUSED(worker),
} }
void void
worker_send_cmd(struct worker* worker, ldns_buffer* buffer, worker_send_cmd(struct worker* worker, enum worker_commands cmd)
enum worker_commands cmd)
{ {
ldns_buffer_clear(buffer); uint32_t c = (uint32_t)cmd;
/* like DNS message, length data */ if(!tube_write_msg(worker->cmd, (uint8_t*)&c, sizeof(c), 0)) {
ldns_buffer_write_u16(buffer, sizeof(uint32_t));
ldns_buffer_write_u32(buffer, (uint32_t)cmd);
ldns_buffer_flip(buffer);
if(!tube_send_cmd(worker->cmd, buffer)) {
log_err("worker send cmd %d failed", (int)cmd); log_err("worker send cmd %d failed", (int)cmd);
} }
} }
@ -320,22 +315,23 @@ worker_check_request(ldns_buffer* pkt, struct worker* worker)
} }
void void
worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), ldns_buffer* buffer, worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg,
int error, void* arg) size_t len, int error, void* arg)
{ {
struct worker* worker = (struct worker*)arg; struct worker* worker = (struct worker*)arg;
enum worker_commands cmd; enum worker_commands cmd;
if(error != NETEVENT_NOERROR) { if(error != NETEVENT_NOERROR) {
free(msg);
if(error == NETEVENT_CLOSED) if(error == NETEVENT_CLOSED)
comm_base_exit(worker->base); comm_base_exit(worker->base);
else log_info("control event: %d", error); else log_info("control event: %d", error);
return; return;
} }
if(ldns_buffer_limit(buffer) != sizeof(uint32_t)) { if(len != sizeof(uint32_t)) {
fatal_exit("bad control msg length %d", fatal_exit("bad control msg length %d", (int)len);
(int)ldns_buffer_limit(buffer));
} }
cmd = ldns_buffer_read_u32(buffer); cmd = ldns_read_uint32(msg);
free(msg);
switch(cmd) { switch(cmd) {
case worker_cmd_quit: case worker_cmd_quit:
verbose(VERB_ALGO, "got control cmd quit"); verbose(VERB_ALGO, "got control cmd quit");
@ -998,9 +994,8 @@ worker_init(struct worker* worker, struct config_file *cfg,
} }
if(worker->thread_num != 0) { if(worker->thread_num != 0) {
/* start listening to commands */ /* start listening to commands */
if(!tube_listen_cmd(worker->cmd, worker->base, if(!tube_setup_bg_listen(worker->cmd, worker->base,
cfg->msg_buffer_size, &worker_handle_control_cmd, &worker_handle_control_cmd, worker)) {
worker)) {
log_err("could not create control compt."); log_err("could not create control compt.");
worker_delete(worker); worker_delete(worker);
return 0; return 0;
@ -1179,6 +1174,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c),
return 0; return 0;
} }
void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{
log_assert(0);
}
int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b)) int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
{ {
log_assert(0); log_assert(0);

View file

@ -150,11 +150,9 @@ void worker_delete(struct worker* worker);
/** /**
* Send a command to a worker. Uses blocking writes. * Send a command to a worker. Uses blocking writes.
* @param worker: worker to send command to. * @param worker: worker to send command to.
* @param buffer: an empty buffer to use.
* @param cmd: command to send. * @param cmd: command to send.
*/ */
void worker_send_cmd(struct worker* worker, ldns_buffer* buffer, void worker_send_cmd(struct worker* worker, enum worker_commands cmd);
enum worker_commands cmd);
/** /**
* Worker signal handler function. User argument is the worker itself. * Worker signal handler function. User argument is the worker itself.
@ -199,11 +197,12 @@ struct outbound_entry* worker_send_query(uint8_t* qname, size_t qnamelen,
/** /**
* process control messages from the main thread. * process control messages from the main thread.
* @param tube: tube control message came on. * @param tube: tube control message came on.
* @param buffer: buffer with message in it. * @param msg: message contents.
* @param len: length of message.
* @param error: if error (NETEVENT_*) happened. * @param error: if error (NETEVENT_*) happened.
* @param arg: user argument * @param arg: user argument
*/ */
void worker_handle_control_cmd(struct tube* tube, ldns_buffer* buffer, void worker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
int error, void* arg); int error, void* arg);
/** handles callbacks from listening event interface */ /** handles callbacks from listening event interface */

View file

@ -1,3 +1,8 @@
22 July 2008: Wouter
- moved pipe actions to util/tube.c. easier porting and shared code.
- check _raw() commpoint callbacks with fptr_wlist.
- iana port update.
21 July 2008: Wouter 21 July 2008: Wouter
- #198: nicer entropy warning message. manpage OS hints. - #198: nicer entropy warning message. manpage OS hints.

View file

@ -47,6 +47,7 @@
#include "libunbound/unbound.h" #include "libunbound/unbound.h"
#include "util/data/packed_rrset.h" #include "util/data/packed_rrset.h"
struct libworker; struct libworker;
struct tube;
/** /**
* The context structure * The context structure
@ -59,12 +60,12 @@ struct ub_ctx {
/* --- pipes --- */ /* --- pipes --- */
/** mutex on query write pipe */ /** mutex on query write pipe */
lock_basic_t qqpipe_lock; lock_basic_t qqpipe_lock;
/** the query write pipe, [0] read from, [1] write on */ /** the query write pipe */
int qqpipe[2]; struct tube* qq_pipe;
/** mutex on result read pipe */ /** mutex on result read pipe */
lock_basic_t rrpipe_lock; lock_basic_t rrpipe_lock;
/** the result read pipe, [0] read from, [1] write on */ /** the result read pipe */
int rrpipe[2]; struct tube* rr_pipe;
/* --- shared data --- */ /* --- shared data --- */
/** mutex for access to env.cfg, finalized and dothread */ /** mutex for access to env.cfg, finalized and dothread */

View file

@ -54,6 +54,7 @@
#include "util/log.h" #include "util/log.h"
#include "util/random.h" #include "util/random.h"
#include "util/net_help.h" #include "util/net_help.h"
#include "util/tube.h"
#include "services/modstack.h" #include "services/modstack.h"
#include "services/localzone.h" #include "services/localzone.h"
#include "services/cache/infra.h" #include "services/cache/infra.h"
@ -95,45 +96,28 @@ ub_ctx_create()
return NULL; return NULL;
} }
seed = 0; seed = 0;
if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->qqpipe) == -1) { if((ctx->qq_pipe = tube_create()) == NULL) {
ub_randfree(ctx->seed_rnd);
free(ctx);
return NULL;
}
if(socketpair(AF_UNIX, SOCK_STREAM, 0, ctx->rrpipe) == -1) {
int e = errno; int e = errno;
close(ctx->qqpipe[0]);
close(ctx->qqpipe[1]);
ub_randfree(ctx->seed_rnd); ub_randfree(ctx->seed_rnd);
free(ctx); free(ctx);
errno = e; errno = e;
return NULL; return NULL;
} }
#ifndef USE_WINSOCK if((ctx->rr_pipe = tube_create()) == NULL) {
if(!fd_set_nonblock(ctx->rrpipe[0]) ||
!fd_set_nonblock(ctx->rrpipe[1]) ||
!fd_set_nonblock(ctx->qqpipe[0]) ||
!fd_set_nonblock(ctx->qqpipe[1])) {
int e = errno; int e = errno;
close(ctx->rrpipe[0]); tube_delete(ctx->qq_pipe);
close(ctx->rrpipe[1]);
close(ctx->qqpipe[0]);
close(ctx->qqpipe[1]);
ub_randfree(ctx->seed_rnd); ub_randfree(ctx->seed_rnd);
free(ctx); free(ctx);
errno = e; errno = e;
return NULL; return NULL;
} }
#endif /* !USE_WINSOCK - it is a pipe(nonsocket) on windows) */
lock_basic_init(&ctx->qqpipe_lock); lock_basic_init(&ctx->qqpipe_lock);
lock_basic_init(&ctx->rrpipe_lock); lock_basic_init(&ctx->rrpipe_lock);
lock_basic_init(&ctx->cfglock); lock_basic_init(&ctx->cfglock);
ctx->env = (struct module_env*)calloc(1, sizeof(*ctx->env)); ctx->env = (struct module_env*)calloc(1, sizeof(*ctx->env));
if(!ctx->env) { if(!ctx->env) {
close(ctx->rrpipe[0]); tube_delete(ctx->qq_pipe);
close(ctx->rrpipe[1]); tube_delete(ctx->rr_pipe);
close(ctx->qqpipe[0]);
close(ctx->qqpipe[1]);
ub_randfree(ctx->seed_rnd); ub_randfree(ctx->seed_rnd);
free(ctx); free(ctx);
errno = ENOMEM; errno = ENOMEM;
@ -141,10 +125,8 @@ ub_ctx_create()
} }
ctx->env->cfg = config_create_forlib(); ctx->env->cfg = config_create_forlib();
if(!ctx->env->cfg) { if(!ctx->env->cfg) {
close(ctx->rrpipe[0]); tube_delete(ctx->qq_pipe);
close(ctx->rrpipe[1]); tube_delete(ctx->rr_pipe);
close(ctx->qqpipe[0]);
close(ctx->qqpipe[1]);
free(ctx->env); free(ctx->env);
ub_randfree(ctx->seed_rnd); ub_randfree(ctx->seed_rnd);
free(ctx); free(ctx);
@ -180,11 +162,11 @@ ub_ctx_delete(struct ub_ctx* ctx)
uint32_t cmd = UB_LIBCMD_QUIT; uint32_t cmd = UB_LIBCMD_QUIT;
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock); lock_basic_lock(&ctx->qqpipe_lock);
(void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd, (void)tube_write_msg(ctx->qq_pipe, (uint8_t*)&cmd,
(uint32_t)sizeof(cmd), 0); (uint32_t)sizeof(cmd), 0);
lock_basic_unlock(&ctx->qqpipe_lock); lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock); lock_basic_lock(&ctx->rrpipe_lock);
while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) { while(tube_read_msg(ctx->rr_pipe, &msg, &len, 0)) {
/* discard all results except a quit confirm */ /* discard all results except a quit confirm */
if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) { if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) {
free(msg); free(msg);
@ -222,18 +204,8 @@ ub_ctx_delete(struct ub_ctx* ctx)
lock_basic_destroy(&ctx->qqpipe_lock); lock_basic_destroy(&ctx->qqpipe_lock);
lock_basic_destroy(&ctx->rrpipe_lock); lock_basic_destroy(&ctx->rrpipe_lock);
lock_basic_destroy(&ctx->cfglock); lock_basic_destroy(&ctx->cfglock);
if(ctx->qqpipe[0] != -1) tube_delete(ctx->qq_pipe);
close(ctx->qqpipe[0]); tube_delete(ctx->rr_pipe);
if(ctx->qqpipe[1] != -1)
close(ctx->qqpipe[1]);
if(ctx->rrpipe[0] != -1)
close(ctx->rrpipe[0]);
if(ctx->rrpipe[1] != -1)
close(ctx->rrpipe[1]);
ctx->qqpipe[0] = -1;
ctx->qqpipe[1] = -1;
ctx->rrpipe[0] = -1;
ctx->rrpipe[1] = -1;
if(ctx->env) { if(ctx->env) {
slabhash_delete(ctx->env->msg_cache); slabhash_delete(ctx->env->msg_cache);
rrset_cache_delete(ctx->env->rrset_cache); rrset_cache_delete(ctx->env->rrset_cache);
@ -376,35 +348,17 @@ ub_ctx_async(struct ub_ctx* ctx, int dothread)
return UB_NOERROR; return UB_NOERROR;
} }
/** perform a select() on the result read pipe */
static int
pollit(struct ub_ctx* ctx, struct timeval* t)
{
fd_set r;
#ifndef S_SPLINT_S
FD_ZERO(&r);
FD_SET(FD_SET_T ctx->rrpipe[0], &r);
#endif
if(select(ctx->rrpipe[0]+1, &r, NULL, NULL, t) == -1) {
return 0;
}
errno = 0;
return FD_ISSET(ctx->rrpipe[0], &r);
}
int int
ub_poll(struct ub_ctx* ctx) ub_poll(struct ub_ctx* ctx)
{ {
struct timeval t;
memset(&t, 0, sizeof(t));
/* no need to hold lock while testing for readability. */ /* no need to hold lock while testing for readability. */
return pollit(ctx, &t); return tube_poll(ctx->rr_pipe);
} }
int int
ub_fd(struct ub_ctx* ctx) ub_fd(struct ub_ctx* ctx)
{ {
return ctx->rrpipe[0]; return tube_read_fd(ctx->rr_pipe);
} }
/** process answer from bg worker */ /** process answer from bg worker */
@ -501,7 +455,7 @@ ub_process(struct ub_ctx* ctx)
while(1) { while(1) {
msg = NULL; msg = NULL;
lock_basic_lock(&ctx->rrpipe_lock); lock_basic_lock(&ctx->rrpipe_lock);
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
lock_basic_unlock(&ctx->rrpipe_lock); lock_basic_unlock(&ctx->rrpipe_lock);
if(r == 0) if(r == 0)
return UB_PIPE; return UB_PIPE;
@ -527,7 +481,7 @@ ub_wait(struct ub_ctx* ctx)
uint8_t* msg; uint8_t* msg;
uint32_t len; uint32_t len;
/* this is basically the same loop as _process(), but with changes. /* this is basically the same loop as _process(), but with changes.
* holds the rrpipe lock and waits with pollit */ * holds the rrpipe lock and waits with tube_wait */
while(1) { while(1) {
lock_basic_lock(&ctx->rrpipe_lock); lock_basic_lock(&ctx->rrpipe_lock);
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
@ -544,9 +498,9 @@ ub_wait(struct ub_ctx* ctx)
* o possibly decrementing num_async * o possibly decrementing num_async
* do callback without lock * do callback without lock
*/ */
r = pollit(ctx, NULL); r = tube_wait(ctx->rr_pipe);
if(r) { if(r) {
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1); r = tube_read_msg(ctx->rr_pipe, &msg, &len, 1);
if(r == 0) { if(r == 0) {
lock_basic_unlock(&ctx->rrpipe_lock); lock_basic_unlock(&ctx->rrpipe_lock);
return UB_PIPE; return UB_PIPE;
@ -667,7 +621,7 @@ ub_resolve_async(struct ub_ctx* ctx, char* name, int rrtype,
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock); lock_basic_lock(&ctx->qqpipe_lock);
if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) { if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
lock_basic_unlock(&ctx->qqpipe_lock); lock_basic_unlock(&ctx->qqpipe_lock);
free(msg); free(msg);
return UB_PIPE; return UB_PIPE;
@ -705,7 +659,7 @@ ub_cancel(struct ub_ctx* ctx, int async_id)
} }
/* send cancel to background worker */ /* send cancel to background worker */
lock_basic_lock(&ctx->qqpipe_lock); lock_basic_lock(&ctx->qqpipe_lock);
if(!libworker_write_msg(ctx->qqpipe[1], msg, len, 0)) { if(!tube_write_msg(ctx->qq_pipe, msg, len, 0)) {
lock_basic_unlock(&ctx->qqpipe_lock); lock_basic_unlock(&ctx->qqpipe_lock);
free(msg); free(msg);
return UB_PIPE; return UB_PIPE;

View file

@ -79,10 +79,7 @@ libworker_delete(struct libworker* w)
ub_randfree(w->env->rnd); ub_randfree(w->env->rnd);
free(w->env); free(w->env);
} }
free(w->cmd_msg);
outside_network_delete(w->back); outside_network_delete(w->back);
comm_point_delete(w->cmd_com);
comm_point_delete(w->res_com);
comm_base_delete(w->base); comm_base_delete(w->base);
free(w); free(w);
} }
@ -231,130 +228,19 @@ libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
} }
/** handle control command coming into server */ /** handle control command coming into server */
int void
libworker_handle_control_cmd(struct comm_point* c, void* arg, libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) uint8_t* msg, size_t len, int err, void* arg)
{ {
struct libworker* w = (struct libworker*)arg; struct libworker* w = (struct libworker*)arg;
ssize_t r;
if(w->cmd_read < sizeof(w->cmd_len)) { if(err != 0) {
/* complete reading the length of control msg */ free(msg);
r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
sizeof(w->cmd_len) - w->cmd_read);
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */ /* it is of no use to go on, exit */
comm_base_exit(w->base); comm_base_exit(w->base);
return 0; return;
} }
if(r==-1) { libworker_do_cmd(w, msg, len); /* also frees the buf */
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
/* nothing to read now, try later */
return 0;
}
w->cmd_read += r;
if(w->cmd_read < sizeof(w->cmd_len)) {
/* not complete, try later */
return 0;
}
w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
if(!w->cmd_msg) {
log_err("malloc failure");
w->cmd_read = 0;
return 0;
}
}
/* cmd_len has been read, read remainder */
r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
if(r==-1) {
/* nothing to read now, try later */
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
return 0;
}
w->cmd_read += r;
if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
/* not complete, try later */
return 0;
}
w->cmd_read = 0;
libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
w->cmd_msg = NULL;
return 0;
}
/** handle opportunity to write result back */
int
libworker_handle_result_write(struct comm_point* c, void* arg,
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
struct libworker* w = (struct libworker*)arg;
struct libworker_res_list* item = w->res_list;
ssize_t r;
if(!item) {
comm_point_stop_listening(c);
return 0;
}
if(w->res_write < sizeof(item->len)) {
r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
sizeof(item->len) - w->res_write);
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
w->res_write += r;
if(w->res_write < sizeof(item->len))
return 0;
}
r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
item->len - (w->res_write - sizeof(item->len)));
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
w->res_write += r;
if(w->res_write < sizeof(item->len) + item->len)
return 0;
/* done this result, remove it */
free(item->buf);
item->buf = NULL;
w->res_list = w->res_list->next;
free(item);
if(!w->res_list) {
w->res_last = NULL;
comm_point_stop_listening(c);
}
w->res_write = 0;
return 0;
} }
/** the background thread func */ /** the background thread func */
@ -363,7 +249,6 @@ libworker_dobg(void* arg)
{ {
/* setup */ /* setup */
uint32_t m; uint32_t m;
int fd;
struct libworker* w = (struct libworker*)arg; struct libworker* w = (struct libworker*)arg;
struct ub_ctx* ctx = w->ctx; struct ub_ctx* ctx = w->ctx;
log_thread_set(&w->thread_num); log_thread_set(&w->thread_num);
@ -371,27 +256,20 @@ libworker_dobg(void* arg)
/* we are forked */ /* we are forked */
w->is_bg_thread = 0; w->is_bg_thread = 0;
/* close non-used parts of the pipes */ /* close non-used parts of the pipes */
if(ctx->qqpipe[1] != -1) { tube_close_write(ctx->qq_pipe);
close(ctx->qqpipe[1]); tube_close_read(ctx->rr_pipe);
ctx->qqpipe[1] = -1;
}
if(ctx->rrpipe[0] != -1) {
close(ctx->rrpipe[0]);
ctx->rrpipe[0] = -1;
}
#endif #endif
if(!w) { if(!w) {
log_err("libunbound bg worker init failed, nomem"); log_err("libunbound bg worker init failed, nomem");
return NULL; return NULL;
} }
if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0, if(!tube_setup_bg_listen(ctx->qq_pipe, w->base,
libworker_handle_control_cmd, w))) { libworker_handle_control_cmd, w)) {
log_err("libunbound bg worker init failed, no cmdcom"); log_err("libunbound bg worker init failed, no bglisten");
return NULL; return NULL;
} }
if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1, if(!tube_setup_bg_write(ctx->rr_pipe, w->base)) {
libworker_handle_result_write, w))) { log_err("libunbound bg worker init failed, no bgwrite");
log_err("libunbound bg worker init failed, no rescom");
return NULL; return NULL;
} }
@ -399,14 +277,17 @@ libworker_dobg(void* arg)
comm_base_dispatch(w->base); comm_base_dispatch(w->base);
/* cleanup */ /* cleanup */
fd = ctx->rrpipe[1];
ctx->rrpipe[1] = -1;
m = UB_LIBCMD_QUIT; m = UB_LIBCMD_QUIT;
tube_remove_bg_listen(w->ctx->qq_pipe);
tube_remove_bg_write(w->ctx->rr_pipe);
libworker_delete(w); libworker_delete(w);
close(ctx->qqpipe[0]); (void)tube_write_msg(ctx->rr_pipe, (uint8_t*)&m,
ctx->qqpipe[0] = -1; (uint32_t)sizeof(m), 0);
(void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0); #ifdef THREADS_DISABLED
close(fd); /* close pipes from forked process before exit */
tube_close_read(ctx->qq_pipe);
tube_close_write(ctx->rr_pipe);
#endif
return NULL; return NULL;
} }
@ -435,10 +316,8 @@ int libworker_bg(struct ub_ctx* ctx)
w = libworker_setup(ctx, 1); w = libworker_setup(ctx, 1);
if(!w) fatal_exit("out of memory"); if(!w) fatal_exit("out of memory");
/* close non-used parts of the pipes */ /* close non-used parts of the pipes */
close(ctx->qqpipe[1]); tube_close_write(ctx->qq_pipe);
close(ctx->rrpipe[0]); tube_close_read(ctx->rr_pipe);
ctx->qqpipe[1] = -1;
ctx->rrpipe[0] = -1;
(void)libworker_dobg(w); (void)libworker_dobg(w);
exit(0); exit(0);
break; break;
@ -655,7 +534,6 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
{ {
uint8_t* msg = NULL; uint8_t* msg = NULL;
uint32_t len = 0; uint32_t len = 0;
struct libworker_res_list* item;
/* serialize and delete unneeded q */ /* serialize and delete unneeded q */
if(w->is_bg_thread) { if(w->is_bg_thread) {
@ -677,24 +555,10 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
log_err("out of memory for async answer"); log_err("out of memory for async answer");
return; return;
} }
item = (struct libworker_res_list*)malloc(sizeof(*item)); if(!tube_queue_item(w->ctx->rr_pipe, msg, len)) {
if(!item) {
free(msg);
log_err("out of memory for async answer"); log_err("out of memory for async answer");
return; return;
} }
item->buf = msg;
item->len = len;
item->next = NULL;
/* add at back of list, since the first one may be partially written */
if(w->res_last)
w->res_last->next = item;
else w->res_list = item;
w->res_last = item;
if(w->res_list == w->res_last) {
/* first added item, start the write process */
comm_point_start_listening(w->res_com, -1, -1);
}
} }
/** callback with bg results */ /** callback with bg results */
@ -873,100 +737,10 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
return 0; return 0;
} }
int
libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
{
ssize_t r;
/* test */
if(nonblock) {
r = write(fd, &len, sizeof(len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("msg write failed: %s", strerror(errno));
return -1; /* can still continue, perhaps */
}
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* write remainder */
if(r != (ssize_t)sizeof(len)) {
if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
log_err("msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
}
if(write(fd, buf, len) == -1) {
log_err("msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(!fd_set_nonblock(fd))
return 0;
return 1;
}
int
libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
{
ssize_t r;
/* test */
*len = 0;
if(nonblock) {
r = read(fd, len, sizeof(*len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("msg read failed: %s", strerror(errno));
return -1; /* we can still continue, perhaps */
}
if(r == 0) /* EOF */
return 0;
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* read remainder */
if(r != (ssize_t)sizeof(*len)) {
if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
log_err("msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(r == 0) /* EOF */ {
(void)fd_set_nonblock(fd);
return 0;
}
}
*buf = (uint8_t*)malloc(*len);
if(!*buf) {
log_err("out of memory");
(void)fd_set_nonblock(fd);
return 0;
}
if((r=read(fd, *buf, *len)) == -1) {
log_err("msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
free(*buf);
return 0;
}
if(r == 0) { /* EOF */
(void)fd_set_nonblock(fd);
free(*buf);
return 0;
}
if(!fd_set_nonblock(fd)) {
free(*buf);
return 0;
}
return 1;
}
/* --- fake callbacks for fptr_wlist to work --- */ /* --- fake callbacks for fptr_wlist to work --- */
void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error), uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
void* ATTR_UNUSED(arg)) int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{ {
log_assert(0); log_assert(0);
} }

View file

@ -55,8 +55,8 @@ struct outbound_entry;
struct module_qstate; struct module_qstate;
struct comm_point; struct comm_point;
struct comm_reply; struct comm_reply;
struct libworker_res_list;
struct regional; struct regional;
struct tube;
/** /**
* The library-worker status structure * The library-worker status structure
@ -81,37 +81,6 @@ struct libworker {
struct outside_network* back; struct outside_network* back;
/** random() table for this worker. */ /** random() table for this worker. */
struct ub_randstate* rndstate; struct ub_randstate* rndstate;
/** commpoint to listen to commands */
struct comm_point* cmd_com;
/** are we currently reading a command, 0 if not, else bytecount */
size_t cmd_read;
/** size of current read command, may be partially read */
uint32_t cmd_len;
/** the current read command content, malloced, can be partially read*/
uint8_t* cmd_msg;
/** commpoint to write results back */
struct comm_point* res_com;
/** are we curently writing a result, 0 if not, else bytecount into
* the res_list first entry. */
size_t res_write;
/** list of outstanding results to be written back */
struct libworker_res_list* res_list;
/** last in list */
struct libworker_res_list* res_last;
};
/**
* List of results (arbitrary command serializations) to write back
*/
struct libworker_res_list {
/** next in list */
struct libworker_res_list* next;
/** serialized buffer to write */
uint8_t* buf;
/** length to write */
uint32_t len;
}; };
/** /**
@ -180,46 +149,12 @@ int libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info); struct comm_reply* reply_info);
/** handle control command coming into server */ /** handle control command coming into server */
int libworker_handle_control_cmd(struct comm_point* c, void* arg, void libworker_handle_control_cmd(struct tube* tube, uint8_t* msg, size_t len,
int err, struct comm_reply* rep); int err, void* arg);
/** handle opportunity to write result back */ /** handle opportunity to write result back */
int libworker_handle_result_write(struct comm_point* c, void* arg, void libworker_handle_result_write(struct tube* tube, uint8_t* msg, size_t len,
int err, struct comm_reply* rep); int err, void* arg);
/**
* Write length bytes followed by message.
* @param fd: the socket to write on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message.
* @param len: length of message.
* @param nonblock: if set to true, the first write is nonblocking.
* If the first write fails the function returns -1.
* If set false, the first write is blocking.
* @return: all remainder writes are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown.
* return 1 if all OK.
*/
int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
/**
* Read length bytes followed by message.
* @param fd: the socket to write on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message, malloced.
* @param len: length of message, returned.
* @param nonblock: if set to true, the first read is nonblocking.
* If the first read fails the function returns -1.
* If set false, the first read is blocking.
* @return: all remainder reads are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown. On EOF 0 is returned.
* return 1 if all OK.
*/
int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
/** /**
* fill result from parsed message, on error fills servfail * fill result from parsed message, on error fills servfail

View file

@ -48,8 +48,8 @@ struct module_qstate;
struct tube; struct tube;
void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
ldns_buffer* ATTR_UNUSED(buffer), int ATTR_UNUSED(error), uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
void* ATTR_UNUSED(arg)) int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{ {
log_assert(0); log_assert(0);
} }
@ -150,6 +150,13 @@ int libworker_handle_service_reply(struct comm_point* ATTR_UNUSED(c),
return 0; return 0;
} }
void libworker_handle_control_cmd(struct tube* ATTR_UNUSED(tube),
uint8_t* ATTR_UNUSED(buffer), size_t ATTR_UNUSED(len),
int ATTR_UNUSED(error), void* ATTR_UNUSED(arg))
{
log_assert(0);
}
int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b)) int context_query_cmp(const void* ATTR_UNUSED(a), const void* ATTR_UNUSED(b))
{ {
log_assert(0); log_assert(0);

View file

@ -945,6 +945,26 @@ struct comm_point* comm_point_create_local(struct comm_base* ATTR_UNUSED(base),
return calloc(1, 1); return calloc(1, 1);
} }
struct comm_point* comm_point_create_raw(struct comm_base* ATTR_UNUSED(base),
int ATTR_UNUSED(fd), int ATTR_UNUSED(writing),
comm_point_callback_t* ATTR_UNUSED(callback),
void* ATTR_UNUSED(callback_arg))
{
/* no pipe comm possible */
return calloc(1, 1);
}
void comm_point_start_listening(struct comm_point* ATTR_UNUSED(c),
int ATTR_UNUSED(newfd), int ATTR_UNUSED(sec))
{
/* no bg write pipe comm possible */
}
void comm_point_stop_listening(struct comm_point* ATTR_UNUSED(c))
{
/* no bg write pipe comm possible */
}
/* only cmd com _local gets deleted */ /* only cmd com _local gets deleted */
void comm_point_delete(struct comm_point* c) void comm_point_delete(struct comm_point* c)
{ {

View file

@ -80,6 +80,14 @@ fptr_whitelist_comm_point(comm_point_callback_t *fptr)
return 0; return 0;
} }
int
fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr)
{
if(fptr == &tube_handle_listen) return 1;
else if(fptr == &tube_handle_write) return 1;
return 0;
}
int int
fptr_whitelist_comm_timer(void (*fptr)(void*)) fptr_whitelist_comm_timer(void (*fptr)(void*))
{ {
@ -330,5 +338,6 @@ fptr_whitelist_alloc_cleanup(void (*fptr)(void*))
int fptr_whitelist_tube_listen(tube_callback_t* fptr) int fptr_whitelist_tube_listen(tube_callback_t* fptr)
{ {
if(fptr == &worker_handle_control_cmd) return 1; if(fptr == &worker_handle_control_cmd) return 1;
else if(fptr == &libworker_handle_control_cmd) return 1;
return 0; return 0;
} }

View file

@ -77,6 +77,14 @@
*/ */
int fptr_whitelist_comm_point(comm_point_callback_t *fptr); int fptr_whitelist_comm_point(comm_point_callback_t *fptr);
/**
* Check function pointer whitelist for raw comm_point callback values.
*
* @param fptr: function pointer to check.
* @return false if not in whitelist.
*/
int fptr_whitelist_comm_point_raw(comm_point_callback_t *fptr);
/** /**
* Check function pointer whitelist for comm_timer callback values. * Check function pointer whitelist for comm_timer callback values.
* *

View file

@ -5044,6 +5044,7 @@
24249, 24249,
24321, 24321,
24386, 24386,
24465,
24554, 24554,
24677, 24677,
24678, 24678,

View file

@ -943,6 +943,7 @@ void comm_point_raw_handle_callback(int ATTR_UNUSED(fd),
log_assert(c->type == comm_raw); log_assert(c->type == comm_raw);
comm_base_now(c->ev->base); comm_base_now(c->ev->base);
fptr_ok(fptr_whitelist_comm_point_raw(c->callback));
(void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL); (void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
} }

View file

@ -49,19 +49,28 @@ struct tube* tube_create(void)
{ {
struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
int sv[2]; int sv[2];
if(!tube) return 0; if(!tube) {
int err = errno;
log_err("tube_create: out of memory");
errno = err;
return NULL;
}
tube->sr = -1; tube->sr = -1;
tube->sw = -1; tube->sw = -1;
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
int err = errno;
log_err("socketpair: %s", strerror(errno)); log_err("socketpair: %s", strerror(errno));
free(tube); free(tube);
errno = err;
return NULL; return NULL;
} }
tube->sr = sv[0]; tube->sr = sv[0];
tube->sw = sv[1]; tube->sw = sv[1];
if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
int err = errno;
log_err("tube: cannot set nonblocking"); log_err("tube: cannot set nonblocking");
tube_delete(tube); tube_delete(tube);
errno = err;
return NULL; return NULL;
} }
return tube; return tube;
@ -70,52 +79,375 @@ struct tube* tube_create(void)
void tube_delete(struct tube* tube) void tube_delete(struct tube* tube)
{ {
if(!tube) return; if(!tube) return;
if(tube->listen_com) { tube_remove_bg_listen(tube);
comm_point_delete(tube->listen_com); tube_remove_bg_write(tube);
}
/* close fds after deleting commpoints, to be sure. /* close fds after deleting commpoints, to be sure.
* Also epoll does not like closing fd before event_del */ * Also epoll does not like closing fd before event_del */
if(tube->sr != -1) close(tube->sr); tube_close_read(tube);
if(tube->sw != -1) close(tube->sw); tube_close_write(tube);
tube->sr = -1;
tube->sw = -1;
free(tube); free(tube);
} }
void tube_close_read(struct tube* tube)
{
if(tube->sr != -1) {
close(tube->sr);
tube->sr = -1;
}
}
void tube_close_write(struct tube* tube)
{
if(tube->sw != -1) {
close(tube->sw);
tube->sw = -1;
}
}
void tube_remove_bg_listen(struct tube* tube)
{
if(tube->listen_com) {
comm_point_delete(tube->listen_com);
tube->listen_com = NULL;
}
if(tube->cmd_msg) {
free(tube->cmd_msg);
tube->cmd_msg = NULL;
}
}
void tube_remove_bg_write(struct tube* tube)
{
if(tube->res_com) {
comm_point_delete(tube->res_com);
tube->res_com = NULL;
}
if(tube->res_list) {
struct tube_res_list* np, *p = tube->res_list;
tube->res_list = NULL;
tube->res_last = NULL;
while(p) {
np = p->next;
free(p->buf);
free(p);
p = np;
}
}
}
int int
tube_handle_listen(struct comm_point* c, void* arg, int error, tube_handle_listen(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info)) struct comm_reply* ATTR_UNUSED(reply_info))
{ {
struct tube* tube = (struct tube*)arg; struct tube* tube = (struct tube*)arg;
ssize_t r;
if(error != NETEVENT_NOERROR) { if(error != NETEVENT_NOERROR) {
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
(*tube->listen_cb)(tube, NULL, error, tube->listen_arg); (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
return 0; return 0;
} }
if(tube->cmd_read < sizeof(tube->cmd_len)) {
/* complete reading the length of control msg */
r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
sizeof(tube->cmd_len) - tube->cmd_read);
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
(*tube->listen_cb)(tube, c->buffer, error, tube->listen_arg); (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
tube->listen_arg);
return 0;
}
if(r==-1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
/* nothing to read now, try later */
return 0;
}
tube->cmd_read += r;
if(tube->cmd_read < sizeof(tube->cmd_len)) {
/* not complete, try later */
return 0;
}
tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
if(!tube->cmd_msg) {
log_err("malloc failure");
tube->cmd_read = 0;
return 0;
}
}
/* cmd_len has been read, read remainder */
r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
tube->listen_arg);
return 0;
}
if(r==-1) {
/* nothing to read now, try later */
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
return 0;
}
tube->cmd_read += r;
if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
/* not complete, try later */
return 0;
}
tube->cmd_read = 0;
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
NETEVENT_NOERROR, tube->listen_arg);
/* also frees the buf */
tube->cmd_msg = NULL;
return 0; return 0;
} }
int tube_listen_cmd(struct tube* tube, struct comm_base* base, int
size_t msg_buffer_sz, tube_callback_t* cb, void* arg) tube_handle_write(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info))
{
struct tube* tube = (struct tube*)arg;
struct tube_res_list* item = tube->res_list;
ssize_t r;
if(error != NETEVENT_NOERROR) {
log_err("tube_handle_write net error %d", error);
return 0;
}
if(!item) {
comm_point_stop_listening(c);
return 0;
}
if(tube->res_write < sizeof(item->len)) {
r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
sizeof(item->len) - tube->res_write);
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* cannot signal this to pipe user */
return 0;
}
tube->res_write += r;
if(tube->res_write < sizeof(item->len))
return 0;
}
r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
item->len - (tube->res_write - sizeof(item->len)));
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* cannot signal this to pipe user */
return 0;
}
tube->res_write += r;
if(tube->res_write < sizeof(item->len) + item->len)
return 0;
/* done this result, remove it */
free(item->buf);
item->buf = NULL;
tube->res_list = tube->res_list->next;
free(item);
if(!tube->res_list) {
tube->res_last = NULL;
comm_point_stop_listening(c);
}
tube->res_write = 0;
return 0;
}
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
int nonblock)
{
ssize_t r;
int fd = tube->sw;
/* test */
if(nonblock) {
r = write(fd, &len, sizeof(len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("tube msg write failed: %s", strerror(errno));
return -1; /* can still continue, perhaps */
}
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* write remainder */
if(r != (ssize_t)sizeof(len)) {
if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
log_err("tube msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
}
if(write(fd, buf, len) == -1) {
log_err("tube msg write failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(!fd_set_nonblock(fd))
return 0;
return 1;
}
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
int nonblock)
{
ssize_t r;
int fd = tube->sr;
/* test */
*len = 0;
if(nonblock) {
r = read(fd, len, sizeof(*len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("tube msg read failed: %s", strerror(errno));
return -1; /* we can still continue, perhaps */
}
if(r == 0) /* EOF */
return 0;
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* read remainder */
if(r != (ssize_t)sizeof(*len)) {
if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
log_err("tube msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
return 0;
}
if(r == 0) /* EOF */ {
(void)fd_set_nonblock(fd);
return 0;
}
}
*buf = (uint8_t*)malloc(*len);
if(!*buf) {
log_err("tube read out of memory");
(void)fd_set_nonblock(fd);
return 0;
}
if((r=read(fd, *buf, *len)) == -1) {
log_err("tube msg read failed: %s", strerror(errno));
(void)fd_set_nonblock(fd);
free(*buf);
return 0;
}
if(r == 0) { /* EOF */
(void)fd_set_nonblock(fd);
free(*buf);
return 0;
}
if(!fd_set_nonblock(fd)) {
free(*buf);
return 0;
}
return 1;
}
/** perform a select() on the fd */
static int
pollit(int fd, struct timeval* t)
{
fd_set r;
#ifndef S_SPLINT_S
FD_ZERO(&r);
FD_SET(FD_SET_T fd, &r);
#endif
if(select(fd+1, &r, NULL, NULL, t) == -1) {
return 0;
}
errno = 0;
return FD_ISSET(fd, &r);
}
int tube_poll(struct tube* tube)
{
struct timeval t;
memset(&t, 0, sizeof(t));
return pollit(tube->sr, &t);
}
int tube_wait(struct tube* tube)
{
return pollit(tube->sr, NULL);
}
int tube_read_fd(struct tube* tube)
{
return tube->sr;
}
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
tube_callback_t* cb, void* arg)
{ {
tube->listen_cb = cb; tube->listen_cb = cb;
tube->listen_arg = arg; tube->listen_arg = arg;
if(!(tube->listen_com = comm_point_create_local(base, tube->sr, if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
msg_buffer_sz, tube_handle_listen, tube))) { 0, tube_handle_listen, tube))) {
log_err("tube_listen_cmd: commpoint creation failed"); int err = errno;
log_err("tube_setup_bg_l: commpoint creation failed");
errno = err;
return 0; return 0;
} }
return 1; return 1;
} }
int tube_send_cmd(struct tube* tube, ldns_buffer* buffer) int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
{ {
if(!write_socket(tube->sw, ldns_buffer_begin(buffer), if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
ldns_buffer_limit(buffer))) { 1, tube_handle_write, tube))) {
log_err("write socket: %s", strerror(errno)); int err = errno;
log_err("tube_setup_bg_w: commpoint creation failed");
errno = err;
return 0; return 0;
} }
return 1; return 1;
} }
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
{
struct tube_res_list* item =
(struct tube_res_list*)malloc(sizeof(*item));
if(!item) {
free(msg);
log_err("out of memory for async answer");
return 0;
}
item->buf = msg;
item->len = len;
item->next = NULL;
/* add at back of list, since the first one may be partially written */
if(tube->res_last)
tube->res_last->next = item;
else tube->res_list = item;
tube->res_last = item;
if(tube->res_list == tube->res_last) {
/* first added item, start the write process */
comm_point_start_listening(tube->res_com, -1, -1);
}
return 1;
}

View file

@ -44,13 +44,14 @@
struct comm_reply; struct comm_reply;
struct comm_base; struct comm_base;
struct tube; struct tube;
struct tube_res_list;
/** /**
* Callback from pipe listen function * Callback from pipe listen function
* void mycallback(tube, buffer, error, argument); * void mycallback(tube, msg, len, error, user_argument);
* if error is true (NETEVENT_*), buffer is probably NULL. * if error is true (NETEVENT_*), msg is probably NULL.
*/ */
typedef void tube_callback_t(struct tube*, ldns_buffer*, int, void*); typedef void tube_callback_t(struct tube*, uint8_t*, size_t, int, void*);
/** /**
* A pipe * A pipe
@ -67,6 +68,34 @@ struct tube {
tube_callback_t* listen_cb; tube_callback_t* listen_cb;
/** listen callback user arg */ /** listen callback user arg */
void* listen_arg; void* listen_arg;
/** are we currently reading a command, 0 if not, else bytecount */
size_t cmd_read;
/** size of current read command, may be partially read */
uint32_t cmd_len;
/** the current read command content, malloced, can be partially read*/
uint8_t* cmd_msg;
/** background write queue, commpoint to write results back */
struct comm_point* res_com;
/** are we curently writing a result, 0 if not, else bytecount into
* the res_list first entry. */
size_t res_write;
/** list of outstanding results to be written back */
struct tube_res_list* res_list;
/** last in list */
struct tube_res_list* res_last;
};
/**
* List of results (arbitrary command serializations) to write back
*/
struct tube_res_list {
/** next in list */
struct tube_res_list* next;
/** serialized buffer to write */
uint8_t* buf;
/** length to write */
uint32_t len;
}; };
/** /**
@ -82,30 +111,136 @@ struct tube* tube_create(void);
void tube_delete(struct tube* tube); void tube_delete(struct tube* tube);
/** /**
* Start listening for information over the pipe * Write length bytes followed by message.
* @param tube: the tube to write on.
* If that tube is a pipe, its write fd is used as
* the socket to write on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message.
* @param len: length of message.
* @param nonblock: if set to true, the first write is nonblocking.
* If the first write fails the function returns -1.
* If set false, the first write is blocking.
* @return: all remainder writes are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown.
* return 1 if all OK.
*/
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
int nonblock);
/**
* Read length bytes followed by message.
* @param tube: The tube to read on.
* If that tube is a pipe, its read fd is used as
* the socket to read on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message, malloced.
* @param len: length of message, returned.
* @param nonblock: if set to true, the first read is nonblocking.
* If the first read fails the function returns -1.
* If set false, the first read is blocking.
* @return: all remainder reads are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown. On EOF 0 is returned.
* return 1 if all OK.
*/
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
int nonblock);
/**
* Close read part of the pipe.
* The tube can no longer be read from.
* @param tube: tube to operate on.
*/
void tube_close_read(struct tube* tube);
/**
* Close write part of the pipe.
* The tube can no longer be written to.
* @param tube: tube to operate on.
*/
void tube_close_write(struct tube* tube);
/**
* See if data is ready for reading on the tube without blocking.
* @param tube: tube to check for readable items
* @return true if readable items are present. False if not (or error).
* true on pipe_closed.
*/
int tube_poll(struct tube* tube);
/**
* Wait for data to be ready for reading on the tube. is blocking.
* No timeout.
* @param tube: the tube to wait on.
* @return: if there was something to read (false on error).
* true on pipe_closed.
*/
int tube_wait(struct tube* tube);
/**
* Get FD that is readable when new information arrives.
* @param tube
* @return file descriptor.
*/
int tube_read_fd(struct tube* tube);
/**
* Start listening for information over the pipe.
* Background registration of a read listener, callback when read completed.
* Do not mix with tube_read_msg style direct reads from the pipe.
* @param tube: tube to listen on * @param tube: tube to listen on
* @param base: what base to register event callback. * @param base: what base to register event callback.
* @param msg_buffer_sz: what message buffer size to use.
* @param cb: callback routine. * @param cb: callback routine.
* @param arg: user argument for callback routine. * @param arg: user argument for callback routine.
* @return true if successful, false on error. * @return true if successful, false on error.
*/ */
int tube_listen_cmd(struct tube* tube, struct comm_base* base, int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
size_t msg_buffer_sz, tube_callback_t* cb, void* arg); tube_callback_t* cb, void* arg);
/** /**
* Send a command over a pipe, blocking operation. * Remove bg listen setup from event base.
* @param tube: tube to send the info on. * @param tube: what tube to cleanup
* @param buffer: buffer to send. starts with network order uint16 with
* length of remainder of buffer.
* The receiver does not receive the length uint16 in the buffer
* (the buffer is sized appropriately).
* @return 0 on error, true on success.
*/ */
int tube_send_cmd(struct tube* tube, ldns_buffer* buffer); void tube_remove_bg_listen(struct tube* tube);
/** decl for fptr_wlist of tube pipe listen handler */ /**
* Start background write handler for the pipe.
* Do not mix with tube_write_msg style direct writes to the pipe.
* @param tube: tube to write on
* @param base: what base to register event handler on.
* @return true if successful, false on error.
*/
int tube_setup_bg_write(struct tube* tube, struct comm_base* base);
/**
* Remove bg write setup from event base.
* @param tube: what tube to cleanup
*/
void tube_remove_bg_write(struct tube* tube);
/**
* Append data item to background list of writes.
* Mallocs a list entry behind the scenes.
* Not locked behind the scenes, call from one thread or lock on outside.
* @param tube: what tube to queue on.
* @param msg: memory message to send. Is free()d after use.
* Put at the end of the to-send queue.
* @param len: length of item.
* @return 0 on failure (msg freed).
*/
int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len);
/** for fptr wlist, callback function */
int tube_handle_listen(struct comm_point* c, void* arg, int error, int tube_handle_listen(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info); struct comm_reply* reply_info);
/** for fptr wlist, callback function */
int tube_handle_write(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info);
#endif /* UTIL_TUBE_H */ #endif /* UTIL_TUBE_H */