Mesh used now.

git-svn-id: file:///svn/unbound/trunk@423 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-06-26 13:06:44 +00:00
parent 489e48b3d1
commit 06cfef3252
11 changed files with 334 additions and 765 deletions

View file

@ -186,7 +186,6 @@ static void daemon_setup_modules(struct daemon* daemon)
daemon->env->worker = NULL;
daemon->env->send_packet = &worker_send_packet;
daemon->env->send_query = &worker_send_query;
daemon->env->remove_subqueries = &worker_slumber_subqueries;
for(i=0; i<daemon->num_modules; i++) {
log_info("init module %d: %s", i, daemon->modfunc[i]->name);
if(!(*daemon->modfunc[i]->init)(daemon->env, i)) {
@ -194,7 +193,6 @@ static void daemon_setup_modules(struct daemon* daemon)
daemon->modfunc[i]->name);
}
}
}
/**

View file

@ -42,6 +42,7 @@
#include "config.h"
#include "daemon/stats.h"
#include "daemon/worker.h"
#include "services/mesh.h"
void server_stats_init(struct server_stats* stats)
{
@ -51,9 +52,9 @@ void server_stats_init(struct server_stats* stats)
void server_stats_querymiss(struct server_stats* stats, struct worker* worker)
{
stats->num_queries_missed_cache++;
stats->sum_query_list_size += worker->num_requests;
if(worker->num_requests > stats->max_query_list_size)
stats->max_query_list_size = worker->num_requests;
stats->sum_query_list_size += worker->env.mesh->all.count;
if(worker->env.mesh->all.count > stats->max_query_list_size)
stats->max_query_list_size = worker->env.mesh->all.count;
}
void server_stats_log(struct server_stats* stats, int threadnum)

View file

@ -81,248 +81,19 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
log_err("write socket: %s", strerror(errno));
}
/** delete subrequest */
static void
qstate_cleanup(struct worker* worker, struct module_qstate* qstate)
{
int i;
if(!qstate)
return;
/* call de-init while all is OK */
for(i=0; i<worker->daemon->num_modules; i++)
(*worker->daemon->modfunc[i]->clear)(qstate, i);
/* cleanup this query */
region_free_all(qstate->region);
query_info_clear(&qstate->qinfo);
if(qstate->parent) {
/* subquery of parent */
module_subreq_remove(&qstate->parent->subquery_first, qstate);
region_destroy(qstate->region);
free(qstate);
} else if (!qstate->work_info) {
/* slumbering query */
module_subreq_remove(&worker->slumber_list, qstate);
region_destroy(qstate->region);
free(qstate);
verbose(VERB_ALGO, "cleanup: slumber list has %d entries",
module_subreq_num(worker->slumber_list));
}
}
/** delete subrequest recursively */
static void
qstate_free_recurs_list(struct worker* worker, struct module_qstate* list)
{
struct module_qstate* n;
/* remove subqueries */
while(list) {
n = list->subquery_next;
qstate_free_recurs_list(worker, list->subquery_first);
qstate_cleanup(worker, list);
list = n;
}
}
/** delete subrequest */
static void
qstate_free(struct worker* worker, struct module_qstate* qstate)
{
if(!qstate)
return;
worker_slumber_subqueries(qstate);
qstate_cleanup(worker, qstate);
}
/** release workrequest back to the freelist,
* note that the w->qinfo still needs to be cleared after this.
*/
static void
req_release(struct work_query* w)
{
struct worker* worker = w->state.env->worker;
if(worker->num_requests == worker->request_size) {
/* no longer at max, start accepting again. */
listen_resume(worker->front);
}
log_assert(worker->num_requests >= 1);
worker->num_requests --;
w->next = worker->free_queries;
worker->free_queries = w;
verbose(VERB_ALGO, "released query to pool, %d in use",
(int)worker->num_requests);
}
/** reply to query with given error code */
static void
replyerror(int r, struct work_query* w)
{
error_encode(w->query_reply.c->buffer, r, &w->state.qinfo,
w->query_id, w->state.query_flags, &w->state.edns);
comm_point_send_reply(&w->query_reply);
req_release(w);
query_info_clear(&w->state.qinfo);
}
/** init qstate module states */
static void
set_extstates_initial(struct worker* worker, struct module_qstate* qstate)
{
int i;
for(i=0; i<worker->daemon->num_modules; i++)
qstate->ext_state[i] = module_state_initial;
}
/** recursive debug logging of (sub)query structure */
static void
run_debug(struct module_qstate* p, int d)
{
char buf[80+1+1]; /* max nn=80; marker is 1, zero at end is 1 */
int i, nn = d*2;
if(nn > 80)
nn = 80;
for(i=0; i<nn; i++) {
buf[i] = ' ';
}
buf[i++] = 'o';
buf[i] = 0;
log_query_info(VERB_ALGO, buf, &p->qinfo);
for(p = p->subquery_first; p; p = p->subquery_next) {
run_debug(p, d+1);
}
}
/** find runnable recursive */
static struct module_qstate*
find_run_in(struct module_qstate* pfirst)
{
struct module_qstate* q, *p;
for(p = pfirst; p; p = p->subquery_next) {
if(p->ext_state[p->curmod] == module_state_initial)
return p;
if((q=find_run_in(p->subquery_first)))
return q;
}
return NULL;
}
/** find other runnable subqueries */
static struct module_qstate*
find_runnable(struct module_qstate* subq)
{
struct module_qstate* p = subq;
verbose(VERB_ALGO, "find runnable");
if(p->subquery_next && p->subquery_next->ext_state[
p->subquery_next->curmod] == module_state_initial)
return p->subquery_next;
while(p->parent)
p = p->parent;
if(verbosity >= VERB_ALGO)
run_debug(p, 0);
p = find_run_in(p->subquery_first);
if(p) return p;
p = find_run_in(subq->env->worker->slumber_list);
return p;
}
/** process incoming request */
static void
worker_process_query(struct worker* worker, struct module_qstate* qstate,
enum module_ev event, struct outbound_entry* entry)
{
enum module_ext_state s;
verbose(VERB_DETAIL, "worker process handle event");
if(event == module_event_new) {
qstate->curmod = 0;
set_extstates_initial(worker, qstate);
}
/* allow current module to run */
/* loops for subqueries or parent queries. */
while(1) {
(*worker->daemon->modfunc[qstate->curmod]->operate)(qstate,
event, qstate->curmod, entry);
region_free_all(worker->scratchpad);
qstate->reply = NULL;
s = qstate->ext_state[qstate->curmod];
verbose(VERB_ALGO, "worker_process_query: module "
"exit state is %s", strextstate(s));
if(s == module_state_initial) {
log_err("module exit in initial state, "
"it loops; parent query is aborted");
while(qstate->parent)
qstate = qstate->parent;
s = module_error;
}
/* examine results, start further modules, etc. */
if(s != module_error && s != module_finished) {
/* see if we can continue with other subrequests */
struct module_qstate* nxt = find_runnable(qstate);
if(nxt) {
/* start submodule */
qstate = nxt;
set_extstates_initial(worker, qstate);
entry = NULL;
event = module_event_pass;
continue;
}
}
/* subrequest done */
if(s == module_error && qstate->parent) {
struct module_qstate* up = qstate->parent;
qstate_free(worker, qstate);
qstate = up;
entry = NULL;
event = module_event_subq_error;
continue;
}
if(s == module_finished && qstate->parent) {
struct module_qstate* up = qstate->parent;
qstate_free(worker, qstate);
qstate = up;
entry = NULL;
event = module_event_subq_done;
continue;
}
break;
}
/* request done */
if(s == module_error) {
if(qstate->work_info) {
replyerror(LDNS_RCODE_SERVFAIL, qstate->work_info);
}
qstate_free(worker, qstate);
verbose(VERB_DETAIL, "worker process suspend");
return;
}
if(s == module_finished) {
if(qstate->work_info) {
memcpy(ldns_buffer_begin(qstate->work_info->
query_reply.c->buffer), &qstate->
work_info->query_id, sizeof(qstate->
work_info->query_id));
comm_point_send_reply(&qstate->work_info->query_reply);
req_release(qstate->work_info);
}
qstate_free(worker, qstate);
verbose(VERB_DETAIL, "worker process suspend");
return;
}
/* suspend, waits for wakeup callback */
verbose(VERB_DETAIL, "worker process suspend");
}
/** process incoming replies from the network */
static int
worker_handle_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info)
{
struct work_query* w = (struct work_query*)arg;
struct worker* worker = w->state.env->worker;
struct module_qstate* q = (struct module_qstate*)arg;
struct worker* worker = q->env->worker;
struct outbound_entry e;
e.qstate = q;
e.qsent = NULL;
w->state.reply = reply_info;
if(error != 0) {
worker_process_query(worker, &w->state,
module_event_timeout, NULL);
mesh_report_reply(worker->env.mesh, &e, 0, reply_info);
return 0;
}
/* sanity check. */
@ -332,11 +103,10 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
|| LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1) {
/* error becomes timeout for the module as if this reply
* never arrived. */
worker_process_query(worker, &w->state,
module_event_timeout, NULL);
mesh_report_reply(worker->env.mesh, &e, 0, reply_info);
return 0;
}
worker_process_query(worker, &w->state, module_event_reply, NULL);
mesh_report_reply(worker->env.mesh, &e, 1, reply_info);
return 0;
}
@ -349,10 +119,8 @@ worker_handle_service_reply(struct comm_point* c, void* arg, int error,
struct worker* worker = e->qstate->env->worker;
verbose(VERB_ALGO, "worker scvd callback for qstate %p", e->qstate);
e->qstate->reply = reply_info;
if(error != 0) {
worker_process_query(worker, e->qstate,
module_event_timeout, e);
mesh_report_reply(worker->env.mesh, e, 0, reply_info);
return 0;
}
/* sanity check. */
@ -363,11 +131,10 @@ worker_handle_service_reply(struct comm_point* c, void* arg, int error,
/* error becomes timeout for the module as if this reply
* never arrived. */
verbose(VERB_ALGO, "worker: bad reply handled as timeout");
worker_process_query(worker, e->qstate,
module_event_timeout, e);
mesh_report_reply(worker->env.mesh, e, 0, reply_info);
return 0;
}
worker_process_query(worker, e->qstate, module_event_reply, e);
mesh_report_reply(worker->env.mesh, e, 1, reply_info);
return 0;
}
@ -503,7 +270,6 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
hashvalue_t h;
struct lruhash_entry* e;
struct query_info qinfo;
struct work_query* w;
struct edns_data edns;
if(error != NETEVENT_NOERROR) {
@ -588,45 +354,29 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
}
ldns_buffer_rewind(c->buffer);
server_stats_querymiss(&worker->stats, worker);
/* perform memory allocation(s) */
if(!query_info_allocqname(&qinfo)) {
comm_point_drop_reply(repinfo);
return 0;
}
/* grab a work request structure for this new request */
if(!(w = worker->free_queries)) {
if(worker->env.mesh->all.count > worker->request_size) {
/* we could get this due to a slow tcp incoming query,
that started before we performed listen_pushback */
verbose(VERB_DETAIL, "worker: too many incoming requests "
"active. dropping incoming query.");
verbose(VERB_ALGO, "currently servicing %d of %d queries",
(int)worker->num_requests, (int)worker->request_size);
(int)worker->env.mesh->all.count,
(int)worker->request_size);
worker->stats.num_query_list_exceeded++;
comm_point_drop_reply(repinfo);
query_info_clear(&qinfo);
return 0;
}
w->state.edns = edns;
worker->free_queries = w->next;
worker->num_requests ++;
log_assert(worker->num_requests <= worker->request_size);
if(worker->num_requests == worker->request_size) {
mesh_new_client(worker->env.mesh, &qinfo,
ldns_buffer_read_u16_at(c->buffer, 2),
&edns, repinfo, *(uint16_t*)ldns_buffer_begin(c->buffer));
if(worker->env.mesh->all.count == worker->request_size) {
/* the max request number has been reached, stop accepting */
listen_pushback(worker->front);
}
/* init request */
w->next = NULL;
w->state.query_hash = h;
memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
memcpy(&w->state.qinfo, &qinfo, sizeof(struct query_info));
memcpy(&w->query_id, ldns_buffer_begin(c->buffer), sizeof(uint16_t));
w->state.query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
/* answer it */
w->state.buf = c->buffer;
worker_process_query(worker, &w->state, module_event_new, NULL);
return 0;
}
@ -694,51 +444,6 @@ worker_create(struct daemon* daemon, int id)
return worker;
}
/** create request handling structures */
static int
reqs_init(struct worker* worker)
{
size_t i;
for(i=0; i<worker->request_size; i++) {
struct work_query* q = (struct work_query*)calloc(1,
sizeof(struct work_query));
if(!q) return 0;
q->state.buf = worker->front->udp_buff;
q->state.region = region_create_custom(malloc, free, 1024,
64, 16, 0);
if(!q->state.region) {
free(q);
return 0;
}
q->state.env = &worker->env;
q->state.parent = NULL;
q->state.work_info = q;
q->next = worker->free_queries;
worker->free_queries = q;
q->all_next = worker->all_queries;
worker->all_queries = q;
}
return 1;
}
/** delete request list */
static void
reqs_delete(struct worker* worker)
{
struct work_query* q = worker->all_queries;
struct work_query* n;
while(q) {
n = q->all_next;
log_assert(q->state.env->worker == worker);
/* comm_reply closed in outside_network_delete */
qstate_free_recurs_list(worker, q->state.subquery_first);
qstate_cleanup(worker, &q->state);
region_destroy(q->state.region);
free(q);
q = n;
}
}
int
worker_init(struct worker* worker, struct config_file *cfg,
struct listen_port* ports, size_t buffer_size, int do_sigs)
@ -825,11 +530,6 @@ worker_init(struct worker* worker, struct config_file *cfg,
return 0;
}
worker->request_size = cfg->num_queries_per_thread;
if(!reqs_init(worker)) {
worker_delete(worker);
return 0;
}
worker->slumber_list = NULL;
server_stats_init(&worker->stats);
alloc_init(&worker->alloc, &worker->daemon->superalloc,
@ -843,6 +543,7 @@ worker_init(struct worker* worker, struct config_file *cfg,
worker->daemon->modfunc, &worker->env);
worker->env.detach_subs = &mesh_detach_subs;
worker->env.attach_sub = &mesh_attach_sub;
worker->env.kill_sub = &mesh_state_delete;
worker->env.query_done = &mesh_query_done;
worker->env.walk_supers = &mesh_walk_supers;
if(!worker->env.mesh) {
@ -865,8 +566,6 @@ worker_delete(struct worker* worker)
return;
server_stats_log(&worker->stats, worker->thread_num);
mesh_delete(worker->env.mesh);
reqs_delete(worker);
qstate_free_recurs_list(worker, worker->slumber_list);
listen_delete(worker->front);
outside_network_delete(worker->back);
comm_signal_delete(worker->comsig);
@ -894,11 +593,11 @@ worker_send_packet(ldns_buffer* pkt, struct sockaddr_storage* addr,
struct worker* worker = q->env->worker;
if(use_tcp) {
return pending_tcp_query(worker->back, pkt, addr, addrlen,
timeout, worker_handle_reply, q->work_info,
timeout, worker_handle_reply, q,
worker->rndstate) != 0;
}
return pending_udp_query(worker->back, pkt, addr, addrlen,
timeout*1000, worker_handle_reply, q->work_info,
timeout*1000, worker_handle_reply, q,
worker->rndstate) != 0;
}
@ -934,21 +633,3 @@ worker_send_query(uint8_t* qname, size_t qnamelen, uint16_t qtype,
}
return e;
}
void
worker_slumber_subqueries(struct module_qstate* qstate)
{
struct worker* worker = qstate->env->worker;
if(qstate->subquery_first) {
while(qstate->subquery_first) {
/* put subqueries on slumber list */
struct module_qstate* s = qstate->subquery_first;
module_subreq_remove(&qstate->subquery_first, s);
s->parent = NULL;
s->work_info = NULL;
module_subreq_insert(&worker->slumber_list, s);
}
verbose(VERB_ALGO, "worker: slumber list has %d entries",
module_subreq_num(worker->slumber_list));
}
}

View file

@ -68,20 +68,6 @@ enum worker_commands {
worker_cmd_quit
};
/** information per query that is in processing */
struct work_query {
/** next query in freelist */
struct work_query* next;
/** query state */
struct module_qstate state;
/** the query reply destination, packet buffer and where to send. */
struct comm_reply query_reply;
/** id of query, in network byteorder. */
uint16_t query_id;
/** next query in all-list */
struct work_query* all_next;
};
/**
* Structure holding working information for unbound.
* Holds globally visible information.
@ -108,16 +94,8 @@ struct worker {
/** commpoint to listen to commands. */
struct comm_point* cmd_com;
/** number of requests currently active */
size_t num_requests;
/** number of requests that can be handled by this worker */
size_t request_size;
/** the free working queries */
struct work_query* free_queries;
/** list of all working queries */
struct work_query* all_queries;
/** list of slumbering states, with promiscuous queries */
struct module_qstate* slumber_list;
/** random() table for this worker. */
struct ub_randstate* rndstate;
@ -215,10 +193,4 @@ struct outbound_entry* worker_send_query(uint8_t* qname, size_t qnamelen,
struct sockaddr_storage* addr, socklen_t addrlen,
struct module_qstate* q);
/**
* Remove subqueries, by moving them to the slumber list.
* @param qstate: this state has subqueries removed.
*/
void worker_slumber_subqueries(struct module_qstate* qstate);
#endif /* DAEMON_WORKER_H */

View file

@ -1,3 +1,8 @@
26 June 2007: Wouter
- mesh is called by worker, and iterator uses it.
This removes the hierarchical code.
QueryTargets state and Finished state are merged for iterator.
25 June 2007: Wouter
- more mesh work.
- error encode routine for ease.

View file

@ -145,31 +145,24 @@ fwd_new(struct module_qstate* qstate, int id)
/** iterator handle reply from authoritative server */
static int
iter_handlereply(struct module_qstate* qstate, int id,
struct outbound_entry* ATTR_UNUSED(outbound))
iter_handlereply(struct module_qstate* qstate, int id)
{
struct module_env* env = qstate->env;
uint16_t us = qstate->edns.udp_size;
struct query_info reply_qinfo;
struct reply_info* reply_msg;
struct edns_data reply_edns;
hashvalue_t h;
int r;
if((r=reply_info_parse(qstate->reply->c->buffer, env->alloc,
&reply_qinfo, &reply_msg, qstate->env->scratch,
&reply_edns))!=0)
return 0;
qstate->edns.edns_version = EDNS_ADVERTISED_VERSION;
qstate->edns.udp_size = EDNS_ADVERTISED_SIZE;
qstate->edns.ext_rcode = 0;
qstate->edns.bits &= EDNS_DO;
if(!reply_info_answer_encode(&reply_qinfo, reply_msg, 0,
qstate->query_flags, qstate->buf, 0, 0,
qstate->env->scratch, us, &qstate->edns,
(int)(qstate->edns.bits&EDNS_DO)))
return 0;
dns_cache_store_msg(qstate->env, &reply_qinfo, qstate->query_hash,
reply_msg);
h = query_info_hash(&qstate->qinfo);
(*qstate->env->query_done)(qstate, LDNS_RCODE_NOERROR, reply_msg);
/* there should be no dependencies in this forwarding mode */
(*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_SERVFAIL, NULL);
dns_cache_store_msg(qstate->env, &reply_qinfo, h, reply_msg);
qstate->ext_state[id] = module_finished;
return 1;
}
@ -181,26 +174,45 @@ perform_forward(struct module_qstate* qstate, enum module_ev event, int id,
{
verbose(VERB_ALGO, "iterator: forwarding");
if(event == module_event_new) {
if(!fwd_new(qstate, id))
if(!fwd_new(qstate, id)) {
(*qstate->env->query_done)(qstate,
LDNS_RCODE_SERVFAIL, NULL);
(*qstate->env->walk_supers)(qstate, id,
LDNS_RCODE_SERVFAIL, NULL);
qstate->ext_state[id] = module_error;
return;
}
return;
}
/* it must be a query reply */
if(!outbound) {
verbose(VERB_ALGO, "query reply was not serviced");
(*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL);
(*qstate->env->walk_supers)(qstate, id,
LDNS_RCODE_SERVFAIL, NULL);
qstate->ext_state[id] = module_error;
return;
}
if(event == module_event_timeout || event == module_event_error) {
(*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL);
(*qstate->env->walk_supers)(qstate, id,
LDNS_RCODE_SERVFAIL, NULL);
qstate->ext_state[id] = module_error;
return;
}
if(event == module_event_reply) {
if(!iter_handlereply(qstate, id, outbound))
if(!iter_handlereply(qstate, id)) {
(*qstate->env->query_done)(qstate,
LDNS_RCODE_SERVFAIL, NULL);
(*qstate->env->walk_supers)(qstate, id,
LDNS_RCODE_SERVFAIL, NULL);
qstate->ext_state[id] = module_error;
}
return;
}
log_err("bad event for iterator[forwarding]");
(*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL);
(*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_SERVFAIL, NULL);
qstate->ext_state[id] = module_error;
}
@ -247,8 +259,54 @@ final_state(struct iter_qstate* iq)
return next_state(iq, iq->final_state);
}
/**
* Callback routine to handle errors in parent query states
* @param qstate: query state that failed.
* @param id: module id.
* @param super: super state.
* @param rcode: the error code.
*/
static void
error_supers(struct module_qstate* qstate, int id,
struct module_qstate* super, int rcode)
{
struct iter_qstate* super_iq = (struct iter_qstate*)super->minfo[id];
log_assert(rcode != LDNS_RCODE_NOERROR);
if(qstate->qinfo.qtype == LDNS_RR_TYPE_A ||
qstate->qinfo.qtype == LDNS_RR_TYPE_AAAA) {
/* mark address as failed. */
struct delegpt_ns* dpns = NULL;
if(super_iq->dp)
dpns = delegpt_find_ns(super_iq->dp,
qstate->qinfo.qname, qstate->qinfo.qname_len);
if(!dpns) {
/* not interested */
verbose(VERB_ALGO, "subq error, but not interested");
log_query_info(VERB_ALGO, "superq", &super->qinfo);
delegpt_log(super_iq->dp);
log_assert(0);
return;
}
dpns->resolved = 1; /* mark as failed */
super_iq->num_target_queries--;
}
if(qstate->qinfo.qtype == LDNS_RR_TYPE_NS) {
/* prime failed to get delegation */
super_iq->dp = NULL;
}
/* evaluate targets again */
super_iq->state = QUERYTARGETS_STATE;
/* super becomes runnable, and will process this change */
}
/**
* Return an error to the client
* @param qstate: our query state
* @param id: module id
* @param rcode: error code (DNS errcode).
* @return: 0 for use by caller, to make notation easy, like:
* return error_response(..).
*/
static int
error_response(struct module_qstate* qstate, int id, int rcode)
@ -256,21 +314,10 @@ error_response(struct module_qstate* qstate, int id, int rcode)
verbose(VERB_DETAIL, "return error response %s",
ldns_lookup_by_id(ldns_rcodes, rcode)?
ldns_lookup_by_id(ldns_rcodes, rcode)->name:"??");
qinfo_query_encode(qstate->buf, &qstate->qinfo);
LDNS_RCODE_SET(ldns_buffer_begin(qstate->buf), rcode);
LDNS_RA_SET(ldns_buffer_begin(qstate->buf));
LDNS_QR_SET(ldns_buffer_begin(qstate->buf));
if((qstate->query_flags & BIT_RD))
LDNS_RD_SET(ldns_buffer_begin(qstate->buf));
if((qstate->query_flags & BIT_CD))
LDNS_CD_SET(ldns_buffer_begin(qstate->buf));
if(qstate->parent) {
/* return subquery error module event to parent */
qstate->ext_state[id] = module_error;
return 0;
}
/* return to client */
/* tell clients that we failed */
(*qstate->env->query_done)(qstate, rcode, NULL);
/* tell our parents that we failed */
(*qstate->env->walk_supers)(qstate, id, rcode, &error_supers);
qstate->ext_state[id] = module_finished;
return 0;
}
@ -304,42 +351,6 @@ iter_prepend(struct iter_qstate* iq, struct dns_msg* msg,
return 1;
}
/**
* Encode response message for iterator responses. Into response buffer.
* On error an error message is encoded.
* @param qstate: query state. With qinfo information.
* @param iq: iterator query state. With prepend list.
* @param msg: answer message.
* @param id: module id (used in error condition).
*/
static void
iter_encode_respmsg(struct module_qstate* qstate, struct iter_qstate* iq,
struct dns_msg* msg, int id)
{
struct edns_data edns;
if(iq->prepend_list) {
if(!iter_prepend(iq, msg, qstate->region)) {
log_err("prepend rrsets: out of memory");
error_response(qstate, id, LDNS_RCODE_SERVFAIL);
return;
}
}
edns.edns_present = qstate->edns.edns_present;
edns.edns_version = EDNS_ADVERTISED_VERSION;
edns.udp_size = EDNS_ADVERTISED_SIZE;
edns.ext_rcode = 0;
edns.bits = qstate->edns.bits & EDNS_DO;
if(!reply_info_answer_encode(&qstate->qinfo, msg->rep, 0,
qstate->query_flags, qstate->buf, 0, 1,
qstate->env->scratch, qstate->edns.udp_size,
&edns, (int)(qstate->edns.bits & EDNS_DO))) {
/* encode servfail */
error_response(qstate, id, LDNS_RCODE_SERVFAIL);
return;
}
}
/**
* Add rrset to prepend list
* @param qstate: query state.
@ -434,68 +445,30 @@ handle_cname_response(struct module_qstate* qstate, struct iter_qstate* iq,
* need iterative processing
* @param final_state The final state for the response to this
* request.
* @return generated subquerystate, or NULL on error (malloc).
* @param subq_ret: if newly allocated, the subquerystate, or NULL if it does
* not need initialisation.
* @return false on error (malloc).
*/
static struct module_qstate*
static int
generate_sub_request(uint8_t* qname, size_t qnamelen, uint16_t qtype,
uint16_t qclass, struct module_qstate* qstate, int id,
struct iter_qstate* iq, enum iter_state initial_state,
enum iter_state final_state)
enum iter_state final_state, struct module_qstate** subq_ret)
{
struct module_qstate* subq = (struct module_qstate*)malloc(
sizeof(struct module_qstate));
struct iter_qstate* subiq;
if(!subq)
return NULL;
memset(subq, 0, sizeof(*subq));
subq->qinfo.qname = memdup(qname, qnamelen);
if(!subq->qinfo.qname) {
free(subq);
return NULL;
}
subq->qinfo.qname_len = qnamelen;
subq->qinfo.qtype = qtype;
subq->qinfo.qclass = qclass;
subq->query_hash = query_info_hash(&subq->qinfo);
subq->query_flags = 0; /* OPCODE QUERY, no flags */
subq->edns.udp_size = 65535;
subq->buf = qstate->buf;
subq->env = qstate->env;
subq->env->scratch = qstate->env->scratch;
subq->region = region_create(malloc, free);
if(!subq->region) {
free(subq->qinfo.qname);
free(subq);
return NULL;
}
subq->curmod = id;
subq->ext_state[id] = module_state_initial;
subq->minfo[id] = region_alloc(subq->region,
sizeof(struct iter_qstate));
if(!subq->minfo[id]) {
region_destroy(subq->region);
free(subq->qinfo.qname);
free(subq);
return NULL;
}
subq->work_info = NULL;
subq->parent = qstate;
module_subreq_insert(&qstate->subquery_first, subq);
subiq = (struct iter_qstate*)subq->minfo[id];
memset(subiq, 0, sizeof(*subiq));
subiq->num_target_queries = 0;
subiq->num_current_queries = 0;
subiq->depth = iq->depth+1;
outbound_list_init(&subiq->outlist);
subiq->state = initial_state;
subiq->final_state = final_state;
subiq->qchase = subq->qinfo;
struct module_qstate* subq = NULL;
struct iter_qstate* subiq = NULL;
uint16_t qflags = 0; /* OPCODE QUERY, no flags */
struct query_info qinf;
int prime = (final_state == PRIME_RESP_STATE)?1:0;
qinf.qname = qname;
qinf.qname_len = qnamelen;
qinf.qtype = qtype;
qinf.qclass = qclass;
/* RD should be set only when sending the query back through the INIT
* state. */
if(initial_state == INIT_REQUEST_STATE)
subq->query_flags |= BIT_RD;
qflags |= BIT_RD;
/* We set the CD flag so we can send this through the "head" of
* the resolution chain, which might have a validator. We are
* uninterested in validating things not on the direct resolution
@ -503,9 +476,35 @@ generate_sub_request(uint8_t* qname, size_t qnamelen, uint16_t qtype,
/* Turned off! CD does not make a difference in query results.
qstate->query_flags |= BIT_CD;
*/
subiq->chase_flags = subq->query_flags;
return subq;
/* attach subquery, lookup existing or make a new one */
if(!(*qstate->env->attach_sub)(qstate, &qinf, qflags, prime, &subq)) {
return 0;
}
*subq_ret = subq;
if(subq) {
/* initialise the new subquery */
subq->curmod = id;
subq->ext_state[id] = module_state_initial;
subq->minfo[id] = region_alloc(subq->region,
sizeof(struct iter_qstate));
if(!subq->minfo[id]) {
log_err("init subq: out of memory");
(*qstate->env->kill_sub)(subq);
return 0;
}
subiq = (struct iter_qstate*)subq->minfo[id];
memset(subiq, 0, sizeof(*subiq));
subiq->num_target_queries = 0;
subiq->num_current_queries = 0;
subiq->depth = iq->depth+1;
outbound_list_init(&subiq->outlist);
subiq->state = initial_state;
subiq->final_state = final_state;
subiq->qchase = subq->qinfo;
subiq->chase_flags = subq->query_flags;
}
return 1;
}
/**
@ -522,7 +521,6 @@ prime_root(struct module_qstate* qstate, struct iter_qstate* iq,
{
struct delegpt* dp;
struct module_qstate* subq;
struct iter_qstate* subiq;
verbose(VERB_ALGO, "priming . NS %s",
ldns_lookup_by_id(ldns_rr_classes, (int)qclass)?
ldns_lookup_by_id(ldns_rr_classes, (int)qclass)->name:"??");
@ -533,19 +531,21 @@ prime_root(struct module_qstate* qstate, struct iter_qstate* iq,
}
/* Priming requests start at the QUERYTARGETS state, skipping
* the normal INIT state logic (which would cause an infloop). */
subq = generate_sub_request((uint8_t*)"\000", 1, LDNS_RR_TYPE_NS,
qclass, qstate, id, iq, QUERYTARGETS_STATE, PRIME_RESP_STATE);
if(!subq) {
if(!generate_sub_request((uint8_t*)"\000", 1, LDNS_RR_TYPE_NS,
qclass, qstate, id, iq, QUERYTARGETS_STATE, PRIME_RESP_STATE,
&subq)) {
log_err("out of memory priming root");
return 0;
}
subiq = (struct iter_qstate*)subq->minfo[id];
/* Set the initial delegation point to the hint. */
subiq->dp = dp;
/* suppress any target queries. */
subiq->num_target_queries = 0;
subiq->priming = 1;
if(subq) {
struct iter_qstate* subiq =
(struct iter_qstate*)subq->minfo[id];
/* Set the initial delegation point to the hint. */
subiq->dp = dp;
/* there should not be any target queries. */
subiq->num_target_queries = 0;
subiq->priming = 1;
}
/* this module stops, our submodule starts, and does the query. */
qstate->ext_state[id] = module_wait_subquery;
@ -574,7 +574,6 @@ prime_stub(struct module_qstate* qstate, struct iter_qstate* iq,
struct delegpt* stub_dp = hints_lookup_stub(ie->hints, qname, qclass,
iq->dp);
struct module_qstate* subq;
struct iter_qstate* subiq;
/* The stub (if there is one) does not need priming. */
if(!stub_dp)
return 0;
@ -585,23 +584,26 @@ prime_stub(struct module_qstate* qstate, struct iter_qstate* iq,
/* Stub priming events start at the QUERYTARGETS state to avoid the
* redundant INIT state processing. */
subq = generate_sub_request(stub_dp->name, stub_dp->namelen,
if(!generate_sub_request(stub_dp->name, stub_dp->namelen,
LDNS_RR_TYPE_NS, qclass, qstate, id, iq,
QUERYTARGETS_STATE, PRIME_RESP_STATE);
if(!subq) {
QUERYTARGETS_STATE, PRIME_RESP_STATE, &subq)) {
log_err("out of memory priming stub");
(void)error_response(qstate, id, LDNS_RCODE_SERVFAIL);
return 1; /* return 1 to make module stop, with error */
}
subiq = (struct iter_qstate*)subq->minfo[id];
if(subq) {
struct iter_qstate* subiq =
(struct iter_qstate*)subq->minfo[id];
/* Set the initial delegation point to the hint. */
subiq->dp = stub_dp;
/* suppress any target queries -- although there wouldn't be anyway,
* since stub hints never have missing targets.*/
subiq->num_target_queries = 0;
subiq->priming = 1;
subiq->priming_stub = 1;
/* Set the initial delegation point to the hint. */
subiq->dp = stub_dp;
/* there should not be any target queries -- although there
* wouldn't be anyway, since stub hints never have
* missing targets. */
subiq->num_target_queries = 0;
subiq->priming = 1;
subiq->priming_stub = 1;
}
/* this module stops, our submodule starts, and does the query. */
qstate->ext_state[id] = module_wait_subquery;
@ -853,22 +855,22 @@ static int
generate_target_query(struct module_qstate* qstate, struct iter_qstate* iq,
int id, uint8_t* name, size_t namelen, uint16_t qtype, uint16_t qclass)
{
struct module_qstate* subq = generate_sub_request(name, namelen, qtype,
qclass, qstate, id, iq, INIT_REQUEST_STATE, TARGET_RESP_STATE);
struct iter_qstate* subiq;
if(!subq)
return 0;
subiq = (struct iter_qstate*)subq->minfo[id];
subiq->dp = delegpt_copy(iq->dp, subq->region);
if(!subiq->dp) {
module_subreq_remove(&qstate->subquery_first, subq);
region_destroy(subq->region);
free(subq->qinfo.qname);
free(subq);
struct module_qstate* subq;
if(!generate_sub_request(name, namelen, qtype, qclass, qstate,
id, iq, INIT_REQUEST_STATE, FINISHED_STATE, &subq))
return 0;
if(subq) {
struct iter_qstate* subiq =
(struct iter_qstate*)subq->minfo[id];
subiq->dp = delegpt_copy(iq->dp, subq->region);
if(!subiq->dp) {
log_err("init targetq: out of memory");
(*qstate->env->kill_sub)(subq);
return 0;
}
delegpt_log(subiq->dp);
}
log_nametypeclass(VERB_DETAIL, "new target", name, qtype, qclass);
delegpt_log(subiq->dp);
return 1;
}
@ -1077,7 +1079,7 @@ processQueryTargets(struct module_qstate* qstate, struct iter_qstate* iq,
}
/* move other targets to slumber list */
if(iq->num_target_queries>0) {
(*qstate->env->remove_subqueries)(qstate);
(*qstate->env->detach_subs)(qstate);
iq->num_target_queries = 0;
}
@ -1140,7 +1142,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq,
/* close down outstanding requests to be discarded */
outbound_list_clear(&iq->outlist);
iq->num_current_queries = 0;
(*qstate->env->remove_subqueries)(qstate);
(*qstate->env->detach_subs)(qstate);
iq->num_target_queries = 0;
return final_state(iq);
} else if(type == RESPONSE_TYPE_REFERRAL) {
@ -1168,7 +1170,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq,
*/
outbound_list_clear(&iq->outlist);
iq->num_current_queries = 0;
(*qstate->env->remove_subqueries)(qstate);
(*qstate->env->detach_subs)(qstate);
iq->num_target_queries = 0;
verbose(VERB_ALGO, "cleared outbound list for next round");
return next_state(iq, QUERYTARGETS_STATE);
@ -1204,7 +1206,7 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq,
*/
outbound_list_clear(&iq->outlist);
iq->num_current_queries = 0;
(*qstate->env->remove_subqueries)(qstate);
(*qstate->env->detach_subs)(qstate);
iq->num_target_queries = 0;
verbose(VERB_ALGO, "cleared outbound list for query restart");
/* go to INIT_REQUEST_STATE for new qname. */
@ -1238,35 +1240,28 @@ processQueryResponse(struct module_qstate* qstate, struct iter_qstate* iq,
}
/**
* This handles the response to a priming query. This is used to handle both
* root and stub priming responses. This is basically the equivalent of the
* QUERY_RESP_STATE, but will not handle CNAME responses and will treat
* REFERRALs as ANSWERS. It will also update and reactivate the originating
* event.
* Return priming query results to interestes super querystates.
*
* @param qstate: query state.
* @param iq: iterator query state.
* Sets the delegation point and delegation message (not nonRD queries).
* This is a callback from walk_supers.
*
* @param qstate: priming query state that finished.
* @param id: module id.
* @return true if the event needs more immediate processing, false if not.
* This state always returns false.
* @param forq: the qstate for which priming has been done.
* @param rcode: error code.
*/
static int
processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq,
int id)
static void
prime_supers(struct module_qstate* qstate, int id,
struct module_qstate* forq, int rcode)
{
struct module_qstate* forq = qstate->parent;
struct iter_qstate* foriq;
struct iter_qstate* iq = (struct iter_qstate*)qstate->minfo[id];
struct iter_qstate* foriq = (struct iter_qstate*)forq->minfo[id];
struct delegpt* dp = NULL;
enum response_type type = response_type_from_server(iq->response,
&iq->qchase, iq->dp);
/* This event is finished. */
qstate->ext_state[id] = module_finished;
if(!qstate->parent) {
/* no more parent - it is not interested anymore */
return 0;
}
log_assert(rcode == LDNS_RCODE_NOERROR);
log_assert(iq->priming || iq->priming_stub);
if(type == RESPONSE_TYPE_ANSWER) {
/* Convert our response to a delegation point */
dp = delegpt_from_message(iq->response, forq->region);
@ -1276,17 +1271,20 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq,
* the ANSWER type was (presumably) a negative answer. */
verbose(VERB_ALGO, "prime response was not a positive "
"ANSWER; failing");
return error_response(qstate, id, LDNS_RCODE_SERVFAIL);
foriq->dp = NULL;
foriq->state = QUERYTARGETS_STATE;
return;
}
log_query_info(VERB_DETAIL, "priming successful for", &iq->qchase);
delegpt_log(dp);
foriq = (struct iter_qstate*)forq->minfo[id];
foriq->dp = dp;
foriq->deleg_msg = dns_copy_msg(iq->response, forq->region);
if(!foriq->deleg_msg) {
log_err("copy prime response: out of memory");
return error_response(qstate, id, LDNS_RCODE_SERVFAIL);
foriq->dp = NULL;
foriq->state = QUERYTARGETS_STATE;
return;
}
/* root priming responses go to init stage 2, priming stub
@ -1295,6 +1293,32 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq,
foriq->state = INIT_REQUEST_3_STATE;
else foriq->state = INIT_REQUEST_2_STATE;
/* because we are finished, the parent will be reactivated */
}
/**
* This handles the response to a priming query. This is used to handle both
* root and stub priming responses. This is basically the equivalent of the
* QUERY_RESP_STATE, but will not handle CNAME responses and will treat
* REFERRALs as ANSWERS. It will also update and reactivate the originating
* event.
*
* @param qstate: query state.
* @param id: module id.
* @return true if the event needs more immediate processing, false if not.
* This state always returns false.
*/
static int
processPrimeResponse(struct module_qstate* qstate, int id)
{
/* This event is finished. */
qstate->ext_state[id] = module_finished;
/* there should be no outside clients subscribed tell them to
* bugger off (and retry) */
(*qstate->env->query_done)(qstate, LDNS_RCODE_SERVFAIL, NULL);
/* tell interested supers that priming is done */
(*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_NOERROR,
&prime_supers);
return 0;
}
@ -1304,30 +1328,26 @@ processPrimeResponse(struct module_qstate* qstate, struct iter_qstate* iq,
* responsible for reactiving the original event, and housekeeping related
* to received target responses (caching, updating the current delegation
* point, etc).
* Callback from walk_supers for every super state that is interested in
* the results from thiis query.
*
* @param qstate: query state.
* @param iq: iterator query state.
* @param id: module id.
* @return true if the event requires more (response) processing
* immediately, false if not. This particular state always returns
* false.
* @param forq: super query state.
* @param rcode: if not NOERROR, an error occurred.
*/
static int
processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq,
int id)
static void
processTargetResponse(struct module_qstate* qstate, int id,
struct module_qstate* forq, int rcode)
{
struct iter_qstate* iq = (struct iter_qstate*)qstate->minfo[id];
struct iter_qstate* foriq = (struct iter_qstate*)forq->minfo[id];
struct ub_packed_rrset_key* rrset;
struct delegpt_ns* dpns;
struct module_qstate* forq = qstate->parent;
struct iter_qstate* foriq;
qstate->ext_state[id] = module_finished;
if(!qstate->parent) {
/* no parent, it is not interested anymore */
return 0;
}
foriq = (struct iter_qstate*)forq->minfo[id];
foriq->state = QUERYTARGETS_STATE;
/* use error_response for errs*/
log_assert(rcode == LDNS_RCODE_NOERROR);
/* check to see if parent event is still interested (in orig name). */
dpns = delegpt_find_ns(foriq->dp, qstate->qinfo.qname,
@ -1337,7 +1357,10 @@ processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq,
* anyways? */
/* If not, just stop processing this event */
verbose(VERB_ALGO, "subq: parent not interested anymore");
return 0;
/* this is an error, and will cause parent to be reactivated
* even though nothing has happened */
log_assert(0);
return;
}
/* Tell the originating event that this target query has finished
@ -1363,11 +1386,6 @@ processTargetResponse(struct module_qstate* qstate, struct iter_qstate* iq,
} else dpns->resolved = 1; /* fail the target */
log_assert(dpns->resolved); /* one way or another it is now done */
/* Reactivate the forEvent, now that it has either a new target or a
* failed target. */
foriq->state = QUERYTARGETS_STATE;
return 0;
}
/**
@ -1409,9 +1427,22 @@ processFinished(struct module_qstate* qstate, struct iter_qstate* iq,
/* TODO: we are using a private TTL, trim the response. */
/* if (mPrivateTTL > 0){IterUtils.setPrivateTTL(resp, mPrivateTTL); } */
/* Makes sure the final response contains the original question. */
/* and prepends items we have to prepend. Stores reponse in buffer */
iter_encode_respmsg(qstate, iq, iq->response, id);
/* prepend any items we have accumulated */
if(iq->prepend_list) {
if(!iter_prepend(iq, iq->response, qstate->region)) {
log_err("prepend rrsets: out of memory");
return error_response(qstate, id, LDNS_RCODE_SERVFAIL);
}
}
if(query_dname_compare(qstate->qinfo.qname,
iq->response->qinfo.qname) == 0) {
/* use server supplied upper/lower case */
qstate->qinfo.qname = iq->response->qinfo.qname;
}
(*qstate->env->query_done)(qstate, LDNS_RCODE_NOERROR,
iq->response->rep);
(*qstate->env->walk_supers)(qstate, id, LDNS_RCODE_NOERROR,
&processTargetResponse);
return 0;
}
@ -1453,10 +1484,7 @@ iter_handle(struct module_qstate* qstate, struct iter_qstate* iq,
cont = processQueryResponse(qstate, iq, id);
break;
case PRIME_RESP_STATE:
cont = processPrimeResponse(qstate, iq, id);
break;
case TARGET_RESP_STATE:
cont = processTargetResponse(qstate, iq, id);
cont = processPrimeResponse(qstate, id);
break;
case FINISHED_STATE:
cont = processFinished(qstate, iq, id);
@ -1552,50 +1580,6 @@ handle_it:
outbound_list_remove(&iq->outlist, outbound);
iter_handle(qstate, iq, ie, id);
}
/**
* Handles subquery errors. Checks if query is still relevant, and adjusts
* the state.
* @param qstate: query state.
* @param ie: iterator shared global environment.
* @param iq: iterator query state.
* @param id: module id.
*/
static void
process_subq_error(struct module_qstate* qstate, struct iter_qstate* iq,
struct iter_env* ie, int id)
{
struct query_info errinf;
struct delegpt_ns* dpns = NULL;
if(!query_info_parse(&errinf, qstate->buf)) {
log_err("Could not parse error from sub module");
return;
}
if(errinf.qtype == LDNS_RR_TYPE_NS) {
/* a priming query has failed. */
(void)error_response(qstate, id, LDNS_RCODE_SERVFAIL);
return;
}
if(errinf.qtype != LDNS_RR_TYPE_A &&
errinf.qtype != LDNS_RR_TYPE_AAAA) {
log_err("Bad error from sub module");
return;
}
/* see if we are still interested in this subquery result */
if(iq->dp)
dpns = delegpt_find_ns(iq->dp, errinf.qname,
errinf.qname_len);
if(!dpns) {
/* not interested */
verbose(VERB_ALGO, "got subq error, but not interested");
log_query_info(VERB_ALGO, "errname", &errinf);
delegpt_log(iq->dp);
return;
}
dpns->resolved = 1; /* mark as failed */
iq->num_target_queries--; /* and the query is finished */
iq->state = QUERYTARGETS_STATE; /* evaluate targets again */
iter_handle(qstate, iq, ie, id);
}
/** iterator operate on a query */
static void
@ -1634,16 +1618,6 @@ iter_operate(struct module_qstate* qstate, enum module_ev event, int id,
process_response(qstate, iq, ie, id, outbound, event);
return;
}
if(event == module_event_subq_done) {
/* subquery has set our state correctly */
iter_handle(qstate, iq, ie, id);
return;
}
if(event == module_event_subq_error) {
/* need to delist subquery and continue processing */
process_subq_error(qstate, iq, ie, id);
return;
}
if(event == module_event_error) {
verbose(VERB_ALGO, "got called with event error, giving up");
(void)error_response(qstate, id, LDNS_RCODE_SERVFAIL);
@ -1700,8 +1674,6 @@ iter_state_to_string(enum iter_state state)
return "PRIME RESPONSE STATE";
case QUERY_RESP_STATE :
return "QUERY RESPONSE STATE";
case TARGET_RESP_STATE :
return "TARGET RESPONSE STATE";
case FINISHED_STATE :
return "FINISHED RESPONSE STATE";
default :

View file

@ -135,10 +135,8 @@ enum iter_state {
/** Responses to priming queries finish at this state. */
PRIME_RESP_STATE,
/** Responses to target queries start at this state. */
TARGET_RESP_STATE,
/** Responses that are to be returned upstream end at this state. */
/** Responses that are to be returned upstream end at this state.
* As well as responses to target queries. */
FINISHED_STATE
};

View file

@ -149,7 +149,7 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
qinfo, qid, qflags, edns);
comm_point_send_reply(rep);
if(added)
mesh_state_delete(s);
mesh_state_delete(&s->s);
return;
}
/* update statistics */
@ -240,12 +240,14 @@ mesh_state_cleanup(struct mesh_state* mstate)
}
void
mesh_state_delete(struct mesh_state* mstate)
mesh_state_delete(struct module_qstate* qstate)
{
struct mesh_area* mesh;
struct mesh_state_ref* super, ref;
if(!mstate)
struct mesh_state* mstate;
if(!qstate)
return;
mstate = qstate->mesh_info;
mesh = mstate->s.env->mesh;
mesh_detach_subs(&mstate->s);
if(!mstate->reply_list && mstate->super_set.count == 0) {
@ -350,7 +352,7 @@ timeval_subtract(struct timeval* d, struct timeval* end, struct timeval* start)
#ifndef S_SPLINT_S
d->tv_sec = end->tv_sec - start->tv_sec;
while(end->tv_usec < start->tv_usec) {
d->tv_usec += 1000000;
end->tv_usec += 1000000;
d->tv_sec--;
}
d->tv_usec = end->tv_usec - start->tv_usec;
@ -371,6 +373,25 @@ timeval_add(struct timeval* d, struct timeval* add)
#endif
}
/** divide sum of timers to get average */
static void
timeval_divide(struct timeval* avg, struct timeval* sum, size_t d)
{
#ifndef S_SPLINT_S
size_t leftover;
if(d == 0) {
avg->tv_sec = 0;
avg->tv_usec = 0;
return;
}
avg->tv_sec = sum->tv_sec / d;
avg->tv_usec = sum->tv_usec / d;
/* handle fraction from seconds divide */
leftover = sum->tv_sec - avg->tv_sec*d;
avg->tv_usec += (leftover*1000000)/d;
#endif
}
/**
* Send reply to mesh reply entry
* @param m: mesh state to send it for.
@ -412,7 +433,7 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
} else {
struct timeval duration;
timeval_subtract(&duration, &end_time, &r->start_time);
verbose(VERB_ALGO, "query took %d s %d usec",
verbose(VERB_ALGO, "query took %d.%6.6d sec",
(int)duration.tv_sec, (int)duration.tv_usec);
m->s.env->mesh->replies_sent++;
timeval_add(&m->s.env->mesh->replies_sum_wait, &duration);
@ -503,7 +524,7 @@ void mesh_run(struct mesh_area* mesh, struct mesh_state* mstate,
if(s == module_error || s == module_finished) {
/* must have called _done and _supers */
log_assert(mstate->debug_flags == 3);
mesh_state_delete(mstate);
mesh_state_delete(&mstate->s);
}
/* run more modules */
@ -515,5 +536,17 @@ void mesh_run(struct mesh_area* mesh, struct mesh_state* mstate,
(void)rbtree_delete(&mesh->run, mstate);
} else mstate = NULL;
}
verbose(VERB_ALGO, "mesh_run: end");
verbose(VERB_ALGO, "mesh_run: end, %u states (%u with reply, "
"%u detached), %u total replies", (unsigned)mesh->all.count,
(unsigned)mesh->num_reply_states,
(unsigned)mesh->num_detached_states,
(unsigned)mesh->num_reply_addrs);
if(1) {
struct timeval avg;
timeval_divide(&avg, &mesh->replies_sum_wait,
mesh->replies_sent);
verbose(VERB_ALGO, "send %u replies, with average wait "
"of %d.%6.6d", (unsigned)mesh->replies_sent,
(int)avg.tv_sec, (int)avg.tv_usec);
}
}

