From 8b2cb3acb8be7a0df325739e3f3da42c86269205 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Thu, 23 Sep 2021 15:48:30 +0200 Subject: [PATCH 1/5] heartbeat: use a single channel for all beat/loss events Using Cond does not allow to reliably catch all events as one will only receive events that occur after starting to listen. For heartbeat loss events it's important to reliably catch them to not remain in an HA active state incorrectly. fixes #360 --- pkg/icingadb/ha.go | 73 ++++++++++++++++++------------------ pkg/icingaredis/heartbeat.go | 51 +++++++++++-------------- 2 files changed, 58 insertions(+), 66 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 44cd87b5..3b991d1f 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -120,44 +120,45 @@ func (h *HA) controller() { for { select { - case <-h.heartbeat.Beat(): - m := h.heartbeat.Message() - now := time.Now() - t, err := m.Time() - if err != nil { - h.abort(err) - } - tt := t.Time() - if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) - } - if tt.Before(now.Add(-1 * timeout)) { - h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) + case m := <-h.heartbeat.Events(): + if m != nil { + now := time.Now() + t, err := m.Time() + if err != nil { + h.abort(err) + } + tt := t.Time() + if tt.After(now.Add(1 * time.Second)) { + h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + } + if tt.Before(now.Add(-1 * timeout)) { + h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) + h.signalHandover() + continue + } + s, err := m.IcingaStatus() + if err != nil { + h.abort(err) + } + + select { + case <-logTicker.C: + shouldLog = true + default: + } + if err = h.realize(s, t, shouldLog); err != nil { + h.abort(err) + } + if !oldInstancesRemoved { + go h.removeOldInstances(s) + oldInstancesRemoved = true + } + + shouldLog = false + } else { + h.logger.Error("Lost 
heartbeat") h.signalHandover() - continue } - s, err := m.IcingaStatus() - if err != nil { - h.abort(err) - } - - select { - case <-logTicker.C: - shouldLog = true - default: - } - if err = h.realize(s, t, shouldLog); err != nil { - h.abort(err) - } - if !oldInstancesRemoved { - go h.removeOldInstances(s) - oldInstancesRemoved = true - } - - shouldLog = false - case <-h.heartbeat.Lost(): - h.logger.Error("Lost heartbeat") - h.signalHandover() case <-h.heartbeat.Done(): if err := h.heartbeat.Err(); err != nil { h.abort(err) diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 198d1138..d84afe4a 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -3,7 +3,6 @@ package icingaredis import ( "context" "github.com/go-redis/redis/v8" - "github.com/icinga/icingadb/pkg/com" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -21,16 +20,13 @@ var timeout = 60 * time.Second // Also signals on if the heartbeat is Lost. type Heartbeat struct { active bool - beat *com.Cond + events chan *v1.StatsMessage cancelCtx context.CancelFunc client *Client done chan struct{} errMu sync.Mutex err error logger *zap.SugaredLogger - lost *com.Cond - message *v1.StatsMessage - messageMu sync.Mutex } // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. @@ -38,12 +34,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ - beat: com.NewCond(ctx), + events: make(chan *v1.StatsMessage, 1), cancelCtx: cancelCtx, client: client, done: make(chan struct{}), logger: logger, - lost: com.NewCond(ctx), } go heartbeat.controller(ctx) @@ -51,9 +46,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger return heartbeat } -// Beat returns a channel that will be closed when a new heartbeat is received. 
-func (h *Heartbeat) Beat() <-chan struct{} { - return h.beat.Wait() +// Events returns a channel that is sent to on heartbeat events. +// +// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. +func (h *Heartbeat) Events() <-chan *v1.StatsMessage { + return h.events } // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. @@ -78,19 +75,6 @@ func (h *Heartbeat) Err() error { return h.err } -// Lost returns a channel that will be closed if the heartbeat is lost. -func (h *Heartbeat) Lost() <-chan struct{} { - return h.lost.Wait() -} - -// Message returns the last heartbeat message. -func (h *Heartbeat) Message() *v1.StatsMessage { - h.messageMu.Lock() - defer h.messageMu.Unlock() - - return h.message -} - // controller loop. func (h *Heartbeat) controller(ctx context.Context) { defer close(h.done) @@ -142,12 +126,11 @@ func (h *Heartbeat) controller(ctx context.Context) { h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) h.active = true } - h.setMessage(&m) - h.beat.Broadcast() + h.sendEvent(&m) case <-time.After(timeout): if h.active { h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) - h.lost.Broadcast() + h.sendEvent(nil) h.active = false } else { h.logger.Warn("Waiting for Icinga 2 heartbeat") @@ -175,9 +158,17 @@ func (h *Heartbeat) setError(err error) { h.err = errors.Wrap(err, "heartbeat failed") } -func (h *Heartbeat) setMessage(m *v1.StatsMessage) { - h.messageMu.Lock() - defer h.messageMu.Unlock() +func (h *Heartbeat) sendEvent(m *v1.StatsMessage) { + // Remove any not yet delivered event + select { + case old := <-h.events: + if old != nil { + h.logger.Debug("Previous heartbeat not read from channel") + } else { + h.logger.Debug("Previous heartbeat loss event not read from channel") + } + default: + } - h.message = m + h.events <- m } From 
217ab03e5963af8b2ffec50181f9be0903923bbc Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Sep 2021 16:42:39 +0200 Subject: [PATCH 2/5] heartbeat: wrap messages with a timestamp Track when a heartbeat was received to allow other components to check when it will expire. --- pkg/icingadb/ha.go | 4 ++-- pkg/icingaredis/heartbeat.go | 41 ++++++++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 3b991d1f..64f7c8ff 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -123,7 +123,7 @@ func (h *HA) controller() { case m := <-h.heartbeat.Events(): if m != nil { now := time.Now() - t, err := m.Time() + t, err := m.Stats().Time() if err != nil { h.abort(err) } @@ -136,7 +136,7 @@ func (h *HA) controller() { h.signalHandover() continue } - s, err := m.IcingaStatus() + s, err := m.Stats().IcingaStatus() if err != nil { h.abort(err) } diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index d84afe4a..b33a8af0 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -20,7 +20,7 @@ var timeout = 60 * time.Second // Also signals on if the heartbeat is Lost. type Heartbeat struct { active bool - events chan *v1.StatsMessage + events chan *HeartbeatMessage cancelCtx context.CancelFunc client *Client done chan struct{} @@ -34,7 +34,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ - events: make(chan *v1.StatsMessage, 1), + events: make(chan *HeartbeatMessage, 1), cancelCtx: cancelCtx, client: client, done: make(chan struct{}), @@ -49,7 +49,7 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger // Events returns a channel that is sent to on heartbeat events. // // A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. 
-func (h *Heartbeat) Events() <-chan *v1.StatsMessage { +func (h *Heartbeat) Events() <-chan *HeartbeatMessage { return h.events } @@ -81,7 +81,7 @@ func (h *Heartbeat) controller(ctx context.Context) { h.logger.Info("Waiting for Icinga 2 heartbeat") - messages := make(chan v1.StatsMessage) + messages := make(chan *HeartbeatMessage) defer close(messages) g, ctx := errgroup.WithContext(ctx) @@ -103,8 +103,13 @@ func (h *Heartbeat) controller(ctx context.Context) { return WrapCmdErr(cmd) } + m := &HeartbeatMessage{ + received: time.Now(), + stats: streams[0].Messages[0].Values, + } + select { - case messages <- streams[0].Messages[0].Values: + case messages <- m: case <-ctx.Done(): return ctx.Err() } @@ -119,14 +124,14 @@ func (h *Heartbeat) controller(ctx context.Context) { select { case m := <-messages: if !h.active { - s, err := m.IcingaStatus() + s, err := m.Stats().IcingaStatus() if err != nil { return errors.Wrapf(err, "can't parse Icinga 2 status from message %#v", m) } h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) h.active = true } - h.sendEvent(&m) + h.sendEvent(m) case <-time.After(timeout): if h.active { h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) @@ -158,12 +163,14 @@ func (h *Heartbeat) setError(err error) { h.err = errors.Wrap(err, "heartbeat failed") } -func (h *Heartbeat) sendEvent(m *v1.StatsMessage) { +func (h *Heartbeat) sendEvent(m *HeartbeatMessage) { // Remove any not yet delivered event select { case old := <-h.events: if old != nil { - h.logger.Debug("Previous heartbeat not read from channel") + h.logger.Debugw("Previous heartbeat not read from channel", + zap.Time("previous", old.received), + zap.Time("current", m.received)) } else { h.logger.Debug("Previous heartbeat loss event not read from channel") } @@ -172,3 +179,19 @@ func (h *Heartbeat) sendEvent(m *v1.StatsMessage) { h.events <- m } + +// HeartbeatMessage represents a heartbeat received from Icinga 2 
together with a timestamp when it was received. +type HeartbeatMessage struct { + received time.Time + stats v1.StatsMessage +} + +// Stats returns the underlying heartbeat message from the icinga:stats stream. +func (m *HeartbeatMessage) Stats() *v1.StatsMessage { + return &m.stats +} + +// ExpiryTime returns the timestamp when the heartbeat expires. +func (m *HeartbeatMessage) ExpiryTime() time.Time { + return m.received.Add(timeout) +} From a34aef4fc5ec97249bdd728fb41e62cc18612074 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 28 Sep 2021 11:16:51 +0200 Subject: [PATCH 3/5] retry: if stopped due to outer context, return that error If there is an outer context that is canceled or exceeds its deadline before the internal timeout is reached, its error should be passed on as the failure didn't happen due to retry giving up. --- pkg/retry/retry.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index b8c1c92a..29faf8a7 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -28,6 +28,8 @@ type Settings struct { func WithBackoff( ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings, ) (err error) { + parentCtx := ctx + if settings.Timeout > 0 { var cancelCtx context.CancelFunc ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout) @@ -65,10 +67,14 @@ func WithBackoff( sleep := b(attempt) select { case <-ctx.Done(): - if err == nil { - err = ctx.Err() + if outerErr := parentCtx.Err(); outerErr != nil { + err = errors.Wrap(outerErr, "outer context canceled") + } else { + if err == nil { + err = ctx.Err() + } + err = errors.Wrap(err, "can't retry") } - err = errors.Wrap(err, "can't retry") return case <-time.After(sleep): From 239d2ea41067db00465fbff85909949444a42cdb Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Sep 2021 17:08:01 +0200 Subject: [PATCH 4/5] HA: after heartbeat expiry, stop writing to database and hand 
over If it's not possible for Icinga DB to write through the heartbeat within its validity period it cannot signal to other instances that it still is alive and has to hand over. There's also no point in retrying for this individual heartbeat any longer. --- pkg/icingadb/ha.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 64f7c8ff..3d3daa72 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -146,9 +146,18 @@ func (h *HA) controller() { shouldLog = true default: } - if err = h.realize(s, t, shouldLog); err != nil { + + realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) + err = h.realize(realizeCtx, s, t, shouldLog) + cancelRealizeCtx() + if errors.Is(err, context.DeadlineExceeded) { + h.signalHandover() + continue + } + if err != nil { h.abort(err) } + if !oldInstancesRemoved { go h.removeOldInstances(s) oldInstancesRemoved = true @@ -169,13 +178,13 @@ func (h *HA) controller() { } } -func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error { +func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error { boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3) for attempt := 0; true; attempt++ { sleep := boff(uint64(attempt)) time.Sleep(sleep) - ctx, cancelCtx := context.WithCancel(h.ctx) + ctx, cancelCtx := context.WithCancel(ctx) tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelSerializable, }) From c5af0cd287c0e639373581bc25f94c860bc71722 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Mon, 4 Oct 2021 16:51:18 +0200 Subject: [PATCH 5/5] HA: only set realize timeout when active When inactive, this is the only query running so it has to retry for longer to eventually trigger a fatal error if the database is gone for too long (5 minutes at the moment). 
--- pkg/icingadb/ha.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 3d3daa72..d67e5bc4 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -147,7 +147,13 @@ func (h *HA) controller() { default: } - realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) + var realizeCtx context.Context + var cancelRealizeCtx context.CancelFunc + if h.responsible { + realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime()) + } else { + realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) + } err = h.realize(realizeCtx, s, t, shouldLog) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) {