iterator module.

git-svn-id: file:///svn/unbound/trunk@311 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-05-11 14:16:42 +00:00
parent 696a07e7eb
commit 49bc2f887c
14 changed files with 683 additions and 282 deletions

View file

@ -50,7 +50,7 @@ LINTFLAGS+="-DBN_ULONG=unsigned long" -Dkrb5_int32=int "-Dkrb5_ui_4=unsigned int
INSTALL=$(srcdir)/install-sh INSTALL=$(srcdir)/install-sh
COMMON_SRC=$(wildcard services/*.c util/*.c util/data/*.c util/storage/*.c) util/configparser.c util/configlexer.c testcode/checklocks.c COMMON_SRC=$(wildcard services/*.c util/*.c util/data/*.c util/storage/*.c iterator/*.c) util/configparser.c util/configlexer.c testcode/checklocks.c
COMMON_OBJ=$(addprefix $(BUILD),$(COMMON_SRC:.c=.o)) COMMON_OBJ=$(addprefix $(BUILD),$(COMMON_SRC:.c=.o))
COMPAT_OBJ=$(addprefix $(BUILD)compat/,$(LIBOBJS)) COMPAT_OBJ=$(addprefix $(BUILD)compat/,$(LIBOBJS))
UNITTEST_SRC=$(wildcard testcode/unit*.c) testcode/readhex.c $(COMMON_SRC) UNITTEST_SRC=$(wildcard testcode/unit*.c) testcode/readhex.c $(COMMON_SRC)

View file

@ -50,6 +50,8 @@
#include "util/data/msgreply.h" #include "util/data/msgreply.h"
#include "util/storage/slabhash.h" #include "util/storage/slabhash.h"
#include "services/listen_dnsport.h" #include "services/listen_dnsport.h"
#include "util/module.h"
#include "iterator/iterator.h"
#include <signal.h> #include <signal.h>
/** How many quit requests happened. */ /** How many quit requests happened. */
@ -136,6 +138,11 @@ daemon_init()
return NULL; return NULL;
} }
alloc_init(&daemon->superalloc, NULL, 0); alloc_init(&daemon->superalloc, NULL, 0);
if(!(daemon->env = (struct module_env*)calloc(1,
sizeof(*daemon->env)))) {
daemon_delete(daemon);
return NULL;
}
return daemon; return daemon;
} }
@ -152,6 +159,37 @@ daemon_open_shared_ports(struct daemon* daemon)
return 1; return 1;
} }
/**
* Setup modules. Assigns ids and calls module_init.
* @param daemon: the daemon
*/
static void daemon_setup_modules(struct daemon* daemon)
{
int i;
/* fixed setup of the modules */
daemon->num_modules = 1;
daemon->modfunc = (struct module_func_block**)calloc((size_t)
daemon->num_modules, sizeof(struct module_func_block*));
if(!daemon->modfunc) {
fatal_exit("malloc failure allocating function callbacks");
}
daemon->modfunc[0] = iter_get_funcblock();
daemon->env->cfg = daemon->cfg;
daemon->env->msg_cache = daemon->msg_cache;
daemon->env->rrset_cache = daemon->rrset_cache;
daemon->env->alloc = &daemon->superalloc;
daemon->env->worker = NULL;
daemon->env->send_query = &worker_send_query;
for(i=0; i<daemon->num_modules; i++) {
log_info("init module %d: %s", i, daemon->modfunc[i]->name);
if(!(*daemon->modfunc[i]->init)(daemon->env, i)) {
fatal_exit("module init for module %s failed",
daemon->modfunc[i]->name);
}
}
}
/** /**
* Allocate empty worker structures. With backptr and thread-number, * Allocate empty worker structures. With backptr and thread-number,
* from 0..numthread initialised. Used as user arguments to new threads. * from 0..numthread initialised. Used as user arguments to new threads.
@ -264,10 +302,30 @@ daemon_stop_others(struct daemon* daemon)
} }
} }
/**
* Desetup the modules, deinit, delete.
* @param daemon: the daemon.
*/
static void
daemon_desetup_modules(struct daemon* daemon)
{
int i;
for(i=0; i<daemon->num_modules; i++) {
(*daemon->modfunc[i]->deinit)(daemon->env, i);
}
daemon->num_modules = 0;
free(daemon->modfunc);
daemon->modfunc = 0;
}
void void
daemon_fork(struct daemon* daemon) daemon_fork(struct daemon* daemon)
{ {
log_assert(daemon); log_assert(daemon);
/* setup modules */
daemon_setup_modules(daemon);
/* first create all the worker structures, so we can pass /* first create all the worker structures, so we can pass
* them to the newly created threads. * them to the newly created threads.
*/ */
@ -293,6 +351,9 @@ daemon_fork(struct daemon* daemon)
/* we exited! a signal happened! Stop other threads */ /* we exited! a signal happened! Stop other threads */
daemon_stop_others(daemon); daemon_stop_others(daemon);
/* de-setup modules */
daemon_desetup_modules(daemon);
if(daemon->workers[0]->need_to_restart) if(daemon->workers[0]->need_to_restart)
daemon->need_to_exit = 0; daemon->need_to_exit = 0;
else daemon->need_to_exit = 1; else daemon->need_to_exit = 1;
@ -326,6 +387,7 @@ daemon_delete(struct daemon* daemon)
alloc_clear(&daemon->superalloc); alloc_clear(&daemon->superalloc);
free(daemon->cwd); free(daemon->cwd);
free(daemon->pidfile); free(daemon->pidfile);
free(daemon->env);
free(daemon); free(daemon);
checklock_stop(); checklock_stop();
} }

View file

