2021-03-03 15:10:04 -05:00
package icingaredis

import (
	"context"
	"github.com/icinga/icinga-go-library/logging"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icinga-go-library/types"
	"github.com/icinga/icinga-go-library/utils"
	v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"sync"
	"sync/atomic"
	"time"
)

Enhance HA "Taking over", "Handing over" logging
The reason for a switch in the HA roles was not always directly clear.
This change now introduces additional debug logging, indicating the
reasoning for either taking over or handing over the HA responsibility.
First, some logic was moved from the SQL query selecting active Icinga
DB instances to Go code. This allowed distinguishing between no
available responsible instances and responsible instances with an
expired heartbeat.
As the HA's peer timeout is logically bound to the Redis timeout, it
now references this timeout with an additional grace timeout. Doing
so eliminates a race between a handover and a "forceful" takeover.
The old code signaled a takeover whenever no other instance was
active. It now additionally checks whether this node is already the
active/responsible one. In that case, the takeover logic, which would be
interrupted at a later point because the node is already responsible,
can be skipped.
Next to the additional logging messages, both the takeover and handover
channels now transport a string to communicate the reason instead of an
empty struct{}. By doing so, both the "Taking over" and "Handing over"
log messages are enriched with the reason.
This also required a change in the suppressed logging handling of the
HA.realize method, which got its logging enabled through the shouldLog
parameter. Now, there are both recurring events, which might be
suppressed, and state-changing events, which should be logged.
Therefore, and because the logTicker's functionality was not clear to me
at first glance, I renamed it to routineLogTicker.
While dealing with the code, some function documentation was added to
ease understanding, both for me and for future readers.
Additionally, the error handling of the SQL query selecting active
Icinga DB instances was changed slightly to also handle wrapped
sql.ErrNoRows errors.
Closes #688.
2024-03-12 07:25:10 -04:00

// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
const Timeout = time.Minute

// Heartbeat periodically reads heartbeats from a Redis stream and signals on the Events channel when they are received.
// It also signals when the heartbeat is lost.
type Heartbeat struct {
	active         bool
	events         chan *HeartbeatMessage
	lastReceivedMs int64
	cancelCtx      context.CancelFunc
	client         *redis.Client
	done           chan struct{}
	errMu          sync.Mutex
	err            error
	logger         *logging.Logger
}

// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func NewHeartbeat(ctx context.Context, client *redis.Client, logger *logging.Logger) *Heartbeat {
	ctx, cancelCtx := context.WithCancel(ctx)

	heartbeat := &Heartbeat{
		events:    make(chan *HeartbeatMessage, 1),
		cancelCtx: cancelCtx,
		client:    client,
		done:      make(chan struct{}),
		logger:    logger,
	}

	go heartbeat.controller(ctx)

	return heartbeat
}

// Events returns a channel on which heartbeat events are delivered.
//
// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
	return h.events
}
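
// Usage sketch, not part of this package: a consumer of Events might tell heartbeats
// and heartbeat losses apart as shown below. The message type and the classify function
// are hypothetical stand-ins for illustration. The demo channel is closed only so that
// the loop terminates; the code in this file never closes the channel returned by Events.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for *HeartbeatMessage.
type message struct{ received time.Time }

// classify reads events until the channel is closed and counts
// heartbeats (non-nil pointers) and heartbeat losses (nil pointers).
func classify(events <-chan *message) (beats, losses int) {
	for m := range events {
		if m != nil {
			beats++
		} else {
			losses++
		}
	}

	return beats, losses
}

func main() {
	events := make(chan *message, 3)
	events <- &message{received: time.Now()} // heartbeat received
	events <- nil                            // heartbeat lost
	events <- &message{received: time.Now()} // heartbeat received again
	close(events)

	beats, losses := classify(events)
	fmt.Println(beats, losses) // prints "2 1"
}
```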

// LastReceived returns the time the last heartbeat was received, in milliseconds since the Unix epoch.
// It returns 0 if no heartbeat has been received yet or the heartbeat has timed out.
func (h *Heartbeat) LastReceived() int64 {
	return atomic.LoadInt64(&h.lastReceivedMs)
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
	h.cancelCtx()
	<-h.Done()

	return h.Err()
}

// Done returns a channel that will be closed when the heartbeat controller loop has ended.
func (h *Heartbeat) Done() <-chan struct{} {
	return h.done
}

// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
func (h *Heartbeat) Err() error {
	h.errMu.Lock()
	defer h.errMu.Unlock()

	return h.err
}

// controller is the heartbeat controller loop.
// It runs until ctx is canceled or one of its goroutines returns an error.
func (h *Heartbeat) controller(ctx context.Context) {
	defer close(h.done)

	messages := make(chan *HeartbeatMessage)
	defer close(messages)

	g, ctx := errgroup.WithContext(ctx)

	// Message producer loop.
	g.Go(func() error {
		// We expect heartbeats every second but only read them every 3 seconds.
		throttle := time.NewTicker(3 * time.Second)
		defer throttle.Stop()

		for id := "$"; ; {
			streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{
				Streams: []string{"icinga:stats", id},
			})
			if err != nil {
				return errors.Wrap(err, "can't read Icinga heartbeat")
			}

			m := &HeartbeatMessage{
				received: time.Now(),
				stats:    streams[0].Messages[0].Values,
			}

			select {
			case messages <- m:
			case <-ctx.Done():
				return ctx.Err()
			}

			id = streams[0].Messages[0].ID

			<-throttle.C
		}
	})

	// State loop.
	g.Go(func() error {
		for {
			select {
			case m := <-messages:
				if !h.active {
					envId, err := m.EnvironmentID()
					if err != nil {
						return err
					}
					h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
					h.active = true
				}

				atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
				h.sendEvent(m)
			case <-time.After(Timeout):
				if h.active {
					h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
					h.sendEvent(nil)
					h.active = false
				} else {
					h.logger.Warn("Waiting for Icinga heartbeat")
				}

				atomic.StoreInt64(&h.lastReceivedMs, 0)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Since the goroutines of the group actually run endlessly,
	// we wait here forever, unless an error occurs.
	if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
		// Do not propagate any context-canceled errors here,
		// as this is to be expected when calling Close or
		// when the parent context is canceled.
		h.setError(err)
	}
}

// setError stores err, wrapped with additional context, so that it can be retrieved via Err.
func (h *Heartbeat) setError(err error) {
	h.errMu.Lock()
	defer h.errMu.Unlock()

	h.err = errors.Wrap(err, "heartbeat failed")
}

// sendEvent delivers m on the events channel, replacing any event that has not been read yet.
func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
	// Remove any event that has not been delivered yet.
	select {
	case old := <-h.events:
		if old != nil {
			kv := []any{zap.Time("previous", old.received)}
			if m != nil {
				kv = append(kv, zap.Time("current", m.received))
			}

			h.logger.Debugw("Previous heartbeat not read from channel", kv...)
		} else {
			h.logger.Debug("Previous heartbeat loss event not read from channel")
		}
	default:
	}

	h.events <- m
}
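
// Illustration, not part of this package: the drain-then-send step used by sendEvent
// can be tried in isolation with a channel of capacity 1, as sketched below. The name
// sendLatest and the int payload are hypothetical. The sketch assumes a single sender,
// as is the case for the state loop in controller; with several concurrent senders the
// final send could block.

```go
package main

import "fmt"

// sendLatest puts v into ch, first discarding a value that is still
// waiting in the buffer. It reports whether a stale value was dropped.
// ch must have a capacity of 1 and only one goroutine may send on it.
func sendLatest(ch chan int, v int) (dropped bool) {
	select {
	case <-ch:
		// A previous value was never read; replace it.
		dropped = true
	default:
		// Buffer is empty, nothing to discard.
	}

	ch <- v

	return dropped
}

func main() {
	ch := make(chan int, 1)

	fmt.Println(sendLatest(ch, 1)) // prints "false": buffer was empty
	fmt.Println(sendLatest(ch, 2)) // prints "true": 1 was dropped unread
	fmt.Println(<-ch)              // prints "2": the reader only sees the latest value
}
```

// With this pattern the sender never blocks on a slow reader, at the cost of the reader
// missing intermediate events.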

// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
type HeartbeatMessage struct {
	received time.Time
	stats    v1.StatsMessage
}

// Stats returns the underlying heartbeat message from the icinga:stats stream.
func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
	return &m.stats
}

// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message.
func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
	// Use a checked type assertion so that a missing or non-string value
	// results in an error instead of a panic.
	raw, ok := m.stats["icingadb_environment"].(string)
	if !ok {
		return nil, errors.Errorf("heartbeat contains no valid %q string", "icingadb_environment")
	}

	var id types.Binary
	err := types.UnmarshalJSON([]byte(raw), &id)
	if err != nil {
		return nil, err
	}
	return id, nil
}

// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
	return m.received.Add(Timeout)
}