mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Use custom logger for accessing the interval for periodic logging
This commit is contained in:
parent
8ec157e39b
commit
ccda48234e
18 changed files with 90 additions and 77 deletions
|
|
@ -3,7 +3,6 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/internal"
|
||||
"github.com/icinga/icingadb/internal/command"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
|
|
@ -42,11 +41,11 @@ func run() int {
|
|||
cmd.Config.Logging.Level,
|
||||
cmd.Config.Logging.Output,
|
||||
cmd.Config.Logging.Options,
|
||||
cmd.Config.Logging.Interval,
|
||||
)
|
||||
if err != nil {
|
||||
utils.Fatal(errors.Wrap(err, "can't configure logging"))
|
||||
}
|
||||
internal.SetLoggingInterval(cmd.Config.Logging.Interval)
|
||||
// When started by systemd, NOTIFY_SOCKET is set by systemd for Type=notify supervised services, which is the
|
||||
// default setting for the Icinga DB service. So we notify that Icinga DB finished starting up.
|
||||
_ = sdnotify.Ready()
|
||||
|
|
@ -315,7 +314,7 @@ func checkDbSchema(ctx context.Context, db *icingadb.DB) error {
|
|||
}
|
||||
|
||||
// monitorRedisSchema monitors rc's icinga:schema version validity.
|
||||
func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) {
|
||||
func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) {
|
||||
for {
|
||||
var err error
|
||||
pos, err = checkRedisSchema(logger, rc, pos)
|
||||
|
|
@ -327,7 +326,7 @@ func monitorRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos s
|
|||
}
|
||||
|
||||
// checkRedisSchema verifies rc's icinga:schema version.
|
||||
func checkRedisSchema(logger *zap.SugaredLogger, rc *icingaredis.Client, pos string) (newPos string, err error) {
|
||||
func checkRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) (newPos string, err error) {
|
||||
if pos == "0-0" {
|
||||
defer time.AfterFunc(3*time.Second, func() { logger.Info("Waiting for current Redis schema version") }).Stop()
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -6,10 +6,10 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/config"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
goflags "github.com/jessevdk/go-flags"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"os"
|
||||
)
|
||||
|
||||
|
|
@ -48,11 +48,11 @@ func New() *Command {
|
|||
}
|
||||
|
||||
// Database creates and returns a new icingadb.DB connection from config.Config.
|
||||
func (c Command) Database(l *zap.SugaredLogger) (*icingadb.DB, error) {
|
||||
func (c Command) Database(l *logging.Logger) (*icingadb.DB, error) {
|
||||
return c.Config.Database.Open(l)
|
||||
}
|
||||
|
||||
// Redis creates and returns a new icingaredis.Client connection from config.Config.
|
||||
func (c Command) Redis(l *zap.SugaredLogger) (*icingaredis.Client, error) {
|
||||
func (c Command) Redis(l *logging.Logger) (*icingaredis.Client, error) {
|
||||
return c.Config.Redis.NewClient(l)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
package internal
|
||||
|
||||
import "time"
|
||||
|
||||
// LoggingInterval returns the interval for periodic logging.
|
||||
func LoggingInterval() time.Duration {
|
||||
return c.LoggingInterval
|
||||
}
|
||||
|
||||
// SetLoggingInterval configures the interval for periodic logging.
|
||||
func SetLoggingInterval(i time.Duration) {
|
||||
c.LoggingInterval = i
|
||||
}
|
||||
|
||||
var c config
|
||||
|
||||
type config struct {
|
||||
LoggingInterval time.Duration
|
||||
}
|
||||
|
|
@ -5,11 +5,11 @@ import (
|
|||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/driver"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/jmoiron/sqlx/reflectx"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -30,7 +30,7 @@ type Database struct {
|
|||
|
||||
// Open prepares the DSN string and driver configuration,
|
||||
// calls sqlx.Open, but returns *icingadb.DB.
|
||||
func (d *Database) Open(logger *zap.SugaredLogger) (*icingadb.DB, error) {
|
||||
func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
|
||||
registerDriverOnce.Do(func() {
|
||||
driver.Register(logger)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -27,7 +28,7 @@ type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn,
|
|||
|
||||
// NewClient prepares Redis client configuration,
|
||||
// calls redis.NewClient, but returns *icingaredis.Client.
|
||||
func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error) {
|
||||
func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) {
|
||||
tlsConfig, err := r.TLS.MakeConfig(r.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -59,7 +60,7 @@ func (r *Redis) NewClient(logger *zap.SugaredLogger) (*icingaredis.Client, error
|
|||
}
|
||||
|
||||
// dialWithLogging returns a Redis Dialer with logging capabilities.
|
||||
func dialWithLogging(dialer ctxDialerFunc, logger *zap.SugaredLogger) ctxDialerFunc {
|
||||
func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc {
|
||||
// dial behaves like net.Dialer#DialContext, but re-tries on syscall.ECONNREFUSED.
|
||||
return func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
|
||||
err = retry.WithBackoff(
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"database/sql/driver"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -58,7 +59,7 @@ func (c RetryConnector) Driver() driver.Driver {
|
|||
// Driver wraps a driver.Driver that also must implement driver.DriverContext with logging capabilities and provides our RetryConnector.
|
||||
type Driver struct {
|
||||
ctxDriver
|
||||
Logger *zap.SugaredLogger
|
||||
Logger *logging.Logger
|
||||
}
|
||||
|
||||
// OpenConnector implements the DriverContext interface.
|
||||
|
|
@ -75,7 +76,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
|
|||
}
|
||||
|
||||
// Register makes our database Driver available under the name "icingadb-mysql".
|
||||
func Register(logger *zap.SugaredLogger) {
|
||||
func Register(logger *logging.Logger) {
|
||||
sql.Register("icingadb-mysql", &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
|
||||
_ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) }))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,12 +9,12 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/backoff"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/icinga/icingadb/pkg/retry"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"reflect"
|
||||
|
|
@ -30,7 +30,7 @@ type DB struct {
|
|||
|
||||
Options *Options
|
||||
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
tableSemaphores map[string]*semaphore.Weighted
|
||||
tableSemaphoresMu sync.Mutex
|
||||
}
|
||||
|
|
@ -76,7 +76,7 @@ func (o *Options) Validate() error {
|
|||
}
|
||||
|
||||
// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
|
||||
func NewDb(db *sqlx.DB, logger *zap.SugaredLogger, options *Options) *DB {
|
||||
func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
|
||||
return &DB{
|
||||
DB: db,
|
||||
logger: logger,
|
||||
|
|
@ -548,7 +548,7 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
|
|||
}
|
||||
|
||||
func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stoper {
|
||||
return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) {
|
||||
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
db.logger.Debugf("Executed %q with %d rows", query, count)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
|
|
@ -17,12 +18,12 @@ type Delta struct {
|
|||
Delete EntitiesById
|
||||
Subject *common.SyncSubject
|
||||
done chan error
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewDelta creates a new Delta and starts calculating it. The caller must ensure
|
||||
// that no duplicate entities are sent to the same stream.
|
||||
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
|
||||
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta {
|
||||
delta := &Delta{
|
||||
Subject: subject,
|
||||
done: make(chan error, 1),
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
|
|
@ -13,7 +14,7 @@ import (
|
|||
// Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals.
|
||||
type DumpSignals struct {
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
mutex sync.Mutex
|
||||
doneCh map[string]chan struct{}
|
||||
allDoneCh chan struct{}
|
||||
|
|
@ -21,7 +22,7 @@ type DumpSignals struct {
|
|||
}
|
||||
|
||||
// NewDumpSignals returns new DumpSignals.
|
||||
func NewDumpSignals(redis *icingaredis.Client, logger *zap.SugaredLogger) *DumpSignals {
|
||||
func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals {
|
||||
return &DumpSignals{
|
||||
redis: redis,
|
||||
logger: logger,
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -30,7 +31,7 @@ type HA struct {
|
|||
environmentMu sync.Mutex
|
||||
environment *v1.Environment
|
||||
heartbeat *icingaredis.Heartbeat
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
responsible bool
|
||||
handover chan struct{}
|
||||
takeover chan struct{}
|
||||
|
|
@ -41,7 +42,7 @@ type HA struct {
|
|||
}
|
||||
|
||||
// NewHA returns a new HA and starts the controller loop.
|
||||
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *zap.SugaredLogger) *HA {
|
||||
func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA {
|
||||
ctx, cancelCtx := context.WithCancel(ctx)
|
||||
|
||||
instanceId := uuid.New()
|
||||
|
|
|
|||
|
|
@ -10,12 +10,12 @@ import (
|
|||
v1types "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/icinga/icingadb/pkg/structify"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
|
@ -25,11 +25,11 @@ import (
|
|||
type Sync struct {
|
||||
db *icingadb.DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewSync creates a new Sync.
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
|
|
@ -135,7 +135,7 @@ func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis
|
|||
// pipeline stage and then deletes the stream entry from Redis as all pipeline stages successfully processed the entry.
|
||||
func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error {
|
||||
var counter com.Counter
|
||||
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
s.logger.Infof("Synced %d %s history items", count, key)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,9 +12,9 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingadb/v1/overdue"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"strconv"
|
||||
"time"
|
||||
|
|
@ -24,11 +24,11 @@ import (
|
|||
type Sync struct {
|
||||
db *icingadb.DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewSync creates a new Sync.
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
|
|
@ -122,7 +122,7 @@ func (s Sync) initSync(ctx context.Context, objectType string) error {
|
|||
|
||||
// log periodically logs sync's workload.
|
||||
func (s Sync) log(ctx context.Context, objectType string, counter *com.Counter) periodic.Stoper {
|
||||
return periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
return periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
s.logger.Infof("Synced %d %s overdue indicators", count, objectType)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,17 +4,16 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/internal"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/icinga/icingadb/pkg/structify"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"reflect"
|
||||
|
|
@ -25,11 +24,11 @@ import (
|
|||
type RuntimeUpdates struct {
|
||||
db *DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewRuntimeUpdates creates a new RuntimeUpdates.
|
||||
func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *RuntimeUpdates {
|
||||
func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates {
|
||||
return &RuntimeUpdates{
|
||||
db: db,
|
||||
redis: redis,
|
||||
|
|
@ -88,7 +87,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
})
|
||||
g.Go(func() error {
|
||||
var counter com.Counter
|
||||
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
r.logger.Infof("Upserted %d %s items", count, s.Name())
|
||||
}
|
||||
|
|
@ -116,7 +115,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
})
|
||||
g.Go(func() error {
|
||||
var counter com.Counter
|
||||
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
r.logger.Infof("Deleted %d %s items", count, s.Name())
|
||||
}
|
||||
|
|
@ -166,7 +165,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
})
|
||||
g.Go(func() error {
|
||||
var counter com.Counter
|
||||
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
r.logger.Infof("Upserted %d %s items", count, cv.Name())
|
||||
}
|
||||
|
|
@ -197,7 +196,7 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
|
|||
})
|
||||
g.Go(func() error {
|
||||
var counter com.Counter
|
||||
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
|
||||
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
r.logger.Infof("Upserted %d %s items", count, cvFlat.Name())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,12 +3,12 @@ package icingadb
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/internal"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -21,11 +21,11 @@ import (
|
|||
type Sync struct {
|
||||
db *DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewSync returns a new Sync.
|
||||
func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
|
|
@ -40,7 +40,7 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du
|
|||
key := "icinga:" + utils.Key(typeName, ':')
|
||||
|
||||
startTime := time.Now()
|
||||
logTicker := time.NewTicker(internal.LoggingInterval())
|
||||
logTicker := time.NewTicker(s.logger.Interval())
|
||||
defer logTicker.Stop()
|
||||
loggedWaiting := false
|
||||
|
||||
|
|
|
|||
|
|
@ -3,14 +3,13 @@ package icingaredis
|
|||
import (
|
||||
"context"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/internal"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/common"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/periodic"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"runtime"
|
||||
|
|
@ -24,7 +23,7 @@ type Client struct {
|
|||
|
||||
Options *Options
|
||||
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// Options define user configurable Redis options.
|
||||
|
|
@ -58,7 +57,7 @@ func (o *Options) Validate() error {
|
|||
}
|
||||
|
||||
// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client.
|
||||
func NewClient(client *redis.Client, logger *zap.SugaredLogger, options *Options) *Client {
|
||||
func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client {
|
||||
return &Client{Client: client, logger: logger, Options: options}
|
||||
}
|
||||
|
||||
|
|
@ -223,7 +222,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch
|
|||
}
|
||||
|
||||
func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stoper {
|
||||
return periodic.Start(ctx, internal.LoggingInterval(), func(tick periodic.Tick) {
|
||||
return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) {
|
||||
// We may never get to progress logging here,
|
||||
// as fetching should be completed before the interval expires,
|
||||
// but if it does, it is good to have this log message.
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"github.com/go-redis/redis/v8"
|
||||
"github.com/icinga/icingadb/internal"
|
||||
v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
|
|
@ -28,11 +29,11 @@ type Heartbeat struct {
|
|||
done chan struct{}
|
||||
errMu sync.Mutex
|
||||
err error
|
||||
logger *zap.SugaredLogger
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
|
||||
func NewHeartbeat(ctx context.Context, client *Client, logger *zap.SugaredLogger) *Heartbeat {
|
||||
func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat {
|
||||
ctx, cancelCtx := context.WithCancel(ctx)
|
||||
|
||||
heartbeat := &Heartbeat{
|
||||
|
|
|
|||
26
pkg/logging/logger.go
Normal file
26
pkg/logging/logger.go
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
package logging
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Logger wraps zap.SugaredLogger and
|
||||
// allows to get the interval for periodic logging.
|
||||
type Logger struct {
|
||||
*zap.SugaredLogger
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// NewLogger returns a new Logger.
|
||||
func NewLogger(base *zap.SugaredLogger, interval time.Duration) *Logger {
|
||||
return &Logger{
|
||||
SugaredLogger: base,
|
||||
interval: interval,
|
||||
}
|
||||
}
|
||||
|
||||
// Interval returns the interval for periodic logging.
|
||||
func (l *Logger) Interval() time.Duration {
|
||||
return l.interval
|
||||
}
|
||||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"go.uber.org/zap/zapcore"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -36,15 +37,16 @@ type Options map[string]zapcore.Level
|
|||
// fall back on a default log level.
|
||||
// Logs either to the console or to systemd-journald.
|
||||
type Logging struct {
|
||||
logger *zap.SugaredLogger
|
||||
logger *Logger
|
||||
output string
|
||||
verbosity zap.AtomicLevel
|
||||
interval time.Duration
|
||||
|
||||
// coreFactory creates zapcore.Core based on the log level and the log output.
|
||||
coreFactory func(zap.AtomicLevel) zapcore.Core
|
||||
|
||||
mu sync.Mutex
|
||||
loggers map[string]*zap.SugaredLogger
|
||||
loggers map[string]*Logger
|
||||
|
||||
options Options
|
||||
}
|
||||
|
|
@ -53,7 +55,7 @@ type Logging struct {
|
|||
// output where log messages are written to,
|
||||
// options having log levels for named child loggers
|
||||
// and returns a new Logging.
|
||||
func NewLogging(name string, level zapcore.Level, output string, options Options) (*Logging, error) {
|
||||
func NewLogging(name string, level zapcore.Level, output string, options Options, interval time.Duration) (*Logging, error) {
|
||||
verbosity := zap.NewAtomicLevelAt(level)
|
||||
|
||||
var coreFactory func(zap.AtomicLevel) zapcore.Core
|
||||
|
|
@ -72,14 +74,15 @@ func NewLogging(name string, level zapcore.Level, output string, options Options
|
|||
return nil, invalidOutput(output)
|
||||
}
|
||||
|
||||
logger := zap.New(coreFactory(verbosity)).Named(name).Sugar()
|
||||
logger := NewLogger(zap.New(coreFactory(verbosity)).Named(name).Sugar(), interval)
|
||||
|
||||
return &Logging{
|
||||
logger: logger,
|
||||
output: output,
|
||||
verbosity: verbosity,
|
||||
interval: interval,
|
||||
coreFactory: coreFactory,
|
||||
loggers: map[string]*zap.SugaredLogger{},
|
||||
loggers: make(map[string]*Logger),
|
||||
options: options,
|
||||
},
|
||||
nil
|
||||
|
|
@ -88,7 +91,7 @@ func NewLogging(name string, level zapcore.Level, output string, options Options
|
|||
// GetChildLogger returns a named child logger.
|
||||
// Log levels for named child loggers are obtained from the logging options and, if not found,
|
||||
// set to the default log level.
|
||||
func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger {
|
||||
func (l *Logging) GetChildLogger(name string) *Logger {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
|
|
@ -103,14 +106,14 @@ func (l *Logging) GetChildLogger(name string) *zap.SugaredLogger {
|
|||
verbosity = l.verbosity
|
||||
}
|
||||
|
||||
logger := zap.New(l.coreFactory(verbosity)).Named(name).Sugar()
|
||||
logger := NewLogger(zap.New(l.coreFactory(verbosity)).Named(name).Sugar(), l.interval)
|
||||
l.loggers[name] = logger
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
// GetLogger returns the default logger.
|
||||
func (l *Logging) GetLogger() *zap.SugaredLogger {
|
||||
func (l *Logging) GetLogger() *Logger {
|
||||
return l.logger
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue