From fb3dadd352b19534212d6eac597fb4293ce2ed5b Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Tue, 19 Mar 2019 13:03:14 +0100 Subject: [PATCH] Use done channel to indicate HA responsibility loss in workers --- configobject/sync/sync.go | 155 ++++++++++++++++++++++++++------------ 1 file changed, 105 insertions(+), 50 deletions(-) diff --git a/configobject/sync/sync.go b/configobject/sync/sync.go index 4bbc7da5..066b1af2 100644 --- a/configobject/sync/sync.go +++ b/configobject/sync/sync.go @@ -66,57 +66,38 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) var ( - chInsert chan []string - chInsertBack chan []configobject.Row - chDelete chan []string - chUpdateComp chan []string - chUpdate chan []string - chUpdateBack chan []configobject.Row - wgInsert = &sync.WaitGroup{} - wgDelete = &sync.WaitGroup{} - wgUpdate = &sync.WaitGroup{} + done chan struct{} + chInsert = make(chan []string) + chInsertBack = make(chan []configobject.Row) + chDelete = make(chan []string) + chUpdateComp = make(chan []string) + chUpdate = make(chan []string) + chUpdateBack = make(chan []configobject.Row) + wgInsert = &sync.WaitGroup{} + wgDelete = &sync.WaitGroup{} + wgUpdate = &sync.WaitGroup{} ) go func() { for msg := range chHA { switch msg { case icingadb_ha.Notify_IsNotResponsible: log.Info(fmt.Sprintf("%s: Lost responsibility", ctx.ObjectType)) - if chInsert != nil { - close(chInsert) - } - if chInsertBack != nil { - close(chInsertBack) - } - if chDelete != nil { - close(chDelete) - } - if chUpdateComp != nil { - close(chUpdateComp) - } - if chUpdate != nil { - close(chUpdate) - } - if chUpdateBack != nil { - close(chUpdateBack) + if done != nil { + close(done) } case icingadb_ha.Notify_IsResponsible: log.Infof("%s: Got responsibility", ctx.ObjectType) - chInsert = make(chan []string) - chInsertBack = make(chan []configobject.Row) - chDelete = make(chan []string) - chUpdateComp = make(chan []string) - chUpdate = make(chan []string) - chUpdateBack = make(chan []configobject.Row) + done = make(chan struct{}) - go InsertPrepWorker(super, ctx, chInsert, chInsertBack) - go InsertExecWorker(super, ctx, chInsertBack, wgInsert) + go InsertPrepWorker(super, ctx, done, chInsert, chInsertBack) + go InsertExecWorker(super, ctx, done, chInsertBack, wgInsert) - go DeleteExecWorker(super, ctx, chDelete, wgDelete) + go DeleteExecWorker(super, ctx, done, chDelete, wgDelete) - go UpdateCompWorker(super, ctx, chUpdateComp, chUpdate, wgUpdate) - go UpdatePrepWorker(super, ctx, chUpdate, chUpdateBack) - go UpdateExecWorker(super, ctx, chUpdateBack, wgUpdate) + go UpdateCompWorker(super, ctx, done, chUpdateComp, chUpdate, wgUpdate) + go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack) + go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate) go func() { benchmarc := benchmark.NewBenchmark() @@ -124,7 +105,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chInsert <- insert wgInsert.Wait() benchmarc.Stop() - log.Infof("Inserted %v %ss in %v seconds", len(insert), ctx.ObjectType, benchmarc.String()) + log.Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String()) }() go func() { @@ -133,7 +114,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chDelete <- delete wgDelete.Wait() benchmarc.Stop() - log.Infof("Deleted %v %ss in %v seconds", len(delete), ctx.ObjectType, benchmarc.String()) + log.Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String()) }() go func() { @@ -142,7 +123,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { chUpdateComp <- update wgUpdate.Wait() benchmarc.Stop() - log.Infof("Updated %v %ss in %v seconds", len(update), ctx.ObjectType, benchmarc.String()) + log.Infof("Updated %v %ss in %v", len(update), ctx.ObjectType, benchmarc.String()) }() } } @@ -151,7 +132,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { return nil } -func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) { +func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) { defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType) prep := func(chunk *icingadb_connection.ConfigChunk) { @@ -159,6 +140,13 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha ChBack: chInsertBack, } for i, key := range chunk.Keys { + select { + case _, ok := <-done: + if !ok { + return + } + } + if chunk.Configs[i] == nil || chunk.Checksums[i] == nil { continue } @@ -176,7 +164,13 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha } for keys := range chInsert { - done := make(chan struct{}) + select { + case _, ok := <-done: + if !ok { + return + } + } + ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType) go func() { for chunk := range ch { @@ -186,8 +180,15 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha } } -func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) { +func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) { for rows := range chInsertBack { + select { + case _, ok := <-done: + if !ok { + return + } + } + go func(rows []configobject.Row) { super.ChErr <- super.Dbw.SqlBulkInsert(rows, ctx.InsertStmt) wg.Add(-len(rows)) @@ -195,8 +196,15 @@ func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, chInsertBack < } } -func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, chDelete <-chan []string, wg *sync.WaitGroup) { +func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) { for keys := range chDelete { + select { + case _, ok := <-done: + if !ok { + return + } + } + go func(keys []string) { super.ChErr <- super.Dbw.SqlBulkDelete(keys, ctx.DeleteStmt) wg.Add(-len(keys)) @@ -204,10 +212,17 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, chDelete <-cha } } -func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) { +func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) { prep := func(chunk *icingadb_connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) { changed := make([]string, 0) for i, key := range chunk.Keys { + select { + case _, ok := <-done: + if !ok { + return + } + } + if chunk.Checksums[i] == nil { continue } @@ -229,7 +244,13 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha } for keys := range chUpdate { - done := make(chan struct{}) + select { + case _, ok := <-done: + if !ok { + return + } + } + ch := super.Rdbw.PipeChecksumChunks(done, keys, ctx.ObjectType) checksums, err := super.Dbw.SqlFetchChecksums(ctx.ObjectType, keys) if err != nil { @@ -238,18 +259,32 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha go func() { for chunk := range ch { + select { + case _, ok := <-done: + if !ok { + return + } + } + go prep(chunk, checksums) } }() } } -func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) { +func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) { prep := func(chunk *icingadb_connection.ConfigChunk) { pkgs := icingadb_json_decoder.JsonDecodePackages{ ChBack: chUpdateBack, } for i, key := range chunk.Keys { + select { + case _, ok := <-done: + if !ok { + return + } + } + if chunk.Configs[i] == nil || chunk.Checksums[i] == nil { continue } @@ -267,18 +302,38 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha } for keys := range chUpdate { - done := make(chan struct{}) + select { + case _, ok := <-done: + if !ok { + return + } + } + ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType) go func() { for chunk := range ch { + select { + case _, ok := <-done: + if !ok { + return + } + } + go prep(chunk) } }() } } -func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) { +func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) { for rows := range chUpdateBack { + select { + case _, ok := <-done: + if !ok { + return + } + } + go func(rows []configobject.Row) { super.ChErr <- super.Dbw.SqlBulkUpdate(rows, ctx.UpdateStmt) wg.Add(-len(rows))