@ -48,6 +48,7 @@ struct config_file;
struct worker; struct worker;
struct listen_port; struct listen_port;
struct slabhash; struct slabhash;
struct module_env;
/** /**
* Structure holding worker list. * Structure holding worker list.
@ -76,6 +77,12 @@ struct daemon {
struct slabhash* msg_cache; struct slabhash* msg_cache;
/** the rrset cache, content is struct ub_packed_rrset_key* */ /** the rrset cache, content is struct ub_packed_rrset_key* */
struct slabhash* rrset_cache; struct slabhash* rrset_cache;
/** the module environment master value, copied and changed by threads*/
struct module_env* env;
/** number of modules active, ids from 0 to num-1. */
int num_modules;
/** the module callbacks, array of num_modules length */
struct module_func_block** modfunc;
}; };
/** /**

View file

@ -59,18 +59,6 @@
#include <netdb.h> #include <netdb.h>
#include <signal.h> #include <signal.h>
/** size of ID+FLAGS in a DNS message */
#define DNS_ID_AND_FLAGS 4
/** timeout in seconds for UDP queries to auth servers. TODO: proper rtt */
#define UDP_QUERY_TIMEOUT 4
/** timeout in seconds for TCP queries to auth servers. TODO: proper rtt */
#define TCP_QUERY_TIMEOUT 30
/** Advertised version of EDNS capabilities */
#define EDNS_ADVERTISED_VERSION 0
/** Advertised size of EDNS capabilities */
#define EDNS_ADVERTISED_SIZE 4096
void void
worker_send_cmd(struct worker* worker, ldns_buffer* buffer, worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
enum worker_commands cmd) enum worker_commands cmd)
@ -91,14 +79,15 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
static void static void
req_release(struct work_query* w) req_release(struct work_query* w)
{ {
if(w->worker->num_requests == w->worker->request_size) { struct worker* worker = w->state.env->worker;
if(worker->num_requests == worker->request_size) {
/* no longer at max, start accepting again. */ /* no longer at max, start accepting again. */
listen_resume(w->worker->front); listen_resume(worker->front);
} }
log_assert(w->worker->num_requests >= 1); log_assert(worker->num_requests >= 1);
w->worker->num_requests --; worker->num_requests --;
w->next = w->worker->free_queries; w->next = worker->free_queries;
w->worker->free_queries = w; worker->free_queries = w;
} }
/** create error and fill into buffer */ /** create error and fill into buffer */
@ -131,179 +120,79 @@ replyerror_fillbuf(int r, struct comm_reply* repinfo, uint16_t id,
static void static void
replyerror(int r, struct work_query* w) replyerror(int r, struct work_query* w)
{ {
w->edns.edns_version = EDNS_ADVERTISED_VERSION; w->state.edns.edns_version = EDNS_ADVERTISED_VERSION;
w->edns.udp_size = EDNS_ADVERTISED_SIZE; w->state.edns.udp_size = EDNS_ADVERTISED_SIZE;
w->edns.ext_rcode = 0; w->state.edns.ext_rcode = 0;
w->edns.bits &= EDNS_DO; w->state.edns.bits &= EDNS_DO;
replyerror_fillbuf(r, &w->query_reply, w->query_id, w->query_flags, replyerror_fillbuf(r, &w->query_reply, w->query_id,
&w->qinfo); w->state.query_flags, &w->state.qinfo);
attach_edns_record(w->query_reply.c->buffer, &w->edns); attach_edns_record(w->query_reply.c->buffer, &w->state.edns);
comm_point_send_reply(&w->query_reply); comm_point_send_reply(&w->query_reply);
req_release(w); req_release(w);
query_info_clear(&w->qinfo); query_info_clear(&w->state.qinfo);
} }
/** see if rrset needs to be updated in the cache */ /** process incoming request */
static int
need_to_update_rrset(struct packed_rrset_data* newd,
struct packed_rrset_data* cached)
{
/* o if current RRset is more trustworthy - insert it */
if( newd->trust > cached->trust )
return 1;
/* o same trust, but different in data - insert it */
if( newd->trust == cached->trust &&
!rrsetdata_equal(newd, cached))
return 1;
/* o see if TTL is better than TTL in cache. */
/* if so, see if rrset+rdata is the same */
/* if so, update TTL in cache, even if trust is worse. */
if( newd->ttl > cached->ttl &&
rrsetdata_equal(newd, cached))
return 1;
return 0;
}
/** store rrsets in the rrset cache. */
static void static void
worker_store_rrsets(struct worker* worker, struct reply_info* rep) worker_process_query(struct worker* worker, struct work_query* w,
enum module_ev event)
{ {
struct lruhash_entry* e; int i;
size_t i; if(event == module_event_new) {
/* see if rrset already exists in cache, if not insert it. */ w->state.curmod = 0;
/* if it does exist: check to insert it */ for(i=0; i<worker->daemon->num_modules; i++)
for(i=0; i<rep->rrset_count; i++) { w->state.ext_state[i] = module_state_initial;
rep->ref[i].key = rep->rrsets[i];
rep->ref[i].id = rep->rrsets[i]->id;
/* looks up item with a readlock - no editing! */
if((e=slabhash_lookup(worker->daemon->rrset_cache,
rep->rrsets[i]->entry.hash, rep->rrsets[i]->entry.key,
0)) != 0) {
struct packed_rrset_data* data =
(struct packed_rrset_data*)e->data;
struct packed_rrset_data* rd =
(struct packed_rrset_data*)
rep->rrsets[i]->entry.data;
rep->ref[i].key = (struct ub_packed_rrset_key*)e->key;
rep->ref[i].id = rep->rrsets[i]->id;
/* found in cache, do checks above */
if(!need_to_update_rrset(rd, data)) {
lock_rw_unlock(&e->lock);
ub_packed_rrset_parsedelete(rep->rrsets[i],
&worker->alloc);
rep->rrsets[i] = rep->ref[i].key;
continue; /* use cached item instead */
} }
if(rd->trust < data->trust) /* allow current module to run */
rd->trust = data->trust; (*worker->daemon->modfunc[w->state.curmod]->operate)(&w->state, event,
lock_rw_unlock(&e->lock); w->state.curmod);
/* small gap here, where entry is not locked. /* TODO examine results, start further modules, etc.
* possibly entry is updated with something else. * assume it went to sleep
* this is just too bad, its cache anyway. */ */
/* use insert to update entry to manage lruhash region_free_all(worker->scratchpad);
* cache size values nicely. */ if(w->state.ext_state[w->state.curmod] == module_error) {
region_free_all(w->state.region);
replyerror(LDNS_RCODE_SERVFAIL, w);
return;
} }
slabhash_insert(worker->daemon->rrset_cache, if(w->state.ext_state[w->state.curmod] == module_finished) {
rep->rrsets[i]->entry.hash, &rep->rrsets[i]->entry, memcpy(ldns_buffer_begin(w->query_reply.c->buffer),
rep->rrsets[i]->entry.data, &worker->alloc); &w->query_id, sizeof(w->query_id));
if(e) rep->rrsets[i] = rep->ref[i].key; comm_point_send_reply(&w->query_reply);
region_free_all(w->state.region);
req_release(w);
query_info_clear(&w->state.qinfo);
return;
} }
/* suspend, waits for wakeup callback */
} }
/** process incoming replies from the network */ /** process incoming replies from the network */
static int static int
worker_handle_reply(struct comm_point* c, void* arg, int error, worker_handle_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info)) struct comm_reply* reply_info)
{ {
struct work_query* w = (struct work_query*)arg; struct work_query* w = (struct work_query*)arg;
struct query_info qinf; struct worker* worker = w->state.env->worker;
struct reply_info* rep;
struct msgreply_entry* e;
struct edns_data svr_edns; /* unused server edns advertisement */
uint16_t us;
int r;
verbose(VERB_DETAIL, "reply to query with stored ID %d", w->state.reply = reply_info;
ntohs(w->query_id)); /* byteswapped so same as dig prints */
if(error != 0) { if(error != 0) {
replyerror(LDNS_RCODE_SERVFAIL, w); worker_process_query(worker, w, module_event_timeout);
return 0; return 0;
} }
/* sanity check. */ /* sanity check. */
if(!LDNS_QR_WIRE(ldns_buffer_begin(c->buffer))) if(!LDNS_QR_WIRE(ldns_buffer_begin(c->buffer))
return 0; /* not a reply. */ || LDNS_OPCODE_WIRE(ldns_buffer_begin(c->buffer)) !=
if(LDNS_OPCODE_WIRE(ldns_buffer_begin(c->buffer)) != LDNS_PACKET_QUERY) LDNS_PACKET_QUERY
return 0; /* not a reply to a query. */ || LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1) {
if(LDNS_QDCOUNT(ldns_buffer_begin(c->buffer)) > 1) /* error becomes timeout for the module as if this reply
return 0; /* too much in the query section */ * never arrived. */
/* see if it is truncated */ worker_process_query(worker, w, module_event_timeout);
if(LDNS_TC_WIRE(ldns_buffer_begin(c->buffer)) && c->type == comm_udp) {
log_info("TC: truncated. retry in TCP mode.");
qinfo_query_encode(w->worker->back->udp_buff, &w->qinfo);
pending_tcp_query(w->worker->back, w->worker->back->udp_buff,
&w->worker->fwd_addr, w->worker->fwd_addrlen,
TCP_QUERY_TIMEOUT, worker_handle_reply, w,
w->worker->rndstate);
return 0; return 0;
} }
/* woohoo a reply! */ worker_process_query(worker, w, module_event_reply);
if((r=reply_info_parse(c->buffer, &w->worker->alloc, &qinf, &rep,
w->worker->scratchpad, &svr_edns))!=0) {
if(r == LDNS_RCODE_SERVFAIL)
log_err("reply_info_parse: out of memory");
/* formerr on my parse gives servfail to my client */
replyerror(LDNS_RCODE_SERVFAIL, w);
region_free_all(w->worker->scratchpad);
return 0; return 0;
} }
us = w->edns.udp_size;
w->edns.edns_version = EDNS_ADVERTISED_VERSION;
w->edns.udp_size = EDNS_ADVERTISED_SIZE;
w->edns.ext_rcode = 0;
w->edns.bits &= EDNS_DO;
if(!reply_info_answer_encode(&qinf, rep, w->query_id, w->query_flags,
w->query_reply.c->buffer, 0, 0, w->worker->scratchpad, us,
&w->edns)) {
replyerror(LDNS_RCODE_SERVFAIL, w);
query_info_clear(&qinf);
reply_info_parsedelete(rep, &w->worker->alloc);
region_free_all(w->worker->scratchpad);
return 0;
}
comm_point_send_reply(&w->query_reply);
region_free_all(w->worker->scratchpad);
req_release(w);
query_info_clear(&w->qinfo);
if(rep->ttl == 0) {
log_info("TTL 0: dropped");
query_info_clear(&qinf);
reply_info_parsedelete(rep, &w->worker->alloc);
return 0;
}
reply_info_set_ttls(rep, time(0));
worker_store_rrsets(w->worker, rep);
reply_info_sortref(rep);
/* store msg in the cache */
if(!(e = query_info_entrysetup(&qinf, rep, w->query_hash))) {
query_info_clear(&qinf);
reply_info_parsedelete(rep, &w->worker->alloc);
return 0;
}
slabhash_insert(w->worker->daemon->msg_cache, w->query_hash,
&e->entry, rep, &w->worker->alloc);
return 0;
}
/** process incoming request */
static void
worker_process_query(struct worker* worker, struct work_query* w)
{
/* query the forwarding address */
verbose(VERB_DETAIL, "process_query ID %d", ntohs(w->query_id));
pending_udp_query(worker->back, w->query_reply.c->buffer,
&worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
worker_handle_reply, w, worker->rndstate);
}
/** check request sanity. Returns error code, 0 OK, or -1 discard. /** check request sanity. Returns error code, 0 OK, or -1 discard.
* @param pkt: the wire packet to examine for sanity. * @param pkt: the wire packet to examine for sanity.
@ -515,7 +404,7 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
query_info_clear(&qinfo); query_info_clear(&qinfo);
return 0; return 0;
} }
w->edns = edns; w->state.edns = edns;
worker->free_queries = w->next; worker->free_queries = w->next;
worker->num_requests ++; worker->num_requests ++;
log_assert(worker->num_requests <= worker->request_size); log_assert(worker->num_requests <= worker->request_size);
@ -526,14 +415,15 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
/* init request */ /* init request */
w->next = NULL; w->next = NULL;
w->query_hash = h; w->state.query_hash = h;
memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply)); memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
memcpy(&w->qinfo, &qinfo, sizeof(struct query_info)); memcpy(&w->state.qinfo, &qinfo, sizeof(struct query_info));
memcpy(&w->query_id, ldns_buffer_begin(c->buffer), sizeof(uint16_t)); memcpy(&w->query_id, ldns_buffer_begin(c->buffer), sizeof(uint16_t));
w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2); w->state.query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
/* answer it */ /* answer it */
worker_process_query(worker, w); w->state.buf = c->buffer;
worker_process_query(worker, w, module_event_new);
return 0; return 0;
} }
@ -610,7 +500,16 @@ reqs_init(struct worker* worker)
struct work_query* q = (struct work_query*)calloc(1, struct work_query* q = (struct work_query*)calloc(1,
sizeof(struct work_query)); sizeof(struct work_query));
if(!q) return 0; if(!q) return 0;
q->worker = worker; q->state.buf = worker->front->udp_buff;
q->state.scratch = worker->scratchpad;
q->state.region = region_create_custom(malloc, free, 1024,
64, 16, 0);
if(!q->state.region) {
free(q);
return 0;
}
q->state.env = &worker->env;
q->state.work_info = q;
q->next = worker->free_queries; q->next = worker->free_queries;
worker->free_queries = q; worker->free_queries = q;
q->all_next = worker->all_queries; q->all_next = worker->all_queries;
@ -627,9 +526,10 @@ reqs_delete(struct worker* worker)
struct work_query* n; struct work_query* n;
while(q) { while(q) {
n = q->all_next; n = q->all_next;
log_assert(q->worker == worker); log_assert(q->state.env->worker == worker);
/* comm_reply closed in outside_network_delete */ /* comm_reply closed in outside_network_delete */
query_info_clear(&q->qinfo); query_info_clear(&q->state.qinfo);
region_destroy(q->state.region);
free(q); free(q);
q = n; q = n;
} }
@ -710,24 +610,25 @@ worker_init(struct worker* worker, struct config_file *cfg,
return 0; return 0;
} }
} }
worker->scratchpad = region_create_custom(malloc, free,
65536, 8192, 32, 1);
if(!worker->scratchpad) {
log_err("malloc failure");
worker_delete(worker);
return 0;
}
worker->request_size = cfg->num_queries_per_thread; worker->request_size = cfg->num_queries_per_thread;
if(!reqs_init(worker)) { if(!reqs_init(worker)) {
worker_delete(worker); worker_delete(worker);
return 0; return 0;
} }
/* set forwarder address */
if(cfg->fwd_address && cfg->fwd_address[0]) {
if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) {
worker_delete(worker);
fatal_exit("could not set forwarder address");
}
}
server_stats_init(&worker->stats); server_stats_init(&worker->stats);
alloc_init(&worker->alloc, &worker->daemon->superalloc, alloc_init(&worker->alloc, &worker->daemon->superalloc,
worker->thread_num); worker->thread_num);
worker->scratchpad = region_create_custom(malloc, free, worker->env = *worker->daemon->env;
65536, 8192, 32, 1); worker->env.worker = worker;
worker->env.alloc = &worker->alloc;
return 1; return 1;
} }
@ -765,34 +666,15 @@ worker_delete(struct worker* worker)
} }
int int
worker_set_fwd(struct worker* worker, const char* ip, int port) worker_send_query(ldns_buffer* pkt, struct sockaddr_storage* addr,
socklen_t addrlen, int timeout, struct module_qstate* q, int use_tcp)
{ {
uint16_t p; struct worker* worker = q->env->worker;
log_assert(worker && ip); if(use_tcp) {
p = (uint16_t) port; return pending_tcp_query(worker->back, pkt, addr, addrlen,
if(str_is_ip6(ip)) { timeout, worker_handle_reply, q->work_info,
struct sockaddr_in6* sa = worker->rndstate);
(struct sockaddr_in6*)&worker->fwd_addr;
worker->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in6);
memset(sa, 0, worker->fwd_addrlen);
sa->sin6_family = AF_INET6;
sa->sin6_port = (in_port_t)htons(p);
if(inet_pton((int)sa->sin6_family, ip, &sa->sin6_addr) <= 0) {
log_err("Bad ip6 address %s", ip);
return 0;
} }
} else { /* ip4 */ return pending_udp_query(worker->back, pkt, addr, addrlen, timeout,
struct sockaddr_in* sa = worker_handle_reply, q->work_info, worker->rndstate);
(struct sockaddr_in*)&worker->fwd_addr;
worker->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in);
memset(sa, 0, worker->fwd_addrlen);
sa->sin_family = AF_INET;
sa->sin_port = (in_port_t)htons(p);
if(inet_pton((int)sa->sin_family, ip, &sa->sin_addr) <= 0) {
log_err("Bad ip4 address %s", ip);
return 0;
}
}
verbose(VERB_ALGO, "fwd queries to: %s %d", ip, p);
return 1;
} }

