TCP outgoing services.

git-svn-id: file:///svn/unbound/trunk@294 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-05-08 13:25:21 +00:00
parent ebd337689a
commit 6c3c370b2a
13 changed files with 420 additions and 19 deletions

View file

@ -672,7 +672,8 @@ worker_init(struct worker* worker, struct config_file *cfg,
cfg->outgoing_num_ports * worker->thread_num;
worker->back = outside_network_create(worker->base,
buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs,
cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport,
cfg->do_tcp?cfg->outgoing_num_tcp:0);
if(!worker->back) {
log_err("could not create outgoing sockets");
worker_delete(worker);

View file

@ -1,3 +1,10 @@
8 May 2007: Wouter
- outgoing network keeps list of available tcp buffers for outgoing
tcp queries.
- outgoing-num-tcp config option.
- outgoing network keeps waiting list of queries waiting for buffer.
- netevent supports outgoing tcp commpoints, nonblocking connects.
7 May 2007: Wouter
- EDNS read from query, used to make reply smaller.
- advertised edns value constants.

View file

@ -39,6 +39,9 @@ server:
# But also takes more system resources (for open sockets).
# outgoing-range: 16
# number of outgoing simultaneous tcp buffers to hold per thread.
# outgoing-num-tcp: 10
# the amount of memory to use for the message cache.
# in bytes. default is 4 Mb
# msg-cache-size: 4194304

View file

@ -63,6 +63,9 @@ Number of ports to open. This number is opened per thread for every outgoing
query interface. Must be at least 1. Default is 16.
Larger numbers give more protection against spoofing attempts, but need
extra resources from the operating system.
.It \fBoutgoing-num-tcp:\fR <number>
Number of outgoing TCP buffers to allocate per thread. Default is 10. If set
to 0, or if do_tcp is "no", no TCP queries to authoritative servers are done.
.It \fBmsg-cache-size:\fR <number>
Number of bytes size of the message cache. Default is 4 megabytes.
.It \fBmsg-cache-slabs:\fR <number>

View file

@ -109,6 +109,98 @@ pending_cmp(const void* key1, const void* key2)
}
}
/** use next free buffer to service a tcp query */
static void
outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt)
{
struct pending_tcp* pend = w->outnet->tcp_free;
int s;
log_assert(pend);
log_assert(pkt);
/* open socket */
#ifndef INET6
if(addr_is_ip6(addr))
s = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
else
#endif
s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if(s == -1) {
log_err("outgoing tcp: socket: %s", strerror(errno));
log_addr(&w->addr, w->addrlen);
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
free(w);
return;
}
fd_set_nonblock(s);
if(connect(s, (struct sockaddr*)&w->addr, w->addrlen) == -1) {
if(errno != EINPROGRESS) {
log_err("outgoing tcp: connect: %s", strerror(errno));
log_addr(&w->addr, w->addrlen);
close(s);
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_CLOSED, NULL);
free(w);
return;
}
}
w->pkt = NULL;
w->next_waiting = (void*)pend;
memmove(&pend->id, pkt, sizeof(uint16_t));
w->outnet->tcp_free = pend->next_free;
pend->next_free = NULL;
pend->query = w;
ldns_buffer_clear(pend->c->buffer);
ldns_buffer_write(pend->c->buffer, pkt, w->pkt_len);
ldns_buffer_flip(pend->c->buffer);
pend->c->tcp_is_reading = 0;
pend->c->tcp_byte_count = 0;
comm_point_start_listening(pend->c, s, -1);
return;
}
/** see if buffers can be used to service TCP queries. */
static void
use_free_buffer(struct outside_network* outnet)
{
struct waiting_tcp* w;
while(outnet->tcp_free && outnet->tcp_wait_first) {
w = outnet->tcp_wait_first;
outnet->tcp_wait_first = w->next_waiting;
if(outnet->tcp_wait_last == w)
outnet->tcp_wait_last = NULL;
outnet_tcp_take_into_use(w, w->pkt);
}
}
/** callback for pending tcp connections */
static int
outnet_tcp_cb(struct comm_point* c, void* arg, int error,
struct comm_reply *reply_info)
{
struct pending_tcp* pend = (struct pending_tcp*)arg;
struct outside_network* outnet = pend->query->outnet;
verbose(VERB_ALGO, "outnettcp cb");
if(error != NETEVENT_NOERROR) {
log_info("outnettcp got tcp error %d", error);
/* pass error below and exit */
} else {
/* check ID */
if(ldns_buffer_limit(c->buffer) < sizeof(uint16_t) ||
LDNS_ID_WIRE(ldns_buffer_begin(c->buffer))!=pend->id) {
log_info("outnettcp: bad ID in reply");
log_addr(&pend->query->addr, pend->query->addrlen);
error = NETEVENT_CLOSED;
}
}
(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
comm_point_close(c);
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
free(pend->query);
pend->query = NULL;
use_free_buffer(outnet);
return 0;
}
/** callback for incoming udp answers from the network. */
static int
outnet_udp_cb(struct comm_point* c, void* arg, int error,
@ -271,10 +363,34 @@ pending_udp_timer_cb(void *arg)
pending_delete(p->outnet, p);
}
/** create pending_tcp buffers */
static int
create_pending_tcp(struct outside_network* outnet, size_t bufsize)
{
size_t i;
if(outnet->num_tcp == 0)
return 1; /* no tcp needed, nothing to do */
if(!(outnet->tcp_conns = (struct pending_tcp **)calloc(
outnet->num_tcp, sizeof(struct pending_tcp*))))
return 0;
for(i=0; i<outnet->num_tcp; i++) {
if(!(outnet->tcp_conns[i] = (struct pending_tcp*)calloc(1,
sizeof(struct pending_tcp))))
return 0;
outnet->tcp_conns[i]->next_free = outnet->tcp_free;
outnet->tcp_free = outnet->tcp_conns[i];
outnet->tcp_conns[i]->c = comm_point_create_tcp_out(
bufsize, outnet_tcp_cb, outnet->tcp_conns[i]);
if(!outnet->tcp_conns[i]->c)
return 0;
}
return 1;
}
struct outside_network*
outside_network_create(struct comm_base *base, size_t bufsize,
size_t num_ports, char** ifs, int num_ifs, int do_ip4,
int do_ip6, int port_base)
int do_ip6, int port_base, size_t num_tcp)
{
struct outside_network* outnet = (struct outside_network*)
calloc(1, sizeof(struct outside_network));
@ -284,6 +400,7 @@ outside_network_create(struct comm_base *base, size_t bufsize,
return NULL;
}
outnet->base = base;
outnet->num_tcp = num_tcp;
#ifndef INET6
do_ip6 = 0;
#endif
@ -295,7 +412,8 @@ outside_network_create(struct comm_base *base, size_t bufsize,
outnet->num_udp4+1, sizeof(struct comm_point*))) ||
!(outnet->udp6_ports = (struct comm_point **)calloc(
outnet->num_udp6+1, sizeof(struct comm_point*))) ||
!(outnet->pending = rbtree_create(pending_cmp)) ) {
!(outnet->pending = rbtree_create(pending_cmp)) ||
!create_pending_tcp(outnet, bufsize)) {
log_err("malloc failed");
outside_network_delete(outnet);
return NULL;
@ -376,6 +494,15 @@ outside_network_delete(struct outside_network* outnet)
comm_point_delete(outnet->udp6_ports[i]);
free(outnet->udp6_ports);
}
if(outnet->tcp_conns) {
size_t i;
for(i=0; i<outnet->num_tcp; i++)
if(outnet->tcp_conns[i]) {
comm_point_delete(outnet->tcp_conns[i]->c);
free(outnet->tcp_conns[i]);
}
free(outnet->tcp_conns);
}
free(outnet);
}
@ -531,3 +658,86 @@ pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
tv.tv_usec = 0;
comm_timer_set(pend->timer, &tv);
}
/** callback for outgoing TCP timer event */
static void
outnet_tcptimer(void* arg)
{
struct waiting_tcp* w = (struct waiting_tcp*)arg;
struct outside_network* outnet = w->outnet;
if(w->pkt) {
/* it is on the waiting list */
struct waiting_tcp* p=outnet->tcp_wait_first, *prev=NULL;
while(p) {
if(p == w) {
if(prev) prev->next_waiting = w->next_waiting;
else outnet->tcp_wait_first=w->next_waiting;
outnet->tcp_wait_last = prev;
break;
}
prev = p;
p=p->next_waiting;
}
} else {
/* it was in use */
struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
comm_point_close(pend->c);
pend->query = NULL;
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
}
(void)(*w->cb)(NULL, w->cb_arg, NETEVENT_TIMEOUT, NULL);
free(w);
use_free_buffer(outnet);
}
void
pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd)
{
struct pending_tcp* pend = outnet->tcp_free;
struct waiting_tcp* w;
struct timeval tv;
uint16_t id;
/* if no buffer is free allocate space to store query */
w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
+ (pend?0:ldns_buffer_limit(packet)));
if(!w) {
/* callback user for the error */
(void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
return;
}
if(!(w->timer = comm_timer_create(outnet->base, outnet_tcptimer, w))) {
free(w);
(void)(*callback)(NULL, callback_arg, NETEVENT_CLOSED, NULL);
return;
}
w->pkt = NULL;
w->pkt_len = ldns_buffer_limit(packet);
/* id uses lousy random() TODO use better and entropy */
id = ((unsigned)ub_random(rnd)>>8) & 0xffff;
LDNS_ID_SET(ldns_buffer_begin(packet), id);
memcpy(&w->addr, addr, addrlen);
w->addrlen = addrlen;
w->outnet = outnet;
w->cb = callback;
w->cb_arg = callback_arg;
tv.tv_sec = timeout;
tv.tv_usec = 0;
comm_timer_set(w->timer, &tv);
if(pend) {
/* we have a buffer available right now */
outnet_tcp_take_into_use(w, ldns_buffer_begin(packet));
} else {
/* queue up */
w->pkt = (uint8_t*)w + sizeof(struct waiting_tcp);
memmove(w->pkt, ldns_buffer_begin(packet), w->pkt_len);
w->next_waiting = NULL;
if(outnet->tcp_wait_last)
outnet->tcp_wait_last->next_waiting = w;
else outnet->tcp_wait_first = w;
outnet->tcp_wait_last = w;
}
}

View file

@ -49,6 +49,8 @@
struct pending;
struct pending_timeout;
struct ub_randstate;
struct pending_tcp;
struct waiting_tcp;
/**
* Send queries to outside servers and wait for answers from servers.
@ -77,8 +79,24 @@ struct outside_network {
/** number of udp6 ports */
size_t num_udp6;
/** pending answers. sorted by id, addr */
/** pending udp answers. sorted by id, addr */
rbtree_t *pending;
/**
* Array of tcp pending used for outgoing TCP connections.
* Each can be used to establish a TCP connection with a server.
* The file descriptors are -1 if its free, need to be opened for
* the tcp connection. Can be used for ip4 and ip6.
*/
struct pending_tcp **tcp_conns;
/** number of tcp communication points. */
size_t num_tcp;
/** list of tcp comm points that are free for use */
struct pending_tcp* tcp_free;
/** list of tcp queries waiting for a buffer */
struct waiting_tcp* tcp_wait_first;
/** last of waiting query list */
struct waiting_tcp* tcp_wait_last;
};
/**
@ -105,6 +123,53 @@ struct pending {
struct outside_network* outnet;
};
/**
* Pending TCP query to server.
*/
struct pending_tcp {
/** next in list of free tcp comm points, or NULL. */
struct pending_tcp* next_free;
/** the ID for the query; checked in reply */
uint16_t id;
/** tcp comm point it was sent on (and reply must come back on). */
struct comm_point* c;
/** the query being serviced, NULL if the pending_tcp is unused. */
struct waiting_tcp* query;
};
/**
* Query waiting for TCP buffer.
*/
struct waiting_tcp {
/**
* next in waiting list.
* if pkt==0, this points to the pending_tcp structure.
*/
struct waiting_tcp* next_waiting;
/** timeout event; timer keeps running whether the query is
* waiting for a buffer or the tcp reply is pending */
struct comm_timer* timer;
/** the outside network it is part of */
struct outside_network* outnet;
/** remote address. */
struct sockaddr_storage addr;
/** length of addr field in use. */
socklen_t addrlen;
/**
* The query itself, the query packet to send.
* allocated after the waiting_tcp structure.
* set to NULL when the query is serviced and it part of pending_tcp.
* if this is NULL, the next_waiting points to the pending_tcp.
*/
uint8_t* pkt;
/** length of query packet. */
size_t pkt_len;
/** callback for the timeout, error or reply to the message */
comm_point_callback_t* cb;
/** callback user argument */
void* cb_arg;
};
/**
* Create outside_network structure with N udp ports.
* @param base: the communication base to use for event handling.
@ -117,11 +182,12 @@ struct pending {
* @param do_ip6: service IP6.
* @param port_base: if -1 system assigns ports, otherwise try to get
* the ports numbered from this starting number.
* @param num_tcp: number of outgoing tcp buffers to preallocate.
* @return: the new structure (with no pending answers) or NULL on error.
*/
struct outside_network* outside_network_create(struct comm_base* base,
size_t bufsize, size_t num_ports, char** ifs, int num_ifs,
int do_ip4, int do_ip6, int port_base);
int do_ip4, int do_ip6, int port_base, size_t num_tcp);
/**
* Delete outside_network structure.
@ -148,6 +214,27 @@ void pending_udp_query(struct outside_network* outnet, ldns_buffer* packet,
comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd);
/**
* Send TCP query. May wait for TCP buffer. Selects ID to be random, and
* checks id.
* @param outnet: provides the event handling.
* @param packet: wireformat query to send to destination. copied from.
* @param addr: address to send to.
* @param addrlen: length of addr.
* @param timeout: in seconds from now.
* Timer starts running now. Timer may expire if all buffers are used,
* without any query been sent to the server yet.
* @param callback: function to call on error, timeout or reply.
* The routine does not return an error, instead it calls the callback,
* with an error code if an error happens.
* @param callback_arg: user argument for callback function.
* @param rnd: random state for generating ID.
*/
void pending_tcp_query(struct outside_network* outnet, ldns_buffer* packet,
struct sockaddr_storage* addr, socklen_t addrlen, int timeout,
comm_point_callback_t* callback, void* callback_arg,
struct ub_randstate* rnd);
/**
* Delete pending answer.
* @param outnet: outside network the pending query is part of.

View file

@ -633,7 +633,8 @@ struct outside_network*
outside_network_create(struct comm_base* base, size_t bufsize,
size_t ATTR_UNUSED(num_ports), char** ATTR_UNUSED(ifs),
int ATTR_UNUSED(num_ifs), int ATTR_UNUSED(do_ip4),
int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base))
int ATTR_UNUSED(do_ip6), int ATTR_UNUSED(port_base),
size_t ATTR_UNUSED(num_tcp))
{
struct outside_network* outnet = calloc(1,
sizeof(struct outside_network));

View file

@ -77,6 +77,7 @@ config_create()
cfg->do_tcp = 1;
cfg->outgoing_base_port = cfg->port + 1000;
cfg->outgoing_num_ports = 16;
cfg->outgoing_num_tcp = 10;
cfg->msg_cache_size = 4 * 1024 * 1024;
cfg->msg_cache_slabs = 4;
cfg->num_queries_per_thread = 1024;

View file

@ -68,6 +68,8 @@ struct config_file {
int outgoing_base_port;
/** outgoing port range number of ports (per thread, per if) */
int outgoing_num_ports;
/** number of outgoing tcp buffers per (per thread) */
size_t outgoing_num_tcp;
/** size of the message cache */
size_t msg_cache_size;

View file

@ -102,6 +102,7 @@ verbosity{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_VERBOSITY;}
port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_PORT;}
outgoing-port{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_PORT;}
outgoing-range{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_RANGE;}
outgoing-num-tcp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_OUTGOING_NUM_TCP;}
do-ip4{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP4;}
do-ip6{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_IP6;}
do-udp{COLON} { LEXOUT(("v(%s) ", yytext)); return VAR_DO_UDP;}

View file

@ -73,7 +73,7 @@ extern struct config_parser_state* cfg_parser;
%token VAR_FORWARD_TO VAR_FORWARD_TO_PORT VAR_CHROOT
%token VAR_USERNAME VAR_DIRECTORY VAR_LOGFILE VAR_PIDFILE
%token VAR_MSG_CACHE_SIZE VAR_MSG_CACHE_SLABS VAR_NUM_QUERIES_PER_THREAD
%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS
%token VAR_RRSET_CACHE_SIZE VAR_RRSET_CACHE_SLABS VAR_OUTGOING_NUM_TCP
%%
toplevelvars: /* empty */ | toplevelvars toplevelvar ;
@ -97,7 +97,7 @@ content_server: server_num_threads | server_verbosity | server_port |
server_username | server_directory | server_logfile | server_pidfile |
server_msg_cache_size | server_msg_cache_slabs |
server_num_queries_per_thread | server_rrset_cache_size |
server_rrset_cache_slabs
server_rrset_cache_slabs | server_outgoing_num_tcp
;
server_num_threads: VAR_NUM_THREADS STRING
{
@ -157,6 +157,15 @@ server_outgoing_range: VAR_OUTGOING_RANGE STRING
free($2);
}
;
server_outgoing_num_tcp: VAR_OUTGOING_NUM_TCP STRING
{
OUTYY(("P(server_outgoing_num_tcp:%s)\n", $2));
if(atoi($2) == 0 && strcmp($2, "0") != 0)
yyerror("number expected");
else cfg_parser->cfg->outgoing_num_tcp = atoi($2);
free($2);
}
;
server_do_ip4: VAR_DO_IP4 STRING
{
OUTYY(("P(server_do_ip4:%s)\n", $2));

View file

@ -318,11 +318,13 @@ reclaim_tcp_handler(struct comm_point* c)
{
log_assert(c->type == comm_tcp);
comm_point_close(c);
c->tcp_free = c->tcp_parent->tcp_free;
c->tcp_parent->tcp_free = c;
if(!c->tcp_free) {
/* re-enable listening on accept socket */
comm_point_start_listening(c->tcp_parent, -1, -1);
if(c->tcp_parent) {
c->tcp_free = c->tcp_parent->tcp_free;
c->tcp_parent->tcp_free = c;
if(!c->tcp_free) {
/* re-enable listening on accept socket */
comm_point_start_listening(c->tcp_parent, -1, -1);
}
}
}
@ -336,8 +338,10 @@ tcp_callback_writer(struct comm_point* c)
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
/* for listening socket */
reclaim_tcp_handler(c);
if(c->tcp_parent) /* for listening socket */
reclaim_tcp_handler(c);
else /* its outgoing socket, start listening for reading */
comm_point_start_listening(c, -1, -1);
}
/** do the callback when reading is done */
@ -421,7 +425,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
return 1;
}
/** Handle tcp writing callback.
/**
* Handle tcp writing callback.
* @param fd: file descriptor of socket.
* @param c: comm point to write buffer out of.
* @return: 0 on error
@ -433,6 +438,21 @@ comm_point_tcp_handle_write(int fd, struct comm_point* c)
log_assert(c->type == comm_tcp);
if(c->tcp_is_reading)
return 0;
if(c->tcp_byte_count == 0 && c->tcp_check_nb_connect) {
/* check for pending error from nonblocking connect */
/* from Stevens, unix network programming, vol1, 3rd ed, p450*/
int error = 0;
socklen_t len = (socklen_t)sizeof(error);
if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
error = errno; /* on solaris errno is error */
}
if(error == EINPROGRESS || error == EWOULDBLOCK)
return 1; /* try again later */
if(error != 0) {
log_err("tcp connect: %s", strerror(error));
return 0;
}
}
if(c->tcp_byte_count < sizeof(uint16_t)) {
uint16_t len = htons(ldns_buffer_limit(c->buffer));
@ -553,6 +573,7 @@ comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 0;
c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
evbits = EV_READ | EV_PERSIST;
@ -607,6 +628,7 @@ comm_point_create_tcp_handler(struct comm_base *base,
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 1;
c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* add to parent free list */
@ -662,6 +684,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 0;
c->tcp_check_nb_connect = 0;
c->callback = NULL;
c->cb_arg = NULL;
evbits = EV_READ | EV_PERSIST;
@ -688,6 +711,44 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
return c;
}
struct comm_point*
comm_point_create_tcp_out(size_t bufsize,
comm_point_callback_t* callback, void* callback_arg)
{
struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point));
if(!c)
return NULL;
c->ev = (struct internal_event*)calloc(1,
sizeof(struct internal_event));
if(!c->ev) {
free(c);
return NULL;
}
c->fd = -1;
c->buffer = ldns_buffer_new(bufsize);
if(!c->buffer) {
free(c->ev);
free(c);
return NULL;
}
c->timeout = NULL;
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
c->type = comm_tcp;
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 1;
c->tcp_check_nb_connect = 1;
c->callback = callback;
c->cb_arg = callback_arg;
return c;
}
struct comm_point*
comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg)
@ -721,6 +782,7 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
c->tcp_do_close = 0;
c->do_not_close = 1;
c->tcp_do_toggle_rw = 0;
c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* libevent stuff */
@ -743,9 +805,10 @@ comm_point_close(struct comm_point* c)
{
if(!c)
return;
if(event_del(&c->ev->ev) != 0) {
log_err("could not event_del on close");
}
if(c->fd != -1)
if(event_del(&c->ev->ev) != 0) {
log_err("could not event_del on close");
}
/* close fd after removing from event lists, or epoll.. is messed up */
if(c->fd != -1 && !c->do_not_close)
close(c->fd);

View file

@ -155,6 +155,9 @@ struct comm_point {
So that when that is done the callback is called. */
int tcp_do_toggle_rw;
/** if set, checks for pending error from nonblocking connect() call.*/
int tcp_check_nb_connect;
/** callback when done.
tcp_accept does not get called back, is NULL then.
If a timeout happens, callback with timeout=1 is called.
@ -288,6 +291,16 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base,
int fd, int num, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
/**
* Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.
* @param bufsize: size of buffer to create for handlers.
* @param callback: callback function pointer for the handler.
* @param callback_arg: will be passed to your callback function.
* @return: the commpoint or NULL on error.
*/
struct comm_point* comm_point_create_tcp_out(size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
/**
* Create commpoint to listen to a local domain file descriptor.
* @param base: in which base to alloc the commpoint.