diff --git a/include/proto/connection.h b/include/proto/connection.h index 90c4fddee..47825a05e 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -291,17 +291,6 @@ static inline void conn_cond_update_polling(struct connection *c) } } -/* recompute the mux polling flags after updating the current conn_stream and - * propagate the result down the transport layer. - */ -static inline void cs_update_mux_polling(struct conn_stream *cs) -{ - struct connection *conn = cs->conn; - - if (conn->mux && conn->mux->update_poll) - conn->mux->update_poll(cs); -} - /***** Event manipulation primitives for use by DATA I/O callbacks *****/ /* The __conn_* versions do not propagate to lower layers and are only meant * to be used by handlers called by the connection handler. The other ones @@ -317,28 +306,6 @@ static inline void __conn_xprt_stop_recv(struct connection *c) c->flags &= ~CO_FL_XPRT_RD_ENA; } -static inline void __cs_want_recv(struct conn_stream *cs) -{ - cs->flags |= CS_FL_DATA_RD_ENA; -} - -static inline void __cs_stop_recv(struct conn_stream *cs) -{ - cs->flags &= ~CS_FL_DATA_RD_ENA; -} - -static inline void cs_want_recv(struct conn_stream *cs) -{ - __cs_want_recv(cs); - cs_update_mux_polling(cs); -} - -static inline void cs_stop_recv(struct conn_stream *cs) -{ - __cs_stop_recv(cs); - cs_update_mux_polling(cs); -} - /* this one is used only to stop speculative recv(). It doesn't stop it if the * fd is already polled in order to avoid expensive polling status changes. * Since it might require the upper layer to re-enable reading, we'll return 1 @@ -368,40 +335,6 @@ static inline void __conn_xprt_stop_both(struct connection *c) c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA); } -static inline void __cs_want_send(struct conn_stream *cs) -{ - cs->flags |= CS_FL_DATA_WR_ENA; -} - -static inline void __cs_stop_send(struct conn_stream *cs) -{ - cs->flags &= ~CS_FL_DATA_WR_ENA; -} - -static inline void cs_stop_send(struct conn_stream *cs) -{ - __cs_stop_send(cs); - cs_update_mux_polling(cs); -} - -static inline void cs_want_send(struct conn_stream *cs) -{ - __cs_want_send(cs); - cs_update_mux_polling(cs); -} - -static inline void __cs_stop_both(struct conn_stream *cs) -{ - cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA); -} - -static inline void cs_stop_both(struct conn_stream *cs) -{ - __cs_stop_both(cs); - cs_update_mux_polling(cs); -} - - static inline void conn_xprt_want_recv(struct connection *c) { __conn_xprt_want_recv(c); @@ -546,7 +479,6 @@ static inline void conn_xprt_shutw_hard(struct connection *c) /* shut read */ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { - __cs_stop_recv(cs); /* clean data-layer shutdown */ if (cs->conn->mux && cs->conn->mux->shutr) @@ -557,7 +489,6 @@ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) /* shut write */ static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode) { - __cs_stop_send(cs); /* clean data-layer shutdown */ if (cs->conn->mux && cs->conn->mux->shutw) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 7860c79c8..2b50c06ea 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -199,7 +199,6 @@ static inline void si_idle_cs(struct stream_interface *si, struct list *pool) LIST_ADD(pool, &conn->list); cs_attach(cs, si, &si_idle_conn_cb); - cs_want_recv(cs); } /* Attach conn_stream to the stream interface . The stream interface @@ -349,13 +348,15 @@ static inline void si_update(struct stream_interface *si) /* Calls chk_rcv on the connection using the data layer */ static inline void si_chk_rcv(struct stream_interface *si) { - si->ops->chk_rcv(si); + if (si->ops->chk_rcv) + si->ops->chk_rcv(si); } /* Calls chk_snd on the connection using the data layer */ static inline void si_chk_snd(struct stream_interface *si) { - si->ops->chk_snd(si); + if (si->ops->chk_snd) + si->ops->chk_snd(si); } /* Calls chk_snd on the connection using the ctrl layer */ @@ -378,10 +379,6 @@ static inline int si_connect(struct stream_interface *si) } else { /* reuse the existing connection */ - if (!channel_is_empty(si_oc(si))) { - /* we'll have to send a request there. */ - cs_want_send(cs); - } /* the connection is established */ si->state = SI_ST_EST; diff --git a/include/types/connection.h b/include/types/connection.h index 3a7de64a5..da530598a 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -66,9 +66,6 @@ union conn_handle { /* conn_stream flags */ enum { CS_FL_NONE = 0x00000000, /* Just for initialization purposes */ - CS_FL_DATA_RD_ENA = 0x00000001, /* receiving data is allowed */ - CS_FL_DATA_WR_ENA = 0x00000002, /* sending data is desired */ - CS_FL_SHRD = 0x00000010, /* read shut, draining extra data */ CS_FL_SHRR = 0x00000020, /* read shut, resetting extra data */ CS_FL_SHR = CS_FL_SHRD | CS_FL_SHRR, /* read shut status */ @@ -315,7 +312,6 @@ struct xprt_ops { struct mux_ops { int (*init)(struct connection *conn, struct proxy *prx); /* early initialization */ int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */ - void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */ size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */ size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */ int (*rcv_pipe)(struct conn_stream *cs, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */ diff --git a/src/checks.c b/src/checks.c index a3110e73c..4829a5446 100644 --- a/src/checks.c +++ b/src/checks.c @@ -745,7 +745,6 @@ static void __event_srv_chk_w(struct conn_stream *cs) if (retrieve_errno_from_socket(conn)) { chk_report_conn_err(check, errno, 0); - __cs_stop_both(cs); goto out_wakeup; } @@ -771,7 +770,6 @@ static void __event_srv_chk_w(struct conn_stream *cs) b_realign_if_empty(&check->bo); if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) { chk_report_conn_err(check, errno, 0); - __cs_stop_both(cs); goto out_wakeup; } if (b_data(&check->bo)) { @@ -785,12 +783,10 @@ static void __event_srv_chk_w(struct conn_stream *cs) t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check); task_queue(t); } - goto out_nowake; + goto out; out_wakeup: task_wakeup(t, TASK_WOKEN_IO); - out_nowake: - __cs_stop_send(cs); /* nothing more to write */ out: return; } @@ -1373,7 +1369,6 @@ static void __event_srv_chk_r(struct conn_stream *cs) * range quickly. To avoid sending RSTs all the time, we first try to * drain pending data. */ - __cs_stop_both(cs); cs_shutw(cs, CS_SHW_NORMAL); /* OK, let's not stay here forever */ @@ -1385,7 +1380,6 @@ out: return; wait_more_data: - __cs_want_recv(cs); cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list); goto out; } @@ -1420,10 +1414,9 @@ static int wake_srv_chk(struct conn_stream *cs) * we expect errno to still be valid. */ chk_report_conn_err(check, errno, 0); - __cs_stop_both(cs); task_wakeup(check->task, TASK_WOKEN_IO); } - else if (!(conn->flags & CO_FL_HANDSHAKE) && !(cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_DATA_WR_ENA))) { + else if (!(conn->flags & CO_FL_HANDSHAKE) && !check->type) { /* we may get here if only a connection probe was required : we * don't have any data to send nor anything expected in response, * so the completion of the connection establishment is enough. @@ -1624,8 +1617,6 @@ static int connect_conn_chk(struct task *t) if (proto && proto->connect) ret = proto->connect(conn, check->type, quickack ? 2 : 0); - if (check->type) - cs_want_send(cs); #ifdef USE_OPENSSL if (s->check.sni) @@ -2180,10 +2171,8 @@ static struct task *process_chk_conn(struct task *t, void *context, unsigned sho t->expire = tick_first(t->expire, t_con); } - if (check->type) { - cs_want_recv(cs); /* prepare for reading a possible reply */ + if (check->type) __event_srv_chk_r(cs); - } task_set_affinity(t, tid_bit); goto reschedule; @@ -2683,10 +2672,6 @@ static int tcpcheck_main(struct check *check) t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check); } - /* It's only the rules which will enable send/recv */ - if (cs) - cs_stop_both(cs); - while (1) { /* We have to try to flush the output buffer before reading, at * the end, or if we're about to send a string that does not fit @@ -2699,14 +2684,12 @@ static int tcpcheck_main(struct check *check) check->current_step->string_len >= b_room(&check->bo))) { int ret; - __cs_want_send(cs); ret = cs->conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0); b_realign_if_empty(&check->bo); if (ret <= 0) { if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) { chk_report_conn_err(check, errno, 0); - __cs_stop_both(cs); goto out_end_tcpcheck; } break; @@ -2924,7 +2907,6 @@ static int tcpcheck_main(struct check *check) if (unlikely(check->result == CHK_RES_FAILED)) goto out_end_tcpcheck; - __cs_want_recv(cs); if (cs->conn->mux->rcv_buf(cs, &check->bi, b_size(&check->bi), 0) <= 0) { if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) { done = 1; @@ -3026,7 +3008,6 @@ static int tcpcheck_main(struct check *check) if (check->current_step->action == TCPCHK_ACT_EXPECT) goto tcpcheck_expect; - __cs_stop_recv(cs); } } else { @@ -3046,7 +3027,6 @@ static int tcpcheck_main(struct check *check) if (check->current_step->action == TCPCHK_ACT_EXPECT) goto tcpcheck_expect; - __cs_stop_recv(cs); } /* not matched but was supposed to => ERROR */ else { @@ -3098,15 +3078,9 @@ static int tcpcheck_main(struct check *check) goto out_end_tcpcheck; } - /* warning, current_step may now point to the head */ - if (b_data(&check->bo)) - __cs_want_send(cs); - if (&check->current_step->list != head && - check->current_step->action == TCPCHK_ACT_EXPECT) { - __cs_want_recv(cs); + check->current_step->action == TCPCHK_ACT_EXPECT) __event_srv_chk_r(cs); - } goto out; out_end_tcpcheck: @@ -3120,8 +3094,6 @@ static int tcpcheck_main(struct check *check) if (check->result == CHK_RES_FAILED) conn->flags |= CO_FL_ERROR; - __cs_stop_both(cs); - out: return retcode; } diff --git a/src/connection.c b/src/connection.c index b62cccecd..502277290 100644 --- a/src/connection.c +++ b/src/connection.c @@ -134,6 +134,7 @@ void conn_fd_handler(int fd) conn->send_wait = NULL; } else io_available = 1; + __conn_xprt_stop_send(conn); } /* The data transfer starts here and stops on error and handshakes. Note @@ -153,6 +154,7 @@ void conn_fd_handler(int fd) conn->recv_wait = NULL; } else io_available = 1; + __conn_xprt_stop_recv(conn); } /* It may happen during the data phase that a handshake is @@ -341,6 +343,7 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param) conn->recv_wait = NULL; sw->wait_reason &= ~SUB_CAN_RECV; } + __conn_xprt_stop_recv(conn); } if (event_type & SUB_CAN_SEND) { sw = param; @@ -348,7 +351,9 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param) conn->send_wait = NULL; sw->wait_reason &= ~SUB_CAN_SEND; } + __conn_xprt_stop_send(conn); } + conn_update_xprt_polling(conn); return 0; } @@ -363,6 +368,7 @@ int conn_subscribe(struct connection *conn, int event_type, void *param) conn->recv_wait = sw; } event_type &= ~SUB_CAN_RECV; + __conn_xprt_want_recv(conn); } if (event_type & SUB_CAN_SEND) { sw = param; @@ -371,9 +377,11 @@ int conn_subscribe(struct connection *conn, int event_type, void *param) conn->send_wait = sw; } event_type &= ~SUB_CAN_SEND; + __conn_xprt_want_send(conn); } if (event_type != 0) return (-1); + conn_update_xprt_polling(conn); return 0; } @@ -603,6 +611,7 @@ int conn_recv_proxy(struct connection *conn, int flag) } line++; } + __conn_xprt_stop_recv(conn); if (!dst_s || !sport_s || !dport_s) goto bad_header; diff --git a/src/mux_h2.c b/src/mux_h2.c index f7999d279..5255ca0a1 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -286,24 +286,18 @@ static int h2_buf_available(void *target) if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) { h2c->flags &= ~H2_CF_DEM_DALLOC; - if (h2_recv_allowed(h2c)) { - conn_xprt_want_recv(h2c->conn); + if (h2_recv_allowed(h2c)) tasklet_wakeup(h2c->wait_event.task); - } return 1; } if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc_margin(&h2c->mbuf, 0)) { h2c->flags &= ~H2_CF_MUX_MALLOC; - if (!(h2c->flags & H2_CF_MUX_BLOCK_ANY)) - conn_xprt_want_send(h2c->conn); if (h2c->flags & H2_CF_DEM_MROOM) { h2c->flags &= ~H2_CF_DEM_MROOM; - if (h2_recv_allowed(h2c)) { - conn_xprt_want_recv(h2c->conn); + if (h2_recv_allowed(h2c)) tasklet_wakeup(h2c->wait_event.task); - } } return 1; } @@ -312,10 +306,8 @@ static int h2_buf_available(void *target) (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs && b_alloc_margin(&h2s->rxbuf, 0)) { h2c->flags &= ~H2_CF_DEM_SALLOC; - if (h2_recv_allowed(h2c)) { - conn_xprt_want_recv(h2c->conn); + if (h2_recv_allowed(h2c)) tasklet_wakeup(h2c->wait_event.task); - } return 1; } @@ -427,7 +419,6 @@ static int h2_init(struct connection *conn, struct proxy *prx) task_queue(t); /* prepare to read something */ - conn_xprt_want_recv(conn); tasklet_wakeup(h2c->wait_event.task); return 0; fail: @@ -2255,10 +2246,8 @@ static int h2_recv(struct h2c *h2c) ret = 0; } while (ret > 0); - if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size)) { - conn_xprt_want_recv(conn); + if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size)) conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event); - } if (!b_data(buf)) { h2_release_buf(h2c, &h2c->dbuf); @@ -2445,25 +2434,13 @@ static int h2_process(struct h2c *h2c) if (!b_data(&h2c->dbuf)) h2_release_buf(h2c, &h2c->dbuf); - /* stop being notified of incoming data if we can't process them */ - if (!h2_recv_allowed(h2c)) - __conn_xprt_stop_recv(conn); - else - __conn_xprt_want_recv(conn); - - /* adjust output polling */ - if (!(conn->flags & CO_FL_SOCK_WR_SH) && - h2c->st0 != H2_CS_ERROR2 && !(h2c->flags & H2_CF_GOAWAY_FAILED) && - (h2c->st0 == H2_CS_ERROR || - b_data(&h2c->mbuf) || - (h2c->mws > 0 && !LIST_ISEMPTY(&h2c->fctl_list)) || - (!(h2c->flags & H2_CF_MUX_BLOCK_ANY) && !LIST_ISEMPTY(&h2c->send_list)))) { - __conn_xprt_want_send(conn); - } - else { + if ((conn->flags & CO_FL_SOCK_WR_SH) || + h2c->st0 == H2_CS_ERROR2 || (h2c->flags & H2_CF_GOAWAY_FAILED) || + (h2c->st0 != H2_CS_ERROR && + !b_data(&h2c->mbuf) && + (h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) && + ((h2c->flags & H2_CF_MUX_BLOCK_ANY) || LIST_ISEMPTY(&h2c->send_list)))) h2_release_buf(h2c, &h2c->mbuf); - __conn_xprt_stop_send(conn); - } if (h2c->task) { if (eb_is_empty(&h2c->streams_by_id) || b_data(&h2c->mbuf)) { @@ -2553,48 +2530,6 @@ static struct conn_stream *h2_attach(struct connection *conn) return NULL; } -/* callback used to update the mux's polling flags after changing a cs' status. - * The caller (cs_update_mux_polling) will take care of propagating any changes - * to the transport layer. - */ -static void h2_update_poll(struct conn_stream *cs) -{ - struct h2s *h2s = cs->ctx; - - if (!h2s) - return; - - /* we may unblock a blocked read */ - - if (cs->flags & CS_FL_DATA_RD_ENA) { - /* the stream indicates it's willing to read */ - h2s->h2c->flags &= ~H2_CF_DEM_SFULL; - if (h2s->h2c->dsi == h2s->id) { - conn_xprt_want_recv(cs->conn); - tasklet_wakeup(h2s->h2c->wait_event.task); - conn_xprt_want_send(cs->conn); - } - } - - /* Note: the stream and stream-int code doesn't allow us to perform a - * synchronous send() here unfortunately, because this code is called - * as si_update() from the process_stream() context. This means that - * we have to queue the current cs and defer its processing after the - * connection's cs list is processed anyway. - */ - - if (cs->flags & CS_FL_DATA_WR_ENA) { - if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH)) - conn_xprt_want_send(cs->conn); - tasklet_wakeup(h2s->h2c->wait_event.task); - } - /* We don't support unsubscribing from here, it shouldn't be a problem */ - - /* this can happen from within si_chk_snd() */ - if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA)) - conn_xprt_want_send(cs->conn); -} - /* * Detach the stream from the connection and possibly release the connection. */ @@ -2613,11 +2548,8 @@ static void h2_detach(struct conn_stream *cs) if (h2c->flags & H2_CF_DEM_TOOMANY && !h2_has_too_many_cs(h2c)) { h2c->flags &= ~H2_CF_DEM_TOOMANY; - if (h2_recv_allowed(h2c)) { - __conn_xprt_want_recv(h2c->conn); + if (h2_recv_allowed(h2c)) tasklet_wakeup(h2c->wait_event.task); - conn_xprt_want_send(h2c->conn); - } } /* this stream may be blocked waiting for some data to leave (possibly @@ -2635,9 +2567,7 @@ static void h2_detach(struct conn_stream *cs) */ h2c->flags &= ~H2_CF_DEM_BLOCK_ANY; h2c->flags &= ~H2_CF_MUX_BLOCK_ANY; - conn_xprt_want_recv(cs->conn); tasklet_wakeup(h2c->wait_event.task); - conn_xprt_want_send(cs->conn); } h2s_destroy(h2s); @@ -2688,8 +2618,6 @@ static void h2_do_shutr(struct h2s *h2s) !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) && h2c_send_goaway_error(h2c, h2s) <= 0) return; - if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA)) - conn_xprt_want_send(h2c->conn); h2s_close(h2s); @@ -2747,8 +2675,6 @@ static void h2_do_shutw(struct h2s *h2s) h2s_close(h2s); } - if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA)) - conn_xprt_want_send(h2c->conn); add_to_list: if (LIST_ISEMPTY(&h2s->list)) { @@ -3690,7 +3616,6 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun b_del(buf, total); if (total > 0) { - conn_xprt_want_send(h2s->h2c->conn); if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND)) tasklet_wakeup(h2s->h2c->wait_event.task); } @@ -3795,7 +3720,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct const struct mux_ops h2_ops = { .init = h2_init, .wake = h2_wake, - .update_poll = h2_update_poll, .snd_buf = h2_snd_buf, .rcv_buf = h2_rcv_buf, .subscribe = h2_subscribe, diff --git a/src/mux_pt.c b/src/mux_pt.c index 3a573b5a9..beca8c2ea 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -60,31 +60,9 @@ static int mux_pt_wake(struct connection *conn) if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_HANDSHAKE)) == CO_FL_EARLY_DATA) conn->flags &= ~CO_FL_EARLY_DATA; - if (ret >= 0) - cs_update_mux_polling(cs); return ret; } -/* callback used to update the mux's polling flags after changing a cs' status. - * The caller (cs_mux_update_poll) will take care of propagating any changes to - * the transport layer. - */ -static void mux_pt_update_poll(struct conn_stream *cs) -{ - struct connection *conn = cs->conn; - int flags = 0; - - conn_refresh_polling_flags(conn); - - if (cs->flags & CS_FL_DATA_RD_ENA) - flags |= CO_FL_XPRT_RD_ENA; - if (cs->flags & CS_FL_DATA_WR_ENA) - flags |= CO_FL_XPRT_WR_ENA; - - conn->flags = (conn->flags & ~(CO_FL_XPRT_RD_ENA | CO_FL_XPRT_WR_ENA)) | flags; - conn_cond_update_xprt_polling(conn); -} - /* * Attach a new stream to a connection * (Used for outgoing connections) @@ -191,7 +169,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe) const struct mux_ops mux_pt_ops = { .init = mux_pt_init, .wake = mux_pt_wake, - .update_poll = mux_pt_update_poll, .rcv_buf = mux_pt_rcv_buf, .snd_buf = mux_pt_snd_buf, .subscribe = mux_pt_subscribe, diff --git a/src/stream.c b/src/stream.c index 7444115a4..e202b87e4 100644 --- a/src/stream.c +++ b/src/stream.c @@ -268,9 +268,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) goto out_fail_accept; /* finish initialization of the accepted file descriptor */ - if (cs) - cs_want_recv(cs); - else if (appctx) + if (appctx) si_applet_want_get(&s->si[0]); if (sess->fe->accept && sess->fe->accept(s) < 0) @@ -1679,6 +1677,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_cs_send(cs); si_cs_recv(cs); } +redo: //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__, // si_f->state, si_b->state, si_b->err_type, req->flags, res->flags); @@ -2444,6 +2443,19 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); + cs = objt_cs(si_f->end); + ret = 0; + if (cs && !(cs->conn->flags & CO_FL_ERROR) && + !(cs->flags & CS_FL_ERROR) && !(si_oc(si_f)->flags & CF_SHUTW)) + ret = si_cs_send(cs); + cs = objt_cs(si_b->end); + if (cs && !(cs->conn->flags & CO_FL_ERROR) && + !(cs->flags & CS_FL_ERROR) && !(si_oc(si_b)->flags & CF_SHUTW)) + ret |= si_cs_send(cs); + + if (ret) + goto redo; + if (si_f->state == SI_ST_EST) si_update(si_f); @@ -2510,22 +2522,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES); stream_release_buffers(s); /* We may have free'd some space in buffers, or have more to send/recv, try again */ - cs = objt_cs(si_f->end); - ret = 0; - if (cs && !(cs->conn->flags & CO_FL_ERROR)) { - ret |= si_cs_send(cs); - si_cs_recv(cs); - ret |= (si_ic(si_f)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR); - } - cs = objt_cs(si_b->end); - if (cs && !(cs->conn->flags & CO_FL_ERROR)) { - ret |= si_cs_send(cs); - si_cs_recv(cs); - ret |= (si_ic(si_b)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR); - - } - if (ret) - task_wakeup(t, TASK_WOKEN_IO); return t; /* nothing more to do */ } diff --git a/src/stream_interface.c b/src/stream_interface.c index a0487ef5a..8057d2751 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -66,7 +66,6 @@ struct si_ops si_embedded_ops = { /* stream-interface operations for connections */ struct si_ops si_conn_ops = { - .update = stream_int_update_conn, .chk_rcv = stream_int_chk_rcv_conn, .chk_snd = stream_int_chk_snd_conn, .shutr = stream_int_shutr_conn, @@ -258,6 +257,7 @@ static void stream_int_chk_rcv(struct stream_interface *si) else { /* (re)start reading */ si->flags &= ~SI_FL_WAIT_ROOM; + tasklet_wakeup(si->wait_event.task); if (!(si->flags & SI_FL_DONT_WAKE)) task_wakeup(si_task(si), TASK_WOKEN_IO); } @@ -518,8 +518,10 @@ void stream_int_notify(struct stream_interface *si) /* check if the consumer has freed some space either in the * buffer or in the pipe. */ - if (channel_may_recv(ic) && new_len < last_len) + if (channel_may_recv(ic) && new_len < last_len) { + tasklet_wakeup(si->wait_event.task); si->flags &= ~SI_FL_WAIT_ROOM; + } } if (si->flags & SI_FL_WAIT_ROOM) { @@ -566,6 +568,7 @@ static int si_cs_process(struct conn_stream *cs) struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); + int wait_room = si->flags & SI_FL_WAIT_ROOM; /* If we have data to send, try it now */ if (!channel_is_empty(oc) && objt_cs(si->end)) @@ -600,20 +603,9 @@ static int si_cs_process(struct conn_stream *cs) stream_int_notify(si); channel_release_buffer(ic, &(si_strm(si)->buffer_wait)); - /* Third step : update the connection's polling status based on what - * was done above (eg: maybe some buffers got emptied). - */ - if (channel_is_empty(oc)) - __cs_stop_send(cs); - - - if (si->flags & SI_FL_WAIT_ROOM) { - __cs_stop_recv(cs); - } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && - channel_may_recv(ic)) { - __cs_want_recv(cs); - } + /* Try to recv() again if we free'd some room in the process */ + if (wait_room && !(si->flags & SI_FL_WAIT_ROOM)) + si_cs_recv(cs); return 0; } @@ -720,10 +712,8 @@ int si_cs_send(struct conn_stream *cs) } } /* We couldn't send all of our data, let the mux know we'd like to send more */ - if (co_data(oc)) { - cs_want_send(cs); + if (co_data(oc)) conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event); - } return did_send; } @@ -776,6 +766,7 @@ void stream_int_update(struct stream_interface *si) * have updated it if there has been a completed I/O. */ si->flags &= ~SI_FL_WAIT_ROOM; + tasklet_wakeup(si->wait_event.task); if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); } @@ -814,37 +805,6 @@ void stream_int_update(struct stream_interface *si) } } -/* Updates the polling status of a connection outside of the connection handler - * based on the channel's flags and the stream interface's flags. It needs to be - * called once after the channels' flags have settled down and the stream has - * been updated. It is not designed to be called from within the connection - * handler itself. - */ -void stream_int_update_conn(struct stream_interface *si) -{ - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); - struct conn_stream *cs = __objt_cs(si->end); - - if (!(ic->flags & CF_SHUTR)) { - /* Read not closed */ - if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) - __cs_stop_recv(cs); - else - __cs_want_recv(cs); - } - - if (!(oc->flags & CF_SHUTW)) { - /* Write not closed */ - if (channel_is_empty(oc)) - __cs_stop_send(cs); - else - __cs_want_send(cs); - } - - cs_update_mux_polling(cs); -} - /* * This function performs a shutdown-read on a stream interface attached to * a connection in a connected or init state (it does nothing for other @@ -858,7 +818,6 @@ void stream_int_update_conn(struct stream_interface *si) static void stream_int_shutr_conn(struct stream_interface *si) { struct conn_stream *cs = __objt_cs(si->end); - struct connection *conn = cs->conn; struct channel *ic = si_ic(si); ic->flags &= ~CF_SHUTR_NOW; @@ -880,10 +839,6 @@ static void stream_int_shutr_conn(struct stream_interface *si) /* we want to immediately forward this close to the write side */ return stream_int_shutw_conn(si); } - else if (conn->ctrl) { - /* we want the caller to disable polling on this FD */ - cs_stop_recv(cs); - } } /* @@ -980,24 +935,23 @@ static void stream_int_shutw_conn(struct stream_interface *si) static void stream_int_chk_rcv_conn(struct stream_interface *si) { struct channel *ic = si_ic(si); - struct conn_stream *cs = __objt_cs(si->end); if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR))) return; if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { /* stop reading */ - if (!(ic->flags & CF_DONT_READ)) /* full */ { - si->flags |= SI_FL_WAIT_ROOM; - } - __cs_stop_recv(cs); + si->flags |= SI_FL_WAIT_ROOM; } else { + struct conn_stream *cs = objt_cs(si->end); /* (re)start reading */ si->flags &= ~SI_FL_WAIT_ROOM; - __cs_want_recv(cs); + if (cs) { + si_cs_recv(cs); + tasklet_wakeup(si->wait_event.task); + } } - cs_update_mux_polling(cs); } @@ -1024,20 +978,10 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) !(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ return; - if (cs->flags & CS_FL_DATA_WR_ENA) { - /* already subscribed to write notifications, will be called - * anyway, so let's avoid calling it especially if the reader - * is not ready. - */ - return; - } - - __cs_want_send(cs); - si_cs_send(cs); + tasklet_wakeup(si->wait_event.task); if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) { /* Write error on the file descriptor */ - __cs_stop_both(cs); si->flags |= SI_FL_ERR; goto out_wakeup; } @@ -1051,7 +995,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) * ->o limit was reached. Maybe we just wrote the last * chunk and need to close. */ - __cs_stop_send(cs); if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW)) && (si->state == SI_ST_EST)) { @@ -1067,7 +1010,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) /* Otherwise there are remaining data to be sent in the buffer, * which means we have to poll before doing so. */ - __cs_want_send(cs); si->flags &= ~SI_FL_WAIT_DATA; if (!tick_isset(oc->wex)) oc->wex = tick_add_ifset(now_ms, oc->wto); @@ -1105,9 +1047,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) if (!(si->flags & SI_FL_DONT_WAKE)) task_wakeup(si_task(si), TASK_WOKEN_IO); } - - /* commit possible polling changes */ - cs_update_mux_polling(cs); } /* @@ -1208,7 +1147,6 @@ int si_cs_recv(struct conn_stream *cs) * could soon be full. Let's stop before needing to poll. */ si->flags |= SI_FL_WAIT_ROOM; - __cs_stop_recv(cs); } /* splice not possible (anymore), let's go on on standard copy */ @@ -1274,7 +1212,6 @@ int si_cs_recv(struct conn_stream *cs) * This was changed to accomodate with the mux code, * but we may have lost a worthwhile optimization. */ - __cs_stop_recv(cs); si->flags |= SI_FL_WAIT_ROOM; break; } @@ -1347,9 +1284,10 @@ int si_cs_recv(struct conn_stream *cs) goto out_shutdown_r; /* Subscribe to receive events */ - conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event); + if (!(si->flags & SI_FL_WAIT_ROOM)) + conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event); - return cur_read != 0; + return (cur_read != 0 || (si->flags & SI_FL_WAIT_ROOM)); out_shutdown_r: /* we received a shutdown */ @@ -1392,7 +1330,6 @@ void stream_sock_read0(struct stream_interface *si) } /* otherwise that's just a normal read shutdown */ - __cs_stop_recv(cs); return; do_close: