diff --git a/src/peers.c b/src/peers.c index 41e358524..740480b27 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1245,6 +1245,7 @@ static inline int peer_send_msg(struct appctx *appctx, int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *), struct peer_prep_params *params) { + struct peer *peer = appctx->svcctx; int ret, msglen; TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); @@ -1264,6 +1265,11 @@ static inline int peer_send_msg(struct appctx *appctx, appctx->st0 = PEER_SESS_ST_END; } } + else if (peer) { + /* A message was sent, rearm the heartbeat timer */ + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + peer->flags &= ~PEER_F_HEARTBEAT; + } TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); return ret; @@ -2516,7 +2522,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee } else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->rx_hbt++; } } @@ -3108,6 +3113,7 @@ switchstate: if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl)) goto switchstate; + curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); curpeer->flags |= PEER_F_ALIVE; /* skip consumed message */ @@ -3122,8 +3128,15 @@ switchstate: goto switchstate; send_msgs: - if (curpeer->flags & PEER_F_HEARTBEAT) { - curpeer->flags &= ~PEER_F_HEARTBEAT; + /* we get here when a peer_recv_msg() returns 0 in reql */ + repl = peer_send_msgs(appctx, curpeer, curpeer->peers); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; + } + + if (tick_is_expired(curpeer->heartbeat, now_ms) || (curpeer->flags & PEER_F_HEARTBEAT)) { repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers); if (repl <= 0) { if (repl == -1) @@ -3132,13 +3145,6 @@ send_msgs: } curpeer->tx_hbt++; } - /* we get here when a peer_recv_msg() returns 0 in reql */ - repl = peer_send_msgs(appctx, curpeer, curpeer->peers); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } /* noting more to do */ goto out; @@ -3518,65 +3524,43 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers, appctx_wakeup(peer->appctx); } else { - int update_to_push = 0; - - /* Awake session if there is data to push */ - for (st = peer->tables; st ; st = st->next) { - if (tick_is_le(peer->last_update, st->table->last_update)) - update_to_push = 1; - - if (update_to_push == 1) { - /* wake up the peer handler to push local updates */ - /* There is no need to send a heartbeat message - * when some updates must be pushed. The remote - * peer will consider peer as alive when it will - * receive these updates. + if (tick_is_expired(peer->reconnect, now_ms)) { + if (peer->flags & PEER_F_ALIVE) { + /* This peer was alive during a 'reconnect' period. + * Flag it as not alive again for the next period. */ - peer->flags &= ~PEER_F_HEARTBEAT; - /* Re-schedule another one later. */ - peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - /* Refresh reconnect if necessary */ - if (tick_is_expired(peer->reconnect, now_ms)) - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - /* We are going to send updates, let's ensure we will - * come back to send heartbeat messages or to reconnect. - */ - TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer); - task->expire = tick_first(peer->reconnect, peer->heartbeat); - appctx_wakeup(peer->appctx); - break; + peer->flags &= ~PEER_F_ALIVE; + TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); + } + else { + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); + peer->heartbeat = TICK_ETERNITY; + TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); + peer_session_forceshutdown(peer); + sync_peer_app_state(peers, peer); + peer->no_hbt++; } } - /* When there are updates to send we do not reconnect - * and do not send heartbeat message either. - */ - if (!update_to_push) { - if (tick_is_expired(peer->reconnect, now_ms)) { - if (peer->flags & PEER_F_ALIVE) { - /* This peer was alive during a 'reconnect' period. - * Flag it as not alive again for the next period. - */ - peer->flags &= ~PEER_F_ALIVE; - TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - } - else { - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); - peer->heartbeat = TICK_ETERNITY; - TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); - peer_session_forceshutdown(peer); - sync_peer_app_state(peers, peer); - peer->no_hbt++; - } - } - else if (tick_is_expired(peer->heartbeat, now_ms)) { + + if (peer->appctx) { + if (tick_is_expired(peer->heartbeat, now_ms)) { peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->flags |= PEER_F_HEARTBEAT; TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer); appctx_wakeup(peer->appctx); } - task->expire = tick_first(peer->reconnect, peer->heartbeat); + else { + for (st = peer->tables; st ; st = st->next) { + if (tick_is_le(peer->last_update, st->table->last_update)) { + appctx_wakeup(peer->appctx); + break; + } + } + } } + + task->expire = tick_first(peer->reconnect, peer->heartbeat); } /* else do nothing */ } /* SUCCESSCODE */