diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index a8d0020e..58563103 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -10,6 +10,7 @@ import ( "git.icinga.com/icingadb/icingadb-main/supervisor" "git.icinga.com/icingadb/icingadb-main/utils" log "github.com/sirupsen/logrus" + "regexp" "sync" "sync/atomic" ) @@ -54,7 +55,7 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co wgDelete *sync.WaitGroup wgUpdate *sync.WaitGroup ) - log.Infof("%s: Ready", objectInformation.ObjectType) + log.Debugf("%s: Ready", objectInformation.ObjectType) for msg := range chHA { switch msg { // Icinga 2 probably died, stop operations and tell all workers to shut down. @@ -99,6 +100,8 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co go UpdatePrepWorker(super, objectInformation, done, chUpdate, chUpdateBack) go UpdateExecWorker(super, objectInformation, done, chUpdateBack, wgUpdate, updateCounter) + go RuntimeUpdateWorker(super, objectInformation, done, chInsert, chDelete, wgInsert, wgDelete) + waitOrKill := func(wg *sync.WaitGroup, done chan struct{}) (kill bool) { waitDone := make(chan bool) go func() { @@ -422,3 +425,54 @@ func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobj }(rows) } } + +func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chInsert chan []string, chDelete chan []string, wgInsert *sync.WaitGroup, wgDelete *sync.WaitGroup) { + subscription := super.Rdbw.Subscribe() + defer subscription.Close() + if err := subscription.Subscribe("icinga:config:delete", "icinga:config:update"); err != nil { + super.ChErr <- err + } + + for { + msg, err := subscription.ReceiveMessage() + + select { + case _, ok := <-done: + if !ok { + return + } + default: + } + + if err != nil { + super.ChErr <- err + } + + // Split string on last ':' + // host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 + // => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85] + re := regexp.MustCompile(`\A(.*):(.*?)\z`) + data := re.FindStringSubmatch(msg.Payload) + + objectType := data[1] + if objectType == objectInformation.RedisKey { + objectId := data[2] + switch msg.Channel { + case "icinga:config:update": + wgInsert.Add(1) + chInsert <- []string{objectId} + log.WithFields(log.Fields{ + "type": objectInformation.ObjectType, + "action": "runtime insert/update", + }).Infof("Inserting 1 %v on runtime update", objectInformation.ObjectType) + case "icinga:config:delete": + wgDelete.Add(1) + chDelete <- []string{objectId} + log.WithFields(log.Fields{ + "type": objectInformation.ObjectType, + "action": "runtime delete", + }).Infof("Deleting 1 %v on runtime update", objectInformation.ObjectType) + } + } + } +} diff --git a/ha/heartbeat.go b/ha/heartbeat.go index 3b380efb..c5868065 100644 --- a/ha/heartbeat.go +++ b/ha/heartbeat.go @@ -27,7 +27,7 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment subscription := rdb.Subscribe() defer subscription.Close() if err := subscription.Subscribe( - "icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil { + "icinga:stats"); err != nil { return err } @@ -37,17 +37,14 @@ func IcingaHeartbeatListener(rdb *connection.RDBWrapper, chEnv chan *Environment return err } - switch msg.Channel { - case "icinga:stats": - var unJson interface{} = nil - if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil { - return err - } - - environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) - configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool) - env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress} - chEnv <- env + var unJson interface{} = nil + if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil { + return err } + + environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string) + configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool) + env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress} + chEnv <- env } }