unbound.c pipe code.

git-svn-id: file:///svn/unbound/trunk@883 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-01-22 11:10:49 +00:00
parent 84ae5c8e79
commit 0e000a8587
7 changed files with 202 additions and 51 deletions

View file

@ -1,3 +1,6 @@
22 January 2008: Wouter
- library code for async in libunbound/unbound.c.
21 January 2008: Wouter 21 January 2008: Wouter
- libworker work, netevent raw commpoints, write_msg, serialize. - libworker work, netevent raw commpoints, write_msg, serialize.

View file

@ -102,13 +102,13 @@ static int
find_id(struct ub_val_ctx* ctx, int* id) find_id(struct ub_val_ctx* ctx, int* id)
{ {
size_t tries = 0; size_t tries = 0;
ctx->next_querynum++;
while(rbtree_search(&ctx->queries, &ctx->next_querynum)) { while(rbtree_search(&ctx->queries, &ctx->next_querynum)) {
ctx->next_querynum++; /* numerical wraparound is fine */ ctx->next_querynum++; /* numerical wraparound is fine */
if(tries++ > NUM_ID_TRIES) if(tries++ > NUM_ID_TRIES)
return 0; return 0;
} }
*id = ctx->next_querynum; *id = ctx->next_querynum;
ctx->next_querynum++;
return 1; return 1;
} }

View file

