mirror of
https://github.com/NLnetLabs/unbound.git
synced 2025-12-20 23:00:56 -05:00
Multiple queries per thread.
git-svn-id: file:///svn/unbound/trunk@201 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
parent
ccf3d7190d
commit
1eb9ff1da3
5 changed files with 140 additions and 80 deletions
156
daemon/worker.c
156
daemon/worker.c
|
|
@ -74,18 +74,34 @@ worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
|
||||||
log_err("write socket: %s", strerror(errno));
|
log_err("write socket: %s", strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** release workrequest back to the freelist,
|
||||||
|
* note that the w->qinfo still needs to be cleared after this.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
req_release(struct work_query* w)
|
||||||
|
{
|
||||||
|
if(w->worker->num_requests == w->worker->request_size) {
|
||||||
|
/* no longer at max, start accepting again. */
|
||||||
|
listen_resume(w->worker->front);
|
||||||
|
}
|
||||||
|
w->worker->num_requests --;
|
||||||
|
log_assert(w->worker->num_requests >= 0);
|
||||||
|
w->next = w->worker->free_queries;
|
||||||
|
w->worker->free_queries = w;
|
||||||
|
}
|
||||||
|
|
||||||
/** reply to query with given error code */
|
/** reply to query with given error code */
|
||||||
static void
|
static void
|
||||||
replyerror(int r, struct worker* worker)
|
replyerror(int r, struct work_query* w)
|
||||||
{
|
{
|
||||||
ldns_buffer* buf = worker->query_reply.c->buffer;
|
ldns_buffer* buf = w->query_reply.c->buffer;
|
||||||
uint16_t flags;
|
uint16_t flags;
|
||||||
verbose(VERB_DETAIL, "reply with error");
|
verbose(VERB_DETAIL, "reply with error");
|
||||||
|
|
||||||
ldns_buffer_clear(buf);
|
ldns_buffer_clear(buf);
|
||||||
ldns_buffer_write_u16(buf, worker->query_id);
|
ldns_buffer_write_u16(buf, w->query_id);
|
||||||
flags = (uint16_t)(0x8000 | r); /* QR and retcode*/
|
flags = (uint16_t)(0x8000 | r); /* QR and retcode*/
|
||||||
flags |= (worker->query_flags & 0x0100); /* copy RD bit */
|
flags |= (w->query_flags & 0x0100); /* copy RD bit */
|
||||||
ldns_buffer_write_u16(buf, flags);
|
ldns_buffer_write_u16(buf, flags);
|
||||||
flags = 1;
|
flags = 1;
|
||||||
ldns_buffer_write_u16(buf, flags);
|
ldns_buffer_write_u16(buf, flags);
|
||||||
|
|
@ -93,17 +109,13 @@ replyerror(int r, struct worker* worker)
|
||||||
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
||||||
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
||||||
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
ldns_buffer_write(buf, &flags, sizeof(uint16_t));
|
||||||
ldns_buffer_write(buf, worker->qinfo.qname, worker->qinfo.qnamesize);
|
ldns_buffer_write(buf, w->qinfo.qname, w->qinfo.qnamesize);
|
||||||
ldns_buffer_write_u16(buf, worker->qinfo.qtype);
|
ldns_buffer_write_u16(buf, w->qinfo.qtype);
|
||||||
ldns_buffer_write_u16(buf, worker->qinfo.qclass);
|
ldns_buffer_write_u16(buf, w->qinfo.qclass);
|
||||||
ldns_buffer_flip(buf);
|
ldns_buffer_flip(buf);
|
||||||
comm_point_send_reply(&worker->query_reply);
|
comm_point_send_reply(&w->query_reply);
|
||||||
if(worker->num_requests == 1) {
|
req_release(w);
|
||||||
/* no longer at max, start accepting again. */
|
query_info_clear(&w->qinfo);
|
||||||
listen_resume(worker->front);
|
|
||||||
}
|
|
||||||
worker->num_requests --;
|
|
||||||
query_info_clear(&worker->qinfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** process incoming replies from the network */
|
/** process incoming replies from the network */
|
||||||
|
|
@ -111,13 +123,12 @@ static int
|
||||||
worker_handle_reply(struct comm_point* c, void* arg, int error,
|
worker_handle_reply(struct comm_point* c, void* arg, int error,
|
||||||
struct comm_reply* ATTR_UNUSED(reply_info))
|
struct comm_reply* ATTR_UNUSED(reply_info))
|
||||||
{
|
{
|
||||||
struct worker* worker = (struct worker*)arg;
|
struct work_query* w = (struct work_query*)arg;
|
||||||
struct reply_info* rep;
|
struct reply_info* rep;
|
||||||
struct msgreply_entry* e;
|
struct msgreply_entry* e;
|
||||||
verbose(VERB_DETAIL, "reply to query with stored ID %d",
|
verbose(VERB_DETAIL, "reply to query with stored ID %d", w->query_id);
|
||||||
worker->query_id);
|
|
||||||
if(error != 0) {
|
if(error != 0) {
|
||||||
replyerror(LDNS_RCODE_SERVFAIL, worker);
|
replyerror(LDNS_RCODE_SERVFAIL, w);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
/* sanity check. */
|
/* sanity check. */
|
||||||
|
|
@ -131,7 +142,7 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
|
||||||
rep = (struct reply_info*)malloc(sizeof(struct reply_info));
|
rep = (struct reply_info*)malloc(sizeof(struct reply_info));
|
||||||
if(!rep) {
|
if(!rep) {
|
||||||
log_err("out of memory");
|
log_err("out of memory");
|
||||||
replyerror(LDNS_RCODE_SERVFAIL, worker);
|
replyerror(LDNS_RCODE_SERVFAIL, w);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
rep->replysize = ldns_buffer_limit(c->buffer) - 2; /* minus ID */
|
rep->replysize = ldns_buffer_limit(c->buffer) - 2; /* minus ID */
|
||||||
|
|
@ -140,44 +151,33 @@ worker_handle_reply(struct comm_point* c, void* arg, int error,
|
||||||
if(!rep->reply) {
|
if(!rep->reply) {
|
||||||
free(rep);
|
free(rep);
|
||||||
log_err("out of memory");
|
log_err("out of memory");
|
||||||
replyerror(LDNS_RCODE_SERVFAIL, worker);
|
replyerror(LDNS_RCODE_SERVFAIL, w);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
memmove(rep->reply, ldns_buffer_at(c->buffer, 2), rep->replysize);
|
memmove(rep->reply, ldns_buffer_at(c->buffer, 2), rep->replysize);
|
||||||
ldns_buffer_write_u16_at(worker->query_reply.c->buffer, 0,
|
ldns_buffer_write_u16_at(w->query_reply.c->buffer, 0, w->query_id);
|
||||||
worker->query_id);
|
reply_info_answer(rep, w->query_flags, w->query_reply.c->buffer);
|
||||||
reply_info_answer(rep, worker->query_flags, worker->query_reply.c->
|
comm_point_send_reply(&w->query_reply);
|
||||||
buffer);
|
req_release(w);
|
||||||
comm_point_send_reply(&worker->query_reply);
|
|
||||||
if(worker->num_requests == 1) {
|
|
||||||
/* no longer at max, start accepting again. */
|
|
||||||
listen_resume(worker->front);
|
|
||||||
}
|
|
||||||
worker->num_requests --;
|
|
||||||
/* store or update reply in the cache */
|
/* store or update reply in the cache */
|
||||||
if(!(e = query_info_entrysetup(&worker->qinfo, rep,
|
if(!(e = query_info_entrysetup(&w->qinfo, rep, w->query_hash))) {
|
||||||
worker->query_hash))) {
|
|
||||||
log_err("out of memory");
|
log_err("out of memory");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
slabhash_insert(worker->daemon->msg_cache, worker->query_hash,
|
slabhash_insert(w->worker->daemon->msg_cache, w->query_hash,
|
||||||
&e->entry, rep);
|
&e->entry, rep);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** process incoming request */
|
/** process incoming request */
|
||||||
static void
|
static void
|
||||||
worker_process_query(struct worker* worker)
|
worker_process_query(struct worker* worker, struct work_query* w)
|
||||||
{
|
{
|
||||||
/* query the forwarding address */
|
/* query the forwarding address */
|
||||||
worker->query_id = LDNS_ID_WIRE(ldns_buffer_begin(
|
verbose(VERB_DETAIL, "process_query ID %d", w->query_id);
|
||||||
worker->query_reply.c->buffer));
|
pending_udp_query(worker->back, w->query_reply.c->buffer,
|
||||||
worker->query_flags = ldns_buffer_read_u16_at(worker->
|
|
||||||
query_reply.c->buffer, 2);
|
|
||||||
verbose(VERB_DETAIL, "process_query ID %d", worker->query_id);
|
|
||||||
pending_udp_query(worker->back, worker->query_reply.c->buffer,
|
|
||||||
&worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
|
&worker->fwd_addr, worker->fwd_addrlen, UDP_QUERY_TIMEOUT,
|
||||||
worker_handle_reply, worker, worker->rndstate);
|
worker_handle_reply, w, worker->rndstate);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** check request sanity. Returns error code, 0 OK, or -1 discard.
|
/** check request sanity. Returns error code, 0 OK, or -1 discard.
|
||||||
|
|
@ -261,6 +261,9 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
|
||||||
int ret;
|
int ret;
|
||||||
hashvalue_t h;
|
hashvalue_t h;
|
||||||
struct lruhash_entry* e;
|
struct lruhash_entry* e;
|
||||||
|
struct query_info qinfo;
|
||||||
|
struct work_query* w;
|
||||||
|
|
||||||
verbose(VERB_DETAIL, "worker handle request");
|
verbose(VERB_DETAIL, "worker handle request");
|
||||||
if(error != NETEVENT_NOERROR) {
|
if(error != NETEVENT_NOERROR) {
|
||||||
log_err("called with err=%d", error);
|
log_err("called with err=%d", error);
|
||||||
|
|
@ -275,27 +278,17 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
|
||||||
comm_point_drop_reply(repinfo);
|
comm_point_drop_reply(repinfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if(worker->num_requests > 0) {
|
|
||||||
/* we could get this due to a slow tcp incoming query,
|
|
||||||
that started before we performed listen_pushback */
|
|
||||||
verbose(VERB_DETAIL, "worker: too many incoming requests "
|
|
||||||
"active. dropping incoming query.");
|
|
||||||
comm_point_drop_reply(repinfo);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
/* see if query is in the cache */
|
/* see if query is in the cache */
|
||||||
if(!query_info_parse(&worker->qinfo, c->buffer)) {
|
if(!query_info_parse(&qinfo, c->buffer)) {
|
||||||
LDNS_QR_SET(ldns_buffer_begin(c->buffer));
|
LDNS_QR_SET(ldns_buffer_begin(c->buffer));
|
||||||
LDNS_RCODE_SET(ldns_buffer_begin(c->buffer),
|
LDNS_RCODE_SET(ldns_buffer_begin(c->buffer),
|
||||||
LDNS_RCODE_FORMERR);
|
LDNS_RCODE_FORMERR);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
h = query_info_hash(&worker->qinfo);
|
h = query_info_hash(&qinfo);
|
||||||
if((e=slabhash_lookup(worker->daemon->msg_cache, h, &worker->qinfo,
|
if((e=slabhash_lookup(worker->daemon->msg_cache, h, &qinfo, 0))) {
|
||||||
0))) {
|
|
||||||
/* answer from cache */
|
/* answer from cache */
|
||||||
log_info("answer from the cache");
|
log_info("answer from the cache");
|
||||||
query_info_clear(&worker->qinfo);
|
|
||||||
/* id is still in the buffer, no need to touch it */
|
/* id is still in the buffer, no need to touch it */
|
||||||
reply_info_answer((struct reply_info*)e->data,
|
reply_info_answer((struct reply_info*)e->data,
|
||||||
ldns_buffer_read_u16_at(c->buffer, 2), c->buffer);
|
ldns_buffer_read_u16_at(c->buffer, 2), c->buffer);
|
||||||
|
|
@ -303,16 +296,40 @@ worker_handle_request(struct comm_point* c, void* arg, int error,
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
ldns_buffer_rewind(c->buffer);
|
ldns_buffer_rewind(c->buffer);
|
||||||
|
/* perform memory allocation(s) */
|
||||||
|
if(!query_info_allocqname(&qinfo)) {
|
||||||
|
comm_point_drop_reply(repinfo);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* answer it */
|
/* grab a work request structure for this new request */
|
||||||
worker->query_hash = h;
|
if(!worker->free_queries) {
|
||||||
|
/* we could get this due to a slow tcp incoming query,
|
||||||
|
that started before we performed listen_pushback */
|
||||||
|
verbose(VERB_DETAIL, "worker: too many incoming requests "
|
||||||
|
"active. dropping incoming query.");
|
||||||
|
comm_point_drop_reply(repinfo);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
w = worker->free_queries;
|
||||||
|
worker->free_queries = w->next;
|
||||||
worker->num_requests ++;
|
worker->num_requests ++;
|
||||||
if(worker->num_requests >= 1) {
|
log_assert(worker->num_requests <= worker->request_size);
|
||||||
|
if(worker->num_requests == worker->request_size) {
|
||||||
/* the max request number has been reached, stop accepting */
|
/* the max request number has been reached, stop accepting */
|
||||||
listen_pushback(worker->front);
|
listen_pushback(worker->front);
|
||||||
}
|
}
|
||||||
memcpy(&worker->query_reply, repinfo, sizeof(struct comm_reply));
|
|
||||||
worker_process_query(worker);
|
/* init request */
|
||||||
|
w->next = NULL;
|
||||||
|
w->query_hash = h;
|
||||||
|
memcpy(&w->query_reply, repinfo, sizeof(struct comm_reply));
|
||||||
|
memcpy(&w->qinfo, &qinfo, sizeof(struct query_info));
|
||||||
|
w->query_id = LDNS_ID_WIRE(ldns_buffer_begin(c->buffer));
|
||||||
|
w->query_flags = ldns_buffer_read_u16_at(c->buffer, 2);
|
||||||
|
|
||||||
|
/* answer it */
|
||||||
|
worker_process_query(worker, w);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -377,6 +394,22 @@ worker_create(struct daemon* daemon, int id)
|
||||||
return worker;
|
return worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** create request handling structures */
|
||||||
|
static int
|
||||||
|
reqs_init(struct worker* worker)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
for(i=0; i<worker->request_size; i++) {
|
||||||
|
struct work_query* q = (struct work_query*)calloc(1,
|
||||||
|
sizeof(struct work_query));
|
||||||
|
if(!q) return 0;
|
||||||
|
q->worker = worker;
|
||||||
|
q->next = worker->free_queries;
|
||||||
|
worker->free_queries = q;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
worker_init(struct worker* worker, struct config_file *cfg,
|
worker_init(struct worker* worker, struct config_file *cfg,
|
||||||
struct listen_port* ports, size_t buffer_size, int do_sigs)
|
struct listen_port* ports, size_t buffer_size, int do_sigs)
|
||||||
|
|
@ -451,6 +484,11 @@ worker_init(struct worker* worker, struct config_file *cfg,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
worker->request_size = 1;
|
||||||
|
if(!reqs_init(worker)) {
|
||||||
|
worker_delete(worker);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* set forwarder address */
|
/* set forwarder address */
|
||||||
if(cfg->fwd_address && cfg->fwd_address[0]) {
|
if(cfg->fwd_address && cfg->fwd_address[0]) {
|
||||||
|
|
|
||||||
|
|
@ -64,6 +64,23 @@ enum worker_commands {
|
||||||
worker_cmd_quit
|
worker_cmd_quit
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** information per query that is in processing */
|
||||||
|
struct work_query {
|
||||||
|
/** next query in freelist */
|
||||||
|
struct work_query* next;
|
||||||
|
/** the worker for this query */
|
||||||
|
struct worker* worker;
|
||||||
|
/** the query reply destination, packet buffer and where to send. */
|
||||||
|
struct comm_reply query_reply;
|
||||||
|
/** the query_info structure from the query */
|
||||||
|
struct query_info qinfo;
|
||||||
|
/** hash value of the query qinfo */
|
||||||
|
hashvalue_t query_hash;
|
||||||
|
/** id of query */
|
||||||
|
uint16_t query_id;
|
||||||
|
/** flags uint16 from query */
|
||||||
|
uint16_t query_flags;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Structure holding working information for unbound.
|
* Structure holding working information for unbound.
|
||||||
|
|
@ -93,16 +110,10 @@ struct worker {
|
||||||
|
|
||||||
/** number of requests currently active */
|
/** number of requests currently active */
|
||||||
int num_requests;
|
int num_requests;
|
||||||
/** our one and only query, packet buffer and where to send. */
|
/** number of requests that can be handled by this worker */
|
||||||
struct comm_reply query_reply;
|
int request_size;
|
||||||
/** id of query */
|
/** the free working queries */
|
||||||
uint16_t query_id;
|
struct work_query* free_queries;
|
||||||
/** flags uint16 from query */
|
|
||||||
uint16_t query_flags;
|
|
||||||
/** the query_info structure from the query */
|
|
||||||
struct query_info qinfo;
|
|
||||||
/** hash value of the query qinfo */
|
|
||||||
hashvalue_t query_hash;
|
|
||||||
|
|
||||||
/** address to forward to */
|
/** address to forward to */
|
||||||
struct sockaddr_storage fwd_addr;
|
struct sockaddr_storage fwd_addr;
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
- created test that checks if items drop out of the cache.
|
- created test that checks if items drop out of the cache.
|
||||||
- added word 'partitioned hash table' to documentation on slab hash.
|
- added word 'partitioned hash table' to documentation on slab hash.
|
||||||
A slab hash is a partitioned hash table.
|
A slab hash is a partitioned hash table.
|
||||||
|
- worker can handle multiple queries at a time.
|
||||||
|
|
||||||
26 March 2007: Wouter
|
26 March 2007: Wouter
|
||||||
- config settings for slab hash message cache.
|
- config settings for slab hash message cache.
|
||||||
|
|
|
||||||
|
|
@ -79,22 +79,25 @@ int query_info_parse(struct query_info* m, ldns_buffer* query)
|
||||||
log_assert(ldns_buffer_position(query) == 0);
|
log_assert(ldns_buffer_position(query) == 0);
|
||||||
m->has_cd = (int)LDNS_CD_WIRE(q);
|
m->has_cd = (int)LDNS_CD_WIRE(q);
|
||||||
ldns_buffer_skip(query, LDNS_HEADER_SIZE);
|
ldns_buffer_skip(query, LDNS_HEADER_SIZE);
|
||||||
q = ldns_buffer_current(query);
|
m->qname = ldns_buffer_current(query);
|
||||||
if((m->qnamesize = query_dname_len(query)) == 0)
|
if((m->qnamesize = query_dname_len(query)) == 0)
|
||||||
return 0; /* parse error */
|
return 0; /* parse error */
|
||||||
|
if(ldns_buffer_remaining(query) < 4)
|
||||||
|
return 0; /* need qtype, qclass */
|
||||||
|
m->qtype = ldns_buffer_read_u16(query);
|
||||||
|
m->qclass = ldns_buffer_read_u16(query);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
query_info_allocqname(struct query_info* m)
|
||||||
|
{
|
||||||
|
uint8_t* q = m->qname;
|
||||||
if(!(m->qname = (uint8_t*)malloc(m->qnamesize))) {
|
if(!(m->qname = (uint8_t*)malloc(m->qnamesize))) {
|
||||||
log_err("query_info_parse: out of memory");
|
log_err("query_info_allocqname: out of memory");
|
||||||
return 0; /* out of memory */
|
return 0; /* out of memory */
|
||||||
}
|
}
|
||||||
memmove(m->qname, q, m->qnamesize);
|
memmove(m->qname, q, m->qnamesize);
|
||||||
|
|
||||||
if(ldns_buffer_remaining(query) < 4) {
|
|
||||||
free(m->qname);
|
|
||||||
m->qname = NULL;
|
|
||||||
return 0; /* need qtype, qclass */
|
|
||||||
}
|
|
||||||
m->qtype = ldns_buffer_read_u16(query);
|
|
||||||
m->qclass = ldns_buffer_read_u16(query);
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -94,6 +94,13 @@ struct msgreply_entry {
|
||||||
*/
|
*/
|
||||||
int query_info_parse(struct query_info* m, ldns_buffer* query);
|
int query_info_parse(struct query_info* m, ldns_buffer* query);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate and copy the qname (obtained from query_info_parse()).
|
||||||
|
* @param m: the queryinfo structure.
|
||||||
|
* @return: 0 on alloc failure.
|
||||||
|
*/
|
||||||
|
int query_info_allocqname(struct query_info* m);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compare two queryinfo structures, on query,
|
* Compare two queryinfo structures, on query,
|
||||||
* The qname is _not_ sorted in canonical ordering.
|
* The qname is _not_ sorted in canonical ordering.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue