xdp-tcp: major refactoring + store outgoing buffers

- both unsent and unacked buffers of outgoing payload stored
- no longer uses dynarray
- multiple in-buffers per relay
- packets are only sent in knot_tcp_send()
This commit is contained in:
Libor Peltan 2021-09-10 16:56:33 +02:00 committed by Daniel Salzman
parent 26cbe01a60
commit 302b690199
16 changed files with 820 additions and 525 deletions

View file

@ -496,6 +496,7 @@ xdp:
tcp\-inbuf\-max\-size: SIZE
tcp\-idle\-close\-timeout: TIME
tcp\-idle\-reset\-timeout: TIME
tcp\-resend\-timeout: TIME
route\-check: BOOL
.ft P
.fi
@ -611,6 +612,14 @@ Time in seconds, after which any idle connection is forcibly closed.
\fIMinimum:\fP 1 s
.sp
\fIDefault:\fP 20 s
.SS tcp\-resend\-timeout
.sp
Resend outgoing data packets (with DNS response payload) if not ACKed
before this timeout.
.sp
\fIMinimum:\fP 1 s
.sp
\fIDefault:\fP 5 s
.SS route\-check
.sp
If enabled, routing information from the operating system is considered

View file

@ -524,6 +524,7 @@ Various options related to XDP listening, especially TCP.
tcp-inbuf-max-size: SIZE
tcp-idle-close-timeout: TIME
tcp-idle-reset-timeout: TIME
tcp-resend-timeout: TIME
route-check: BOOL
.. CAUTION::
@ -632,6 +633,18 @@ Time in seconds, after which any idle connection is forcibly closed.
*Default:* 20 s
.. _xdp_tcp-resend-timeout:
tcp-resend-timeout
------------------
Resend outgoing data packets (with DNS response payload) if not ACKed
before this timeout.
*Minimum:* 1 s
*Default:* 5 s
.. _xdp_route-check:
route-check

View file

@ -201,6 +201,9 @@ static void init_cache(
val = conf_get(conf, C_XDP, C_TCP_IDLE_RESET);
conf->cache.xdp_tcp_idle_reset = conf_int(&val);
val = conf_get(conf, C_XDP, C_TCP_RESEND);
conf->cache.xdp_tcp_idle_resend = conf_int(&val);
conf->cache.xdp_tcp = running_xdp_tcp;
conf->cache.xdp_route_check = running_route_check;

View file

@ -136,6 +136,7 @@ typedef struct {
size_t xdp_tcp_inbuf_max_size;
uint32_t xdp_tcp_idle_close;
uint32_t xdp_tcp_idle_reset;
uint32_t xdp_tcp_idle_resend;
bool xdp_tcp;
bool xdp_route_check;
int ctl_timeout;

View file

@ -246,6 +246,7 @@ static const yp_item_t desc_xdp[] = {
{ C_TCP_INBUF_MAX_SIZE, YP_TINT, YP_VINT = { MEGA(1), SSIZE_MAX, MEGA(100), YP_SSIZE } },
{ C_TCP_IDLE_CLOSE, YP_TINT, YP_VINT = { 1, INT32_MAX, 10, YP_STIME } },
{ C_TCP_IDLE_RESET, YP_TINT, YP_VINT = { 1, INT32_MAX, 20, YP_STIME } },
{ C_TCP_RESEND, YP_TINT, YP_VINT = { 1, INT32_MAX, 5, YP_STIME } },
{ C_ROUTE_CHECK, YP_TBOOL, YP_VNONE },
{ NULL }
};

View file

@ -130,6 +130,7 @@
#define C_TCP_INBUF_MAX_SIZE "\x12""tcp-inbuf-max-size"
#define C_TCP_IO_TIMEOUT "\x0E""tcp-io-timeout"
#define C_TCP_MAX_CLIENTS "\x0F""tcp-max-clients"
#define C_TCP_RESEND "\x12""tcp-resend-timeout"
#define C_TCP_REUSEPORT "\x0D""tcp-reuseport"
#define C_TCP_RMT_IO_TIMEOUT "\x15""tcp-remote-io-timeout"
#define C_TCP_WORKERS "\x0B""tcp-workers"

View file

@ -200,9 +200,6 @@ int axfr_process_query(knot_pkt_t *pkt, knotd_qdata_t *qdata)
knot_strerror(ret));
return KNOT_STATE_FAIL;
}
} else if (qdata->params->xdp_msg != NULL) {
qdata->rcode = KNOT_RCODE_SERVFAIL;
return KNOT_STATE_FAIL;
}
/* Reserve space for TSIG. */

View file

@ -292,9 +292,6 @@ int ixfr_process_query(knot_pkt_t *pkt, knotd_qdata_t *qdata)
knot_strerror(ret));
return KNOT_STATE_FAIL;
}
} else if (qdata->params->xdp_msg != NULL) {
qdata->rcode = KNOT_RCODE_SERVFAIL;
return KNOT_STATE_FAIL;
}
/* Reserve space for TSIG. */

View file

