Configurable callback sync telemetry stat name

Refactor the telemetry.Stats to allow custom names. This enables dynamic
callback names for the Redis history sync, used by Icinga Notifications.
This commit is contained in:
Alvar Penning 2025-10-21 10:15:40 +02:00
parent 7ec28098f8
commit ad26a7857d
No known key found for this signature in database
8 changed files with 144 additions and 42 deletions

View file

@ -171,8 +171,11 @@ func run() int {
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
go func() {
var callback func(database.Entity) bool
var callbackKeyStructPtr map[string]any
var (
callbackName string
callbackKeyStructPtr map[string]any
callbackFn func(database.Entity) bool
)
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
logger.Info("Starting Icinga Notifications source")
@ -183,13 +186,20 @@ func run() int {
rc,
logs.GetChildLogger("notifications-source"),
cfg)
callback = notificationsSource.Submit
callbackName = "notifications_sync"
callbackKeyStructPtr = notifications.SyncKeyStructPtrs
callbackFn = notificationsSource.Submit
}
logger.Info("Starting history sync")
if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) {
if err := hs.Sync(
ctx,
callbackName,
callbackKeyStructPtr,
callbackFn,
); err != nil && !utils.IsContextCanceled(err) {
logger.Fatalf("%+v", err)
}
}()

View file

@ -230,7 +230,7 @@ func (r *Retention) Start(ctx context.Context) error {
deleted, err := stmt.CleanupOlderThan(
ctx, r.db, e.Id, r.count, olderThan,
database.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
database.OnSuccessIncrement[struct{}](telemetry.Stats.Get(telemetry.StatHistoryCleanup)),
)
if err != nil {
select {

View file

@ -41,13 +41,23 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
//
// An optional callback and callbackKeyStructPtr might be given. Both most either be nil or not nil.
// It is possible to enable a callback functionality, e.g., for the Icinga Notifications integration. To do so, the
// optional callbackFn and callbackKeyStructPtr must be set. Both must either be nil or not nil. If set, the additional
// callbackName must also be set, to be used in [telemetry.Stats].
//
// The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If
// a key is missing from the map, it will not be used for the callback. The callback function itself shall not block.
func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity) bool) error {
if (callbackKeyStructPtr == nil) != (callback == nil) {
return fmt.Errorf("either both callbackKeyStructPtr and callback must be nil or none")
// a key is missing from the map, it will not be used for the callback. The callbackFn function shall not block.
func (s Sync) Sync(
ctx context.Context,
callbackName string,
callbackKeyStructPtr map[string]any,
callbackFn func(database.Entity) bool,
) error {
if (callbackKeyStructPtr == nil) != (callbackFn == nil) {
return fmt.Errorf("either both callbackKeyStructPtr and callbackFn must be nil or none")
}
if (callbackKeyStructPtr != nil) && (callbackName == "") {
return fmt.Errorf("if callbackKeyStructPtr and callbackFn are set, a callbackName is required")
}
g, ctx := errgroup.WithContext(ctx)
@ -87,7 +97,7 @@ func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, cal
// Shadowed variable to allow appending custom callbacks.
pipeline := pipeline
if hasCallbackStage {
pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback))
pipeline = append(pipeline, makeCallbackStageFunc(callbackName, callbackKeyStructPtr, callbackFn))
}
ch := make([]chan redis.XMessage, len(pipeline)+1)
@ -402,7 +412,7 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM
return nil
}
telemetry.Stats.History.Add(1)
telemetry.Stats.Get(telemetry.StatHistory).Add(1)
out <- msg
case <-ctx.Done():
@ -423,9 +433,13 @@ func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XM
// callback method, it will be forwarded to the out channel. Thus, this stage might "block" or "hold back" certain
// messages during unhappy callback times.
//
// For each successfully submitted message, [telemetry.State.Callback] is incremented. Thus, a delta between
// [telemetry.State.History] and [telemetry.State.Callback] indicates blocking callbacks.
func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity) bool) stageFunc {
// For each successfully submitted message, the telemetry stat named after this callback is incremented. Thus, a delta
// between [telemetry.StatHistory] and this stat indicates blocking callbacks.
func makeCallbackStageFunc(
name string,
keyStructPtrs map[string]any,
fn func(database.Entity) bool,
) stageFunc {
return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
defer close(out)
@ -479,9 +493,9 @@ func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.
return err
}
if callback(entity) {
if fn(entity) {
out <- msg
telemetry.Stats.Callback.Add(1)
telemetry.Stats.Get(name).Add(1)
backlogLastId = ""
} else {
backlogLastId = msg.ID
@ -521,10 +535,10 @@ func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.
return errors.Wrapf(err, "can't structify backlog value %q for %q", backlogLastId, key)
}
if callback(entity) {
if fn(entity) {
out <- msg
backlogMsgCounter++
telemetry.Stats.Callback.Add(1)
telemetry.Stats.Get(name).Add(1)
if len(msgs) == 1 {
backlogLastId = ""

View file

@ -219,7 +219,7 @@ func (s Sync) updateOverdue(
}
counter.Add(uint64(len(ids)))
telemetry.Stats.Overdue.Add(uint64(len(ids)))
telemetry.Stats.Get(telemetry.StatOverdue).Add(uint64(len(ids)))
var op func(ctx context.Context, key string, members ...any) *redis.IntCmd
if overdue {

View file

@ -184,7 +184,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, database.SplitOnDupId[database.Entity],
database.OnSuccessIncrement[database.Entity](&counter),
database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config),
database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)),
)
})
@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
database.SplitOnDupId[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
database.OnSuccessIncrement[database.Entity](&telemetry.Stats.Config),
database.OnSuccessIncrement[database.Entity](telemetry.Stats.Get(telemetry.StatConfig)),
)
})

View file

@ -225,8 +225,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
func getCounterForEntity(e database.Entity) *com.Counter {
switch e.(type) {
case *v1.HostState, *v1.ServiceState:
return &telemetry.Stats.State
return telemetry.Stats.Get(telemetry.StatState)
default:
return &telemetry.Stats.Config
return telemetry.Stats.Get(telemetry.StatConfig)
}
}

View file

@ -2,40 +2,74 @@ package telemetry
import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/periodic"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/utils"
"go.uber.org/zap"
"iter"
"strconv"
"sync"
"time"
)
var Stats struct {
// Config & co. are to be increased by the T sync once for every T object synced.
Config com.Counter
State com.Counter
History com.Counter
Callback com.Counter
Overdue com.Counter
HistoryCleanup com.Counter
// StatsKeeper holds multiple [com.Counter] values by name, to be used for statistics in WriteStats.
//
// The zero value is ready to use; counters are created lazily by Get. All
// methods are safe for concurrent use, as the state lives in a [sync.Map].
type StatsKeeper struct {
	// m maps a stat name (string) to its *com.Counter. Access it only through
	// Get and Iterator, which enforce these element types.
	m sync.Map
}
// Get returns the [com.Counter] registered under key, creating and storing a
// fresh one if none exists yet.
//
// It is safe for concurrent use. The plain Load fast path avoids allocating a
// throwaway com.Counter on every call for keys that already exist, which is
// the common case once the stat names have been registered.
func (sk *StatsKeeper) Get(key string) *com.Counter {
	// Fast path: the counter usually already exists.
	if ctrAny, ok := sk.m.Load(key); ok {
		ctr, ok := ctrAny.(*com.Counter)
		if !ok {
			// Should not happen unless someone messes with the internal map.
			panic(fmt.Sprintf(
				"StatsKeeper.Get(%q) returned something of type %T, not *com.Counter",
				key, ctrAny))
		}
		return ctr
	}

	// Slow path: store a new counter; on a concurrent race, LoadOrStore
	// returns whichever value was stored first.
	ctrAny, _ := sk.m.LoadOrStore(key, &com.Counter{})
	ctr, ok := ctrAny.(*com.Counter)
	if !ok {
		// Should not happen unless someone messes with the internal map.
		panic(fmt.Sprintf(
			"StatsKeeper.Get(%q) returned something of type %T, not *com.Counter",
			key, ctrAny))
	}
	return ctr
}
// Iterator returns an iterator over all registered keys and their
// [com.Counter] values.
//
// It is backed by [sync.Map.Range]: the visiting order is unspecified, and
// entries stored concurrently during the iteration may or may not be yielded.
func (sk *StatsKeeper) Iterator() iter.Seq2[string, *com.Counter] {
	return func(yield func(string, *com.Counter) bool) {
		sk.m.Range(func(keyAny, ctrAny any) bool {
			key, keyOk := keyAny.(string)
			ctr, ctrOk := ctrAny.(*com.Counter)
			if !keyOk || !ctrOk {
				// Should not happen unless someone messes with the internal map.
				panic(fmt.Sprintf(
					"iterating StatsKeeper on key %q got types (%T, %T), not (string, *com.Counter)",
					keyAny, keyAny, ctrAny))
			}
			return yield(key, ctr)
		})
	}
}
// Stats is the package-wide singleton StatsKeeper to be used to access a [com.Counter].
var Stats = &StatsKeeper{}

// Keys for different well known Stats entries. These strings are the stat
// names under which WriteStats forwards the counter values to Redis.
const (
	StatConfig         = "config_sync"
	StatState          = "state_sync"
	StatHistory        = "history_sync"
	StatOverdue        = "overdue_sync"
	StatHistoryCleanup = "history_cleanup"
)
// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) {
counters := map[string]*com.Counter{
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"callback_sync": &Stats.Callback,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
}
periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
var data []string
for kind, counter := range counters {
for kind, counter := range Stats.Iterator() {
if cnt := counter.Reset(); cnt > 0 {
data = append(data, kind, strconv.FormatUint(cnt, 10))
}

View file

@ -0,0 +1,44 @@
package telemetry
import (
"github.com/stretchr/testify/assert"
"testing"
)
// TestStatsKeeper exercises Get and Iterator on a zero-value StatsKeeper:
// counters are created on first access, retain their values, act as live
// references, and are all visited exactly with their current values.
func TestStatsKeeper(t *testing.T) {
	expected := map[string]uint64{
		"foo": 23,
		"bar": 42,
		"baz": 0,
	}

	keeper := &StatsKeeper{}

	// Fill the keeper with the expected values.
	for name, val := range expected {
		keeper.Get(name).Add(val)
	}

	// Every counter must now report its expected value.
	for name, val := range expected {
		assert.Equal(t, val, keeper.Get(name).Val())
	}

	// A counter obtained via Get is a live reference: a Reset followed by an
	// Add through it must be visible on later Get calls for the same key.
	fooKey := "foo"
	fooCtr := keeper.Get(fooKey)
	assert.Equal(t, expected[fooKey], fooCtr.Reset())
	assert.Equal(t, uint64(0), fooCtr.Val())
	assert.Equal(t, uint64(0), keeper.Get(fooKey).Val())
	fooCtr.Add(expected[fooKey])
	assert.Equal(t, expected[fooKey], keeper.Get(fooKey).Val())

	// Ranging over Iterator must yield only known keys, each with its
	// current counter value.
	for name, ctr := range keeper.Iterator() {
		want, known := expected[name]
		assert.True(t, known)
		assert.Equal(t, want, ctr.Val())
	}
}