Improve update logic

This commit is contained in:
Noah Hilverling 2019-03-19 10:43:10 +01:00
parent 2c63d6b176
commit 60c240cc4e

View file

@ -16,7 +16,7 @@ import (
var (
BulkInsertStmt *icingadb_connection.BulkInsertStmt
BulkDeleteStmt *icingadb_connection.BulkDeleteStmt
UpdateStmt *icingadb_connection.UpdateStmt
BulkUpdateStmt *icingadb_connection.BulkUpdateStmt
Fields = []string{
"id",
"env_id",
@ -186,7 +186,7 @@ func (h *Host) SetId(id string) {
func init() {
BulkInsertStmt = icingadb_connection.NewBulkInsertStmt("host", Fields)
BulkDeleteStmt = icingadb_connection.NewBulkDeleteStmt("host")
UpdateStmt = icingadb_connection.NewUpdateStmt("host", Fields[1:]) // Omit Id from fields
BulkUpdateStmt = icingadb_connection.NewBulkUpdateStmt("host", Fields)
}
func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
@ -234,6 +234,7 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
chUpdateBack chan []configobject.Row
wgInsert = &sync.WaitGroup{}
wgDelete = &sync.WaitGroup{}
wgUpdate = &sync.WaitGroup{}
)
go func() {
for msg := range chHA {
@ -273,15 +274,14 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
go DeleteExecWorker(super, chDelete, wgDelete)
go UpdateCompWorker(super, chUpdateComp, chUpdate)
go UpdateCompWorker(super, chUpdateComp, chUpdate, wgUpdate)
go UpdatePrepWorker(super, chUpdate, chUpdateBack)
go UpdateExecWorker(super, chUpdateBack)
go UpdateExecWorker(super, chUpdateBack, wgUpdate)
go func() {
benchmarc := benchmark.NewBenchmark()
wgInsert.Add(len(insert))
chInsert <- insert
//insert = nil
wgInsert.Wait()
benchmarc.Stop()
log.Infof("Inserted %v hosts in %v seconds", len(insert), benchmarc.String())
@ -291,15 +291,18 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
benchmarc := benchmark.NewBenchmark()
wgDelete.Add(len(delete))
chDelete <- delete
//delete = nil
wgDelete.Wait()
benchmarc.Stop()
log.Infof("Deleted %v hosts in %v seconds", len(delete), benchmarc.String())
}()
go func() {
benchmarc := benchmark.NewBenchmark()
wgUpdate.Add(len(update))
chUpdateComp <- update
//update = nil
wgUpdate.Wait()
benchmarc.Stop()
log.Infof("Updated %v hosts in %v seconds", len(update), benchmarc.String())
}()
}
}
@ -372,7 +375,7 @@ type HostChecksums struct {
GroupsChecksum string `json:"groups_checksum"`
}
func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, chUpdateBack chan<- []string) {
func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) {
defer log.Info("Host: Update comparison routine stopped")
prep := func(chunk *icingadb_connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) {
@ -382,7 +385,7 @@ func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch
continue
}
//TODO: Check if this can be done better (json in this func)
//TODO: Check if this can be done better (json should not be processed in this func)
redisChecksums := &HostChecksums{}
err := json.Unmarshal([]byte(chunk.Checksums[i].(string)), redisChecksums)
if err != nil {
@ -391,6 +394,8 @@ func UpdateCompWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch
if redisChecksums.PropertiesChecksum != mysqlChecksums[key]["properties_checksum"] {
changed = append(changed, key)
} else {
wg.Done()
}
}
chUpdateBack <- changed
@ -447,13 +452,13 @@ func UpdatePrepWorker(super *supervisor.Supervisor, chUpdate <-chan []string, ch
}
}
func UpdateExecWorker(super *supervisor.Supervisor, chUpdateBack <-chan []configobject.Row) {
defer log.Info("Host: Insert exec routine stopped")
func UpdateExecWorker(super *supervisor.Supervisor, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) {
defer log.Info("Host: Update exec routine stopped")
for rows := range chUpdateBack {
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkUpdate(rows, UpdateStmt)
log.Infof("Updated %v hosts", len(rows))
super.ChErr <- super.Dbw.SqlBulkUpdate(rows, BulkUpdateStmt)
wg.Add(-len(rows))
}(rows)
}
}