From 44c734f72d6dee8ee5f0232b23259157bbdc49ce Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 12 May 2021 13:49:03 +0200 Subject: [PATCH] Improve database and HA logging --- cmd/icingadb/main.go | 33 +++++++++++++++++++++++++++++ internal/command/command.go | 4 ++-- pkg/icingadb/ha.go | 37 ++++++++++++++++++++++++++------- pkg/icingadb/runtime_updates.go | 4 +--- pkg/icingaredis/heartbeat.go | 8 ++++++- 5 files changed, 72 insertions(+), 14 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index ad556844..5c108af9 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -47,9 +47,27 @@ func run() int { } } }() + + logger.Info("Starting Icinga DB") + db := cmd.Database() defer db.Close() + { + logger.Info("Connecting to database") + err := db.Ping() + if err != nil { + panic(errors.Wrap(err, "can't connect to database")) + } + } + rc := cmd.Redis() + { + logger.Info("Connecting to Redis") + _, err := rc.Ping(context.Background()).Result() + if err != nil { + panic(errors.Wrap(err, "can't connect to Redis")) + } + } ctx, cancelCtx := context.WithCancel(context.Background()) heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) @@ -71,6 +89,8 @@ func run() int { for hactx.Err() == nil { select { case <-ha.Takeover(): + logger.Info("Taking over") + go func() { for hactx.Err() == nil { synctx, cancelSynctx := context.WithCancel(hactx) @@ -81,6 +101,8 @@ func run() int { dump := icingadb.NewDumpSignals(rc, logger) g.Go(func() error { + logger.Info("Starting config dump signal handling") + return dump.Listen(synctx) }) @@ -101,13 +123,18 @@ func run() int { }) g.Go(func() error { + logger.Info("Starting history sync") + return hs.Sync(synctx) }) g.Go(func() error { + logger.Info("Starting overdue sync") + return ods.Sync(synctx) }) + logger.Info("Starting config sync") for _, factory := range v1.Factories { factory := factory @@ -125,6 +152,8 @@ func run() int { <-dump.Done("icinga:customvar") + logger.Infof("Syncing customvar") + cv 
:= common.NewSyncSubject(v1.NewCustomvar) cvs, redisErrs := rc.YieldAll(synctx, cv) @@ -181,6 +210,8 @@ func run() int { g.Go(func() error { wg.Wait() + logger.Infof("Starting runtime updates sync") + return rt.Sync(synctx, v1.Factories, lastRuntimeStreamId) }) @@ -190,6 +221,8 @@ func run() int { } }() case <-ha.Handover(): + logger.Warn("Handing over") + cancelHactx() case <-hactx.Done(): // Nothing to do here, surrounding loop will terminate now. diff --git a/internal/command/command.go b/internal/command/command.go index 0f6a90f5..7f7df9b0 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -41,7 +41,7 @@ func New() *Command { func (c Command) Database() *icingadb.DB { db, err := c.Config.Database.Open(c.Logger) if err != nil { - c.Logger.Fatal("can't create database connection pool from config", zap.Error(err)) + c.Logger.Fatalw("can't create database connection pool from config", zap.Error(err)) } return db @@ -50,7 +50,7 @@ func (c Command) Database() *icingadb.DB { func (c Command) Redis() *icingaredis.Client { rc, err := c.Config.Redis.NewClient(c.Logger) if err != nil { - c.Logger.Fatal("can't create Redis client from config", zap.Error(err)) + c.Logger.Fatalw("can't create Redis client from config", zap.Error(err)) } return rc diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index aee3cedc..0a90bd06 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -105,6 +105,10 @@ func (h *HA) controller() { oldInstancesRemoved := false + logTicker := time.NewTicker(time.Second * 60) + defer logTicker.Stop() + shouldLog := true + for { select { case m, ok := <-h.heartbeat.Beat(): @@ -122,7 +126,7 @@ func (h *HA) controller() { h.logger.Debugw("Received heartbeat from future", "time", t) } if tt.Before(now.Add(-1 * timeout)) { - h.logger.Errorw("Received heartbeat from the past %s", "time", t) + h.logger.Errorw("Received heartbeat from the past", "time", t) h.signalHandover() continue } @@ -130,13 +134,21 @@ func (h *HA) 
controller() { if err != nil { h.abort(err) } - if err = h.realize(s, t); err != nil { + + select { + case <-logTicker.C: + shouldLog = true + default: + } + if err = h.realize(s, t, shouldLog); err != nil { h.abort(err) } if !oldInstancesRemoved { go h.removeOldInstances(s) oldInstancesRemoved = true } + + shouldLog = false case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() @@ -146,12 +158,13 @@ func (h *HA) controller() { } } -func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { +func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error { // boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1) for attempt := 0; true; attempt++ { // sleep := boff(uint64(attempt)) // h.logger.Debugf("Sleeping for %s..", sleep) // time.Sleep(sleep) + ctx, cancel := context.WithCancel(h.ctx) tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelSerializable, @@ -159,12 +172,22 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if err != nil { return err } - rows, err := tx.QueryxContext(ctx, `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) + rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? 
AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { return err } takeover := true for rows.Next() { + instance := &v1.IcingadbInstance{} + err := rows.StructScan(instance) + if err != nil { + h.logger.Errorw("Can't scan currently active instance", zap.Error(err)) + break + } + + if shouldLog { + h.logger.Infow("Another instance is active", "instance_id", instance.Id, "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time()))) + } takeover = false break } @@ -197,10 +220,10 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if err != nil { cancel() if !utils.IsDeadlock(err) { - h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) + h.logger.Errorw("Can't update or insert instance", zap.Error(err)) break } else { - h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) + h.logger.Infow("Can't update or insert instance. 
Retrying", zap.Error(err)) continue } } @@ -248,7 +271,6 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { func (h *HA) signalHandover() { if h.responsible { - h.logger.Warn("Handing over..") h.responsible = false h.handover <- struct{}{} } @@ -256,7 +278,6 @@ func (h *HA) signalHandover() { func (h *HA) signalTakeover() { if !h.responsible { - h.logger.Info("Taking over..") h.responsible = true h.takeover <- struct{}{} } diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 995929cb..354754b4 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -40,8 +40,6 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti stream := "icinga:runtime" updateMessagesByKey := make(map[string]chan<- redis.XMessage) - r.logger.Infof("Syncing runtime updates") - for _, factoryFunc := range factoryFuncs { factoryFunc = factoryFunc.WithInit @@ -54,7 +52,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(name, ':'))] = updateMessages - r.logger.Infof("Syncing runtime updates of %s", name) + r.logger.Debugf("Syncing runtime updates of %s", name) g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(v).Elem(), "json"))) g.Go(func() error { diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 166ce33d..c347b330 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -111,12 +111,18 @@ func (h Heartbeat) controller() { for { select { case m := <-messages: - h.active = true // TODO(el): We might only want to set this once + if !h.active { + h.logger.Info("Received first Icinga 2 heartbeat") + h.active = true + } h.beat <- m case <-time.After(timeout): if h.active { + h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) h.lost <- struct{}{} h.active = false + } else 
{ + h.logger.Warn("Waiting for Icinga 2 heartbeat") } case <-ctx.Done(): return ctx.Err()