From 217ab03e5963af8b2ffec50181f9be0903923bbc Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Sep 2021 16:42:39 +0200 Subject: [PATCH] heartbeat: wrap messages with a timestamp Track when a heartbeat was received to allow other components to check when it will expire. --- pkg/icingadb/ha.go | 4 ++-- pkg/icingaredis/heartbeat.go | 41 ++++++++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 3b991d1f..64f7c8ff 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -123,7 +123,7 @@ func (h *HA) controller() { case m := <-h.heartbeat.Events(): if m != nil { now := time.Now() - t, err := m.Time() + t, err := m.Stats().Time() if err != nil { h.abort(err) } @@ -136,7 +136,7 @@ func (h *HA) controller() { h.signalHandover() continue } - s, err := m.IcingaStatus() + s, err := m.Stats().IcingaStatus() if err != nil { h.abort(err) } diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index d84afe4a..b33a8af0 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -20,7 +20,7 @@ var timeout = 60 * time.Second // Also signals on if the heartbeat is Lost. type Heartbeat struct { active bool - events chan *v1.StatsMessage + events chan *HeartbeatMessage cancelCtx context.CancelFunc client *Client done chan struct{} @@ -34,7 +34,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ - events: make(chan *v1.StatsMessage, 1), + events: make(chan *HeartbeatMessage, 1), cancelCtx: cancelCtx, client: client, done: make(chan struct{}), @@ -49,7 +49,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger // Events returns a channel that is sent to on heartbeat events. // // A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. -func (h *Heartbeat) Events() <-chan *v1.StatsMessage { +func (h *Heartbeat) Events() <-chan *HeartbeatMessage { return h.events } @@ -81,7 +81,7 @@ func (h *Heartbeat) controller(ctx context.Context) { h.logger.Info("Waiting for Icinga 2 heartbeat") - messages := make(chan v1.StatsMessage) + messages := make(chan *HeartbeatMessage) defer close(messages) g, ctx := errgroup.WithContext(ctx) @@ -103,8 +103,13 @@ func (h *Heartbeat) controller(ctx context.Context) { return WrapCmdErr(cmd) } + m := &HeartbeatMessage{ + received: time.Now(), + stats: streams[0].Messages[0].Values, + } + select { - case messages <- streams[0].Messages[0].Values: + case messages <- m: case <-ctx.Done(): return ctx.Err() } @@ -119,14 +124,14 @@ func (h *Heartbeat) controller(ctx context.Context) { select { case m := <-messages: if !h.active { - s, err := m.IcingaStatus() + s, err := m.Stats().IcingaStatus() if err != nil { return errors.Wrapf(err, "can't parse Icinga 2 status from message %#v", m) } h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) h.active = true } - h.sendEvent(&m) + h.sendEvent(m) case <-time.After(timeout): if h.active { h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) @@ -158,12 +163,14 @@ func (h *Heartbeat) setError(err error) { h.err = errors.Wrap(err, "heartbeat failed") } -func (h *Heartbeat) sendEvent(m *v1.StatsMessage) { +func (h *Heartbeat) sendEvent(m *HeartbeatMessage) { // Remove any not yet delivered event select { case old := <-h.events: if old != nil { - h.logger.Debug("Previous heartbeat not read from channel") + h.logger.Debugw("Previous heartbeat not read from channel", + zap.Time("previous", old.received), + zap.Time("current", m.received)) } else { h.logger.Debug("Previous heartbeat loss event not read from channel") } @@ -172,3 +179,19 @@ func (h *Heartbeat) sendEvent(m *v1.StatsMessage) { h.events <- m } + +// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received. +type HeartbeatMessage struct { + received time.Time + stats v1.StatsMessage +} + +// Stats returns the underlying heartbeat message from the icinga:stats stream. +func (m *HeartbeatMessage) Stats() *v1.StatsMessage { + return &m.stats +} + +// ExpiryTime returns the timestamp when the heartbeat expires. +func (m *HeartbeatMessage) ExpiryTime() time.Time { + return m.received.Add(timeout) +}