diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index ed1aecb9..dd2b5b0a 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -186,7 +186,7 @@ func run() int { } callbackCfg = &history.SyncCallbackConf{ - Name: "notifications_sync", + StatPtr: &telemetry.Stats.NotificationSync, KeyStructPtr: notifications.SyncKeyStructPtrs, Fn: notificationsSource.Submit, } diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index e9d893bd..2d3a6de1 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -230,7 +230,7 @@ func (r *Retention) Start(ctx context.Context) error { deleted, err := stmt.CleanupOlderThan( ctx, r.db, e.Id, r.count, olderThan, - database.OnSuccessIncrement[struct{}](telemetry.Stats.Get(telemetry.StatHistoryCleanup)), + database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), ) if err != nil { select { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 3e553a93..073d352f 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -32,8 +32,8 @@ type Sync struct { // SyncCallbackConf configures a callback stage given to Sync.Sync. type SyncCallbackConf struct { - // Name of this callback, used in [telemetry.Stats]. - Name string + // StatPtr refers a [com.Counter] from the [telemetry.Stats] struct, e.g., Stats.NotificationSync. + StatPtr *com.Counter // KeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If // a key is missing from the map, it will not be used for the callback. KeyStructPtr map[string]any @@ -60,7 +60,7 @@ func (s Sync) Sync(ctx context.Context, callbackCfg *SyncCallbackConf) error { callbackStageFn = makeSortedCallbackStageFunc( ctx, s.logger, - callbackCfg.Name, + callbackCfg.StatPtr, callbackCfg.KeyStructPtr, callbackCfg.Fn) } @@ -417,7 +417,7 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM return nil } - telemetry.Stats.Get(telemetry.StatHistory).Add(1) + telemetry.Stats.History.Add(1) out <- msg case <-ctx.Done(): @@ -440,12 +440,12 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM // If the callback function returns false, the message will be retried after an increasing backoff. All subsequent // messages will wait until this one succeeds. // -// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta -// between [telemetry.StatHistory] and this stat indicates blocking callbacks. +// For each successfully submitted message, the telemetry stat referenced via a pointer s incremented. Thus, a delta +// between telemetry.Stats.History and this stat indicates blocking callbacks. func makeSortedCallbackStageFunc( ctx context.Context, logger *logging.Logger, - name string, + statPtr *com.Counter, keyStructPtrs map[string]any, fn func(database.Entity) bool, ) stageFunc { @@ -490,7 +490,7 @@ func makeSortedCallbackStageFunc( success := fn(entity) if success { - telemetry.Stats.Get(name).Add(1) + statPtr.Add(1) } return success } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index 049f217d..b1b2f488 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -219,7 +219,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) - telemetry.Stats.Get(telemetry.StatOverdue).Add(uint64(len(ids))) + telemetry.Stats.Overdue.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...any) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index e5b5efb2..888f4b4d 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -184,7 +184,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvStmt, cvCount, sem, customvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), + database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), ) }) @@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), + database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), ) }) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 73e4d2e3..b353c784 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -225,8 +225,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { func getCounterForEntity(e database.Entity) *com.Counter { switch e.(type) { case *v1.HostState, *v1.ServiceState: - return telemetry.Stats.Get(telemetry.StatState) + return &telemetry.Stats.State default: - return telemetry.Stats.Get(telemetry.StatConfig) + return &telemetry.Stats.Config } } diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index bbe8c03c..106e81af 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -2,74 +2,42 @@ package telemetry import ( "context" - "fmt" "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/utils" "go.uber.org/zap" - "iter" "strconv" - "sync" "time" ) -// StatsKeeper holds multiple [com.Counter] values by name, to be used for statistics in WriteStats. -type StatsKeeper struct { - m sync.Map +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config com.Counter + State com.Counter + History com.Counter + Callback com.Counter + Overdue com.Counter + HistoryCleanup com.Counter + NotificationSync com.Counter } -// Get or create a [com.Counter] by its name. -func (statsKeeper *StatsKeeper) Get(key string) *com.Counter { - ctrAny, _ := statsKeeper.m.LoadOrStore(key, &com.Counter{}) - - ctr, ok := ctrAny.(*com.Counter) - if !ok { - // Should not happen unless someone messes with the internal map. - panic(fmt.Sprintf( - "StatsKeeper.Get(%q) returned something of type %T, not *com.Counter", - key, ctrAny)) - } - - return ctr -} - -// Iterator over all keys and their [com.Counter]. -func (statsKeeper *StatsKeeper) Iterator() iter.Seq2[string, *com.Counter] { - return func(yield func(string, *com.Counter) bool) { - statsKeeper.m.Range(func(keyAny, ctrAny any) bool { - key, keyOk := keyAny.(string) - ctr, ctrOk := ctrAny.(*com.Counter) - if !keyOk || !ctrOk { - // Should not happen unless someone messes with the internal map. - panic(fmt.Sprintf( - "iterating StatsKeeper on key %q got types (%T, %T), not (string, *com.Counter)", - keyAny, keyAny, ctrAny)) - } - - return yield(key, ctr) - }) - } -} - -// Stats is the singleton StatsKeeper to be used to access a [com.Counter]. -var Stats = &StatsKeeper{} - -// Keys for different well known Stats entries. -const ( - StatConfig = "config_sync" - StatState = "state_sync" - StatHistory = "history_sync" - StatOverdue = "overdue_sync" - StatHistoryCleanup = "history_cleanup" -) - // WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "callback_sync": &Stats.Callback, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, + "notification_sync": &Stats.NotificationSync, + } + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { var data []string - for kind, counter := range Stats.Iterator() { + for kind, counter := range counters { if cnt := counter.Reset(); cnt > 0 { data = append(data, kind, strconv.FormatUint(cnt, 10)) } diff --git a/pkg/icingaredis/telemetry/stats_test.go b/pkg/icingaredis/telemetry/stats_test.go deleted file mode 100644 index 4b851e98..00000000 --- a/pkg/icingaredis/telemetry/stats_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package telemetry - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestStatsKeeper(t *testing.T) { - desiredState := map[string]uint64{ - "foo": 23, - "bar": 42, - "baz": 0, - } - - stats := &StatsKeeper{} - - // Populate based on desiredState - for key, counterValue := range desiredState { - ctr := stats.Get(key) - ctr.Add(counterValue) - } - - // Check if desiredState is set - for key, counterValue := range desiredState { - ctr := stats.Get(key) - assert.Equal(t, counterValue, ctr.Val()) - } - - // Get reference, change value, compare - fooKey := "foo" - fooCtr := stats.Get(fooKey) - assert.Equal(t, desiredState[fooKey], fooCtr.Reset()) - assert.Equal(t, uint64(0), fooCtr.Val()) - assert.Equal(t, uint64(0), stats.Get(fooKey).Val()) - fooCtr.Add(desiredState[fooKey]) - assert.Equal(t, desiredState[fooKey], stats.Get(fooKey).Val()) - - // Range over - for key, ctr := range stats.Iterator() { - desired, ok := desiredState[key] - assert.True(t, ok) - assert.Equal(t, desired, ctr.Val()) - } -}