Write ops/s by op and s to icingadb:telemetry:stats

This commit is contained in:
Alexander A. Klimov 2022-05-02 15:38:33 +02:00
parent 0e5d098be4
commit fac9f5e4e5
9 changed files with 88 additions and 6 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/icinga/icingadb/pkg/icingadb/overdue"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/okzk/sdnotify"
@ -117,6 +118,8 @@ func run() int {
}
defer db.Close()
ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability"))
telemetry.WriteStats(ctx, rc, logs.GetChildLogger("telemetry"))
}
// Closing ha on exit ensures that this instance retracts its heartbeat
// from the database so that another instance can take over immediately.

View file

@ -47,6 +47,7 @@ logging:
# redis:
# retention:
# runtime-updates:
# telemetry:
retention:
# Number of days to retain full historical data. By default, historical data is retained forever.

View file

@ -63,6 +63,7 @@ overdue-sync | Calculation and synchronization of the overdue status
redis | Redis connection status and queries.
retention | Deletes historical data that exceed their configured retention period.
runtime-updates | Runtime updates of config objects after the initial config synchronization.
telemetry | Reporting of Icinga DB status to Icinga 2 via Redis (for monitoring purposes).
### Duration String <a id="duration-string"></a>

View file

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
@ -185,7 +186,10 @@ func (r *Retention) Start(ctx context.Context) error {
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
stmt.Category, stmt.Table, olderThan)
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan)
deleted, err := r.db.CleanupOlderThan(
ctx, stmt.CleanupStmt, e.Id, r.count, olderThan,
icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
)
if err != nil {
select {
case errs <- err:

View file

@ -10,6 +10,7 @@ import (
v1types "github.com/icinga/icingadb/pkg/icingadb/v1"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
@ -157,6 +158,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
}
counter.Add(uint64(len(ids)))
telemetry.Stats.History.Add(uint64(len(ids)))
case <-ctx.Done():
return ctx.Err()
}

View file

@ -13,6 +13,7 @@ import (
"github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingadb/v1/overdue"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
@ -206,6 +207,7 @@ func (s Sync) updateOverdue(
}
counter.Add(uint64(len(ids)))
telemetry.Stats.Overdue.Add(uint64(len(ids)))
var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
if overdue {

View file

@ -9,6 +9,7 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
@ -67,6 +68,7 @@ func (r *RuntimeUpdates) Sync(
for _, factoryFunc := range factoryFuncs {
s := common.NewSyncSubject(factoryFunc)
stat := getCounterForEntity(s.Entity())
updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount)
@ -107,7 +109,9 @@ func (r *RuntimeUpdates) Sync(
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
onSuccess := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](&counter)}
onSuccess := []OnSuccess[contracts.Entity]{
OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat),
}
if !allowParallel {
onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo))
}
@ -127,7 +131,7 @@ func (r *RuntimeUpdates) Sync(
sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity()))
onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter)}
onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)}
if !allowParallel {
onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo))
}
@ -173,6 +177,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity],
OnSuccessIncrement[contracts.Entity](&counter),
OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config),
)
})
@ -192,6 +197,7 @@ func (r *RuntimeUpdates) Sync(
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter),
OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config),
)
})

View file

@ -8,6 +8,7 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -101,6 +102,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
}
g, ctx := errgroup.WithContext(ctx)
stat := getCounterForEntity(delta.Subject.Entity())
// Create
if len(delta.Create) > 0 {
@ -125,7 +127,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
}
g.Go(func() error {
return s.db.CreateStreamed(ctx, entities)
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
})
}
@ -149,7 +151,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
g.Go(func() error {
// Using upsert here on purpose as this is the fastest way to do bulk updates.
// However, there is a risk that errors in the sync implementation could silently insert new rows.
return s.db.UpsertStreamed(ctx, entities)
return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
})
}
@ -157,7 +159,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs())
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
})
}
@ -201,3 +203,13 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
return g.Wait()
}
// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e.
func getCounterForEntity(e contracts.Entity) *com.Counter {
switch e.(type) {
case *v1.HostState, *v1.ServiceState:
return &telemetry.Stats.State
default:
return &telemetry.Stats.Config
}
}

View file

@ -0,0 +1,51 @@
package telemetry
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"strconv"
"time"
)
var Stats struct {
// Config & co. are to be increased by the T sync once for every T object synced.
Config, State, History, Overdue, HistoryCleanup com.Counter
}
// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) {
counters := map[string]*com.Counter{
"sync_config": &Stats.Config,
"sync_state": &Stats.State,
"sync_history": &Stats.History,
"sync_overdue": &Stats.Overdue,
"cleanup_history": &Stats.HistoryCleanup,
}
periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
var data []string
for kind, counter := range counters {
if cnt := counter.Reset(); cnt > 0 {
data = append(data, kind, strconv.FormatUint(cnt, 10))
}
}
if data != nil {
cmd := client.XAdd(ctx, &redis.XAddArgs{
Stream: "icingadb:telemetry:stats",
MaxLen: 15 * 60,
Approx: true,
Values: data,
})
if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) {
logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd)))
}
}
})
}