bg resolution works.

git-svn-id: file:///svn/unbound/trunk@890 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-01-23 15:15:37 +00:00
parent faa7ad14c6
commit e56a3a60ef
8 changed files with 235 additions and 95 deletions

View file

@ -1,5 +1,9 @@
23 January 2008: Wouter
- removed debug prints from if-auto, verb-algo enables some.
- libunbound QUIT setup, remove memory leaks, when using threads
will share memory for passing results instead of writing it over
the pipe, only writes ID number over the pipe (towards the handler
thread that does process() ).
22 January 2008: Wouter
- library code for async in libunbound/unbound.c.

View file

@ -250,6 +250,24 @@ context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len)
return q;
}
struct ctx_query*
context_lookup_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len)
{
struct ctx_query* q;
int querynum;
if(len < 4*sizeof(uint32_t)+1) {
return NULL;
}
log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY);
querynum = (int)ldns_read_uint32(p+sizeof(uint32_t));
q = (struct ctx_query*)rbtree_search(&ctx->queries, &querynum);
if(!q) {
return NULL;
}
log_assert(q->async);
return q;
}
uint8_t*
context_serialize_answer(struct ctx_query* q, int err, ldns_buffer* pkt,
uint32_t* len)
@ -298,11 +316,7 @@ context_deserialize_answer(struct ub_val_ctx* ctx,
*err = UB_NOMEM;
return q;
}
} else {
q->msg_len = 0;
free(q->msg);
q->msg = NULL;
}
}
return q;
}

View file

@ -46,6 +46,7 @@
#include "services/modstack.h"
#include "libunbound/unbound.h"
#include "util/data/packed_rrset.h"
struct libworker;
/**
* The context structure
@ -130,7 +131,7 @@ struct ctx_query {
int querynum;
/** was this an async query? */
int async;
/** has this query been cancelled? (for bg thread) */
/** was this query cancelled (for bg worker) */
int cancelled;
/** for async query, the callback function */
@ -144,6 +145,8 @@ struct ctx_query {
size_t msg_len;
/** validation status on security */
enum sec_status msg_security;
/** store libworker that is handling this query */
struct libworker* w;
/** result structure, also contains original query, type, class.
* malloced ptr ready to hand to the client. */
@ -284,6 +287,16 @@ uint8_t* context_serialize_quit(uint32_t* len);
*/
enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len);
/**
* Lookup query from new_query buffer.
* @param ctx: context
* @param p: buffer serialized.
* @param len: length of buffer.
* @return looked up ctx_query or NULL for malloc failure.
*/
struct ctx_query* context_lookup_new_query(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len);
/**
* Deserialize a new_query buffer.
* @param ctx: context

View file

@ -118,6 +118,30 @@ ub_val_ctx_delete(struct ub_val_ctx* ctx)
{
struct alloc_cache* a, *na;
if(!ctx) return;
/* stop the bg thread */
lock_basic_lock(&ctx->cfglock);
if(ctx->created_bg) {
uint8_t* msg;
uint32_t len;
uint32_t cmd = UB_LIBCMD_QUIT;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
(void)libworker_write_msg(ctx->qqpipe[1], (uint8_t*)&cmd,
(uint32_t)sizeof(cmd), 0);
lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock);
while(libworker_read_msg(ctx->rrpipe[0], &msg, &len, 0)) {
/* discard all results except a quit confirm */
if(context_serial_getcmd(msg, len) == UB_LIBCMD_QUIT) {
free(msg);
break;
}
free(msg);
}
lock_basic_unlock(&ctx->rrpipe_lock);
}
else lock_basic_unlock(&ctx->cfglock);
modstack_desetup(&ctx->mods, ctx->env);
a = ctx->alloc_list;
while(a) {
@ -131,10 +155,18 @@ ub_val_ctx_delete(struct ub_val_ctx* ctx)
lock_basic_destroy(&ctx->qqpipe_lock);
lock_basic_destroy(&ctx->rrpipe_lock);
lock_basic_destroy(&ctx->cfglock);
close(ctx->qqpipe[0]);
close(ctx->qqpipe[1]);
close(ctx->rrpipe[0]);
close(ctx->rrpipe[1]);
if(ctx->qqpipe[0] != -1)
close(ctx->qqpipe[0]);
if(ctx->qqpipe[1] != -1)
close(ctx->qqpipe[1]);
if(ctx->rrpipe[0] != -1)
close(ctx->rrpipe[0]);
if(ctx->rrpipe[1] != -1)
close(ctx->rrpipe[1]);
ctx->qqpipe[0] = -1;
ctx->qqpipe[1] = -1;
ctx->rrpipe[0] = -1;
ctx->rrpipe[1] = -1;
if(ctx->env) {
slabhash_delete(ctx->env->msg_cache);
rrset_cache_delete(ctx->env->rrset_cache);
@ -233,6 +265,10 @@ ub_val_ctx_debuglevel(struct ub_val_ctx* ctx, int d)
int
ub_val_ctx_async(struct ub_val_ctx* ctx, int dothread)
{
#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS)
if(dothread) /* cannot do threading */
return UB_NOERROR;
#endif
lock_basic_lock(&ctx->cfglock);
if(ctx->finalized) {
lock_basic_unlock(&ctx->cfglock);
@ -406,14 +442,19 @@ ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
r = libworker_fg(ctx, q);
if(r) {
lock_basic_lock(&ctx->cfglock);
(void)rbtree_delete(&ctx->queries, q->node.key);
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
return r;
}
*result = q->res;
q->res = NULL;
lock_basic_lock(&ctx->cfglock);
(void)rbtree_delete(&ctx->queries, q->node.key);
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
return UB_NOERROR;
}
@ -435,14 +476,17 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
}
}
if(!ctx->created_bg) {
int r = libworker_bg(ctx);
int r;
ctx->created_bg = 1;
lock_basic_unlock(&ctx->cfglock);
r = libworker_bg(ctx);
if(r) {
lock_basic_lock(&ctx->cfglock);
ctx->created_bg = 0;
lock_basic_unlock(&ctx->cfglock);
return r;
}
ctx->created_bg = 1;
}
lock_basic_unlock(&ctx->cfglock);
} else lock_basic_unlock(&ctx->cfglock);
/* create new ctx_query and attempt to add to the list */
q = context_new(ctx, name, rrtype, rrclass, callback, mydata);
@ -490,9 +534,11 @@ ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
}
/* delete it */
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
if(!ctx->dothread) { /* if forked */
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
}
lock_basic_unlock(&ctx->cfglock);
/* send cancel to background worker */

