diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 00a0bf68..1195462d 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -12,6 +12,7 @@ import ( "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -223,32 +224,28 @@ func (h *HA) controller() { } func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3) - for attempt := 0; true; attempt++ { - sleep := boff(uint64(attempt)) - time.Sleep(sleep) + var takeover bool + + err := retry.WithBackoff( + ctx, + func(ctx context.Context) error { + takeover = false + + tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if errBegin != nil { + return errors.Wrap(errBegin, "can't start transaction") + } + + query := `SELECT id, heartbeat FROM icingadb_instance` + + ` WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?` - ctx, cancelCtx := context.WithCancel(ctx) - tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{ - Isolation: sql.LevelSerializable, - }) - if err != nil { - cancelCtx() - return errors.Wrap(err, "can't start transaction") - } - query := `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?` - rows, err := tx.QueryxContext(ctx, query, envId, "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) - if err != nil { - cancelCtx() - return internal.CantPerformQuery(err, query) - } - takeover := true - if rows.Next() { instance := &v1.IcingadbInstance{} - err := rows.StructScan(instance) - if err != nil { - h.logger.Errorw("Can't scan currently active instance", zap.Error(err)) - } else { + + errQuery := tx.QueryRowxContext( + ctx, query, envId, "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)), + ).StructScan(instance) + switch errQuery { + case nil: if shouldLog { h.logger.Infow("Another instance is active", zap.String("instance_id", instance.Id.String()), @@ -256,70 +253,72 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) } - takeover = false + case sql.ErrNoRows: + takeover = true + default: + return internal.CantPerformQuery(errQuery, query) } - } - _ = rows.Close() - i := v1.IcingadbInstance{ - EntityWithoutChecksum: v1.EntityWithoutChecksum{ - IdMeta: v1.IdMeta{ - Id: h.instanceId, + + i := v1.IcingadbInstance{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: h.instanceId, + }, }, - }, - EnvironmentMeta: v1.EnvironmentMeta{ - EnvironmentId: envId, - }, - Heartbeat: *t, - Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, - EndpointId: s.EndpointId, - Icinga2Version: s.Version, - Icinga2StartTime: s.ProgramStart, - Icinga2NotificationsEnabled: s.NotificationsEnabled, - Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, - Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, - Icinga2EventHandlersEnabled: s.EventHandlersEnabled, - Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, - Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, - } + EnvironmentMeta: v1.EnvironmentMeta{ + EnvironmentId: envId, + }, + Heartbeat: *t, + Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + EndpointId: s.EndpointId, + Icinga2Version: s.Version, + Icinga2StartTime: s.ProgramStart, + Icinga2NotificationsEnabled: s.NotificationsEnabled, + Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, + Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, + Icinga2EventHandlersEnabled: s.EventHandlersEnabled, + Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, + Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, + } - stmt, _ := h.db.BuildUpsertStmt(i) - _, err = tx.NamedExecContext(ctx, stmt, i) + stmt, _ := h.db.BuildUpsertStmt(i) + if _, err := tx.NamedExecContext(ctx, stmt, i); err != nil { + return internal.CantPerformQuery(err, stmt) + } - if err != nil { - cancelCtx() - err = internal.CantPerformQuery(err, stmt) - if !utils.IsDeadlock(err) { - h.logger.Errorw("Can't update or insert instance", zap.Error(err)) - break - } else { - if attempt > 2 { - // Log with info level after third attempt - h.logger.Infow("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt)) - } else { - h.logger.Debugw("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt)) + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "can't commit transaction") + } + + return nil + }, + IsRetryable, + backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3), + retry.Settings{ + OnError: func(_ time.Duration, attempt uint64, err, lastErr error) { + if lastErr == nil || err.Error() != lastErr.Error() { + log := h.logger.Debugw + if attempt > 2 { + log = h.logger.Infow + } + + log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt)) } - continue - } + }, + }, + ) + if err != nil { + return err + } + + if takeover { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + if err := h.insertEnvironment(); err != nil { + return errors.Wrap(err, "can't insert environment") } - if err := tx.Commit(); err != nil { - cancelCtx() - return errors.Wrap(err, "can't commit transaction") - } - - if takeover { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - cancelCtx() - return errors.Wrap(err, "can't insert environment") - } - - h.signalTakeover() - } - - cancelCtx() - break + h.signalTakeover() } return nil