mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Enhance HA "Taking over", "Handing over" logging
The reason for a switch in the HA roles was not always directly clear.
This change now introduces additional debug logging, indicating the
reasoning for either taking over or handing over the HA responsibility.
First, some logic was moved from the SQL query selecting active Icinga
DB instances to Go code. This allowed distinguishing between no
available responsible instances and responsible instances with an
expired heartbeat.
As the HA's peer timeout is logically bound to the Redis timeout, it
will now reference this timeout with an additional grace timeout. Doing
so eliminates a race between a handing over and a "forceful" take over.
As the old code indicated a takeover on the fact that no other instance
is active, it will now additionally check if it is already being the
active/responsible node. In this case, the takeover logic - which will
be interrupted at a later point as the node is already responsible - can
be skipped.
Next to the additional logging messages, both the takeover and handover
channel are now transporting a string to communicate the reason instead
of an empty struct{}. By doing so, both the "Taking over" and "Handing
over" log messages are enriched with reason.
This also required a change in the suppressed logging handling of the
HA.realize method, which got its logging enabled through the shouldLog
parameter. Now, there are both recurring events, which might be
suppressed, as well as state changing events, which should be logged.
Therefore, and because the logTicker's functionality was not clear to me
on first glance, I renamed it to routineLogTicker.
While dealing with the code, some function signature documentation were
added, to ease both mine as well as the understanding of future readers.
Additionally, the error handling of the SQL query selecting active
Icinga DB instances was changed slightly to also handle wrapped
sql.ErrNoRows errors.
Closes #688.
This commit is contained in:
parent
6f7558d96f
commit
779afd1da3
3 changed files with 92 additions and 57 deletions
|
|
@ -162,8 +162,8 @@ func run() int {
|
|||
hactx, cancelHactx := context.WithCancel(ctx)
|
||||
for hactx.Err() == nil {
|
||||
select {
|
||||
case <-ha.Takeover():
|
||||
logger.Info("Taking over")
|
||||
case takeoverReason := <-ha.Takeover():
|
||||
logger.Infow("Taking over", zap.String("reason", takeoverReason))
|
||||
|
||||
go func() {
|
||||
for hactx.Err() == nil {
|
||||
|
|
@ -324,8 +324,8 @@ func run() int {
|
|||
}
|
||||
}
|
||||
}()
|
||||
case <-ha.Handover():
|
||||
logger.Warn("Handing over")
|
||||
case handoverReason := <-ha.Handover():
|
||||
logger.Warnw("Handing over", zap.String("reason", handoverReason))
|
||||
|
||||
cancelHactx()
|
||||
case <-hactx.Done():
|
||||
|
|
|
|||
|
|
@ -23,7 +23,10 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
var timeout = 60 * time.Second
|
||||
// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes.
|
||||
//
|
||||
// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period.
|
||||
const peerTimeout = icingaredis.Timeout + 5*time.Second
|
||||
|
||||
type haState struct {
|
||||
responsibleTsMilli int64
|
||||
|
|
@ -43,8 +46,8 @@ type HA struct {
|
|||
heartbeat *icingaredis.Heartbeat
|
||||
logger *logging.Logger
|
||||
responsible bool
|
||||
handover chan struct{}
|
||||
takeover chan struct{}
|
||||
handover chan string
|
||||
takeover chan string
|
||||
done chan struct{}
|
||||
errOnce sync.Once
|
||||
errMu sync.Mutex
|
||||
|
|
@ -64,8 +67,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
|
|||
db: db,
|
||||
heartbeat: heartbeat,
|
||||
logger: logger,
|
||||
handover: make(chan struct{}),
|
||||
takeover: make(chan struct{}),
|
||||
handover: make(chan string),
|
||||
takeover: make(chan string),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
|
@ -107,13 +110,13 @@ func (h *HA) Err() error {
|
|||
return h.err
|
||||
}
|
||||
|
||||
// Handover returns a channel with which handovers are signaled.
|
||||
func (h *HA) Handover() chan struct{} {
|
||||
// Handover returns a channel with which handovers and their reasons are signaled.
|
||||
func (h *HA) Handover() chan string {
|
||||
return h.handover
|
||||
}
|
||||
|
||||
// Takeover returns a channel with which takeovers are signaled.
|
||||
func (h *HA) Takeover() chan struct{} {
|
||||
// Takeover returns a channel with which takeovers and their reasons are signaled.
|
||||
func (h *HA) Takeover() chan string {
|
||||
return h.takeover
|
||||
}
|
||||
|
||||
|
|
@ -141,9 +144,10 @@ func (h *HA) controller() {
|
|||
|
||||
oldInstancesRemoved := false
|
||||
|
||||
logTicker := time.NewTicker(time.Second * 60)
|
||||
defer logTicker.Stop()
|
||||
shouldLog := true
|
||||
// Suppress recurring log messages in the realize method to be only logged this often.
|
||||
routineLogTicker := time.NewTicker(5 * time.Minute)
|
||||
defer routineLogTicker.Stop()
|
||||
shouldLogRoutineEvents := true
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
@ -158,9 +162,9 @@ func (h *HA) controller() {
|
|||
if tt.After(now.Add(1 * time.Second)) {
|
||||
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
|
||||
}
|
||||
if tt.Before(now.Add(-1 * timeout)) {
|
||||
if tt.Before(now.Add(-1 * peerTimeout)) {
|
||||
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
|
||||
h.signalHandover()
|
||||
h.signalHandover("received heartbeat from the past")
|
||||
h.realizeLostHeartbeat()
|
||||
continue
|
||||
}
|
||||
|
|
@ -192,8 +196,8 @@ func (h *HA) controller() {
|
|||
}
|
||||
|
||||
select {
|
||||
case <-logTicker.C:
|
||||
shouldLog = true
|
||||
case <-routineLogTicker.C:
|
||||
shouldLogRoutineEvents = true
|
||||
default:
|
||||
}
|
||||
|
||||
|
|
@ -204,10 +208,10 @@ func (h *HA) controller() {
|
|||
} else {
|
||||
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
|
||||
}
|
||||
err = h.realize(realizeCtx, s, t, envId, shouldLog)
|
||||
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
|
||||
cancelRealizeCtx()
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
h.signalHandover()
|
||||
h.signalHandover("context deadline exceeded")
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
|
|
@ -219,10 +223,10 @@ func (h *HA) controller() {
|
|||
oldInstancesRemoved = true
|
||||
}
|
||||
|
||||
shouldLog = false
|
||||
shouldLogRoutineEvents = false
|
||||
} else {
|
||||
h.logger.Error("Lost heartbeat")
|
||||
h.signalHandover()
|
||||
h.signalHandover("lost heartbeat")
|
||||
h.realizeLostHeartbeat()
|
||||
}
|
||||
case <-h.heartbeat.Done():
|
||||
|
|
@ -235,13 +239,25 @@ func (h *HA) controller() {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
|
||||
var takeover, otherResponsible bool
|
||||
// realize a HA cycle triggered by a heartbeat event.
|
||||
//
|
||||
// shouldLogRoutineEvents indicates if recurrent events should be logged.
|
||||
func (h *HA) realize(
|
||||
ctx context.Context,
|
||||
s *icingaredisv1.IcingaStatus,
|
||||
t *types.UnixMilli,
|
||||
envId types.Binary,
|
||||
shouldLogRoutineEvents bool,
|
||||
) error {
|
||||
var (
|
||||
takeover string
|
||||
otherResponsible bool
|
||||
)
|
||||
|
||||
err := retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) error {
|
||||
takeover = false
|
||||
takeover = ""
|
||||
otherResponsible = false
|
||||
isoLvl := sql.LevelSerializable
|
||||
selectLock := ""
|
||||
|
|
@ -259,25 +275,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
}
|
||||
|
||||
query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
|
||||
"WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock
|
||||
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
|
||||
|
||||
instance := &v1.IcingadbInstance{}
|
||||
errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance)
|
||||
|
||||
errQuery := tx.QueryRowxContext(
|
||||
ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(),
|
||||
).StructScan(instance)
|
||||
switch errQuery {
|
||||
case nil:
|
||||
otherResponsible = true
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active",
|
||||
zap.String("instance_id", instance.Id.String()),
|
||||
zap.String("environment", envId.String()),
|
||||
"heartbeat", instance.Heartbeat,
|
||||
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
|
||||
switch {
|
||||
case errQuery == nil:
|
||||
fields := []any{
|
||||
zap.String("instance_id", instance.Id.String()),
|
||||
zap.String("environment", envId.String()),
|
||||
zap.Time("heartbeat", instance.Heartbeat.Time()),
|
||||
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())),
|
||||
}
|
||||
case sql.ErrNoRows:
|
||||
takeover = true
|
||||
|
||||
if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) {
|
||||
takeover = "other instance's heartbeat has expired"
|
||||
h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...)
|
||||
} else {
|
||||
otherResponsible = true
|
||||
if shouldLogRoutineEvents {
|
||||
h.logger.Infow("Another instance is active", fields...)
|
||||
}
|
||||
}
|
||||
|
||||
case errors.Is(errQuery, sql.ErrNoRows):
|
||||
fields := []any{
|
||||
zap.String("instance_id", h.instanceId.String()),
|
||||
zap.String("environment", envId.String())}
|
||||
if !h.responsible {
|
||||
takeover = "no other instance is active"
|
||||
h.logger.Debugw("Preparing to take over HA as no instance is active", fields...)
|
||||
} else if h.responsible && shouldLogRoutineEvents {
|
||||
h.logger.Debugw("Continuing being the active instance", fields...)
|
||||
}
|
||||
|
||||
default:
|
||||
return internal.CantPerformQuery(errQuery, query)
|
||||
}
|
||||
|
|
@ -292,7 +324,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
EnvironmentId: envId,
|
||||
},
|
||||
Heartbeat: *t,
|
||||
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
|
||||
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
|
||||
EndpointId: s.EndpointId,
|
||||
Icinga2Version: s.Version,
|
||||
Icinga2StartTime: s.ProgramStart,
|
||||
|
|
@ -309,7 +341,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
return internal.CantPerformQuery(err, stmt)
|
||||
}
|
||||
|
||||
if takeover {
|
||||
if takeover != "" {
|
||||
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
|
||||
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
|
||||
|
||||
|
|
@ -343,14 +375,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
return err
|
||||
}
|
||||
|
||||
if takeover {
|
||||
if takeover != "" {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
if err := h.insertEnvironment(); err != nil {
|
||||
return errors.Wrap(err, "can't insert environment")
|
||||
}
|
||||
|
||||
h.signalTakeover()
|
||||
h.signalTakeover(takeover)
|
||||
} else if otherResponsible {
|
||||
if state, _ := h.state.Load(); !state.otherResponsible {
|
||||
state.otherResponsible = true
|
||||
|
|
@ -361,6 +393,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
return nil
|
||||
}
|
||||
|
||||
// realizeLostHeartbeat updates "responsible = n" for this HA into the database.
|
||||
func (h *HA) realizeLostHeartbeat() {
|
||||
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?")
|
||||
if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) {
|
||||
|
|
@ -394,10 +427,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
|
|||
select {
|
||||
case <-h.ctx.Done():
|
||||
return
|
||||
case <-time.After(timeout):
|
||||
case <-time.After(peerTimeout):
|
||||
query := h.db.Rebind("DELETE FROM icingadb_instance " +
|
||||
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
|
||||
heartbeat := types.UnixMilli(time.Now().Add(-timeout))
|
||||
heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout))
|
||||
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
|
||||
s.EndpointId, heartbeat)
|
||||
if err != nil {
|
||||
|
|
@ -416,7 +449,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HA) signalHandover() {
|
||||
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
|
||||
func (h *HA) signalHandover(reason string) {
|
||||
if h.responsible {
|
||||
h.state.Store(haState{
|
||||
responsibleTsMilli: time.Now().UnixMilli(),
|
||||
|
|
@ -425,7 +459,7 @@ func (h *HA) signalHandover() {
|
|||
})
|
||||
|
||||
select {
|
||||
case h.handover <- struct{}{}:
|
||||
case h.handover <- reason:
|
||||
h.responsible = false
|
||||
case <-h.ctx.Done():
|
||||
// Noop
|
||||
|
|
@ -433,7 +467,8 @@ func (h *HA) signalHandover() {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HA) signalTakeover() {
|
||||
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
|
||||
func (h *HA) signalTakeover(reason string) {
|
||||
if !h.responsible {
|
||||
h.state.Store(haState{
|
||||
responsibleTsMilli: time.Now().UnixMilli(),
|
||||
|
|
@ -442,7 +477,7 @@ func (h *HA) signalTakeover() {
|
|||
})
|
||||
|
||||
select {
|
||||
case h.takeover <- struct{}{}:
|
||||
case h.takeover <- reason:
|
||||
h.responsible = true
|
||||
case <-h.ctx.Done():
|
||||
// Noop
|
||||
|
|
|
|||
|
|
@ -16,9 +16,9 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
|
||||
// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
|
||||
// After this time, a heartbeat loss is propagated.
|
||||
var timeout = 60 * time.Second
|
||||
const Timeout = time.Minute
|
||||
|
||||
// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
|
||||
// Also signals on if the heartbeat is Lost.
|
||||
|
|
@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) {
|
|||
|
||||
atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
|
||||
h.sendEvent(m)
|
||||
case <-time.After(timeout):
|
||||
case <-time.After(Timeout):
|
||||
if h.active {
|
||||
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
|
||||
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
|
||||
h.sendEvent(nil)
|
||||
h.active = false
|
||||
} else {
|
||||
|
|
@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
|
|||
|
||||
// ExpiryTime returns the timestamp when the heartbeat expires.
|
||||
func (m *HeartbeatMessage) ExpiryTime() time.Time {
|
||||
return m.received.Add(timeout)
|
||||
return m.received.Add(Timeout)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue