Merge pull request #319 from Icinga/heartbeat

Pointer receivers, Cond usage, pass ctx and Godoc for Heartbeat
This commit is contained in:
Eric Lippmann 2021-07-21 19:06:13 +02:00 committed by GitHub
commit ab4caa32a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 132 additions and 93 deletions

View file

@ -2,81 +2,89 @@ package com
import (
"context"
"errors"
"github.com/pkg/errors"
)
var ErrCondClosed = errors.New("condition closed")
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Cond implements a channel-based synchronization for goroutines that wait for signals or send them.
// Internally based on a controller loop that handles the synchronization of new listeners and signal propagation,
// which is only started when NewCond is called. Thus the zero value cannot be used.
type Cond struct {
ctx context.Context
cancel context.CancelFunc
broadcast chan struct{}
done chan struct{}
cancel context.CancelFunc
listeners chan chan struct{}
}
// NewCond returns a new Cond and starts the controller loop.
func NewCond(ctx context.Context) *Cond {
done, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
c := &Cond{
broadcast: make(chan struct{}),
cancel: cancel,
ctx: done,
done: make(chan struct{}),
listeners: make(chan chan struct{}),
}
go c.controller()
go c.controller(ctx)
return c
}
// Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait.
// Panics if the controller loop has already ended.
func (c *Cond) Broadcast() {
select {
case c.broadcast <- struct{}{}:
case <-c.done:
panic(errors.New("condition closed"))
}
}
// Close stops the controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (c *Cond) Close() error {
c.cancel()
<-c.done
return nil
}
// Done returns a channel that will be closed when the controller loop has ended.
func (c *Cond) Done() <-chan struct{} {
return c.done
}
// Wait returns a channel that is closed with the next signal.
// Panics if the controller loop has already ended.
func (c *Cond) Wait() <-chan struct{} {
select {
case l := <-c.listeners:
return l
case <-c.done:
panic(errors.New("condition closed"))
}
}
// controller loop.
func (c *Cond) controller() {
func (c *Cond) controller(ctx context.Context) {
defer close(c.done)
// Note that the notify channel does not close when the controller loop ends
// in order not to notify pending listeners.
notify := make(chan struct{})
for {
select {
case <-c.broadcast:
// all current receivers get a closed channel
// Close channel to notify all current listeners.
close(notify)
// set up next batch of receivers.
// Create a new channel for the next listeners.
notify = make(chan struct{})
case c.listeners <- notify:
// great. A Receiver has our channel
case <-c.ctx.Done():
// A new listener received the channel.
case <-ctx.Done():
return
}
}
}
// Close implements the io.Closer interface.
func (c *Cond) Close() error {
c.cancel()
return nil
}
// Wait returns a channel on which the next (close) signal will be sent.
func (c *Cond) Wait() <-chan struct{} {
select {
case l := <-c.listeners:
return l
case <-c.ctx.Done():
panic(ErrCondClosed)
}
}
// Broadcast wakes all current listeners.
func (c *Cond) Broadcast() {
select {
case c.broadcast <- struct{}{}:
case <-c.ctx.Done():
panic(ErrCondClosed)
}
}
func (c *Cond) Done() <-chan struct{} {
return c.ctx.Done()
}

View file

@ -114,11 +114,8 @@ func (h *HA) controller() {
for {
select {
case m, ok := <-h.heartbeat.Beat():
if !ok {
// Beat channel closed.
return
}
case <-h.heartbeat.Beat():
m := h.heartbeat.Message()
now := time.Now()
t, err := m.Time()
if err != nil {
@ -126,10 +123,10 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from future", "time", t)
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * timeout)) {
h.logger.Errorw("Received heartbeat from the past", "time", t)
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
h.signalHandover()
continue
}
@ -155,6 +152,10 @@ func (h *HA) controller() {
case <-h.heartbeat.Lost():
h.logger.Error("Lost heartbeat")
h.signalHandover()
case <-h.heartbeat.Done():
if err := h.heartbeat.Err(); err != nil {
h.abort(err)
}
case <-h.ctx.Done():
return
}

View file

@ -3,6 +3,7 @@ package icingaredis
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -12,79 +13,98 @@ import (
"time"
)
// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
var timeout = 60 * time.Second
// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
ctx context.Context
cancel context.CancelFunc
client *Client
logger *zap.SugaredLogger
active bool
beat chan v1.StatsMessage
lost chan struct{}
done chan struct{}
mu *sync.Mutex
err error
active bool
beat *com.Cond
cancel context.CancelFunc
client *Client
done chan struct{}
errMu sync.Mutex
err error
logger *zap.SugaredLogger
lost *com.Cond
message *v1.StatsMessage
messageMu sync.Mutex
}
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat {
ctx, cancel := context.WithCancel(ctx)
heartbeat := &Heartbeat{
ctx: ctx,
beat: com.NewCond(ctx),
cancel: cancel,
client: client,
logger: logger,
beat: make(chan v1.StatsMessage),
lost: make(chan struct{}),
done: make(chan struct{}),
mu: &sync.Mutex{},
logger: logger,
lost: com.NewCond(ctx),
}
go heartbeat.controller()
go heartbeat.controller(ctx)
return heartbeat
}
// Close implements the io.Closer interface.
func (h Heartbeat) Close() error {
// Cancel ctx.
// Beat returns a channel that will be closed when a new heartbeat is received.
func (h *Heartbeat) Beat() <-chan struct{} {
return h.beat.Wait()
}
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
h.cancel()
// Wait until the controller loop ended.
<-h.Done()
// And return an error, if any.
return h.Err()
}
func (h Heartbeat) Done() <-chan struct{} {
// Done returns a channel that will be closed when the heartbeat controller loop has ended.
func (h *Heartbeat) Done() <-chan struct{} {
return h.done
}
func (h Heartbeat) Err() error {
h.mu.Lock()
defer h.mu.Unlock()
// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
func (h *Heartbeat) Err() error {
h.errMu.Lock()
defer h.errMu.Unlock()
return h.err
}
func (h Heartbeat) Beat() <-chan v1.StatsMessage {
return h.beat
// Lost returns a channel that will be closed if the heartbeat is lost.
func (h *Heartbeat) Lost() <-chan struct{} {
return h.lost.Wait()
}
func (h Heartbeat) Lost() <-chan struct{} {
return h.lost
// Message returns the last heartbeat message.
func (h *Heartbeat) Message() *v1.StatsMessage {
h.messageMu.Lock()
defer h.messageMu.Unlock()
return h.message
}
// controller loop.
func (h Heartbeat) controller() {
func (h *Heartbeat) controller(ctx context.Context) {
defer close(h.done)
h.logger.Info("Waiting for Icinga 2 heartbeat")
messages := make(chan v1.StatsMessage)
defer close(messages)
g, ctx := errgroup.WithContext(h.ctx)
g, ctx := errgroup.WithContext(ctx)
// Message producer loop
// Message producer loop.
g.Go(func() error {
// We expect heartbeats every second but only read them every 3 seconds
// We expect heartbeats every second but only read them every 3 seconds.
throttle := time.NewTicker(time.Second * 3)
defer throttle.Stop()
@ -109,7 +129,7 @@ func (h Heartbeat) controller() {
}
})
// State loop
// State loop.
g.Go(func() error {
for {
select {
@ -122,11 +142,12 @@ func (h Heartbeat) controller() {
h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment))
h.active = true
}
h.beat <- m
h.setMessage(&m)
h.beat.Broadcast()
case <-time.After(timeout):
if h.active {
h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
h.lost <- struct{}{}
h.lost.Broadcast()
h.active = false
} else {
h.logger.Warn("Waiting for Icinga 2 heartbeat")
@ -140,14 +161,23 @@ func (h Heartbeat) controller() {
// Since the goroutines of the group actually run endlessly,
// we wait here forever, unless an error occurs.
if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
// Do not propagate context-aborted errors here,
// as this is to be expected when Close was called.
// Do not propagate any context-canceled errors here,
// as this is to be expected when calling Close or
// when the parent context is canceled.
h.setError(err)
}
}
func (h *Heartbeat) setError(err error) {
h.mu.Lock()
h.errMu.Lock()
defer h.errMu.Unlock()
h.err = errors.Wrap(err, "heartbeat failed")
h.mu.Unlock()
}
func (h *Heartbeat) setMessage(m *v1.StatsMessage) {
h.messageMu.Lock()
defer h.messageMu.Unlock()
h.message = m
}