View file

@ -266,6 +266,13 @@ void mesh_query_done(struct module_qstate* qstate, int rcode,
void mesh_walk_supers(struct module_qstate* qstate, int id, int rcode,
void (*cb)(struct module_qstate*, int, struct module_qstate*, int));
/**
* Delete mesh state, cleanup and also rbtrees and so on.
* Will detach from all super/subnodes.
* @param qstate: to remove.
*/
void mesh_state_delete(struct module_qstate* qstate);
/* ------------------- Functions for mesh -------------------- */
/**
@ -288,13 +295,6 @@ struct mesh_state* mesh_state_create(struct module_env* env,
*/
void mesh_state_cleanup(struct mesh_state* mstate);
/**
* Delete mesh state, cleanup and also rbtrees and so on.
* Will detach from all super/subnodes.
* @param mstate: to remove.
*/
void mesh_state_delete(struct mesh_state* mstate);
/**
* Find a mesh state in the mesh area. Pass relevant flags.
*

View file

@ -62,47 +62,7 @@ strmodulevent(enum module_ev e)
case module_event_pass: return "module_event_pass";
case module_event_reply: return "module_event_reply";
case module_event_timeout: return "module_event_timeout";
case module_event_mod_done: return "module_event_mod_done";
case module_event_subq_done: return "module_event_subq_done";
case module_event_subq_error: return "module_event_subq_error";
case module_event_error: return "module_event_error";
}
return "bad_event_value";
}
void
module_subreq_remove(struct module_qstate** head, struct module_qstate* sub)
{
if(!sub || !head)
return;
/* snip off double linked list */
if(sub->subquery_prev)
sub->subquery_prev->subquery_next = sub->subquery_next;
else *head = sub->subquery_next;
if(sub->subquery_next)
sub->subquery_next->subquery_prev = sub->subquery_prev;
/* cleanup values for robustness */
sub->subquery_next = NULL;
sub->subquery_prev = NULL;
}
void
module_subreq_insert(struct module_qstate** head, struct module_qstate* sub)
{
if(*head)
(*head)->subquery_prev = sub;
sub->subquery_next = *head;
sub->subquery_prev = NULL;
*head = sub;
}
int
module_subreq_num(struct module_qstate* q)
{
int n = 0;
while(q) {
n++;
q = q->subquery_next;
}
return n;
}

View file

@ -153,6 +153,15 @@ struct module_env {
struct query_info* qinfo, uint16_t qflags, int prime,
struct module_qstate** newq);
/**
* Kill newly attached sub. If attach_sub returns newq for
* initialisation, but that fails, then this routine will cleanup and
* delete the fresly created sub.
* @param newq: the new subquery that is no longer needed.
* It is removed.
*/
void (*kill_sub)(struct module_qstate* newq);
/**
* Query state is done, send messages to reply entries.
* Encode messages using reply entry values and the querystate
@ -202,17 +211,6 @@ struct module_env {
struct ub_randstate* rnd;
/** module specific data. indexed by module id. */
void* modinfo[MAX_MODULE];
/** @@@ TO BE DELETED */
/**
* Cleanup subqueries from this query state. Either delete or
* move them somewhere else. This query state no longer needs the
* results from those subqueries.
* @param qstate: query state.
* subqueries are (re)moved so that no subq_done events from
* them will reach this qstate.
*/
void (*remove_subqueries)(struct module_qstate* qstate);
};
/**
@ -247,12 +245,6 @@ enum module_ev {
module_event_reply,
/** timeout */
module_event_timeout,
/** other module finished */
module_event_mod_done,
/** subquery finished */
module_event_subq_done,
/** subquery finished with error */
module_event_subq_error,
/** error */
module_event_error
};
@ -281,24 +273,6 @@ struct module_qstate {
struct module_env* env;
/** mesh related information for this query */
struct mesh_state* mesh_info;
/** ----- TO DELETE */
struct work_query* work_info;
/** hash value of the query qinfo */
hashvalue_t query_hash;
/** edns data from the query */
struct edns_data edns;
/** buffer, store resulting reply here.
* May be cleared when module blocks. */
ldns_buffer* buf;
/** parent query, only nonNULL for subqueries */
struct module_qstate* parent;
/** pointer to first subquery below this one; makes list with next */
struct module_qstate* subquery_first;
/** pointer to next sibling subquery (not above or below this one) */
struct module_qstate* subquery_next;
/** pointer to prev sibling subquery (not above or below this one) */
struct module_qstate* subquery_prev;
};
/**
@ -361,29 +335,4 @@ const char* strextstate(enum module_ext_state s);
*/
const char* strmodulevent(enum module_ev e);
/**
* Remove subqrequest from list.
* @param head: List head. pointer to start of subquery_next/prev sibling list.
* mostly reference to the parent subquery_first.
* @param sub: subrequest. It is snipped off.
*/
void module_subreq_remove(struct module_qstate** head,
struct module_qstate* sub);
/**
* Insert subqrequest in list. You must set the parent ptr of sub correctly.
* @param head: List head. pointer to start of subquery_next/prev sibling list.
* mostly reference to the parent subquery_first.
* @param sub: subrequest. It is added to the list.
*/
void module_subreq_insert(struct module_qstate** head,
struct module_qstate* sub);
/**
* Calculate number of queries in the query list.
* @param q: the start of the list, pass subquery_first.
* @return: number, 0 if q was NULL.
*/
int module_subreq_num(struct module_qstate* q);
#endif /* UTIL_MODULE_H */