mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-28 04:35:54 -04:00
Implement sync
This commit is contained in:
parent
262749c575
commit
bb9a2b0251
3 changed files with 369 additions and 0 deletions
122
pkg/icingadb/delta.go
Normal file
122
pkg/icingadb/delta.go
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
package icingadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Delta struct {
|
||||
Create *sync.Map
|
||||
Update *sync.Map
|
||||
Delete *sync.Map
|
||||
WithChecksum bool
|
||||
done chan error
|
||||
err error
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, withChecksum bool, logger *zap.SugaredLogger) *Delta {
|
||||
delta := &Delta{
|
||||
WithChecksum: withChecksum,
|
||||
done: make(chan error, 1),
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
go delta.start(ctx, actual, desired)
|
||||
|
||||
return delta
|
||||
}
|
||||
|
||||
func (delta Delta) Wait() error {
|
||||
return <-delta.done
|
||||
}
|
||||
|
||||
func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
|
||||
defer close(delta.done)
|
||||
|
||||
var update sync.Map
|
||||
if delta.WithChecksum {
|
||||
update = sync.Map{}
|
||||
}
|
||||
actual := sync.Map{}
|
||||
desired := sync.Map{}
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
var cnt com.Counter
|
||||
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
|
||||
delta.logger.Debugf("Synced %d actual elements in %s", cnt.Val(), elapsed)
|
||||
})
|
||||
for {
|
||||
select {
|
||||
case a, ok := <-actualCh:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
id := a.ID().String()
|
||||
if d, ok := desired.Load(id); ok {
|
||||
desired.Delete(id)
|
||||
|
||||
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
|
||||
update.Store(id, d)
|
||||
}
|
||||
} else {
|
||||
actual.Store(id, a)
|
||||
}
|
||||
|
||||
cnt.Inc()
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
var cnt com.Counter
|
||||
defer utils.Timed(time.Now(), func(elapsed time.Duration) {
|
||||
delta.logger.Debugf("Synced %d desired elements in %s", cnt.Val(), elapsed)
|
||||
})
|
||||
for {
|
||||
select {
|
||||
case d, ok := <-desiredCh:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
id := d.ID().String()
|
||||
if a, ok := actual.Load(id); ok {
|
||||
actual.Delete(id)
|
||||
|
||||
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
|
||||
update.Store(id, d)
|
||||
}
|
||||
} else {
|
||||
desired.Store(id, d)
|
||||
}
|
||||
|
||||
cnt.Inc()
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
delta.done <- err
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
delta.Create = &desired
|
||||
delta.Delete = &actual
|
||||
if delta.WithChecksum {
|
||||
delta.Update = &update
|
||||
}
|
||||
}
|
||||
157
pkg/icingadb/sync.go
Normal file
157
pkg/icingadb/sync.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package icingadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/icingaredis"
|
||||
"github.com/icinga/icingadb/pkg/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
var (
|
||||
// Redis concurrency settings.
|
||||
count = 1 << 12
|
||||
concurrent = 1 << 3
|
||||
)
|
||||
|
||||
// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities.
|
||||
type Sync struct {
|
||||
db *DB
|
||||
redis *icingaredis.Client
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync {
|
||||
return &Sync{
|
||||
db: db,
|
||||
redis: redis,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta {
|
||||
// Redis key.
|
||||
var redisKey string
|
||||
// Whether we're syncing an entity that implements contracts.Checksumer.
|
||||
var withChecksum bool
|
||||
// Value from the factory so that we know what we are synchronizing here.
|
||||
v := factoryFunc()
|
||||
// Error channel.
|
||||
errs := make(chan error, 1)
|
||||
|
||||
if _, ok := v.(contracts.Checksumer); ok {
|
||||
withChecksum = true
|
||||
redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':'))
|
||||
} else {
|
||||
redisKey = fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':'))
|
||||
}
|
||||
|
||||
desired, err := s.fromRedis(ctx, factoryFunc, redisKey)
|
||||
com.PipeError(err, errs)
|
||||
|
||||
actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint()))
|
||||
com.PipeError(err, errs)
|
||||
|
||||
return NewDelta(ctx, actual, desired, withChecksum, s.logger)
|
||||
}
|
||||
|
||||
// Synchronize entities between Icinga DB and Redis created with the specified factory function.
|
||||
func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error {
|
||||
// Value from the factory so that we know what we are synchronizing here.
|
||||
v := factoryFunc()
|
||||
// Group for the sync. Whole sync will be cancelled if an error occurs.
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' '))
|
||||
|
||||
delta := s.GetDelta(ctx, factoryFunc)
|
||||
if err := delta.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create
|
||||
{
|
||||
var entities <-chan contracts.Entity
|
||||
if delta.WithChecksum {
|
||||
pairs, errs := s.redis.HMYield(
|
||||
ctx,
|
||||
fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')),
|
||||
count,
|
||||
concurrent,
|
||||
utils.SyncMapKeys(delta.Create)...)
|
||||
// Let errors from Redis cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
|
||||
// Let errors from CreateEntities cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU())
|
||||
// Let errors from SetChecksums cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
} else {
|
||||
entities = utils.SyncMapEntities(delta.Create)
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
return s.db.Create(ctx, entities)
|
||||
})
|
||||
}
|
||||
|
||||
// Update
|
||||
{
|
||||
s.logger.Infof("Updating %d rows of type %s", len(utils.SyncMapKeys(delta.Update)), utils.Key(utils.Name(v), ' '))
|
||||
pairs, errs := s.redis.HMYield(
|
||||
ctx,
|
||||
fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')),
|
||||
count,
|
||||
concurrent,
|
||||
utils.SyncMapKeys(delta.Update)...)
|
||||
// Let errors from Redis cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
|
||||
// Let errors from CreateEntities cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU())
|
||||
// Let errors from SetChecksums cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
g.Go(func() error {
|
||||
// TODO (el): This is very slow in high latency scenarios.
|
||||
// Use strings.Repeat() on the query and create a stmt
|
||||
// with a size near the default value of max_allowed_packet.
|
||||
return s.db.Update(ctx, entities)
|
||||
})
|
||||
}
|
||||
|
||||
// Delete
|
||||
{
|
||||
s.logger.Infof("Deleting %d rows of type %s", len(utils.SyncMapKeys(delta.Delete)), utils.Key(utils.Name(v), ' '))
|
||||
g.Go(func() error {
|
||||
return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, utils.SyncMapIDs(delta.Delete))
|
||||
})
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func (s Sync) fromRedis(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, key string) (<-chan contracts.Entity, <-chan error) {
|
||||
// Channel for Redis field-value pairs for the specified key and errors.
|
||||
pairs, errs := s.redis.HYield(
|
||||
ctx, key, count)
|
||||
// Group for the Redis sync. Redis sync will be cancelled if an error occurs.
|
||||
// Note that we're calling HYield with the original context.
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
// Let errors from HYield cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
desired, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU())
|
||||
// Let errors from CreateEntities cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
return desired, com.WaitAsync(g)
|
||||
}
|
||||
90
pkg/icingaredis/utils.go
Normal file
90
pkg/icingaredis/utils.go
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
package icingaredis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
entities := make(chan contracts.Entity, 0)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
defer close(entities)
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for i := 0; i < concurrent; i++ {
|
||||
g.Go(func() error {
|
||||
for pair := range pairs {
|
||||
var id types.Binary
|
||||
|
||||
if err := id.UnmarshalText([]byte(pair.Field)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e := factoryFunc()
|
||||
if err := json.Unmarshal([]byte(pair.Value), e); err != nil {
|
||||
return err
|
||||
}
|
||||
e.SetID(id)
|
||||
|
||||
select {
|
||||
case entities <- e:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
})
|
||||
|
||||
return entities, com.WaitAsync(g)
|
||||
}
|
||||
|
||||
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums *sync.Map, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
entitiesWithChecksum := make(chan contracts.Entity, 0)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
defer close(entitiesWithChecksum)
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for i := 0; i < concurrent; i++ {
|
||||
g.Go(func() error {
|
||||
for entity := range entities {
|
||||
if checksumer, ok := checksums.Load(entity.ID().String()); ok {
|
||||
entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum())
|
||||
} else {
|
||||
panic("no checksum")
|
||||
// TODO(el): Error is not published
|
||||
return errors.New("no checksum")
|
||||
}
|
||||
|
||||
select {
|
||||
case entitiesWithChecksum <- entity:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
})
|
||||
|
||||
return entitiesWithChecksum, com.WaitAsync(g)
|
||||
}
|
||||
Loading…
Reference in a new issue