View file

@ -50,6 +50,7 @@
#include "util/data/msgreply.h" #include "util/data/msgreply.h"
#include "util/data/msgparse.h" #include "util/data/msgparse.h"
#include "daemon/stats.h" #include "daemon/stats.h"
#include "util/module.h"
struct listen_dnsport; struct listen_dnsport;
struct outside_network; struct outside_network;
struct config_file; struct config_file;
@ -71,22 +72,14 @@ enum worker_commands {
struct work_query { struct work_query {
/** next query in freelist */ /** next query in freelist */
struct work_query* next; struct work_query* next;
/** the worker for this query */ /** query state */
struct worker* worker; struct module_qstate state;
/** the query reply destination, packet buffer and where to send. */ /** the query reply destination, packet buffer and where to send. */
struct comm_reply query_reply; struct comm_reply query_reply;
/** the query_info structure from the query */
struct query_info qinfo;
/** hash value of the query qinfo */
hashvalue_t query_hash;
/** next query in all-list */
struct work_query* all_next;
/** id of query, in network byteorder. */ /** id of query, in network byteorder. */
uint16_t query_id; uint16_t query_id;
/** flags uint16 from query */ /** next query in all-list */
uint16_t query_flags; struct work_query* all_next;
/** edns data from the query */
struct edns_data edns;
}; };
/** /**
@ -124,11 +117,6 @@ struct worker {
/** list of all working queries */ /** list of all working queries */
struct work_query* all_queries; struct work_query* all_queries;
/** address to forward to */
struct sockaddr_storage fwd_addr;
/** length of fwd_addr */
socklen_t fwd_addrlen;
/** random() table for this worker. */ /** random() table for this worker. */
struct ub_randstate* rndstate; struct ub_randstate* rndstate;
/** do we need to restart (instead of exit) ? */ /** do we need to restart (instead of exit) ? */
@ -139,6 +127,9 @@ struct worker {
struct server_stats stats; struct server_stats stats;
/** thread scratch region */ /** thread scratch region */
struct region* scratchpad; struct region* scratchpad;
/** module environment passed to modules, changed for this thread */
struct module_env env;
}; };
/** /**
@ -173,15 +164,6 @@ void worker_work(struct worker* worker);
*/ */
void worker_delete(struct worker* worker); void worker_delete(struct worker* worker);
/**
* Set forwarder
* @param worker: the worker to modify.
* @param ip: the server name.
* @param port: port on server or NULL for default 53.
* @return: false on error.
*/
int worker_set_fwd(struct worker* worker, const char* ip, int port);
/** /**
* Send a command to a worker. Uses blocking writes. * Send a command to a worker. Uses blocking writes.
* @param worker: worker to send command to. * @param worker: worker to send command to.
@ -198,4 +180,18 @@ void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
*/ */
void worker_sighandler(int sig, void* arg); void worker_sighandler(int sig, void* arg);
/**
* Worker service routine to send udp messages for modules.
* @param pkt: packet to send.
* @param addr: where to.
* @param addrlen: length of addr.
* @param timeout: seconds to wait until timeout.
* @param q: wich query state to reactivate upon return.
* @param use_tcp: true to use TCP, false for UDP.
* return: false on failure (memory or socket related). no query was
* sent.
*/
int worker_send_query(ldns_buffer* pkt, struct sockaddr_storage* addr,
socklen_t addrlen, int timeout, struct module_qstate* q, int use_tcp);
#endif /* DAEMON_WORKER_H */ #endif /* DAEMON_WORKER_H */