@ -32,7 +32,7 @@ typedef struct xdp_handle_ctx {
knot_xdp_socket_t *sock;
knot_xdp_msg_t msg_recv[XDP_BATCHLEN];
knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN];
knot_tcp_relay_dynarray_t tcp_relays;
knot_tcp_relay_t relays[XDP_BATCHLEN];
uint32_t msg_recv_count;
uint32_t msg_udp_count;
knot_tcp_table_t *tcp_table;
@ -40,8 +40,10 @@ typedef struct xdp_handle_ctx {
bool tcp;
size_t tcp_max_conns;
size_t tcp_max_inbufs;
size_t tcp_max_obufs;
uint32_t tcp_idle_close; // In microseconds.
uint32_t tcp_idle_reset; // In microseconds.
uint32_t tcp_idle_resend;
} xdp_handle_ctx_t;
static bool udp_state_active(int state)
@ -66,8 +68,10 @@ void xdp_handle_reconfigure(xdp_handle_ctx_t *ctx)
ctx->tcp = pconf->cache.xdp_tcp;
ctx->tcp_max_conns = pconf->cache.xdp_tcp_max_clients / pconf->cache.srv_xdp_threads;
ctx->tcp_max_inbufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads;
ctx->tcp_max_obufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads; // FIXME another setting for outbuf!!
ctx->tcp_idle_close = pconf->cache.xdp_tcp_idle_close * 1000000;
ctx->tcp_idle_reset = pconf->cache.xdp_tcp_idle_reset * 1000000;
ctx->tcp_idle_resend= pconf->cache.xdp_tcp_idle_resend * 1000000;
rcu_read_unlock();
}
@ -181,50 +185,37 @@ static void handle_udp(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
static void handle_tcp(xdp_handle_ctx_t *ctx, knot_layer_t *layer,
knotd_qdata_params_t *params)
{
uint32_t ack_errors = 0;
int ret = knot_tcp_relay(ctx->sock, ctx->msg_recv, ctx->msg_recv_count,
ctx->tcp_table, NULL, &ctx->tcp_relays, &ack_errors);
int ret = knot_tcp_recv(ctx->relays, ctx->msg_recv, ctx->msg_recv_count, ctx->tcp_table, NULL);
if (ret != KNOT_EOK) {
log_notice("TCP, failed to process some packets (%s)", knot_strerror(ret));
return;
} else if (ctx->tcp_relays.size == 0) {
} else if (knot_tcp_relay_empty(&ctx->relays[0])) { // no TCP traffic
return;
} else if (ack_errors > 0) {
log_notice("TCP, failed to send some ACK packets");
}
uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE];
// Note dynaray_foreach can't be used as we insert into the dynarray inside the loop.
for (int n_tcp_relays = ctx->tcp_relays.size, rli = 0; rli < n_tcp_relays; rli++) {
knot_tcp_relay_t *rl = knot_tcp_relay_dynarray_arr(&ctx->tcp_relays) + rli;
if ((rl->action & XDP_TCP_DATA) == 0 || rl->answer != XDP_TCP_NOOP) {
continue;
}
for (uint32_t i = 0; i < ctx->msg_recv_count; i++) {
knot_tcp_relay_t *rl = &ctx->relays[i];
// Consume the query.
handle_init(params, layer, rl->msg, &rl->data);
for (size_t j = 0; j < rl->inbufs_count; j++) {
// Consume the query.
handle_init(params, layer, rl->msg, &rl->inbufs[j]);
// Process the reply.
knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm);
while (tcp_active_state(layer->state)) {
knot_layer_produce(layer, ans);
if (!tcp_send_state(layer->state)) {
continue;
// Process the reply.
knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm);
while (tcp_active_state(layer->state)) {
knot_layer_produce(layer, ans);
if (!tcp_send_state(layer->state)) {
continue;
}
(void)knot_tcp_reply_data(rl, ctx->tcp_table, ans->wire, ans->size);
// ignore unprobable ENOMEM here
}
ret = knot_tcp_relay_answer(&ctx->tcp_relays, rl,
ans->wire, ans->size);
if (ret != KNOT_EOK) {
char addr[SOCKADDR_STRLEN];
sockaddr_tostr(addr, sizeof(addr), params->remote);
log_notice("TCP, failed to reply, address %s (%s)",
addr, knot_strerror(ret));
layer->state = KNOT_STATE_FAIL;
}
handle_finish(layer);
}
handle_finish(layer);
}
}
@ -254,8 +245,7 @@ void xdp_handle_send(xdp_handle_ctx_t *ctx)
uint32_t unused;
(void)knot_xdp_send(ctx->sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused);
if (ctx->tcp) {
int ret = knot_tcp_send(ctx->sock, knot_tcp_relay_dynarray_arr(&ctx->tcp_relays),
ctx->tcp_relays.size);
int ret = knot_tcp_send(ctx->sock, ctx->relays, ctx->msg_recv_count, XDP_BATCHLEN);
if (ret != KNOT_EOK) {
log_notice("TCP, failed to send some packets");
}
@ -263,18 +253,10 @@ void xdp_handle_send(xdp_handle_ctx_t *ctx)
(void)knot_xdp_send_finish(ctx->sock);
if (ctx->tcp) {
knot_tcp_relay_free(&ctx->tcp_relays);
knot_tcp_cleanup(ctx->tcp_table, ctx->relays, ctx->msg_recv_count);
}
}
static size_t overweight(size_t weight, size_t max_weight)
{
int64_t w = weight;
w -= max_weight;
w = MAX(w, 0);
return w;
}
void xdp_handle_sweep(xdp_handle_ctx_t *ctx)
{
if (!ctx->tcp) {
@ -284,13 +266,17 @@ void xdp_handle_sweep(xdp_handle_ctx_t *ctx)
uint32_t prev_reset;
uint32_t total_reset = 0, total_close = 0;
int ret = KNOT_EOK;
knot_tcp_relay_t sweep_relays[XDP_BATCHLEN];
do {
prev_reset = total_reset;
ret = knot_tcp_sweep(ctx->tcp_table, ctx->sock, 20,
ctx->tcp_idle_close, ctx->tcp_idle_reset,
overweight(ctx->tcp_table->usage, ctx->tcp_max_conns),
overweight(ctx->tcp_table->inbufs_total, ctx->tcp_max_inbufs),
&total_close, &total_reset);
ret = knot_tcp_sweep(ctx->tcp_table, ctx->tcp_idle_close, ctx->tcp_idle_reset,
ctx->tcp_idle_resend,
ctx->tcp_max_conns, ctx->tcp_max_inbufs, ctx->tcp_max_obufs,
sweep_relays, XDP_BATCHLEN, &total_close, &total_reset);
if (ret == KNOT_EOK) {
ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN);
}
knot_tcp_cleanup(ctx->tcp_table, sweep_relays, XDP_BATCHLEN);
} while (ret == KNOT_EOK && prev_reset < total_reset);
if (total_close > 0 || total_reset > 0) {

View file

@ -20,9 +20,9 @@
#include <time.h>
#include "libknot/xdp/tcp.h"
#include "libknot/xdp/tcp_iobuf.h"
#include "libknot/attribute.h"
#include "libknot/error.h"
#include "libknot/xdp/tcp_iobuf.h"
#include "libdnssec/random.h"
#include "contrib/macros.h"
#include "contrib/openbsd/siphash.h"
@ -110,8 +110,6 @@ static knot_tcp_conn_t **tcp_table_lookup(const struct sockaddr_in6 *rem,
while (*res != NULL) {
if (memcmp(&(*res)->ip_rem, rem, sdl) == 0 &&
memcmp(&(*res)->ip_loc, loc, sdl) == 0) {
rem_node(tcp_conn_node(*res));
add_tail(tcp_table_timeout(table), tcp_conn_node(*res));
break;
}
res = &(*res)->next;
@ -119,6 +117,16 @@ static knot_tcp_conn_t **tcp_table_lookup(const struct sockaddr_in6 *rem,
return res;
}
static knot_tcp_conn_t **tcp_table_re_lookup(knot_tcp_conn_t *conn,
knot_tcp_table_t *table)
{
uint64_t unused_hash;
knot_tcp_conn_t **res = tcp_table_lookup(&conn->ip_rem, &conn->ip_loc,
&unused_hash, table);
assert(*res == conn);
return res;
}
static void tcp_table_del_conn(knot_tcp_conn_t **todel)
{
knot_tcp_conn_t *conn = *todel;
@ -126,6 +134,7 @@ static void tcp_table_del_conn(knot_tcp_conn_t **todel)
*todel = conn->next; // remove from conn-table linked list
rem_node(tcp_conn_node(conn)); // remove from timeout double-linked list
free(conn->inbuf.iov_base);
free(conn->outbufs.bufs);
free(conn);
}
}
@ -138,16 +147,6 @@ static void tcp_table_del(knot_tcp_conn_t **todel, knot_tcp_table_t *table)
table->usage--;
}
static void tcp_table_del_lookup(knot_tcp_conn_t *todel, knot_tcp_table_t *table)
{
// re-lookup is needed to find the **pointer in the table
uint64_t unused_hash;
knot_tcp_conn_t **pconn = tcp_table_lookup(&todel->ip_rem, &todel->ip_loc,
&unused_hash, table);
assert(*pconn == todel);
tcp_table_del(pconn, table);
}
// WARNING you shall ensure that it's not in the table already!
static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *table,
knot_tcp_conn_t **res)
@ -168,11 +167,14 @@ static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *t
c->ackno = msg->ackno;
c->acked = msg->ackno;
c->window_size = 65536; // FIXME
c->last_active = get_timestamp();
add_tail(tcp_table_timeout(table), tcp_conn_node(c));
c->state = XDP_TCP_NORMAL;
memset(&c->inbuf, 0, sizeof(c->inbuf));
memset(&c->outbufs, 0, sizeof(c->outbufs));
c->next = *addto;
*addto = c;
@ -182,8 +184,6 @@ static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *t
return KNOT_EOK;
}
knot_dynarray_define(knot_tcp_relay, knot_tcp_relay_t, DYNARRAY_VISIBILITY_PUBLIC)
static bool check_seq_ack(const knot_xdp_msg_t *msg, const knot_tcp_conn_t *conn)
{
if (conn == NULL || conn->seqno != msg->seqno) {
@ -198,88 +198,56 @@ static bool check_seq_ack(const knot_xdp_msg_t *msg, const knot_tcp_conn_t *conn
}
_public_
int knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t msg_count,
knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table,
knot_tcp_relay_dynarray_t *relays, uint32_t *ack_errors)
int knot_tcp_recv(knot_tcp_relay_t *relays, knot_xdp_msg_t *msgs, uint32_t count,
knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table)
{
if (msg_count == 0) {
if (count == 0) {
return KNOT_EOK;
}
if (socket == NULL || msgs == NULL || tcp_table == NULL || relays == NULL) {
if (relays == NULL || msgs == NULL || tcp_table == NULL) {
return KNOT_EINVAL;
}
memset(relays, 0, count * sizeof(*relays));
knot_xdp_send_prepare(socket);
knot_xdp_msg_t acks[msg_count];
uint32_t n_acks = 0;
knot_tcp_relay_t *relay = relays;
int ret = KNOT_EOK;
#define resp_ack(msg, flag) \
{ \
knot_xdp_msg_t *ack = &acks[n_acks++]; \
int ackret = knot_xdp_reply_alloc(socket, (msg), ack); \
if (ackret != KNOT_EOK) { \
if (ack_errors != NULL) (*ack_errors)++; \
n_acks--; \
continue; \
} \
ack->payload.iov_len = 0; \
ack->flags |= (flag); \
}
for (size_t i = 0; i < msg_count && ret == KNOT_EOK; i++) {
knot_xdp_msg_t *msg = &msgs[i];
for (knot_xdp_msg_t *msg = msgs; msg != msgs + count && ret == KNOT_EOK; msg++) {
if (!(msg->flags & KNOT_XDP_MSG_TCP)) {
continue;
}
uint64_t conn_hash;
knot_tcp_conn_t **conn = tcp_table_lookup(&msg->ip_from, &msg->ip_to,
&conn_hash, tcp_table);
bool seq_ack_match = check_seq_ack(msg, *conn);
knot_tcp_conn_t **pconn = tcp_table_lookup(&msg->ip_from, &msg->ip_to,
&conn_hash, tcp_table);
knot_tcp_conn_t *conn = *pconn;
bool seq_ack_match = check_seq_ack(msg, conn);
if (seq_ack_match) {
assert((*conn)->mss != 0);
(*conn)->seqno = knot_tcp_next_seqno(msg);
memcpy((*conn)->last_eth_rem, msg->eth_from, sizeof((*conn)->last_eth_rem));
memcpy((*conn)->last_eth_loc, msg->eth_to, sizeof((*conn)->last_eth_loc));
(*conn)->last_active = get_timestamp();
assert(conn->mss != 0);
conn->seqno = knot_tcp_next_seqno(msg);
memcpy(conn->last_eth_rem, msg->eth_from, sizeof(conn->last_eth_rem));
memcpy(conn->last_eth_loc, msg->eth_to, sizeof(conn->last_eth_loc));
conn->last_active = get_timestamp();
rem_node(tcp_conn_node(conn));
add_tail(tcp_table_timeout(tcp_table), tcp_conn_node(conn));
if (msg->flags & KNOT_XDP_MSG_ACK) {
(*conn)->acked = msg->ackno;
conn->acked = msg->ackno;
tcp_outbufs_ack(&conn->outbufs, msg->ackno, &tcp_table->outbufs_total);
}
}
knot_tcp_relay_t relay = { .msg = msg, .conn = *conn };
relay->msg = msg;
relay->conn = conn;
// process incoming data
if (seq_ack_match && (msg->flags & KNOT_XDP_MSG_ACK) && msg->payload.iov_len > 0) {
resp_ack(msg, KNOT_XDP_MSG_ACK);
relay.action = XDP_TCP_DATA;
struct iovec msg_payload = msg->payload, tofree;
ret = tcp_inbuf_update(&(*conn)->inbuf, &msg_payload,
&tofree, &tcp_table->inbufs_total);
if (tofree.iov_len > 0 && ret == KNOT_EOK) {
relay.data.iov_base = tofree.iov_base + sizeof(uint16_t);
relay.data.iov_len = tofree.iov_len - sizeof(uint16_t);
relay.free_data = XDP_TCP_FREE_PREFIX;
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
relay.free_data = XDP_TCP_FREE_NONE;
}
while (msg_payload.iov_len > 0 && ret == KNOT_EOK) {
size_t dns_len = tcp_payload_len(&msg_payload);
assert(dns_len >= msg_payload.iov_len);
relay.data.iov_base = msg_payload.iov_base + sizeof(uint16_t);
relay.data.iov_len = dns_len - sizeof(uint16_t);
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
msg_payload.iov_base += dns_len;
msg_payload.iov_len -= dns_len;
relay->auto_answer = KNOT_XDP_MSG_ACK;
ret = tcp_inbuf_update(&conn->inbuf, msg->payload, &relay->inbufs,
&relay->inbufs_count, &tcp_table->inbufs_total);
if (ret != KNOT_EOK) {
break;
}
}
@ -288,165 +256,162 @@ int knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t ms
KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_RST)) {
case KNOT_XDP_MSG_SYN:
case (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK):
if (*conn == NULL) {
if (conn == NULL) {
bool synack = (msg->flags & KNOT_XDP_MSG_ACK);
resp_ack(msg, synack ? KNOT_XDP_MSG_ACK :
(KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK));
relay.action = synack ? XDP_TCP_ESTABLISH : XDP_TCP_SYN;
ret = tcp_table_add(msg, conn_hash,
(syn_table == NULL || synack) ? tcp_table : syn_table,
&relay.conn);
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
&relay->conn);
if (ret == KNOT_EOK) {
relay.conn->state = XDP_TCP_ESTABLISHING;
relay.conn->seqno++;
relay.conn->mss = MAX(msg->mss, 536); // minimal MSS, most importantly not zero!
relay.conn->acked = acks[n_acks - 1].seqno;
relay.conn->ackno = relay.conn->acked + (synack ? 0 : 1);
relay->action = synack ? XDP_TCP_ESTABLISH : XDP_TCP_SYN;
relay->auto_answer = synack ? KNOT_XDP_MSG_ACK : (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK);
conn = relay->conn;
conn->state = XDP_TCP_ESTABLISHING;
conn->seqno++;
conn->mss = MAX(msg->mss, 536); // minimal MSS, most importantly not zero!
if (!synack) {
conn->acked = dnssec_random_uint32_t();
conn->ackno = conn->acked;
}
}
} else {
resp_ack(msg, KNOT_XDP_MSG_RST); // TODO consider resetting the OLD conn and accepting new one
relay->auto_answer = KNOT_XDP_MSG_RST; // TODO consider resetting the OLD conn and accepting new one
relay->del_from = pconn;
}
break;
case KNOT_XDP_MSG_ACK:
if (!seq_ack_match) {
uint64_t syn_hash;
if (syn_table != NULL && msg->payload.iov_len == 0 &&
*(conn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, &syn_hash, syn_table)) != NULL &&
check_seq_ack(msg, *conn)) {
tcp_table_del(conn, syn_table);
*conn = NULL;
relay.action = XDP_TCP_ESTABLISH;
ret = tcp_table_add(msg, conn_hash, tcp_table, &relay.conn);
if (ret == KNOT_EOK && knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
*(pconn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, &syn_hash, syn_table)) != NULL &&
check_seq_ack(msg, (conn = *pconn))) {
tcp_table_del(pconn, syn_table);
*pconn = NULL;
relay->action = XDP_TCP_ESTABLISH;
ret = tcp_table_add(msg, conn_hash, tcp_table, &relay->conn);
}
// unmatching ACK is ignored, this includes:
// - incoming out-of-order data
// - ACK of some previous part of outgoing data
} else {
switch ((*conn)->state) {
switch (conn->state) {
case XDP_TCP_NORMAL:
case XDP_TCP_CLOSING1: // just a mess, ignore
break;
case XDP_TCP_ESTABLISHING:
(*conn)->state = XDP_TCP_NORMAL;
conn->state = XDP_TCP_NORMAL;
break;
case XDP_TCP_CLOSING2:
tcp_table_del(conn, tcp_table);
tcp_table_del(pconn, tcp_table);
relay->conn = NULL;
break;
}
}
break; // sole ACK without PSH is ignored
break;
case (KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK):
if (!seq_ack_match) {
resp_ack(msg, KNOT_XDP_MSG_RST);
if (conn != NULL) {
relay->auto_answer = KNOT_XDP_MSG_RST;
relay->del_from = pconn;
} // else ignore. It would be better and possible, but no big value for the price of CPU.
} else {
if ((*conn)->state == XDP_TCP_CLOSING1) {
resp_ack(msg, KNOT_XDP_MSG_ACK);
relay.action = XDP_TCP_CLOSE;
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
tcp_table_del(conn, tcp_table);
if (conn->state == XDP_TCP_CLOSING1) {
relay->action = XDP_TCP_CLOSE;
relay->auto_answer = KNOT_XDP_MSG_ACK;
relay->del_from = pconn;
} else if (msg->payload.iov_len == 0) { // otherwise ignore FIN
resp_ack(msg, KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK);
relay.action = XDP_TCP_CLOSE;
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
(*conn)->state = XDP_TCP_CLOSING2;
(*conn)->ackno++;
relay->action = XDP_TCP_CLOSE;
relay->auto_answer = KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK;
conn->state = XDP_TCP_CLOSING2;
}
}
break;
case KNOT_XDP_MSG_RST:
if (conn != NULL && msg->seqno == conn->seqno) {
relay.action = XDP_TCP_RESET;
if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) {
ret = KNOT_ENOMEM;
}
tcp_table_del(conn, tcp_table);
relay->action = XDP_TCP_RESET;
tcp_table_del(pconn, tcp_table);
relay->conn = NULL;
}
break;
default:
break;
}
}
if (n_acks > 0 && ret == KNOT_EOK) {
uint32_t sent_unused;
(void)knot_xdp_send(socket, acks, n_acks, &sent_unused);
(void)knot_xdp_send_finish(socket);
if (!knot_tcp_relay_empty(relay)) {
relay++;
}
}
#undef resp_ack
return ret;
}
_public_
int knot_tcp_relay_answer(knot_tcp_relay_dynarray_t *relays, const knot_tcp_relay_t *relay,
void *data, size_t data_len)
int knot_tcp_reply_data(knot_tcp_relay_t *relay, knot_tcp_table_t *tcp_table,
uint8_t *data, size_t len)
{
if (relays == NULL || relay == NULL || data == NULL) {
if (relay == NULL || tcp_table == NULL || relay->conn == NULL) {
return KNOT_EINVAL;
}
assert(data_len <= UINT16_MAX);
uint16_t prefix = htobe16(data_len);
#define PREFIX_LEN (prefix == 0 ? 0 : sizeof(prefix))
while (data_len > 0) {
knot_tcp_relay_t *clone = knot_tcp_relay_dynarray_add(relays, relay);
if (clone == NULL) {
return KNOT_ENOMEM;
}
size_t chunk = MIN(data_len + PREFIX_LEN, relay->conn->mss);
assert(chunk >= PREFIX_LEN);
clone->data.iov_base = malloc(chunk);
if (clone->data.iov_base == NULL) {
return KNOT_ENOMEM;
}
clone->data.iov_len = chunk;
memcpy(clone->data.iov_base, &prefix, PREFIX_LEN);
chunk -= PREFIX_LEN;
memcpy(clone->data.iov_base + PREFIX_LEN, data, chunk);
clone->answer = XDP_TCP_ANSWER | XDP_TCP_DATA;
clone->free_data = XDP_TCP_FREE_DATA;
data += chunk;
data_len -= chunk;
prefix = 0;
}
return KNOT_EOK;
return tcp_outbufs_add(&relay->conn->outbufs, data, len,
relay->conn->mss, &tcp_table->outbufs_total);
}
_public_
void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays)
static knot_xdp_msg_t *first_msg(knot_xdp_msg_t *msgs, uint32_t n_msgs)
{
if (relays == NULL) {
return;
memset(msgs, 0, n_msgs * sizeof(*msgs));
return msgs - 1; // will be incremented just before first use
}
static int send_msgs(knot_xdp_msg_t *msgs, uint32_t n_msgs, knot_xdp_socket_t *socket)
{
uint32_t sent = 0;
if (n_msgs == 0) {
return KNOT_EOK;
}
knot_dynarray_foreach(knot_tcp_relay, knot_tcp_relay_t, i, *relays) {
if (i->free_data != XDP_TCP_FREE_NONE) {
free(i->data.iov_base -
(i->free_data == XDP_TCP_FREE_PREFIX ? sizeof(uint16_t) : 0));
}
int ret = knot_xdp_send(socket, msgs, n_msgs, &sent);
if (ret != KNOT_EOK) {
printf("TCP send[%u/%u]: %s\n", sent, n_msgs, knot_strerror(ret));
}
knot_tcp_relay_dynarray_free(relays);
return KNOT_EOK; // ignore errcode from send
}
static int next_msg(knot_xdp_msg_t *msgs, uint32_t n_msgs, knot_xdp_msg_t **cur,
knot_xdp_socket_t *socket, knot_tcp_relay_t *rl)
{
(*cur)++;
if (*cur - msgs >= n_msgs) {
(void)send_msgs(msgs, n_msgs, socket);
*cur = first_msg(msgs, n_msgs);
(*cur)++;
}
knot_xdp_msg_t *msg = *cur;
knot_xdp_msg_flag_t fl = KNOT_XDP_MSG_TCP;
if (rl->conn->ip_loc.sin6_family == AF_INET6) {
fl |= KNOT_XDP_MSG_IPV6;
}
int ret = knot_xdp_send_alloc(socket, fl, msg);
if (ret != KNOT_EOK) {
return ret;
}
memcpy( msg->eth_from, rl->conn->last_eth_loc, sizeof(msg->eth_from));
memcpy( msg->eth_to, rl->conn->last_eth_rem, sizeof(msg->eth_to));
memcpy(&msg->ip_from, &rl->conn->ip_loc, sizeof(msg->ip_from));
memcpy(&msg->ip_to, &rl->conn->ip_rem, sizeof(msg->ip_to));
msg->ackno = rl->conn->seqno;
msg->seqno = rl->conn->ackno;
msg->payload.iov_len = 0;
return ret;
}
_public_
int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count)
int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count,
uint32_t max_at_once)
{
if (relay_count == 0) {
return KNOT_EOK;
@ -455,140 +420,145 @@ int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t
return KNOT_EINVAL;
}
knot_xdp_msg_t msgs[relay_count], *msg = &msgs[0];
int ret = KNOT_EOK, n_msgs = 0;
knot_xdp_send_prepare(socket);
for (size_t irl = 0; irl < relay_count; irl++) {
knot_tcp_relay_t *rl = &relays[irl];
if ((rl->answer & 0x0f) == XDP_TCP_NOOP) {
continue;
knot_xdp_msg_t msgs[max_at_once], *first = first_msg(msgs, max_at_once), *msg = first;
int ret = KNOT_EOK;
for (uint32_t i = 0; i < relay_count; i++) {
knot_tcp_relay_t *rl = &relays[i];
#define NEXT_MSG if ((ret = next_msg(msgs, max_at_once, &msg, socket, rl)) != KNOT_EOK) { return ret; }
if (rl->auto_answer != 0) {
NEXT_MSG
msg->flags |= rl->auto_answer;
if (msg->flags & (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_FIN)) {
rl->conn->ackno++;
}
}
knot_xdp_msg_flag_t fl = KNOT_XDP_MSG_TCP;
if (rl->conn->ip_loc.sin6_family == AF_INET6) {
fl |= KNOT_XDP_MSG_IPV6;
}
ret = knot_xdp_send_alloc(socket, fl, msg);
if (ret != KNOT_EOK) {
break;
}
memcpy( msg->eth_from, rl->conn->last_eth_loc, sizeof(msg->eth_from));
memcpy( msg->eth_to, rl->conn->last_eth_rem, sizeof(msg->eth_to));
memcpy(&msg->ip_from, &rl->conn->ip_loc, sizeof(msg->ip_from));
memcpy(&msg->ip_to, &rl->conn->ip_rem, sizeof(msg->ip_to));
msg->ackno = rl->conn->seqno;
msg->seqno = rl->conn->ackno;
switch (rl->answer & 0x0f) {
case XDP_TCP_ESTABLISH:
NEXT_MSG
msg->flags |= KNOT_XDP_MSG_SYN;
msg->payload.iov_len = 0;
break;
case XDP_TCP_DATA:
msg->flags |= KNOT_XDP_MSG_ACK;
if (rl->data.iov_len > UINT16_MAX ||
rl->data.iov_len > msg->payload.iov_len) {
ret = KNOT_ESPACE;
} else {
memcpy(msg->payload.iov_base, rl->data.iov_base,
rl->data.iov_len);
msg->payload.iov_len = rl->data.iov_len;
}
assert(rl->conn != NULL);
rl->conn->ackno += msg->payload.iov_len;
rl->conn->ackno++;
break;
case XDP_TCP_CLOSE:
NEXT_MSG
msg->flags |= (KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK);
msg->payload.iov_len = 0;
assert(rl->conn != NULL);
rl->conn->ackno++;
rl->conn->state = XDP_TCP_CLOSING1;
break;
case XDP_TCP_RESET:
default:
NEXT_MSG
msg->flags |= KNOT_XDP_MSG_RST;
msg->payload.iov_len = 0;
break;
case XDP_TCP_NOOP:
default:
break;
}
msg++;
n_msgs++;
if (ret != KNOT_EOK) {
break;
size_t can_data = 0;
struct tcp_outbuf *ob;
if (rl->conn != NULL) {
tcp_outbufs_can_send(&rl->conn->outbufs, rl->conn->window_size,
rl->answer == XDP_TCP_RESEND, &ob, &can_data);
}
while (can_data > 0) {
NEXT_MSG
msg->flags |= KNOT_XDP_MSG_ACK;
msg->payload.iov_len = ob->len;
memcpy(msg->payload.iov_base, ob->bytes, ob->len);
if (!ob->sent) {
assert(rl->conn->ackno == msg->seqno);
rl->conn->ackno += msg->payload.iov_len;
} else {
msg->seqno = ob->seqno;
}
ob->sent = true;
ob->seqno = msg->seqno;
can_data--;
ob = ob->next;
}
}
uint32_t sent_unused;
(void)knot_xdp_send(socket, msgs, n_msgs, &sent_unused);
#undef NEXT_MSG
ret = send_msgs(msgs, msg - first, socket);
(void)knot_xdp_send_finish(socket);
return ret;
}
_public_
int knot_tcp_sweep(knot_tcp_table_t *tcp_table, knot_xdp_socket_t *socket,
uint32_t max_at_once, uint32_t close_timeout, uint32_t reset_timeout,
uint32_t reset_at_least, size_t reset_buf_size,
int knot_tcp_sweep(knot_tcp_table_t *tcp_table,
uint32_t close_timeout, uint32_t reset_timeout,
uint32_t resend_timeout, uint32_t limit_n_conn,
size_t limit_ibuf_size, size_t limit_obuf_size,
knot_tcp_relay_t *relays, size_t max_relays,
uint32_t *close_count, uint32_t *reset_count)
{
if (tcp_table == NULL) {
if (tcp_table == NULL || relays == NULL || max_relays < 1) {
return KNOT_EINVAL;
}
knot_tcp_relay_t rl = { 0 };
knot_tcp_relay_dynarray_t relays = { 0 };
uint32_t now = get_timestamp(), i = 0;
memset(relays, 0, max_relays * sizeof(*relays));
knot_tcp_relay_t *rl = relays;
ssize_t free_conns = (ssize_t)tcp_table->usage - limit_n_conn;
ssize_t free_inbuf = (ssize_t)tcp_table->inbufs_total - limit_ibuf_size;
ssize_t free_outbuf = (ssize_t)tcp_table->outbufs_total - limit_obuf_size;
knot_tcp_conn_t *conn, *next;
list_t to_remove;
init_list(&to_remove);
WALK_LIST_DELSAFE(conn, next, *tcp_table_timeout(tcp_table)) {
if (i++ < reset_at_least ||
rl->conn = conn;
if (i++ < free_conns ||
now - conn->last_active >= reset_timeout ||
(reset_buf_size > 0 && conn->inbuf.iov_len > 0)) {
rl.answer = XDP_TCP_RESET;
(free_inbuf > 0 && conn->inbuf.iov_len > 0) ||
(free_outbuf > 0 && tcp_outbufs_usage(&conn->outbufs) > 0)) {
rl->answer = XDP_TCP_RESET;
rl->del_from = tcp_table_re_lookup(conn, tcp_table);
// move this conn into to-remove list
rem_node((node_t *)conn);
add_tail(&to_remove, (node_t *)conn);
free_inbuf -= conn->inbuf.iov_len;
free_outbuf -= tcp_outbufs_usage(&conn->outbufs);
reset_buf_size -= MIN(reset_buf_size, conn->inbuf.iov_len);
if (reset_count != NULL) {
(*reset_count)++;
}
} else if (now - conn->last_active >= close_timeout) {
if (conn->state != XDP_TCP_CLOSING1) {
rl.answer = XDP_TCP_CLOSE;
rl->answer = XDP_TCP_CLOSE;
if (close_count != NULL) {
(*close_count)++;
}
}
} else if (reset_buf_size == 0) {
break;
} else if (now - conn->last_active >= resend_timeout &&
conn->outbufs.bufs != NULL && conn->outbufs.bufs->sent) {
rl->answer = XDP_TCP_RESEND;
}
rl.conn = conn;
if (rl.answer != XDP_TCP_NOOP) {
(void)knot_tcp_relay_dynarray_add(&relays, &rl);
rl.answer = XDP_TCP_NOOP;
}
if (relays.size >= max_at_once) {
break;
if (rl->answer != XDP_TCP_NOOP) {
if (++rl == relays + max_relays) {
break;
}
}
}
knot_xdp_send_prepare(socket);
(void)knot_tcp_send(socket, knot_tcp_relay_dynarray_arr(&relays), relays.size);
(void)knot_xdp_send_finish(socket);
// immediately remove reset connections
if (reset_count != NULL) {
*reset_count += list_size(&to_remove);
}
WALK_LIST_DELSAFE(conn, next, to_remove) {
tcp_table_del_lookup(conn, tcp_table);
}
knot_tcp_relay_free(&relays);
return KNOT_EOK;
}
_public_
void knot_tcp_cleanup(knot_tcp_table_t *tcp_table, knot_tcp_relay_t *relays, size_t n_relays)
{
for (uint32_t i = 0; i < n_relays; i++) {
if (relays[i].del_from != NULL) {
tcp_table_del(relays[i].del_from, tcp_table);
}
free(relays[i].inbufs);
}
}

View file

@ -35,8 +35,7 @@ typedef enum {
XDP_TCP_ESTABLISH = 2,
XDP_TCP_CLOSE = 3,
XDP_TCP_RESET = 4,
XDP_TCP_DATA = (1 << 3),
XDP_TCP_ANSWER = (1 << 4),
XDP_TCP_RESEND = 5,
} knot_tcp_action_t;
typedef enum {
@ -52,6 +51,10 @@ typedef enum {
XDP_TCP_FREE_PREFIX,
} knot_tcp_relay_free_t;
typedef struct tcp_outbufs {
struct tcp_outbuf *bufs;
} tcp_outbufs_t; // this typedef belongs to tcp_iobuf.h, but is here to avoid issues with symbols
typedef struct knot_tcp_conn {
struct {
void *list_node_placeholder1;
@ -65,9 +68,11 @@ typedef struct knot_tcp_conn {
uint32_t seqno;
uint32_t ackno;
uint32_t acked;
uint32_t window_size;
uint32_t last_active;
knot_tcp_state_t state;
struct iovec inbuf;
tcp_outbufs_t outbufs;
struct knot_tcp_conn *next;
} knot_tcp_conn_t;
@ -75,6 +80,7 @@ typedef struct {
size_t size;
size_t usage;
size_t inbufs_total;
size_t outbufs_total;
uint64_t hash_secret[2];
knot_tcp_conn_t *conns[];
} knot_tcp_table_t;
@ -82,17 +88,14 @@ typedef struct {
typedef struct {
const knot_xdp_msg_t *msg;
knot_tcp_action_t action;
knot_xdp_msg_flag_t auto_answer;
knot_tcp_action_t answer;
struct iovec data;
knot_tcp_relay_free_t free_data;
struct iovec *inbufs;
size_t inbufs_count;
knot_tcp_conn_t *conn;
knot_tcp_conn_t **del_from;
} knot_tcp_relay_t;
#define TCP_RELAY_DEFAULT_COUNT 10
knot_dynarray_declare(knot_tcp_relay, knot_tcp_relay_t, DYNARRAY_VISIBILITY_PUBLIC,
TCP_RELAY_DEFAULT_COUNT)
/*!
* \brief Return next TCP sequence number.
*/
@ -105,6 +108,11 @@ inline static uint32_t knot_tcp_next_seqno(const knot_xdp_msg_t *msg)
return res;
}
inline static bool knot_tcp_relay_empty(const knot_tcp_relay_t *r)
{
return r->action == XDP_TCP_NOOP && r->auto_answer == 0 && r->inbufs_count == 0;
}
/*!
* \brief Allocate TCP connection-handling hash table.
*
@ -124,39 +132,31 @@ knot_tcp_table_t *knot_tcp_table_new(size_t size);
void knot_tcp_table_free(knot_tcp_table_t *table);
/*!
* \brief Process received packets, send ACKs, pick incoming data.
* \brief Process received packets, prepare automatick responses (e.g. ACK), pick incoming data.
*
* \param socket XDP socket to answer through.
* \param msgs Packets received by knot_xdp_recv().
* \param msg_count Number of received packets.
* \param tcp_table Table of TCP connections.
* \param syn_table Optional: extra table for handling partially established connections.
* \param relays Out: connection changes and data.
* \param ack_errors Out: incremented with number of unsent ACKs due to a buffer allocation error.
* \param relays Out: relays to be filled with message/connection details.
* \param msgs Packets received by knot_xdp_recv();
* \param count Number of received packets.
* \param tcp_table Table of TCP connections.
* \param syn_table Optional: extra table for handling partially established connections.
*
* \return KNOT_E*
*/
int knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t msg_count,
knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table,
knot_tcp_relay_dynarray_t *relays, uint32_t *ack_errors);
int knot_tcp_recv(knot_tcp_relay_t *relays, knot_xdp_msg_t *msgs, uint32_t count,
knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table);
/*!
* \brief Fetch answer to one relay with one or more relays with data payload.
* \brief Prepare data (payload) to be sent as a response on specific relay.
*
* \param relays Relays.
* \param relay The relay to answer to.
* \param data Data payload, possibly > MSS.
* \param data_len Payload length.
* \param relay Relay with active connection.
* \param tcp_table TCP table.
* \param data Data payload, possibly > MSS and > window.
* \param len Payload length, < 64k.
*
* \return KNOT_EOK, KNOT_ENOMEM
* \return KNOT_E*
*/
int knot_tcp_relay_answer(knot_tcp_relay_dynarray_t *relays, const knot_tcp_relay_t *relay,
void *data, size_t data_len);
/*!
* \brief Free resources in 'relays'.
*/
void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays);
int knot_tcp_reply_data(knot_tcp_relay_t *relay, knot_tcp_table_t *tcp_table,
uint8_t *data, size_t len);
/*!
* \brief Send TCP packets.
@ -164,31 +164,43 @@ void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays);
* \param socket XDP socket to send through.
* \param relays Connection changes and data.
* \param relay_count Number of connection changes and data.
* \param max_at_once Limit of packet batch sent by knot_xdp_send().
*
* \return KNOT_E*
*/
int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count);
int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count,
uint32_t max_at_once);
/*!
* \brief Cleanup old TCP connections, perform timeout checks.
*
* \param tcp_table TCP connection table to clean up.
* \param socket XDP socket for close messages.
* \param max_at_once Don't close more connections at once.
* \param close_timeout Gracefully close connections older than this (usecs).
* \param reset_timeout Reset connections older than this (usecs).
* \param reset_at_least Reset at least this number of oldest connections, even
* when not yet timed out.
* \param reset_buf_size Reset oldest connection with buffered partial DNS messages
* to free up this amount of space.
* \param close_count Optional: Out: incremented with number of closed connections.
* \param reset_count Optional: Out: incremented with number of reset connections.
* \param limit_n_conn Limit of active connections in TCP table, reset if more.
* \param limit_ibuf_size Limit of memory usage by input buffers, reset if exceeded.
* \param limit_obuf_size Limit of memory usage by output buffers, reset if exceeded.
* \param relays Out: relays to be filled with close/reset instructions for knot_tcp_send().
* \param max_relays Maximum relays to be used.
* \param close_count Out: number of connection closed.
* \param reset_count Out: number of connections reset.
*
* \return KNOT_E*
* \return KNOT_E*
*/
int knot_tcp_sweep(knot_tcp_table_t *tcp_table, knot_xdp_socket_t *socket,
uint32_t max_at_once, uint32_t close_timeout, uint32_t reset_timeout,
uint32_t reset_at_least, size_t reset_buf_size,
int knot_tcp_sweep(knot_tcp_table_t *tcp_table,
uint32_t close_timeout, uint32_t reset_timeout,
uint32_t resend_timeout, uint32_t limit_n_conn,
size_t limit_ibuf_size, size_t limit_obuf_size,
knot_tcp_relay_t *relays, size_t max_relays,
uint32_t *close_count, uint32_t *reset_count);
/*!
* \brief Free resources of closed/reset connections.
*
* \param tcp_table TCP table with connections.
* \param relays Relays with closed/resettted (or other, ignored) connections.
* \param n_relays Number of relays.
*/
void knot_tcp_cleanup(knot_tcp_table_t *tcp_table, knot_tcp_relay_t *relays, size_t n_relays);
/*! @} */

View file

@ -14,80 +14,244 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "libknot/xdp/tcp_iobuf.h"
#include "libknot/xdp/tcp.h" // just tcp_outbufs_t
#include "libknot/error.h"
#include "contrib/macros.h"
int tcp_inbuf_update(struct iovec *buffer, struct iovec *data,
struct iovec *data_tofree, size_t *buffers_total)
static void iov_clear(struct iovec *iov)
{
memset(data_tofree, 0, sizeof(*data_tofree));
if (data->iov_len < 1) {
free(iov->iov_base);
memset(iov, 0, sizeof(*iov));
}
static void iov_inc(struct iovec *iov, size_t shift)
{
assert(shift <= iov->iov_len);
iov->iov_base += shift;
iov->iov_len -= shift;
}
/*! \brief Strip 2-byte length prefix from a payload. */
static void iov_inc2(struct iovec *iov)
{
iov_inc(iov, sizeof(uint16_t));
}
static size_t tcp_payload_len(const struct iovec *payload)
{
assert(payload->iov_len >= 2);
uint16_t val;
memcpy(&val, payload->iov_base, sizeof(val));
return be16toh(val) + sizeof(val);
}
static bool iov_inc_pf(struct iovec *iov)
{
size_t shift = tcp_payload_len(iov);
if (iov->iov_len >= shift) {
iov_inc(iov, shift);
return true;
} else {
return false;
}
}
static size_t iov_count(const struct iovec *iov)
{
size_t res = 0;
struct iovec tmp = *iov;
while (tmp.iov_len >= sizeof(uint16_t) && iov_inc_pf(&tmp)) {
res++;
}
return res;
}
static void iov_append(struct iovec *what, const struct iovec *with)
{
// NOTE: what->iov_base must be pre-allocated large enough
memcpy(what->iov_base + what->iov_len, with->iov_base, with->iov_len);
what->iov_len += with->iov_len;
}
int tcp_inbuf_update(struct iovec *buffer, struct iovec data,
struct iovec **inbufs, size_t *inbufs_count,
size_t *buffers_total)
{
size_t res_count = 0;
struct iovec *res = NULL, *cur = NULL;
*inbufs = NULL;
*inbufs_count = 0;
if (data.iov_len < 1) {
return KNOT_EOK;
}
if (buffer->iov_len == 1) {
((uint8_t *)buffer->iov_base)[1] = ((uint8_t *)data->iov_base)[0];
((uint8_t *)buffer->iov_base)[1] = ((uint8_t *)data.iov_base)[0];
buffer->iov_len++;
data->iov_base++;
data->iov_len--;
if (data->iov_len < 1) {
iov_inc(&data, 1);
if (data.iov_len < 1) {
return KNOT_EOK;
}
}
if (buffer->iov_len > 0) {
size_t buffer_req = tcp_payload_len(buffer);
assert(buffer_req > buffer->iov_len);
size_t data_use = buffer_req - buffer->iov_len;
if (data_use <= data->iov_len) { // usable payload combined from buffer and data ---> data_tofree
data_tofree->iov_len = buffer_req;
data_tofree->iov_base = realloc(buffer->iov_base, buffer_req);
if (data_tofree->iov_base == NULL) {
struct iovec data_use = { data.iov_base, buffer_req - buffer->iov_len };
if (data_use.iov_len <= data.iov_len) { // usable payload combined from buffer and data ---> res[0] allocated tohether with res
iov_inc(&data, data_use.iov_len);
res_count = 1 + iov_count(&data);
res = malloc(res_count * sizeof(*res) + buffer_req);
if (res == NULL) {
return KNOT_ENOMEM;
}
memcpy(data_tofree->iov_base + buffer->iov_len, data->iov_base, data_use);
res[0].iov_base = (void *)(res + res_count);
res[0].iov_len = 0;
iov_append(&res[0], buffer);
iov_append(&res[0], &data_use);
assert(res[0].iov_len == buffer_req);
iov_inc2(&res[0]);
cur = &res[1];
*buffers_total -= buffer->iov_len;
buffer->iov_base = NULL;
buffer->iov_len = 0;
data->iov_base += data_use;
data->iov_len -= data_use;
iov_clear(buffer);
} else { // just extend the buffer with data
void *bufnew = realloc(buffer->iov_base, buffer->iov_len + data->iov_len);
void *bufnew = realloc(buffer->iov_base, buffer->iov_len + data.iov_len);
if (bufnew == NULL) {
return KNOT_ENOMEM;
}
buffer->iov_base = bufnew;
memcpy(buffer->iov_base + buffer->iov_len, data->iov_base, data->iov_len);
*buffers_total += data->iov_len;
buffer->iov_len += data->iov_len;
data->iov_base += data->iov_len;
data->iov_len = 0;
iov_append(buffer, &data);
*buffers_total += data.iov_len;
return KNOT_EOK;
}
} else { // just allocate res
res_count = iov_count(&data);
if (res_count > 0) {
res = malloc(res_count * sizeof(*res));
if (res == NULL) {
return KNOT_ENOMEM;
}
cur = &res[0];
}
}
// skip whole usable payloads in data
struct iovec data_end = *data;
size_t data_req;
while (data_end.iov_len > 1 && (data_req = tcp_payload_len(&data_end)) <= data_end.iov_len) {
data_end.iov_base += data_req;
data_end.iov_len -= data_req;
void *last;
while (data.iov_len > 1) {
last = data.iov_base;
if (!iov_inc_pf(&data)) {
break;
}
cur->iov_base = last;
cur->iov_len = data.iov_base - last;
iov_inc2(cur);
cur++;
}
assert(cur == res + res_count);
// store the final incomplete payload to buffer
if (data_end.iov_len > 0) {
if (data.iov_len > 0) {
assert(buffer->iov_base == NULL);
buffer->iov_base = malloc(MAX(data_end.iov_len, 2));
buffer->iov_base = malloc(MAX(data.iov_len, 2));
if (buffer->iov_base == NULL) {
free(data_tofree->iov_base);
memset(data_tofree, 0, sizeof(*data_tofree));
free(res);
return KNOT_ENOMEM;
}
*buffers_total += MAX(data_end.iov_len, 2);
buffer->iov_len = data_end.iov_len;
memcpy(buffer->iov_base, data_end.iov_base, data_end.iov_len);
data->iov_len -= data_end.iov_len;
*buffers_total += MAX(data.iov_len, 2);
buffer->iov_len = 0;
iov_append(buffer, &data);
}
*inbufs = res;
*inbufs_count = res_count;
return KNOT_EOK;
}
int tcp_outbufs_add(struct tcp_outbufs *ob, uint8_t *data, size_t len,
uint32_t mss, size_t *outbufs_total)
{
if (len > UINT16_MAX) {
return KNOT_ELIMIT;
}
struct tcp_outbuf **end = &ob->bufs;
while (*end != NULL) { // NOTE: this can be optimized by adding "end" pointer for the price of larger knot_tcp_conn_t struct
end = &(*end)->next;
}
uint16_t prefix = htobe16(len), prefix_len = sizeof(prefix);
while (len > 0) {
uint16_t newlen = MIN(len + prefix_len, mss);
struct tcp_outbuf *newob = calloc(1, sizeof(*newob) + newlen);
if (newob == NULL) {
return KNOT_ENOMEM;
}
*outbufs_total += sizeof(*newob) + newlen;
newob->len = newlen;
memcpy(newob->bytes, &prefix, prefix_len);
memcpy(newob->bytes + prefix_len, data, newlen - prefix_len);
*end = newob;
end = &newob->next;
data += newlen - prefix_len;
len -= newlen - prefix_len;
prefix_len = 0;
}
return KNOT_EOK;
}
static bool seqno_lower(uint32_t seqno, uint32_t ackno, uint32_t ackno_min)
{
if (ackno_min <= ackno) {
return (seqno >= ackno_min && seqno <= ackno);
} else {
return (seqno >= ackno_min || seqno <= ackno);
}
}
void tcp_outbufs_ack(struct tcp_outbufs *ob, uint32_t ackno, size_t *outbufs_total)
{
uint32_t ackno_min = ackno - (UINT32_MAX / 2); // FIXME better?
while (ob->bufs != NULL && ob->bufs->sent && seqno_lower(ob->bufs->seqno + ob->bufs->len, ackno, ackno_min)) {
struct tcp_outbuf *tofree = ob->bufs;
ob->bufs = tofree->next;
*outbufs_total -= tofree->len + sizeof(*tofree);
free(tofree);
}
}
void tcp_outbufs_can_send(struct tcp_outbufs *ob, ssize_t window_size, bool resend,
struct tcp_outbuf **send_start, size_t *send_count)
{
*send_count = 0;
*send_start = ob->bufs;
while (*send_start != NULL && (*send_start)->sent && !resend) {
window_size -= (*send_start)->len;
*send_start = (*send_start)->next;
}
struct tcp_outbuf *can_send = *send_start;
while (can_send != NULL && window_size >= can_send->len) {
(*send_count)++;
window_size -= can_send->len;
can_send = can_send->next;
}
}
size_t tcp_outbufs_usage(struct tcp_outbufs *ob)
{
size_t res = 0;
for (struct tcp_outbuf *i = ob->bufs; i != NULL; i = i->next) {
res += i->len + sizeof(*i);
}
return res;
}

View file

@ -31,28 +31,69 @@
#include "libknot/endian.h"
/*!
* \brief Return the required length for payload buffer.
*/
inline static size_t tcp_payload_len(const struct iovec *payload)
{
assert(payload->iov_len >= 2);
uint16_t val;
memcpy(&val, payload->iov_base, sizeof(val));
return be16toh(val) + sizeof(val);
}
struct tcp_outbuf {
struct tcp_outbuf *next;
uint32_t len;
uint32_t seqno;
bool sent;
uint8_t bytes[];
};
struct tcp_outbufs; // see tcp.h
/*!
* \brief Handle DNS-over-TCP payloads in buffer and message.
*
* \param buffer In/out: persistent buffer to store incomplete DNS payloads between receiving packets.
* \param data In/out: momental DNS payloads in incoming packet.
* \param data_tofree Out: once more DNS payload defragmented from multiple packets.
* \param data In: momental DNS payloads in incoming packet.
* \param inbufs Out: list of incoming DNS messages.
* \param inbufs_count Out: number of inbufs.
* \param buffers_total In/Out: total size of buffers (will be increased or decreased).
*
* \return KNOT_EOK, KNOT_ENOMEM
*/
int tcp_inbuf_update(struct iovec *buffer, struct iovec *data,
struct iovec *data_tofree, size_t *buffers_total);
int tcp_inbuf_update(struct iovec *buffer, struct iovec data,
struct iovec **inbufs, size_t *inbufs_count,
size_t *buffers_total);
/*!
* \brief Add payload to be sent by TCP, to output buffers.
*
* \param ob Output buffers to be updated.
* \param data Payload to be sent.
* \param len Payload length.
* \param mss Connection outgoing MSS.
* \param outbufs_total In/out: total outbuf statistic to be updated.
*
* \return KNOT_E*
*/
int tcp_outbufs_add(struct tcp_outbufs *ob, uint8_t *data, size_t len,
uint32_t mss, size_t *outbufs_total);
/*!
* \brief Remove+free acked data from output buffers.
*
* \param ob Output buffers to be updated.
* \param ackno Ackno of received ACK.
* \param outbufs_total In/out: total outbuf statistic to be updated.
*/
void tcp_outbufs_ack(struct tcp_outbufs *ob, uint32_t ackno, size_t *outbufs_total);
/*!
* \brief Prepare output buffers to be sent now.
*
* \param ob Output buffers to be updated.
* \param window_size Connection outgoing window size.
* \param resend Send also possibly already sent data.
* \param send_start Out: first output buffer to be sent.
* \param send_count Out: number of output buffers to be sent.
*/
void tcp_outbufs_can_send(struct tcp_outbufs *ob, ssize_t window_size, bool resend,
struct tcp_outbuf **send_start, size_t *send_count);
/*!
* \brief Compute allocated size of output buffers.
*/
size_t tcp_outbufs_usage(struct tcp_outbufs *ob);
/*! @} */

View file

@ -419,36 +419,25 @@ void *xdp_gun_thread(void *_ctx)
break;
}
if (ctx->tcp) {
uint32_t ack_errors = 0;
knot_tcp_relay_dynarray_t relays = { 0 };
ret = knot_tcp_relay(xsk, pkts, recvd, tcp_table, NULL,
&relays, &ack_errors);
lost += ack_errors;
knot_tcp_relay_t relays[recvd];
ret = knot_tcp_recv(relays, pkts, recvd, tcp_table, NULL);
if (ret != KNOT_EOK) {
errors++;
break;
}
size_t relays_answer = relays.size;
for (size_t i = 0; i < relays_answer; i++) {
knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[i];
for (size_t i = 0; i < recvd; i++) {
knot_tcp_relay_t *rl = &relays[i];
struct iovec payl;
switch (rl->action) {
case XDP_TCP_ESTABLISH:
local_stats.synack_recv++;
rl->answer = XDP_TCP_ANSWER | XDP_TCP_DATA;
put_dns_payload(&payl, true, ctx, &payload_ptr);
ret = knot_tcp_relay_answer(&relays, rl, payl.iov_base,
payl.iov_len);
ret = knot_tcp_reply_data(rl, tcp_table, payl.iov_base, payl.iov_len);
if (ret != KNOT_EOK) {
errors++;
}
break;
case XDP_TCP_DATA:
if (check_dns_payload(&rl->data, ctx, &local_stats)) {
rl->answer = XDP_TCP_ANSWER | XDP_TCP_CLOSE;
}
break;
case XDP_TCP_CLOSE:
local_stats.finack_recv++;
break;
@ -458,16 +447,20 @@ void *xdp_gun_thread(void *_ctx)
default:
break;
}
for (size_t j = 0; j < rl->inbufs_count; j++) {
if (check_dns_payload(&rl->inbufs[j], ctx, &local_stats)) {
rl->answer = XDP_TCP_CLOSE;
}
}
}
ret = knot_tcp_send(xsk, knot_tcp_relay_dynarray_arr(&relays),
relays.size);
ret = knot_tcp_send(xsk, relays, recvd, ctx->at_once);
if (ret != KNOT_EOK) {
errors++;
}
(void)knot_xdp_send_finish(xsk);
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(tcp_table, relays, recvd);
} else {
for (int i = 0; i < recvd; i++) {
(void)check_dns_payload(&pkts[i].payload, ctx,

View file

@ -994,6 +994,7 @@ static const yp_item_t desc_xdp[] = {
{ C_TCP_INBUF_MAX_SIZE, YP_TINT, YP_VNONE },
{ C_TCP_IDLE_CLOSE, YP_TINT, YP_VNONE },
{ C_TCP_IDLE_RESET, YP_TINT, YP_VNONE },
{ C_TCP_RESEND, YP_TINT, YP_VNONE },
{ C_ROUTE_CHECK, YP_TBOOL, YP_VNONE },
{ NULL }
};

View file

@ -32,6 +32,8 @@ size_t sent_syns = 0;
size_t sent_fins = 0;
uint32_t sent_seqno = 0;
uint32_t sent_ackno = 0;
size_t sent2_data = 0;
size_t send2_mss = 0;
knot_xdp_socket_t *test_sock = NULL;
@ -53,19 +55,15 @@ static size_t tcp_table_timeout_length(knot_tcp_table_t *table)
* \param tcp_table TCP connection table to clean up.
* \param timeout Remove connections older than this (usecs).
* \param at_least Remove at least this number of connections.
* \param cleaned Optional: Out: number of removed connections.
*/
static void tcp_cleanup(knot_tcp_table_t *tcp_table, uint32_t timeout,
uint32_t at_least, uint32_t *cleaned)
uint32_t at_least)
{
uint32_t now = get_timestamp(), i = 0;
knot_tcp_conn_t *conn, *next;
WALK_LIST_DELSAFE(conn, next, *tcp_table_timeout(tcp_table)) {
if (i++ < at_least || now - conn->last_active >= timeout) {
tcp_table_del_lookup(conn, tcp_table);
if (cleaned != NULL) {
(*cleaned)++;
}
tcp_table_del(tcp_table_re_lookup(conn, tcp_table), tcp_table);
}
}
}
@ -129,9 +127,26 @@ static int mock_send_nocheck(_unused_ knot_xdp_socket_t *sock, const knot_xdp_ms
return KNOT_EOK;
}
static int mock_send2(_unused_ knot_xdp_socket_t *sock, const knot_xdp_msg_t msgs[],
uint32_t n_msgs, _unused_ uint32_t *sent)
{
ok(n_msgs <= 20, "send2: not too many at once");
for (uint32_t i = 0; i < n_msgs; i++) {
const knot_xdp_msg_t *msg = msgs + i;
ok(msg->flags & KNOT_XDP_MSG_TCP, "send2: is TCP message");
ok(msg->flags & KNOT_XDP_MSG_ACK, "send2: has ACK");
ok(msg->payload.iov_len <= send2_mss, "send2: fulfilled MSS");
sent2_data += msg->payload.iov_len;
sent_seqno = msg->seqno;
sent_ackno = msg->ackno;
}
return KNOT_EOK;
}
static void clean_table(void)
{
(void)tcp_cleanup(test_table, 0, UINT32_MAX, NULL);
(void)tcp_cleanup(test_table, 0, UINT32_MAX);
}
static void clean_sent(void)
@ -190,71 +205,70 @@ static void fix_seqacks(knot_xdp_msg_t *msgs, size_t count)
void test_syn(void)
{
knot_xdp_msg_t msg;
knot_tcp_relay_dynarray_t relays = { 0 };
knot_tcp_relay_t rl;
prepare_msg(&msg, KNOT_XDP_MSG_SYN, 1, 2);
int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "SYN: relay OK");
ret = knot_tcp_send(test_sock, &rl, 1, 1);
is_int(KNOT_EOK, ret, "SYN: send OK");
is_int(msg.seqno + 1, sent_ackno, "SYN: ackno");
check_sent(0, 0, 1, 0);
is_int(1, relays.size, "SYN: one relay");
knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0];
is_int(XDP_TCP_SYN, rl->action, "SYN: relay action");
is_int(XDP_TCP_NOOP, rl->answer, "SYN: relay answer");
is_int(0, rl->data.iov_len, "SYN: no payload");
is_int(XDP_TCP_SYN, rl.action, "SYN: relay action");
is_int(XDP_TCP_NOOP, rl.answer, "SYN: relay answer");
is_int(0, rl.inbufs_count, "SYN: no payload");
is_int(1, test_table->usage, "SYN: one connection in table");
knot_tcp_conn_t *conn = tcp_table_find(test_table, &msg);
ok(conn != NULL, "SYN: connection present");
ok(conn == rl->conn, "SYN: relay points to connection");
ok(conn == rl.conn, "SYN: relay points to connection");
is_int(XDP_TCP_ESTABLISHING, conn->state, "SYN: connection state");
ok(memcmp(&conn->ip_rem, &msg.ip_from, sizeof(msg.ip_from)) == 0, "SYN: conn IP from");
ok(memcmp(&conn->ip_loc, &msg.ip_to, sizeof(msg.ip_to)) == 0, "SYN: conn IP to");
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(test_table, &rl, 1);
test_conn = conn;
}
void test_establish(void)
{
knot_xdp_msg_t msg;
knot_tcp_relay_dynarray_t relays = { 0 };
knot_tcp_relay_t rl;
prepare_msg(&msg, KNOT_XDP_MSG_ACK, 1, 2);
prepare_seqack(&msg, 0, 1);
int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "establish: relay OK");
ret = knot_tcp_send(test_sock, &rl, 1, 1);
is_int(KNOT_EOK, ret, "establish: send OK");
check_sent(0, 0, 0, 0);
is_int(0, relays.size, "establish: no relay");
/*knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0];
is_int(XDP_TCP_ESTABLISH, rl->action, "establish: relay action");
ok(rl->conn != NULL, "establish: connection present");
ok(rl->conn == test_conn, "establish: same connection");
is_int(XDP_TCP_NORMAL, rl->conn->state, "establish: connection state");*/
is_int(0, rl.auto_answer, "establish: no auto answer");
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(test_table, &rl, 1);
clean_table();
}
void test_syn_ack(void)
{
knot_xdp_msg_t msg;
knot_tcp_relay_dynarray_t relays = { 0 };
knot_tcp_relay_t rl;
prepare_msg(&msg, KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK, 1000, 2000);
int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "SYN+ACK: relay OK");
ret = knot_tcp_send(test_sock, &rl, 1, 1);
is_int(KNOT_EOK, ret, "SYN+ACK: send OK");
is_int(msg.seqno + 1, sent_ackno, "SYN+ACK: ackno");
check_sent(1, 0, 0, 0);
is_int(1, relays.size, "SYN+ACK: one relay");
knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0];
is_int(XDP_TCP_ESTABLISH, rl->action, "SYN+ACK: relay action");
ok(rl->conn != NULL, "SYN+ACK: connection present");
is_int(XDP_TCP_ESTABLISH, rl.action, "SYN+ACK: relay action");
ok(rl.conn != NULL, "SYN+ACK: connection present");
test_conn = rl->conn;
knot_tcp_relay_free(&relays);
test_conn = rl.conn;
knot_tcp_cleanup(test_table, &rl, 1);
}
void test_data_fragments(void)
{
knot_xdp_msg_t msgs[4];
knot_tcp_relay_dynarray_t relays = { 0 };
const size_t CONNS = 4;
knot_xdp_msg_t msgs[CONNS];
knot_tcp_relay_t rls[CONNS];
memset(rls, 0, CONNS * sizeof(*rls));
// first msg contains one whole payload and one fragment
prepare_msg(&msgs[0], KNOT_XDP_MSG_ACK, 1000, 2000);
@ -276,37 +290,44 @@ void test_data_fragments(void)
prepare_seqack(&msgs[3], 15, 0);
prepare_data(&msgs[3], "\x02""AB""\xff\xff""abcdefghijklmnopqrstuvwxyz...", 34);
int ret = knot_tcp_relay(test_sock, msgs, sizeof(msgs) / sizeof(msgs[0]),
test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL);
is_int(KNOT_EOK, ret, "fragments: relay OK");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "fragments: send OK");
is_int(msgs[3].ackno, sent_seqno, "fragments: seqno");
is_int(msgs[3].seqno + msgs[3].payload.iov_len, sent_ackno, "fragments: ackno");
check_sent(4, 0, 0, 0);
knot_tcp_relay_t *rls = knot_tcp_relay_dynarray_arr(&relays);
is_int(XDP_TCP_DATA, rls[0].action, "fragments0: action");
is_int(XDP_TCP_NOOP, rls[0].answer, "fragments0: answer");
is_int(3, rls[0].data.iov_len, "fragments0: data length");
ok(memcmp("xyz", rls[0].data.iov_base, rls[0].data.iov_len) == 0, "fragments0: data");
is_int(KNOT_XDP_MSG_ACK, rls[0].auto_answer, "fragments[0]: auto answer");
ok(rls[0].conn != NULL, "fragments0: connection present");
ok(rls[0].conn == test_conn, "fragments0: same connection");
is_int(1, rls[0].inbufs_count, "fragments0: inbufs count");
is_int(3, rls[0].inbufs[0].iov_len, "fragments0: data length");
is_int(0, memcmp("xyz", rls[0].inbufs[0].iov_base, rls[0].inbufs[0].iov_len), "fragments0: data");
is_int(XDP_TCP_DATA, rls[1].action, "fragments1: action");
is_int(4, rls[1].data.iov_len, "fragments1: data length");
ok(memcmp("abcd", rls[1].data.iov_base, rls[1].data.iov_len) == 0, "fragments1: data");
ok(rls[1].conn == test_conn, "fragments1: same connection");
is_int(KNOT_XDP_MSG_ACK, rls[1].auto_answer, "fragments[1]: auto answer");
is_int(XDP_TCP_NOOP, rls[1].action, "fragments[1]: action"); // NOTE: NOOP
ok(rls[0].conn != NULL, "fragments1: connection present");
ok(rls[0].conn == test_conn, "fragments1: same connection");
is_int(0, rls[1].inbufs_count, "fragments1: inbufs count");
is_int(XDP_TCP_DATA, rls[2].action, "fragments2: action");
is_int(1, rls[2].data.iov_len, "fragments2: data length");
ok(memcmp("i", rls[2].data.iov_base, rls[2].data.iov_len) == 0, "fragments2: data");
ok(rls[2].conn == test_conn, "fragments2: same connection");
is_int(KNOT_XDP_MSG_ACK, rls[2].auto_answer, "fragments[2]: auto answer");
ok(rls[0].conn != NULL, "fragments2: connection present");
ok(rls[0].conn == test_conn, "fragments2: same connection");
is_int(2, rls[2].inbufs_count, "fragments2: inbufs count");
is_int(4, rls[2].inbufs[0].iov_len, "fragments2-0: data length");
is_int(0, memcmp("abcd", rls[2].inbufs[0].iov_base, rls[2].inbufs[0].iov_len), "fragments2-0: data");
is_int(1, rls[2].inbufs[1].iov_len, "fragments2-1: data length");
is_int(0, memcmp("i", rls[2].inbufs[1].iov_base, rls[2].inbufs[1].iov_len), "fragments2-1: data");
is_int(XDP_TCP_DATA, rls[3].action, "fragments3: action");
is_int(2, rls[3].data.iov_len, "fragments3: data length");
ok(memcmp("AB", rls[3].data.iov_base, rls[3].data.iov_len) == 0, "fragments3: data");
ok(rls[3].conn == test_conn, "fragments3: same connection");
is_int(KNOT_XDP_MSG_ACK, rls[3].auto_answer, "fragments[3]: auto answer");
ok(rls[0].conn != NULL, "fragments3: connection present");
ok(rls[0].conn == test_conn, "fragments3: same connection");
is_int(1, rls[3].inbufs_count, "fragments3: inbufs count");
is_int(2, rls[3].inbufs[0].iov_len, "fragments3: data length");
is_int(0, memcmp("AB", rls[3].inbufs[0].iov_base, rls[3].inbufs[0].iov_len), "fragments3: data");
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(test_table, rls, 4);
}
void test_close(void)
@ -314,25 +335,26 @@ void test_close(void)
size_t conns_pre = test_table->usage;
knot_xdp_msg_t msg;
knot_tcp_relay_dynarray_t relays = { 0 };
knot_tcp_relay_t rl;
prepare_msg(&msg, KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK,
be16toh(test_conn->ip_rem.sin6_port),
be16toh(test_conn->ip_loc.sin6_port));
prepare_seqack(&msg, 0, 0);
int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "close: relay 1 OK");
ret = knot_tcp_send(test_sock, &rl, 1, 1);
is_int(KNOT_EOK, ret, "close: send OK");
check_sent(0, 0, 0, 1);
is_int(1, relays.size, "close: one relay");
knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0];
is_int(XDP_TCP_CLOSE, rl->action, "close: relay action");
ok(rl->conn == test_conn, "close: same connection");
is_int(XDP_TCP_CLOSING2, rl->conn->state, "close: conn state");
knot_tcp_relay_free(&relays);
is_int(XDP_TCP_CLOSE, rl.action, "close: relay action");
ok(rl.conn == test_conn, "close: same connection");
is_int(XDP_TCP_CLOSING2, rl.conn->state, "close: conn state");
msg.flags &= ~KNOT_XDP_MSG_FIN;
prepare_seqack(&msg, 0, 0);
ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL);
ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "close: relay 2 OK");
ret = knot_tcp_send(test_sock, &rl, 1, 1);
is_int(KNOT_EOK, ret, "close: send 2 OK");
check_sent(0, 0, 0, 0);
is_int(conns_pre - 1, test_table->usage, "close: connection removed");
is_int(conns_pre - 1, tcp_table_timeout_length(test_table), "close: timeout list size");
@ -349,61 +371,71 @@ void test_many(void)
for (size_t i = 0; i < CONNS; i++) {
prepare_msg(&msgs[i], KNOT_XDP_MSG_SYN, i + 2, 1);
}
knot_tcp_relay_t *rls = malloc(CONNS * sizeof(*rls));
knot_tcp_relay_dynarray_t relays = { 0 };
int ret = knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL);
is_int(KNOT_EOK, ret, "many: relay OK");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "many: relay send OK");
check_sent(0, 0, CONNS, 0);
is_int(CONNS, relays.size, "many: relays count");
is_int(CONNS, test_table->usage, "many: table usage");
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(test_table, rls, CONNS);
usleep(timeout_time);
knot_xdp_msg_t *survive = &msgs[i_survive];
knot_tcp_relay_t surv_rl;
survive->flags = (KNOT_XDP_MSG_TCP | KNOT_XDP_MSG_ACK);
knot_tcp_conn_t *surv_conn = tcp_table_find(test_table, survive);
fix_seqack(survive);
prepare_data(survive, "\x00\x00", 2);
(void)knot_tcp_relay(test_sock, survive, 1, test_table, NULL, &relays, NULL);
is_int(1, relays.size, "many/survivor: one relay");
knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0];
ret = knot_tcp_recv(&surv_rl, survive, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "many/survivor: OK");
clean_sent();
uint32_t reset_count = 0, close_count = 0;
ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, timeout_time, UINT32_MAX,
0, 0, &close_count, &reset_count);
ret = knot_tcp_sweep(test_table, timeout_time, UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX,
UINT32_MAX, rls, CONNS, &close_count, &reset_count);
is_int(KNOT_EOK, ret, "many/timeout1: OK");
is_int(CONNS - 1, close_count, "many/timeout1: close count");
is_int(0, reset_count, "may/timeout1: reset count");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "many/timeout1: send OK");
check_sent(0, 0, 0, CONNS - 1);
close_count = 0;
ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, UINT32_MAX, timeout_time,
0, 0, &close_count, &reset_count);
ret = knot_tcp_sweep(test_table, UINT32_MAX, timeout_time, UINT32_MAX, UINT32_MAX, UINT32_MAX,
UINT32_MAX, rls, CONNS, &close_count, &reset_count);
is_int(KNOT_EOK, ret, "many/timeout2: OK");
is_int(0, close_count, "many/timeout2: close count");
is_int(CONNS - 1, reset_count, "may/timeout2: reset count");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "many/timeout2: send OK");
check_sent(0, CONNS - 1, 0, 0);
knot_tcp_cleanup(test_table, rls, CONNS);
is_int(1, test_table->usage, "many/timeout: one survivor");
is_int(1, tcp_table_timeout_length(test_table), "many/timeout: one survivor in timeout list");
ok(surv_conn != NULL, "many/timeout: survivor connection present");
ok(surv_conn == rl->conn, "many/timeout: same connection");
ok(surv_conn == surv_rl.conn, "many/timeout: same connection");
knot_tcp_cleanup(test_table, &surv_rl, 1);
free(msgs);
free(rls);
}
void test_ibufs_size(void)
{
int CONNS = 4;
knot_xdp_msg_t msgs[CONNS];
knot_tcp_relay_dynarray_t relays = { 0 };
knot_tcp_relay_t rls[CONNS];
// just open connections
for (int i = 0; i < CONNS; i++) {
prepare_msg(&msgs[i], KNOT_XDP_MSG_SYN, i + 2000, 1);
}
int ret = knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL);
int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL);
is_int(KNOT_EOK, ret, "ibufs: open OK");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "ibufs: first send OK");
check_sent(0, 0, CONNS, 0);
for (int i = 0; i < CONNS; i++) {
msgs[i].flags = KNOT_XDP_MSG_TCP | KNOT_XDP_MSG_ACK;
@ -414,11 +446,13 @@ void test_ibufs_size(void)
// first connection will start a fragment buf then finish it
fix_seqack(&msgs[0]);
prepare_data(&msgs[0], "\x00\x0a""lorem", 7);
ret = knot_tcp_relay(test_sock, &msgs[0], 1, test_table, NULL, &relays, NULL);
ret = knot_tcp_recv(&rls[0], &msgs[0], 1, test_table, NULL);
is_int(KNOT_EOK, ret, "ibufs: must be OK");
ret = knot_tcp_send(test_sock, &rls[0], 1, 1);
is_int(KNOT_EOK, ret, "ibufs: must send OK");
check_sent(1, 0, 0, 0);
is_int(7, test_table->inbufs_total, "inbufs: first inbuf");
knot_tcp_relay_free(&relays);
knot_tcp_cleanup(test_table, &rls[0], 1);
// other connection will just store fragments
fix_seqacks(msgs, CONNS);
@ -426,22 +460,28 @@ void test_ibufs_size(void)
prepare_data(&msgs[1], "\x00\xff""12345", 7);
prepare_data(&msgs[2], "\xff\xff""abcde", 7);
prepare_data(&msgs[3], "\xff\xff""abcde", 7);
ret = knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL);
ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL);
is_int(KNOT_EOK, ret, "inbufs: relay OK");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "inbufs: send OK");
check_sent(CONNS, 0, 0, 0);
is_int(21, test_table->inbufs_total, "inbufs: after change");
is_int(1, relays.size, "inbufs: one relay");
is_int(10, knot_tcp_relay_dynarray_arr(&relays)[0].data.iov_len, "inbufs: data length");
knot_tcp_relay_free(&relays);
is_int(0, rls[1].action, "inbufs: one relay");
is_int(10, rls[0].inbufs[0].iov_len, "inbufs: data length");
knot_tcp_cleanup(test_table, rls, CONNS);
// now free some
uint32_t reset_count = 0, close_count = 0;
ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, UINT32_MAX, UINT32_MAX,
0, 8, &close_count, &reset_count);
ret = knot_tcp_sweep(test_table, UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX,
test_table->inbufs_total - 8, UINT32_MAX, rls,
CONNS, &close_count, &reset_count);
is_int(KNOT_EOK, ret, "inbufs: timeout OK");
ret = knot_tcp_send(test_sock, rls, CONNS, CONNS);
is_int(KNOT_EOK, ret, "inbufs: timeout send OK");
check_sent(0, 2, 0, 0);
is_int(0, close_count, "inbufs: close count");
is_int(2, reset_count, "inbufs: reset count");
knot_tcp_cleanup(test_table, rls, CONNS);
is_int(7, test_table->inbufs_total, "inbufs: final state");
ok(NULL != tcp_table_find(test_table, &msgs[0]), "inbufs: first conn survived");
ok(NULL == tcp_table_find(test_table, &msgs[1]), "inbufs: second conn not survived");
@ -451,6 +491,68 @@ void test_ibufs_size(void)
clean_table();
}
void test_obufs(void)
{
knot_xdp_msg_t msg;
knot_tcp_relay_t rl;
prepare_msg(&msg, KNOT_XDP_MSG_SYN, 1, 2);
(void)knot_tcp_recv(&rl, &msg, 1, test_table, NULL); // SYN
(void)knot_tcp_send(test_sock, &rl, 1, 1); // SYN+ACK
prepare_msg(&msg, KNOT_XDP_MSG_ACK, 1, 2);
prepare_seqack(&msg, 0, 1);
(void)knot_tcp_recv(&rl, &msg, 1, test_table, NULL); // ACK
size_t TEST_MSS = 1111;
size_t DATA_LEN = 65535; // with 2-byte len prefix, this is > 64k == window_size
uint8_t *data = calloc(DATA_LEN, 1);
rl.conn->mss = TEST_MSS;
send2_mss = TEST_MSS;
int ret = knot_tcp_reply_data(&rl, test_table, data, DATA_LEN), i = 0;
is_int(KNOT_EOK, ret, "obufs: fill with data");
for (struct tcp_outbuf *ob = rl.conn->outbufs.bufs; ob != NULL; ob = ob->next, i++) {
if (ob->next == NULL) {
ok(ob->len > 0, "init last ob[%d]: non-trivial", i);
ok(ob->len <= TEST_MSS, "init last ob[%d]: fulfills MSS", i);
} else {
is_int(TEST_MSS, ob->len, "init ob[%d]: exactly MSS", i);
}
ok(!ob->sent, "init ob[%d]: not sent", i);
}
ret = knot_tcp_send(test_sock, &rl, 1, 20), i = 0;
is_int(KNOT_EOK, ret, "obufs: send OK");
is_int((DATA_LEN + 2) / TEST_MSS * TEST_MSS, sent2_data, "obufs: sent all but one MSS");
for (struct tcp_outbuf *ob = rl.conn->outbufs.bufs; ob != NULL; ob = ob->next, i++) {
if (ob->next == NULL) {
ok(!ob->sent, "last ob[%d]: not sent", i);
} else {
ok(ob->sent, "ob[%d]: sent", i);
if (ob->next->next != NULL) {
is_int(ob->seqno + ob->len, ob->next->seqno, "init ob[%d+1]: seqno", i);
}
}
}
knot_tcp_cleanup(test_table, &rl, 1);
prepare_seqack(&msg, 0, TEST_MSS);
ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL);
is_int(KNOT_EOK, ret, "obufs: ACKed data");
struct tcp_outbuf *surv_ob = rl.conn->outbufs.bufs;
ok(surv_ob != NULL, "obufs: unACKed survived");
ok(surv_ob->next == NULL, "obufs: just one survived");
ok(!surv_ob->sent, "obufs: survivor not sent");
ret = knot_tcp_send(test_sock, &rl, 1, 20);
is_int(KNOT_EOK, ret, "obufs: send rest OK");
is_int(DATA_LEN + 2, sent2_data, "obufs: sent all");
ok(surv_ob->sent, "obufs: survivor sent");
is_int(sent_seqno, surv_ob->seqno, "obufs: survivor seqno");
knot_tcp_cleanup(test_table, &rl, 1);
clean_table();
free(data);
}
static void init_mock(knot_xdp_socket_t **socket, void *send_mock)
{
*socket = calloc(1, sizeof(**socket));
@ -481,6 +583,10 @@ int main(int argc, char *argv[])
init_mock(&test_sock, mock_send_nocheck);
test_many();
knot_xdp_deinit(test_sock);
init_mock(&test_sock, mock_send2);
test_obufs();
knot_xdp_deinit(test_sock);
knot_tcp_table_free(test_table);