diff --git a/daemon/worker.c b/daemon/worker.c index cffaecc43..f1fcc45e1 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -1675,6 +1675,7 @@ worker_init(struct worker* worker, struct config_file *cfg, worker->env.worker_base = worker->base; worker->env.send_query = &worker_send_query; worker->env.alloc = &worker->alloc; + worker->env.outnet = worker->back; worker->env.rnd = worker->rndstate; /* If case prefetch is triggered, the corresponding mesh will clear * the scratchpad for the module env in the middle of request handling. diff --git a/libunbound/libworker.c b/libunbound/libworker.c index 301a3e856..326603c2c 100644 --- a/libunbound/libworker.c +++ b/libunbound/libworker.c @@ -236,6 +236,7 @@ libworker_setup(struct ub_ctx* ctx, int is_bg, struct ub_event_base* eb) cfg->outgoing_tcp_mss, &libworker_alloc_cleanup, w, cfg->do_udp || cfg->udp_upstream_without_downstream, w->sslctx, cfg->delay_close, NULL); + w->env->outnet = w->back; if(!w->is_bg || w->is_bg_thread) { lock_basic_unlock(&ctx->cfglock); } diff --git a/services/authzone.c b/services/authzone.c index e795ed5fb..92566c2df 100644 --- a/services/authzone.c +++ b/services/authzone.c @@ -54,7 +54,10 @@ #include "util/config_file.h" #include "util/log.h" #include "util/module.h" +#include "util/random.h" #include "services/cache/dns.h" +#include "services/outside_network.h" +#include "services/listen_dnsport.h" #include "sldns/rrdef.h" #include "sldns/pkthdr.h" #include "sldns/sbuffer.h" @@ -69,6 +72,13 @@ /** max number of CNAMEs we are willing to follow (in one answer) */ #define MAX_CNAME_CHAIN 8 +/** pick up nextprobe task to start waiting to perform transfer actions */ +static void xfr_set_timeout(struct auth_xfer* xfr, struct module_env* env, + int failure); +/** move to sending the probe packets, next if fails. task_probe */ +static void xfr_probe_send_or_end(struct auth_xfer* xfr, + struct module_env* env); + /** create new dns_msg */ static struct dns_msg* msg_create(struct regional* region, struct query_info* qinfo) @@ -2755,32 +2765,400 @@ xfr_master_start(struct auth_xfer* xfr) return 0; } -/** xfer nextprobe timeout callback */ -void auth_xfer_timer(void* arg) +/** true if at end of list, task_probe */ +static int +xfr_probe_end_of_list(struct auth_xfer* xfr) { - struct auth_xfer* xfr = (struct auth_xfer*)arg; - log_assert(xfr->task_nextprobe); + return !xfr->task_probe->scan_specific && !xfr->task_probe->scan_target; +} - /* see if zone has expired, and if so, also set auth_zone expired */ - if(xfr->have_zone && !xfr->zone_expired && - *(xfr->task_nextprobe->env->now) >= xfr->task_nextprobe->lease_time - + xfr->expiry) { - auth_xfer_set_expired(xfr, xfr->task_nextprobe->env, 1); +/** move to next master in list, task_probe */ +static int +xfr_probe_nextmaster(struct auth_xfer* xfr) +{ + if(!xfr->task_probe->scan_specific && !xfr->task_probe->scan_target) + return 0; + if(xfr->task_probe->scan_specific) { + xfr->task_probe->scan_specific = NULL; + xfr->task_probe->scan_target = xfr->task_probe->masters; + return 1; + } + if(!xfr->task_probe->scan_target) + return 0; + if(!xfr->task_probe->scan_target->next) + return 0; + xfr->task_probe->scan_target = xfr->task_probe->scan_target->next; + return 1; +} + +/** find out if the master is ipv6 */ +static int +master_is_ip6(struct auth_master* master) +{ + if(str_is_ip6(master->host)) + return 1; + /* TODO other addrs */ + return 0; +} + +/** create fd to send to this master */ +static int +xfr_fd_for_master(struct module_env* env, struct auth_master* master) +{ + struct sockaddr_storage* addr; + socklen_t addrlen; + int i; + int try; + + /* select interface */ + if(master_is_ip6(master)) { + if(env->outnet->num_ip6 == 0) { + log_err("need ipv6 to send, but no ipv6 outgoing interfaces, for %s", master->host); + return -1; + } + i = ub_random_max(env->rnd, env->outnet->num_ip6); + addr = &env->outnet->ip6_ifs[i].addr; + addrlen = env->outnet->ip6_ifs[i].addrlen; + } else { + if(env->outnet->num_ip4 == 0) { + log_err("need ipv4 to send, but no ipv4 outgoing interfaces, for %s", master->host); + return -1; + } + i = ub_random_max(env->rnd, env->outnet->num_ip4); + addr = &env->outnet->ip4_ifs[i].addr; + addrlen = env->outnet->ip4_ifs[i].addrlen; } - /* see if we need to start a probe (or maybe it is already in - * progress (due to notify)) */ + /* create fd */ + for(try = 0; try<1000; try++) { + int freebind = 0; + int noproto = 0; + int inuse = 0; + int port = ub_random(env->rnd)&0xffff; + int fd = -1; + if(master_is_ip6(master)) { + struct sockaddr_in6 sa = *(struct sockaddr_in6*)addr; + sa.sin6_port = (in_port_t)htons((uint16_t)port); + fd = create_udp_sock(AF_INET6, SOCK_DGRAM, + (struct sockaddr*)&sa, addrlen, 1, &inuse, &noproto, + 0, 0, 0, NULL, 0, freebind, 0); + } else { + struct sockaddr_in* sa = (struct sockaddr_in*)addr; + sa->sin_port = (in_port_t)htons((uint16_t)port); + fd = create_udp_sock(AF_INET, SOCK_DGRAM, + (struct sockaddr*)&sa, addrlen, 1, &inuse, &noproto, + 0, 0, 0, NULL, 0, freebind, 0); + } + if(fd != -1) { + return fd; + } + if(!inuse) { + return -1; + } + } + /* too many tries */ + log_err("cannot send probe, ports are in use"); + return -1; +} + +/** create probe packet for xfr */ +static void +xfr_create_probe_packet(struct auth_xfer* xfr, struct module_env* env, + sldns_buffer* buf, int soa) +{ + struct query_info qinfo; + uint32_t serial; + int have_zone; + lock_basic_lock(&xfr->lock); + have_zone = xfr->have_zone; + serial = xfr->serial; + lock_basic_unlock(&xfr->lock); + + memset(&qinfo, 0, sizeof(qinfo)); + qinfo.qname = xfr->name; + qinfo.qname_len = xfr->namelen; + if(soa) { + qinfo.qtype = LDNS_RR_TYPE_SOA; + } else { + qinfo.qtype = LDNS_RR_TYPE_IXFR; + if(!have_zone) + qinfo.qtype = LDNS_RR_TYPE_AXFR; + } + qinfo.qclass = xfr->dclass; + qinfo_query_encode(buf, &qinfo); + xfr->task_probe->id = ub_random(env->rnd)&0xffff; + sldns_buffer_write_at(buf, 0, &xfr->task_probe->id, 2); + + /* append serial for IXFR */ + if(qinfo.qtype == LDNS_RR_TYPE_IXFR) { + sldns_buffer_set_position(buf, sldns_buffer_limit(buf)); + /* auth section count 1 */ + sldns_buffer_write_u16_at(buf, 1, LDNS_NSCOUNT_OFF); + /* write SOA */ + sldns_buffer_write_u8(buf, 0xC0); /* compressed ptr to qname */ + sldns_buffer_write_u8(buf, 0x0C); + sldns_buffer_write_u16(buf, qinfo.qtype); + sldns_buffer_write_u16(buf, qinfo.qclass); + sldns_buffer_write_u32(buf, 0); /* ttl */ + sldns_buffer_write_u16(buf, 22); /* rdata length */ + sldns_buffer_write_u8(buf, 0); /* . */ + sldns_buffer_write_u8(buf, 0); /* . */ + sldns_buffer_write_u32(buf, serial); /* serial */ + sldns_buffer_write_u32(buf, 0); /* refresh */ + sldns_buffer_write_u32(buf, 0); /* retry */ + sldns_buffer_write_u32(buf, 0); /* expire */ + sldns_buffer_write_u32(buf, 0); /* minimum */ + sldns_buffer_flip(buf); + } +} + +/** check if returned packet is OK */ +static int +check_packet_ok(sldns_buffer* pkt, uint16_t qtype, struct auth_xfer* xfr, + uint32_t* serial) +{ + uint16_t id; + /* TODO parse to see if packet worked, valid reply */ + + /* check serial number of SOA */ + if(sldns_buffer_limit(pkt) < LDNS_HEADER_SIZE) + return 0; + + /* check ID */ + sldns_buffer_read_at(pkt, 0, &id, 2); + if(id != xfr->task_probe->id) + return 0; + + /* check flag bits and rcode */ + if(!LDNS_QR_WIRE(sldns_buffer_begin(pkt))) + return 0; + if(LDNS_OPCODE_WIRE(sldns_buffer_begin(pkt)) != LDNS_PACKET_QUERY) + return 0; + if(LDNS_RCODE_WIRE(sldns_buffer_begin(pkt)) != LDNS_RCODE_NOERROR) + return 0; + + /* check qname */ + if(LDNS_QDCOUNT(sldns_buffer_begin(pkt)) != 1) + return 0; + sldns_buffer_skip(pkt, LDNS_HEADER_SIZE); + if(sldns_buffer_remaining(pkt) < xfr->namelen) + return 0; + if(query_dname_compare(sldns_buffer_current(pkt), xfr->name) != 0) + return 0; + sldns_buffer_skip(pkt, xfr->namelen); + + /* check qtype, qclass */ + if(sldns_buffer_remaining(pkt) < 4) + return 0; + if(sldns_buffer_read_u16(pkt) != qtype) + return 0; + if(sldns_buffer_read_u16(pkt) != xfr->dclass) + return 0; + + if(serial) { + uint16_t rdlen; + /* read serial number, from answer section SOA */ + if(LDNS_ANCOUNT(sldns_buffer_begin(pkt)) == 0) + return 0; + /* read from first record SOA record */ + if(sldns_buffer_remaining(pkt) < 1) + return 0; + if(dname_pkt_compare(pkt, sldns_buffer_current(pkt), + xfr->name) != 0) + return 0; + if(!pkt_dname_len(pkt)) + return 0; + /* type, class, ttl, rdatalen */ + if(sldns_buffer_remaining(pkt) < 4+4+2) + return 0; + if(sldns_buffer_read_u16(pkt) != qtype) + return 0; + if(sldns_buffer_read_u16(pkt) != xfr->dclass) + return 0; + sldns_buffer_skip(pkt, 4); /* ttl */ + rdlen = sldns_buffer_read_u16(pkt); + if(sldns_buffer_remaining(pkt) < rdlen) + return 0; + if(sldns_buffer_remaining(pkt) < 1) + return 0; + if(!pkt_dname_len(pkt)) /* soa name */ + return 0; + if(sldns_buffer_remaining(pkt) < 1) + return 0; + if(!pkt_dname_len(pkt)) /* soa name */ + return 0; + if(sldns_buffer_remaining(pkt) < 20) + return 0; + *serial = sldns_buffer_read_u32(pkt); + } + return 1; +} + +/** callback for task_probe udp packets */ +int +auth_xfer_probe_udp_callback(struct comm_point* c, void* arg, int err, + struct comm_reply* repinfo) +{ + struct auth_xfer* xfr = (struct auth_xfer*)arg; + struct module_env* env; + log_assert(xfr->task_probe); + env = xfr->task_probe->env; + + (void)c; + (void)repinfo; + /* TODO need a comm_timer for a timeout too */ /* TODO */ - /* if we don't end up resetting the timer, delete it, because - * the next worker to pick this up does not have the same - * event base */ + if(err == NETEVENT_NOERROR) { + uint32_t serial; + if(check_packet_ok(c->buffer, LDNS_RR_TYPE_SOA, xfr, + &serial)) { + /* successful lookup */ + /* TODO */ + } + } + + /* failed lookup */ + /* remove the commpoint */ + comm_point_delete(c); + xfr->task_probe->cp = NULL; + /* the comm_point_udp_callback is in a for loop for NUM_UDP_PER_SELECT + * and we set rep.c=NULL to stop if from looking inside the commpoint*/ + repinfo->c = NULL; + + /* create a new commpoint (new fd, address), for next packet, maybe */ + /* this, if the result was not a successfull probe, we need + * to send the next one */ + xfr_probe_send_or_end(xfr, env); + return 0; +} + +/** send the UDP probe to the master, this is part of task_probe */ +static int +xfr_probe_send_probe(struct auth_xfer* xfr, struct module_env* env) +{ + struct sockaddr_storage addr; + socklen_t addrlen = 0; + /* pick master */ + struct auth_master* master = xfr->task_probe->scan_specific; + if(!master) master = xfr->task_probe->scan_target; + if(!master) return 0; + + /* create packet */ + xfr_create_probe_packet(xfr, env, env->scratch_buffer, 1); + if(!xfr->task_probe->cp) { + int fd = xfr_fd_for_master(env, master); + if(fd == -1) { + char zname[255+1]; + dname_str(xfr->name, zname); + log_err("cannot create fd for probe %s to %s", + zname, master->host); + return 0; + } + xfr->task_probe->cp = comm_point_create_udp(env->worker_base, + fd, env->outnet->udp_buff, auth_xfer_probe_udp_callback, + xfr); + if(!xfr->task_probe->cp) { + close(fd); + log_err("malloc failure"); + return 0; + } + } + + /* get master addr */ + if(!extstrtoaddr(master->host, &addr, &addrlen)) { + log_err("cannot parse master %s", master->host); + return 0; + } + + /* send udp packet */ + if(!comm_point_send_udp_msg(xfr->task_probe->cp, env->scratch_buffer, + (struct sockaddr*)&addr, addrlen)) { + char zname[255+1]; + dname_str(xfr->name, zname); + verbose(VERB_ALGO, "failed to send soa probe for %s to %s", + zname, master->host); + return 0; + } + + return 1; +} + +/** move to sending the probe packets, next if fails. task_probe */ +static void +xfr_probe_send_or_end(struct auth_xfer* xfr, struct module_env* env) +{ + while(!xfr_probe_end_of_list(xfr)) { + if(xfr_probe_send_probe(xfr, env)) { + /* successfully sent probe, wait for callback */ + return; + } + /* failed to send probe, next master */ + if(!xfr_probe_nextmaster(xfr)) { + break; + } + } + + lock_basic_lock(&xfr->lock); + /* we failed to send this as well, move to the wait task, + * use the shorter retry timeout */ + comm_point_delete(xfr->task_probe->cp); + xfr->task_probe->cp = NULL; + /* we don't own this item anymore */ + xfr->task_probe->worker = NULL; + xfr->task_probe->env = NULL; + + /* pick up the nextprobe task and wait */ + xfr_set_timeout(xfr, env, 1); + lock_basic_unlock(&xfr->lock); +} + +/** xfer nextprobe timeout callback, this is part of task_nextprobe */ +void +auth_xfer_timer(void* arg) +{ + struct auth_xfer* xfr = (struct auth_xfer*)arg; + struct module_env* env; + log_assert(xfr->task_nextprobe); + env = xfr->task_nextprobe->env; + + /* see if zone has expired, and if so, also set auth_zone expired */ + lock_basic_lock(&xfr->lock); + if(xfr->have_zone && !xfr->zone_expired && + *env->now >= xfr->task_nextprobe->lease_time + xfr->expiry) { + lock_basic_unlock(&xfr->lock); + auth_xfer_set_expired(xfr, env, 1); + lock_basic_lock(&xfr->lock); + } + + /* delete the timer, because the next worker to pick this up may + * not have the same event base */ comm_timer_delete(xfr->task_nextprobe->timer); xfr->task_nextprobe->timer = NULL; xfr->task_nextprobe->next_probe = 0; /* we don't own this item anymore */ xfr->task_nextprobe->worker = NULL; xfr->task_nextprobe->env = NULL; + + /* see if we need to start a probe (or maybe it is already in + * progress (due to notify)) */ + if(xfr->task_probe->worker == NULL) { + /* pick up the probe task ourselves */ + xfr->task_probe->worker = env->worker; + lock_basic_unlock(&xfr->lock); + xfr->task_probe->env = env; + xfr->task_probe->cp = NULL; + + /* start the task */ + /* timeout, no specific (notified) host to scan */ + xfr->task_probe->scan_specific = NULL; + /* pick up first scan target */ + xfr->task_probe->scan_target = xfr->task_probe->masters; + /* send the probe packet or next send, or end task */ + xfr_probe_send_or_end(xfr, env); + } else { + lock_basic_unlock(&xfr->lock); + } } /** for task_nextprobe. @@ -2886,7 +3264,6 @@ auth_xfer_new(struct auth_zone* z) free(xfr); return NULL; } - xfr->task_probe->workernum = -1; xfr->task_transfer = (struct auth_transfer*)calloc(1, sizeof(struct auth_transfer)); if(!xfr->task_transfer) { @@ -2896,16 +3273,15 @@ auth_xfer_new(struct auth_zone* z) free(xfr); return NULL; } - xfr->task_transfer->workernum = -1; lock_basic_init(&xfr->lock); lock_protect(&xfr->lock, xfr, sizeof(*xfr)); - lock_protect(&xfr->lock, &xfr->task_nextprobe->workernum, - sizeof(xfr->task_nextprobe->workernum)); - lock_protect(&xfr->lock, &xfr->task_probe->workernum, - sizeof(xfr->task_probe->workernum)); - lock_protect(&xfr->lock, &xfr->task_transfer->workernum, - sizeof(xfr->task_transfer->workernum)); + lock_protect(&xfr->lock, &xfr->task_nextprobe->worker, + sizeof(xfr->task_nextprobe->worker)); + lock_protect(&xfr->lock, &xfr->task_probe->worker, + sizeof(xfr->task_probe->worker)); + lock_protect(&xfr->lock, &xfr->task_transfer->worker, + sizeof(xfr->task_transfer->worker)); return xfr; } diff --git a/services/authzone.h b/services/authzone.h index 152704f5f..c724a9a1e 100644 --- a/services/authzone.h +++ b/services/authzone.h @@ -56,6 +56,7 @@ struct module_env; struct worker; struct comm_point; struct comm_timer; +struct comm_reply; struct auth_rrset; struct auth_nextprobe; struct auth_probe; @@ -275,10 +276,10 @@ struct auth_nextprobe { * waiting uninterrupted). */ struct auth_probe { - /** worker num (or -1 unowned) that is performing this task */ - int workernum; - /* Worker pointer. Used by the worker during callbacks. */ + /* Worker pointer. NULL means unowned. */ struct worker* worker; + /* module env for this task */ + struct module_env* env; /** list of upstream masters for this zone, from config */ struct auth_master* masters; @@ -290,6 +291,8 @@ struct auth_probe { * or NULL if not working on sequential scan */ struct auth_master* scan_target; + /** dns id of packet in flight */ + uint16_t id; /** the SOA probe udp event. * on the workers event base. */ struct comm_point* cp; @@ -301,9 +304,7 @@ struct auth_probe { * with failure or success. If failure, use shorter timeout for wait time. */ struct auth_transfer { - /** worker num (or -1 unowned) that is performing this task */ - int workernum; - /* Worker pointer. Used by the worker during callbacks. */ + /* Worker pointer. NULL means unowned. */ struct worker* worker; /** xfer data that has been transferred, the data is applied @@ -488,4 +489,12 @@ struct auth_xfer* auth_xfer_create(struct auth_zones* az, struct auth_zone* z); */ int xfer_set_masters(struct auth_master** list, struct config_auth* c); + +/** xfer nextprobe timeout callback, this is part of task_nextprobe */ +void auth_xfer_timer(void* arg); + +/** callback for commpoint udp replies to task_probe */ +int auth_xfer_probe_udp_callback(struct comm_point* c, void* arg, int err, + struct comm_reply* repinfo); + #endif /* SERVICES_AUTHZONE_H */ diff --git a/util/fptr_wlist.c b/util/fptr_wlist.c index 35268a36e..eaf6d9cc3 100644 --- a/util/fptr_wlist.c +++ b/util/fptr_wlist.c @@ -98,6 +98,7 @@ fptr_whitelist_comm_point(comm_point_callback_type *fptr) else if(fptr == &outnet_udp_cb) return 1; else if(fptr == &outnet_tcp_cb) return 1; else if(fptr == &tube_handle_listen) return 1; + else if(fptr == &auth_xfer_probe_udp_callback) return 1; return 0; } @@ -122,6 +123,7 @@ fptr_whitelist_comm_timer(void (*fptr)(void*)) #ifdef UB_ON_WINDOWS else if(fptr == &wsvc_cron_cb) return 1; #endif + else if(fptr == &auth_xfer_timer) return 1; return 0; } diff --git a/util/module.h b/util/module.h index 90894d5ec..73db994bd 100644 --- a/util/module.h +++ b/util/module.h @@ -168,6 +168,7 @@ struct regional; struct worker; struct comm_base; struct auth_zones; +struct outside_network; struct module_qstate; struct ub_randstate; struct mesh_area; @@ -449,6 +450,8 @@ struct module_env { struct worker* worker; /** the worker event base */ struct comm_base* worker_base; + /** the outside network */ + struct outside_network* outnet; /** mesh area with query state dependencies */ struct mesh_area* mesh; /** allocation service */ diff --git a/util/netevent.c b/util/netevent.c index 5965a2d9a..7c0ced83b 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -648,7 +648,7 @@ comm_point_udp_ancil_callback(int fd, short event, void* arg) (void)comm_point_send_udp_msg_if(rep.c, rep.c->buffer, (struct sockaddr*)&rep.addr, rep.addrlen, &rep); } - if(rep.c->fd == -1) /* commpoint closed */ + if(!rep.c || rep.c->fd == -1) /* commpoint closed */ break; } #else @@ -711,7 +711,7 @@ comm_point_udp_callback(int fd, short event, void* arg) (void)comm_point_send_udp_msg(rep.c, buffer, (struct sockaddr*)&rep.addr, rep.addrlen); } - if(rep.c->fd != fd) /* commpoint closed to -1 or reused for + if(!rep.c || rep.c->fd != fd) /* commpoint closed to -1 or reused for another UDP port. Note rep.c cannot be reused with TCP fd. */ break; }