View file

@ -1,3 +1,6 @@
11 May 2007: Wouter
- iterator/iterator.c module.
10 May 2007: Wouter 10 May 2007: Wouter
- created release-0.3 svn tag. - created release-0.3 svn tag.
- util/module.h - util/module.h

309
iterator/iterator.c Normal file
View file

@ -0,0 +1,309 @@
/*
* iterator/iterator.c - iterative resolver DNS query response module
*
* Copyright (c) 2007, NLnet Labs. All rights reserved.
*
* This software is open source.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* Neither the name of the NLNET LABS nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* \file
*
* This file contains a module that performs recusive iterative DNS query
* processing.
*/
#include "config.h"
#include "iterator/iterator.h"
#include "util/module.h"
#include "util/netevent.h"
#include "util/config_file.h"
#include "util/net_help.h"
#include "util/storage/slabhash.h"
/**
* Set forwarder address
* @param ie: iterator global state.
* @param ip: the server name.
* @param port: port on server or NULL for default 53.
* @return: false on error.
*/
static int
iter_set_fwd(struct iter_env* ie, const char* ip, int port)
{
uint16_t p;
log_assert(ie && ip);
p = (uint16_t) port;
if(str_is_ip6(ip)) {
struct sockaddr_in6* sa =
(struct sockaddr_in6*)&ie->fwd_addr;
ie->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in6);
memset(sa, 0, ie->fwd_addrlen);
sa->sin6_family = AF_INET6;
sa->sin6_port = (in_port_t)htons(p);
if(inet_pton((int)sa->sin6_family, ip, &sa->sin6_addr) <= 0) {
log_err("Bad ip6 address %s", ip);
return 0;
}
} else { /* ip4 */
struct sockaddr_in* sa =
(struct sockaddr_in*)&ie->fwd_addr;
ie->fwd_addrlen = (socklen_t)sizeof(struct sockaddr_in);
memset(sa, 0, ie->fwd_addrlen);
sa->sin_family = AF_INET;
sa->sin_port = (in_port_t)htons(p);
if(inet_pton((int)sa->sin_family, ip, &sa->sin_addr) <= 0) {
log_err("Bad ip4 address %s", ip);
return 0;
}
}
verbose(VERB_ALGO, "iterator: fwd queries to: %s %d", ip, p);
return 1;
}
/** iterator init */
static int
iter_init(struct module_env* env, int id)
{
struct iter_env* iter_env = (struct iter_env*)calloc(1,
sizeof(struct iter_env));
if(!iter_env) {
log_err("malloc failure");
return 0;
}
env->modinfo[id] = (void*)iter_env;
/* set forwarder address */
if(env->cfg->fwd_address && env->cfg->fwd_address[0]) {
if(!iter_set_fwd(iter_env, env->cfg->fwd_address,
env->cfg->fwd_port)) {
log_err("iterator: could not set forwarder address");
return 0;
}
}
return 1;
}
/** iterator deinit */
static void
iter_deinit(struct module_env* env, int id)
{
struct iter_env* iter_env;
if(!env || !env->modinfo)
return;
iter_env = (struct iter_env*)env->modinfo[id];
if(iter_env)
free(iter_env);
}
/** see if rrset needs to be updated in the cache */
static int
need_to_update_rrset(struct packed_rrset_data* newd,
struct packed_rrset_data* cached)
{
/* o if current RRset is more trustworthy - insert it */
if( newd->trust > cached->trust )
return 1;
/* o same trust, but different in data - insert it */
if( newd->trust == cached->trust &&
!rrsetdata_equal(newd, cached))
return 1;
/* o see if TTL is better than TTL in cache. */
/* if so, see if rrset+rdata is the same */
/* if so, update TTL in cache, even if trust is worse. */
if( newd->ttl > cached->ttl &&
rrsetdata_equal(newd, cached))
return 1;
return 0;
}
/** store rrsets in the rrset cache. */
static void
worker_store_rrsets(struct module_env* env, struct reply_info* rep)
{
struct lruhash_entry* e;
size_t i;
/* see if rrset already exists in cache, if not insert it. */
/* if it does exist: check to insert it */
for(i=0; i<rep->rrset_count; i++) {
rep->ref[i].key = rep->rrsets[i];
rep->ref[i].id = rep->rrsets[i]->id;
/* looks up item with a readlock - no editing! */
if((e=slabhash_lookup(env->rrset_cache,
rep->rrsets[i]->entry.hash, rep->rrsets[i]->entry.key,
0)) != 0) {
struct packed_rrset_data* data =
(struct packed_rrset_data*)e->data;
struct packed_rrset_data* rd =
(struct packed_rrset_data*)
rep->rrsets[i]->entry.data;
rep->ref[i].key = (struct ub_packed_rrset_key*)e->key;
rep->ref[i].id = rep->rrsets[i]->id;
/* found in cache, do checks above */
if(!need_to_update_rrset(rd, data)) {
lock_rw_unlock(&e->lock);
ub_packed_rrset_parsedelete(rep->rrsets[i],
env->alloc);
rep->rrsets[i] = rep->ref[i].key;
continue; /* use cached item instead */
}
if(rd->trust < data->trust)
rd->trust = data->trust;
lock_rw_unlock(&e->lock);
/* small gap here, where entry is not locked.
* possibly entry is updated with something else.
* this is just too bad, its cache anyway. */
/* use insert to update entry to manage lruhash
* cache size values nicely. */
}
slabhash_insert(env->rrset_cache,
rep->rrsets[i]->entry.hash, &rep->rrsets[i]->entry,
rep->rrsets[i]->entry.data, env->alloc);
if(e) rep->rrsets[i] = rep->ref[i].key;
}
}
/** store message in the cache */
static void
store_msg(struct module_qstate* qstate, struct query_info* qinfo,
struct reply_info* rep)
{
struct msgreply_entry* e;
reply_info_set_ttls(rep, time(0));
worker_store_rrsets(qstate->env, rep);
if(rep->ttl == 0) {
log_info("TTL 0: dropped msg from cache");
return;
}
reply_info_sortref(rep);
/* store msg in the cache */
if(!(e = query_info_entrysetup(qinfo, rep, qstate->query_hash))) {
log_err("store_msg: malloc failed");
return;
}
slabhash_insert(qstate->env->msg_cache, qstate->query_hash,
&e->entry, rep, &qstate->env->alloc);
}
/** iterator operate on a query */
static void
iter_operate(struct module_qstate* qstate, enum module_ev event, int id)
{
struct module_env* env = qstate->env;
struct iter_env* ie = (struct iter_env*)env->modinfo[id];
verbose(VERB_ALGO, "iterator[module %d] operate: extstate:%s event:%s",
id, strextstate(qstate->ext_state[id]), strmodulevent(event));
if(event == module_event_error) {
qstate->ext_state[id] = module_error;
return;
}
if(event == module_event_new) {
/* send UDP query to forwarder address */
(*env->send_query)(qstate->buf, &ie->fwd_addr,
ie->fwd_addrlen, UDP_QUERY_TIMEOUT, qstate, 0);
qstate->ext_state[id] = module_wait_reply;
qstate->minfo[id] = NULL;
return;
}
if(event == module_event_timeout) {
/* try TCP if UDP fails */
if(qstate->reply->c->type == comm_udp) {
qinfo_query_encode(qstate->buf, &qstate->qinfo);
(*env->send_query)(qstate->buf, &ie->fwd_addr,
ie->fwd_addrlen, TCP_QUERY_TIMEOUT, qstate, 1);
return;
}
qstate->ext_state[id] = module_error;
return;
}
if(event == module_event_reply) {
uint16_t us = qstate->edns.udp_size;
struct query_info reply_qinfo;
struct reply_info* reply_msg;
struct edns_data reply_edns;
int r;
/* see if it is truncated */
if(LDNS_TC_WIRE(ldns_buffer_begin(qstate->reply->c->buffer))
&& qstate->reply->c->type == comm_udp) {
log_info("TC: truncated. retry in TCP mode.");
qinfo_query_encode(qstate->buf, &qstate->qinfo);
(*env->send_query)(qstate->buf, &ie->fwd_addr,
ie->fwd_addrlen, TCP_QUERY_TIMEOUT, qstate, 1);
/* stay in wait_reply state */
return;
}
if((r=reply_info_parse(qstate->reply->c->buffer, env->alloc,
&reply_qinfo, &reply_msg, qstate->scratch,
&reply_edns))!=0) {
qstate->ext_state[id] = module_error;
return;
}
qstate->edns.edns_version = EDNS_ADVERTISED_VERSION;
qstate->edns.udp_size = EDNS_ADVERTISED_SIZE;
qstate->edns.ext_rcode = 0;
qstate->edns.bits &= EDNS_DO;
if(!reply_info_answer_encode(&reply_qinfo, reply_msg, 0,
qstate->query_flags, qstate->buf, 0, 0,
qstate->scratch, us, &qstate->edns)) {
qstate->ext_state[id] = module_error;
return;
}
store_msg(qstate, &reply_qinfo, reply_msg);
qstate->ext_state[id] = module_finished;
return;
}
log_err("bad event for iterator");
qstate->ext_state[id] = module_error;
}
/** iterator cleanup query state */
static void
iter_clear(struct module_qstate* qstate, int id)
{
if(!qstate)
return;
/* allocated in region, so nothing to do */
qstate->minfo[id] = NULL;
}
/**
* The iterator function block
*/
static struct module_func_block iter_block = {
"iterator",
&iter_init, &iter_deinit, &iter_operate, &iter_clear
};
struct module_func_block*
iter_get_funcblock()
{
return &iter_block;
}

