nonblock bg pipes.

git-svn-id: file:///svn/unbound/trunk@913 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-01-31 10:40:58 +00:00
parent 635c65f4ed
commit e48ecb0e15
3 changed files with 121 additions and 28 deletions

View file

@ -1,4 +1,9 @@
29 January 2008: Wouter
31 January 2008: Wouter
- bg thread/process reads and writes the pipe nonblocking all the time
so that even if the pipe is buffered or so, the bg thread does not
block, and services both pipes and queries.
30 January 2008: Wouter
- check trailing / on chrootdir in checkconf.
- check if root hints and anchor files are in chrootdir.
- no route to host tcp error is verbosity level 2.

View file

@ -78,6 +78,7 @@ libworker_delete(struct libworker* w)
ub_randfree(w->env->rnd);
free(w->env);
}
free(w->cmd_msg);
outside_network_delete(w->back);
comm_point_delete(w->cmd_com);
comm_point_delete(w->res_com);
@ -195,15 +196,70 @@ handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
free(buf);
}
/** do control command coming into bg server */
static void
libworker_do_cmd(struct libworker* w, uint8_t* msg, uint32_t len)
{
switch(context_serial_getcmd(msg, len)) {
default:
case UB_LIBCMD_ANSWER:
log_err("unknown command for bg worker %d",
(int)context_serial_getcmd(msg, len));
/* and fall through to quit */
case UB_LIBCMD_QUIT:
free(msg);
comm_base_exit(w->base);
break;
case UB_LIBCMD_NEWQUERY:
handle_newq(w, msg, len);
break;
case UB_LIBCMD_CANCEL:
handle_cancel(w, msg, len);
break;
}
}
/** handle control command coming into server */
int
libworker_handle_control_cmd(struct comm_point* c, void* arg,
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
struct libworker* w = (struct libworker*)arg;
uint32_t len = 0;
uint8_t* buf = NULL;
int r = libworker_read_msg(c->fd, &buf, &len, 1);
ssize_t r;
if(w->cmd_read < sizeof(w->cmd_len)) {
/* complete reading the length of control msg */
r = read(c->fd, ((uint8_t*)&w->cmd_len) + w->cmd_read,
sizeof(w->cmd_len) - w->cmd_read);
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
if(r==-1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
/* nothing to read now, try later */
return 0;
}
w->cmd_read += r;
if(w->cmd_read < sizeof(w->cmd_len)) {
/* not complete, try later */
return 0;
}
w->cmd_msg = (uint8_t*)calloc(1, w->cmd_len);
if(!w->cmd_msg) {
log_err("malloc failure");
w->cmd_read = 0;
return 0;
}
}
/* cmd_len has been read, read remainder */
r = read(c->fd, w->cmd_msg + w->cmd_read - sizeof(w->cmd_len),
w->cmd_len - (w->cmd_read - sizeof(w->cmd_len)));
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
@ -211,26 +267,21 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg,
comm_base_exit(w->base);
return 0;
}
if(r==-1) /* nothing to read now, try later */
if(r==-1) {
/* nothing to read now, try later */
if(errno != EAGAIN && errno != EINTR) {
log_err("rpipe error: %s", strerror(errno));
}
return 0;
switch(context_serial_getcmd(buf, len)) {
default:
case UB_LIBCMD_ANSWER:
log_err("unknown command for bg worker %d",
(int)context_serial_getcmd(buf, len));
/* and fall through to quit */
case UB_LIBCMD_QUIT:
free(buf);
comm_base_exit(w->base);
break;
case UB_LIBCMD_NEWQUERY:
handle_newq(w, buf, len);
break;
case UB_LIBCMD_CANCEL:
handle_cancel(w, buf, len);
break;
}
w->cmd_read += r;
if(w->cmd_read < sizeof(w->cmd_len) + w->cmd_len) {
/* not complete, try later */
return 0;
}
w->cmd_read = 0;
libworker_do_cmd(w, w->cmd_msg, w->cmd_len); /* also frees the buf */
w->cmd_msg = NULL;
return 0;
}
@ -241,20 +292,47 @@ libworker_handle_result_write(struct comm_point* c, void* arg,
{
struct libworker* w = (struct libworker*)arg;
struct libworker_res_list* item = w->res_list;
int r;
ssize_t r;
if(!item) {
comm_point_stop_listening(c);
return 0;
}
r = libworker_write_msg(c->fd, item->buf, item->len, 1);
if(r == -1)
if(w->res_write < sizeof(item->len)) {
r = write(c->fd, ((uint8_t*)&item->len) + w->res_write,
sizeof(item->len) - w->res_write);
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
w->res_write += r;
if(w->res_write < sizeof(item->len))
return 0;
}
r = write(c->fd, item->buf + w->res_write - sizeof(item->len),
item->len - (w->res_write - sizeof(item->len)));
if(r == -1) {
if(errno != EAGAIN && errno != EINTR) {
log_err("wpipe error: %s", strerror(errno));
}
return 0; /* try again later */
}
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
w->res_write += r;
if(w->res_write < sizeof(item->len) + item->len)
return 0;
/* done this result, remove it */
free(item->buf);
item->buf = NULL;
@ -264,6 +342,7 @@ libworker_handle_result_write(struct comm_point* c, void* arg,
w->res_last = NULL;
comm_point_stop_listening(c);
}
w->res_write = 0;
return 0;
}
@ -591,7 +670,7 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
item->buf = msg;
item->len = len;
item->next = NULL;
/* add at back of list */
/* add at back of list, since the first one may be partially written */
if(w->res_last)
w->res_last->next = item;
else w->res_list = item;

View file

@ -79,14 +79,23 @@ struct libworker {
struct comm_base* base;
/** the backside outside network interface to the auth servers */
struct outside_network* back;
/** random() table for this worker. */
struct ub_randstate* rndstate;
/** commpoint to listen to commands */
struct comm_point* cmd_com;
/** are we currently reading a command, 0 if not, else bytecount */
size_t cmd_read;
/** size of current read command, may be partially read */
uint32_t cmd_len;
/** the current read command content, malloced, can be partially read*/
uint8_t* cmd_msg;
/** commpoint to write results back */
struct comm_point* res_com;
/** are we curently writing a result, 0 if not, else bytecount into
* the res_list first entry. */
size_t res_write;
/** list of outstanding results to be written back */
struct libworker_res_list* res_list;
/** last in list */