make pipe work with fallback on windows.

git-svn-id: file:///svn/unbound/trunk@1166 be551aaa-1e26-0410-a405-d3ace91eadb9
This commit is contained in:
Wouter Wijngaards 2008-07-25 09:26:15 +00:00
parent d452ba59b5
commit 75ac5e0fe0
9 changed files with 150 additions and 11 deletions

View file

@ -1,5 +1,7 @@
25 July 2008: Wouter 25 July 2008: Wouter
- added original copyright statement of OpenBSD arc4random code. - added original copyright statement of OpenBSD arc4random code.
- created tube signaling solution on windows, as a pipe replacement.
this makes background asynchronous resolution work on windows.
22 July 2008: Wouter 22 July 2008: Wouter
- moved pipe actions to util/tube.c. easier porting and shared code. - moved pipe actions to util/tube.c. easier porting and shared code.

View file

@ -1116,4 +1116,10 @@ void comm_timer_delete(struct comm_timer* timer)
free(timer); free(timer);
} }
struct event_base* comm_base_internal(struct comm_base* ATTR_UNUSED(b))
{
/* no pipe comm possible in testbound */
return NULL;
}
/*********** End of Dummy routines ***********/ /*********** End of Dummy routines ***********/

View file

@ -115,6 +115,7 @@ fptr_whitelist_event(void (*fptr)(int, short, void *))
else if(fptr == &comm_signal_callback) return 1; else if(fptr == &comm_signal_callback) return 1;
else if(fptr == &comm_point_local_handle_callback) return 1; else if(fptr == &comm_point_local_handle_callback) return 1;
else if(fptr == &comm_point_raw_handle_callback) return 1; else if(fptr == &comm_point_raw_handle_callback) return 1;
else if(fptr == &tube_handle_signal) return 1;
return 0; return 0;
} }

View file

