diff --git a/doc/Changelog b/doc/Changelog index 63573bee4..842985cf0 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,4 +1,9 @@ -29 January 2008: Wouter +31 January 2008: Wouter + - bg thread/process reads and writes the pipe nonblocking all the time + so that even if the pipe is buffered or so, the bg thread does not + block, and services both pipes and queries. + +30 January 2008: Wouter - check trailing / on chrootdir in checkconf. - check if root hints and anchor files are in chrootdir. - no route to host tcp error is verbosity level 2. diff --git a/libunbound/worker.c b/libunbound/worker.c index 80b17c0f8..632416d67 100644 --- a/libunbound/worker.c +++ b/libunbound/worker.c @@ -78,6 +78,7 @@ libworker_delete(struct libworker* w) ub_randfree(w->env->rnd); free(w->env); } + free(w->cmd_msg); outside_network_delete(w->back); comm_point_delete(w->cmd_com); comm_point_delete(w->res_com); @@ -195,15 +196,70 @@ handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len) free(buf); } +/** do control command coming into bg server */ +static void +libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len) +{ + switch(context_serial_getcmd(msg, len)) { + default: + case UB_LIBCMD_ANSWER: + log_err("unknown command for bg worker %d", + (int)context_serial_getcmd(msg, len)); + /* and fall through to quit */ + case UB_LIBCMD_QUIT: + free(msg); + comm_base_exit(w->base); + break; + case UB_LIBCMD_NEWQUERY: + handle_newq(w, msg, len); + break; + case UB_LIBCMD_CANCEL: + handle_cancel(w, msg, len); + break; + } +} + /** handle control command coming into server */ int libworker_handle_control_cmd(struct comm_point* c, void* arg, int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep)) { struct libworker* w = (struct libworker*)arg; - uint32_t len = 0; - uint8_t* buf = NULL; - int r = libworker_read_msg(c->fd, &buf, &len, 1); + ssize_t r; + + if(w->cmd_read < sizeof(w->cmd_len)) { + /* complete reading the length of control msg */ + r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read, + sizeof(w->cmd_len) - w->cmd_read); + if(r==0) { + /* error has happened or */ + /* parent closed pipe, must have exited somehow */ + /* it is of no use to go on, exit */ + comm_base_exit(w->base); + return 0; + } + if(r==-1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("rpipe error: %s", strerror(errno)); + } + /* nothing to read now, try later */ + return 0; + } + w->cmd_read += r; + if(w->cmd_read < sizeof(w->cmd_len)) { + /* not complete, try later */ + return 0; + } + w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len); + if(!w->cmd_msg) { + log_err("malloc failure"); + w->cmd_read = 0; + return 0; + } + } + /* cmd_len has been read, read remainder */ + r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len), + w->cmd_len - (w->cmd_read - sizeof(w->cmd_len))); if(r==0) { /* error has happened or */ /* parent closed pipe, must have exited somehow */ @@ -211,26 +267,21 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg, comm_base_exit(w->base); return 0; } - if(r==-1) /* nothing to read now, try later */ + if(r==-1) { + /* nothing to read now, try later */ + if(errno != EAGAIN && errno != EINTR) { + log_err("rpipe error: %s", strerror(errno)); + } return 0; - - switch(context_serial_getcmd(buf, len)) { - default: - case UB_LIBCMD_ANSWER: - log_err("unknown command for bg worker %d", - (int)context_serial_getcmd(buf, len)); - /* and fall through to quit */ - case UB_LIBCMD_QUIT: - free(buf); - comm_base_exit(w->base); - break; - case UB_LIBCMD_NEWQUERY: - handle_newq(w, buf, len); - break; - case UB_LIBCMD_CANCEL: - handle_cancel(w, buf, len); - break; } + w->cmd_read += r; + if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) { + /* not complete, try later */ + return 0; + } + w->cmd_read = 0; + libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */ + w->cmd_msg = NULL; return 0; } @@ -241,20 +292,47 @@ libworker_handle_result_write(struct comm_point* c, void* arg, { struct libworker* w = (struct libworker*)arg; struct libworker_res_list* item = w->res_list; - int r; + ssize_t r; if(!item) { comm_point_stop_listening(c); return 0; } - r = libworker_write_msg(c->fd, item->buf, item->len, 1); - if(r == -1) + if(w->res_write < sizeof(item->len)) { + r = write(c->fd, ((uint8_t*)&item->len) + w->res_write, + sizeof(item->len) - w->res_write); + if(r == -1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("wpipe error: %s", strerror(errno)); + } + return 0; /* try again later */ + } + if(r == 0) { + /* error on pipe, must have exited somehow */ + /* it is of no use to go on, exit */ + comm_base_exit(w->base); + return 0; + } + w->res_write += r; + if(w->res_write < sizeof(item->len)) + return 0; + } + r = write(c->fd, item->buf + w->res_write - sizeof(item->len), + item->len - (w->res_write - sizeof(item->len))); + if(r == -1) { + if(errno != EAGAIN && errno != EINTR) { + log_err("wpipe error: %s", strerror(errno)); + } return 0; /* try again later */ + } if(r == 0) { /* error on pipe, must have exited somehow */ /* it is of no use to go on, exit */ comm_base_exit(w->base); return 0; } + w->res_write += r; + if(w->res_write < sizeof(item->len) + item->len) + return 0; /* done this result, remove it */ free(item->buf); item->buf = NULL; @@ -264,6 +342,7 @@ libworker_handle_result_write(struct comm_point* c, void* arg, w->res_last = NULL; comm_point_stop_listening(c); } + w->res_write = 0; return 0; } @@ -591,7 +670,7 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt, item->buf = msg; item->len = len; item->next = NULL; - /* add at back of list */ + /* add at back of list, since the first one may be partially written */ if(w->res_last) w->res_last->next = item; else w->res_list = item; diff --git a/libunbound/worker.h b/libunbound/worker.h index 275f10463..d32b60709 100644 --- a/libunbound/worker.h +++ b/libunbound/worker.h @@ -79,14 +79,23 @@ struct libworker { struct comm_base* base; /** the backside outside network interface to the auth servers */ struct outside_network* back; - /** random() table for this worker. */ struct ub_randstate* rndstate; + /** commpoint to listen to commands */ struct comm_point* cmd_com; + /** are we currently reading a command, 0 if not, else bytecount */ + size_t cmd_read; + /** size of current read command, may be partially read */ + uint32_t cmd_len; + /** the current read command content, malloced, can be partially read*/ + uint8_t* cmd_msg; + /** commpoint to write results back */ struct comm_point* res_com; - + /** are we curently writing a result, 0 if not, else bytecount into + * the res_list first entry. */ + size_t res_write; /** list of outstanding results to be written back */ struct libworker_res_list* res_list; /** last in list */