diff --git a/configobject/sync/sync.go b/configobject/sync/sync.go index 91748819..e788d68f 100644 --- a/configobject/sync/sync.go +++ b/configobject/sync/sync.go @@ -15,6 +15,7 @@ import ( "sync/atomic" ) +// Context provides an Operator with all necessary information to sync a config type type Context struct { ObjectType string Factory configobject.RowFactory @@ -38,12 +39,26 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) var ( + // If this IcingaDB-Instance looses responsibility, this channel will be + // closed, resulting in a shutdown of all underlying workers done chan struct{} + // Used by this Operator to provide the InsertPrepWorker with IDs to insert + // Operator -> InsertPrepWorker chInsert chan []string + // Used by the JsonDecodePool to provide the InsertExecWorker with decoded rows, ready to be inserted + // JsonDecodePool -> InsertExecWorker chInsertBack chan []configobject.Row + // Used by this Operator to provide the DeleteExecWorker with IDs to delete + // Operator -> DeleteExecWorker chDelete chan []string + // Used by this Operator to provide the UpdateCompWorker with IDs to compare + // Operator -> UpdateCompWorker chUpdateComp chan []string + // Used by the UpdateCompWorker to provide the UpdatePrepWorker with IDs that have to be updated + // UpdateCompWorker -> UpdatePrepWorker chUpdate chan []string + // Used by the JsonDecodePool to provide the UpdateExecWorker with decoded rows, ready to be updated + // JsonDecodePool -> UpdateExecWorker chUpdateBack chan []configobject.Row wgInsert *sync.WaitGroup wgDelete *sync.WaitGroup @@ -66,6 +81,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { insert, update, delete = GetDelta(super, ctx) log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) + // Clean up all channels and wait groups for a fresh config dump done = make(chan struct{}) chInsert = make(chan []string) chInsertBack = make(chan []configobject.Row) @@ -91,8 +107,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { go func() { benchmarc := benchmark.NewBenchmark() wgInsert.Add(len(insert)) + + // Provide the InsertPrepWorker with IDs to insert chInsert <- insert + + // Wait for all IDs to be inserted into MySQL wgInsert.Wait() + benchmarc.Stop() log.WithFields(log.Fields{ "type": ctx.ObjectType, @@ -105,8 +126,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { go func() { benchmarc := benchmark.NewBenchmark() wgDelete.Add(len(delete)) + + // Provide the DeleteExecWorker with IDs to delete chDelete <- delete + + // Wait for all IDs to be deleted from MySQL wgDelete.Wait() + benchmarc.Stop() log.WithFields(log.Fields{ "type": ctx.ObjectType, @@ -119,8 +145,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { go func() { benchmarc := benchmark.NewBenchmark() wgUpdate.Add(len(update)) + + // Provide the UpdateCompWorker with IDs to compare chUpdateComp <- update + + // Wait for all IDs to be update in MySQL wgUpdate.Wait() + benchmarc.Stop() log.WithFields(log.Fields{ "type": ctx.ObjectType,