From 961a3650e8db5e01ea439f3bde9914966575ce0c Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 11:26:05 +0200 Subject: [PATCH 01/16] Introduce type common.SyncSubject --- pkg/common/sync_subject.go | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 pkg/common/sync_subject.go diff --git a/pkg/common/sync_subject.go b/pkg/common/sync_subject.go new file mode 100644 index 00000000..7bf73a01 --- /dev/null +++ b/pkg/common/sync_subject.go @@ -0,0 +1,39 @@ +package common + +import ( + "github.com/icinga/icingadb/pkg/contracts" +) + +// SyncSubject defines information about entities to be synchronized. +type SyncSubject struct { + entity contracts.Entity + factory contracts.EntityFactoryFunc + withChecksum bool +} + +// NewSyncSubject returns a new SyncSubject. +func NewSyncSubject(factoryFunc contracts.EntityFactoryFunc) *SyncSubject { + e := factoryFunc() + _, withChecksum := e.(contracts.Checksumer) + + return &SyncSubject{ + entity: e, + factory: factoryFunc, + withChecksum: withChecksum, + } +} + +// Entity returns one value from the factory. Always returns the same entity. +func (s SyncSubject) Entity() contracts.Entity { + return s.entity +} + +// Factory returns the entity factory function. +func (s SyncSubject) Factory() contracts.EntityFactoryFunc { + return s.factory +} + +// WithChecksum returns whether entities from the factory implement contracts.Checksumer. +func (s SyncSubject) WithChecksum() bool { + return s.withChecksum +} From 51d2532f188dd3abd898cd5dcd8885b1e5d57d52 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 11:36:55 +0200 Subject: [PATCH 02/16] Introduce func icingaredis.Client.YieldAll() --- pkg/icingaredis/client.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index 777de056..cb04962d 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -4,10 +4,13 @@ import ( "context" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "runtime" "time" ) @@ -161,3 +164,24 @@ func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error return lastId, nil } + +// YieldAll yields all entities from Redis that belong to the specified SyncSubject. +func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { + key := utils.Key(utils.Name(subject.Entity()), ':') + if subject.WithChecksum() { + key = "icinga:checksum:" + key + } else { + key = "icinga:config:" + key + } + + pairs, errs := c.HYield(ctx, key, 1<<12) + g, ctx := errgroup.WithContext(ctx) + // Let errors from HYield cancel the group. + com.ErrgroupReceive(g, errs) + + desired, errs := CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel the group. + com.ErrgroupReceive(g, errs) + + return desired, com.WaitAsync(g) +} From 442e04bbf128e4cf2ddd834cc9b64c21b05ee871 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 11:36:41 +0200 Subject: [PATCH 03/16] Use type common.SyncSubject --- cmd/icingadb/main.go | 3 +- pkg/icingadb/sync.go | 63 ++++++++++++++++----------------------- pkg/icingaredis/client.go | 2 +- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 17e6f740..5816d802 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/icinga/icingadb/internal/command" + "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/history" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" @@ -106,7 +107,7 @@ func run() int { g.Go(func() error { defer wg.Done() - return s.SyncAfterDump(synctx, factory.WithInit, dump) + return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump) }) } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 49de0341..aeffdc4b 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" @@ -34,36 +35,24 @@ func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync } } -func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta { - // Redis key. - var redisKey string - // Whether we're syncing an entity that implements contracts.Checksumer. - var withChecksum bool - // Value from the factory so that we know what we are synchronizing here. - v := factoryFunc() +func (s Sync) GetDelta(ctx context.Context, subject *common.SyncSubject) *Delta { // Error channel. - errs := make(chan error, 1) + err := make(chan error, 1) - if _, ok := v.(contracts.Checksumer); ok { - withChecksum = true - redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':')) - } else { - redisKey = fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')) - } + desired, errs := s.redis.YieldAll(ctx, subject) + com.PipeError(errs, err) - desired, err := s.fromRedis(ctx, factoryFunc, redisKey) - com.PipeError(err, errs) + actual, errs := s.db.YieldAll( + ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint())) + com.PipeError(errs, err) - actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint())) - com.PipeError(err, errs) - - return NewDelta(ctx, actual, desired, withChecksum, s.logger) + return NewDelta(ctx, actual, desired, subject.WithChecksum(), s.logger) } -// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the type given -// by factoryFunc using the Sync function. -func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, dump *DumpSignals) error { - typeName := utils.Name(factoryFunc()) +// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given +// sync subject using the Sync function. +func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error { + typeName := utils.Name(subject.Entity()) key := "icinga:" + utils.Key(typeName, ':') startTime := time.Now() @@ -88,24 +77,22 @@ func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFac zap.String("type", typeName), zap.String("key", key), zap.Duration("waited", time.Now().Sub(startTime))) - return s.Sync(ctx, factoryFunc) + return s.Sync(ctx, subject) case <-ctx.Done(): return ctx.Err() } } } -// Sync synchronizes entities between Icinga DB and Redis created with the specified factory function. +// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. // This function does not respect dump signals. For this, use SyncAfterDump. -func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error { - // Value from the factory so that we know what we are synchronizing here. - v := factoryFunc() +func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { // Group for the sync. Whole sync will be cancelled if an error occurs. g, ctx := errgroup.WithContext(ctx) - s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' ')) - delta := s.GetDelta(ctx, factoryFunc) + delta := s.GetDelta(ctx, subject) if err := delta.Wait(); err != nil { return err } @@ -116,14 +103,14 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) if delta.WithChecksum { pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), count, concurrent, delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) @@ -140,17 +127,17 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Update if len(delta.Update) > 0 { - s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(subject.Entity()), ' ')) pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), count, concurrent, delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) @@ -167,9 +154,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Delete if len(delta.Delete) > 0 { - s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, v, delta.Delete.IDs()) + return s.db.Delete(ctx, subject.Entity(), delta.Delete.IDs()) }) } diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index cb04962d..8a024cae 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -171,7 +171,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch if subject.WithChecksum() { key = "icinga:checksum:" + key } else { - key = "icinga:config:" + key + key = "icinga:" + key } pairs, errs := c.HYield(ctx, key, 1<<12) From 34363e19ac0d19edab043c92ec78c11c6b01177b Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 15:49:18 +0200 Subject: [PATCH 04/16] Expect common.SyncSubject in icingadb.Delta --- pkg/icingadb/delta.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 46686dfd..724e8041 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -3,6 +3,7 @@ package icingadb import ( "context" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/utils" "go.uber.org/zap" @@ -12,20 +13,20 @@ import ( ) type Delta struct { - Create EntitiesById - Update EntitiesById - Delete EntitiesById - WithChecksum bool - done chan error - err error - logger *zap.SugaredLogger + Create EntitiesById + Update EntitiesById + Delete EntitiesById + Subject *common.SyncSubject + done chan error + err error + logger *zap.SugaredLogger } -func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, withChecksum bool, logger *zap.SugaredLogger) *Delta { +func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta { delta := &Delta{ - WithChecksum: withChecksum, - done: make(chan error, 1), - logger: logger, + Subject: subject, + done: make(chan error, 1), + logger: logger, } go delta.start(ctx, actual, desired) @@ -41,7 +42,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra defer close(delta.done) var update EntitiesById - if delta.WithChecksum { + if delta.Subject.WithChecksum() { update = EntitiesById{} } actual := EntitiesById{} @@ -68,7 +69,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra delete(desired, id) mtx.Unlock() - if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { + if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { updateMtx.Lock() update[id] = d updateMtx.Unlock() @@ -104,7 +105,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra delete(actual, id) mtx.Unlock() - if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { + if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { updateMtx.Lock() update[id] = d updateMtx.Unlock() @@ -129,7 +130,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra delta.Create = desired delta.Delete = actual - if delta.WithChecksum { + if delta.Subject.WithChecksum() { delta.Update = update } } From 7c70a72991870a71e696cb77b8af4fe97c3f06ad Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 15:50:04 +0200 Subject: [PATCH 05/16] Introduce icingadb.Sync.ApplyDelta() --- pkg/icingadb/sync.go | 59 +++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index aeffdc4b..8af90bf6 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -35,20 +35,6 @@ func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync } } -func (s Sync) GetDelta(ctx context.Context, subject *common.SyncSubject) *Delta { - // Error channel. - err := make(chan error, 1) - - desired, errs := s.redis.YieldAll(ctx, subject) - com.PipeError(errs, err) - - actual, errs := s.db.YieldAll( - ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint())) - com.PipeError(errs, err) - - return NewDelta(ctx, actual, desired, subject.WithChecksum(), s.logger) -} - // SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given // sync subject using the Sync function. func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error { @@ -87,30 +73,48 @@ func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, du // Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. // This function does not respect dump signals. For this, use SyncAfterDump. func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { - // Group for the sync. Whole sync will be cancelled if an error occurs. - g, ctx := errgroup.WithContext(ctx) - s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' ')) - delta := s.GetDelta(ctx, subject) + g, ctx := errgroup.WithContext(ctx) + + desired, redisErrs := s.redis.YieldAll(ctx, subject) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, redisErrs) + + actual, dbErrs := s.db.YieldAll( + ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint())) + // Let errors from DB cancel our group. + com.ErrgroupReceive(g, dbErrs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actual, desired, subject, s.logger)) + }) + + return g.Wait() +} + +// ApplyDelta applies all changes from Delta to the database. +func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { if err := delta.Wait(); err != nil { return err } + g, ctx := errgroup.WithContext(ctx) + // Create if len(delta.Create) > 0 { var entities <-chan contracts.Entity - if delta.WithChecksum { + if delta.Subject.WithChecksum() { pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), count, concurrent, delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) @@ -127,17 +131,17 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { // Update if len(delta.Update) > 0 { - s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(subject.Entity()), ' ')) + s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), count, concurrent, delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) @@ -154,9 +158,9 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { // Delete if len(delta.Delete) > 0 { - s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(subject.Entity()), ' ')) + s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, subject.Entity(), delta.Delete.IDs()) + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs()) }) } @@ -165,8 +169,7 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { func (s Sync) fromRedis(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, key string) (<-chan contracts.Entity, <-chan error) { // Channel for Redis field-value pairs for the specified key and errors. - pairs, errs := s.redis.HYield( - ctx, key, count) + pairs, errs := s.redis.HYield(ctx, key, count) // Group for the Redis sync. Redis sync will be cancelled if an error occurs. // Note that we're calling HYield with the original context. g, ctx := errgroup.WithContext(ctx) From 5550c47be3b9e3cc595d2149a38204f5b177e120 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 15:51:07 +0200 Subject: [PATCH 06/16] Add NewCustomvarFlat() --- pkg/icingadb/v1/customvar.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/icingadb/v1/customvar.go b/pkg/icingadb/v1/customvar.go index cc321400..56f2b81e 100644 --- a/pkg/icingadb/v1/customvar.go +++ b/pkg/icingadb/v1/customvar.go @@ -22,3 +22,7 @@ type CustomvarFlat struct { func NewCustomvar() contracts.Entity { return &Customvar{} } + +func NewCustomvarFlat() contracts.Entity { + return &CustomvarFlat{} +} From 9c86198302abe4ad1bc3aa99543cd1bf80036043 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 15:54:23 +0200 Subject: [PATCH 07/16] Remove icingadb.Sync.fromRedis() --- pkg/icingadb/sync.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 8af90bf6..16e703a0 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -166,19 +166,3 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { return g.Wait() } - -func (s Sync) fromRedis(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, key string) (<-chan contracts.Entity, <-chan error) { - // Channel for Redis field-value pairs for the specified key and errors. - pairs, errs := s.redis.HYield(ctx, key, count) - // Group for the Redis sync. Redis sync will be cancelled if an error occurs. - // Note that we're calling HYield with the original context. - g, ctx := errgroup.WithContext(ctx) - // Let errors from HYield cancel our group. - com.ErrgroupReceive(g, errs) - - desired, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) - // Let errors from CreateEntities cancel our group. - com.ErrgroupReceive(g, errs) - - return desired, com.WaitAsync(g) -} From eea1d0e486c59b81a1992e1d27cc1ea8c22e2167 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 16:35:52 +0200 Subject: [PATCH 08/16] Fix race --- pkg/icingadb/delta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 724e8041..f5fd8db2 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -34,7 +34,7 @@ func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subj return delta } -func (delta Delta) Wait() error { +func (delta *Delta) Wait() error { return <-delta.done } From 850de4d371db032e5a4a1050dc3b6812143357bc Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 17:05:27 +0200 Subject: [PATCH 09/16] Remove log message --- pkg/driver/driver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index d32f207d..765ac6c7 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -32,7 +32,6 @@ func (d Driver) Open(dsn string) (c driver.Conn, err error) { c, err = d.Driver.Open(dsn) if err == nil { // No error. Return immediately. - fmt.Println("Returning connection") return } From 766740974f81158480ca7a5f51881a7cc1ee0f94 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 17:05:48 +0200 Subject: [PATCH 10/16] Precise log messages --- pkg/icingadb/delta.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index f5fd8db2..7f21a496 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -53,7 +53,8 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra g.Go(func() error { var cnt com.Counter defer utils.Timed(time.Now(), func(elapsed time.Duration) { - delta.logger.Debugf("Synced %d actual elements in %s", cnt.Val(), elapsed) + delta.logger.Debugf( + "Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed) }) for { select { @@ -89,7 +90,8 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra g.Go(func() error { var cnt com.Counter defer utils.Timed(time.Now(), func(elapsed time.Duration) { - delta.logger.Debugf("Synced %d desired elements in %s", cnt.Val(), elapsed) + delta.logger.Debugf( + "Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed) }) for { select { From f4ec939b9fa4495c8050fc9f8fa249e6d29d896a Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 17:07:27 +0200 Subject: [PATCH 11/16] Add func icingadb.v1.FlattenCustomvars() --- pkg/icingadb/v1/customvar.go | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/pkg/icingadb/v1/customvar.go b/pkg/icingadb/v1/customvar.go index 56f2b81e..fd8d3c09 100644 --- a/pkg/icingadb/v1/customvar.go +++ b/pkg/icingadb/v1/customvar.go @@ -1,8 +1,16 @@ package v1 import ( + "context" + "encoding/json" + "fmt" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/flatten" "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "golang.org/x/sync/errgroup" + "runtime" ) type Customvar struct { @@ -26,3 +34,61 @@ func NewCustomvar() contracts.Entity { func NewCustomvarFlat() contracts.Entity { return &CustomvarFlat{} } + +// FlattenCustomvars creates and yields flat custom variables from the provided custom variables. +func FlattenCustomvars(ctx context.Context, cvs <-chan contracts.Entity) (<-chan contracts.Entity, <-chan error) { + cvFlats := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(cvFlats) + + g, _ := errgroup.WithContext(ctx) + + for i := 0; i < runtime.NumCPU(); i++ { + g.Go(func() error { + for entity := range cvs { + var value interface{} + customvar := entity.(*Customvar) + if err := json.Unmarshal([]byte(customvar.Value), &value); err != nil { + return err + } + + flattened := flatten.Flatten(value, customvar.Name) + + for flatname, flatvalue := range flattened { + flatvalue := fmt.Sprintf("%v", flatvalue) + select { + case cvFlats <- &CustomvarFlat{ + CustomvarMeta: CustomvarMeta{ + EntityWithoutChecksum: EntityWithoutChecksum{ + IdMeta: IdMeta{ + // TODO(el): Schema comment is wrong. + // Without customvar.Id we would produce duplicate keys here. + Id: utils.Checksum(customvar.EnvironmentId.String() + customvar.Id.String() + flatname + flatvalue), + }, + }, + EnvironmentMeta: EnvironmentMeta{ + EnvironmentId: customvar.EnvironmentId, + }, + CustomvarId: customvar.Id, + }, + Flatname: flatname, + FlatnameChecksum: utils.Checksum(flatname), + Flatvalue: flatvalue, + }: + case <-ctx.Done(): + return ctx.Err() + } + } + } + + return nil + }) + } + + return g.Wait() + }) + + return cvFlats, com.WaitAsync(g) +} From 52e5eb774e0c379286a2038d0222d9c00aa4fe75 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 17:08:18 +0200 Subject: [PATCH 12/16] Sync customvar flat --- cmd/icingadb/main.go | 61 ++++++++++++++++++++++++++++++++++++ pkg/icingadb/v1/customvar.go | 3 +- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 5816d802..49197079 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "github.com/icinga/icingadb/internal/command" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/history" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" @@ -111,6 +113,65 @@ func run() int { }) } + wg.Add(1) + g.Go(func() error { + defer wg.Done() + + <-dump.Done("icinga:customvar") + + cv := common.NewSyncSubject(v1.NewCustomvar) + + cvs, redisErrs := rc.YieldAll(synctx, cv) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, redisErrs) + + // Multiplex cvs to use them both for customvar and customvar_flat. + cvs1, cvs2 := make(chan contracts.Entity), make(chan contracts.Entity) + g.Go(func() error { + defer close(cvs1) + defer close(cvs2) + for { + select { + case cv, ok := <-cvs: + if !ok { + return nil + } + + cvs1 <- cv + cvs2 <- cv + case <-synctx.Done(): + return synctx.Err() + } + } + }) + + actualCvs, dbErrs := db.YieldAll( + ctx, cv.Factory(), db.BuildSelectStmt(cv.Entity(), cv.Entity().Fingerprint())) + // Let errors from DB cancel our group. + com.ErrgroupReceive(g, dbErrs) + + g.Go(func() error { + return s.ApplyDelta(ctx, icingadb.NewDelta(ctx, actualCvs, cvs1, cv, logger)) + }) + + cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat) + + cvFlats, flattenErrs := v1.FlattenCustomvars(ctx, cvs2) + // Let errors from Flatten cancel our group. + com.ErrgroupReceive(g, flattenErrs) + + actualCvFlats, dbErrs := db.YieldAll( + ctx, cvFlat.Factory(), db.BuildSelectStmt(cvFlat.Entity(), cvFlat.Entity().Fingerprint())) + // Let errors from DB cancel our group. + com.ErrgroupReceive(g, dbErrs) + + g.Go(func() error { + return s.ApplyDelta(ctx, icingadb.NewDelta(ctx, actualCvFlats, cvFlats, cvFlat, logger)) + }) + + return nil + }) + g.Go(func() error { wg.Wait() diff --git a/pkg/icingadb/v1/customvar.go b/pkg/icingadb/v1/customvar.go index fd8d3c09..52e19b07 100644 --- a/pkg/icingadb/v1/customvar.go +++ b/pkg/icingadb/v1/customvar.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/flatten" + "github.com/icinga/icingadb/pkg/icingadb/objectpacker" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "golang.org/x/sync/errgroup" @@ -65,7 +66,7 @@ func FlattenCustomvars(ctx context.Context, cvs <-chan contracts.Entity) (<-chan IdMeta: IdMeta{ // TODO(el): Schema comment is wrong. // Without customvar.Id we would produce duplicate keys here. - Id: utils.Checksum(customvar.EnvironmentId.String() + customvar.Id.String() + flatname + flatvalue), + Id: utils.Checksum(objectpacker.MustPackAny(customvar.EnvironmentId, customvar.Id, flatname, flatvalue)), }, }, EnvironmentMeta: EnvironmentMeta{ From e71833defbc4abedf71e9790745bf8e088bc0b86 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Fri, 23 Apr 2021 17:45:29 +0200 Subject: [PATCH 13/16] Flatten: Fix keys --- pkg/flatten/flatten.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/flatten/flatten.go b/pkg/flatten/flatten.go index 4adc0258..a31705be 100644 --- a/pkg/flatten/flatten.go +++ b/pkg/flatten/flatten.go @@ -13,13 +13,11 @@ func Flatten(value interface{}, prefix string) map[string]interface{} { switch value := value.(type) { case map[string]interface{}: for k, v := range value { - key += "." + k - flatten(key, v) + flatten(key+"."+k, v) } case []interface{}: for i, v := range value { - key += "[" + strconv.Itoa(i) + "]" - flatten(key, v) + flatten(key+"["+strconv.Itoa(i)+"]", v) } default: flattened[key] = value From bf27824980fc734e5c3f2e2b7acf5fe371dc08f8 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 28 Apr 2021 16:57:24 +0200 Subject: [PATCH 14/16] Remove NewCustomvar from Factories We now explicitly sync it in main.go. --- pkg/icingadb/v1/v1.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/icingadb/v1/v1.go b/pkg/icingadb/v1/v1.go index f8cef88a..337c5c46 100644 --- a/pkg/icingadb/v1/v1.go +++ b/pkg/icingadb/v1/v1.go @@ -11,7 +11,6 @@ var Factories = []contracts.EntityFactoryFunc{ NewCheckcommandCustomvar, NewCheckcommandEnvvar, NewComment, - NewCustomvar, NewDowntime, NewEndpoint, NewEventcommand, From ac9aa0365a966e399caddd10283198687220676f Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Fri, 7 May 2021 09:00:43 +0200 Subject: [PATCH 15/16] Remove time taken debug log for named queries --- pkg/icingadb/db.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index a79be654..994653e0 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -205,13 +205,11 @@ func (db DB) NamedBulkExec( ctx, func() error { db.logger.Debugf("Executing %s with %d rows..", query, len(b)) - start := time.Now() _, err := db.NamedExecContext(ctx, query, b) if err != nil { fmt.Println(err) return err } - db.logger.Debugf("..took %s", time.Since(start)) cnt.Add(uint64(len(b))) From 58f80bc8d8bd214a64b9e55f970415f4393a8c4f Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Tue, 11 May 2021 13:54:13 +0200 Subject: [PATCH 16/16] Introduce objectpacker.MustPackAny() --- pkg/icingadb/objectpacker/objectpacker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/icingadb/objectpacker/objectpacker.go b/pkg/icingadb/objectpacker/objectpacker.go index 44c6b92b..0a5bce27 100644 --- a/pkg/icingadb/objectpacker/objectpacker.go +++ b/pkg/icingadb/objectpacker/objectpacker.go @@ -10,6 +10,17 @@ import ( "sort" ) +// MustPackAny calls PackAny using in and panics if there was an error. +func MustPackAny(in ...interface{}) []byte { + var buf bytes.Buffer + + if err := PackAny(in, &buf); err != nil { + panic(err) + } + + return buf.Bytes() +} + // PackAny packs any JSON-encodable value (ex. structs, also ignores interfaces like encoding.TextMarshaler) // to a BSON-similar format suitable for consistent hashing. Spec: //