Merge pull request #464 from Icinga/OwnHeartbeat

Write own status into Redis
Eric Lippmann 2022-06-28 15:21:03 +02:00 committed by GitHub
commit 5f29caecbe
18 changed files with 524 additions and 177 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/icinga/icingadb/pkg/icingadb/overdue"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/okzk/sdnotify"
@ -22,6 +23,7 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
@ -119,6 +121,10 @@ func run() int {
}
defer db.Close()
ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability"))
telemetryLogger := logs.GetChildLogger("telemetry")
telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
telemetry.WriteStats(ctx, rc, telemetryLogger)
}
// Closing ha on exit ensures that this instance retracts its heartbeat
// from the database so that another instance can take over immediately.
@ -171,9 +177,9 @@ func run() int {
configInitSync := sync.WaitGroup{}
stateInitSync := &sync.WaitGroup{}
// Get the last IDs of the runtime update streams before starting anything else,
// Clear the runtime update streams before starting anything else (rather than after the sync),
// otherwise updates may be lost.
runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.Streams(synctx)
runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.ClearStreams(synctx)
if err != nil {
logger.Fatalf("%+v", err)
}
@ -204,6 +210,8 @@ func run() int {
})
syncStart := time.Now()
atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, syncStart.UnixMilli())
logger.Info("Starting config sync")
for _, factory := range v1.ConfigFactories {
factory := factory
@ -242,10 +250,18 @@ func run() int {
g.Go(func() error {
configInitSync.Wait()
atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, 0)
elapsed := time.Since(syncStart)
syncEnd := time.Now()
elapsed := syncEnd.Sub(syncStart)
logger := logs.GetChildLogger("config-sync")
if synctx.Err() == nil {
telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{
FinishMilli: syncEnd.UnixMilli(),
DurationMilli: elapsed.Milliseconds(),
})
logger.Infof("Finished config sync in %s", elapsed)
} else {
logger.Warnf("Aborted config sync after %s", elapsed)

View file

@ -47,6 +47,7 @@ logging:
# redis:
# retention:
# runtime-updates:
# telemetry:
retention:
# Number of days to retain full historical data. By default, historical data is retained forever.

View file

@ -2,7 +2,7 @@
## Requirements <a id="installation-requirements"></a>
* Local Redis instance (Will be installed during this documentation)
* Local Redis (≥6.2) instance (Will be installed during this documentation)
* MySQL (≥5.5), MariaDB (≥10.1), or PostgreSQL (≥9.6): database, user and schema imports (Will be set up during this documentation)
## Setting up Icinga DB <a id="setting-up-icingadb"></a>

View file

@ -63,6 +63,7 @@ overdue-sync | Calculation and synchronization of the overdue status
redis | Redis connection status and queries.
retention | Deletes historical data that exceed their configured retention period.
runtime-updates | Runtime updates of config objects after the initial config synchronization.
telemetry | Reporting of Icinga DB status to Icinga 2 via Redis (for monitoring purposes).
### Duration String <a id="duration-string"></a>

pkg/com/atomic.go (new file, 38 lines)
View file

@ -0,0 +1,38 @@
package com
import "sync/atomic"
// Atomic is a type-safe wrapper around atomic.Value.
type Atomic[T any] struct {
v atomic.Value
}
func (a *Atomic[T]) Load() (_ T, ok bool) {
if v, ok := a.v.Load().(box[T]); ok {
return v.v, true
}
return
}
func (a *Atomic[T]) Store(v T) {
a.v.Store(box[T]{v})
}
func (a *Atomic[T]) Swap(new T) (old T, ok bool) {
if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok {
return old.v, true
}
return
}
func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) {
return a.v.CompareAndSwap(box[T]{old}, box[T]{new})
}
// box wraps a value of type T in a non-interface type. If T is an interface, this allows nil values and values
// of different concrete types implementing T to be stored in Atomic[T]#v, bypassing atomic.Value#Store()'s
// consistency policy.
type box[T any] struct {
v T
}
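The wrapper above is used by the telemetry code further down to publish the last successful sync atomically. A minimal, self-contained sketch of its API, assuming only the methods shown here (SyncResult is a made-up value type for illustration):

package main

import (
	"fmt"

	"github.com/icinga/icingadb/pkg/com"
)

// SyncResult is a made-up value type; any T works, including interface types,
// thanks to the box wrapper above.
type SyncResult struct {
	FinishMilli   int64
	DurationMilli int64
}

func main() {
	var last com.Atomic[SyncResult]

	// Loading from a zero Atomic reports ok == false instead of returning garbage.
	if _, ok := last.Load(); !ok {
		fmt.Println("nothing stored yet")
	}

	last.Store(SyncResult{FinishMilli: 1656421263000, DurationMilli: 42000})

	if v, ok := last.Load(); ok {
		fmt.Printf("finished at %d after %d ms\n", v.FinishMilli, v.DurationMilli)
	}
}

Because every value is wrapped in the non-interface box struct before it reaches atomic.Value, storing nil interface values or values of changing concrete types does not violate atomic.Value's consistency requirements.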

View file

@ -6,6 +6,7 @@ import (
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/jmoiron/sqlx"
@ -39,11 +40,15 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
retry.Settings{
Timeout: timeout,
OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
telemetry.UpdateCurrentDbConnErr(err)
if lastErr == nil || err.Error() != lastErr.Error() {
c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
telemetry.UpdateCurrentDbConnErr(nil)
if attempt > 0 {
c.driver.Logger.Infow("Reconnected to database",
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))

View file

@ -34,9 +34,11 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table,
}
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted.
// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess.
// Returns the total number of rows deleted.
func (db *DB) CleanupOlderThan(
ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time,
ctx context.Context, stmt CleanupStmt, envId types.Binary,
count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error) {
var counter com.Counter
defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()
@ -58,6 +60,12 @@ func (db *DB) CleanupOlderThan(
counter.Add(uint64(n))
for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, make([]struct{}, n)); err != nil {
return 0, err
}
}
if n < int64(count) {
break
}

View file

@ -228,13 +228,39 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) {
return strings.Join(where, ` AND `), len(columns)
}
// OnSuccess is a callback for successful (bulk) DML operations.
type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error)
func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T] {
return func(_ context.Context, rows []T) error {
counter.Add(uint64(len(rows)))
return nil
}
}
func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T] {
return func(ctx context.Context, rows []T) error {
for _, row := range rows {
select {
case ch <- row:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
}
// BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`.
// Takes in up to the number of arguments specified in count from the arg stream,
// derives and expands a query and executes it with this set of arguments until the arg stream has been processed.
// The derived queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
// Arguments for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error {
// Arguments for which the query ran successfully will be passed to onSuccess.
func (db *DB) BulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
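The variadic onSuccess parameters replace the former succeeded channels: every successfully executed chunk is handed to each callback in order. A minimal, self-contained sketch of how the two helpers compose (the chunk of IDs and the buffered channel are made up for illustration; DeleteStreamed and BulkExec invoke the callbacks the same way with the rows of each chunk):

package main

import (
	"context"
	"fmt"

	"github.com/icinga/icingadb/pkg/com"
	"github.com/icinga/icingadb/pkg/icingadb"
)

func main() {
	ctx := context.Background()

	var counted com.Counter
	forwarded := make(chan any, 3)

	// Compose the helpers just like a caller passes them to DeleteStreamed or BulkExec.
	callbacks := []icingadb.OnSuccess[any]{
		icingadb.OnSuccessIncrement[any](&counted),
		icingadb.OnSuccessSendTo[any](forwarded),
	}

	// Simulate one successfully executed chunk of three IDs.
	chunk := []any{"id1", "id2", "id3"}
	for _, onSuccess := range callbacks {
		if err := onSuccess(ctx, chunk); err != nil {
			panic(err)
		}
	}

	fmt.Println("counted:", counted.Reset(), "forwarded:", len(forwarded)) // counted: 3 forwarded: 3
}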
@ -270,13 +296,9 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
counter.Add(uint64(len(b)))
if succeeded != nil {
for _, row := range b {
select {
case <-ctx.Done():
return ctx.Err()
case succeeded <- row:
}
for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, b); err != nil {
return err
}
}
@ -302,10 +324,10 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
// this set of arguments, until the arg stream has been processed.
// The queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
// Entities for which the query ran successfully will be streamed on the succeeded channel.
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) NamedBulkExec(
ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity],
splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], onSuccess ...OnSuccess[contracts.Entity],
) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
@ -339,13 +361,9 @@ func (db *DB) NamedBulkExec(
counter.Add(uint64(len(b)))
if succeeded != nil {
for _, row := range b {
select {
case <-ctx.Done():
return ctx.Err()
case succeeded <- row:
}
for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, b); err != nil {
return err
}
}
@ -493,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
// The insert statement is created using BuildInsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error {
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) CreateStreamed(
ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return errors.Wrap(err, "can't copy first entity")
@ -503,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
stmt, placeholders := db.BuildInsertStmt(first)
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit[contracts.Entity],
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, com.NeverSplit[contracts.Entity], onSuccess...,
)
}
@ -511,7 +533,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error {
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) UpsertStreamed(
ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error {
first, forward, err := com.CopyFirst(ctx, entities)
if first == nil {
return errors.Wrap(err, "can't copy first entity")
@ -522,7 +547,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti
return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, succeeded, com.SplitOnDupId[contracts.Entity],
forward, com.SplitOnDupId[contracts.Entity], onSuccess...,
)
}
@ -545,22 +570,29 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// IDs for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error {
// IDs for which the query ran successfully will be passed to onSuccess.
func (db *DB) DeleteStreamed(
ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
) error {
sem := db.GetSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded)
return db.BulkExec(
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
)
}
// Delete creates a channel from the specified ids and
// bulk deletes them by passing the channel along with the entityType to DeleteStreamed.
func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error {
// IDs for which the query ran successfully will be passed to onSuccess.
func (db *DB) Delete(
ctx context.Context, entityType contracts.Entity, ids []interface{}, onSuccess ...OnSuccess[any],
) error {
idsCh := make(chan interface{}, len(ids))
for _, id := range ids {
idsCh <- id
}
close(idsCh)
return db.DeleteStreamed(ctx, entityType, idsCh, nil)
return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
}
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {

View file

@ -8,6 +8,7 @@ import (
"github.com/google/uuid"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
@ -23,8 +24,15 @@ import (
var timeout = 60 * time.Second
type haState struct {
responsibleTsMilli int64
responsible bool
otherResponsible bool
}
// HA provides high availability and indicates whether a Takeover or Handover must be made.
type HA struct {
state com.Atomic[haState]
ctx context.Context
cancelCtx context.CancelFunc
instanceId types.Binary
@ -108,6 +116,12 @@ func (h *HA) Takeover() chan struct{} {
return h.takeover
}
// State returns the current HA state: when responsibility last changed (Unix ms),
// whether this instance is responsible, and whether another instance is responsible.
func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) {
state, _ := h.state.Load()
return state.responsibleTsMilli, state.responsible, state.otherResponsible
}
func (h *HA) abort(err error) {
h.errOnce.Do(func() {
h.errMu.Lock()
@ -226,12 +240,13 @@ func (h *HA) controller() {
}
func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
var takeover bool
var takeover, otherResponsible bool
err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
takeover = false
otherResponsible = false
tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if errBegin != nil {
@ -248,6 +263,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
).StructScan(instance)
switch errQuery {
case nil:
otherResponsible = true
if shouldLog {
h.logger.Infow("Another instance is active",
zap.String("instance_id", instance.Id.String()),
@ -330,6 +346,11 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
}
h.signalTakeover()
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
h.state.Store(state)
}
}
return nil
@ -392,6 +413,12 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
func (h *HA) signalHandover() {
if h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
responsible: false,
otherResponsible: false,
})
select {
case h.handover <- struct{}{}:
h.responsible = false
@ -403,6 +430,12 @@ func (h *HA) signalHandover() {
func (h *HA) signalTakeover() {
if !h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
responsible: true,
otherResponsible: false,
})
select {
case h.takeover <- struct{}{}:
h.responsible = true

View file

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
@ -185,7 +186,10 @@ func (r *Retention) Start(ctx context.Context) error {
r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s",
stmt.Category, stmt.Table, olderThan)
deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan)
deleted, err := r.db.CleanupOlderThan(
ctx, stmt.CleanupStmt, e.Id, r.count, olderThan,
icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup),
)
if err != nil {
select {
case errs <- err:

View file

@ -10,6 +10,7 @@ import (
v1types "github.com/icinga/icingadb/pkg/icingadb/v1"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
@ -157,6 +158,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
}
counter.Add(uint64(len(ids)))
telemetry.Stats.History.Add(uint64(len(ids)))
case <-ctx.Done():
return ctx.Err()
}
@ -253,7 +255,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse
g.Go(func() error {
defer close(inserted)
return s.db.UpsertStreamed(ctx, insert, inserted)
return s.db.UpsertStreamed(ctx, insert, icingadb.OnSuccessSendTo[contracts.Entity](inserted))
})
g.Go(func() error {

View file

@ -13,6 +13,7 @@ import (
"github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingadb/v1/overdue"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/pkg/errors"
@ -206,6 +207,7 @@ func (s Sync) updateOverdue(
}
counter.Add(uint64(len(ids)))
telemetry.Stats.Overdue.Add(uint64(len(ids)))
var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
if overdue {

View file

@ -9,14 +9,18 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
"strconv"
"strings"
"sync"
)
@ -36,22 +40,20 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger
}
}
// Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync.
func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) {
// ClearStreams returns the stream key to ID mapping of the runtime update streams
// for later use in Sync and clears the streams themselves.
func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state icingaredis.Streams, err error) {
config = icingaredis.Streams{"icinga:runtime": "0-0"}
state = icingaredis.Streams{"icinga:runtime:state": "0-0"}
var keys []string
for _, streams := range [...]icingaredis.Streams{config, state} {
for key := range streams {
id, err := r.redis.StreamLastId(ctx, key)
if err != nil {
return nil, nil, err
}
streams[key] = id
keys = append(keys, key)
}
}
err = icingaredis.WrapCmdErr(r.redis.Del(ctx, keys...))
return
}
@ -67,30 +69,25 @@ func (r *RuntimeUpdates) Sync(
for _, factoryFunc := range factoryFuncs {
s := common.NewSyncSubject(factoryFunc)
stat := getCounterForEntity(s.Entity())
updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount)
deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
var upserted chan contracts.Entity
var upsertedFifo chan contracts.Entity
var deleted chan interface{}
var deletedFifo chan interface{}
var upsertCount int
var deleteCount int
upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity())
if !allowParallel {
upserted = make(chan contracts.Entity, 1)
upsertedFifo = make(chan contracts.Entity, 1)
deleted = make(chan interface{}, 1)
deletedFifo = make(chan interface{}, 1)
upsertCount = 1
deleteCount = 1
} else {
upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders)
deleteCount = r.db.Options.MaxPlaceholdersPerStatement
upserted = make(chan contracts.Entity, upsertCount)
deleted = make(chan interface{}, deleteCount)
}
updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages
@ -102,16 +99,6 @@ func (r *RuntimeUpdates) Sync(
structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"),
))
g.Go(func() error {
defer close(upserted)
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId[contracts.Entity],
)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
@ -120,37 +107,21 @@ func (r *RuntimeUpdates) Sync(
}
}).Stop()
for {
select {
case v, ok := <-upserted:
if !ok {
return nil
}
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
counter.Inc()
if !allowParallel {
select {
case upsertedFifo <- v:
case <-ctx.Done():
return ctx.Err()
}
}
case <-ctx.Done():
return ctx.Err()
}
onSuccess := []OnSuccess[contracts.Entity]{
OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat),
}
if !allowParallel {
onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo))
}
})
g.Go(func() error {
defer close(deleted)
sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity()))
return r.db.BulkExec(
ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, deleted,
return r.db.NamedBulkExec(
ctx, upsertStmt, upsertCount, sem, upsertEntities, com.SplitOnDupId[contracts.Entity], onSuccess...,
)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
@ -159,26 +130,14 @@ func (r *RuntimeUpdates) Sync(
}
}).Stop()
for {
select {
case v, ok := <-deleted:
if !ok {
return nil
}
sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity()))
counter.Inc()
if !allowParallel {
select {
case deletedFifo <- v:
case <-ctx.Done():
return ctx.Err()
}
}
case <-ctx.Done():
return ctx.Err()
}
onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)}
if !allowParallel {
onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo))
}
return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
})
}
@ -205,17 +164,6 @@ func (r *RuntimeUpdates) Sync(
cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity())
cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders)
upsertedCustomvars := make(chan contracts.Entity, cvCount)
g.Go(func() error {
defer close(upsertedCustomvars)
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId[contracts.Entity],
)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
@ -224,34 +172,18 @@ func (r *RuntimeUpdates) Sync(
}
}).Stop()
for {
select {
case _, ok := <-upsertedCustomvars:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity())
cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders)
upsertedFlatCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedFlatCustomvars)
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
upsertedFlatCustomvars, com.SplitOnDupId[contracts.Entity],
ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity],
OnSuccessIncrement[contracts.Entity](&counter),
OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config),
)
})
cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity())
cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders)
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
@ -260,18 +192,14 @@ func (r *RuntimeUpdates) Sync(
}
}).Stop()
for {
select {
case _, ok := <-upsertedFlatCustomvars:
if !ok {
return nil
}
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
return r.db.NamedBulkExec(
ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars,
com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter),
OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config),
)
})
g.Go(func() error {
@ -322,6 +250,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
return icingaredis.WrapCmdErr(cmd)
}
pipe := r.redis.Pipeline()
for _, stream := range rs {
var id string
@ -344,8 +273,25 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
return ctx.Err()
}
}
tsAndSerial := strings.Split(id, "-")
if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil {
tsAndSerial[1] = strconv.FormatUint(s+1, 10)
}
pipe.XTrimMinIDApprox(ctx, stream.Stream, strings.Join(tsAndSerial, "-"), 0)
streams[stream.Stream] = id
}
if cmds, err := pipe.Exec(ctx); err != nil {
r.logger.Errorw("Can't execute Redis pipeline", zap.Error(errors.WithStack(err)))
} else {
for _, cmd := range cmds {
if cmd.Err() != nil {
r.logger.Errorw("Can't trim runtime updates stream", zap.Error(icingaredis.WrapCmdErr(cmd)))
}
}
}
}
}
}
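The trim added to xRead bumps the sequence part of the last processed stream ID and uses the result as the MINID threshold, so entries up to and including the one already handled are removed while unread ones survive. A standalone sketch of just that ID arithmetic (nextStreamID is a hypothetical helper name and the ID value is made up):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nextStreamID increments the sequence part of a Redis stream ID, mirroring the
// trim logic above: XTRIM MINID nextStreamID(lastRead) drops every entry up to
// and including the one that was already processed.
func nextStreamID(id string) string {
	tsAndSerial := strings.Split(id, "-")
	if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil {
		tsAndSerial[1] = strconv.FormatUint(s+1, 10)
	}
	return strings.Join(tsAndSerial, "-")
}

func main() {
	// The last runtime update read from icinga:runtime was entry 1656421263000-5,
	// so everything older than 1656421263000-6 can be trimmed.
	fmt.Println(nextStreamID("1656421263000-5")) // 1656421263000-6
}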

View file

@ -8,6 +8,7 @@ import (
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -101,6 +102,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
}
g, ctx := errgroup.WithContext(ctx)
stat := getCounterForEntity(delta.Subject.Entity())
// Create
if len(delta.Create) > 0 {
@ -125,7 +127,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
}
g.Go(func() error {
return s.db.CreateStreamed(ctx, entities)
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
})
}
@ -149,7 +151,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
g.Go(func() error {
// Using upsert here on purpose as this is the fastest way to do bulk updates.
// However, there is a risk that errors in the sync implementation could silently insert new rows.
return s.db.UpsertStreamed(ctx, entities, nil)
return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
})
}
@ -157,7 +159,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs())
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
})
}
@ -201,3 +203,13 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
return g.Wait()
}
// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e.
func getCounterForEntity(e contracts.Entity) *com.Counter {
switch e.(type) {
case *v1.HostState, *v1.ServiceState:
return &telemetry.Stats.State
default:
return &telemetry.Stats.Config
}
}

View file

@ -182,24 +182,6 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c
}))
}
// StreamLastId fetches the last message of a stream and returns its ID.
func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error) {
lastId := "0-0"
cmd := c.XRevRangeN(ctx, stream, "+", "-", 1)
messages, err := cmd.Result()
if err != nil {
return "", WrapCmdErr(cmd)
}
for _, message := range messages {
lastId = message.ID
}
return lastId, nil
}
// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) {
key := utils.Key(utils.Name(subject.Entity()), ':')

View file

@ -12,6 +12,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
"sync/atomic"
"time"
)
@ -22,14 +23,15 @@ var timeout = 60 * time.Second
// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
active bool
events chan *HeartbeatMessage
cancelCtx context.CancelFunc
client *Client
done chan struct{}
errMu sync.Mutex
err error
logger *logging.Logger
active bool
events chan *HeartbeatMessage
lastReceivedMs int64
cancelCtx context.CancelFunc
client *Client
done chan struct{}
errMu sync.Mutex
err error
logger *logging.Logger
}
// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
@ -56,6 +58,11 @@ func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
return h.events
}
// LastReceived returns the last heartbeat's receive time in ms.
func (h *Heartbeat) LastReceived() int64 {
return atomic.LoadInt64(&h.lastReceivedMs)
}
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
@ -132,6 +139,8 @@ func (h *Heartbeat) controller(ctx context.Context) {
h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
h.active = true
}
atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
h.sendEvent(m)
case <-time.After(timeout):
if h.active {
@ -141,6 +150,8 @@ func (h *Heartbeat) controller(ctx context.Context) {
} else {
h.logger.Warn("Waiting for Icinga heartbeat")
}
atomic.StoreInt64(&h.lastReceivedMs, 0)
case <-ctx.Done():
return ctx.Err()
}

View file

@ -0,0 +1,203 @@
package telemetry
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"regexp"
"runtime/metrics"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// ha represents icingadb.HA to avoid import cycles.
type ha interface {
State() (weResponsibleMilli int64, weResponsible, otherResponsible bool)
}
type SuccessfulSync struct {
FinishMilli int64
DurationMilli int64
}
// currentDbConnErr stores ongoing errors from database connections.
var currentDbConnErr struct {
mu sync.Mutex
message string
sinceMilli int64
}
// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr.
func UpdateCurrentDbConnErr(err error) {
now := time.Now().UnixMilli()
currentDbConnErr.mu.Lock()
defer currentDbConnErr.mu.Unlock()
if currentDbConnErr.sinceMilli >= now {
// Already updated with a more recent error, ignore this one.
return
}
message := ""
if err != nil {
message = err.Error()
}
if currentDbConnErr.message == message {
// Error stayed the same, no update needed, keeping the old timestamp.
return
}
if currentDbConnErr.message == "" || message == "" {
// Either first error or recovery from an error, update timestamp.
currentDbConnErr.sinceMilli = now
}
currentDbConnErr.message = message
}
// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in
// milliseconds of the last change from OK to error or from error to OK.
func GetCurrentDbConnErr() (string, int64) {
currentDbConnErr.mu.Lock()
defer currentDbConnErr.mu.Unlock()
return currentDbConnErr.message, currentDbConnErr.sinceMilli
}
// OngoingSyncStartMilli is to be updated by the main() function.
var OngoingSyncStartMilli int64
// LastSuccessfulSync is to be updated by the main() function.
var LastSuccessfulSync com.Atomic[SuccessfulSync]
var boolToStr = map[bool]string{false: "0", true: "1"}
var startTime = time.Now().UnixMilli()
// StartHeartbeat periodically writes heartbeats to Redis so that Icinga 2 can monitor this Icinga DB instance.
func StartHeartbeat(
ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat,
) {
goMetrics := NewGoMetrics()
const interval = time.Second
var lastErr string
var silenceUntil time.Time
periodic.Start(ctx, interval, func(tick periodic.Tick) {
heartbeat := heartbeat.LastReceived()
responsibleTsMilli, responsible, otherResponsible := ha.State()
ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
sync, _ := LastSuccessfulSync.Load()
dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
now := time.Now()
values := map[string]string{
"version": internal.Version.Version,
"time": strconv.FormatInt(now.UnixMilli(), 10),
"start-time": strconv.FormatInt(startTime, 10),
"error": dbConnErr,
"error-since": strconv.FormatInt(dbConnErrSinceMilli, 10),
"performance-data": goMetrics.PerformanceData(),
"last-heartbeat-received": strconv.FormatInt(heartbeat, 10),
"ha-responsible": boolToStr[responsible],
"ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10),
"ha-other-responsible": boolToStr[otherResponsible],
"sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10),
"sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10),
"sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10),
}
ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval))
defer cancel()
cmd := client.XAdd(ctx, &redis.XAddArgs{
Stream: "icingadb:telemetry:heartbeat",
MaxLen: 1,
Values: values,
})
if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) {
logw := logger.Debugw
currentErr := err.Error()
if currentErr != lastErr || now.After(silenceUntil) {
logw = logger.Warnw
lastErr = currentErr
silenceUntil = now.Add(time.Minute)
}
logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd)))
} else {
lastErr = ""
silenceUntil = time.Time{}
}
})
}
type goMetrics struct {
names []string
units []string
samples []metrics.Sample
}
func NewGoMetrics() *goMetrics {
m := &goMetrics{}
forbiddenRe := regexp.MustCompile(`\W`)
for _, d := range metrics.All() {
switch d.Kind {
case metrics.KindUint64, metrics.KindFloat64:
name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_")
unit := ""
if strings.HasSuffix(d.Name, ":bytes") {
unit = "B"
} else if strings.HasSuffix(d.Name, ":seconds") {
unit = "s"
} else if d.Cumulative {
unit = "c"
}
m.names = append(m.names, name)
m.units = append(m.units, unit)
m.samples = append(m.samples, metrics.Sample{Name: d.Name})
}
}
return m
}
func (g *goMetrics) PerformanceData() string {
metrics.Read(g.samples)
var buf strings.Builder
for i, sample := range g.samples {
if i > 0 {
buf.WriteByte(' ')
}
switch sample.Value.Kind() {
case metrics.KindUint64:
_, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i])
case metrics.KindFloat64:
_, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i])
}
}
return buf.String()
}
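To inspect what Icinga 2 ends up reading, the heartbeat stream can be queried directly with the same go-redis v8 client. A minimal sketch; the Redis address is an assumption and must point at the Redis instance Icinga DB writes to:

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()

	// Assumed address of the Icinga DB Redis instance; adjust as needed.
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6380"})

	// The stream is written with MaxLen 1, so the newest entry is the complete picture.
	msgs, err := rdb.XRevRangeN(ctx, "icingadb:telemetry:heartbeat", "+", "-", 1).Result()
	if err != nil {
		panic(err)
	}

	for _, msg := range msgs {
		// Fields as written by StartHeartbeat: version, time, start-time, error, error-since,
		// performance-data, last-heartbeat-received, ha-responsible, ha-responsible-ts,
		// ha-other-responsible, sync-ongoing-since, sync-success-finish, sync-success-duration.
		for field, value := range msg.Values {
			fmt.Printf("%s = %v\n", field, value)
		}
	}
}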

View file

@ -0,0 +1,51 @@
package telemetry
import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"strconv"
"time"
)
var Stats struct {
// Each counter is to be incremented by the corresponding sync once per synced object of that kind,
// e.g. Config once for every config object synced.
Config, State, History, Overdue, HistoryCleanup com.Counter
}
// WriteStats periodically forwards Stats to Redis so that Icinga 2 can monitor them.
func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) {
counters := map[string]*com.Counter{
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
}
periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
var data []string
for kind, counter := range counters {
if cnt := counter.Reset(); cnt > 0 {
data = append(data, kind, strconv.FormatUint(cnt, 10))
}
}
if data != nil {
cmd := client.XAdd(ctx, &redis.XAddArgs{
Stream: "icingadb:telemetry:stats",
MaxLen: 15 * 60,
Approx: true,
Values: data,
})
if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) {
logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd)))
}
}
})
}
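Because Reset returns the value accumulated since the previous call and zeroes the counter, each icingadb:telemetry:stats entry carries per-second deltas rather than running totals. A tiny sketch of that behavior, assuming only the Add, Inc and Reset methods used above:

package main

import (
	"fmt"

	"github.com/icinga/icingadb/pkg/com"
)

func main() {
	var c com.Counter

	// A sync would bump the counter via OnSuccessIncrement for every synced object.
	c.Add(3)
	c.Inc()

	// WriteStats calls Reset once per tick, so each stream entry carries the
	// delta since the previous tick rather than a running total.
	fmt.Println(c.Reset()) // 4
	fmt.Println(c.Reset()) // 0
}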