diff --git a/ha.go b/ha.go index 1d34f23a..1fa3564d 100644 --- a/ha.go +++ b/ha.go @@ -1,460 +1,187 @@ package icingadb_ha import ( - "git.icinga.com/icingadb/icingadb-connection" + "bytes" + "encoding/hex" + "errors" + "fmt" "git.icinga.com/icingadb/icingadb-main/supervisor" "github.com/google/uuid" log "github.com/sirupsen/logrus" - "sync/atomic" "time" ) -const ( - // readyForTakeover says that we aren't responsible, but we could take over. - resp_ReadyForTakeover = iota - // TakeoverNoSync says that we've taken over, but we aren't actually syncing config, yet. - resp_TakeoverNoSync - // TakeoverSync says that we've taken over and are actually syncing config. - resp_TakeoverSync - // stop says that we've taken over and are actually syncing config, but we're going to stop it. - resp_Stop - // notReadyForTakeover says that we aren't responsible and can't take over. - resp_NotReadyForTakeover -) - -const ( - // noAction says that we won't do anything. - action_NoAction = iota - // tryTakeover says that we're going to try to take over. - action_TryTakeover - // doTakeover says that we're going to take over. - action_DoTakeover - // ceaseOperation says that we're going to release our responsibility. - action_CeaseOperation -) - const ( Notify_StartSync = iota Notify_StopSync ) type HA struct { - super *supervisor.Supervisor - ourUUID uuid.UUID - ourEnv *Environment - icinga2MTime int64 - // responsibility tells whether we're responsible for our environment. - responsibility int32 - // responsibleSince tells since when we're responsible for our environment. - responsibleSince time.Time - // runningCriticalOperations counts the currently running critical operations. - runningCriticalOperations uint64 - // lastCriticalOperationEnd tells when the last critical operation finished. - lastCriticalOperationEnd int64 - notificationListeners []chan int + isActive bool + icinga2MTime int64 + uid uuid.UUID + super *supervisor.Supervisor + notificationListeners []chan int } -func NewHA(super *supervisor.Supervisor) HA { - ha := HA{ +func NewHA(super *supervisor.Supervisor) (*HA, error) { + var err error + ho := HA{ super: super, } - return ha -} - -// RunCriticalOperation runs op and manages HA#runningCriticalOperations if we're responsible. -func (h *HA) RunCriticalOperation(op func() error) error { - switch h.getResponsibility() { - case resp_TakeoverSync, resp_Stop: - atomic.AddUint64(&h.runningCriticalOperations, 1) - - err := op() - - atomic.StoreInt64(&h.lastCriticalOperationEnd, time.Now().Unix()) - atomic.AddUint64(&h.runningCriticalOperations, ^uint64(0)) - - return err + if ho.uid, err = uuid.NewRandom(); err != nil { + return nil, err } - return nil + return &ho, nil } -func (h *HA) Icinga2HeartBeat() { - atomic.StoreInt64(&h.icinga2MTime, time.Now().Unix()) +func (h *HA) icinga2HeartBeat() { + h.icinga2MTime = time.Now().Unix() } -func (h *HA) IsResponsible() bool { - return h.getResponsibility() == resp_TakeoverSync +func (h *HA) AreWeActive() bool { + return h.isActive } -func (h *HA) Run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment, chErr chan error) { - go cleanUpInstancesAsync(dbw, chErr) +func (h *HA) updateOwnInstance() error { + _, err := h.super.Dbw.SqlExec("update icingadb_instance by id", + fmt.Sprintf("UPDATE icingadb_instance SET heartbeat = %d WHERE id = '%s'", h.icinga2MTime, h.uid[:])) + return err +} - if errRun := h.run(rdb, dbw, chEnv); errRun != nil { - chErr <- errRun - return +func (h *HA) takeOverInstance() error { + _, err := h.super.Dbw.SqlExec("update icingadb_instance by environment_id", + fmt.Sprintf("UPDATE icingadb_instance SET id = '%s', heartbeat = %d WHERE environment_id = '%s'", + h.uid[:], h.icinga2MTime, h.super.EnvId)) + return err +} + +func (h *HA) insertInstance() error { + _, err := h.super.Dbw.SqlExec("insert into icingadb_instance", + fmt.Sprintf("INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES ('%s', '%s', %d, 'y')", + h.uid[:], h.super.EnvId, h.icinga2MTime)) + return err +} + +func (h *HA) getInstance() (uuid.UUID, int64, error) { + rows, err := h.super.Dbw.SqlFetchAll("select id, heartbeat from icingadb_instance where environment_id = ourEnvID", + "SELECT id, heartbeat from icingadb_instance where environment_id = ? LIMIT 1", + h.super.EnvId, + ) + if err != nil { + return uuid.UUID{}, 0, err } -} - -// cleanUpInstancesAsync cleans up icingadb_instance periodically. -func cleanUpInstancesAsync(dbw *icingadb_connection.DBWrapper, chErr chan error) { - every5m := time.NewTicker(5 * time.Minute) - defer every5m.Stop() - - for { - <-every5m.C - - if errCI := cleanUpInstances(dbw); errCI != nil { - chErr <- errCI - } + if len(rows) == 0 { + return uuid.UUID{}, 0, nil } -} - -// cleanUpInstances cleans up icingadb_instance periodically. -func cleanUpInstances(dbw *icingadb_connection.DBWrapper) error { - - log.WithFields(log.Fields{"context": "HA"}).Info("Cleaning up icingadb_instance") - - errTx := dbw.SqlTransaction(true, true, false, func(tx icingadb_connection.DbTransaction) error { - _, errExec := dbw.SqlExecTx( - tx, - "delete from icingadb_instance by heartbeat", - `DELETE FROM icingadb_instance WHERE ? - heartbeat >= 30`, - time.Now().Unix(), - ) - - return errExec - }) - return errTx -} - -func (h *HA) handleResponsibility() (cont bool, nextAction int) { - switch h.getResponsibility() { - case resp_ReadyForTakeover: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_NotReadyForTakeover) - cont = true - } - - nextAction = action_TryTakeover - case resp_TakeoverNoSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - cont = true - } - - nextAction = action_TryTakeover - case resp_TakeoverSync: - if !h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Warn("Icinga 2 detected as not running, stopping.") - - h.setResponsibility(resp_Stop) - h.notifyNotificationListener(Notify_StopSync) - cont = true - } - - nextAction = action_DoTakeover - case resp_Stop: - if atomic.LoadUint64(&h.runningCriticalOperations) == 0 && time.Now().Unix()-atomic.LoadInt64(&h.lastCriticalOperationEnd) >= 5 { - nextAction = action_CeaseOperation - } else { - nextAction = action_DoTakeover - } - cont = false - case resp_NotReadyForTakeover: - if h.icinga2IsAlive() { - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.Name, - }).Info("Icinga 2 detected as running again.") - - h.setResponsibility(resp_ReadyForTakeover) - cont = true - } - - nextAction = action_NoAction - } - - return -} - -func (h *HA) run(rdb *icingadb_connection.RDBWrapper, dbw *icingadb_connection.DBWrapper, chEnv chan *Environment) error { - log.WithFields(log.Fields{"context": "HA"}).Info("Waiting for Icinga 2 to tell us its environment") - - var hasEnv bool - h.ourEnv, hasEnv = <-chEnv - if !hasEnv { - return nil - } - - var err error - if h.ourUUID, err = uuid.NewRandom(); err != nil { - return err - } - - log.WithFields(log.Fields{ - "context": "HA", - "uuid": h.ourUUID.String(), - "env": h.ourEnv.ID, - }).Info("Received environment from Icinga 2") - - h.super.EnvLock.Lock() - h.super.EnvId = h.ourEnv.ID - h.super.EnvLock.Unlock() - - everySecond := time.NewTicker(time.Second) - defer everySecond.Stop() var theirUUID uuid.UUID + copy(theirUUID[:], rows[0][0].([]byte)) - // Even if Icinga 2 is offline now, Redis may be filled - h.Icinga2HeartBeat() + return theirUUID, rows[0][1].(int64), nil +} +func (h *HA) Run(chEnv chan *Environment) { + // Wait for first heartbeat + env := <-chEnv + if env == nil { + log.WithFields(log.Fields{ + "context": "HA", + }).Error("Received empty environment.") + h.super.ChErr <- errors.New("received empty environment") + return + } + h.super.EnvId = env.ID + + haLogger := log.WithFields(log.Fields{ + "context": "HA", + "environment": hex.EncodeToString(h.super.EnvId), + "UUID": h.uid, + }) + haLogger.Info("Got initial environment.") + + // We have a new UUID with every restart, no use comparing them. + _, beat, err := h.getInstance() + if err != nil { + haLogger.Errorf("Failed to fetch instance: %v", err) + h.super.ChErr <- errors.New("failed to fetch instance") + return + } + + if time.Now().Unix()-beat > 15 { + haLogger.Info("Taking over.") + + // This means there was no instance row match, insert + if beat == 0 { + err = h.insertInstance() + } else { + err = h.takeOverInstance() + } + + if err != nil { + haLogger.Errorf("Failed to insert/update instance: %v", err) + h.super.ChErr <- errors.New("failed to insert/update instance") + return + } + + h.isActive = true + h.notifyNotificationListener(Notify_StartSync) + } else { + haLogger.Info("Other instance is active.") + h.isActive = false + } + + timerHA := time.NewTimer(time.Second * 15) for { - cont, nextAction := h.handleResponsibility() - if cont { - continue - } - - switch nextAction { - case action_NoAction: - break - case action_TryTakeover, action_DoTakeover: - var justTakenOver bool - - errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { - { - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by id", - `SELECT 1 FROM icingadb_instance WHERE id = ?`, - h.ourUUID[:], - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET environment_id=?, heartbeat=? WHERE id = ?`, - h.ourEnv.ID, - time.Now().Unix(), - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - } else { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "insert into icingadb_instance", - `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, - h.ourUUID[:], - h.ourEnv.ID, - time.Now().Unix(), - "n", - ) - if errExec != nil { - return errExec - } - } - } - - justTakenOver = false - - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by environment_id, responsible", - `SELECT id, heartbeat FROM icingadb_instance WHERE environment_id = ? AND responsible = ?`, - h.ourEnv.ID, - "y", - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - copy(theirUUID[:], rows[0][0].([]byte)) - - if theirUUID == h.ourUUID { - justTakenOver = true - } else if time.Now().Unix()-rows[0][1].(int64) >= 10 { - { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by environment_id", - `UPDATE icingadb_instance SET responsible=? WHERE environment_id = ?`, - "n", - h.ourEnv.ID, - ) - if errExec != nil { - return errExec - } - } - - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "y", - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - - justTakenOver = true - } - } else { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance by id", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "y", - h.ourUUID[:], - ) - if errExec != nil { - return errExec - } - - justTakenOver = true - } - - return nil - }) - if errTx != nil { - return errTx - } - - if justTakenOver && h.getResponsibility() != resp_Stop { - if h.responsibleSince == (time.Time{}) { - h.responsibleSince = time.Now() - h.setResponsibility(resp_TakeoverNoSync) - } else { - responsibleFor := time.Now().Sub(h.responsibleSince).Seconds() - - if responsibleFor >= 5.0 { - if h.setResponsibility(resp_TakeoverSync) == resp_TakeoverNoSync { - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - "their_uuid": theirUUID.String(), - }).Info("Taking over") - - // TODO: This should not be done here, but on configDumpInProgress changes. - // It's only possible to do it here, because we always lose responsibility during config dump once - if !h.ourEnv.configDumpInProgress { - h.notifyNotificationListener(Notify_StartSync) - } - } - - if _, errRP := rdb.Publish("icingadb:wakeup", h.ourUUID.String()).Result(); errRP != nil { - return errRP - } - } - } - } - - if !justTakenOver { - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - "their_uuid": theirUUID.String(), - }).Info("Other instance is responsible") - } - case action_CeaseOperation: - errTx := dbw.SqlTransaction(true, true, true, func(tx icingadb_connection.DbTransaction) error { - rows, errFA := dbw.SqlFetchAllTxQuiet( - tx, - "select from icingadb_instance by environment_id, responsible, heartbeat", - `SELECT 1 FROM icingadb_instance WHERE environment_id = ? AND responsible = ? AND ? - heartbeat < 10`, - h.ourEnv.ID, - "n", - time.Now().Unix(), - ) - if errFA != nil { - return errFA - } - - if len(rows) > 0 { - _, errExec := dbw.SqlExecTxQuiet( - tx, - "update icingadb_instance", - `UPDATE icingadb_instance SET responsible=? WHERE id = ?`, - "n", - h.ourUUID[:], - ) - - return errExec - } - - return nil - }) - if errTx != nil { - return errTx - } - - log.WithFields(log.Fields{ - "context": "HA", - "env": h.ourEnv.ID, - }).Info("Other instance is responsible. Ceasing operations.") - - h.responsibleSince = time.Time{} - h.setResponsibility(resp_NotReadyForTakeover) - } - select { - case h.ourEnv, hasEnv = <-chEnv: - if !hasEnv { - return nil + case env := <-chEnv: + if bytes.Compare(env.ID, h.super.EnvId) != 0 { + haLogger.Error("Received environment is not the one we expected. Panic.") + h.super.ChErr <- errors.New("received unexpected environment") } - h.super.EnvLock.Lock() - h.super.EnvId = h.ourEnv.ID - h.super.EnvLock.Unlock() + timerHA.Reset(time.Second * 15) + previous := h.icinga2MTime + h.icinga2HeartBeat() - h.Icinga2HeartBeat() - <-everySecond.C - case <-everySecond.C: - break + if h.icinga2MTime-previous < 10 && h.isActive { + err = h.updateOwnInstance() + } else { + they, beat, err := h.getInstance() + if err != nil { + haLogger.Errorf("Failed to fetch instance: %v", err) + h.super.ChErr <- errors.New("failed to fetch instance") + return + } + if they == h.uid { + haLogger.Debug("We are active.") + if err := h.updateOwnInstance(); err != nil { + haLogger.Errorf("Failed to update instance: %v", err) + h.super.ChErr <- errors.New("failed to update instance") + return + } + } else if h.icinga2MTime-beat > 15 { + haLogger.Info("Taking over.") + if err := h.takeOverInstance(); err != nil { + haLogger.Errorf("Failed to update instance: %v", err) + h.super.ChErr <- errors.New("failed to update instance") + } + h.isActive = true + h.notifyNotificationListener(Notify_StartSync) + } else { + haLogger.Debug("Other instance is active.") + } + } + case <-timerHA.C: + haLogger.Info("Icinga 2 sent no heartbeat for 15 seconds, pronouncing dead.") + h.isActive = false + h.notifyNotificationListener(Notify_StopSync) } } } -// icinga2IsAlive returns whether Icinga 2 seems to be running. -func (h *HA) icinga2IsAlive() bool { - return time.Now().Unix()-atomic.LoadInt64(&h.icinga2MTime) < 15 -} - -// getResponsibility gets the responsibility. -func (h *HA) getResponsibility() int { - return int(atomic.LoadInt32(&h.responsibility)) -} - -// setResponsibility sets the responsibility and returns the previous one. -func (h *HA) setResponsibility(r int32) int32 { - return atomic.SwapInt32(&h.responsibility, r) -} - func (h *HA) RegisterNotificationListener() chan int { ch := make(chan int) h.notificationListeners = append(h.notificationListeners, ch) @@ -467,4 +194,4 @@ func (h *HA) notifyNotificationListener(msg int) { ch <- msg }(c) } -} \ No newline at end of file +} diff --git a/ha_test.go b/ha_test.go deleted file mode 100644 index a076b513..00000000 --- a/ha_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package icingadb_ha - -import ( - "git.icinga.com/icingadb/icingadb-connection" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var testID, _ = uuid.FromBytes(make([]byte, 16)) -var testEnv = make([]byte, 20) - -func TestHA_setResponsibility(t *testing.T) { - responsibilities := [6]int32{ resp_ReadyForTakeover, resp_TakeoverNoSync, resp_TakeoverSync, resp_Stop, resp_NotReadyForTakeover } - h := new(HA) - - previous := int32(0) - for _,r := range responsibilities { - assert.Equal(t, previous, h.setResponsibility(r), "Should be equal") - previous = r - } -} - -func TestHA_IsResponsible(t *testing.T) { - h := new(HA) - h.setResponsibility(resp_TakeoverSync) - assert.True(t, h.IsResponsible(), "Should be responsible") - h.setResponsibility(resp_TakeoverNoSync) - assert.False(t, h.IsResponsible(), "Should not be responsible") -} - -func TestHA_icinga2IsAlive(t *testing.T) { - h := new(HA) - h.icinga2MTime = time.Now().Unix() - 5 - assert.True(t, h.icinga2IsAlive(), "Should be alive") - h.icinga2MTime = h.icinga2MTime - 15 - assert.False(t, h.icinga2IsAlive(), "Should be dead") -} - -func TestHA_handleResponsibility(t *testing.T) { - h := new(HA) - h.ourEnv = &icingadb_connection.Environment{make([]byte, 20), "test"} - h.setResponsibility(resp_ReadyForTakeover) - var cont bool - var na int - - print(h.icinga2MTime) - - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, action_TryTakeover, na) - assert.Equal(t, resp_NotReadyForTakeover, h.getResponsibility()) - - //AWAKEN - h.icinga2MTime = time.Now().Unix() - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, resp_ReadyForTakeover, h.getResponsibility()) - - h.setResponsibility(resp_TakeoverSync) - cont, na = h.handleResponsibility() - assert.False(t, cont) - assert.Equal(t, action_DoTakeover, na) - assert.Equal(t, resp_TakeoverSync, h.getResponsibility()) - - //SLEEP - h.icinga2MTime = 0 - cont, na = h.handleResponsibility() - assert.True(t, cont) - assert.Equal(t, na, action_DoTakeover) -} - -func Test_cleanUpInstances(t *testing.T) { - var dbw, err = icingadb_connection.NewDBWrapper( - "module-dev:icinga0815!@tcp(127.0.0.1:3306)/icingadb?") - assert.NoError(t, err, "SQL error") - - _, err = dbw.SqlExec( - "insert into icingadb_instance", - `INSERT INTO icingadb_instance(id, environment_id, heartbeat, responsible) VALUES (?, ?, ?, ?)`, - testID[:], - testEnv, - time.Now().Unix() - 25, - "n", - ) - - assert.NoError(t, err, "SQL error") - - rows, err := dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.Equal(t, 1, len(rows)) - - err = cleanUpInstances(dbw) - - assert.NoError(t, err, "Clean up failed") - - rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.NoError(t, err, "SQL error") - - assert.Equal(t, 1, len(rows)) - - time.Sleep(time.Second * 5) - - err = cleanUpInstances(dbw) - - assert.NoError(t, err, "Clean up failed") - - rows, err = dbw.SqlFetchAll("", "SELECT 1 FROM icingadb_instance WHERE id = ?", testID[:]) - - assert.NoError(t, err, "SQL error") - - assert.Equal(t, 0, len(rows)) -} \ No newline at end of file diff --git a/heartbeat_test.go b/heartbeat_test.go index 616d24e8..402c0a9d 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -21,7 +21,7 @@ func TestIcingaEventsBroker(t *testing.T) { t.Fatal("This test needs a working Redis connection") } - chEnv := make(chan *icingadb_connection.Environment) + chEnv := make(chan *Environment) go func() { err := IcingaEventsBroker(rdb, chEnv)