Add comments to Operator

This commit is contained in:
Noah Hilverling 2019-03-21 13:15:49 +01:00
parent 68ca27e54e
commit 0b0dc660ac

View file

@ -15,6 +15,7 @@ import (
"sync/atomic"
)
// Context provides an Operator with all necessary information to sync a config type
type Context struct {
ObjectType string
Factory configobject.RowFactory
@ -38,12 +39,26 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
var (
// If this IcingaDB-Instance looses responsibility, this channel will be
// closed, resulting in a shutdown of all underlying workers
done chan struct{}
// Used by this Operator to provide the InsertPrepWorker with IDs to insert
// Operator -> InsertPrepWorker
chInsert chan []string
// Used by the JsonDecodePool to provide the InsertExecWorker with decoded rows, ready to be inserted
// JsonDecodePool -> InsertExecWorker
chInsertBack chan []configobject.Row
// Used by this Operator to provide the DeleteExecWorker with IDs to delete
// Operator -> DeleteExecWorker
chDelete chan []string
// Used by this Operator to provide the UpdateCompWorker with IDs to compare
// Operator -> UpdateCompWorker
chUpdateComp chan []string
// Used by the UpdateCompWorker to provide the UpdatePrepWorker with IDs that have to be updated
// UpdateCompWorker -> UpdatePrepWorker
chUpdate chan []string
// Used by the JsonDecodePool to provide the UpdateExecWorker with decoded rows, ready to be updated
// JsonDecodePool -> UpdateExecWorker
chUpdateBack chan []configobject.Row
wgInsert *sync.WaitGroup
wgDelete *sync.WaitGroup
@ -66,6 +81,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
insert, update, delete = GetDelta(super, ctx)
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
// Clean up all channels and wait groups for a fresh config dump
done = make(chan struct{})
chInsert = make(chan []string)
chInsertBack = make(chan []configobject.Row)
@ -91,8 +107,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
go func() {
benchmarc := benchmark.NewBenchmark()
wgInsert.Add(len(insert))
// Provide the InsertPrepWorker with IDs to insert
chInsert <- insert
// Wait for all IDs to be inserted into MySQL
wgInsert.Wait()
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,
@ -105,8 +126,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
go func() {
benchmarc := benchmark.NewBenchmark()
wgDelete.Add(len(delete))
// Provide the DeleteExecWorker with IDs to delete
chDelete <- delete
// Wait for all IDs to be deleted from MySQL
wgDelete.Wait()
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,
@ -119,8 +145,13 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
go func() {
benchmarc := benchmark.NewBenchmark()
wgUpdate.Add(len(update))
// Provide the UpdateCompWorker with IDs to compare
chUpdateComp <- update
// Wait for all IDs to be update in MySQL
wgUpdate.Wait()
benchmarc.Stop()
log.WithFields(log.Fields{
"type": ctx.ObjectType,