heartbeat: wrap messages with a timestamp

Track when a heartbeat was received to allow other components to check when it
will expire.
This commit is contained in:
Julian Brost 2021-09-24 16:42:39 +02:00
parent 8b2cb3acb8
commit 217ab03e59
2 changed files with 34 additions and 11 deletions

View file

@ -123,7 +123,7 @@ func (h *HA) controller() {
case m := <-h.heartbeat.Events():
if m != nil {
now := time.Now()
t, err := m.Time()
t, err := m.Stats().Time()
if err != nil {
h.abort(err)
}
@ -136,7 +136,7 @@ func (h *HA) controller() {
h.signalHandover()
continue
}
s, err := m.IcingaStatus()
s, err := m.Stats().IcingaStatus()
if err != nil {
h.abort(err)
}

View file

@ -20,7 +20,7 @@ var timeout = 60 * time.Second
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
active bool
events chan *v1.StatsMessage
events chan *HeartbeatMessage
cancelCtx context.CancelFunc
client *Client
done chan struct{}
@ -34,7 +34,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
ctx, cancelCtx := context.WithCancel(ctx)
heartbeat := &Heartbeat{
events: make(chan *v1.StatsMessage, 1),
events: make(chan *HeartbeatMessage, 1),
cancelCtx: cancelCtx,
client: client,
done: make(chan struct{}),
@ -49,7 +49,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
// Events returns a channel that is sent to on heartbeat events.
//
// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
func (h *Heartbeat) Events() <-chan *v1.StatsMessage {
func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
return h.events
}
@ -81,7 +81,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
h.logger.Info("Waiting for Icinga 2 heartbeat")
messages := make(chan v1.StatsMessage)
messages := make(chan *HeartbeatMessage)
defer close(messages)
g, ctx := errgroup.WithContext(ctx)
@ -103,8 +103,13 @@ func (h *Heartbeat) controller(ctx context.Context) {
return WrapCmdErr(cmd)
}
m := &HeartbeatMessage{
received: time.Now(),
stats: streams[0].Messages[0].Values,
}
select {
case messages <- streams[0].Messages[0].Values:
case messages <- m:
case <-ctx.Done():
return ctx.Err()
}
@ -119,14 +124,14 @@ func (h *Heartbeat) controller(ctx context.Context) {
select {
case m := <-messages:
if !h.active {
s, err := m.IcingaStatus()
s, err := m.Stats().IcingaStatus()
if err != nil {
return errors.Wrapf(err, "can't parse Icinga 2 status from message %#v", m)
}
h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment))
h.active = true
}
h.sendEvent(&m)
h.sendEvent(m)
case <-time.After(timeout):
if h.active {
h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
@ -158,12 +163,14 @@ func (h *Heartbeat) setError(err error) {
h.err = errors.Wrap(err, "heartbeat failed")
}
func (h *Heartbeat) sendEvent(m *v1.StatsMessage) {
func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
// Remove any not yet delivered event
select {
case old := <-h.events:
if old != nil {
h.logger.Debug("Previous heartbeat not read from channel")
h.logger.Debugw("Previous heartbeat not read from channel",
zap.Time("previous", old.received),
zap.Time("current", m.received))
} else {
h.logger.Debug("Previous heartbeat loss event not read from channel")
}
@ -172,3 +179,19 @@ func (h *Heartbeat) sendEvent(m *v1.StatsMessage) {
h.events <- m
}
// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
type HeartbeatMessage struct {
received time.Time
stats v1.StatsMessage
}
// Stats returns the underlying heartbeat message from the icinga:stats stream.
func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
return &m.stats
}
// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
return m.received.Add(timeout)
}