mirror of
https://github.com/haproxy/haproxy.git
synced 2026-04-15 21:59:41 -04:00
STREAM frames have dedicated handling on retransmission. A special check
is done to remove data already acked in case of duplicated frames, thus
only unacked data are retransmitted.
This handling is faulty in case of an empty STREAM frame with FIN set.
On retransmission, this frame does not cover any unacked range as it is
empty and is thus discarded. This may cause the transfer to freeze with
the client waiting indefinitely for the FIN notification.
To handle retransmission of an empty STREAM frame with FIN set, the qc_stream_desc
layer has been extended. A new flag QC_SD_FL_WAIT_FOR_FIN is set by the QUIC MUX
when FIN has been transmitted. If set, it prevents qc_stream_desc from being
freed until FIN is acknowledged. On the retransmission side,
qc_stream_frm_is_acked() has been updated. It now reports false if the
FIN bit is set on the frame and the qc_stream_desc has QC_SD_FL_WAIT_FOR_FIN
set.
This must be backported up to 2.6. However, this heavily modifies a
critical section of ACK handling and retransmission. As such, it must
be backported only after a period of observation.
This issue can be reproduced by using the following socat command as the
server, to add a delay between the response and the connection closure:
$ socat TCP-LISTEN:<port>,fork,reuseaddr,crlf SYSTEM:'echo "HTTP/1.1 200 OK"; echo ""; sleep 1;'
On the client side, ngtcp2 can be used to simulate packet drops. Without
this patch, the connection will be interrupted on QUIC idle timeout or
haproxy client timeout, with ERR_DRAINING reported by ngtcp2:
$ ngtcp2-client --exit-on-all-streams-close -r 0.3 <host> <port> "http://<host>:<port>/?s=32o"
As an alternative to ngtcp2 random loss, an extra haproxy patch can also be
used to force skipping the emission of the empty STREAM frame:
diff --git a/include/haproxy/quic_tx-t.h b/include/haproxy/quic_tx-t.h
index efbdfe687..1ff899acd 100644
--- a/include/haproxy/quic_tx-t.h
+++ b/include/haproxy/quic_tx-t.h
@@ -26,6 +26,8 @@ extern struct pool_head *pool_head_quic_cc_buf;
/* Flag a sent packet as being probing with old data */
#define QUIC_FL_TX_PACKET_PROBE_WITH_OLD_DATA (1UL << 5)
+#define QUIC_FL_TX_PACKET_SKIP_SENDTO (1UL << 6)
+
/* Structure to store enough information about TX QUIC packets. */
struct quic_tx_packet {
/* List entry point. */
diff --git a/src/quic_tx.c b/src/quic_tx.c
index 2f199ac3c..2702fc9b9 100644
--- a/src/quic_tx.c
+++ b/src/quic_tx.c
@@ -318,7 +318,7 @@ static int qc_send_ppkts(struct buffer *buf, struct ssl_sock_ctx *ctx)
tmpbuf.size = tmpbuf.data = dglen;
TRACE_PROTO("TX dgram", QUIC_EV_CONN_SPPKTS, qc);
- if (!skip_sendto) {
+ if (!skip_sendto && !(first_pkt->flags & QUIC_FL_TX_PACKET_SKIP_SENDTO)) {
int ret = qc_snd_buf(qc, &tmpbuf, tmpbuf.data, 0, gso);
if (ret < 0) {
if (gso && ret == -EIO) {
@@ -354,6 +354,7 @@ static int qc_send_ppkts(struct buffer *buf, struct ssl_sock_ctx *ctx)
qc->cntrs.sent_bytes_gso += ret;
}
}
+ first_pkt->flags &= ~QUIC_FL_TX_PACKET_SKIP_SENDTO;
b_del(buf, dglen + QUIC_DGRAM_HEADLEN);
qc->bytes.tx += tmpbuf.data;
@@ -2066,6 +2067,17 @@ static int qc_do_build_pkt(unsigned char *pos, const unsigned char *end,
continue;
}
+ switch (cf->type) {
+ case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
+ if (!cf->stream.len && (qc->flags & QUIC_FL_CONN_TX_MUX_CONTEXT)) {
+ TRACE_USER("artificially drop packet with empty STREAM frame", QUIC_EV_CONN_TXPKT, qc);
+ pkt->flags |= QUIC_FL_TX_PACKET_SKIP_SENDTO;
+ }
+ break;
+ default:
+ break;
+ }
+
quic_tx_packet_refinc(pkt);
cf->pkt = pkt;
}
316 lines
8.8 KiB
C
316 lines
8.8 KiB
C
#include <haproxy/quic_stream.h>
|
|
|
|
#include <import/eb64tree.h>
|
|
|
|
#include <haproxy/api.h>
|
|
#include <haproxy/buf.h>
|
|
#include <haproxy/dynbuf.h>
|
|
#include <haproxy/list.h>
|
|
#include <haproxy/mux_quic.h>
|
|
#include <haproxy/pool.h>
|
|
#include <haproxy/quic_conn.h>
|
|
#include <haproxy/task.h>
|
|
|
|
/* Memory pools used to allocate stream descriptors (struct qc_stream_desc)
 * and the data buffers attached to them (struct qc_stream_buf).
 */
DECLARE_STATIC_POOL(pool_head_quic_stream_desc, "qc_stream_desc",
                    sizeof(struct qc_stream_desc));
DECLARE_STATIC_POOL(pool_head_quic_stream_buf, "qc_stream_buf",
                    sizeof(struct qc_stream_buf));
|
|
|
|
|
|
/* Returns true if nothing to ack yet for stream <s> including FIN bit. */
|
|
static inline int qc_stream_desc_done(const struct qc_stream_desc *s)
|
|
{
|
|
return !(s->flags & QC_SD_FL_WAIT_FOR_FIN) && LIST_ISEMPTY(&s->buf_list);
|
|
}
|
|
|
|
static void qc_stream_buf_free(struct qc_stream_desc *stream,
|
|
struct qc_stream_buf **stream_buf)
|
|
{
|
|
struct quic_conn *qc = stream->qc;
|
|
struct buffer *buf = &(*stream_buf)->buf;
|
|
|
|
LIST_DEL_INIT(&(*stream_buf)->list);
|
|
|
|
/* Reset current buf ptr if deleted instance is the same one. */
|
|
if (*stream_buf == stream->buf)
|
|
stream->buf = NULL;
|
|
|
|
b_free(buf);
|
|
offer_buffers(NULL, 1);
|
|
pool_free(pool_head_quic_stream_buf, *stream_buf);
|
|
*stream_buf = NULL;
|
|
|
|
/* notify MUX about available buffers. */
|
|
--qc->stream_buf_count;
|
|
if (qc->mux_state == QC_MUX_READY) {
|
|
/* notify MUX about available buffers.
|
|
*
|
|
* TODO several streams may be woken up even if a single buffer
|
|
* is available for now.
|
|
*/
|
|
while (qcc_notify_buf(qc->qcc))
|
|
;
|
|
}
|
|
}
|
|
|
|
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
|
|
* store the stream in the appropriate tree. -1 special value must be used for
|
|
* a CRYPTO data stream, the type being ignored.
|
|
*
|
|
* Returns the newly allocated instance on success or else NULL.
|
|
*/
|
|
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void *ctx,
|
|
struct quic_conn *qc)
|
|
{
|
|
struct qc_stream_desc *stream;
|
|
|
|
stream = pool_alloc(pool_head_quic_stream_desc);
|
|
if (!stream)
|
|
return NULL;
|
|
|
|
if (id == (uint64_t)-1) {
|
|
stream->by_id.key = (uint64_t)-1;
|
|
}
|
|
else {
|
|
stream->by_id.key = id;
|
|
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
|
qc->rx.strms[type].nb_streams++;
|
|
}
|
|
stream->qc = qc;
|
|
|
|
stream->buf = NULL;
|
|
LIST_INIT(&stream->buf_list);
|
|
stream->buf_offset = 0;
|
|
|
|
stream->acked_frms = EB_ROOT;
|
|
stream->ack_offset = 0;
|
|
stream->flags = 0;
|
|
stream->ctx = ctx;
|
|
|
|
return stream;
|
|
}
|
|
|
|
/* Mark the stream descriptor <stream> as released. It will be freed as soon as
|
|
* all its buffered data are acknowledged. Does nothing if <stream> is already
|
|
* NULL.
|
|
*
|
|
* <final_size> corresponds to the last offset sent for this stream. If there
|
|
* is unsent data present, they will be remove first to guarantee that buffer
|
|
* is freed after receiving all acknowledges.
|
|
*/
|
|
void qc_stream_desc_release(struct qc_stream_desc *stream,
|
|
uint64_t final_size)
|
|
{
|
|
if (!stream)
|
|
return;
|
|
|
|
/* A stream can be released only one time. */
|
|
BUG_ON(stream->flags & QC_SD_FL_RELEASE);
|
|
|
|
stream->flags |= QC_SD_FL_RELEASE;
|
|
stream->ctx = NULL;
|
|
|
|
if (stream->buf) {
|
|
struct qc_stream_buf *stream_buf = stream->buf;
|
|
struct buffer *buf = &stream_buf->buf;
|
|
const uint64_t tail_offset =
|
|
MAX(stream->buf_offset, stream->ack_offset) + b_data(buf);
|
|
|
|
/* final_size cannot be greater than all currently stored data. */
|
|
BUG_ON(final_size > tail_offset);
|
|
|
|
/* Remove unsent data from current buffer. */
|
|
if (final_size < tail_offset) {
|
|
b_sub(buf, tail_offset - final_size);
|
|
/* Remove buffer if all ACK already received. */
|
|
if (!b_data(buf))
|
|
qc_stream_buf_free(stream, &stream_buf);
|
|
}
|
|
|
|
/* A released stream does not use <stream.buf>. */
|
|
stream->buf = NULL;
|
|
}
|
|
|
|
if (qc_stream_desc_done(stream)) {
|
|
/* if no buffer left we can free the stream. */
|
|
qc_stream_desc_free(stream, 0);
|
|
}
|
|
}
|
|
|
|
/* Acknowledge data at <offset> of length <len> for <stream> with <fin> set for
|
|
* the final data. After data removal, if the stream does not contains data
|
|
* any more and is already released, the instance stream is freed. <stream> is
|
|
* set to NULL to indicate this.
|
|
*
|
|
* Returns the count of byte removed from stream. Do not forget to check if
|
|
* <stream> is NULL after invocation.
|
|
*/
|
|
int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len,
|
|
int fin)
|
|
{
|
|
struct qc_stream_desc *s = *stream;
|
|
struct qc_stream_buf *stream_buf = NULL;
|
|
struct buffer *buf = NULL;
|
|
size_t diff;
|
|
|
|
/* Cannot advertise FIN for an inferior data range. */
|
|
BUG_ON(fin && offset + len < s->ack_offset);
|
|
|
|
if (offset + len < s->ack_offset || offset > s->ack_offset)
|
|
return 0;
|
|
|
|
diff = offset + len - s->ack_offset;
|
|
if (diff) {
|
|
/* Buf list cannot be empty if there is still unacked data. */
|
|
BUG_ON(LIST_ISEMPTY(&s->buf_list));
|
|
|
|
/* get oldest buffer from buf_list */
|
|
stream_buf = LIST_NEXT(&s->buf_list, struct qc_stream_buf *, list);
|
|
buf = &stream_buf->buf;
|
|
|
|
s->ack_offset += diff;
|
|
b_del(buf, diff);
|
|
|
|
/* Free oldest buffer if all data acknowledged. */
|
|
if (!b_data(buf)) {
|
|
qc_stream_buf_free(s, &stream_buf);
|
|
buf = NULL;
|
|
}
|
|
}
|
|
|
|
if (fin) {
|
|
/* Mark FIN as acknowledged. */
|
|
s->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
|
|
}
|
|
|
|
/* Free stream instance if already released and everything acknowledged. */
|
|
if ((s->flags & QC_SD_FL_RELEASE) && qc_stream_desc_done(s)) {
|
|
qc_stream_desc_free(s, 0);
|
|
*stream = NULL;
|
|
}
|
|
|
|
return diff;
|
|
}
|
|
|
|
/* Free the stream descriptor <stream> content. This function should be used
|
|
* when all its data have been acknowledged or on full connection closing if <closing>
|
|
* boolean is set to 1. It must only be called after the stream is released.
|
|
*/
|
|
void qc_stream_desc_free(struct qc_stream_desc *stream, int closing)
|
|
{
|
|
struct qc_stream_buf *buf, *buf_back;
|
|
struct quic_conn *qc = stream->qc;
|
|
struct eb64_node *frm_node;
|
|
unsigned int free_count = 0;
|
|
|
|
/* This function only deals with released streams. */
|
|
BUG_ON(!(stream->flags & QC_SD_FL_RELEASE));
|
|
|
|
/* free remaining stream buffers */
|
|
list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
|
|
if (!(b_data(&buf->buf)) || closing) {
|
|
b_free(&buf->buf);
|
|
LIST_DELETE(&buf->list);
|
|
pool_free(pool_head_quic_stream_buf, buf);
|
|
|
|
++free_count;
|
|
}
|
|
}
|
|
|
|
if (free_count) {
|
|
offer_buffers(NULL, free_count);
|
|
|
|
qc->stream_buf_count -= free_count;
|
|
if (qc->mux_state == QC_MUX_READY) {
|
|
/* notify MUX about available buffers.
|
|
*
|
|
* TODO several streams may be woken up even if a single buffer
|
|
* is available for now.
|
|
*/
|
|
while (qcc_notify_buf(qc->qcc))
|
|
;
|
|
}
|
|
}
|
|
|
|
/* qc_stream_desc might be freed before having received all its ACKs.
|
|
* This is the case if some frames were retransmitted.
|
|
*/
|
|
frm_node = eb64_first(&stream->acked_frms);
|
|
while (frm_node) {
|
|
struct qf_stream *strm_frm;
|
|
struct quic_frame *frm;
|
|
|
|
strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
|
|
|
|
frm_node = eb64_next(frm_node);
|
|
eb64_delete(&strm_frm->offset);
|
|
|
|
frm = container_of(strm_frm, struct quic_frame, stream);
|
|
qc_release_frm(qc, frm);
|
|
}
|
|
|
|
if (stream->by_id.key != (uint64_t)-1)
|
|
eb64_delete(&stream->by_id);
|
|
pool_free(pool_head_quic_stream_desc, stream);
|
|
}
|
|
|
|
/* Return the current buffer of <stream>. May be NULL if not allocated. */
|
|
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
|
|
{
|
|
if (!stream->buf)
|
|
return NULL;
|
|
|
|
return &stream->buf->buf;
|
|
}
|
|
|
|
/* Returns the count of available buffer left for <qc>. */
|
|
static int qc_stream_buf_avail(struct quic_conn *qc)
|
|
{
|
|
BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf);
|
|
return global.tune.quic_streams_buf - qc->stream_buf_count;
|
|
}
|
|
|
|
/* Allocate a new current buffer for <stream>. The buffer limit count for the
|
|
* connection is checked first. This function is not allowed if current buffer
|
|
* is not NULL prior to this call. The new buffer represents stream payload at
|
|
* offset <offset>.
|
|
*
|
|
* Returns the buffer or NULL on error. Caller may check <avail> to ensure if
|
|
* the connection buffer limit was reached or a fatal error was encountered.
|
|
*/
|
|
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
|
uint64_t offset, int *avail)
|
|
{
|
|
struct quic_conn *qc = stream->qc;
|
|
|
|
/* current buffer must be released first before allocate a new one. */
|
|
BUG_ON(stream->buf);
|
|
|
|
*avail = qc_stream_buf_avail(qc);
|
|
if (!*avail)
|
|
return NULL;
|
|
|
|
stream->buf_offset = offset;
|
|
stream->buf = pool_alloc(pool_head_quic_stream_buf);
|
|
if (!stream->buf)
|
|
return NULL;
|
|
|
|
++qc->stream_buf_count;
|
|
|
|
stream->buf->buf = BUF_NULL;
|
|
LIST_APPEND(&stream->buf_list, &stream->buf->list);
|
|
|
|
return &stream->buf->buf;
|
|
}
|
|
|
|
/* Release the current buffer of <stream>. It will be kept internally by
|
|
* the <stream>. The current buffer cannot be NULL.
|
|
*/
|
|
void qc_stream_buf_release(struct qc_stream_desc *stream)
|
|
{
|
|
/* current buffer already released */
|
|
BUG_ON(!stream->buf);
|
|
|
|
stream->buf = NULL;
|
|
stream->buf_offset = 0;
|
|
}
|