From d36928afd3781ccfaf592c869167a9d408978efb Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 8 Apr 2021 16:28:42 +0200 Subject: [PATCH 1/4] Delta#start(): avoid a race across maps by using a mutex Imagine an Icinga restart w/o any config changes and a full dump already being done. One goroutine reads Redis, the other the database. Both get the same object at the same time and check it in the map of the other goroutine - not present. So they store it in their own map. I.e. the same object hasn't been changed, but has to be deleted and inserted. If the insert comes first, that causes a duplicate key error. --- pkg/icingadb/delta.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 1deed748..4510997a 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -46,6 +46,7 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra } actual := sync.Map{} desired := sync.Map{} + var mtx sync.Mutex g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -61,14 +62,18 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra } id := a.ID().String() + mtx.Lock() + if d, ok := desired.Load(id); ok { desired.Delete(id) + mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { update.Store(id, d) } } else { actual.Store(id, a) + mtx.Unlock() } cnt.Inc() @@ -91,14 +96,18 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra } id := d.ID().String() + mtx.Lock() + if a, ok := actual.Load(id); ok { actual.Delete(id) + mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { update.Store(id, d) } } else { desired.Store(id, d) + mtx.Unlock() } cnt.Inc() From 3974a7bd4fd2769a8f296ac6d854db447aa8d0bd Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 12 Apr 2021 17:24:11 +0200 Subject: [PATCH 2/4] Introduce EntitiesById --- pkg/icingadb/entitiesbyid.go | 44 ++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 pkg/icingadb/entitiesbyid.go diff --git a/pkg/icingadb/entitiesbyid.go b/pkg/icingadb/entitiesbyid.go new file mode 100644 index 00000000..2f7788d6 --- /dev/null +++ b/pkg/icingadb/entitiesbyid.go @@ -0,0 +1,44 @@ +package icingadb + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" +) + +type EntitiesById map[string]contracts.Entity + +func (ebi EntitiesById) Keys() []string { + keys := make([]string, 0, len(ebi)) + for k := range ebi { + keys = append(keys, k) + } + + return keys +} + +func (ebi EntitiesById) IDs() []interface{} { + ids := make([]interface{}, 0, len(ebi)) + for _, v := range ebi { + ids = append(ids, v.(contracts.IDer).ID()) + } + + return ids +} + +func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity { + entities := make(chan contracts.Entity, 0) + + go func() { + defer close(entities) + + for _, v := range ebi { + select { + case <-ctx.Done(): + return + case entities <- v: + } + } + }() + + return entities +} From fac47fb330daacfad0eb246db114e1d30ee6c3b8 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 8 Apr 2021 18:12:05 +0200 Subject: [PATCH 3/4] Delta: don't over-lock --- cmd/icingadb/main.go | 2 +- pkg/icingadb/delta.go | 42 ++++++++++++++++++++++------------------ pkg/icingadb/sync.go | 12 ++++++------ pkg/icingaredis/utils.go | 5 ++--- 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 7f2ded72..e772b717 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -65,7 +65,7 @@ func main() { return err } - entities := utils.SyncMapEntities(delta.Create) + entities := delta.Create.Entities(synctx) flat := make(chan contracts.Entity, 0) cvg, _ := errgroup.WithContext(synctx) diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go index 4510997a..a9e9d5a1 100644 --- a/pkg/icingadb/delta.go +++ b/pkg/icingadb/delta.go @@ -12,9 +12,9 @@ import ( ) type Delta struct { - Create *sync.Map - Update *sync.Map - Delete *sync.Map + Create EntitiesById + Update EntitiesById + Delete EntitiesById WithChecksum bool done chan error err error @@ -40,13 +40,13 @@ func (delta Delta) Wait() error { func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) { defer close(delta.done) - var update sync.Map + var update EntitiesById if delta.WithChecksum { - update = sync.Map{} + update = EntitiesById{} } - actual := sync.Map{} - desired := sync.Map{} - var mtx sync.Mutex + actual := EntitiesById{} + desired := EntitiesById{} + var mtx, updateMtx sync.Mutex g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -64,15 +64,17 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra id := a.ID().String() mtx.Lock() - if d, ok := desired.Load(id); ok { - desired.Delete(id) + if d, ok := desired[id]; ok { + delete(desired, id) mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - update.Store(id, d) + updateMtx.Lock() + update[id] = d + updateMtx.Unlock() } } else { - actual.Store(id, a) + actual[id] = a mtx.Unlock() } @@ -98,15 +100,17 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra id := d.ID().String() mtx.Lock() - if a, ok := actual.Load(id); ok { - actual.Delete(id) + if a, ok := actual[id]; ok { + delete(actual, id) mtx.Unlock() if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) { - update.Store(id, d) + updateMtx.Lock() + update[id] = d + updateMtx.Unlock() } } else { - desired.Store(id, d) + desired[id] = d mtx.Unlock() } @@ -123,9 +127,9 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra return } - delta.Create = &desired - delta.Delete = &actual + delta.Create = desired + delta.Delete = actual if delta.WithChecksum { - delta.Update = &update + delta.Update = update } } diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 50097110..5c03986f 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -82,7 +82,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), count, concurrent, - utils.SyncMapKeys(delta.Create)...) + delta.Create.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) @@ -93,7 +93,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Let errors from SetChecksums cancel our group. com.ErrgroupReceive(g, errs) } else { - entities = utils.SyncMapEntities(delta.Create) + entities = delta.Create.Entities(ctx) } g.Go(func() error { @@ -103,13 +103,13 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Update { - s.logger.Infof("Updating %d rows of type %s", len(utils.SyncMapKeys(delta.Update)), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' ')) pairs, errs := s.redis.HMYield( ctx, fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')), count, concurrent, - utils.SyncMapKeys(delta.Update)...) + delta.Update.Keys()...) // Let errors from Redis cancel our group. com.ErrgroupReceive(g, errs) @@ -130,9 +130,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc) // Delete { - s.logger.Infof("Deleting %d rows of type %s", len(utils.SyncMapKeys(delta.Delete)), utils.Key(utils.Name(v), ' ')) + s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' ')) g.Go(func() error { - return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, utils.SyncMapIDs(delta.Delete)) + return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, delta.Delete.IDs()) }) } diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go index 893129d0..03212b14 100644 --- a/pkg/icingaredis/utils.go +++ b/pkg/icingaredis/utils.go @@ -8,7 +8,6 @@ import ( "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/types" "golang.org/x/sync/errgroup" - "sync" ) func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { @@ -52,7 +51,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc return entities, com.WaitAsync(g) } -func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums *sync.Map, concurrent int) (<-chan contracts.Entity, <-chan error) { +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { entitiesWithChecksum := make(chan contracts.Entity, 0) g, ctx := errgroup.WithContext(ctx) @@ -64,7 +63,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu for i := 0; i < concurrent; i++ { g.Go(func() error { for entity := range entities { - if checksumer, ok := checksums.Load(entity.ID().String()); ok { + if checksumer, ok := checksums[entity.ID().String()]; ok { entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) } else { panic("no checksum") From bf1a77a67c7c8ca9d6a7bc8aebc54252746ea6b7 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 8 Apr 2021 18:12:51 +0200 Subject: [PATCH 4/4] Drop utils.SyncMap*() --- pkg/utils/utils.go | 47 ---------------------------------------------- 1 file changed, 47 deletions(-) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index ae8e537b..49693838 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/go-sql-driver/mysql" "github.com/google/uuid" - "github.com/icinga/icingadb/pkg/contracts" "go.uber.org/zap" "io/ioutil" "math" @@ -111,52 +110,6 @@ func BatchSliceOfInterfaces(ctx context.Context, keys []interface{}, count int) return batches } -func SyncMapKeys(m *sync.Map) []string { - keys := make([]string, 0) - - if m != nil { - m.Range(func(key, value interface{}) bool { - keys = append(keys, key.(string)) - - return true - }) - } - - return keys -} - -func SyncMapIDs(m *sync.Map) []interface{} { - ids := make([]interface{}, 0) - - if m != nil { - m.Range(func(key, value interface{}) bool { - ids = append(ids, value.(contracts.IDer).ID()) - - return true - }) - } - - return ids -} - -func SyncMapEntities(m *sync.Map) <-chan contracts.Entity { - entities := make(chan contracts.Entity, 0) - - go func() { - defer close(entities) - if m != nil { - m.Range(func(key, value interface{}) bool { - entities <- value.(contracts.Entity) - - return true - }) - } - - }() - - return entities -} - func IsContextCanceled(err error) bool { return errors.Is(err, context.Canceled) }