diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index ad556844..5c108af9 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -47,9 +47,27 @@ func run() int { } } }() + + logger.Info("Starting Icinga DB") + db := cmd.Database() defer db.Close() + { + logger.Info("Connecting to database") + err := db.Ping() + if err != nil { + panic(errors.Wrap(err, "can't connect to database")) + } + } + rc := cmd.Redis() + { + logger.Info("Connecting to Redis") + _, err := rc.Ping(context.Background()).Result() + if err != nil { + panic(errors.Wrap(err, "can't connect to Redis")) + } + } ctx, cancelCtx := context.WithCancel(context.Background()) heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger) @@ -71,6 +89,8 @@ func run() int { for hactx.Err() == nil { select { case <-ha.Takeover(): + logger.Info("Taking over") + go func() { for hactx.Err() == nil { synctx, cancelSynctx := context.WithCancel(hactx) @@ -81,6 +101,8 @@ func run() int { dump := icingadb.NewDumpSignals(rc, logger) g.Go(func() error { + logger.Info("Staring config dump signal handling") + return dump.Listen(synctx) }) @@ -101,13 +123,18 @@ func run() int { }) g.Go(func() error { + logger.Info("Starting history sync") + return hs.Sync(synctx) }) g.Go(func() error { + logger.Info("Starting overdue sync") + return ods.Sync(synctx) }) + logger.Info("Starting config sync") for _, factory := range v1.Factories { factory := factory @@ -125,6 +152,8 @@ func run() int { <-dump.Done("icinga:customvar") + logger.Infof("Syncing customvar") + cv := common.NewSyncSubject(v1.NewCustomvar) cvs, redisErrs := rc.YieldAll(synctx, cv) @@ -181,6 +210,8 @@ func run() int { g.Go(func() error { wg.Wait() + logger.Infof("Starting runtime updates sync") + return rt.Sync(synctx, v1.Factories, lastRuntimeStreamId) }) @@ -190,6 +221,8 @@ func run() int { } }() case <-ha.Handover(): + logger.Warn("Handing over") + cancelHactx() case <-hactx.Done(): // Nothing to do here, surrounding loop will terminate now. diff --git a/internal/command/command.go b/internal/command/command.go index 0f6a90f5..7f7df9b0 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -41,7 +41,7 @@ func New() *Command { func (c Command) Database() *icingadb.DB { db, err := c.Config.Database.Open(c.Logger) if err != nil { - c.Logger.Fatal("can't create database connection pool from config", zap.Error(err)) + c.Logger.Fatalw("can't create database connection pool from config", zap.Error(err)) } return db @@ -50,7 +50,7 @@ func (c Command) Database() *icingadb.DB { func (c Command) Redis() *icingaredis.Client { rc, err := c.Config.Redis.NewClient(c.Logger) if err != nil { - c.Logger.Fatal("can't create Redis client from config", zap.Error(err)) + c.Logger.Fatalw("can't create Redis client from config", zap.Error(err)) } return rc diff --git a/pkg/config/redis.go b/pkg/config/redis.go index cac508f0..ec429bb5 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "net" "os" + "sync" "syscall" "time" ) @@ -27,7 +28,7 @@ type Redis struct { func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) { c := redis.NewClient(&redis.Options{ Addr: r.Address, - Dialer: dial, + Dialer: dialWithLogging(logger), Password: r.Password, DB: 0, // Use default DB, ReadTimeout: r.Timeout, @@ -40,27 +41,36 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error return icingaredis.NewClient(c, logger, &r.Options), nil } -// dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED. -func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) { - var dl net.Dialer +// dialWithLogging returns a Redis Dialer with logging capabilities. +func dialWithLogging(logger *zap.SugaredLogger) func(context.Context, string, string) (net.Conn, error) { + // dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED. + return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { + var dl net.Dialer + var logFirstError sync.Once - err = retry.WithBackoff( - ctx, - func(ctx context.Context) (err error) { - conn, err = dl.DialContext(ctx, network, addr) - return - }, - func(err error) bool { - if op, ok := err.(*net.OpError); ok { - sys, ok := op.Err.(*os.SyscallError) - return ok && sys.Err == syscall.ECONNREFUSED - } - return false - }, - backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - 5*time.Minute, - ) - return + err = retry.WithBackoff( + ctx, + func(ctx context.Context) (err error) { + conn, err = dl.DialContext(ctx, network, addr) + logFirstError.Do(func() { + if err != nil { + logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err)) + } + }) + return + }, + func(err error) bool { + if op, ok := err.(*net.OpError); ok { + sys, ok := op.Err.(*os.SyscallError) + return ok && sys.Err == syscall.ECONNREFUSED + } + return false + }, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + 5*time.Minute, + ) + return + } } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index ec8197ba..ee74d58c 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -12,6 +12,7 @@ import ( "log" "net" "os" + "sync" "syscall" "time" ) @@ -26,10 +27,16 @@ type Driver struct { // TODO(el): Test DNS. func (d Driver) Open(dsn string) (c driver.Conn, err error) { + var logFirstError sync.Once err = retry.WithBackoff( context.Background(), func(context.Context) (err error) { c, err = d.Driver.Open(dsn) + logFirstError.Do(func() { + if err != nil { + d.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) + } + }) return }, shouldRetry, diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index aee3cedc..e753eb34 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "github.com/google/uuid" + "github.com/icinga/icingadb/pkg/backoff" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -105,6 +106,10 @@ func (h *HA) controller() { oldInstancesRemoved := false + logTicker := time.NewTicker(time.Second * 60) + defer logTicker.Stop() + shouldLog := true + for { select { case m, ok := <-h.heartbeat.Beat(): @@ -122,7 +127,7 @@ func (h *HA) controller() { h.logger.Debugw("Received heartbeat from future", "time", t) } if tt.Before(now.Add(-1 * timeout)) { - h.logger.Errorw("Received heartbeat from the past %s", "time", t) + h.logger.Errorw("Received heartbeat from the past", "time", t) h.signalHandover() continue } @@ -130,13 +135,21 @@ func (h *HA) controller() { if err != nil { h.abort(err) } - if err = h.realize(s, t); err != nil { + + select { + case <-logTicker.C: + shouldLog = true + default: + } + if err = h.realize(s, t, shouldLog); err != nil { h.abort(err) } if !oldInstancesRemoved { go h.removeOldInstances(s) oldInstancesRemoved = true } + + shouldLog = false case <-h.heartbeat.Lost(): h.logger.Error("Lost heartbeat") h.signalHandover() @@ -146,12 +159,12 @@ func (h *HA) controller() { } } -func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { - // boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1) +func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error { + boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3) for attempt := 0; true; attempt++ { - // sleep := boff(uint64(attempt)) - // h.logger.Debugf("Sleeping for %s..", sleep) - // time.Sleep(sleep) + sleep := boff(uint64(attempt)) + time.Sleep(sleep) + ctx, cancel := context.WithCancel(h.ctx) tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelSerializable, @@ -159,12 +172,22 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if err != nil { return err } - rows, err := tx.QueryxContext(ctx, `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) + rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout))) if err != nil { return err } takeover := true for rows.Next() { + instance := &v1.IcingadbInstance{} + err := rows.StructScan(instance) + if err != nil { + h.logger.Errorw("Can't scan currently active instance", zap.Error(err)) + break + } + + if shouldLog { + h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time()))) + } takeover = false break } @@ -197,10 +220,15 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error { if err != nil { cancel() if !utils.IsDeadlock(err) { - h.logger.Errorw("Can't Update or insert instance.", zap.Error(err)) + h.logger.Errorw("Can't update or insert instance", zap.Error(err)) break } else { - h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err)) + if attempt > 2 { + // Log with info level after third attempt + h.logger.Infow("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt)) + } else { + h.logger.Debugw("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt)) + } continue } } @@ -248,7 +276,6 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) { func (h *HA) signalHandover() { if h.responsible { - h.logger.Warn("Handing over..") h.responsible = false h.handover <- struct{}{} } @@ -256,7 +283,6 @@ func (h *HA) signalHandover() { func (h *HA) signalTakeover() { if !h.responsible { - h.logger.Info("Taking over..") h.responsible = true h.takeover <- struct{}{} } diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 995929cb..354754b4 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -40,8 +40,6 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti stream := "icinga:runtime" updateMessagesByKey := make(map[string]chan<- redis.XMessage) - r.logger.Infof("Syncing runtime updates") - for _, factoryFunc := range factoryFuncs { factoryFunc = factoryFunc.WithInit @@ -54,7 +52,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(name, ':'))] = updateMessages - r.logger.Infof("Syncing runtime updates of %s", name) + r.logger.Debugf("Syncing runtime updates of %s", name) g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(v).Elem(), "json"))) g.Go(func() error { diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 166ce33d..bbb160c2 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -111,12 +111,22 @@ func (h Heartbeat) controller() { for { select { case m := <-messages: - h.active = true // TODO(el): We might only want to set this once + if !h.active { + s, err := m.IcingaStatus() + if err != nil { + return err + } + h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment)) + h.active = true + } h.beat <- m case <-time.After(timeout): if h.active { + h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) h.lost <- struct{}{} h.active = false + } else { + h.logger.Warn("Waiting for Icinga 2 heartbeat") } case <-ctx.Done(): return ctx.Err()