mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-06 15:22:08 -04:00
Merge pull request #28 from lippserd/bugfix/delta-race
Delta#start(): avoid a race across maps by using a mutex
This commit is contained in:
commit
0c8a9139b5
6 changed files with 84 additions and 75 deletions
|
|
@ -67,7 +67,7 @@ func main() {
|
|||
return err
|
||||
}
|
||||
|
||||
entities := utils.SyncMapEntities(delta.Create)
|
||||
entities := delta.Create.Entities(synctx)
|
||||
flat := make(chan contracts.Entity, 0)
|
||||
|
||||
cvg, _ := errgroup.WithContext(synctx)
|
||||
|
|
|
|||
|
|
@ -12,9 +12,9 @@ import (
|
|||
)
|
||||
|
||||
type Delta struct {
|
||||
Create *sync.Map
|
||||
Update *sync.Map
|
||||
Delete *sync.Map
|
||||
Create EntitiesById
|
||||
Update EntitiesById
|
||||
Delete EntitiesById
|
||||
WithChecksum bool
|
||||
done chan error
|
||||
err error
|
||||
|
|
@ -40,12 +40,13 @@ func (delta Delta) Wait() error {
|
|||
func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
|
||||
defer close(delta.done)
|
||||
|
||||
var update sync.Map
|
||||
var update EntitiesById
|
||||
if delta.WithChecksum {
|
||||
update = sync.Map{}
|
||||
update = EntitiesById{}
|
||||
}
|
||||
actual := sync.Map{}
|
||||
desired := sync.Map{}
|
||||
actual := EntitiesById{}
|
||||
desired := EntitiesById{}
|
||||
var mtx, updateMtx sync.Mutex
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
@ -61,14 +62,20 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
|
|||
}
|
||||
|
||||
id := a.ID().String()
|
||||
if d, ok := desired.Load(id); ok {
|
||||
desired.Delete(id)
|
||||
mtx.Lock()
|
||||
|
||||
if d, ok := desired[id]; ok {
|
||||
delete(desired, id)
|
||||
mtx.Unlock()
|
||||
|
||||
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
|
||||
update.Store(id, d)
|
||||
updateMtx.Lock()
|
||||
update[id] = d
|
||||
updateMtx.Unlock()
|
||||
}
|
||||
} else {
|
||||
actual.Store(id, a)
|
||||
actual[id] = a
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
cnt.Inc()
|
||||
|
|
@ -91,14 +98,20 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
|
|||
}
|
||||
|
||||
id := d.ID().String()
|
||||
if a, ok := actual.Load(id); ok {
|
||||
actual.Delete(id)
|
||||
mtx.Lock()
|
||||
|
||||
if a, ok := actual[id]; ok {
|
||||
delete(actual, id)
|
||||
mtx.Unlock()
|
||||
|
||||
if delta.WithChecksum && !a.(contracts.Checksumer).Checksum().Equal(d.(contracts.Checksumer).Checksum()) {
|
||||
update.Store(id, d)
|
||||
updateMtx.Lock()
|
||||
update[id] = d
|
||||
updateMtx.Unlock()
|
||||
}
|
||||
} else {
|
||||
desired.Store(id, d)
|
||||
desired[id] = d
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
cnt.Inc()
|
||||
|
|
@ -114,9 +127,9 @@ func (delta *Delta) start(ctx context.Context, actualCh, desiredCh <-chan contra
|
|||
return
|
||||
}
|
||||
|
||||
delta.Create = &desired
|
||||
delta.Delete = &actual
|
||||
delta.Create = desired
|
||||
delta.Delete = actual
|
||||
if delta.WithChecksum {
|
||||
delta.Update = &update
|
||||
delta.Update = update
|
||||
}
|
||||
}
|
||||
|
|
|
|||
44
pkg/icingadb/entitiesbyid.go
Normal file
44
pkg/icingadb/entitiesbyid.go
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
package icingadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
)
|
||||
|
||||
type EntitiesById map[string]contracts.Entity
|
||||
|
||||
func (ebi EntitiesById) Keys() []string {
|
||||
keys := make([]string, 0, len(ebi))
|
||||
for k := range ebi {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
func (ebi EntitiesById) IDs() []interface{} {
|
||||
ids := make([]interface{}, 0, len(ebi))
|
||||
for _, v := range ebi {
|
||||
ids = append(ids, v.(contracts.IDer).ID())
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity {
|
||||
entities := make(chan contracts.Entity, 0)
|
||||
|
||||
go func() {
|
||||
defer close(entities)
|
||||
|
||||
for _, v := range ebi {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case entities <- v:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return entities
|
||||
}
|
||||
|
|
@ -82,7 +82,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
|
|||
fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')),
|
||||
count,
|
||||
concurrent,
|
||||
utils.SyncMapKeys(delta.Create)...)
|
||||
delta.Create.Keys()...)
|
||||
// Let errors from Redis cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
|
|
@ -93,7 +93,7 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
|
|||
// Let errors from SetChecksums cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
} else {
|
||||
entities = utils.SyncMapEntities(delta.Create)
|
||||
entities = delta.Create.Entities(ctx)
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
|
|
@ -103,13 +103,13 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
|
|||
|
||||
// Update
|
||||
{
|
||||
s.logger.Infof("Updating %d rows of type %s", len(utils.SyncMapKeys(delta.Update)), utils.Key(utils.Name(v), ' '))
|
||||
s.logger.Infof("Updating %d rows of type %s", len(delta.Update), utils.Key(utils.Name(v), ' '))
|
||||
pairs, errs := s.redis.HMYield(
|
||||
ctx,
|
||||
fmt.Sprintf("icinga:config:%s", utils.Key(utils.Name(v), ':')),
|
||||
count,
|
||||
concurrent,
|
||||
utils.SyncMapKeys(delta.Update)...)
|
||||
delta.Update.Keys()...)
|
||||
// Let errors from Redis cancel our group.
|
||||
com.ErrgroupReceive(g, errs)
|
||||
|
||||
|
|
@ -130,9 +130,9 @@ func (s Sync) Sync(ctx context.Context, factoryFunc contracts.EntityFactoryFunc)
|
|||
|
||||
// Delete
|
||||
{
|
||||
s.logger.Infof("Deleting %d rows of type %s", len(utils.SyncMapKeys(delta.Delete)), utils.Key(utils.Name(v), ' '))
|
||||
s.logger.Infof("Deleting %d rows of type %s", len(delta.Delete), utils.Key(utils.Name(v), ' '))
|
||||
g.Go(func() error {
|
||||
return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, utils.SyncMapIDs(delta.Delete))
|
||||
return s.db.BulkExec(ctx, s.db.BuildDeleteStmt(v), 1<<15, 1<<3, delta.Delete.IDs())
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import (
|
|||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"github.com/icinga/icingadb/pkg/types"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
|
|
@ -52,7 +51,7 @@ func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc
|
|||
return entities, com.WaitAsync(g)
|
||||
}
|
||||
|
||||
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums *sync.Map, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) {
|
||||
entitiesWithChecksum := make(chan contracts.Entity, 0)
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
|
|
@ -64,7 +63,7 @@ func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksu
|
|||
for i := 0; i < concurrent; i++ {
|
||||
g.Go(func() error {
|
||||
for entity := range entities {
|
||||
if checksumer, ok := checksums.Load(entity.ID().String()); ok {
|
||||
if checksumer, ok := checksums[entity.ID().String()]; ok {
|
||||
entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum())
|
||||
} else {
|
||||
panic("no checksum")
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import (
|
|||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/google/uuid"
|
||||
"github.com/icinga/icingadb/pkg/contracts"
|
||||
"go.uber.org/zap"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
|
@ -120,52 +119,6 @@ func BatchSliceOfInterfaces(ctx context.Context, keys []interface{}, count int)
|
|||
return batches
|
||||
}
|
||||
|
||||
func SyncMapKeys(m *sync.Map) []string {
|
||||
keys := make([]string, 0)
|
||||
|
||||
if m != nil {
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
keys = append(keys, key.(string))
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
return keys
|
||||
}
|
||||
|
||||
func SyncMapIDs(m *sync.Map) []interface{} {
|
||||
ids := make([]interface{}, 0)
|
||||
|
||||
if m != nil {
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
ids = append(ids, value.(contracts.IDer).ID())
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
func SyncMapEntities(m *sync.Map) <-chan contracts.Entity {
|
||||
entities := make(chan contracts.Entity, 0)
|
||||
|
||||
go func() {
|
||||
defer close(entities)
|
||||
if m != nil {
|
||||
m.Range(func(key, value interface{}) bool {
|
||||
entities <- value.(contracts.Entity)
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return entities
|
||||
}
|
||||
|
||||
func IsContextCanceled(err error) bool {
|
||||
return errors.Is(err, context.Canceled)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue