From 442e04bbf128e4cf2ddd834cc9b64c21b05ee871 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 21 Apr 2021 11:36:41 +0200 Subject: [PATCH] Use type common.SyncSubject --- cmd/icingadb/main.go | 3 +- pkg/icingadb/sync.go | 63 ++++++++++++++++----------------------- pkg/icingaredis/client.go | 2 +- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 17e6f740..5816d802 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/icinga/icingadb/internal/command" + "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/icingadb/history" v1 "github.com/icinga/icingadb/pkg/icingadb/v1" @@ -106,7 +107,7 @@ func run() int { g.Go(func() error { defer wg.Done() - return s.SyncAfterDump(synctx, factory.WithInit, dump) + return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump) }) } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 49de0341..aeffdc4b 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingaredis" "github.com/icinga/icingadb/pkg/utils" @@ -34,36 +35,24 @@ func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync } } -func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta { - // Redis key. - var redisKey string - // Whether we're syncing an entity that implements contracts.Checksumer. - var withChecksum bool - // Value from the factory so that we know what we are synchronizing here. - v := factoryFunc() +func (s Sync) GetDelta(ctx context.Context, subject *common.SyncSubject) *Delta { // Error channel. - errs := make(chan error, 1) + err := make(chan error, 1) - if _, ok := v.(contracts.Checksumer); ok { - withChecksum = true - redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':')) - } else { - redisKey = fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')) - } + desired, errs := s.redis.YieldAll(ctx, subject) + com.PipeError(errs, err) - desired, err := s.fromRedis(ctx, factoryFunc, redisKey) - com.PipeError(err, errs) + actual, errs := s.db.YieldAll( + ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint())) + com.PipeError(errs, err) - actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint())) - com.PipeError(err, errs) - - return NewDelta(ctx, actual, desired, withChecksum, s.logger) + return NewDelta(ctx, actual, desired, subject.WithChecksum(), s.logger) } -// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the type given -// by factoryFunc using the Sync function. -func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, dump *DumpSignals) error { - typeName := utils.Name(factoryFunc()) +// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given +// sync subject using the Sync function. +func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error { + typeName := utils.Name(subject.Entity()) key := "icinga:" + utils.Key(typeName, ':') startTime := time.Now() @@ -88,24 +77,22 @@ func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFac zap.String("type", typeName), zap.String("key", key), zap.Duration("waited", time.Now().Sub(startTime))) - return s.Sync(ctx, factoryFunc) + return s.Sync(ctx, subject) case <-ctx.Done(): return ctx.Err() } } } -// Sync synchronizes entities between Icinga DB and Redis created with the specified factory function. +// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. // This function does not respect dump signals. For this, use SyncAfterDump. -func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error { - // Value from the factory so that we know what we are synchronizing here. - v := factoryFunc() +func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { // Group for the sync. Whole sync will be cancelled if an error occurs. g, ctx := errgroup.WithContext(ctx) - s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' ')) - delta := s.GetDelta(ctx, factoryFunc) + delta := s.GetDelta(ctx, subject) if err := delta.Wait(); err != nil { return err } @@ -116,14 +103,14 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) if delta.WithChecksum { pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), count, concurrent, delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) @@ -140,17 +127,17 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Update if len(delta.Update) > 0 { - s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(subject.Entity()), ' ')) pairs, errs := s.redis.HMYield( ctx, - fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')), + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')), count, concurrent, delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) - entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU()) // Let errors from CreateEntities cancel our group. com.ErrgroupReceive(g, errs) entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) @@ -167,9 +154,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Delete if len(delta.Delete) > 0 { - s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(subject.Entity()), ' ')) g.Go(func() error { - return s.db.Delete(ctx, v, delta.Delete.IDs()) + return s.db.Delete(ctx, subject.Entity(), delta.Delete.IDs()) }) } diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go index cb04962d..8a024cae 100644 --- a/pkg/icingaredis/client.go +++ b/pkg/icingaredis/client.go @@ -171,7 +171,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch if subject.WithChecksum() { key = "icinga:checksum:" + key } else { - key = "icinga:config:" + key + key = "icinga:" + key } pairs, errs := c.HYield(ctx, key, 1<<12)