Merge pull request #376 from Icinga/bugfix/lost-heartbeat-events

Reliably handle heartbeat events in HA and stop writing a heartbeat to the DB after it expires and hand over
This commit is contained in:
Julian Brost 2021-10-06 14:54:07 +02:00 committed by GitHub
commit c701b68e2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 110 additions and 74 deletions

View file

@ -120,44 +120,60 @@ func (h *HA) controller() {
for {
select {
case <-h.heartbeat.Beat():
m := h.heartbeat.Message()
now := time.Now()
t, err := m.Time()
if err != nil {
h.abort(err)
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * timeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
case m := <-h.heartbeat.Events():
if m != nil {
now := time.Now()
t, err := m.Stats().Time()
if err != nil {
h.abort(err)
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * timeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
h.signalHandover()
continue
}
s, err := m.Stats().IcingaStatus()
if err != nil {
h.abort(err)
}
select {
case <-logTicker.C:
shouldLog = true
default:
}
var realizeCtx context.Context
var cancelRealizeCtx context.CancelFunc
if h.responsible {
realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime())
} else {
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
}
err = h.realize(realizeCtx, s, t, shouldLog)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover()
continue
}
if err != nil {
h.abort(err)
}
if !oldInstancesRemoved {
go h.removeOldInstances(s)
oldInstancesRemoved = true
}
shouldLog = false
} else {
h.logger.Error("Lost heartbeat")
h.signalHandover()
continue
}
s, err := m.IcingaStatus()
if err != nil {
h.abort(err)
}
select {
case <-logTicker.C:
shouldLog = true
default:
}
if err = h.realize(s, t, shouldLog); err != nil {
h.abort(err)
}
if !oldInstancesRemoved {
go h.removeOldInstances(s)
oldInstancesRemoved = true
}
shouldLog = false
case <-h.heartbeat.Lost():
h.logger.Error("Lost heartbeat")
h.signalHandover()
case <-h.heartbeat.Done():
if err := h.heartbeat.Err(); err != nil {
h.abort(err)
@ -168,13 +184,13 @@ func (h *HA) controller() {
}
}
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3)
for attempt := 0; true; attempt++ {
sleep := boff(uint64(attempt))
time.Sleep(sleep)
ctx, cancelCtx := context.WithCancel(h.ctx)
ctx, cancelCtx := context.WithCancel(ctx)
tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})

View file

@ -3,7 +3,6 @@ package icingaredis
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -21,16 +20,13 @@ var timeout = 60 * time.Second
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
active bool
beat *com.Cond
events chan *HeartbeatMessage
cancelCtx context.CancelFunc
client *Client
done chan struct{}
errMu sync.Mutex
err error
logger *zap.SugaredLogger
lost *com.Cond
message *v1.StatsMessage
messageMu sync.Mutex
}
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
@ -38,12 +34,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
ctx, cancelCtx := context.WithCancel(ctx)
heartbeat := &Heartbeat{
beat: com.NewCond(ctx),
events: make(chan *HeartbeatMessage, 1),
cancelCtx: cancelCtx,
client: client,
done: make(chan struct{}),
logger: logger,
lost: com.NewCond(ctx),
}
go heartbeat.controller(ctx)
@ -51,9 +46,11 @@ func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger
return heartbeat
}
// Beat returns a channel that will be closed when a new heartbeat is received.
func (h *Heartbeat) Beat() <-chan struct{} {
return h.beat.Wait()
// Events returns a channel that is sent to on heartbeat events.
//
// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
return h.events
}
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
@ -78,26 +75,13 @@ func (h *Heartbeat) Err() error {
return h.err
}
// Lost returns a channel that will be closed if the heartbeat is lost.
func (h *Heartbeat) Lost() <-chan struct{} {
return h.lost.Wait()
}
// Message returns the last heartbeat message.
func (h *Heartbeat) Message() *v1.StatsMessage {
h.messageMu.Lock()
defer h.messageMu.Unlock()
return h.message
}
// controller loop.
func (h *Heartbeat) controller(ctx context.Context) {
defer close(h.done)
h.logger.Info("Waiting for Icinga 2 heartbeat")
messages := make(chan v1.StatsMessage)
messages := make(chan *HeartbeatMessage)
defer close(messages)
g, ctx := errgroup.WithContext(ctx)
@ -119,8 +103,13 @@ func (h *Heartbeat) controller(ctx context.Context) {
return WrapCmdErr(cmd)
}
m := &HeartbeatMessage{
received: time.Now(),
stats: streams[0].Messages[0].Values,
}
select {
case messages <- streams[0].Messages[0].Values:
case messages <- m:
case <-ctx.Done():
return ctx.Err()
}
@ -135,19 +124,18 @@ func (h *Heartbeat) controller(ctx context.Context) {
select {
case m := <-messages:
if !h.active {
s, err := m.IcingaStatus()
s, err := m.Stats().IcingaStatus()
if err != nil {
return errors.Wrapf(err, "can't parse Icinga 2 status from message %#v", m)
}
h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment))
h.active = true
}
h.setMessage(&m)
h.beat.Broadcast()
h.sendEvent(m)
case <-time.After(timeout):
if h.active {
h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
h.lost.Broadcast()
h.sendEvent(nil)
h.active = false
} else {
h.logger.Warn("Waiting for Icinga 2 heartbeat")
@ -175,9 +163,35 @@ func (h *Heartbeat) setError(err error) {
h.err = errors.Wrap(err, "heartbeat failed")
}
func (h *Heartbeat) setMessage(m *v1.StatsMessage) {
h.messageMu.Lock()
defer h.messageMu.Unlock()
func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
// Remove any not yet delivered event
select {
case old := <-h.events:
if old != nil {
h.logger.Debugw("Previous heartbeat not read from channel",
zap.Time("previous", old.received),
zap.Time("current", m.received))
} else {
h.logger.Debug("Previous heartbeat loss event not read from channel")
}
default:
}
h.message = m
h.events <- m
}
// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
type HeartbeatMessage struct {
received time.Time
stats v1.StatsMessage
}
// Stats returns the underlying heartbeat message from the icinga:stats stream.
func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
return &m.stats
}
// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
return m.received.Add(timeout)
}

View file

@ -28,6 +28,8 @@ type Settings struct {
func WithBackoff(
ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings,
) (err error) {
parentCtx := ctx
if settings.Timeout > 0 {
var cancelCtx context.CancelFunc
ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout)
@ -65,10 +67,14 @@ func WithBackoff(
sleep := b(attempt)
select {
case <-ctx.Done():
if err == nil {
err = ctx.Err()
if outerErr := parentCtx.Err(); outerErr != nil {
err = errors.Wrap(outerErr, "outer context canceled")
} else {
if err == nil {
err = ctx.Err()
}
err = errors.Wrap(err, "can't retry")
}
err = errors.Wrap(err, "can't retry")
return
case <-time.After(sleep):