MEDIUM: conn-stream/applet: Add a data callback for applets

data callbacks were only used for streams attached to a connection and
for health-checks. However there is a callback used by task_run_applet. So,
si_applet_wake_cb() is first renamed to cs_applet_process() and it is
defined as the data callback for streams attached to an applet. This way,
this part now manipulates a conn-stream instead of a stream-interface. In
addition, applets are no longer handled as an exception for this part.
This commit is contained in:
Christopher Faulet 2022-04-01 16:34:53 +02:00
parent ef285c18f2
commit 6059ba4acc
4 changed files with 23 additions and 16 deletions

View file

@ -30,13 +30,14 @@
#include <haproxy/obj_type.h>
extern struct data_cb si_conn_cb;
extern struct data_cb cs_data_applet_cb;
extern struct data_cb check_conn_cb;
struct stream_interface *si_new(struct conn_stream *cs);
void si_free(struct stream_interface *si);
/* main event functions used to move data between sockets and buffers */
void si_applet_wake_cb(struct stream_interface *si);
int cs_applet_process(struct conn_stream *cs);
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state);
int si_sync_recv(struct stream_interface *si);
void si_sync_send(struct stream_interface *si);

View file

@ -175,7 +175,7 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate));
}
si_applet_wake_cb(cs->si);
cs->data_cb->wake(cs);
channel_release_buffer(cs_ic(cs), &app->buffer_wait);
return t;
}

View file

@ -252,7 +252,7 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
appctx->owner = cs;
if (cs_strm(cs)) {
cs->ops = &cs_app_applet_ops;
cs->data_cb = NULL;
cs->data_cb = &cs_data_applet_cb;
}
}
@ -282,7 +282,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
}
else if (cs->endp->flags & CS_EP_T_APPLET) {
cs->ops = &cs_app_applet_ops;
cs->data_cb = NULL;
cs->data_cb = &cs_data_applet_cb;
}
else {
cs->ops = &cs_app_embedded_ops;

View file

@ -53,6 +53,11 @@ struct data_cb si_conn_cb = {
};
struct data_cb cs_data_applet_cb = {
.wake = cs_applet_process,
.name = "STRM",
};
struct stream_interface *si_new(struct conn_stream *cs)
{
struct stream_interface *si;
@ -908,36 +913,37 @@ static void stream_int_read0(struct stream_interface *si)
* may re-enable the applet's based on the channels and stream interface's final
* states.
*/
void si_applet_wake_cb(struct stream_interface *si)
int cs_applet_process(struct conn_stream *cs)
{
struct channel *ic = si_ic(si);
struct channel *ic = cs_ic(cs);
BUG_ON(!cs_appctx(si->cs));
BUG_ON(!cs_appctx(cs));
/* If the applet wants to write and the channel is closed, it's a
* broken pipe and it must be reported.
*/
if (!(si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR))
si->cs->endp->flags |= CS_EP_ERROR;
if (!(cs->si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR))
cs->endp->flags |= CS_EP_ERROR;
/* automatically mark the applet having data available if it reported
* begin blocked by the channel.
*/
if (si_rx_blocked(si))
si_rx_endp_more(si);
if (si_rx_blocked(cs->si))
si_rx_endp_more(cs->si);
/* update the stream-int, channels, and possibly wake the stream up */
stream_int_notify(si);
stream_release_buffers(si_strm(si));
stream_int_notify(cs->si);
stream_release_buffers(__cs_strm(cs));
/* stream_int_notify may have passed through chk_snd and released some
* RXBLK flags. Process_stream will consider those flags to wake up the
* appctx but in the case the task is not in runqueue we may have to
* wakeup the appctx immediately.
*/
if ((si_rx_endp_ready(si) && !si_rx_blocked(si)) ||
(si_tx_endp_ready(si) && !si_tx_blocked(si)))
appctx_wakeup(__cs_appctx(si->cs));
if ((si_rx_endp_ready(cs->si) && !si_rx_blocked(cs->si)) ||
(si_tx_endp_ready(cs->si) && !si_tx_blocked(cs->si)))
appctx_wakeup(__cs_appctx(cs));
return 0;
}
/*