From db4725218e47dcb29167becb3f2aecc553769957 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 31 May 2022 18:05:20 +0200 Subject: [PATCH 01/14] DB#{*BulkExec,*Streamed}(): replace succeeded channel with callbacks to allow to avoid a goroutine reading that channel. --- pkg/icingadb/db.go | 75 ++++++++++++------- pkg/icingadb/history/sync.go | 2 +- pkg/icingadb/runtime_updates.go | 129 +++++++------------------------- pkg/icingadb/sync.go | 2 +- 4 files changed, 77 insertions(+), 131 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 8d5b9438..e5375bd8 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -228,13 +228,39 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { return strings.Join(where, ` AND `), len(columns) } +// OnSuccess is a callback for successful (bulk) DML operations. +type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error) + +func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T] { + return func(_ context.Context, rows []T) error { + counter.Add(uint64(len(rows))) + return nil + } +} + +func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T] { + return func(ctx context.Context, rows []T) error { + for _, row := range rows { + select { + case ch <- row: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + } +} + // BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. // Takes in up to the number of arguments specified in count from the arg stream, // derives and expands a query and executes it with this set of arguments until the arg stream has been processed. // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -// Arguments for which the query ran successfully will be streamed on the succeeded channel. -func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error { +// Arguments for which the query ran successfully will be passed to onSuccess. +func (db *DB) BulkExec( + ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any], +) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -270,13 +296,9 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph counter.Add(uint64(len(b))) - if succeeded != nil { - for _, row := range b { - select { - case <-ctx.Done(): - return ctx.Err() - case succeeded <- row: - } + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err } } @@ -302,10 +324,10 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph // this set of arguments, until the arg stream has been processed. // The queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -// Entities for which the query ran successfully will be streamed on the succeeded channel. +// Entities for which the query ran successfully will be passed to onSuccess. 
func (db *DB) NamedBulkExec( ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity, - succeeded chan<- contracts.Entity, splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], + splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], onSuccess ...OnSuccess[contracts.Entity], ) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -339,13 +361,9 @@ func (db *DB) NamedBulkExec( counter.Add(uint64(len(b))) - if succeeded != nil { - for _, row := range b { - select { - case <-ctx.Done(): - return ctx.Err() - case succeeded <- row: - } + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, b); err != nil { + return err } } @@ -503,7 +521,7 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildInsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, nil, com.NeverSplit[contracts.Entity], + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[contracts.Entity], ) } @@ -511,7 +529,10 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Entity, succeeded chan<- contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) UpsertStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -522,7 +543,7 @@ func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan contracts.Enti return db.NamedBulkExec( ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, - forward, succeeded, com.SplitOnDupId[contracts.Entity], + forward, com.SplitOnDupId[contracts.Entity], onSuccess..., ) } @@ -545,10 +566,14 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti // The delete statement is created using BuildDeleteStmt with the passed entityType. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -// IDs for which the query ran successfully will be streamed on the succeeded channel. -func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error { +// IDs for which the query ran successfully will be passed to onSuccess. 
+func (db *DB) DeleteStreamed( + ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any], +) error { sem := db.GetSemaphoreForTable(utils.TableName(entityType)) - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded) + return db.BulkExec( + ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess..., + ) } // Delete creates a channel from the specified ids and @@ -560,7 +585,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh, nil) + return db.DeleteStreamed(ctx, entityType, idsCh) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index d5bebd65..9e961f00 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -253,7 +253,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse g.Go(func() error { defer close(inserted) - return s.db.UpsertStreamed(ctx, insert, inserted) + return s.db.UpsertStreamed(ctx, insert, icingadb.OnSuccessSendTo[contracts.Entity](inserted)) }) g.Go(func() error { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 636c0bb6..4c92fc29 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -72,25 +72,19 @@ func (r *RuntimeUpdates) Sync( upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) deleteIds := make(chan interface{}, r.redis.Options.XReadCount) - var upserted chan contracts.Entity var upsertedFifo chan contracts.Entity - var deleted chan interface{} var deletedFifo chan interface{} var upsertCount int var deleteCount int upsertStmt, upsertPlaceholders := r.db.BuildUpsertStmt(s.Entity()) if !allowParallel { - upserted = make(chan contracts.Entity, 1) upsertedFifo = make(chan contracts.Entity, 1) - deleted = make(chan interface{}, 1) deletedFifo = make(chan interface{}, 1) upsertCount = 1 deleteCount = 1 } else { upsertCount = r.db.BatchSizeByPlaceholders(upsertPlaceholders) deleteCount = r.db.Options.MaxPlaceholdersPerStatement - upserted = make(chan contracts.Entity, upsertCount) - deleted = make(chan interface{}, deleteCount) } updateMessagesByKey[fmt.Sprintf("icinga:%s", utils.Key(s.Name(), ':'))] = updateMessages @@ -102,16 +96,6 @@ func (r *RuntimeUpdates) Sync( structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"), )) - g.Go(func() error { - defer close(upserted) - - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. - sem := semaphore.NewWeighted(1) - - return r.db.NamedBulkExec( - ctx, upsertStmt, upsertCount, sem, upsertEntities, upserted, com.SplitOnDupId[contracts.Entity], - ) - }) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -120,37 +104,19 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case v, ok := <-upserted: - if !ok { - return nil - } + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. 
+ sem := semaphore.NewWeighted(1) - counter.Inc() - - if !allowParallel { - select { - case upsertedFifo <- v: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } + onSuccess := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](&counter)} + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo)) } - }) - g.Go(func() error { - defer close(deleted) - - sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - - return r.db.BulkExec( - ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, deleted, + return r.db.NamedBulkExec( + ctx, upsertStmt, upsertCount, sem, upsertEntities, com.SplitOnDupId[contracts.Entity], onSuccess..., ) }) + g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -159,26 +125,14 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case v, ok := <-deleted: - if !ok { - return nil - } + sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - counter.Inc() - - if !allowParallel { - select { - case deletedFifo <- v: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } + onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter)} + if !allowParallel { + onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo)) } + + return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...) }) } @@ -205,17 +159,6 @@ func (r *RuntimeUpdates) Sync( cvStmt, cvPlaceholders := r.db.BuildUpsertStmt(cv.Entity()) cvCount := r.db.BatchSizeByPlaceholders(cvPlaceholders) - upsertedCustomvars := make(chan contracts.Entity, cvCount) - g.Go(func() error { - defer close(upsertedCustomvars) - - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. - sem := semaphore.NewWeighted(1) - - return r.db.NamedBulkExec( - ctx, cvStmt, cvCount, sem, customvars, upsertedCustomvars, com.SplitOnDupId[contracts.Entity], - ) - }) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -224,34 +167,17 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case _, ok := <-upsertedCustomvars: - if !ok { - return nil - } - - counter.Inc() - case <-ctx.Done(): - return ctx.Err() - } - } - }) - - cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) - cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) - upsertedFlatCustomvars := make(chan contracts.Entity) - g.Go(func() error { - defer close(upsertedFlatCustomvars) - // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) return r.db.NamedBulkExec( - ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, - upsertedFlatCustomvars, com.SplitOnDupId[contracts.Entity], + ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity], + OnSuccessIncrement[contracts.Entity](&counter), ) }) + + cvFlatStmt, cvFlatPlaceholders := r.db.BuildUpsertStmt(cvFlat.Entity()) + cvFlatCount := r.db.BatchSizeByPlaceholders(cvFlatPlaceholders) g.Go(func() error { var counter com.Counter defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) { @@ -260,18 +186,13 @@ func (r *RuntimeUpdates) Sync( } }).Stop() - for { - select { - case _, ok := <-upsertedFlatCustomvars: - if !ok { - return nil - } + // Updates must be executed in order, ensure this by using a semaphore with maximum 1. 
+ sem := semaphore.NewWeighted(1) - counter.Inc() - case <-ctx.Done(): - return ctx.Err() - } - } + return r.db.NamedBulkExec( + ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, + com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + ) }) g.Go(func() error { diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 7f139673..3372e98b 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -149,7 +149,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { // Using upsert here on purpose as this is the fastest way to do bulk updates. // However, there is a risk that errors in the sync implementation could silently insert new rows. - return s.db.UpsertStreamed(ctx, entities, nil) + return s.db.UpsertStreamed(ctx, entities) }) } From cc352252ecea8e969df81fe94f6fffb1bae7bbaa Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 31 May 2022 18:11:30 +0200 Subject: [PATCH 02/14] DB#CreateStreamed(): allow monitoring succeeded items --- pkg/icingadb/db.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index e5375bd8..ea0e0a96 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -511,7 +511,10 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF // The insert statement is created using BuildInsertStmt with the first entity from the entities stream. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Entity) error { +// Entities for which the query ran successfully will be passed to onSuccess. +func (db *DB) CreateStreamed( + ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity], +) error { first, forward, err := com.CopyFirst(ctx, entities) if first == nil { return errors.Wrap(err, "can't copy first entity") @@ -521,7 +524,8 @@ func (db *DB) CreateStreamed(ctx context.Context, entities <-chan contracts.Enti stmt, placeholders := db.BuildInsertStmt(first) return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[contracts.Entity], + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, + forward, com.NeverSplit[contracts.Entity], onSuccess..., ) } From 48b7bb4c3529c2e0c6af08b98dfa2e7e9b720d01 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 31 May 2022 18:14:15 +0200 Subject: [PATCH 03/14] DB#Delete(): allow monitoring succeeded items --- pkg/icingadb/db.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index ea0e0a96..33cbb990 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -582,14 +582,17 @@ func (db *DB) DeleteStreamed( // Delete creates a channel from the specified ids and // bulk deletes them by passing the channel along with the entityType to DeleteStreamed. -func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []interface{}) error { +// IDs for which the query ran successfully will be passed to onSuccess. 
+func (db *DB) Delete( + ctx context.Context, entityType contracts.Entity, ids []interface{}, onSuccess ...OnSuccess[any], +) error { idsCh := make(chan interface{}, len(ids)) for _, id := range ids { idsCh <- id } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh) + return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { From 0e5d098be40d8781a665d81ce5a83f283729f7fa Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 31 May 2022 18:23:59 +0200 Subject: [PATCH 04/14] DB#CleanupOlderThan(): allow to get done work counted in real time --- pkg/icingadb/cleanup.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go index a4f98e8b..e57eafaa 100644 --- a/pkg/icingadb/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -34,9 +34,11 @@ DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, } // CleanupOlderThan deletes all rows with the specified statement that are older than the given time. -// Deletes a maximum of as many rows per round as defined in count. Returns the number of rows deleted. +// Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. +// Returns the total number of rows deleted. func (db *DB) CleanupOlderThan( - ctx context.Context, stmt CleanupStmt, envId types.Binary, count uint64, olderThan time.Time, + ctx context.Context, stmt CleanupStmt, envId types.Binary, + count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], ) (uint64, error) { var counter com.Counter defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() @@ -58,6 +60,12 @@ func (db *DB) CleanupOlderThan( counter.Add(uint64(n)) + for _, onSuccess := range onSuccess { + if err := onSuccess(ctx, make([]struct{}, n)); err != nil { + return 0, err + } + } + if n < int64(count) { break } From fac9f5e4e59541d4c4f4a5010d2873bbe75ef1e9 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 2 May 2022 15:38:33 +0200 Subject: [PATCH 05/14] Write ops/s by op and s to icingadb:telemetry:stats --- cmd/icingadb/main.go | 3 ++ config.example.yml | 1 + doc/03-Configuration.md | 1 + pkg/icingadb/history/retention.go | 6 +++- pkg/icingadb/history/sync.go | 2 ++ pkg/icingadb/overdue/sync.go | 2 ++ pkg/icingadb/runtime_updates.go | 10 ++++-- pkg/icingadb/sync.go | 18 +++++++++-- pkg/icingaredis/telemetry/stats.go | 51 ++++++++++++++++++++++++++++++ 9 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 pkg/icingaredis/telemetry/stats.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 4b267f78..78f9c725 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -11,6 +11,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/overdue" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/okzk/sdnotify" @@ -117,6 +118,8 @@ func run() int { } defer db.Close() ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) + + telemetry.WriteStats(ctx, rc, logs.GetChildLogger("telemetry")) } // Closing ha on exit ensures that this instance retracts its heartbeat // from the database so that another instance can take over immediately. 
diff --git a/config.example.yml b/config.example.yml index 76c82d4a..c6cdbc5a 100644 --- a/config.example.yml +++ b/config.example.yml @@ -47,6 +47,7 @@ logging: # redis: # retention: # runtime-updates: +# telemetry: retention: # Number of days to retain full historical data. By default, historical data is retained forever. diff --git a/doc/03-Configuration.md b/doc/03-Configuration.md index 24322a60..14f58e59 100644 --- a/doc/03-Configuration.md +++ b/doc/03-Configuration.md @@ -63,6 +63,7 @@ overdue-sync | Calculation and synchronization of the overdue status redis | Redis connection status and queries. retention | Deletes historical data that exceed their configured retention period. runtime-updates | Runtime updates of config objects after the initial config synchronization. +telemetry | Reporting of Icinga DB status to Icinga 2 via Redis (for monitoring purposes). ### Duration String diff --git a/pkg/icingadb/history/retention.go b/pkg/icingadb/history/retention.go index 2e899fc4..ff217cdd 100644 --- a/pkg/icingadb/history/retention.go +++ b/pkg/icingadb/history/retention.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icingadb/pkg/icingadb" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -185,7 +186,10 @@ func (r *Retention) Start(ctx context.Context) error { r.logger.Debugf("Cleaning up historical data for category %s from table %s older than %s", stmt.Category, stmt.Table, olderThan) - deleted, err := r.db.CleanupOlderThan(ctx, stmt.CleanupStmt, e.Id, r.count, olderThan) + deleted, err := r.db.CleanupOlderThan( + ctx, stmt.CleanupStmt, e.Id, r.count, olderThan, + icingadb.OnSuccessIncrement[struct{}](&telemetry.Stats.HistoryCleanup), + ) if err != nil { select { case errs <- err: diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index 9e961f00..25e319bb 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -10,6 +10,7 @@ import ( v1types "github.com/icinga/icingadb/pkg/icingadb/v1" v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" @@ -157,6 +158,7 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi } counter.Add(uint64(len(ids))) + telemetry.Stats.History.Add(uint64(len(ids))) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/icingadb/overdue/sync.go b/pkg/icingadb/overdue/sync.go index d8ba24bd..5cd4d674 100644 --- a/pkg/icingadb/overdue/sync.go +++ b/pkg/icingadb/overdue/sync.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingadb/v1/overdue" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/pkg/errors" @@ -206,6 +207,7 @@ func (s Sync) updateOverdue( } counter.Add(uint64(len(ids))) + telemetry.Stats.Overdue.Add(uint64(len(ids))) var op func(ctx context.Context, key string, members ...interface{}) *redis.IntCmd if overdue { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 4c92fc29..2ecd206f 100644 --- a/pkg/icingadb/runtime_updates.go +++ 
b/pkg/icingadb/runtime_updates.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" @@ -67,6 +68,7 @@ func (r *RuntimeUpdates) Sync( for _, factoryFunc := range factoryFuncs { s := common.NewSyncSubject(factoryFunc) + stat := getCounterForEntity(s.Entity()) updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount) upsertEntities := make(chan contracts.Entity, r.redis.Options.XReadCount) @@ -107,7 +109,9 @@ func (r *RuntimeUpdates) Sync( // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - onSuccess := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](&counter)} + onSuccess := []OnSuccess[contracts.Entity]{ + OnSuccessIncrement[contracts.Entity](&counter), OnSuccessIncrement[contracts.Entity](stat), + } if !allowParallel { onSuccess = append(onSuccess, OnSuccessSendTo(upsertedFifo)) } @@ -127,7 +131,7 @@ func (r *RuntimeUpdates) Sync( sem := r.db.GetSemaphoreForTable(utils.TableName(s.Entity())) - onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter)} + onSuccess := []OnSuccess[any]{OnSuccessIncrement[any](&counter), OnSuccessIncrement[any](stat)} if !allowParallel { onSuccess = append(onSuccess, OnSuccessSendTo(deletedFifo)) } @@ -173,6 +177,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvStmt, cvCount, sem, customvars, com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), ) }) @@ -192,6 +197,7 @@ func (r *RuntimeUpdates) Sync( return r.db.NamedBulkExec( ctx, cvFlatStmt, cvFlatCount, sem, flatCustomvars, com.SplitOnDupId[contracts.Entity], OnSuccessIncrement[contracts.Entity](&counter), + OnSuccessIncrement[contracts.Entity](&telemetry.Stats.Config), ) }) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 3372e98b..9af78e86 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -101,6 +102,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g, ctx := errgroup.WithContext(ctx) + stat := getCounterForEntity(delta.Subject.Entity()) // Create if len(delta.Create) > 0 { @@ -125,7 +127,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { } g.Go(func() error { - return s.db.CreateStreamed(ctx, entities) + return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -149,7 +151,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { // Using upsert here on purpose as this is the fastest way to do bulk updates. // However, there is a risk that errors in the sync implementation could silently insert new rows. 
- return s.db.UpsertStreamed(ctx, entities) + return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) } @@ -157,7 +159,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { if len(delta.Delete) > 0 { s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) }) } @@ -201,3 +203,13 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { return g.Wait() } + +// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e. +func getCounterForEntity(e contracts.Entity) *com.Counter { + switch e.(type) { + case *v1.HostState, *v1.ServiceState: + return &telemetry.Stats.State + default: + return &telemetry.Stats.Config + } +} diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go new file mode 100644 index 00000000..79cf38a6 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "strconv" + "time" +) + +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config, State, History, Overdue, HistoryCleanup com.Counter +} + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. +func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "sync_config": &Stats.Config, + "sync_state": &Stats.State, + "sync_history": &Stats.History, + "sync_overdue": &Stats.Overdue, + "cleanup_history": &Stats.HistoryCleanup, + } + + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { + var data []string + for kind, counter := range counters { + if cnt := counter.Reset(); cnt > 0 { + data = append(data, kind, strconv.FormatUint(cnt, 10)) + } + } + + if data != nil { + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:stats", + MaxLen: 15 * 60, + Approx: true, + Values: data, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) { + logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + }) +} From 9b618c690ae365f7d1e431926744913886ee3929 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 10 May 2022 13:14:57 +0200 Subject: [PATCH 06/14] XTRIM data XREAD from icinga:runtime* for Icinga 2 to monitor pending runtime updates. 
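In effect, after every XREAD the already-processed part of each icinga:runtime* stream is trimmed away via XTRIM MINID, so the number of entries remaining in a stream equals the number of pending runtime updates. A minimal sketch of how the trim boundary is derived from the last-read entry ID, mirroring the logic in the diff below (the helper name trimMinId is illustrative only and assumes the standard strconv and strings packages):

// trimMinId returns the XTRIM MINID boundary for a stream whose entries up to
// and including lastReadId have already been processed. Stream IDs have the
// form "<ms-timestamp>-<serial>"; incrementing the serial yields the smallest
// ID strictly greater than lastReadId, so trimming to it drops everything read
// so far while keeping all unread entries.
func trimMinId(lastReadId string) string {
	tsAndSerial := strings.Split(lastReadId, "-")
	if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil {
		tsAndSerial[1] = strconv.FormatUint(s+1, 10)
	}
	return strings.Join(tsAndSerial, "-")
}

For example, trimMinId("1653991234567-3") yields "1653991234567-4", which is then passed to XTrimMinIDApprox.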
--- doc/02-Installation.md | 2 +- pkg/icingadb/runtime_updates.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/doc/02-Installation.md b/doc/02-Installation.md index 1236649f..83ec8fc9 100644 --- a/doc/02-Installation.md +++ b/doc/02-Installation.md @@ -2,7 +2,7 @@ ## Requirements -* Local Redis instance (Will be installed during this documentation) +* Local Redis (≥6.2) instance (Will be installed during this documentation) * MySQL (≥5.5), MariaDB (≥10.1), or PostgreSQL (≥9.6): database, user and schema imports (Will be set up during this documentation) ## Setting up Icinga DB diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 2ecd206f..866ff65e 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -15,9 +15,12 @@ import ( "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" + "strconv" + "strings" "sync" ) @@ -249,6 +252,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri return icingaredis.WrapCmdErr(cmd) } + pipe := r.redis.Pipeline() for _, stream := range rs { var id string @@ -271,8 +275,25 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri return ctx.Err() } } + + tsAndSerial := strings.Split(id, "-") + if s, err := strconv.ParseUint(tsAndSerial[1], 10, 64); err == nil { + tsAndSerial[1] = strconv.FormatUint(s+1, 10) + } + + pipe.XTrimMinIDApprox(ctx, stream.Stream, strings.Join(tsAndSerial, "-"), 0) streams[stream.Stream] = id } + + if cmds, err := pipe.Exec(ctx); err != nil { + r.logger.Errorw("Can't execute Redis pipeline", zap.Error(errors.WithStack(err))) + } else { + for _, cmd := range cmds { + if cmd.Err() != nil { + r.logger.Errorw("Can't trim runtime updates stream", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + } } } } From d85d070d1f728d9b391531574d1a56385a3a81c1 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 23 Jun 2022 16:50:40 +0200 Subject: [PATCH 07/14] Clear icinga:runtime* and read from 0-0 later instead of preserving the (never read) data and reading beyond its end later. This indicates the correct number of pending runtime updates (for monitoring by Icinga 2) from the beginning. --- cmd/icingadb/main.go | 4 ++-- pkg/icingadb/runtime_updates.go | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 78f9c725..52eef291 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -172,9 +172,9 @@ func run() int { configInitSync := sync.WaitGroup{} stateInitSync := &sync.WaitGroup{} - // Get the last IDs of the runtime update streams before starting anything else, + // Clear the runtime update streams before starting anything else (rather than after the sync), // otherwise updates may be lost. 
- runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.Streams(synctx) + runtimeConfigUpdateStreams, runtimeStateUpdateStreams, err := rt.ClearStreams(synctx) if err != nil { logger.Fatalf("%+v", err) } diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 866ff65e..bc144b17 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -40,22 +40,20 @@ func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger } } -// Streams returns the stream key to ID mapping of the runtime update streams for later use in Sync. -func (r *RuntimeUpdates) Streams(ctx context.Context) (config, state icingaredis.Streams, err error) { +// ClearStreams returns the stream key to ID mapping of the runtime update streams +// for later use in Sync and clears the streams themselves. +func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state icingaredis.Streams, err error) { config = icingaredis.Streams{"icinga:runtime": "0-0"} state = icingaredis.Streams{"icinga:runtime:state": "0-0"} + var keys []string for _, streams := range [...]icingaredis.Streams{config, state} { for key := range streams { - id, err := r.redis.StreamLastId(ctx, key) - if err != nil { - return nil, nil, err - } - - streams[key] = id + keys = append(keys, key) } } + err = icingaredis.WrapCmdErr(r.redis.Del(ctx, keys...)) return } From 64d7f1be43cdf05d0cf82ab71c9f1bfbfa39e042 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 23 Jun 2022 16:55:04 +0200 Subject: [PATCH 08/14] Remove unused StreamLastId() --- pkg/icingaredis/client.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 2af5e825..8ea9d54c 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -182,24 +182,6 @@ func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-c })) } -// StreamLastId fetches the last message of a stream and returns its ID. -func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error) { - lastId := "0-0" - - cmd := c.XRevRangeN(ctx, stream, "+", "-", 1) - messages, err := cmd.Result() - - if err != nil { - return "", WrapCmdErr(cmd) - } - - for _, message := range messages { - lastId = message.ID - } - - return lastId, nil -} - // YieldAll yields all entities from Redis that belong to the specified SyncSubject. func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { key := utils.Key(utils.Name(subject.Entity()), ':') From 80ab82329475b093557fc9c2a5fe1524e745a8b9 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 11 May 2022 16:33:05 +0200 Subject: [PATCH 09/14] Introduce Atomic[T] --- pkg/com/atomic.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 pkg/com/atomic.go diff --git a/pkg/com/atomic.go b/pkg/com/atomic.go new file mode 100644 index 00000000..316413df --- /dev/null +++ b/pkg/com/atomic.go @@ -0,0 +1,38 @@ +package com + +import "sync/atomic" + +// Atomic is a type-safe wrapper around atomic.Value. 
+type Atomic[T any] struct { + v atomic.Value +} + +func (a *Atomic[T]) Load() (_ T, ok bool) { + if v, ok := a.v.Load().(box[T]); ok { + return v.v, true + } + + return +} + +func (a *Atomic[T]) Store(v T) { + a.v.Store(box[T]{v}) +} + +func (a *Atomic[T]) Swap(new T) (old T, ok bool) { + if old, ok := a.v.Swap(box[T]{new}).(box[T]); ok { + return old.v, true + } + + return +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool) { + return a.v.CompareAndSwap(box[T]{old}, box[T]{new}) +} + +// box allows, for the case T is an interface, nil values and values of different specific types implementing T +// to be stored in Atomic[T]#v (bypassing atomic.Value#Store()'s policy) by wrapping it (into a non-interface). +type box[T any] struct { + v T +} From e1ff704affaf1346a639f0148675b68fd4251689 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 17 May 2022 12:41:15 +0200 Subject: [PATCH 10/14] Write own heartbeat into icingadb:telemetry:heartbeat including version, current DB error and HA status quo. --- cmd/icingadb/main.go | 17 ++- pkg/driver/driver.go | 29 +++++ pkg/icingadb/ha.go | 35 +++++- pkg/icingaredis/heartbeat.go | 27 +++-- pkg/icingaredis/telemetry/heartbeat.go | 150 +++++++++++++++++++++++++ 5 files changed, 247 insertions(+), 11 deletions(-) create mode 100644 pkg/icingaredis/telemetry/heartbeat.go diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 52eef291..5fbb4e68 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -21,6 +21,7 @@ import ( "os" "os/signal" "sync" + "sync/atomic" "syscall" "time" ) @@ -119,7 +120,9 @@ func run() int { defer db.Close() ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) - telemetry.WriteStats(ctx, rc, logs.GetChildLogger("telemetry")) + telemetryLogger := logs.GetChildLogger("telemetry") + telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) + telemetry.WriteStats(ctx, rc, telemetryLogger) } // Closing ha on exit ensures that this instance retracts its heartbeat // from the database so that another instance can take over immediately. 
@@ -205,6 +208,8 @@ func run() int { }) syncStart := time.Now() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, syncStart.UnixMilli()) + logger.Info("Starting config sync") for _, factory := range v1.ConfigFactories { factory := factory @@ -243,10 +248,18 @@ func run() int { g.Go(func() error { configInitSync.Wait() + atomic.StoreInt64(&telemetry.OngoingSyncStartMilli, 0) - elapsed := time.Since(syncStart) + syncEnd := time.Now() + elapsed := syncEnd.Sub(syncStart) logger := logs.GetChildLogger("config-sync") + if synctx.Err() == nil { + telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{ + FinishMilli: syncEnd.UnixMilli(), + DurationMilli: elapsed.Milliseconds(), + }) + logger.Infof("Finished config sync in %s", elapsed) } else { logger.Warnf("Aborted config sync after %s", elapsed) diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index c4066baf..0910a327 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -6,6 +6,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/retry" "github.com/jmoiron/sqlx" @@ -39,11 +40,15 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { retry.Settings{ Timeout: timeout, OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + updateCurrentDbConnErr(err) + if lastErr == nil || err.Error() != lastErr.Error() { c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err)) } }, OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { + updateCurrentDbConnErr(nil) + if attempt > 0 { c.driver.Logger.Infow("Reconnected to database", zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) @@ -54,6 +59,30 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { return conn, err } +// updateCurrentDbConnErr updates telemetry.CurrentDbConnErr if necessary. +func updateCurrentDbConnErr(err error) { + ours := telemetry.DbConnErr{SinceMilli: time.Now().UnixMilli()} + if err != nil { + ours.Message = err.Error() + } + + for { + theirs, _ := telemetry.CurrentDbConnErr.Load() + if theirs.SinceMilli >= ours.SinceMilli || theirs.Message == ours.Message { + break + } + + merge := ours + if theirs.Message != "" && ours.Message != "" { + merge.SinceMilli = theirs.SinceMilli + } + + if telemetry.CurrentDbConnErr.CompareAndSwap(theirs, merge) { + break + } + } +} + // Driver implements part of the driver.Connector interface. func (c RetryConnector) Driver() driver.Driver { return c.driver diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index b6dbd743..0a81bb5b 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/backoff" + "github.com/icinga/icingadb/pkg/com" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" icingaredisv1 "github.com/icinga/icingadb/pkg/icingaredis/v1" @@ -23,8 +24,15 @@ import ( var timeout = 60 * time.Second +type haState struct { + responsibleTsMilli int64 + responsible bool + otherResponsible bool +} + // HA provides high availability and indicates whether a Takeover or Handover must be made. 
type HA struct { + state com.Atomic[haState] ctx context.Context cancelCtx context.CancelFunc instanceId types.Binary @@ -108,6 +116,12 @@ func (h *HA) Takeover() chan struct{} { return h.takeover } +// State returns the status quo. +func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) { + state, _ := h.state.Load() + return state.responsibleTsMilli, state.responsible, state.otherResponsible +} + func (h *HA) abort(err error) { h.errOnce.Do(func() { h.errMu.Lock() @@ -226,12 +240,13 @@ func (h *HA) controller() { } func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - var takeover bool + var takeover, otherResponsible bool err := retry.WithBackoff( ctx, func(ctx context.Context) error { takeover = false + otherResponsible = false tx, errBegin := h.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if errBegin != nil { @@ -248,6 +263,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type ).StructScan(instance) switch errQuery { case nil: + otherResponsible = true if shouldLog { h.logger.Infow("Another instance is active", zap.String("instance_id", instance.Id.String()), @@ -330,6 +346,11 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type } h.signalTakeover() + } else if otherResponsible { + if state, _ := h.state.Load(); !state.otherResponsible { + state.otherResponsible = true + h.state.Store(state) + } } return nil @@ -392,6 +413,12 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar func (h *HA) signalHandover() { if h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: false, + otherResponsible: false, + }) + select { case h.handover <- struct{}{}: h.responsible = false @@ -403,6 +430,12 @@ func (h *HA) signalHandover() { func (h *HA) signalTakeover() { if !h.responsible { + h.state.Store(haState{ + responsibleTsMilli: time.Now().UnixMilli(), + responsible: true, + otherResponsible: false, + }) + select { case h.takeover <- struct{}{}: h.responsible = true diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 2fc08873..72178026 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "sync" + "sync/atomic" "time" ) @@ -22,14 +23,15 @@ var timeout = 60 * time.Second // Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. // Also signals on if the heartbeat is Lost. type Heartbeat struct { - active bool - events chan *HeartbeatMessage - cancelCtx context.CancelFunc - client *Client - done chan struct{} - errMu sync.Mutex - err error - logger *logging.Logger + active bool + events chan *HeartbeatMessage + lastReceivedMs int64 + cancelCtx context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *logging.Logger } // NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. @@ -56,6 +58,11 @@ func (h *Heartbeat) Events() <-chan *HeartbeatMessage { return h.events } +// LastReceived returns the last heartbeat's receive time in ms. +func (h *Heartbeat) LastReceived() int64 { + return atomic.LoadInt64(&h.lastReceivedMs) +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. 
func (h *Heartbeat) Close() error { @@ -132,6 +139,8 @@ func (h *Heartbeat) controller(ctx context.Context) { h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) h.active = true } + + atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) h.sendEvent(m) case <-time.After(timeout): if h.active { @@ -141,6 +150,8 @@ func (h *Heartbeat) controller(ctx context.Context) { } else { h.logger.Warn("Waiting for Icinga heartbeat") } + + atomic.StoreInt64(&h.lastReceivedMs, 0) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go new file mode 100644 index 00000000..32ee7f8b --- /dev/null +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -0,0 +1,150 @@ +package telemetry + +import ( + "context" + "encoding/json" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "runtime/metrics" + "strconv" + "sync/atomic" + "time" +) + +// ha represents icingadb.HA to avoid import cycles. +type ha interface { + State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) +} + +// jsonMetricValue allows JSON-encoding a metrics.Value. +type jsonMetricValue struct { + v *metrics.Value +} + +// MarshalJSON implements the json.Marshaler interface. +func (jmv jsonMetricValue) MarshalJSON() ([]byte, error) { + switch kind := jmv.v.Kind(); kind { + case metrics.KindUint64: + return json.Marshal(jmv.v.Uint64()) + case metrics.KindFloat64: + return json.Marshal(jmv.v.Float64()) + default: + return nil, errors.Errorf("can't JSON-encode Go metric value of kind %d", int(kind)) + } +} + +type SuccessfulSync struct { + FinishMilli int64 + DurationMilli int64 +} + +type DbConnErr struct { + Message string + SinceMilli int64 +} + +// CurrentDbConnErr is to be updated by the DB connector. +var CurrentDbConnErr com.Atomic[DbConnErr] + +// OngoingSyncStartMilli is to be updated by the main() function. +var OngoingSyncStartMilli int64 + +// LastSuccessfulSync is to be updated by the main() function. +var LastSuccessfulSync com.Atomic[SuccessfulSync] + +var boolToStr = map[bool]string{false: "0", true: "1"} +var startTime = time.Now().UnixMilli() + +// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. 
+func StartHeartbeat( + ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, +) { + allMetrics := metrics.All() + samples := make([]metrics.Sample, 0, len(allMetrics)) + cumulative := map[string]jsonMetricValue{} + notCumulative := map[string]jsonMetricValue{} + byCumulative := map[bool]map[string]jsonMetricValue{true: cumulative, false: notCumulative} + mtrcs := map[string]map[string]jsonMetricValue{"cumulative": cumulative, "not-cumulative": notCumulative} + + for _, m := range allMetrics { + switch m.Kind { + case metrics.KindUint64, metrics.KindFloat64: + samples = append(samples, metrics.Sample{Name: m.Name}) + byCumulative[m.Cumulative][m.Name] = jsonMetricValue{&samples[len(samples)-1].Value} + } + } + + const interval = time.Second + + var lastErr string + var silenceUntil time.Time + + periodic.Start(ctx, interval, func(tick periodic.Tick) { + metrics.Read(samples) + + heartbeat := heartbeat.LastReceived() + responsibleTsMilli, responsible, otherResponsible := ha.State() + ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) + sync, _ := LastSuccessfulSync.Load() + dbConnErr, _ := CurrentDbConnErr.Load() + now := time.Now() + + var metricsStr string + if metricsBytes, err := json.Marshal(mtrcs); err == nil { + metricsStr = string(metricsBytes) + } else { + metricsStr = "{}" + logger.Warnw("Can't JSON-encode Go metrics", zap.Error(errors.WithStack(err))) + } + + values := map[string]string{ + "general:version": internal.Version.Version, + "general:time": strconv.FormatInt(now.UnixMilli(), 10), + "general:start-time": strconv.FormatInt(startTime, 10), + "general:err": dbConnErr.Message, + "general:err-since": strconv.FormatInt(dbConnErr.SinceMilli, 10), + "go:metrics": metricsStr, + "heartbeat:last-received": strconv.FormatInt(heartbeat, 10), + "ha:responsible": boolToStr[responsible], + "ha:responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha:other-responsible": boolToStr[otherResponsible], + "sync:ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync:success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync:success-duration": strconv.FormatInt(sync.DurationMilli, 10), + } + + ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) + defer cancel() + + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:heartbeat", + MaxLen: 1, + Values: values, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) { + logw := logger.Debugw + currentErr := err.Error() + + if currentErr != lastErr || now.After(silenceUntil) { + logw = logger.Warnw + lastErr = currentErr + silenceUntil = now.Add(time.Minute) + } + + logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd))) + } else { + lastErr = "" + silenceUntil = time.Time{} + } + }) +} + +// Assert interface compliance. +var _ json.Marshaler = jsonMetricValue{} From 36d5f7b33c2a4bb1e4769df2a0a835aa9c8bf8d0 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Jun 2022 12:33:55 +0200 Subject: [PATCH 11/14] Telemetry: send Go metrics as performance data string Rather than using a JSON structure to convey these values, simply use the existing format to communicate performance data to Icinga 2. Also removes the reference to Go in the Redis structure, allowing this string to be extended with more metrics in the future without running into naming issues. 
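For illustration, the resulting `general:performance-data` field is a single space-separated string in the usual `label=value[unit]` notation. The concrete metrics depend on the Go runtime and the numbers below are made up; only the format and the name mangling follow this patch:

  go_gc_heap_allocs_bytes=123456789B go_gc_cycles_total_gc_cycles=42c go_sched_goroutines_goroutines=17

The `B`, `s` and `c` (counter) units are derived from the `:bytes`/`:seconds` suffixes and the cumulative flag of the respective runtime/metrics descriptors, as implemented below.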
--- pkg/icingaredis/telemetry/heartbeat.go | 128 ++++++++++++++----------- 1 file changed, 71 insertions(+), 57 deletions(-) diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go index 32ee7f8b..f52ca638 100644 --- a/pkg/icingaredis/telemetry/heartbeat.go +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -2,7 +2,7 @@ package telemetry import ( "context" - "encoding/json" + "fmt" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" @@ -12,8 +12,10 @@ import ( "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" "go.uber.org/zap" + "regexp" "runtime/metrics" "strconv" + "strings" "sync/atomic" "time" ) @@ -23,23 +25,6 @@ type ha interface { State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) } -// jsonMetricValue allows JSON-encoding a metrics.Value. -type jsonMetricValue struct { - v *metrics.Value -} - -// MarshalJSON implements the json.Marshaler interface. -func (jmv jsonMetricValue) MarshalJSON() ([]byte, error) { - switch kind := jmv.v.Kind(); kind { - case metrics.KindUint64: - return json.Marshal(jmv.v.Uint64()) - case metrics.KindFloat64: - return json.Marshal(jmv.v.Float64()) - default: - return nil, errors.Errorf("can't JSON-encode Go metric value of kind %d", int(kind)) - } -} - type SuccessfulSync struct { FinishMilli int64 DurationMilli int64 @@ -66,20 +51,7 @@ var startTime = time.Now().UnixMilli() func StartHeartbeat( ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, ) { - allMetrics := metrics.All() - samples := make([]metrics.Sample, 0, len(allMetrics)) - cumulative := map[string]jsonMetricValue{} - notCumulative := map[string]jsonMetricValue{} - byCumulative := map[bool]map[string]jsonMetricValue{true: cumulative, false: notCumulative} - mtrcs := map[string]map[string]jsonMetricValue{"cumulative": cumulative, "not-cumulative": notCumulative} - - for _, m := range allMetrics { - switch m.Kind { - case metrics.KindUint64, metrics.KindFloat64: - samples = append(samples, metrics.Sample{Name: m.Name}) - byCumulative[m.Cumulative][m.Name] = jsonMetricValue{&samples[len(samples)-1].Value} - } - } + goMetrics := NewGoMetrics() const interval = time.Second @@ -87,8 +59,6 @@ func StartHeartbeat( var silenceUntil time.Time periodic.Start(ctx, interval, func(tick periodic.Tick) { - metrics.Read(samples) - heartbeat := heartbeat.LastReceived() responsibleTsMilli, responsible, otherResponsible := ha.State() ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) @@ -96,28 +66,20 @@ func StartHeartbeat( dbConnErr, _ := CurrentDbConnErr.Load() now := time.Now() - var metricsStr string - if metricsBytes, err := json.Marshal(mtrcs); err == nil { - metricsStr = string(metricsBytes) - } else { - metricsStr = "{}" - logger.Warnw("Can't JSON-encode Go metrics", zap.Error(errors.WithStack(err))) - } - values := map[string]string{ - "general:version": internal.Version.Version, - "general:time": strconv.FormatInt(now.UnixMilli(), 10), - "general:start-time": strconv.FormatInt(startTime, 10), - "general:err": dbConnErr.Message, - "general:err-since": strconv.FormatInt(dbConnErr.SinceMilli, 10), - "go:metrics": metricsStr, - "heartbeat:last-received": strconv.FormatInt(heartbeat, 10), - "ha:responsible": boolToStr[responsible], - "ha:responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), - "ha:other-responsible": boolToStr[otherResponsible], - "sync:ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), - 
"sync:success-finish": strconv.FormatInt(sync.FinishMilli, 10), - "sync:success-duration": strconv.FormatInt(sync.DurationMilli, 10), + "general:version": internal.Version.Version, + "general:time": strconv.FormatInt(now.UnixMilli(), 10), + "general:start-time": strconv.FormatInt(startTime, 10), + "general:err": dbConnErr.Message, + "general:err-since": strconv.FormatInt(dbConnErr.SinceMilli, 10), + "general:performance-data": goMetrics.PerformanceData(), + "heartbeat:last-received": strconv.FormatInt(heartbeat, 10), + "ha:responsible": boolToStr[responsible], + "ha:responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha:other-responsible": boolToStr[otherResponsible], + "sync:ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync:success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync:success-duration": strconv.FormatInt(sync.DurationMilli, 10), } ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) @@ -146,5 +108,57 @@ func StartHeartbeat( }) } -// Assert interface compliance. -var _ json.Marshaler = jsonMetricValue{} +type goMetrics struct { + names []string + units []string + samples []metrics.Sample +} + +func NewGoMetrics() *goMetrics { + m := &goMetrics{} + + forbiddenRe := regexp.MustCompile(`\W`) + + for _, d := range metrics.All() { + switch d.Kind { + case metrics.KindUint64, metrics.KindFloat64: + name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_") + + unit := "" + if strings.HasSuffix(d.Name, ":bytes") { + unit = "B" + } else if strings.HasSuffix(d.Name, ":seconds") { + unit = "s" + } else if d.Cumulative { + unit = "c" + } + + m.names = append(m.names, name) + m.units = append(m.units, unit) + m.samples = append(m.samples, metrics.Sample{Name: d.Name}) + } + } + + return m +} + +func (g *goMetrics) PerformanceData() string { + metrics.Read(g.samples) + + var buf strings.Builder + + for i, sample := range g.samples { + if i > 0 { + buf.WriteByte(' ') + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + _, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i]) + case metrics.KindFloat64: + _, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i]) + } + } + + return buf.String() +} From 741460c935ac83efece90c6f79b1ef1622d17a0e Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Jun 2022 14:32:22 +0200 Subject: [PATCH 12/14] Telemetry: rename keys in heartbeat stream In both C++ and Go, the keys are only used as constant strings, so namespacing them just adds clutter for the `general:*` keys, therefore remove it. 
--- pkg/icingaredis/telemetry/heartbeat.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go index f52ca638..597c4fe8 100644 --- a/pkg/icingaredis/telemetry/heartbeat.go +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -67,19 +67,19 @@ func StartHeartbeat( now := time.Now() values := map[string]string{ - "general:version": internal.Version.Version, - "general:time": strconv.FormatInt(now.UnixMilli(), 10), - "general:start-time": strconv.FormatInt(startTime, 10), - "general:err": dbConnErr.Message, - "general:err-since": strconv.FormatInt(dbConnErr.SinceMilli, 10), - "general:performance-data": goMetrics.PerformanceData(), - "heartbeat:last-received": strconv.FormatInt(heartbeat, 10), - "ha:responsible": boolToStr[responsible], - "ha:responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), - "ha:other-responsible": boolToStr[otherResponsible], - "sync:ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), - "sync:success-finish": strconv.FormatInt(sync.FinishMilli, 10), - "sync:success-duration": strconv.FormatInt(sync.DurationMilli, 10), + "version": internal.Version.Version, + "time": strconv.FormatInt(now.UnixMilli(), 10), + "start-time": strconv.FormatInt(startTime, 10), + "error": dbConnErr.Message, + "error-since": strconv.FormatInt(dbConnErr.SinceMilli, 10), + "performance-data": goMetrics.PerformanceData(), + "last-heartbeat-received": strconv.FormatInt(heartbeat, 10), + "ha-responsible": boolToStr[responsible], + "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha-other-responsible": boolToStr[otherResponsible], + "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), } ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) From def7c5f22c70af64f3b8aeee96f0391551878962 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Fri, 24 Jun 2022 16:30:26 +0200 Subject: [PATCH 13/14] Telemetry: change stats names in Redis The same names are used in perfdata names and config_sync sounds more natural than sync_config. --- pkg/icingaredis/telemetry/stats.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go index 79cf38a6..86db0b3e 100644 --- a/pkg/icingaredis/telemetry/stats.go +++ b/pkg/icingaredis/telemetry/stats.go @@ -21,11 +21,11 @@ var Stats struct { // WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { counters := map[string]*com.Counter{ - "sync_config": &Stats.Config, - "sync_state": &Stats.State, - "sync_history": &Stats.History, - "sync_overdue": &Stats.Overdue, - "cleanup_history": &Stats.HistoryCleanup, + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, } periodic.Start(ctx, time.Second, func(_ periodic.Tick) { From 061660b023e84903ec324e99bcffb9dd9af4d9f6 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 28 Jun 2022 09:54:50 +0200 Subject: [PATCH 14/14] Telemetry: use mutex for synchronizing last database error The old CompareAndSwap based code tended to end up in an endless loop. 
---
 pkg/driver/driver.go                   | 28 +------------
 pkg/icingaredis/telemetry/heartbeat.go | 55 ++++++++++++++++++++++----
 2 files changed, 49 insertions(+), 34 deletions(-)

diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 0910a327..f529db44 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -40,14 +40,14 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
         retry.Settings{
             Timeout: timeout,
             OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
-                updateCurrentDbConnErr(err)
+                telemetry.UpdateCurrentDbConnErr(err)

                 if lastErr == nil || err.Error() != lastErr.Error() {
                     c.driver.Logger.Warnw("Can't connect to database. Retrying", zap.Error(err))
                 }
             },
             OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
-                updateCurrentDbConnErr(nil)
+                telemetry.UpdateCurrentDbConnErr(nil)

                 if attempt > 0 {
                     c.driver.Logger.Infow("Reconnected to database",
@@ -59,30 +59,6 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
     return conn, err
 }

-// updateCurrentDbConnErr updates telemetry.CurrentDbConnErr if necessary.
-func updateCurrentDbConnErr(err error) {
-    ours := telemetry.DbConnErr{SinceMilli: time.Now().UnixMilli()}
-    if err != nil {
-        ours.Message = err.Error()
-    }
-
-    for {
-        theirs, _ := telemetry.CurrentDbConnErr.Load()
-        if theirs.SinceMilli >= ours.SinceMilli || theirs.Message == ours.Message {
-            break
-        }
-
-        merge := ours
-        if theirs.Message != "" && ours.Message != "" {
-            merge.SinceMilli = theirs.SinceMilli
-        }
-
-        if telemetry.CurrentDbConnErr.CompareAndSwap(theirs, merge) {
-            break
-        }
-    }
-}
-
 // Driver implements part of the driver.Connector interface.
 func (c RetryConnector) Driver() driver.Driver {
     return c.driver

diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go
index 597c4fe8..ee476a15 100644
--- a/pkg/icingaredis/telemetry/heartbeat.go
+++ b/pkg/icingaredis/telemetry/heartbeat.go
@@ -16,6 +16,7 @@ import (
     "runtime/metrics"
     "strconv"
     "strings"
+    "sync"
     "sync/atomic"
     "time"
 )
@@ -30,13 +31,51 @@ type SuccessfulSync struct {
     DurationMilli int64
 }

-type DbConnErr struct {
-    Message string
-    SinceMilli int64
+// currentDbConnErr stores ongoing errors from database connections.
+var currentDbConnErr struct {
+    mu sync.Mutex
+    message string
+    sinceMilli int64
 }

-// CurrentDbConnErr is to be updated by the DB connector.
-var CurrentDbConnErr com.Atomic[DbConnErr]
+// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr.
+func UpdateCurrentDbConnErr(err error) {
+    now := time.Now().UnixMilli()
+
+    currentDbConnErr.mu.Lock()
+    defer currentDbConnErr.mu.Unlock()
+
+    if currentDbConnErr.sinceMilli >= now {
+        // Already updated with a more recent error, ignore this one.
+        return
+    }
+
+    message := ""
+    if err != nil {
+        message = err.Error()
+    }
+
+    if currentDbConnErr.message == message {
+        // Error stayed the same, no update needed, keeping the old timestamp.
+        return
+    }
+
+    if currentDbConnErr.message == "" || message == "" {
+        // Either first error or recovery from an error, update timestamp.
+        currentDbConnErr.sinceMilli = now
+    }
+
+    currentDbConnErr.message = message
+}
+
+// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in
+// milliseconds of the last change from OK to error or from error to OK.
+func GetCurrentDbConnErr() (string, int64) {
+    currentDbConnErr.mu.Lock()
+    defer currentDbConnErr.mu.Unlock()
+
+    return currentDbConnErr.message, currentDbConnErr.sinceMilli
+}

 // OngoingSyncStartMilli is to be updated by the main() function.
 var OngoingSyncStartMilli int64
@@ -63,15 +102,15 @@ func StartHeartbeat(
         responsibleTsMilli, responsible, otherResponsible := ha.State()
         ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
         sync, _ := LastSuccessfulSync.Load()
-        dbConnErr, _ := CurrentDbConnErr.Load()
+        dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
         now := time.Now()

         values := map[string]string{
             "version": internal.Version.Version,
             "time": strconv.FormatInt(now.UnixMilli(), 10),
             "start-time": strconv.FormatInt(startTime, 10),
-            "error": dbConnErr.Message,
-            "error-since": strconv.FormatInt(dbConnErr.SinceMilli, 10),
+            "error": dbConnErr,
+            "error-since": strconv.FormatInt(dbConnErrSinceMilli, 10),
             "performance-data": goMetrics.PerformanceData(),
             "last-heartbeat-received": strconv.FormatInt(heartbeat, 10),
             "ha-responsible": boolToStr[responsible],