From be5e2cb8c20a41b4bb4b3fa3ed5b48ec2598a6e1 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Wed, 20 Mar 2019 12:25:47 +0100 Subject: [PATCH] Add GetDelta() --- configobject/sync/sync.go | 73 +++++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/configobject/sync/sync.go b/configobject/sync/sync.go index d875f1d6..a15ebab4 100644 --- a/configobject/sync/sync.go +++ b/configobject/sync/sync.go @@ -31,39 +31,7 @@ type Checksums struct { } func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { - var ( - redisIds []string - mysqlIds []string - wg = sync.WaitGroup{} - ) - - //get ids from redis - wg.Add(1) - go func() { - defer wg.Done() - var err error - res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result() - if err != nil { - super.ChErr <- err - return - } - redisIds = res - }() - - //get ids from mysql - wg.Add(1) - go func() { - defer wg.Done() - var err error - mysqlIds, err = super.Dbw.SqlFetchIds(ctx.ObjectType) - if err != nil { - super.ChErr <- err - return - } - }() - - wg.Wait() - insert, update, delete := icingadb_utils.Delta(redisIds, mysqlIds) + insert, update, delete := GetDelta(super, ctx) log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete)) var ( @@ -89,6 +57,9 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { case icingadb_ha.Notify_IsResponsible: log.Infof("%s: Got responsibility", ctx.ObjectType) + //TODO: This should only be done, if HA was taken over from another instance + insert, update, delete = GetDelta(super, ctx) + done = make(chan struct{}) updateCounter := new(uint32) @@ -134,6 +105,42 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error { return nil } +func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, []string) { + var ( + redisIds []string + mysqlIds []string + wg = sync.WaitGroup{} + ) + + //get ids from redis + wg.Add(1) + go func() { + defer wg.Done() + var err error + res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result() + if err != nil { + super.ChErr <- err + return + } + redisIds = res + }() + + //get ids from mysql + wg.Add(1) + go func() { + defer wg.Done() + var err error + mysqlIds, err = super.Dbw.SqlFetchIds(ctx.ObjectType) + if err != nil { + super.ChErr <- err + return + } + }() + + wg.Wait() + return icingadb_utils.Delta(redisIds, mysqlIds) +} + func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) { defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType)