This commit is contained in:
stoops 2026-05-07 23:57:51 +08:00 committed by GitHub
commit b28fab6729
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1693 additions and 213 deletions

View file

@ -45,6 +45,11 @@
#include "memdbg.h"
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <pthread.h>
counter_type link_read_bytes_global; /* GLOBAL */
counter_type link_write_bytes_global; /* GLOBAL */
@ -612,6 +617,21 @@ buffer_turnover(const uint8_t *orig_buf, struct buffer *dest_stub, struct buffer
}
}
uint8_t *buff_prepsize(uint8_t *buff, int *size)
{
buff[0] = (uint8_t)((*size >> 8) & 0xff);
buff[1] = (uint8_t)((*size >> 0) & 0xff);
buff += 2;
return buff;
}
uint8_t *buff_postsize(uint8_t *buff, int *size)
{
*size = ((buff[0] << 8) + (buff[1] << 0));
buff += 2;
return buff;
}
/*
* Compress, fragment, encrypt and HMAC-sign an outgoing packet.
* Input: c->c2.buf
@ -889,7 +909,7 @@ socks_postprocess_incoming_link(struct context *c, struct link_socket *sock)
{
if (sock->socks_proxy && sock->info.proto == PROTO_UDP)
{
socks_process_incoming_udp(&c->c2.buf, &c->c2.from);
socks_process_incoming_udp(&c->c2.buf2, &c->c2.from);
}
}
@ -919,7 +939,7 @@ link_socket_write_post_size_adjust(int *size, int size_delta, struct buffer *buf
}
/*
* Output: c->c2.buf
* Output: c->c2.buf2
*/
void
@ -933,10 +953,10 @@ read_incoming_link(struct context *c, struct link_socket *sock)
/*ASSERT (!c->c2.to_tun.len);*/
c->c2.buf = c->c2.buffers->read_link_buf;
ASSERT(buf_init(&c->c2.buf, c->c2.frame.buf.headroom));
c->c2.buf2 = c->c2.buffers->read_link_buf;
ASSERT(buf_init(&c->c2.buf2, c->c2.frame.buf.headroom));
status = link_socket_read(sock, &c->c2.buf, &c->c2.from);
status = link_socket_read(sock, &c->c2.buf2, &c->c2.from);
if (socket_connection_reset(sock, status))
{
@ -989,11 +1009,11 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
struct gc_arena gc = gc_new();
bool decrypt_status = false;
if (c->c2.buf.len > 0)
if (c->c2.buf2.len > 0)
{
c->c2.link_read_bytes += c->c2.buf.len;
link_read_bytes_global += c->c2.buf.len;
c->c2.original_recv_size = c->c2.buf.len;
c->c2.link_read_bytes += c->c2.buf2.len;
link_read_bytes_global += c->c2.buf2.len;
c->c2.original_recv_size = c->c2.buf2.len;
}
else
{
@ -1006,21 +1026,22 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
{
if (!ask_gremlin(c->options.gremlin))
{
c->c2.buf.len = 0;
c->c2.buf2.len = 0;
}
corrupt_gremlin(&c->c2.buf, c->options.gremlin);
corrupt_gremlin(&c->c2.buf2, c->options.gremlin);
}
#endif
/* log incoming packet */
#ifdef LOG_RW
if (c->c2.log_rw && c->c2.buf.len > 0)
if (c->c2.log_rw && c->c2.buf2.len > 0)
{
fprintf(stderr, "R");
}
#endif
msg(D_LINK_RW, "%s READ [%d] from %s: %s", proto2ascii(lsi->proto, lsi->af, true),
BLEN(&c->c2.buf), print_link_socket_actual(&c->c2.from, &gc), PROTO_DUMP(&c->c2.buf, &gc));
BLEN(&c->c2.buf2), print_link_socket_actual(&c->c2.from, &gc), PROTO_DUMP(&c->c2.buf2, &gc));
/*
* Good, non-zero length packet received.
@ -1029,18 +1050,18 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
* If any stage fails, it sets buf.len to 0 or -1,
* telling downstream stages to ignore the packet.
*/
if (c->c2.buf.len > 0)
if (c->c2.buf2.len > 0)
{
struct crypto_options *co = NULL;
const uint8_t *ad_start = NULL;
if (!link_socket_verify_incoming_addr(&c->c2.buf, lsi, &c->c2.from))
if (!link_socket_verify_incoming_addr(&c->c2.buf2, lsi, &c->c2.from))
{
link_socket_bad_incoming_addr(&c->c2.buf, lsi, &c->c2.from);
link_socket_bad_incoming_addr(&c->c2.buf2, lsi, &c->c2.from);
}
if (c->c2.tls_multi)
{
uint8_t opcode = *BPTR(&c->c2.buf) >> P_OPCODE_SHIFT;
uint8_t opcode = *BPTR(&c->c2.buf2) >> P_OPCODE_SHIFT;
/*
* If DCO is enabled, the kernel drivers require that the
@ -1054,7 +1075,7 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
{
msg(D_LINK_ERRORS, "Data Channel Offload doesn't support DATA_V1 packets. "
"Upgrade your server to 2.4.5 or newer.");
c->c2.buf.len = 0;
c->c2.buf2.len = 0;
}
/*
@ -1067,7 +1088,7 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
* will load crypto_options with the correct encryption key
* and return false.
*/
if (tls_pre_decrypt(c->c2.tls_multi, &c->c2.from, &c->c2.buf, &co, floated, &ad_start))
if (tls_pre_decrypt(c->c2.tls_multi, &c->c2.from, &c->c2.buf2, &co, floated, &ad_start))
{
interval_action(&c->c2.tmp_int);
@ -1090,12 +1111,12 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo
*/
if (c->c2.tls_multi && c->c2.tls_multi->multi_state < CAS_CONNECT_DONE)
{
c->c2.buf.len = 0;
c->c2.buf2.len = 0;
}
/* authenticate and decrypt the incoming packet */
decrypt_status =
openvpn_decrypt(&c->c2.buf, c->c2.buffers->decrypt_buf, co, &c->c2.frame, ad_start);
openvpn_decrypt(&c->c2.buf2, c->c2.buffers->decrypt_buf, co, &c->c2.frame, ad_start);
if (!decrypt_status
/* on the instance context we have only one socket, so just check the first one */
@ -1120,12 +1141,12 @@ void
process_incoming_link_part2(struct context *c, struct link_socket_info *lsi,
const uint8_t *orig_buf)
{
if (c->c2.buf.len > 0)
if (c->c2.buf2.len > 0)
{
#ifdef ENABLE_FRAGMENT
if (c->c2.fragment)
{
fragment_incoming(c->c2.fragment, &c->c2.buf, &c->c2.frame_fragment);
fragment_incoming(c->c2.fragment, &c->c2.buf2, &c->c2.frame_fragment);
}
#endif
@ -1133,14 +1154,14 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi,
/* decompress the incoming packet */
if (c->c2.comp_context)
{
(*c->c2.comp_context->alg.decompress)(&c->c2.buf, c->c2.buffers->decompress_buf,
(*c->c2.comp_context->alg.decompress)(&c->c2.buf2, c->c2.buffers->decompress_buf,
c->c2.comp_context, &c->c2.frame);
}
#endif
#ifdef PACKET_TRUNCATION_CHECK
/* if (c->c2.buf.len > 1) --c->c2.buf.len; */
ipv4_packet_size_verify(BPTR(&c->c2.buf), BLEN(&c->c2.buf), TUNNEL_TYPE(c->c1.tuntap),
/* if (c->c2.buf2.len > 1) --c->c2.buf2.len; */
ipv4_packet_size_verify(BPTR(&c->c2.buf2), BLEN(&c->c2.buf2), TUNNEL_TYPE(c->c1.tuntap),
"POST_DECRYPT", &c->c2.n_trunc_post_decrypt);
#endif
@ -1153,39 +1174,39 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi,
*
* Also, update the persisted version of our packet-id.
*/
if (!TLS_MODE(c) && c->c2.buf.len > 0)
if (!TLS_MODE(c) && c->c2.buf2.len > 0)
{
link_socket_set_outgoing_addr(lsi, &c->c2.from, NULL, c->c2.es);
}
/* reset packet received timer */
if (c->options.ping_rec_timeout && c->c2.buf.len > 0)
if (c->options.ping_rec_timeout && c->c2.buf2.len > 0)
{
event_timeout_reset(&c->c2.ping_rec_interval);
}
/* increment authenticated receive byte count */
if (c->c2.buf.len > 0)
if (c->c2.buf2.len > 0)
{
c->c2.link_read_bytes_auth += c->c2.buf.len;
c->c2.link_read_bytes_auth += c->c2.buf2.len;
c->c2.max_recv_size_local =
max_int(c->c2.original_recv_size, c->c2.max_recv_size_local);
}
/* Did we just receive an openvpn ping packet? */
if (is_ping_msg(&c->c2.buf))
if (is_ping_msg(&c->c2.buf2))
{
dmsg(D_PING, "RECEIVED PING PACKET");
c->c2.buf.len = 0; /* drop packet */
c->c2.buf2.len = 0; /* drop packet */
}
/* Did we just receive an OCC packet? */
if (is_occ_msg(&c->c2.buf))
if (is_occ_msg(&c->c2.buf2))
{
process_received_occ_msg(c);
}
buffer_turnover(orig_buf, &c->c2.to_tun, &c->c2.buf, &c->c2.buffers->read_link_buf);
buffer_turnover(orig_buf, &c->c2.to_tun, &c->c2.buf2, &c->c2.buffers->read_link_buf);
/* to_tun defined + unopened tuntap can cause deadlock */
if (!tuntap_defined(c->c1.tuntap))
@ -1199,14 +1220,31 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi,
}
}
void process_incoming_link_part3(struct context *c)
{
if (BULK_MODE(c))
{
if (c->c2.buf2.len > 0)
{
c->c2.to_tun.offset += 2;
c->c2.buf2.offset += 2;
}
else
{
buf_reset(&c->c2.to_tun);
}
}
}
static void
process_incoming_link(struct context *c, struct link_socket *sock)
{
struct link_socket_info *lsi = &sock->info;
const uint8_t *orig_buf = c->c2.buf.data;
const uint8_t *orig_buf = c->c2.buf2.data;
process_incoming_link_part1(c, lsi, false);
process_incoming_link_part2(c, lsi, orig_buf);
process_incoming_link_part3(c);
}
void
@ -1297,7 +1335,7 @@ process_incoming_dco(dco_context_t *dco)
*/
void
read_incoming_tun(struct context *c)
read_incoming_tun_part2(struct context *c)
{
/*
* Setup for read() call on TUN/TAP device.
@ -1317,8 +1355,7 @@ read_incoming_tun(struct context *c)
ASSERT(buf_safe(&c->c2.buf, c->c2.frame.buf.payload_size));
if (c->c1.tuntap->backend_driver == DRIVER_AFUNIX)
{
c->c2.buf.len =
(int)read_tun_afunix(c->c1.tuntap, BPTR(&c->c2.buf), c->c2.frame.buf.payload_size);
c->c2.buf.len = (int)read_tun_afunix(c->c1.tuntap, BPTR(&c->c2.buf), c->c2.frame.buf.payload_size);
}
else
{
@ -1352,6 +1389,63 @@ read_incoming_tun(struct context *c)
check_status(c->c2.buf.len, "read from TUN/TAP", NULL, c->c1.tuntap);
}
void read_incoming_tun_part3(struct context *c)
{
fd_set rfds;
struct timeval timo;
int plen = 0, pidx = -1;
int fdno = c->c1.tuntap->fd;
for (int x = 0; x < TUN_BAT_MIN; ++x)
{
int leng = plen, indx = (pidx + 1);
if (leng < 1)
{
FD_ZERO(&rfds);
FD_SET(fdno, &rfds);
timo.tv_sec = 0;
timo.tv_usec = 0;
select(fdno+1, &rfds, NULL, NULL, &timo);
if (FD_ISSET(fdno, &rfds))
{
read_incoming_tun_part2(c);
plen = BLEN(&c->c2.buf);
}
else
{
break;
}
}
leng = plen;
if (leng > 0)
{
c->c2.buffers->read_tun_bufs[indx].offset = TUN_BAT_OFF;
c->c2.buffers->read_tun_bufs[indx].len = leng;
bcopy(BPTR(&c->c2.buf), BPTR(&c->c2.buffers->read_tun_bufs[indx]), leng);
c->c2.bufs[indx] = c->c2.buffers->read_tun_bufs[indx];
pidx = indx;
}
else
{
break;
}
plen = 0;
}
c->c2.buffers->bulk_indx = 0;
c->c2.buffers->bulk_leng = (pidx + 1);
}
void read_incoming_tun(struct context *c)
{
if (!BULK_MODE(c))
{
read_incoming_tun_part2(c);
}
else
{
read_incoming_tun_part3(c);
}
}
/**
* Drops UDP packets which OS decided to route via tun.
*
@ -1476,7 +1570,7 @@ drop_if_recursive_routing(struct context *c, struct buffer *buf)
*/
void
process_incoming_tun(struct context *c, struct link_socket *out_sock)
process_incoming_tun_part2(struct context *c, struct link_socket *out_sock)
{
if (c->c2.buf.len > 0)
{
@ -1491,7 +1585,7 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock)
#endif
/* Show packet content */
dmsg(D_TUN_RW, "TUN READ [%d]", BLEN(&c->c2.buf));
dmsg(D_TUN_RW, "TUN READ [%d] [%d]", BLEN(&c->c2.buf), c->c2.frame.buf.payload_size);
if (c->c2.buf.len > 0)
{
@ -1515,7 +1609,10 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock)
}
if (c->c2.buf.len > 0)
{
encrypt_sign(c, true);
if (!BULK_MODE(c))
{
encrypt_sign(c, true);
}
}
else
{
@ -1523,6 +1620,60 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock)
}
}
void process_incoming_tun_part3(struct context *c, struct link_socket *out_sock)
{
if (BULK_DATA(c->c2.buffers))
{
c->c2.buffers->read_tun_max.offset = TUN_BAT_OFF;
c->c2.buffers->read_tun_max.len = 0;
uint8_t *temp = BPTR(&c->c2.buffers->read_tun_max);
int leng = c->c2.buffers->bulk_leng;
int plen = 0, maxl = 0;
for (int x = 0; x < leng; ++x)
{
c->c2.buf = c->c2.bufs[x];
process_incoming_tun_part2(c, out_sock);
plen = BLEN(&c->c2.buf);
if (plen > 0)
{
temp = buff_prepsize(temp, &plen);
bcopy(BPTR(&c->c2.buf), temp, plen);
temp += plen; maxl += (plen + 2);
}
c->c2.bufs[x].len = 0;
}
if (maxl > 0)
{
c->c2.buffers->read_tun_max.offset = TUN_BAT_OFF;
c->c2.buffers->read_tun_max.len = maxl;
c->c2.buf = c->c2.buffers->read_tun_max;
encrypt_sign(c, true);
}
else
{
buf_reset(&c->c2.to_link);
}
}
else
{
buf_reset(&c->c2.to_link);
}
c->c2.buffers->bulk_indx = -1;
c->c2.buffers->bulk_leng = -1;
}
void process_incoming_tun(struct context *c, struct link_socket *out_sock)
{
if (!BULK_MODE(c))
{
process_incoming_tun_part2(c, out_sock);
}
else
{
process_incoming_tun_part3(c, out_sock);
}
}
/**
* Forges a IPv6 ICMP packet with a no route to host error code from the
* IPv6 packet in buf and sends it directly back to the client via the tun
@ -1748,7 +1899,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock)
struct gc_arena gc = gc_new();
int error_code = 0;
if (c->c2.to_link.len > 0 && c->c2.to_link.len <= c->c2.frame.buf.payload_size)
if (c->c2.to_link.len > 0 && (c->c2.to_link.len <= c->c2.frame.buf.payload_size || c->c2.frame.bulk_size > 0))
{
/*
* Setup for call to send/sendto which will send
@ -1793,6 +1944,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock)
fprintf(stderr, "W");
}
#endif
msg(D_LINK_RW, "%s WRITE [%d] to %s: %s",
proto2ascii(sock->info.proto, sock->info.af, true), BLEN(&c->c2.to_link),
print_link_socket_actual(c->c2.to_link_addr, &gc), PROTO_DUMP(&c->c2.to_link, &gc));
@ -1877,7 +2029,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock)
*/
void
process_outgoing_tun(struct context *c, struct link_socket *in_sock)
process_outgoing_tun_part2(struct context *c, struct link_socket *in_sock)
{
/*
* Set up for write() call to TUN/TAP
@ -1908,7 +2060,8 @@ process_outgoing_tun(struct context *c, struct link_socket *in_sock)
fprintf(stderr, "w");
}
#endif
dmsg(D_TUN_RW, "TUN WRITE [%d]", BLEN(&c->c2.to_tun));
dmsg(D_TUN_RW, "TUN WRITE [%d] [%d]", BLEN(&c->c2.to_tun), c->c2.frame.buf.payload_size);
#ifdef PACKET_TRUNCATION_CHECK
ipv4_packet_size_verify(BPTR(&c->c2.to_tun), BLEN(&c->c2.to_tun), TUNNEL_TYPE(c->c1.tuntap),
@ -1962,6 +2115,49 @@ process_outgoing_tun(struct context *c, struct link_socket *in_sock)
buf_reset(&c->c2.to_tun);
}
void process_outgoing_tun_part3(struct context *c, struct link_socket *in_sock)
{
if ((c->c2.to_tun.len > 0) && (c->c2.to_tun.offset > 1))
{
c->c2.to_tun.offset -= 2;
buf_init(&c->c2.buffers->send_tun_max, TUN_BAT_OFF);
buf_copy(&c->c2.buffers->send_tun_max, &c->c2.to_tun);
int maxl = 0, plen = 0;
int leng = BLEN(&c->c2.buffers->send_tun_max);
uint8_t *temp = BPTR(&c->c2.buffers->send_tun_max);
for (int x = 0; x < TUN_BAT_MIN; ++x)
{
temp = buff_postsize(temp, &plen);
if ((leng > 0) && (plen > 0) && ((maxl + plen) < leng))
{
c->c2.to_tun = c->c2.buffers->to_tun_max;
c->c2.to_tun.offset = TUN_BAT_OFF;
c->c2.to_tun.len = plen;
bcopy(temp, BPTR(&c->c2.to_tun), plen);
temp += plen; maxl += (plen + 2);
process_outgoing_tun_part2(c, in_sock);
}
else
{
break;
}
}
}
buf_reset(&c->c2.to_tun);
}
void process_outgoing_tun(struct context *c, struct link_socket *in_sock)
{
if (!BULK_MODE(c))
{
process_outgoing_tun_part2(c, in_sock);
}
else
{
process_outgoing_tun_part3(c, in_sock);
}
}
void
pre_select(struct context *c)
{
@ -2213,7 +2409,7 @@ io_wait(struct context *c, const unsigned int flags)
if (!c->sig->signal_received)
{
if (!(flags & IOW_CHECK_RESIDUAL) || !sockets_read_residual(c))
if (true)
{
int status;
@ -2265,9 +2461,9 @@ io_wait(struct context *c, const unsigned int flags)
c->c2.event_set_status = ES_TIMEOUT;
}
}
else
if (sockets_read_residual(c))
{
c->c2.event_set_status = SOCKET_READ;
c->c2.event_set_status |= (SOCKET_READ << SOCKET_SHIFT);
}
}
@ -2283,8 +2479,28 @@ io_wait(struct context *c, const unsigned int flags)
dmsg(D_EVENT_WAIT, "I/O WAIT status=0x%04x", c->c2.event_set_status);
}
void threaded_fwd_inp_intf(struct context *c, struct link_socket *sock, struct thread_pointer *b)
{
if (b->p->h == b->p->n)
{
ssize_t size;
uint8_t temp[1];
size = read(c->c1.tuntap->fd, temp, 1);
if (size < 1) { /* no-op */ }
if (!IS_SIG(c))
{
if (!BULK_MODE(c))
{
c->c2.buf = c->c2.buffers->read_tun_buf;
}
process_incoming_tun(c, sock);
}
size = write(c->c1.tuntap->fz, temp, 1);
}
}
void
process_io(struct context *c, struct link_socket *sock)
process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b)
{
const unsigned int status = c->c2.event_set_status;
@ -2318,11 +2534,7 @@ process_io(struct context *c, struct link_socket *sock)
/* Incoming data on TUN device */
else if (status & TUN_READ)
{
read_incoming_tun(c);
if (!IS_SIG(c))
{
process_incoming_tun(c, sock);
}
threaded_fwd_inp_intf(c, sock, b);
}
else if (status & DCO_READ)
{

View file

@ -34,6 +34,11 @@
* file
*/
#define BULK_MODE(c) (c && c->c2.frame.bulk_size > 0)
#define BULK_DATA(b) (b && (b->bulk_leng > 0) && (b->bulk_indx < b->bulk_leng))
#define INST_LENG(a) (a && (a->inst_leng > 0) && (a->inst_indx < a->inst_leng))
#define LINK_LEFT(i) (i && sockets_read_residual(i))
#define TUN_OUT(c) (BLEN(&(c)->c2.to_tun) > 0)
#define LINK_OUT(c) (BLEN(&(c)->c2.to_link) > 0)
#define ANY_OUT(c) (TUN_OUT(c) || LINK_OUT(c))
@ -74,7 +79,7 @@ void io_wait(struct context *c, const unsigned int flags);
void pre_select(struct context *c);
void process_io(struct context *c, struct link_socket *sock);
void process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b);
/**********************************************************************/
@ -193,6 +198,8 @@ bool process_incoming_link_part1(struct context *c, struct link_socket_info *lsi
void process_incoming_link_part2(struct context *c, struct link_socket_info *lsi,
const uint8_t *orig_buf);
void process_incoming_link_part3(struct context *c);
/**
* Transfers \c float_sa data extracted from an incoming DCO
* PEER_FLOAT_NTF to \c out_osaddr for later processing.

View file

@ -1906,6 +1906,7 @@ do_open_tun(struct context *c, int *error_flags)
}
/* do ifconfig */
c->c1.tuntap->skip_bind = c->skip_bind;
if (!ifconfig_noexec_enabled(c) && ifconfig_order(c->c1.tuntap) == IFCONFIG_BEFORE_TUN_OPEN)
{
/* guess actual tun/tap unit number that will be returned
@ -1999,6 +2000,10 @@ do_open_tun(struct context *c, int *error_flags)
add_wfp_block(c);
}
if (c->c1.tuntap)
{
c->c1.tuntap->fe = c->c1.tuntap->fd;
}
gc_free(&gc);
return ret;
}
@ -2962,6 +2967,11 @@ frame_finalize_options(struct context *c, const struct options *o)
tailroom += COMP_EXTRA_BUFFER(payload_size);
#endif
if (frame->bulk_size > 0)
{
payload_size = BAT_SIZE(TUN_BAT_ONE, frame->tun_mtu, TUN_BAT_OFF);
}
frame->buf.payload_size = payload_size;
frame->buf.headroom = headroom;
frame->buf.tailroom = tailroom;
@ -3485,6 +3495,10 @@ do_init_frame_tls(struct context *c)
if (c->c2.tls_multi)
{
tls_multi_init_finalize(c->c2.tls_multi, c->options.ce.tls_mtu);
if (c->c2.frame.bulk_size > 0)
{
c->c2.tls_multi->opt.frame.buf.payload_size = c->c2.frame.tun_mtu;
}
ASSERT(c->c2.tls_multi->opt.frame.buf.payload_size <= c->c2.frame.buf.payload_size);
frame_print(&c->c2.tls_multi->opt.frame, D_MTU_INFO, "Control Channel MTU parms");
@ -3552,6 +3566,14 @@ do_init_frame(struct context *c)
c->c2.frame.extra_tun += c->options.ce.tun_mtu_extra;
}
/*
* Adjust bulk size based on the --bulk-mode parameter.
*/
if (c->options.ce.bulk_mode)
{
c->c2.frame.bulk_size = c->options.ce.tun_mtu;
}
/*
* Fill in the blanks in the frame parameters structure,
* make sure values are rational, etc.
@ -3692,9 +3714,45 @@ init_context_buffers(const struct frame *frame)
size_t buf_size = BUF_SIZE(frame);
if (frame->bulk_size > 0)
{
size_t off_size = (frame->buf.headroom + TUN_BAT_OFF + frame->buf.tailroom);
buf_size = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, off_size);
}
dmsg(M_INFO, "BULK bufs [%ld] [%d+%d+%d]", buf_size, frame->buf.headroom, frame->buf.payload_size, frame->buf.tailroom);
b->read_link_buf = alloc_buf(buf_size);
b->read_tun_buf = alloc_buf(buf_size);
if (frame->bulk_size > 0)
{
size_t off_size = (frame->buf.headroom + TUN_BAT_OFF + frame->buf.tailroom);
size_t one_size = BAT_SIZE(TUN_BAT_ONE, frame->tun_mtu, off_size);
for (int x = 0; x < TUN_BAT_MAX; ++x)
{
b->read_tun_bufs[x] = alloc_buf(one_size);
b->read_tun_bufs[x].offset = TUN_BAT_OFF;
b->read_tun_bufs[x].len = 0;
}
b->read_tun_max = alloc_buf(buf_size);
b->read_tun_max.offset = TUN_BAT_OFF;
b->read_tun_max.len = 0;
b->send_tun_max = alloc_buf(buf_size);
b->send_tun_max.offset = TUN_BAT_OFF;
b->send_tun_max.len = 0;
b->to_tun_max = alloc_buf(buf_size);
b->to_tun_max.offset = TUN_BAT_OFF;
b->to_tun_max.len = 0;
}
b->bulk_indx = -1;
b->bulk_leng = -1;
b->aux_buf = alloc_buf(buf_size);
b->encrypt_buf = alloc_buf(buf_size);
@ -3717,6 +3775,17 @@ free_context_buffers(struct context_buffers *b)
free_buf(&b->read_tun_buf);
free_buf(&b->aux_buf);
if (b->to_tun_max.data)
{
free_buf(&b->to_tun_max);
free_buf(&b->send_tun_max);
free_buf(&b->read_tun_max);
for (int x = 0; x < TUN_BAT_MAX; ++x)
{
free_buf(&b->read_tun_bufs[x]);
}
}
#ifdef USE_COMP
free_buf(&b->compress_buf);
free_buf(&b->decompress_buf);

View file

@ -38,15 +38,18 @@
#endif
struct multi_instance *
multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock)
multi_create_instance_tcp(struct thread_pointer *b, struct link_socket *sock)
{
struct gc_arena gc = gc_new();
struct multi_context *m = b->p->m[b->i-1];
struct multi_instance *mi = NULL;
struct hash *hash = m->hash;
mi = multi_create_instance(m, NULL, sock);
mi = multi_create_instance(b, NULL, sock);
if (mi)
{
m = b->p->p;
hash = m->hash;
mi->real.proto = sock->info.proto;
struct hash_element *he;
const uint32_t hv = hash_value(hash, &mi->real);

View file

@ -45,7 +45,7 @@ bool multi_tcp_process_outgoing_link(struct multi_context *m, bool defer,
bool multi_tcp_process_outgoing_link_ready(struct multi_context *m, struct multi_instance *mi,
const unsigned int mpp_flags);
struct multi_instance *multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock);
struct multi_instance *multi_create_instance_tcp(struct thread_pointer *b, struct link_socket *sock);
void multi_tcp_link_out_deferred(struct multi_context *m, struct multi_instance *mi);

View file

@ -41,9 +41,16 @@ void
alloc_buf_sock_tun(struct buffer *buf, const struct frame *frame)
{
/* allocate buffer for overlapped I/O */
*buf = alloc_buf(BUF_SIZE(frame));
int alen = BUF_SIZE(frame);
int blen = frame->buf.payload_size;
if (frame->bulk_size > 0)
{
alen = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, TUN_BAT_OFF);
blen = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, TUN_BAT_NOP);
}
*buf = alloc_buf(alen);
ASSERT(buf_init(buf, frame->buf.headroom));
buf->len = frame->buf.payload_size;
buf->len = blen;
ASSERT(buf_safe(buf, 0));
}

View file

@ -58,6 +58,15 @@
*/
#define TUN_MTU_MIN 100
/*
* Bulk mode static define values.
*/
#define TUN_BAT_MIN 6
#define TUN_BAT_MAX 9
#define TUN_BAT_OFF 250
#define TUN_BAT_NOP 0
#define TUN_BAT_ONE 1
/*
* Default MTU of network over which tunnel data will pass by TCP/UDP.
*/
@ -162,6 +171,11 @@ struct frame
* which defaults to 0 for tun and 32
* (\c TAP_MTU_EXTRA_DEFAULT) for tap.
* */
int bulk_size; /**< Configure and setup in the init library
* frame function to signal and inform the various
* related function calls to process bulk mode data transfers.
* */
};
/* Forward declarations, to prevent includes */
@ -181,6 +195,7 @@ struct options;
* larger than the headroom.
*/
#define BUF_SIZE(f) ((f)->buf.headroom + (f)->buf.payload_size + (f)->buf.tailroom)
#define BAT_SIZE(a, b, c) ((a * b) + c)
/*
* Function prototypes.

View file

@ -193,6 +193,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin
struct mroute_addr real = { 0 };
struct multi_instance *mi = NULL;
struct hash *hash = m->hash;
struct context_pointer p = { 0 };
struct thread_pointer b = { 0 };
real.proto = sock->info.proto;
m->hmac_reply_ls = sock;
@ -266,7 +268,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin
* connect-freq but not against connect-freq-initial */
reflect_filter_rate_limit_decrease(m->initial_rate_limiter);
mi = multi_create_instance(m, &real, sock);
p.p = m; b.p = &p; b.i = -1;
mi = multi_create_instance(&b, &real, sock);
if (mi)
{
hash_add_fast(hash, bucket, &mi->real, hv, mi);
@ -367,16 +370,16 @@ unsigned int
p2mp_iow_flags(const struct multi_context *m)
{
unsigned int flags = IOW_WAIT_SIGNAL;
if (m->pending)
if (m->pending || m->pending2)
{
if (TUN_OUT(&m->pending->context))
{
flags |= IOW_TO_TUN;
}
if (LINK_OUT(&m->pending->context))
if (m->pending && LINK_OUT(&m->pending->context))
{
flags |= IOW_TO_LINK;
}
if (m->pending2 && TUN_OUT(&m->pending2->context))
{
flags |= IOW_TO_TUN;
}
}
else if (mbuf_defined(m->mbuf))
{

File diff suppressed because it is too large Load diff

View file

@ -145,6 +145,10 @@ struct multi_instance
#ifdef ENABLE_ASYNC_PUSH
int inotify_watch; /* watch descriptor for acf */
#endif
int mtio_stat;
int mtio_idno;
struct multi_address mtio_addr;
};
@ -193,6 +197,7 @@ struct multi_context
#endif
struct multi_instance *pending;
struct multi_instance *pending2;
struct multi_instance *earliest_wakeup;
struct multi_instance **mpp_touched;
struct context_buffers *context_buffers;
@ -216,8 +221,17 @@ struct multi_context
#endif
struct deferred_signal_schedule_entry deferred_shutdown_signal;
int inst_indx;
int inst_leng;
struct multi_instance **inst_list;
int mtio_stat;
int mtio_idno;
struct multi_info mtio_info;
};
/**
* Return values used by the client connect call-back functions.
*/
@ -253,8 +267,15 @@ struct multi_route
*
* @param top - Top-level context structure.
*/
void tunnel_server(struct context *top);
void threaded_tunnel_server(struct context *c, struct context *d);
int min_max(int a, int b, int c);
bool multi_context_switch_addr(struct multi_context *m, struct multi_instance *i, bool s, bool l);
struct multi_context *multi_context_switch_conn(struct thread_pointer *b, struct multi_context *m, struct multi_instance *i);
struct multi_instance *multi_learn_in_addr_t(struct multi_context *m, struct multi_instance *mi, in_addr_t a, int netbits, bool primary);
const char *multi_instance_string(const struct multi_instance *mi, bool null, struct gc_arena *gc);
@ -262,9 +283,7 @@ const char *multi_instance_string(const struct multi_instance *mi, bool null, st
* Called by mtcp.c, mudp.c, or other (to be written) protocol drivers
*/
struct multi_instance *multi_create_instance(struct multi_context *m,
const struct mroute_addr *real,
struct link_socket *sock);
struct multi_instance *multi_create_instance(struct thread_pointer *b, const struct mroute_addr *real, struct link_socket *sock);
void multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool shutdown);
@ -352,6 +371,9 @@ bool multi_process_incoming_link(struct multi_context *m, struct multi_instance
*/
bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags);
bool multi_process_inp_tun_post(struct multi_context *m, const unsigned int mpp_flags);
bool threaded_multi_inp_tun(struct multi_context *m, const unsigned int mpp_flags);
void multi_process_drop_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags);
@ -413,7 +435,6 @@ static inline struct multi_instance *
multi_process_outgoing_link_pre(struct multi_context *m)
{
struct multi_instance *mi = NULL;
if (m->pending)
{
mi = m->pending;
@ -638,7 +659,7 @@ multi_get_timeout_instance(struct multi_context *m, struct timeval *dest)
static inline bool
multi_process_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags)
{
struct multi_instance *mi = m->pending;
struct multi_instance *mi = m->pending2;
bool ret = true;
ASSERT(mi);
@ -658,8 +679,7 @@ multi_process_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags
| OPT_P_COMP | OPT_P_SOCKFLAGS)
static inline bool
multi_process_outgoing_link_dowork(struct multi_context *m, struct multi_instance *mi,
const unsigned int mpp_flags)
multi_process_outgoing_link_dowork(struct multi_context *m, struct multi_instance *mi, const unsigned int mpp_flags)
{
bool ret = true;
set_prefix(mi);
@ -706,6 +726,13 @@ multi_set_pending(struct multi_context *m, struct multi_instance *mi)
{
m->pending = mi;
}
static inline void
multi_set_pending2(struct multi_context *m, struct multi_instance *mi)
{
m->pending2 = mi;
}
/**
* Assigns a peer-id to a a client and adds the instance to the
* the instances array of the \c multi_context structure.

View file

@ -284,12 +284,9 @@ multi_io_dispatch(struct multi_context *m, struct multi_instance *mi, const int
switch (action)
{
case TA_INST_LENG:
case TA_TUN_READ:
read_incoming_tun(&m->top);
if (!IS_SIG(&m->top))
{
multi_process_incoming_tun(m, mpp_flags);
}
threaded_multi_inp_tun(m, mpp_flags);
break;
case TA_SOCKET_READ:
@ -355,60 +352,43 @@ multi_io_post(struct multi_context *m, struct multi_instance *mi, const int acti
struct context *c = multi_get_context(m, mi);
int newaction = TA_UNDEF;
#define MTP_NONE 0
#define MTP_TUN_OUT (1 << 0)
#define MTP_LINK_OUT (1 << 1)
unsigned int flags = MTP_NONE;
if (LINK_OUT(c))
{
newaction = TA_SOCKET_WRITE;
goto last;
}
else if (INST_LENG(m))
{
newaction = TA_INST_LENG;
goto last;
}
if (TUN_OUT(c))
{
flags |= MTP_TUN_OUT;
newaction = TA_TUN_WRITE;
goto last;
}
if (LINK_OUT(c))
else if (LINK_LEFT(c))
{
flags |= MTP_LINK_OUT;
newaction = TA_SOCKET_READ_RESIDUAL;
goto last;
}
switch (flags)
if (mi)
{
case MTP_TUN_OUT | MTP_LINK_OUT:
case MTP_TUN_OUT:
newaction = TA_TUN_WRITE;
break;
case MTP_LINK_OUT:
newaction = TA_SOCKET_WRITE;
break;
case MTP_NONE:
if (mi && sockets_read_residual(c))
{
newaction = TA_SOCKET_READ_RESIDUAL;
}
else
{
multi_io_set_global_rw_flags(m, mi);
}
break;
default:
{
struct gc_arena gc = gc_new();
msg(M_FATAL, "MULTI IO: multi_io_post bad state, mi=%s flags=%d",
multi_instance_string(mi, false, &gc), flags);
gc_free(&gc);
break;
}
multi_io_set_global_rw_flags(m, mi);
}
last:
dmsg(D_MULTI_DEBUG, "MULTI IO: multi_io_post %s -> %s", pract(action), pract(newaction));
return newaction;
}
void
multi_io_process_io(struct multi_context *m)
multi_io_process_io(struct thread_pointer *b)
{
struct multi_context *m = b->p->m[b->i-1];
struct multi_io *multi_io = m->multi_io;
int i;
@ -453,19 +433,14 @@ multi_io_process_io(struct multi_context *m)
if (!proto_is_dgram(ev_arg->u.sock->info.proto))
{
socket_reset_listen_persistent(ev_arg->u.sock);
mi = multi_create_instance_tcp(m, ev_arg->u.sock);
mi = multi_create_instance_tcp(b, ev_arg->u.sock);
if (mi) { multi_io_action(b->p->p, mi, TA_INITIAL, false); }
}
else
{
multi_process_io_udp(m, ev_arg->u.sock);
mi = m->pending;
}
/* monitor and/or handle events that are
* triggered in succession by the first one
* before returning to the main loop. */
if (mi)
{
multi_io_action(m, mi, TA_INITIAL, false);
if (m->pending) { multi_io_action(m, m->pending, TA_INITIAL, false); }
if (m->pending2) { multi_io_action(m, m->pending2, TA_INITIAL, false); }
}
break;
}
@ -492,7 +467,6 @@ multi_io_process_io(struct multi_context *m)
multi_io_action(m, NULL, TA_TUN_READ, false);
}
}
#if defined(ENABLE_DCO)
/* incoming data on DCO? */
else if (e->arg == MULTI_IO_DCO)
@ -529,6 +503,22 @@ multi_io_process_io(struct multi_context *m)
multi_io_action(m, mi, TA_SOCKET_WRITE, true);
}
}
if (m->mtio_stat == 3)
{
int numb_clis = (int)m->max_clients;
for (int x = 0; x < numb_clis; ++x)
{
struct multi_instance *j = m->instances[x];
if (!j) { continue; }
if (j->mtio_stat == 3)
{
multi_context_switch_addr(m, j, true, true);
j->mtio_stat = 5;
}
}
m->mtio_stat = 5;
}
}
void
@ -551,7 +541,7 @@ multi_io_action(struct multi_context *m, struct multi_instance *mi, int action,
* On our first pass, poll will be false because we already know
* that input is available, and to call io_wait would be redundant.
*/
if (poll && action != TA_SOCKET_READ_RESIDUAL)
if (poll && action != TA_SOCKET_READ_RESIDUAL && action != TA_INST_LENG)
{
const int orig_action = action;
action = multi_io_wait_lite(m, mi, action, &tun_input_pending);
@ -586,10 +576,22 @@ multi_io_action(struct multi_context *m, struct multi_instance *mi, int action,
* for a particular instance, point to
* that instance.
*/
int retry_undef = 0;
if (m->pending)
{
mi = m->pending;
}
if (m->pending2)
{
if (!m->pending)
{
mi = m->pending2;
}
else
{
retry_undef = 1;
}
}
/*
* Based on the effects of the action,
@ -597,6 +599,11 @@ multi_io_action(struct multi_context *m, struct multi_instance *mi, int action,
* possibly transition to a new action state.
*/
action = multi_io_post(m, mi, action);
if ((action == TA_UNDEF) && (retry_undef == 1))
{
mi = m->pending2;
action = multi_io_post(m, mi, action);
}
/*
* If we are finished processing the original action,

View file

@ -44,6 +44,7 @@
#define TA_INITIAL 8
#define TA_TIMEOUT 9
#define TA_TUN_WRITE_TIMEOUT 10
#define TA_INST_LENG 11
/*
* I/O state and events tracker
@ -67,7 +68,7 @@ void multi_io_free(struct multi_io *multi_io);
int multi_io_wait(struct multi_context *m);
void multi_io_process_io(struct multi_context *m);
void multi_io_process_io(struct thread_pointer *b);
void multi_io_set_global_rw_flags(struct multi_context *m, struct multi_instance *mi);

View file

@ -37,6 +37,10 @@
#define P2P_CHECK_SIG() EVENT_LOOP_CHECK_SIGNAL(c, process_signal_p2p, c);
#define TEST_CONN_ADDR_MAPS(s, d, i) (((s.v4.addr == i.srca) && (d.v4.addr == i.dsta)) || ((d.v4.addr == i.srca) && (s.v4.addr == i.dsta)))
#define TEST_CONN_ADDR_NOTS(s, d, i) ((s.v4.addr == i) || (d.v4.addr == i))
#define TEST_CONN_ADDR_MSKS(s, d, i, m) (((s.v4.addr & m) == i) && ((d.v4.addr & m) == i))
static bool
process_signal_p2p(struct context *c)
{
@ -45,6 +49,77 @@ process_signal_p2p(struct context *c)
}
int proc_conn_addr_maps(struct mtio_args *args, struct mtio_cons *cons, uint8_t *buff_data, int buff_size, bool mtio_mode)
{
int thid = *(args->thid), expr = args->expr;
struct mroute_addr srca = { 0 }, dsta = { 0 };
struct buffer buff;
buff.data = buff_data; buff.offset = 0; buff.len = buff_size;
if (mtio_mode && (mroute_extract_addr_ip(&srca, &dsta, &buff) == MROUTE_EXTRACT_SUCCEEDED))
{
int flag = 1;
for (int y = 0; y < args->notl; ++y)
{
if (TEST_CONN_ADDR_NOTS(srca, dsta, args->nots[y]))
{
flag = 0;
}
}
for (int y = 0; (y + 1) < args->mskl; y += 2)
{
if (TEST_CONN_ADDR_MSKS(srca, dsta, args->msks[y], args->msks[y + 1]))
{
flag = 0;
}
}
if (flag == 1)
{
int indx = -1, scan = 0, memr = 4, logs = -1;
time_t secs = time(NULL);
uint32_t srcn = srca.v4.addr, dstn = dsta.v4.addr;
uint32_t srch = (HASH_PART(srcn, 24, 11, 103) + HASH_PART(srcn, 16, 13, 107) + HASH_PART(srcn, 8, 17, 109) + HASH_PART(srcn, 0, 19, 113));
uint32_t dsth = (HASH_PART(dstn, 24, 31, 131) + HASH_PART(dstn, 16, 53, 137) + HASH_PART(dstn, 8, 67, 139) + HASH_PART(dstn, 0, 79, 151));
uint32_t hidx = (((srch * 163) + (dsth * 167)) % MAX_CSTATES);
for (int y = 0; y < MAX_CSTATES; ++y)
{
if (TEST_CONN_ADDR_MAPS(srca, dsta, cons[hidx]))
{
indx = hidx;
if ((secs - cons[hidx].last) >= expr) { indx = (-1 * (hidx + 11)); }
break;
}
else if ((secs - cons[hidx].last) >= expr)
{
if (indx == -1) { indx = (-1 * (hidx + 11)); }
}
if ((scan >= memr) && (indx != -1)) { break; }
hidx = ((hidx + 1) % MAX_CSTATES);
scan += 1;
}
if (indx < 0)
{
indx = (indx == -1) ? (int)hidx : ((indx * -1) - 11);
cons[indx].srca = srca.v4.addr;
cons[indx].dsta = dsta.v4.addr;
cons[indx].thid = thid;
logs = 1;
}
if (logs == 1)
{
char a[28], b[28];
struct in_addr t;
t.s_addr = srca.v4.addr; bzero(a, 28); snprintf(a, 24, "%s", inet_ntoa(t));
t.s_addr = dsta.v4.addr; bzero(b, 28); snprintf(b, 24, "%s", inet_ntoa(t));
msg(M_INFO, "%s MTIO maps <%d>(%d) [%s][%s] {%d}{%d}", args->pref, indx, expr, a, b, thid, cons[indx].thid);
}
thid = cons[indx].thid;
cons[indx].last = secs;
}
}
return thid;
}
/**************************************************************************/
/**
* Main event loop for OpenVPN in client mode, where only one VPN tunnel
@ -53,9 +128,13 @@ process_signal_p2p(struct context *c)
*
* @param c - The context structure of the single active VPN tunnel.
*/
static void
tunnel_point_to_point(struct context *c)
void *tunnel_point_to_point(void *a)
{
struct thread_pointer *b = (struct thread_pointer *)a;
struct context_pointer *p = b->p;
struct context *c = (b->n == 1) ? p->c : b->c;
struct context *d = (b->n == 1) ? b->c : p->c;
context_clear_2(c);
/* set point-to-point mode */
@ -66,12 +145,44 @@ tunnel_point_to_point(struct context *c)
init_instance_handle_signals(c, c->es, stdin_config ? 0 : CC_HARD_USR1_TO_HUP);
if (IS_SIG(c))
{
return;
return NULL;
}
if (b->i == 1)
{
while (p->h < p->n)
{
if (p->z == -1) { break; } else { sleep(1); }
}
p->z = 1;
}
else
{
b->h += 1; p->h += 1;
while ((p->z != 1) || (!(d->c1.tuntap)) || (d->c1.tuntap->ff <= 1))
{
if (p->z == -1) { break; } else { sleep(1); }
}
}
msg(M_INFO, "TCPv4_CLIENT MTIO init [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i);
/* main event loop */
while (true)
{
if (p->z != 1) { break; }
if (c->c1.tuntap && (c->c1.tuntap->fd > 1) && (c->c1.tuntap->ff <= 1))
{
socketpair(AF_UNIX, SOCK_DGRAM, 0, p->s[b->i-1]);
socketpair(AF_UNIX, SOCK_DGRAM, 0, p->r[b->i-1]);
c->c1.tuntap->ff = c->c1.tuntap->fd;
c->c1.tuntap->fe = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff;
//c->c1.tuntap->fd = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff;
c->c1.tuntap->fd = p->s[b->i-1][0];
c->c1.tuntap->fz = p->r[b->i-1][1];
msg(M_INFO, "TCPv4_CLIENT MTIO fdno [%d][%d][%d][%d] {%d}", c->c1.tuntap->fd, c->c1.tuntap->fe, c->c1.tuntap->ff, c->c1.tuntap->fz, b->i);
}
/* process timers, TLS, etc. */
pre_select(c);
P2P_CHECK_SIG();
@ -87,16 +198,279 @@ tunnel_point_to_point(struct context *c)
}
/* process the I/O which triggered select */
process_io(c, c->c2.link_sockets[0]);
process_io(c, c->c2.link_sockets[0], b);
P2P_CHECK_SIG();
}
msg(M_INFO, "TCPv4_CLIENT MTIO fins [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i);
p->z = -1;
if (c->c1.tuntap && (c->c1.tuntap->ff > 1))
{
close(p->s[b->i-1][0]); close(p->s[b->i-1][1]);
close(p->r[b->i-1][0]); close(p->r[b->i-1][1]);
c->c1.tuntap->fd = c->c1.tuntap->ff;
c->c1.tuntap->ff = -1;
}
persist_client_stats(c);
uninit_management_callback();
/* tear down tunnel instance (unless --persist-tun) */
close_instance(c);
return NULL;
}
void *threaded_io_management(void *a)
{
struct thread_pointer *b = (struct thread_pointer *)a;
struct context_pointer *p = b->p;
struct context *c, *d;
int maxt = p->n, maxf = 0, maxl = 0;
int maxz = TUN_BAT_MIN, maxx = (MAX_THREADS * TUN_BAT_MIN);
int fdno = 0, indx = 0, size = 0, selw = 0;
int thid = 0, tidx = 0, tbeg = 0, tend = 0;
ssize_t leng = 0;
uint8_t *ptra;
uint8_t busy[maxx], proc[maxx], sels[maxx];
fd_set rfds;
struct timeval timo;
struct mtio_args marg;
struct mtio_cons cons[MAX_CSTATES];
while (true)
{
if (p->z == -1) { break; }
if ((p->z == 1) && (p->h == p->n)) { break; }
sleep(1);
}
d = (p->m && p->m[0]) ? &(p->m[0]->top) : b[0].c;
size = d->c2.frame.buf.payload_size;
if (size < 1) { size = 1; }
while (true)
{
if (p->z != 1) { break; }
int stat = 0;
for (int x = 0; x < maxt; ++x)
{
if (p->m) { stat += 1; }
else
{
struct tls_multi *m = b[x].c->c2.tls_multi;
if (m && (m->multi_state == CAS_CONNECT_DONE)) { stat += 1; }
}
}
if (stat == maxt) { break; }
sleep(1);
}
bool mtmo = d->options.ce.mtio_mode;
int expr = d->options.ce.mtio_time;
in_addr_t nots[] = { inet_addr("0.0.0.0"), inet_addr("255.255.255.255") };
in_addr_t msks[] = { inet_addr("10.0.0.0"), inet_addr("255.0.0.0") };
int sizs[maxx], idxs[maxx], lens[maxt];
uint8_t bufl[maxx][size];
uint8_t *bufs[maxx], *swap[maxx];
for (int x = 0; x < maxx; ++x)
{
sizs[x] = 0; idxs[x] = 0;
bufs[x] = bufl[x];
}
msg(M_INFO, "%s MTIO mgmt [%d] {%d}", (p->m) ? "TCPv4_SERVER" : "TCPv4_CLIENT", size, BULK_MODE(d));
marg.pref = (p->m) ? "TCPv4_SERVER" : "TCPv4_CLIENT";
marg.thid = &(thid); marg.expr = expr; marg.busy = busy;
marg.nots = nots; marg.notl = (sizeof(nots) / sizeof(nots[0]));
marg.msks = msks; marg.mskl = (sizeof(msks) / sizeof(msks[0]));
bzero(busy, maxt * sizeof(uint8_t));
bzero(cons, MAX_CSTATES * sizeof(struct mtio_cons));
while (true)
{
if (p->z != 1) { break; }
indx = -1; maxf = 0; maxl = 0;
FD_ZERO(&rfds);
for (int x = 0; x < maxt; ++x)
{
if (busy[x] != 1) { indx = x; }
if (p->r[x][0] > 1) { FD_SET(p->r[x][0], &rfds); maxl += 1; }
if (p->r[x][0] > maxf) { maxf = p->r[x][0]; }
}
if (maxl != maxt)
{
sleep(1);
continue;
}
timo.tv_sec = 0; timo.tv_usec = 0;
select(maxf+1, &rfds, NULL, NULL, (indx < 0) ? NULL : &timo);
for (int x = 0; x < maxt; ++x)
{
if (FD_ISSET(p->r[x][0], &rfds) || ((busy[x] == 1) && (sels[x] == 1) && (selw == 1)))
{
leng = read(p->r[x][0], &(busy[maxt+1]), 1);
busy[x] = 0;
}
if (busy[x] != 1)
{
if (busy[thid] == 1) { thid = x; }
lens[x] = 0;
}
proc[x] = 0; sels[x] = 0;
}
tbeg = 0; tend = 0; selw = 0;
for (int x = 0; x < maxx; ++x)
{
if (sizs[x] > 0) { tend = (x + 1); }
else if (sizs[tbeg] > 0) { tbeg = x; }
}
d = (p->m) ? &(p->m[0]->top) : b[0].c;
fdno = d->c1.tuntap->ff;
FD_ZERO(&rfds); FD_SET(fdno, &rfds);
timo.tv_sec = 1; timo.tv_usec = 750000;
if (tend > 0) { timo.tv_sec = 0; timo.tv_usec = 0; }
if (BULK_MODE(d))
{
for (int x = 0; (fdno > 1) && (x < maxx); ++x)
{
if (x >= tend)
{
tend = min_max(tend, 0, maxx);
select(fdno+1, &rfds, NULL, NULL, &timo);
if ((p->z == 1) && FD_ISSET(fdno, &rfds) && (tend < maxx))
{
leng = read(fdno, bufs[tend], size);
maxl = (int)leng;
tidx = thid;
thid = proc_conn_addr_maps(&(marg), cons, bufs[tend], maxl, mtmo);
sizs[tend] = maxl; idxs[tend] = thid;
tend = (x + 1);
thid = ((tidx + 1) % maxt);
}
FD_ZERO(&rfds); FD_SET(fdno, &rfds);
timo.tv_sec = 0; timo.tv_usec = 0;
}
if (sizs[x] > 0)
{
tidx = idxs[x]; sels[tidx] = 1; selw = 1;
c = (p->m) ? &(p->m[tidx]->top) : b[tidx].c;
indx = min_max(lens[tidx], 0, maxz);
if ((p->z == 1) && (busy[tidx] != 1) && (indx < maxz))
{
c->c2.buffers->read_tun_bufs[indx].len = sizs[x];
c->c2.buffers->read_tun_bufs[indx].offset = TUN_BAT_OFF;
ptra = BPTR(&c->c2.buffers->read_tun_bufs[indx]);
bcopy(bufs[x], ptra, sizs[x]);
c->c2.bufs[indx] = c->c2.buffers->read_tun_bufs[indx];
c->c2.buffers->bulk_indx = 0;
c->c2.buffers->bulk_leng = (indx + 1);
sizs[x] = 0; proc[tidx] = 1; lens[tidx] += 1;
}
}
if ((sizs[x] < 1) && ((x < tbeg) || (sizs[tbeg] > 0)))
{
tbeg = x;
}
if ((sizs[x] > 0) && ((x > tbeg) && (sizs[tbeg] < 1)))
{
swap[tbeg] = bufs[tbeg]; bufs[tbeg] = bufs[x]; bufs[x] = swap[tbeg];
sizs[tbeg] = sizs[x]; idxs[tbeg] = idxs[x]; sizs[x] = 0;
while ((tbeg <= x) && (sizs[tbeg] > 0))
{
++tbeg;
}
}
}
}
else
{
if (fdno > 1)
{
tend = (maxx - 1);
if (sizs[tend] < 1)
{
tend = min_max(tend, 0, maxx);
select(fdno+1, &rfds, NULL, NULL, &timo);
if ((p->z == 1) && FD_ISSET(fdno, &rfds))
{
leng = read(fdno, bufs[tend], size);
maxl = (int)leng;
tidx = proc_conn_addr_maps(&(marg), cons, bufs[tend], maxl, mtmo);
sizs[tend] = maxl; idxs[tend] = tidx;
thid = ((thid + 1) % maxt);
}
}
if (sizs[tend] > 0)
{
tidx = idxs[tend];
if ((p->z == 1) && (busy[tidx] != 1))
{
c = (p->m) ? &(p->m[tidx]->top) : b[tidx].c;
c->c2.buffers->read_tun_buf.len = sizs[tend];
ptra = BPTR(&c->c2.buffers->read_tun_buf);
bcopy(bufs[tend], ptra, sizs[tend]);
sizs[tend] = 0; proc[tidx] = 1;
}
}
}
}
for (int x = 0; x < maxt; ++x)
{
if (proc[x] == 1)
{
leng = write(p->s[x][1], busy, 1);
busy[x] = 1; sels[x] = 0; selw = 0;
}
}
}
p->z = -1;
return NULL;
}
void threaded_tunnel_point_to_point(struct context *c, struct context *d)
{
int maxt = (c->options.ce.mtio_mode) ? MAX_THREADS : 1;
struct context_pointer p;
struct thread_pointer b[MAX_THREADS];
pthread_t thrm, thrd[MAX_THREADS];
pthread_mutex_t lock;
bzero(&(p), sizeof(struct context_pointer));
p.c = c; p.i = 1; p.n = maxt; p.h = 1; p.z = 0;
p.l = &(lock);
bzero(p.l, sizeof(pthread_mutex_t));
pthread_mutex_init(p.l, NULL);
c->skip_bind = 0;
b[0].p = &(p); b[0].c = c; b[0].i = 1; b[0].n = p.n; b[0].h = 0;
bzero(&(thrd[0]), sizeof(pthread_t));
pthread_create(&(thrd[0]), NULL, tunnel_point_to_point, &(b[0]));
bzero(&(thrm), sizeof(pthread_t));
pthread_create(&(thrm), NULL, threaded_io_management, &(b[0]));
for (int x = 1; x < p.n; ++x)
{
d[x].skip_bind = -1;
b[x].p = &(p); b[x].c = &(d[x]); b[x].i = (x + 1); b[x].n = p.n; b[x].h = 1;
bzero(&(thrd[x]), sizeof(pthread_t));
pthread_create(&(thrd[x]), NULL, tunnel_point_to_point, &(b[x]));
}
pthread_join(thrd[0], NULL);
for (int x = 1; x < p.n; ++x)
{
pthread_join(thrd[x], NULL);
}
pthread_join(thrm, NULL);
}
#undef PROCESS_SIGNAL_P2P
@ -153,6 +527,9 @@ static int
openvpn_main(int argc, char *argv[])
{
struct context c;
struct context d[MAX_THREADS];
char devs[MAX_THREADS][MAX_STRLENG];
char fils[MAX_THREADS][MAX_STRLENG];
#if PEDANTIC
fprintf(stderr, "Sorry, I was built with --enable-pedantic and I am incapable of doing any real work!\n");
@ -297,17 +674,44 @@ openvpn_main(int argc, char *argv[])
/* finish context init */
context_init_1(&c);
if (c.options.ce.mtio_mode)
{
for (int x = 0; x < MAX_THREADS; ++x)
{
struct context *b = &(d[x]);
bcopy(&c, b, sizeof(struct context));
context_init_1(b);
if (c.options.dev)
{
bzero(devs[x], MAX_STRLENG * sizeof(char));
snprintf(devs[x], MAX_STRLENG-5, "%st%02d", c.options.dev, x);
b->options.dev = devs[x];
}
if (c.options.status_file)
{
bzero(fils[x], MAX_STRLENG * sizeof(char));
snprintf(fils[x], MAX_STRLENG-5, "%st%02d", c.options.status_file, x);
b->options.status_file = fils[x];
}
msg(M_INFO, "INFO MTIO init [%d] [%s][%s]", x, b->options.dev, b->options.status_file);
}
}
do
{
/* run tunnel depending on mode */
switch (c.options.mode)
{
case MODE_POINT_TO_POINT:
tunnel_point_to_point(&c);
threaded_tunnel_point_to_point(&c, d);
break;
case MODE_SERVER:
tunnel_server(&c);
threaded_tunnel_server(&c, d);
break;
default:
@ -325,12 +729,32 @@ openvpn_main(int argc, char *argv[])
/* pass restart status to management subsystem */
signal_restart_status(c.sig);
if (c.options.ce.mtio_mode)
{
for (int x = 0; x < MAX_THREADS; ++x)
{
d[x].first_time = false;
signal_restart_status(d[x].sig);
}
}
} while (signal_reset(c.sig, SIGUSR1) == SIGUSR1);
env_set_destroy(c.es);
uninit_options(&c.options);
gc_reset(&c.gc);
uninit_early(&c);
/*if (c.options.ce.mtio_mode)
{
for (int x = 0; x < MAX_THREADS; ++x)
{
env_set_destroy(d[x].es);
uninit_options(&d[x].options);
gc_reset(&d[x].gc);
uninit_early(&d[x]);
}
}*/
} while (signal_reset(c.sig, SIGHUP) == SIGHUP);
}

View file

@ -46,6 +46,17 @@
#include "manage.h"
#include "dns.h"
/*
mtio mode commit notes:
- maps size hash modp >= 2^14 16384
- briefly track && map a connection state to a given available thread to ensure packet ordering
- Use a simple calculation based on src && dst IP addresses to get a starting list index value
*/
#define MAX_THREADS 4
#define MAX_STRLENG 64
#define MAX_CSTATES 16421
/*
* Our global key schedules, packaged thusly
* to facilitate key persistence.
@ -112,6 +123,14 @@ struct context_buffers
*/
struct buffer read_link_buf;
struct buffer read_tun_buf;
struct buffer read_tun_bufs[TUN_BAT_MAX];
struct buffer read_tun_max;
struct buffer send_tun_max;
struct buffer to_tun_max;
int bulk_indx;
int bulk_leng;
};
/*
@ -373,9 +392,12 @@ struct context_2
* struct context_buffers.
*/
struct buffer buf;
struct buffer buf2;
struct buffer to_tun;
struct buffer to_link;
struct buffer bufs[TUN_BAT_MAX];
/* should we print R|W|r|w to console on packet transfers? */
bool log_rw;
@ -507,6 +529,8 @@ struct context
bool did_we_daemonize; /**< Whether demonization has already
* taken place. */
int skip_bind;
struct context_persist persist;
/**< Persistent %context. */
struct context_0 *c0; /**< Level 0 %context. */
@ -514,6 +538,78 @@ struct context
struct context_2 c2; /**< Level 2 %context. */
};
#define HASH_PART(a, s, p, q) ((((a >> s) & 0xff) + p) * q)
struct context_pointer
{
int i, h, n, x, z;
int s[MAX_THREADS][2];
int r[MAX_THREADS][2];
struct context *c;
struct multi_context **m;
struct multi_context *p;
struct multi_link *k;
pthread_mutex_t *l;
};
struct thread_pointer
{
int i, n, h;
struct context *c;
struct context_pointer *p;
};
struct multi_address
{
char ladr[MAX_STRLENG];
char wadr[MAX_STRLENG];
char comm[MAX_STRLENG];
char user[MAX_STRLENG];
char uniq[MAX_STRLENG];
time_t last;
in_addr_t addr;
};
struct multi_link
{
int indx;
char uniq[MAX_STRLENG];
time_t last;
struct multi_address adrs[MAX_THREADS];
};
struct multi_info
{
int maxt, maxc;
int *indx, *hold;
struct ifconfig_pool *pool;
pthread_mutex_t *lock;
struct multi_link *link;
};
struct mtio_args
{
char *pref;
int expr;
int *thid;
uint8_t *busy;
int notl;
in_addr_t *nots;
int mskl;
in_addr_t *msks;
};
struct mtio_cons
{
int thid;
time_t last;
in_addr_t srca, dsta;
};
void *threaded_io_management(void *a);
/*
* Check for a signal when inside an event loop
*/

View file

@ -305,6 +305,8 @@ static const char usage_message[] =
" 'maybe' -- Use per-route hints\n"
" 'yes' -- Always DF (Don't Fragment)\n"
"--mtu-test : Empirically measure and report MTU.\n"
"--bulk-mode : Use bulk TUN/TCP reads/writes.\n"
"--mtio-mode n : Use multi threaded mode. (optional expire time: n=30)\n"
#ifdef ENABLE_FRAGMENT
"--fragment max : Enable internal datagram fragmentation so that no UDP\n"
" datagrams are sent which are larger than max bytes.\n"
@ -3262,6 +3264,18 @@ options_postprocess_mutate_invariant(struct options *options)
options->pkcs11_providers[0] = DEFAULT_PKCS11_MODULE;
}
#endif
if ((options->ce.proto != PROTO_TCP) && (options->ce.proto != PROTO_TCP_SERVER) && (options->ce.proto != PROTO_TCP_CLIENT))
{
options->ce.bulk_mode = false;
}
options->ce.mtio_conf = false;
if (options->ce.mtio_mode)
{
options->ce.mtio_conf = true;
}
}
static void
@ -9272,6 +9286,23 @@ add_option(struct options *options, char *p[], bool is_inline, const char *file,
}
options->vlan_pvid = (uint16_t)vlan_pvid;
}
else if (streq(p[0], "bulk-mode"))
{
options->ce.bulk_mode = true;
}
else if (streq(p[0], "mtio-mode"))
{
options->ce.mtio_mode = true;
options->ce.mtio_time = 30;
if (p[1])
{
int mtio_time = positive_atoi(p[1], msglevel);
if ((5 <= mtio_time) && (mtio_time <= 9995))
{
options->ce.mtio_time = mtio_time;
}
}
}
else
{
int i;

View file

@ -177,6 +177,14 @@ struct connection_entry
/* Allow only client that support resending the wrapped client key */
bool tls_crypt_v2_force_cookie;
/* Bulk mode allows for multiple tun reads + larger tcp writes */
bool bulk_mode;
/* Multi threaded IO mode operates on a primary tun interface + multiple tcp connections */
bool mtio_conf;
bool mtio_mode;
int mtio_time;
};
struct remote_entry

