diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go new file mode 100644 index 00000000..1deed748 --- /dev/null +++ b/pkg/icingadb/delta.go @@ -0,0 +1,122 @@ +package icingadb + +import ( + "context" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "sync" + "time" +) + +type Delta struct { + Create *sync.Map + Update *sync.Map + Delete *sync.Map + WithChecksum bool + done chan error + err error + logger *zap.SugaredLogger +} + +func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, withChecksum bool, logger *zap.SugaredLogger) *Delta { + delta := &Delta{ + WithChecksum: withChecksum, + done: make(chan error, 1), + logger: logger, + } + + go delta.start(ctx, actual, desired) + + return delta +} + +func (delta Delta) Wait() error { + return <-delta.done +} + +func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { + defer close(delta.done) + + var update sync.Map + if delta.WithChecksum { + update = sync.Map{} + } + actual := sync.Map{} + desired := sync.Map{} + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + var cnt com.Counter + defer utils.Timed(time.Now(), func(elapsed time.Duration) { + delta.logger.Debugf("Synced %d actual elements in %s", cnt.Val(), elapsed) + }) + for { + select { + case a, ok := <-actualCh: + if !ok { + return nil + } + + id := a.ID().String() + if d, ok := desired.Load(id); ok { + desired.Delete(id) + + if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { + update.Store(id, d) + } + } else { + actual.Store(id, a) + } + + cnt.Inc() + case <-ctx.Done(): + return nil + } + } + }) + + g.Go(func() error { + var cnt com.Counter + defer utils.Timed(time.Now(), func(elapsed time.Duration) { + delta.logger.Debugf("Synced %d desired elements in %s", cnt.Val(), elapsed) + }) + for { + select { + case d, ok := <-desiredCh: + if !ok { + return nil + } + + id := d.ID().String() + if a, ok := actual.Load(id); ok { + actual.Delete(id) + + if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { + update.Store(id, d) + } + } else { + desired.Store(id, d) + } + + cnt.Inc() + case <-ctx.Done(): + return nil + } + } + }) + + if err := g.Wait(); err != nil { + delta.done <- err + + return + } + + delta.Create = &desired + delta.Delete = &actual + if delta.WithChecksum { + delta.Update = &update + } +} diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go new file mode 100644 index 00000000..50097110 --- /dev/null +++ b/pkg/icingadb/sync.go @@ -0,0 +1,157 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "runtime" +) + +var ( + // Redis concurrency settings. + count = 1 << 12 + concurrent = 1 << 3 +) + +// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities. +type Sync struct { + db *DB + redis *icingaredis.Client + logger *zap.SugaredLogger +} + +func NewSync(db *DB, redis *icingaredis.Client, logger *zap.SugaredLogger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +func (s Sync) GetDelta(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) *Delta { + // Redis key. + var redisKey string + // Whether we're syncing an entity that implements contracts.Checksumer. + var withChecksum bool + // Value from the factory so that we know what we are synchronizing here. + v := factoryFunc() + // Error channel. + errs := make(chan error, 1) + + if _, ok := v.(contracts.Checksumer); ok { + withChecksum = true + redisKey = fmt.Sprintf("icinga:checksum:%s", utils.Key(utils.Name(v), ':')) + } else { + redisKey = fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')) + } + + desired, err := s.fromRedis(ctx, factoryFunc, redisKey) + com.PipeError(err, errs) + + actual, err := s.db.YieldAll(ctx, factoryFunc, s.db.BuildSelectStmt(v, v.Fingerprint())) + com.PipeError(err, errs) + + return NewDelta(ctx, actual, desired, withChecksum, s.logger) +} + +// Synchronize entities between Icinga DB and Redis created with the specified factory function. +func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) error { + // Value from the factory so that we know what we are synchronizing here. + v := factoryFunc() + // Group for the sync. Whole sync will be cancelled if an error occurs. + g, ctx := errgroup.WithContext(ctx) + + s.logger.Infof("Syncing %s", utils.Key(utils.Name(v), ' ')) + + delta := s.GetDelta(ctx, factoryFunc) + if err := delta.Wait(); err != nil { + return err + } + + // Create + { + var entities <-chan contracts.Entity + if delta.WithChecksum { + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), + count, + concurrent, + utils.SyncMapKeys(delta.Create)...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + } else { + entities = utils.SyncMapEntities(delta.Create) + } + + g.Go(func() error { + return s.db.Create(ctx, entities) + }) + } + + // Update + { + s.logger.Infof("Updating %d rows of type %s", len(utils.SyncMapKeys(delta.Update)), utils.Key(utils.Name(v), ' ')) + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), + count, + concurrent, + utils.SyncMapKeys(delta.Update)...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + // TODO (el): This is very slow in high latency scenarios. + // Use strings.Repeat() on the query and create a stmt + // with a size near the default value of max_allowed_packet. + return s.db.Update(ctx, entities) + }) + } + + // Delete + { + s.logger.Infof("Deleting %d rows of type %s", len(utils.SyncMapKeys(delta.Delete)), utils.Key(utils.Name(v), ' ')) + g.Go(func() error { + return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, utils.SyncMapIDs(delta.Delete)) + }) + } + + return g.Wait() +} + +func (s Sync) fromRedis(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, key string) (<-chan contracts.Entity, <-chan error) { + // Channel for Redis field-value pairs for the specified key and errors. + pairs, errs := s.redis.HYield( + ctx, key, count) + // Group for the Redis sync. Redis sync will be cancelled if an error occurs. + // Note that we're calling HYield with the original context. + g, ctx := errgroup.WithContext(ctx) + // Let errors from HYield cancel our group. + com.ErrgroupReceive(g, errs) + + desired, errs := icingaredis.CreateEntities(ctx, factoryFunc, pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + + return desired, com.WaitAsync(g) +} diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go new file mode 100644 index 00000000..893129d0 --- /dev/null +++ b/pkg/icingaredis/utils.go @@ -0,0 +1,90 @@ +package icingaredis + +import ( + "context" + "encoding/json" + "errors" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" + "golang.org/x/sync/errgroup" + "sync" +) + +func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity, 0) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for pair := range pairs { + var id types.Binary + + if err := id.UnmarshalText([]byte(pair.Field)); err != nil { + return err + } + + e := factoryFunc() + if err := json.Unmarshal([]byte(pair.Value), e); err != nil { + return err + } + e.SetID(id) + + select { + case entities <- e: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entities, com.WaitAsync(g) +} + +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums *sync.Map, concurrent int) (<-chan contracts.Entity, <-chan error) { + entitiesWithChecksum := make(chan contracts.Entity, 0) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entitiesWithChecksum) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for entity := range entities { + if checksumer, ok := checksums.Load(entity.ID().String()); ok { + entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) + } else { + panic("no checksum") + // TODO(el): Error is not published + return errors.New("no checksum") + } + + select { + case entitiesWithChecksum <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entitiesWithChecksum, com.WaitAsync(g) +}