diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 4b267f78..78f9c725 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -11,6 +11,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/overdue" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/okzk/sdnotify" @@ -117,6 +118,8 @@ func run() int { } defer db.Close() ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) + + telemetry.WriteStats(ctx, rc, logs.GetChildLogger("telemetry")) } // Closing ha on exit ensures that this instance retracts its heartbeat // from the database so that another instance can take over immediately. diff --git a/config.example.yml b/config.example.yml index 76c82d4a..c6cdbc5a 100644 --- a/config.example.yml +++ b/config.example.yml @@ -47,6 +47,7 @@ logging: # redis: # retention: # runtime-updates: +# telemetry: retention: # Number of days to retain full historical data. By default, historical data is retained forever. diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 24322a60..14f58e59 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -63,6 +63,7 @@ overdue-sync | Calculation and synchronization of the overdue status redis | Redis connection status and queries. retention | Deletes historical data that exceed their configured retention period. runtime-updates | Runtime updates of config objects after the initial config synchronization. +telemetry | Reporting of Icinga DB status to Icinga 2 via Redis (for monitoring purposes). ### Duration String diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 2e899fc4..ff217cdd 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icingadb/pkg/icingadb" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -185,7 +186,10 @@ func (r *Retention) Start(ctx context.Context) error { r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", stmt.Category, stmt.Table, olderThan) - deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan) + deleted, err := r.db.CleanupOlderThan( + ctx, stmt.CleanupStmt, e.Id, r.count, olderThan, + icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + ) if err != nil { select { case errs <- err: diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 9e961f00..25e319bb 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,6 +10,7 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" @@ -157,6 +158,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi } counter.Add(uint64(len(ids))) + telemetry.Stats.History.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index d8ba24bd..5cd4d674 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -206,6 +207,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) + telemetry.Stats.Overdue.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 4c92fc29..2ecd206f 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" @@ -67,6 +68,7 @@ func (r *RuntimeUpdates) Sync( for _, factoryFunc := range factoryFuncs { s := common.NewSyncSubject(factoryFunc) + stat := getCounterForEntity(s.Entity()) updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) @@ -107,7 +109,9 @@ func (r *RuntimeUpdates) Sync( // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - onSuccess := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](&counter)} + onSuccess := []OnSuccess[contracts.Entity]{ + OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat), + } if !allowParallel { onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo)) } @@ -127,7 +131,7 @@ func (r *RuntimeUpdates) Sync( sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter)} + onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)} if !allowParallel { onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo)) } @@ -173,6 +177,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), ) }) @@ -192,6 +197,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), ) }) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 3372e98b..9af78e86 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -101,6 +102,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g, ctx := errgroup.WithContext(ctx) + stat := getCounterForEntity(delta.Subject.Entity()) // Create if len(delta.Create) > 0 { @@ -125,7 +127,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g.Go(func() error { - return s.db.CreateStreamed(ctx, entities) + return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -149,7 +151,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { // Using upsert here on purpose as this is the fastest way to do bulk updates. // However, there is a risk that errors in the sync implementation could silently insert new rows. - return s.db.UpsertStreamed(ctx, entities) + return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -157,7 +159,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { if len(delta.Delete) > 0 { s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) }) } @@ -201,3 +203,13 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { return g.Wait() } + +// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e. +func getCounterForEntity(e contracts.Entity) *com.Counter { + switch e.(type) { + case *v1.HostState, *v1.ServiceState: + return &telemetry.Stats.State + default: + return &telemetry.Stats.Config + } +} diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go new file mode 100644 index 00000000..79cf38a6 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "strconv" + "time" +) + +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config, State, History, Overdue, HistoryCleanup com.Counter +} + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. +func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "sync_config": &Stats.Config, + "sync_state": &Stats.State, + "sync_history": &Stats.History, + "sync_overdue": &Stats.Overdue, + "cleanup_history": &Stats.HistoryCleanup, + } + + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { + var data []string + for kind, counter := range counters { + if cnt := counter.Reset(); cnt > 0 { + data = append(data, kind, strconv.FormatUint(cnt, 10)) + } + } + + if data != nil { + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:stats", + MaxLen: 15 * 60, + Approx: true, + Values: data, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) { + logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + }) +}