mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Merge pull request #800 from Icinga/ha-desperate-timing-fixes
Abort HA Realization Logic After Timeout
This commit is contained in:
commit
63d51dfa93
3 changed files with 70 additions and 29 deletions
|
|
@ -326,7 +326,10 @@ func run() int {
|
|||
|
||||
cancelHactx()
|
||||
case <-hactx.Done():
|
||||
// Nothing to do here, surrounding loop will terminate now.
|
||||
if ctx.Err() != nil {
|
||||
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
|
||||
}
|
||||
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
|
||||
case <-ha.Done():
|
||||
if err := ha.Err(); err != nil {
|
||||
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
|
||||
|
|
@ -338,8 +341,6 @@ func run() int {
|
|||
cancelHactx()
|
||||
|
||||
return ExitFailure
|
||||
case <-ctx.Done():
|
||||
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
|
||||
case s := <-sig:
|
||||
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
|
||||
cancelHactx()
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ func (h *HA) controller() {
|
|||
}
|
||||
tt := t.Time()
|
||||
if tt.After(now.Add(1 * time.Second)) {
|
||||
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
|
||||
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
|
||||
}
|
||||
if tt.Before(now.Add(-1 * peerTimeout)) {
|
||||
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
|
||||
|
|
@ -221,7 +221,7 @@ func (h *HA) controller() {
|
|||
|
||||
// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
|
||||
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
|
||||
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
|
||||
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
|
||||
cancelRealizeCtx()
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
|
||||
|
|
@ -267,11 +267,16 @@ func (h *HA) controller() {
|
|||
|
||||
// realize a HA cycle triggered by a heartbeat event.
|
||||
//
|
||||
// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly
|
||||
// enforced to abort the realization logic the moment the context expires.
|
||||
//
|
||||
// shouldLogRoutineEvents indicates if recurrent events should be logged.
|
||||
//
|
||||
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
|
||||
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
|
||||
func (h *HA) realize(
|
||||
ctx context.Context,
|
||||
s *icingaredisv1.IcingaStatus,
|
||||
t *types.UnixMilli,
|
||||
envId types.Binary,
|
||||
shouldLogRoutineEvents bool,
|
||||
) error {
|
||||
|
|
@ -303,6 +308,7 @@ func (h *HA) realize(
|
|||
if errBegin != nil {
|
||||
return errors.Wrap(errBegin, "can't start transaction")
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
|
||||
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
|
||||
|
|
@ -353,7 +359,7 @@ func (h *HA) realize(
|
|||
EnvironmentMeta: v1.EnvironmentMeta{
|
||||
EnvironmentId: envId,
|
||||
},
|
||||
Heartbeat: *t,
|
||||
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())),
|
||||
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
|
||||
EndpointId: s.EndpointId,
|
||||
Icinga2Version: s.Version,
|
||||
|
|
@ -373,15 +379,51 @@ func (h *HA) realize(
|
|||
|
||||
if takeover != "" {
|
||||
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
|
||||
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
|
||||
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
|
||||
return database.CantPerformQuery(err, stmt)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
|
||||
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
|
||||
return database.CantPerformQuery(err, stmt)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.Wrap(err, "can't commit transaction")
|
||||
// In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a
|
||||
// context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about
|
||||
// context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486
|
||||
//
|
||||
// This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() -
|
||||
// which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over
|
||||
// to the driver's Commit() method. Drivers may behave differently. For example, the used
|
||||
// github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and
|
||||
// reading packets without honoring the context.
|
||||
//
|
||||
// In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this
|
||||
// reason, the following Commit() call has been moved to its own goroutine, which communicates back via a
|
||||
// channel selected along with the context. If the context ends before Commit(), this retryable function
|
||||
// returns with a non-retryable error.
|
||||
//
|
||||
// However, while the COMMIT continues in the background, it may still succeed. In this case, the state of
|
||||
// the database does not match the state of Icinga DB, specifically the database says that this instance is
|
||||
// active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this
|
||||
// function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the
|
||||
// presence of another active instance for the other node. Effectively, this could result in a single HA
|
||||
// cycle with no active node. Afterwards, either this instance takes over due to the false impression that
|
||||
// no other node is active, or the other instances does so as the inserted heartbeat has already expired.
|
||||
// Not great, not terrible.
|
||||
commitErrCh := make(chan error, 1)
|
||||
go func() { commitErrCh <- tx.Commit() }()
|
||||
|
||||
select {
|
||||
case err := <-commitErrCh:
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't commit transaction")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -423,12 +465,6 @@ func (h *HA) realize(
|
|||
}
|
||||
|
||||
if takeover != "" {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
if err := h.insertEnvironment(); err != nil {
|
||||
return errors.Wrap(err, "can't insert environment")
|
||||
}
|
||||
|
||||
h.signalTakeover(takeover)
|
||||
} else if otherResponsible {
|
||||
if state := h.state.Load(); !state.otherResponsible {
|
||||
|
|
@ -451,18 +487,6 @@ func (h *HA) realizeLostHeartbeat() {
|
|||
}
|
||||
}
|
||||
|
||||
// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
|
||||
func (h *HA) insertEnvironment() error {
|
||||
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
|
||||
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)
|
||||
|
||||
if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
|
||||
return database.CantPerformQuery(err, stmt)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *HA) removeInstance(ctx context.Context) {
|
||||
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
|
||||
// Intentionally not using h.ctx here as it's already cancelled.
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ type Heartbeat struct {
|
|||
active bool
|
||||
events chan *HeartbeatMessage
|
||||
lastReceivedMs atomic.Int64
|
||||
lastMessageMs atomic.Int64
|
||||
cancelCtx context.CancelFunc
|
||||
client *redis.Client
|
||||
done chan struct{}
|
||||
|
|
@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
|
|||
return h.lastReceivedMs.Load()
|
||||
}
|
||||
|
||||
// LastMessageTime returns the last message's time in ms.
|
||||
func (h *Heartbeat) LastMessageTime() int64 {
|
||||
return h.lastMessageMs.Load()
|
||||
}
|
||||
|
||||
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
|
||||
// Implements the io.Closer interface.
|
||||
func (h *Heartbeat) Close() error {
|
||||
|
|
@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
|
|||
}
|
||||
|
||||
h.lastReceivedMs.Store(m.received.UnixMilli())
|
||||
|
||||
statsT, err := m.stats.Time()
|
||||
if err != nil {
|
||||
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
|
||||
h.lastMessageMs.Store(0)
|
||||
} else {
|
||||
h.lastMessageMs.Store(statsT.Time().UnixMilli())
|
||||
}
|
||||
|
||||
h.sendEvent(m)
|
||||
case <-time.After(Timeout):
|
||||
if h.active {
|
||||
|
|
@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
|
|||
}
|
||||
|
||||
h.lastReceivedMs.Store(0)
|
||||
h.lastMessageMs.Store(0)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue