Merge pull request #122 from Icinga/bugfix/configsync-overwriting-statesync

Fix config sync overwriting state sync
This commit is contained in:
Alexander Aleksandrovič Klimov 2020-03-05 15:58:03 +01:00 committed by GitHub
commit b6395e5e00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 17 deletions

View file

@ -72,6 +72,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
continue
}
super.WgConfigSync.Add(3)
log.Debugf("%s: Got responsibility", objectInformation.ObjectType)
//TODO: This should only be done, if HA was taken over from another instance
@ -119,6 +121,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
}
go func() {
defer super.WgConfigSync.Done()
benchmarc := utils.NewBenchmark()
wgInsert.Add(len(insert))
@ -139,6 +143,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
}()
go func() {
defer super.WgConfigSync.Done()
benchmarc := utils.NewBenchmark()
wgDelete.Add(len(delete))
@ -160,6 +166,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
if objectInformation.HasChecksum {
go func() {
defer super.WgConfigSync.Done()
benchmarc := utils.NewBenchmark()
wgUpdate.Add(len(update))
@ -178,6 +186,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
}).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), objectInformation.ObjectType, benchmarc.String())
}
}()
} else {
super.WgConfigSync.Done()
}
}
}

View file

@ -26,12 +26,13 @@ func SetupConfigSync(t *testing.T, objectTypes []*configobject.ObjectInformation
require.NoError(t, err, "Is the MySQL server running?")
super := supervisor.Supervisor{
ChErr: make(chan error),
ChDecode: make(chan *jsondecoder.JsonDecodePackages),
Rdbw: rdbw,
Dbw: dbw,
EnvLock: &sync.Mutex{},
EnvId: utils.EncodeChecksum("e057d4ea363fbab414a874371da253dba3d713bc"),
ChErr: make(chan error),
ChDecode: make(chan *jsondecoder.JsonDecodePackages),
Rdbw: rdbw,
Dbw: dbw,
EnvLock: &sync.Mutex{},
EnvId: utils.EncodeChecksum("e057d4ea363fbab414a874371da253dba3d713bc"),
WgConfigSync: &sync.WaitGroup{},
}
go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16)

View file

@ -70,6 +70,9 @@ func logSyncCounters() {
// syncStates tries to sync the states of given object type every second.
func syncStates(super *supervisor.Supervisor, objectType string, counter *uint64, observer prometheus.Observer) {
// Do not sync states during initial config sync
super.WgConfigSync.Wait()
if super.EnvId == nil {
log.Debug("StateSync: Waiting for EnvId to be set")
time.Sleep(time.Second)

11
main.go
View file

@ -115,11 +115,12 @@ func main() {
}
super := supervisor.Supervisor{
ChErr: make(chan error),
ChDecode: make(chan *jsondecoder.JsonDecodePackages),
Rdbw: redisConn,
Dbw: mysqlConn,
EnvLock: &sync.Mutex{},
ChErr: make(chan error),
ChDecode: make(chan *jsondecoder.JsonDecodePackages),
Rdbw: redisConn,
Dbw: mysqlConn,
EnvLock: &sync.Mutex{},
WgConfigSync: &sync.WaitGroup{},
}
chEnv := make(chan *ha.Environment)

View file

@ -9,10 +9,11 @@ import (
)
type Supervisor struct {
ChErr chan error
ChDecode chan *jsondecoder.JsonDecodePackages
Rdbw *connection.RDBWrapper
Dbw *connection.DBWrapper
EnvId []byte
EnvLock *sync.Mutex
ChErr chan error
ChDecode chan *jsondecoder.JsonDecodePackages
Rdbw *connection.RDBWrapper
Dbw *connection.DBWrapper
EnvId []byte
EnvLock *sync.Mutex
WgConfigSync *sync.WaitGroup
}