Use type common.SyncSubject

This commit is contained in:
Eric Lippmann 2021-04-21 11:36:41 +02:00
parent 51d2532f18
commit 442e04bbf1
3 changed files with 28 additions and 40 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/internal/command"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/history"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
@ -106,7 +107,7 @@ func run() int {
g.Go(func() error {
defer wg.Done()
return s.SyncAfterDump(synctx, factory.WithInit, dump)
return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump)
})
}

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/utils"
@ -34,36 +35,24 @@ func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync
}
}
func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta {
// Redis key.
var redisKey string
// Whether we're syncing an entity that implements contracts.Checksumer.
var withChecksum bool
// Value from the factory so that we know what we are synchronizing here.
v := factoryFunc()
func (s Sync) GetDelta(ctx context.Context, subject *common.SyncSubject) *Delta {
// Error channel.
errs := make(chan error, 1)
err := make(chan error, 1)
if _, ok := v.(contracts.Checksumer); ok {
withChecksum = true
redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':'))
} else {
redisKey = fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':'))
}
desired, errs := s.redis.YieldAll(ctx, subject)
com.PipeError(errs, err)
desired, err := s.fromRedis(ctx, factoryFunc, redisKey)
com.PipeError(err, errs)
actual, errs := s.db.YieldAll(
ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint()))
com.PipeError(errs, err)
actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint()))
com.PipeError(err, errs)
return NewDelta(ctx, actual, desired, withChecksum, s.logger)
return NewDelta(ctx, actual, desired, subject.WithChecksum(), s.logger)
}
// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the type given
// by factoryFunc using the Sync function.
func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, dump *DumpSignals) error {
typeName := utils.Name(factoryFunc())
// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given
// sync subject using the Sync function.
func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error {
typeName := utils.Name(subject.Entity())
key := "icinga:" + utils.Key(typeName, ':')
startTime := time.Now()
@ -88,24 +77,22 @@ func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFac
zap.String("type", typeName),
zap.String("key", key),
zap.Duration("waited", time.Now().Sub(startTime)))
return s.Sync(ctx, factoryFunc)
return s.Sync(ctx, subject)
case <-ctx.Done():
return ctx.Err()
}
}
}
// Sync synchronizes entities between Icinga DB and Redis created with the specified factory function.
// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject.
// This function does not respect dump signals. For this, use SyncAfterDump.
func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error {
// Value from the factory so that we know what we are synchronizing here.
v := factoryFunc()
func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
// Group for the sync. Whole sync will be cancelled if an error occurs.
g, ctx := errgroup.WithContext(ctx)
s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' '))
s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' '))
delta := s.GetDelta(ctx, factoryFunc)
delta := s.GetDelta(ctx, subject)
if err := delta.Wait(); err != nil {
return err
}
@ -116,14 +103,14 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
if delta.WithChecksum {
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')),
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')),
count,
concurrent,
delta.Create.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU())
@ -140,17 +127,17 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
// Update
if len(delta.Update) > 0 {
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' '))
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(subject.Entity()), ' '))
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')),
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(subject.Entity()), ':')),
count,
concurrent,
delta.Update.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU())
@ -167,9 +154,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' '))
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, v, delta.Delete.IDs())
return s.db.Delete(ctx, subject.Entity(), delta.Delete.IDs())
})
}

View file

@ -171,7 +171,7 @@ func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-ch
if subject.WithChecksum() {
key = "icinga:checksum:" + key
} else {
key = "icinga:config:" + key
key = "icinga:" + key
}
pairs, errs := c.HYield(ctx, key, 1<<12)