mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-08 16:34:29 -04:00
Merge pull request #432 from Icinga/bugfix/crash-after-interuption-of-mysql-connection-427
HA#realize(): re-try transaction on temporary error
This commit is contained in:
commit
b65e427232
1 changed file with 80 additions and 81 deletions
|
|
@@ -12,6 +12,7 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@@ -223,32 +224,28 @@ func (h *HA) controller() {
|
|||
}
|
||||
|
||||
func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
|
||||
boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3)
|
||||
for attempt := 0; true; attempt++ {
|
||||
sleep := boff(uint64(attempt))
|
||||
time.Sleep(sleep)
|
||||
var takeover bool
|
||||
|
||||
err := retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) error {
|
||||
takeover = false
|
||||
|
||||
tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
|
||||
if errBegin != nil {
|
||||
return errors.Wrap(errBegin, "can't start transaction")
|
||||
}
|
||||
|
||||
query := `SELECT id, heartbeat FROM icingadb_instance` +
|
||||
` WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`
|
||||
|
||||
ctx, cancelCtx := context.WithCancel(ctx)
|
||||
tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
|
||||
Isolation: sql.LevelSerializable,
|
||||
})
|
||||
if err != nil {
|
||||
cancelCtx()
|
||||
return errors.Wrap(err, "can't start transaction")
|
||||
}
|
||||
query := `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`
|
||||
rows, err := tx.QueryxContext(ctx, query, envId, "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
if err != nil {
|
||||
cancelCtx()
|
||||
return internal.CantPerformQuery(err, query)
|
||||
}
|
||||
takeover := true
|
||||
if rows.Next() {
|
||||
instance := &v1.IcingadbInstance{}
|
||||
err := rows.StructScan(instance)
|
||||
if err != nil {
|
||||
h.logger.Errorw("Can't scan currently active instance", zap.Error(err))
|
||||
} else {
|
||||
|
||||
errQuery := tx.QueryRowxContext(
|
||||
ctx, query, envId, "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)),
|
||||
).StructScan(instance)
|
||||
switch errQuery {
|
||||
case nil:
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active",
|
||||
zap.String("instance_id", instance.Id.String()),
|
||||
|
|
@@ -256,70 +253,72 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
"heartbeat", instance.Heartbeat,
|
||||
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
|
||||
}
|
||||
takeover = false
|
||||
case sql.ErrNoRows:
|
||||
takeover = true
|
||||
default:
|
||||
return internal.CantPerformQuery(errQuery, query)
|
||||
}
|
||||
}
|
||||
_ = rows.Close()
|
||||
i := v1.IcingadbInstance{
|
||||
EntityWithoutChecksum: v1.EntityWithoutChecksum{
|
||||
IdMeta: v1.IdMeta{
|
||||
Id: h.instanceId,
|
||||
|
||||
i := v1.IcingadbInstance{
|
||||
EntityWithoutChecksum: v1.EntityWithoutChecksum{
|
||||
IdMeta: v1.IdMeta{
|
||||
Id: h.instanceId,
|
||||
},
|
||||
},
|
||||
},
|
||||
EnvironmentMeta: v1.EnvironmentMeta{
|
||||
EnvironmentId: envId,
|
||||
},
|
||||
Heartbeat: *t,
|
||||
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
|
||||
EndpointId: s.EndpointId,
|
||||
Icinga2Version: s.Version,
|
||||
Icinga2StartTime: s.ProgramStart,
|
||||
Icinga2NotificationsEnabled: s.NotificationsEnabled,
|
||||
Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled,
|
||||
Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled,
|
||||
Icinga2EventHandlersEnabled: s.EventHandlersEnabled,
|
||||
Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled,
|
||||
Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled,
|
||||
}
|
||||
EnvironmentMeta: v1.EnvironmentMeta{
|
||||
EnvironmentId: envId,
|
||||
},
|
||||
Heartbeat: *t,
|
||||
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
|
||||
EndpointId: s.EndpointId,
|
||||
Icinga2Version: s.Version,
|
||||
Icinga2StartTime: s.ProgramStart,
|
||||
Icinga2NotificationsEnabled: s.NotificationsEnabled,
|
||||
Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled,
|
||||
Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled,
|
||||
Icinga2EventHandlersEnabled: s.EventHandlersEnabled,
|
||||
Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled,
|
||||
Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled,
|
||||
}
|
||||
|
||||
stmt, _ := h.db.BuildUpsertStmt(i)
|
||||
_, err = tx.NamedExecContext(ctx, stmt, i)
|
||||
stmt, _ := h.db.BuildUpsertStmt(i)
|
||||
if _, err := tx.NamedExecContext(ctx, stmt, i); err != nil {
|
||||
return internal.CantPerformQuery(err, stmt)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
cancelCtx()
|
||||
err = internal.CantPerformQuery(err, stmt)
|
||||
if !utils.IsDeadlock(err) {
|
||||
h.logger.Errorw("Can't update or insert instance", zap.Error(err))
|
||||
break
|
||||
} else {
|
||||
if attempt > 2 {
|
||||
// Log with info level after third attempt
|
||||
h.logger.Infow("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt))
|
||||
} else {
|
||||
h.logger.Debugw("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt))
|
||||
if err := tx.Commit(); err != nil {
|
||||
return errors.Wrap(err, "can't commit transaction")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
IsRetryable,
|
||||
backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3),
|
||||
retry.Settings{
|
||||
OnError: func(_ time.Duration, attempt uint64, err, lastErr error) {
|
||||
if lastErr == nil || err.Error() != lastErr.Error() {
|
||||
log := h.logger.Debugw
|
||||
if attempt > 2 {
|
||||
log = h.logger.Infow
|
||||
}
|
||||
|
||||
log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt))
|
||||
}
|
||||
continue
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if takeover {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
if err := h.insertEnvironment(); err != nil {
|
||||
return errors.Wrap(err, "can't insert environment")
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
cancelCtx()
|
||||
return errors.Wrap(err, "can't commit transaction")
|
||||
}
|
||||
|
||||
if takeover {
|
||||
// Insert the environment after each heartbeat takeover if it does not already exist in the database
|
||||
// as the environment may have changed, although this is likely to happen very rarely.
|
||||
if err := h.insertEnvironment(); err != nil {
|
||||
cancelCtx()
|
||||
return errors.Wrap(err, "can't insert environment")
|
||||
}
|
||||
|
||||
h.signalTakeover()
|
||||
}
|
||||
|
||||
cancelCtx()
|
||||
break
|
||||
h.signalTakeover()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Reference in a new issue