Threading.

git-svn-id: file:///svn/unbound/trunk@146 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2007-02-26 14:49:11 +00:00
parent e679f51ed9
commit e4d39152b3
13 changed files with 467 additions and 47 deletions

View file

@ -49,6 +49,10 @@
#include "util/config_file.h"
#include "services/listen_dnsport.h"
/* @@@ TODO remove */
#include "pthread.h"
#include <signal.h>
struct daemon*
daemon_init()
{
@ -74,22 +78,126 @@ daemon_open_shared_ports(struct daemon* daemon)
return 1;
}
/**
* Allocate empty worker structures. With backptr and thread-number,
* from 0..numthread initialised. Used as user arguments to new threads.
* @param daemon: the daemon with (new) config settings.
*/
static void
daemon_create_workers(struct daemon* daemon)
{
int i;
log_assert(daemon && daemon->cfg);
/* only one thread for now */
daemon->num = daemon->cfg->num_threads;
#if !defined(HAVE_PTHREAD) && !defined(HAVE_SOLARIS_THREADS)
if(daemon->num != 1) {
log_err("configed %d threads, but executable was compiled "
"with no thread support. Continuing with 1.", daemon->num);
daemon->num = 1;
}
#endif /* no threads */
daemon->workers = (struct worker**)calloc((size_t)daemon->num,
sizeof(struct worker*));
for(i=0; i<daemon->num; i++) {
if(!(daemon->workers[i] = worker_create(daemon, i)))
fatal_exit("malloc failure");
}
}
/**
* Function to start one thread.
* @param arg: user argument.
* @return: void* user return value could be used for thread_join results.
*/
static void*
thread_start(void* arg)
{
struct worker* worker = (struct worker*)arg;
int num = worker->thread_num;
ub_thread_blocksigs();
if(!worker_init(worker, worker->daemon->cfg, worker->daemon->ports,
BUFSZ, 0))
fatal_exit("Could not initialize thread #%d", num);
worker_work(worker);
return NULL;
}
/**
* Fork and init the other threads. Main thread returns for special handling.
* @param daemon: the daemon with other threads to fork.
*/
static void
daemon_start_others(struct daemon* daemon)
{
int i;
log_assert(daemon);
log_info("start others");
/* skip i=0, is this thread */
for(i=1; i<daemon->num; i++) {
ub_thread_create(&daemon->workers[i]->thr_id,
thread_start, daemon->workers[i]);
}
}
/**
* Stop the other threads.
* @param daemon: the daemon with other threads.
*/
static void
daemon_stop_others(struct daemon* daemon)
{
int i, err;
log_assert(daemon);
log_info("stop others");
/* skip i=0, is this thread */
/* use i=0 buffer for sending cmds; because we are #0 */
for(i=1; i<daemon->num; i++) {
log_info("killing thread %d", i);
worker_send_cmd(daemon->workers[i],
daemon->workers[0]->front->udp_buff, worker_cmd_quit);
}
/** wait for them to quit */
for(i=1; i<daemon->num; i++) {
/* join it to make sure its dead */
log_info("join %d", i);
if((err=pthread_join(daemon->workers[i]->thr_id, NULL)))
log_err("pthread_join: %s", strerror(err));
log_info("join success %d", i);
}
}
void
daemon_fork(struct daemon* daemon)
{
/* only one thread for now */
log_assert(daemon);
daemon->num = 1;
daemon->workers = (struct worker**)calloc((size_t)daemon->num,
sizeof(struct worker*));
if(!(daemon->workers[0] = worker_init(daemon->cfg, daemon->ports,
BUFSZ)))
fatal_exit("could not initialize thread # %d", 0);
daemon->workers[0]->daemon = daemon;
daemon->workers[0]->thread_num = 0;
log_info("daemon_fork");
/* first create all the worker structures, so we can pass
* them to the newly created threads.
*/
daemon_create_workers(daemon);
/* Now create the threads and init the workers.
* By the way, this is thread #0 (the main thread).
*/
daemon_start_others(daemon);
/* Special handling for the main thread. This is the thread
* that handles signals.
*/
if(!worker_init(daemon->workers[0], daemon->cfg, daemon->ports,
BUFSZ, 1))
fatal_exit("Could not initialize main thread");
/* see if other threads have started correctly? */
/* Start resolver service on main thread. */
log_info("start of service (%s).", PACKAGE_STRING);
worker_work(daemon->workers[0]);
/* we exited! a signal happened! Stop other threads */
daemon_stop_others(daemon);
if(daemon->workers[0]->need_to_restart)
daemon->need_to_exit = 0;
else daemon->need_to_exit = 1;

View file

@ -60,6 +60,20 @@
/** the size of ID and flags, opcode, rcode in dns packet */
#define ID_AND_FLAGS 4
void
worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
enum worker_commands cmd)
{
ldns_buffer_clear(buffer);
/* like DNS message, length data */
ldns_buffer_write_u16(buffer, sizeof(uint32_t));
ldns_buffer_write_u32(buffer, (uint32_t)cmd);
ldns_buffer_flip(buffer);
if(!write_socket(worker->cmd_send_fd, ldns_buffer_begin(buffer),
ldns_buffer_limit(buffer)))
log_err("write socket: %s", strerror(errno));
}
/** reply to query with given error code */
static void
replyerror(int r, struct worker* worker)
@ -149,6 +163,36 @@ worker_check_request(ldns_buffer* pkt)
return 0;
}
/** process control messages from the main thread. */
static int
worker_handle_control_cmd(struct comm_point* c, void* arg, int error,
struct comm_reply* ATTR_UNUSED(reply_info))
{
struct worker* worker = (struct worker*)arg;
enum worker_commands cmd;
if(error != NETEVENT_NOERROR) {
if(error == NETEVENT_CLOSED)
comm_base_exit(worker->base);
else log_info("control cmd: %d", error);
return 0;
}
log_info("control cmd");
if(ldns_buffer_limit(c->buffer) != sizeof(uint32_t)) {
fatal_exit("bad control msg length %d",
(int)ldns_buffer_limit(c->buffer));
}
cmd = ldns_buffer_read_u32(c->buffer);
switch(cmd) {
case worker_cmd_quit:
comm_base_exit(worker->base);
break;
default:
log_err("bad command %d", (int)cmd);
break;
}
return 0;
}
/** handles callbacks from listening event interface */
static int
worker_handle_request(struct comm_point* c, void* arg, int error,
@ -210,59 +254,97 @@ worker_sighandler(int sig, void* arg)
}
struct worker*
worker_init(struct config_file *cfg, struct listen_port* ports,
size_t buffer_size)
worker_create(struct daemon* daemon, int id)
{
struct worker* worker = (struct worker*)calloc(1,
sizeof(struct worker));
unsigned int seed;
int sv[2];
if(!worker)
return NULL;
worker->daemon = daemon;
worker->thread_num = id;
worker->cmd_send_fd = -1;
worker->cmd_recv_fd = -1;
/* create socketpair to communicate with worker */
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
free(worker);
log_err("socketpair: %s", strerror(errno));
return NULL;
}
worker->cmd_send_fd = sv[0];
worker->cmd_recv_fd = sv[1];
return worker;
}
int
worker_init(struct worker* worker, struct config_file *cfg,
struct listen_port* ports, size_t buffer_size, int do_sigs)
{
unsigned int seed;
int startport;
worker->need_to_restart = 0;
worker->base = comm_base_create();
if(!worker->base) {
log_err("could not create event handling base");
worker_delete(worker);
return NULL;
return 0;
}
worker->comsig = comm_signal_create(worker->base, worker_sighandler,
worker);
if(do_sigs) {
worker->comsig = comm_signal_create(worker->base,
worker_sighandler, worker);
if(!worker->comsig || !comm_signal_bind(worker->comsig, SIGHUP)
|| !comm_signal_bind(worker->comsig, SIGINT)
|| !comm_signal_bind(worker->comsig, SIGQUIT)) {
log_err("could not create signal handlers");
worker_delete(worker);
return NULL;
return 0;
}
ub_thread_sig_unblock(SIGHUP);
ub_thread_sig_unblock(SIGINT);
ub_thread_sig_unblock(SIGQUIT);
} else { /* !do_sigs */
worker->comsig = 0;
}
worker->front = listen_create(worker->base, ports,
buffer_size, worker_handle_request, worker);
if(!worker->front) {
log_err("could not create listening sockets");
worker_delete(worker);
return NULL;
return 0;
}
startport = cfg->outgoing_base_port +
cfg->outgoing_num_ports * worker->thread_num;
worker->back = outside_network_create(worker->base,
buffer_size, (size_t)cfg->outgoing_num_ports, cfg->ifs,
cfg->num_ifs, cfg->do_ip4, cfg->do_ip6,
cfg->outgoing_base_port);
cfg->num_ifs, cfg->do_ip4, cfg->do_ip6, startport);
if(!worker->back) {
log_err("could not create outgoing sockets");
worker_delete(worker);
return NULL;
return 0;
}
/* init random(), large table size. */
if(!(worker->rndstate = (struct ub_randstate*)calloc(1,
sizeof(struct ub_randstate)))) {
log_err("malloc rndtable failed.");
worker_delete(worker);
return NULL;
return 0;
}
seed = (unsigned int)time(NULL) ^ (unsigned int)getpid();
seed = (unsigned int)time(NULL) ^ (unsigned int)getpid() ^
(unsigned int)worker->thread_num;
if(!ub_initstate(seed, worker->rndstate, RND_STATE_SIZE)) {
log_err("could not init random numbers.");
worker_delete(worker);
return NULL;
return 0;
}
/* start listening to commands */
if(!(worker->cmd_com=comm_point_create_local(worker->base,
worker->cmd_recv_fd, buffer_size, worker_handle_control_cmd,
worker))) {
log_err("could not create control compt.");
worker_delete(worker);
return 0;
}
/* set forwarder address */
if(cfg->fwd_address && cfg->fwd_address[0]) {
if(!worker_set_fwd(worker, cfg->fwd_address, cfg->fwd_port)) {
@ -270,7 +352,7 @@ worker_init(struct config_file *cfg, struct listen_port* ports,
fatal_exit("could not set forwarder address");
}
}
return worker;
return 1;
}
void
@ -284,6 +366,10 @@ worker_delete(struct worker* worker)
{
if(!worker)
return;
close(worker->cmd_send_fd);
worker->cmd_send_fd = -1;
close(worker->cmd_recv_fd);
worker->cmd_recv_fd = -1;
listen_delete(worker->front);
outside_network_delete(worker->back);
comm_signal_delete(worker->comsig);

View file

@ -56,6 +56,13 @@ struct ub_randstate;
/** size of table used for random numbers. large to be more secure. */
#define RND_STATE_SIZE 256
/** worker commands */
enum worker_commands {
/** make the worker quit */
worker_cmd_quit
};
/**
* Structure holding working information for unbound.
* Holds globally visible information.
@ -65,6 +72,12 @@ struct worker {
struct daemon* daemon;
/** the thread number (in daemon array). */
int thread_num;
/** thread id */
ub_thread_t thr_id;
/** fd 0 of socketpair, write commands for worker to this one */
int cmd_send_fd;
/** fd 1 of socketpair, worker listens on this one */
int cmd_recv_fd;
/** the event base this worker works with */
struct comm_base* base;
/** the frontside listening interface where request events come in */
@ -73,6 +86,8 @@ struct worker {
struct outside_network* back;
/** the signal handler */
struct comm_signal* comsig;
/** commpoint to listen to commands. */
struct comm_point* cmd_com;
/** number of requests currently active */
int num_requests;
@ -92,16 +107,27 @@ struct worker {
int need_to_restart;
};
/**
* Create the worker structure. Bare bones version, zeroed struct,
* with backpointers only. Use worker_init on it later.
* @param daemon: the daemon that this worker thread is part of.
* @param id: the thread number from 0.. numthreads-1.
* @return: the new worker or NULL on alloc failure.
*/
struct worker* worker_create(struct daemon* daemon, int id);
/**
* Initialize worker.
* Allocates event base, listens to ports
* @param worker: worker to initialize, created with worker_create.
* @param cfg: configuration settings.
* @param ports: list of shared query ports.
* @param buffer_size: size of datagram buffer.
* @return: The worker, or NULL on error.
* @param do_sigs: if true, worker installs signal handlers.
* @return: false on error.
*/
struct worker* worker_init(struct config_file *cfg, struct listen_port* ports,
size_t buffer_size);
int worker_init(struct worker* worker, struct config_file *cfg,
struct listen_port* ports, size_t buffer_size, int do_sigs);
/**
* Make worker work.
@ -122,4 +148,13 @@ void worker_delete(struct worker* worker);
*/
int worker_set_fwd(struct worker* worker, const char* ip, int port);
/**
* Send a command to a worker. Uses blocking writes.
* @param worker: worker to send command to.
* @param buffer: an empty buffer to use.
* @param cmd: command to send.
*/
void worker_send_cmd(struct worker* worker, ldns_buffer* buffer,
enum worker_commands cmd);
#endif /* DAEMON_WORKER_H */

View file

@ -1,5 +1,8 @@
26 February 2007: Wouter
- ub_random code used to select ID and port.
- log code prints thread id.
- unbound can thread itself, with reload(HUP) and quit working
correctly.
23 February 2007: Wouter
- Can do reloads on sigHUP. Everything is stopped, and freed,

View file

@ -12,7 +12,7 @@
server:
# whitespace is not necessary, but looks cleaner.
# verbosity number, 0 is least verbose.
# verbosity number, 0 is least verbose. 1 is default.
verbosity: 2
# number of threads to create. 1 disables threading.

View file

@ -45,6 +45,9 @@ clause.
The verbosity number, level 0 means no verbosity, only errors. Level 1
gives operational information. Level 2 gives query level information,
output per query. Level 3 gives algorithm level information.
Default is level 1. The verbosity can also be increased from the commandline,
see
.Xr unbound 8 .
.It \fBnum-threads:\fR <number>
The number of threads to create to serve clients. Use 1 for no threading.
.It \fBport:\fR <port number>
@ -87,7 +90,7 @@ Sets the working directory for the program.
.It \fBlogfile:\fR <filename>
If "" is given, logging goes to stderr, or nowhere once daemonized.
The logfile is appended to, in the following format:
[seconds since 1970] unbound[pid]: type: message.
[seconds since 1970] unbound[pid:tid]: type: message.
.It \fBpidfile:\fR <filename>
The process id is written to the file. Default is "unbound.pid". So,
kill -HUP `cat /etc/unbound/unbound.pid` will trigger a reload,

View file

@ -41,4 +41,52 @@
#include "config.h"
#include "util/locks.h"
#include <signal.h>
void
ub_thread_blocksigs()
{
#ifdef HAVE_PTHREAD
int err;
sigset_t sigset;
sigfillset(&sigset);
log_info("blocking signals");
if((err=pthread_sigmask(SIG_SETMASK, &sigset, NULL)))
fatal_exit("pthread_sigmask: %s", strerror(err));
#else
# ifdef HAVE_SOLARIS_THREADS
int err;
sigset_t sigset;
sigfillset(&sigset);
if((err=thr_sigsetmask(SIG_SETMASK, &sigset, NULL)))
fatal_exit("thr_sigsetmask: %s", strerror(err));
# else
/* have nothing, do nothing */
# endif /* HAVE_SOLARIS_THREADS */
#endif /* HAVE_PTHREAD */
}
void ub_thread_sig_unblock(int sig)
{
#ifdef HAVE_PTHREAD
int err;
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, sig);
log_info("unblocking signal %d", sig);
if((err=pthread_sigmask(SIG_UNBLOCK, &sigset, NULL)))
fatal_exit("pthread_sigmask: %s", strerror(err));
#else
# ifdef HAVE_SOLARIS_THREADS
int err;
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, sig);
if((err=thr_sigsetmask(SIG_UNBLOCK, &sigset, NULL)))
fatal_exit("thr_sigsetmask: %s", strerror(err));
# else
/* have nothing, do nothing */
# endif /* HAVE_SOLARIS_THREADS */
#endif /* HAVE_PTHREAD */
}

View file

@ -121,6 +121,8 @@ typedef pthread_spinlock_t lock_quick_t;
typedef pthread_t ub_thread_t;
/** Pass where to store tread_t in thr. Use default NULL attributes. */
#define ub_thread_create(thr, func, arg) LOCKRET(pthread_create(thr, NULL, func, arg))
/** get self id. */
#define ub_thread_self() pthread_self()
#else /* we do not HAVE_PTHREAD */
#ifdef HAVE_SOLARIS_THREADS
@ -150,6 +152,7 @@ typedef mutex_t lock_quick_t;
/** Thread creation, create a default thread. */
typedef thread_t ub_thread_t;
#define ub_thread_create(thr, func, arg) LOCKRET(thr_create(NULL, NULL, func, arg, NULL, thr))
#define ub_thread_self() thr_self()
#else /* we do not HAVE_SOLARIS_THREADS and no PTHREADS */
@ -182,7 +185,20 @@ typedef int ub_thread_t;
#define ub_thread_create(thr, func, arg) \
fatal_exit("%s %d called thread create, but no thread support " \
"has been compiled in.", __FILE__, __LINE__)
#define ub_thread_self() 0
#endif /* HAVE_SOLARIS_THREADS */
#endif /* HAVE_PTHREAD */
/**
* Block all signals for this thread.
* fatal exit on error.
*/
void ub_thread_blocksigs();
/**
* unblock one signal for this thread.
*/
void ub_thread_sig_unblock(int sig);
#endif /* UTIL_LOCKS_H */

View file

@ -39,6 +39,7 @@
#include "config.h"
#include "util/log.h"
#include "util/locks.h"
#ifdef HAVE_TIME_H
#include <time.h>
#endif
@ -64,6 +65,7 @@ log_init(const char* filename)
strerror(errno));
return;
}
verbose(VERB_DETAIL, "switching to logfile %s", filename);
if(logfile && logfile != stderr)
fclose(logfile);
logfile = f;
@ -75,8 +77,10 @@ log_vmsg(const char* type, const char *format, va_list args)
char message[MAXSYSLOGMSGLEN];
const char* ident="unbound";
vsnprintf(message, sizeof(message), format, args);
fprintf(logfile, "[%d] %s[%d] %s: %s\n",
(int)time(NULL), ident, (int)getpid(), type, message);
fprintf(logfile, "[%d] %s[%d:%x] %s: %s\n",
(int)time(NULL), ident, (int)getpid(),
(int)ub_thread_self(),
type, message);
fflush(logfile);
}

View file

@ -49,3 +49,23 @@ str_is_ip6(const char* str)
else return 0;
}
int
write_socket(int s, const void *buf, size_t size)
{
const char* data = (const char*)buf;
size_t total_count = 0;
while (total_count < size) {
ssize_t count
= write(s, data + total_count, size - total_count);
if (count == -1) {
if (errno != EAGAIN && errno != EINTR) {
return 0;
} else {
continue;
}
}
total_count += count;
}
return 1;
}