View file

@ -108,6 +108,7 @@ libworker_setup(struct ub_val_ctx* ctx)
libworker_delete(w);
return NULL;
}
w->thread_num = w->env->alloc->thread_num;
alloc_set_id_cleanup(w->env->alloc, &libworker_alloc_cleanup, w);
w->env->scratch = regional_create_custom(cfg->msg_buffer_size);
w->env->scratch_buffer = ldns_buffer_new(cfg->msg_buffer_size);
@ -169,6 +170,7 @@ handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
return;
}
q->cancelled = 1;
free(buf);
}
/** handle control command coming into server */
@ -190,7 +192,6 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg,
if(r==-1) /* nothing to read now, try later */
return 0;
log_info("bg got cmd %d", (int)context_serial_getcmd(buf, len));
switch(context_serial_getcmd(buf, len)) {
default:
case UB_LIBCMD_ANSWER:
@ -198,6 +199,7 @@ libworker_handle_control_cmd(struct comm_point* c, void* arg,
(int)context_serial_getcmd(buf, len));
/* and fall through to quit */
case UB_LIBCMD_QUIT:
free(buf);
comm_base_exit(w->base);
break;
case UB_LIBCMD_NEWQUERY:
@ -222,8 +224,6 @@ libworker_handle_result_write(struct comm_point* c, void* arg,
comm_point_stop_listening(c);
return 0;
}
log_info("bg write msg %d", (int)context_serial_getcmd(
item->buf, item->len));
r = libworker_write_msg(c->fd, item->buf, item->len, 1);
if(r == -1)
return 0; /* try again later */
@ -250,60 +250,85 @@ static void*
libworker_dobg(void* arg)
{
/* setup */
struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;
struct libworker* w = libworker_setup(ctx);
uint32_t m;
int fd;
struct libworker* w = (struct libworker*)arg;
struct ub_val_ctx* ctx = w->ctx;
log_thread_set(&w->thread_num);
log_info("start bg"); /* @@@ DEBUG */
/*verbosity=3; @@@ DEBUG */
#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS)
/* we are forked */
w->is_bg_thread = 0;
/* close non-used parts of the pipes */
if(ctx->qqpipe[1] != -1) {
close(ctx->qqpipe[1]);
ctx->qqpipe[1] = -1;
}
if(ctx->rrpipe[0] != -1) {
close(ctx->rrpipe[0]);
ctx->rrpipe[0] = -1;
}
#endif
if(!w) {
log_err("libunbound bg worker init failed, nomem");
return NULL;
}
lock_basic_lock(&ctx->qqpipe_lock);
if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0,
libworker_handle_control_cmd, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
return NULL;
}
lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock);
if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
libworker_handle_result_write, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
log_err("libunbound bg worker init failed, no rescom");
return NULL;
}
lock_basic_unlock(&ctx->rrpipe_lock);
/* do the work */
comm_base_dispatch(w->base);
/* cleanup */
fd = ctx->rrpipe[1];
ctx->rrpipe[1] = -1;
m = UB_LIBCMD_QUIT;
close(ctx->qqpipe[0]);
ctx->qqpipe[0] = -1;
libworker_delete(w);
(void)libworker_write_msg(fd, (uint8_t*)&m, (uint32_t)sizeof(m), 0);
close(fd);
return NULL;
}
int libworker_bg(struct ub_val_ctx* ctx)
{
struct libworker* w;
/* fork or threadcreate */
lock_basic_lock(&ctx->cfglock);
if(ctx->dothread) {
ub_thread_create(&ctx->bg_tid, libworker_dobg, ctx);
lock_basic_unlock(&ctx->cfglock);
w = libworker_setup(ctx);
w->is_bg_thread = 1;
if(!w) return UB_NOMEM;
ub_thread_create(&ctx->bg_tid, libworker_dobg, w);
} else {
lock_basic_unlock(&ctx->cfglock);
switch((ctx->bg_pid=fork())) {
case 0:
lock_basic_unlock(&ctx->cfglock);
(void)libworker_dobg(ctx);
w = libworker_setup(ctx);
if(!w) fatal_exit("out of memory");
/* close non-used parts of the pipes */
close(ctx->qqpipe[1]);
close(ctx->rrpipe[0]);
ctx->qqpipe[1] = -1;
ctx->rrpipe[0] = -1;
(void)libworker_dobg(w);
exit(0);
break;
case -1:
lock_basic_unlock(&ctx->cfglock);
return UB_FORKFAIL;
default:
break;
}
}
lock_basic_unlock(&ctx->cfglock);
return UB_NOERROR;
}
@ -383,6 +408,7 @@ libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
res->rcode = LDNS_RCODE_SERVFAIL;
rep = parse_reply(buf, temp, &rq);
if(!rep) {
log_err("cannot parse buf");
return; /* error parsing buf, or out of memory */
}
if(!fill_res(res, reply_find_answer_rrset(&rq, rep),
@ -404,27 +430,27 @@ libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
static void
libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
{
struct libworker_cb_data* d = (struct libworker_cb_data*)arg;
struct ctx_query* q = (struct ctx_query*)arg;
/* fg query is done; exit comm base */
comm_base_exit(d->w->base);
comm_base_exit(q->w->base);
if(rcode != 0) {
d->q->res->rcode = rcode;
d->q->msg_security = s;
q->res->rcode = rcode;
q->msg_security = s;
return;
}
d->q->res->rcode = LDNS_RCODE_SERVFAIL;
d->q->msg_security = 0;
d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf));
d->q->msg_len = ldns_buffer_limit(buf);
if(!d->q->msg) {
q->res->rcode = LDNS_RCODE_SERVFAIL;
q->msg_security = 0;
q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf));
q->msg_len = ldns_buffer_limit(buf);
if(!q->msg) {
return; /* the error is in the rcode */
}
/* canonname and results */
d->q->msg_security = s;
libworker_enter_result(d->q->res, buf, d->w->env->scratch, s);
q->msg_security = s;
libworker_enter_result(q->res, buf, q->w->env->scratch, s);
}
/** setup qinfo and edns */
@ -459,7 +485,6 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q)
uint16_t qflags, qid;
struct query_info qinfo;
struct edns_data edns;
struct libworker_cb_data d;
if(!w)
return UB_INITFAIL;
if(!setup_qinfo_edns(w, q, &qinfo, &edns)) {
@ -468,12 +493,11 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q)
}
qid = 0;
qflags = BIT_RD;
d.q = q;
d.w = w;
q->w = w;
/* see if there is a fixed answer */
if(local_zones_answer(ctx->local_zones, &qinfo, &edns,
w->back->udp_buff, w->env->scratch)) {
libworker_fg_done_cb(&d, LDNS_RCODE_NOERROR,
libworker_fg_done_cb(q, LDNS_RCODE_NOERROR,
w->back->udp_buff, sec_status_insecure);
libworker_delete(w);
free(qinfo.qname);
@ -481,7 +505,7 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q)
}
/* process new query */
if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns,
w->back->udp_buff, qid, libworker_fg_done_cb, &d)) {
w->back->udp_buff, qid, libworker_fg_done_cb, q)) {
free(qinfo.qname);
return UB_NOMEM;
}
@ -504,9 +528,20 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
struct libworker_res_list* item;
/* serialize and delete unneeded q */
msg = context_serialize_answer(q, err, pkt, &len);
(void)rbtree_delete(&w->ctx->queries, q->node.key);
context_query_delete(q);
if(w->is_bg_thread) {
lock_basic_lock(&w->ctx->cfglock);
q->msg_len = ldns_buffer_remaining(pkt);
q->msg = memdup(ldns_buffer_begin(pkt), q->msg_len);
if(!q->msg)
msg = context_serialize_answer(q, UB_NOMEM, NULL, &len);
else msg = context_serialize_answer(q, err, NULL, &len);
lock_basic_unlock(&w->ctx->cfglock);
} else {
msg = context_serialize_answer(q, err, pkt, &len);
(void)rbtree_delete(&w->ctx->queries, q->node.key);
w->ctx->num_async--;
context_query_delete(q);
}
if(!msg) {
log_err("out of memory for async answer");
@ -536,13 +571,26 @@ add_bg_result(struct libworker* w, struct ctx_query* q, ldns_buffer* pkt,
static void
libworker_bg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
{
struct libworker_cb_data* d = (struct libworker_cb_data*)arg;
struct ctx_query* q = (struct ctx_query*)arg;
d->q->msg_security = s;
if(q->cancelled) {
if(q->w->is_bg_thread) {
/* delete it now */
struct ub_val_ctx* ctx = q->w->ctx;
lock_basic_lock(&ctx->cfglock);
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
}
/* cancelled, do not give answer */
return;
}
q->msg_security = s;
if(rcode != 0) {
error_encode(buf, rcode, NULL, 0, BIT_RD, NULL);
}
add_bg_result(d->w, d->q, buf, UB_NOERROR);
add_bg_result(q->w, q, buf, UB_NOERROR);
}
@ -553,8 +601,13 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
uint16_t qflags, qid;
struct query_info qinfo;
struct edns_data edns;
struct libworker_cb_data d;
struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len);
struct ctx_query* q;
if(w->is_bg_thread) {
q = context_lookup_new_query(w->ctx, buf, len);
} else {
q = context_deserialize_new_query(w->ctx, buf, len);
}
free(buf);
if(!q) {
log_err("failed to deserialize newq");
return;
@ -565,8 +618,6 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
}
qid = 0;
qflags = BIT_RD;
d.q = q;
d.w = w;
/* see if there is a fixed answer */
if(local_zones_answer(w->ctx->local_zones, &qinfo, &edns,
w->back->udp_buff, w->env->scratch)) {
@ -575,9 +626,10 @@ handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
free(qinfo.qname);
return;
}
q->w = w;
/* process new query */
if(!mesh_new_callback(w->env->mesh, &qinfo, qflags, &edns,
w->back->udp_buff, qid, libworker_bg_done_cb, &d)) {
w->back->udp_buff, qid, libworker_bg_done_cb, q)) {
add_bg_result(w, q, NULL, UB_NOMEM);
}
free(qinfo.qname);

