Merge branch 'feature/new-ha' into 'master'

Feature/new ha

See merge request icingadb/icingadb-ha!1
This commit is contained in:
Noah Hilverling 2019-05-13 13:29:11 +02:00
commit 29dcd99098
3 changed files with 142 additions and 530 deletions

555
ha.go
View file

@ -1,460 +1,187 @@
package icingadb_ha
import (
"git.icinga.com/icingadb/icingadb-connection"
"bytes"
"encoding/hex"
"errors"
"fmt"
"git.icinga.com/icingadb/icingadb-main/supervisor"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"sync/atomic"
"time"
)
const (
// readyForTakeover says that we aren't responsible, but we could take over.
resp_ReadyForTakeover = iota
// TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet.
resp_TakeoverNoSync
// TakeoverSync says that we've taken over and are actually syncing config.
resp_TakeoverSync
// stop says that we've taken over and are actually syncing config, but we're going to stop it.
resp_Stop
// notReadyForTakeover says that we aren't responsible and can't take over.
resp_NotReadyForTakeover
)
const (
// noAction says that we won't do anything.
action_NoAction = iota
// tryTakeover says that we're going to try to take over.
action_TryTakeover
// doTakeover says that we're going to take over.
action_DoTakeover
// ceaseOperation says that we're going to release our responsibility.
action_CeaseOperation
)
const (
Notify_StartSync = iota
Notify_StopSync
)
type HA struct {
super *supervisor.Supervisor
ourUUID uuid.UUID
ourEnv *Environment
icinga2MTime int64
// responsibility tells whether we're responsible for our environment.
responsibility int32
// responsibleSince tells since when we're responsible for our environment.
responsibleSince time.Time
// runningCriticalOperations counts the currently running critical operations.
runningCriticalOperations uint64
// lastCriticalOperationEnd tells when the last critical operation finished.
lastCriticalOperationEnd int64
notificationListeners []chan int
isActive bool
icinga2MTime int64
uid uuid.UUID
super *supervisor.Supervisor
notificationListeners []chan int
}
func NewHA(super *supervisor.Supervisor) HA {
ha := HA{
func NewHA(super *supervisor.Supervisor) (*HA, error) {
var err error
ho := HA{
super: super,
}
return ha
}
// RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible.
func (h *HA) RunCriticalOperation(op func() error) error {
switch h.getResponsibility() {
case resp_TakeoverSync, resp_Stop:
atomic.AddUint64(&h.runningCriticalOperations, 1)
err := op()
atomic.StoreInt64(&h.lastCriticalOperationEnd, time.Now().Unix())
atomic.AddUint64(&h.runningCriticalOperations, ^uint64(0))
return err
if ho.uid, err = uuid.NewRandom(); err != nil {
return nil, err
}
return nil
return &ho, nil
}
func (h *HA) Icinga2HeartBeat() {
atomic.StoreInt64(&h.icinga2MTime, time.Now().Unix())
func (h *HA) icinga2HeartBeat() {
h.icinga2MTime = time.Now().Unix()
}
func (h *HA) IsResponsible() bool {
return h.getResponsibility() == resp_TakeoverSync
func (h *HA) AreWeActive() bool {
return h.isActive
}
func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment, chErr chan error) {
go cleanUpInstancesAsync(dbw, chErr)
func (h *HA) updateOwnInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by id",
fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d WHERE id = '%s'", h.icinga2MTime, h.uid[:]))
return err
}
if errRun := h.run(rdb, dbw, chEnv); errRun != nil {
chErr <- errRun
return
func (h *HA) takeOverInstance() error {
_, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id",
fmt.Sprintf("UPDATE icingadb_instance SET id = '%s', heartbeat = %d WHERE environment_id = '%s'",
h.uid[:], h.icinga2MTime, h.super.EnvId))
return err
}
func (h *HA) insertInstance() error {
_, err := h.super.Dbw.SqlExec("insert into icingadb_instance",
fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES ('%s', '%s', %d, 'y')",
h.uid[:], h.super.EnvId, h.icinga2MTime))
return err
}
func (h *HA) getInstance() (uuid.UUID, int64, error) {
rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID",
"SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1",
h.super.EnvId,
)
if err != nil {
return uuid.UUID{}, 0, err
}
}
// cleanUpInstancesAsync cleans up icingadb_instance periodically.
func cleanUpInstancesAsync(dbw *icingadb_connection.DBWrapper, chErr chan error) {
every5m := time.NewTicker(5 * time.Minute)
defer every5m.Stop()
for {
<-every5m.C
if errCI := cleanUpInstances(dbw); errCI != nil {
chErr <- errCI
}
if len(rows) == 0 {
return uuid.UUID{}, 0, nil
}
}
// cleanUpInstances cleans up icingadb_instance periodically.
func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error {
log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance")
errTx := dbw.SqlTransaction(true, true, false, func(tx icingadb_connection.DbTransaction) error {
_, errExec := dbw.SqlExecTx(
tx,
"delete from icingadb_instance by heartbeat",
`DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`,
time.Now().Unix(),
)
return errExec
})
return errTx
}
func (h *HA) handleResponsibility() (cont bool, nextAction int) {
switch h.getResponsibility() {
case resp_ReadyForTakeover:
if !h.icinga2IsAlive() {
log.WithFields(log.Fields{
"context": "HA",
"uuid": h.ourUUID.String(),
"env": h.ourEnv.Name,
}).Warn("Icinga 2 detected as not running, stopping.")
h.setResponsibility(resp_NotReadyForTakeover)
cont = true
}
nextAction = action_TryTakeover
case resp_TakeoverNoSync:
if !h.icinga2IsAlive() {
log.WithFields(log.Fields{
"context": "HA",
"uuid": h.ourUUID.String(),
"env": h.ourEnv.Name,
}).Warn("Icinga 2 detected as not running, stopping.")
h.setResponsibility(resp_Stop)
cont = true
}
nextAction = action_TryTakeover
case resp_TakeoverSync:
if !h.icinga2IsAlive() {
log.WithFields(log.Fields{
"context": "HA",
"uuid": h.ourUUID.String(),
"env": h.ourEnv.Name,
}).Warn("Icinga 2 detected as not running, stopping.")
h.setResponsibility(resp_Stop)
h.notifyNotificationListener(Notify_StopSync)
cont = true
}
nextAction = action_DoTakeover
case resp_Stop:
if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 {
nextAction = action_CeaseOperation
} else {
nextAction = action_DoTakeover
}
cont = false
case resp_NotReadyForTakeover:
if h.icinga2IsAlive() {
log.WithFields(log.Fields{
"context": "HA",
"uuid": h.ourUUID.String(),
"env": h.ourEnv.Name,
}).Info("Icinga 2 detected as running again.")
h.setResponsibility(resp_ReadyForTakeover)
cont = true
}
nextAction = action_NoAction
}
return
}
func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment) error {
log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment")
var hasEnv bool
h.ourEnv, hasEnv = <-chEnv
if !hasEnv {
return nil
}
var err error
if h.ourUUID, err = uuid.NewRandom(); err != nil {
return err
}
log.WithFields(log.Fields{
"context": "HA",
"uuid": h.ourUUID.String(),
"env": h.ourEnv.ID,
}).Info("Received environment from Icinga 2")
h.super.EnvLock.Lock()
h.super.EnvId = h.ourEnv.ID
h.super.EnvLock.Unlock()
everySecond := time.NewTicker(time.Second)
defer everySecond.Stop()
var theirUUID uuid.UUID
copy(theirUUID[:], rows[0][0].([]byte))
// Even if Icinga 2 is offline now, Redis may be filled
h.Icinga2HeartBeat()
return theirUUID, rows[0][1].(int64), nil
}
func (h *HA) Run(chEnv chan *Environment) {
// Wait for first heartbeat
env := <-chEnv
if env == nil {
log.WithFields(log.Fields{
"context": "HA",
}).Error("Received empty environment.")
h.super.ChErr <- errors.New("received empty environment")
return
}
h.super.EnvId = env.ID
haLogger := log.WithFields(log.Fields{
"context": "HA",
"environment": hex.EncodeToString(h.super.EnvId),
"UUID": h.uid,
})
haLogger.Info("Got initial environment.")
// We have a new UUID with every restart, no use comparing them.
_, beat, err := h.getInstance()
if err != nil {
haLogger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if time.Now().Unix()-beat > 15 {
haLogger.Info("Taking over.")
// This means there was no instance row match, insert
if beat == 0 {
err = h.insertInstance()
} else {
err = h.takeOverInstance()
}
if err != nil {
haLogger.Errorf("Failed to insert/update instance: %v", err)
h.super.ChErr <- errors.New("failed to insert/update instance")
return
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Info("Other instance is active.")
h.isActive = false
}
timerHA := time.NewTimer(time.Second * 15)
for {
cont, nextAction := h.handleResponsibility()
if cont {
continue
}
switch nextAction {
case action_NoAction:
break
case action_TryTakeover, action_DoTakeover:
var justTakenOver bool
errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error {
{
rows, errFA := dbw.SqlFetchAllTxQuiet(
tx,
"select from icingadb_instance by id",
`SELECT 1 FROM icingadb_instance WHERE id = ?`,
h.ourUUID[:],
)
if errFA != nil {
return errFA
}
if len(rows) > 0 {
_, errExec := dbw.SqlExecTxQuiet(
tx,
"update icingadb_instance by id",
`UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`,
h.ourEnv.ID,
time.Now().Unix(),
h.ourUUID[:],
)
if errExec != nil {
return errExec
}
} else {
_, errExec := dbw.SqlExecTxQuiet(
tx,
"insert into icingadb_instance",
`INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`,
h.ourUUID[:],
h.ourEnv.ID,
time.Now().Unix(),
"n",
)
if errExec != nil {
return errExec
}
}
}
justTakenOver = false
rows, errFA := dbw.SqlFetchAllTxQuiet(
tx,
"select from icingadb_instance by environment_id, responsible",
`SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`,
h.ourEnv.ID,
"y",
)
if errFA != nil {
return errFA
}
if len(rows) > 0 {
copy(theirUUID[:], rows[0][0].([]byte))
if theirUUID == h.ourUUID {
justTakenOver = true
} else if time.Now().Unix()-rows[0][1].(int64) >= 10 {
{
_, errExec := dbw.SqlExecTxQuiet(
tx,
"update icingadb_instance by environment_id",
`UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`,
"n",
h.ourEnv.ID,
)
if errExec != nil {
return errExec
}
}
_, errExec := dbw.SqlExecTxQuiet(
tx,
"update icingadb_instance by id",
`UPDATE icingadb_instance SET responsible=? WHERE id = ?`,
"y",
h.ourUUID[:],
)
if errExec != nil {
return errExec
}
justTakenOver = true
}
} else {
_, errExec := dbw.SqlExecTxQuiet(
tx,
"update icingadb_instance by id",
`UPDATE icingadb_instance SET responsible=? WHERE id = ?`,
"y",
h.ourUUID[:],
)
if errExec != nil {
return errExec
}
justTakenOver = true
}
return nil
})
if errTx != nil {
return errTx
}
if justTakenOver && h.getResponsibility() != resp_Stop {
if h.responsibleSince == (time.Time{}) {
h.responsibleSince = time.Now()
h.setResponsibility(resp_TakeoverNoSync)
} else {
responsibleFor := time.Now().Sub(h.responsibleSince).Seconds()
if responsibleFor >= 5.0 {
if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync {
log.WithFields(log.Fields{
"context": "HA",
"env": h.ourEnv.ID,
"their_uuid": theirUUID.String(),
}).Info("Taking over")
// TODO: This should not be done here, but on configDumpInProgress changes.
// It's only possible to do it here, because we always lose responsibility during config dump once
if !h.ourEnv.configDumpInProgress {
h.notifyNotificationListener(Notify_StartSync)
}
}
if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil {
return errRP
}
}
}
}
if !justTakenOver {
log.WithFields(log.Fields{
"context": "HA",
"env": h.ourEnv.ID,
"their_uuid": theirUUID.String(),
}).Info("Other instance is responsible")
}
case action_CeaseOperation:
errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error {
rows, errFA := dbw.SqlFetchAllTxQuiet(
tx,
"select from icingadb_instance by environment_id, responsible, heartbeat",
`SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`,
h.ourEnv.ID,
"n",
time.Now().Unix(),
)
if errFA != nil {
return errFA
}
if len(rows) > 0 {
_, errExec := dbw.SqlExecTxQuiet(
tx,
"update icingadb_instance",
`UPDATE icingadb_instance SET responsible=? WHERE id = ?`,
"n",
h.ourUUID[:],
)
return errExec
}
return nil
})
if errTx != nil {
return errTx
}
log.WithFields(log.Fields{
"context": "HA",
"env": h.ourEnv.ID,
}).Info("Other instance is responsible. Ceasing operations.")
h.responsibleSince = time.Time{}
h.setResponsibility(resp_NotReadyForTakeover)
}
select {
case h.ourEnv, hasEnv = <-chEnv:
if !hasEnv {
return nil
case env := <-chEnv:
if bytes.Compare(env.ID, h.super.EnvId) != 0 {
haLogger.Error("Received environment is not the one we expected. Panic.")
h.super.ChErr <- errors.New("received unexpected environment")
}
h.super.EnvLock.Lock()
h.super.EnvId = h.ourEnv.ID
h.super.EnvLock.Unlock()
timerHA.Reset(time.Second * 15)
previous := h.icinga2MTime
h.icinga2HeartBeat()
h.Icinga2HeartBeat()
<-everySecond.C
case <-everySecond.C:
break
if h.icinga2MTime-previous < 10 && h.isActive {
err = h.updateOwnInstance()
} else {
they, beat, err := h.getInstance()
if err != nil {
haLogger.Errorf("Failed to fetch instance: %v", err)
h.super.ChErr <- errors.New("failed to fetch instance")
return
}
if they == h.uid {
haLogger.Debug("We are active.")
if err := h.updateOwnInstance(); err != nil {
haLogger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
return
}
} else if h.icinga2MTime-beat > 15 {
haLogger.Info("Taking over.")
if err := h.takeOverInstance(); err != nil {
haLogger.Errorf("Failed to update instance: %v", err)
h.super.ChErr <- errors.New("failed to update instance")
}
h.isActive = true
h.notifyNotificationListener(Notify_StartSync)
} else {
haLogger.Debug("Other instance is active.")
}
}
case <-timerHA.C:
haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.")
h.isActive = false
h.notifyNotificationListener(Notify_StopSync)
}
}
}
// icinga2IsAlive returns whether Icinga 2 seems to be running.
func (h *HA) icinga2IsAlive() bool {
return time.Now().Unix()-atomic.LoadInt64(&h.icinga2MTime) < 15
}
// getResponsibility gets the responsibility.
func (h *HA) getResponsibility() int {
return int(atomic.LoadInt32(&h.responsibility))
}
// setResponsibility sets the responsibility and returns the previous one.
func (h *HA) setResponsibility(r int32) int32 {
return atomic.SwapInt32(&h.responsibility, r)
}
func (h *HA) RegisterNotificationListener() chan int {
ch := make(chan int)
h.notificationListeners = append(h.notificationListeners, ch)
@ -467,4 +194,4 @@ func (h *HA) notifyNotificationListener(msg int) {
ch <- msg
}(c)
}
}
}

View file

@ -1,115 +0,0 @@
package icingadb_ha
import (
"git.icinga.com/icingadb/icingadb-connection"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
var testID, _ = uuid.FromBytes(make([]byte, 16))
var testEnv = make([]byte, 20)
func TestHA_setResponsibility(t *testing.T) {
responsibilities := [6]int32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover }
h := new(HA)
previous := int32(0)
for _,r := range responsibilities {
assert.Equal(t, previous, h.setResponsibility(r), "Should be equal")
previous = r
}
}
func TestHA_IsResponsible(t *testing.T) {
h := new(HA)
h.setResponsibility(resp_TakeoverSync)
assert.True(t, h.IsResponsible(), "Should be responsible")
h.setResponsibility(resp_TakeoverNoSync)
assert.False(t, h.IsResponsible(), "Should not be responsible")
}
func TestHA_icinga2IsAlive(t *testing.T) {
h := new(HA)
h.icinga2MTime = time.Now().Unix() - 5
assert.True(t, h.icinga2IsAlive(), "Should be alive")
h.icinga2MTime = h.icinga2MTime - 15
assert.False(t, h.icinga2IsAlive(), "Should be dead")
}
func TestHA_handleResponsibility(t *testing.T) {
h := new(HA)
h.ourEnv = &icingadb_connection.Environment{make([]byte, 20), "test"}
h.setResponsibility(resp_ReadyForTakeover)
var cont bool
var na int
print(h.icinga2MTime)
cont, na = h.handleResponsibility()
assert.True(t, cont)
assert.Equal(t, action_TryTakeover, na)
assert.Equal(t, resp_NotReadyForTakeover, h.getResponsibility())
//AWAKEN
h.icinga2MTime = time.Now().Unix()
cont, na = h.handleResponsibility()
assert.True(t, cont)
assert.Equal(t, resp_ReadyForTakeover, h.getResponsibility())
h.setResponsibility(resp_TakeoverSync)
cont, na = h.handleResponsibility()
assert.False(t, cont)
assert.Equal(t, action_DoTakeover, na)
assert.Equal(t, resp_TakeoverSync, h.getResponsibility())
//SLEEP
h.icinga2MTime = 0
cont, na = h.handleResponsibility()
assert.True(t, cont)
assert.Equal(t, na, action_DoTakeover)
}
func Test_cleanUpInstances(t *testing.T) {
var dbw, err = icingadb_connection.NewDBWrapper(
"module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?")
assert.NoError(t, err, "SQL error")
_, err = dbw.SqlExec(
"insert into icingadb_instance",
`INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`,
testID[:],
testEnv,
time.Now().Unix() - 25,
"n",
)
assert.NoError(t, err, "SQL error")
rows, err := dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:])
assert.Equal(t, 1, len(rows))
err = cleanUpInstances(dbw)
assert.NoError(t, err, "Clean up failed")
rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:])
assert.NoError(t, err, "SQL error")
assert.Equal(t, 1, len(rows))
time.Sleep(time.Second * 5)
err = cleanUpInstances(dbw)
assert.NoError(t, err, "Clean up failed")
rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:])
assert.NoError(t, err, "SQL error")
assert.Equal(t, 0, len(rows))
}

View file

@ -21,7 +21,7 @@ func TestIcingaEventsBroker(t *testing.T) {
t.Fatal("This test needs a working Redis connection")
}
chEnv := make(chan *icingadb_connection.Environment)
chEnv := make(chan *Environment)
go func() {
err := IcingaEventsBroker(rdb, chEnv)