68
iterator/iterator.h Normal file
View file

@ -0,0 +1,68 @@
/*
* iterator/iterator.h - iterative resolver DNS query response module
*
* Copyright (c) 2007, NLnet Labs. All rights reserved.
*
* This software is open source.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* Neither the name of the NLNET LABS nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* \file
*
* This file contains a module that performs recusive iterative DNS query
* processing.
*/
#ifndef ITERATOR_ITERATOR_H
#define ITERATOR_ITERATOR_H
struct module_func_block;
/**
* Global state for the iterator.
*/
struct iter_env {
/** address to forward to */
struct sockaddr_storage fwd_addr;
/** length of fwd_addr */
socklen_t fwd_addrlen;
};
/**
* Per query state for the iterator module.
*/
struct iter_qstate {
};
/**
* Get the iterator function block.
*/
struct module_func_block* iter_get_funcblock();
#endif /* ITERATOR_ITERATOR_H */

View file

@ -122,7 +122,7 @@ waiting_tcp_delete(struct waiting_tcp* w)
} }
/** use next free buffer to service a tcp query */ /** use next free buffer to service a tcp query */
static void static int
outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt) outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
{ {
struct pending_tcp* pend = w->outnet->tcp_free; struct pending_tcp* pend = w->outnet->tcp_free;
@ -139,9 +139,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
if(s == -1) { if(s == -1) {
log_err("outgoing tcp: socket: %s", strerror(errno)); log_err("outgoing tcp: socket: %s", strerror(errno));
log_addr(&w->addr, w->addrlen); log_addr(&w->addr, w->addrlen);
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL); return 0;
waiting_tcp_delete(w);
return;
} }
fd_set_nonblock(s); fd_set_nonblock(s);
if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) { if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) {
@ -149,9 +147,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
log_err("outgoing tcp: connect: %s", strerror(errno)); log_err("outgoing tcp: connect: %s", strerror(errno));
log_addr(&w->addr, w->addrlen); log_addr(&w->addr, w->addrlen);
close(s); close(s);
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL); return 0;
waiting_tcp_delete(w);
return;
} }
} }
w->pkt = NULL; w->pkt = NULL;
@ -166,7 +162,7 @@ outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
pend->c->tcp_is_reading = 0; pend->c->tcp_is_reading = 0;
pend->c->tcp_byte_count = 0; pend->c->tcp_byte_count = 0;
comm_point_start_listening(pend->c, s, -1); comm_point_start_listening(pend->c, s, -1);
return; return 1;
} }
/** see if buffers can be used to service TCP queries. */ /** see if buffers can be used to service TCP queries. */
@ -179,7 +175,10 @@ use_free_buffer(struct outside_network* outnet)
outnet->tcp_wait_first = w->next_waiting; outnet->tcp_wait_first = w->next_waiting;
if(outnet->tcp_wait_last == w) if(outnet->tcp_wait_last == w)
outnet->tcp_wait_last = NULL; outnet->tcp_wait_last = NULL;
outnet_tcp_take_into_use(w, w->pkt); if(!outnet_tcp_take_into_use(w, w->pkt)) {
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
waiting_tcp_delete(w);
}
} }
} }
@ -649,7 +648,7 @@ select_port(struct outside_network* outnet, struct pending* pend,
} }
void int
pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* cb, void* cb_arg, struct ub_randstate* rnd) comm_point_callback_t* cb, void* cb_arg, struct ub_randstate* rnd)
@ -660,19 +659,15 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
/* create pending struct and change ID to be unique */ /* create pending struct and change ID to be unique */
if(!(pend=new_pending(outnet, packet, addr, addrlen, cb, cb_arg, if(!(pend=new_pending(outnet, packet, addr, addrlen, cb, cb_arg,
rnd))) { rnd))) {
/* callback user for the error */ return 0;
(void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
return;
} }
select_port(outnet, pend, rnd); select_port(outnet, pend, rnd);
/* send it over the commlink */ /* send it over the commlink */
if(!comm_point_send_udp_msg(pend->c, packet, (struct sockaddr*)addr, if(!comm_point_send_udp_msg(pend->c, packet, (struct sockaddr*)addr,
addrlen)) { addrlen)) {
/* callback user for the error */
(void)(*pend->cb)(pend->c, pend->cb_arg, NETEVENT_CLOSED, NULL);
pending_delete(outnet, pend); pending_delete(outnet, pend);
return; return 0;
} }
/* system calls to set timeout after sending UDP to make roundtrip /* system calls to set timeout after sending UDP to make roundtrip
@ -680,6 +675,7 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
tv.tv_sec = timeout; tv.tv_sec = timeout;
tv.tv_usec = 0; tv.tv_usec = 0;
comm_timer_set(pend->timer, &tv); comm_timer_set(pend->timer, &tv);
return 1;
} }
/** callback for outgoing TCP timer event */ /** callback for outgoing TCP timer event */
@ -714,7 +710,7 @@ outnet_tcptimer(void* arg)
use_free_buffer(outnet); use_free_buffer(outnet);
} }
void int
pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg,
@ -728,14 +724,11 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp) w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
+ (pend?0:ldns_buffer_limit(packet))); + (pend?0:ldns_buffer_limit(packet)));
if(!w) { if(!w) {
/* callback user for the error */ return 0;
(void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
return;
} }
if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) { if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) {
free(w); free(w);
(void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL); return 0;
return;
} }
w->pkt = NULL; w->pkt = NULL;
w->pkt_len = ldns_buffer_limit(packet); w->pkt_len = ldns_buffer_limit(packet);
@ -752,7 +745,10 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
comm_timer_set(w->timer, &tv); comm_timer_set(w->timer, &tv);
if(pend) { if(pend) {
/* we have a buffer available right now */ /* we have a buffer available right now */
outnet_tcp_take_into_use(w, ldns_buffer_begin(packet)); if(!outnet_tcp_take_into_use(w, ldns_buffer_begin(packet))) {
waiting_tcp_delete(w);
return 0;
}
} else { } else {
/* queue up */ /* queue up */
w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp); w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
@ -763,4 +759,5 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
else outnet->tcp_wait_first = w; else outnet->tcp_wait_first = w;
outnet->tcp_wait_last = w; outnet->tcp_wait_last = w;
} }
return 1;
} }