View file

@ -49,4 +49,14 @@
*/
int str_is_ip6(const char* str);
/**
* Write (blocking) to a nonblocking socket.
* @param s: fd.
* @param buf: data buffer.
* @param size: length of data to send.
* @return: 0 on error. errno is set.
*/
int
write_socket(int s, const void *buf, size_t size);
#endif /* NET_HELP_H */

View file

@ -137,6 +137,9 @@ static void comm_timer_callback(int fd, short event, void* arg);
*/
static void comm_signal_callback(int fd, short event, void* arg);
/** libevent callback for AF_UNIX fds. */
static void comm_point_local_handle_callback(int fd, short event, void* arg);
/** create a tcp handler with a parent */
static struct comm_point* comm_point_create_tcp_handler(
struct comm_base *base, struct comm_point* parent, size_t bufsize,
@ -327,6 +330,7 @@ tcp_callback_writer(struct comm_point* c)
{
log_assert(c->type == comm_tcp);
ldns_buffer_clear(c->buffer);
if(c->tcp_do_toggle_rw)
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
comm_point_stop_listening(c);
@ -339,10 +343,12 @@ static void
tcp_callback_reader(struct comm_point* c)
{
struct comm_reply rep;
log_assert(c->type == comm_tcp);
log_assert(c->type == comm_tcp || c->type == comm_local);
ldns_buffer_flip(c->buffer);
if(c->tcp_do_toggle_rw)
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
if(c->type == comm_tcp)
comm_point_stop_listening(c);
rep.c = c;
rep.addrlen = 0;
@ -354,13 +360,14 @@ tcp_callback_reader(struct comm_point* c)
/** Handle tcp reading callback.
* @param fd: file descriptor of socket.
* @param c: comm point to read from into buffer.
* @param short_ok: if true, very short packets are OK (for comm_local).
* @return: 0 on error
*/
static int
comm_point_tcp_handle_read(int fd, struct comm_point* c)
comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok)
{
ssize_t r;
log_assert(c->type == comm_tcp);
log_assert(c->type == comm_tcp || c->type == comm_local);
if(!c->tcp_is_reading)
return 0;
@ -386,7 +393,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c)
}
ldns_buffer_set_limit(c->buffer,
ldns_buffer_read_u16_at(c->buffer, 0));
if(ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
if(!short_ok &&
ldns_buffer_limit(c->buffer) < LDNS_HEADER_SIZE) {
verbose(VERB_DETAIL, "tcp: dropped bogus too short.");
return 0;
}
@ -462,7 +470,7 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
log_assert(c->type == comm_tcp);
if(event&EV_READ) {
if(!comm_point_tcp_handle_read(fd, c)) {
if(!comm_point_tcp_handle_read(fd, c, 0)) {
reclaim_tcp_handler(c);
if(!c->tcp_do_close)
(void)(*c->callback)(c, c->cb_arg,
@ -490,6 +498,20 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg)
log_err("Ignored event %d for tcphdl.", event);
}
static void comm_point_local_handle_callback(int fd, short event, void* arg)
{
struct comm_point* c = (struct comm_point*)arg;
log_assert(c->type == comm_local);
if(event&EV_READ) {
if(!comm_point_tcp_handle_read(fd, c, 1)) {
log_err("error in localhdl");
}
return;
}
log_err("Ignored event %d for localhdl.", event);
}
struct comm_point*
comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
comm_point_callback_t* callback, void* callback_arg)
@ -571,7 +593,7 @@ comm_point_create_tcp_handler(struct comm_base *base,
c->type = comm_tcp;
c->tcp_do_close = 0;
c->do_not_close = 0;
c->tcp_do_toggle_rw = 0;
c->tcp_do_toggle_rw = 1;
c->callback = callback;
c->cb_arg = callback_arg;
/* add to parent free list */
@ -653,6 +675,56 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, size_t bufsize,
return c;
}
struct comm_point*
comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg)
{
struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point));
short evbits;
if(!c)
return NULL;
c->ev = (struct internal_event*)calloc(1,
sizeof(struct internal_event));
if(!c->ev) {
free(c);
return NULL;
}
c->fd = fd;
c->buffer = ldns_buffer_new(bufsize);
if(!c->buffer) {
free(c->ev);
free(c);
return NULL;
}
c->timeout = NULL;
c->tcp_is_reading = 1;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
c->type = comm_local;
c->tcp_do_close = 0;
c->do_not_close = 1;
c->tcp_do_toggle_rw = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* libevent stuff */
evbits = EV_PERSIST | EV_READ;
event_set(&c->ev->ev, c->fd, evbits, comm_point_local_handle_callback,
c);
if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
event_add(&c->ev->ev, c->timeout) != 0 )
{
log_err("could not add tcphdl event");
free(c->ev);
free(c);
return NULL;
}
return c;
}
void
comm_point_close(struct comm_point* c)
{

View file

@ -135,7 +135,9 @@ struct comm_point {
/** TCP accept socket - only creates handlers if readable. */
comm_tcp_accept,
/** TCP handler socket - handle byteperbyte readwrite. */
comm_tcp
comm_tcp,
/** AF_UNIX socket - for internal commands. */
comm_local
} type;
/* ---------- Behaviour ----------- */
@ -286,6 +288,19 @@ struct comm_point* comm_point_create_tcp(struct comm_base* base,
int fd, int num, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
/**
* Create commpoint to listen to a local domain file descriptor.
* @param base: in which base to alloc the commpoint.
* @param fd: file descriptor of open AF_UNIX socket set to listen nonblocking.
* @param bufsize: size of buffer to create for handlers.
* @param callback: callback function pointer for the handler.
* @param callback_arg: will be passed to your callback function.
* @return: the commpoint or NULL on error.
*/
struct comm_point* comm_point_create_local(struct comm_base* base,
int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
/**
* Close a comm point fd.
* @param c: comm point to close.