Merge branch 'fix/benchmark-goroutine-leak' into 'master'

Fix benchmark goroutine leak

See merge request icingadb/icingadb-main!6
This commit is contained in:
Jean Flach 2019-03-22 12:48:54 +01:00
commit 81fe7083df

View file

@ -104,6 +104,21 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack)
go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate, updateCounter)
waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) {
waitDone := make(chan bool)
go func() {
wg.Wait()
close(waitDone)
}()
select {
case <-waitDone:
return false
case <-done:
return true
}
}
go func() {
benchmarc := benchmark.NewBenchmark()
wgInsert.Add(len(insert))
@ -112,15 +127,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chInsert <- insert
// Wait for all IDs to be inserted into MySQL
wgInsert.Wait()
kill := waitOrKill(wgInsert, done)
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": len(insert),
"benchmark": benchmarc.String(),
"action": "insert",
}).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String())
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": len(insert),
"benchmark": benchmarc.String(),
"action": "insert",
}).Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String())
}
}()
go func() {
@ -131,15 +147,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chDelete <- delete
// Wait for all IDs to be deleted from MySQL
wgDelete.Wait()
kill := waitOrKill(wgDelete, done)
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": len(delete),
"benchmark": benchmarc.String(),
"action": "delete",
}).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String())
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": len(delete),
"benchmark": benchmarc.String(),
"action": "delete",
}).Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String())
}
}()
go func() {
@ -150,15 +167,16 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chUpdateComp <- update
// Wait for all IDs to be update in MySQL
wgUpdate.Wait()
kill := waitOrKill(wgUpdate, done)
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": atomic.LoadUint32(updateCounter),
"benchmark": benchmarc.String(),
"action": "update",
}).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String())
if !kill {
log.WithFields(log.Fields{
"type": ctx.ObjectType,
"count": atomic.LoadUint32(updateCounter),
"benchmark": benchmarc.String(),
"action": "update",
}).Infof("Updated %v %ss in %v", atomic.LoadUint32(updateCounter), ctx.ObjectType, benchmarc.String())
}
}()
}
}