mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Improve database and HA logging
This commit is contained in:
parent
ac2810aa6a
commit
44c734f72d
5 changed files with 72 additions and 14 deletions
|
|
@ -47,9 +47,27 @@ func run() int {
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Info("Starting Icinga DB")
|
||||
|
||||
db := cmd.Database()
|
||||
defer db.Close()
|
||||
{
|
||||
logger.Info("Connecting to database")
|
||||
err := db.Ping()
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "can't connect to database"))
|
||||
}
|
||||
}
|
||||
|
||||
rc := cmd.Redis()
|
||||
{
|
||||
logger.Info("Connecting to Redis")
|
||||
_, err := rc.Ping(context.Background()).Result()
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "can't connect to Redis"))
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
|
||||
|
|
@ -71,6 +89,8 @@ func run() int {
|
|||
for hactx.Err() == nil {
|
||||
select {
|
||||
case <-ha.Takeover():
|
||||
logger.Info("Taking over")
|
||||
|
||||
go func() {
|
||||
for hactx.Err() == nil {
|
||||
synctx, cancelSynctx := context.WithCancel(hactx)
|
||||
|
|
@ -81,6 +101,8 @@ func run() int {
|
|||
|
||||
dump := icingadb.NewDumpSignals(rc, logger)
|
||||
g.Go(func() error {
|
||||
logger.Info("Staring config dump signal handling")
|
||||
|
||||
return dump.Listen(synctx)
|
||||
})
|
||||
|
||||
|
|
@ -101,13 +123,18 @@ func run() int {
|
|||
})
|
||||
|
||||
g.Go(func() error {
|
||||
logger.Info("Starting history sync")
|
||||
|
||||
return hs.Sync(synctx)
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
logger.Info("Starting overdue sync")
|
||||
|
||||
return ods.Sync(synctx)
|
||||
})
|
||||
|
||||
logger.Info("Starting config sync")
|
||||
for _, factory := range v1.Factories {
|
||||
factory := factory
|
||||
|
||||
|
|
@ -125,6 +152,8 @@ func run() int {
|
|||
|
||||
<-dump.Done("icinga:customvar")
|
||||
|
||||
logger.Infof("Syncing customvar")
|
||||
|
||||
cv := common.NewSyncSubject(v1.NewCustomvar)
|
||||
|
||||
cvs, redisErrs := rc.YieldAll(synctx, cv)
|
||||
|
|
@ -181,6 +210,8 @@ func run() int {
|
|||
g.Go(func() error {
|
||||
wg.Wait()
|
||||
|
||||
logger.Infof("Starting runtime updates sync")
|
||||
|
||||
return rt.Sync(synctx, v1.Factories, lastRuntimeStreamId)
|
||||
})
|
||||
|
||||
|
|
@ -190,6 +221,8 @@ func run() int {
|
|||
}
|
||||
}()
|
||||
case <-ha.Handover():
|
||||
logger.Warn("Handing over")
|
||||
|
||||
cancelHactx()
|
||||
case <-hactx.Done():
|
||||
// Nothing to do here, surrounding loop will terminate now.
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func New() *Command {
|
|||
func (c Command) Database() *icingadb.DB {
|
||||
db, err := c.Config.Database.Open(c.Logger)
|
||||
if err != nil {
|
||||
c.Logger.Fatal("can't create database connection pool from config", zap.Error(err))
|
||||
c.Logger.Fatalw("can't create database connection pool from config", zap.Error(err))
|
||||
}
|
||||
|
||||
return db
|
||||
|
|
@ -50,7 +50,7 @@ func (c Command) Database() *icingadb.DB {
|
|||
func (c Command) Redis() *icingaredis.Client {
|
||||
rc, err := c.Config.Redis.NewClient(c.Logger)
|
||||
if err != nil {
|
||||
c.Logger.Fatal("can't create Redis client from config", zap.Error(err))
|
||||
c.Logger.Fatalw("can't create Redis client from config", zap.Error(err))
|
||||
}
|
||||
|
||||
return rc
|
||||
|
|
|
|||
|
|
@ -105,6 +105,10 @@ func (h *HA) controller() {
|
|||
|
||||
oldInstancesRemoved := false
|
||||
|
||||
logTicker := time.NewTicker(time.Second * 60)
|
||||
defer logTicker.Stop()
|
||||
shouldLog := true
|
||||
|
||||
for {
|
||||
select {
|
||||
case m, ok := <-h.heartbeat.Beat():
|
||||
|
|
@ -122,7 +126,7 @@ func (h *HA) controller() {
|
|||
h.logger.Debugw("Received heartbeat from future", "time", t)
|
||||
}
|
||||
if tt.Before(now.Add(-1 * timeout)) {
|
||||
h.logger.Errorw("Received heartbeat from the past %s", "time", t)
|
||||
h.logger.Errorw("Received heartbeat from the past", "time", t)
|
||||
h.signalHandover()
|
||||
continue
|
||||
}
|
||||
|
|
@ -130,13 +134,21 @@ func (h *HA) controller() {
|
|||
if err != nil {
|
||||
h.abort(err)
|
||||
}
|
||||
if err = h.realize(s, t); err != nil {
|
||||
|
||||
select {
|
||||
case <-logTicker.C:
|
||||
shouldLog = true
|
||||
default:
|
||||
}
|
||||
if err = h.realize(s, t, shouldLog); err != nil {
|
||||
h.abort(err)
|
||||
}
|
||||
if !oldInstancesRemoved {
|
||||
go h.removeOldInstances(s)
|
||||
oldInstancesRemoved = true
|
||||
}
|
||||
|
||||
shouldLog = false
|
||||
case <-h.heartbeat.Lost():
|
||||
h.logger.Error("Lost heartbeat")
|
||||
h.signalHandover()
|
||||
|
|
@ -146,12 +158,13 @@ func (h *HA) controller() {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
||||
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
|
||||
// boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1)
|
||||
for attempt := 0; true; attempt++ {
|
||||
// sleep := boff(uint64(attempt))
|
||||
// h.logger.Debugf("Sleeping for %s..", sleep)
|
||||
// time.Sleep(sleep)
|
||||
|
||||
ctx, cancel := context.WithCancel(h.ctx)
|
||||
tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
|
||||
Isolation: sql.LevelSerializable,
|
||||
|
|
@ -159,12 +172,22 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := tx.QueryxContext(ctx, `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
takeover := true
|
||||
for rows.Next() {
|
||||
instance := &v1.IcingadbInstance{}
|
||||
err := rows.StructScan(instance)
|
||||
if err != nil {
|
||||
h.logger.Errorw("Can't scan currently active instance", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active", "instance_id", instance.Id, "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time())))
|
||||
}
|
||||
takeover = false
|
||||
break
|
||||
}
|
||||
|
|
@ -197,10 +220,10 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
|||
if err != nil {
|
||||
cancel()
|
||||
if !utils.IsDeadlock(err) {
|
||||
h.logger.Errorw("Can't Update or insert instance.", zap.Error(err))
|
||||
h.logger.Errorw("Can't update or insert instance", zap.Error(err))
|
||||
break
|
||||
} else {
|
||||
h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err))
|
||||
h.logger.Infow("Can't update or insert instance. Retrying", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -248,7 +271,6 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) {
|
|||
|
||||
func (h *HA) signalHandover() {
|
||||
if h.responsible {
|
||||
h.logger.Warn("Handing over..")
|
||||
h.responsible = false
|
||||
h.handover <- struct{}{}
|
||||
}
|
||||
|
|
@ -256,7 +278,6 @@ func (h *HA) signalHandover() {
|
|||
|
||||
func (h *HA) signalTakeover() {
|
||||
if !h.responsible {
|
||||
h.logger.Info("Taking over..")
|
||||
h.responsible = true
|
||||
h.takeover <- struct{}{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
stream := "icinga:runtime"
|
||||
updateMessagesByKey := make(map[string]chan<- redis.XMessage)
|
||||
|
||||
r.logger.Infof("Syncing runtime updates")
|
||||
|
||||
for _, factoryFunc := range factoryFuncs {
|
||||
factoryFunc = factoryFunc.WithInit
|
||||
|
||||
|
|
@ -54,7 +52,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
|
||||
updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(name, ':'))] = updateMessages
|
||||
|
||||
r.logger.Infof("Syncing runtime updates of %s", name)
|
||||
r.logger.Debugf("Syncing runtime updates of %s", name)
|
||||
g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(v).Elem(), "json")))
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
|
|||
|
|
@ -111,12 +111,18 @@ func (h Heartbeat) controller() {
|
|||
for {
|
||||
select {
|
||||
case m := <-messages:
|
||||
h.active = true // TODO(el): We might only want to set this once
|
||||
if !h.active {
|
||||
h.logger.Info("Received first Icinga 2 heartbeat")
|
||||
h.active = true
|
||||
}
|
||||
h.beat <- m
|
||||
case <-time.After(timeout):
|
||||
if h.active {
|
||||
h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
|
||||
h.lost <- struct{}{}
|
||||
h.active = false
|
||||
} else {
|
||||
h.logger.Warn("Waiting for Icinga 2 heartbeat")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
|
|
|||
Loading…
Reference in a new issue