From 8ec157e39b3e4435d6909a044a9aacd09c75e791 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 3 Nov 2021 09:06:54 +0100 Subject: [PATCH] Add periodic logging for runtime updates --- pkg/icingadb/db.go | 20 ++++-- pkg/icingadb/runtime_updates.go | 108 ++++++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 8 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 5f061f3d..5b46964d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -209,7 +209,8 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) { // derives and expands a query and executes it with this set of arguments until the arg stream has been processed. // The derived queries are executed in a separate goroutine with a weighting of 1 // and can be executed concurrently to the extent allowed by the semaphore passed in sem. -func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error { +// Arguments for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error { var counter com.Counter defer db.log(ctx, query, &counter).Stop() @@ -245,6 +246,16 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph counter.Add(uint64(len(b))) + if succeeded != nil { + for _, row := range b { + select { + case <-ctx.Done(): + return ctx.Err() + case succeeded <- row: + } + } + } + return nil }, IsRetryable, @@ -505,9 +516,10 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti // The delete statement is created using BuildDeleteStmt with the passed entityType. // Bulk size is controlled via Options.MaxPlaceholdersPerStatement and // concurrency is controlled via Options.MaxConnectionsPerTable. -func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error { +// IDs for which the query ran successfully will be streamed on the succeeded channel. +func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error { sem := db.GetSemaphoreForTable(utils.TableName(entityType)) - return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids) + return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded) } // Delete creates a channel from the specified ids and @@ -519,7 +531,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int } close(idsCh) - return db.DeleteStreamed(ctx, entityType, idsCh) + return db.DeleteStreamed(ctx, entityType, idsCh, nil) } func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { diff --git a/pkg/icingadb/runtime_updates.go b/pkg/icingadb/runtime_updates.go index 89f90efe..8dca1301 100644 --- a/pkg/icingadb/runtime_updates.go +++ b/pkg/icingadb/runtime_updates.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/periodic" "github.com/icinga/icingadb/pkg/structify" "github.com/icinga/icingadb/pkg/utils" "github.com/pkg/errors" @@ -75,14 +77,63 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti r.logger.Debugf("Syncing runtime updates of %s", s.Name()) g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json"))) + upserted := make(chan contracts.Entity) g.Go(func() error { + defer close(upserted) + stmt, placeholders := r.db.BuildUpsertStmt(s.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted) }) g.Go(func() error { - return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds) + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upserted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + deleted := make(chan interface{}) + g.Go(func() error { + defer close(deleted) + + return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Deleted %d %s items", count, s.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-deleted: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) } @@ -103,18 +154,67 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities) com.ErrgroupReceive(g, errs) + + upsertedCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cv.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) + upsertedFlatCustomvars := make(chan contracts.Entity) g.Go(func() error { + defer close(upsertedFlatCustomvars) + stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity()) // Updates must be executed in order, ensure this by using a semaphore with maximum 1. sem := semaphore.NewWeighted(1) - return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, nil) + return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars) + }) + g.Go(func() error { + var counter com.Counter + defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) { + if count := counter.Reset(); count > 0 { + r.logger.Infof("Upserted %d %s items", count, cvFlat.Name()) + } + }).Stop() + + for { + select { + case _, ok := <-upsertedFlatCustomvars: + if !ok { + return nil + } + + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } }) g.Go(func() error {