diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index 438f3e28..bd4e25de 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -72,6 +72,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co continue } + super.WgConfigSync.Add(3) + log.Debugf("%s: Got responsibility", objectInformation.ObjectType) //TODO: This should only be done, if HA was taken over from another instance @@ -119,6 +121,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co } go func() { + defer super.WgConfigSync.Done() + benchmarc := utils.NewBenchmark() wgInsert.Add(len(insert)) @@ -139,6 +143,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co }() go func() { + defer super.WgConfigSync.Done() + benchmarc := utils.NewBenchmark() wgDelete.Add(len(delete)) @@ -160,6 +166,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co if objectInformation.HasChecksum { go func() { + defer super.WgConfigSync.Done() + benchmarc := utils.NewBenchmark() wgUpdate.Add(len(update)) @@ -178,6 +186,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), objectInformation.ObjectType, benchmarc.String()) } }() + } else { + super.WgConfigSync.Done() } } } diff --git a/configobject/configsync/configsync_test.go b/configobject/configsync/configsync_test.go index 5ca1b185..1e03d105 100644 --- a/configobject/configsync/configsync_test.go +++ b/configobject/configsync/configsync_test.go @@ -26,12 +26,13 @@ func SetupConfigSync(t *testing.T, objectTypes []*configobject.ObjectInformation require.NoError(t, err, "Is the MySQL server running?") super := supervisor.Supervisor{ - ChErr: make(chan error), - ChDecode: make(chan *jsondecoder.JsonDecodePackages), - Rdbw: rdbw, - Dbw: dbw, - EnvLock: &sync.Mutex{}, - EnvId: utils.EncodeChecksum("e057d4ea363fbab414a874371da253dba3d713bc"), + ChErr: make(chan error), + ChDecode: make(chan *jsondecoder.JsonDecodePackages), + Rdbw: rdbw, + Dbw: dbw, + EnvLock: &sync.Mutex{}, + EnvId: utils.EncodeChecksum("e057d4ea363fbab414a874371da253dba3d713bc"), + WgConfigSync: &sync.WaitGroup{}, } go jsondecoder.DecodePool(super.ChDecode, super.ChErr, 16) diff --git a/configobject/statesync/statesync.go b/configobject/statesync/statesync.go index 2d7d9229..a86057a4 100644 --- a/configobject/statesync/statesync.go +++ b/configobject/statesync/statesync.go @@ -70,6 +70,9 @@ func logSyncCounters() { // syncStates tries to sync the states of given object type every second. func syncStates(super *supervisor.Supervisor, objectType string, counter *uint64, observer prometheus.Observer) { + // Do not sync states during initial config sync + super.WgConfigSync.Wait() + if super.EnvId == nil { log.Debug("StateSync: Waiting for EnvId to be set") time.Sleep(time.Second) diff --git a/main.go b/main.go index 9ce3b171..147fe828 100644 --- a/main.go +++ b/main.go @@ -115,11 +115,12 @@ func main() { } super := supervisor.Supervisor{ - ChErr: make(chan error), - ChDecode: make(chan *jsondecoder.JsonDecodePackages), - Rdbw: redisConn, - Dbw: mysqlConn, - EnvLock: &sync.Mutex{}, + ChErr: make(chan error), + ChDecode: make(chan *jsondecoder.JsonDecodePackages), + Rdbw: redisConn, + Dbw: mysqlConn, + EnvLock: &sync.Mutex{}, + WgConfigSync: &sync.WaitGroup{}, } chEnv := make(chan *ha.Environment) diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 98596214..4868670a 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -9,10 +9,11 @@ import ( ) type Supervisor struct { - ChErr chan error - ChDecode chan *jsondecoder.JsonDecodePackages - Rdbw *connection.RDBWrapper - Dbw *connection.DBWrapper - EnvId []byte - EnvLock *sync.Mutex + ChErr chan error + ChDecode chan *jsondecoder.JsonDecodePackages + Rdbw *connection.RDBWrapper + Dbw *connection.DBWrapper + EnvId []byte + EnvLock *sync.Mutex + WgConfigSync *sync.WaitGroup }