Improve event system

This commit is contained in:
Noah Hilverling 2019-09-16 10:28:00 +02:00
parent 1dbd299e12
commit 8bde33e09e
4 changed files with 81 additions and 28 deletions

View file

@ -5,12 +5,13 @@ import (
)
type ObjectInformation struct {
ObjectType string
RedisKey string
DeltaMySqlField string
HasChecksum bool
Factory connection.RowFactory
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
ObjectType string
RedisKey string
DeltaMySqlField string
HasChecksum bool
NotificationListenerType string
Factory connection.RowFactory
BulkInsertStmt *connection.BulkInsertStmt
BulkDeleteStmt *connection.BulkDeleteStmt
BulkUpdateStmt *connection.BulkUpdateStmt
}

View file

@ -54,22 +54,27 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co
wgDelete *sync.WaitGroup
wgUpdate *sync.WaitGroup
)
log.Infof("%s: Ready", objectInformation.ObjectType)
for msg := range chHA {
switch msg {
// Icinga 2 probably died, stop operations and tell all workers to shut down.
case ha.Notify_StopSync:
log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType))
if done != nil {
log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType))
close(done)
done = nil
}
// Starts up the whole sync process.
case ha.Notify_StartSync:
if done != nil {
continue
}
log.Infof("%s: Got responsibility", objectInformation.ObjectType)
//TODO: This should only be done, if HA was taken over from another instance
insert, update, delete := GetDelta(super, objectInformation)
log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete))
//log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete))
// Clean up all channels and wait groups for a fresh config dump
done = make(chan struct{})

View file

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"github.com/go-redis/redis"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"sync"
@ -18,19 +19,22 @@ const (
)
type HA struct {
isActive bool
icinga2MTime int64
uid uuid.UUID
super *supervisor.Supervisor
notificationListeners []chan int
notificationListenersMutex sync.Mutex
isActive bool
icinga2MTime int64
uid uuid.UUID
super *supervisor.Supervisor
notificationListeners map[string][]chan int
notificationListenersMutex sync.Mutex
lastEventId string
}
func NewHA(super *supervisor.Supervisor) (*HA, error) {
var err error
ho := HA{
super: super,
notificationListeners: make(map[string][]chan int),
notificationListenersMutex: sync.Mutex{},
lastEventId: "0-0",
}
if ho.uid, err = uuid.NewRandom(); err != nil {
@ -130,7 +134,6 @@ func (h *HA) Run(chEnv chan *Environment) {
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Info("Other instance is active.")
h.isActive = false
@ -163,7 +166,6 @@ func (h *HA) Run(chEnv chan *Environment) {
if !h.isActive {
haLogger.Info("Icinga 2 sent heartbeat after restart. Taking over.")
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
}
if err := h.updateOwnInstance(); err != nil {
@ -178,7 +180,6 @@ func (h *HA) Run(chEnv chan *Environment) {
h.super.ChErr <- errors.New("failed to update instance")
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Debug("Other instance is active.")
}
@ -186,23 +187,67 @@ func (h *HA) Run(chEnv chan *Environment) {
case <-timerHA.C:
haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.")
h.isActive = false
h.notifyNotificationListener(Notify_StopSync)
h.notifyNotificationListener("*", Notify_StopSync)
}
}
}
func (h *HA) RegisterNotificationListener() chan int {
func (h *HA) StartEventListener() {
go func() {
every1s := time.NewTicker(time.Second)
for {
<-every1s.C
h.runEventListener()
}
}()
}
func (h *HA) runEventListener() {
if !h.isActive {
return
}
result := h.super.Rdbw.XRead(&redis.XReadArgs{Streams: []string{"icinga:dump", h.lastEventId}})
streams, err := result.Result()
if err != nil {
h.super.ChErr <- err
return
}
events := streams[0].Messages
if len(events) == 0 {
return
}
for _, event := range events {
h.lastEventId = event.ID
values := event.Values
if values["state"] == "done" {
h.notifyNotificationListener(values["type"].(string), Notify_StartSync)
} else {
h.notifyNotificationListener(values["type"].(string), Notify_StopSync)
}
}
}
func (h *HA) RegisterNotificationListener(listenerType string) chan int {
ch := make(chan int)
h.notificationListenersMutex.Lock()
h.notificationListeners = append(h.notificationListeners, ch)
h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch)
h.notificationListenersMutex.Unlock()
return ch
}
func (h *HA) notifyNotificationListener(msg int) {
for _, c := range h.notificationListeners {
go func(ch chan int) {
ch <- msg
}(c)
func (h *HA) notifyNotificationListener(listenerType string, msg int) {
for t, chs := range h.notificationListeners {
if t == listenerType || listenerType == "*" {
for _, c := range chs {
go func(ch chan int) {
ch <- msg
}(c)
}
}
}
}

View file

@ -108,6 +108,8 @@ func main() {
statesync.StartStateSync(&super)
haInstance.StartEventListener()
go prometheus.HandleHttp("0.0.0.0:8080", super.ChErr)
for {
@ -187,7 +189,7 @@ func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) {
for _, objectInformation := range objectTypes {
go func(information *configobject.ObjectInformation) {
super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(), information)
super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(information.NotificationListenerType), information)
}(objectInformation)
}
}