diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index 5d1c8e6c..2eca5dfd 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -104,6 +104,21 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack) go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate, updateCounter) + waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) { + waitDone := make(chan bool) + go func() { + wg.Wait() + close(waitDone) + }() + + select { + case <-waitDone: + return false + case <-done: + return true + } + } + go func() { benchmarc := benchmark.NewBenchmark() wgInsert.Add(len(insert)) @@ -112,15 +127,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chInsert <- insert // Wait for all IDs to be inserted into MySQL - wgInsert.Wait() - + kill := waitOrKill(wgInsert, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": len(insert), - "benchmark": benchmarc.String(), - "action": "insert", - }).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": len(insert), + "benchmark": benchmarc.String(), + "action": "insert", + }).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) + } }() go func() { @@ -131,15 +147,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chDelete <- delete // Wait for all IDs to be deleted from MySQL - wgDelete.Wait() - + kill := waitOrKill(wgDelete, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": len(delete), - "benchmark": benchmarc.String(), - "action": "delete", - }).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": len(delete), + "benchmark": benchmarc.String(), + "action": "delete", + }).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) + } }() go func() { @@ -150,15 +167,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chUpdateComp <- update // Wait for all IDs to be update in MySQL - wgUpdate.Wait() - + kill := waitOrKill(wgUpdate, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": atomic.LoadUint32(updateCounter), - "benchmark": benchmarc.String(), - "action": "update", - }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": atomic.LoadUint32(updateCounter), + "benchmark": benchmarc.String(), + "action": "update", + }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String()) + } }() } }