diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index e7293d747..d1c022c26 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -278,6 +278,18 @@ static inline void si_rx_endp_done(struct stream_interface *si) si->flags |= SI_FL_RX_WAIT_EP; } +/* Tell a stream interface the input channel is OK with it sending it some data */ +static inline void si_rx_chan_rdy(struct stream_interface *si) +{ + si->flags &= ~SI_FL_RXBLK_CHAN; +} + +/* Tell a stream interface the input channel is not OK with it sending it some data */ +static inline void si_rx_chan_blk(struct stream_interface *si) +{ + si->flags |= SI_FL_RXBLK_CHAN; +} + /* The stream interface just got the input buffer it was waiting for */ static inline void si_rx_buff_rdy(struct stream_interface *si) { diff --git a/src/stream_interface.c b/src/stream_interface.c index 80412e7c1..c349aac2e 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -481,10 +481,13 @@ void stream_int_notify(struct stream_interface *si) } if ((sio->flags & SI_FL_RXBLK_ROOM) && - ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL || - channel_is_empty(oc))) { + ((oc->flags & CF_WRITE_PARTIAL) || channel_is_empty(oc))) sio->flags &= ~SI_FL_RXBLK_ROOM; - } + + if (oc->flags & CF_DONT_READ) + si_rx_chan_blk(sio); + else + si_rx_chan_rdy(sio); /* Notify the other side when we've injected data into the IC that * needs to be forwarded. We can do fast-forwarding as soon as there @@ -519,13 +522,16 @@ void stream_int_notify(struct stream_interface *si) si->flags &= ~SI_FL_RXBLK_ROOM; } + if (!(ic->flags & CF_DONT_READ)) + si_rx_chan_rdy(si); + si_chk_rcv(si); si_chk_rcv(sio); - if (si->flags & SI_FL_RXBLK_ROOM) { + if (si_rx_blocked(si)) { ic->rex = TICK_ETERNITY; } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) { + else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) { /* we must re-enable reading if si_chk_snd() has freed some space */ if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); @@ -725,11 +731,8 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si))) ret = si_cs_send(cs); - if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) { + if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) ret |= si_cs_recv(cs); - if (!(si_ic(si)->flags & (CF_SHUTR|CF_DONT_READ))) - si->flags &= ~SI_FL_RX_WAIT_EP; - } if (ret != 0) si_cs_process(cs); @@ -751,10 +754,14 @@ void stream_int_update(struct stream_interface *si) if (!(ic->flags & CF_SHUTR)) { /* Read not closed, update FD status and timeout for reads */ - if ((ic->flags & CF_DONT_READ) || !channel_is_empty(ic)) { + if (ic->flags & CF_DONT_READ) + si_rx_chan_blk(si); + else + si_rx_chan_rdy(si); + + if (!channel_is_empty(ic)) { /* stop reading, imposed by channel's policy or contents */ si_cant_put(si); - ic->rex = TICK_ETERNITY; } else { /* (re)start reading and update timeout. Note: we don't recompute the timeout @@ -763,9 +770,12 @@ void stream_int_update(struct stream_interface *si) * have updated it if there has been a completed I/O. */ si->flags &= ~SI_FL_RXBLK_ROOM; - if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); } + if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP) + ic->rex = TICK_ETERNITY; + else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + si_chk_rcv(si); } else @@ -1254,7 +1264,7 @@ int si_cs_recv(struct conn_stream *cs) if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) { /* we're stopped by the channel's policy */ - si_cant_put(si); + si_rx_chan_blk(si); break; } @@ -1269,7 +1279,7 @@ int si_cs_recv(struct conn_stream *cs) */ if (ic->flags & CF_STREAMER) { /* we're stopped by the channel's policy */ - si_cant_put(si); + si_rx_chan_blk(si); break; } @@ -1278,7 +1288,7 @@ int si_cs_recv(struct conn_stream *cs) */ if (ret >= global.tune.recv_enough) { /* we're stopped by the channel's policy */ - si_cant_put(si); + si_rx_chan_blk(si); break; } } @@ -1286,7 +1296,7 @@ int si_cs_recv(struct conn_stream *cs) /* if we are waiting for more space, don't try to read more data * right now. */ - if (si->flags & SI_FL_RXBLK_ROOM) + if (si_rx_blocked(si)) break; } /* while !flags */