From 56f23a9dc39461b2248e5808dae53a72983bfc3d Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Thu, 21 Mar 2019 14:16:54 +0100 Subject: [PATCH] Fix benchmark goroutine leak --- configobject/sync/sync.go | 66 +++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/configobject/sync/sync.go b/configobject/sync/sync.go index e788d68f..d092454b 100644 --- a/configobject/sync/sync.go +++ b/configobject/sync/sync.go @@ -104,6 +104,21 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack) go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate, updateCounter) + waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) { + waitDone := make(chan bool) + go func() { + wg.Wait() + close(waitDone) + }() + + select { + case <-waitDone: + return false + case <-done: + return true + } + } + go func() { benchmarc := benchmark.NewBenchmark() wgInsert.Add(len(insert)) @@ -112,15 +127,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chInsert <- insert // Wait for all IDs to be inserted into MySQL - wgInsert.Wait() - + kill := waitOrKill(wgInsert, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": len(insert), - "benchmark": benchmarc.String(), - "action": "insert", - }).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": len(insert), + "benchmark": benchmarc.String(), + "action": "insert", + }).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) + } }() go func() { @@ -131,15 +147,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chDelete <- delete // Wait for all IDs to be deleted from MySQL - wgDelete.Wait() - + kill := waitOrKill(wgDelete, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": len(delete), - "benchmark": benchmarc.String(), - "action": "delete", - }).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": len(delete), + "benchmark": benchmarc.String(), + "action": "delete", + }).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) + } }() go func() { @@ -150,15 +167,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chUpdateComp <- update // Wait for all IDs to be update in MySQL - wgUpdate.Wait() - + kill := waitOrKill(wgUpdate, done) benchmarc.Stop() - log.WithFields(log.Fields{ - "type": ctx.ObjectType, - "count": atomic.LoadUint32(updateCounter), - "benchmark": benchmarc.String(), - "action": "update", - }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String()) + if !kill { + log.WithFields(log.Fields{ + "type": ctx.ObjectType, + "count": atomic.LoadUint32(updateCounter), + "benchmark": benchmarc.String(), + "action": "update", + }).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String()) + } }() } }