diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 54c0e079..94bea2fb 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -18,7 +18,6 @@ import ( func main() { cmd := command.New() - instanceId := cmd.InstanceId() logger := cmd.Logger defer logger.Sync() defer func() { @@ -39,7 +38,7 @@ func main() { ctx := context.Background() heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) - ha := icingadb.NewHA(ctx, instanceId, db, heartbeat, logger) + ha := icingadb.NewHA(ctx, db, heartbeat, logger) s := icingadb.NewSync(db, rc, logger) // For temporary exit after sync diff --git a/internal/command/command.go b/internal/command/command.go index 3764a09b..0f6a90f5 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -1,14 +1,11 @@ package command import ( - "fmt" "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingaredis" - "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" - "path/filepath" ) type Command struct { @@ -50,20 +47,6 @@ func (c Command) Database() *icingadb.DB { return db } -func (c Command) InstanceId() types.Binary { - var instanceId types.Binary - path := filepath.Join(c.Flags.Datadir, "instance-id") - - instanceId, err := utils.CreateOrRead(path, utils.Uuid) - if err != nil { - c.Logger.Fatalw(fmt.Sprintf("can't create or read instance-id file %s", path), zap.Error(err)) - } - instanceId = utils.Checksum([]byte(instanceId)) - c.Logger.Infof("My instance ID is %s", instanceId) - - return instanceId -} - func (c Command) Redis() *icingaredis.Client { rc, err := c.Config.Redis.NewClient(c.Logger) if err != nil { diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index a4c20b1f..cd59ff00 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -3,7 +3,9 @@ package icingadb import ( "context" "database/sql" + "encoding/hex" "errors" + "github.com/google/uuid" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -32,13 +34,15 @@ type HA struct { errOnce sync.Once } -func NewHA(ctx context.Context, instanceId types.Binary, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { +func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { ctx, cancel := context.WithCancel(ctx) + instanceId := uuid.New() + ha := &HA{ ctx: ctx, cancel: cancel, - instanceId: instanceId, + instanceId: instanceId[:], db: db, heartbeat: heartbeat, logger: logger, @@ -94,6 +98,10 @@ func (h *HA) abort(err error) { // controller loop. func (h *HA) controller() { + h.logger.Debugw("Starting HA", zap.String("instance_id", hex.EncodeToString(h.instanceId))) + + oldInstancesRemoved := false + for { select { case b, ok := <-h.heartbeat.Beat(): @@ -126,6 +134,10 @@ func (h *HA) controller() { if err = h.realize(s, t); err != nil { h.abort(err) } + if !oldInstancesRemoved { + go h.removeOldInstances(s) + oldInstancesRemoved = true + } case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() @@ -137,7 +149,7 @@ func (h *HA) controller() { func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { // boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1) - for attempt, retry := 0, true; retry; attempt++ { + for attempt := 0; true; attempt++ { // sleep := boff(uint64(attempt)) // h.logger.Debugf("Sleeping for %s..", sleep) // time.Sleep(sleep) @@ -158,50 +170,71 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { break } _ = rows.Close() - if takeover { - i := v1.IcingadbInstance{ - EntityWithoutChecksum: v1.EntityWithoutChecksum{ - IdMeta: v1.IdMeta{ - Id: h.instanceId, - }, + i := v1.IcingadbInstance{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: h.instanceId, }, - EnvironmentMeta: v1.EnvironmentMeta{ - EnvironmentId: s.EnvironmentID(), - }, - Heartbeat: *t, - Responsible: types.Yes, - Icinga2Version: s.Version, - Icinga2StartTime: s.ProgramStart, - Icinga2NotificationsEnabled: s.NotificationsEnabled, - Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, - Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, - Icinga2EventHandlersEnabled: s.EventHandlersEnabled, - Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, - Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, - } - _, err := tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i) - if err != nil { - cancel() - if !utils.IsDeadlock(err) { - retry = false - h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) - } else { - h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) - } + }, + EnvironmentMeta: v1.EnvironmentMeta{ + EnvironmentId: s.EnvironmentID(), + }, + Heartbeat: *t, + Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + EndpointId: s.EndpointId, + Icinga2Version: s.Version, + Icinga2StartTime: s.ProgramStart, + Icinga2NotificationsEnabled: s.NotificationsEnabled, + Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, + Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, + Icinga2EventHandlersEnabled: s.EventHandlersEnabled, + Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, + Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, + } + _, err = tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i) + if err != nil { + cancel() + if !utils.IsDeadlock(err) { + h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) + break + } else { + h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) continue } - } else { - retry = false } if err := tx.Commit(); err != nil { return err } - h.signalTakeover() + if takeover { + h.signalTakeover() + } + break } return nil } +func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { + select { + case <-h.ctx.Done(): + return + case <-time.After(timeout): + result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+ + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", + h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) + if err != nil { + h.logger.Errorw("Can't remove rows of old instances", zap.Error(err)) + return + } + affected, err := result.RowsAffected() + if err != nil { + h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) + return + } + h.logger.Debugf("Removed %d old instances", affected) + } +} + func (h *HA) signalHandover() { if h.responsible { h.logger.Warn("Handing over..")