ConfigSync: Bulk insert runtime updates

This commit is contained in:
Noah Hilverling 2019-10-29 10:47:38 +01:00
parent 8bd415a958
commit f99f85ddcf

View file

@ -9,10 +9,12 @@ import (
"git.icinga.com/icingadb/icingadb-main/jsondecoder"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"git.icinga.com/icingadb/icingadb-main/utils"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
"regexp"
"sync"
"sync/atomic"
"time"
)
type Checksums struct {
@ -433,6 +435,38 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config
super.ChErr <- err
}
currentUpdatePackage := []string{}
currentDeletePackage := []string{}
updateMutex := sync.Mutex{}
deleteMutex := sync.Mutex{}
insertCurrentUpdatePackage := func() {
updateLen := len(currentUpdatePackage)
chUpdate <- currentUpdatePackage
wgUpdate.Add(updateLen)
currentUpdatePackage = []string{}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime insert/update",
}).Infof("Inserting %v %ss on runtime update", updateLen, objectInformation.ObjectType)
}
insertCurrentDeletePackage := func() {
deleteLen := len(currentDeletePackage)
chDelete <- currentDeletePackage
wgDelete.Add(deleteLen)
currentDeletePackage = []string{}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime delete",
}).Infof("Deleting %v %ss on runtime update", deleteLen, objectInformation.ObjectType)
}
ticker1s := time.NewTicker(time.Second)
msgCh := subscription.Channel()
for {
@ -441,33 +475,49 @@ func RuntimeUpdateWorker(super *supervisor.Supervisor, objectInformation *config
if !ok {
return
}
case msg := <-msgCh:
// Split string on last ':'
// host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85
// => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85]
re := regexp.MustCompile(`\A(.*):(.*?)\z`)
data := re.FindStringSubmatch(msg.Payload)
case message := <-msgCh:
go func(msg *redis.Message) {
// Split string on last ':'
// host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85
// => [host:customvar:050ecceaf1ce87e7d503184135d99f47eda5ee85 host:customvar 050ecceaf1ce87e7d503184135d99f47eda5ee85]
re := regexp.MustCompile(`\A(.*):(.*?)\z`)
data := re.FindStringSubmatch(msg.Payload)
objectType := data[1]
if objectType == objectInformation.RedisKey {
objectId := data[2]
switch msg.Channel {
case "icinga:config:update":
wgUpdate.Add(1)
chUpdate <- []string{objectId}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime insert/update",
}).Infof("Inserting 1 %v on runtime update (%s)", objectInformation.ObjectType, objectId)
case "icinga:config:delete":
wgDelete.Add(1)
chDelete <- []string{objectId}
log.WithFields(log.Fields{
"type": objectInformation.ObjectType,
"action": "runtime delete",
}).Infof("Deleting 1 %v on runtime update (%s)", objectInformation.ObjectType, objectId)
objectType := data[1]
if objectType == objectInformation.RedisKey {
objectId := data[2]
switch msg.Channel {
case "icinga:config:update":
updateMutex.Lock()
currentUpdatePackage = append(currentUpdatePackage, objectId)
if len(currentUpdatePackage) >= 1000 {
insertCurrentUpdatePackage()
}
updateMutex.Unlock()
case "icinga:config:delete":
deleteMutex.Lock()
currentDeletePackage = append(currentDeletePackage, objectId)
if len(currentDeletePackage) >= 1000 {
insertCurrentDeletePackage()
}
deleteMutex.Unlock()
}
}
}(message)
case <-ticker1s.C:
updateMutex.Lock()
updateLen := len(currentUpdatePackage)
if updateLen > 0 {
insertCurrentUpdatePackage()
}
updateMutex.Unlock()
deleteMutex.Lock()
deleteLen := len(currentDeletePackage)
if deleteLen > 0 {
insertCurrentDeletePackage()
}
deleteMutex.Unlock()
}
}
}