Implement host delete

This commit is contained in:
Noah Hilverling 2019-03-13 11:27:49 +01:00
parent e47c38d06e
commit 209be71799

View file

@ -248,6 +248,8 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
wgInsert := &sync.WaitGroup{}
wgInsert.Add(len(insert))
wgDelete := &sync.WaitGroup{}
wgDelete.Add(len(delete))
chInsert = make(chan []string)
chDelete = make(chan []string)
@ -256,16 +258,23 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
go InsertPrepWorker(super, chInsert, chInsertBack)
go InsertExecWorker(super, chInsertBack, wgInsert)
go DeleteExecWorker(super, chDelete, wgDelete)
go func() {
benchmarc := benchmark.NewBenchmark()
chInsert <- insert
wgInsert.Wait()
benchmarc.Stop()
log.Infof("Synced %v hosts in %v seconds", len(insert), benchmarc.String())
log.Infof("Inserted %v hosts in %v seconds", len(insert), benchmarc.String())
}()
//chUpdate <- update
//chDelete <- delete
go func() {
benchmarc := benchmark.NewBenchmark()
chDelete <- delete
wgDelete.Wait()
benchmarc.Stop()
log.Infof("Deleted %v hosts in %v seconds", len(delete), benchmarc.String())
}()
}
}
return nil
@ -274,7 +283,7 @@ func SyncOperator(super *supervisor.Supervisor, chHA chan int) error {
func InsertPrepWorker(super *supervisor.Supervisor, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) {
defer log.Info("Host: Insert preparation routine stopped")
worker := func(chunk *icingadb_connection.ConfigChunk) {
prep := func(chunk *icingadb_connection.ConfigChunk) {
pkgs := icingadb_json_decoder.JsonDecodePackages{
ChBack: chInsertBack,
}
@ -298,22 +307,32 @@ func InsertPrepWorker(super *supervisor.Supervisor, chInsert <-chan []string, ch
for keys := range chInsert {
done := make(chan struct{})
ch := super.Rdbw.PipeConfigChunks(done, keys, "host")
for chunk := range ch {
worker(chunk)
}
go func() {
for chunk := range ch {
go prep(chunk)
}
}()
}
}
func InsertExecWorker(super *supervisor.Supervisor, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) {
defer log.Info("Host: Insert exec routine stopped")
worker := func(rows []configobject.Row) {
super.Dbw.SqlBulkInsert(rows, BulkInsertStmt)
wg.Add(-len(rows))
}
for rows := range chInsertBack {
//log.Infof("Inserting %v hosts...", len(rows))
go worker(rows)
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkInsert(rows, BulkInsertStmt)
wg.Add(-len(rows))
}(rows)
}
}
func DeleteExecWorker(super *supervisor.Supervisor, chDelete <-chan []string, wg *sync.WaitGroup) {
defer log.Info("Host: Delete exec routine stopped")
for keys := range chDelete {
go func(keys []string) {
super.ChErr <- super.Dbw.SqlBulkDelete(keys, BulkDeleteStmt)
wg.Add(-len(keys))
}(keys)
}
}