Use done channel to indicate HA responsibility loss in workers

This commit is contained in:
Noah Hilverling 2019-03-19 13:03:14 +01:00
parent dd4246fe27
commit fb3dadd352

View file

@ -66,57 +66,38 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
var (
chInsert chan []string
chInsertBack chan []configobject.Row
chDelete chan []string
chUpdateComp chan []string
chUpdate chan []string
chUpdateBack chan []configobject.Row
wgInsert = &sync.WaitGroup{}
wgDelete = &sync.WaitGroup{}
wgUpdate = &sync.WaitGroup{}
done chan struct{}
chInsert = make(chan []string)
chInsertBack = make(chan []configobject.Row)
chDelete = make(chan []string)
chUpdateComp = make(chan []string)
chUpdate = make(chan []string)
chUpdateBack = make(chan []configobject.Row)
wgInsert = &sync.WaitGroup{}
wgDelete = &sync.WaitGroup{}
wgUpdate = &sync.WaitGroup{}
)
go func() {
for msg := range chHA {
switch msg {
case icingadb_ha.Notify_IsNotResponsible:
log.Info(fmt.Sprintf("%s: Lost responsibility", ctx.ObjectType))
if chInsert != nil {
close(chInsert)
}
if chInsertBack != nil {
close(chInsertBack)
}
if chDelete != nil {
close(chDelete)
}
if chUpdateComp != nil {
close(chUpdateComp)
}
if chUpdate != nil {
close(chUpdate)
}
if chUpdateBack != nil {
close(chUpdateBack)
if done != nil {
close(done)
}
case icingadb_ha.Notify_IsResponsible:
log.Infof("%s: Got responsibility", ctx.ObjectType)
chInsert = make(chan []string)
chInsertBack = make(chan []configobject.Row)
chDelete = make(chan []string)
chUpdateComp = make(chan []string)
chUpdate = make(chan []string)
chUpdateBack = make(chan []configobject.Row)
done = make(chan struct{})
go InsertPrepWorker(super, ctx, chInsert, chInsertBack)
go InsertExecWorker(super, ctx, chInsertBack, wgInsert)
go InsertPrepWorker(super, ctx, done, chInsert, chInsertBack)
go InsertExecWorker(super, ctx, done, chInsertBack, wgInsert)
go DeleteExecWorker(super, ctx, chDelete, wgDelete)
go DeleteExecWorker(super, ctx, done, chDelete, wgDelete)
go UpdateCompWorker(super, ctx, chUpdateComp, chUpdate, wgUpdate)
go UpdatePrepWorker(super, ctx, chUpdate, chUpdateBack)
go UpdateExecWorker(super, ctx, chUpdateBack, wgUpdate)
go UpdateCompWorker(super, ctx, done, chUpdateComp, chUpdate, wgUpdate)
go UpdatePrepWorker(super, ctx, done, chUpdate, chUpdateBack)
go UpdateExecWorker(super, ctx, done, chUpdateBack, wgUpdate)
go func() {
benchmarc := benchmark.NewBenchmark()
@ -124,7 +105,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chInsert <- insert
wgInsert.Wait()
benchmarc.Stop()
log.Infof("Inserted %v %ss in %v seconds", len(insert), ctx.ObjectType, benchmarc.String())
log.Infof("Inserted %v %ss in %v", len(insert), ctx.ObjectType, benchmarc.String())
}()
go func() {
@ -133,7 +114,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chDelete <- delete
wgDelete.Wait()
benchmarc.Stop()
log.Infof("Deleted %v %ss in %v seconds", len(delete), ctx.ObjectType, benchmarc.String())
log.Infof("Deleted %v %ss in %v", len(delete), ctx.ObjectType, benchmarc.String())
}()
go func() {
@ -142,7 +123,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
chUpdateComp <- update
wgUpdate.Wait()
benchmarc.Stop()
log.Infof("Updated %v %ss in %v seconds", len(update), ctx.ObjectType, benchmarc.String())
log.Infof("Updated %v %ss in %v", len(update), ctx.ObjectType, benchmarc.String())
}()
}
}
@ -151,7 +132,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
return nil
}
func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) {
func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) {
defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType)
prep := func(chunk *icingadb_connection.ConfigChunk) {
@ -159,6 +140,13 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha
ChBack: chInsertBack,
}
for i, key := range chunk.Keys {
select {
case _, ok := <-done:
if !ok {
return
}
}
if chunk.Configs[i] == nil || chunk.Checksums[i] == nil {
continue
}
@ -176,7 +164,13 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha
}
for keys := range chInsert {
done := make(chan struct{})
select {
case _, ok := <-done:
if !ok {
return
}
}
ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType)
go func() {
for chunk := range ch {
@ -186,8 +180,15 @@ func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, chInsert <-cha
}
}
func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) {
func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsertBack <-chan []configobject.Row, wg *sync.WaitGroup) {
for rows := range chInsertBack {
select {
case _, ok := <-done:
if !ok {
return
}
}
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkInsert(rows, ctx.InsertStmt)
wg.Add(-len(rows))
@ -195,8 +196,15 @@ func InsertExecWorker(super *supervisor.Supervisor, ctx *Context, chInsertBack <
}
}
func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, chDelete <-chan []string, wg *sync.WaitGroup) {
func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chDelete <-chan []string, wg *sync.WaitGroup) {
for keys := range chDelete {
select {
case _, ok := <-done:
if !ok {
return
}
}
go func(keys []string) {
super.ChErr <- super.Dbw.SqlBulkDelete(keys, ctx.DeleteStmt)
wg.Add(-len(keys))
@ -204,10 +212,17 @@ func DeleteExecWorker(super *supervisor.Supervisor, ctx *Context, chDelete <-cha
}
}
func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) {
func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []string, wg *sync.WaitGroup) {
prep := func(chunk *icingadb_connection.ChecksumChunk, mysqlChecksums map[string]map[string]string) {
changed := make([]string, 0)
for i, key := range chunk.Keys {
select {
case _, ok := <-done:
if !ok {
return
}
}
if chunk.Checksums[i] == nil {
continue
}
@ -229,7 +244,13 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha
}
for keys := range chUpdate {
done := make(chan struct{})
select {
case _, ok := <-done:
if !ok {
return
}
}
ch := super.Rdbw.PipeChecksumChunks(done, keys, ctx.ObjectType)
checksums, err := super.Dbw.SqlFetchChecksums(ctx.ObjectType, keys)
if err != nil {
@ -238,18 +259,32 @@ func UpdateCompWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha
go func() {
for chunk := range ch {
select {
case _, ok := <-done:
if !ok {
return
}
}
go prep(chunk, checksums)
}
}()
}
}
func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) {
func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdate <-chan []string, chUpdateBack chan<- []configobject.Row) {
prep := func(chunk *icingadb_connection.ConfigChunk) {
pkgs := icingadb_json_decoder.JsonDecodePackages{
ChBack: chUpdateBack,
}
for i, key := range chunk.Keys {
select {
case _, ok := <-done:
if !ok {
return
}
}
if chunk.Configs[i] == nil || chunk.Checksums[i] == nil {
continue
}
@ -267,18 +302,38 @@ func UpdatePrepWorker(super *supervisor.Supervisor, ctx *Context, chUpdate <-cha
}
for keys := range chUpdate {
done := make(chan struct{})
select {
case _, ok := <-done:
if !ok {
return
}
}
ch := super.Rdbw.PipeConfigChunks(done, keys, ctx.ObjectType)
go func() {
for chunk := range ch {
select {
case _, ok := <-done:
if !ok {
return
}
}
go prep(chunk)
}
}()
}
}
func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) {
func UpdateExecWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chUpdateBack <-chan []configobject.Row, wg *sync.WaitGroup) {
for rows := range chUpdateBack {
select {
case _, ok := <-done:
if !ok {
return
}
}
go func(rows []configobject.Row) {
super.ChErr <- super.Dbw.SqlBulkUpdate(rows, ctx.UpdateStmt)
wg.Add(-len(rows))