mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Merge pull request #56 from lippserd/feature/ha-and-database-logging
Improve database and HA logging
This commit is contained in:
commit
7004e99cb5
7 changed files with 123 additions and 39 deletions
|
|
@ -47,9 +47,27 @@ func run() int {
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Info("Starting Icinga DB")
|
||||
|
||||
db := cmd.Database()
|
||||
defer db.Close()
|
||||
{
|
||||
logger.Info("Connecting to database")
|
||||
err := db.Ping()
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "can't connect to database"))
|
||||
}
|
||||
}
|
||||
|
||||
rc := cmd.Redis()
|
||||
{
|
||||
logger.Info("Connecting to Redis")
|
||||
_, err := rc.Ping(context.Background()).Result()
|
||||
if err != nil {
|
||||
panic(errors.Wrap(err, "can't connect to Redis"))
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
heartbeat := icingaredis.NewHeartbeat(ctx, rc, logger)
|
||||
|
|
@ -71,6 +89,8 @@ func run() int {
|
|||
for hactx.Err() == nil {
|
||||
select {
|
||||
case <-ha.Takeover():
|
||||
logger.Info("Taking over")
|
||||
|
||||
go func() {
|
||||
for hactx.Err() == nil {
|
||||
synctx, cancelSynctx := context.WithCancel(hactx)
|
||||
|
|
@ -81,6 +101,8 @@ func run() int {
|
|||
|
||||
dump := icingadb.NewDumpSignals(rc, logger)
|
||||
g.Go(func() error {
|
||||
logger.Info("Staring config dump signal handling")
|
||||
|
||||
return dump.Listen(synctx)
|
||||
})
|
||||
|
||||
|
|
@ -101,13 +123,18 @@ func run() int {
|
|||
})
|
||||
|
||||
g.Go(func() error {
|
||||
logger.Info("Starting history sync")
|
||||
|
||||
return hs.Sync(synctx)
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
logger.Info("Starting overdue sync")
|
||||
|
||||
return ods.Sync(synctx)
|
||||
})
|
||||
|
||||
logger.Info("Starting config sync")
|
||||
for _, factory := range v1.Factories {
|
||||
factory := factory
|
||||
|
||||
|
|
@ -125,6 +152,8 @@ func run() int {
|
|||
|
||||
<-dump.Done("icinga:customvar")
|
||||
|
||||
logger.Infof("Syncing customvar")
|
||||
|
||||
cv := common.NewSyncSubject(v1.NewCustomvar)
|
||||
|
||||
cvs, redisErrs := rc.YieldAll(synctx, cv)
|
||||
|
|
@ -181,6 +210,8 @@ func run() int {
|
|||
g.Go(func() error {
|
||||
wg.Wait()
|
||||
|
||||
logger.Infof("Starting runtime updates sync")
|
||||
|
||||
return rt.Sync(synctx, v1.Factories, lastRuntimeStreamId)
|
||||
})
|
||||
|
||||
|
|
@ -190,6 +221,8 @@ func run() int {
|
|||
}
|
||||
}()
|
||||
case <-ha.Handover():
|
||||
logger.Warn("Handing over")
|
||||
|
||||
cancelHactx()
|
||||
case <-hactx.Done():
|
||||
// Nothing to do here, surrounding loop will terminate now.
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ func New() *Command {
|
|||
func (c Command) Database() *icingadb.DB {
|
||||
db, err := c.Config.Database.Open(c.Logger)
|
||||
if err != nil {
|
||||
c.Logger.Fatal("can't create database connection pool from config", zap.Error(err))
|
||||
c.Logger.Fatalw("can't create database connection pool from config", zap.Error(err))
|
||||
}
|
||||
|
||||
return db
|
||||
|
|
@ -50,7 +50,7 @@ func (c Command) Database() *icingadb.DB {
|
|||
func (c Command) Redis() *icingaredis.Client {
|
||||
rc, err := c.Config.Redis.NewClient(c.Logger)
|
||||
if err != nil {
|
||||
c.Logger.Fatal("can't create Redis client from config", zap.Error(err))
|
||||
c.Logger.Fatalw("can't create Redis client from config", zap.Error(err))
|
||||
}
|
||||
|
||||
return rc
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
|
@ -27,7 +28,7 @@ type Redis struct {
|
|||
func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) {
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: r.Address,
|
||||
Dialer: dial,
|
||||
Dialer: dialWithLogging(logger),
|
||||
Password: r.Password,
|
||||
DB: 0, // Use default DB,
|
||||
ReadTimeout: r.Timeout,
|
||||
|
|
@ -40,27 +41,36 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error
|
|||
return icingaredis.NewClient(c, logger, &r.Options), nil
|
||||
}
|
||||
|
||||
// dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED.
|
||||
func dial(ctx context.Context, network, addr string) (conn net.Conn, err error) {
|
||||
var dl net.Dialer
|
||||
// dialWithLogging returns a Redis Dialer with logging capabilities.
|
||||
func dialWithLogging(logger *zap.SugaredLogger) func(context.Context, string, string) (net.Conn, error) {
|
||||
// dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED.
|
||||
return func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
|
||||
var dl net.Dialer
|
||||
var logFirstError sync.Once
|
||||
|
||||
err = retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) (err error) {
|
||||
conn, err = dl.DialContext(ctx, network, addr)
|
||||
return
|
||||
},
|
||||
func(err error) bool {
|
||||
if op, ok := err.(*net.OpError); ok {
|
||||
sys, ok := op.Err.(*os.SyscallError)
|
||||
return ok && sys.Err == syscall.ECONNREFUSED
|
||||
}
|
||||
return false
|
||||
},
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
5*time.Minute,
|
||||
)
|
||||
return
|
||||
err = retry.WithBackoff(
|
||||
ctx,
|
||||
func(ctx context.Context) (err error) {
|
||||
conn, err = dl.DialContext(ctx, network, addr)
|
||||
logFirstError.Do(func() {
|
||||
if err != nil {
|
||||
logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
|
||||
}
|
||||
})
|
||||
return
|
||||
},
|
||||
func(err error) bool {
|
||||
if op, ok := err.(*net.OpError); ok {
|
||||
sys, ok := op.Err.(*os.SyscallError)
|
||||
return ok && sys.Err == syscall.ECONNREFUSED
|
||||
}
|
||||
return false
|
||||
},
|
||||
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
|
||||
5*time.Minute,
|
||||
)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
|
@ -26,10 +27,16 @@ type Driver struct {
|
|||
|
||||
// TODO(el): Test DNS.
|
||||
func (d Driver) Open(dsn string) (c driver.Conn, err error) {
|
||||
var logFirstError sync.Once
|
||||
err = retry.WithBackoff(
|
||||
context.Background(),
|
||||
func(context.Context) (err error) {
|
||||
c, err = d.Driver.Open(dsn)
|
||||
logFirstError.Do(func() {
|
||||
if err != nil {
|
||||
d.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
|
||||
}
|
||||
})
|
||||
return
|
||||
},
|
||||
shouldRetry,
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/hex"
|
||||
"github.com/google/uuid"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
|
||||
|
|
@ -105,6 +106,10 @@ func (h *HA) controller() {
|
|||
|
||||
oldInstancesRemoved := false
|
||||
|
||||
logTicker := time.NewTicker(time.Second * 60)
|
||||
defer logTicker.Stop()
|
||||
shouldLog := true
|
||||
|
||||
for {
|
||||
select {
|
||||
case m, ok := <-h.heartbeat.Beat():
|
||||
|
|
@ -122,7 +127,7 @@ func (h *HA) controller() {
|
|||
h.logger.Debugw("Received heartbeat from future", "time", t)
|
||||
}
|
||||
if tt.Before(now.Add(-1 * timeout)) {
|
||||
h.logger.Errorw("Received heartbeat from the past %s", "time", t)
|
||||
h.logger.Errorw("Received heartbeat from the past", "time", t)
|
||||
h.signalHandover()
|
||||
continue
|
||||
}
|
||||
|
|
@ -130,13 +135,21 @@ func (h *HA) controller() {
|
|||
if err != nil {
|
||||
h.abort(err)
|
||||
}
|
||||
if err = h.realize(s, t); err != nil {
|
||||
|
||||
select {
|
||||
case <-logTicker.C:
|
||||
shouldLog = true
|
||||
default:
|
||||
}
|
||||
if err = h.realize(s, t, shouldLog); err != nil {
|
||||
h.abort(err)
|
||||
}
|
||||
if !oldInstancesRemoved {
|
||||
go h.removeOldInstances(s)
|
||||
oldInstancesRemoved = true
|
||||
}
|
||||
|
||||
shouldLog = false
|
||||
case <-h.heartbeat.Lost():
|
||||
h.logger.Error("Lost heartbeat")
|
||||
h.signalHandover()
|
||||
|
|
@ -146,12 +159,12 @@ func (h *HA) controller() {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
||||
// boff := backoff.NewExponentialWithJitter(time.Millisecond*1, time.Second*1)
|
||||
func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli, shouldLog bool) error {
|
||||
boff := backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3)
|
||||
for attempt := 0; true; attempt++ {
|
||||
// sleep := boff(uint64(attempt))
|
||||
// h.logger.Debugf("Sleeping for %s..", sleep)
|
||||
// time.Sleep(sleep)
|
||||
sleep := boff(uint64(attempt))
|
||||
time.Sleep(sleep)
|
||||
|
||||
ctx, cancel := context.WithCancel(h.ctx)
|
||||
tx, err := h.db.BeginTxx(ctx, &sql.TxOptions{
|
||||
Isolation: sql.LevelSerializable,
|
||||
|
|
@ -159,12 +172,22 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := tx.QueryxContext(ctx, `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
rows, err := tx.QueryxContext(ctx, `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND id != ? AND heartbeat > ?`, s.EnvironmentID(), "y", h.instanceId, utils.UnixMilli(time.Now().Add(-1*timeout)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
takeover := true
|
||||
for rows.Next() {
|
||||
instance := &v1.IcingadbInstance{}
|
||||
err := rows.StructScan(instance)
|
||||
if err != nil {
|
||||
h.logger.Errorw("Can't scan currently active instance", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
if shouldLog {
|
||||
h.logger.Infow("Another instance is active", "instance_id", instance.Id, zap.String("environment", s.Environment), "heartbeat", instance.Heartbeat, zap.Duration("heartbeat_age", time.Now().Sub(instance.Heartbeat.Time())))
|
||||
}
|
||||
takeover = false
|
||||
break
|
||||
}
|
||||
|
|
@ -197,10 +220,15 @@ func (h *HA) realize(s *icingaredisv1.IcingaStatus, t *types.UnixMilli) error {
|
|||
if err != nil {
|
||||
cancel()
|
||||
if !utils.IsDeadlock(err) {
|
||||
h.logger.Errorw("Can't Update or insert instance.", zap.Error(err))
|
||||
h.logger.Errorw("Can't update or insert instance", zap.Error(err))
|
||||
break
|
||||
} else {
|
||||
h.logger.Infow("Can't Update or insert instance. Retrying..", zap.Error(err))
|
||||
if attempt > 2 {
|
||||
// Log with info level after third attempt
|
||||
h.logger.Infow("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt))
|
||||
} else {
|
||||
h.logger.Debugw("Can't update or insert instance. Retrying", zap.Error(err), zap.Int("retry count", attempt))
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -248,7 +276,6 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus) {
|
|||
|
||||
func (h *HA) signalHandover() {
|
||||
if h.responsible {
|
||||
h.logger.Warn("Handing over..")
|
||||
h.responsible = false
|
||||
h.handover <- struct{}{}
|
||||
}
|
||||
|
|
@ -256,7 +283,6 @@ func (h *HA) signalHandover() {
|
|||
|
||||
func (h *HA) signalTakeover() {
|
||||
if !h.responsible {
|
||||
h.logger.Info("Taking over..")
|
||||
h.responsible = true
|
||||
h.takeover <- struct{}{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
stream := "icinga:runtime"
|
||||
updateMessagesByKey := make(map[string]chan<- redis.XMessage)
|
||||
|
||||
r.logger.Infof("Syncing runtime updates")
|
||||
|
||||
for _, factoryFunc := range factoryFuncs {
|
||||
factoryFunc = factoryFunc.WithInit
|
||||
|
||||
|
|
@ -54,7 +52,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
|
||||
updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(name, ':'))] = updateMessages
|
||||
|
||||
r.logger.Infof("Syncing runtime updates of %s", name)
|
||||
r.logger.Debugf("Syncing runtime updates of %s", name)
|
||||
g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(v).Elem(), "json")))
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
|
|||
|
|
@ -111,12 +111,22 @@ func (h Heartbeat) controller() {
|
|||
for {
|
||||
select {
|
||||
case m := <-messages:
|
||||
h.active = true // TODO(el): We might only want to set this once
|
||||
if !h.active {
|
||||
s, err := m.IcingaStatus()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", s.Environment))
|
||||
h.active = true
|
||||
}
|
||||
h.beat <- m
|
||||
case <-time.After(timeout):
|
||||
if h.active {
|
||||
h.logger.Warn("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
|
||||
h.lost <- struct{}{}
|
||||
h.active = false
|
||||
} else {
|
||||
h.logger.Warn("Waiting for Icinga 2 heartbeat")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
|
|
|||
Loading…
Reference in a new issue