diff --git a/configobject/host/host.go b/configobject/host/host.go index dd74c825..20317396 100644 --- a/configobject/host/host.go +++ b/configobject/host/host.go @@ -248,6 +248,8 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { wgInsert := &sync.WaitGroup{} wgInsert.Add(len(insert)) + wgDelete := &sync.WaitGroup{} + wgDelete.Add(len(delete)) chInsert = make(chan []string) chDelete = make(chan []string) @@ -256,16 +258,23 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { go InsertPrepWorker(super, chInsert, chInsertBack) go InsertExecWorker(super, chInsertBack, wgInsert) + go DeleteExecWorker(super, chDelete, wgDelete) + go func() { benchmarc := benchmark.NewBenchmark() chInsert <- insert wgInsert.Wait() benchmarc.Stop() - log.Infof("Synced %v hosts in %v seconds", len(insert), benchmarc.String()) + log.Infof("Inserted %v hosts in %v seconds", len(insert), benchmarc.String()) }() - //chUpdate <- update - //chDelete <- delete + go func() { + benchmarc := benchmark.NewBenchmark() + chDelete <- delete + wgDelete.Wait() + benchmarc.Stop() + log.Infof("Deleted %v hosts in %v seconds", len(delete), benchmarc.String()) + }() } } return nil @@ -274,7 +283,7 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error { func InsertPrepWorker(super *supervisor.Supervisor, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) { defer log.Info("Host: Insert preparation routine stopped") - worker := func(chunk *icingadb_connection.ConfigChunk) { + prep := func(chunk *icingadb_connection.ConfigChunk) { pkgs := icingadb_json_decoder.JsonDecodePackages{ ChBack: chInsertBack, } @@ -298,22 +307,32 @@ func InsertPrepWorker(super *supervisor.Supervisor, chInsert <-chan []string, ch for keys := range chInsert { done := make(chan struct{}) ch := super.Rdbw.PipeConfigChunks(done, keys, "host") - for chunk := range ch { - worker(chunk) - } + go func() { + for chunk := range ch { + go prep(chunk) + } + }() } } func InsertExecWorker(super *supervisor.Supervisor, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) { defer log.Info("Host: Insert exec routine stopped") - worker := func(rows []configobject.Row) { - super.Dbw.SqlBulkInsert(rows, BulkInsertStmt) - wg.Add(-len(rows)) - } - for rows := range chInsertBack { - //log.Infof("Inserting %v hosts...", len(rows)) - go worker(rows) + go func(rows []configobject.Row) { + super.ChErr <- super.Dbw.SqlBulkInsert(rows, BulkInsertStmt) + wg.Add(-len(rows)) + }(rows) + } +} + +func DeleteExecWorker(super *supervisor.Supervisor, chDelete <-chan []string, wg *sync.WaitGroup) { + defer log.Info("Host: Delete exec routine stopped") + + for keys := range chDelete { + go func(keys []string) { + super.ChErr <- super.Dbw.SqlBulkDelete(keys, BulkDeleteStmt) + wg.Add(-len(keys)) + }(keys) } } \ No newline at end of file