Merge pull request #37 from lippserd/sync-cv-flat

Sync customvar flat
This commit is contained in:
Eric Lippmann 2021-05-11 20:08:09 +02:00 committed by GitHub
commit da030805b5
11 changed files with 268 additions and 90 deletions

View file

@ -4,6 +4,9 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/internal/command"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/history"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
@ -106,10 +109,69 @@ func run() int {
g.Go(func() error {
defer wg.Done()
return s.SyncAfterDump(synctx, factory.WithInit, dump)
return s.SyncAfterDump(synctx, common.NewSyncSubject(factory.WithInit), dump)
})
}
wg.Add(1)
g.Go(func() error {
defer wg.Done()
<-dump.Done("icinga:customvar")
cv := common.NewSyncSubject(v1.NewCustomvar)
cvs, redisErrs := rc.YieldAll(synctx, cv)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, redisErrs)
// Multiplex cvs to use them both for customvar and customvar_flat.
cvs1, cvs2 := make(chan contracts.Entity), make(chan contracts.Entity)
g.Go(func() error {
defer close(cvs1)
defer close(cvs2)
for {
select {
case cv, ok := <-cvs:
if !ok {
return nil
}
cvs1 <- cv
cvs2 <- cv
case <-synctx.Done():
return synctx.Err()
}
}
})
actualCvs, dbErrs := db.YieldAll(
ctx, cv.Factory(), db.BuildSelectStmt(cv.Entity(), cv.Entity().Fingerprint()))
// Let errors from DB cancel our group.
com.ErrgroupReceive(g, dbErrs)
g.Go(func() error {
return s.ApplyDelta(ctx, icingadb.NewDelta(ctx, actualCvs, cvs1, cv, logger))
})
cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat)
cvFlats, flattenErrs := v1.FlattenCustomvars(ctx, cvs2)
// Let errors from Flatten cancel our group.
com.ErrgroupReceive(g, flattenErrs)
actualCvFlats, dbErrs := db.YieldAll(
ctx, cvFlat.Factory(), db.BuildSelectStmt(cvFlat.Entity(), cvFlat.Entity().Fingerprint()))
// Let errors from DB cancel our group.
com.ErrgroupReceive(g, dbErrs)
g.Go(func() error {
return s.ApplyDelta(ctx, icingadb.NewDelta(ctx, actualCvFlats, cvFlats, cvFlat, logger))
})
return nil
})
g.Go(func() error {
wg.Wait()

View file

@ -0,0 +1,39 @@
package common
import (
"github.com/icinga/icingadb/pkg/contracts"
)
// SyncSubject defines information about entities to be synchronized.
type SyncSubject struct {
entity contracts.Entity
factory contracts.EntityFactoryFunc
withChecksum bool
}
// NewSyncSubject returns a new SyncSubject.
func NewSyncSubject(factoryFunc contracts.EntityFactoryFunc) *SyncSubject {
e := factoryFunc()
_, withChecksum := e.(contracts.Checksumer)
return &SyncSubject{
entity: e,
factory: factoryFunc,
withChecksum: withChecksum,
}
}
// Entity returns one value from the factory. Always returns the same entity.
func (s SyncSubject) Entity() contracts.Entity {
return s.entity
}
// Factory returns the entity factory function.
func (s SyncSubject) Factory() contracts.EntityFactoryFunc {
return s.factory
}
// WithChecksum returns whether entities from the factory implement contracts.Checksumer.
func (s SyncSubject) WithChecksum() bool {
return s.withChecksum
}

View file

@ -32,7 +32,6 @@ func (d Driver) Open(dsn string) (c driver.Conn, err error) {
c, err = d.Driver.Open(dsn)
if err == nil {
// No error. Return immediately.
fmt.Println("Returning connection")
return
}

View file

@ -13,13 +13,11 @@ func Flatten(value interface{}, prefix string) map[string]interface{} {
switch value := value.(type) {
case map[string]interface{}:
for k, v := range value {
key += "." + k
flatten(key, v)
flatten(key+"."+k, v)
}
case []interface{}:
for i, v := range value {
key += "[" + strconv.Itoa(i) + "]"
flatten(key, v)
flatten(key+"["+strconv.Itoa(i)+"]", v)
}
default:
flattened[key] = value

View file

@ -205,13 +205,11 @@ func (db DB) NamedBulkExec(
ctx,
func() error {
db.logger.Debugf("Executing %s with %d rows..", query, len(b))
start := time.Now()
_, err := db.NamedExecContext(ctx, query, b)
if err != nil {
fmt.Println(err)
return err
}
db.logger.Debugf("..took %s", time.Since(start))
cnt.Add(uint64(len(b)))

View file

@ -3,6 +3,7 @@ package icingadb
import (
"context"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
@ -12,20 +13,20 @@ import (
)
type Delta struct {
Create EntitiesById
Update EntitiesById
Delete EntitiesById
WithChecksum bool
done chan error
err error
logger *zap.SugaredLogger
Create EntitiesById
Update EntitiesById
Delete EntitiesById
Subject *common.SyncSubject
done chan error
err error
logger *zap.SugaredLogger
}
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, withChecksum bool, logger *zap.SugaredLogger) *Delta {
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *zap.SugaredLogger) *Delta {
delta := &Delta{
WithChecksum: withChecksum,
done: make(chan error, 1),
logger: logger,
Subject: subject,
done: make(chan error, 1),
logger: logger,
}
go delta.start(ctx, actual, desired)
@ -33,7 +34,7 @@ func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, with
return delta
}
func (delta Delta) Wait() error {
func (delta *Delta) Wait() error {
return <-delta.done
}
@ -41,7 +42,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
defer close(delta.done)
var update EntitiesById
if delta.WithChecksum {
if delta.Subject.WithChecksum() {
update = EntitiesById{}
}
actual := EntitiesById{}
@ -52,7 +53,8 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf("Synced %d actual elements in %s", cnt.Val(), elapsed)
delta.logger.Debugf(
"Synced %d actual elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
@ -68,7 +70,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
delete(desired, id)
mtx.Unlock()
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
@ -88,7 +90,8 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
g.Go(func() error {
var cnt com.Counter
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
delta.logger.Debugf("Synced %d desired elements in %s", cnt.Val(), elapsed)
delta.logger.Debugf(
"Synced %d desired elements of type %s in %s", cnt.Val(), utils.Name(delta.Subject.Entity()), elapsed)
})
for {
select {
@ -104,7 +107,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
delete(actual, id)
mtx.Unlock()
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
if delta.Subject.WithChecksum() && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
updateMtx.Lock()
update[id] = d
updateMtx.Unlock()
@ -129,7 +132,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
delta.Create = desired
delta.Delete = actual
if delta.WithChecksum {
if delta.Subject.WithChecksum() {
delta.Update = update
}
}

View file

@ -10,6 +10,17 @@ import (
"sort"
)
// MustPackAny calls PackAny using in and panics if there was an error.
func MustPackAny(in ...interface{}) []byte {
var buf bytes.Buffer
if err := PackAny(in, &buf); err != nil {
panic(err)
}
return buf.Bytes()
}
// PackAny packs any JSON-encodable value (ex. structs, also ignores interfaces like encoding.TextMarshaler)
// to a BSON-similar format suitable for consistent hashing. Spec:
//

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/utils"
@ -34,36 +35,10 @@ func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync
}
}
func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta {
// Redis key.
var redisKey string
// Whether we're syncing an entity that implements contracts.Checksumer.
var withChecksum bool
// Value from the factory so that we know what we are synchronizing here.
v := factoryFunc()
// Error channel.
errs := make(chan error, 1)
if _, ok := v.(contracts.Checksumer); ok {
withChecksum = true
redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':'))
} else {
redisKey = fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':'))
}
desired, err := s.fromRedis(ctx, factoryFunc, redisKey)
com.PipeError(err, errs)
actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint()))
com.PipeError(err, errs)
return NewDelta(ctx, actual, desired, withChecksum, s.logger)
}
// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the type given
// by factoryFunc using the Sync function.
func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, dump *DumpSignals) error {
typeName := utils.Name(factoryFunc())
// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given
// sync subject using the Sync function.
func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error {
typeName := utils.Name(subject.Entity())
key := "icinga:" + utils.Key(typeName, ':')
startTime := time.Now()
@ -88,42 +63,58 @@ func (s Sync) SyncAfterDump(ctx context.Context, factoryFunc contracts.EntityFac
zap.String("type", typeName),
zap.String("key", key),
zap.Duration("waited", time.Now().Sub(startTime)))
return s.Sync(ctx, factoryFunc)
return s.Sync(ctx, subject)
case <-ctx.Done():
return ctx.Err()
}
}
}
// Sync synchronizes entities between Icinga DB and Redis created with the specified factory function.
// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject.
// This function does not respect dump signals. For this, use SyncAfterDump.
func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error {
// Value from the factory so that we know what we are synchronizing here.
v := factoryFunc()
// Group for the sync. Whole sync will be cancelled if an error occurs.
func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
s.logger.Infof("Syncing %s", utils.Key(utils.Name(subject.Entity()), ' '))
g, ctx := errgroup.WithContext(ctx)
s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' '))
desired, redisErrs := s.redis.YieldAll(ctx, subject)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, redisErrs)
delta := s.GetDelta(ctx, factoryFunc)
actual, dbErrs := s.db.YieldAll(
ctx, subject.Factory(), s.db.BuildSelectStmt(subject.Entity(), subject.Entity().Fingerprint()))
// Let errors from DB cancel our group.
com.ErrgroupReceive(g, dbErrs)
g.Go(func() error {
return s.ApplyDelta(ctx, NewDelta(ctx, actual, desired, subject, s.logger))
})
return g.Wait()
}
// ApplyDelta applies all changes from Delta to the database.
func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
if err := delta.Wait(); err != nil {
return err
}
g, ctx := errgroup.WithContext(ctx)
// Create
if len(delta.Create) > 0 {
var entities <-chan contracts.Entity
if delta.WithChecksum {
if delta.Subject.WithChecksum() {
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')),
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')),
count,
concurrent,
delta.Create.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU())
@ -140,17 +131,17 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
// Update
if len(delta.Update) > 0 {
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' '))
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
pairs, errs := s.redis.HMYield(
ctx,
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(v), ':')),
fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')),
count,
concurrent,
delta.Update.Keys()...)
// Let errors from Redis cancel our group.
com.ErrgroupReceive(g, errs)
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU())
@ -167,28 +158,11 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' '))
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.Delete(ctx, v, delta.Delete.IDs())
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs())
})
}
return g.Wait()
}
func (s Sync) fromRedis(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, key string) (<-chan contracts.Entity, <-chan error) {
// Channel for Redis field-value pairs for the specified key and errors.
pairs, errs := s.redis.HYield(
ctx, key, count)
// Group for the Redis sync. Redis sync will be cancelled if an error occurs.
// Note that we're calling HYield with the original context.
g, ctx := errgroup.WithContext(ctx)
// Let errors from HYield cancel our group.
com.ErrgroupReceive(g, errs)
desired, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel our group.
com.ErrgroupReceive(g, errs)
return desired, com.WaitAsync(g)
}

