From b7ecfb9df28155d8530fc3371d66ac566f8c995a Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 15 Jul 2021 14:06:21 +0200 Subject: [PATCH 1/2] Do not store ctx inside Cond and add Godoc --- pkg/com/cond.go | 100 ++++++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 46 deletions(-) diff --git a/pkg/com/cond.go b/pkg/com/cond.go index 63ef1950..72ba347c 100644 --- a/pkg/com/cond.go +++ b/pkg/com/cond.go @@ -2,81 +2,89 @@ package com import ( "context" - "errors" + "github.com/pkg/errors" ) -var ErrCondClosed = errors.New("condition closed") - -// Cond implements a condition variable, a rendezvous point -// for goroutines waiting for or announcing the occurrence -// of an event. +// Cond implements a channel-based synchronization for goroutines that wait for signals or send them. +// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, +// which is only started when NewCond is called. Thus the zero value cannot be used. type Cond struct { - ctx context.Context - cancel context.CancelFunc broadcast chan struct{} + done chan struct{} + cancel context.CancelFunc listeners chan chan struct{} } +// NewCond returns a new Cond and starts the controller loop. func NewCond(ctx context.Context) *Cond { - done, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) c := &Cond{ broadcast: make(chan struct{}), cancel: cancel, - ctx: done, + done: make(chan struct{}), listeners: make(chan chan struct{}), } - go c.controller() + go c.controller(ctx) return c } +// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. +// Panics if the controller loop has already ended. +func (c *Cond) Broadcast() { + select { + case c.broadcast <- struct{}{}: + case <-c.done: + panic(errors.New("condition closed")) + } +} + +// Close stops the controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (c *Cond) Close() error { + c.cancel() + <-c.done + + return nil +} + +// Done returns a channel that will be closed when the controller loop has ended. +func (c *Cond) Done() <-chan struct{} { + return c.done +} + +// Wait returns a channel that is closed with the next signal. +// Panics if the controller loop has already ended. +func (c *Cond) Wait() <-chan struct{} { + select { + case l := <-c.listeners: + return l + case <-c.done: + panic(errors.New("condition closed")) + } +} + // controller loop. -func (c *Cond) controller() { +func (c *Cond) controller(ctx context.Context) { + defer close(c.done) + + // Note that the notify channel does not close when the controller loop ends + // in order not to notify pending listeners. notify := make(chan struct{}) for { select { case <-c.broadcast: - // all current receivers get a closed channel + // Close channel to notify all current listeners. close(notify) - // set up next batch of receivers. + // Create a new channel for the next listeners. notify = make(chan struct{}) case c.listeners <- notify: - // great. A Receiver has our channel - case <-c.ctx.Done(): + // A new listener received the channel. + case <-ctx.Done(): return } } } - -// Close implements the io.Closer interface. -func (c *Cond) Close() error { - c.cancel() - - return nil -} - -// Wait returns a channel on which the next (close) signal will be sent. -func (c *Cond) Wait() <-chan struct{} { - select { - case l := <-c.listeners: - return l - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -// Broadcast wakes all current listeners. -func (c *Cond) Broadcast() { - select { - case c.broadcast <- struct{}{}: - case <-c.ctx.Done(): - panic(ErrCondClosed) - } -} - -func (c *Cond) Done() <-chan struct{} { - return c.ctx.Done() -} From 725e70f0b91333f615f77c5e0e886ba869bfb312 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 15 Jul 2021 14:09:01 +0200 Subject: [PATCH 2/2] Pointer receivers, Cond usage, pass ctx and Godoc for Heartbeat Heartbeat now uses pointer receivers for its methods because some methods actually change the heartbeat values. The context is no longer stored in the structure, but passed to the controller loop. The beat and the lost channels are replaced by Cond and the last heartbeat is stored independently to not be affected by a slow HA receiver. If the database connections are occupied by the config, HA cannot update the instance and does not read from the beat channel in time. In addition, heartbeat errors are no longer swallowed, but handled in HA. --- pkg/icingadb/ha.go | 15 ++--- pkg/icingaredis/heartbeat.go | 110 ++++++++++++++++++++++------------- 2 files changed, 78 insertions(+), 47 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index bf87ab6d..2a7d7133 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -114,11 +114,8 @@ func (h *HA) controller() { for { select { - case m, ok := <-h.heartbeat.Beat(): - if !ok { - // Beat channel closed. - return - } + case <-h.heartbeat.Beat(): + m := h.heartbeat.Message() now := time.Now() t, err := m.Time() if err != nil { @@ -126,10 +123,10 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from future", "time", t) + h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * timeout)) { - h.logger.Errorw("Received heartbeat from the past", "time", t) + h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) h.signalHandover() continue } @@ -155,6 +152,10 @@ func (h *HA) controller() { case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() + case <-h.heartbeat.Done(): + if err := h.heartbeat.Err(); err != nil { + h.abort(err) + } case <-h.ctx.Done(): return } diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 69d00b26..888ffffb 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -3,6 +3,7 @@ package icingaredis import ( "context" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -12,79 +13,98 @@ import ( "time" ) +// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// After this time, a heartbeat loss is propagated. var timeout = 60 * time.Second +// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. +// Also signals on if the heartbeat is Lost. type Heartbeat struct { - ctx context.Context - cancel context.CancelFunc - client *Client - logger *zap.SugaredLogger - active bool - beat chan v1.StatsMessage - lost chan struct{} - done chan struct{} - mu *sync.Mutex - err error + active bool + beat *com.Cond + cancel context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *zap.SugaredLogger + lost *com.Cond + message *v1.StatsMessage + messageMu sync.Mutex } +// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat { ctx, cancel := context.WithCancel(ctx) heartbeat := &Heartbeat{ - ctx: ctx, + beat: com.NewCond(ctx), cancel: cancel, client: client, - logger: logger, - beat: make(chan v1.StatsMessage), - lost: make(chan struct{}), done: make(chan struct{}), - mu: &sync.Mutex{}, + logger: logger, + lost: com.NewCond(ctx), } - go heartbeat.controller() + go heartbeat.controller(ctx) return heartbeat } -// Close implements the io.Closer interface. -func (h Heartbeat) Close() error { - // Cancel ctx. +// Beat returns a channel that will be closed when a new heartbeat is received. +func (h *Heartbeat) Beat() <-chan struct{} { + return h.beat.Wait() +} + +// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (h *Heartbeat) Close() error { h.cancel() - // Wait until the controller loop ended. <-h.Done() - // And return an error, if any. + return h.Err() } -func (h Heartbeat) Done() <-chan struct{} { +// Done returns a channel that will be closed when the heartbeat controller loop has ended. +func (h *Heartbeat) Done() <-chan struct{} { return h.done } -func (h Heartbeat) Err() error { - h.mu.Lock() - defer h.mu.Unlock() +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *Heartbeat) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() return h.err } -func (h Heartbeat) Beat() <-chan v1.StatsMessage { - return h.beat +// Lost returns a channel that will be closed if the heartbeat is lost. +func (h *Heartbeat) Lost() <-chan struct{} { + return h.lost.Wait() } -func (h Heartbeat) Lost() <-chan struct{} { - return h.lost +// Message returns the last heartbeat message. +func (h *Heartbeat) Message() *v1.StatsMessage { + h.messageMu.Lock() + defer h.messageMu.Unlock() + + return h.message } // controller loop. -func (h Heartbeat) controller() { +func (h *Heartbeat) controller(ctx context.Context) { + defer close(h.done) + + h.logger.Info("Waiting for Icinga 2 heartbeat") + messages := make(chan v1.StatsMessage) defer close(messages) - g, ctx := errgroup.WithContext(h.ctx) + g, ctx := errgroup.WithContext(ctx) - // Message producer loop + // Message producer loop. g.Go(func() error { - // We expect heartbeats every second but only read them every 3 seconds + // We expect heartbeats every second but only read them every 3 seconds. throttle := time.NewTicker(time.Second * 3) defer throttle.Stop() @@ -109,7 +129,7 @@ func (h Heartbeat) controller() { } }) - // State loop + // State loop. g.Go(func() error { for { select { @@ -122,11 +142,12 @@ func (h Heartbeat) controller() { h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) h.active = true } - h.beat <- m + h.setMessage(&m) + h.beat.Broadcast() case <-time.After(timeout): if h.active { h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) - h.lost <- struct{}{} + h.lost.Broadcast() h.active = false } else { h.logger.Warn("Waiting for Icinga 2 heartbeat") @@ -140,14 +161,23 @@ func (h Heartbeat) controller() { // Since the goroutines of the group actually run endlessly, // we wait here forever, unless an error occurs. if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { - // Do not propagate context-aborted errors here, - // as this is to be expected when Close was called. + // Do not propagate any context-canceled errors here, + // as this is to be expected when calling Close or + // when the parent context is canceled. h.setError(err) } } func (h *Heartbeat) setError(err error) { - h.mu.Lock() + h.errMu.Lock() + defer h.errMu.Unlock() + h.err = errors.Wrap(err, "heartbeat failed") - h.mu.Unlock() +} + +func (h *Heartbeat) setMessage(m *v1.StatsMessage) { + h.messageMu.Lock() + defer h.messageMu.Unlock() + + h.message = m }