View file

@ -708,6 +708,7 @@ create_socket(struct link_socket *sock, struct addrinfo *addr)
/* set socket to --mark packets with given value */
socket_set_mark(sock->sd, sock->mark);
if (sock->skip_bind != -1) {
#if defined(TARGET_LINUX)
if (sock->bind_dev)
{
@ -722,6 +723,11 @@ create_socket(struct link_socket *sock, struct addrinfo *addr)
#endif
bind_local(sock, addr->ai_family);
} else {
struct sockaddr_in locl = { 0 };
locl.sin_family = AF_INET; locl.sin_addr.s_addr = inet_addr("127.0.0.1");
bind(sock->sd, (struct sockaddr *)&locl, sizeof(locl));
}
}
#if defined(__GNUC__) || defined(__clang__)
@ -1741,6 +1747,7 @@ link_socket_init_phase2(struct context *c, struct link_socket *sock)
addr_family_name(sock->info.lsa->bind_local->ai_family));
sock->info.af = sock->info.lsa->bind_local->ai_family;
}
sock->skip_bind = c->skip_bind;
create_socket(sock, sock->info.lsa->bind_local);
}
}

View file

@ -251,6 +251,8 @@ struct link_socket
#ifdef ENABLE_DEBUG
int gremlin; /* --gremlin bits */
#endif
int skip_bind;
};
/*

View file

@ -1246,6 +1246,11 @@ do_ifconfig_ipv4(struct tuntap *tt, const char *ifname, int tun_mtu, const struc
bool tun_p2p = is_tun_p2p(tt);
#endif
if (tt->skip_bind == -1)
{
tt->local = htonl(inet_addr("127.1.1.1"));
}
#if !defined(TARGET_LINUX)
const char *ifconfig_local = NULL;
const char *ifconfig_remote_netmask = NULL;
@ -1755,7 +1760,7 @@ write_tun_header(struct tuntap *tt, uint8_t *buf, int len)
}
else
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
}
@ -1966,11 +1971,14 @@ open_tun_dco_generic(const char *dev, const char *dev_type, struct tuntap *tt,
static void
close_tun_generic(struct tuntap *tt)
{
if (tt->fd >= 0)
if (tt->ff > 1)
{
close(tt->ff);
}
else if (tt->fd >= 0)
{
close(tt->fd);
}
free(tt->actual_name);
clear_tuntap(tt);
}
@ -2057,7 +2065,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
ssize_t
write_tun(struct tuntap *tt, uint8_t *buf, int len)
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
ssize_t
@ -2137,7 +2145,7 @@ open_tun(const char *dev, const char *dev_type, const char *dev_node, struct tun
* Use special ioctl that configures tun/tap device with the parms
* we set in ifr
*/
if (ioctl(tt->fd, TUNSETIFF, (void *)&ifr) < 0)
if (ioctl((tt->ff > 1) ? tt->ff : tt->fd, TUNSETIFF, (void *)&ifr) < 0)
{
msg(M_ERR, "ERROR: Cannot ioctl TUNSETIFF %s", dev);
}
@ -2262,7 +2270,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
ssize_t
write_tun(struct tuntap *tt, uint8_t *buf, int len)
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
ssize_t
@ -3111,7 +3119,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len)
}
else
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
}
@ -3253,7 +3261,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
ssize_t
write_tun(struct tuntap *tt, uint8_t *buf, int len)
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
ssize_t
@ -6313,7 +6321,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
ssize_t
write_tun(struct tuntap *tt, uint8_t *buf, int len)
{
return write(tt->fd, buf, len);
return write(tt->fe, buf, len);
}
ssize_t

View file

@ -246,6 +246,9 @@ struct tuntap
dco_context_t dco;
afunix_context_t afunix;
int fe, ff, fz;
int skip_bind;
};
static inline bool