View file

@ -204,12 +204,11 @@ void outside_network_delete(struct outside_network* outnet);
* @param addrlen: length of addr. * @param addrlen: length of addr.
* @param timeout: in seconds from now. * @param timeout: in seconds from now.
* @param callback: function to call on error, timeout or reply. * @param callback: function to call on error, timeout or reply.
* The routine does not return an error, instead it calls the callback,
* with an error code if an error happens.
* @param callback_arg: user argument for callback function. * @param callback_arg: user argument for callback function.
* @param rnd: random state for generating ID and port. * @param rnd: random state for generating ID and port.
* @return: false on error for malloc or socket.
*/ */
void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, int pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd); struct ub_randstate* rnd);
@ -225,12 +224,11 @@ void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
* Timer starts running now. Timer may expire if all buffers are used, * Timer starts running now. Timer may expire if all buffers are used,
* without any query been sent to the server yet. * without any query been sent to the server yet.
* @param callback: function to call on error, timeout or reply. * @param callback: function to call on error, timeout or reply.
* The routine does not return an error, instead it calls the callback,
* with an error code if an error happens.
* @param callback_arg: user argument for callback function. * @param callback_arg: user argument for callback function.
* @param rnd: random state for generating ID. * @param rnd: random state for generating ID.
* @return: false on error for malloc or socket.
*/ */
void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, int pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd); struct ub_randstate* rnd);

