telemetry: Undo Stats rework

Effectively reverting cf4bd92611 and
passing a pointer to the relevant com.Counter to the history sync.
Alvar Penning 2025-10-31 15:51:24 +01:00
parent e283ac0d66
commit e475a5ef91
8 changed files with 35 additions and 111 deletions
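
At its core, the change moves call sites from the name-keyed StatsKeeper lookup back to plain struct fields whose addresses can be handed around. A minimal sketch of the new shape, assuming only the com.Counter API that appears in the hunks below (Add, Val, Reset); the field set is trimmed for brevity:

package main

import (
	"fmt"

	"github.com/icinga/icinga-go-library/com"
)

// Stats mirrors the reworked telemetry.Stats: a plain struct of counters
// replacing the name-keyed StatsKeeper map that this commit removes.
var Stats struct {
	Config  com.Counter
	History com.Counter
}

func main() {
	// Old style (removed below): telemetry.Stats.Get(telemetry.StatHistory).Add(1)
	// New style: address a field directly, or pass its pointer around.
	Stats.History.Add(1)

	historyPtr := &Stats.History // the kind of pointer now handed to the history sync
	historyPtr.Add(1)

	fmt.Println(Stats.History.Val()) // 2
}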


@@ -186,7 +186,7 @@ func run() int {
}
callbackCfg = &history.SyncCallbackConf{
Name: "notifications_sync",
StatPtr: &telemetry.Stats.NotificationSync,
KeyStructPtr: notifications.SyncKeyStructPtrs,
Fn: notificationsSource.Submit,
}


@@ -230,7 +230,7 @@ func (r *Retention) Start(ctx context.Context) error {
deleted, err := stmt.CleanupOlderThan(
ctx, r.db, e.Id, r.count, olderThan,
-database.OnSuccessIncrement[struct{}](telemetry.Stats.Get(telemetry.StatHistoryCleanup)),
+database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
)
if err != nil {
select {


@@ -32,8 +32,8 @@ type Sync struct {
// SyncCallbackConf configures a callback stage given to Sync.Sync.
type SyncCallbackConf struct {
-// Name of this callback, used in [telemetry.Stats].
-Name string
+// StatPtr refers to a [com.Counter] from the [telemetry.Stats] struct, e.g., Stats.NotificationSync.
+StatPtr *com.Counter
// KeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If
// a key is missing from the map, it will not be used for the callback.
KeyStructPtr map[string]any
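
For orientation, the typed-nil convention behind KeyStructPtr (a struct pointer standing in for the target type) can be sketched in isolation; the key name and entity type below are made up and not taken from this diff:

package main

import "fmt"

// exampleHistoryEntity is a placeholder for a real history entity type.
type exampleHistoryEntity struct{}

func main() {
	// Keys name pipeline streams; a typed nil pointer identifies the Go type
	// each one should be decoded into. Keys missing from the map are skipped.
	keyStructPtrs := map[string]any{
		"example-stream": (*exampleHistoryEntity)(nil),
	}

	for key, structPtr := range keyStructPtrs {
		fmt.Printf("%s -> %T\n", key, structPtr)
	}
}
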
@@ -60,7 +60,7 @@ func (s Sync) Sync(ctx context.Context, callbackCfg *SyncCallbackConf) error {
callbackStageFn = makeSortedCallbackStageFunc(
ctx,
s.logger,
-callbackCfg.Name,
+callbackCfg.StatPtr,
callbackCfg.KeyStructPtr,
callbackCfg.Fn)
}
@@ -417,7 +417,7 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM
return nil
}
-telemetry.Stats.Get(telemetry.StatHistory).Add(1)
+telemetry.Stats.History.Add(1)
out <- msg
case <-ctx.Done():
@@ -440,12 +440,12 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM
// If the callback function returns false, the message will be retried after an increasing backoff. All subsequent
// messages will wait until this one succeeds.
//
-// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta
-// between [telemetry.StatHistory] and this stat indicates blocking callbacks.
+// For each successfully submitted message, the telemetry stat referenced via a pointer is incremented. Thus, a delta
+// between telemetry.Stats.History and this stat indicates blocking callbacks.
func makeSortedCallbackStageFunc(
ctx context.Context,
logger *logging.Logger,
-name string,
+statPtr *com.Counter,
keyStructPtrs map[string]any,
fn func(database.Entity) bool,
) stageFunc {
@@ -490,7 +490,7 @@ func makeSortedCallbackStageFunc(
success := fn(entity)
if success {
-telemetry.Stats.Get(name).Add(1)
+statPtr.Add(1)
}
return success
}
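
The increment-on-success behaviour described in the doc comment above can be reduced to a standalone sketch; retries, backoff, and the Redis pipeline are deliberately left out, and submitAll is an illustration rather than the real stage function:

package main

import (
	"fmt"

	"github.com/icinga/icinga-go-library/com"
)

// submitAll hands each item to fn and counts successful submissions via
// statPtr, the same pattern as the callback stage: a growing gap between the
// history counter and this one hints at a blocking or failing callback.
func submitAll[T any](statPtr *com.Counter, fn func(T) bool, items []T) {
	for _, item := range items {
		if fn(item) {
			statPtr.Add(1)
		}
	}
}

func main() {
	var callbackCounter com.Counter
	submitAll(&callbackCounter, func(s string) bool { return s != "" }, []string{"a", "", "b"})
	fmt.Println(callbackCounter.Val()) // 2
}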


@@ -219,7 +219,7 @@ func (s Sync) updateOverdue(
}
counter.Add(uint64(len(ids)))
-telemetry.Stats.Get(telemetry.StatOverdue).Add(uint64(len(ids)))
+telemetry.Stats.Overdue.Add(uint64(len(ids)))
var op func(ctx context.Context, key string, members ...any) *redis.IntCmd
if overdue {


@@ -184,7 +184,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, database.SplitOnDupId[database.Entity],
database.OnSuccessIncrement[database.Entity](&counter),
-database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)),
+database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config),
)
})
@@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
-database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)),
+database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config),
)
})


@@ -225,8 +225,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
func getCounterForEntity(e database.Entity) *com.Counter {
switch e.(type) {
case *v1.HostState, *v1.ServiceState:
-return telemetry.Stats.Get(telemetry.StatState)
+return &telemetry.Stats.State
default:
-return telemetry.Stats.Get(telemetry.StatConfig)
+return &telemetry.Stats.Config
}
}


@@ -2,74 +2,42 @@ package telemetry
import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/periodic"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/utils"
"go.uber.org/zap"
"iter"
"strconv"
"sync"
"time"
)
-// StatsKeeper holds multiple [com.Counter] values by name, to be used for statistics in WriteStats.
-type StatsKeeper struct {
-m sync.Map
-}
+var Stats struct {
+// Config & co. are to be increased by the T sync once for every T object synced.
+Config com.Counter
+State com.Counter
+History com.Counter
+Callback com.Counter
+Overdue com.Counter
+HistoryCleanup com.Counter
+NotificationSync com.Counter
+}
-// Get or create a [com.Counter] by its name.
-func (statsKeeper *StatsKeeper) Get(key string) *com.Counter {
-ctrAny, _ := statsKeeper.m.LoadOrStore(key, &com.Counter{})
-ctr, ok := ctrAny.(*com.Counter)
-if !ok {
-// Should not happen unless someone messes with the internal map.
-panic(fmt.Sprintf(
-"StatsKeeper.Get(%q) returned something of type %T, not *com.Counter",
-key, ctrAny))
-}
-return ctr
-}
-// Iterator over all keys and their [com.Counter].
-func (statsKeeper *StatsKeeper) Iterator() iter.Seq2[string, *com.Counter] {
-return func(yield func(string, *com.Counter) bool) {
-statsKeeper.m.Range(func(keyAny, ctrAny any) bool {
-key, keyOk := keyAny.(string)
-ctr, ctrOk := ctrAny.(*com.Counter)
-if !keyOk || !ctrOk {
-// Should not happen unless someone messes with the internal map.
-panic(fmt.Sprintf(
-"iterating StatsKeeper on key %q got types (%T, %T), not (string, *com.Counter)",
-keyAny, keyAny, ctrAny))
-}
-return yield(key, ctr)
-})
-}
-}
-// Stats is the singleton StatsKeeper to be used to access a [com.Counter].
-var Stats = &StatsKeeper{}
-// Keys for different well known Stats entries.
-const (
-StatConfig = "config_sync"
-StatState = "state_sync"
-StatHistory = "history_sync"
-StatOverdue = "overdue_sync"
-StatHistoryCleanup = "history_cleanup"
-)
// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) {
+counters := map[string]*com.Counter{
+"config_sync": &Stats.Config,
+"state_sync": &Stats.State,
+"history_sync": &Stats.History,
+"callback_sync": &Stats.Callback,
+"overdue_sync": &Stats.Overdue,
+"history_cleanup": &Stats.HistoryCleanup,
+"notification_sync": &Stats.NotificationSync,
+}
periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
var data []string
-for kind, counter := range Stats.Iterator() {
+for kind, counter := range counters {
if cnt := counter.Reset(); cnt > 0 {
data = append(data, kind, strconv.FormatUint(cnt, 10))
}
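
The gather loop kept from WriteStats can be exercised on its own; this sketch assumes only the com.Counter API visible in this diff, and the actual Redis write that consumes data lies outside the hunk and is therefore omitted:

package main

import (
	"fmt"
	"strconv"

	"github.com/icinga/icinga-go-library/com"
)

func main() {
	var config, history com.Counter
	counters := map[string]*com.Counter{
		"config_sync":  &config,
		"history_sync": &history,
	}

	history.Add(3)

	// Same shape as the per-tick loop in WriteStats: reset every counter and
	// only report the ones that changed since the last tick.
	var data []string
	for kind, counter := range counters {
		if cnt := counter.Reset(); cnt > 0 {
			data = append(data, kind, strconv.FormatUint(cnt, 10))
		}
	}

	fmt.Println(data) // [history_sync 3]
}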


@@ -1,44 +0,0 @@
-package telemetry
-import (
-"github.com/stretchr/testify/assert"
-"testing"
-)
-func TestStatsKeeper(t *testing.T) {
-desiredState := map[string]uint64{
-"foo": 23,
-"bar": 42,
-"baz": 0,
-}
-stats := &StatsKeeper{}
-// Populate based on desiredState
-for key, counterValue := range desiredState {
-ctr := stats.Get(key)
-ctr.Add(counterValue)
-}
-// Check if desiredState is set
-for key, counterValue := range desiredState {
-ctr := stats.Get(key)
-assert.Equal(t, counterValue, ctr.Val())
-}
-// Get reference, change value, compare
-fooKey := "foo"
-fooCtr := stats.Get(fooKey)
-assert.Equal(t, desiredState[fooKey], fooCtr.Reset())
-assert.Equal(t, uint64(0), fooCtr.Val())
-assert.Equal(t, uint64(0), stats.Get(fooKey).Val())
-fooCtr.Add(desiredState[fooKey])
-assert.Equal(t, desiredState[fooKey], stats.Get(fooKey).Val())
-// Range over
-for key, ctr := range stats.Iterator() {
-desired, ok := desiredState[key]
-assert.True(t, ok)
-assert.Equal(t, desired, ctr.Val())
-}
-}
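
The removed StatsKeeper test has no counterpart in this commit; if an equivalent check against the plain struct were wanted, it might look roughly like this (hypothetical, not part of the diff):

package telemetry

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestStatsCounters is a hypothetical replacement for the deleted
// TestStatsKeeper: with plain fields there is no map or iterator left to
// exercise, only the com.Counter semantics.
func TestStatsCounters(t *testing.T) {
	Stats.History.Add(23)
	assert.Equal(t, uint64(23), Stats.History.Val())
	assert.Equal(t, uint64(23), Stats.History.Reset())
	assert.Equal(t, uint64(0), Stats.History.Val())
}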