diff --git a/pkg/com/cond.go b/pkg/com/cond.go index 63ef1950..72ba347c 100644 --- a/pkg/com/cond.go +++ b/pkg/com/cond.go @@ -2,81 +2,89 @@ package com import ( "context" - "errors" + "github.com/pkg/errors" ) -var ErrCondClosed = errors.New("condition closed") - -// Cond implements a condition variable, a rendezvous point -// for goroutines waiting for or announcing the occurrence -// of an event. +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. type Cond struct { - ctx context.Context - cancel context.CancelFunc broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc listeners chan chan struct{} } +// NewCond returns a new Cond and starts the controller loop. func NewCond(ctx context.Context) *Cond { - done, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) c := &Cond{ broadcast: make(chan struct{}), cancel: cancel, - ctx: done, + done: make(chan struct{}), listeners: make(chan chan struct{}), } - go c.controller() + go c.controller(ctx) return c } +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + // controller loop. -func (c *Cond) controller() { +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. notify := make(chan struct{}) for { select { case <-c.broadcast: - // all current receivers get a closed channel + // Close channel to notify all current listeners. close(notify) - // set up next batch of receivers. + // Create a new channel for the next listeners. notify = make(chan struct{}) case c.listeners <- notify: - // great. A Receiver has our channel - case <-c.ctx.Done(): + // A new listener received the channel. + case <-ctx.Done(): return } } } - -// Close implements the io.Closer interface. -func (c *Cond) Close() error { - c.cancel() - - return nil -} - -// Wait returns a channel on which the next (close) signal will be sent. -func (c *Cond) Wait() <-chan struct{} { - select { - case l := <-c.listeners: - return l - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -// Broadcast wakes all current listeners. -func (c *Cond) Broadcast() { - select { - case c.broadcast <- struct{}{}: - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -func (c *Cond) Done() <-chan struct{} { - return c.ctx.Done() -} diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index bf87ab6d..2a7d7133 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -114,11 +114,8 @@ func (h *HA) controller() { for { select { - case m, ok := <-h.heartbeat.Beat(): - if !ok { - // Beat channel closed. - return - } + case <-h.heartbeat.Beat(): + m := h.heartbeat.Message() now := time.Now() t, err := m.Time() if err != nil { @@ -126,10 +123,10 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from future", "time", t) + h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * timeout)) { - h.logger.Errorw("Received heartbeat from the past", "time", t) + h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) h.signalHandover() continue } @@ -155,6 +152,10 @@ func (h *HA) controller() { case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() + case <-h.heartbeat.Done(): + if err := h.heartbeat.Err(); err != nil { + h.abort(err) + } case <-h.ctx.Done(): return } diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 69d00b26..888ffffb 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -3,6 +3,7 @@ package icingaredis import ( "context" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -12,79 +13,98 @@ import ( "time" ) +// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// After this time, a heartbeat loss is propagated. var timeout = 60 * time.Second +// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. +// Also signals on if the heartbeat is Lost. type Heartbeat struct { - ctx context.Context - cancel context.CancelFunc - client *Client - logger *zap.SugaredLogger - active bool - beat chan v1.StatsMessage - lost chan struct{} - done chan struct{} - mu *sync.Mutex - err error + active bool + beat *com.Cond + cancel context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *zap.SugaredLogger + lost *com.Cond + message *v1.StatsMessage + messageMu sync.Mutex } +// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat { ctx, cancel := context.WithCancel(ctx) heartbeat := &Heartbeat{ - ctx: ctx, + beat: com.NewCond(ctx), cancel: cancel, client: client, - logger: logger, - beat: make(chan v1.StatsMessage), - lost: make(chan struct{}), done: make(chan struct{}), - mu: &sync.Mutex{}, + logger: logger, + lost: com.NewCond(ctx), } - go heartbeat.controller() + go heartbeat.controller(ctx) return heartbeat } -// Close implements the io.Closer interface. -func (h Heartbeat) Close() error { - // Cancel ctx. +// Beat returns a channel that will be closed when a new heartbeat is received. +func (h *Heartbeat) Beat() <-chan struct{} { + return h.beat.Wait() +} + +// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (h *Heartbeat) Close() error { h.cancel() - // Wait until the controller loop ended. <-h.Done() - // And return an error, if any. + return h.Err() } -func (h Heartbeat) Done() <-chan struct{} { +// Done returns a channel that will be closed when the heartbeat controller loop has ended. +func (h *Heartbeat) Done() <-chan struct{} { return h.done } -func (h Heartbeat) Err() error { - h.mu.Lock() - defer h.mu.Unlock() +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *Heartbeat) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() return h.err } -func (h Heartbeat) Beat() <-chan v1.StatsMessage { - return h.beat +// Lost returns a channel that will be closed if the heartbeat is lost. +func (h *Heartbeat) Lost() <-chan struct{} { + return h.lost.Wait() } -func (h Heartbeat) Lost() <-chan struct{} { - return h.lost +// Message returns the last heartbeat message. +func (h *Heartbeat) Message() *v1.StatsMessage { + h.messageMu.Lock() + defer h.messageMu.Unlock() + + return h.message } // controller loop. -func (h Heartbeat) controller() { +func (h *Heartbeat) controller(ctx context.Context) { + defer close(h.done) + + h.logger.Info("Waiting for Icinga 2 heartbeat") + messages := make(chan v1.StatsMessage) defer close(messages) - g, ctx := errgroup.WithContext(h.ctx) + g, ctx := errgroup.WithContext(ctx) - // Message producer loop + // Message producer loop. g.Go(func() error { - // We expect heartbeats every second but only read them every 3 seconds + // We expect heartbeats every second but only read them every 3 seconds. throttle := time.NewTicker(time.Second * 3) defer throttle.Stop() @@ -109,7 +129,7 @@ func (h Heartbeat) controller() { } }) - // State loop + // State loop. g.Go(func() error { for { select { @@ -122,11 +142,12 @@ func (h Heartbeat) controller() { h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) h.active = true } - h.beat <- m + h.setMessage(&m) + h.beat.Broadcast() case <-time.After(timeout): if h.active { h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) - h.lost <- struct{}{} + h.lost.Broadcast() h.active = false } else { h.logger.Warn("Waiting for Icinga 2 heartbeat") @@ -140,14 +161,23 @@ func (h Heartbeat) controller() { // Since the goroutines of the group actually run endlessly, // we wait here forever, unless an error occurs. if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { - // Do not propagate context-aborted errors here, - // as this is to be expected when Close was called. + // Do not propagate any context-canceled errors here, + // as this is to be expected when calling Close or + // when the parent context is canceled. h.setError(err) } } func (h *Heartbeat) setError(err error) { - h.mu.Lock() + h.errMu.Lock() + defer h.errMu.Unlock() + h.err = errors.Wrap(err, "heartbeat failed") - h.mu.Unlock() +} + +func (h *Heartbeat) setMessage(m *v1.StatsMessage) { + h.messageMu.Lock() + defer h.messageMu.Unlock() + + h.message = m }