@ -170,7 +170,9 @@ enum ub_ctx_err {
/** cfg change after finalize() */ /** cfg change after finalize() */
UB_AFTERFINAL = -6, UB_AFTERFINAL = -6,
/** initialization failed (bad settings) */ /** initialization failed (bad settings) */
UB_INITFAIL = -7 UB_INITFAIL = -7,
/** error in pipe communication with async bg worker */
UB_PIPE = -8
}; };
/** /**

View file

@ -50,6 +50,7 @@
#include "util/config_file.h" #include "util/config_file.h"
#include "util/alloc.h" #include "util/alloc.h"
#include "util/module.h" #include "util/module.h"
#include "util/regional.h"
#include "util/log.h" #include "util/log.h"
#include "services/modstack.h" #include "services/modstack.h"
#include "services/localzone.h" #include "services/localzone.h"
@ -104,14 +105,22 @@ ub_val_ctx_create()
return ctx; return ctx;
} }
/** delete context query */
static void
context_query_delete(struct ctx_query* q)
{
if(!q) return;
ub_val_result_free(q->res);
free(q->msg);
free(q);
}
/** delete q */ /** delete q */
static void static void
delq(rbnode_t* n, void* ATTR_UNUSED(arg)) delq(rbnode_t* n, void* ATTR_UNUSED(arg))
{ {
struct ctx_query* q = (struct ctx_query*)n; struct ctx_query* q = (struct ctx_query*)n;
if(!q) return; context_query_delete(q);
ub_val_result_free(q->res);
free(q);
} }
void void
@ -275,12 +284,14 @@ ub_val_ctx_poll(struct ub_val_ctx* ctx)
int int
ub_val_ctx_wait(struct ub_val_ctx* ctx) ub_val_ctx_wait(struct ub_val_ctx* ctx)
{ {
int r;
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
while(ctx->num_async > 0) { while(ctx->num_async > 0) {
lock_basic_lock(&ctx->rrpipe_lock);
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
(void)pollit(ctx, NULL); lock_basic_lock(&ctx->rrpipe_lock);
r = pollit(ctx, NULL);
lock_basic_unlock(&ctx->rrpipe_lock); lock_basic_unlock(&ctx->rrpipe_lock);
if(r)
ub_val_ctx_process(ctx); ub_val_ctx_process(ctx);
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
} }
@ -291,15 +302,93 @@ ub_val_ctx_wait(struct ub_val_ctx* ctx)
int int
ub_val_ctx_fd(struct ub_val_ctx* ctx) ub_val_ctx_fd(struct ub_val_ctx* ctx)
{ {
return ctx->rrpipe[0]; int fd;
lock_basic_lock(&ctx->rrpipe_lock);
fd = ctx->rrpipe[0];
lock_basic_unlock(&ctx->rrpipe_lock);
return fd;
}
/** process answer from bg worker */
static int
process_answer(struct ub_val_ctx* ctx, uint8_t* msg, uint32_t len)
{
int err;
struct ctx_query* q;
ub_val_callback_t cb;
void* cbarg;
struct ub_val_result* res;
if(context_serial_getcmd(msg, len) != UB_LIBCMD_ANSWER) {
log_err("error: bad data from bg worker %d",
(int)context_serial_getcmd(msg, len));
return 0;
}
lock_basic_lock(&ctx->cfglock);
q = context_deserialize_answer(ctx, msg, len, &err);
if(!q) {
lock_basic_unlock(&ctx->cfglock);
return 0;
}
log_assert(q->async);
/* grab cb while locked */
cb = q->cb;
cbarg = q->cb_arg;
if(err) {
res = NULL;
ub_val_result_free(q->res);
} else {
/* parse the message, extract rcode, fill result */
ldns_buffer* buf = ldns_buffer_new(q->msg_len);
struct regional* region = regional_create();
res = q->res;
res->rcode = LDNS_RCODE_SERVFAIL;
if(region && buf) {
ldns_buffer_clear(buf);
ldns_buffer_write(buf, q->msg, q->msg_len);
ldns_buffer_flip(buf);
libworker_enter_result(res, buf, region,
q->msg_security);
}
ldns_buffer_free(buf);
regional_destroy(region);
}
q->res = NULL;
/* delete the q from list */
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
/* no locks held while calling callback, so that library is
* re-entrant. */
(*cb)(cbarg, err, res);
return 1;
} }
int int
ub_val_ctx_process(struct ub_val_ctx* ctx) ub_val_ctx_process(struct ub_val_ctx* ctx)
{ {
/* TODO */ int r;
/* ctx->num_asynx-- when handled; */ uint8_t* msg = NULL;
return UB_NOMEM; uint32_t len = 0;
while(1) {
lock_basic_lock(&ctx->rrpipe_lock);
r = libworker_read_msg(ctx->rrpipe[0], &msg, &len, 1);
lock_basic_unlock(&ctx->rrpipe_lock);
if(r == 0)
return UB_PIPE;
else if(r == -1)
return UB_NOERROR;
if(!process_answer(ctx, msg, len)) {
free(msg);
return UB_PIPE;
}
free(msg);
}
return UB_NOERROR;
} }
int int
@ -327,15 +416,14 @@ ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
r = libworker_fg(ctx, q); r = libworker_fg(ctx, q);
if(r) { if(r) {
ub_val_result_free(q->res); context_query_delete(q);
free(q);
return r; return r;
} }
*result = q->res; *result = q->res;
q->res = NULL;
(void)rbtree_delete(&ctx->queries, q->node.key); (void)rbtree_delete(&ctx->queries, q->node.key);
free(q->msg); context_query_delete(q);
free(q);
return UB_NOERROR; return UB_NOERROR;
} }
@ -344,7 +432,10 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
int rrclass, void* mydata, ub_val_callback_t callback, int* async_id) int rrclass, void* mydata, ub_val_callback_t callback, int* async_id)
{ {
struct ctx_query* q; struct ctx_query* q;
uint8_t* msg = NULL;
uint32_t len = 0;
*async_id = 0;
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
if(!ctx->finalized) { if(!ctx->finalized) {
int r = context_finalize(ctx); int r = context_finalize(ctx);
@ -367,29 +458,61 @@ ub_val_resolve_async(struct ub_val_ctx* ctx, char* name, int rrtype,
q = context_new(ctx, name, rrtype, rrclass, callback, mydata); q = context_new(ctx, name, rrtype, rrclass, callback, mydata);
if(!q) if(!q)
return UB_NOMEM; return UB_NOMEM;
/* TODO write over pipe to background worker */
*async_id = q->querynum; /* write over pipe to background worker */
lock_basic_lock(&ctx->cfglock);
msg = context_serialize_new_query(q, &len);
if(!msg) {
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM; return UB_NOMEM;
} }
*async_id = q->querynum;
lock_basic_unlock(&ctx->cfglock);
lock_basic_lock(&ctx->qqpipe_lock);
libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
lock_basic_unlock(&ctx->qqpipe_lock);
free(msg);
return UB_NOERROR;
}
int int
ub_val_cancel(struct ub_val_ctx* ctx, int async_id) ub_val_cancel(struct ub_val_ctx* ctx, int async_id)
{ {
struct ctx_query* q; struct ctx_query* q;
uint8_t* msg = NULL;
uint32_t len = 0;
lock_basic_lock(&ctx->cfglock); lock_basic_lock(&ctx->cfglock);
q = (struct ctx_query*)rbtree_search(&ctx->queries, &async_id); q = (struct ctx_query*)rbtree_search(&ctx->queries, &async_id);
if(!q || !q->async) {
/* it is not there, so nothing to do */
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
if(!q || !q->async) /* it is not there, so nothing to do */
return UB_NOERROR; return UB_NOERROR;
/* TODO ; send cancel to background worker */ }
log_assert(q->async);
lock_basic_lock(&ctx->cfglock); msg = context_serialize_cancel(q, &len);
(void)rbtree_delete(&ctx->queries, &async_id); if(!msg) {
ctx->num_async--;
lock_basic_unlock(&ctx->cfglock); lock_basic_unlock(&ctx->cfglock);
return UB_NOMEM; return UB_NOMEM;
} }
/* delete it */
(void)rbtree_delete(&ctx->queries, q->node.key);
ctx->num_async--;
context_query_delete(q);
lock_basic_unlock(&ctx->cfglock);
/* send cancel to background worker */
lock_basic_lock(&ctx->qqpipe_lock);
libworker_write_msg(ctx->qqpipe[1], msg, len, 0);
lock_basic_unlock(&ctx->qqpipe_lock);
free(msg);
return UB_NOERROR;
}
void void
ub_val_result_free(struct ub_val_result* result) ub_val_result_free(struct ub_val_result* result)
{ {
@ -418,6 +541,7 @@ ub_val_strerror(int err)
case UB_FORKFAIL: return "could not fork"; case UB_FORKFAIL: return "could not fork";
case UB_INITFAIL: return "initialization failure"; case UB_INITFAIL: return "initialization failure";
case UB_AFTERFINAL: return "setting change after finalize"; case UB_AFTERFINAL: return "setting change after finalize";
case UB_PIPE: return "error in pipe communication with async";
default: return "unknown error"; default: return "unknown error";
} }
} }

View file

@ -179,8 +179,7 @@ struct ub_val_result {
/** /**
* Callback for results of async queries. * Callback for results of async queries.
* The readable function definition looks like: * The readable function definition looks like:
* void my_callback(void* my_arg, int err, int secure, int havedata, * void my_callback(void* my_arg, int err, struct ub_val_result* result);
* struct ub_val_result* result);
* It is called with * It is called with
* void* my_arg: your pointer to a (struct of) data of your choice, * void* my_arg: your pointer to a (struct of) data of your choice,
* or NULL. * or NULL.
@ -333,7 +332,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
/** /**
* Perform resolution and validation of the target name. * Perform resolution and validation of the target name.
* Asynchronous, after a while, the callback will be called with your * Asynchronous, after a while, the callback will be called with your
* data and the result + secure status. * data and the result.
* @param ctx: context. * @param ctx: context.
* If no thread or process has been created yet to perform the * If no thread or process has been created yet to perform the
* work in the background, it is created now. * work in the background, it is created now.
@ -345,8 +344,7 @@ int ub_val_resolve(struct ub_val_ctx* ctx, char* name, int rrtype,
* and is passed on to the callback function. * and is passed on to the callback function.
* @param callback: this is called on completion of the resolution. * @param callback: this is called on completion of the resolution.
* It is called as: * It is called as:
* void callback(void* mydata, int err, int secure, int havedata, * void callback(void* mydata, int err, struct ub_val_result* result)
* struct ub_val_result* result)
* with mydata: the same as passed here, you may pass NULL, * with mydata: the same as passed here, you may pass NULL,
* with err: is 0 when a result has been found. * with err: is 0 when a result has been found.
* with result: a newly allocated result structure. * with result: a newly allocated result structure.

View file

@ -368,12 +368,37 @@ fill_res(struct ub_val_result* res, struct ub_packed_rrset_key* answer,
return 1; return 1;
} }
/** fill result from parsed message, on error fills servfail */
void
libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
struct regional* temp, enum sec_status msg_security)
{
struct query_info rq;
struct reply_info* rep;
res->rcode = LDNS_RCODE_SERVFAIL;
rep = parse_reply(buf, temp, &rq);
if(!rep) {
return; /* error parsing buf, or out of memory */
}
if(!fill_res(res, reply_find_answer_rrset(&rq, rep),
reply_find_final_cname_target(&rq, rep), &rq))
return; /* out of memory */
/* rcode, havedata, nxdomain, secure, bogus */
res->rcode = (int)FLAGS_GET_RCODE(rep->flags);
if(res->data && res->data[0])
res->havedata = 1;
if(res->rcode == LDNS_RCODE_NXDOMAIN)
res->nxdomain = 1;
if(msg_security == sec_status_secure)
res->secure = 1;
if(msg_security == sec_status_bogus)
res->bogus = 1;
}
/** callback with fg results */ /** callback with fg results */
static void static void
libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s) libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
{ {
struct query_info rq; /* replied query */
struct reply_info* rep;
struct libworker_fg_data* d = (struct libworker_fg_data*)arg; struct libworker_fg_data* d = (struct libworker_fg_data*)arg;
/* fg query is done; exit comm base */ /* fg query is done; exit comm base */
comm_base_exit(d->w->base); comm_base_exit(d->w->base);
@ -389,29 +414,13 @@ libworker_fg_done_cb(void* arg, int rcode, ldns_buffer* buf, enum sec_status s)
d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf)); d->q->msg = memdup(ldns_buffer_begin(buf), ldns_buffer_limit(buf));
d->q->msg_len = ldns_buffer_limit(buf); d->q->msg_len = ldns_buffer_limit(buf);
if(!d->q->msg) { if(!d->q->msg) {
return; /* error in rcode */ return; /* the error is in the rcode */
} }
/* canonname and results */ /* canonname and results */
rep = parse_reply(buf, d->w->env->scratch, &rq);
if(!rep) {
return; /* error parsing buf, or out of memory */
}
if(!fill_res(d->q->res, reply_find_answer_rrset(&rq, rep),
reply_find_final_cname_target(&rq, rep), &rq))
return; /* out of memory */
/* rcode, havedata, nxdomain, secure, bogus */
d->q->res->rcode = (int)LDNS_RCODE_WIRE(d->q->msg);
if(d->q->res->data && d->q->res->data[0])
d->q->res->havedata = 1;
if(d->q->res->rcode == LDNS_RCODE_NXDOMAIN)
d->q->res->nxdomain = 1;
if(s == sec_status_secure)
d->q->res->secure = 1;
if(s == sec_status_bogus)
d->q->res->bogus = 1;
d->q->msg_security = s; d->q->msg_security = s;
libworker_enter_result(d->q->res, buf, d->w->env->scratch, s);
} }
/** setup qinfo and edns */ /** setup qinfo and edns */

