diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index 70619352..ee9f1202 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -9,10 +9,12 @@ import ( "git.icinga.com/icingadb/icingadb-main/jsondecoder" "git.icinga.com/icingadb/icingadb-main/supervisor" "git.icinga.com/icingadb/icingadb-main/utils" + "github.com/go-redis/redis" log "github.com/sirupsen/logrus" "regexp" "sync" "sync/atomic" + "time" ) type Checksums struct { @@ -433,6 +435,38 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config super.ChErr <- err } + currentUpdatePackage := []string{} + currentDeletePackage := []string{} + updateMutex := sync.Mutex{} + deleteMutex := sync.Mutex{} + + insertCurrentUpdatePackage := func() { + updateLen := len(currentUpdatePackage) + chUpdate <- currentUpdatePackage + wgUpdate.Add(updateLen) + currentUpdatePackage = []string{} + + log.WithFields(log.Fields{ + "type": objectInformation.ObjectType, + "action": "runtime insert/update", + }).Infof("Inserting %v %ss on runtime update", updateLen, objectInformation.ObjectType) + } + + insertCurrentDeletePackage := func() { + deleteLen := len(currentDeletePackage) + chDelete <- currentDeletePackage + wgDelete.Add(deleteLen) + currentDeletePackage = []string{} + + log.WithFields(log.Fields{ + "type": objectInformation.ObjectType, + "action": "runtime delete", + }).Infof("Deleting %v %ss on runtime update", deleteLen, objectInformation.ObjectType) + + } + + ticker1s := time.NewTicker(time.Second) + msgCh := subscription.Channel() for { @@ -441,33 +475,49 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config if !ok { return } - case msg := <-msgCh: - // Split string on last ':' - // host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 - // => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85] - re := regexp.MustCompile(`\A(.*):(.*?)\z`) - data := re.FindStringSubmatch(msg.Payload) + case message := <-msgCh: + go func(msg *redis.Message) { + // Split string on last ':' + // host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 + // => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85] + re := regexp.MustCompile(`\A(.*):(.*?)\z`) + data := re.FindStringSubmatch(msg.Payload) - objectType := data[1] - if objectType == objectInformation.RedisKey { - objectId := data[2] - switch msg.Channel { - case "icinga:config:update": - wgUpdate.Add(1) - chUpdate <- []string{objectId} - log.WithFields(log.Fields{ - "type": objectInformation.ObjectType, - "action": "runtime insert/update", - }).Infof("Inserting 1 %v on runtime update (%s)", objectInformation.ObjectType, objectId) - case "icinga:config:delete": - wgDelete.Add(1) - chDelete <- []string{objectId} - log.WithFields(log.Fields{ - "type": objectInformation.ObjectType, - "action": "runtime delete", - }).Infof("Deleting 1 %v on runtime update (%s)", objectInformation.ObjectType, objectId) + objectType := data[1] + if objectType == objectInformation.RedisKey { + objectId := data[2] + switch msg.Channel { + case "icinga:config:update": + updateMutex.Lock() + currentUpdatePackage = append(currentUpdatePackage, objectId) + if len(currentUpdatePackage) >= 1000 { + insertCurrentUpdatePackage() + } + updateMutex.Unlock() + case "icinga:config:delete": + deleteMutex.Lock() + currentDeletePackage = append(currentDeletePackage, objectId) + if len(currentDeletePackage) >= 1000 { + insertCurrentDeletePackage() + } + deleteMutex.Unlock() + } } + }(message) + case <-ticker1s.C: + updateMutex.Lock() + updateLen := len(currentUpdatePackage) + if updateLen > 0 { + insertCurrentUpdatePackage() } + updateMutex.Unlock() + + deleteMutex.Lock() + deleteLen := len(currentDeletePackage) + if deleteLen > 0 { + insertCurrentDeletePackage() + } + deleteMutex.Unlock() } } }