View file

@ -68,8 +68,8 @@ struct libworker {
/** context we are operating under */
struct ub_val_ctx* ctx;
/** is this a background worker? */
int is_bg;
/** is this a bg worker that is threaded (not forked)? */
int is_bg_thread;
/** copy of the module environment with worker local entries. */
struct module_env* env;
@ -91,16 +91,6 @@ struct libworker {
struct libworker_res_list* res_last;
};
/**
* Libworker query cb struct
*/
struct libworker_cb_data {
/** the worker involved */
struct libworker* w;
/** the query involved */
struct ctx_query* q;
};
/**
* List of results (arbitrary command serializations) to write back
*/

View file

@ -65,6 +65,8 @@ void usage(char* argv[])
{
printf("usage: %s name ...\n", argv[0]);
printf("names are looked up at the same time, asynchronously.\n");
printf("-d : enable debug output\n");
printf("-t : use a resolver thread instead of forking a process\n");
exit(1);
}
@ -89,9 +91,28 @@ int main(int argc, char** argv)
if(argc == 1) {
usage(argv);
}
if(argc > 1 && strcmp(argv[1], "-h") == 0)
usage(argv);
argc--;
argv++;
/* create context */
ctx = ub_val_ctx_create();
if(!ctx) {
printf("could not create context, %s", strerror(errno));
return 1;
}
if(argc > 0 && strcmp(argv[0], "-d") == 0) {
ub_val_ctx_debuglevel(ctx, 3);
argc--;
argv++;
}
if(argc > 0 && strcmp(argv[0], "-t") == 0) {
ub_val_ctx_async(ctx, 1);
argc--;
argv++;
}
/* allocate array for results. */
lookups = (struct lookinfo*)calloc((size_t)argc,
sizeof(struct lookinfo));
@ -99,20 +120,10 @@ int main(int argc, char** argv)
printf("out of memory\n");
return 1;
}
/* create context */
ctx = ub_val_ctx_create();
if(!ctx) {
printf("could not create context, %s", strerror(errno));
return 1;
}
/* perform asyncronous calls */
num_wait = argc;
for(i=0; i<argc; i++) {
if(strcmp(argv[i], "-d") == 0) {
ub_val_ctx_debuglevel(ctx, 3);
continue;
}
fprintf(stderr, "start lookup %s\n", argv[i]);
lookups[i].qname = argv[i];
r = ub_val_resolve_async(ctx, argv[i], LDNS_RR_TYPE_A,
@ -142,22 +153,30 @@ int main(int argc, char** argv)
/* print lookup results */
for(i=0; i<argc; i++) {
char buf[100];
if(lookups[i].err)
if(lookups[i].err) /* error (from libunbound) */
printf("%s: error %s\n", lookups[i].qname,
ub_val_strerror(lookups[i].err));
else if(lookups[i].result->rcode != 0)
printf("%s: DNS error %d\n", lookups[i].qname,
lookups[i].result->rcode);
else if(!lookups[i].result->havedata)
printf("%s: no data %s\n", lookups[i].qname,
else if(lookups[i].result->havedata)
printf("%s: %s\n", lookups[i].qname,
inet_ntop(AF_INET, lookups[i].result->data[0],
buf, (socklen_t)sizeof(buf)));
else {
/* there is no data, why that? */
if(lookups[i].result->rcode == 0 /*noerror*/ ||
lookups[i].result->nxdomain)
printf("%s: no data %s\n", lookups[i].qname,
lookups[i].result->nxdomain?"(no such host)":
"(no IP4 address)");
else printf("%s: %s\n", lookups[i].qname,
inet_ntop(AF_INET, lookups[i].result->data[0],
buf, (socklen_t)sizeof(buf)));
else /* some error (from the server) */
printf("%s: DNS error %d\n", lookups[i].qname,
lookups[i].result->rcode);
}
}
ub_val_ctx_delete(ctx);
for(i=0; i<argc; i++) {
ub_val_result_free(lookups[i].result);
}
free(lookups);
return 0;
}

View file

@ -658,7 +658,9 @@ static void* checklock_main(void* arg)
/* Hack to get same numbers as in log file */
thr->num = *(int*)(thr->arg);
log_assert(thr->num < THRDEBUG_MAX_THREADS);
log_assert(thread_infos[thr->num] == NULL);
/* as an aside, due to this, won't work for libunbound bg thread */
if(thread_infos[thr->num] != NULL)
log_warn("thread warning, thr->num %d not NULL", thr->num);
thread_infos[thr->num] = thr;
LOCKRET(pthread_setspecific(thr_debug_key, thr));
if(check_locking_order)