@ -209,6 +209,11 @@ void comm_base_exit(struct comm_base* b)
} }
} }
struct event_base* comm_base_internal(struct comm_base* b)
{
return b->eb->base;
}
/* send a UDP reply */ /* send a UDP reply */
int int
comm_point_send_udp_msg(struct comm_point *c, ldns_buffer* packet, comm_point_send_udp_msg(struct comm_point *c, ldns_buffer* packet,

View file

@ -63,6 +63,7 @@
#include "config.h" #include "config.h"
struct comm_point; struct comm_point;
struct comm_reply; struct comm_reply;
struct event_base;
/* internal event notification data storage structure. */ /* internal event notification data storage structure. */
struct internal_event; struct internal_event;
@ -289,6 +290,13 @@ void comm_base_dispatch(struct comm_base* b);
*/ */
void comm_base_exit(struct comm_base* b); void comm_base_exit(struct comm_base* b);
/**
* Access internal data structure (for util/tube.c on windows)
* @param b: comm base
* @return event_base. Could be libevent, or internal event handler.
*/
struct event_base* comm_base_internal(struct comm_base* b);
/** /**
* Create an UDP comm point. Calls malloc. * Create an UDP comm point. Calls malloc.
* setups the structure with the parameters you provide. * setups the structure with the parameters you provide.

View file

@ -45,8 +45,7 @@
#include "util/netevent.h" #include "util/netevent.h"
#include "util/fptr_wlist.h" #include "util/fptr_wlist.h"
/*#ifndef USE_WINSOCK TODO */ #ifndef USE_WINSOCK
#if 1
/* on unix */ /* on unix */
struct tube* tube_create(void) struct tube* tube_create(void)
@ -456,6 +455,11 @@ int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
return 1; return 1;
} }
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
void* ATTR_UNUSED(arg))
{
log_assert(0);
}
#else /* USE_WINSOCK */ #else /* USE_WINSOCK */
/* on windows */ /* on windows */
@ -479,6 +483,7 @@ struct tube* tube_create(void)
log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
} }
lock_basic_init(&tube->res_lock); lock_basic_init(&tube->res_lock);
verbose(VERB_ALGO, "tube created");
return tube; return tube;
} }
@ -492,15 +497,18 @@ void tube_delete(struct tube* tube)
if(!WSACloseEvent(tube->event)) if(!WSACloseEvent(tube->event))
log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
lock_basic_destroy(&tube->res_lock); lock_basic_destroy(&tube->res_lock);
verbose(VERB_ALGO, "tube deleted");
free(tube); free(tube);
} }
void tube_close_read(struct tube* ATTR_UNUSED(tube)) void tube_close_read(struct tube* ATTR_UNUSED(tube))
{ {
verbose(VERB_ALGO, "tube close_read");
} }
void tube_close_write(struct tube* ATTR_UNUSED(tube)) void tube_close_write(struct tube* ATTR_UNUSED(tube))
{ {
verbose(VERB_ALGO, "tube close_write");
/* wake up waiting reader with an empty queue */ /* wake up waiting reader with an empty queue */
if(!WSASetEvent(tube->event)) { if(!WSASetEvent(tube->event)) {
log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
@ -509,10 +517,13 @@ void tube_close_write(struct tube* ATTR_UNUSED(tube))
void tube_remove_bg_listen(struct tube* tube) void tube_remove_bg_listen(struct tube* tube)
{ {
verbose(VERB_ALGO, "tube remove_bg_listen");
winsock_unregister_wsaevent(&tube->ev_listen);
} }
void tube_remove_bg_write(struct tube* tube) void tube_remove_bg_write(struct tube* tube)
{ {
verbose(VERB_ALGO, "tube remove_bg_write");
if(tube->res_list) { if(tube->res_list) {
struct tube_res_list* np, *p = tube->res_list; struct tube_res_list* np, *p = tube->res_list;
tube->res_list = NULL; tube->res_list = NULL;
@ -529,16 +540,25 @@ void tube_remove_bg_write(struct tube* tube)
int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
int ATTR_UNUSED(nonblock)) int ATTR_UNUSED(nonblock))
{ {
uint8_t* a;
verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
a = (uint8_t*)memdup(buf, len);
if(!a) {
log_err("out of memory in tube_write_msg");
return 0;
}
/* always nonblocking, this pipe cannot get full */ /* always nonblocking, this pipe cannot get full */
return tube_queue_item(tube, buf, len); return tube_queue_item(tube, a, len);
} }
int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
int nonblock) int nonblock)
{ {
struct tube_res_list* item = NULL; struct tube_res_list* item = NULL;
verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
*buf = NULL; *buf = NULL;
if(!tube_poll(tube)) { if(!tube_poll(tube)) {
verbose(VERB_ALGO, "tube read_msg nodata");
/* nothing ready right now, wait if we want to */ /* nothing ready right now, wait if we want to */
if(nonblock) if(nonblock)
return -1; /* would block waiting for items */ return -1; /* would block waiting for items */
@ -552,9 +572,10 @@ int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
if(tube->res_last == item) { if(tube->res_last == item) {
/* the list is now empty */ /* the list is now empty */
tube->res_last = NULL; tube->res_last = NULL;
if(!WSAResetEvent(&tube->event)) { verbose(VERB_ALGO, "tube read_msg lastdata");
if(!WSAResetEvent(tube->event)) {
log_err("WSAResetEvent: %s", log_err("WSAResetEvent: %s",
wsa_strerror(errno)); wsa_strerror(WSAGetLastError()));
} }
} }
} }
@ -564,6 +585,7 @@ int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
*buf = item->buf; *buf = item->buf;
*len = item->len; *len = item->len;
free(item); free(item);
verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
return 1; return 1;
} }
@ -605,10 +627,11 @@ int tube_read_fd(struct tube* ATTR_UNUSED(tube))
} }
int int
tube_handle_listen(struct comm_point* c, void* arg, int error, tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
struct comm_reply* ATTR_UNUSED(reply_info)) int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
{ {
/* TODO */ log_assert(0);
return 0;
} }
int int
@ -622,7 +645,10 @@ tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
int tube_setup_bg_listen(struct tube* tube, struct comm_base* base, int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
tube_callback_t* cb, void* arg) tube_callback_t* cb, void* arg)
{ {
/* TODO register with event base */ tube->listen_cb = cb;
tube->listen_arg = arg;
return winsock_register_wsaevent(comm_base_internal(base),
&tube->ev_listen, tube->event, &tube_handle_signal, tube);
} }
int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
@ -636,6 +662,7 @@ int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
{ {
struct tube_res_list* item = struct tube_res_list* item =
(struct tube_res_list*)malloc(sizeof(*item)); (struct tube_res_list*)malloc(sizeof(*item));
verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
if(!item) { if(!item) {
free(msg); free(msg);
log_err("out of memory for async answer"); log_err("out of memory for async answer");
@ -658,4 +685,20 @@ int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
return 1; return 1;
} }
void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
void* arg)
{
struct tube* tube = (struct tube*)arg;
uint8_t* buf;
uint32_t len;
verbose(VERB_ALGO, "tube handle_signal");
while(tube_poll(tube)) {
if(tube_read_msg(tube, &buf, &len, 1)) {
fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
tube->listen_arg);
}
}
}
#endif /* USE_WINSOCK */ #endif /* USE_WINSOCK */

View file

@ -48,6 +48,7 @@ struct tube;
struct tube_res_list; struct tube_res_list;
#ifdef USE_WINSOCK #ifdef USE_WINSOCK
#include "util/locks.h" #include "util/locks.h"
#include "util/winsock_event.h"
#endif #endif
/** /**
@ -97,6 +98,8 @@ struct tube {
void* listen_arg; void* listen_arg;
/** the windows sockets event (signaled if items in pipe) */ /** the windows sockets event (signaled if items in pipe) */
WSAEVENT event; WSAEVENT event;
/** winsock event storage when registered with event base */
struct event ev_listen;
/** lock on the list of outstanding items */ /** lock on the list of outstanding items */
lock_basic_t res_lock; lock_basic_t res_lock;
@ -264,4 +267,7 @@ int tube_handle_listen(struct comm_point* c, void* arg, int error,
int tube_handle_write(struct comm_point* c, void* arg, int error, int tube_handle_write(struct comm_point* c, void* arg, int error,
struct comm_reply* reply_info); struct comm_reply* reply_info);
/** for fptr wlist, winsock signal event callback function */
void tube_handle_signal(int fd, short events, void* arg);
#endif /* UTIL_TUBE_H */ #endif /* UTIL_TUBE_H */

View file

@ -204,7 +204,7 @@ static int handle_select(struct event_base* base, struct timeval* wait)
/* prepare event array */ /* prepare event array */
for(i=0; i<base->max; i++) { for(i=0; i<base->max; i++) {
if(base->items[i]->ev_fd == -1) if(base->items[i]->ev_fd == -1 && !base->items[i]->is_signal)
continue; /* skip timer only events */ continue; /* skip timer only events */
eventlist[numwait] = base->items[i]; eventlist[numwait] = base->items[i];
waitfor[numwait++] = base->items[i]->hEvent; waitfor[numwait++] = base->items[i]->hEvent;
@ -247,6 +247,15 @@ static int handle_select(struct event_base* base, struct timeval* wait)
for(i=startidx; i<numwait; i++) { for(i=startidx; i<numwait; i++) {
short bits = 0; short bits = 0;
/* eventlist[i] fired */ /* eventlist[i] fired */
if(eventlist[i]->is_signal) {
/* not a network event at all */
fptr_ok(fptr_whitelist_event(
eventlist[i]->ev_callback));
(*eventlist[i]->ev_callback)(eventlist[i]->ev_fd,
eventlist[i]->ev_events,
eventlist[i]->ev_arg);
continue;
}
if(WSAEnumNetworkEvents(eventlist[i]->ev_fd, if(WSAEnumNetworkEvents(eventlist[i]->ev_fd,
waitfor[i], /* reset the event handle */ waitfor[i], /* reset the event handle */
/*NULL,*/ /* do not reset the event handle */ /*NULL,*/ /* do not reset the event handle */
@ -418,6 +427,7 @@ int event_add(struct event *ev, struct timeval *tv)
ev->idx = ev->ev_base->max++; ev->idx = ev->ev_base->max++;
ev->ev_base->items[ev->idx] = ev; ev->ev_base->items[ev->idx] = ev;
ev->is_tcp = 0; ev->is_tcp = 0;
ev->is_signal = 0;
if((ev->ev_events&(EV_READ|EV_WRITE)) && ev->ev_fd != -1) { if((ev->ev_events&(EV_READ|EV_WRITE)) && ev->ev_fd != -1) {
BOOL b=0; BOOL b=0;
@ -562,4 +572,36 @@ void winsock_tcp_wouldblock(struct event* ev, int eventbits)
*/ */
} }
int winsock_register_wsaevent(struct event_base* base, struct event* ev,
WSAEVENT wsaevent, void (*cb)(int, short, void*), void* arg)
{
if(base->max == base->cap)
return 0;
memset(ev, 0, sizeof(*ev));
ev->ev_fd = -1;
ev->ev_events = EV_READ;
ev->ev_callback = cb;
ev->ev_arg = arg;
ev->is_signal = 1;
ev->hEvent = wsaevent;
ev->added = 1;
ev->ev_base = base;
ev->idx = ev->ev_base->max++;
ev->ev_base->items[ev->idx] = ev;
return 1;
}
void winsock_unregister_wsaevent(struct event* ev)
{
if(!ev || !ev->added) return;
log_assert(ev->added && ev->ev_base->max > 0)
/* remove item and compact the list */
ev->ev_base->items[ev->idx] = ev->ev_base->items[ev->ev_base->max-1];
ev->ev_base->items[ev->ev_base->max-1] = NULL;
ev->ev_base->max--;
if(ev->idx < ev->ev_base->max)
ev->ev_base->items[ev->idx]->idx = ev->idx;
ev->added = 0;
}
#endif /* USE_WINSOCK */ #endif /* USE_WINSOCK */

View file

@ -167,7 +167,7 @@ struct event {
struct timeval ev_timeout; struct timeval ev_timeout;
/** callback to call: fd, eventbits, userarg */ /** callback to call: fd, eventbits, userarg */
void (*ev_callback)(int, short, void *arg); void (*ev_callback)(int, short, void *);
/** callback user arg */ /** callback user arg */
void *ev_arg; void *ev_arg;
@ -183,6 +183,11 @@ struct event {
/** should remembered EV_ values be used for TCP streams. /** should remembered EV_ values be used for TCP streams.
* Reset after WOULDBLOCK is signaled using the function. */ * Reset after WOULDBLOCK is signaled using the function. */
int stick_events; int stick_events;
/** true if this event is a signaling WSAEvent by the user.
* User created and user closed WSAEvent. Only signaled/unsigneled,
* no read/write/distinctions needed. */
int is_signal;
}; };
/** create event base */ /** create event base */
@ -231,5 +236,26 @@ int mini_ev_cmp(const void* a, const void* b);
*/ */
void winsock_tcp_wouldblock(struct event* ev, int eventbit); void winsock_tcp_wouldblock(struct event* ev, int eventbit);
/**
* Routine for windows only. where you pass a signal WSAEvent that
* you wait for. When the event is signaled, the callback gets called.
* The callback has to WSAResetEvent to disable the signal.
* @param base: the event base.
* @param ev: the event structure for data storage
* can be passed uninitialised.
* @param wsaevent: the WSAEvent that gets signaled.
* @param cb: callback routine.
* @param arg: user argument to callback routine.
* @return false on error.
*/
int winsock_register_wsaevent(struct event_base* base, struct event* ev,
WSAEVENT wsaevent, void (*cb)(int, short, void*), void* arg);
/**
* Unregister a wsaevent. User has to close the WSAEVENT itself.
* @param ev: event data storage.
*/
void winsock_unregister_wsaevent(struct event* ev);
#endif /* USE_WINSOCK */ #endif /* USE_WINSOCK */
#endif /* UTIL_WINSOCK_EVENT_H */ #endif /* UTIL_WINSOCK_EVENT_H */