View file

@ -1,8 +1,17 @@
package v1
import (
"context"
"encoding/json"
"fmt"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/flatten"
"github.com/icinga/icingadb/pkg/icingadb/objectpacker"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"golang.org/x/sync/errgroup"
"runtime"
)
type Customvar struct {
@ -22,3 +31,65 @@ type CustomvarFlat struct {
func NewCustomvar() contracts.Entity {
return &Customvar{}
}
func NewCustomvarFlat() contracts.Entity {
return &CustomvarFlat{}
}
// FlattenCustomvars creates and yields flat custom variables from the provided custom variables.
func FlattenCustomvars(ctx context.Context, cvs <-chan contracts.Entity) (<-chan contracts.Entity, <-chan error) {
cvFlats := make(chan contracts.Entity)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(cvFlats)
g, _ := errgroup.WithContext(ctx)
for i := 0; i < runtime.NumCPU(); i++ {
g.Go(func() error {
for entity := range cvs {
var value interface{}
customvar := entity.(*Customvar)
if err := json.Unmarshal([]byte(customvar.Value), &value); err != nil {
return err
}
flattened := flatten.Flatten(value, customvar.Name)
for flatname, flatvalue := range flattened {
flatvalue := fmt.Sprintf("%v", flatvalue)
select {
case cvFlats <- &CustomvarFlat{
CustomvarMeta: CustomvarMeta{
EntityWithoutChecksum: EntityWithoutChecksum{
IdMeta: IdMeta{
// TODO(el): Schema comment is wrong.
// Without customvar.Id we would produce duplicate keys here.
Id: utils.Checksum(objectpacker.MustPackAny(customvar.EnvironmentId, customvar.Id, flatname, flatvalue)),
},
},
EnvironmentMeta: EnvironmentMeta{
EnvironmentId: customvar.EnvironmentId,
},
CustomvarId: customvar.Id,
},
Flatname: flatname,
FlatnameChecksum: utils.Checksum(flatname),
Flatvalue: flatvalue,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
})
}
return g.Wait()
})
return cvFlats, com.WaitAsync(g)
}

View file

@ -11,7 +11,6 @@ var Factories = []contracts.EntityFactoryFunc{
NewCheckcommandCustomvar,
NewCheckcommandEnvvar,
NewComment,
NewCustomvar,
NewDowntime,
NewEndpoint,
NewEventcommand,

View file

@ -4,10 +4,13 @@ import (
"context"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"runtime"
"time"
)
@ -161,3 +164,24 @@ func (c *Client) StreamLastId(ctx context.Context, stream string) (string, error
return lastId, nil
}
// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) {
key := utils.Key(utils.Name(subject.Entity()), ':')
if subject.WithChecksum() {
key = "icinga:checksum:" + key
} else {
key = "icinga:" + key
}
pairs, errs := c.HYield(ctx, key, 1<<12)
g, ctx := errgroup.WithContext(ctx)
// Let errors from HYield cancel the group.
com.ErrgroupReceive(g, errs)
desired, errs := CreateEntities(ctx, subject.Factory(), pairs, runtime.NumCPU())
// Let errors from CreateEntities cancel the group.
com.ErrgroupReceive(g, errs)
return desired, com.WaitAsync(g)
}