diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index ed539225..c47cea83 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -171,8 +171,11 @@ func run() int { signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) go func() { - var callback func(database.Entity) bool - var callbackKeyStructPtr map[string]any + var ( + callbackName string + callbackKeyStructPtr map[string]any + callbackFn func(database.Entity) bool + ) if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" { logger.Info("Starting Icinga Notifications source") @@ -183,13 +186,20 @@ func run() int { rc, logs.GetChildLogger("notifications-source"), cfg) - callback = notificationsSource.Submit + + callbackName = "notifications_sync" callbackKeyStructPtr = notifications.SyncKeyStructPtrs + callbackFn = notificationsSource.Submit } logger.Info("Starting history sync") - if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) { + if err := hs.Sync( + ctx, + callbackName, + callbackKeyStructPtr, + callbackFn, + ); err != nil && !utils.IsContextCanceled(err) { logger.Fatalf("%+v", err) } }() diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 2d3a6de1..e9d893bd 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -230,7 +230,7 @@ func (r *Retention) Start(ctx context.Context) error { deleted, err := stmt.CleanupOlderThan( ctx, r.db, e.Id, r.count, olderThan, - database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + database.OnSuccessIncrement[struct{}](telemetry.Stats.Get(telemetry.StatHistoryCleanup)), ) if err != nil { select { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 82ba9374..49fbfdf6 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -41,13 +41,23 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. // -// An optional callback and callbackKeyStructPtr might be given. Both most either be nil or not nil. +// It is possible to enable a callback functionality, e.g., for the Icinga Notifications integration. To do so, the +// optional callbackFn and callbackKeyStructPtr must be set. Both must either be nil or not nil. If set, the additional +// callbackName must also be set, to be used in [telemetry.Stats]. // // The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If -// a key is missing from the map, it will not be used for the callback. The callback function itself shall not block. -func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity) bool) error { - if (callbackKeyStructPtr == nil) != (callback == nil) { - return fmt.Errorf("either both callbackKeyStructPtr and callback must be nil or none") +// a key is missing from the map, it will not be used for the callback. The callbackFn function shall not block. +func (s Sync) Sync( + ctx context.Context, + callbackName string, + callbackKeyStructPtr map[string]any, + callbackFn func(database.Entity) bool, +) error { + if (callbackKeyStructPtr == nil) != (callbackFn == nil) { + return fmt.Errorf("either both callbackKeyStructPtr and callbackFn must be nil or none") + } + if (callbackKeyStructPtr != nil) && (callbackName == "") { + return fmt.Errorf("if callbackKeyStructPtr and callbackFn are set, a callbackName is required") } g, ctx := errgroup.WithContext(ctx) @@ -87,7 +97,7 @@ func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, cal // Shadowed variable to allow appending custom callbacks. pipeline := pipeline if hasCallbackStage { - pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback)) + pipeline = append(pipeline, makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn)) } ch := make([]chan redis.XMessage, len(pipeline)+1) @@ -402,7 +412,7 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM return nil } - telemetry.Stats.History.Add(1) + telemetry.Stats.Get(telemetry.StatHistory).Add(1) out <- msg case <-ctx.Done(): @@ -423,9 +433,13 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM // callback method, it will be forwarded to the out channel. Thus, this stage might "block" or "hold back" certain // messages during unhappy callback times. // -// For each successfully submitted message, [telemetry.State.Callback] is incremented. Thus, a delta between -// [telemetry.State.History] and [telemetry.State.Callback] indicates blocking callbacks. -func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity) bool) stageFunc { +// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta +// between [telemetry.StatHistory] and this stat indicates blocking callbacks. +func makeCallbackStageFunc( + name string, + keyStructPtrs map[string]any, + fn func(database.Entity) bool, +) stageFunc { return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error { defer close(out) @@ -479,9 +493,9 @@ func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database. return err } - if callback(entity) { + if fn(entity) { out <- msg - telemetry.Stats.Callback.Add(1) + telemetry.Stats.Get(name).Add(1) backlogLastId = "" } else { backlogLastId = msg.ID @@ -521,10 +535,10 @@ func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database. return errors.Wrapf(err, "can't structify backlog value %q for %q", backlogLastId, key) } - if callback(entity) { + if fn(entity) { out <- msg backlogMsgCounter++ - telemetry.Stats.Callback.Add(1) + telemetry.Stats.Get(name).Add(1) if len(msgs) == 1 { backlogLastId = "" diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index b1b2f488..049f217d 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -219,7 +219,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) - telemetry.Stats.Overdue.Add(uint64(len(ids))) + telemetry.Stats.Get(telemetry.StatOverdue).Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...any) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 888f4b4d..e5b5efb2 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -184,7 +184,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvStmt, cvCount, sem, customvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), + database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), ) }) @@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter), - database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config), + database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)), ) }) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index b353c784..73e4d2e3 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -225,8 +225,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { func getCounterForEntity(e database.Entity) *com.Counter { switch e.(type) { case *v1.HostState, *v1.ServiceState: - return &telemetry.Stats.State + return telemetry.Stats.Get(telemetry.StatState) default: - return &telemetry.Stats.Config + return telemetry.Stats.Get(telemetry.StatConfig) } } diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index a3484512..bbe8c03c 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -2,40 +2,74 @@ package telemetry import ( "context" + "fmt" "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/redis" "github.com/icinga/icinga-go-library/utils" "go.uber.org/zap" + "iter" "strconv" + "sync" "time" ) -var Stats struct { - // Config & co. are to be increased by the T sync once for every T object synced. - Config com.Counter - State com.Counter - History com.Counter - Callback com.Counter - Overdue com.Counter - HistoryCleanup com.Counter +// StatsKeeper holds multiple [com.Counter] values by name, to be used for statistics in WriteStats. +type StatsKeeper struct { + m sync.Map } +// Get or create a [com.Counter] by its name. +func (statsKeeper *StatsKeeper) Get(key string) *com.Counter { + ctrAny, _ := statsKeeper.m.LoadOrStore(key, &com.Counter{}) + + ctr, ok := ctrAny.(*com.Counter) + if !ok { + // Should not happen unless someone messes with the internal map. + panic(fmt.Sprintf( + "StatsKeeper.Get(%q) returned something of type %T, not *com.Counter", + key, ctrAny)) + } + + return ctr +} + +// Iterator over all keys and their [com.Counter]. +func (statsKeeper *StatsKeeper) Iterator() iter.Seq2[string, *com.Counter] { + return func(yield func(string, *com.Counter) bool) { + statsKeeper.m.Range(func(keyAny, ctrAny any) bool { + key, keyOk := keyAny.(string) + ctr, ctrOk := ctrAny.(*com.Counter) + if !keyOk || !ctrOk { + // Should not happen unless someone messes with the internal map. + panic(fmt.Sprintf( + "iterating StatsKeeper on key %q got types (%T, %T), not (string, *com.Counter)", + keyAny, keyAny, ctrAny)) + } + + return yield(key, ctr) + }) + } +} + +// Stats is the singleton StatsKeeper to be used to access a [com.Counter]. +var Stats = &StatsKeeper{} + +// Keys for different well known Stats entries. +const ( + StatConfig = "config_sync" + StatState = "state_sync" + StatHistory = "history_sync" + StatOverdue = "overdue_sync" + StatHistoryCleanup = "history_cleanup" +) + // WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) { - counters := map[string]*com.Counter{ - "config_sync": &Stats.Config, - "state_sync": &Stats.State, - "history_sync": &Stats.History, - "callback_sync": &Stats.Callback, - "overdue_sync": &Stats.Overdue, - "history_cleanup": &Stats.HistoryCleanup, - } - periodic.Start(ctx, time.Second, func(_ periodic.Tick) { var data []string - for kind, counter := range counters { + for kind, counter := range Stats.Iterator() { if cnt := counter.Reset(); cnt > 0 { data = append(data, kind, strconv.FormatUint(cnt, 10)) } diff --git a/pkg/icingaredis/telemetry/stats_test.go b/pkg/icingaredis/telemetry/stats_test.go new file mode 100644 index 00000000..4b851e98 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats_test.go @@ -0,0 +1,44 @@ +package telemetry + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestStatsKeeper(t *testing.T) { + desiredState := map[string]uint64{ + "foo": 23, + "bar": 42, + "baz": 0, + } + + stats := &StatsKeeper{} + + // Populate based on desiredState + for key, counterValue := range desiredState { + ctr := stats.Get(key) + ctr.Add(counterValue) + } + + // Check if desiredState is set + for key, counterValue := range desiredState { + ctr := stats.Get(key) + assert.Equal(t, counterValue, ctr.Val()) + } + + // Get reference, change value, compare + fooKey := "foo" + fooCtr := stats.Get(fooKey) + assert.Equal(t, desiredState[fooKey], fooCtr.Reset()) + assert.Equal(t, uint64(0), fooCtr.Val()) + assert.Equal(t, uint64(0), stats.Get(fooKey).Val()) + fooCtr.Add(desiredState[fooKey]) + assert.Equal(t, desiredState[fooKey], stats.Get(fooKey).Val()) + + // Range over + for key, ctr := range stats.Iterator() { + desired, ok := desiredState[key] + assert.True(t, ok) + assert.Equal(t, desired, ctr.Val()) + } +}