From 7c29a1d885a3c0eb1bddf27817be96ccfda97e52 Mon Sep 17 00:00:00 2001 From: Noah Hilverling Date: Tue, 7 Jan 2020 16:43:14 +0100 Subject: [PATCH] Runtime updates: Use one pub/sub channel per object type --- configobject/configsync/configsync.go | 46 ++++++++++++--------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index f7a49d76..534b251f 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -13,7 +13,6 @@ import ( "github.com/Icinga/icingadb/utils" "github.com/go-redis/redis" log "github.com/sirupsen/logrus" - "regexp" "sync" "sync/atomic" "time" @@ -447,7 +446,11 @@ func UpdateExecWorker(super *supervisor.Supervisor, objectInformation *configobj func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *configobject.ObjectInformation, done chan struct{}, chUpdate chan []string, chDelete chan []string, wgUpdate *sync.WaitGroup, wgDelete *sync.WaitGroup) { subscription := super.Rdbw.Subscribe() defer subscription.Close() - if err := subscription.Subscribe("icinga:config:delete", "icinga:config:update"); err != nil { + + deletePubSubString := "icinga:config:delete:" + objectInformation.RedisKey + updatePubSubString := "icinga:config:update:" + objectInformation.RedisKey + + if err := subscription.Subscribe(deletePubSubString, updatePubSubString); err != nil { super.ChErr <- err } @@ -492,31 +495,22 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config } case message := <-msgCh: go func(msg *redis.Message) { - // Split string on last ':' - // host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 - // => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85] - re := regexp.MustCompile(`\A(.*):(.*?)\z`) - data := re.FindStringSubmatch(msg.Payload) - - objectType := data[1] - if objectType == objectInformation.RedisKey { - objectId := data[2] - switch msg.Channel { - case "icinga:config:update": - updateMutex.Lock() - currentUpdatePackage = append(currentUpdatePackage, objectId) - if len(currentUpdatePackage) >= 1000 { - insertCurrentUpdatePackage() - } - updateMutex.Unlock() - case "icinga:config:delete": - deleteMutex.Lock() - currentDeletePackage = append(currentDeletePackage, objectId) - if len(currentDeletePackage) >= 1000 { - insertCurrentDeletePackage() - } - deleteMutex.Unlock() + objectId := msg.Payload + switch msg.Channel { + case updatePubSubString: + updateMutex.Lock() + currentUpdatePackage = append(currentUpdatePackage, objectId) + if len(currentUpdatePackage) >= 1000 { + insertCurrentUpdatePackage() } + updateMutex.Unlock() + case deletePubSubString: + deleteMutex.Lock() + currentDeletePackage = append(currentDeletePackage, objectId) + if len(currentDeletePackage) >= 1000 { + insertCurrentDeletePackage() + } + deleteMutex.Unlock() } }(message) case <-ticker1s.C: