serialize, deserialize, raw commpoints.

case preserve note.


git-svn-id: file:///svn/unbound/trunk@881 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-01-21 16:03:59 +00:00
parent c7ad292438
commit 05f9d35f00
9 changed files with 574 additions and 42 deletions

View file

@ -1,3 +1,6 @@
21 January 2008: Wouter
- libworker work, netevent raw commpoints, write_msg, serialize.
18 January 2008: Wouter
- touch up of manpage for libunbound.
- support for IP_RECVDSTADDR (for *BSD ip4).

View file

@ -200,3 +200,9 @@ o If a client makes a query without RD bit, in the case of a returned
ascertains that RRSIGs are OK (and not omitted), but does not
check NSEC/NSEC3.
o Case preservation
Unbound preserves the casing received from authority servers as best
as possible. It compresses without case, so case can get lost there.
The casing from the authority server is used in preference to the casing
of the query name. This is different from BIND. RFC4343 allows either
behaviour.

View file

@ -42,6 +42,7 @@
#include "libunbound/context.h"
#include "util/module.h"
#include "util/config_file.h"
#include "util/net_help.h"
#include "services/modstack.h"
#include "services/localzone.h"
#include "services/cache/rrset.h"
@ -183,3 +184,158 @@ context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc)
ctx->alloc_list = alloc;
lock_basic_unlock(&ctx->cfglock);
}
uint8_t*
context_serialize_new_query(struct ctx_query* q, uint32_t* len)
{
/* format for new query is
* o uint32 cmd
* o uint32 id
* o uint32 type
* o uint32 class
* o rest queryname (string)
*/
uint8_t* p;
size_t slen = strlen(q->res->qname) + 1/*end of string*/;
*len = sizeof(uint32_t)*4 + slen;
p = (uint8_t*)malloc(*len);
if(!p) return NULL;
ldns_write_uint32(p, UB_LIBCMD_NEWQUERY);
ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)q->res->qtype);
ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->res->qclass);
memmove(p+4*sizeof(uint32_t), q->res->qname, slen);
return p;
}
struct ctx_query*
context_deserialize_new_query(struct ub_val_ctx* ctx, uint8_t* p, uint32_t len)
{
struct ctx_query* q = (struct ctx_query*)calloc(1, sizeof(*q));
if(!q) return NULL;
if(len < 4*sizeof(uint32_t)+1) {
free(q);
return NULL;
}
log_assert( ldns_read_uint32(p) == UB_LIBCMD_NEWQUERY);
q->querynum = (int)ldns_read_uint32(p+sizeof(uint32_t));
q->node.key = &q->querynum;
q->async = 1;
q->res = (struct ub_val_result*)calloc(1, sizeof(*q->res));
if(!q->res) {
free(q);
return NULL;
}
q->res->qtype = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
q->res->qclass = (int)ldns_read_uint32(p+3*sizeof(uint32_t));
q->res->qname = strdup((char*)(p+4*sizeof(uint32_t)));
if(!q->res->qname) {
free(q->res);
free(q);
return NULL;
}
/** add to query list */
ctx->num_async++;
(void)rbtree_insert(&ctx->queries, &q->node);
return q;
}
uint8_t*
context_serialize_answer(struct ctx_query* q, int err, uint32_t* len)
{
/* answer format
* o uint32 cmd
* o uint32 id
* o uint32 error_code
* o uint32 msg_security
* o the remainder is the answer msg from resolver lookup.
* remainder can be length 0.
*/
uint8_t* p;
*len = sizeof(uint32_t)*4 + q->msg_len;
p = (uint8_t*)malloc(*len);
if(!p) return NULL;
ldns_write_uint32(p, UB_LIBCMD_ANSWER);
ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
ldns_write_uint32(p+2*sizeof(uint32_t), (uint32_t)err);
ldns_write_uint32(p+3*sizeof(uint32_t), (uint32_t)q->msg_security);
memmove(p+4*sizeof(uint32_t), q->msg, q->msg_len);
return p;
}
struct ctx_query*
context_deserialize_answer(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len, int* err)
{
struct ctx_query* q = NULL ;
int id;
if(len < 4*sizeof(uint32_t)) return NULL;
log_assert( ldns_read_uint32(p) == UB_LIBCMD_ANSWER);
id = (int)ldns_read_uint32(p+sizeof(uint32_t));
q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
if(!q) return NULL;
*err = (int)ldns_read_uint32(p+2*sizeof(uint32_t));
q->msg_security = ldns_read_uint32(p+3*sizeof(uint32_t));
if(len > 4*sizeof(uint32_t)) {
q->msg_len = len - 4*sizeof(uint32_t);
q->msg = (uint8_t*)memdup(p+4*sizeof(uint32_t), q->msg_len);
if(!q->msg) {
/* pass malloc failure to the user callback */
q->msg_len = 0;
*err = UB_NOMEM;
return q;
}
} else {
q->msg_len = 0;
free(q->msg);
q->msg = NULL;
}
return q;
}
uint8_t*
context_serialize_cancel(struct ctx_query* q, uint32_t* len)
{
/* format of cancel:
* o uint32 cmd
* o uint32 async-id */
uint8_t* p = (uint8_t*)malloc(2*sizeof(uint32_t));
if(!p) return NULL;
*len = 2*sizeof(uint32_t);
ldns_write_uint32(p, UB_LIBCMD_CANCEL);
ldns_write_uint32(p+sizeof(uint32_t), (uint32_t)q->querynum);
return p;
}
struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len)
{
struct ctx_query* q;
int id;
if(len != 2*sizeof(uint32_t)) return NULL;
log_assert( ldns_read_uint32(p) == UB_LIBCMD_CANCEL);
id = (int)ldns_read_uint32(p+sizeof(uint32_t));
q = (struct ctx_query*)rbtree_search(&ctx->queries, &id);
return q;
}
uint8_t*
context_serialize_quit(uint32_t* len)
{
uint8_t* p = (uint8_t*)malloc(sizeof(uint32_t));
if(!p)
return NULL;
*len = sizeof(uint32_t);
ldns_write_uint32(p, UB_LIBCMD_QUIT);
return p;
}
enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len)
{
uint32_t v;
if((size_t)len < sizeof(v))
return UB_LIBCMD_QUIT;
v = ldns_read_uint32(p);
return v;
}

