diff --git a/configobject/host/host.go b/configobject/host/host.go index 40efbbbb..8650ebd2 100644 --- a/configobject/host/host.go +++ b/configobject/host/host.go @@ -16,7 +16,7 @@ import ( var ( BulkInsertStmt *icingadb_connection.BulkInsertStmt BulkDeleteStmt *icingadb_connection.BulkDeleteStmt - UpdateStmt *icingadb_connection.UpdateStmt + BulkUpdateStmt *icingadb_connection.BulkUpdateStmt Fields = []string{ "id", "env_id", @@ -186,7 +186,7 @@ func (h *Host) SetId(id string) { func init() { BulkInsertStmt = icingadb_connection.NewBulkInsertStmt("host", Fields) BulkDeleteStmt = icingadb_connection.NewBulkDeleteStmt("host") - UpdateStmt = icingadb_connection.NewUpdateStmt("host", Fields[1:]) // Omit Id from fields + BulkUpdateStmt = icingadb_connection.NewBulkUpdateStmt("host", Fields) } func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { @@ -234,6 +234,7 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { chUpdateBack chan []configobject.Row wgInsert = &sync.WaitGroup{} wgDelete = &sync.WaitGroup{} + wgUpdate = &sync.WaitGroup{} ) go func() { for msg := range chHA { @@ -273,15 +274,14 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { go DeleteExecWorker(super, chDelete, wgDelete) - go UpdateCompWorker(super, chUpdateComp, chUpdate) + go UpdateCompWorker(super, chUpdateComp, chUpdate, wgUpdate) go UpdatePrepWorker(super, chUpdate, chUpdateBack) - go UpdateExecWorker(super, chUpdateBack) + go UpdateExecWorker(super, chUpdateBack, wgUpdate) go func() { benchmarc := benchmark.NewBenchmark() wgInsert.Add(len(insert)) chInsert <- insert - //insert = nil wgInsert.Wait() benchmarc.Stop() log.Infof("Inserted %v hosts in %v seconds", len(insert), benchmarc.String()) @@ -291,15 +291,18 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { benchmarc := benchmark.NewBenchmark() wgDelete.Add(len(delete)) chDelete <- delete - //delete = nil wgDelete.Wait() benchmarc.Stop() log.Infof("Deleted %v hosts in %v seconds", len(delete), benchmarc.String()) }() go func() { + benchmarc := benchmark.NewBenchmark() + wgUpdate.Add(len(update)) chUpdateComp <- update - //update = nil + wgUpdate.Wait() + benchmarc.Stop() + log.Infof("Updated %v hosts in %v seconds", len(update), benchmarc.String()) }() } } @@ -372,7 +375,7 @@ type HostChecksums struct { GroupsChecksum string `json:"groups_checksum"` } -func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, chUpdateBack chan<- []string) { +func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) { defer log.Info("Host: Update comparison routine stopped") prep := func(chunk *icingadb_connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) { @@ -382,7 +385,7 @@ func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch continue } - //TODO: Check if this can be done better (json in this func) + //TODO: Check if this can be done better (json should not be processed in this func) redisChecksums := &HostChecksums{} err := json.Unmarshal([]byte(chunk.Checksums[i].(string)), redisChecksums) if err != nil { @@ -391,6 +394,8 @@ func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch if redisChecksums.PropertiesChecksum != mysqlChecksums[key]["properties_checksum"] { changed = append(changed, key) + } else { + wg.Done() } } chUpdateBack <- changed @@ -447,13 +452,13 @@ func UpdatePrepWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch } } -func UpdateExecWorker(super *supervisor.Supervisor, chUpdateBack <-chan []configobject.Row) { - defer log.Info("Host: Insert exec routine stopped") +func UpdateExecWorker(super *supervisor.Supervisor, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) { + defer log.Info("Host: Update exec routine stopped") for rows := range chUpdateBack { go func(rows []configobject.Row) { - super.ChErr <- super.Dbw.SqlBulkUpdate(rows, UpdateStmt) - log.Infof("Updated %v hosts", len(rows)) + super.ChErr <- super.Dbw.SqlBulkUpdate(rows, BulkUpdateStmt) + wg.Add(-len(rows)) }(rows) } } \ No newline at end of file