diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index aac043bf..f0e4420e 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -41,11 +41,11 @@ func run() int { cmd.Config.Logging.Level, cmd.Config.Logging.Output, cmd.Config.Logging.Options, + cmd.Config.Logging.Interval, ) if err != nil { utils.Fatal(errors.Wrap(err, "can't configure logging")) } - // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the // default setting for the Icinga DB service. So we notify that Icinga DB finished starting up. _ = sdnotify.Ready() @@ -149,7 +149,7 @@ func run() int { dump := icingadb.NewDumpSignals(rc, logs.GetChildLogger("dump-signals")) g.Go(func() error { - logger.Info("Staring config dump signal handling") + logger.Debug("Staring config dump signal handling") return dump.Listen(synctx) }) @@ -172,8 +172,8 @@ func run() int { return ods.Sync(synctx) }) + syncStart := time.Now() logger.Info("Starting config sync") - for _, factory := range v1.ConfigFactories { factory := factory @@ -184,7 +184,7 @@ func run() int { return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump) }) } - + logger.Info("Starting initial state sync") for _, factory := range v1.StateFactories { factory := factory @@ -211,6 +211,39 @@ func run() int { g.Go(func() error { configInitSync.Wait() + + elapsed := time.Since(syncStart) + logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + logger.Infof("Finished config sync in %s", elapsed) + } else { + logger.Warnf("Aborted config sync after %s", elapsed) + } + + return nil + }) + + g.Go(func() error { + stateInitSync.Wait() + + elapsed := time.Since(syncStart) + logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + logger.Infof("Finished initial state sync in %s", elapsed) + } else { + logger.Warnf("Aborted initial state sync after %s", elapsed) + } + + return nil + }) + + g.Go(func() error { + configInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + logger.Info("Starting config runtime updates sync") return rt.Sync(synctx, v1.ConfigFactories, runtimeConfigUpdateStreams) @@ -218,7 +251,13 @@ func run() int { g.Go(func() error { stateInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + logger.Info("Starting state runtime updates sync") + return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams) }) @@ -275,7 +314,7 @@ func checkDbSchema(ctx context.Context, db *icingadb.DB) error { } // monitorRedisSchema monitors rc's icinga:schema version validity. -func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) { +func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) { for { var err error pos, err = checkRedisSchema(logger, rc, pos) @@ -287,7 +326,7 @@ func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos s } // checkRedisSchema verifies rc's icinga:schema version. -func checkRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) (newPos string, err error) { +func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) { if pos == "0-0" { defer time.AfterFunc(3*time.Second, func() { logger.Info("Waiting for current Redis schema version") }).Stop() } else { diff --git a/config.yml.example b/config.yml.example index a887359c..d8940a86 100644 --- a/config.yml.example +++ b/config.yml.example @@ -19,14 +19,20 @@ logging: # If not set, logs to systemd-journald when running under systemd, otherwise stderr. output: + # Interval for periodic logging defined as duration string. + # A duration string is a sequence of decimal numbers and a unit suffix, such as "20s". + # Valid units are "ms", "s", "m", "h". + # Defaults to "20s". + interval: + # Map of component-logging level pairs to define a different log level than the default value for each component. options: - database: - redis: - heartbeat: - high-availability: - config-sync: - history-sync: - runtime-updates: - overdue-sync: - dump-signals: +# database: +# redis: +# heartbeat: +# high-availability: +# config-sync: +# history-sync: +# runtime-updates: +# overdue-sync: +# dump-signals: diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 8248a407..746b2622 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -43,6 +43,7 @@ Option | Description -------------------------|----------------------------------------------- level | **Optional.** Specifies the default logging level. Can be set to `fatal`, `error`, `warning`, `info` or `debug`. Defaults to `info`. output | **Optional.** Configures the logging output. Can be set to `console` (stderr) or `systemd-journald`. If not set, logs to systemd-journald when running under systemd, otherwise stderr. +interval | **Optional.** Interval for periodic logging defined as [duration string](#duration-string). Defaults to `"20s"`. options | **Optional.** Map of component name to logging level in order to set a different logging level for each component instead of the default one. See [logging components](#logging-components) for details. ### Logging Components @@ -58,3 +59,8 @@ history-sync | Synchronization of history entries from Redis to MySQ runtime-updates | Runtime updates of config objects after the initial config synchronization. overdue-sync | Calculation and synchronization of the overdue status of checkables. dump-signals | Dump signals received from Icinga. + +### Duration String + +A duration string is a sequence of decimal numbers and a unit suffix, such as `"20s"`. +Valid units are `"ms"`, `"s"`, `"m"` and `"h"`. diff --git a/internal/command/command.go b/internal/command/command.go index f9c35816..765559b0 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -6,10 +6,10 @@ import ( "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" goflags "github.com/jessevdk/go-flags" "github.com/pkg/errors" - "go.uber.org/zap" "os" ) @@ -48,11 +48,11 @@ func New() *Command { } // Database creates and returns a new icingadb.DB connection from config.Config. -func (c Command) Database(l *zap.SugaredLogger) (*icingadb.DB, error) { +func (c Command) Database(l *logging.Logger) (*icingadb.DB, error) { return c.Config.Database.Open(l) } // Redis creates and returns a new icingaredis.Client connection from config.Config. -func (c Command) Redis(l *zap.SugaredLogger) (*icingaredis.Client, error) { +func (c Command) Redis(l *logging.Logger) (*icingaredis.Client, error) { return c.Config.Redis.NewClient(l) } diff --git a/pkg/com/counter.go b/pkg/com/counter.go index 3692c63d..52f9f7ff 100644 --- a/pkg/com/counter.go +++ b/pkg/com/counter.go @@ -1,13 +1,20 @@ package com -import "sync/atomic" +import ( + "sync" + "sync/atomic" +) // Counter implements an atomic counter. -type Counter uint64 +type Counter struct { + value uint64 + mu sync.Mutex // Protects total. + total uint64 +} // Add adds the given delta to the counter. func (c *Counter) Add(delta uint64) { - atomic.AddUint64(c.ptr(), delta) + atomic.AddUint64(&c.value, delta) } // Inc increments the counter by one. @@ -15,11 +22,27 @@ func (c *Counter) Inc() { c.Add(1) } -// Val returns the counter value. -func (c *Counter) Val() uint64 { - return atomic.LoadUint64(c.ptr()) +// Reset resets the counter to 0 and returns its previous value. +// Does not reset the total value returned from Total. +func (c *Counter) Reset() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + v := atomic.SwapUint64(&c.value, 0) + c.total += v + + return v } -func (c *Counter) ptr() *uint64 { - return (*uint64)(c) +// Total returns the total counter value. +func (c *Counter) Total() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.total + c.Val() +} + +// Val returns the current counter value. +func (c *Counter) Val() uint64 { + return atomic.LoadUint64(&c.value) } diff --git a/pkg/config/database.go b/pkg/config/database.go index afc2ce6e..f2981da1 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -5,11 +5,11 @@ import ( "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" "github.com/pkg/errors" - "go.uber.org/zap" "net" "sync" "time" @@ -30,7 +30,7 @@ type Database struct { // Open prepares the DSN string and driver configuration, // calls sqlx.Open, but returns *icingadb.DB. -func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) { +func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { registerDriverOnce.Do(func() { driver.Register(logger) }) diff --git a/pkg/config/logging.go b/pkg/config/logging.go index 5f4a7732..9ccd35e0 100644 --- a/pkg/config/logging.go +++ b/pkg/config/logging.go @@ -2,8 +2,10 @@ package config import ( "github.com/icinga/icingadb/pkg/logging" + "github.com/pkg/errors" "go.uber.org/zap/zapcore" "os" + "time" ) // Logging defines Logger configuration. @@ -11,6 +13,8 @@ type Logging struct { // zapcore.Level at 0 is for info level. Level zapcore.Level `yaml:"level" default:"0"` Output string `yaml:"output"` + // Interval for periodic logging. + Interval time.Duration `yaml:"interval" default:"20s"` logging.Options `yaml:"options"` } @@ -19,6 +23,10 @@ type Logging struct { // Also configures the log output if it is not configured: // systemd-journald is used when Icinga DB is running under systemd, otherwise stderr. func (l *Logging) Validate() error { + if l.Interval <= 0 { + return errors.New("periodic logging interval must be positive") + } + if l.Output == "" { if _, ok := os.LookupEnv("NOTIFY_SOCKET"); ok { // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, diff --git a/pkg/config/redis.go b/pkg/config/redis.go index c99ee2d1..259d87f8 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -6,6 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/pkg/errors" "go.uber.org/zap" @@ -27,7 +28,7 @@ type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn, // NewClient prepares Redis client configuration, // calls redis.NewClient, but returns *icingaredis.Client. -func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) { +func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) { tlsConfig, err := r.TLS.MakeConfig(r.Address) if err != nil { return nil, err @@ -59,7 +60,7 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error } // dialWithLogging returns a Redis Dialer with logging capabilities. -func dialWithLogging(dialer ctxDialerFunc, logger *zap.SugaredLogger) ctxDialerFunc { +func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc { // dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED. return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { err = retry.WithBackoff( diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 462ab294..f246861e 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -6,6 +6,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/pkg/errors" "go.uber.org/zap" @@ -58,7 +59,7 @@ func (c RetryConnector) Driver() driver.Driver { // Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector. type Driver struct { ctxDriver - Logger *zap.SugaredLogger + Logger *logging.Logger } // OpenConnector implements the DriverContext interface. @@ -75,7 +76,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) { } // Register makes our database Driver available under the name "icingadb-mysql". -func Register(logger *zap.SugaredLogger) { +func Register(logger *logging.Logger) { sql.Register("icingadb-mysql", &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 606e7ebf..bfbf9246 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -9,11 +9,12 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -29,7 +30,7 @@ type DB struct { Options *Options - logger *zap.SugaredLogger + logger *logging.Logger tableSemaphores map[string]*semaphore.Weighted tableSemaphoresMu sync.Mutex } @@ -75,7 +76,7 @@ func (o *Options) Validate() error { } // NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. -func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB { +func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { return &DB{ DB: db, logger: logger, @@ -208,17 +209,15 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { // derives and expands a query and executes it with this set of arguments until the arg stream has been processed. // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { - var cnt com.Counter +// Arguments for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) // Use context from group. bulk := com.Bulk(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { g, ctx := errgroup.WithContext(ctx) @@ -245,7 +244,17 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return internal.CantPerformQuery(err, query) } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) + + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } return nil }, @@ -274,15 +283,12 @@ func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { - var cnt com.Counter + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) bulk := com.BulkEntities(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { for { select { @@ -302,13 +308,12 @@ func (db *DB) NamedBulkExec( return retry.WithBackoff( ctx, func(ctx context.Context) error { - db.logger.Debugf("Executing %s with %d rows..", query, len(b)) _, err := db.NamedExecContext(ctx, query, b) if err != nil { return internal.CantPerformQuery(err, query) } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) if succeeded != nil { for _, row := range b { @@ -346,15 +351,12 @@ func (db *DB) NamedBulkExec( func (db *DB) NamedBulkExecTx( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, ) error { - var cnt com.Counter + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) bulk := com.BulkEntities(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { for { select { @@ -394,7 +396,7 @@ func (db *DB) NamedBulkExecTx( return errors.Wrap(err, "can't commit transaction") } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) return nil }, @@ -428,18 +430,13 @@ func (db *DB) BatchSizeByPlaceholders(n int) int { // scans each resulting row into an entity returned by the factory function, // and streams them into a returned channel. func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) { - var cnt com.Counter entities := make(chan contracts.Entity, 1) g, ctx := errgroup.WithContext(ctx) - db.logger.Infof("Syncing %s", query) - g.Go(func() error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() defer close(entities) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - v := factoryFunc() - db.logger.Infof("Fetched %d elements of %s in %s", cnt.Val(), utils.Name(v), elapsed) - }) rows, err := db.NamedQueryContext(ctx, query, scope) if err != nil { @@ -456,7 +453,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF select { case entities <- e: - cnt.Inc() + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -519,9 +516,10 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti // The delete statement is created using BuildDeleteStmt with the passed entityType. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { +// IDs for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error { sem := db.GetSemaphoreForTable(utils.TableName(entityType)) - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids) + return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded) } // Delete creates a channel from the specified ids and @@ -533,7 +531,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh) + return db.DeleteStreamed(ctx, entityType, idsCh, nil) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { @@ -549,6 +547,16 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } } +func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { + if count := counter.Reset(); count > 0 { + db.logger.Debugf("Executed %q with %d rows", query, count) + } + }, periodic.OnStop(func(tick periodic.Tick) { + db.logger.Debugf("Finished executing %q with %d rows in %s", query, counter.Total(), tick.Elapsed) + })) +} + // IsRetryable checks whether the given error is retryable. func IsRetryable(err error) bool { if errors.Is(err, driver.ErrBadConn) { diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index b5bdcf2c..4f6d0989 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -2,8 +2,10 @@ package icingadb import ( "context" + "fmt" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" "time" @@ -16,12 +18,12 @@ type Delta struct { Delete EntitiesById Subject *common.SyncSubject done chan error - logger *zap.SugaredLogger + logger *logging.Logger } // NewDelta creates a new Delta and starts calculating it. The caller must ensure // that no duplicate entities are sent to the same stream. -func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta { +func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta { delta := &Delta{ Subject: subject, done: make(chan error, 1), @@ -101,7 +103,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract delta.Update = update delta.Delete = actual - delta.logger.Debugw("Delta finished", + delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())), zap.String("subject", utils.Name(delta.Subject.Entity())), zap.Duration("time_total", time.Since(start)), zap.Duration("time_actual", endActual.Sub(start)), diff --git a/pkg/icingadb/delta_test.go b/pkg/icingadb/delta_test.go index 9f1cdf88..909cc229 100644 --- a/pkg/icingadb/delta_test.go +++ b/pkg/icingadb/delta_test.go @@ -6,6 +6,7 @@ import ( "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "strconv" "sync" "testing" + "time" ) func TestDelta(t *testing.T) { @@ -88,7 +90,7 @@ func TestDelta(t *testing.T) { chActual := make(chan contracts.Entity) chDesired := make(chan contracts.Entity) subject := common.NewSyncSubject(v1.NewEndpoint) - logger := zaptest.NewLogger(t).Sugar() + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) go func() { sendOrder.Send(id, test, chActual, chDesired) @@ -117,7 +119,7 @@ func TestDelta(t *testing.T) { chActual := make(chan contracts.Entity) chDesired := make(chan contracts.Entity) subject := common.NewSyncSubject(v1.NewEndpoint) - logger := zaptest.NewLogger(t).Sugar() + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) expectedCreate := make(map[uint64]uint64) expectedUpdate := make(map[uint64]uint64) @@ -256,7 +258,7 @@ func benchmarkDelta(b *testing.B, numEntities int) { } subject := common.NewSyncSubject(v1.NewEndpoint) // logger := zaptest.NewLogger(b).Sugar() - logger := zap.New(zapcore.NewTee()).Sugar() + logger := logging.NewLogger(zap.New(zapcore.NewTee()).Sugar(), time.Second) b.ResetTimer() for i := 0; i < b.N; i++ { d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger) diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go index cfc82c60..cb0bd3fd 100644 --- a/pkg/icingadb/dump_signals.go +++ b/pkg/icingadb/dump_signals.go @@ -4,6 +4,7 @@ import ( "context" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/pkg/errors" "go.uber.org/zap" "sync" @@ -13,7 +14,7 @@ import ( // Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals. type DumpSignals struct { redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger mutex sync.Mutex doneCh map[string]chan struct{} allDoneCh chan struct{} @@ -21,7 +22,7 @@ type DumpSignals struct { } // NewDumpSignals returns new DumpSignals. -func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals { +func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals { return &DumpSignals{ redis: redis, logger: logger, diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 30d31222..00a0bf68 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -11,6 +11,7 @@ import ( v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -30,7 +31,7 @@ type HA struct { environmentMu sync.Mutex environment *v1.Environment heartbeat *icingaredis.Heartbeat - logger *zap.SugaredLogger + logger *logging.Logger responsible bool handover chan struct{} takeover chan struct{} @@ -41,7 +42,7 @@ type HA struct { } // NewHA returns a new HA and starts the controller loop. -func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { +func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA { ctx, cancelCtx := context.WithCancel(ctx) instanceId := uuid.New() diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 4953fc86..1b02ea80 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,26 +10,26 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "reflect" "sync" - "time" ) // Sync specifies the source and destination of a history sync. type Sync struct { db *icingadb.DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -45,7 +45,7 @@ func (s Sync) Sync(ctx context.Context) error { key := key pipeline := pipeline - s.logger.Debugw("Starting history sync", zap.String("type", key)) + s.logger.Debugf("Starting %s history sync", key) // The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage, // where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this: @@ -134,16 +134,15 @@ func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis // deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last // pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry. func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error { - const logInterval = 20 * time.Second - - var count uint64 // Count of synced entries for periodic logging. - stream := "icinga:history:stream:" + key - - logTicker := time.NewTicker(logInterval) - defer logTicker.Stop() + var counter com.Counter + defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s history items", count, key) + } + }).Stop() bulks := com.BulkXMessages(ctx, input, s.redis.Options.HScanCount) - + stream := "icinga:history:stream:" + key for { select { case bulk := <-bulks: @@ -157,14 +156,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi return icingaredis.WrapCmdErr(cmd) } - count += uint64(len(ids)) - - case <-logTicker.C: - if count > 0 { - s.logger.Infof("Inserted %d %s history entries in the last %s", count, key, logInterval) - count = 0 - } - + counter.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index aaa588f9..0c0a7dbe 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -6,16 +6,17 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "strconv" - "sync/atomic" "time" ) @@ -23,11 +24,11 @@ import ( type Sync struct { db *icingadb.DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -57,23 +58,19 @@ func (s Sync) Sync(ctx context.Context) error { } g, ctx := errgroup.WithContext(ctx) - hostCounter := new(uint64) - serviceCounter := new(uint64) + + var hostCounter com.Counter + defer s.log(ctx, "host", &hostCounter).Stop() + + var serviceCounter com.Counter + defer s.log(ctx, "service", &serviceCounter).Stop() g.Go(func() error { - return s.sync(ctx, "host", overdue.NewHostState, hostCounter) + return s.sync(ctx, "host", overdue.NewHostState, &hostCounter) }) g.Go(func() error { - return s.log(ctx, "host", hostCounter) - }) - - g.Go(func() error { - return s.sync(ctx, "service", overdue.NewServiceState, serviceCounter) - }) - - g.Go(func() error { - return s.log(ctx, "service", serviceCounter) + return s.sync(ctx, "service", overdue.NewServiceState, &serviceCounter) }) return g.Wait() @@ -81,7 +78,7 @@ func (s Sync) Sync(ctx context.Context) error { // initSync initializes icingadb:overdue:objectType from the database. func (s Sync) initSync(ctx context.Context, objectType string) error { - s.logger.Infof("Refreshing already synced %s overdue indicators", objectType) + s.logger.Debugf("Refreshing already synced %s overdue indicators", objectType) start := time.Now() var rows []v1.IdMeta @@ -112,7 +109,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { }) if err == nil { - s.logger.Infof( + s.logger.Debugf( "Refreshing %d already synced %s overdue indicators took %s", len(rows), objectType, time.Since(start), ) @@ -124,21 +121,12 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { } // log periodically logs sync's workload. -func (s Sync) log(ctx context.Context, objectType string, counter *uint64) error { - const period = 20 * time.Second - periodically := time.NewTicker(period) - defer periodically.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-periodically.C: - if count := atomic.SwapUint64(counter, 0); count > 0 { - s.logger.Infof("Synced %d %s overdue indicators in the last %s", count, objectType, period) - } +func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s overdue indicators", count, objectType) } - } + }) } // luaGetOverdues takes the following KEYS: @@ -176,8 +164,8 @@ return res `) // sync synchronizes Redis overdue sets from s.redis to s.db for objectType. -func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *uint64) error { - s.logger.Infof("Syncing %s overdue indicators", objectType) +func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *com.Counter) error { + s.logger.Debugf("Syncing %s overdue indicators", objectType) keys := [3]string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, ""} if rand, err := uuid.NewRandom(); err == nil { @@ -231,7 +219,7 @@ func (s Sync) sync(ctx context.Context, objectType string, factory factory, coun // updateOverdue sets objectType_state#is_overdue for ids to overdue // and updates icingadb:overdue:objectType respectively. func (s Sync) updateOverdue( - ctx context.Context, objectType string, factory factory, counter *uint64, ids []interface{}, overdue bool, + ctx context.Context, objectType string, factory factory, counter *com.Counter, ids []interface{}, overdue bool, ) error { if len(ids) < 1 { return nil @@ -241,7 +229,7 @@ func (s Sync) updateOverdue( return errors.Wrap(err, "can't update overdue indicators") } - atomic.AddUint64(counter, uint64(len(ids))) + counter.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 89f90efe..a953b39e 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -9,10 +9,11 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -23,11 +24,11 @@ import ( type RuntimeUpdates struct { db *DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewRuntimeUpdates creates a new RuntimeUpdates. -func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *RuntimeUpdates { +func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates { return &RuntimeUpdates{ db: db, redis: redis, @@ -75,14 +76,63 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti r.logger.Debugf("Syncing runtime updates of %s", s.Name()) g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"))) + upserted := make(chan contracts.Entity) g.Go(func() error { + defer close(upserted) + stmt, placeholders := r.db.BuildUpsertStmt(s.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted) }) g.Go(func() error { - return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds) + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upserted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + deleted := make(chan interface{}) + g.Go(func() error { + defer close(deleted) + + return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Deleted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-deleted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) } @@ -103,18 +153,67 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities) com.ErrgroupReceive(g, errs) + + upsertedCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cv.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) + upsertedFlatCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedFlatCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cvFlat.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedFlatCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) g.Go(func() error { diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index f1d57d07..7f139673 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" "go.uber.org/zap" @@ -20,11 +21,11 @@ import ( type Sync struct { db *DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync returns a new Sync. -func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -39,9 +40,9 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du key := "icinga:" + utils.Key(typeName, ':') startTime := time.Now() - logTicker := time.NewTicker(20 * time.Second) - loggedWaiting := false + logTicker := time.NewTicker(s.logger.Interval()) defer logTicker.Stop() + loggedWaiting := false for { select { @@ -70,8 +71,6 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du // Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. // This function does not respect dump signals. For this, use SyncAfterDump. func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { - s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' ')) - g, ctx := errgroup.WithContext(ctx) desired, redisErrs := s.redis.YieldAll(ctx, subject) @@ -105,6 +104,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Create if len(delta.Create) > 0 { + s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) var entities <-chan contracts.Entity if delta.Subject.WithChecksum() { pairs, errs := s.redis.HMYield( @@ -131,7 +131,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Update if len(delta.Update) > 0 { - s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + s.logger.Infof("Updating %d items of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), @@ -155,7 +155,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Delete if len(delta.Delete) > 0 { - s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) }) @@ -166,9 +166,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // SyncCustomvars synchronizes customvar and customvar_flat. func (s Sync) SyncCustomvars(ctx context.Context) error { - s.logger.Info("Syncing customvar") - s.logger.Info("Syncing customvar_flat") - e, ok := v1.EnvironmentFromContext(ctx) if !ok { return errors.New("can't get environment from context") diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 5df2ac93..3a7922b9 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -6,9 +6,10 @@ import ( "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "runtime" @@ -22,7 +23,7 @@ type Client struct { Options *Options - logger *zap.SugaredLogger + logger *logging.Logger } // Options define user configurable Redis options. @@ -56,7 +57,7 @@ func (o *Options) Validate() error { } // NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client. -func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client { +func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client { return &Client{Client: client, logger: logger, Options: options} } @@ -70,18 +71,13 @@ type HPair struct { func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) { pairs := make(chan HPair, c.Options.HScanCount) - c.logger.Infof("Syncing %s", key) - return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() defer close(pairs) seen := make(map[string]struct{}) - var cnt uint64 - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - c.logger.Infof("Fetched %d elements of %s in %s", cnt, key, elapsed) - }) - var cursor uint64 var err error var page []string @@ -107,7 +103,7 @@ func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan e Field: page[i], Value: page[i+1], }: - cnt++ + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -127,6 +123,9 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c pairs := make(chan HPair) return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) defer func() { @@ -169,6 +168,7 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c Field: batch[i], Value: v.(string), }: + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -220,3 +220,16 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch return desired, com.WaitAsync(g) } + +func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) { + // We may never get to progress logging here, + // as fetching should be completed before the interval expires, + // but if it does, it is good to have this log message. + if count := counter.Reset(); count > 0 { + c.logger.Debugf("Fetched %d items from %s", count, key) + } + }, periodic.OnStop(func(tick periodic.Tick) { + c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed) + })) +} diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index fe7a966c..2fc08873 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -5,6 +5,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -28,11 +29,11 @@ type Heartbeat struct { done chan struct{} errMu sync.Mutex err error - logger *zap.SugaredLogger + logger *logging.Logger } // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. -func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat { +func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat { ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ @@ -81,8 +82,6 @@ func (h *Heartbeat) Err() error { func (h *Heartbeat) controller(ctx context.Context) { defer close(h.done) - h.logger.Info("Waiting for Icinga 2 heartbeat") - messages := make(chan *HeartbeatMessage) defer close(messages) @@ -130,17 +129,17 @@ func (h *Heartbeat) controller(ctx context.Context) { if err != nil { return err } - h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", envId.String())) + h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) h.active = true } h.sendEvent(m) case <-time.After(timeout): if h.active { - h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) h.sendEvent(nil) h.active = false } else { - h.logger.Warn("Waiting for Icinga 2 heartbeat") + h.logger.Warn("Waiting for Icinga heartbeat") } case <-ctx.Done(): return ctx.Err() diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go new file mode 100644 index 00000000..490445e1 --- /dev/null +++ b/pkg/logging/logger.go @@ -0,0 +1,26 @@ +package logging + +import ( + "go.uber.org/zap" + "time" +) + +// Logger wraps zap.SugaredLogger and +// allows to get the interval for periodic logging. +type Logger struct { + *zap.SugaredLogger + interval time.Duration +} + +// NewLogger returns a new Logger. +func NewLogger(base *zap.SugaredLogger, interval time.Duration) *Logger { + return &Logger{ + SugaredLogger: base, + interval: interval, + } +} + +// Interval returns the interval for periodic logging. +func (l *Logger) Interval() time.Duration { + return l.interval +} diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 527b063a..e3106956 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap/zapcore" "os" "sync" + "time" ) const ( @@ -36,15 +37,16 @@ type Options map[string]zapcore.Level // fall back on a default log level. // Logs either to the console or to systemd-journald. type Logging struct { - logger *zap.SugaredLogger + logger *Logger output string verbosity zap.AtomicLevel + interval time.Duration // coreFactory creates zapcore.Core based on the log level and the log output. coreFactory func(zap.AtomicLevel) zapcore.Core mu sync.Mutex - loggers map[string]*zap.SugaredLogger + loggers map[string]*Logger options Options } @@ -53,7 +55,7 @@ type Logging struct { // output where log messages are written to, // options having log levels for named child loggers // and returns a new Logging. -func NewLogging(name string, level zapcore.Level, output string, options Options) (*Logging, error) { +func NewLogging(name string, level zapcore.Level, output string, options Options, interval time.Duration) (*Logging, error) { verbosity := zap.NewAtomicLevelAt(level) var coreFactory func(zap.AtomicLevel) zapcore.Core @@ -72,14 +74,15 @@ func NewLogging(name string, level zapcore.Level, output string, options Options return nil, invalidOutput(output) } - logger := zap.New(coreFactory(verbosity)).Named(name).Sugar() + logger := NewLogger(zap.New(coreFactory(verbosity)).Named(name).Sugar(), interval) return &Logging{ logger: logger, output: output, verbosity: verbosity, + interval: interval, coreFactory: coreFactory, - loggers: map[string]*zap.SugaredLogger{}, + loggers: make(map[string]*Logger), options: options, }, nil @@ -88,7 +91,7 @@ func NewLogging(name string, level zapcore.Level, output string, options Options // GetChildLogger returns a named child logger. // Log levels for named child loggers are obtained from the logging options and, if not found, // set to the default log level. -func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger { +func (l *Logging) GetChildLogger(name string) *Logger { l.mu.Lock() defer l.mu.Unlock() @@ -103,14 +106,14 @@ func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger { verbosity = l.verbosity } - logger := zap.New(l.coreFactory(verbosity)).Named(name).Sugar() + logger := NewLogger(zap.New(l.coreFactory(verbosity)).Named(name).Sugar(), l.interval) l.loggers[name] = logger return logger } // GetLogger returns the default logger. -func (l *Logging) GetLogger() *zap.SugaredLogger { +func (l *Logging) GetLogger() *Logger { return l.logger } diff --git a/pkg/periodic/periodic.go b/pkg/periodic/periodic.go new file mode 100644 index 00000000..3f19ac86 --- /dev/null +++ b/pkg/periodic/periodic.go @@ -0,0 +1,99 @@ +package periodic + +import ( + "context" + "sync" + "time" +) + +// Option configures Start. +type Option interface { + apply(*periodic) +} + +// Stopper implements the Stop method, +// which stops a periodic task from Start(). +type Stopper interface { + Stop() // Stops a periodic task. +} + +// Tick is the value for periodic task callbacks that +// contains the time of the tick and +// the time elapsed since the start of the periodic task. +type Tick struct { + Elapsed time.Duration + Time time.Time +} + +// OnStop configures a callback that is executed when a periodic task is stopped or canceled. +func OnStop(f func(Tick)) Option { + return optionFunc(func(p *periodic) { + p.onStop = f + }) +} + +// Start starts a periodic task with a ticker at the specified interval, +// which executes the given callback after each tick. +// Call Stop() on the return value in order to stop the ticker and to release associated resources. +// The interval must be greater than zero. +func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper { + t := &periodic{ + interval: interval, + callback: callback, + } + + for _, option := range options { + option.apply(t) + } + + ctx, cancelCtx := context.WithCancel(ctx) + ticker := time.NewTicker(t.interval) + start := time.Now() + + go func() { + defer ticker.Stop() + + for { + select { + case tickTime := <-ticker.C: + t.callback(Tick{ + Elapsed: tickTime.Sub(start), + Time: tickTime, + }) + case <-ctx.Done(): + if t.onStop != nil { + now := time.Now() + t.onStop(Tick{ + Elapsed: now.Sub(start), + Time: now, + }) + } + + return + } + } + }() + + return stoperFunc(func() { + t.stop.Do(cancelCtx) + }) +} + +type optionFunc func(*periodic) + +func (f optionFunc) apply(p *periodic) { + f(p) +} + +type stoperFunc func() + +func (f stoperFunc) Stop() { + f() +} + +type periodic struct { + interval time.Duration + callback func(Tick) + stop sync.Once + onStop func(Tick) +} diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 29faf8a7..0ebae3c2 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -48,6 +48,10 @@ func WithBackoff( return } + if err = parentCtx.Err(); err != nil { + return + } + if settings.OnError != nil { settings.OnError(time.Since(start), attempt, err, prevErr) }