From dc7511cd2504c1da580bfe9b42363a6f0b6ea7c2 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 12 Oct 2021 12:02:46 +0200 Subject: [PATCH 01/26] Don't log if context is canceled --- cmd/icingadb/main.go | 11 +++++++++++ pkg/retry/retry.go | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index aac043bf..c8931d00 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -211,6 +211,11 @@ func run() int { g.Go(func() error { configInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + logger.Info("Starting config runtime updates sync") return rt.Sync(synctx, v1.ConfigFactories, runtimeConfigUpdateStreams) @@ -218,7 +223,13 @@ func run() int { g.Go(func() error { stateInitSync.Wait() + + if err := synctx.Err(); err != nil { + return err + } + logger.Info("Starting state runtime updates sync") + return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams) }) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 29faf8a7..0ebae3c2 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -48,6 +48,10 @@ func WithBackoff( return } + if err = parentCtx.Err(); err != nil { + return + } + if settings.OnError != nil { settings.OnError(time.Since(start), attempt, err, prevErr) } From 797dc1968c374e12707c930e77f644922638c189 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 12:44:49 +0200 Subject: [PATCH 02/26] Comment out component-logging level pairs Otherwise they would default to the info level. --- config.yml.example | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/config.yml.example b/config.yml.example index a887359c..451228b4 100644 --- a/config.yml.example +++ b/config.yml.example @@ -21,12 +21,12 @@ logging: # Map of component-logging level pairs to define a different log level than the default value for each component. options: - database: - redis: - heartbeat: - high-availability: - config-sync: - history-sync: - runtime-updates: - overdue-sync: - dump-signals: +# database: +# redis: +# heartbeat: +# high-availability: +# config-sync: +# history-sync: +# runtime-updates: +# overdue-sync: +# dump-signals: From 8a03745273aed27c6e764abfb9af7b31a1353278 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 12:48:15 +0200 Subject: [PATCH 03/26] Speak of Icinga heartbeat not Icinga 2 heartbeat --- pkg/icingaredis/heartbeat.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index fe7a966c..b2ebb42c 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -130,17 +130,17 @@ func (h *Heartbeat) controller(ctx context.Context) { if err != nil { return err } - h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", envId.String())) + h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) h.active = true } h.sendEvent(m) case <-time.After(timeout): if h.active { - h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout)) + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) h.sendEvent(nil) h.active = false } else { - h.logger.Warn("Waiting for Icinga 2 heartbeat") + h.logger.Warn("Waiting for Icinga heartbeat") } case <-ctx.Done(): return ctx.Err() From b067ed2147c907f01e8863cbfaee4b23071f5c70 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 12:49:27 +0200 Subject: [PATCH 04/26] Introduce Counter.Reset() --- pkg/com/counter.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/com/counter.go b/pkg/com/counter.go index 3692c63d..beea0e51 100644 --- a/pkg/com/counter.go +++ b/pkg/com/counter.go @@ -15,6 +15,11 @@ func (c *Counter) Inc() { c.Add(1) } +// Reset resets the counter to 0 and returns its previous value. +func (c *Counter) Reset() uint64 { + return atomic.SwapUint64(c.ptr(), 0) +} + // Val returns the counter value. func (c *Counter) Val() uint64 { return atomic.LoadUint64(c.ptr()) From 986e685ee0f734412808db75752826352cffde1a Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 13:05:42 +0200 Subject: [PATCH 05/26] Allow to configure interval for periodic logging --- cmd/icingadb/main.go | 3 ++- config.yml.example | 6 ++++++ doc/03-Configuration.md | 6 ++++++ internal/config.go | 19 +++++++++++++++++++ pkg/config/logging.go | 8 ++++++++ 5 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 internal/config.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index c8931d00..6c3dd8fa 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -3,6 +3,7 @@ package main import ( "context" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/internal/command" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/icingadb" @@ -45,7 +46,7 @@ func run() int { if err != nil { utils.Fatal(errors.Wrap(err, "can't configure logging")) } - + internal.SetLoggingInterval(cmd.Config.Logging.Interval) // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the // default setting for the Icinga DB service. So we notify that Icinga DB finished starting up. _ = sdnotify.Ready() diff --git a/config.yml.example b/config.yml.example index 451228b4..d8940a86 100644 --- a/config.yml.example +++ b/config.yml.example @@ -19,6 +19,12 @@ logging: # If not set, logs to systemd-journald when running under systemd, otherwise stderr. output: + # Interval for periodic logging defined as duration string. + # A duration string is a sequence of decimal numbers and a unit suffix, such as "20s". + # Valid units are "ms", "s", "m", "h". + # Defaults to "20s". + interval: + # Map of component-logging level pairs to define a different log level than the default value for each component. options: # database: diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 8248a407..746b2622 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -43,6 +43,7 @@ Option | Description -------------------------|----------------------------------------------- level | **Optional.** Specifies the default logging level. Can be set to `fatal`, `error`, `warning`, `info` or `debug`. Defaults to `info`. output | **Optional.** Configures the logging output. Can be set to `console` (stderr) or `systemd-journald`. If not set, logs to systemd-journald when running under systemd, otherwise stderr. +interval | **Optional.** Interval for periodic logging defined as [duration string](#duration-string). Defaults to `"20s"`. options | **Optional.** Map of component name to logging level in order to set a different logging level for each component instead of the default one. See [logging components](#logging-components) for details. ### Logging Components @@ -58,3 +59,8 @@ history-sync | Synchronization of history entries from Redis to MySQ runtime-updates | Runtime updates of config objects after the initial config synchronization. overdue-sync | Calculation and synchronization of the overdue status of checkables. dump-signals | Dump signals received from Icinga. + +### Duration String + +A duration string is a sequence of decimal numbers and a unit suffix, such as `"20s"`. +Valid units are `"ms"`, `"s"`, `"m"` and `"h"`. diff --git a/internal/config.go b/internal/config.go new file mode 100644 index 00000000..6476b1d2 --- /dev/null +++ b/internal/config.go @@ -0,0 +1,19 @@ +package internal + +import "time" + +// LoggingInterval returns the interval for periodic logging. +func LoggingInterval() time.Duration { + return c.LoggingInterval +} + +// SetLoggingInterval configures the interval for periodic logging. +func SetLoggingInterval(i time.Duration) { + c.LoggingInterval = i +} + +var c config + +type config struct { + LoggingInterval time.Duration +} diff --git a/pkg/config/logging.go b/pkg/config/logging.go index 5f4a7732..9ccd35e0 100644 --- a/pkg/config/logging.go +++ b/pkg/config/logging.go @@ -2,8 +2,10 @@ package config import ( "github.com/icinga/icingadb/pkg/logging" + "github.com/pkg/errors" "go.uber.org/zap/zapcore" "os" + "time" ) // Logging defines Logger configuration. @@ -11,6 +13,8 @@ type Logging struct { // zapcore.Level at 0 is for info level. Level zapcore.Level `yaml:"level" default:"0"` Output string `yaml:"output"` + // Interval for periodic logging. + Interval time.Duration `yaml:"interval" default:"20s"` logging.Options `yaml:"options"` } @@ -19,6 +23,10 @@ type Logging struct { // Also configures the log output if it is not configured: // systemd-journald is used when Icinga DB is running under systemd, otherwise stderr. func (l *Logging) Validate() error { + if l.Interval <= 0 { + return errors.New("periodic logging interval must be positive") + } + if l.Output == "" { if _, ok := os.LookupEnv("NOTIFY_SOCKET"); ok { // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, From c335a3c99c3024609376e85a93bcc80c7d7e6f62 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 18:01:54 +0200 Subject: [PATCH 06/26] Introduce package periodic --- pkg/periodic/periodic.go | 99 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 pkg/periodic/periodic.go diff --git a/pkg/periodic/periodic.go b/pkg/periodic/periodic.go new file mode 100644 index 00000000..0370f0c8 --- /dev/null +++ b/pkg/periodic/periodic.go @@ -0,0 +1,99 @@ +package periodic + +import ( + "context" + "sync" + "time" +) + +// Option configures Start. +type Option interface { + apply(*periodic) +} + +// Stoper implements the Stop method, +// which stops a periodic task from Start(). +type Stoper interface { + Stop() // Stops a periodic task. +} + +// Tick is the value for periodic task callbacks that +// contains the time of the tick and +// the time elapsed since the start of the periodic task. +type Tick struct { + Elapsed time.Duration + Time time.Time +} + +// OnStop configures a callback that is executed when a periodic task is stopped or canceled. +func OnStop(f func(Tick)) Option { + return optionFunc(func(p *periodic) { + p.onStop = f + }) +} + +// Start starts a periodic task with a ticker at the specified interval, +// which executes the given callback after each tick. +// Call Stop() on the return value in order to stop the ticker and to release associated resources. +// The interval must be greater than zero. +func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stoper { + t := &periodic{ + interval: interval, + callback: callback, + } + + for _, option := range options { + option.apply(t) + } + + ctx, cancelCtx := context.WithCancel(ctx) + ticker := time.NewTicker(t.interval) + start := time.Now() + + go func() { + defer ticker.Stop() + + for { + select { + case tickTime := <-ticker.C: + t.callback(Tick{ + Elapsed: tickTime.Sub(start), + Time: tickTime, + }) + case <-ctx.Done(): + if t.onStop != nil { + now := time.Now() + t.onStop(Tick{ + Elapsed: now.Sub(start), + Time: now, + }) + } + + return + } + } + }() + + return stoperFunc(func() { + t.stop.Do(cancelCtx) + }) +} + +type optionFunc func(*periodic) + +func (f optionFunc) apply(p *periodic) { + f(p) +} + +type stoperFunc func() + +func (f stoperFunc) Stop() { + f() +} + +type periodic struct { + interval time.Duration + callback func(Tick) + stop sync.Once + onStop func(Tick) +} From a6e02e7f3c167084d704224c5bb281c286427d3e Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 10:36:04 +0200 Subject: [PATCH 07/26] Introduce Counter.Total() --- pkg/com/counter.go | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/pkg/com/counter.go b/pkg/com/counter.go index beea0e51..52f9f7ff 100644 --- a/pkg/com/counter.go +++ b/pkg/com/counter.go @@ -1,13 +1,20 @@ package com -import "sync/atomic" +import ( + "sync" + "sync/atomic" +) // Counter implements an atomic counter. -type Counter uint64 +type Counter struct { + value uint64 + mu sync.Mutex // Protects total. + total uint64 +} // Add adds the given delta to the counter. func (c *Counter) Add(delta uint64) { - atomic.AddUint64(c.ptr(), delta) + atomic.AddUint64(&c.value, delta) } // Inc increments the counter by one. @@ -16,15 +23,26 @@ func (c *Counter) Inc() { } // Reset resets the counter to 0 and returns its previous value. +// Does not reset the total value returned from Total. func (c *Counter) Reset() uint64 { - return atomic.SwapUint64(c.ptr(), 0) + c.mu.Lock() + defer c.mu.Unlock() + + v := atomic.SwapUint64(&c.value, 0) + c.total += v + + return v } -// Val returns the counter value. +// Total returns the total counter value. +func (c *Counter) Total() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.total + c.Val() +} + +// Val returns the current counter value. func (c *Counter) Val() uint64 { - return atomic.LoadUint64(c.ptr()) -} - -func (c *Counter) ptr() *uint64 { - return (*uint64)(c) + return atomic.LoadUint64(&c.value) } From 5f1639aca22925eb0353673a0a9444811dfcfc15 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 13:35:26 +0200 Subject: [PATCH 08/26] Use pkg periodic for Redis logs --- pkg/icingaredis/client.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 5df2ac93..59f53431 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -3,9 +3,11 @@ package icingaredis import ( "context" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" "go.uber.org/zap" @@ -73,15 +75,12 @@ func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan e c.logger.Infof("Syncing %s", key) return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() defer close(pairs) seen := make(map[string]struct{}) - var cnt uint64 - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - c.logger.Infof("Fetched %d elements of %s in %s", cnt, key, elapsed) - }) - var cursor uint64 var err error var page []string @@ -107,7 +106,7 @@ func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan e Field: page[i], Value: page[i+1], }: - cnt++ + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -127,6 +126,9 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c pairs := make(chan HPair) return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) defer func() { @@ -169,6 +171,7 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c Field: batch[i], Value: v.(string), }: + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -220,3 +223,16 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch return desired, com.WaitAsync(g) } + +func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stoper { + return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) { + // We may never get to progress logging here, + // as fetching should be completed before the interval expires, + // but if it does, it is good to have this log message. + if count := counter.Reset(); count > 0 { + c.logger.Debugf("Fetched %d items from %s", count, key) + } + }, periodic.OnStop(func(tick periodic.Tick) { + c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed) + })) +} From addfabbde1dcd883d169fc9104fcaac3cb7f679e Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 13:36:51 +0200 Subject: [PATCH 09/26] Speak of items instead of rows --- pkg/icingadb/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index f1d57d07..0f9d14e2 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -131,7 +131,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Update if len(delta.Update) > 0 { - s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + s.logger.Infof("Updating %d items of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), @@ -155,7 +155,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Delete if len(delta.Delete) > 0 { - s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) }) From dbb64a0de38b93fef278bdc7a619374f30d43059 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 13:37:29 +0200 Subject: [PATCH 10/26] Log how many items to insert --- pkg/icingadb/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 0f9d14e2..3d1a6558 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -105,6 +105,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Create if len(delta.Create) > 0 { + s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) var entities <-chan contracts.Entity if delta.Subject.WithChecksum() { pairs, errs := s.redis.HMYield( From b10d038ba88f0d8e35e8b276293701a020da95fe Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 13:37:45 +0200 Subject: [PATCH 11/26] Use internal.LoggingInterval() --- pkg/icingadb/sync.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 3d1a6558..0229bdae 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -3,6 +3,7 @@ package icingadb import ( "context" "fmt" + "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" @@ -39,9 +40,9 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du key := "icinga:" + utils.Key(typeName, ':') startTime := time.Now() - logTicker := time.NewTicker(20 * time.Second) - loggedWaiting := false + logTicker := time.NewTicker(internal.LoggingInterval()) defer logTicker.Stop() + loggedWaiting := false for { select { From 12525c78726e8951e864b279e36398994484ed09 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 14:50:39 +0200 Subject: [PATCH 12/26] Use pkg periodic for overdue sync logs --- pkg/icingadb/overdue/sync.go | 48 ++++++++++++++---------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index aaa588f9..754d1738 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -6,16 +6,17 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" "strconv" - "sync/atomic" "time" ) @@ -57,23 +58,19 @@ func (s Sync) Sync(ctx context.Context) error { } g, ctx := errgroup.WithContext(ctx) - hostCounter := new(uint64) - serviceCounter := new(uint64) + + var hostCounter com.Counter + defer s.log(ctx, "host", &hostCounter).Stop() + + var serviceCounter com.Counter + defer s.log(ctx, "service", &serviceCounter).Stop() g.Go(func() error { - return s.sync(ctx, "host", overdue.NewHostState, hostCounter) + return s.sync(ctx, "host", overdue.NewHostState, &hostCounter) }) g.Go(func() error { - return s.log(ctx, "host", hostCounter) - }) - - g.Go(func() error { - return s.sync(ctx, "service", overdue.NewServiceState, serviceCounter) - }) - - g.Go(func() error { - return s.log(ctx, "service", serviceCounter) + return s.sync(ctx, "service", overdue.NewServiceState, &serviceCounter) }) return g.Wait() @@ -124,21 +121,12 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { } // log periodically logs sync's workload. -func (s Sync) log(ctx context.Context, objectType string, counter *uint64) error { - const period = 20 * time.Second - periodically := time.NewTicker(period) - defer periodically.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-periodically.C: - if count := atomic.SwapUint64(counter, 0); count > 0 { - s.logger.Infof("Synced %d %s overdue indicators in the last %s", count, objectType, period) - } +func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stoper { + return periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s overdue indicators", count, objectType) } - } + }) } // luaGetOverdues takes the following KEYS: @@ -176,7 +164,7 @@ return res `) // sync synchronizes Redis overdue sets from s.redis to s.db for objectType. -func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *uint64) error { +func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *com.Counter) error { s.logger.Infof("Syncing %s overdue indicators", objectType) keys := [3]string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, ""} @@ -231,7 +219,7 @@ func (s Sync) sync(ctx context.Context, objectType string, factory factory, coun // updateOverdue sets objectType_state#is_overdue for ids to overdue // and updates icingadb:overdue:objectType respectively. func (s Sync) updateOverdue( - ctx context.Context, objectType string, factory factory, counter *uint64, ids []interface{}, overdue bool, + ctx context.Context, objectType string, factory factory, counter *com.Counter, ids []interface{}, overdue bool, ) error { if len(ids) < 1 { return nil @@ -241,7 +229,7 @@ func (s Sync) updateOverdue( return errors.Wrap(err, "can't update overdue indicators") } - atomic.AddUint64(counter, uint64(len(ids))) + counter.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd if overdue { From 4b239d69bb64a3acf896b60a083c208e545569c9 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 14:52:55 +0200 Subject: [PATCH 13/26] Use debug instead of info for overdue refresh logs --- pkg/icingadb/overdue/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 754d1738..58cac790 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -109,7 +109,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { }) if err == nil { - s.logger.Infof( + s.logger.Debugf( "Refreshing %d already synced %s overdue indicators took %s", len(rows), objectType, time.Since(start), ) From d6a28d7672330db05983a02d0c6f0ef33b5551fd Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 14:54:30 +0200 Subject: [PATCH 14/26] Use pkg periodic for history sync logs --- pkg/icingadb/history/sync.go | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 4953fc86..fc0a7c2b 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,6 +10,7 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" @@ -18,7 +19,6 @@ import ( "golang.org/x/sync/errgroup" "reflect" "sync" - "time" ) // Sync specifies the source and destination of a history sync. @@ -134,16 +134,15 @@ func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis // deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last // pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry. func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error { - const logInterval = 20 * time.Second - - var count uint64 // Count of synced entries for periodic logging. - stream := "icinga:history:stream:" + key - - logTicker := time.NewTicker(logInterval) - defer logTicker.Stop() + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + s.logger.Infof("Synced %d %s history items", count, key) + } + }).Stop() bulks := com.BulkXMessages(ctx, input, s.redis.Options.HScanCount) - + stream := "icinga:history:stream:" + key for { select { case bulk := <-bulks: @@ -157,14 +156,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi return icingaredis.WrapCmdErr(cmd) } - count += uint64(len(ids)) - - case <-logTicker.C: - if count > 0 { - s.logger.Infof("Inserted %d %s history entries in the last %s", count, key, logInterval) - count = 0 - } - + counter.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } From 8e7564b2aa57afd237093ffdae08b08179469950 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 15:39:55 +0200 Subject: [PATCH 15/26] Log which delta finished --- pkg/icingadb/delta.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index b5bdcf2c..54e4903c 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -2,6 +2,7 @@ package icingadb import ( "context" + "fmt" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/utils" @@ -101,7 +102,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract delta.Update = update delta.Delete = actual - delta.logger.Debugw("Delta finished", + delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())), zap.String("subject", utils.Name(delta.Subject.Entity())), zap.Duration("time_total", time.Since(start)), zap.Duration("time_actual", endActual.Sub(start)), From bd23f17eda4c0d6155ba38b9193134968fb56351 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 15:42:05 +0200 Subject: [PATCH 16/26] Use pkg periodic for database logs --- pkg/icingadb/db.go | 56 +++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 606e7ebf..5f061f3d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" @@ -209,16 +210,13 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { - var cnt com.Counter + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) // Use context from group. bulk := com.Bulk(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { g, ctx := errgroup.WithContext(ctx) @@ -245,7 +243,7 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph return internal.CantPerformQuery(err, query) } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) return nil }, @@ -274,15 +272,12 @@ func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, succeeded chan<- contracts.Entity, ) error { - var cnt com.Counter + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) bulk := com.BulkEntities(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { for { select { @@ -302,13 +297,12 @@ func (db *DB) NamedBulkExec( return retry.WithBackoff( ctx, func(ctx context.Context) error { - db.logger.Debugf("Executing %s with %d rows..", query, len(b)) _, err := db.NamedExecContext(ctx, query, b) if err != nil { return internal.CantPerformQuery(err, query) } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) if succeeded != nil { for _, row := range b { @@ -346,15 +340,12 @@ func (db *DB) NamedBulkExec( func (db *DB) NamedBulkExecTx( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, ) error { - var cnt com.Counter + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() + g, ctx := errgroup.WithContext(ctx) bulk := com.BulkEntities(ctx, arg, count) - db.logger.Debugf("Executing %s", query) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed) - }) - g.Go(func() error { for { select { @@ -394,7 +385,7 @@ func (db *DB) NamedBulkExecTx( return errors.Wrap(err, "can't commit transaction") } - cnt.Add(uint64(len(b))) + counter.Add(uint64(len(b))) return nil }, @@ -428,18 +419,13 @@ func (db *DB) BatchSizeByPlaceholders(n int) int { // scans each resulting row into an entity returned by the factory function, // and streams them into a returned channel. func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) { - var cnt com.Counter entities := make(chan contracts.Entity, 1) g, ctx := errgroup.WithContext(ctx) - db.logger.Infof("Syncing %s", query) - g.Go(func() error { + var counter com.Counter + defer db.log(ctx, query, &counter).Stop() defer close(entities) - defer utils.Timed(time.Now(), func(elapsed time.Duration) { - v := factoryFunc() - db.logger.Infof("Fetched %d elements of %s in %s", cnt.Val(), utils.Name(v), elapsed) - }) rows, err := db.NamedQueryContext(ctx, query, scope) if err != nil { @@ -456,7 +442,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF select { case entities <- e: - cnt.Inc() + counter.Inc() case <-ctx.Done(): return ctx.Err() } @@ -549,6 +535,16 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } } +func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stoper { + return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) { + if count := counter.Reset(); count > 0 { + db.logger.Debugf("Executed %q with %d rows", query, count) + } + }, periodic.OnStop(func(tick periodic.Tick) { + db.logger.Debugf("Finished executing %q with %d rows in %s", query, counter.Total(), tick.Elapsed) + })) +} + // IsRetryable checks whether the given error is retryable. func IsRetryable(err error) bool { if errors.Is(err, driver.ErrBadConn) { From 313e3cc7350463bc3715d728e3eaa58db948f5de Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 26 Oct 2021 15:43:49 +0200 Subject: [PATCH 17/26] Log time taken for the config and initial state sync --- cmd/icingadb/main.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 6c3dd8fa..74d55277 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -173,8 +173,8 @@ func run() int { return ods.Sync(synctx) }) + syncStart := time.Now() logger.Info("Starting config sync") - for _, factory := range v1.ConfigFactories { factory := factory @@ -185,7 +185,7 @@ func run() int { return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump) }) } - + logger.Info("Starting initial state sync") for _, factory := range v1.StateFactories { factory := factory @@ -210,6 +210,34 @@ func run() int { return s.SyncCustomvars(synctx) }) + g.Go(func() error { + configInitSync.Wait() + + elapsed := time.Since(syncStart) + logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + logger.Infof("Finished config sync in %s", elapsed) + } else { + logger.Warnf("Aborted config sync after %s", elapsed) + } + + return nil + }) + + g.Go(func() error { + stateInitSync.Wait() + + elapsed := time.Since(syncStart) + logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + logger.Infof("Finished initial state sync in %s", elapsed) + } else { + logger.Warnf("Aborted initial state sync after %s", elapsed) + } + + return nil + }) + g.Go(func() error { configInitSync.Wait() From 8ce917d45aa6af923bb38d9ef5b9ad98f8ce2acb Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 10:59:26 +0200 Subject: [PATCH 18/26] Remove waiting for heartbeat message If a heartbeat is pending, we log it every 60 seconds anyway. --- pkg/icingaredis/heartbeat.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index b2ebb42c..6eedbc19 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -81,8 +81,6 @@ func (h *Heartbeat) Err() error { func (h *Heartbeat) controller(ctx context.Context) { defer close(h.done) - h.logger.Info("Waiting for Icinga 2 heartbeat") - messages := make(chan *HeartbeatMessage) defer close(messages) From 43bcd2bbee3363936a968543d74fcd7534e65456 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 11:01:00 +0200 Subject: [PATCH 19/26] Remove syncing $redisKey log message This info message just pollutes the logs and for debugging we log the execution anyway. --- pkg/icingaredis/client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 59f53431..6f6b9cec 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -72,8 +72,6 @@ type HPair struct { func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) { pairs := make(chan HPair, c.Options.HScanCount) - c.logger.Infof("Syncing %s", key) - return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { var counter com.Counter defer c.log(ctx, key, &counter).Stop() From 5fd4d35907a64fed9e4169e2c54b4d806562b009 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 11:04:00 +0200 Subject: [PATCH 20/26] Remove syncing $subject log message This info message just pollutes the logs and for debugging we log the execution anyway --- pkg/icingadb/sync.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 0229bdae..7c295978 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -71,8 +71,6 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du // Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. // This function does not respect dump signals. For this, use SyncAfterDump. func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { - s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' ')) - g, ctx := errgroup.WithContext(ctx) desired, redisErrs := s.redis.YieldAll(ctx, subject) @@ -168,9 +166,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // SyncCustomvars synchronizes customvar and customvar_flat. func (s Sync) SyncCustomvars(ctx context.Context) error { - s.logger.Info("Syncing customvar") - s.logger.Info("Syncing customvar_flat") - e, ok := v1.EnvironmentFromContext(ctx) if !ok { return errors.New("can't get environment from context") From 2d4b5419af2271431624dcbddfbbc0b54344305c Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 11:07:06 +0200 Subject: [PATCH 21/26] Log which history sync started --- pkg/icingadb/history/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index fc0a7c2b..5662f730 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -45,7 +45,7 @@ func (s Sync) Sync(ctx context.Context) error { key := key pipeline := pipeline - s.logger.Debugw("Starting history sync", zap.String("type", key)) + s.logger.Debugf("Starting %s history sync", key) // The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage, // where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this: From 62327739430b94dad4feabc5aeff0ea0ae3c45dd Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 27 Oct 2021 11:11:48 +0200 Subject: [PATCH 22/26] Use debug instead of info for some log messages These log messages are not relevant for the info level. --- cmd/icingadb/main.go | 2 +- pkg/icingadb/overdue/sync.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 74d55277..7b74927b 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -150,7 +150,7 @@ func run() int { dump := icingadb.NewDumpSignals(rc, logs.GetChildLogger("dump-signals")) g.Go(func() error { - logger.Info("Staring config dump signal handling") + logger.Debug("Staring config dump signal handling") return dump.Listen(synctx) }) diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 58cac790..35cae63d 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -78,7 +78,7 @@ func (s Sync) Sync(ctx context.Context) error { // initSync initializes icingadb:overdue:objectType from the database. func (s Sync) initSync(ctx context.Context, objectType string) error { - s.logger.Infof("Refreshing already synced %s overdue indicators", objectType) + s.logger.Debugf("Refreshing already synced %s overdue indicators", objectType) start := time.Now() var rows []v1.IdMeta @@ -165,7 +165,7 @@ return res // sync synchronizes Redis overdue sets from s.redis to s.db for objectType. func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *com.Counter) error { - s.logger.Infof("Syncing %s overdue indicators", objectType) + s.logger.Debugf("Syncing %s overdue indicators", objectType) keys := [3]string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, ""} if rand, err := uuid.NewRandom(); err == nil { From 8ec157e39b3e4435d6909a044a9aacd09c75e791 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 3 Nov 2021 09:06:54 +0100 Subject: [PATCH 23/26] Add periodic logging for runtime updates --- pkg/icingadb/db.go | 20 ++++-- pkg/icingadb/runtime_updates.go | 108 ++++++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 8 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 5f061f3d..5b46964d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -209,7 +209,8 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { // derives and expands a query and executes it with this set of arguments until the arg stream has been processed. // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { +// Arguments for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -245,6 +246,16 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph counter.Add(uint64(len(b))) + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } + return nil }, IsRetryable, @@ -505,9 +516,10 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti // The delete statement is created using BuildDeleteStmt with the passed entityType. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { +// IDs for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error { sem := db.GetSemaphoreForTable(utils.TableName(entityType)) - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids) + return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded) } // Delete creates a channel from the specified ids and @@ -519,7 +531,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh) + return db.DeleteStreamed(ctx, entityType, idsCh, nil) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 89f90efe..8dca1301 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -75,14 +77,63 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti r.logger.Debugf("Syncing runtime updates of %s", s.Name()) g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"))) + upserted := make(chan contracts.Entity) g.Go(func() error { + defer close(upserted) + stmt, placeholders := r.db.BuildUpsertStmt(s.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted) }) g.Go(func() error { - return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds) + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upserted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + deleted := make(chan interface{}) + g.Go(func() error { + defer close(deleted) + + return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Deleted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-deleted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) } @@ -103,18 +154,67 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities) com.ErrgroupReceive(g, errs) + + upsertedCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cv.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) + upsertedFlatCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedFlatCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cvFlat.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedFlatCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) g.Go(func() error { From ccda48234ec8bc31c2da2c601d47eb6e508bbb12 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 3 Nov 2021 11:12:26 +0100 Subject: [PATCH 24/26] Use custom logger for accessing the interval for periodic logging --- cmd/icingadb/main.go | 7 +++---- internal/command/command.go | 6 +++--- internal/config.go | 19 ------------------- pkg/config/database.go | 4 ++-- pkg/config/redis.go | 5 +++-- pkg/driver/driver.go | 5 +++-- pkg/icingadb/db.go | 8 ++++---- pkg/icingadb/delta.go | 5 +++-- pkg/icingadb/dump_signals.go | 5 +++-- pkg/icingadb/ha.go | 5 +++-- pkg/icingadb/history/sync.go | 8 ++++---- pkg/icingadb/overdue/sync.go | 8 ++++---- pkg/icingadb/runtime_updates.go | 15 +++++++-------- pkg/icingadb/sync.go | 8 ++++---- pkg/icingaredis/client.go | 9 ++++----- pkg/icingaredis/heartbeat.go | 5 +++-- pkg/logging/logger.go | 26 ++++++++++++++++++++++++++ pkg/logging/logging.go | 19 +++++++++++-------- 18 files changed, 90 insertions(+), 77 deletions(-) delete mode 100644 internal/config.go create mode 100644 pkg/logging/logger.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 7b74927b..f0e4420e 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -3,7 +3,6 @@ package main import ( "context" "github.com/go-redis/redis/v8" - "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/internal/command" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/icingadb" @@ -42,11 +41,11 @@ func run() int { cmd.Config.Logging.Level, cmd.Config.Logging.Output, cmd.Config.Logging.Options, + cmd.Config.Logging.Interval, ) if err != nil { utils.Fatal(errors.Wrap(err, "can't configure logging")) } - internal.SetLoggingInterval(cmd.Config.Logging.Interval) // When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the // default setting for the Icinga DB service. So we notify that Icinga DB finished starting up. _ = sdnotify.Ready() @@ -315,7 +314,7 @@ func checkDbSchema(ctx context.Context, db *icingadb.DB) error { } // monitorRedisSchema monitors rc's icinga:schema version validity. -func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) { +func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) { for { var err error pos, err = checkRedisSchema(logger, rc, pos) @@ -327,7 +326,7 @@ func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos s } // checkRedisSchema verifies rc's icinga:schema version. -func checkRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) (newPos string, err error) { +func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) { if pos == "0-0" { defer time.AfterFunc(3*time.Second, func() { logger.Info("Waiting for current Redis schema version") }).Stop() } else { diff --git a/internal/command/command.go b/internal/command/command.go index f9c35816..765559b0 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -6,10 +6,10 @@ import ( "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" goflags "github.com/jessevdk/go-flags" "github.com/pkg/errors" - "go.uber.org/zap" "os" ) @@ -48,11 +48,11 @@ func New() *Command { } // Database creates and returns a new icingadb.DB connection from config.Config. -func (c Command) Database(l *zap.SugaredLogger) (*icingadb.DB, error) { +func (c Command) Database(l *logging.Logger) (*icingadb.DB, error) { return c.Config.Database.Open(l) } // Redis creates and returns a new icingaredis.Client connection from config.Config. -func (c Command) Redis(l *zap.SugaredLogger) (*icingaredis.Client, error) { +func (c Command) Redis(l *logging.Logger) (*icingaredis.Client, error) { return c.Config.Redis.NewClient(l) } diff --git a/internal/config.go b/internal/config.go deleted file mode 100644 index 6476b1d2..00000000 --- a/internal/config.go +++ /dev/null @@ -1,19 +0,0 @@ -package internal - -import "time" - -// LoggingInterval returns the interval for periodic logging. -func LoggingInterval() time.Duration { - return c.LoggingInterval -} - -// SetLoggingInterval configures the interval for periodic logging. -func SetLoggingInterval(i time.Duration) { - c.LoggingInterval = i -} - -var c config - -type config struct { - LoggingInterval time.Duration -} diff --git a/pkg/config/database.go b/pkg/config/database.go index afc2ce6e..f2981da1 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -5,11 +5,11 @@ import ( "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/driver" "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" "github.com/pkg/errors" - "go.uber.org/zap" "net" "sync" "time" @@ -30,7 +30,7 @@ type Database struct { // Open prepares the DSN string and driver configuration, // calls sqlx.Open, but returns *icingadb.DB. -func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) { +func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { registerDriverOnce.Do(func() { driver.Register(logger) }) diff --git a/pkg/config/redis.go b/pkg/config/redis.go index c99ee2d1..259d87f8 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -6,6 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/pkg/errors" "go.uber.org/zap" @@ -27,7 +28,7 @@ type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn, // NewClient prepares Redis client configuration, // calls redis.NewClient, but returns *icingaredis.Client. -func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) { +func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) { tlsConfig, err := r.TLS.MakeConfig(r.Address) if err != nil { return nil, err @@ -59,7 +60,7 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error } // dialWithLogging returns a Redis Dialer with logging capabilities. -func dialWithLogging(dialer ctxDialerFunc, logger *zap.SugaredLogger) ctxDialerFunc { +func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc { // dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED. return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { err = retry.WithBackoff( diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 462ab294..f246861e 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -6,6 +6,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/pkg/errors" "go.uber.org/zap" @@ -58,7 +59,7 @@ func (c RetryConnector) Driver() driver.Driver { // Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector. type Driver struct { ctxDriver - Logger *zap.SugaredLogger + Logger *logging.Logger } // OpenConnector implements the DriverContext interface. @@ -75,7 +76,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) { } // Register makes our database Driver available under the name "icingadb-mysql". -func Register(logger *zap.SugaredLogger) { +func Register(logger *logging.Logger) { sql.Register("icingadb-mysql", &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 5b46964d..98c4d809 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -9,12 +9,12 @@ import ( "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -30,7 +30,7 @@ type DB struct { Options *Options - logger *zap.SugaredLogger + logger *logging.Logger tableSemaphores map[string]*semaphore.Weighted tableSemaphoresMu sync.Mutex } @@ -76,7 +76,7 @@ func (o *Options) Validate() error { } // NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. -func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB { +func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { return &DB{ DB: db, logger: logger, @@ -548,7 +548,7 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stoper { - return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) { + return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { if count := counter.Reset(); count > 0 { db.logger.Debugf("Executed %q with %d rows", query, count) } diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 54e4903c..4f6d0989 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" "time" @@ -17,12 +18,12 @@ type Delta struct { Delete EntitiesById Subject *common.SyncSubject done chan error - logger *zap.SugaredLogger + logger *logging.Logger } // NewDelta creates a new Delta and starts calculating it. The caller must ensure // that no duplicate entities are sent to the same stream. -func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta { +func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta { delta := &Delta{ Subject: subject, done: make(chan error, 1), diff --git a/pkg/icingadb/dump_signals.go b/pkg/icingadb/dump_signals.go index cfc82c60..cb0bd3fd 100644 --- a/pkg/icingadb/dump_signals.go +++ b/pkg/icingadb/dump_signals.go @@ -4,6 +4,7 @@ import ( "context" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/pkg/errors" "go.uber.org/zap" "sync" @@ -13,7 +14,7 @@ import ( // Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals. type DumpSignals struct { redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger mutex sync.Mutex doneCh map[string]chan struct{} allDoneCh chan struct{} @@ -21,7 +22,7 @@ type DumpSignals struct { } // NewDumpSignals returns new DumpSignals. -func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals { +func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals { return &DumpSignals{ redis: redis, logger: logger, diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 30d31222..00a0bf68 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -11,6 +11,7 @@ import ( v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -30,7 +31,7 @@ type HA struct { environmentMu sync.Mutex environment *v1.Environment heartbeat *icingaredis.Heartbeat - logger *zap.SugaredLogger + logger *logging.Logger responsible bool handover chan struct{} takeover chan struct{} @@ -41,7 +42,7 @@ type HA struct { } // NewHA returns a new HA and starts the controller loop. -func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA { +func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA { ctx, cancelCtx := context.WithCancel(ctx) instanceId := uuid.New() diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 5662f730..1b02ea80 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,12 +10,12 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "reflect" "sync" @@ -25,11 +25,11 @@ import ( type Sync struct { db *icingadb.DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -135,7 +135,7 @@ func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis // pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry. func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error { var counter com.Counter - defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { s.logger.Infof("Synced %d %s history items", count, key) } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 35cae63d..dcd15eb0 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -12,9 +12,9 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "strconv" "time" @@ -24,11 +24,11 @@ import ( type Sync struct { db *icingadb.DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -122,7 +122,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { // log periodically logs sync's workload. func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stoper { - return periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { s.logger.Infof("Synced %d %s overdue indicators", count, objectType) } diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 8dca1301..a953b39e 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -4,17 +4,16 @@ import ( "context" "fmt" "github.com/go-redis/redis/v8" - "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -25,11 +24,11 @@ import ( type RuntimeUpdates struct { db *DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewRuntimeUpdates creates a new RuntimeUpdates. -func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *RuntimeUpdates { +func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates { return &RuntimeUpdates{ db: db, redis: redis, @@ -88,7 +87,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti }) g.Go(func() error { var counter com.Counter - defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { r.logger.Infof("Upserted %d %s items", count, s.Name()) } @@ -116,7 +115,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti }) g.Go(func() error { var counter com.Counter - defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { r.logger.Infof("Deleted %d %s items", count, s.Name()) } @@ -166,7 +165,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti }) g.Go(func() error { var counter com.Counter - defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { r.logger.Infof("Upserted %d %s items", count, cv.Name()) } @@ -197,7 +196,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti }) g.Go(func() error { var counter com.Counter - defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { r.logger.Infof("Upserted %d %s items", count, cvFlat.Name()) } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 7c295978..7f139673 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -3,12 +3,12 @@ package icingadb import ( "context" "fmt" - "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" "go.uber.org/zap" @@ -21,11 +21,11 @@ import ( type Sync struct { db *DB redis *icingaredis.Client - logger *zap.SugaredLogger + logger *logging.Logger } // NewSync returns a new Sync. -func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { +func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { return &Sync{ db: db, redis: redis, @@ -40,7 +40,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du key := "icinga:" + utils.Key(typeName, ':') startTime := time.Now() - logTicker := time.NewTicker(internal.LoggingInterval()) + logTicker := time.NewTicker(s.logger.Interval()) defer logTicker.Stop() loggedWaiting := false diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 6f6b9cec..fdfb8ea1 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -3,14 +3,13 @@ package icingaredis import ( "context" "github.com/go-redis/redis/v8" - "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" - "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "runtime" @@ -24,7 +23,7 @@ type Client struct { Options *Options - logger *zap.SugaredLogger + logger *logging.Logger } // Options define user configurable Redis options. @@ -58,7 +57,7 @@ func (o *Options) Validate() error { } // NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client. -func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client { +func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client { return &Client{Client: client, logger: logger, Options: options} } @@ -223,7 +222,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch } func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stoper { - return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) { + return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) { // We may never get to progress logging here, // as fetching should be completed before the interval expires, // but if it does, it is good to have this log message. diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 6eedbc19..2fc08873 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -5,6 +5,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -28,11 +29,11 @@ type Heartbeat struct { done chan struct{} errMu sync.Mutex err error - logger *zap.SugaredLogger + logger *logging.Logger } // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. -func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat { +func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat { ctx, cancelCtx := context.WithCancel(ctx) heartbeat := &Heartbeat{ diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go new file mode 100644 index 00000000..490445e1 --- /dev/null +++ b/pkg/logging/logger.go @@ -0,0 +1,26 @@ +package logging + +import ( + "go.uber.org/zap" + "time" +) + +// Logger wraps zap.SugaredLogger and +// allows to get the interval for periodic logging. +type Logger struct { + *zap.SugaredLogger + interval time.Duration +} + +// NewLogger returns a new Logger. +func NewLogger(base *zap.SugaredLogger, interval time.Duration) *Logger { + return &Logger{ + SugaredLogger: base, + interval: interval, + } +} + +// Interval returns the interval for periodic logging. +func (l *Logger) Interval() time.Duration { + return l.interval +} diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 527b063a..e3106956 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap/zapcore" "os" "sync" + "time" ) const ( @@ -36,15 +37,16 @@ type Options map[string]zapcore.Level // fall back on a default log level. // Logs either to the console or to systemd-journald. type Logging struct { - logger *zap.SugaredLogger + logger *Logger output string verbosity zap.AtomicLevel + interval time.Duration // coreFactory creates zapcore.Core based on the log level and the log output. coreFactory func(zap.AtomicLevel) zapcore.Core mu sync.Mutex - loggers map[string]*zap.SugaredLogger + loggers map[string]*Logger options Options } @@ -53,7 +55,7 @@ type Logging struct { // output where log messages are written to, // options having log levels for named child loggers // and returns a new Logging. -func NewLogging(name string, level zapcore.Level, output string, options Options) (*Logging, error) { +func NewLogging(name string, level zapcore.Level, output string, options Options, interval time.Duration) (*Logging, error) { verbosity := zap.NewAtomicLevelAt(level) var coreFactory func(zap.AtomicLevel) zapcore.Core @@ -72,14 +74,15 @@ func NewLogging(name string, level zapcore.Level, output string, options Options return nil, invalidOutput(output) } - logger := zap.New(coreFactory(verbosity)).Named(name).Sugar() + logger := NewLogger(zap.New(coreFactory(verbosity)).Named(name).Sugar(), interval) return &Logging{ logger: logger, output: output, verbosity: verbosity, + interval: interval, coreFactory: coreFactory, - loggers: map[string]*zap.SugaredLogger{}, + loggers: make(map[string]*Logger), options: options, }, nil @@ -88,7 +91,7 @@ func NewLogging(name string, level zapcore.Level, output string, options Options // GetChildLogger returns a named child logger. // Log levels for named child loggers are obtained from the logging options and, if not found, // set to the default log level. -func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger { +func (l *Logging) GetChildLogger(name string) *Logger { l.mu.Lock() defer l.mu.Unlock() @@ -103,14 +106,14 @@ func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger { verbosity = l.verbosity } - logger := zap.New(l.coreFactory(verbosity)).Named(name).Sugar() + logger := NewLogger(zap.New(l.coreFactory(verbosity)).Named(name).Sugar(), l.interval) l.loggers[name] = logger return logger } // GetLogger returns the default logger. -func (l *Logging) GetLogger() *zap.SugaredLogger { +func (l *Logging) GetLogger() *Logger { return l.logger } From 886c60c95a48402d6f7042c8eb1daadce8c5775f Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 4 Nov 2021 13:19:32 +0100 Subject: [PATCH 25/26] Use custom logger with 1 second interval for delta tests --- pkg/icingadb/delta_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/icingadb/delta_test.go b/pkg/icingadb/delta_test.go index 9f1cdf88..909cc229 100644 --- a/pkg/icingadb/delta_test.go +++ b/pkg/icingadb/delta_test.go @@ -6,6 +6,7 @@ import ( "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "strconv" "sync" "testing" + "time" ) func TestDelta(t *testing.T) { @@ -88,7 +90,7 @@ func TestDelta(t *testing.T) { chActual := make(chan contracts.Entity) chDesired := make(chan contracts.Entity) subject := common.NewSyncSubject(v1.NewEndpoint) - logger := zaptest.NewLogger(t).Sugar() + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) go func() { sendOrder.Send(id, test, chActual, chDesired) @@ -117,7 +119,7 @@ func TestDelta(t *testing.T) { chActual := make(chan contracts.Entity) chDesired := make(chan contracts.Entity) subject := common.NewSyncSubject(v1.NewEndpoint) - logger := zaptest.NewLogger(t).Sugar() + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second) expectedCreate := make(map[uint64]uint64) expectedUpdate := make(map[uint64]uint64) @@ -256,7 +258,7 @@ func benchmarkDelta(b *testing.B, numEntities int) { } subject := common.NewSyncSubject(v1.NewEndpoint) // logger := zaptest.NewLogger(b).Sugar() - logger := zap.New(zapcore.NewTee()).Sugar() + logger := logging.NewLogger(zap.New(zapcore.NewTee()).Sugar(), time.Second) b.ResetTimer() for i := 0; i < b.N; i++ { d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger) From ea74dc172a634f4fb5bb3638c9e65581f2bd2b57 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Fri, 5 Nov 2021 09:33:44 +0100 Subject: [PATCH 26/26] Rename periodic.Stoper to periodic.Stopper --- pkg/icingadb/db.go | 2 +- pkg/icingadb/overdue/sync.go | 2 +- pkg/icingaredis/client.go | 2 +- pkg/periodic/periodic.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 98c4d809..bfbf9246 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -547,7 +547,7 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } } -func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stoper { +func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { if count := counter.Reset(); count > 0 { db.logger.Debugf("Executed %q with %d rows", query, count) diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index dcd15eb0..0c0a7dbe 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -121,7 +121,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error { } // log periodically logs sync's workload. -func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stoper { +func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) { if count := counter.Reset(); count > 0 { s.logger.Infof("Synced %d %s overdue indicators", count, objectType) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index fdfb8ea1..3a7922b9 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -221,7 +221,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch return desired, com.WaitAsync(g) } -func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stoper { +func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) { // We may never get to progress logging here, // as fetching should be completed before the interval expires, diff --git a/pkg/periodic/periodic.go b/pkg/periodic/periodic.go index 0370f0c8..3f19ac86 100644 --- a/pkg/periodic/periodic.go +++ b/pkg/periodic/periodic.go @@ -11,9 +11,9 @@ type Option interface { apply(*periodic) } -// Stoper implements the Stop method, +// Stopper implements the Stop method, // which stops a periodic task from Start(). -type Stoper interface { +type Stopper interface { Stop() // Stops a periodic task. } @@ -36,7 +36,7 @@ func OnStop(f func(Tick)) Option { // which executes the given callback after each tick. // Call Stop() on the return value in order to stop the ticker and to release associated resources. // The interval must be greater than zero. -func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stoper { +func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper { t := &periodic{ interval: interval, callback: callback,