Merge pull request #396 from Icinga/logging

Logging
This commit is contained in:
Julian Brost 2021-11-08 18:14:27 +01:00 committed by GitHub
commit bfa4a27204
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 498 additions and 180 deletions

View file

@ -41,11 +41,11 @@ func run() int {
cmd.Config.Logging.Level,
cmd.Config.Logging.Output,
cmd.Config.Logging.Options,
cmd.Config.Logging.Interval,
)
if err != nil {
utils.Fatal(errors.Wrap(err, "can't configure logging"))
}
// When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the
// default setting for the Icinga DB service. So we notify that Icinga DB finished starting up.
_ = sdnotify.Ready()
@ -149,7 +149,7 @@ func run() int {
dump := icingadb.NewDumpSignals(rc, logs.GetChildLogger("dump-signals"))
g.Go(func() error {
logger.Info("Staring config dump signal handling")
logger.Debug("Staring config dump signal handling")
return dump.Listen(synctx)
})
@ -172,8 +172,8 @@ func run() int {
return ods.Sync(synctx)
})
syncStart := time.Now()
logger.Info("Starting config sync")
for _, factory := range v1.ConfigFactories {
factory := factory
@ -184,7 +184,7 @@ func run() int {
return s.SyncAfterDump(synctx, common.NewSyncSubject(factory), dump)
})
}
logger.Info("Starting initial state sync")
for _, factory := range v1.StateFactories {
factory := factory
@ -211,6 +211,39 @@ func run() int {
g.Go(func() error {
configInitSync.Wait()
elapsed := time.Since(syncStart)
logger := logs.GetChildLogger("config-sync")
if synctx.Err() == nil {
logger.Infof("Finished config sync in %s", elapsed)
} else {
logger.Warnf("Aborted config sync after %s", elapsed)
}
return nil
})
g.Go(func() error {
stateInitSync.Wait()
elapsed := time.Since(syncStart)
logger := logs.GetChildLogger("config-sync")
if synctx.Err() == nil {
logger.Infof("Finished initial state sync in %s", elapsed)
} else {
logger.Warnf("Aborted initial state sync after %s", elapsed)
}
return nil
})
g.Go(func() error {
configInitSync.Wait()
if err := synctx.Err(); err != nil {
return err
}
logger.Info("Starting config runtime updates sync")
return rt.Sync(synctx, v1.ConfigFactories, runtimeConfigUpdateStreams)
@ -218,7 +251,13 @@ func run() int {
g.Go(func() error {
stateInitSync.Wait()
if err := synctx.Err(); err != nil {
return err
}
logger.Info("Starting state runtime updates sync")
return rt.Sync(synctx, v1.StateFactories, runtimeStateUpdateStreams)
})
@ -275,7 +314,7 @@ func checkDbSchema(ctx context.Context, db *icingadb.DB) error {
}
// monitorRedisSchema monitors rc's icinga:schema version validity.
func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) {
func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) {
for {
var err error
pos, err = checkRedisSchema(logger, rc, pos)
@ -287,7 +326,7 @@ func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos s
}
// checkRedisSchema verifies rc's icinga:schema version.
func checkRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) (newPos string, err error) {
func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) {
if pos == "0-0" {
defer time.AfterFunc(3*time.Second, func() { logger.Info("Waiting for current Redis schema version") }).Stop()
} else {

View file

@ -19,14 +19,20 @@ logging:
# If not set, logs to systemd-journald when running under systemd, otherwise stderr.
output:
# Interval for periodic logging defined as duration string.
# A duration string is a sequence of decimal numbers and a unit suffix, such as "20s".
# Valid units are "ms", "s", "m", "h".
# Defaults to "20s".
interval:
# Map of component-logging level pairs to define a different log level than the default value for each component.
options:
database:
redis:
heartbeat:
high-availability:
config-sync:
history-sync:
runtime-updates:
overdue-sync:
dump-signals:
# database:
# redis:
# heartbeat:
# high-availability:
# config-sync:
# history-sync:
# runtime-updates:
# overdue-sync:
# dump-signals:

View file

@ -43,6 +43,7 @@ Option | Description
-------------------------|-----------------------------------------------
level | **Optional.** Specifies the default logging level. Can be set to `fatal`, `error`, `warning`, `info` or `debug`. Defaults to `info`.
output | **Optional.** Configures the logging output. Can be set to `console` (stderr) or `systemd-journald`. If not set, logs to systemd-journald when running under systemd, otherwise stderr.
interval | **Optional.** Interval for periodic logging defined as [duration string](#duration-string). Defaults to `"20s"`.
options | **Optional.** Map of component name to logging level in order to set a different logging level for each component instead of the default one. See [logging components](#logging-components) for details.
### Logging Components <a id="logging-components"></a>
@ -58,3 +59,8 @@ history-sync | Synchronization of history entries from Redis to MySQ
runtime-updates | Runtime updates of config objects after the initial config synchronization.
overdue-sync | Calculation and synchronization of the overdue status of checkables.
dump-signals | Dump signals received from Icinga.
### Duration String <a id="duration-string"></a>
A duration string is a sequence of decimal numbers and a unit suffix, such as `"20s"`.
Valid units are `"ms"`, `"s"`, `"m"` and `"h"`.

View file

@ -6,10 +6,10 @@ import (
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
goflags "github.com/jessevdk/go-flags"
"github.com/pkg/errors"
"go.uber.org/zap"
"os"
)
@ -48,11 +48,11 @@ func New() *Command {
}
// Database creates and returns a new icingadb.DB connection from config.Config.
func (c Command) Database(l *zap.SugaredLogger) (*icingadb.DB, error) {
func (c Command) Database(l *logging.Logger) (*icingadb.DB, error) {
return c.Config.Database.Open(l)
}
// Redis creates and returns a new icingaredis.Client connection from config.Config.
func (c Command) Redis(l *zap.SugaredLogger) (*icingaredis.Client, error) {
func (c Command) Redis(l *logging.Logger) (*icingaredis.Client, error) {
return c.Config.Redis.NewClient(l)
}

View file

@ -1,13 +1,20 @@
package com
import "sync/atomic"
import (
"sync"
"sync/atomic"
)
// Counter implements an atomic counter.
type Counter uint64
type Counter struct {
value uint64
mu sync.Mutex // Protects total.
total uint64
}
// Add adds the given delta to the counter.
func (c *Counter) Add(delta uint64) {
atomic.AddUint64(c.ptr(), delta)
atomic.AddUint64(&c.value, delta)
}
// Inc increments the counter by one.
@ -15,11 +22,27 @@ func (c *Counter) Inc() {
c.Add(1)
}
// Val returns the counter value.
func (c *Counter) Val() uint64 {
return atomic.LoadUint64(c.ptr())
// Reset resets the counter to 0 and returns its previous value.
// Does not reset the total value returned from Total.
func (c *Counter) Reset() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
v := atomic.SwapUint64(&c.value, 0)
c.total += v
return v
}
func (c *Counter) ptr() *uint64 {
return (*uint64)(c)
// Total returns the total counter value.
func (c *Counter) Total() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.total + c.Val()
}
// Val returns the current counter value.
func (c *Counter) Val() uint64 {
return atomic.LoadUint64(&c.value)
}

View file

@ -5,11 +5,11 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
"github.com/pkg/errors"
"go.uber.org/zap"
"net"
"sync"
"time"
@ -30,7 +30,7 @@ type Database struct {
// Open prepares the DSN string and driver configuration,
// calls sqlx.Open, but returns *icingadb.DB.
func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) {
func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
registerDriverOnce.Do(func() {
driver.Register(logger)
})

View file

@ -2,8 +2,10 @@ package config
import (
"github.com/icinga/icingadb/pkg/logging"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
"os"
"time"
)
// Logging defines Logger configuration.
@ -11,6 +13,8 @@ type Logging struct {
// zapcore.Level at 0 is for info level.
Level zapcore.Level `yaml:"level" default:"0"`
Output string `yaml:"output"`
// Interval for periodic logging.
Interval time.Duration `yaml:"interval" default:"20s"`
logging.Options `yaml:"options"`
}
@ -19,6 +23,10 @@ type Logging struct {
// Also configures the log output if it is not configured:
// systemd-journald is used when Icinga DB is running under systemd, otherwise stderr.
func (l *Logging) Validate() error {
if l.Interval <= 0 {
return errors.New("periodic logging interval must be positive")
}
if l.Output == "" {
if _, ok := os.LookupEnv("NOTIFY_SOCKET"); ok {
// When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services,

View file

@ -6,6 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -27,7 +28,7 @@ type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn,
// NewClient prepares Redis client configuration,
// calls redis.NewClient, but returns *icingaredis.Client.
func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) {
func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) {
tlsConfig, err := r.TLS.MakeConfig(r.Address)
if err != nil {
return nil, err
@ -59,7 +60,7 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error
}
// dialWithLogging returns a Redis Dialer with logging capabilities.
func dialWithLogging(dialer ctxDialerFunc, logger *zap.SugaredLogger) ctxDialerFunc {
func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc {
// dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED.
return func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
err = retry.WithBackoff(

View file

@ -6,6 +6,7 @@ import (
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -58,7 +59,7 @@ func (c RetryConnector) Driver() driver.Driver {
// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector.
type Driver struct {
ctxDriver
Logger *zap.SugaredLogger
Logger *logging.Logger
}
// OpenConnector implements the DriverContext interface.
@ -75,7 +76,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
}
// Register makes our database Driver available under the name "icingadb-mysql".
func Register(logger *zap.SugaredLogger) {
func Register(logger *logging.Logger) {
sql.Register("icingadb-mysql", &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
_ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) }))
}

View file

@ -9,11 +9,12 @@ import (
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
@ -29,7 +30,7 @@ type DB struct {
Options *Options
logger *zap.SugaredLogger
logger *logging.Logger
tableSemaphores map[string]*semaphore.Weighted
tableSemaphoresMu sync.Mutex
}
@ -75,7 +76,7 @@ func (o *Options) Validate() error {
}
// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB {
func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
return &DB{
DB: db,
logger: logger,
@ -208,17 +209,15 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) {
// derives and expands a query and executes it with this set of arguments until the arg stream has been processed.
// The derived queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error {
var cnt com.Counter
// Arguments for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
// Use context from group.
bulk := com.Bulk(ctx, arg, count)
db.logger.Debugf("Executing %s", query)
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed)
})
g.Go(func() error {
g, ctx := errgroup.WithContext(ctx)
@ -245,7 +244,17 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
return internal.CantPerformQuery(err, query)
}
cnt.Add(uint64(len(b)))
counter.Add(uint64(len(b)))
if succeeded != nil {
for _, row := range b {
select {
case <-ctx.Done():
return ctx.Err()
case succeeded <- row:
}
}
}
return nil
},
@ -274,15 +283,12 @@ func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted,
arg <-chan contracts.Entity, succeeded chan<- contracts.Entity,
) error {
var cnt com.Counter
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count)
db.logger.Debugf("Executing %s", query)
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed)
})
g.Go(func() error {
for {
select {
@ -302,13 +308,12 @@ func (db *DB) NamedBulkExec(
return retry.WithBackoff(
ctx,
func(ctx context.Context) error {
db.logger.Debugf("Executing %s with %d rows..", query, len(b))
_, err := db.NamedExecContext(ctx, query, b)
if err != nil {
return internal.CantPerformQuery(err, query)
}
cnt.Add(uint64(len(b)))
counter.Add(uint64(len(b)))
if succeeded != nil {
for _, row := range b {
@ -346,15 +351,12 @@ func (db *DB) NamedBulkExec(
func (db *DB) NamedBulkExecTx(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
) error {
var cnt com.Counter
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
bulk := com.BulkEntities(ctx, arg, count)
db.logger.Debugf("Executing %s", query)
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
db.logger.Debugf("Executed %s with %d rows in %s", query, cnt.Val(), elapsed)
})
g.Go(func() error {
for {
select {
@ -394,7 +396,7 @@ func (db *DB) NamedBulkExecTx(
return errors.Wrap(err, "can't commit transaction")
}
cnt.Add(uint64(len(b)))
counter.Add(uint64(len(b)))
return nil
},
@ -428,18 +430,13 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
// scans each resulting row into an entity returned by the factory function,
// and streams them into a returned channel.
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) {
var cnt com.Counter
entities := make(chan contracts.Entity, 1)
g, ctx := errgroup.WithContext(ctx)
db.logger.Infof("Syncing %s", query)
g.Go(func() error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
defer close(entities)
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
v := factoryFunc()
db.logger.Infof("Fetched %d elements of %s in %s", cnt.Val(), utils.Name(v), elapsed)
})
rows, err := db.NamedQueryContext(ctx, query, scope)
if err != nil {
@ -456,7 +453,7 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
select {
case entities <- e:
cnt.Inc()
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
@ -519,9 +516,10 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
// IDs for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error {
sem := db.GetSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids)
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded)
}
// Delete creates a channel from the specified ids and
@ -533,7 +531,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int
}
close(idsCh)
return db.DeleteStreamed(ctx, entityType, idsCh)
return db.DeleteStreamed(ctx, entityType, idsCh, nil)
}
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
@ -549,6 +547,16 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
}
}
func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
if count := counter.Reset(); count > 0 {
db.logger.Debugf("Executed %q with %d rows", query, count)
}
}, periodic.OnStop(func(tick periodic.Tick) {
db.logger.Debugf("Finished executing %q with %d rows in %s", query, counter.Total(), tick.Elapsed)
}))
}
// IsRetryable checks whether the given error is retryable.
func IsRetryable(err error) bool {
if errors.Is(err, driver.ErrBadConn) {

View file

@ -2,8 +2,10 @@ package icingadb
import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"time"
@ -16,12 +18,12 @@ type Delta struct {
Delete EntitiesById
Subject *common.SyncSubject
done chan error
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewDelta creates a new Delta and starts calculating it. The caller must ensure
// that no duplicate entities are sent to the same stream.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta {
delta := &Delta{
Subject: subject,
done: make(chan error, 1),
@ -101,7 +103,7 @@ func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contract
delta.Update = update
delta.Delete = actual
delta.logger.Debugw("Delta finished",
delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())),
zap.String("subject", utils.Name(delta.Subject.Entity())),
zap.Duration("time_total", time.Since(start)),
zap.Duration("time_actual", endActual.Sub(start)),

View file

@ -6,6 +6,7 @@ import (
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -15,6 +16,7 @@ import (
"strconv"
"sync"
"testing"
"time"
)
func TestDelta(t *testing.T) {
@ -88,7 +90,7 @@ func TestDelta(t *testing.T) {
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()
logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second)
go func() {
sendOrder.Send(id, test, chActual, chDesired)
@ -117,7 +119,7 @@ func TestDelta(t *testing.T) {
chActual := make(chan contracts.Entity)
chDesired := make(chan contracts.Entity)
subject := common.NewSyncSubject(v1.NewEndpoint)
logger := zaptest.NewLogger(t).Sugar()
logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Second)
expectedCreate := make(map[uint64]uint64)
expectedUpdate := make(map[uint64]uint64)
@ -256,7 +258,7 @@ func benchmarkDelta(b *testing.B, numEntities int) {
}
subject := common.NewSyncSubject(v1.NewEndpoint)
// logger := zaptest.NewLogger(b).Sugar()
logger := zap.New(zapcore.NewTee()).Sugar()
logger := logging.NewLogger(zap.New(zapcore.NewTee()).Sugar(), time.Second)
b.ResetTimer()
for i := 0; i < b.N; i++ {
d := NewDelta(context.Background(), chActual[i], chDesired[i], subject, logger)

View file

@ -4,6 +4,7 @@ import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/pkg/errors"
"go.uber.org/zap"
"sync"
@ -13,7 +14,7 @@ import (
// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals.
type DumpSignals struct {
redis *icingaredis.Client
logger *zap.SugaredLogger
logger *logging.Logger
mutex sync.Mutex
doneCh map[string]chan struct{}
allDoneCh chan struct{}
@ -21,7 +22,7 @@ type DumpSignals struct {
}
// NewDumpSignals returns new DumpSignals.
func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals {
func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals {
return &DumpSignals{
redis: redis,
logger: logger,

View file

@ -11,6 +11,7 @@ import (
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -30,7 +31,7 @@ type HA struct {
environmentMu sync.Mutex
environment *v1.Environment
heartbeat *icingaredis.Heartbeat
logger *zap.SugaredLogger
logger *logging.Logger
responsible bool
handover chan struct{}
takeover chan struct{}
@ -41,7 +42,7 @@ type HA struct {
}
// NewHA returns a new HA and starts the controller loop.
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA {
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA {
ctx, cancelCtx := context.WithCancel(ctx)
instanceId := uuid.New()

View file

@ -10,26 +10,26 @@ import (
v1types "github.com/icinga/icingadb/pkg/icingadb/v1"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"reflect"
"sync"
"time"
)
// Sync specifies the source and destination of a history sync.
type Sync struct {
db *icingadb.DB
redis *icingaredis.Client
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewSync creates a new Sync.
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
return &Sync{
db: db,
redis: redis,
@ -45,7 +45,7 @@ func (s Sync) Sync(ctx context.Context) error {
key := key
pipeline := pipeline
s.logger.Debugw("Starting history sync", zap.String("type", key))
s.logger.Debugf("Starting %s history sync", key)
// The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage,
// where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this:
@ -134,16 +134,15 @@ func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis
// deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second to last
// pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry.
func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error {
const logInterval = 20 * time.Second
var count uint64 // Count of synced entries for periodic logging.
stream := "icinga:history:stream:" + key
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
var counter com.Counter
defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
s.logger.Infof("Synced %d %s history items", count, key)
}
}).Stop()
bulks := com.BulkXMessages(ctx, input, s.redis.Options.HScanCount)
stream := "icinga:history:stream:" + key
for {
select {
case bulk := <-bulks:
@ -157,14 +156,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
return icingaredis.WrapCmdErr(cmd)
}
count += uint64(len(ids))
case <-logTicker.C:
if count > 0 {
s.logger.Infof("Inserted %d %s history entries in the last %s", count, key, logInterval)
count = 0
}
counter.Add(uint64(len(ids)))
case <-ctx.Done():
return ctx.Err()
}

View file

@ -6,16 +6,17 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingadb/v1/overdue"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"strconv"
"sync/atomic"
"time"
)
@ -23,11 +24,11 @@ import (
type Sync struct {
db *icingadb.DB
redis *icingaredis.Client
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewSync creates a new Sync.
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
return &Sync{
db: db,
redis: redis,
@ -57,23 +58,19 @@ func (s Sync) Sync(ctx context.Context) error {
}
g, ctx := errgroup.WithContext(ctx)
hostCounter := new(uint64)
serviceCounter := new(uint64)
var hostCounter com.Counter
defer s.log(ctx, "host", &hostCounter).Stop()
var serviceCounter com.Counter
defer s.log(ctx, "service", &serviceCounter).Stop()
g.Go(func() error {
return s.sync(ctx, "host", overdue.NewHostState, hostCounter)
return s.sync(ctx, "host", overdue.NewHostState, &hostCounter)
})
g.Go(func() error {
return s.log(ctx, "host", hostCounter)
})
g.Go(func() error {
return s.sync(ctx, "service", overdue.NewServiceState, serviceCounter)
})
g.Go(func() error {
return s.log(ctx, "service", serviceCounter)
return s.sync(ctx, "service", overdue.NewServiceState, &serviceCounter)
})
return g.Wait()
@ -81,7 +78,7 @@ func (s Sync) Sync(ctx context.Context) error {
// initSync initializes icingadb:overdue:objectType from the database.
func (s Sync) initSync(ctx context.Context, objectType string) error {
s.logger.Infof("Refreshing already synced %s overdue indicators", objectType)
s.logger.Debugf("Refreshing already synced %s overdue indicators", objectType)
start := time.Now()
var rows []v1.IdMeta
@ -112,7 +109,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error {
})
if err == nil {
s.logger.Infof(
s.logger.Debugf(
"Refreshing %d already synced %s overdue indicators took %s",
len(rows), objectType, time.Since(start),
)
@ -124,21 +121,12 @@ func (s Sync) initSync(ctx context.Context, objectType string) error {
}
// log periodically logs sync's workload.
func (s Sync) log(ctx context.Context, objectType string, counter *uint64) error {
const period = 20 * time.Second
periodically := time.NewTicker(period)
defer periodically.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-periodically.C:
if count := atomic.SwapUint64(counter, 0); count > 0 {
s.logger.Infof("Synced %d %s overdue indicators in the last %s", count, objectType, period)
}
func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stopper {
return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
s.logger.Infof("Synced %d %s overdue indicators", count, objectType)
}
}
})
}
// luaGetOverdues takes the following KEYS:
@ -176,8 +164,8 @@ return res
`)
// sync synchronizes Redis overdue sets from s.redis to s.db for objectType.
func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *uint64) error {
s.logger.Infof("Syncing %s overdue indicators", objectType)
func (s Sync) sync(ctx context.Context, objectType string, factory factory, counter *com.Counter) error {
s.logger.Debugf("Syncing %s overdue indicators", objectType)
keys := [3]string{"icinga:nextupdate:" + objectType, "icingadb:overdue:" + objectType, ""}
if rand, err := uuid.NewRandom(); err == nil {
@ -231,7 +219,7 @@ func (s Sync) sync(ctx context.Context, objectType string, factory factory, coun
// updateOverdue sets objectType_state#is_overdue for ids to overdue
// and updates icingadb:overdue:objectType respectively.
func (s Sync) updateOverdue(
ctx context.Context, objectType string, factory factory, counter *uint64, ids []interface{}, overdue bool,
ctx context.Context, objectType string, factory factory, counter *com.Counter, ids []interface{}, overdue bool,
) error {
if len(ids) < 1 {
return nil
@ -241,7 +229,7 @@ func (s Sync) updateOverdue(
return errors.Wrap(err, "can't update overdue indicators")
}
atomic.AddUint64(counter, uint64(len(ids)))
counter.Add(uint64(len(ids)))
var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
if overdue {

View file

@ -9,10 +9,11 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
@ -23,11 +24,11 @@ import (
type RuntimeUpdates struct {
db *DB
redis *icingaredis.Client
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewRuntimeUpdates creates a new RuntimeUpdates.
func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *RuntimeUpdates {
func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates {
return &RuntimeUpdates{
db: db,
redis: redis,
@ -75,14 +76,63 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
r.logger.Debugf("Syncing runtime updates of %s", s.Name())
g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json")))
upserted := make(chan contracts.Entity)
g.Go(func() error {
defer close(upserted)
stmt, placeholders := r.db.BuildUpsertStmt(s.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted)
})
g.Go(func() error {
return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds)
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, s.Name())
}
}).Stop()
for {
select {
case _, ok := <-upserted:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
deleted := make(chan interface{})
g.Go(func() error {
defer close(deleted)
return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Deleted %d %s items", count, s.Name())
}
}).Stop()
for {
select {
case _, ok := <-deleted:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
}
@ -103,18 +153,67 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities)
com.ErrgroupReceive(g, errs)
upsertedCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, cv.Name())
}
}).Stop()
for {
select {
case _, ok := <-upsertedCustomvars:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
upsertedFlatCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedFlatCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, cvFlat.Name())
}
}).Stop()
for {
select {
case _, ok := <-upsertedFlatCustomvars:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {

View file

@ -8,6 +8,7 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -20,11 +21,11 @@ import (
type Sync struct {
db *DB
redis *icingaredis.Client
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewSync returns a new Sync.
func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
return &Sync{
db: db,
redis: redis,
@ -39,9 +40,9 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du
key := "icinga:" + utils.Key(typeName, ':')
startTime := time.Now()
logTicker := time.NewTicker(20 * time.Second)
loggedWaiting := false
logTicker := time.NewTicker(s.logger.Interval())
defer logTicker.Stop()
loggedWaiting := false
for {
select {
@ -70,8 +71,6 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du
// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject.
// This function does not respect dump signals. For this, use SyncAfterDump.
func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' '))
g, ctx := errgroup.WithContext(ctx)
desired, redisErrs := s.redis.YieldAll(ctx, subject)
@ -105,6 +104,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Create
if len(delta.Create) > 0 {
s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
var entities <-chan contracts.Entity
if delta.Subject.WithChecksum() {
pairs, errs := s.redis.HMYield(
@ -131,7 +131,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Update
if len(delta.Update) > 0 {
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
s.logger.Infof("Updating %d items of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')),
@ -155,7 +155,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs())
})
@ -166,9 +166,6 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// SyncCustomvars synchronizes customvar and customvar_flat.
func (s Sync) SyncCustomvars(ctx context.Context) error {
s.logger.Info("Syncing customvar")
s.logger.Info("Syncing customvar_flat")
e, ok := v1.EnvironmentFromContext(ctx)
if !ok {
return errors.New("can't get environment from context")

View file

@ -6,9 +6,10 @@ import (
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"runtime"
@ -22,7 +23,7 @@ type Client struct {
Options *Options
logger *zap.SugaredLogger
logger *logging.Logger
}
// Options define user configurable Redis options.
@ -56,7 +57,7 @@ func (o *Options) Validate() error {
}
// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client.
func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client {
func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client {
return &Client{Client: client, logger: logger, Options: options}
}
@ -70,18 +71,13 @@ type HPair struct {
func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) {
pairs := make(chan HPair, c.Options.HScanCount)
c.logger.Infof("Syncing %s", key)
return pairs, com.WaitAsync(contracts.WaiterFunc(func() error {
var counter com.Counter
defer c.log(ctx, key, &counter).Stop()
defer close(pairs)
seen := make(map[string]struct{})
var cnt uint64
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
c.logger.Infof("Fetched %d elements of %s in %s", cnt, key, elapsed)
})
var cursor uint64
var err error
var page []string
@ -107,7 +103,7 @@ func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan e
Field: page[i],
Value: page[i+1],
}:
cnt++
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
@ -127,6 +123,9 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c
pairs := make(chan HPair)
return pairs, com.WaitAsync(contracts.WaiterFunc(func() error {
var counter com.Counter
defer c.log(ctx, key, &counter).Stop()
g, ctx := errgroup.WithContext(ctx)
defer func() {
@ -169,6 +168,7 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c
Field: batch[i],
Value: v.(string),
}:
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
@ -220,3 +220,16 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch
return desired, com.WaitAsync(g)
}
func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper {
return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) {
// We may never get to progress logging here,
// as fetching should be completed before the interval expires,
// but if it does, it is good to have this log message.
if count := counter.Reset(); count > 0 {
c.logger.Debugf("Fetched %d items from %s", count, key)
}
}, periodic.OnStop(func(tick periodic.Tick) {
c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed)
}))
}

View file

@ -5,6 +5,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -28,11 +29,11 @@ type Heartbeat struct {
done chan struct{}
errMu sync.Mutex
err error
logger *zap.SugaredLogger
logger *logging.Logger
}
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat {
func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat {
ctx, cancelCtx := context.WithCancel(ctx)
heartbeat := &Heartbeat{
@ -81,8 +82,6 @@ func (h *Heartbeat) Err() error {
func (h *Heartbeat) controller(ctx context.Context) {
defer close(h.done)
h.logger.Info("Waiting for Icinga 2 heartbeat")
messages := make(chan *HeartbeatMessage)
defer close(messages)
@ -130,17 +129,17 @@ func (h *Heartbeat) controller(ctx context.Context) {
if err != nil {
return err
}
h.logger.Infow("Received first Icinga 2 heartbeat", zap.String("environment", envId.String()))
h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
h.active = true
}
h.sendEvent(m)
case <-time.After(timeout):
if h.active {
h.logger.Warnw("Lost Icinga 2 heartbeat", zap.Duration("timeout", timeout))
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
h.sendEvent(nil)
h.active = false
} else {
h.logger.Warn("Waiting for Icinga 2 heartbeat")
h.logger.Warn("Waiting for Icinga heartbeat")
}
case <-ctx.Done():
return ctx.Err()

26
pkg/logging/logger.go Normal file
View file

@ -0,0 +1,26 @@
package logging
import (
"go.uber.org/zap"
"time"
)
// Logger wraps zap.SugaredLogger and
// allows to get the interval for periodic logging.
type Logger struct {
*zap.SugaredLogger
interval time.Duration
}
// NewLogger returns a new Logger.
func NewLogger(base *zap.SugaredLogger, interval time.Duration) *Logger {
return &Logger{
SugaredLogger: base,
interval: interval,
}
}
// Interval returns the interval for periodic logging.
func (l *Logger) Interval() time.Duration {
return l.interval
}

View file

@ -6,6 +6,7 @@ import (
"go.uber.org/zap/zapcore"
"os"
"sync"
"time"
)
const (
@ -36,15 +37,16 @@ type Options map[string]zapcore.Level
// fall back on a default log level.
// Logs either to the console or to systemd-journald.
type Logging struct {
logger *zap.SugaredLogger
logger *Logger
output string
verbosity zap.AtomicLevel
interval time.Duration
// coreFactory creates zapcore.Core based on the log level and the log output.
coreFactory func(zap.AtomicLevel) zapcore.Core
mu sync.Mutex
loggers map[string]*zap.SugaredLogger
loggers map[string]*Logger
options Options
}
@ -53,7 +55,7 @@ type Logging struct {
// output where log messages are written to,
// options having log levels for named child loggers
// and returns a new Logging.
func NewLogging(name string, level zapcore.Level, output string, options Options) (*Logging, error) {
func NewLogging(name string, level zapcore.Level, output string, options Options, interval time.Duration) (*Logging, error) {
verbosity := zap.NewAtomicLevelAt(level)
var coreFactory func(zap.AtomicLevel) zapcore.Core
@ -72,14 +74,15 @@ func NewLogging(name string, level zapcore.Level, output string, options Options
return nil, invalidOutput(output)
}
logger := zap.New(coreFactory(verbosity)).Named(name).Sugar()
logger := NewLogger(zap.New(coreFactory(verbosity)).Named(name).Sugar(), interval)
return &Logging{
logger: logger,
output: output,
verbosity: verbosity,
interval: interval,
coreFactory: coreFactory,
loggers: map[string]*zap.SugaredLogger{},
loggers: make(map[string]*Logger),
options: options,
},
nil
@ -88,7 +91,7 @@ func NewLogging(name string, level zapcore.Level, output string, options Options
// GetChildLogger returns a named child logger.
// Log levels for named child loggers are obtained from the logging options and, if not found,
// set to the default log level.
func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger {
func (l *Logging) GetChildLogger(name string) *Logger {
l.mu.Lock()
defer l.mu.Unlock()
@ -103,14 +106,14 @@ func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger {
verbosity = l.verbosity
}
logger := zap.New(l.coreFactory(verbosity)).Named(name).Sugar()
logger := NewLogger(zap.New(l.coreFactory(verbosity)).Named(name).Sugar(), l.interval)
l.loggers[name] = logger
return logger
}
// GetLogger returns the default logger.
func (l *Logging) GetLogger() *zap.SugaredLogger {
func (l *Logging) GetLogger() *Logger {
return l.logger
}

99
pkg/periodic/periodic.go Normal file
View file

@ -0,0 +1,99 @@
package periodic
import (
"context"
"sync"
"time"
)
// Option configures Start.
type Option interface {
apply(*periodic)
}
// Stopper implements the Stop method,
// which stops a periodic task from Start().
type Stopper interface {
Stop() // Stops a periodic task.
}
// Tick is the value for periodic task callbacks that
// contains the time of the tick and
// the time elapsed since the start of the periodic task.
type Tick struct {
Elapsed time.Duration
Time time.Time
}
// OnStop configures a callback that is executed when a periodic task is stopped or canceled.
func OnStop(f func(Tick)) Option {
return optionFunc(func(p *periodic) {
p.onStop = f
})
}
// Start starts a periodic task with a ticker at the specified interval,
// which executes the given callback after each tick.
// Call Stop() on the return value in order to stop the ticker and to release associated resources.
// The interval must be greater than zero.
func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper {
t := &periodic{
interval: interval,
callback: callback,
}
for _, option := range options {
option.apply(t)
}
ctx, cancelCtx := context.WithCancel(ctx)
ticker := time.NewTicker(t.interval)
start := time.Now()
go func() {
defer ticker.Stop()
for {
select {
case tickTime := <-ticker.C:
t.callback(Tick{
Elapsed: tickTime.Sub(start),
Time: tickTime,
})
case <-ctx.Done():
if t.onStop != nil {
now := time.Now()
t.onStop(Tick{
Elapsed: now.Sub(start),
Time: now,
})
}
return
}
}
}()
return stoperFunc(func() {
t.stop.Do(cancelCtx)
})
}
type optionFunc func(*periodic)
func (f optionFunc) apply(p *periodic) {
f(p)
}
type stoperFunc func()
func (f stoperFunc) Stop() {
f()
}
type periodic struct {
interval time.Duration
callback func(Tick)
stop sync.Once
onStop func(Tick)
}

View file

@ -48,6 +48,10 @@ func WithBackoff(
return
}
if err = parentCtx.Err(); err != nil {
return
}
if settings.OnError != nil {
settings.OnError(time.Since(start), attempt, err, prevErr)
}