From 476d2bc20e20e237d200cfed33f6cd1c52417e34 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 2 Mar 2021 11:21:33 +0100 Subject: [PATCH 1/5] Insert endpoint_id into icingadb_instance --- pkg/icingadb/ha.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index a4c20b1f..132fd534 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,6 +170,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { }, Heartbeat: *t, Responsible: types.Yes, + EndpointId: s.EndpointId, Icinga2Version: s.Version, Icinga2StartTime: s.ProgramStart, Icinga2NotificationsEnabled: s.NotificationsEnabled, From 4293a51c886e1dd40d1965416cf0c1a51cd464dc Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 2 Mar 2021 11:22:09 +0100 Subject: [PATCH 2/5] Only signal HA takeover if a takeover was attempted --- pkg/icingadb/ha.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 132fd534..1e13a4b3 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -197,7 +197,9 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if err := tx.Commit(); err != nil { return err } - h.signalTakeover() + if takeover { + h.signalTakeover() + } } return nil From ab268bfbb6ec1d3f85324ad620e5cbeef13dd06a Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 2 Mar 2021 12:23:39 +0100 Subject: [PATCH 3/5] Always write HA heartbeat --- pkg/icingadb/ha.go | 63 ++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 1e13a4b3..d1f541bf 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -137,7 +137,7 @@ func (h *HA) controller() { func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { // boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1) - for attempt, retry := 0, true; retry; attempt++ { + for attempt := 0; true; attempt++ { // sleep := boff(uint64(attempt)) // h.logger.Debugf("Sleeping for %s..", sleep) // time.Sleep(sleep) @@ -158,41 +158,37 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { break } _ = rows.Close() - if takeover { - i := v1.IcingadbInstance{ - EntityWithoutChecksum: v1.EntityWithoutChecksum{ - IdMeta: v1.IdMeta{ - Id: h.instanceId, - }, + i := v1.IcingadbInstance{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{ + Id: h.instanceId, }, - EnvironmentMeta: v1.EnvironmentMeta{ - EnvironmentId: s.EnvironmentID(), - }, - Heartbeat: *t, - Responsible: types.Yes, - EndpointId: s.EndpointId, - Icinga2Version: s.Version, - Icinga2StartTime: s.ProgramStart, - Icinga2NotificationsEnabled: s.NotificationsEnabled, - Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, - Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, - Icinga2EventHandlersEnabled: s.EventHandlersEnabled, - Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, - Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, - } - _, err := tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i) - if err != nil { - cancel() - if !utils.IsDeadlock(err) { - retry = false - h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) - } else { - h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) - } + }, + EnvironmentMeta: v1.EnvironmentMeta{ + EnvironmentId: s.EnvironmentID(), + }, + Heartbeat: *t, + Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + EndpointId: s.EndpointId, + Icinga2Version: s.Version, + Icinga2StartTime: s.ProgramStart, + Icinga2NotificationsEnabled: s.NotificationsEnabled, + Icinga2ActiveServiceChecksEnabled: s.ActiveServiceChecksEnabled, + Icinga2ActiveHostChecksEnabled: s.ActiveHostChecksEnabled, + Icinga2EventHandlersEnabled: s.EventHandlersEnabled, + Icinga2FlapDetectionEnabled: s.FlapDetectionEnabled, + Icinga2PerformanceDataEnabled: s.PerformanceDataEnabled, + } + _, err = tx.NamedExecContext(ctx, h.db.BuildUpsertStmt(i), i) + if err != nil { + cancel() + if !utils.IsDeadlock(err) { + h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) + break + } else { + h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) continue } - } else { - retry = false } if err := tx.Commit(); err != nil { return err @@ -200,6 +196,7 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if takeover { h.signalTakeover() } + break } return nil From a474c83651e6ec7301b6ae0b7574d5647581b4c0 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 2 Mar 2021 12:28:28 +0100 Subject: [PATCH 4/5] Replace persisted instance ID with a random one and remove old rows from icingadb_instance --- cmd/icingadb/main.go | 3 +-- internal/command/command.go | 17 ----------------- pkg/icingadb/ha.go | 29 +++++++++++++++++++++++++++-- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 54c0e079..94bea2fb 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -18,7 +18,6 @@ import ( func main() { cmd := command.New() - instanceId := cmd.InstanceId() logger := cmd.Logger defer logger.Sync() defer func() { @@ -39,7 +38,7 @@ func main() { ctx := context.Background() heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) - ha := icingadb.NewHA(ctx, instanceId, db, heartbeat, logger) + ha := icingadb.NewHA(ctx, db, heartbeat, logger) s := icingadb.NewSync(db, rc, logger) // For temporary exit after sync diff --git a/internal/command/command.go b/internal/command/command.go index 3764a09b..0f6a90f5 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -1,14 +1,11 @@ package command import ( - "fmt" "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingaredis" - "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" - "path/filepath" ) type Command struct { @@ -50,20 +47,6 @@ func (c Command) Database() *icingadb.DB { return db } -func (c Command) InstanceId() types.Binary { - var instanceId types.Binary - path := filepath.Join(c.Flags.Datadir, "instance-id") - - instanceId, err := utils.CreateOrRead(path, utils.Uuid) - if err != nil { - c.Logger.Fatalw(fmt.Sprintf("can't create or read instance-id file %s", path), zap.Error(err)) - } - instanceId = utils.Checksum([]byte(instanceId)) - c.Logger.Infof("My instance ID is %s", instanceId) - - return instanceId -} - func (c Command) Redis() *icingaredis.Client { rc, err := c.Config.Redis.NewClient(c.Logger) if err != nil { diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index d1f541bf..bd218a79 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -3,7 +3,9 @@ package icingadb import ( "context" "database/sql" + "encoding/hex" "errors" + "github.com/google/uuid" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -32,13 +34,15 @@ type HA struct { errOnce sync.Once } -func NewHA(ctx context.Context, instanceId types.Binary, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { +func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { ctx, cancel := context.WithCancel(ctx) + instanceId := uuid.New() + ha := &HA{ ctx: ctx, cancel: cancel, - instanceId: instanceId, + instanceId: instanceId[:], db: db, heartbeat: heartbeat, logger: logger, @@ -94,6 +98,8 @@ func (h *HA) abort(err error) { // controller loop. func (h *HA) controller() { + h.logger.Debugw("Starting HA", zap.String("instance_id", hex.EncodeToString(h.instanceId))) + for { select { case b, ok := <-h.heartbeat.Beat(): @@ -126,6 +132,7 @@ func (h *HA) controller() { if err = h.realize(s, t); err != nil { h.abort(err) } + go h.removeOldInstances(s) case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() @@ -202,6 +209,24 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { return nil } +func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { + result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+ + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", + h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) + if err != nil { + h.logger.Errorw("Can't remove rows of old instances", zap.Error(err)) + return + } + affected, err := result.RowsAffected() + if err != nil { + h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) + return + } + if affected > 0 { + h.logger.Debugf("Removed %d old instances", affected) + } +} + func (h *HA) signalHandover() { if h.responsible { h.logger.Warn("Handing over..") From 6ccbc7d0917e4746829c9f8988686fe314556022 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 3 Mar 2021 14:18:12 +0100 Subject: [PATCH 5/5] HA: only execute query to remove old instances once --- pkg/icingadb/ha.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index bd218a79..cd59ff00 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -100,6 +100,8 @@ func (h *HA) abort(err error) { func (h *HA) controller() { h.logger.Debugw("Starting HA", zap.String("instance_id", hex.EncodeToString(h.instanceId))) + oldInstancesRemoved := false + for { select { case b, ok := <-h.heartbeat.Beat(): @@ -132,7 +134,10 @@ func (h *HA) controller() { if err = h.realize(s, t); err != nil { h.abort(err) } - go h.removeOldInstances(s) + if !oldInstancesRemoved { + go h.removeOldInstances(s) + oldInstancesRemoved = true + } case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() @@ -210,19 +215,22 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { } func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { - result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+ - "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", - h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) - if err != nil { - h.logger.Errorw("Can't remove rows of old instances", zap.Error(err)) + select { + case <-h.ctx.Done(): return - } - affected, err := result.RowsAffected() - if err != nil { - h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) - return - } - if affected > 0 { + case <-time.After(timeout): + result, err := h.db.ExecContext(h.ctx, "DELETE FROM icingadb_instance "+ + "WHERE id != ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?", + h.instanceId, s.EnvironmentID(), s.EndpointId, types.UnixMilli(time.Now().Add(-timeout))) + if err != nil { + h.logger.Errorw("Can't remove rows of old instances", zap.Error(err)) + return + } + affected, err := result.RowsAffected() + if err != nil { + h.logger.Errorw("Can't get number of removed old instances", zap.Error(err)) + return + } h.logger.Debugf("Removed %d old instances", affected) } }