View file

@ -130,6 +130,8 @@ struct ctx_query {
int querynum;
/** was this an async query? */
int async;
/** has this query been cancelled? (for bg thread) */
int cancelled;
/** for async query, the callback function */
ub_val_callback_t cb;
@ -171,6 +173,25 @@ enum ub_ctx_err {
UB_INITFAIL = -7
};
/**
* Command codes for libunbound pipe.
*
* Serialization looks like this:
* o length (of remainder) uint32.
* o uint32 command code.
* o per command format.
*/
enum ub_ctx_cmd {
/** QUIT */
UB_LIBCMD_QUIT = 0,
/** New query, sent to bg worker */
UB_LIBCMD_NEWQUERY,
/** Cancel query, sent to bg worker */
UB_LIBCMD_CANCEL,
/** Query result, originates from bg worker */
UB_LIBCMD_ANSWER
};
/**
* finalize a context.
* @param ctx: context to finalize. creates shared data.
@ -208,4 +229,80 @@ struct alloc_cache* context_obtain_alloc(struct ub_val_ctx* ctx);
*/
void context_release_alloc(struct ub_val_ctx* ctx, struct alloc_cache* alloc);
/**
* Serialize a context query that questions data.
* This serializes the query name, type, ...
* As well as command code 'new_query'.
* @param q: context query
* @param len: the length of the allocation is returned.
* @return: an alloc, or NULL on mem error.
*/
uint8_t* context_serialize_new_query(struct ctx_query* q, uint32_t* len);
/**
* Serialize a context_query result to hand back to user.
* This serializes the query name, type, ..., and result.
* As well as command code 'answer'.
* @param q: context query
* @param err: error code to pass to client.
* @param len: the length of the allocation is returned.
* @return: an alloc, or NULL on mem error.
*/
uint8_t* context_serialize_answer(struct ctx_query* q, int err, uint32_t* len);
/**
* Serialize a query cancellation. Serializes query async id
* as well as command code 'cancel'
* @param q: context query
* @param len: the length of the allocation is returned.
* @return: an alloc, or NULL on mem error.
*/
uint8_t* context_serialize_cancel(struct ctx_query* q, uint32_t* len);
/**
* Serialize a 'quit' command.
* @param len: the length of the allocation is returned.
* @return: an alloc, or NULL on mem error.
*/
uint8_t* context_serialize_quit(uint32_t* len);
/**
* Obtain command code from serialized buffer
* @param p: buffer serialized.
* @param len: length of buffer.
* @return command code or QUIT on error.
*/
enum ub_ctx_cmd context_serial_getcmd(uint8_t* p, uint32_t len);
/**
* Deserialize a new_query buffer.
* @param ctx: context
* @param p: buffer serialized.
* @param len: length of buffer.
* @return new ctx_query or NULL for malloc failure.
*/
struct ctx_query* context_deserialize_new_query(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len);
/**
* Deserialize an answer buffer.
* @param ctx: context
* @param p: buffer serialized.
* @param len: length of buffer.
* @param err: error code to be returned to client is passed.
* @return ctx_query with answer added or NULL for malloc failure.
*/
struct ctx_query* context_deserialize_answer(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len, int* err);
/**
* Deserialize a cancel buffer.
* @param ctx: context
* @param p: buffer serialized.
* @param len: length of buffer.
* @return ctx_query to cancel or NULL for failure.
*/
struct ctx_query* context_deserialize_cancel(struct ub_val_ctx* ctx,
uint8_t* p, uint32_t len);
#endif /* LIBUNBOUND_CONTEXT_H */

View file

@ -63,6 +63,9 @@
/** size of table used for random numbers. large to be more secure. */
#define RND_STATE_SIZE 256
/** handle new query command for bg worker */
static void handle_newq(struct libworker* w, uint8_t* buf, uint32_t len);
/** delete libworker struct */
static void
libworker_delete(struct libworker* w)
@ -155,51 +158,89 @@ libworker_setup(struct ub_val_ctx* ctx)
return w;
}
static int
/** handle cancel command for bg worker */
static void
handle_cancel(struct libworker* w, uint8_t* buf, uint32_t len)
{
struct ctx_query* q = context_deserialize_cancel(w->ctx, buf, len);
if(!q) {
log_err("deserialize cancel failed");
return;
}
q->cancelled = 1;
}
/** handle control command coming into server */
int
libworker_handle_control_cmd(struct comm_point* c, void* arg,
int err, struct comm_reply* ATTR_UNUSED(rep))
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
/*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
if(err != NETEVENT_NOERROR) {
if(err == NETEVENT_CLOSED) {
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */
exit(0);
}
log_err("internal error: control cmd err %d", err);
exit(0);
struct libworker* w = (struct libworker*)arg;
uint32_t len = 0;
uint8_t* buf = NULL;
int r = libworker_read_msg(c->fd, &buf, &len, 1);
if(r==0) {
/* error has happened or */
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
if(r==-1) /* nothing to read now, try later */
return 0;
switch(context_serial_getcmd(buf, len)) {
default:
case UB_LIBCMD_ANSWER:
log_err("unknown command for bg worker %d",
(int)context_serial_getcmd(buf, len));
/* and fall through to quit */
case UB_LIBCMD_QUIT:
comm_base_exit(w->base);
break;
case UB_LIBCMD_NEWQUERY:
handle_newq(w, buf, len);
break;
case UB_LIBCMD_CANCEL:
handle_cancel(w, buf, len);
break;
}
return 0;
}
static int
/** handle opportunity to write result back */
int
libworker_handle_result_write(struct comm_point* c, void* arg,
int err, struct comm_reply* ATTR_UNUSED(rep))
int ATTR_UNUSED(err), struct comm_reply* ATTR_UNUSED(rep))
{
/*struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;*/
if(err != NETEVENT_NOERROR) {
if(err == NETEVENT_CLOSED) {
/* parent closed pipe, must have exited somehow */
/* it is of no use to go on, exit */
exit(0);
}
log_err("internal error: pipe comm err %d", err);
exit(0);
struct libworker* w = (struct libworker*)arg;
struct libworker_res_list* item = w->res_list;
int r;
if(!item) {
comm_point_stop_listening(c);
return 0;
}
r = libworker_write_msg(c->fd, item->buf, item->len, 1);
if(r == -1)
return 0; /* try again later */
if(r == 0) {
/* error on pipe, must have exited somehow */
/* it is of no use to go on, exit */
comm_base_exit(w->base);
return 0;
}
/* done this result, remove it */
free(item->buf);
item->buf = NULL;
w->res_list = w->res_list->next;
free(item);
if(!w->res_list) {
w->res_last = NULL;
comm_point_stop_listening(c);
}
return 0;
}
/** get bufsize for cfg */
static size_t
getbufsz(struct ub_val_ctx* ctx)
{
size_t s = 65535;
lock_basic_lock(&ctx->cfglock);
s = ctx->env->cfg->msg_buffer_size;
lock_basic_unlock(&ctx->cfglock);
return s;
}
/** the background thread func */
static void*
libworker_dobg(void* arg)
@ -207,24 +248,22 @@ libworker_dobg(void* arg)
/* setup */
struct ub_val_ctx* ctx = (struct ub_val_ctx*)arg;
struct libworker* w = libworker_setup(ctx);
size_t bufsz = getbufsz(ctx);
log_thread_set(&w->thread_num);
if(!w) {
log_err("libunbound bg worker init failed, nomem");
return NULL;
}
lock_basic_lock(&ctx->qqpipe_lock);
if(!(w->cmd_com=comm_point_create_local(w->base, ctx->qqpipe[0],
bufsz, libworker_handle_control_cmd, w))) {
if(!(w->cmd_com=comm_point_create_raw(w->base, ctx->qqpipe[0], 0,
libworker_handle_control_cmd, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
return NULL;
}
lock_basic_unlock(&ctx->qqpipe_lock);
lock_basic_lock(&ctx->rrpipe_lock);
/* TODO create writing local commpoint */
if(!(w->res_com=comm_point_create_local(w->base, ctx->rrpipe[1],
bufsz, libworker_handle_result_write, w))) {
if(!(w->res_com=comm_point_create_raw(w->base, ctx->rrpipe[1], 1,
libworker_handle_result_write, w))) {
lock_basic_unlock(&ctx->qqpipe_lock);
log_err("libunbound bg worker init failed, no cmdcom");
return NULL;
@ -441,6 +480,18 @@ int libworker_fg(struct ub_val_ctx* ctx, struct ctx_query* q)
return UB_NOERROR;
}
/** handle new query command for bg worker */
static void
handle_newq(struct libworker* w, uint8_t* buf, uint32_t len)
{
struct ctx_query* q = context_deserialize_new_query(w->ctx, buf, len);
if(!q) {
log_err("failed to deserialize newq");
return;
}
/* TODO start new query in bg mode */
}
void libworker_alloc_cleanup(void* arg)
{
struct libworker* w = (struct libworker*)arg;
@ -547,6 +598,83 @@ libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
return 0;
}
int
libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock)
{
ssize_t r;
/* test */
if(nonblock) {
r = write(fd, &len, sizeof(len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("msg write failed: %s", strerror(errno));
return -1; /* can still continue, perhaps */
}
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* write remainder */
if(r != (ssize_t)sizeof(len)) {
if(write(fd, (char*)(&len)+r, sizeof(len)-r) == -1) {
log_err("msg write failed: %s", strerror(errno));
return 0;
}
}
if(write(fd, buf, len) == -1) {
log_err("msg write failed: %s", strerror(errno));
return 0;
}
if(!fd_set_nonblock(fd))
return 0;
return 1;
}
int
libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock)
{
ssize_t r;
/* test */
*len = 0;
if(nonblock) {
r = read(fd, len, sizeof(*len));
if(r == -1) {
if(errno==EINTR || errno==EAGAIN)
return -1;
log_err("msg read failed: %s", strerror(errno));
return -1; /* we can still continue, perhaps */
}
if(r == 0) /* EOF */
return 0;
} else r = 0;
if(!fd_set_block(fd))
return 0;
/* read remainder */
if(r != (ssize_t)sizeof(*len)) {
if((r=read(fd, (char*)(len)+r, sizeof(*len)-r)) == -1) {
log_err("msg read failed: %s", strerror(errno));
return 0;
}
if(r == 0) /* EOF */
return 0;
}
*buf = (uint8_t*)malloc(*len);
if(!*buf) {
log_err("out of memory");
return 0;
}
if((r=read(fd, *buf, *len)) == -1) {
log_err("msg read failed: %s", strerror(errno));
return 0;
}
if(r == 0) /* EOF */
return 0;
if(!fd_set_nonblock(fd))
return 0;
return 1;
}
/* --- fake callbacks for fptr_wlist to work --- */
int worker_handle_control_cmd(struct comm_point* ATTR_UNUSED(c),
void* ATTR_UNUSED(arg), int ATTR_UNUSED(error),

View file

@ -53,6 +53,7 @@ struct outbound_entry;
struct module_qstate;
struct comm_point;
struct comm_reply;
struct libworker_res_list;
/**
* The library-worker status structure
@ -78,8 +79,13 @@ struct libworker {
struct ub_randstate* rndstate;
/** commpoint to listen to commands */
struct comm_point* cmd_com;
/** commpoint to write results back (nonblocking) */
/** commpoint to write results back */
struct comm_point* res_com;
/** list of outstanding results to be written back */
struct libworker_res_list* res_list;
/** last in list */
struct libworker_res_list* res_last;
};
/**
@ -92,6 +98,18 @@ struct libworker_fg_data {
struct ctx_query* q;
};
/**
* List of results (arbitrary command serializations) to write back
*/
struct libworker_res_list {
/** next in list */
struct libworker_res_list* next;
/** serialized buffer to write */
uint8_t* buf;
/** length to write */
uint32_t len;
};
/**
* Create a background worker
* @param ctx: is updated with pid/tid of the background worker.
@ -157,4 +175,46 @@ int libworker_handle_reply(struct comm_point* c, void* arg, int error,
int libworker_handle_service_reply(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info);
/** handle control command coming into server */
int libworker_handle_control_cmd(struct comm_point* c, void* arg,
int err, struct comm_reply* rep);
/** handle opportunity to write result back */
int libworker_handle_result_write(struct comm_point* c, void* arg,
int err, struct comm_reply* rep);
/**
* Write length bytes followed by message.
* @param fd: the socket to write on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message.
* @param len: length of message.
* @param nonblock: if set to true, the first write is nonblocking.
* If the first write fails the function returns -1.
* If set false, the first write is blocking.
* @return: all remainder writes are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown.
* return 1 if all OK.
*/
int libworker_write_msg(int fd, uint8_t* buf, uint32_t len, int nonblock);
/**
* Read length bytes followed by message.
* @param fd: the socket to write on. Is nonblocking.
* Set to blocking by the function,
* and back to non-blocking at exit of function.
* @param buf: the message, malloced.
* @param len: length of message, returned.
* @param nonblock: if set to true, the first read is nonblocking.
* If the first read fails the function returns -1.
* If set false, the first read is blocking.
* @return: all remainder reads are nonblocking.
* return 0 on error, in that case blocking/nonblocking of socket is
* unknown. On EOF 0 is returned.
* return 1 if all OK.
*/
int libworker_read_msg(int fd, uint8_t** buf, uint32_t* len, int nonblock);
#endif /* LIBUNBOUND_WORKER_H */

View file

@ -105,6 +105,7 @@ fptr_whitelist_event(void (*fptr)(int, short, void *))
else if(fptr == &comm_timer_callback) return 1;
else if(fptr == &comm_signal_callback) return 1;
else if(fptr == &comm_point_local_handle_callback) return 1;
else if(fptr == &comm_point_raw_handle_callback) return 1;
return 0;
}

View file

@ -743,6 +743,15 @@ void comm_point_local_handle_callback(int fd, short event, void* arg)
log_err("Ignored event %d for localhdl.", event);
}
void comm_point_raw_handle_callback(int ATTR_UNUSED(fd),
short ATTR_UNUSED(event), void* arg)
{
struct comm_point* c = (struct comm_point*)arg;
log_assert(c->type == comm_raw);
(void)(*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, NULL);
}
struct comm_point*
comm_point_create_udp(struct comm_base *base, int fd, ldns_buffer* buffer,
comm_point_callback_t* callback, void* callback_arg)
@ -1048,7 +1057,55 @@ comm_point_create_local(struct comm_base *base, int fd, size_t bufsize,
if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
event_add(&c->ev->ev, c->timeout) != 0 )
{
log_err("could not add tcphdl event");
log_err("could not add localhdl event");
free(c->ev);
free(c);
return NULL;
}
return c;
}
struct comm_point*
comm_point_create_raw(struct comm_base* base, int fd, int writing,
comm_point_callback_t* callback, void* callback_arg)
{
struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point));
short evbits;
if(!c)
return NULL;
c->ev = (struct internal_event*)calloc(1,
sizeof(struct internal_event));
if(!c->ev) {
free(c);
return NULL;
}
c->fd = fd;
c->buffer = NULL;
c->timeout = NULL;
c->tcp_is_reading = 0;
c->tcp_byte_count = 0;
c->tcp_parent = NULL;
c->max_tcp_count = 0;
c->tcp_handlers = NULL;
c->tcp_free = NULL;
c->type = comm_raw;
c->tcp_do_close = 0;
c->do_not_close = 1;
c->tcp_do_toggle_rw = 0;
c->tcp_check_nb_connect = 0;
c->callback = callback;
c->cb_arg = callback_arg;
/* libevent stuff */
if(writing)
evbits = EV_PERSIST | EV_WRITE;
else evbits = EV_PERSIST | EV_READ;
event_set(&c->ev->ev, c->fd, evbits, comm_point_raw_handle_callback,
c);
if(event_base_set(base->eb->base, &c->ev->ev) != 0 ||
event_add(&c->ev->ev, c->timeout) != 0 )
{
log_err("could not add rawhdl event");
free(c->ev);
free(c);
return NULL;

View file

@ -166,7 +166,9 @@ struct comm_point {
/** TCP handler socket - handle byteperbyte readwrite. */
comm_tcp,
/** AF_UNIX socket - for internal commands. */
comm_local
comm_local,
/** raw - not DNS format - for pipe readers and writers */
comm_raw
}
/** variable with type of socket, UDP,TCP-accept,TCP,pipe */
type;
@ -350,6 +352,19 @@ struct comm_point* comm_point_create_local(struct comm_base* base,
int fd, size_t bufsize,
comm_point_callback_t* callback, void* callback_arg);
/**
* Create commpoint to listen to a local domain pipe descriptor.
* @param base: in which base to alloc the commpoint.
* @param fd: file descriptor.
* @param writing: true if you want to listen to writes, false for reads.
* @param callback: callback function pointer for the handler.
* @param callback_arg: will be passed to your callback function.
* @return: the commpoint or NULL on error.
*/
struct comm_point* comm_point_create_raw(struct comm_base* base,
int fd, int writing,
comm_point_callback_t* callback, void* callback_arg);
/**
* Close a comm point fd.
* @param c: comm point to close.
@ -569,5 +584,14 @@ void comm_signal_callback(int fd, short event, void* arg);
*/
void comm_point_local_handle_callback(int fd, short event, void* arg);
/**
* This routine is published for checks and tests, and is only used internally.
* libevent callback for raw fd access.
* @param fd: file descriptor.
* @param event: event bits from libevent:
* EV_READ, EV_WRITE, EV_SIGNAL, EV_TIMEOUT.
* @param arg: the comm_point structure.
*/
void comm_point_raw_handle_callback(int fd, short event, void* arg);
#endif /* NET_EVENT_H */