diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go
index 44cd87b5..d67e5bc4 100644
--- a/pkg/icingadb/ha.go
+++ b/pkg/icingadb/ha.go
@@ -120,44 +120,60 @@ func (h *HA) controller() {
 
 	for {
 		select {
-		case <-h.heartbeat.Beat():
-			m := h.heartbeat.Message()
-			now := time.Now()
-			t, err := m.Time()
-			if err != nil {
-				h.abort(err)
-			}
-			tt := t.Time()
-			if tt.After(now.Add(1 * time.Second)) {
-				h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
-			}
-			if tt.Before(now.Add(-1 * timeout)) {
-				h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
+		case m := <-h.heartbeat.Events():
+			if m != nil {
+				now := time.Now()
+				t, err := m.Stats().Time()
+				if err != nil {
+					h.abort(err)
+				}
+				tt := t.Time()
+				if tt.After(now.Add(1 * time.Second)) {
+					h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
+				}
+				if tt.Before(now.Add(-1 * timeout)) {
+					h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
+					h.signalHandover()
+					continue
+				}
+				s, err := m.Stats().IcingaStatus()
+				if err != nil {
+					h.abort(err)
+				}
+
+				select {
+				case <-logTicker.C:
+					shouldLog = true
+				default:
+				}
+
+				var realizeCtx context.Context
+				var cancelRealizeCtx context.CancelFunc
+				if h.responsible {
+					realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime())
+				} else {
+					realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
+				}
+				err = h.realize(realizeCtx, s, t, shouldLog)
+				cancelRealizeCtx()
+				if errors.Is(err, context.DeadlineExceeded) {
+					h.signalHandover()
+					continue
+				}
+				if err != nil {
+					h.abort(err)
+				}
+
+				if !oldInstancesRemoved {
+					go h.removeOldInstances(s)
+					oldInstancesRemoved = true
+				}
+
+				shouldLog = false
+			} else {
+				h.logger.Error("Lost heartbeat")
 				h.signalHandover()
-				continue
 			}
-			s, err := m.IcingaStatus()
-			if err != nil {
-				h.abort(err)
-			}
-
-			select {
-			case <-logTicker.C:
-				shouldLog = true
-			default:
-			}
-			if err = h.realize(s, t, shouldLog); err != nil {
-				h.abort(err)
-			}
-			if !oldInstancesRemoved {
-				go h.removeOldInstances(s)
-				oldInstancesRemoved = true
-			}
-
-			shouldLog = false
-		case <-h.heartbeat.Lost():
-			h.logger.Error("Lost heartbeat")
-			h.signalHandover()
 		case <-h.heartbeat.Done():
 			if err := h.heartbeat.Err(); err != nil {
 				h.abort(err)
@@ -168,13 +184,13 @@ func (h *HA) controller() {
 	}
 }
 
-func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
+func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
 	boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3)
 	for attempt := 0; true; attempt++ {
 		sleep := boff(uint64(attempt))
 		time.Sleep(sleep)
 
-		ctx, cancelCtx := context.WithCancel(h.ctx)
+		ctx, cancelCtx := context.WithCancel(ctx)
 		tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
 			Isolation: sql.LevelSerializable,
 		})
diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go
index 198d1138..b33a8af0 100644
--- a/pkg/icingaredis/heartbeat.go
+++ b/pkg/icingaredis/heartbeat.go
@@ -3,7 +3,6 @@ package icingaredis
 import (
 	"context"
 	"github.com/go-redis/redis/v8"
-	"github.com/icinga/icingadb/pkg/com"
 	v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
 	"github.com/icinga/icingadb/pkg/utils"
 	"github.com/pkg/errors"
@@ -21,16 +20,13 @@ var timeout = 60 * time.Second
 // Also signals on if the heartbeat is Lost.
 type Heartbeat struct {
 	active    bool
-	beat      *com.Cond
+	events    chan *HeartbeatMessage
 	cancelCtx context.CancelFunc
 	client    *Client
 	done      chan struct{}
 	errMu     sync.Mutex
 	err       error
 	logger    *zap.SugaredLogger
-	lost      *com.Cond
-	message   *v1.StatsMessage
-	messageMu sync.Mutex
 }
 
 // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
@@ -38,12 +34,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
 	ctx, cancelCtx := context.WithCancel(ctx)
 
 	heartbeat := &Heartbeat{
-		beat:      com.NewCond(ctx),
+		events:    make(chan *HeartbeatMessage, 1),
 		cancelCtx: cancelCtx,
 		client:    client,
 		done:      make(chan struct{}),
 		logger:    logger,
-		lost:      com.NewCond(ctx),
 	}
 
 	go heartbeat.controller(ctx)
@@ -51,9 +46,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
 	return heartbeat
 }
 
-// Beat returns a channel that will be closed when a new heartbeat is received.
-func (h *Heartbeat) Beat() <-chan struct{} {
-	return h.beat.Wait()
+// Events returns a channel that is sent to on heartbeat events.
+//
+// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
+func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
+	return h.events
 }
 
 // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
@@ -78,26 +75,13 @@ func (h *Heartbeat) Err() error {
 	return h.err
 }
 
-// Lost returns a channel that will be closed if the heartbeat is lost.
-func (h *Heartbeat) Lost() <-chan struct{} {
-	return h.lost.Wait()
-}
-
-// Message returns the last heartbeat message.
-func (h *Heartbeat) Message() *v1.StatsMessage {
-	h.messageMu.Lock()
-	defer h.messageMu.Unlock()
-
-	return h.message
-}
-
 // controller loop.
 func (h *Heartbeat) controller(ctx context.Context) {
 	defer close(h.done)
 
 	h.logger.Info("Waiting for Icinga 2 heartbeat")
 
-	messages := make(chan v1.StatsMessage)
+	messages := make(chan *HeartbeatMessage)
 	defer close(messages)
 
 	g, ctx := errgroup.WithContext(ctx)
@@ -119,8 +103,13 @@ func (h *Heartbeat) controller(ctx context.Context) {
 				return WrapCmdErr(cmd)
 			}
 
+			m := &HeartbeatMessage{
+				received: time.Now(),
+				stats:    streams[0].Messages[0].Values,
+			}
+
 			select {
-			case messages <- streams[0].Messages[0].Values:
+			case messages <- m:
 			case <-ctx.Done():
 				return ctx.Err()
 			}
@@ -135,19 +124,18 @@ func (h *Heartbeat) controller(ctx context.Context) {
 			select {
 			case m := <-messages:
 				if !h.active {
-					s, err := m.IcingaStatus()
+					s, err := m.Stats().IcingaStatus()
 					if err != nil {
 						return errors.Wrapf(err, "can't parse Icinga 2 status from message %#v", m)
 					}
 					h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment))
 					h.active = true
 				}
-				h.setMessage(&m)
-				h.beat.Broadcast()
+				h.sendEvent(m)
 			case <-time.After(timeout):
 				if h.active {
 					h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
-					h.lost.Broadcast()
+					h.sendEvent(nil)
 					h.active = false
 				} else {
 					h.logger.Warn("Waiting for Icinga 2 heartbeat")
@@ -175,9 +163,42 @@ func (h *Heartbeat) setError(err error) {
 	h.err = errors.Wrap(err, "heartbeat failed")
 }
 
-func (h *Heartbeat) setMessage(m *v1.StatsMessage) {
-	h.messageMu.Lock()
-	defer h.messageMu.Unlock()
+// sendEvent publishes m on the events channel, first discarding any event that has not been read yet.
+// A nil m signals a heartbeat loss.
+func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
+	// Remove any not yet delivered event
+	select {
+	case old := <-h.events:
+		switch {
+		case old == nil:
+			h.logger.Debug("Previous heartbeat loss event not read from channel")
+		case m == nil:
+			// m is nil for a heartbeat loss event, so there is no receive time to log; reading m.received would panic.
+			h.logger.Debugw("Previous heartbeat not read from channel",
+				zap.Time("previous", old.received))
+		default:
+			h.logger.Debugw("Previous heartbeat not read from channel",
+				zap.Time("previous", old.received),
+				zap.Time("current", m.received))
+		}
+	default:
+	}
 
-	h.message = m
+	h.events <- m
+}
+
+// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
+type HeartbeatMessage struct {
+	received time.Time
+	stats    v1.StatsMessage
+}
+
+// Stats returns the underlying heartbeat message from the icinga:stats stream.
+func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
+	return &m.stats
+}
+
+// ExpiryTime returns the timestamp when the heartbeat expires.
+func (m *HeartbeatMessage) ExpiryTime() time.Time {
+	return m.received.Add(timeout)
 }
diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go
index b8c1c92a..29faf8a7 100644
--- a/pkg/retry/retry.go
+++ b/pkg/retry/retry.go
@@ -28,6 +28,8 @@ type Settings struct {
 func WithBackoff(
 	ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings,
 ) (err error) {
+	parentCtx := ctx
+
 	if settings.Timeout > 0 {
 		var cancelCtx context.CancelFunc
 		ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout)
@@ -65,10 +67,14 @@ func WithBackoff(
 		sleep := b(attempt)
 		select {
 		case <-ctx.Done():
-			if err == nil {
-				err = ctx.Err()
+			if outerErr := parentCtx.Err(); outerErr != nil {
+				err = errors.Wrap(outerErr, "outer context canceled")
+			} else {
+				if err == nil {
+					err = ctx.Err()
+				}
+				err = errors.Wrap(err, "can't retry")
 			}
-			err = errors.Wrap(err, "can't retry")
 			return
 		case <-time.After(sleep):