Implement runtime updates

This commit is contained in:
Noah Hilverling 2019-09-18 15:29:13 +02:00
parent 35a1080879
commit ca217ebfe0
2 changed files with 64 additions and 13 deletions

View file

@ -10,6 +10,7 @@ import (
"git.icinga.com/icingadb/icingadb-main/supervisor"
"git.icinga.com/icingadb/icingadb-main/utils"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"sync/atomic"
)
@ -54,7 +55,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
wgDelete *sync.WaitGroup
wgUpdate *sync.WaitGroup
)
log.Infof("%s: Ready", objectInformation.ObjectType)
log.Debugf("%s: Ready", objectInformation.ObjectType)
for msg := range chHA {
switch msg {
// Icinga 2 probably died, stop operations and tell all workers to shut down.
@ -99,6 +100,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
go UpdatePrepWorker(super, objectInformation, done, chUpdate, chUpdateBack)
go UpdateExecWorker(super, objectInformation, done, chUpdateBack, wgUpdate, updateCounter)
go RuntimeUpdateWorker(super, objectInformation, done, chInsert, chDelete, wgInsert, wgDelete)
waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) {
waitDone := make(chan bool)
go func() {
@ -422,3 +425,54 @@ func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobj
}(rows)
}
}
func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsert chan []string, chDelete chan []string, wgInsert *sync.WaitGroup, wgDelete *sync.WaitGroup) {
subscription := super.Rdbw.Subscribe()
defer subscription.Close()
if err := subscription.Subscribe("icinga:config:delete", "icinga:config:update"); err != nil {
super.ChErr <- err
}
for {
msg, err := subscription.ReceiveMessage()
select {
case _, ok := <-done:
if !ok {
return
}
default:
}
if err != nil {
super.ChErr <- err
}
// Split string on last ':'
// host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85
// => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85]
re := regexp.MustCompile(`\A(.*):(.*?)\z`)
data := re.FindStringSubmatch(msg.Payload)
objectType := data[1]
if objectType == objectInformation.RedisKey {
objectId := data[2]
switch msg.Channel {
case "icinga:config:update":
wgInsert.Add(1)
chInsert <- []string{objectId}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime insert/update",
}).Infof("Inserting 1 %v on runtime update", objectInformation.ObjectType)
case "icinga:config:delete":
wgDelete.Add(1)
chDelete <- []string{objectId}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime delete",
}).Infof("Deleting 1 %v on runtime update", objectInformation.ObjectType)
}
}
}
}

View file

@ -27,7 +27,7 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment
subscription := rdb.Subscribe()
defer subscription.Close()
if err := subscription.Subscribe(
"icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil {
"icinga:stats"); err != nil {
return err
}
@ -37,17 +37,14 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment
return err
}
switch msg.Channel {
case "icinga:stats":
var unJson interface{} = nil
if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil {
return err
}
environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string)
configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool)
env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress}
chEnv <- env
var unJson interface{} = nil
if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil {
return err
}
environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string)
configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool)
env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress}
chEnv <- env
}
}