diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 32101737..857c8379 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -67,7 +67,7 @@ func main() { return err } - entities := utils.SyncMapEntities(delta.Create) + entities := delta.Create.Entities(synctx) flat := make(chan contracts.Entity, 0) cvg, _ := errgroup.WithContext(synctx) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 1deed748..a9e9d5a1 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -12,9 +12,9 @@ import ( ) type Delta struct { - Create *sync.Map - Update *sync.Map - Delete *sync.Map + Create EntitiesById + Update EntitiesById + Delete EntitiesById WithChecksum bool done chan error err error @@ -40,12 +40,13 @@ func (delta Delta) Wait() error { func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { defer close(delta.done) - var update sync.Map + var update EntitiesById if delta.WithChecksum { - update = sync.Map{} + update = EntitiesById{} } - actual := sync.Map{} - desired := sync.Map{} + actual := EntitiesById{} + desired := EntitiesById{} + var mtx, updateMtx sync.Mutex g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -61,14 +62,20 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra } id := a.ID().String() - if d, ok := desired.Load(id); ok { - desired.Delete(id) + mtx.Lock() + + if d, ok := desired[id]; ok { + delete(desired, id) + mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - update.Store(id, d) + updateMtx.Lock() + update[id] = d + updateMtx.Unlock() } } else { - actual.Store(id, a) + actual[id] = a + mtx.Unlock() } cnt.Inc() @@ -91,14 +98,20 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra } id := d.ID().String() - if a, ok := actual.Load(id); ok { - actual.Delete(id) + mtx.Lock() + + if a, ok := actual[id]; ok { + delete(actual, id) + mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - update.Store(id, d) + updateMtx.Lock() + update[id] = d + updateMtx.Unlock() } } else { - desired.Store(id, d) + desired[id] = d + mtx.Unlock() } cnt.Inc() @@ -114,9 +127,9 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra return } - delta.Create = &desired - delta.Delete = &actual + delta.Create = desired + delta.Delete = actual if delta.WithChecksum { - delta.Update = &update + delta.Update = update } } diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go new file mode 100644 index 00000000..2f7788d6 --- /dev/null +++ b/pkg/icingadb/entitiesbyid.go @@ -0,0 +1,44 @@ +package icingadb + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" +) + +type EntitiesById map[string]contracts.Entity + +func (ebi EntitiesById) Keys() []string { + keys := make([]string, 0, len(ebi)) + for k := range ebi { + keys = append(keys, k) + } + + return keys +} + +func (ebi EntitiesById) IDs() []interface{} { + ids := make([]interface{}, 0, len(ebi)) + for _, v := range ebi { + ids = append(ids, v.(contracts.IDer).ID()) + } + + return ids +} + +func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { + entities := make(chan contracts.Entity, 0) + + go func() { + defer close(entities) + + for _, v := range ebi { + select { + case <-ctx.Done(): + return + case entities <- v: + } + } + }() + + return entities +} diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 50097110..5c03986f 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -82,7 +82,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), count, concurrent, - utils.SyncMapKeys(delta.Create)...) + delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) @@ -93,7 +93,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Let errors from SetChecksums cancel our group. com.ErrgroupReceive(g, errs) } else { - entities = utils.SyncMapEntities(delta.Create) + entities = delta.Create.Entities(ctx) } g.Go(func() error { @@ -103,13 +103,13 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Update { - s.logger.Infof("Updating %d rows of type %s", len(utils.SyncMapKeys(delta.Update)), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' ')) pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), count, concurrent, - utils.SyncMapKeys(delta.Update)...) + delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) @@ -130,9 +130,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Delete { - s.logger.Infof("Deleting %d rows of type %s", len(utils.SyncMapKeys(delta.Delete)), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' ')) g.Go(func() error { - return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, utils.SyncMapIDs(delta.Delete)) + return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, delta.Delete.IDs()) }) } diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 893129d0..03212b14 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -8,7 +8,6 @@ import ( "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/types" "golang.org/x/sync/errgroup" - "sync" ) func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { @@ -52,7 +51,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc return entities, com.WaitAsync(g) } -func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums *sync.Map, concurrent int) (<-chan contracts.Entity, <-chan error) { +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { entitiesWithChecksum := make(chan contracts.Entity, 0) g, ctx := errgroup.WithContext(ctx) @@ -64,7 +63,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu for i := 0; i < concurrent; i++ { g.Go(func() error { for entity := range entities { - if checksumer, ok := checksums.Load(entity.ID().String()); ok { + if checksumer, ok := checksums[entity.ID().String()]; ok { entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) } else { panic("no checksum") diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index e0752043..86672465 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/go-sql-driver/mysql" "github.com/google/uuid" - "github.com/icinga/icingadb/pkg/contracts" "go.uber.org/zap" "io/ioutil" "math" @@ -120,52 +119,6 @@ func BatchSliceOfInterfaces(ctx context.Context, keys []interface{}, count int) return batches } -func SyncMapKeys(m *sync.Map) []string { - keys := make([]string, 0) - - if m != nil { - m.Range(func(key, value interface{}) bool { - keys = append(keys, key.(string)) - - return true - }) - } - - return keys -} - -func SyncMapIDs(m *sync.Map) []interface{} { - ids := make([]interface{}, 0) - - if m != nil { - m.Range(func(key, value interface{}) bool { - ids = append(ids, value.(contracts.IDer).ID()) - - return true - }) - } - - return ids -} - -func SyncMapEntities(m *sync.Map) <-chan contracts.Entity { - entities := make(chan contracts.Entity, 0) - - go func() { - defer close(entities) - if m != nil { - m.Range(func(key, value interface{}) bool { - entities <- value.(contracts.Entity) - - return true - }) - } - - }() - - return entities -} - func IsContextCanceled(err error) bool { return errors.Is(err, context.Canceled) }