diff --git a/src/openvpn/forward.c b/src/openvpn/forward.c index 1fc9d5e1..74fa192a 100644 --- a/src/openvpn/forward.c +++ b/src/openvpn/forward.c @@ -46,7 +46,9 @@ #include "memdbg.h" #include +#include #include +#include counter_type link_read_bytes_global; /* GLOBAL */ counter_type link_write_bytes_global; /* GLOBAL */ @@ -1353,8 +1355,7 @@ read_incoming_tun_part2(struct context *c) ASSERT(buf_safe(&c->c2.buf, c->c2.frame.buf.payload_size)); if (c->c1.tuntap->backend_driver == DRIVER_AFUNIX) { - c->c2.buf.len = - (int)read_tun_afunix(c->c1.tuntap, BPTR(&c->c2.buf), c->c2.frame.buf.payload_size); + c->c2.buf.len = (int)read_tun_afunix(c->c1.tuntap, BPTR(&c->c2.buf), c->c2.frame.buf.payload_size); } else { @@ -2478,8 +2479,28 @@ io_wait(struct context *c, const unsigned int flags) dmsg(D_EVENT_WAIT, "I/O WAIT status=0x%04x", c->c2.event_set_status); } +void threaded_fwd_inp_intf(struct context *c, struct link_socket *sock, struct thread_pointer *b) +{ + if (b->p->h == b->p->n) + { + ssize_t size; + uint8_t temp[1]; + size = read(c->c1.tuntap->fd, temp, 1); + if (size < 1) { /* no-op */ } + if (!IS_SIG(c)) + { + if (!BULK_MODE(c)) + { + c->c2.buf = c->c2.buffers->read_tun_buf; + } + process_incoming_tun(c, sock); + } + size = write(c->c1.tuntap->fz, temp, 1); + } +} + void -process_io(struct context *c, struct link_socket *sock) +process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b) { const unsigned int status = c->c2.event_set_status; @@ -2513,11 +2534,7 @@ process_io(struct context *c, struct link_socket *sock) /* Incoming data on TUN device */ else if (status & TUN_READ) { - read_incoming_tun(c); - if (!IS_SIG(c)) - { - process_incoming_tun(c, sock); - } + threaded_fwd_inp_intf(c, sock, b); } else if (status & DCO_READ) { diff --git a/src/openvpn/forward.h b/src/openvpn/forward.h index 440cface..f56dcdfb 100644 --- a/src/openvpn/forward.h +++ b/src/openvpn/forward.h @@ -79,7 +79,7 @@ void io_wait(struct context *c, const unsigned int flags); void pre_select(struct context *c); -void process_io(struct context *c, struct link_socket *sock); +void process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b); /**********************************************************************/ diff --git a/src/openvpn/init.c b/src/openvpn/init.c index f98386e4..8bdb715e 100644 --- a/src/openvpn/init.c +++ b/src/openvpn/init.c @@ -1906,6 +1906,7 @@ do_open_tun(struct context *c, int *error_flags) } /* do ifconfig */ + c->c1.tuntap->skip_bind = c->skip_bind; if (!ifconfig_noexec_enabled(c) && ifconfig_order(c->c1.tuntap) == IFCONFIG_BEFORE_TUN_OPEN) { /* guess actual tun/tap unit number that will be returned @@ -1999,6 +2000,10 @@ do_open_tun(struct context *c, int *error_flags) add_wfp_block(c); } + if (c->c1.tuntap) + { + c->c1.tuntap->fe = c->c1.tuntap->fd; + } gc_free(&gc); return ret; } diff --git a/src/openvpn/mtcp.c b/src/openvpn/mtcp.c index 7651b4d1..174e6f51 100644 --- a/src/openvpn/mtcp.c +++ b/src/openvpn/mtcp.c @@ -38,15 +38,18 @@ #endif struct multi_instance * -multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock) +multi_create_instance_tcp(struct thread_pointer *b, struct link_socket *sock) { struct gc_arena gc = gc_new(); + struct multi_context *m = b->p->m[b->i-1]; struct multi_instance *mi = NULL; struct hash *hash = m->hash; - mi = multi_create_instance(m, NULL, sock); + mi = multi_create_instance(b, NULL, sock); if (mi) { + m = b->p->p; + hash = m->hash; mi->real.proto = sock->info.proto; struct hash_element *he; const uint32_t hv = hash_value(hash, &mi->real); diff --git a/src/openvpn/mtcp.h b/src/openvpn/mtcp.h index 9b7d1d2f..ff6cb3a8 100644 --- a/src/openvpn/mtcp.h +++ b/src/openvpn/mtcp.h @@ -45,7 +45,7 @@ bool multi_tcp_process_outgoing_link(struct multi_context *m, bool defer, bool multi_tcp_process_outgoing_link_ready(struct multi_context *m, struct multi_instance *mi, const unsigned int mpp_flags); -struct multi_instance *multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock); +struct multi_instance *multi_create_instance_tcp(struct thread_pointer *b, struct link_socket *sock); void multi_tcp_link_out_deferred(struct multi_context *m, struct multi_instance *mi); diff --git a/src/openvpn/mudp.c b/src/openvpn/mudp.c index e6e9a0fe..0459db8b 100644 --- a/src/openvpn/mudp.c +++ b/src/openvpn/mudp.c @@ -193,6 +193,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin struct mroute_addr real = { 0 }; struct multi_instance *mi = NULL; struct hash *hash = m->hash; + struct context_pointer p = { 0 }; + struct thread_pointer b = { 0 }; real.proto = sock->info.proto; m->hmac_reply_ls = sock; @@ -266,7 +268,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin * connect-freq but not against connect-freq-initial */ reflect_filter_rate_limit_decrease(m->initial_rate_limiter); - mi = multi_create_instance(m, &real, sock); + p.p = m; b.p = &p; b.i = -1; + mi = multi_create_instance(&b, &real, sock); if (mi) { hash_add_fast(hash, bucket, &mi->real, hv, mi); diff --git a/src/openvpn/multi.c b/src/openvpn/multi.c index 5e49cc75..01ab1906 100644 --- a/src/openvpn/multi.c +++ b/src/openvpn/multi.c @@ -160,10 +160,15 @@ multi_ifconfig_pool_persist(struct multi_context *m, bool force) static void multi_reap_range(const struct multi_context *m, uint32_t start_bucket, uint32_t end_bucket) { - struct gc_arena gc = gc_new(); struct hash_iterator hi; struct hash_element *he; + /*if (m->top.options.ce.mtio_conf) + { + return; + }*/ + + struct gc_arena gc = gc_new(); dmsg(D_MULTI_DEBUG, "MULTI: REAP range %d -> %d", start_bucket, end_bucket); hash_iterator_init_range(m->vhash, &hi, start_bucket, end_bucket); while ((he = hash_iterator_next(&hi)) != NULL) @@ -171,12 +176,13 @@ multi_reap_range(const struct multi_context *m, uint32_t start_bucket, uint32_t struct multi_route *r = (struct multi_route *)he->value; if (!multi_route_defined(m, r)) { - dmsg(D_MULTI_DEBUG, "MULTI: REAP DEL %s", mroute_addr_print(&r->addr, &gc)); + msg(M_INFO, "MULTI: REAP DEL %s", mroute_addr_print(&r->addr, &gc)); learn_address_script(m, NULL, "delete", &r->addr); multi_route_del(r); hash_iterator_delete_element(&hi); } } + hash_iterator_free(&hi); gc_free(&gc); } @@ -416,6 +422,10 @@ multi_init(struct context *t) m->inst_indx = -1; m->inst_leng = -1; m->inst_list = calloc(TUN_BAT_MAX, sizeof(struct multi_instance *)); + + m->mtio_stat = 1; + m->mtio_idno = 1; + bzero(&(m->mtio_info), sizeof(struct multi_info)); } const char * @@ -620,7 +630,7 @@ multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool sh schedule_remove_entry(m->schedule, (struct schedule_entry *)mi); - ifconfig_pool_release(m->ifconfig_pool, mi->vaddr_handle, false); + ifconfig_pool_release(m->mtio_info.pool, mi->vaddr_handle, false); if (mi->did_iroutes) { @@ -636,6 +646,10 @@ multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool sh mbuf_dereference_instance(m->mbuf, mi); } + mi->mtio_stat = 1; + mi->mtio_idno = m->mtio_idno; + bzero(&(mi->mtio_addr), sizeof(struct multi_address)); + #ifdef ENABLE_MANAGEMENT set_cc_config(mi, NULL); #endif @@ -704,18 +718,172 @@ multi_uninit(struct multi_context *m) m->inst_indx = -1; m->inst_leng = -1; free(m->inst_list); + + m->mtio_stat = 1; + m->mtio_idno = 1; + bzero(&(m->mtio_info), sizeof(struct multi_info)); } } +bool multi_context_switch_addr(struct multi_context *m, struct multi_instance *i, bool s, bool l) +{ + struct gc_arena g = gc_new(); + + in_addr_t ladr_objc = i->context.c2.push_ifconfig_local; + struct sockaddr_in *wadr_objc = (struct sockaddr_in *)&i->context.c2.link_sockets[0]->info.lsa->actual.dest.addr.sa; + + const char *ladr = print_in_addr_t(ladr_objc, IA_EMPTY_IF_UNDEF, &g); + if ((strcmp(i->mtio_addr.ladr, "") == 0) && ladr) + { + bzero(i->mtio_addr.ladr, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.ladr, ladr, MAX_STRLENG-5); + } + + const char *wadr = inet_ntoa(wadr_objc->sin_addr); + if ((strcmp(i->mtio_addr.wadr, "") == 0) && wadr) + { + bzero(i->mtio_addr.wadr, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.wadr, wadr, MAX_STRLENG-5); + } + + const char *comm = tls_common_name(i->context.c2.tls_multi, true); + if ((strcmp(i->mtio_addr.comm, "") == 0) && comm) + { + bzero(i->mtio_addr.comm, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.comm, comm, MAX_STRLENG-5); + } + + const char *conn = tls_username(i->context.c2.tls_multi, true); + if ((strcmp(i->mtio_addr.user, "") == 0) && conn) + { + bzero(i->mtio_addr.user, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.user, conn, MAX_STRLENG-5); + } + + const char *uniq = i->mtio_addr.uniq; + if ((strcmp(i->mtio_addr.uniq, "") == 0) && wadr) + { + bzero(i->mtio_addr.uniq, MAX_STRLENG * sizeof(char)); + snprintf(i->mtio_addr.uniq, MAX_STRLENG-5, "%s", wadr); + } + + i->mtio_addr.addr = ladr_objc; + + if (strcmp(uniq, "") == 0) + { + goto last; + } + + if (m) + { + pthread_mutex_lock(m->mtio_info.lock); + for (int x = 0; x < m->mtio_info.maxc; ++x) + { + struct multi_link *l = &(m->mtio_info.link[x]); + if (strcmp(l->uniq, uniq) == 0) + { + if (s) + { + int indx = (i->mtio_idno % MAX_THREADS); + l->adrs[indx] = i->mtio_addr; + } + if (l) + { + for (int y = 0; y < MAX_THREADS; ++y) + { + struct multi_address *a = &(l->adrs[y]); + if (strcmp(a->uniq, "") != 0) + { + multi_learn_in_addr_t(m, i, a->addr, -1, true); + } + } + } + } + } + pthread_mutex_unlock(m->mtio_info.lock); + } + +last: + gc_free(&g); + + return true; +} + +struct multi_context *multi_context_switch_conn(struct thread_pointer *b, struct multi_context *m, struct multi_instance *i) +{ + if (b->i <= 0) + { + b->p->p = m; + return m; + } + + int indx = -1, fidx = 0; + time_t secs = time(NULL); + time_t last = b->p->k[fidx].last; + struct multi_link *link; + + struct sockaddr_in *wadr_objc = (struct sockaddr_in *)&i->context.c2.link_sockets[0]->info.lsa->actual.dest.addr.sa; + const char *wadr = inet_ntoa(wadr_objc->sin_addr); + + char uniq[MAX_STRLENG]; + bzero(uniq, MAX_STRLENG * sizeof(char)); + if (strcmp(wadr, "") != 0) + { + snprintf(uniq, MAX_STRLENG-5, "%s", wadr); + } + + if (strcmp(uniq, "") == 0) + { + goto last; + } + + for (int x = 0; x < b->p->x; ++x) + { + link = &(b->p->k[x]); + if ((link->last < 1) || (link->last < last)) + { + fidx = x; + last = link->last; + } + if (strcmp(link->uniq, uniq) == 0) + { + indx = x; + break; + } + } + + pthread_mutex_lock(m->mtio_info.lock); + if (indx < 0) + { + indx = fidx; + link = &(b->p->k[indx]); + bzero(link, sizeof(struct multi_link)); + strncpy(link->uniq, uniq, MAX_STRLENG-5); + } + pthread_mutex_unlock(m->mtio_info.lock); + + link = &(b->p->k[indx]); + m = b->p->m[link->indx]; + b->p->p = b->p->m[link->indx]; + i->mtio_idno = m->mtio_idno; + link->indx = ((link->indx + 1) % b->p->n); + link->last = secs; + +last: + msg(M_INFO, "TCPv4_SERVER MTIO conn [%s][%p] [%d][%d] {%d}{%d}", uniq, m, indx, fidx, i->mtio_idno, m->mtio_idno); + + return m; +} + /* * Create a client instance object for a newly connected client. */ struct multi_instance * -multi_create_instance(struct multi_context *m, const struct mroute_addr *real, - struct link_socket *sock) +multi_create_instance(struct thread_pointer *b, const struct mroute_addr *real, struct link_socket *sock) { struct gc_arena gc = gc_new(); struct multi_instance *mi; + struct multi_context *m = (b->i > 0) ? b->p->m[b->i-1] : b->p->p; msg(D_MULTI_MEDIUM, "MULTI: multi_create_instance called"); @@ -738,6 +906,7 @@ multi_create_instance(struct multi_context *m, const struct mroute_addr *real, { goto err; } + m = multi_context_switch_conn(b, m, mi); mi->context.c2.tls_multi->multi_state = CAS_NOT_CONNECTED; @@ -780,6 +949,10 @@ multi_create_instance(struct multi_context *m, const struct mroute_addr *real, mi->ev_arg.type = EVENT_ARG_MULTI_INSTANCE; mi->ev_arg.u.mi = mi; + mi->mtio_stat = 1; + mi->mtio_idno = m->mtio_idno; + bzero(&(mi->mtio_addr), sizeof(struct multi_address)); + gc_free(&gc); return mi; @@ -1020,8 +1193,7 @@ multi_print_status(struct multi_context *m, struct status_output *so, const int * or NULL if none. */ static struct multi_instance * -multi_learn_addr(struct multi_context *m, struct multi_instance *mi, const struct mroute_addr *addr, - const unsigned int flags) +multi_learn_addr(struct multi_context *m, struct multi_instance *mi, const struct mroute_addr *addr, const unsigned int flags) { struct hash_element *he; const uint32_t hv = hash_value(m->vhash, addr); @@ -1030,6 +1202,18 @@ multi_learn_addr(struct multi_context *m, struct multi_instance *mi, const struc struct multi_instance *owner = NULL; struct gc_arena gc = gc_new(); + struct sockaddr_in *wadr_objc = (struct sockaddr_in *)&mi->context.c2.link_sockets[0]->info.lsa->actual.dest.addr.sa; + const char *ladr = print_in_addr_t(mi->context.c2.push_ifconfig_local, IA_EMPTY_IF_UNDEF, &gc); + const char *madr = mroute_addr_print(addr, &gc); + const char *wadr = inet_ntoa(wadr_objc->sin_addr); + + if (strcmp(ladr, "") == 0) + { + goto last; + } + + msg(M_INFO, "TCPv4_SERVER MTIO addr [%s][%s] [%s][%d] {%d}{%d}", ladr, madr, wadr, mi->mtio_stat, m->mtio_idno, mi->mtio_idno); + /* if route currently exists, get the instance which owns it */ he = hash_lookup_fast(m->vhash, bucket, addr, hv); if (he) @@ -1100,6 +1284,8 @@ multi_learn_addr(struct multi_context *m, struct multi_instance *mi, const struc free(newroute); } } + +last: gc_free(&gc); return owner; @@ -1151,7 +1337,10 @@ multi_get_instance_by_virtual_addr(struct multi_context *m, const struct mroute_ { /* found an applicable route, cache host route */ struct multi_instance *mi = route->instance; - multi_learn_addr(m, mi, addr, MULTI_ROUTE_CACHE | MULTI_ROUTE_AGEABLE); + if (!m->top.options.ce.mtio_conf) + { + multi_learn_addr(m, mi, addr, MULTI_ROUTE_CACHE | MULTI_ROUTE_AGEABLE); + } ret = mi; break; } @@ -1182,11 +1371,10 @@ multi_get_instance_by_virtual_addr(struct multi_context *m, const struct mroute_ /* * Helper function to multi_learn_addr(). + * netbits: -1 if host route, otherwise # of network bits in address */ -static struct multi_instance * -multi_learn_in_addr_t(struct multi_context *m, struct multi_instance *mi, in_addr_t a, - int netbits, /* -1 if host route, otherwise # of network bits in address */ - bool primary) +struct multi_instance * +multi_learn_in_addr_t(struct multi_context *m, struct multi_instance *mi, in_addr_t a, int netbits, bool primary) { struct openvpn_sockaddr remote_si; struct mroute_addr addr = { 0 }; @@ -1306,7 +1494,10 @@ multi_add_iroutes(struct multi_context *m, struct multi_instance *mi) mroute_helper_add_iroute46(m->route_helper, ir->netbits); - multi_learn_in_addr_t(m, mi, ir->network, ir->netbits, false); + if (!m->top.options.ce.mtio_conf) + { + multi_learn_in_addr_t(m, mi, ir->network, ir->netbits, false); + } } for (ir6 = mi->context.options.iroutes_ipv6; ir6 != NULL; ir6 = ir6->next) { @@ -1329,6 +1520,11 @@ multi_add_iroutes(struct multi_context *m, struct multi_instance *mi) static void multi_delete_dup(struct multi_context *m, struct multi_instance *new_mi) { + if (m->top.options.ce.mtio_conf) + { + return; + } + if (new_mi) { const char *new_cn = tls_common_name(new_mi->context.c2.tls_multi, true); @@ -1344,6 +1540,7 @@ multi_delete_dup(struct multi_context *m, struct multi_instance *new_mi) const char *cn = tls_common_name(mi->context.c2.tls_multi, true); if (cn && !strcmp(cn, new_cn)) { + msg(M_INFO, "MULTI: DEL DUP %s -> %s", cn, new_cn); multi_close_instance(m, mi, false); ++count; } @@ -1367,6 +1564,11 @@ check_stale_routes(struct multi_context *m) struct hash_iterator hi; struct hash_element *he; + if (m->top.options.ce.mtio_conf) + { + return; + } + dmsg(D_MULTI_DEBUG, "MULTI: Checking stale routes"); hash_iterator_init_range(m->vhash, &hi, 0, hash_n_buckets(m->vhash)); while ((he = hash_iterator_next(&hi)) != NULL) @@ -1375,13 +1577,13 @@ check_stale_routes(struct multi_context *m) if (multi_route_defined(m, r) && difftime(now, r->last_reference) >= m->top.options.stale_routes_ageing_time) { - dmsg(D_MULTI_DEBUG, "MULTI: Deleting stale route for address '%s'", - mroute_addr_print(&r->addr, &gc)); + msg(M_INFO, "MULTI: Deleting stale route for address '%s'", mroute_addr_print(&r->addr, &gc)); learn_address_script(m, NULL, "delete", &r->addr); multi_route_del(r); hash_iterator_delete_element(&hi); } } + hash_iterator_free(&hi); gc_free(&gc); } @@ -1425,7 +1627,7 @@ multi_select_virtual_addr(struct multi_context *m, struct multi_instance *mi) * release dynamic allocation */ if (mi->vaddr_handle >= 0) { - ifconfig_pool_release(m->ifconfig_pool, mi->vaddr_handle, true); + ifconfig_pool_release(m->mtio_info.pool, mi->vaddr_handle, true); mi->vaddr_handle = -1; } @@ -1446,20 +1648,21 @@ multi_select_virtual_addr(struct multi_context *m, struct multi_instance *mi) "MULTI_sva: WARNING: if --ifconfig-push is used for IPv4, automatic IPv6 assignment from --ifconfig-ipv6-pool does not work. Use --ifconfig-ipv6-push for IPv6 then."); } } - else if (m->ifconfig_pool && mi->vaddr_handle < 0) /* otherwise, choose a pool address */ + else if (m->mtio_info.pool && mi->vaddr_handle < 0) /* otherwise, choose a pool address */ { in_addr_t local = 0, remote = 0; struct in6_addr remote_ipv6; const char *cn = NULL; - if (!mi->context.options.duplicate_cn) + if ((!mi->context.options.duplicate_cn) && (!(m->top.options.ce.mtio_mode))) { cn = tls_common_name(mi->context.c2.tls_multi, true); } CLEAR(remote_ipv6); - mi->vaddr_handle = - ifconfig_pool_acquire(m->ifconfig_pool, &local, &remote, &remote_ipv6, cn); + pthread_mutex_lock(m->mtio_info.lock); + mi->vaddr_handle = ifconfig_pool_acquire(m->mtio_info.pool, &local, &remote, &remote_ipv6, cn); + pthread_mutex_unlock(m->mtio_info.lock); if (mi->vaddr_handle >= 0) { const int tunnel_type = TUNNEL_TYPE(mi->context.c1.tuntap); @@ -2364,6 +2567,12 @@ multi_client_connect_late_setup(struct multi_context *m, struct multi_instance * mi->reporting_addr = mi->context.c2.push_ifconfig_local; mi->reporting_addr_ipv6 = mi->context.c2.push_ifconfig_ipv6_local; + if (mi->mtio_stat <= 1) + { + mi->mtio_stat = 3; + m->mtio_stat = 3; + } + /* set context-level authentication flag */ mi->context.c2.tls_multi->multi_state = CAS_CONNECT_DONE; @@ -2395,7 +2604,7 @@ multi_client_connect_late_setup(struct multi_context *m, struct multi_instance * */ if (TUNNEL_TYPE(mi->context.c1.tuntap) == DEV_TYPE_TUN) { - if (mi->context.c2.push_ifconfig_defined) + if (mi->context.c2.push_ifconfig_defined && !m->top.options.ce.mtio_conf) { multi_learn_in_addr_t(m, mi, mi->context.c2.push_ifconfig_local, -1, true); msg(D_MULTI_LOW, "MULTI: primary virtual IP for %s: %s", @@ -2415,7 +2624,11 @@ multi_client_connect_late_setup(struct multi_context *m, struct multi_instance * /* add routes locally, pointing to new client, if * --iroute options have been specified */ - multi_add_iroutes(m, mi); + if (!mi->did_iroutes) + { + multi_add_iroutes(m, mi); + mi->did_iroutes = true; + } /* * iroutes represent subnets which are "owned" by a particular @@ -3310,6 +3523,43 @@ multi_process_incoming_dco(dco_context_t *dco) } #endif /* if defined(ENABLE_DCO) */ +struct multi_instance *multi_learn_peer_addr(struct multi_context *m, struct multi_instance *i, struct mroute_addr *p) +{ + int numb_clis = (int)m->max_clients; + struct multi_instance *r = NULL; + in_addr_t b = ntohl(p->v4.addr); + if (i) + { + multi_context_switch_addr(m, i, false, true); + r = i; + } + else + { + for (int z = 0; z < numb_clis; ++z) + { + struct multi_instance *j = m->instances[z]; + if (!j) { continue; } + for (int x = 0; x < m->mtio_info.maxc; ++x) + { + struct multi_link *l = &(m->mtio_info.link[x]); + if (strcmp(l->uniq, "") != 0) + { + for (int y = 0; y < MAX_THREADS; ++y) + { + struct multi_address *a = &(l->adrs[y]); + if (a->addr == b) + { + multi_context_switch_addr(m, j, false, true); + break; + } + } + } + } + } + } + return r; +} + /* * Process packets in the TCP/UDP socket -> TUN/TAP interface direction, * i.e. client -> server direction. @@ -3387,8 +3637,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst if (TUNNEL_TYPE(m->top.c1.tuntap) == DEV_TYPE_TUN) { /* extract packet source and dest addresses */ - mroute_flags = - mroute_extract_addr_from_packet(&src, &dest, 0, &c->c2.to_tun, DEV_TYPE_TUN); + mroute_flags = mroute_extract_addr_from_packet(&src, &dest, 0, &c->c2.to_tun, DEV_TYPE_TUN); /* drop packet if extract failed */ if (!(mroute_flags & MROUTE_EXTRACT_SUCCEEDED)) @@ -3396,7 +3645,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst c->c2.to_tun.len = 0; } /* make sure that source address is associated with this client */ - else if (multi_get_instance_by_virtual_addr(m, &src, true) != m->pending2) + else if ((multi_get_instance_by_virtual_addr(m, &src, true) != m->pending2) && (multi_learn_peer_addr(m, m->pending2, &src) != m->pending2)) { /* IPv6 link-local address (fe80::xxx)? */ if ((src.type & MR_ADDR_MASK) == MR_ADDR_IPV6 @@ -3406,7 +3655,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } else { - msg(D_MULTI_DROPPED, + msg(D_MULTI_ERRORS, "MULTI: bad source address from client [%s], packet dropped", mroute_addr_print(&src, &gc)); } @@ -3456,8 +3705,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } } /* extract packet source and dest addresses */ - mroute_flags = - mroute_extract_addr_from_packet(&src, &dest, vid, &c->c2.to_tun, DEV_TYPE_TAP); + mroute_flags = mroute_extract_addr_from_packet(&src, &dest, vid, &c->c2.to_tun, DEV_TYPE_TAP); if (mroute_flags & MROUTE_EXTRACT_SUCCEEDED) { @@ -3486,7 +3734,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } else { - msg(D_MULTI_DROPPED, + msg(D_MULTI_ERRORS, "MULTI: bad source address from client [%s], packet dropped", mroute_addr_print(&src, &gc)); c->c2.to_tun.len = 0; @@ -3560,6 +3808,13 @@ multi_process_incoming_tun_part2(struct multi_context *m, const unsigned int mpp if (mroute_flags & MROUTE_EXTRACT_SUCCEEDED) { struct context *c; + struct multi_instance *i = multi_get_instance_by_virtual_addr(m, &dest, dev_type == DEV_TYPE_TUN); + + if (!i) + { + multi_learn_peer_addr(m, m->pending, &dest); + i = multi_get_instance_by_virtual_addr(m, &dest, dev_type == DEV_TYPE_TUN); + } /* broadcast or multicast dest addr? */ if (mroute_flags & (MROUTE_EXTRACT_BCAST | MROUTE_EXTRACT_MCAST)) @@ -3569,20 +3824,19 @@ multi_process_incoming_tun_part2(struct multi_context *m, const unsigned int mpp } else if (m->inst_indx == -9) { - struct multi_instance *inst = multi_get_instance_by_virtual_addr(m, &dest, dev_type == DEV_TYPE_TUN); - if (inst) + if (i) { int leng = m->inst_leng; for (int x = 0; x < leng; ++x) { - if (m->inst_list[x] == inst) + if (m->inst_list[x] == i) { m->inst_indx = x; return true; } } leng = min_max(leng, 0, TUN_BAT_MIN - 1); - m->inst_list[leng] = inst; + m->inst_list[leng] = i; m->inst_indx = leng; m->inst_leng = (leng + 1); } @@ -3590,7 +3844,7 @@ multi_process_incoming_tun_part2(struct multi_context *m, const unsigned int mpp } else { - multi_set_pending(m, multi_get_instance_by_virtual_addr(m, &dest, dev_type == DEV_TYPE_TUN)); + multi_set_pending(m, i); if (m->pending) { @@ -3712,7 +3966,7 @@ bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_ } } -bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) +bool threaded_multi_inp_tun(struct multi_context *m, const unsigned int mpp_flags) { if (INST_LENG(m)) { @@ -3721,16 +3975,23 @@ bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) else { struct context *c = &(m->top); - read_incoming_tun(c); - if (!IS_SIG(c)) + if (*(m->mtio_info.hold) == m->mtio_info.maxt) { - multi_process_incoming_tun(m, mpp_flags); + ssize_t size; + uint8_t temp[1]; + size = read(c->c1.tuntap->fd, temp, 1); + if (size < 1) { /* no-op */ } + if (!IS_SIG(c)) + { + multi_process_incoming_tun(m, mpp_flags); + } + if (!IS_SIG(c)) + { + multi_process_inp_tun_post(m, mpp_flags); + } + size = write(c->c1.tuntap->fz, temp, 1); + return true; } - if (!IS_SIG(c)) - { - multi_process_inp_tun_post(m, mpp_flags); - } - return true; } return false; } @@ -4280,13 +4541,72 @@ multi_get_timeout(struct multi_context *multi, struct timeval *timeval) * * @param multi context structure */ -static void -tunnel_server_loop(struct multi_context *multi) +static void tunnel_server_loop(struct thread_pointer *b) { int status; + struct context_pointer *p = b->p; + + status = 0; + while (status == 0) + { + status = 1; + for (int x = 0; x < p->n; ++x) + { + if (p->m[x] == NULL) + { + status = 0; + } + } + sleep(1); + } + + struct multi_context *multi = p->m[b->i-1]; + struct context *c = &(p->m[b->i-1]->top); + struct context *d = &(p->m[0]->top); + + multi->mtio_idno = b->i; + multi->mtio_info.maxt = b->n; + multi->mtio_info.maxc = p->x; + multi->mtio_info.link = p->k; + multi->mtio_info.lock = p->l; + multi->mtio_info.indx = &(p->i); + multi->mtio_info.hold = &(p->h); + multi->mtio_info.pool = p->m[0]->ifconfig_pool; + + if (b->i == 1) + { + while (p->h < p->n) + { + if (p->z == -1) { break; } else { sleep(1); } + } + p->z = 1; + } + else + { + b->h += 1; p->h += 1; + while ((p->z != 1) || (!(d->c1.tuntap)) || (d->c1.tuntap->ff <= 1)) + { + if (p->z == -1) { break; } else { sleep(1); } + } + } + + msg(M_INFO, "TCPv4_SERVER MTIO init [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); while (true) { + if (p->z != 1) { break; } + if (c->c1.tuntap && (c->c1.tuntap->fd > 1) && (c->c1.tuntap->ff <= 1)) + { + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->s[b->i-1]); + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->r[b->i-1]); + c->c1.tuntap->ff = c->c1.tuntap->fd; + c->c1.tuntap->fe = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + //c->c1.tuntap->fd = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + c->c1.tuntap->fd = p->s[b->i-1][0]; + c->c1.tuntap->fz = p->r[b->i-1][1]; + msg(M_INFO, "TCPv4_SERVER MTIO fdno [%d][%d][%d][%d] {%d}", c->c1.tuntap->fd, c->c1.tuntap->fe, c->c1.tuntap->ff, c->c1.tuntap->fz, b->i); + } + /* wait on tun/socket list */ multi_get_timeout(multi, &multi->top.c2.timeval); status = multi_io_wait(multi); @@ -4299,7 +4619,7 @@ tunnel_server_loop(struct multi_context *multi) if (status > 0) { /* process the I/O which triggered select */ - multi_io_process_io(multi); + multi_io_process_io(b); } else if (status == 0) { @@ -4308,18 +4628,34 @@ tunnel_server_loop(struct multi_context *multi) MULTI_CHECK_SIG(multi); } + + msg(M_INFO, "TCPv4_SERVER MTIO fins [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); + + p->z = -1; + + if (c->c1.tuntap && (c->c1.tuntap->ff > 1)) + { + close(p->s[b->i-1][0]); close(p->s[b->i-1][1]); + close(p->r[b->i-1][0]); close(p->r[b->i-1][1]); + c->c1.tuntap->fd = c->c1.tuntap->ff; + c->c1.tuntap->ff = -1; + } } /* * Top level event loop. */ -void -tunnel_server(struct context *top) +void *tunnel_server(void *args) { - ASSERT(top->options.mode == MODE_SERVER); - + struct thread_pointer *arg = (struct thread_pointer *)args; + struct context_pointer *ptr = arg->p; + struct context *top = (arg->i == 1) ? ptr->c : arg->c; struct multi_context multi; + if (arg->i == 1) { sleep(1); } + + ASSERT(top->options.mode == MODE_SERVER); + top->mode = CM_TOP; top->multi = &multi; context_clear_2(top); @@ -4328,7 +4664,7 @@ tunnel_server(struct context *top) init_instance_handle_signals(top, top->es, CC_HARD_USR1_TO_HUP); if (IS_SIG(top)) { - return; + return NULL; } /* initialize global multi_context object */ @@ -4351,7 +4687,10 @@ tunnel_server(struct context *top) } #endif - tunnel_server_loop(&multi); + bzero(&(multi.mtio_info), sizeof(struct multi_info)); + ptr->m[arg->i-1] = &multi; + + tunnel_server_loop(arg); #ifdef ENABLE_ASYNC_PUSH msg(D_LOW, "%s: close multi.top.c2.inotify_fd (%d)", @@ -4369,6 +4708,54 @@ tunnel_server(struct context *top) multi_uninit(&multi); multi_top_free(&multi); close_instance(top); + + return NULL; +} + +void threaded_tunnel_server(struct context *c, struct context *d) +{ + int maxt = (c->options.ce.mtio_mode) ? MAX_THREADS : 1; + int maxc = c->options.max_clients; + struct thread_pointer b[MAX_THREADS]; + struct context_pointer p; + struct multi_link k[maxc]; + pthread_mutex_t lock; + pthread_t thrm, thrd[MAX_THREADS]; + + bzero(&(p), sizeof(struct context_pointer)); + p.i = 1; p.h = 1; p.n = maxt; p.x = maxc; p.z = 0; + p.c = c; p.k = k; p.l = &(lock); p.p = NULL; + p.m = calloc(MAX_THREADS, sizeof(struct multi_context *)); + bzero(p.k, maxc * sizeof(struct multi_link)); + bzero(p.l, sizeof(pthread_mutex_t)); + pthread_mutex_init(p.l, NULL); + + c->skip_bind = 0; + b[0].p = &(p); b[0].c = c; b[0].i = 1; b[0].n = p.n; b[0].h = 0; + bzero(&(thrd[0]), sizeof(pthread_t)); + pthread_create(&(thrd[0]), NULL, tunnel_server, &(b[0])); + + bzero(&(thrm), sizeof(pthread_t)); + pthread_create(&(thrm), NULL, threaded_io_management, &(b[0])); + + for (int x = 1; x < p.n; ++x) + { + d[x].skip_bind = -1; + b[x].p = &(p); b[x].c = &(d[x]); b[x].i = (x + 1); b[x].n = p.n; b[x].h = 1; + bzero(&(thrd[x]), sizeof(pthread_t)); + pthread_create(&(thrd[x]), NULL, tunnel_server, &(b[x])); + } + + pthread_join(thrd[0], NULL); + + for (int x = 1; x < p.n; ++x) + { + pthread_join(thrd[x], NULL); + } + + pthread_join(thrm, NULL); + + free(p.m); } /* Searches for the address and deletes it if it is owned by the multi_instance */ @@ -4380,6 +4767,11 @@ multi_unlearn_addr(struct multi_context *m, struct multi_instance *mi, const str struct hash_bucket *bucket = hash_bucket(m->vhash, hv); struct multi_route *r = NULL; + if (m->top.options.ce.mtio_conf) + { + return; + } + /* if route currently exists, get the instance which owns it */ he = hash_lookup_fast(m->vhash, bucket, addr, hv); if (he) @@ -4394,7 +4786,7 @@ multi_unlearn_addr(struct multi_context *m, struct multi_instance *mi, const str } struct gc_arena gc = gc_new(); - msg(D_MULTI_LOW, "MULTI: Unlearn: %s -> %s", mroute_addr_print(&r->addr, &gc), multi_instance_string(mi, false, &gc)); + msg(M_INFO, "MULTI: Unlearn: %s -> %s", mroute_addr_print(&r->addr, &gc), multi_instance_string(mi, false, &gc)); learn_address_script(m, NULL, "delete", &r->addr); hash_remove_by_value(m->vhash, r); multi_route_del(r); @@ -4476,22 +4868,33 @@ unlearn_ifconfig_ipv6(struct multi_context *m, struct multi_instance *mi) void update_vhash(struct multi_context *m, struct multi_instance *mi, const char *new_ip, const char *new_ipv6) { + if (m->top.options.ce.mtio_conf) + { + return; + } + if (new_ip) { + in_addr_t old_addr_t = mi->context.c2.push_ifconfig_local; + + struct in_addr new_addr; + CLEAR(new_addr); + int addr_stat = inet_pton(AF_INET, new_ip, &new_addr); + in_addr_t new_addr_t = ntohl(new_addr.s_addr); + /* Remove old IP */ - if (mi->context.c2.push_ifconfig_defined) + if (addr_stat == 1 && new_addr_t != old_addr_t + && mi->context.c2.push_ifconfig_defined) { unlearn_ifconfig(m, mi); } /* Add new IP */ - struct in_addr new_addr; - CLEAR(new_addr); - if (inet_pton(AF_INET, new_ip, &new_addr) == 1 - && multi_learn_in_addr_t(m, mi, ntohl(new_addr.s_addr), -1, true)) + if (addr_stat == 1 && new_addr_t != old_addr_t + && multi_learn_in_addr_t(m, mi, new_addr_t, -1, true)) { mi->context.c2.push_ifconfig_defined = true; - mi->context.c2.push_ifconfig_local = ntohl(new_addr.s_addr); + mi->context.c2.push_ifconfig_local = new_addr_t; /* set our client's VPN endpoint for status reporting purposes */ mi->reporting_addr = mi->context.c2.push_ifconfig_local; } diff --git a/src/openvpn/multi.h b/src/openvpn/multi.h index ebd99c27..b7e77be5 100644 --- a/src/openvpn/multi.h +++ b/src/openvpn/multi.h @@ -145,6 +145,10 @@ struct multi_instance #ifdef ENABLE_ASYNC_PUSH int inotify_watch; /* watch descriptor for acf */ #endif + + int mtio_stat; + int mtio_idno; + struct multi_address mtio_addr; }; @@ -221,8 +225,13 @@ struct multi_context int inst_indx; int inst_leng; struct multi_instance **inst_list; + + int mtio_stat; + int mtio_idno; + struct multi_info mtio_info; }; + /** * Return values used by the client connect call-back functions. */ @@ -258,19 +267,23 @@ struct multi_route * * @param top - Top-level context structure. */ -void tunnel_server(struct context *top); +void threaded_tunnel_server(struct context *c, struct context *d); int min_max(int a, int b, int c); +bool multi_context_switch_addr(struct multi_context *m, struct multi_instance *i, bool s, bool l); + +struct multi_context *multi_context_switch_conn(struct thread_pointer *b, struct multi_context *m, struct multi_instance *i); + +struct multi_instance *multi_learn_in_addr_t(struct multi_context *m, struct multi_instance *mi, in_addr_t a, int netbits, bool primary); + const char *multi_instance_string(const struct multi_instance *mi, bool null, struct gc_arena *gc); /* * Called by mtcp.c, mudp.c, or other (to be written) protocol drivers */ -struct multi_instance *multi_create_instance(struct multi_context *m, - const struct mroute_addr *real, - struct link_socket *sock); +struct multi_instance *multi_create_instance(struct thread_pointer *b, const struct mroute_addr *real, struct link_socket *sock); void multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool shutdown); @@ -360,7 +373,7 @@ bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_ bool multi_process_inp_tun_post(struct multi_context *m, const unsigned int mpp_flags); -bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags); +bool threaded_multi_inp_tun(struct multi_context *m, const unsigned int mpp_flags); void multi_process_drop_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags); diff --git a/src/openvpn/multi_io.c b/src/openvpn/multi_io.c index e59a54e3..ea069152 100644 --- a/src/openvpn/multi_io.c +++ b/src/openvpn/multi_io.c @@ -286,7 +286,7 @@ multi_io_dispatch(struct multi_context *m, struct multi_instance *mi, const int { case TA_INST_LENG: case TA_TUN_READ: - multi_in_tun(m, mpp_flags); + threaded_multi_inp_tun(m, mpp_flags); break; case TA_SOCKET_READ: @@ -386,8 +386,9 @@ last: } void -multi_io_process_io(struct multi_context *m) +multi_io_process_io(struct thread_pointer *b) { + struct multi_context *m = b->p->m[b->i-1]; struct multi_io *multi_io = m->multi_io; int i; @@ -432,8 +433,8 @@ multi_io_process_io(struct multi_context *m) if (!proto_is_dgram(ev_arg->u.sock->info.proto)) { socket_reset_listen_persistent(ev_arg->u.sock); - mi = multi_create_instance_tcp(m, ev_arg->u.sock); - if (mi) { multi_io_action(m, mi, TA_INITIAL, false); } + mi = multi_create_instance_tcp(b, ev_arg->u.sock); + if (mi) { multi_io_action(b->p->p, mi, TA_INITIAL, false); } } else { @@ -441,9 +442,6 @@ multi_io_process_io(struct multi_context *m) if (m->pending) { multi_io_action(m, m->pending, TA_INITIAL, false); } if (m->pending2) { multi_io_action(m, m->pending2, TA_INITIAL, false); } } - /* monitor and/or handle events that are - * triggered in succession by the first one - * before returning to the main loop. */ break; } } @@ -469,7 +467,6 @@ multi_io_process_io(struct multi_context *m) multi_io_action(m, NULL, TA_TUN_READ, false); } } - #if defined(ENABLE_DCO) /* incoming data on DCO? */ else if (e->arg == MULTI_IO_DCO) @@ -506,6 +503,22 @@ multi_io_process_io(struct multi_context *m) multi_io_action(m, mi, TA_SOCKET_WRITE, true); } } + + if (m->mtio_stat == 3) + { + int numb_clis = (int)m->max_clients; + for (int x = 0; x < numb_clis; ++x) + { + struct multi_instance *j = m->instances[x]; + if (!j) { continue; } + if (j->mtio_stat == 3) + { + multi_context_switch_addr(m, j, true, true); + j->mtio_stat = 5; + } + } + m->mtio_stat = 5; + } } void diff --git a/src/openvpn/multi_io.h b/src/openvpn/multi_io.h index a5d6edb5..1998d9c4 100644 --- a/src/openvpn/multi_io.h +++ b/src/openvpn/multi_io.h @@ -68,7 +68,7 @@ void multi_io_free(struct multi_io *multi_io); int multi_io_wait(struct multi_context *m); -void multi_io_process_io(struct multi_context *m); +void multi_io_process_io(struct thread_pointer *b); void multi_io_set_global_rw_flags(struct multi_context *m, struct multi_instance *mi); diff --git a/src/openvpn/openvpn.c b/src/openvpn/openvpn.c index f38660f2..89ebb41e 100644 --- a/src/openvpn/openvpn.c +++ b/src/openvpn/openvpn.c @@ -37,6 +37,10 @@ #define P2P_CHECK_SIG() EVENT_LOOP_CHECK_SIGNAL(c, process_signal_p2p, c); +#define TEST_CONN_ADDR_MAPS(s, d, i) (((s.v4.addr == i.srca) && (d.v4.addr == i.dsta)) || ((d.v4.addr == i.srca) && (s.v4.addr == i.dsta))) +#define TEST_CONN_ADDR_NOTS(s, d, i) ((s.v4.addr == i) || (d.v4.addr == i)) +#define TEST_CONN_ADDR_MSKS(s, d, i, m) (((s.v4.addr & m) == i) && ((d.v4.addr & m) == i)) + static bool process_signal_p2p(struct context *c) { @@ -45,6 +49,77 @@ process_signal_p2p(struct context *c) } +int proc_conn_addr_maps(struct mtio_args *args, struct mtio_cons *cons, uint8_t *buff_data, int buff_size, bool mtio_mode) +{ + int thid = *(args->thid), expr = args->expr; + struct mroute_addr srca = { 0 }, dsta = { 0 }; + struct buffer buff; + buff.data = buff_data; buff.offset = 0; buff.len = buff_size; + if (mtio_mode && (mroute_extract_addr_ip(&srca, &dsta, &buff) == MROUTE_EXTRACT_SUCCEEDED)) + { + int flag = 1; + for (int y = 0; y < args->notl; ++y) + { + if (TEST_CONN_ADDR_NOTS(srca, dsta, args->nots[y])) + { + flag = 0; + } + } + for (int y = 0; (y + 1) < args->mskl; y += 2) + { + if (TEST_CONN_ADDR_MSKS(srca, dsta, args->msks[y], args->msks[y + 1])) + { + flag = 0; + } + } + if (flag == 1) + { + int indx = -1, scan = 0, memr = 4, logs = -1; + time_t secs = time(NULL); + uint32_t srcn = srca.v4.addr, dstn = dsta.v4.addr; + uint32_t srch = (HASH_PART(srcn, 24, 11, 103) + HASH_PART(srcn, 16, 13, 107) + HASH_PART(srcn, 8, 17, 109) + HASH_PART(srcn, 0, 19, 113)); + uint32_t dsth = (HASH_PART(dstn, 24, 31, 131) + HASH_PART(dstn, 16, 53, 137) + HASH_PART(dstn, 8, 67, 139) + HASH_PART(dstn, 0, 79, 151)); + uint32_t hidx = (((srch * 163) + (dsth * 167)) % MAX_CSTATES); + for (int y = 0; y < MAX_CSTATES; ++y) + { + if (TEST_CONN_ADDR_MAPS(srca, dsta, cons[hidx])) + { + indx = hidx; + if ((secs - cons[hidx].last) >= expr) { indx = (-1 * (hidx + 11)); } + break; + } + else if ((secs - cons[hidx].last) >= expr) + { + if (indx == -1) { indx = (-1 * (hidx + 11)); } + } + if ((scan >= memr) && (indx != -1)) { break; } + hidx = ((hidx + 1) % MAX_CSTATES); + scan += 1; + } + if (indx < 0) + { + indx = (indx == -1) ? (int)hidx : ((indx * -1) - 11); + cons[indx].srca = srca.v4.addr; + cons[indx].dsta = dsta.v4.addr; + cons[indx].thid = thid; + logs = 1; + } + if (logs == 1) + { + char a[28], b[28]; + struct in_addr t; + t.s_addr = srca.v4.addr; bzero(a, 28); snprintf(a, 24, "%s", inet_ntoa(t)); + t.s_addr = dsta.v4.addr; bzero(b, 28); snprintf(b, 24, "%s", inet_ntoa(t)); + msg(M_INFO, "%s MTIO maps <%d>(%d) [%s][%s] {%d}{%d}", args->pref, indx, expr, a, b, thid, cons[indx].thid); + } + thid = cons[indx].thid; + cons[indx].last = secs; + } + } + return thid; +} + + /**************************************************************************/ /** * Main event loop for OpenVPN in client mode, where only one VPN tunnel @@ -53,9 +128,13 @@ process_signal_p2p(struct context *c) * * @param c - The context structure of the single active VPN tunnel. */ -static void -tunnel_point_to_point(struct context *c) +void *tunnel_point_to_point(void *a) { + struct thread_pointer *b = (struct thread_pointer *)a; + struct context_pointer *p = b->p; + struct context *c = (b->n == 1) ? p->c : b->c; + struct context *d = (b->n == 1) ? b->c : p->c; + context_clear_2(c); /* set point-to-point mode */ @@ -66,12 +145,44 @@ tunnel_point_to_point(struct context *c) init_instance_handle_signals(c, c->es, stdin_config ? 0 : CC_HARD_USR1_TO_HUP); if (IS_SIG(c)) { - return; + return NULL; } + if (b->i == 1) + { + while (p->h < p->n) + { + if (p->z == -1) { break; } else { sleep(1); } + } + p->z = 1; + } + else + { + b->h += 1; p->h += 1; + while ((p->z != 1) || (!(d->c1.tuntap)) || (d->c1.tuntap->ff <= 1)) + { + if (p->z == -1) { break; } else { sleep(1); } + } + } + + msg(M_INFO, "TCPv4_CLIENT MTIO init [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); + /* main event loop */ while (true) { + if (p->z != 1) { break; } + if (c->c1.tuntap && (c->c1.tuntap->fd > 1) && (c->c1.tuntap->ff <= 1)) + { + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->s[b->i-1]); + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->r[b->i-1]); + c->c1.tuntap->ff = c->c1.tuntap->fd; + c->c1.tuntap->fe = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + //c->c1.tuntap->fd = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + c->c1.tuntap->fd = p->s[b->i-1][0]; + c->c1.tuntap->fz = p->r[b->i-1][1]; + msg(M_INFO, "TCPv4_CLIENT MTIO fdno [%d][%d][%d][%d] {%d}", c->c1.tuntap->fd, c->c1.tuntap->fe, c->c1.tuntap->ff, c->c1.tuntap->fz, b->i); + } + /* process timers, TLS, etc. */ pre_select(c); P2P_CHECK_SIG(); @@ -87,16 +198,279 @@ tunnel_point_to_point(struct context *c) } /* process the I/O which triggered select */ - process_io(c, c->c2.link_sockets[0]); + process_io(c, c->c2.link_sockets[0], b); P2P_CHECK_SIG(); } + msg(M_INFO, "TCPv4_CLIENT MTIO fins [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); + + p->z = -1; + + if (c->c1.tuntap && (c->c1.tuntap->ff > 1)) + { + close(p->s[b->i-1][0]); close(p->s[b->i-1][1]); + close(p->r[b->i-1][0]); close(p->r[b->i-1][1]); + c->c1.tuntap->fd = c->c1.tuntap->ff; + c->c1.tuntap->ff = -1; + } + persist_client_stats(c); uninit_management_callback(); /* tear down tunnel instance (unless --persist-tun) */ close_instance(c); + + return NULL; +} + +void *threaded_io_management(void *a) +{ + struct thread_pointer *b = (struct thread_pointer *)a; + struct context_pointer *p = b->p; + struct context *c, *d; + int maxt = p->n, maxf = 0, maxl = 0; + int maxz = TUN_BAT_MIN, maxx = (MAX_THREADS * TUN_BAT_MIN); + int fdno = 0, indx = 0, size = 0, selw = 0; + int thid = 0, tidx = 0, tbeg = 0, tend = 0; + ssize_t leng = 0; + uint8_t *ptra; + uint8_t busy[maxx], proc[maxx], sels[maxx]; + fd_set rfds; + struct timeval timo; + struct mtio_args marg; + struct mtio_cons cons[MAX_CSTATES]; + + while (true) + { + if (p->z == -1) { break; } + if ((p->z == 1) && (p->h == p->n)) { break; } + sleep(1); + } + d = (p->m && p->m[0]) ? &(p->m[0]->top) : b[0].c; + size = d->c2.frame.buf.payload_size; + if (size < 1) { size = 1; } + while (true) + { + if (p->z != 1) { break; } + int stat = 0; + for (int x = 0; x < maxt; ++x) + { + if (p->m) { stat += 1; } + else + { + struct tls_multi *m = b[x].c->c2.tls_multi; + if (m && (m->multi_state == CAS_CONNECT_DONE)) { stat += 1; } + } + } + if (stat == maxt) { break; } + sleep(1); + } + + bool mtmo = d->options.ce.mtio_mode; + int expr = d->options.ce.mtio_time; + in_addr_t nots[] = { inet_addr("0.0.0.0"), inet_addr("255.255.255.255") }; + in_addr_t msks[] = { inet_addr("10.0.0.0"), inet_addr("255.0.0.0") }; + int sizs[maxx], idxs[maxx], lens[maxt]; + uint8_t bufl[maxx][size]; + uint8_t *bufs[maxx], *swap[maxx]; + + for (int x = 0; x < maxx; ++x) + { + sizs[x] = 0; idxs[x] = 0; + bufs[x] = bufl[x]; + } + + msg(M_INFO, "%s MTIO mgmt [%d] {%d}", (p->m) ? "TCPv4_SERVER" : "TCPv4_CLIENT", size, BULK_MODE(d)); + + marg.pref = (p->m) ? "TCPv4_SERVER" : "TCPv4_CLIENT"; + marg.thid = &(thid); marg.expr = expr; marg.busy = busy; + marg.nots = nots; marg.notl = (sizeof(nots) / sizeof(nots[0])); + marg.msks = msks; marg.mskl = (sizeof(msks) / sizeof(msks[0])); + + bzero(busy, maxt * sizeof(uint8_t)); + bzero(cons, MAX_CSTATES * sizeof(struct mtio_cons)); + while (true) + { + if (p->z != 1) { break; } + indx = -1; maxf = 0; maxl = 0; + FD_ZERO(&rfds); + for (int x = 0; x < maxt; ++x) + { + if (busy[x] != 1) { indx = x; } + if (p->r[x][0] > 1) { FD_SET(p->r[x][0], &rfds); maxl += 1; } + if (p->r[x][0] > maxf) { maxf = p->r[x][0]; } + } + if (maxl != maxt) + { + sleep(1); + continue; + } + timo.tv_sec = 0; timo.tv_usec = 0; + select(maxf+1, &rfds, NULL, NULL, (indx < 0) ? NULL : &timo); + for (int x = 0; x < maxt; ++x) + { + if (FD_ISSET(p->r[x][0], &rfds) || ((busy[x] == 1) && (sels[x] == 1) && (selw == 1))) + { + leng = read(p->r[x][0], &(busy[maxt+1]), 1); + busy[x] = 0; + } + if (busy[x] != 1) + { + if (busy[thid] == 1) { thid = x; } + lens[x] = 0; + } + proc[x] = 0; sels[x] = 0; + } + tbeg = 0; tend = 0; selw = 0; + for (int x = 0; x < maxx; ++x) + { + if (sizs[x] > 0) { tend = (x + 1); } + else if (sizs[tbeg] > 0) { tbeg = x; } + } + d = (p->m) ? &(p->m[0]->top) : b[0].c; + fdno = d->c1.tuntap->ff; + FD_ZERO(&rfds); FD_SET(fdno, &rfds); + timo.tv_sec = 1; timo.tv_usec = 750000; + if (tend > 0) { timo.tv_sec = 0; timo.tv_usec = 0; } + if (BULK_MODE(d)) + { + for (int x = 0; (fdno > 1) && (x < maxx); ++x) + { + if (x >= tend) + { + tend = min_max(tend, 0, maxx); + select(fdno+1, &rfds, NULL, NULL, &timo); + if ((p->z == 1) && FD_ISSET(fdno, &rfds) && (tend < maxx)) + { + leng = read(fdno, bufs[tend], size); + maxl = (int)leng; + tidx = thid; + thid = proc_conn_addr_maps(&(marg), cons, bufs[tend], maxl, mtmo); + sizs[tend] = maxl; idxs[tend] = thid; + tend = (x + 1); + thid = ((tidx + 1) % maxt); + } + FD_ZERO(&rfds); FD_SET(fdno, &rfds); + timo.tv_sec = 0; timo.tv_usec = 0; + } + if (sizs[x] > 0) + { + tidx = idxs[x]; sels[tidx] = 1; selw = 1; + c = (p->m) ? &(p->m[tidx]->top) : b[tidx].c; + indx = min_max(lens[tidx], 0, maxz); + if ((p->z == 1) && (busy[tidx] != 1) && (indx < maxz)) + { + c->c2.buffers->read_tun_bufs[indx].len = sizs[x]; + c->c2.buffers->read_tun_bufs[indx].offset = TUN_BAT_OFF; + ptra = BPTR(&c->c2.buffers->read_tun_bufs[indx]); + bcopy(bufs[x], ptra, sizs[x]); + c->c2.bufs[indx] = c->c2.buffers->read_tun_bufs[indx]; + c->c2.buffers->bulk_indx = 0; + c->c2.buffers->bulk_leng = (indx + 1); + sizs[x] = 0; proc[tidx] = 1; lens[tidx] += 1; + } + } + if ((sizs[x] < 1) && ((x < tbeg) || (sizs[tbeg] > 0))) + { + tbeg = x; + } + if ((sizs[x] > 0) && ((x > tbeg) && (sizs[tbeg] < 1))) + { + swap[tbeg] = bufs[tbeg]; bufs[tbeg] = bufs[x]; bufs[x] = swap[tbeg]; + sizs[tbeg] = sizs[x]; idxs[tbeg] = idxs[x]; sizs[x] = 0; + while ((tbeg <= x) && (sizs[tbeg] > 0)) + { + ++tbeg; + } + } + } + } + else + { + if (fdno > 1) + { + tend = (maxx - 1); + if (sizs[tend] < 1) + { + tend = min_max(tend, 0, maxx); + select(fdno+1, &rfds, NULL, NULL, &timo); + if ((p->z == 1) && FD_ISSET(fdno, &rfds)) + { + leng = read(fdno, bufs[tend], size); + maxl = (int)leng; + tidx = proc_conn_addr_maps(&(marg), cons, bufs[tend], maxl, mtmo); + sizs[tend] = maxl; idxs[tend] = tidx; + thid = ((thid + 1) % maxt); + } + } + if (sizs[tend] > 0) + { + tidx = idxs[tend]; + if ((p->z == 1) && (busy[tidx] != 1)) + { + c = (p->m) ? &(p->m[tidx]->top) : b[tidx].c; + c->c2.buffers->read_tun_buf.len = sizs[tend]; + ptra = BPTR(&c->c2.buffers->read_tun_buf); + bcopy(bufs[tend], ptra, sizs[tend]); + sizs[tend] = 0; proc[tidx] = 1; + } + } + } + } + for (int x = 0; x < maxt; ++x) + { + if (proc[x] == 1) + { + leng = write(p->s[x][1], busy, 1); + busy[x] = 1; sels[x] = 0; selw = 0; + } + } + } + + p->z = -1; + + return NULL; +} + +void threaded_tunnel_point_to_point(struct context *c, struct context *d) +{ + int maxt = (c->options.ce.mtio_mode) ? MAX_THREADS : 1; + struct context_pointer p; + struct thread_pointer b[MAX_THREADS]; + pthread_t thrm, thrd[MAX_THREADS]; + pthread_mutex_t lock; + + bzero(&(p), sizeof(struct context_pointer)); + p.c = c; p.i = 1; p.n = maxt; p.h = 1; p.z = 0; + p.l = &(lock); + bzero(p.l, sizeof(pthread_mutex_t)); + pthread_mutex_init(p.l, NULL); + + c->skip_bind = 0; + b[0].p = &(p); b[0].c = c; b[0].i = 1; b[0].n = p.n; b[0].h = 0; + bzero(&(thrd[0]), sizeof(pthread_t)); + pthread_create(&(thrd[0]), NULL, tunnel_point_to_point, &(b[0])); + + bzero(&(thrm), sizeof(pthread_t)); + pthread_create(&(thrm), NULL, threaded_io_management, &(b[0])); + + for (int x = 1; x < p.n; ++x) + { + d[x].skip_bind = -1; + b[x].p = &(p); b[x].c = &(d[x]); b[x].i = (x + 1); b[x].n = p.n; b[x].h = 1; + bzero(&(thrd[x]), sizeof(pthread_t)); + pthread_create(&(thrd[x]), NULL, tunnel_point_to_point, &(b[x])); + } + + pthread_join(thrd[0], NULL); + + for (int x = 1; x < p.n; ++x) + { + pthread_join(thrd[x], NULL); + } + + pthread_join(thrm, NULL); } #undef PROCESS_SIGNAL_P2P @@ -153,6 +527,9 @@ static int openvpn_main(int argc, char *argv[]) { struct context c; + struct context d[MAX_THREADS]; + char devs[MAX_THREADS][MAX_STRLENG]; + char fils[MAX_THREADS][MAX_STRLENG]; #if PEDANTIC fprintf(stderr, "Sorry, I was built with --enable-pedantic and I am incapable of doing any real work!\n"); @@ -297,17 +674,44 @@ openvpn_main(int argc, char *argv[]) /* finish context init */ context_init_1(&c); + if (c.options.ce.mtio_mode) + { + for (int x = 0; x < MAX_THREADS; ++x) + { + struct context *b = &(d[x]); + + bcopy(&c, b, sizeof(struct context)); + context_init_1(b); + + if (c.options.dev) + { + bzero(devs[x], MAX_STRLENG * sizeof(char)); + snprintf(devs[x], MAX_STRLENG-5, "%st%02d", c.options.dev, x); + b->options.dev = devs[x]; + } + + if (c.options.status_file) + { + bzero(fils[x], MAX_STRLENG * sizeof(char)); + snprintf(fils[x], MAX_STRLENG-5, "%st%02d", c.options.status_file, x); + b->options.status_file = fils[x]; + } + + msg(M_INFO, "INFO MTIO init [%d] [%s][%s]", x, b->options.dev, b->options.status_file); + } + } + do { /* run tunnel depending on mode */ switch (c.options.mode) { case MODE_POINT_TO_POINT: - tunnel_point_to_point(&c); + threaded_tunnel_point_to_point(&c, d); break; case MODE_SERVER: - tunnel_server(&c); + threaded_tunnel_server(&c, d); break; default: @@ -325,12 +729,32 @@ openvpn_main(int argc, char *argv[]) /* pass restart status to management subsystem */ signal_restart_status(c.sig); + + if (c.options.ce.mtio_mode) + { + for (int x = 0; x < MAX_THREADS; ++x) + { + d[x].first_time = false; + signal_restart_status(d[x].sig); + } + } } while (signal_reset(c.sig, SIGUSR1) == SIGUSR1); env_set_destroy(c.es); uninit_options(&c.options); gc_reset(&c.gc); uninit_early(&c); + + /*if (c.options.ce.mtio_mode) + { + for (int x = 0; x < MAX_THREADS; ++x) + { + env_set_destroy(d[x].es); + uninit_options(&d[x].options); + gc_reset(&d[x].gc); + uninit_early(&d[x]); + } + }*/ } while (signal_reset(c.sig, SIGHUP) == SIGHUP); } diff --git a/src/openvpn/openvpn.h b/src/openvpn/openvpn.h index e59930f9..f133d92f 100644 --- a/src/openvpn/openvpn.h +++ b/src/openvpn/openvpn.h @@ -46,6 +46,17 @@ #include "manage.h" #include "dns.h" +/* + mtio mode commit notes: + - maps size hash modp >= 2^14 16384 + - briefly track && map a connection state to a given available thread to ensure packet ordering + - Use a simple calculation based on src && dst IP addresses to get a starting list index value +*/ + +#define MAX_THREADS 4 +#define MAX_STRLENG 64 +#define MAX_CSTATES 16421 + /* * Our global key schedules, packaged thusly * to facilitate key persistence. @@ -518,6 +529,8 @@ struct context bool did_we_daemonize; /**< Whether demonization has already * taken place. */ + int skip_bind; + struct context_persist persist; /**< Persistent %context. */ struct context_0 *c0; /**< Level 0 %context. */ @@ -525,6 +538,78 @@ struct context struct context_2 c2; /**< Level 2 %context. */ }; + +#define HASH_PART(a, s, p, q) ((((a >> s) & 0xff) + p) * q) + +struct context_pointer +{ + int i, h, n, x, z; + int s[MAX_THREADS][2]; + int r[MAX_THREADS][2]; + struct context *c; + struct multi_context **m; + struct multi_context *p; + struct multi_link *k; + pthread_mutex_t *l; +}; + +struct thread_pointer +{ + int i, n, h; + struct context *c; + struct context_pointer *p; +}; + +struct multi_address +{ + char ladr[MAX_STRLENG]; + char wadr[MAX_STRLENG]; + char comm[MAX_STRLENG]; + char user[MAX_STRLENG]; + char uniq[MAX_STRLENG]; + time_t last; + in_addr_t addr; +}; + +struct multi_link +{ + int indx; + char uniq[MAX_STRLENG]; + time_t last; + struct multi_address adrs[MAX_THREADS]; +}; + +struct multi_info +{ + int maxt, maxc; + int *indx, *hold; + struct ifconfig_pool *pool; + pthread_mutex_t *lock; + struct multi_link *link; +}; + +struct mtio_args +{ + char *pref; + int expr; + int *thid; + uint8_t *busy; + int notl; + in_addr_t *nots; + int mskl; + in_addr_t *msks; +}; + +struct mtio_cons +{ + int thid; + time_t last; + in_addr_t srca, dsta; +}; + +void *threaded_io_management(void *a); + + /* * Check for a signal when inside an event loop */ diff --git a/src/openvpn/options.c b/src/openvpn/options.c index 6e0b25dc..34ed861f 100644 --- a/src/openvpn/options.c +++ b/src/openvpn/options.c @@ -306,6 +306,7 @@ static const char usage_message[] = " 'yes' -- Always DF (Don't Fragment)\n" "--mtu-test : Empirically measure and report MTU.\n" "--bulk-mode : Use bulk TUN/TCP reads/writes.\n" + "--mtio-mode n : Use multi threaded mode. (optional expire time: n=30)\n" #ifdef ENABLE_FRAGMENT "--fragment max : Enable internal datagram fragmentation so that no UDP\n" " datagrams are sent which are larger than max bytes.\n" @@ -3268,6 +3269,13 @@ options_postprocess_mutate_invariant(struct options *options) { options->ce.bulk_mode = false; } + + options->ce.mtio_conf = false; + + if (options->ce.mtio_mode) + { + options->ce.mtio_conf = true; + } } static void @@ -9282,6 +9290,19 @@ add_option(struct options *options, char *p[], bool is_inline, const char *file, { options->ce.bulk_mode = true; } + else if (streq(p[0], "mtio-mode")) + { + options->ce.mtio_mode = true; + options->ce.mtio_time = 30; + if (p[1]) + { + int mtio_time = positive_atoi(p[1], msglevel); + if ((5 <= mtio_time) && (mtio_time <= 9995)) + { + options->ce.mtio_time = mtio_time; + } + } + } else { int i; diff --git a/src/openvpn/options.h b/src/openvpn/options.h index a74429cd..106cd17d 100644 --- a/src/openvpn/options.h +++ b/src/openvpn/options.h @@ -180,6 +180,11 @@ struct connection_entry /* Bulk mode allows for multiple tun reads + larger tcp writes */ bool bulk_mode; + + /* Multi threaded IO mode operates on a primary tun interface + multiple tcp connections */ + bool mtio_conf; + bool mtio_mode; + int mtio_time; }; struct remote_entry diff --git a/src/openvpn/socket.c b/src/openvpn/socket.c index 624ce4f5..aceb3cf2 100644 --- a/src/openvpn/socket.c +++ b/src/openvpn/socket.c @@ -708,6 +708,7 @@ create_socket(struct link_socket *sock, struct addrinfo *addr) /* set socket to --mark packets with given value */ socket_set_mark(sock->sd, sock->mark); + if (sock->skip_bind != -1) { #if defined(TARGET_LINUX) if (sock->bind_dev) { @@ -722,6 +723,11 @@ create_socket(struct link_socket *sock, struct addrinfo *addr) #endif bind_local(sock, addr->ai_family); + } else { + struct sockaddr_in locl = { 0 }; + locl.sin_family = AF_INET; locl.sin_addr.s_addr = inet_addr("127.0.0.1"); + bind(sock->sd, (struct sockaddr *)&locl, sizeof(locl)); + } } #if defined(__GNUC__) || defined(__clang__) @@ -1741,6 +1747,7 @@ link_socket_init_phase2(struct context *c, struct link_socket *sock) addr_family_name(sock->info.lsa->bind_local->ai_family)); sock->info.af = sock->info.lsa->bind_local->ai_family; } + sock->skip_bind = c->skip_bind; create_socket(sock, sock->info.lsa->bind_local); } } diff --git a/src/openvpn/socket.h b/src/openvpn/socket.h index cd4e8ed7..8edfbd0d 100644 --- a/src/openvpn/socket.h +++ b/src/openvpn/socket.h @@ -251,6 +251,8 @@ struct link_socket #ifdef ENABLE_DEBUG int gremlin; /* --gremlin bits */ #endif + + int skip_bind; }; /* diff --git a/src/openvpn/tun.c b/src/openvpn/tun.c index f46802fa..33d3ae88 100644 --- a/src/openvpn/tun.c +++ b/src/openvpn/tun.c @@ -1246,6 +1246,11 @@ do_ifconfig_ipv4(struct tuntap *tt, const char *ifname, int tun_mtu, const struc bool tun_p2p = is_tun_p2p(tt); #endif + if (tt->skip_bind == -1) + { + tt->local = htonl(inet_addr("127.1.1.1")); + } + #if !defined(TARGET_LINUX) const char *ifconfig_local = NULL; const char *ifconfig_remote_netmask = NULL; @@ -1755,7 +1760,7 @@ write_tun_header(struct tuntap *tt, uint8_t *buf, int len) } else { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } } @@ -1966,11 +1971,14 @@ open_tun_dco_generic(const char *dev, const char *dev_type, struct tuntap *tt, static void close_tun_generic(struct tuntap *tt) { - if (tt->fd >= 0) + if (tt->ff > 1) + { + close(tt->ff); + } + else if (tt->fd >= 0) { close(tt->fd); } - free(tt->actual_name); clear_tuntap(tt); } @@ -2057,7 +2065,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx) ssize_t write_tun(struct tuntap *tt, uint8_t *buf, int len) { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } ssize_t @@ -2137,7 +2145,7 @@ open_tun(const char *dev, const char *dev_type, const char *dev_node, struct tun * Use special ioctl that configures tun/tap device with the parms * we set in ifr */ - if (ioctl(tt->fd, TUNSETIFF, (void *)&ifr) < 0) + if (ioctl((tt->ff > 1) ? tt->ff : tt->fd, TUNSETIFF, (void *)&ifr) < 0) { msg(M_ERR, "ERROR: Cannot ioctl TUNSETIFF %s", dev); } @@ -2262,7 +2270,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx) ssize_t write_tun(struct tuntap *tt, uint8_t *buf, int len) { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } ssize_t @@ -3111,7 +3119,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len) } else { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } } @@ -3253,7 +3261,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx) ssize_t write_tun(struct tuntap *tt, uint8_t *buf, int len) { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } ssize_t @@ -6313,7 +6321,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx) ssize_t write_tun(struct tuntap *tt, uint8_t *buf, int len) { - return write(tt->fd, buf, len); + return write(tt->fe, buf, len); } ssize_t diff --git a/src/openvpn/tun.h b/src/openvpn/tun.h index 6a7af850..b4629c1e 100644 --- a/src/openvpn/tun.h +++ b/src/openvpn/tun.h @@ -246,6 +246,9 @@ struct tuntap dco_context_t dco; afunix_context_t afunix; + + int fe, ff, fz; + int skip_bind; }; static inline bool