diff --git a/configobject/configobject.go b/configobject/configobject.go index a2c36d21..b38facb0 100644 --- a/configobject/configobject.go +++ b/configobject/configobject.go @@ -5,12 +5,13 @@ import ( ) type ObjectInformation struct { - ObjectType string - RedisKey string - DeltaMySqlField string - HasChecksum bool - Factory connection.RowFactory - BulkInsertStmt *connection.BulkInsertStmt - BulkDeleteStmt *connection.BulkDeleteStmt - BulkUpdateStmt *connection.BulkUpdateStmt + ObjectType string + RedisKey string + DeltaMySqlField string + HasChecksum bool + NotificationListenerType string + Factory connection.RowFactory + BulkInsertStmt *connection.BulkInsertStmt + BulkDeleteStmt *connection.BulkDeleteStmt + BulkUpdateStmt *connection.BulkUpdateStmt } \ No newline at end of file diff --git a/configobject/configsync/configsync.go b/configobject/configsync/configsync.go index ca26fd78..a8d0020e 100644 --- a/configobject/configsync/configsync.go +++ b/configobject/configsync/configsync.go @@ -54,22 +54,27 @@ func Operator(super *supervisor.Supervisor, chHA chan int, objectInformation *co wgDelete *sync.WaitGroup wgUpdate *sync.WaitGroup ) + log.Infof("%s: Ready", objectInformation.ObjectType) for msg := range chHA { switch msg { // Icinga 2 probably died, stop operations and tell all workers to shut down. case ha.Notify_StopSync: - log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType)) if done != nil { + log.Info(fmt.Sprintf("%s: Lost responsibility", objectInformation.ObjectType)) close(done) done = nil } // Starts up the whole sync process. case ha.Notify_StartSync: + if done != nil { + continue + } + log.Infof("%s: Got responsibility", objectInformation.ObjectType) //TODO: This should only be done, if HA was taken over from another instance insert, update, delete := GetDelta(super, objectInformation) - log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete)) + //log.Infof("%s - Delta: (Insert: %d, Maybe Update: %d, Delete: %d)", objectInformation.ObjectType, len(insert), len(update), len(delete)) // Clean up all channels and wait groups for a fresh config dump done = make(chan struct{}) diff --git a/ha/ha.go b/ha/ha.go index 68aa1a56..f55890c4 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "git.icinga.com/icingadb/icingadb-main/supervisor" + "github.com/go-redis/redis" "github.com/google/uuid" log "github.com/sirupsen/logrus" "sync" @@ -18,19 +19,22 @@ const ( ) type HA struct { - isActive bool - icinga2MTime int64 - uid uuid.UUID - super *supervisor.Supervisor - notificationListeners []chan int - notificationListenersMutex sync.Mutex + isActive bool + icinga2MTime int64 + uid uuid.UUID + super *supervisor.Supervisor + notificationListeners map[string][]chan int + notificationListenersMutex sync.Mutex + lastEventId string } func NewHA(super *supervisor.Supervisor) (*HA, error) { var err error ho := HA{ super: super, + notificationListeners: make(map[string][]chan int), notificationListenersMutex: sync.Mutex{}, + lastEventId: "0-0", } if ho.uid, err = uuid.NewRandom(); err != nil { @@ -130,7 +134,6 @@ func (h *HA) Run(chEnv chan *Environment) { } h.isActive = true - h.notifyNotificationListener(Notify_StartSync) } else { haLogger.Info("Other instance is active.") h.isActive = false @@ -163,7 +166,6 @@ func (h *HA) Run(chEnv chan *Environment) { if !h.isActive { haLogger.Info("Icinga 2 sent heartbeat after restart. Taking over.") h.isActive = true - h.notifyNotificationListener(Notify_StartSync) } if err := h.updateOwnInstance(); err != nil { @@ -178,7 +180,6 @@ func (h *HA) Run(chEnv chan *Environment) { h.super.ChErr <- errors.New("failed to update instance") } h.isActive = true - h.notifyNotificationListener(Notify_StartSync) } else { haLogger.Debug("Other instance is active.") } @@ -186,23 +187,67 @@ func (h *HA) Run(chEnv chan *Environment) { case <-timerHA.C: haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.") h.isActive = false - h.notifyNotificationListener(Notify_StopSync) + h.notifyNotificationListener("*", Notify_StopSync) } } } -func (h *HA) RegisterNotificationListener() chan int { +func (h *HA) StartEventListener() { + go func() { + every1s := time.NewTicker(time.Second) + + for { + <-every1s.C + h.runEventListener() + } + }() +} + +func (h *HA) runEventListener() { + if !h.isActive { + return + } + + result := h.super.Rdbw.XRead(&redis.XReadArgs{Streams: []string{"icinga:dump", h.lastEventId}}) + streams, err := result.Result() + if err != nil { + h.super.ChErr <- err + return + } + + events := streams[0].Messages + if len(events) == 0 { + return + } + + for _, event := range events { + h.lastEventId = event.ID + values := event.Values + + if values["state"] == "done" { + h.notifyNotificationListener(values["type"].(string), Notify_StartSync) + } else { + h.notifyNotificationListener(values["type"].(string), Notify_StopSync) + } + } +} + +func (h *HA) RegisterNotificationListener(listenerType string) chan int { ch := make(chan int) h.notificationListenersMutex.Lock() - h.notificationListeners = append(h.notificationListeners, ch) + h.notificationListeners[listenerType] = append(h.notificationListeners[listenerType], ch) h.notificationListenersMutex.Unlock() return ch } -func (h *HA) notifyNotificationListener(msg int) { - for _, c := range h.notificationListeners { - go func(ch chan int) { - ch <- msg - }(c) +func (h *HA) notifyNotificationListener(listenerType string, msg int) { + for t, chs := range h.notificationListeners { + if t == listenerType || listenerType == "*" { + for _, c := range chs { + go func(ch chan int) { + ch <- msg + }(c) + } + } } } diff --git a/main.go b/main.go index e1e53da7..7a4bf974 100644 --- a/main.go +++ b/main.go @@ -108,6 +108,8 @@ func main() { statesync.StartStateSync(&super) + haInstance.StartEventListener() + go prometheus.HandleHttp("0.0.0.0:8080", super.ChErr) for { @@ -187,7 +189,7 @@ func startConfigSyncOperators(super *supervisor.Supervisor, haInstance *ha.HA) { for _, objectInformation := range objectTypes { go func(information *configobject.ObjectInformation) { - super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(), information) + super.ChErr <- configsync.Operator(super, haInstance.RegisterNotificationListener(information.NotificationListenerType), information) }(objectInformation) } }