diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 63f49665..1c026e37 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -12,6 +12,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/overdue" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/okzk/sdnotify" @@ -22,6 +23,7 @@ import ( "os" "os/signal" "sync" + "sync/atomic" "syscall" "time" ) @@ -119,6 +121,10 @@ func run() int { } defer db.Close() ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) + + telemetryLogger := logs.GetChildLogger("telemetry") + telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) + telemetry.WriteStats(ctx, rc, telemetryLogger) } // Closing ha on exit ensures that this instance retracts its heartbeat // from the database so that another instance can take over immediately. @@ -171,9 +177,9 @@ func run() int { configInitSync := sync.WaitGroup{} stateInitSync := &sync.WaitGroup{} - // Get the last IDs of the runtime update streams before starting anything else, + // Clear the runtime update streams before starting anything else (rather than after the sync), // otherwise updates may be lost. - runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.Streams(synctx) + runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.ClearStreams(synctx) if err != nil { logger.Fatalf("%+v", err) } @@ -204,6 +210,8 @@ func run() int { }) syncStart := time.Now() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, syncStart.UnixMilli()) + logger.Info("Starting config sync") for _, factory := range v1.ConfigFactories { factory := factory @@ -242,10 +250,18 @@ func run() int { g.Go(func() error { configInitSync.Wait() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, 0) - elapsed := time.Since(syncStart) + syncEnd := time.Now() + elapsed := syncEnd.Sub(syncStart) logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{ + FinishMilli: syncEnd.UnixMilli(), + DurationMilli: elapsed.Milliseconds(), + }) + logger.Infof("Finished config sync in %s", elapsed) } else { logger.Warnf("Aborted config sync after %s", elapsed) diff --git a/config.example.yml b/config.example.yml index 76c82d4a..c6cdbc5a 100644 --- a/config.example.yml +++ b/config.example.yml @@ -47,6 +47,7 @@ logging: # redis: # retention: # runtime-updates: +# telemetry: retention: # Number of days to retain full historical data. By default, historical data is retained forever. diff --git a/doc/02-Installation.md b/doc/02-Installation.md index 1236649f..83ec8fc9 100644 --- a/doc/02-Installation.md +++ b/doc/02-Installation.md @@ -2,7 +2,7 @@ ## Requirements -* Local Redis instance (Will be installed during this documentation) +* Local Redis (≥6.2) instance (Will be installed during this documentation) * MySQL (≥5.5), MariaDB (≥10.1), or PostgreSQL (≥9.6): database, user and schema imports (Will be set up during this documentation) ## Setting up Icinga DB diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 24322a60..14f58e59 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -63,6 +63,7 @@ overdue-sync | Calculation and synchronization of the overdue status redis | Redis connection status and queries. retention | Deletes historical data that exceed their configured retention period. 
runtime-updates | Runtime updates of config objects after the initial config synchronization. +telemetry | Reporting of Icinga DB status to Icinga 2 via Redis (for monitoring purposes). ### Duration String diff --git a/pkg/com/atomic.go b/pkg/com/atomic.go new file mode 100644 index 00000000..316413df --- /dev/null +++ b/pkg/com/atomic.go @@ -0,0 +1,38 @@ +package com + +import "sync/atomic" + +// Atomic is a type-safe wrapper around atomic.Value. +type Atomic[T any] struct { + v atomic.Value +} + +func (a *Atomic[T]) Load() (_ T, ok bool) { + if v, ok := a.v.Load().(box[T]); ok { + return v.v, true + } + + return +} + +func (a *Atomic[T]) Store(v T) { + a.v.Store(box[T]{v}) +} + +func (a *Atomic[T]) Swap(new T) (old T, ok bool) { + if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok { + return old.v, true + } + + return +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) { + return a.v.CompareAndSwap(box[T]{old}, box[T]{new}) +} + +// box allows, for the case T is an interface, nil values and values of different specific types implementing T +// to be stored in Atomic[T]#v (bypassing atomic.Value#Store()'s policy) by wrapping it (into a non-interface). +type box[T any] struct { + v T +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index c4066baf..f529db44 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -6,6 +6,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/jmoiron/sqlx" @@ -39,11 +40,15 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { retry.Settings{ Timeout: timeout, OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + telemetry.UpdateCurrentDbConnErr(err) + if lastErr == nil || err.Error() != lastErr.Error() { c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) } }, OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + telemetry.UpdateCurrentDbConnErr(nil) + if attempt > 0 { c.driver.Logger.Infow("Reconnected to database", zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go index a4f98e8b..e57eafaa 100644 --- a/pkg/icingadb/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -34,9 +34,11 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, } // CleanupOlderThan deletes all rows with the specified statement that are older than the given time. -// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted. +// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. +// Returns the total number of rows deleted. 
func (db *DB) CleanupOlderThan( - ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time, + ctx context.Context, stmt CleanupStmt, envId types.Binary, + count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], ) (uint64, error) { var counter com.Counter defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() @@ -58,6 +60,12 @@ func (db *DB) CleanupOlderThan( counter.Add(uint64(n)) + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, make([]struct{}, n)); err != nil { + return 0, err + } + } + if n < int64(count) { break } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 8d5b9438..33cbb990 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -228,13 +228,39 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { return strings.Join(where, ` AND `), len(columns) } +// OnSuccess is a callback for successful (bulk) DML operations. +type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error) + +func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T] { + return func(_ context.Context, rows []T) error { + counter.Add(uint64(len(rows))) + return nil + } +} + +func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T] { + return func(ctx context.Context, rows []T) error { + for _, row := range rows { + select { + case ch <- row: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + } +} + // BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. // Takes in up to the number of arguments specified in count from the arg stream, // derives and expands a query and executes it with this set of arguments until the arg stream has been processed. // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -// Arguments for which the query ran successfully will be streamed on the succeeded channel. -func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error { +// Arguments for which the query ran successfully will be passed to onSuccess. +func (db *DB) BulkExec( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any], +) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -270,13 +296,9 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph counter.Add(uint64(len(b))) - if succeeded != nil { - for _, row := range b { - select { - case <-ctx.Done(): - return ctx.Err() - case succeeded <- row: - } + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err } } @@ -302,10 +324,10 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph // this set of arguments, until the arg stream has been processed. // The queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -// Entities for which the query ran successfully will be streamed on the succeeded channel. +// Entities for which the query ran successfully will be passed to onSuccess. 
func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, - succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], + splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], onSuccess ...OnSuccess[contracts.Entity], ) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -339,13 +361,9 @@ func (db *DB) NamedBulkExec( counter.Add(uint64(len(b))) - if succeeded != nil { - for _, row := range b { - select { - case <-ctx.Done(): - return ctx.Err() - case succeeded <- row: - } + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err } } @@ -493,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF // The insert statement is created using BuildInsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -503,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildInsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit[contracts.Entity], + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.NeverSplit[contracts.Entity], onSuccess..., ) } @@ -511,7 +533,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) UpsertStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -522,7 +547,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec( ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, - forward, succeeded, com.SplitOnDupId[contracts.Entity], + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., ) } @@ -545,22 +570,29 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti // The delete statement is created using BuildDeleteStmt with the passed entityType. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -// IDs for which the query ran successfully will be streamed on the succeeded channel. 
-func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error { +// IDs for which the query ran successfully will be passed to onSuccess. +func (db *DB) DeleteStreamed( + ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any], +) error { sem := db.GetSemaphoreForTable(utils.TableName(entityType)) - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded) + return db.BulkExec( + ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess..., + ) } // Delete creates a channel from the specified ids and // bulk deletes them by passing the channel along with the entityType to DeleteStreamed. -func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { +// IDs for which the query ran successfully will be passed to onSuccess. +func (db *DB) Delete( + ctx context.Context, entityType contracts.Entity, ids []interface{}, onSuccess ...OnSuccess[any], +) error { idsCh := make(chan interface{}, len(ids)) for _, id := range ids { idsCh <- id } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh, nil) + return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index b6dbd743..0a81bb5b 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/com" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -23,8 +24,15 @@ import ( var timeout = 60 * time.Second +type haState struct { + responsibleTsMilli int64 + responsible bool + otherResponsible bool +} + // HA provides high availability and indicates whether a Takeover or Handover must be made. type HA struct { + state com.Atomic[haState] ctx context.Context cancelCtx context.CancelFunc instanceId types.Binary @@ -108,6 +116,12 @@ func (h *HA) Takeover() chan struct{} { return h.takeover } +// State returns the status quo. 
+func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) { + state, _ := h.state.Load() + return state.responsibleTsMilli, state.responsible, state.otherResponsible +} + func (h *HA) abort(err error) { h.errOnce.Do(func() { h.errMu.Lock() @@ -226,12 +240,13 @@ func (h *HA) controller() { } func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - var takeover bool + var takeover, otherResponsible bool err := retry.WithBackoff( ctx, func(ctx context.Context) error { takeover = false + otherResponsible = false tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if errBegin != nil { @@ -248,6 +263,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type ).StructScan(instance) switch errQuery { case nil: + otherResponsible = true if shouldLog { h.logger.Infow("Another instance is active", zap.String("instance_id", instance.Id.String()), @@ -330,6 +346,11 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type } h.signalTakeover() + } else if otherResponsible { + if state, _ := h.state.Load(); !state.otherResponsible { + state.otherResponsible = true + h.state.Store(state) + } } return nil @@ -392,6 +413,12 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar func (h *HA) signalHandover() { if h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: false, + otherResponsible: false, + }) + select { case h.handover <- struct{}{}: h.responsible = false @@ -403,6 +430,12 @@ func (h *HA) signalHandover() { func (h *HA) signalTakeover() { if !h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: true, + otherResponsible: false, + }) + select { case h.takeover <- struct{}{}: h.responsible = true diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 2e899fc4..ff217cdd 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icingadb/pkg/icingadb" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -185,7 +186,10 @@ func (r *Retention) Start(ctx context.Context) error { r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", stmt.Category, stmt.Table, olderThan) - deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan) + deleted, err := r.db.CleanupOlderThan( + ctx, stmt.CleanupStmt, e.Id, r.count, olderThan, + icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + ) if err != nil { select { case errs <- err: diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index d5bebd65..25e319bb 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,6 +10,7 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" @@ -157,6 +158,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input 
<-chan redi } counter.Add(uint64(len(ids))) + telemetry.Stats.History.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } @@ -253,7 +255,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse g.Go(func() error { defer close(inserted) - return s.db.UpsertStreamed(ctx, insert, inserted) + return s.db.UpsertStreamed(ctx, insert, icingadb.OnSuccessSendTo[contracts.Entity](inserted)) }) g.Go(func() error { diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index d8ba24bd..5cd4d674 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -206,6 +207,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) + telemetry.Stats.Overdue.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 636c0bb6..bc144b17 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -9,14 +9,18 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" + "strconv" + "strings" "sync" ) @@ -36,22 +40,20 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger } } -// Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync. -func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) { +// ClearStreams returns the stream key to ID mapping of the runtime update streams +// for later use in Sync and clears the streams themselves. 
+func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state icingaredis.Streams, err error) { config = icingaredis.Streams{"icinga:runtime": "0-0"} state = icingaredis.Streams{"icinga:runtime:state": "0-0"} + var keys []string for _, streams := range [...]icingaredis.Streams{config, state} { for key := range streams { - id, err := r.redis.StreamLastId(ctx, key) - if err != nil { - return nil, nil, err - } - - streams[key] = id + keys = append(keys, key) } } + err = icingaredis.WrapCmdErr(r.redis.Del(ctx, keys...)) return } @@ -67,30 +69,25 @@ func (r *RuntimeUpdates) Sync( for _, factoryFunc := range factoryFuncs { s := common.NewSyncSubject(factoryFunc) + stat := getCounterForEntity(s.Entity()) updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) deleteIds := make(chan interface{}, r.redis.Options.XReadCount) - var upserted chan contracts.Entity var upsertedFifo chan contracts.Entity - var deleted chan interface{} var deletedFifo chan interface{} var upsertCount int var deleteCount int upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity()) if !allowParallel { - upserted = make(chan contracts.Entity, 1) upsertedFifo = make(chan contracts.Entity, 1) - deleted = make(chan interface{}, 1) deletedFifo = make(chan interface{}, 1) upsertCount = 1 deleteCount = 1 } else { upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders) deleteCount = r.db.Options.MaxPlaceholdersPerStatement - upserted = make(chan contracts.Entity, upsertCount) - deleted = make(chan interface{}, deleteCount) } updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages @@ -102,16 +99,6 @@ func (r *RuntimeUpdates) Sync( structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"), )) - g.Go(func() error { - defer close(upserted) - - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. - sem := semaphore.NewWeighted(1) - - return r.db.NamedBulkExec( - ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId[contracts.Entity], - ) - }) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -120,37 +107,21 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case v, ok := <-upserted: - if !ok { - return nil - } + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. 
+ sem := semaphore.NewWeighted(1) - counter.Inc() - - if !allowParallel { - select { - case upsertedFifo <- v: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } + onSuccess := []OnSuccess[contracts.Entity]{ + OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat), + } + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo)) } - }) - g.Go(func() error { - defer close(deleted) - - sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - - return r.db.BulkExec( - ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, deleted, + return r.db.NamedBulkExec( + ctx, upsertStmt, upsertCount, sem, upsertEntities, com.SplitOnDupId[contracts.Entity], onSuccess..., ) }) + g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -159,26 +130,14 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case v, ok := <-deleted: - if !ok { - return nil - } + sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - counter.Inc() - - if !allowParallel { - select { - case deletedFifo <- v: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } + onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)} + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo)) } + + return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...) }) } @@ -205,17 +164,6 @@ func (r *RuntimeUpdates) Sync( cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity()) cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders) - upsertedCustomvars := make(chan contracts.Entity, cvCount) - g.Go(func() error { - defer close(upsertedCustomvars) - - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. - sem := semaphore.NewWeighted(1) - - return r.db.NamedBulkExec( - ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId[contracts.Entity], - ) - }) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -224,34 +172,18 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case _, ok := <-upsertedCustomvars: - if !ok { - return nil - } - - counter.Inc() - case <-ctx.Done(): - return ctx.Err() - } - } - }) - - cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) - cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) - upsertedFlatCustomvars := make(chan contracts.Entity) - g.Go(func() error { - defer close(upsertedFlatCustomvars) - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. 
sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, - upsertedFlatCustomvars, com.SplitOnDupId[contracts.Entity], + ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity], + OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), ) }) + + cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) + cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -260,18 +192,14 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case _, ok := <-upsertedFlatCustomvars: - if !ok { - return nil - } + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. + sem := semaphore.NewWeighted(1) - counter.Inc() - case <-ctx.Done(): - return ctx.Err() - } - } + return r.db.NamedBulkExec( + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, + com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), + ) }) g.Go(func() error { @@ -322,6 +250,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri return icingaredis.WrapCmdErr(cmd) } + pipe := r.redis.Pipeline() for _, stream := range rs { var id string @@ -344,8 +273,25 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri return ctx.Err() } } + + tsAndSerial := strings.Split(id, "-") + if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil { + tsAndSerial[1] = strconv.FormatUint(s+1, 10) + } + + pipe.XTrimMinIDApprox(ctx, stream.Stream, strings.Join(tsAndSerial, "-"), 0) streams[stream.Stream] = id } + + if cmds, err := pipe.Exec(ctx); err != nil { + r.logger.Errorw("Can't execute Redis pipeline", zap.Error(errors.WithStack(err))) + } else { + for _, cmd := range cmds { + if cmd.Err() != nil { + r.logger.Errorw("Can't trim runtime updates stream", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + } } } } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 7f139673..9af78e86 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -101,6 +102,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g, ctx := errgroup.WithContext(ctx) + stat := getCounterForEntity(delta.Subject.Entity()) // Create if len(delta.Create) > 0 { @@ -125,7 +127,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g.Go(func() error { - return s.db.CreateStreamed(ctx, entities) + return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -149,7 +151,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { // Using upsert here on purpose as this is the fastest way to do bulk updates. // However, there is a risk that errors in the sync implementation could silently insert new rows. 
- return s.db.UpsertStreamed(ctx, entities, nil) + return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -157,7 +159,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { if len(delta.Delete) > 0 { s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) }) } @@ -201,3 +203,13 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { return g.Wait() } + +// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e. +func getCounterForEntity(e contracts.Entity) *com.Counter { + switch e.(type) { + case *v1.HostState, *v1.ServiceState: + return &telemetry.Stats.State + default: + return &telemetry.Stats.Config + } +} diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 2af5e825..8ea9d54c 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -182,24 +182,6 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c })) } -// StreamLastId fetches the last message of a stream and returns its ID. -func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error) { - lastId := "0-0" - - cmd := c.XRevRangeN(ctx, stream, "+", "-", 1) - messages, err := cmd.Result() - - if err != nil { - return "", WrapCmdErr(cmd) - } - - for _, message := range messages { - lastId = message.ID - } - - return lastId, nil -} - // YieldAll yields all entities from Redis that belong to the specified SyncSubject. func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { key := utils.Key(utils.Name(subject.Entity()), ':') diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 2fc08873..72178026 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "sync" + "sync/atomic" "time" ) @@ -22,14 +23,15 @@ var timeout = 60 * time.Second // Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. // Also signals on if the heartbeat is Lost. type Heartbeat struct { - active bool - events chan *HeartbeatMessage - cancelCtx context.CancelFunc - client *Client - done chan struct{} - errMu sync.Mutex - err error - logger *logging.Logger + active bool + events chan *HeartbeatMessage + lastReceivedMs int64 + cancelCtx context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *logging.Logger } // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. @@ -56,6 +58,11 @@ func (h *Heartbeat) Events() <-chan *HeartbeatMessage { return h.events } +// LastReceived returns the last heartbeat's receive time in ms. +func (h *Heartbeat) LastReceived() int64 { + return atomic.LoadInt64(&h.lastReceivedMs) +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. 
func (h *Heartbeat) Close() error { @@ -132,6 +139,8 @@ func (h *Heartbeat) controller(ctx context.Context) { h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) h.active = true } + + atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) h.sendEvent(m) case <-time.After(timeout): if h.active { @@ -141,6 +150,8 @@ func (h *Heartbeat) controller(ctx context.Context) { } else { h.logger.Warn("Waiting for Icinga heartbeat") } + + atomic.StoreInt64(&h.lastReceivedMs, 0) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go new file mode 100644 index 00000000..ee476a15 --- /dev/null +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -0,0 +1,203 @@ +package telemetry + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "regexp" + "runtime/metrics" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// ha represents icingadb.HA to avoid import cycles. +type ha interface { + State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) +} + +type SuccessfulSync struct { + FinishMilli int64 + DurationMilli int64 +} + +// currentDbConnErr stores ongoing errors from database connections. +var currentDbConnErr struct { + mu sync.Mutex + message string + sinceMilli int64 +} + +// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr. +func UpdateCurrentDbConnErr(err error) { + now := time.Now().UnixMilli() + + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + if currentDbConnErr.sinceMilli >= now { + // Already updated with a more recent error, ignore this one. + return + } + + message := "" + if err != nil { + message = err.Error() + } + + if currentDbConnErr.message == message { + // Error stayed the same, no update needed, keeping the old timestamp. + return + } + + if currentDbConnErr.message == "" || message == "" { + // Either first error or recovery from an error, update timestamp. + currentDbConnErr.sinceMilli = now + } + + currentDbConnErr.message = message +} + +// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in +// milliseconds of the last change from OK to error or from error to OK. +func GetCurrentDbConnErr() (string, int64) { + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + return currentDbConnErr.message, currentDbConnErr.sinceMilli +} + +// OngoingSyncStartMilli is to be updated by the main() function. +var OngoingSyncStartMilli int64 + +// LastSuccessfulSync is to be updated by the main() function. +var LastSuccessfulSync com.Atomic[SuccessfulSync] + +var boolToStr = map[bool]string{false: "0", true: "1"} +var startTime = time.Now().UnixMilli() + +// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. 
+func StartHeartbeat( + ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, +) { + goMetrics := NewGoMetrics() + + const interval = time.Second + + var lastErr string + var silenceUntil time.Time + + periodic.Start(ctx, interval, func(tick periodic.Tick) { + heartbeat := heartbeat.LastReceived() + responsibleTsMilli, responsible, otherResponsible := ha.State() + ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) + sync, _ := LastSuccessfulSync.Load() + dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() + now := time.Now() + + values := map[string]string{ + "version": internal.Version.Version, + "time": strconv.FormatInt(now.UnixMilli(), 10), + "start-time": strconv.FormatInt(startTime, 10), + "error": dbConnErr, + "error-since": strconv.FormatInt(dbConnErrSinceMilli, 10), + "performance-data": goMetrics.PerformanceData(), + "last-heartbeat-received": strconv.FormatInt(heartbeat, 10), + "ha-responsible": boolToStr[responsible], + "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha-other-responsible": boolToStr[otherResponsible], + "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), + } + + ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) + defer cancel() + + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:heartbeat", + MaxLen: 1, + Values: values, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) { + logw := logger.Debugw + currentErr := err.Error() + + if currentErr != lastErr || now.After(silenceUntil) { + logw = logger.Warnw + lastErr = currentErr + silenceUntil = now.Add(time.Minute) + } + + logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd))) + } else { + lastErr = "" + silenceUntil = time.Time{} + } + }) +} + +type goMetrics struct { + names []string + units []string + samples []metrics.Sample +} + +func NewGoMetrics() *goMetrics { + m := &goMetrics{} + + forbiddenRe := regexp.MustCompile(`\W`) + + for _, d := range metrics.All() { + switch d.Kind { + case metrics.KindUint64, metrics.KindFloat64: + name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_") + + unit := "" + if strings.HasSuffix(d.Name, ":bytes") { + unit = "B" + } else if strings.HasSuffix(d.Name, ":seconds") { + unit = "s" + } else if d.Cumulative { + unit = "c" + } + + m.names = append(m.names, name) + m.units = append(m.units, unit) + m.samples = append(m.samples, metrics.Sample{Name: d.Name}) + } + } + + return m +} + +func (g *goMetrics) PerformanceData() string { + metrics.Read(g.samples) + + var buf strings.Builder + + for i, sample := range g.samples { + if i > 0 { + buf.WriteByte(' ') + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + _, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i]) + case metrics.KindFloat64: + _, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i]) + } + } + + return buf.String() +} diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go new file mode 100644 index 00000000..86db0b3e --- /dev/null +++ b/pkg/icingaredis/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + 
"github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "strconv" + "time" +) + +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config, State, History, Overdue, HistoryCleanup com.Counter +} + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. +func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, + } + + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { + var data []string + for kind, counter := range counters { + if cnt := counter.Reset(); cnt > 0 { + data = append(data, kind, strconv.FormatUint(cnt, 10)) + } + } + + if data != nil { + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:stats", + MaxLen: 15 * 60, + Approx: true, + Values: data, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) { + logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + }) +}