diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 20fa29084..532d49dff 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -29,7 +29,7 @@ /* private data */ -static int kqueue_fd; +static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd static THREAD_LOCAL struct kevent *kev = NULL; /* @@ -61,35 +61,34 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) fdtab[fd].state = en; HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock); - if ((eo ^ en) & FD_EV_POLLED_RW) { - /* poll status changed */ - if ((eo ^ en) & FD_EV_POLLED_R) { - /* read poll status changed */ - if (en & FD_EV_POLLED_R) { - EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - changes++; - } - else { - EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - changes++; - } + if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) { + if (!(fdtab[fd].polled_mask & tid_bit)) { + /* fd was not watched, it's still not */ + continue; } + /* fd totally removed from poll list */ + EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit); + } + else { + /* OK fd has to be monitored, it was either added or changed */ - if ((eo ^ en) & FD_EV_POLLED_W) { - /* write poll status changed */ - if (en & FD_EV_POLLED_W) { - EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); - changes++; - } - else { - EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - changes++; - } - } + if (en & FD_EV_POLLED_R) + EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + else if (fdtab[fd].polled_mask & tid_bit) + EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + + if (en & FD_EV_POLLED_W) + EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + else if (fdtab[fd].polled_mask & tid_bit) + EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + + HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit); } } if (changes) - kevent(kqueue_fd, kev, changes, NULL, 0, NULL); + kevent(kqueue_fd[tid], kev, changes, NULL, 0, NULL); fd_nbupdt = 0; delta_ms = 0; @@ -113,7 +112,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) fd = MIN(maxfd, global.tune.maxpollevents); gettimeofday(&before_poll, NULL); - status = kevent(kqueue_fd, // int kq + status = kevent(kqueue_fd[tid], // int kq NULL, // const struct kevent *changelist 0, // int nchanges kev, // struct kevent *eventlist @@ -155,11 +154,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) static int init_kqueue_per_thread() { + int fd; + /* we can have up to two events per fd (*/ kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock); if (kev == NULL) - return 0; + goto fail_alloc; + + if (tid) { + kqueue_fd[tid] = kqueue(); + if (kqueue_fd[tid] < 0) + goto fail_fd; + } + + /* we may have to unregister some events initially registered on the + * original fd when it was alone, and/or to register events on the new + * fd for this thread. Let's just mark them as updated, the poller will + * do the rest. + */ + for (fd = 0; fd < maxfd; fd++) + updt_fd_polling(fd); + return 1; + fail_fd: + free(kev); + fail_alloc: + return 0; } static void deinit_kqueue_per_thread() @@ -177,8 +197,8 @@ REGPRM1 static int _do_init(struct poller *p) { p->private = NULL; - kqueue_fd = kqueue(); - if (kqueue_fd < 0) + kqueue_fd[tid] = kqueue(); + if (kqueue_fd[tid] < 0) goto fail_fd; hap_register_per_thread_init(init_kqueue_per_thread); @@ -196,9 +216,9 @@ REGPRM1 static int _do_init(struct poller *p) */ REGPRM1 static void _do_term(struct poller *p) { - if (kqueue_fd >= 0) { - close(kqueue_fd); - kqueue_fd = -1; + if (kqueue_fd[tid] >= 0) { + close(kqueue_fd[tid]); + kqueue_fd[tid] = -1; } p->private = NULL; @@ -227,8 +247,8 @@ REGPRM1 static int _do_test(struct poller *p) */ REGPRM1 static int _do_fork(struct poller *p) { - kqueue_fd = kqueue(); - if (kqueue_fd < 0) + kqueue_fd[tid] = kqueue(); + if (kqueue_fd[tid] < 0) return 0; return 1; } @@ -242,11 +262,14 @@ __attribute__((constructor)) static void _do_register(void) { struct poller *p; + int i; if (nbpollers >= MAX_POLLERS) return; - kqueue_fd = -1; + for (i = 0; i < MAX_THREADS; i++) + kqueue_fd[i] = -1; + p = &pollers[nbpollers++]; p->name = "kqueue";