View file

@ -656,7 +656,7 @@ outside_network_delete(struct outside_network* outnet)
free(outnet); free(outnet);
} }
void int
pending_udp_query(struct outside_network* outnet, ldns_buffer* packet, pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg,
@ -701,9 +701,10 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
/* add to list */ /* add to list */
pend->next = runtime->pending_list; pend->next = runtime->pending_list;
runtime->pending_list = pend; runtime->pending_list = pend;
return 1;
} }
void int
pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet, pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout, struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg, comm_point_callback_t* callback, void* callback_arg,
@ -748,6 +749,7 @@ pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
/* add to list */ /* add to list */
pend->next = runtime->pending_list; pend->next = runtime->pending_list;
runtime->pending_list = pend; runtime->pending_list = pend;
return 1;
} }
struct listen_port* listening_ports_open(struct config_file* ATTR_UNUSED(cfg)) struct listen_port* listening_ports_open(struct config_file* ATTR_UNUSED(cfg))

View file

@ -39,3 +39,32 @@
#include "config.h" #include "config.h"
#include "util/module.h" #include "util/module.h"
const char*
strextstate(enum module_ext_state s)
{
switch(s) {
case module_state_initial: return "module_state_initial";
case module_wait_reply: return "module_wait_reply";
case module_wait_module: return "module_wait_module";
case module_wait_subquery: return "module_wait_subquery";
case module_error: return "module_error";
case module_finished: return "module_finished";
}
return "bad_extstate_value";
}
const char*
strmodulevent(enum module_ev e)
{
switch(e) {
case module_event_new: return "module_event_new";
case module_event_pass: return "module_event_pass";
case module_event_reply: return "module_event_reply";
case module_event_timeout: return "module_event_timeout";
case module_event_mod_done: return "module_event_mod_done";
case module_event_subq_done: return "module_event_subq_done";
case module_event_error: return "module_event_error";
}
return "bad_event_value";
}

