Add GetDelta()

This commit is contained in:
Noah Hilverling 2019-03-20 12:25:47 +01:00
parent 88adc4e8fb
commit be5e2cb8c2

View file

@ -31,39 +31,7 @@ type Checksums struct {
}
func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
var (
redisIds []string
mysqlIds []string
wg = sync.WaitGroup{}
)
//get ids from redis
wg.Add(1)
go func() {
defer wg.Done()
var err error
res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result()
if err != nil {
super.ChErr <- err
return
}
redisIds = res
}()
//get ids from mysql
wg.Add(1)
go func() {
defer wg.Done()
var err error
mysqlIds, err = super.Dbw.SqlFetchIds(ctx.ObjectType)
if err != nil {
super.ChErr <- err
return
}
}()
wg.Wait()
insert, update, delete := icingadb_utils.Delta(redisIds, mysqlIds)
insert, update, delete := GetDelta(super, ctx)
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", ctx.ObjectType, len(insert), len(update), len(delete))
var (
@ -89,6 +57,9 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
case icingadb_ha.Notify_IsResponsible:
log.Infof("%s: Got responsibility", ctx.ObjectType)
//TODO: This should only be done, if HA was taken over from another instance
insert, update, delete = GetDelta(super, ctx)
done = make(chan struct{})
updateCounter := new(uint32)
@ -134,6 +105,42 @@ func Operator(super *supervisor.Supervisor, chHA chan int, ctx *Context) error {
return nil
}
func GetDelta(super *supervisor.Supervisor, ctx *Context) ([]string, []string, []string) {
var (
redisIds []string
mysqlIds []string
wg = sync.WaitGroup{}
)
//get ids from redis
wg.Add(1)
go func() {
defer wg.Done()
var err error
res, err := super.Rdbw.HKeys(fmt.Sprintf("icinga:config:checksum:%s", ctx.ObjectType)).Result()
if err != nil {
super.ChErr <- err
return
}
redisIds = res
}()
//get ids from mysql
wg.Add(1)
go func() {
defer wg.Done()
var err error
mysqlIds, err = super.Dbw.SqlFetchIds(ctx.ObjectType)
if err != nil {
super.ChErr <- err
return
}
}()
wg.Wait()
return icingadb_utils.Delta(redisIds, mysqlIds)
}
func InsertPrepWorker(super *supervisor.Supervisor, ctx *Context, done chan struct{}, chInsert <-chan []string, chInsertBack chan<- []configobject.Row) {
defer log.Infof("%s: Insert preparation routine stopped", ctx.ObjectType)