diff --git a/doc/man/knot.conf.5in b/doc/man/knot.conf.5in index f727f76c7..6abffe390 100644 --- a/doc/man/knot.conf.5in +++ b/doc/man/knot.conf.5in @@ -496,6 +496,7 @@ xdp: tcp\-inbuf\-max\-size: SIZE tcp\-idle\-close\-timeout: TIME tcp\-idle\-reset\-timeout: TIME + tcp\-resend\-timeout: TIME route\-check: BOOL .ft P .fi @@ -611,6 +612,14 @@ Time in seconds, after which any idle connection is forcibly closed. \fIMinimum:\fP 1 s .sp \fIDefault:\fP 20 s +.SS tcp\-resend\-timeout +.sp +Resend outgoing data packets (with DNS response payload) if not ACKed +before this timeout. +.sp +\fIMinimum:\fP 1 s +.sp +\fIDefault:\fP 5 s .SS route\-check .sp If enabled, routing information from the operating system is considered diff --git a/doc/reference.rst b/doc/reference.rst index 04ca5c4e0..c38a44b84 100644 --- a/doc/reference.rst +++ b/doc/reference.rst @@ -524,6 +524,7 @@ Various options related to XDP listening, especially TCP. tcp-inbuf-max-size: SIZE tcp-idle-close-timeout: TIME tcp-idle-reset-timeout: TIME + tcp-resend-timeout: TIME route-check: BOOL .. CAUTION:: @@ -632,6 +633,18 @@ Time in seconds, after which any idle connection is forcibly closed. *Default:* 20 s +.. _xdp_tcp-resend-timeout: + +tcp-resend-timeout +------------------ + +Resend outgoing data packets (with DNS response payload) if not ACKed +before this timeout. + +*Minimum:* 1 s + +*Default:* 5 s + .. 
_xdp_route-check: route-check diff --git a/src/knot/conf/base.c b/src/knot/conf/base.c index e480e4408..ed43bfb3d 100644 --- a/src/knot/conf/base.c +++ b/src/knot/conf/base.c @@ -201,6 +201,9 @@ static void init_cache( val = conf_get(conf, C_XDP, C_TCP_IDLE_RESET); conf->cache.xdp_tcp_idle_reset = conf_int(&val); + val = conf_get(conf, C_XDP, C_TCP_RESEND); + conf->cache.xdp_tcp_idle_resend = conf_int(&val); + conf->cache.xdp_tcp = running_xdp_tcp; conf->cache.xdp_route_check = running_route_check; diff --git a/src/knot/conf/base.h b/src/knot/conf/base.h index 939a78319..76bc61360 100644 --- a/src/knot/conf/base.h +++ b/src/knot/conf/base.h @@ -136,6 +136,7 @@ typedef struct { size_t xdp_tcp_inbuf_max_size; uint32_t xdp_tcp_idle_close; uint32_t xdp_tcp_idle_reset; + uint32_t xdp_tcp_idle_resend; bool xdp_tcp; bool xdp_route_check; int ctl_timeout; diff --git a/src/knot/conf/schema.c b/src/knot/conf/schema.c index 90eee60d4..c27261d42 100644 --- a/src/knot/conf/schema.c +++ b/src/knot/conf/schema.c @@ -246,6 +246,7 @@ static const yp_item_t desc_xdp[] = { { C_TCP_INBUF_MAX_SIZE, YP_TINT, YP_VINT = { MEGA(1), SSIZE_MAX, MEGA(100), YP_SSIZE } }, { C_TCP_IDLE_CLOSE, YP_TINT, YP_VINT = { 1, INT32_MAX, 10, YP_STIME } }, { C_TCP_IDLE_RESET, YP_TINT, YP_VINT = { 1, INT32_MAX, 20, YP_STIME } }, + { C_TCP_RESEND, YP_TINT, YP_VINT = { 1, INT32_MAX, 5, YP_STIME } }, { C_ROUTE_CHECK, YP_TBOOL, YP_VNONE }, { NULL } }; diff --git a/src/knot/conf/schema.h b/src/knot/conf/schema.h index 99b532947..bee119a7e 100644 --- a/src/knot/conf/schema.h +++ b/src/knot/conf/schema.h @@ -130,6 +130,7 @@ #define C_TCP_INBUF_MAX_SIZE "\x12""tcp-inbuf-max-size" #define C_TCP_IO_TIMEOUT "\x0E""tcp-io-timeout" #define C_TCP_MAX_CLIENTS "\x0F""tcp-max-clients" +#define C_TCP_RESEND "\x12""tcp-resend-timeout" #define C_TCP_REUSEPORT "\x0D""tcp-reuseport" #define C_TCP_RMT_IO_TIMEOUT "\x15""tcp-remote-io-timeout" #define C_TCP_WORKERS "\x0B""tcp-workers" diff --git a/src/knot/nameserver/axfr.c 
b/src/knot/nameserver/axfr.c index 5f46a4c0c..45ab976cf 100644 --- a/src/knot/nameserver/axfr.c +++ b/src/knot/nameserver/axfr.c @@ -200,9 +200,6 @@ int axfr_process_query(knot_pkt_t *pkt, knotd_qdata_t *qdata) knot_strerror(ret)); return KNOT_STATE_FAIL; } - } else if (qdata->params->xdp_msg != NULL) { - qdata->rcode = KNOT_RCODE_SERVFAIL; - return KNOT_STATE_FAIL; } /* Reserve space for TSIG. */ diff --git a/src/knot/nameserver/ixfr.c b/src/knot/nameserver/ixfr.c index 98c01b0eb..1491e20f5 100644 --- a/src/knot/nameserver/ixfr.c +++ b/src/knot/nameserver/ixfr.c @@ -292,9 +292,6 @@ int ixfr_process_query(knot_pkt_t *pkt, knotd_qdata_t *qdata) knot_strerror(ret)); return KNOT_STATE_FAIL; } - } else if (qdata->params->xdp_msg != NULL) { - qdata->rcode = KNOT_RCODE_SERVFAIL; - return KNOT_STATE_FAIL; } /* Reserve space for TSIG. */ diff --git a/src/knot/server/xdp-handler.c b/src/knot/server/xdp-handler.c index 5136aa117..73f4c2b45 100644 --- a/src/knot/server/xdp-handler.c +++ b/src/knot/server/xdp-handler.c @@ -32,7 +32,7 @@ typedef struct xdp_handle_ctx { knot_xdp_socket_t *sock; knot_xdp_msg_t msg_recv[XDP_BATCHLEN]; knot_xdp_msg_t msg_send_udp[XDP_BATCHLEN]; - knot_tcp_relay_dynarray_t tcp_relays; + knot_tcp_relay_t relays[XDP_BATCHLEN]; uint32_t msg_recv_count; uint32_t msg_udp_count; knot_tcp_table_t *tcp_table; @@ -40,8 +40,10 @@ typedef struct xdp_handle_ctx { bool tcp; size_t tcp_max_conns; size_t tcp_max_inbufs; + size_t tcp_max_obufs; uint32_t tcp_idle_close; // In microseconds. uint32_t tcp_idle_reset; // In microseconds. 
+ uint32_t tcp_idle_resend; } xdp_handle_ctx_t; static bool udp_state_active(int state) @@ -66,8 +68,10 @@ void xdp_handle_reconfigure(xdp_handle_ctx_t *ctx) ctx->tcp = pconf->cache.xdp_tcp; ctx->tcp_max_conns = pconf->cache.xdp_tcp_max_clients / pconf->cache.srv_xdp_threads; ctx->tcp_max_inbufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads; + ctx->tcp_max_obufs = pconf->cache.xdp_tcp_inbuf_max_size / pconf->cache.srv_xdp_threads; // FIXME another setting for outbuf!! ctx->tcp_idle_close = pconf->cache.xdp_tcp_idle_close * 1000000; ctx->tcp_idle_reset = pconf->cache.xdp_tcp_idle_reset * 1000000; + ctx->tcp_idle_resend= pconf->cache.xdp_tcp_idle_resend * 1000000; rcu_read_unlock(); } @@ -181,50 +185,37 @@ static void handle_udp(xdp_handle_ctx_t *ctx, knot_layer_t *layer, static void handle_tcp(xdp_handle_ctx_t *ctx, knot_layer_t *layer, knotd_qdata_params_t *params) { - uint32_t ack_errors = 0; - int ret = knot_tcp_relay(ctx->sock, ctx->msg_recv, ctx->msg_recv_count, - ctx->tcp_table, NULL, &ctx->tcp_relays, &ack_errors); + int ret = knot_tcp_recv(ctx->relays, ctx->msg_recv, ctx->msg_recv_count, ctx->tcp_table, NULL); if (ret != KNOT_EOK) { log_notice("TCP, failed to process some packets (%s)", knot_strerror(ret)); return; - } else if (ctx->tcp_relays.size == 0) { + } else if (knot_tcp_relay_empty(&ctx->relays[0])) { // no TCP traffic return; - } else if (ack_errors > 0) { - log_notice("TCP, failed to send some ACK packets"); } uint8_t ans_buf[KNOT_WIRE_MAX_PKTSIZE]; - // Note dynaray_foreach can't be used as we insert into the dynarray inside the loop. - for (int n_tcp_relays = ctx->tcp_relays.size, rli = 0; rli < n_tcp_relays; rli++) { - knot_tcp_relay_t *rl = knot_tcp_relay_dynarray_arr(&ctx->tcp_relays) + rli; - if ((rl->action & XDP_TCP_DATA) == 0 || rl->answer != XDP_TCP_NOOP) { - continue; - } + for (uint32_t i = 0; i < ctx->msg_recv_count; i++) { + knot_tcp_relay_t *rl = &ctx->relays[i]; - // Consume the query. 
- handle_init(params, layer, rl->msg, &rl->data); + for (size_t j = 0; j < rl->inbufs_count; j++) { + // Consume the query. + handle_init(params, layer, rl->msg, &rl->inbufs[j]); - // Process the reply. - knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm); - while (tcp_active_state(layer->state)) { - knot_layer_produce(layer, ans); - if (!tcp_send_state(layer->state)) { - continue; + // Process the reply. + knot_pkt_t *ans = knot_pkt_new(ans_buf, sizeof(ans_buf), layer->mm); + while (tcp_active_state(layer->state)) { + knot_layer_produce(layer, ans); + if (!tcp_send_state(layer->state)) { + continue; + } + + (void)knot_tcp_reply_data(rl, ctx->tcp_table, ans->wire, ans->size); + // ignore unprobable ENOMEM here } - ret = knot_tcp_relay_answer(&ctx->tcp_relays, rl, - ans->wire, ans->size); - if (ret != KNOT_EOK) { - char addr[SOCKADDR_STRLEN]; - sockaddr_tostr(addr, sizeof(addr), params->remote); - log_notice("TCP, failed to reply, address %s (%s)", - addr, knot_strerror(ret)); - layer->state = KNOT_STATE_FAIL; - } + handle_finish(layer); } - - handle_finish(layer); } } @@ -254,8 +245,7 @@ void xdp_handle_send(xdp_handle_ctx_t *ctx) uint32_t unused; (void)knot_xdp_send(ctx->sock, ctx->msg_send_udp, ctx->msg_udp_count, &unused); if (ctx->tcp) { - int ret = knot_tcp_send(ctx->sock, knot_tcp_relay_dynarray_arr(&ctx->tcp_relays), - ctx->tcp_relays.size); + int ret = knot_tcp_send(ctx->sock, ctx->relays, ctx->msg_recv_count, XDP_BATCHLEN); if (ret != KNOT_EOK) { log_notice("TCP, failed to send some packets"); } @@ -263,18 +253,10 @@ void xdp_handle_send(xdp_handle_ctx_t *ctx) (void)knot_xdp_send_finish(ctx->sock); if (ctx->tcp) { - knot_tcp_relay_free(&ctx->tcp_relays); + knot_tcp_cleanup(ctx->tcp_table, ctx->relays, ctx->msg_recv_count); } } -static size_t overweight(size_t weight, size_t max_weight) -{ - int64_t w = weight; - w -= max_weight; - w = MAX(w, 0); - return w; -} - void xdp_handle_sweep(xdp_handle_ctx_t *ctx) { if (!ctx->tcp) { @@ -284,13 
+266,17 @@ void xdp_handle_sweep(xdp_handle_ctx_t *ctx) uint32_t prev_reset; uint32_t total_reset = 0, total_close = 0; int ret = KNOT_EOK; + knot_tcp_relay_t sweep_relays[XDP_BATCHLEN]; do { prev_reset = total_reset; - ret = knot_tcp_sweep(ctx->tcp_table, ctx->sock, 20, - ctx->tcp_idle_close, ctx->tcp_idle_reset, - overweight(ctx->tcp_table->usage, ctx->tcp_max_conns), - overweight(ctx->tcp_table->inbufs_total, ctx->tcp_max_inbufs), - &total_close, &total_reset); + ret = knot_tcp_sweep(ctx->tcp_table, ctx->tcp_idle_close, ctx->tcp_idle_reset, + ctx->tcp_idle_resend, + ctx->tcp_max_conns, ctx->tcp_max_inbufs, ctx->tcp_max_obufs, + sweep_relays, XDP_BATCHLEN, &total_close, &total_reset); + if (ret == KNOT_EOK) { + ret = knot_tcp_send(ctx->sock, sweep_relays, XDP_BATCHLEN, XDP_BATCHLEN); + } + knot_tcp_cleanup(ctx->tcp_table, sweep_relays, XDP_BATCHLEN); } while (ret == KNOT_EOK && prev_reset < total_reset); if (total_close > 0 || total_reset > 0) { diff --git a/src/libknot/xdp/tcp.c b/src/libknot/xdp/tcp.c index 3a0f0fd38..94e78c47e 100644 --- a/src/libknot/xdp/tcp.c +++ b/src/libknot/xdp/tcp.c @@ -20,9 +20,9 @@ #include #include "libknot/xdp/tcp.h" +#include "libknot/xdp/tcp_iobuf.h" #include "libknot/attribute.h" #include "libknot/error.h" -#include "libknot/xdp/tcp_iobuf.h" #include "libdnssec/random.h" #include "contrib/macros.h" #include "contrib/openbsd/siphash.h" @@ -110,8 +110,6 @@ static knot_tcp_conn_t **tcp_table_lookup(const struct sockaddr_in6 *rem, while (*res != NULL) { if (memcmp(&(*res)->ip_rem, rem, sdl) == 0 && memcmp(&(*res)->ip_loc, loc, sdl) == 0) { - rem_node(tcp_conn_node(*res)); - add_tail(tcp_table_timeout(table), tcp_conn_node(*res)); break; } res = &(*res)->next; @@ -119,6 +117,16 @@ static knot_tcp_conn_t **tcp_table_lookup(const struct sockaddr_in6 *rem, return res; } +static knot_tcp_conn_t **tcp_table_re_lookup(knot_tcp_conn_t *conn, + knot_tcp_table_t *table) +{ + uint64_t unused_hash; + knot_tcp_conn_t **res = 
tcp_table_lookup(&conn->ip_rem, &conn->ip_loc, + &unused_hash, table); + assert(*res == conn); + return res; +} + static void tcp_table_del_conn(knot_tcp_conn_t **todel) { knot_tcp_conn_t *conn = *todel; @@ -126,6 +134,7 @@ static void tcp_table_del_conn(knot_tcp_conn_t **todel) *todel = conn->next; // remove from conn-table linked list rem_node(tcp_conn_node(conn)); // remove from timeout double-linked list free(conn->inbuf.iov_base); + free(conn->outbufs.bufs); free(conn); } } @@ -138,16 +147,6 @@ static void tcp_table_del(knot_tcp_conn_t **todel, knot_tcp_table_t *table) table->usage--; } -static void tcp_table_del_lookup(knot_tcp_conn_t *todel, knot_tcp_table_t *table) -{ - // re-lookup is needed to find the **pointer in the table - uint64_t unused_hash; - knot_tcp_conn_t **pconn = tcp_table_lookup(&todel->ip_rem, &todel->ip_loc, - &unused_hash, table); - assert(*pconn == todel); - tcp_table_del(pconn, table); -} - // WARNING you shall ensure that it's not in the table already! static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *table, knot_tcp_conn_t **res) @@ -168,11 +167,14 @@ static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *t c->ackno = msg->ackno; c->acked = msg->ackno; + c->window_size = 65536; // FIXME + c->last_active = get_timestamp(); add_tail(tcp_table_timeout(table), tcp_conn_node(c)); c->state = XDP_TCP_NORMAL; memset(&c->inbuf, 0, sizeof(c->inbuf)); + memset(&c->outbufs, 0, sizeof(c->outbufs)); c->next = *addto; *addto = c; @@ -182,8 +184,6 @@ static int tcp_table_add(knot_xdp_msg_t *msg, uint64_t hash, knot_tcp_table_t *t return KNOT_EOK; } -knot_dynarray_define(knot_tcp_relay, knot_tcp_relay_t, DYNARRAY_VISIBILITY_PUBLIC) - static bool check_seq_ack(const knot_xdp_msg_t *msg, const knot_tcp_conn_t *conn) { if (conn == NULL || conn->seqno != msg->seqno) { @@ -198,88 +198,56 @@ static bool check_seq_ack(const knot_xdp_msg_t *msg, const knot_tcp_conn_t *conn } _public_ -int 
knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t msg_count, - knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table, - knot_tcp_relay_dynarray_t *relays, uint32_t *ack_errors) +int knot_tcp_recv(knot_tcp_relay_t *relays, knot_xdp_msg_t *msgs, uint32_t count, + knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table) { - if (msg_count == 0) { + if (count == 0) { return KNOT_EOK; } - if (socket == NULL || msgs == NULL || tcp_table == NULL || relays == NULL) { + if (relays == NULL || msgs == NULL || tcp_table == NULL) { return KNOT_EINVAL; } + memset(relays, 0, count * sizeof(*relays)); - knot_xdp_send_prepare(socket); - - knot_xdp_msg_t acks[msg_count]; - uint32_t n_acks = 0; + knot_tcp_relay_t *relay = relays; int ret = KNOT_EOK; -#define resp_ack(msg, flag) \ - { \ - knot_xdp_msg_t *ack = &acks[n_acks++]; \ - int ackret = knot_xdp_reply_alloc(socket, (msg), ack); \ - if (ackret != KNOT_EOK) { \ - if (ack_errors != NULL) (*ack_errors)++; \ - n_acks--; \ - continue; \ - } \ - ack->payload.iov_len = 0; \ - ack->flags |= (flag); \ - } - - for (size_t i = 0; i < msg_count && ret == KNOT_EOK; i++) { - knot_xdp_msg_t *msg = &msgs[i]; + for (knot_xdp_msg_t *msg = msgs; msg != msgs + count && ret == KNOT_EOK; msg++) { if (!(msg->flags & KNOT_XDP_MSG_TCP)) { continue; } uint64_t conn_hash; - knot_tcp_conn_t **conn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, - &conn_hash, tcp_table); - bool seq_ack_match = check_seq_ack(msg, *conn); + knot_tcp_conn_t **pconn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, + &conn_hash, tcp_table); + knot_tcp_conn_t *conn = *pconn; + bool seq_ack_match = check_seq_ack(msg, conn); if (seq_ack_match) { - assert((*conn)->mss != 0); - (*conn)->seqno = knot_tcp_next_seqno(msg); - memcpy((*conn)->last_eth_rem, msg->eth_from, sizeof((*conn)->last_eth_rem)); - memcpy((*conn)->last_eth_loc, msg->eth_to, sizeof((*conn)->last_eth_loc)); - (*conn)->last_active = get_timestamp(); + assert(conn->mss != 0); + conn->seqno = 
knot_tcp_next_seqno(msg); + memcpy(conn->last_eth_rem, msg->eth_from, sizeof(conn->last_eth_rem)); + memcpy(conn->last_eth_loc, msg->eth_to, sizeof(conn->last_eth_loc)); + + conn->last_active = get_timestamp(); + rem_node(tcp_conn_node(conn)); + add_tail(tcp_table_timeout(tcp_table), tcp_conn_node(conn)); + if (msg->flags & KNOT_XDP_MSG_ACK) { - (*conn)->acked = msg->ackno; + conn->acked = msg->ackno; + tcp_outbufs_ack(&conn->outbufs, msg->ackno, &tcp_table->outbufs_total); } } - knot_tcp_relay_t relay = { .msg = msg, .conn = *conn }; + relay->msg = msg; + relay->conn = conn; // process incoming data if (seq_ack_match && (msg->flags & KNOT_XDP_MSG_ACK) && msg->payload.iov_len > 0) { - resp_ack(msg, KNOT_XDP_MSG_ACK); - relay.action = XDP_TCP_DATA; - - struct iovec msg_payload = msg->payload, tofree; - ret = tcp_inbuf_update(&(*conn)->inbuf, &msg_payload, - &tofree, &tcp_table->inbufs_total); - - if (tofree.iov_len > 0 && ret == KNOT_EOK) { - relay.data.iov_base = tofree.iov_base + sizeof(uint16_t); - relay.data.iov_len = tofree.iov_len - sizeof(uint16_t); - relay.free_data = XDP_TCP_FREE_PREFIX; - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } - relay.free_data = XDP_TCP_FREE_NONE; - } - while (msg_payload.iov_len > 0 && ret == KNOT_EOK) { - size_t dns_len = tcp_payload_len(&msg_payload); - assert(dns_len >= msg_payload.iov_len); - relay.data.iov_base = msg_payload.iov_base + sizeof(uint16_t); - relay.data.iov_len = dns_len - sizeof(uint16_t); - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } - - msg_payload.iov_base += dns_len; - msg_payload.iov_len -= dns_len; + relay->auto_answer = KNOT_XDP_MSG_ACK; + ret = tcp_inbuf_update(&conn->inbuf, msg->payload, &relay->inbufs, + &relay->inbufs_count, &tcp_table->inbufs_total); + if (ret != KNOT_EOK) { + break; } } @@ -288,165 +256,162 @@ int knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t ms KNOT_XDP_MSG_FIN | 
KNOT_XDP_MSG_RST)) { case KNOT_XDP_MSG_SYN: case (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK): - if (*conn == NULL) { + if (conn == NULL) { bool synack = (msg->flags & KNOT_XDP_MSG_ACK); - resp_ack(msg, synack ? KNOT_XDP_MSG_ACK : - (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK)); - relay.action = synack ? XDP_TCP_ESTABLISH : XDP_TCP_SYN; + ret = tcp_table_add(msg, conn_hash, (syn_table == NULL || synack) ? tcp_table : syn_table, - &relay.conn); - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } + &relay->conn); if (ret == KNOT_EOK) { - relay.conn->state = XDP_TCP_ESTABLISHING; - relay.conn->seqno++; - relay.conn->mss = MAX(msg->mss, 536); // minimal MSS, most importantly not zero! - relay.conn->acked = acks[n_acks - 1].seqno; - relay.conn->ackno = relay.conn->acked + (synack ? 0 : 1); + relay->action = synack ? XDP_TCP_ESTABLISH : XDP_TCP_SYN; + relay->auto_answer = synack ? KNOT_XDP_MSG_ACK : (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK); + + conn = relay->conn; + conn->state = XDP_TCP_ESTABLISHING; + conn->seqno++; + conn->mss = MAX(msg->mss, 536); // minimal MSS, most importantly not zero! 
+ if (!synack) { + conn->acked = dnssec_random_uint32_t(); + conn->ackno = conn->acked; + } } } else { - resp_ack(msg, KNOT_XDP_MSG_RST); // TODO consider resetting the OLD conn and accepting new one + relay->auto_answer = KNOT_XDP_MSG_RST; // TODO consider resetting the OLD conn and accepting new one + relay->del_from = pconn; } break; case KNOT_XDP_MSG_ACK: if (!seq_ack_match) { uint64_t syn_hash; if (syn_table != NULL && msg->payload.iov_len == 0 && - *(conn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, &syn_hash, syn_table)) != NULL && - check_seq_ack(msg, *conn)) { - tcp_table_del(conn, syn_table); - *conn = NULL; - relay.action = XDP_TCP_ESTABLISH; - ret = tcp_table_add(msg, conn_hash, tcp_table, &relay.conn); - if (ret == KNOT_EOK && knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } + *(pconn = tcp_table_lookup(&msg->ip_from, &msg->ip_to, &syn_hash, syn_table)) != NULL && + check_seq_ack(msg, (conn = *pconn))) { + tcp_table_del(pconn, syn_table); + *pconn = NULL; + relay->action = XDP_TCP_ESTABLISH; + ret = tcp_table_add(msg, conn_hash, tcp_table, &relay->conn); } - // unmatching ACK is ignored, this includes: - // - incoming out-of-order data - // - ACK of some previous part of outgoing data } else { - switch ((*conn)->state) { + switch (conn->state) { case XDP_TCP_NORMAL: case XDP_TCP_CLOSING1: // just a mess, ignore break; case XDP_TCP_ESTABLISHING: - (*conn)->state = XDP_TCP_NORMAL; + conn->state = XDP_TCP_NORMAL; break; case XDP_TCP_CLOSING2: - tcp_table_del(conn, tcp_table); + tcp_table_del(pconn, tcp_table); + relay->conn = NULL; break; } } - break; // sole ACK without PSH is ignored + break; case (KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK): if (!seq_ack_match) { - resp_ack(msg, KNOT_XDP_MSG_RST); + if (conn != NULL) { + relay->auto_answer = KNOT_XDP_MSG_RST; + relay->del_from = pconn; + } // else ignore. It would be better and possible, but no big value for the price of CPU. 
} else { - if ((*conn)->state == XDP_TCP_CLOSING1) { - resp_ack(msg, KNOT_XDP_MSG_ACK); - relay.action = XDP_TCP_CLOSE; - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } - tcp_table_del(conn, tcp_table); + if (conn->state == XDP_TCP_CLOSING1) { + relay->action = XDP_TCP_CLOSE; + relay->auto_answer = KNOT_XDP_MSG_ACK; + relay->del_from = pconn; } else if (msg->payload.iov_len == 0) { // otherwise ignore FIN - resp_ack(msg, KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK); - relay.action = XDP_TCP_CLOSE; - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } - (*conn)->state = XDP_TCP_CLOSING2; - (*conn)->ackno++; + relay->action = XDP_TCP_CLOSE; + relay->auto_answer = KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK; + conn->state = XDP_TCP_CLOSING2; } } break; case KNOT_XDP_MSG_RST: if (conn != NULL && msg->seqno == conn->seqno) { - relay.action = XDP_TCP_RESET; - if (knot_tcp_relay_dynarray_add(relays, &relay) == NULL) { - ret = KNOT_ENOMEM; - } - tcp_table_del(conn, tcp_table); + relay->action = XDP_TCP_RESET; + tcp_table_del(pconn, tcp_table); + relay->conn = NULL; } break; default: break; } - } - if (n_acks > 0 && ret == KNOT_EOK) { - uint32_t sent_unused; - (void)knot_xdp_send(socket, acks, n_acks, &sent_unused); - (void)knot_xdp_send_finish(socket); + if (!knot_tcp_relay_empty(relay)) { + relay++; + } } -#undef resp_ack - return ret; } _public_ -int knot_tcp_relay_answer(knot_tcp_relay_dynarray_t *relays, const knot_tcp_relay_t *relay, - void *data, size_t data_len) +int knot_tcp_reply_data(knot_tcp_relay_t *relay, knot_tcp_table_t *tcp_table, + uint8_t *data, size_t len) { - if (relays == NULL || relay == NULL || data == NULL) { + if (relay == NULL || tcp_table == NULL || relay->conn == NULL) { return KNOT_EINVAL; } - - assert(data_len <= UINT16_MAX); - uint16_t prefix = htobe16(data_len); -#define PREFIX_LEN (prefix == 0 ? 
0 : sizeof(prefix)) - - while (data_len > 0) { - knot_tcp_relay_t *clone = knot_tcp_relay_dynarray_add(relays, relay); - if (clone == NULL) { - return KNOT_ENOMEM; - } - - size_t chunk = MIN(data_len + PREFIX_LEN, relay->conn->mss); - assert(chunk >= PREFIX_LEN); - - clone->data.iov_base = malloc(chunk); - if (clone->data.iov_base == NULL) { - return KNOT_ENOMEM; - } - clone->data.iov_len = chunk; - - memcpy(clone->data.iov_base, &prefix, PREFIX_LEN); - chunk -= PREFIX_LEN; - - memcpy(clone->data.iov_base + PREFIX_LEN, data, chunk); - clone->answer = XDP_TCP_ANSWER | XDP_TCP_DATA; - clone->free_data = XDP_TCP_FREE_DATA; - - data += chunk; - data_len -= chunk; - prefix = 0; - } - return KNOT_EOK; + return tcp_outbufs_add(&relay->conn->outbufs, data, len, + relay->conn->mss, &tcp_table->outbufs_total); } -_public_ -void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays) +static knot_xdp_msg_t *first_msg(knot_xdp_msg_t *msgs, uint32_t n_msgs) { - if (relays == NULL) { - return; + memset(msgs, 0, n_msgs * sizeof(*msgs)); + return msgs - 1; // will be incremented just before first use +} + +static int send_msgs(knot_xdp_msg_t *msgs, uint32_t n_msgs, knot_xdp_socket_t *socket) +{ + uint32_t sent = 0; + if (n_msgs == 0) { + return KNOT_EOK; } - knot_dynarray_foreach(knot_tcp_relay, knot_tcp_relay_t, i, *relays) { - if (i->free_data != XDP_TCP_FREE_NONE) { - free(i->data.iov_base - - (i->free_data == XDP_TCP_FREE_PREFIX ? 
sizeof(uint16_t) : 0)); - } + int ret = knot_xdp_send(socket, msgs, n_msgs, &sent); + if (ret != KNOT_EOK) { + printf("TCP send[%u/%u]: %s\n", sent, n_msgs, knot_strerror(ret)); } - knot_tcp_relay_dynarray_free(relays); + return KNOT_EOK; // ignore errcode from send +} + +static int next_msg(knot_xdp_msg_t *msgs, uint32_t n_msgs, knot_xdp_msg_t **cur, + knot_xdp_socket_t *socket, knot_tcp_relay_t *rl) +{ + (*cur)++; + if (*cur - msgs >= n_msgs) { + (void)send_msgs(msgs, n_msgs, socket); + *cur = first_msg(msgs, n_msgs); + (*cur)++; + } + + knot_xdp_msg_t *msg = *cur; + + knot_xdp_msg_flag_t fl = KNOT_XDP_MSG_TCP; + if (rl->conn->ip_loc.sin6_family == AF_INET6) { + fl |= KNOT_XDP_MSG_IPV6; + } + + int ret = knot_xdp_send_alloc(socket, fl, msg); + if (ret != KNOT_EOK) { + return ret; + } + + memcpy( msg->eth_from, rl->conn->last_eth_loc, sizeof(msg->eth_from)); + memcpy( msg->eth_to, rl->conn->last_eth_rem, sizeof(msg->eth_to)); + memcpy(&msg->ip_from, &rl->conn->ip_loc, sizeof(msg->ip_from)); + memcpy(&msg->ip_to, &rl->conn->ip_rem, sizeof(msg->ip_to)); + + msg->ackno = rl->conn->seqno; + msg->seqno = rl->conn->ackno; + + msg->payload.iov_len = 0; + + return ret; } _public_ -int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count) +int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count, + uint32_t max_at_once) { if (relay_count == 0) { return KNOT_EOK; @@ -455,140 +420,145 @@ int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t return KNOT_EINVAL; } - knot_xdp_msg_t msgs[relay_count], *msg = &msgs[0]; - int ret = KNOT_EOK, n_msgs = 0; + knot_xdp_send_prepare(socket); - for (size_t irl = 0; irl < relay_count; irl++) { - knot_tcp_relay_t *rl = &relays[irl]; - if ((rl->answer & 0x0f) == XDP_TCP_NOOP) { - continue; + knot_xdp_msg_t msgs[max_at_once], *first = first_msg(msgs, max_at_once), *msg = first; + int ret = KNOT_EOK; + + for (uint32_t i = 0; i < relay_count; 
i++) { + knot_tcp_relay_t *rl = &relays[i]; + +#define NEXT_MSG if ((ret = next_msg(msgs, max_at_once, &msg, socket, rl)) != KNOT_EOK) { return ret; } + + if (rl->auto_answer != 0) { + NEXT_MSG + msg->flags |= rl->auto_answer; + if (msg->flags & (KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_FIN)) { + rl->conn->ackno++; + } } - knot_xdp_msg_flag_t fl = KNOT_XDP_MSG_TCP; - if (rl->conn->ip_loc.sin6_family == AF_INET6) { - fl |= KNOT_XDP_MSG_IPV6; - } - - ret = knot_xdp_send_alloc(socket, fl, msg); - if (ret != KNOT_EOK) { - break; - } - - memcpy( msg->eth_from, rl->conn->last_eth_loc, sizeof(msg->eth_from)); - memcpy( msg->eth_to, rl->conn->last_eth_rem, sizeof(msg->eth_to)); - memcpy(&msg->ip_from, &rl->conn->ip_loc, sizeof(msg->ip_from)); - memcpy(&msg->ip_to, &rl->conn->ip_rem, sizeof(msg->ip_to)); - - msg->ackno = rl->conn->seqno; - msg->seqno = rl->conn->ackno; - switch (rl->answer & 0x0f) { case XDP_TCP_ESTABLISH: + NEXT_MSG msg->flags |= KNOT_XDP_MSG_SYN; - msg->payload.iov_len = 0; - break; - case XDP_TCP_DATA: - msg->flags |= KNOT_XDP_MSG_ACK; - if (rl->data.iov_len > UINT16_MAX || - rl->data.iov_len > msg->payload.iov_len) { - ret = KNOT_ESPACE; - } else { - memcpy(msg->payload.iov_base, rl->data.iov_base, - rl->data.iov_len); - msg->payload.iov_len = rl->data.iov_len; - } - assert(rl->conn != NULL); - rl->conn->ackno += msg->payload.iov_len; + rl->conn->ackno++; break; case XDP_TCP_CLOSE: + NEXT_MSG msg->flags |= (KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK); - msg->payload.iov_len = 0; - assert(rl->conn != NULL); rl->conn->ackno++; rl->conn->state = XDP_TCP_CLOSING1; break; case XDP_TCP_RESET: - default: + NEXT_MSG msg->flags |= KNOT_XDP_MSG_RST; - msg->payload.iov_len = 0; + break; + case XDP_TCP_NOOP: + default: break; } - msg++; - n_msgs++; - if (ret != KNOT_EOK) { - break; + size_t can_data = 0; + struct tcp_outbuf *ob; + if (rl->conn != NULL) { + tcp_outbufs_can_send(&rl->conn->outbufs, rl->conn->window_size, + rl->answer == XDP_TCP_RESEND, &ob, &can_data); + } + while 
(can_data > 0) { + NEXT_MSG + msg->flags |= KNOT_XDP_MSG_ACK; + msg->payload.iov_len = ob->len; + memcpy(msg->payload.iov_base, ob->bytes, ob->len); + + if (!ob->sent) { + assert(rl->conn->ackno == msg->seqno); + rl->conn->ackno += msg->payload.iov_len; + } else { + msg->seqno = ob->seqno; + } + + ob->sent = true; + ob->seqno = msg->seqno; + + can_data--; + ob = ob->next; } } - uint32_t sent_unused; - (void)knot_xdp_send(socket, msgs, n_msgs, &sent_unused); +#undef NEXT_MSG + + ret = send_msgs(msgs, msg - first, socket); + (void)knot_xdp_send_finish(socket); return ret; } _public_ -int knot_tcp_sweep(knot_tcp_table_t *tcp_table, knot_xdp_socket_t *socket, - uint32_t max_at_once, uint32_t close_timeout, uint32_t reset_timeout, - uint32_t reset_at_least, size_t reset_buf_size, +int knot_tcp_sweep(knot_tcp_table_t *tcp_table, + uint32_t close_timeout, uint32_t reset_timeout, + uint32_t resend_timeout, uint32_t limit_n_conn, + size_t limit_ibuf_size, size_t limit_obuf_size, + knot_tcp_relay_t *relays, size_t max_relays, uint32_t *close_count, uint32_t *reset_count) { - if (tcp_table == NULL) { + if (tcp_table == NULL || relays == NULL || max_relays < 1) { return KNOT_EINVAL; } - knot_tcp_relay_t rl = { 0 }; - knot_tcp_relay_dynarray_t relays = { 0 }; uint32_t now = get_timestamp(), i = 0; + memset(relays, 0, max_relays * sizeof(*relays)); + knot_tcp_relay_t *rl = relays; + + ssize_t free_conns = (ssize_t)tcp_table->usage - limit_n_conn; + ssize_t free_inbuf = (ssize_t)tcp_table->inbufs_total - limit_ibuf_size; + ssize_t free_outbuf = (ssize_t)tcp_table->outbufs_total - limit_obuf_size; + knot_tcp_conn_t *conn, *next; - list_t to_remove; - init_list(&to_remove); - WALK_LIST_DELSAFE(conn, next, *tcp_table_timeout(tcp_table)) { - if (i++ < reset_at_least || + rl->conn = conn; + + if (i++ < free_conns || now - conn->last_active >= reset_timeout || - (reset_buf_size > 0 && conn->inbuf.iov_len > 0)) { - rl.answer = XDP_TCP_RESET; + (free_inbuf > 0 && conn->inbuf.iov_len > 0) 
|| + (free_outbuf > 0 && tcp_outbufs_usage(&conn->outbufs) > 0)) { + rl->answer = XDP_TCP_RESET; + rl->del_from = tcp_table_re_lookup(conn, tcp_table); - // move this conn into to-remove list - rem_node((node_t *)conn); - add_tail(&to_remove, (node_t *)conn); + free_inbuf -= conn->inbuf.iov_len; + free_outbuf -= tcp_outbufs_usage(&conn->outbufs); - reset_buf_size -= MIN(reset_buf_size, conn->inbuf.iov_len); + if (reset_count != NULL) { + (*reset_count)++; + } } else if (now - conn->last_active >= close_timeout) { if (conn->state != XDP_TCP_CLOSING1) { - rl.answer = XDP_TCP_CLOSE; + rl->answer = XDP_TCP_CLOSE; if (close_count != NULL) { (*close_count)++; } } - } else if (reset_buf_size == 0) { - break; + } else if (now - conn->last_active >= resend_timeout && + conn->outbufs.bufs != NULL && conn->outbufs.bufs->sent) { + rl->answer = XDP_TCP_RESEND; } - rl.conn = conn; - if (rl.answer != XDP_TCP_NOOP) { - (void)knot_tcp_relay_dynarray_add(&relays, &rl); - rl.answer = XDP_TCP_NOOP; - } - if (relays.size >= max_at_once) { - break; + if (rl->answer != XDP_TCP_NOOP) { + if (++rl == relays + max_relays) { + break; + } } } - - knot_xdp_send_prepare(socket); - (void)knot_tcp_send(socket, knot_tcp_relay_dynarray_arr(&relays), relays.size); - (void)knot_xdp_send_finish(socket); - - // immediately remove reset connections - if (reset_count != NULL) { - *reset_count += list_size(&to_remove); - } - WALK_LIST_DELSAFE(conn, next, to_remove) { - tcp_table_del_lookup(conn, tcp_table); - } - - knot_tcp_relay_free(&relays); - return KNOT_EOK; } + +_public_ +void knot_tcp_cleanup(knot_tcp_table_t *tcp_table, knot_tcp_relay_t *relays, size_t n_relays) +{ + for (uint32_t i = 0; i < n_relays; i++) { + if (relays[i].del_from != NULL) { + tcp_table_del(relays[i].del_from, tcp_table); + } + free(relays[i].inbufs); + } +} diff --git a/src/libknot/xdp/tcp.h b/src/libknot/xdp/tcp.h index c5ee051da..883647054 100644 --- a/src/libknot/xdp/tcp.h +++ b/src/libknot/xdp/tcp.h @@ -35,8 +35,7 @@ 
typedef enum { XDP_TCP_ESTABLISH = 2, XDP_TCP_CLOSE = 3, XDP_TCP_RESET = 4, - XDP_TCP_DATA = (1 << 3), - XDP_TCP_ANSWER = (1 << 4), + XDP_TCP_RESEND = 5, } knot_tcp_action_t; typedef enum { @@ -52,6 +51,10 @@ typedef enum { XDP_TCP_FREE_PREFIX, } knot_tcp_relay_free_t; +typedef struct tcp_outbufs { + struct tcp_outbuf *bufs; +} tcp_outbufs_t; // this typedef belongs to tcp_iobuf.h, but is here to avoid issues with symbols + typedef struct knot_tcp_conn { struct { void *list_node_placeholder1; @@ -65,9 +68,11 @@ typedef struct knot_tcp_conn { uint32_t seqno; uint32_t ackno; uint32_t acked; + uint32_t window_size; uint32_t last_active; knot_tcp_state_t state; struct iovec inbuf; + tcp_outbufs_t outbufs; struct knot_tcp_conn *next; } knot_tcp_conn_t; @@ -75,6 +80,7 @@ typedef struct { size_t size; size_t usage; size_t inbufs_total; + size_t outbufs_total; uint64_t hash_secret[2]; knot_tcp_conn_t *conns[]; } knot_tcp_table_t; @@ -82,17 +88,14 @@ typedef struct { typedef struct { const knot_xdp_msg_t *msg; knot_tcp_action_t action; + knot_xdp_msg_flag_t auto_answer; knot_tcp_action_t answer; - struct iovec data; - knot_tcp_relay_free_t free_data; + struct iovec *inbufs; + size_t inbufs_count; knot_tcp_conn_t *conn; + knot_tcp_conn_t **del_from; } knot_tcp_relay_t; -#define TCP_RELAY_DEFAULT_COUNT 10 - -knot_dynarray_declare(knot_tcp_relay, knot_tcp_relay_t, DYNARRAY_VISIBILITY_PUBLIC, - TCP_RELAY_DEFAULT_COUNT) - /*! * \brief Return next TCP sequence number. */ @@ -105,6 +108,11 @@ inline static uint32_t knot_tcp_next_seqno(const knot_xdp_msg_t *msg) return res; } +inline static bool knot_tcp_relay_empty(const knot_tcp_relay_t *r) +{ + return r->action == XDP_TCP_NOOP && r->auto_answer == 0 && r->inbufs_count == 0; +} + /*! * \brief Allocate TCP connection-handling hash table. * @@ -124,39 +132,31 @@ knot_tcp_table_t *knot_tcp_table_new(size_t size); void knot_tcp_table_free(knot_tcp_table_t *table); /*! 
- * \brief Process received packets, send ACKs, pick incoming data. + * \brief Process received packets, prepare automatic responses (e.g. ACK), pick incoming data. * - * \param socket XDP socket to answer through. - * \param msgs Packets received by knot_xdp_recv(). - * \param msg_count Number of received packets. - * \param tcp_table Table of TCP connections. - * \param syn_table Optional: extra table for handling partially established connections. - * \param relays Out: connection changes and data. - * \param ack_errors Out: incremented with number of unsent ACKs due to a buffer allocation error. + * \param relays Out: relays to be filled with message/connection details. + * \param msgs Packets received by knot_xdp_recv(). + * \param count Number of received packets. + * \param tcp_table Table of TCP connections. + * \param syn_table Optional: extra table for handling partially established connections. * * \return KNOT_E* */ -int knot_tcp_relay(knot_xdp_socket_t *socket, knot_xdp_msg_t msgs[], uint32_t msg_count, - knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table, - knot_tcp_relay_dynarray_t *relays, uint32_t *ack_errors); +int knot_tcp_recv(knot_tcp_relay_t *relays, knot_xdp_msg_t *msgs, uint32_t count, + knot_tcp_table_t *tcp_table, knot_tcp_table_t *syn_table); /*! - * \brief Fetch answer to one relay with one or more relays with data payload. + * \brief Prepare data (payload) to be sent as a response on a specific relay. * - * \param relays Relays. - * \param relay The relay to answer to. - * \param data Data payload, possibly > MSS. - * \param data_len Payload length. + * \param relay Relay with active connection. + * \param tcp_table TCP table. + * \param data Data payload, possibly > MSS and > window. + * \param len Payload length, < 64k. * - * \return KNOT_EOK, KNOT_ENOMEM + * \return KNOT_E* */ -int knot_tcp_relay_answer(knot_tcp_relay_dynarray_t *relays, const knot_tcp_relay_t *relay, - void *data, size_t data_len); - -/*! 
- * \brief Free resources in 'relays'. - */ -void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays); +int knot_tcp_reply_data(knot_tcp_relay_t *relay, knot_tcp_table_t *tcp_table, + uint8_t *data, size_t len); /*! * \brief Send TCP packets. @@ -164,31 +164,43 @@ void knot_tcp_relay_free(knot_tcp_relay_dynarray_t *relays); * \param socket XDP socket to send through. * \param relays Connection changes and data. * \param relay_count Number of connection changes and data. + * \param max_at_once Limit of packet batch sent by knot_xdp_send(). * * \return KNOT_E* */ -int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count); +int knot_tcp_send(knot_xdp_socket_t *socket, knot_tcp_relay_t relays[], uint32_t relay_count, + uint32_t max_at_once); /*! * \brief Cleanup old TCP connections, perform timeout checks. * * \param tcp_table TCP connection table to clean up. - * \param socket XDP socket for close messages. - * \param max_at_once Don't close more connections at once. * \param close_timeout Gracefully close connections older than this (usecs). * \param reset_timeout Reset connections older than this (usecs). - * \param reset_at_least Reset at least this number of oldest connections, even - * when not yet timed out. - * \param reset_buf_size Reset oldest connection with buffered partial DNS messages - * to free up this amount of space. - * \param close_count Optional: Out: incremented with number of closed connections. - * \param reset_count Optional: Out: incremented with number of reset connections. + * \param limit_n_conn Limit of active connections in TCP table, reset if more. + * \param limit_ibuf_size Limit of memory usage by input buffers, reset if exceeded. + * \param limit_obuf_size Limit of memory usage by output buffers, reset if exceeded. + * \param relays Out: relays to be filled with close/reset instructions for knot_tcp_send(). + * \param max_relays Maximum relays to be used. 
+ * \param close_count Out: number of connections closed. + * \param reset_count Out: number of connections reset. * - * \return KNOT_E* + * \return KNOT_E* */ -int knot_tcp_sweep(knot_tcp_table_t *tcp_table, knot_xdp_socket_t *socket, - uint32_t max_at_once, uint32_t close_timeout, uint32_t reset_timeout, - uint32_t reset_at_least, size_t reset_buf_size, +int knot_tcp_sweep(knot_tcp_table_t *tcp_table, + uint32_t close_timeout, uint32_t reset_timeout, + uint32_t resend_timeout, uint32_t limit_n_conn, + size_t limit_ibuf_size, size_t limit_obuf_size, + knot_tcp_relay_t *relays, size_t max_relays, uint32_t *close_count, uint32_t *reset_count); +/*! + * \brief Free resources of closed/reset connections. + * + * \param tcp_table TCP table with connections. + * \param relays Relays with closed/reset (or other, ignored) connections. + * \param n_relays Number of relays. + */ +void knot_tcp_cleanup(knot_tcp_table_t *tcp_table, knot_tcp_relay_t *relays, size_t n_relays); + /*! @} */ diff --git a/src/libknot/xdp/tcp_iobuf.c b/src/libknot/xdp/tcp_iobuf.c index a578f5c99..fded5ff43 100644 --- a/src/libknot/xdp/tcp_iobuf.c +++ b/src/libknot/xdp/tcp_iobuf.c @@ -14,80 +14,244 @@ along with this program. If not, see . */ +#include +#include #include #include +#include #include "libknot/xdp/tcp_iobuf.h" +#include "libknot/xdp/tcp.h" // just tcp_outbufs_t #include "libknot/error.h" #include "contrib/macros.h" -int tcp_inbuf_update(struct iovec *buffer, struct iovec *data, - struct iovec *data_tofree, size_t *buffers_total) +static void iov_clear(struct iovec *iov) { - memset(data_tofree, 0, sizeof(*data_tofree)); - if (data->iov_len < 1) { + free(iov->iov_base); + memset(iov, 0, sizeof(*iov)); +} + +static void iov_inc(struct iovec *iov, size_t shift) +{ + assert(shift <= iov->iov_len); + iov->iov_base += shift; + iov->iov_len -= shift; +} + +/*! \brief Strip 2-byte length prefix from a payload. 
*/ +static void iov_inc2(struct iovec *iov) +{ + iov_inc(iov, sizeof(uint16_t)); +} + +static size_t tcp_payload_len(const struct iovec *payload) +{ + assert(payload->iov_len >= 2); + uint16_t val; + memcpy(&val, payload->iov_base, sizeof(val)); + return be16toh(val) + sizeof(val); +} + +static bool iov_inc_pf(struct iovec *iov) +{ + size_t shift = tcp_payload_len(iov); + if (iov->iov_len >= shift) { + iov_inc(iov, shift); + return true; + } else { + return false; + } +} + +static size_t iov_count(const struct iovec *iov) +{ + size_t res = 0; + struct iovec tmp = *iov; + while (tmp.iov_len >= sizeof(uint16_t) && iov_inc_pf(&tmp)) { + res++; + } + return res; +} + +static void iov_append(struct iovec *what, const struct iovec *with) +{ + // NOTE: what->iov_base must be pre-allocated large enough + memcpy(what->iov_base + what->iov_len, with->iov_base, with->iov_len); + what->iov_len += with->iov_len; +} + +int tcp_inbuf_update(struct iovec *buffer, struct iovec data, + struct iovec **inbufs, size_t *inbufs_count, + size_t *buffers_total) +{ + size_t res_count = 0; + struct iovec *res = NULL, *cur = NULL; + + *inbufs = NULL; + *inbufs_count = 0; + + if (data.iov_len < 1) { return KNOT_EOK; } if (buffer->iov_len == 1) { - ((uint8_t *)buffer->iov_base)[1] = ((uint8_t *)data->iov_base)[0]; + ((uint8_t *)buffer->iov_base)[1] = ((uint8_t *)data.iov_base)[0]; buffer->iov_len++; - data->iov_base++; - data->iov_len--; - if (data->iov_len < 1) { + iov_inc(&data, 1); + if (data.iov_len < 1) { return KNOT_EOK; } } if (buffer->iov_len > 0) { size_t buffer_req = tcp_payload_len(buffer); assert(buffer_req > buffer->iov_len); - size_t data_use = buffer_req - buffer->iov_len; - if (data_use <= data->iov_len) { // usable payload combined from buffer and data ---> data_tofree - data_tofree->iov_len = buffer_req; - data_tofree->iov_base = realloc(buffer->iov_base, buffer_req); - if (data_tofree->iov_base == NULL) { + struct iovec data_use = { data.iov_base, buffer_req - buffer->iov_len 
}; + if (data_use.iov_len <= data.iov_len) { // usable payload combined from buffer and data ---> res[0] allocated together with res + iov_inc(&data, data_use.iov_len); + + res_count = 1 + iov_count(&data); + res = malloc(res_count * sizeof(*res) + buffer_req); + if (res == NULL) { return KNOT_ENOMEM; } - memcpy(data_tofree->iov_base + buffer->iov_len, data->iov_base, data_use); + res[0].iov_base = (void *)(res + res_count); + res[0].iov_len = 0; + iov_append(&res[0], buffer); + iov_append(&res[0], &data_use); + assert(res[0].iov_len == buffer_req); + iov_inc2(&res[0]); + + cur = &res[1]; *buffers_total -= buffer->iov_len; - buffer->iov_base = NULL; - buffer->iov_len = 0; - data->iov_base += data_use; - data->iov_len -= data_use; + iov_clear(buffer); } else { // just extend the buffer with data - void *bufnew = realloc(buffer->iov_base, buffer->iov_len + data->iov_len); + void *bufnew = realloc(buffer->iov_base, buffer->iov_len + data.iov_len); if (bufnew == NULL) { return KNOT_ENOMEM; } buffer->iov_base = bufnew; - memcpy(buffer->iov_base + buffer->iov_len, data->iov_base, data->iov_len); - *buffers_total += data->iov_len; - buffer->iov_len += data->iov_len; - data->iov_base += data->iov_len; - data->iov_len = 0; + iov_append(buffer, &data); + *buffers_total += data.iov_len; + return KNOT_EOK; + } + } else { // just allocate res + res_count = iov_count(&data); + if (res_count > 0) { + res = malloc(res_count * sizeof(*res)); + if (res == NULL) { + return KNOT_ENOMEM; + } + cur = &res[0]; } } - // skip whole usable payloads in data - struct iovec data_end = *data; - size_t data_req; - while (data_end.iov_len > 1 && (data_req = tcp_payload_len(&data_end)) <= data_end.iov_len) { - data_end.iov_base += data_req; - data_end.iov_len -= data_req; + void *last; + while (data.iov_len > 1) { + last = data.iov_base; + if (!iov_inc_pf(&data)) { + break; + } + cur->iov_base = last; + cur->iov_len = data.iov_base - last; + iov_inc2(cur); + cur++; } + assert(cur == res + 
res_count); // store the final incomplete payload to buffer - if (data_end.iov_len > 0) { + if (data.iov_len > 0) { assert(buffer->iov_base == NULL); - buffer->iov_base = malloc(MAX(data_end.iov_len, 2)); + buffer->iov_base = malloc(MAX(data.iov_len, 2)); if (buffer->iov_base == NULL) { - free(data_tofree->iov_base); - memset(data_tofree, 0, sizeof(*data_tofree)); + free(res); return KNOT_ENOMEM; } - *buffers_total += MAX(data_end.iov_len, 2); - buffer->iov_len = data_end.iov_len; - memcpy(buffer->iov_base, data_end.iov_base, data_end.iov_len); - data->iov_len -= data_end.iov_len; + *buffers_total += MAX(data.iov_len, 2); + buffer->iov_len = 0; + iov_append(buffer, &data); + } + + *inbufs = res; + *inbufs_count = res_count; + + return KNOT_EOK; +} + +int tcp_outbufs_add(struct tcp_outbufs *ob, uint8_t *data, size_t len, + uint32_t mss, size_t *outbufs_total) +{ + if (len > UINT16_MAX) { + return KNOT_ELIMIT; + } + struct tcp_outbuf **end = &ob->bufs; + while (*end != NULL) { // NOTE: this can be optimized by adding "end" pointer for the price of larger knot_tcp_conn_t struct + end = &(*end)->next; + } + uint16_t prefix = htobe16(len), prefix_len = sizeof(prefix); + while (len > 0) { + uint16_t newlen = MIN(len + prefix_len, mss); + struct tcp_outbuf *newob = calloc(1, sizeof(*newob) + newlen); + if (newob == NULL) { + return KNOT_ENOMEM; + } + *outbufs_total += sizeof(*newob) + newlen; + newob->len = newlen; + memcpy(newob->bytes, &prefix, prefix_len); + memcpy(newob->bytes + prefix_len, data, newlen - prefix_len); + + *end = newob; + end = &newob->next; + + data += newlen - prefix_len; + len -= newlen - prefix_len; + + prefix_len = 0; } return KNOT_EOK; } + +static bool seqno_lower(uint32_t seqno, uint32_t ackno, uint32_t ackno_min) +{ + if (ackno_min <= ackno) { + return (seqno >= ackno_min && seqno <= ackno); + } else { + return (seqno >= ackno_min || seqno <= ackno); + } +} + +void tcp_outbufs_ack(struct tcp_outbufs *ob, uint32_t ackno, size_t *outbufs_total) 
+{ + uint32_t ackno_min = ackno - (UINT32_MAX / 2); // FIXME better? + while (ob->bufs != NULL && ob->bufs->sent && seqno_lower(ob->bufs->seqno + ob->bufs->len, ackno, ackno_min)) { + struct tcp_outbuf *tofree = ob->bufs; + ob->bufs = tofree->next; + *outbufs_total -= tofree->len + sizeof(*tofree); + free(tofree); + } +} + +void tcp_outbufs_can_send(struct tcp_outbufs *ob, ssize_t window_size, bool resend, + struct tcp_outbuf **send_start, size_t *send_count) +{ + *send_count = 0; + *send_start = ob->bufs; + while (*send_start != NULL && (*send_start)->sent && !resend) { + window_size -= (*send_start)->len; + *send_start = (*send_start)->next; + } + + struct tcp_outbuf *can_send = *send_start; + while (can_send != NULL && window_size >= can_send->len) { + (*send_count)++; + window_size -= can_send->len; + can_send = can_send->next; + } +} + +size_t tcp_outbufs_usage(struct tcp_outbufs *ob) +{ + size_t res = 0; + for (struct tcp_outbuf *i = ob->bufs; i != NULL; i = i->next) { + res += i->len + sizeof(*i); + } + return res; +} diff --git a/src/libknot/xdp/tcp_iobuf.h b/src/libknot/xdp/tcp_iobuf.h index 56e90a556..b140dffe7 100644 --- a/src/libknot/xdp/tcp_iobuf.h +++ b/src/libknot/xdp/tcp_iobuf.h @@ -31,28 +31,69 @@ #include "libknot/endian.h" -/*! - * \brief Return the required length for payload buffer. - */ -inline static size_t tcp_payload_len(const struct iovec *payload) -{ - assert(payload->iov_len >= 2); - uint16_t val; - memcpy(&val, payload->iov_base, sizeof(val)); - return be16toh(val) + sizeof(val); -} +struct tcp_outbuf { + struct tcp_outbuf *next; + uint32_t len; + uint32_t seqno; + bool sent; + uint8_t bytes[]; +}; + +struct tcp_outbufs; // see tcp.h /*! * \brief Handle DNS-over-TCP payloads in buffer and message. * * \param buffer In/out: persistent buffer to store incomplete DNS payloads between receiving packets. - * \param data In/out: momental DNS payloads in incoming packet. 
- * \param data_tofree Out: once more DNS payload defragmented from multiple packets. + * \param data In: momental DNS payloads in incoming packet. + * \param inbufs Out: list of incoming DNS messages. + * \param inbufs_count Out: number of inbufs. * \param buffers_total In/Out: total size of buffers (will be increased or decreased). * * \return KNOT_EOK, KNOT_ENOMEM */ -int tcp_inbuf_update(struct iovec *buffer, struct iovec *data, - struct iovec *data_tofree, size_t *buffers_total); +int tcp_inbuf_update(struct iovec *buffer, struct iovec data, + struct iovec **inbufs, size_t *inbufs_count, + size_t *buffers_total); + +/*! + * \brief Add payload to be sent by TCP, to output buffers. + * + * \param ob Output buffers to be updated. + * \param data Payload to be sent. + * \param len Payload length. + * \param mss Connection outgoing MSS. + * \param outbufs_total In/out: total outbuf statistic to be updated. + * + * \return KNOT_E* + */ +int tcp_outbufs_add(struct tcp_outbufs *ob, uint8_t *data, size_t len, + uint32_t mss, size_t *outbufs_total); + +/*! + * \brief Remove+free acked data from output buffers. + * + * \param ob Output buffers to be updated. + * \param ackno Ackno of received ACK. + * \param outbufs_total In/out: total outbuf statistic to be updated. + */ +void tcp_outbufs_ack(struct tcp_outbufs *ob, uint32_t ackno, size_t *outbufs_total); + +/*! + * \brief Prepare output buffers to be sent now. + * + * \param ob Output buffers to be updated. + * \param window_size Connection outgoing window size. + * \param resend Send also possibly already sent data. + * \param send_start Out: first output buffer to be sent. + * \param send_count Out: number of output buffers to be sent. + */ +void tcp_outbufs_can_send(struct tcp_outbufs *ob, ssize_t window_size, bool resend, + struct tcp_outbuf **send_start, size_t *send_count); + +/*! + * \brief Compute allocated size of output buffers. + */ +size_t tcp_outbufs_usage(struct tcp_outbufs *ob); /*! 
@} */ diff --git a/src/utils/kxdpgun/main.c b/src/utils/kxdpgun/main.c index 2ccf0efb0..dd4cf370a 100644 --- a/src/utils/kxdpgun/main.c +++ b/src/utils/kxdpgun/main.c @@ -419,36 +419,25 @@ void *xdp_gun_thread(void *_ctx) break; } if (ctx->tcp) { - uint32_t ack_errors = 0; - knot_tcp_relay_dynarray_t relays = { 0 }; - ret = knot_tcp_relay(xsk, pkts, recvd, tcp_table, NULL, - &relays, &ack_errors); - lost += ack_errors; + knot_tcp_relay_t relays[recvd]; + ret = knot_tcp_recv(relays, pkts, recvd, tcp_table, NULL); if (ret != KNOT_EOK) { errors++; break; } - size_t relays_answer = relays.size; - for (size_t i = 0; i < relays_answer; i++) { - knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[i]; + for (size_t i = 0; i < recvd; i++) { + knot_tcp_relay_t *rl = &relays[i]; struct iovec payl; switch (rl->action) { case XDP_TCP_ESTABLISH: local_stats.synack_recv++; - rl->answer = XDP_TCP_ANSWER | XDP_TCP_DATA; put_dns_payload(&payl, true, ctx, &payload_ptr); - ret = knot_tcp_relay_answer(&relays, rl, payl.iov_base, - payl.iov_len); + ret = knot_tcp_reply_data(rl, tcp_table, payl.iov_base, payl.iov_len); if (ret != KNOT_EOK) { errors++; } break; - case XDP_TCP_DATA: - if (check_dns_payload(&rl->data, ctx, &local_stats)) { - rl->answer = XDP_TCP_ANSWER | XDP_TCP_CLOSE; - } - break; case XDP_TCP_CLOSE: local_stats.finack_recv++; break; @@ -458,16 +447,20 @@ void *xdp_gun_thread(void *_ctx) default: break; } + for (size_t j = 0; j < rl->inbufs_count; j++) { + if (check_dns_payload(&rl->inbufs[j], ctx, &local_stats)) { + rl->answer = XDP_TCP_CLOSE; + } + } } - ret = knot_tcp_send(xsk, knot_tcp_relay_dynarray_arr(&relays), - relays.size); + ret = knot_tcp_send(xsk, relays, recvd, ctx->at_once); if (ret != KNOT_EOK) { errors++; } (void)knot_xdp_send_finish(xsk); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(tcp_table, relays, recvd); } else { for (int i = 0; i < recvd; i++) { (void)check_dns_payload(&pkts[i].payload, ctx, diff --git a/tests/knot/test_confio.c 
b/tests/knot/test_confio.c index ce9c2a5ec..03ee7a092 100644 --- a/tests/knot/test_confio.c +++ b/tests/knot/test_confio.c @@ -994,6 +994,7 @@ static const yp_item_t desc_xdp[] = { { C_TCP_INBUF_MAX_SIZE, YP_TINT, YP_VNONE }, { C_TCP_IDLE_CLOSE, YP_TINT, YP_VNONE }, { C_TCP_IDLE_RESET, YP_TINT, YP_VNONE }, + { C_TCP_RESEND, YP_TINT, YP_VNONE }, { C_ROUTE_CHECK, YP_TBOOL, YP_VNONE }, { NULL } }; diff --git a/tests/libknot/test_xdp_tcp.c b/tests/libknot/test_xdp_tcp.c index 130f6a47e..4d3f7303e 100644 --- a/tests/libknot/test_xdp_tcp.c +++ b/tests/libknot/test_xdp_tcp.c @@ -32,6 +32,8 @@ size_t sent_syns = 0; size_t sent_fins = 0; uint32_t sent_seqno = 0; uint32_t sent_ackno = 0; +size_t sent2_data = 0; +size_t send2_mss = 0; knot_xdp_socket_t *test_sock = NULL; @@ -53,19 +55,15 @@ static size_t tcp_table_timeout_length(knot_tcp_table_t *table) * \param tcp_table TCP connection table to clean up. * \param timeout Remove connections older than this (usecs). * \param at_least Remove at least this number of connections. - * \param cleaned Optional: Out: number of removed connections. 
*/ static void tcp_cleanup(knot_tcp_table_t *tcp_table, uint32_t timeout, - uint32_t at_least, uint32_t *cleaned) + uint32_t at_least) { uint32_t now = get_timestamp(), i = 0; knot_tcp_conn_t *conn, *next; WALK_LIST_DELSAFE(conn, next, *tcp_table_timeout(tcp_table)) { if (i++ < at_least || now - conn->last_active >= timeout) { - tcp_table_del_lookup(conn, tcp_table); - if (cleaned != NULL) { - (*cleaned)++; - } + tcp_table_del(tcp_table_re_lookup(conn, tcp_table), tcp_table); } } } @@ -129,9 +127,26 @@ static int mock_send_nocheck(_unused_ knot_xdp_socket_t *sock, const knot_xdp_ms return KNOT_EOK; } +static int mock_send2(_unused_ knot_xdp_socket_t *sock, const knot_xdp_msg_t msgs[], + uint32_t n_msgs, _unused_ uint32_t *sent) +{ + ok(n_msgs <= 20, "send2: not too many at once"); + for (uint32_t i = 0; i < n_msgs; i++) { + const knot_xdp_msg_t *msg = msgs + i; + ok(msg->flags & KNOT_XDP_MSG_TCP, "send2: is TCP message"); + ok(msg->flags & KNOT_XDP_MSG_ACK, "send2: has ACK"); + ok(msg->payload.iov_len <= send2_mss, "send2: fulfilled MSS"); + sent2_data += msg->payload.iov_len; + + sent_seqno = msg->seqno; + sent_ackno = msg->ackno; + } + return KNOT_EOK; +} + static void clean_table(void) { - (void)tcp_cleanup(test_table, 0, UINT32_MAX, NULL); + (void)tcp_cleanup(test_table, 0, UINT32_MAX); } static void clean_sent(void) @@ -190,71 +205,70 @@ static void fix_seqacks(knot_xdp_msg_t *msgs, size_t count) void test_syn(void) { knot_xdp_msg_t msg; - knot_tcp_relay_dynarray_t relays = { 0 }; + knot_tcp_relay_t rl; prepare_msg(&msg, KNOT_XDP_MSG_SYN, 1, 2); - int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); is_int(KNOT_EOK, ret, "SYN: relay OK"); + ret = knot_tcp_send(test_sock, &rl, 1, 1); + is_int(KNOT_EOK, ret, "SYN: send OK"); is_int(msg.seqno + 1, sent_ackno, "SYN: ackno"); check_sent(0, 0, 1, 0); - is_int(1, relays.size, "SYN: one relay"); - knot_tcp_relay_t *rl = 
&knot_tcp_relay_dynarray_arr(&relays)[0]; - is_int(XDP_TCP_SYN, rl->action, "SYN: relay action"); - is_int(XDP_TCP_NOOP, rl->answer, "SYN: relay answer"); - is_int(0, rl->data.iov_len, "SYN: no payload"); + is_int(XDP_TCP_SYN, rl.action, "SYN: relay action"); + is_int(XDP_TCP_NOOP, rl.answer, "SYN: relay answer"); + is_int(0, rl.inbufs_count, "SYN: no payload"); is_int(1, test_table->usage, "SYN: one connection in table"); knot_tcp_conn_t *conn = tcp_table_find(test_table, &msg); ok(conn != NULL, "SYN: connection present"); - ok(conn == rl->conn, "SYN: relay points to connection"); + ok(conn == rl.conn, "SYN: relay points to connection"); is_int(XDP_TCP_ESTABLISHING, conn->state, "SYN: connection state"); ok(memcmp(&conn->ip_rem, &msg.ip_from, sizeof(msg.ip_from)) == 0, "SYN: conn IP from"); ok(memcmp(&conn->ip_loc, &msg.ip_to, sizeof(msg.ip_to)) == 0, "SYN: conn IP to"); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(test_table, &rl, 1); test_conn = conn; } void test_establish(void) { knot_xdp_msg_t msg; - knot_tcp_relay_dynarray_t relays = { 0 }; + knot_tcp_relay_t rl; prepare_msg(&msg, KNOT_XDP_MSG_ACK, 1, 2); prepare_seqack(&msg, 0, 1); - int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); is_int(KNOT_EOK, ret, "establish: relay OK"); + ret = knot_tcp_send(test_sock, &rl, 1, 1); + is_int(KNOT_EOK, ret, "establish: send OK"); check_sent(0, 0, 0, 0); - is_int(0, relays.size, "establish: no relay"); - /*knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0]; - is_int(XDP_TCP_ESTABLISH, rl->action, "establish: relay action"); - ok(rl->conn != NULL, "establish: connection present"); - ok(rl->conn == test_conn, "establish: same connection"); - is_int(XDP_TCP_NORMAL, rl->conn->state, "establish: connection state");*/ + is_int(0, rl.auto_answer, "establish: no auto answer"); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(test_table, &rl, 1); clean_table(); } void 
test_syn_ack(void) { knot_xdp_msg_t msg; - knot_tcp_relay_dynarray_t relays = { 0 }; + knot_tcp_relay_t rl; prepare_msg(&msg, KNOT_XDP_MSG_SYN | KNOT_XDP_MSG_ACK, 1000, 2000); - int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); is_int(KNOT_EOK, ret, "SYN+ACK: relay OK"); + ret = knot_tcp_send(test_sock, &rl, 1, 1); + is_int(KNOT_EOK, ret, "SYN+ACK: send OK"); is_int(msg.seqno + 1, sent_ackno, "SYN+ACK: ackno"); check_sent(1, 0, 0, 0); - is_int(1, relays.size, "SYN+ACK: one relay"); - knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0]; - is_int(XDP_TCP_ESTABLISH, rl->action, "SYN+ACK: relay action"); - ok(rl->conn != NULL, "SYN+ACK: connection present"); + is_int(XDP_TCP_ESTABLISH, rl.action, "SYN+ACK: relay action"); + ok(rl.conn != NULL, "SYN+ACK: connection present"); - test_conn = rl->conn; - knot_tcp_relay_free(&relays); + test_conn = rl.conn; + knot_tcp_cleanup(test_table, &rl, 1); } void test_data_fragments(void) { - knot_xdp_msg_t msgs[4]; - knot_tcp_relay_dynarray_t relays = { 0 }; + const size_t CONNS = 4; + knot_xdp_msg_t msgs[CONNS]; + knot_tcp_relay_t rls[CONNS]; + memset(rls, 0, CONNS * sizeof(*rls)); // first msg contains one whole payload and one fragment prepare_msg(&msgs[0], KNOT_XDP_MSG_ACK, 1000, 2000); @@ -276,37 +290,44 @@ void test_data_fragments(void) prepare_seqack(&msgs[3], 15, 0); prepare_data(&msgs[3], "\x02""AB""\xff\xff""abcdefghijklmnopqrstuvwxyz...", 34); - int ret = knot_tcp_relay(test_sock, msgs, sizeof(msgs) / sizeof(msgs[0]), - test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL); is_int(KNOT_EOK, ret, "fragments: relay OK"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "fragments: send OK"); is_int(msgs[3].ackno, sent_seqno, "fragments: seqno"); is_int(msgs[3].seqno + msgs[3].payload.iov_len, sent_ackno, "fragments: ackno"); check_sent(4, 0, 0, 0); - 
knot_tcp_relay_t *rls = knot_tcp_relay_dynarray_arr(&relays); - is_int(XDP_TCP_DATA, rls[0].action, "fragments0: action"); - is_int(XDP_TCP_NOOP, rls[0].answer, "fragments0: answer"); - is_int(3, rls[0].data.iov_len, "fragments0: data length"); - ok(memcmp("xyz", rls[0].data.iov_base, rls[0].data.iov_len) == 0, "fragments0: data"); + is_int(KNOT_XDP_MSG_ACK, rls[0].auto_answer, "fragments[0]: auto answer"); ok(rls[0].conn != NULL, "fragments0: connection present"); ok(rls[0].conn == test_conn, "fragments0: same connection"); + is_int(1, rls[0].inbufs_count, "fragments0: inbufs count"); + is_int(3, rls[0].inbufs[0].iov_len, "fragments0: data length"); + is_int(0, memcmp("xyz", rls[0].inbufs[0].iov_base, rls[0].inbufs[0].iov_len), "fragments0: data"); - is_int(XDP_TCP_DATA, rls[1].action, "fragments1: action"); - is_int(4, rls[1].data.iov_len, "fragments1: data length"); - ok(memcmp("abcd", rls[1].data.iov_base, rls[1].data.iov_len) == 0, "fragments1: data"); - ok(rls[1].conn == test_conn, "fragments1: same connection"); + is_int(KNOT_XDP_MSG_ACK, rls[1].auto_answer, "fragments[1]: auto answer"); + is_int(XDP_TCP_NOOP, rls[1].action, "fragments[1]: action"); // NOTE: NOOP + ok(rls[0].conn != NULL, "fragments1: connection present"); + ok(rls[0].conn == test_conn, "fragments1: same connection"); + is_int(0, rls[1].inbufs_count, "fragments1: inbufs count"); - is_int(XDP_TCP_DATA, rls[2].action, "fragments2: action"); - is_int(1, rls[2].data.iov_len, "fragments2: data length"); - ok(memcmp("i", rls[2].data.iov_base, rls[2].data.iov_len) == 0, "fragments2: data"); - ok(rls[2].conn == test_conn, "fragments2: same connection"); + is_int(KNOT_XDP_MSG_ACK, rls[2].auto_answer, "fragments[2]: auto answer"); + ok(rls[0].conn != NULL, "fragments2: connection present"); + ok(rls[0].conn == test_conn, "fragments2: same connection"); + is_int(2, rls[2].inbufs_count, "fragments2: inbufs count"); + is_int(4, rls[2].inbufs[0].iov_len, "fragments2-0: data length"); + is_int(0, 
memcmp("abcd", rls[2].inbufs[0].iov_base, rls[2].inbufs[0].iov_len), "fragments2-0: data"); + is_int(1, rls[2].inbufs[1].iov_len, "fragments2-1: data length"); + is_int(0, memcmp("i", rls[2].inbufs[1].iov_base, rls[2].inbufs[1].iov_len), "fragments2-1: data"); - is_int(XDP_TCP_DATA, rls[3].action, "fragments3: action"); - is_int(2, rls[3].data.iov_len, "fragments3: data length"); - ok(memcmp("AB", rls[3].data.iov_base, rls[3].data.iov_len) == 0, "fragments3: data"); - ok(rls[3].conn == test_conn, "fragments3: same connection"); + is_int(KNOT_XDP_MSG_ACK, rls[3].auto_answer, "fragments[3]: auto answer"); + ok(rls[0].conn != NULL, "fragments3: connection present"); + ok(rls[0].conn == test_conn, "fragments3: same connection"); + is_int(1, rls[3].inbufs_count, "fragments3: inbufs count"); + is_int(2, rls[3].inbufs[0].iov_len, "fragments3: data length"); + is_int(0, memcmp("AB", rls[3].inbufs[0].iov_base, rls[3].inbufs[0].iov_len), "fragments3: data"); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(test_table, rls, 4); } void test_close(void) @@ -314,25 +335,26 @@ void test_close(void) size_t conns_pre = test_table->usage; knot_xdp_msg_t msg; - knot_tcp_relay_dynarray_t relays = { 0 }; + knot_tcp_relay_t rl; prepare_msg(&msg, KNOT_XDP_MSG_FIN | KNOT_XDP_MSG_ACK, be16toh(test_conn->ip_rem.sin6_port), be16toh(test_conn->ip_loc.sin6_port)); prepare_seqack(&msg, 0, 0); - int ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); is_int(KNOT_EOK, ret, "close: relay 1 OK"); + ret = knot_tcp_send(test_sock, &rl, 1, 1); + is_int(KNOT_EOK, ret, "close: send OK"); check_sent(0, 0, 0, 1); - is_int(1, relays.size, "close: one relay"); - knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0]; - is_int(XDP_TCP_CLOSE, rl->action, "close: relay action"); - ok(rl->conn == test_conn, "close: same connection"); - is_int(XDP_TCP_CLOSING2, rl->conn->state, "close: conn state"); - 
knot_tcp_relay_free(&relays); + is_int(XDP_TCP_CLOSE, rl.action, "close: relay action"); + ok(rl.conn == test_conn, "close: same connection"); + is_int(XDP_TCP_CLOSING2, rl.conn->state, "close: conn state"); msg.flags &= ~KNOT_XDP_MSG_FIN; prepare_seqack(&msg, 0, 0); - ret = knot_tcp_relay(test_sock, &msg, 1, test_table, NULL, &relays, NULL); + ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); is_int(KNOT_EOK, ret, "close: relay 2 OK"); + ret = knot_tcp_send(test_sock, &rl, 1, 1); + is_int(KNOT_EOK, ret, "close: send 2 OK"); check_sent(0, 0, 0, 0); is_int(conns_pre - 1, test_table->usage, "close: connection removed"); is_int(conns_pre - 1, tcp_table_timeout_length(test_table), "close: timeout list size"); @@ -349,61 +371,71 @@ void test_many(void) for (size_t i = 0; i < CONNS; i++) { prepare_msg(&msgs[i], KNOT_XDP_MSG_SYN, i + 2, 1); } + knot_tcp_relay_t *rls = malloc(CONNS * sizeof(*rls)); - knot_tcp_relay_dynarray_t relays = { 0 }; - int ret = knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL); is_int(KNOT_EOK, ret, "many: relay OK"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "many: relay send OK"); check_sent(0, 0, CONNS, 0); - is_int(CONNS, relays.size, "many: relays count"); is_int(CONNS, test_table->usage, "many: table usage"); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(test_table, rls, CONNS); usleep(timeout_time); knot_xdp_msg_t *survive = &msgs[i_survive]; + knot_tcp_relay_t surv_rl; survive->flags = (KNOT_XDP_MSG_TCP | KNOT_XDP_MSG_ACK); knot_tcp_conn_t *surv_conn = tcp_table_find(test_table, survive); fix_seqack(survive); prepare_data(survive, "\x00\x00", 2); - (void)knot_tcp_relay(test_sock, survive, 1, test_table, NULL, &relays, NULL); - is_int(1, relays.size, "many/survivor: one relay"); - knot_tcp_relay_t *rl = &knot_tcp_relay_dynarray_arr(&relays)[0]; + ret = knot_tcp_recv(&surv_rl, survive, 1, test_table, NULL); + 
is_int(KNOT_EOK, ret, "many/survivor: OK"); clean_sent(); uint32_t reset_count = 0, close_count = 0; - ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, timeout_time, UINT32_MAX, - 0, 0, &close_count, &reset_count); + ret = knot_tcp_sweep(test_table, timeout_time, UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX, + UINT32_MAX, rls, CONNS, &close_count, &reset_count); is_int(KNOT_EOK, ret, "many/timeout1: OK"); is_int(CONNS - 1, close_count, "many/timeout1: close count"); is_int(0, reset_count, "may/timeout1: reset count"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "many/timeout1: send OK"); check_sent(0, 0, 0, CONNS - 1); close_count = 0; - ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, UINT32_MAX, timeout_time, - 0, 0, &close_count, &reset_count); + ret = knot_tcp_sweep(test_table, UINT32_MAX, timeout_time, UINT32_MAX, UINT32_MAX, UINT32_MAX, + UINT32_MAX, rls, CONNS, &close_count, &reset_count); is_int(KNOT_EOK, ret, "many/timeout2: OK"); is_int(0, close_count, "many/timeout2: close count"); is_int(CONNS - 1, reset_count, "may/timeout2: reset count"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "many/timeout2: send OK"); check_sent(0, CONNS - 1, 0, 0); + knot_tcp_cleanup(test_table, rls, CONNS); is_int(1, test_table->usage, "many/timeout: one survivor"); is_int(1, tcp_table_timeout_length(test_table), "many/timeout: one survivor in timeout list"); ok(surv_conn != NULL, "many/timeout: survivor connection present"); - ok(surv_conn == rl->conn, "many/timeout: same connection"); + ok(surv_conn == surv_rl.conn, "many/timeout: same connection"); + knot_tcp_cleanup(test_table, &surv_rl, 1); free(msgs); + free(rls); } void test_ibufs_size(void) { int CONNS = 4; knot_xdp_msg_t msgs[CONNS]; - knot_tcp_relay_dynarray_t relays = { 0 }; + knot_tcp_relay_t rls[CONNS]; // just open connections for (int i = 0; i < CONNS; i++) { prepare_msg(&msgs[i], KNOT_XDP_MSG_SYN, i + 2000, 1); } - int ret = 
knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL); + int ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL); is_int(KNOT_EOK, ret, "ibufs: open OK"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "ibufs: first send OK"); check_sent(0, 0, CONNS, 0); for (int i = 0; i < CONNS; i++) { msgs[i].flags = KNOT_XDP_MSG_TCP | KNOT_XDP_MSG_ACK; @@ -414,11 +446,13 @@ void test_ibufs_size(void) // first connection will start a fragment buf then finish it fix_seqack(&msgs[0]); prepare_data(&msgs[0], "\x00\x0a""lorem", 7); - ret = knot_tcp_relay(test_sock, &msgs[0], 1, test_table, NULL, &relays, NULL); + ret = knot_tcp_recv(&rls[0], &msgs[0], 1, test_table, NULL); is_int(KNOT_EOK, ret, "ibufs: must be OK"); + ret = knot_tcp_send(test_sock, &rls[0], 1, 1); + is_int(KNOT_EOK, ret, "ibufs: must send OK"); check_sent(1, 0, 0, 0); is_int(7, test_table->inbufs_total, "inbufs: first inbuf"); - knot_tcp_relay_free(&relays); + knot_tcp_cleanup(test_table, &rls[0], 1); // other connection will just store fragments fix_seqacks(msgs, CONNS); @@ -426,22 +460,28 @@ void test_ibufs_size(void) prepare_data(&msgs[1], "\x00\xff""12345", 7); prepare_data(&msgs[2], "\xff\xff""abcde", 7); prepare_data(&msgs[3], "\xff\xff""abcde", 7); - ret = knot_tcp_relay(test_sock, msgs, CONNS, test_table, NULL, &relays, NULL); + ret = knot_tcp_recv(rls, msgs, CONNS, test_table, NULL); is_int(KNOT_EOK, ret, "inbufs: relay OK"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "inbufs: send OK"); check_sent(CONNS, 0, 0, 0); is_int(21, test_table->inbufs_total, "inbufs: after change"); - is_int(1, relays.size, "inbufs: one relay"); - is_int(10, knot_tcp_relay_dynarray_arr(&relays)[0].data.iov_len, "inbufs: data length"); - knot_tcp_relay_free(&relays); + is_int(0, rls[1].action, "inbufs: one relay"); + is_int(10, rls[0].inbufs[0].iov_len, "inbufs: data length"); + knot_tcp_cleanup(test_table, rls, CONNS); // now free some uint32_t 
reset_count = 0, close_count = 0; - ret = knot_tcp_sweep(test_table, test_sock, UINT32_MAX, UINT32_MAX, UINT32_MAX, - 0, 8, &close_count, &reset_count); + ret = knot_tcp_sweep(test_table, UINT32_MAX, UINT32_MAX, UINT32_MAX, UINT32_MAX, + test_table->inbufs_total - 8, UINT32_MAX, rls, + CONNS, &close_count, &reset_count); is_int(KNOT_EOK, ret, "inbufs: timeout OK"); + ret = knot_tcp_send(test_sock, rls, CONNS, CONNS); + is_int(KNOT_EOK, ret, "inbufs: timeout send OK"); check_sent(0, 2, 0, 0); is_int(0, close_count, "inbufs: close count"); is_int(2, reset_count, "inbufs: reset count"); + knot_tcp_cleanup(test_table, rls, CONNS); is_int(7, test_table->inbufs_total, "inbufs: final state"); ok(NULL != tcp_table_find(test_table, &msgs[0]), "inbufs: first conn survived"); ok(NULL == tcp_table_find(test_table, &msgs[1]), "inbufs: second conn not survived"); @@ -451,6 +491,68 @@ void test_ibufs_size(void) clean_table(); } +void test_obufs(void) +{ + knot_xdp_msg_t msg; + knot_tcp_relay_t rl; + + prepare_msg(&msg, KNOT_XDP_MSG_SYN, 1, 2); + (void)knot_tcp_recv(&rl, &msg, 1, test_table, NULL); // SYN + (void)knot_tcp_send(test_sock, &rl, 1, 1); // SYN+ACK + prepare_msg(&msg, KNOT_XDP_MSG_ACK, 1, 2); + prepare_seqack(&msg, 0, 1); + (void)knot_tcp_recv(&rl, &msg, 1, test_table, NULL); // ACK + + size_t TEST_MSS = 1111; + size_t DATA_LEN = 65535; // with 2-byte len prefix, this is > 64k == window_size + uint8_t *data = calloc(DATA_LEN, 1); + rl.conn->mss = TEST_MSS; + send2_mss = TEST_MSS; + + int ret = knot_tcp_reply_data(&rl, test_table, data, DATA_LEN), i = 0; + is_int(KNOT_EOK, ret, "obufs: fill with data"); + for (struct tcp_outbuf *ob = rl.conn->outbufs.bufs; ob != NULL; ob = ob->next, i++) { + if (ob->next == NULL) { + ok(ob->len > 0, "init last ob[%d]: non-trivial", i); + ok(ob->len <= TEST_MSS, "init last ob[%d]: fulfills MSS", i); + } else { + is_int(TEST_MSS, ob->len, "init ob[%d]: exactly MSS", i); + } + ok(!ob->sent, "init ob[%d]: not sent", i); + } + ret = 
knot_tcp_send(test_sock, &rl, 1, 20), i = 0; + is_int(KNOT_EOK, ret, "obufs: send OK"); + is_int((DATA_LEN + 2) / TEST_MSS * TEST_MSS, sent2_data, "obufs: sent all but one MSS"); + for (struct tcp_outbuf *ob = rl.conn->outbufs.bufs; ob != NULL; ob = ob->next, i++) { + if (ob->next == NULL) { + ok(!ob->sent, "last ob[%d]: not sent", i); + } else { + ok(ob->sent, "ob[%d]: sent", i); + if (ob->next->next != NULL) { + is_int(ob->seqno + ob->len, ob->next->seqno, "init ob[%d+1]: seqno", i); + } + } + } + knot_tcp_cleanup(test_table, &rl, 1); + + prepare_seqack(&msg, 0, TEST_MSS); + ret = knot_tcp_recv(&rl, &msg, 1, test_table, NULL); + is_int(KNOT_EOK, ret, "obufs: ACKed data"); + struct tcp_outbuf *surv_ob = rl.conn->outbufs.bufs; + ok(surv_ob != NULL, "obufs: unACKed survived"); + ok(surv_ob->next == NULL, "obufs: just one survived"); + ok(!surv_ob->sent, "obufs: survivor not sent"); + ret = knot_tcp_send(test_sock, &rl, 1, 20); + is_int(KNOT_EOK, ret, "obufs: send rest OK"); + is_int(DATA_LEN + 2, sent2_data, "obufs: sent all"); + ok(surv_ob->sent, "obufs: survivor sent"); + is_int(sent_seqno, surv_ob->seqno, "obufs: survivor seqno"); + + knot_tcp_cleanup(test_table, &rl, 1); + clean_table(); + free(data); +} + static void init_mock(knot_xdp_socket_t **socket, void *send_mock) { *socket = calloc(1, sizeof(**socket)); @@ -481,6 +583,10 @@ int main(int argc, char *argv[]) init_mock(&test_sock, mock_send_nocheck); test_many(); + knot_xdp_deinit(test_sock); + init_mock(&test_sock, mock_send2); + test_obufs(); + knot_xdp_deinit(test_sock); knot_tcp_table_free(test_table);