diff --git a/src/mux_h1.c b/src/mux_h1.c index 94925382a..cbbcd90aa 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -37,12 +37,7 @@ /* Flags indicating why reading input data are blocked. */ #define H1C_F_IN_ALLOC 0x00000010 /* mux is blocked on lack of input buffer */ #define H1C_F_IN_FULL 0x00000020 /* mux is blocked on input buffer full */ -/* 0x00000040 - 0x00000080 unused */ - -/* Flags indicating why parsing data are blocked */ -#define H1C_F_RX_ALLOC 0x00000100 /* mux is blocked on lack of rx buffer */ -#define H1C_F_RX_FULL 0x00000200 /* mux is blocked on rx buffer full */ -/* 0x00000400 - 0x00000800 unused */ +/* 0x00000040 - 0x00000800 unused */ #define H1C_F_CS_ERROR 0x00001000 /* connection must be closed ASAP because an error occurred */ #define H1C_F_CS_SHUTW_NOW 0x00002000 /* connection must be shut down for writes ASAP */ @@ -58,13 +53,13 @@ #define H1S_F_ERROR 0x00000001 /* An error occurred on the H1 stream */ #define H1S_F_REQ_ERROR 0x00000002 /* An error occurred during the request parsing/xfer */ #define H1S_F_RES_ERROR 0x00000004 /* An error occurred during the response parsing/xfer */ -#define H1S_F_MSG_XFERED 0x00000008 /* current message was transferred to the data layer */ +/* 0x00000008 unused */ #define H1S_F_WANT_KAL 0x00000010 #define H1S_F_WANT_TUN 0x00000020 #define H1S_F_WANT_CLO 0x00000040 #define H1S_F_WANT_MSK 0x00000070 #define H1S_F_NOT_FIRST 0x00000080 /* The H1 stream is not the first one */ -#define H1S_F_BUF_FLUSH 0x00000100 /* Flush input buffers (ibuf and rxbuf) and don't read more data */ +#define H1S_F_BUF_FLUSH 0x00000100 /* Flush input buffer and don't read more data */ /* H1 connection descriptor */ @@ -88,8 +83,6 @@ struct h1s { struct conn_stream *cs; uint32_t flags; /* Connection flags: H1S_F_* */ - struct buffer rxbuf; /*receive buffer, always valid (buf_empty or real buffer) */ - struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */ struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */ @@ -168,13 +161,6 @@ static int h1_buf_available(void *target) return 1; } - if ((h1c->flags & H1C_F_RX_ALLOC) && h1c->h1s && b_alloc_margin(&h1c->h1s->rxbuf, 0)) { - h1c->flags &= ~H1C_F_RX_ALLOC; - if (h1_recv_allowed(h1c)) - tasklet_wakeup(h1c->wait_event.task); - return 1; - } - return 0; } @@ -255,7 +241,6 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs) h1c->h1s = h1s; h1s->cs = NULL; - h1s->rxbuf = BUF_NULL; h1s->flags = H1S_F_NONE; h1s->recv_wait = NULL; @@ -283,18 +268,18 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs) h1s->res.err_pos = -1; } + /* If a conn_stream already exists, attach it to this H1S. Otherwise we + * create a new one. + */ if (cs) { - /* If a conn_stream already exists, attach it to this H1S */ cs->ctx = h1s; h1s->cs = cs; } -#if 1 else { cs = h1s_new_cs(h1s); if (!cs) goto fail; } -#endif return h1s; fail: @@ -308,7 +293,6 @@ static void h1s_destroy(struct h1s *h1s) struct h1c *h1c = h1s->h1c; h1c->h1s = NULL; - h1c->flags &= ~(H1C_F_RX_FULL|H1C_F_RX_ALLOC); if (h1s->recv_wait != NULL) h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; @@ -319,7 +303,6 @@ static void h1s_destroy(struct h1s *h1s) if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) h1c->flags |= H1C_F_CS_ERROR; - h1_release_buf(h1c, &h1s->rxbuf); cs_free(h1s->cs); pool_free(pool_head_h1s, h1s); } @@ -1030,7 +1013,7 @@ static void h1_sync_messages(struct h1c *h1c) if (h1s->res.state == H1_MSG_DONE && (h1s->status < 200 && (h1s->status == 100 || h1s->status >= 102)) && - ((!conn_is_back(h1c->conn) && !b_data(&h1c->obuf)) || !b_data(&h1s->rxbuf))) { + (conn_is_back(h1c->conn) || !b_data(&h1c->obuf))) { /* For 100-Continue response or any other informational 1xx * response which is non-final, don't reset the request, the * transaction is not finished. We take care the response was @@ -1039,7 +1022,7 @@ static void h1_sync_messages(struct h1c *h1c) h1m_init_res(&h1s->res); h1s->res.flags |= H1_MF_NO_PHDR; } - else if (!b_data(&h1s->rxbuf) && !b_data(&h1c->obuf) && + else if (!b_data(&h1c->obuf) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) { if (h1s->flags & H1S_F_WANT_TUN) { h1m_init_req(&h1s->req); @@ -1052,38 +1035,29 @@ static void h1_sync_messages(struct h1c *h1c) /* * Process incoming data. It parses data and transfer them from h1c->ibuf into - * h1s->rxbuf. It returns the number of bytes parsed and transferred if > 0, or - * 0 if it couldn't proceed. + * . It returns the number of bytes parsed and transferred if > 0, or 0 if + * it couldn't proceed. */ -static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count) +static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, int flags) { - struct h1s *h1s = NULL; + struct h1s *h1s = h1c->h1s; struct h1m *h1m; struct htx *htx; size_t total = 0; size_t ret = 0; - size_t max; + size_t count, max; int errflag; - h1s = NULL; - - /* Create a new H1S if not already done */ - if (!h1c->h1s && !h1s_create(h1c, NULL)) - goto fatal_err; - h1s = h1c->h1s; -#if 0 - /* Create the CS if not already attached to the H1S */ - if (!h1s->cs && !h1s_new_cs(h1s)) - goto fatal_err; -#endif - if (!count) - goto end; - if (!h1_get_buf(h1c, &h1s->rxbuf)) { - h1c->flags |= H1C_F_RX_ALLOC; - goto end; + htx = htx_from_buf(buf); + count = b_data(&h1c->ibuf); + max = htx_free_space(htx); + if (flags & CO_RFL_KEEP_RSV) { + if (max < global.tune.maxrewrite) + goto end; + max -= global.tune.maxrewrite; } - - htx = htx_from_buf(&h1s->rxbuf); + if (count > max) + count = max; if (!conn_is_back(h1c->conn)) { h1m = &h1s->req; @@ -1094,32 +1068,21 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count errflag = H1S_F_RES_ERROR; } - max = count; - while (!(h1s->flags & errflag) && max) { + while (!(h1s->flags & errflag) && count) { if (h1m->state <= H1_MSG_LAST_LF) { - ret = h1_process_headers(h1s, h1m, htx, buf, &total, max); + ret = h1_process_headers(h1s, h1m, htx, &h1c->ibuf, &total, count); if (!ret) break; -#if 0 - /* Create the CS if not already attached to the H1S */ - if (!h1s->cs && !h1s_new_cs(h1s)) - goto fatal_err; -#endif } else if (h1m->state <= H1_MSG_TRAILERS) { - /* Do not parse the body if the header part is not yet - * transferred to the stream. - */ - if (!(h1s->flags & H1S_F_MSG_XFERED)) - break; - ret = h1_process_data(h1s, h1m, htx, buf, &total, max); + ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count); if (!ret) break; } else if (h1m->state == H1_MSG_DONE) break; else if (h1m->state == H1_MSG_TUNNEL) { - ret = h1_process_data(h1s, h1m, htx, buf, &total, max); + ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count); if (!ret) break; } @@ -1128,44 +1091,45 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count break; } - max -= ret; + count -= ret; } if (h1s->flags & errflag) goto parsing_err; - b_del(buf, total); - if (htx_is_not_empty(htx)) { - b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf)); - if (!htx_free_data_space(htx)) - h1c->flags |= H1C_F_RX_FULL; - } - else - h1_release_buf(h1c, &h1s->rxbuf); + b_del(&h1c->ibuf, total); - ret = count - max; - if (h1s->recv_wait) { - h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h1s->recv_wait->task); - h1s->recv_wait = NULL; - } end: - return ret; + if (htx_is_not_empty(htx)) + b_set_data(buf, b_size(buf)); + else { + htx_reset(htx); + b_set_data(buf, 0); + } - fatal_err: - h1c->flags |= H1C_F_CS_ERROR; - sess_log(h1c->conn->owner); - return 0; + if (h1c->flags & H1C_F_IN_FULL && b_room(&h1c->ibuf)) { + h1c->flags &= ~H1C_F_IN_FULL; + tasklet_wakeup(h1c->wait_event.task); + } + + if (b_data(&h1c->ibuf)) + h1s->cs->flags |= CS_FL_RCV_MORE; + else { + h1_release_buf(h1c, &h1c->ibuf); + h1_sync_messages(h1c); + + h1s->cs->flags &= ~CS_FL_RCV_MORE; + if (h1s->cs->flags & CS_FL_REOS) + h1s->cs->flags |= CS_FL_EOS; + } + return total; parsing_err: // FIXME: create an error snapshot here b_reset(&h1c->ibuf); - h1s->cs->flags |= CS_FL_REOS; - if (h1s->recv_wait) { - h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h1s->recv_wait->task); - h1s->recv_wait = NULL; - } + htx->flags |= HTX_FL_PARSING_ERROR; + b_set_data(buf, b_size(buf)); + h1s->cs->flags |= CS_FL_EOS; return 0; } @@ -1345,76 +1309,6 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun return total; } -/* - * Transfer data from h1s->rxbuf into the channel buffer. It returns the number - * of bytes transferred. - */ -static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags) -{ - struct h1c *h1c = h1s->h1c; - struct h1m *h1m; - struct conn_stream *cs = h1s->cs; - struct htx *mux_htx, *chn_htx; - struct htx_ret htx_ret; - size_t count, ret = 0; - - h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res); - mux_htx = htx_from_buf(&h1s->rxbuf); - chn_htx = htx_from_buf(buf); - - if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) { - chn_htx->flags |= HTX_FL_PARSING_ERROR; - b_set_data(buf, b_size(buf)); - goto end; - } - if (htx_is_empty(mux_htx)) - goto end; - count = htx_free_space(chn_htx); - if (flags & CO_RFL_KEEP_RSV) { - if (count < global.tune.maxrewrite) - goto end; - count -= global.tune.maxrewrite; - } - - // FIXME: if chn empty and count > htx => b_xfer ! - if (!(h1s->flags & H1S_F_MSG_XFERED)) { - htx_ret = htx_xfer_blks(chn_htx, mux_htx, count, - ((h1m->state == H1_MSG_DONE) ? HTX_BLK_EOM : HTX_BLK_EOH)); - ret = htx_ret.ret; - if (htx_ret.blk && htx_get_blk_type(htx_ret.blk) >= HTX_BLK_EOH) - h1s->flags |= H1S_F_MSG_XFERED; - } - else { - htx_ret = htx_xfer_blks(chn_htx, mux_htx, count, HTX_BLK_EOM); - ret = htx_ret.ret; - } - chn_htx->extra = mux_htx->extra; - if (h1m->flags & H1_MF_XFER_LEN) - chn_htx->extra += mux_htx->data; - - if (htx_is_not_empty(chn_htx)) - b_set_data(buf, b_size(buf)); - end: - if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) { - h1c->flags &= ~H1C_F_RX_FULL; - tasklet_wakeup(h1c->wait_event.task); - } - - if (htx_is_not_empty(mux_htx)) { - cs->flags |= CS_FL_RCV_MORE; - } - else { - h1c->flags &= ~H1C_F_RX_FULL; - h1_release_buf(h1c, &h1s->rxbuf); - h1_sync_messages(h1c); - - cs->flags &= ~CS_FL_RCV_MORE; - if (!b_data(&h1c->ibuf) && (cs->flags & CS_FL_REOS)) - cs->flags |= CS_FL_EOS; - } - return ret; -} - /*********************************************************/ /* functions below are I/O callbacks from the connection */ /*********************************************************/ @@ -1430,11 +1324,8 @@ static int h1_recv(struct h1c *h1c) if (h1c->wait_event.wait_reason & SUB_CAN_RECV) return 0; - if (!h1_recv_allowed(h1c)) { - if (h1c->h1s && b_data(&h1c->h1s->rxbuf)) - rcvd = 1; + if (!h1_recv_allowed(h1c)) goto end; - } if (h1c->h1s && (h1c->h1s->flags & H1S_F_BUF_FLUSH)) { rcvd = 1; @@ -1500,12 +1391,6 @@ static int h1_send(struct h1c *h1c) h1c->flags &= ~H1C_F_OUT_FULL; b_del(&h1c->obuf, ret); sent = 1; - - if (h1c->h1s && h1c->h1s->send_wait) { - h1c->h1s->send_wait->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(h1c->h1s->send_wait->task); - h1c->h1s->send_wait = NULL; - } } end: @@ -1563,40 +1448,33 @@ static int h1_process(struct h1c * h1c) { struct connection *conn = h1c->conn; - if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) { - size_t ret; - - ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf)); - if (ret > 0) { - h1c->flags &= ~H1C_F_IN_FULL; - if (!b_data(&h1c->ibuf)) - h1_release_buf(h1c, &h1c->ibuf); - } - } - - h1_send(h1c); - if (!conn->mux_ctx) return -1; if (h1c->flags & H1C_F_CS_WAIT_CONN) { - if (conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) { - h1c->flags &= ~H1C_F_CS_WAIT_CONN; - h1_wake_stream(h1c); - } - goto end; + if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR))) + goto end; + h1c->flags &= ~H1C_F_CS_WAIT_CONN; } - if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) { - h1_wake_stream(h1c); - if (!h1c->h1s || !h1c->h1s->cs) { - h1_release(conn); - return -1; + if (!h1c->h1s) { + if (h1c->flags & H1C_F_CS_ERROR || + conn->flags & CO_FL_ERROR || + conn_xprt_read0_pending(conn)) + goto release; + if (!(h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))) { + if (!h1s_create(h1c, NULL)) + goto release; } } + h1_wake_stream(h1c); end: return 0; + + release: + h1_release(conn); + return -1; } static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status) @@ -1608,7 +1486,7 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status) ret = h1_send(h1c); if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV)) ret |= h1_recv(h1c); - if (ret || b_data(&h1c->ibuf) || (h1c->h1s && b_data(&h1c->h1s->rxbuf))) + if (ret/* || b_data(&h1c->ibuf)*/) h1_process(h1c); return NULL; } @@ -1618,7 +1496,8 @@ static int h1_wake(struct connection *conn) { struct h1c *h1c = conn->mux_ctx; - return (h1_process(h1c)); + h1_send(h1c); + return h1_process(h1c); } /*******************************************/ @@ -1817,20 +1696,18 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param) static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags) { struct h1s *h1s = cs->ctx; + struct h1c *h1c = h1s->h1c; size_t ret = 0; - if (!h1s) - return ret; - - if (!(h1s->h1c->flags & H1C_F_RX_ALLOC)) - ret = h1_xfer(h1s, buf, flags); + if (!(h1c->flags & H1C_F_IN_ALLOC)) + ret = h1_process_input(h1c, buf, flags); if (flags & CO_RFL_BUF_FLUSH) h1s->flags |= H1S_F_BUF_FLUSH; else if (ret > 0 || (h1s->flags & H1S_F_BUF_FLUSH)) { h1s->flags &= ~H1S_F_BUF_FLUSH; - if (!(h1s->h1c->wait_event.wait_reason & SUB_CAN_RECV)) - tasklet_wakeup(h1s->h1c->wait_event.task); + if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV)) + tasklet_wakeup(h1c->wait_event.task); } return ret; } @@ -1851,12 +1728,16 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun if (h1c->flags & H1C_F_CS_WAIT_CONN) return 0; - if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC)) && b_data(buf)) + if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC))) ret = h1_process_output(h1c, buf, count); if (ret > 0) { h1_send(h1c); - /* We need to do that because of the infinite forwarding. */ + /* We need to do that because of the infinite forwarding. + * contains HTX messages so when infinite forwarding is enabled, + * count is equal to the buffer size. From outside, the buffer + * appears as full. + */ if (!b_data(buf)) ret = count; } @@ -1871,7 +1752,7 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c struct h1m *h1m = (!conn_is_back(cs->conn) ? &h1s->req : &h1s->res); int ret = 0; - if (b_data(&h1s->rxbuf) || b_data(&h1s->h1c->ibuf)) + if (b_data(&h1s->h1c->ibuf)) goto end; if (h1m->state == H1_MSG_DATA && count > h1m->curr_len) count = h1m->curr_len;