View file

@ -42,12 +42,16 @@
#ifndef UTIL_MODULE_H #ifndef UTIL_MODULE_H
#define UTIL_MODULE_H #define UTIL_MODULE_H
#include "util/storage/lruhash.h" #include "util/storage/lruhash.h"
#include "util/data/msgreply.h"
#include "util/data/msgparse.h"
struct alloc_cache;
struct config_file; struct config_file;
struct slabhash; struct slabhash;
struct query_info; struct query_info;
struct edns_data; struct edns_data;
struct region; struct region;
struct worker; struct worker;
struct module_qstate;
/** Maximum number of modules in operation */ /** Maximum number of modules in operation */
#define MAX_MODULE 2 #define MAX_MODULE 2
@ -66,11 +70,27 @@ struct module_env {
struct slabhash* rrset_cache; struct slabhash* rrset_cache;
/* --- services --- */ /* --- services --- */
/** send DNS query to server. operate() should return with wait_reply */ /**
* Send DNS query to server. operate() should return with wait_reply.
* Later on a callback will cause operate() to be called with event
* timeout or reply.
* @param pkt: packet to send.
* @param addr: where to.
* @param addrlen: length of addr.
* @param timeout: seconds to wait until timeout.
* @param q: wich query state to reactivate upon return.
* @param use_tcp: set to true to send over TCP. 0 for UDP.
* return: false on failure (memory or socket related). no query was
* sent.
*/
int (*send_query)(ldns_buffer* pkt, struct sockaddr_storage* addr,
socklen_t addrlen, int timeout, struct module_qstate* q,
int use_tcp);
/** create a subquery. operate should then return with wait_subq */ /** create a subquery. operate should then return with wait_subq */
/** allocation service */ /** allocation service */
struct alloc* alloc; struct alloc_cache* alloc;
/** internal data for daemon - worker thread. */ /** internal data for daemon - worker thread. */
struct worker* worker; struct worker* worker;
/** module specific data. indexed by module id. */ /** module specific data. indexed by module id. */
@ -122,30 +142,34 @@ enum module_ev {
*/ */
struct module_qstate { struct module_qstate {
/** which query is being answered: name, type, class */ /** which query is being answered: name, type, class */
struct query_info* qinfo; struct query_info qinfo;
/** hash value of the query qinfo */ /** hash value of the query qinfo */
hashvalue_t query_hash; hashvalue_t query_hash;
/** flags uint16 from query */ /** flags uint16 from query */
uint16_t query_flags; uint16_t query_flags;
/** edns data from the query */ /** edns data from the query */
struct edns_data* edns; struct edns_data edns;
/** buffer, contains server replies, store resulting reply here. /** buffer, store resulting reply here.
* May be cleared when module blocks. */ * May be cleared when module blocks. */
ldns_buffer* buf; ldns_buffer* buf;
/** parsed message from server */ /** comm_reply contains server replies */
struct msg_parse* msg_parse; struct comm_reply* reply;
/** region for temporary usage. May be cleared when module blocks. */ /** region for temporary usage. May be cleared when module blocks. */
struct region* scratch; struct region* scratch;
/** region for this query. Cleared when query process finishes. */ /** region for this query. Cleared when query process finishes. */
struct region* region; struct region* region;
/** which module is executing */
int curmod;
/** module states */ /** module states */
enum module_ext_state ext_state[MAX_MODULE]; enum module_ext_state ext_state[MAX_MODULE];
/** module specific data for query. indexed by module id. */ /** module specific data for query. indexed by module id. */
void* minfo[MAX_MODULE]; void* minfo[MAX_MODULE];
/** environment for this query */ /** environment for this query */
struct module_env* module_env; struct module_env* env;
/** worker related state for this query */
struct work_query* work_info;
/** parent query, only nonNULL for subqueries */ /** parent query, only nonNULL for subqueries */
struct module_qstate* parent; struct module_qstate* parent;
@ -163,14 +187,15 @@ struct module_func_block {
char* name; char* name;
/** /**
* init the module * init the module. Called once for the global state.
* This is the place to apply settings from the config file.
* @param env: module environment. * @param env: module environment.
* @param id: module id number. * @param id: module id number.
* return: 0 on error * return: 0 on error
*/ */
int (*init)(struct module_env* env, int id); int (*init)(struct module_env* env, int id);
/** /**
* de-init, delete, the module. * de-init, delete, the module. Called once for the global state.
* @param env: module environment. * @param env: module environment.
* @param id: module id number. * @param id: module id number.
*/ */
@ -197,4 +222,18 @@ struct module_func_block {
void (*clear)(struct module_qstate* qstate, int id); void (*clear)(struct module_qstate* qstate, int id);
}; };
/**
* Debug utility: module external qstate to string
* @param s: the state value.
* @return descriptive string.
*/
const char* strextstate(enum module_ext_state s);
/**
* Debug utility: module event to string
* @param e: the module event value.
* @return descriptive string.
*/
const char* strmodulevent(enum module_ev e);
#endif /* UTIL_MODULE_H */ #endif /* UTIL_MODULE_H */

View file

@ -52,6 +52,15 @@
/** QR flag */ /** QR flag */
#define BIT_QR 0x8000 #define BIT_QR 0x8000
/** timeout in seconds for UDP queries to auth servers. TODO: proper rtt */
#define UDP_QUERY_TIMEOUT 4
/** timeout in seconds for TCP queries to auth servers. TODO: proper rtt */
#define TCP_QUERY_TIMEOUT 30
/** Advertised version of EDNS capabilities */
#define EDNS_ADVERTISED_VERSION 0
/** Advertised size of EDNS capabilities */
#define EDNS_ADVERTISED_SIZE 4096
/** /**
* See if string is ip4 or ip6. * See if string is ip4 or ip6.
* @param str: IP specification. * @param str: IP specification.