View file

@ -44,6 +44,7 @@
#ifndef LIBUNBOUND_WORKER_H #ifndef LIBUNBOUND_WORKER_H
#define LIBUNBOUND_WORKER_H #define LIBUNBOUND_WORKER_H
struct ub_val_ctx; struct ub_val_ctx;
struct ub_val_result;
struct module_env; struct module_env;
struct comm_base; struct comm_base;
struct outside_network; struct outside_network;
@ -54,6 +55,8 @@ struct module_qstate;
struct comm_point; struct comm_point;
struct comm_reply; struct comm_reply;
struct libworker_res_list; struct libworker_res_list;
struct regional;
enum sec_status;
/** /**
* The library-worker status structure * The library-worker status structure
@ -217,4 +220,16 @@ int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
*/ */
int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock); int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
/**
* fill result from parsed message, on error fills servfail
* @param res: is clear at start, filled in at end.
* @param buf: contains DNS message.
* @param temp: temporary buffer for parse.
* @param msg_security: security status of the DNS message.
* On error, the res may contain a different status
* (out of memory is not secure, not bogus).
*/
void libworker_enter_result(struct ub_val_result* res, ldns_buffer* buf,
struct regional* temp, enum sec_status msg_security);
#endif /* LIBUNBOUND_WORKER_H */ #endif /* LIBUNBOUND_WORKER_H */