mirror of
https://github.com/haproxy/haproxy.git
synced 2026-04-22 14:49:45 -04:00
MINOR: mux-quic: define Rx connection buffer for QMux
When QMux is used, mux-quic must actively performed reception of new content. This has been implemented by the previous patch. The current patch extends this by defining a buffer on QCC dedicated to this operation. This replaces the usage of the trash buffer. This is necessary to deal with incomplete reads.
This commit is contained in:
parent
068baf4ddf
commit
81f22cd68a
3 changed files with 25 additions and 9 deletions
|
|
@ -84,6 +84,9 @@ struct qcc {
|
|||
struct quic_pacer pacer; /* engine used to pace emission */
|
||||
int paced_sent_ctr; /* counter for when emission is interrupted due to pacing */
|
||||
} tx;
|
||||
struct {
|
||||
struct buffer qstrm_buf;
|
||||
} rx;
|
||||
|
||||
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
|
||||
uint64_t largest_uni_r; /* largest remote uni stream ID opened. */
|
||||
|
|
|
|||
|
|
@ -3745,6 +3745,15 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
|
|||
qcc->pacing_task->state |= TASK_F_WANTS_TIME;
|
||||
}
|
||||
|
||||
if (!conn_is_quic(conn)) {
|
||||
qcc->rx.qstrm_buf = BUF_NULL;
|
||||
b_alloc(&qcc->rx.qstrm_buf, DB_MUX_RX);
|
||||
if (!b_size(&qcc->rx.qstrm_buf)) {
|
||||
TRACE_ERROR("rx qstrm buf alloc failure", QMUX_EV_QCC_NEW);
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
if (conn_is_back(conn)) {
|
||||
qcc->next_bidi_l = 0x00;
|
||||
qcc->largest_bidi_r = 0x01;
|
||||
|
|
|
|||
|
|
@ -42,7 +42,8 @@ static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf)
|
|||
old = pos = (unsigned char *)b_head(buf);
|
||||
end = (unsigned char *)b_head(buf) + b_data(buf);
|
||||
ret = qc_parse_frm_type(&frm, &pos, end, NULL);
|
||||
BUG_ON(!ret);
|
||||
if (!ret)
|
||||
return 0;
|
||||
|
||||
if (!qstrm_is_frm_valid(&frm)) {
|
||||
/* TODO close connection with FRAME_ENCODING_ERROR */
|
||||
|
|
@ -51,7 +52,8 @@ static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf)
|
|||
}
|
||||
|
||||
ret = qc_parse_frm_payload(&frm, &pos, end, NULL);
|
||||
BUG_ON(!ret);
|
||||
if (!ret)
|
||||
return 0;
|
||||
|
||||
if (frm.type >= QUIC_FT_STREAM_8 &&
|
||||
frm.type <= QUIC_FT_STREAM_F) {
|
||||
|
|
@ -86,24 +88,26 @@ static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf)
|
|||
*/
|
||||
int qcc_qstrm_recv(struct qcc *qcc)
|
||||
{
|
||||
/* TODO add a buffer on the connection for incomplete data read */
|
||||
struct connection *conn = qcc->conn;
|
||||
struct buffer *buf = &qcc->rx.qstrm_buf;
|
||||
int total = 0, frm_ret;
|
||||
size_t ret;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
|
||||
|
||||
do {
|
||||
b_reset(&trash);
|
||||
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0);
|
||||
b_realign_if_empty(buf);
|
||||
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, b_contig_space(buf), NULL, 0, 0);
|
||||
BUG_ON(conn->flags & CO_FL_ERROR);
|
||||
|
||||
total += ret;
|
||||
while (b_data(&trash)) {
|
||||
frm_ret = qstrm_parse_frm(qcc, &trash);
|
||||
BUG_ON(!frm_ret);
|
||||
while (b_data(buf)) {
|
||||
frm_ret = qstrm_parse_frm(qcc, buf);
|
||||
BUG_ON(frm_ret < 0); /* TODO handle fatal errors */
|
||||
if (!frm_ret)
|
||||
break;
|
||||
|
||||
b_del(&trash, frm_ret);
|
||||
b_del(buf, frm_ret);
|
||||
}
|
||||
} while (ret > 0);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue