Merge remote-tracking branch 'ha/master'

This commit is contained in:
Noah Hilverling 2019-05-13 14:59:47 +02:00
commit ad71b52cea
3 changed files with 293 additions and 0 deletions

197
ha/ha.go Normal file
View file

@ -0,0 +1,197 @@
package icingadb_ha
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"time"
)
const (
Notify_StartSync = iota
Notify_StopSync
)
type HA struct {
isActive bool
icinga2MTime int64
uid uuid.UUID
super *supervisor.Supervisor
notificationListeners []chan int
}
func NewHA(super *supervisor.Supervisor) (*HA, error) {
var err error
ho := HA{
super: super,
}
if ho.uid, err = uuid.NewRandom(); err != nil {
return nil, err
}
return &ho, nil
}
func (h *HA) icinga2HeartBeat() {
h.icinga2MTime = time.Now().Unix()
}
func (h *HA) AreWeActive() bool {
return h.isActive
}
func (h *HA) updateOwnInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by id",
fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d WHERE id = '%s'", h.icinga2MTime, h.uid[:]))
return err
}
func (h *HA) takeOverInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id",
fmt.Sprintf("UPDATE icingadb_instance SET id = '%s', heartbeat = %d WHERE environment_id = '%s'",
h.uid[:], h.icinga2MTime, h.super.EnvId))
return err
}
func (h *HA) insertInstance() error {
_, err := h.super.Dbw.SqlExec("insert into icingadb_instance",
fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES ('%s', '%s', %d, 'y')",
h.uid[:], h.super.EnvId, h.icinga2MTime))
return err
}
func (h *HA) getInstance() (uuid.UUID, int64, error) {
rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID",
"SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1",
h.super.EnvId,
)
if err != nil {
return uuid.UUID{}, 0, err
}
if len(rows) == 0 {
return uuid.UUID{}, 0, nil
}
var theirUUID uuid.UUID
copy(theirUUID[:], rows[0][0].([]byte))
return theirUUID, rows[0][1].(int64), nil
}
func (h *HA) Run(chEnv chan *Environment) {
// Wait for first heartbeat
env := <-chEnv
if env == nil {
log.WithFields(log.Fields{
"context": "HA",
}).Error("Received empty environment.")
h.super.ChErr <- errors.New("received empty environment")
return
}
h.super.EnvId = env.ID
haLogger := log.WithFields(log.Fields{
"context": "HA",
"environment": hex.EncodeToString(h.super.EnvId),
"UUID": h.uid,
})
haLogger.Info("Got initial environment.")
// We have a new UUID with every restart, no use comparing them.
_, beat, err := h.getInstance()
if err != nil {
haLogger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if time.Now().Unix()-beat > 15 {
haLogger.Info("Taking over.")
// This means there was no instance row match, insert
if beat == 0 {
err = h.insertInstance()
} else {
err = h.takeOverInstance()
}
if err != nil {
haLogger.Errorf("Failed to insert/update instance: %v", err)
h.super.ChErr <- errors.New("failed to insert/update instance")
return
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Info("Other instance is active.")
h.isActive = false
}
timerHA := time.NewTimer(time.Second * 15)
for {
select {
case env := <-chEnv:
if bytes.Compare(env.ID, h.super.EnvId) != 0 {
haLogger.Error("Received environment is not the one we expected. Panic.")
h.super.ChErr <- errors.New("received unexpected environment")
}
timerHA.Reset(time.Second * 15)
previous := h.icinga2MTime
h.icinga2HeartBeat()
if h.icinga2MTime-previous < 10 && h.isActive {
err = h.updateOwnInstance()
} else {
they, beat, err := h.getInstance()
if err != nil {
haLogger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if they == h.uid {
haLogger.Debug("We are active.")
if err := h.updateOwnInstance(); err != nil {
haLogger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
return
}
} else if h.icinga2MTime-beat > 15 {
haLogger.Info("Taking over.")
if err := h.takeOverInstance(); err != nil {
haLogger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Debug("Other instance is active.")
}
}
case <-timerHA.C:
haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.")
h.isActive = false
h.notifyNotificationListener(Notify_StopSync)
}
}
}
func (h *HA) RegisterNotificationListener() chan int {
ch := make(chan int)
h.notificationListeners = append(h.notificationListeners, ch)
return ch
}
func (h *HA) notifyNotificationListener(msg int) {
for _, c := range h.notificationListeners {
go func(ch chan int) {
ch <- msg
}(c)
}
}

53
ha/heartbeat.go Normal file
View file

@ -0,0 +1,53 @@
package icingadb_ha
import (
"crypto/sha1"
"encoding/json"
"git.icinga.com/icingadb/icingadb-connection"
log "github.com/sirupsen/logrus"
)
type Environment struct {
ID []byte
Name string
configDumpInProgress bool
}
// Compute SHA1
func Sha1bytes(bytes []byte) []byte {
hash := sha1.New()
hash.Write(bytes)
return hash.Sum(nil)
}
func IcingaEventsBroker(rdb *icingadb_connection.RDBWrapper, chEnv chan *Environment) error {
log.Info("Starting Events broker")
subscription := rdb.Subscribe()
defer subscription.Close()
if err := subscription.Subscribe(
"icinga:config:dump", "icinga:config:delete", "icinga:config:update", "icinga:stats"); err != nil {
return err
}
for {
msg, err := subscription.ReceiveMessage()
if err != nil {
return err
}
switch msg.Channel {
case "icinga:stats":
var unJson interface{} = nil
if err = json.Unmarshal([]byte(msg.Payload), &unJson); err != nil {
return err
}
environment := unJson.(map[string]interface{})["IcingaApplication"].(map[string]interface{})["status"].(map[string]interface{})["icingaapplication"].(map[string]interface{})["app"].(map[string]interface{})["environment"].(string)
configDumpInProgress := unJson.(map[string]interface{})["config_dump_in_progress"].(bool)
env := &Environment{Name: environment, ID: Sha1bytes([]byte(environment)), configDumpInProgress: configDumpInProgress}
chEnv <- env
}
}
}

43
ha/heartbeat_test.go Normal file
View file

@ -0,0 +1,43 @@
package icingadb_ha
import (
"encoding/json"
"git.icinga.com/icingadb/icingadb-connection"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
var icingastate = "{\"IcingaApplication\":" +
"{\"status\": " +
"{\"icingaapplication\":" +
"{\"app\":{" +
"\"environment\": \"\"" +
"}}}}}"
func TestIcingaEventsBroker(t *testing.T) {
rdb, err := icingadb_connection.NewRDBWrapper("127.0.0.1:6379")
if err != nil {
t.Fatal("This test needs a working Redis connection")
}
chEnv := make(chan *Environment)
go func() {
err := IcingaEventsBroker(rdb, chEnv)
assert.NoError(t, err, "redis connection error")
}()
time.Sleep(time.Second * 2)
var uj interface{} = nil
if err := json.Unmarshal([]byte(icingastate), &uj); err != nil {
assert.Nil(t, err)
}
rdb.Rdb.Publish("icinga:stats", icingastate)
env := <-chEnv
assert.NotNil(t, env.ID, "no valid env received")
}