Add periodic logging for runtime updates

This commit is contained in:
Eric Lippmann 2021-11-03 09:06:54 +01:00
parent 6232773943
commit 8ec157e39b
2 changed files with 120 additions and 8 deletions

View file

@ -209,7 +209,8 @@ func (db *DB) BuildWhere(subject interface{}) (string, int) {
// derives and expands a query and executes it with this set of arguments until the arg stream has been processed.
// The derived queries are executed in a separate goroutine with a weighting of 1
// and can be executed concurrently to the extent allowed by the semaphore passed in sem.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}) error {
// Arguments for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, succeeded chan<- interface{}) error {
var counter com.Counter
defer db.log(ctx, query, &counter).Stop()
@ -245,6 +246,16 @@ func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaph
counter.Add(uint64(len(b)))
if succeeded != nil {
for _, row := range b {
select {
case <-ctx.Done():
return ctx.Err()
case succeeded <- row:
}
}
}
return nil
},
IsRetryable,
@ -505,9 +516,10 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Enti
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}) error {
// IDs for which the query ran successfully will be streamed on the succeeded channel.
func (db *DB) DeleteStreamed(ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, succeeded chan<- interface{}) error {
sem := db.GetSemaphoreForTable(utils.TableName(entityType))
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids)
return db.BulkExec(ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, succeeded)
}
// Delete creates a channel from the specified ids and
@ -519,7 +531,7 @@ func (db *DB) Delete(ctx context.Context, entityType contracts.Entity, ids []int
}
close(idsCh)
return db.DeleteStreamed(ctx, entityType, idsCh)
return db.DeleteStreamed(ctx, entityType, idsCh, nil)
}
func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {

View file

@ -4,11 +4,13 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/periodic"
"github.com/icinga/icingadb/pkg/structify"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
@ -75,14 +77,63 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
r.logger.Debugf("Syncing runtime updates of %s", s.Name())
g.Go(structifyStream(ctx, updateMessages, upsertEntities, deleteIds, structify.MakeMapStructifier(reflect.TypeOf(s.Entity()).Elem(), "json")))
upserted := make(chan contracts.Entity)
g.Go(func() error {
defer close(upserted)
stmt, placeholders := r.db.BuildUpsertStmt(s.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, upserted)
})
g.Go(func() error {
return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds)
var counter com.Counter
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, s.Name())
}
}).Stop()
for {
select {
case _, ok := <-upserted:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
deleted := make(chan interface{})
g.Go(func() error {
defer close(deleted)
return r.db.DeleteStreamed(ctx, s.Entity(), deleteIds, deleted)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Deleted %d %s items", count, s.Name())
}
}).Stop()
for {
select {
case _, ok := <-deleted:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
}
@ -103,18 +154,67 @@ func (r *RuntimeUpdates) Sync(ctx context.Context, factoryFuncs []contracts.Enti
customvars, flatCustomvars, errs := v1.ExpandCustomvars(ctx, upsertEntities)
com.ErrgroupReceive(g, errs)
upsertedCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cv.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, customvars, upsertedCustomvars)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, cv.Name())
}
}).Stop()
for {
select {
case _, ok := <-upsertedCustomvars:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
upsertedFlatCustomvars := make(chan contracts.Entity)
g.Go(func() error {
defer close(upsertedFlatCustomvars)
stmt, placeholders := r.db.BuildUpsertStmt(cvFlat.Entity())
// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
sem := semaphore.NewWeighted(1)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, nil)
return r.db.NamedBulkExec(ctx, stmt, r.db.BatchSizeByPlaceholders(placeholders), sem, flatCustomvars, upsertedFlatCustomvars)
})
g.Go(func() error {
var counter com.Counter
defer periodic.Start(ctx, internal.LoggingInterval(), func(_ periodic.Tick) {
if count := counter.Reset(); count > 0 {
r.logger.Infof("Upserted %d %s items", count, cvFlat.Name())
}
}).Stop()
for {
select {
case _, ok := <-upsertedFlatCustomvars:
if !ok {
return nil
}
counter.Inc()
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {