Merge pull request #107 from Icinga/feature/runtime-updates-use-one-pub-sub-per-object-type

Runtime updates: Use one pub/sub channel per object type
This commit is contained in:
Alexander Aleksandrovič Klimov 2020-01-07 17:13:37 +01:00 committed by GitHub
commit 71cc2b2c5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,7 +13,6 @@ import (
"github.com/Icinga/icingadb/utils"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"sync/atomic"
"time"
@ -447,7 +446,11 @@ func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobj
func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdate chan []string, chDelete chan []string, wgUpdate *sync.WaitGroup, wgDelete *sync.WaitGroup) {
subscription := super.Rdbw.Subscribe()
defer subscription.Close()
if err := subscription.Subscribe("icinga:config:delete", "icinga:config:update"); err != nil {
deletePubSubString := "icinga:config:delete:" + objectInformation.RedisKey
updatePubSubString := "icinga:config:update:" + objectInformation.RedisKey
if err := subscription.Subscribe(deletePubSubString, updatePubSubString); err != nil {
super.ChErr <- err
}
@ -492,31 +495,22 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config
}
case message := <-msgCh:
go func(msg *redis.Message) {
// Split string on last ':'
// host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85
// => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85]
re := regexp.MustCompile(`\A(.*):(.*?)\z`)
data := re.FindStringSubmatch(msg.Payload)
objectType := data[1]
if objectType == objectInformation.RedisKey {
objectId := data[2]
switch msg.Channel {
case "icinga:config:update":
updateMutex.Lock()
currentUpdatePackage = append(currentUpdatePackage, objectId)
if len(currentUpdatePackage) >= 1000 {
insertCurrentUpdatePackage()
}
updateMutex.Unlock()
case "icinga:config:delete":
deleteMutex.Lock()
currentDeletePackage = append(currentDeletePackage, objectId)
if len(currentDeletePackage) >= 1000 {
insertCurrentDeletePackage()
}
deleteMutex.Unlock()
objectId := msg.Payload
switch msg.Channel {
case updatePubSubString:
updateMutex.Lock()
currentUpdatePackage = append(currentUpdatePackage, objectId)
if len(currentUpdatePackage) >= 1000 {
insertCurrentUpdatePackage()
}
updateMutex.Unlock()
case deletePubSubString:
deleteMutex.Lock()
currentDeletePackage = append(currentDeletePackage, objectId)
if len(currentDeletePackage) >= 1000 {
insertCurrentDeletePackage()
}
deleteMutex.Unlock()
}
}(message)
case <-ticker1s.C: